#include <unistd.h>

#include <c10/util/irange.h>
#include <torch/csrc/distributed/c10d/ProcessGroupMPI.hpp>

#include <cstdlib>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>

#define STR_HELPER(x) #x
#define STR(x) STR_HELPER(x)

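// Helpers that drain a batch of asynchronously queued work: waitWork blocks
// on the Work handles directly, waitFuture on their Futures. In both cases an
// exception from any pending operation aborts the whole process group.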
// Wait for work to complete
std::vector<std::vector<at::Tensor>> waitWork(
    c10::intrusive_ptr<::c10d::ProcessGroupMPI> pg,
    std::vector<c10::intrusive_ptr<c10d::Work>> works) {
  std::vector<std::vector<at::Tensor>> outputTensors;
  for (auto& work : works) {
    try {
      work->wait();
    } catch (const std::exception& ex) {
      std::cerr << "Exception received: " << ex.what() << std::endl;
      pg->abort();
    }
    outputTensors.emplace_back(work->result());
  }
  return outputTensors;
}

// Wait using Futures
std::vector<std::vector<at::Tensor>> waitFuture(
    c10::intrusive_ptr<::c10d::ProcessGroupMPI> pg,
    std::vector<c10::intrusive_ptr<c10d::Work>> works) {
  std::vector<std::vector<at::Tensor>> outputTensors;
  for (auto& work : works) {
    auto fut = work->getFuture();
    try {
      fut->wait();
    } catch (const std::exception& ex) {
      std::cerr << "Exception received: " << ex.what() << std::endl;
      pg->abort();
    }
    auto result = fut->value();
    if (result.isNone()) {
      outputTensors.emplace_back();
    } else if (result.isTensorList()) {
      outputTensors.emplace_back(result.toTensorVector());
    } else {
      TORCH_CHECK(false, "future result should be tensor list or none");
    }
  }
  return outputTensors;
}

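// Allreduce: every rank contributes a 16x16 tensor filled with the iteration
// index i; after the default sum reduction each element should equal
// worldSize * i on all ranks.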
void testAllreduce(int iter = 1000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();

  // Generate inputs
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;
  for (const auto i : c10::irange(iter)) {
    auto tensor = at::ones({16, 16}) * i;
    std::vector<at::Tensor> tensors = {tensor};

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->allreduce(tensors);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Get the world size
  auto worldSize = pg->getSize();

  // Verify outputs
  for (const auto i : c10::irange(iter)) {
    const auto expected = worldSize * i;
    auto data = outputTensors[i][0].data_ptr<float>();
    for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
      if (data[j] != expected) {
        TORCH_CHECK(false, "BOOM!");
      }
    }
  }
}

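// Broadcast: the root (rank 0 by default) supplies ones * i while the other
// ranks start from zeros; afterwards every rank should hold ones * i.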
void testBroadcast(int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;
  for (const auto i : c10::irange(iter)) {
    auto tensors = std::vector<at::Tensor>();
    if (pg->getRank() == 0) {
      auto tensor = at::ones({16, 16}) * i;
      tensors = std::vector<at::Tensor>({tensor});
    } else {
      auto tensor = at::zeros({16, 16});
      tensors = std::vector<at::Tensor>({tensor});
    }

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->broadcast(tensors);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Verify outputs
  for (const auto i : c10::irange(iter)) {
    const auto expected = i;
    auto data = outputTensors[i][0].data_ptr<float>();
    for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
      if (data[j] != expected) {
        TORCH_CHECK(false, "BOOM!");
      }
    }
  }
}

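// Reduce: like allreduce, but only the root (rank 0) receives the summed
// result, so only rank 0 verifies the outputs.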
void testReduce(int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;
  for (const auto i : c10::irange(iter)) {
    auto tensor = at::ones({16, 16}) * i;
    auto tensors = std::vector<at::Tensor>({tensor});

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->reduce(tensors);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Get the world size
  auto worldSize = pg->getSize();

  if (pg->getRank() == 0) {
    // Verify outputs
    for (const auto i : c10::irange(iter)) {
      const auto expected = worldSize * i;
      auto data = outputTensors[i][0].data_ptr<float>();
      for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
        if (data[j] != expected) {
          TORCH_CHECK(false, "BOOM!");
        }
      }
    }
  }
}

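// Allgather: rank r contributes ones * i * r and gathers all worldSize
// contributions, so output slot j should hold i * j on every rank.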
void testAllgather(int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;

  // Get the world size
  auto worldSize = pg->getSize();
  auto rank = pg->getRank();

  // Generate inputs
  for (const auto i : c10::irange(iter)) {
    auto tensor = at::ones({16, 16}) * i * rank;
    auto tensors = std::vector<at::Tensor>({tensor});
    auto outputs = std::vector<std::vector<at::Tensor>>(1);
    outputs[0].resize(worldSize);
    for (const auto j : c10::irange(worldSize)) {
      outputs[0][j] = at::zeros({16, 16});
    }

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->allgather(outputs, tensors);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Verify outputs
  for (const auto i : c10::irange(iter)) {
    for (const auto j : c10::irange(worldSize)) {
      const auto expected = i * j;
      auto data = outputTensors[i][j].data_ptr<float>();
      for (auto k = 0; k < outputTensors[i][j].numel(); ++k) {
        if (data[k] != expected) {
          TORCH_CHECK(false, "BOOM!");
        }
      }
    }
  }
}

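// Gather: same per-rank contributions as allgather, but only the root
// (rank 0) provides output buffers and receives data; every other rank
// should get back an empty result.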
void testGather(int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;

  // Get the world size
  auto worldSize = pg->getSize();
  auto rank = pg->getRank();

  // Generate inputs
  for (const auto i : c10::irange(iter)) {
    auto tensor = at::ones({16, 16}) * i * rank;
    auto tensors = std::vector<at::Tensor>({tensor});
    auto outputs = std::vector<std::vector<at::Tensor>>(0);
    if (rank == 0) {
      outputs = std::vector<std::vector<at::Tensor>>(1);
      outputs[0].resize(worldSize);
      for (const auto j : c10::irange(worldSize)) {
        outputs[0][j] = at::zeros({16, 16});
      }
    }

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->gather(outputs, tensors);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Verify outputs
  if (rank == 0) {
    for (const auto i : c10::irange(iter)) {
      for (const auto j : c10::irange(worldSize)) {
        const auto expected = i * j;
        auto data = outputTensors[i][j].data_ptr<float>();
        for (auto k = 0; k < outputTensors[i][j].numel(); ++k) {
          if (data[k] != expected) {
            TORCH_CHECK(false, "BOOM!");
          }
        }
      }
    }
  } else {
    for (const auto i : c10::irange(iter)) {
      if (outputTensors[i].size() != 0) {
        TORCH_CHECK(false, "BOOM!");
      }
    }
  }
}

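// Scatter: the root (rank 0) provides worldSize input tensors, the j-th
// filled with i * j, and rank j receives that tensor.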
void testScatter(int iter = 1) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;

  // Get the world size
  auto worldSize = pg->getSize();
  auto rank = pg->getRank();

  // Generate inputs
  for (const auto i : c10::irange(iter)) {
    auto tensor = at::zeros({16, 16});
    auto tensors = std::vector<at::Tensor>({tensor});
    auto inputs = std::vector<std::vector<at::Tensor>>(0);
    if (rank == 0) {
      inputs = std::vector<std::vector<at::Tensor>>(1);
      inputs[0].resize(worldSize);
      for (const auto j : c10::irange(worldSize)) {
        inputs[0][j] = at::ones({16, 16}) * i * j;
      }
    }

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->scatter(tensors, inputs);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Verify outputs: each rank receives the chunk destined for it, i.e. the
  // tensor filled with i * rank. (The previous check compared the single
  // received tensor against i * j for every j, which only passed because
  // iter == 1 made every expected value zero.)
  for (const auto i : c10::irange(iter)) {
    const auto expected = i * rank;
    auto data = outputTensors[i][0].data_ptr<float>();
    for (auto k = 0; k < outputTensors[i][0].numel(); ++k) {
      if (data[k] != expected) {
        TORCH_CHECK(false, "BOOM!");
      }
    }
  }
}

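// Point-to-point: rank 0 sends ones * i to rank 1 with tag 0, and every other
// rank posts a matching receive, either from rank 0 explicitly or from any
// source. The test is meant to run with exactly 2 processes (see the mpiexec
// launch in main).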
void testSendRecv(bool recvAnysource, int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  // Generate inputs
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;

  // pg->send does not keep sent tensors alive, so we need to.
  std::vector<std::vector<at::Tensor>> sendTensors(iter);
  auto rank = pg->getRank();
  for (const auto i : c10::irange(iter)) {
    if (rank == 0) {
      auto tensor = at::ones({16, 16}) * i;
      sendTensors[i] = std::vector<at::Tensor>({tensor});

      // Queue the work.
      c10::intrusive_ptr<::c10d::Work> work = pg->send(sendTensors[i], 1, 0);
      works.push_back(std::move(work));
    } else {
      auto tensor = at::zeros({16, 16});
      auto recvTensors = std::vector<at::Tensor>({tensor});

      // Queue the work.
      if (!recvAnysource) {
        c10::intrusive_ptr<::c10d::Work> work = pg->recv(recvTensors, 0, 0);
        works.push_back(std::move(work));
      } else {
        c10::intrusive_ptr<::c10d::Work> work =
            pg->recvAnysource(recvTensors, 0);
        works.push_back(std::move(work));
      }
    }
  }

  auto outputTensors = waitWork(pg, works);
  if (rank == 0) {
    return;
  }

  std::vector<int> srcRanks;
  if (recvAnysource) {
    for (const auto& work : works) {
      srcRanks.push_back(work->sourceRank());
    }
  }

  // Verify outputs
  for (const auto i : c10::irange(iter)) {
    if (recvAnysource && srcRanks[i] != 0) {
      TORCH_CHECK(false, "src rank is wrong for recvAnysource");
    }
    const auto expected = i;
    auto data = outputTensors[i][0].data_ptr<float>();
    for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
      if (data[j] != expected) {
        TORCH_CHECK(false, "BOOM!");
      }
    }
  }
}

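// Sanity check: the process group should report MPI as its backend name.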
void testBackendName() {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  if (pg->getBackendName() != std::string(c10d::MPI_BACKEND_NAME)) {
    TORCH_CHECK(false, "BOOM!");
  }
}

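// When launched directly (OMPI_COMM_WORLD_SIZE unset), the binary re-execs
// itself under the mpiexec path baked in at build time, equivalent to:
//
//   mpiexec -np 2 ./ProcessGroupMPITest
//
// When already running under mpirun, it proceeds straight to the tests.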
int main(int argc, char** argv) {
#ifdef MPIEXEC
  // If we are within an openmpi mpirun, then skip the exec
  if (!std::getenv("OMPI_COMM_WORLD_SIZE")) {
    std::cout << "Execute mpiexec from: " << STR(MPIEXEC) << std::endl;
    // execl()'s second argument becomes argv[0] of the new process, so pass
    // the program name first and "-np" and "2" as separate arguments;
    // the original call passed "-np 2" as argv[0], which mpiexec ignored.
    execl(STR(MPIEXEC), STR(MPIEXEC), "-np", "2", argv[0], (char*)nullptr);
  }

  testAllreduce();
  testBroadcast();
  testReduce();
  testAllgather();
  testGather();
  testScatter();
  testSendRecv(false);
  testSendRecv(true);
  testBackendName();

  std::cout << "Test successful" << std::endl;
#else
  std::cout << "MPI executable not found, skipping test" << std::endl;
#endif
  return EXIT_SUCCESS;
}