#ifndef _WIN32
#include <signal.h>
#include <sys/wait.h>
#include <unistd.h>
#endif

#include <sys/types.h>

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <sstream>
#include <thread>

#include <gtest/gtest.h>
#include <torch/csrc/autograd/profiler.h>
#include <torch/cuda.h>

#include <c10/util/irange.h>
#include <torch/csrc/distributed/c10d/FileStore.hpp>
#include <torch/csrc/distributed/c10d/ProcessGroupGloo.hpp>
#include "TestUtils.hpp"

using namespace c10d::test;
using namespace torch::autograd::profiler;

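// kSendDelay is the artificial latency injected by ProcessGroupGlooDelayed::send();
// kWaitTimeout is deliberately short so that testWaitDelay can observe a wait() timeout.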
constexpr auto kSendDelay = std::chrono::milliseconds(100);
constexpr auto kWaitTimeout = std::chrono::milliseconds(1);

#ifndef _WIN32
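// Used by the signal tests below: run() loops on allreduce until an iteration
// throws, while arm() waits for the first successful collective (signaled via
// the semaphore) and then delivers the given signal to the peer process.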
class SignalTest {
 public:
  SignalTest(const std::string& path) : path_(path) {}

  ~SignalTest() {
    if (arm_.joinable()) {
      arm_.join();
    }
  }

  // Arms test to send signal to PID when the semaphore unlocks. This
  // happens as soon as the first collective completes successfully.
  void arm(int pid, int signal) {
    arm_ = std::thread([=] {
      sem_.wait();
      kill(pid, signal);
    });
  }

  c10::intrusive_ptr<::c10d::Work> run(int rank, int size) {
    auto store = c10::make_intrusive<::c10d::FileStore>(path_, size);

    auto options = ::c10d::ProcessGroupGloo::Options::create();
    // Set a timeout that is small enough to make this test run fast, but also
    // make sure that we don't get timeouts in the ProcessGroupGloo constructor.
    options->timeout = std::chrono::milliseconds(1000);
    options->devices.push_back(
        ::c10d::ProcessGroupGloo::createDeviceForHostname("127.0.0.1"));

    ::c10d::ProcessGroupGloo pg(store, rank, size, options);

    // Initialize tensor list
    std::vector<at::Tensor> tensors = {
        at::ones({16, 16}),
    };

    // Loop until an exception happens
    c10::intrusive_ptr<::c10d::Work> work;
    while (true) {
      work = pg.allreduce(tensors);
      try {
        work->wait();
      } catch (const std::exception& e) {
        break;
      }
      sem_.post();
    }

    return work;
  }

 protected:
  std::string path_;
  std::thread arm_;
  Semaphore sem_;
};

c10::intrusive_ptr<::c10d::Work> testSignal(
    const std::string& path,
    int signal) {
  Fork fork;
  if (fork.isChild()) {
    SignalTest test(path);
    test.run(1, 2);
    exit(1);
  }

  SignalTest test(path);
  test.arm(fork.pid, signal);
  return test.run(0, 2);
}
#endif

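// A ProcessGroupGloo whose send() sleeps for kSendDelay before actually
// sending, used to exercise waiting on work that has not completed yet
// (see testWaitDelay).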
class ProcessGroupGlooDelayed : public ::c10d::ProcessGroupGloo {
 public:
  ProcessGroupGlooDelayed(
      const c10::intrusive_ptr<::c10d::Store>& store,
      int rank,
      int size,
      c10::intrusive_ptr<Options> options)
      : ProcessGroupGloo(store, rank, size, options) {}

  c10::intrusive_ptr<::c10d::Work> send(
      std::vector<at::Tensor>& tensors,
      int dstRank,
      int tag) override {
    std::this_thread::sleep_for(kSendDelay);
    return ::c10d::ProcessGroupGloo::send(tensors, dstRank, tag);
  }
};

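// Spins up one ProcessGroupGloo instance per rank over a shared FileStore so
// collectives can be exercised within a single process. Pass delayed = true to
// construct ProcessGroupGlooDelayed instances instead.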
class CollectiveTest {
 public:
  static std::vector<CollectiveTest> initialize(
      const std::string& path,
      int num,
      bool delayed = false) {
    std::vector<CollectiveTest> tests;
    for (C10_UNUSED const auto i : c10::irange(num)) {
      tests.emplace_back(CollectiveTest(path));
    }

    std::vector<std::thread> threads;
    for (const auto i : c10::irange(num)) {
      threads.emplace_back(std::thread(
          [i, &tests, delayed] { tests[i].start(i, tests.size(), delayed); }));
    }
    for (auto& thread : threads) {
      thread.join();
    }

    return tests;
  }

  CollectiveTest(std::string path) : path_(std::move(path)) {}

  CollectiveTest(CollectiveTest&& other) {
    path_ = std::move(other.path_);
    pg_ = std::move(other.pg_);
  }

  ::c10d::ProcessGroupGloo& getProcessGroup() {
    return *pg_;
  }

  void start(int rank, int size, bool delayed) {
    auto store = c10::make_intrusive<::c10d::FileStore>(path_, size);

    // Set a timeout that is small enough to make this test run fast, but also
    // make sure that we don't get timeouts in the ProcessGroupGloo constructor.
    auto options = ::c10d::ProcessGroupGloo::Options::create();
    options->timeout = std::chrono::milliseconds(1000);
    options->devices.push_back(
        ::c10d::ProcessGroupGloo::createDeviceForHostname("127.0.0.1"));

    if (!delayed) {
      pg_ = std::unique_ptr<::c10d::ProcessGroupGloo>(
          new ::c10d::ProcessGroupGloo(store, rank, size, options));
    } else {
      pg_ = std::unique_ptr<ProcessGroupGlooDelayed>(
          new ProcessGroupGlooDelayed(store, rank, size, options));
    }
  }

 protected:
  std::string path_;
  std::unique_ptr<::c10d::ProcessGroupGloo> pg_;
};

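// Copies every tensor in the nested input vectors to CPU so results can be
// inspected uniformly regardless of the device the collective ran on.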
std::vector<std::vector<at::Tensor>> copyTensors(
    const std::vector<std::vector<at::Tensor>>& inputs) {
  std::vector<std::vector<at::Tensor>> outputs(inputs.size());
  for (const auto i : c10::irange(inputs.size())) {
    const auto& input = inputs[i];
    std::vector<at::Tensor> output(input.size());
    for (const auto j : c10::irange(input.size())) {
      output[j] = input[j].cpu();
    }
    outputs[i] = output;
  }
  return outputs;
}

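// Waits on each Work via the legacy work API, logging any exception, and
// returns the results copied to CPU.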
std::vector<std::vector<at::Tensor>> waitWork(
    std::vector<c10::intrusive_ptr<c10d::Work>> works) {
  std::vector<std::vector<at::Tensor>> outputTensors;
  for (auto& work : works) {
    try {
      work->wait();
    } catch (const std::exception& ex) {
      LOG(ERROR) << "Exception received: " << ex.what() << std::endl;
    }
    outputTensors.emplace_back(work->result());
  }
  return copyTensors(outputTensors);
}

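// Waits on each Work via its Future and returns the results copied to CPU.
// A future value may hold either a tensor list or None.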
std::vector<std::vector<at::Tensor>> waitFuture(
    std::vector<c10::intrusive_ptr<c10d::Work>> works) {
  std::vector<std::vector<at::Tensor>> outputTensors;
  for (auto& work : works) {
    auto fut = work->getFuture();
    try {
      fut->wait();
    } catch (const std::exception& ex) {
      LOG(ERROR) << "Exception received: " << ex.what() << std::endl;
    }
    auto result = fut->value();
    if (result.isNone()) {
      outputTensors.emplace_back();
    } else if (result.isTensorList()) {
      outputTensors.emplace_back(result.toTensorVector());
    } else {
      TORCH_CHECK(false, "future result should be tensor list or none");
    }
  }
  return copyTensors(outputTensors);
}

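// Scans the profiler event lists for events whose name matches
// expected_profile_str and, when verify_shapes is set, asserts that each
// expected input shape was seen on some matching event.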
void checkProfiledEvents(
    const thread_event_lists& event_lists,
    const char* expected_profile_str,
    int expected_count,
    std::vector<std::vector<int64_t>> expected_shapes,
    bool verify_shapes = true) {
  if (verify_shapes) {
    EXPECT_EQ(expected_count, expected_shapes.size());
  }
  std::vector<bool> matched_shapes(expected_count);
  for (const auto& li : event_lists) {
    for (const auto& evt : li) {
      auto match = !strcmp(evt.name(), expected_profile_str);
      if (verify_shapes && match) {
        auto shapesVec = evt.shapes();
        for (const auto i : c10::irange(expected_count)) {
          // Assumptions: no two expected shapes are the same
          if (shapesVec[0] == expected_shapes[i]) {
            matched_shapes[i] = true;
          }
        }
      }
    }
  }
  if (verify_shapes) {
    for (bool match : matched_shapes) {
      EXPECT_TRUE(match);
    }
  }
}

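// Runs allreduce across `size` ranks via the Future API, checks the recorded
// profiler events, and verifies every output element equals the sum
// 0 + 1 + ... + (size - 1).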
void testAllreduce(
    const std::string& path,
    const at::DeviceType b,
    const at::ScalarType dtype = at::kFloat) {
  const auto size = 4;
  auto tests = CollectiveTest::initialize(path, size);

  // Generate inputs
  std::vector<std::vector<at::Tensor>> inputs(size);
  std::vector<std::vector<int64_t>> allShapes;
  std::vector<int64_t> shapes = {16, 16};
  for (const auto i : c10::irange(size)) {
    auto tensor = at::ones(shapes, at::dtype(dtype).device(b)) * i;
    std::vector<int64_t> shapesVec = shapes;
    allShapes.emplace_back(std::move(shapesVec));
    inputs[i] = std::vector<at::Tensor>({tensor});
  }

  // Kick off work
  std::vector<c10::intrusive_ptr<::c10d::Work>> work(size);
  const char* GLOO_ALLREDUCE_STR = "gloo:all_reduce";
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  for (const auto i : c10::irange(size)) {
    work[i] = tests[i].getProcessGroup().allreduce(inputs[i]);
  }
  // Wait for work to complete
  auto outputs = waitFuture(work);

  auto event_lists = disableProfilerLegacy();
  checkProfiledEvents(
      std::move(event_lists), GLOO_ALLREDUCE_STR, size, allShapes);

  // Verify outputs
  const auto expected = (size * (size - 1)) / 2;
  for (const auto i : c10::irange(size)) {
    auto tensor = outputs[i][0].to(at::kFloat);
    auto data = tensor.data_ptr<float>();
    for (const auto j : c10::irange(tensor.numel())) {
      EXPECT_EQ(data[j], expected);
    }
  }
}

// UsingWorkAPI tests are to make sure we still properly support work API.
// This should go away as we deprecate it.
void testAllreduceUsingWorkAPI(
    const std::string& path,
    const at::DeviceType b,
    const at::ScalarType dtype = at::kFloat) {
  const auto size = 4;
  auto tests = CollectiveTest::initialize(path, size);

  // Generate inputs
  std::vector<std::vector<at::Tensor>> inputs(size);
  std::vector<std::vector<int64_t>> allShapes;
  std::vector<int64_t> shapes = {16, 16};
  for (const auto i : c10::irange(size)) {
    auto tensor = at::ones(shapes, at::dtype(dtype).device(b)) * i;
    std::vector<int64_t> shapesVec = shapes;
    allShapes.emplace_back(std::move(shapesVec));
    inputs[i] = std::vector<at::Tensor>({tensor});
  }

  // Kick off work
  std::vector<c10::intrusive_ptr<::c10d::Work>> work(size);
  const char* GLOO_ALLREDUCE_STR = "gloo:all_reduce";
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  for (const auto i : c10::irange(size)) {
    work[i] = tests[i].getProcessGroup().allreduce(inputs[i]);
  }
  // Wait for work to complete
  auto outputs = waitWork(work);

  auto event_lists = disableProfilerLegacy();
  checkProfiledEvents(
      std::move(event_lists), GLOO_ALLREDUCE_STR, size, allShapes);

  // Verify outputs
  const auto expected = (size * (size - 1)) / 2;
  for (const auto i : c10::irange(size)) {
    auto tensor = outputs[i][0].to(at::kFloat);
    auto data = tensor.data_ptr<float>();
    for (const auto j : c10::irange(tensor.numel())) {
      EXPECT_EQ(data[j], expected);
    }
  }
}

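// Runs broadcast for every combination of root rank and root tensor index and
// verifies that every rank and tensor slot ends up with the root's value.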
void testBroadcast(
    const std::string& path,
    const at::DeviceType b,
    const at::ScalarType dtype = at::kFloat) {
  const auto size = 2;
  const auto stride = 2;
  auto tests = CollectiveTest::initialize(path, size);

  std::vector<std::vector<at::Tensor>> inputs(size);
  std::vector<int64_t> shapes = {16, 16};
  // Try every permutation of root rank and root tensor
  for (const auto i : c10::irange(size)) {
    for (const auto j : c10::irange(stride)) {
      std::vector<std::vector<int64_t>> allShapes;
      // Initialize inputs
      for (const auto k : c10::irange(size)) {
        std::vector<int64_t> shapesVec = shapes;
        allShapes.emplace_back(std::move(shapesVec));
        inputs[k].resize(stride);
        // This won't work if we ever support sparse CUDA
        at::OptionalDeviceGuard deviceGuard;
        for (const auto l : c10::irange(stride)) {
          if (b == at::DeviceType::CUDA) {
            deviceGuard.reset_device(at::Device(at::kCUDA, l));
          }
          inputs[k][l] =
              at::ones(shapes, at::dtype(dtype).device(b)) * (k * stride + l);
        }
      }

      ::c10d::BroadcastOptions options;
      options.rootRank = i;
      options.rootTensor = j;

      // Kick off work
      const char* GLOO_BROADCAST_STR = "gloo:broadcast";
      enableProfilerLegacy(ProfilerConfig(
          ProfilerState::CPU, /* report_input_shapes */ true, false));
      std::vector<c10::intrusive_ptr<::c10d::Work>> work(size);

      for (const auto i : c10::irange(size)) {
        work[i] = tests[i].getProcessGroup().broadcast(inputs[i], options);
      }

      // Wait for work to complete
      auto outputs = waitFuture(work);

      auto event_lists = disableProfilerLegacy();
      checkProfiledEvents(
          std::move(event_lists), GLOO_BROADCAST_STR, size, allShapes);

      // Verify outputs
      const auto expected = (i * stride + j);
      for (const auto k : c10::irange(size)) {
        for (const auto l : c10::irange(stride)) {
          auto tensor = outputs[k][l].to(at::kFloat);
          auto data = tensor.data_ptr<float>();
          for (const auto n : c10::irange(tensor.numel())) {
            EXPECT_EQ(data[n], expected);
          }
        }
      }
    }
  }
}

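// Runs alltoall_base with uneven input/output splits and verifies that each
// rank receives the expected slices from its peers.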
void testAlltoall(const std::string& path, const at::DeviceType b) {
  const auto size = 4;
  auto tests = CollectiveTest::initialize(path, size);

  // Generate inputs
  std::vector<at::Tensor> inputs(size);
  std::vector<std::vector<int32_t>> blobs = {
      {0, 1, 2, 3, 4, 5},
      {10, 11, 12, 13, 14, 15, 16, 17, 18},
      {20, 21, 22, 23, 24},
      {30, 31, 32, 33, 34, 35, 36},
  };
  for (const auto rank : c10::irange(size)) {
    const std::vector<int32_t>& blob = blobs[rank];
    inputs[rank] = at::from_blob((int32_t*)(blob.data()), blob.size()).to(b);
  }

  // Allocate outputs
  std::vector<at::Tensor> outputs(size);
  std::vector<int> outputLengths = {9, 7, 6, 5};
  for (const auto rank : c10::irange(size)) {
    outputs[rank] =
        at::empty(outputLengths[rank], c10::TensorOptions(at::kInt).device(b));
  }

  // Generate splits
  std::vector<std::vector<int64_t>> inputSplits = {
      {2, 2, 1, 1},
      {3, 2, 2, 2},
      {2, 1, 1, 1},
      {2, 2, 2, 1},
  };
  std::vector<std::vector<int64_t>> outputSplits = {
      {2, 3, 2, 2},
      {2, 2, 1, 2},
      {1, 2, 1, 2},
      {1, 2, 1, 1},
  };

  // Kick off work
  std::vector<c10::intrusive_ptr<::c10d::Work>> work(size);
  const char* GLOO_A2A_STR = "gloo:all_to_all";
  std::vector<std::vector<int64_t>> allShapes;
  for (const auto& vec : inputSplits) {
    // Due to concatenation of tensors, shape will actually be the sum
    int64_t sum = 0;
    for (const auto& s : vec) {
      sum += s;
    }
    allShapes.push_back({sum});
  }
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  for (const auto rank : c10::irange(size)) {
    work[rank] = tests[rank].getProcessGroup().alltoall_base(
        outputs[rank], inputs[rank], outputSplits[rank], inputSplits[rank]);
  }

  // Wait for work to complete
  for (const auto i : c10::irange(size)) {
    work[i]->wait();
  }

  auto event_lists = disableProfilerLegacy();
  checkProfiledEvents(std::move(event_lists), GLOO_A2A_STR, size, allShapes);
  // Verify outputs
  std::vector<std::vector<int32_t>> expected = {
      {0, 1, 10, 11, 12, 20, 21, 30, 31},
      {2, 3, 13, 14, 22, 32, 33},
      {4, 15, 16, 23, 34, 35},
      {5, 17, 18, 24, 36},
  };
  for (const auto rank : c10::irange(size)) {
    at::Tensor tensor = outputs[rank].cpu();
    EXPECT_EQ(tensor.numel(), expected[rank].size());
    auto data = tensor.data_ptr<int32_t>();
    for (const auto j : c10::irange(tensor.numel())) {
      EXPECT_EQ(data[j], expected[rank][j]);
    }
  }
}

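// Runs barrier on all ranks under the profiler; barrier takes no tensors, so
// the profiler check skips shape verification.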
void testBarrier(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size);

  // Kick off work
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  std::vector<c10::intrusive_ptr<::c10d::Work>> work(size);
  for (const auto i : c10::irange(size)) {
    work[i] = tests[i].getProcessGroup().barrier();
  }

  // Wait for work to complete
  waitFuture(work);

  auto event_lists = disableProfilerLegacy();
  const char* GLOO_STR = "gloo:barrier";
  std::vector<std::vector<int64_t>> allShapes;
  // Barrier does not use tensors, so skip shape checking.
  checkProfiledEvents(
      std::move(event_lists),
      GLOO_STR,
      size,
      allShapes,
      /* verify_shapes */ false);
}

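// Checks both the success path (all ranks call monitoredBarrier) and the
// failure path (only rank 0 calls it and should see an error naming rank 1).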
void testMonitoredBarrier(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size);
  // Non-failure case: all ranks pass the blocking monitored barrier.
  auto runMonitoredBarrier = [&](int i) {
    tests[i].getProcessGroup().monitoredBarrier();
  };
  std::vector<std::thread> threads;
  threads.reserve(size);
  for (const auto r : c10::irange(size)) {
    threads.emplace_back(std::thread([=]() { runMonitoredBarrier(r); }));
  }
  for (auto& t : threads) {
    t.join();
  }
  // Failure case: only rank 0 calls into the monitored barrier, which should
  // result in an error.
  auto runMonitoredBarrierWithException = [&](int i) {
    if (i != 0) {
      return;
    }

    try {
      tests[i].getProcessGroup().monitoredBarrier();
      FAIL() << "Exception should have been thrown.";
    } catch (const std::exception& e) {
      auto pos = std::string(e.what()).find("Rank 1");
      EXPECT_TRUE(pos != std::string::npos);
    }
  };
  threads.clear();
  for (const auto r : c10::irange(size)) {
    threads.emplace_back(
        std::thread([=]() { runMonitoredBarrierWithException(r); }));
  }
  for (auto& t : threads) {
    t.join();
  }
}

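// Verifies that after setSequenceNumberForGroup(), every rank reports the same
// group sequence number.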
void testSequenceNumInit(const std::string& path) {
  const auto size = 4;
  auto tests = CollectiveTest::initialize(path, size);
  for (const auto i : c10::irange(size)) {
    tests[i].getProcessGroup().setSequenceNumberForGroup();
  }

  std::unordered_set<uint64_t> nums;
  for (const auto i : c10::irange(size)) {
    auto seqNum = tests[i].getProcessGroup().getSequenceNumberForGroup();
    nums.insert(seqNum);
  }
  EXPECT_EQ(nums.size(), 1);
}

void testWaitDelay(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size, /* delay */ true);

  constexpr uint64_t tag = 0x1337;
  // Test that waiting on a send that has not completed yet times out when a
  // short timeout is passed to wait().
  auto selfRank = 0;
  auto dstRank = 1;
  std::vector<at::Tensor> tensors = {
      at::ones({16, 16}),
  };
  auto& pg = tests[selfRank].getProcessGroup();
  auto sendWork = pg.send(tensors, dstRank, tag);
  EXPECT_THROW(sendWork->wait(kWaitTimeout), std::exception);
}

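// Verifies that an outstanding send can be aborted (wait() returns false) and
// that waiting on it again succeeds once the peer posts a matching recv.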
void testSend(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size);

  constexpr uint64_t tag = 0x1337;
  // Test that waiting for work to be sent can be aborted successfully.
  auto selfRank = 0;
  auto dstRank = 1;
  std::vector<int64_t> shapes{16, 16};
  std::vector<std::vector<int64_t>> allShapes;
  allShapes.push_back(shapes);
  std::vector<at::Tensor> tensors = {
      at::ones(shapes),
  };
  auto& pg = tests[selfRank].getProcessGroup();
  const char* GLOO_SEND_STR = "gloo:send";
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  auto sendWork = pg.send(tensors, dstRank, tag);
  bool sendCompleted;
  std::thread waitSendThreadAbort([&]() { sendCompleted = sendWork->wait(); });
  sendWork->abort();
  // Block until the sendWork gets successfully aborted
  waitSendThreadAbort.join();
  EXPECT_FALSE(sendCompleted);
  auto event_lists = disableProfilerLegacy();
  checkProfiledEvents(std::move(event_lists), GLOO_SEND_STR, 1, allShapes);

  // Now create a separate sender thread to ensure that future waits on the
  // send can complete successfully.

  // Helper receiver to simulate a real recv/send pair
  std::thread recvThread([&]() {
    auto selfRank = 1;
    auto srcRank = 0;
    auto& pg = tests[selfRank].getProcessGroup();
    std::vector<at::Tensor> tensors = {
        at::ones({16, 16}),
    };

    auto recvWork = pg.recv(tensors, srcRank, tag);
    recvWork->wait();
  });

  // Sender thread
  std::thread sendThread([&]() { sendCompleted = sendWork->wait(); });
  sendThread.join();
  recvThread.join();
  EXPECT_TRUE(sendCompleted);
}

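// Mirror of testSend: verifies that an outstanding recv can be aborted and
// that waiting on it again succeeds once the peer posts a matching send.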
void testRecv(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size);
  constexpr uint64_t tag = 0x1337;
  // test that waiting for work to be received can be aborted successfully.
  auto selfRank = 0;
  auto srcRank = 1;
  std::vector<int64_t> shapes = {16, 16};
  std::vector<std::vector<int64_t>> allShapes;
  allShapes.push_back(shapes);
  std::vector<at::Tensor> tensors = {
      at::ones(shapes),
  };
  const char* GLOO_RECV_STR = "gloo:recv";
  auto& pg = tests[selfRank].getProcessGroup();
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  auto recvWork = pg.recv(tensors, srcRank, tag);
  bool recvCompleted;
  std::thread waitRecvThreadAbort([&]() { recvCompleted = recvWork->wait(); });
  recvWork->abort();
  // Block until the first recv gets successfully aborted
  waitRecvThreadAbort.join();
  EXPECT_FALSE(recvCompleted);
  auto event_lists = disableProfilerLegacy();
  checkProfiledEvents(std::move(event_lists), GLOO_RECV_STR, 1, allShapes);

  // Now create a separate receiver thread to ensure that future waits can
  // complete successfully.

  // Helper sender thread to simulate a real recv/send pair.
  std::thread senderThread([&]() {
    auto selfRank = 1;
    auto destRank = 0;

    auto& pg = tests[selfRank].getProcessGroup();

    std::vector<at::Tensor> tensors = {
        at::ones({16, 16}),
    };
    auto sendWork = pg.send(tensors, destRank, tag);
    sendWork->wait();
  });
  // Receiver thread.
  std::thread receiverThread([&]() { recvCompleted = recvWork->wait(); });
  senderThread.join();
  receiverThread.join();
  EXPECT_TRUE(recvCompleted);
}

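// Verifies that a value set through one rank's GlooStore can be read back
// through the other rank's GlooStore.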
void testStoreSetGet(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size);
  // test that get() gets the same value as the one that was set()
  std::vector<uint8_t> testVector = {1, 1, 1, 1};
  // Cast to ProcessGroupGloo::GlooStore to test specific GlooStore APIs.
  auto rank_0_glooStore = static_cast<c10d::ProcessGroupGloo::GlooStore*>(
      tests[0].getProcessGroup()._getStore().get());
  auto rank_1_glooStore = static_cast<c10d::ProcessGroupGloo::GlooStore*>(
      tests[1].getProcessGroup()._getStore().get());

  rank_0_glooStore->setUint("testKey", testVector);
  auto value = rank_1_glooStore->getUint("testKey");
  EXPECT_TRUE(value == testVector);
}

#ifndef _WIN32
TEST(ProcessGroupGlooTest, testSIGSTOPException) {
  // test SIGSTOP
  // Fork() and TSAN don't play well together, so skip the test if we're testing
  // with TSAN.
  if (isTSANEnabled()) {
    LOG(INFO) << "Skipping test since Fork() + TSAN is broken";
    return;
  }

  TemporaryFile file;
  auto work = testSignal(file.path, SIGSTOP);
  EXPECT_FALSE(work->isSuccess());
  EXPECT_THROW(std::rethrow_exception(work->exception()), std::exception);
}

TEST(ProcessGroupGlooTest, testSIGKILLException) {
  // test SIGKILL
  // Fork() and TSAN don't play well together, so skip the test if we're testing
  // with TSAN.
  if (isTSANEnabled()) {
    LOG(INFO) << "Skipping test since Fork() + TSAN is broken";
    return;
  }

  TemporaryFile file;
  auto work = testSignal(file.path, SIGKILL);
  EXPECT_FALSE(work->isSuccess());
  EXPECT_THROW(std::rethrow_exception(work->exception()), std::exception);
}
#endif

TEST(ProcessGroupGlooTest, testAllReduceCPU) {
  {
    TemporaryFile file;
    testAllreduce(file.path, at::DeviceType::CPU);
    testAllreduceUsingWorkAPI(file.path, at::DeviceType::CPU);
  }
}

TEST(ProcessGroupGlooTest, testAllReduceBfloatCPU) {
  {
    TemporaryFile file;
    testAllreduce(file.path, at::DeviceType::CPU, at::kBFloat16);
    testAllreduceUsingWorkAPI(file.path, at::DeviceType::CPU);
  }
}

TEST(ProcessGroupGlooTest, testBroadcastCPU) {
  {
    TemporaryFile file;
    testBroadcast(file.path, at::DeviceType::CPU);
  }
}

TEST(ProcessGroupGlooTest, testBroadcastBfloatCPU) {
  {
    TemporaryFile file;
    testBroadcast(file.path, at::DeviceType::CPU, at::kBFloat16);
  }
}

TEST(ProcessGroupGlooTest, testAllToAllCPU) {
  {
    TemporaryFile file;
    testAlltoall(file.path, at::DeviceType::CPU);
  }
}

TEST(ProcessGroupGlooTest, testBarrier) {
  {
    TemporaryFile file;
    testBarrier(file.path);
  }
}

TEST(ProcessGroupGlooTest, testMonitoredBarrier) {
  TemporaryFile file;
  testMonitoredBarrier(file.path);
}

TEST(ProcessGroupGlooTest, testSequenceNumInit) {
  TemporaryFile file;
  testSequenceNumInit(file.path);
}

TEST(ProcessGroupGlooTest, testSend) {
  {
    TemporaryFile file;
    testSend(file.path);
  }
}

TEST(ProcessGroupGlooTest, testRecv) {
  {
    TemporaryFile file;
    testRecv(file.path);
  }
}

TEST(ProcessGroupGlooTest, testStoreSetGet) {
  TemporaryFile file;
  testStoreSetGet(file.path);
}

TEST(ProcessGroupGlooTest, testWaitDelay) {
  {
    TemporaryFile file;
    testWaitDelay(file.path);
  }
}

#ifdef USE_CUDA
// CUDA-only tests
TEST(ProcessGroupGlooTest, testAllReduceCUDA) {
  if (!torch::cuda::is_available()) {
    LOG(INFO) << "Skipping test - requires CUDA";
    return;
  }
  {
    TemporaryFile file;
    testAllreduce(file.path, at::DeviceType::CUDA);
    testAllreduceUsingWorkAPI(file.path, at::DeviceType::CUDA);
  }
}

TEST(ProcessGroupGlooTest, testBroadcastCUDA) {
  if (torch::cuda::device_count() <= 1) {
    LOG(INFO) << "Skipping test - requires multiple CUDA devices";
    return;
  }
  {
    TemporaryFile file;
    testBroadcast(file.path, at::DeviceType::CUDA);
  }
}

TEST(ProcessGroupGlooTest, testAlltoallCUDA) {
  if (!torch::cuda::is_available()) {
    LOG(INFO) << "Skipping test - requires CUDA";
    return;
  }
  {
    TemporaryFile file;
    testAlltoall(file.path, at::DeviceType::CUDA);
  }
}

TEST(ProcessGroupGlooTest, testBackendName) {
  {
    TemporaryFile file;
    const auto size = 2;
    auto tests = CollectiveTest::initialize(file.path, size);

    for (const auto i : c10::irange(size)) {
      EXPECT_EQ(
          tests[i].getProcessGroup().getBackendName(),
          std::string(c10d::GLOO_BACKEND_NAME));
    }
  }
}

#endif