#include <unistd.h>

#include <c10/util/irange.h>
#include <torch/csrc/distributed/c10d/ProcessGroupMPI.hpp>

#include <cstdlib>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>

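// Two-level stringification: STR_HELPER stringifies its argument literally,
// while STR first expands the macro argument (e.g. MPIEXEC) and then
// stringifies the result.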
#define STR_HELPER(x) #x
#define STR(x) STR_HELPER(x)

// Wait for work to complete
std::vector<std::vector<at::Tensor>> waitWork(
    c10::intrusive_ptr<::c10d::ProcessGroupMPI> pg,
    std::vector<c10::intrusive_ptr<c10d::Work>> works) {
  std::vector<std::vector<at::Tensor>> outputTensors;
  for (auto& work : works) {
    try {
      work->wait();
    } catch (const std::exception& ex) {
      std::cerr << "Exception received: " << ex.what() << std::endl;
      pg->abort();
    }
    outputTensors.emplace_back(work->result());
  }
  return outputTensors;
}

// Wait using Futures
std::vector<std::vector<at::Tensor>> waitFuture(
    c10::intrusive_ptr<::c10d::ProcessGroupMPI> pg,
    std::vector<c10::intrusive_ptr<c10d::Work>> works) {
  std::vector<std::vector<at::Tensor>> outputTensors;
  for (auto& work : works) {
    auto fut = work->getFuture();
    try {
      fut->wait();
    } catch (const std::exception& ex) {
      std::cerr << "Exception received: " << ex.what() << std::endl;
      pg->abort();
    }
    auto result = fut->value();
    if (result.isNone()) {
      outputTensors.emplace_back();
    } else if (result.isTensorList()) {
      outputTensors.emplace_back(result.toTensorVector());
    } else {
      TORCH_CHECK(false, "future result should be tensor list or none");
    }
  }
  return outputTensors;
}

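// Allreduce: every rank contributes the same tensor filled with i, so after
// summation across the group each element should equal worldSize * i.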
void testAllreduce(int iter = 1000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();

  // Generate inputs
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;
  for (const auto i : c10::irange(iter)) {
    auto tensor = at::ones({16, 16}) * i;
    std::vector<at::Tensor> tensors = {tensor};

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->allreduce(tensors);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Get the world size
  auto worldSize = pg->getSize();

  // Verify outputs
  for (const auto i : c10::irange(iter)) {
    const auto expected = worldSize * i;
    auto data = outputTensors[i][0].data_ptr<float>();
    for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
      if (data[j] != expected) {
        TORCH_CHECK(false, "BOOM!");
      }
    }
  }
}

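// Broadcast: rank 0 holds a tensor filled with i and all other ranks start
// from zeros; after the broadcast every rank should observe the value i.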
void testBroadcast(int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;
  for (const auto i : c10::irange(iter)) {
    auto tensors = std::vector<at::Tensor>();
    if (pg->getRank() == 0) {
      auto tensor = at::ones({16, 16}) * i;
      tensors = std::vector<at::Tensor>({tensor});
    } else {
      auto tensor = at::zeros({16, 16});
      tensors = std::vector<at::Tensor>({tensor});
    }

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->broadcast(tensors);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Verify outputs
  for (const auto i : c10::irange(iter)) {
    const auto expected = i;
    auto data = outputTensors[i][0].data_ptr<float>();
    for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
      if (data[j] != expected) {
        TORCH_CHECK(false, "BOOM!");
      }
    }
  }
}

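// Reduce: contributions are summed onto the root (rank 0), so only rank 0
// checks that each element equals worldSize * i.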
void testReduce(int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;
  for (const auto i : c10::irange(iter)) {
    auto tensor = at::ones({16, 16}) * i;
    auto tensors = std::vector<at::Tensor>({tensor});

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->reduce(tensors);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Get the world size
  auto worldSize = pg->getSize();

  if (pg->getRank() == 0) {
    // Verify outputs
    for (const auto i : c10::irange(iter)) {
      const auto expected = worldSize * i;
      auto data = outputTensors[i][0].data_ptr<float>();
      for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
        if (data[j] != expected) {
          TORCH_CHECK(false, "BOOM!");
        }
      }
    }
  }
}

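// Allgather: rank j contributes a tensor filled with i * j, and every rank
// gathers all contributions, so output slot j should equal i * j.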
void testAllgather(int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;

  // Get the world size
  auto worldSize = pg->getSize();
  auto rank = pg->getRank();

  // Generate inputs
  for (const auto i : c10::irange(iter)) {
    auto tensor = at::ones({16, 16}) * i * rank;
    auto tensors = std::vector<at::Tensor>({tensor});
    auto outputs = std::vector<std::vector<at::Tensor>>(1);
    outputs[0].resize(worldSize);
    for (const auto j : c10::irange(worldSize)) {
      outputs[0][j] = at::zeros({16, 16});
    }

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->allgather(outputs, tensors);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Verify outputs
  for (const auto i : c10::irange(iter)) {
    for (const auto j : c10::irange(worldSize)) {
      const auto expected = i * j;
      auto data = outputTensors[i][j].data_ptr<float>();
      for (auto k = 0; k < outputTensors[i][j].numel(); ++k) {
        if (data[k] != expected) {
          TORCH_CHECK(false, "BOOM!");
        }
      }
    }
  }
}

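// Gather: only the root (rank 0) provides output buffers and receives the
// per-rank tensors; non-root ranks should end up with empty results.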
void testGather(int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;

  // Get the world size
  auto worldSize = pg->getSize();
  auto rank = pg->getRank();

  // Generate inputs
  for (const auto i : c10::irange(iter)) {
    auto tensor = at::ones({16, 16}) * i * rank;
    auto tensors = std::vector<at::Tensor>({tensor});
    auto outputs = std::vector<std::vector<at::Tensor>>(0);
    if (rank == 0) {
      outputs = std::vector<std::vector<at::Tensor>>(1);
      outputs[0].resize(worldSize);
      for (const auto j : c10::irange(worldSize)) {
        outputs[0][j] = at::zeros({16, 16});
      }
    }

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->gather(outputs, tensors);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Verify outputs
  if (rank == 0) {
    for (const auto i : c10::irange(iter)) {
      for (const auto j : c10::irange(worldSize)) {
        const auto expected = i * j;
        auto data = outputTensors[i][j].data_ptr<float>();
        for (auto k = 0; k < outputTensors[i][j].numel(); ++k) {
          if (data[k] != expected) {
            TORCH_CHECK(false, "BOOM!");
          }
        }
      }
    }
  } else {
    for (const auto i : c10::irange(iter)) {
      if (outputTensors[i].size() != 0) {
        TORCH_CHECK(false, "BOOM!");
      }
    }
  }
}

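// Scatter: the root (rank 0) supplies one tensor per destination rank, valued
// i * j for rank j; receiving ranks verify the scattered data.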
void testScatter(int iter = 1) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;

  // Get the world size
  auto worldSize = pg->getSize();
  auto rank = pg->getRank();

  // Generate inputs
  for (const auto i : c10::irange(iter)) {
    auto tensor = at::zeros({16, 16});
    auto tensors = std::vector<at::Tensor>({tensor});
    auto inputs = std::vector<std::vector<at::Tensor>>(0);
    if (rank == 0) {
      inputs = std::vector<std::vector<at::Tensor>>(1);
      inputs[0].resize(worldSize);
      for (const auto j : c10::irange(worldSize)) {
        inputs[0][j] = at::ones({16, 16}) * i * j;
      }
    }

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->scatter(tensors, inputs);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Verify outputs
  for (const auto i : c10::irange(iter)) {
    for (const auto j : c10::irange(worldSize)) {
      const auto expected = i * j;
      auto data = outputTensors[i][0].data_ptr<float>();
      for (auto k = 0; k < outputTensors[i][0].numel(); ++k) {
        if (data[k] != expected) {
          TORCH_CHECK(false, "BOOM!");
        }
      }
    }
  }
}

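// Send/recv: rank 0 sends tensors filled with i to rank 1; the receiver either
// names the source explicitly or uses recvAnysource, then verifies both the
// payload and, in the recvAnysource case, the reported source rank.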
void testSendRecv(bool recvAnysource, int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  // Generate inputs
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;

  // pg->send does not keep sent tensors alive, so we need to.
  std::vector<std::vector<at::Tensor>> sendTensors(iter);
  auto rank = pg->getRank();
  for (const auto i : c10::irange(iter)) {
    if (rank == 0) {
      auto tensor = at::ones({16, 16}) * i;
      sendTensors[i] = std::vector<at::Tensor>({tensor});

      // Queue the work.
      c10::intrusive_ptr<::c10d::Work> work = pg->send(sendTensors[i], 1, 0);
      works.push_back(std::move(work));
    } else {
      auto tensor = at::zeros({16, 16});
      auto recvTensors = std::vector<at::Tensor>({tensor});

      // Queue the work.
      if (!recvAnysource) {
        c10::intrusive_ptr<::c10d::Work> work = pg->recv(recvTensors, 0, 0);
        works.push_back(std::move(work));
      } else {
        c10::intrusive_ptr<::c10d::Work> work =
            pg->recvAnysource(recvTensors, 0);
        works.push_back(std::move(work));
      }
    }
  }

  auto outputTensors = waitWork(pg, works);
  if (rank == 0) {
    return;
  }

  std::vector<int> srcRanks;
  if (recvAnysource) {
    for (const auto& work : works) {
      srcRanks.push_back(work->sourceRank());
    }
  }

  // Verify outputs
  for (const auto i : c10::irange(iter)) {
    if (recvAnysource && srcRanks[i] != 0) {
      TORCH_CHECK(false, "src rank is wrong for recvAnysource");
    }
    const auto expected = i;
    auto data = outputTensors[i][0].data_ptr<float>();
    for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
      if (data[j] != expected) {
        TORCH_CHECK(false, "BOOM!");
      }
    }
  }
}

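// Sanity check that the process group reports the MPI backend name.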
void testBackendName() {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  if (pg->getBackendName() != std::string(c10d::MPI_BACKEND_NAME)) {
    TORCH_CHECK(false, "BOOM!");
  }
}

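// When built with an MPIEXEC path, the binary re-launches itself under mpiexec
// with two ranks unless it is already running inside an mpirun invocation.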
int main(int argc, char** argv) {
#ifdef MPIEXEC
  // If we are already running under an Open MPI mpirun, skip the exec.
  if (!std::getenv("OMPI_COMM_WORLD_SIZE")) {
    std::cout << "Execute mpiexec from: " << STR(MPIEXEC) << std::endl;
    execl(STR(MPIEXEC), "-np 2", argv[0], (char*)nullptr);
  }

  testAllreduce();
  testBroadcast();
  testReduce();
  testAllgather();
  testGather();
  testScatter();
  testSendRecv(false);
  testSendRecv(true);
  testBackendName();

  std::cout << "Test successful" << std::endl;
#else
  std::cout << "MPI executable not found, skipping test" << std::endl;
#endif
  return EXIT_SUCCESS;
}