#ifndef _WIN32
#include <signal.h>
#include <sys/wait.h>
#include <unistd.h>
#endif

#include <sys/types.h>

#include <condition_variable>
#include <cstring>
#include <iostream>
#include <mutex>
#include <sstream>
#include <thread>
#include <unordered_set>

#include <gtest/gtest.h>
#include <torch/csrc/autograd/profiler.h>
#include <torch/cuda.h>

#include <c10/util/irange.h>
#include <torch/csrc/distributed/c10d/FileStore.hpp>
#include <torch/csrc/distributed/c10d/ProcessGroupGloo.hpp>
#include "TestUtils.hpp"

using namespace c10d::test;
using namespace torch::autograd::profiler;

constexpr auto kSendDelay = std::chrono::milliseconds(100);
constexpr auto kWaitTimeout = std::chrono::milliseconds(1);

#ifndef _WIN32
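// Test fixture that runs allreduce in a loop and posts a semaphore after
// every successful collective. arm() uses that semaphore to deliver a signal
// to a peer PID, so the test can verify that the surviving rank observes the
// resulting failure on its work object.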
class SignalTest {
 public:
  SignalTest(const std::string& path) : path_(path) {}

  ~SignalTest() {
    if (arm_.joinable()) {
      arm_.join();
    }
  }

  // Arms the test to send `signal` to `pid` as soon as the semaphore is
  // posted, which happens when the first collective completes successfully.
  void arm(int pid, int signal) {
    arm_ = std::thread([=] {
      sem_.wait();
      kill(pid, signal);
    });
  }

  c10::intrusive_ptr<::c10d::Work> run(int rank, int size) {
    auto store = c10::make_intrusive<::c10d::FileStore>(path_, size);

    auto options = ::c10d::ProcessGroupGloo::Options::create();
    // Use a timeout small enough to keep this test fast, but large enough
    // that we don't hit timeouts in the ProcessGroupGloo constructor.
    options->timeout = std::chrono::milliseconds(1000);
    options->devices.push_back(
        ::c10d::ProcessGroupGloo::createDeviceForHostname("127.0.0.1"));

    ::c10d::ProcessGroupGloo pg(store, rank, size, options);

    // Initialize tensor list
    std::vector<at::Tensor> tensors = {
        at::ones({16, 16}),
    };

    // Loop until an exception happens
    c10::intrusive_ptr<::c10d::Work> work;
    while (true) {
      work = pg.allreduce(tensors);
      try {
        work->wait();
      } catch (const std::exception& e) {
        break;
      }
      sem_.post();
    }

    return work;
  }

 protected:
  std::string path_;
  std::thread arm_;
  Semaphore sem_;
};

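// Forks a child rank that loops allreduce, arms the parent to deliver
// `signal` to the child once the first collective succeeds, and returns the
// parent's last (failed) work object.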
c10::intrusive_ptr<::c10d::Work> testSignal(
    const std::string& path,
    int signal) {
  Fork fork;
  if (fork.isChild()) {
    SignalTest test(path);
    test.run(1, 2);
    exit(1);
  }

  SignalTest test(path);
  test.arm(fork.pid, signal);
  return test.run(0, 2);
}
#endif

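// ProcessGroupGloo variant whose send() sleeps for kSendDelay before
// dispatching, used to exercise wait() timeouts on outstanding sends.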
class ProcessGroupGlooDelayed : public ::c10d::ProcessGroupGloo {
 public:
  ProcessGroupGlooDelayed(
      const c10::intrusive_ptr<::c10d::Store>& store,
      int rank,
      int size,
      c10::intrusive_ptr<Options> options)
      : ProcessGroupGloo(store, rank, size, options) {}

  c10::intrusive_ptr<::c10d::Work> send(
      std::vector<at::Tensor>& tensors,
      int dstRank,
      int tag) override {
    std::this_thread::sleep_for(kSendDelay);
    return ::c10d::ProcessGroupGloo::send(tensors, dstRank, tag);
  }
};

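// Helper that spins up `num` ranks, each with its own ProcessGroupGloo
// backed by a shared FileStore, so collectives can be driven from a single
// test process.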
class CollectiveTest {
 public:
  static std::vector<CollectiveTest> initialize(
      const std::string& path,
      int num,
      bool delayed = false) {
    std::vector<CollectiveTest> tests;
    for (C10_UNUSED const auto i : c10::irange(num)) {
      tests.emplace_back(CollectiveTest(path));
    }

    std::vector<std::thread> threads;
    for (const auto i : c10::irange(num)) {
      threads.emplace_back(std::thread(
          [i, &tests, delayed] { tests[i].start(i, tests.size(), delayed); }));
    }
    for (auto& thread : threads) {
      thread.join();
    }

    return tests;
  }

  CollectiveTest(std::string path) : path_(std::move(path)) {}

  CollectiveTest(CollectiveTest&& other) {
    path_ = std::move(other.path_);
    pg_ = std::move(other.pg_);
  }

  ::c10d::ProcessGroupGloo& getProcessGroup() {
    return *pg_;
  }

  void start(int rank, int size, bool delayed) {
    auto store = c10::make_intrusive<::c10d::FileStore>(path_, size);

    // Use a timeout small enough to keep this test fast, but large enough
    // that we don't hit timeouts in the ProcessGroupGloo constructor.
    auto options = ::c10d::ProcessGroupGloo::Options::create();
    options->timeout = std::chrono::milliseconds(1000);
    options->devices.push_back(
        ::c10d::ProcessGroupGloo::createDeviceForHostname("127.0.0.1"));

    if (!delayed) {
      pg_ = std::unique_ptr<::c10d::ProcessGroupGloo>(
          new ::c10d::ProcessGroupGloo(store, rank, size, options));
    } else {
      pg_ = std::unique_ptr<ProcessGroupGlooDelayed>(
          new ProcessGroupGlooDelayed(store, rank, size, options));
    }
  }

 protected:
  std::string path_;
  std::unique_ptr<::c10d::ProcessGroupGloo> pg_;
};

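// Copies every tensor in the nested result lists to CPU so values can be
// inspected via data_ptr regardless of which device the collective ran on.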
std::vector<std::vector<at::Tensor>> copyTensors(
    const std::vector<std::vector<at::Tensor>>& inputs) {
  std::vector<std::vector<at::Tensor>> outputs(inputs.size());
  for (const auto i : c10::irange(inputs.size())) {
    const auto& input = inputs[i];
    std::vector<at::Tensor> output(input.size());
    for (const auto j : c10::irange(input.size())) {
      output[j] = input[j].cpu();
    }
    outputs[i] = output;
  }
  return outputs;
}

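// Waits on each work object via the legacy work API and collects its
// results, moved to CPU for verification.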
std::vector<std::vector<at::Tensor>> waitWork(
    std::vector<c10::intrusive_ptr<c10d::Work>> works) {
  std::vector<std::vector<at::Tensor>> outputTensors;
  for (auto& work : works) {
    try {
      work->wait();
    } catch (const std::exception& ex) {
      LOG(ERROR) << "Exception received: " << ex.what() << std::endl;
    }
    outputTensors.emplace_back(work->result());
  }
  return copyTensors(outputTensors);
}

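// Waits on each work object via its future and collects the future's value,
// moved to CPU for verification.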
std::vector<std::vector<at::Tensor>> waitFuture(
    std::vector<c10::intrusive_ptr<c10d::Work>> works) {
  std::vector<std::vector<at::Tensor>> outputTensors;
  for (auto& work : works) {
    auto fut = work->getFuture();
    try {
      fut->wait();
    } catch (const std::exception& ex) {
      LOG(ERROR) << "Exception received: " << ex.what() << std::endl;
    }
    auto result = fut->value();
    if (result.isNone()) {
      outputTensors.emplace_back();
    } else if (result.isTensorList()) {
      outputTensors.emplace_back(result.toTensorVector());
    } else {
      TORCH_CHECK(false, "future result should be tensor list or none");
    }
  }
  return copyTensors(outputTensors);
}

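// Scans the profiler event lists for `expected_count` events named
// `expected_profile_str` and, if requested, checks that their recorded
// input shapes cover `expected_shapes`.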
void checkProfiledEvents(
    const thread_event_lists& event_lists,
    const char* expected_profile_str,
    int expected_count,
    std::vector<std::vector<int64_t>> expected_shapes,
    bool verify_shapes = true) {
  if (verify_shapes) {
    EXPECT_EQ(expected_count, expected_shapes.size());
  }
  std::vector<bool> matched_shapes(expected_count);
  for (const auto& li : event_lists) {
    for (const auto& evt : li) {
      auto match = !strcmp(evt.name(), expected_profile_str);
      if (verify_shapes && match) {
        auto shapesVec = evt.shapes();
        for (const auto i : c10::irange(expected_count)) {
          // Assumptions: no two expected shapes are the same
          if (shapesVec[0] == expected_shapes[i]) {
            matched_shapes[i] = true;
          }
        }
      }
    }
  }
  if (verify_shapes) {
    for (bool match : matched_shapes) {
      EXPECT_TRUE(match);
    }
  }
}

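// Runs allreduce across `size` ranks and verifies both the reduced values
// and the shapes recorded by the legacy profiler.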
void testAllreduce(
    const std::string& path,
    const at::DeviceType b,
    const at::ScalarType dtype = at::kFloat) {
  const auto size = 4;
  auto tests = CollectiveTest::initialize(path, size);

  // Generate inputs
  std::vector<std::vector<at::Tensor>> inputs(size);
  std::vector<std::vector<int64_t>> allShapes;
  std::vector<int64_t> shapes = {16, 16};
  for (const auto i : c10::irange(size)) {
    auto tensor = at::ones(shapes, at::dtype(dtype).device(b)) * i;
    std::vector<int64_t> shapesVec = shapes;
    allShapes.emplace_back(std::move(shapesVec));
    inputs[i] = std::vector<at::Tensor>({tensor});
  }

  // Kick off work
  std::vector<c10::intrusive_ptr<::c10d::Work>> work(size);
  const char* GLOO_ALLREDUCE_STR = "gloo:all_reduce";
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  for (const auto i : c10::irange(size)) {
    work[i] = tests[i].getProcessGroup().allreduce(inputs[i]);
  }
  // Wait for work to complete
  auto outputs = waitFuture(work);

  auto event_lists = disableProfilerLegacy();
  checkProfiledEvents(
      std::move(event_lists), GLOO_ALLREDUCE_STR, size, allShapes);

  // Verify outputs
  const auto expected = (size * (size - 1)) / 2;
  for (const auto i : c10::irange(size)) {
    auto tensor = outputs[i][0].to(at::kFloat);
    auto data = tensor.data_ptr<float>();
    for (const auto j : c10::irange(tensor.numel())) {
      EXPECT_EQ(data[j], expected);
    }
  }
}

// The UsingWorkAPI tests make sure we still properly support the work API.
// They should go away once it is deprecated.
void testAllreduceUsingWorkAPI(
    const std::string& path,
    const at::DeviceType b,
    const at::ScalarType dtype = at::kFloat) {
  const auto size = 4;
  auto tests = CollectiveTest::initialize(path, size);

  // Generate inputs
  std::vector<std::vector<at::Tensor>> inputs(size);
  std::vector<std::vector<int64_t>> allShapes;
  std::vector<int64_t> shapes = {16, 16};
  for (const auto i : c10::irange(size)) {
    auto tensor = at::ones(shapes, at::dtype(dtype).device(b)) * i;
    std::vector<int64_t> shapesVec = shapes;
    allShapes.emplace_back(std::move(shapesVec));
    inputs[i] = std::vector<at::Tensor>({tensor});
  }

  // Kick off work
  std::vector<c10::intrusive_ptr<::c10d::Work>> work(size);
  const char* GLOO_ALLREDUCE_STR = "gloo:all_reduce";
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  for (const auto i : c10::irange(size)) {
    work[i] = tests[i].getProcessGroup().allreduce(inputs[i]);
  }
  // Wait for work to complete
  auto outputs = waitWork(work);

  auto event_lists = disableProfilerLegacy();
  checkProfiledEvents(
      std::move(event_lists), GLOO_ALLREDUCE_STR, size, allShapes);

  // Verify outputs
  const auto expected = (size * (size - 1)) / 2;
  for (const auto i : c10::irange(size)) {
    auto tensor = outputs[i][0].to(at::kFloat);
    auto data = tensor.data_ptr<float>();
    for (const auto j : c10::irange(tensor.numel())) {
      EXPECT_EQ(data[j], expected);
    }
  }
}

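// Runs broadcast for every combination of root rank and root tensor and
// verifies that all ranks end up with the root's values.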
void testBroadcast(
    const std::string& path,
    const at::DeviceType b,
    const at::ScalarType dtype = at::kFloat) {
  const auto size = 2;
  const auto stride = 2;
  auto tests = CollectiveTest::initialize(path, size);

  std::vector<std::vector<at::Tensor>> inputs(size);
  std::vector<int64_t> shapes = {16, 16};
  // Try every permutation of root rank and root tensor
  for (const auto i : c10::irange(size)) {
    for (const auto j : c10::irange(stride)) {
      std::vector<std::vector<int64_t>> allShapes;
      // Initialize inputs
      for (const auto k : c10::irange(size)) {
        std::vector<int64_t> shapesVec = shapes;
        allShapes.emplace_back(std::move(shapesVec));
        inputs[k].resize(stride);
        // This won't work if we ever support sparse CUDA
        at::OptionalDeviceGuard deviceGuard;
        for (const auto l : c10::irange(stride)) {
          if (b == at::DeviceType::CUDA) {
            deviceGuard.reset_device(at::Device(at::kCUDA, l));
          }
          inputs[k][l] =
              at::ones(shapes, at::dtype(dtype).device(b)) * (k * stride + l);
        }
      }

      ::c10d::BroadcastOptions options;
      options.rootRank = i;
      options.rootTensor = j;

      // Kick off work
      const char* GLOO_BROADCAST_STR = "gloo:broadcast";
      enableProfilerLegacy(ProfilerConfig(
          ProfilerState::CPU, /* report_input_shapes */ true, false));
      std::vector<c10::intrusive_ptr<::c10d::Work>> work(size);

      for (const auto i : c10::irange(size)) {
        work[i] = tests[i].getProcessGroup().broadcast(inputs[i], options);
      }

      // Wait for work to complete
      auto outputs = waitFuture(work);

      auto event_lists = disableProfilerLegacy();
      checkProfiledEvents(
          std::move(event_lists), GLOO_BROADCAST_STR, size, allShapes);

      // Verify outputs
      const auto expected = (i * stride + j);
      for (const auto k : c10::irange(size)) {
        for (const auto l : c10::irange(stride)) {
          auto tensor = outputs[k][l].to(at::kFloat);
          auto data = tensor.data_ptr<float>();
          for (const auto n : c10::irange(tensor.numel())) {
            EXPECT_EQ(data[n], expected);
          }
        }
      }
    }
  }
}

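// Runs alltoall_base with uneven splits across four ranks and verifies the
// redistributed values and the profiled (concatenated) input shapes.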
void testAlltoall(const std::string& path, const at::DeviceType b) {
  const auto size = 4;
  auto tests = CollectiveTest::initialize(path, size);

  // Generate inputs
  std::vector<at::Tensor> inputs(size);
  std::vector<std::vector<int32_t>> blobs = {
      {0, 1, 2, 3, 4, 5},
      {10, 11, 12, 13, 14, 15, 16, 17, 18},
      {20, 21, 22, 23, 24},
      {30, 31, 32, 33, 34, 35, 36},
  };
  for (const auto rank : c10::irange(size)) {
    const std::vector<int32_t>& blob = blobs[rank];
    // Interpret the blob as int32 so the values match the kInt outputs below.
    inputs[rank] =
        at::from_blob((int32_t*)(blob.data()), blob.size(), at::kInt).to(b);
  }

  // Allocate outputs
  std::vector<at::Tensor> outputs(size);
  std::vector<int> outputLengths = {9, 7, 6, 5};
  for (const auto rank : c10::irange(size)) {
    outputs[rank] =
        at::empty(outputLengths[rank], c10::TensorOptions(at::kInt).device(b));
  }

  // Generate splits
  std::vector<std::vector<int64_t>> inputSplits = {
      {2, 2, 1, 1},
      {3, 2, 2, 2},
      {2, 1, 1, 1},
      {2, 2, 2, 1},
  };
  std::vector<std::vector<int64_t>> outputSplits = {
      {2, 3, 2, 2},
      {2, 2, 1, 2},
      {1, 2, 1, 2},
      {1, 2, 1, 1},
  };

  // Kick off work
  std::vector<c10::intrusive_ptr<::c10d::Work>> work(size);
  const char* GLOO_A2A_STR = "gloo:all_to_all";
  std::vector<std::vector<int64_t>> allShapes;
  for (const auto& vec : inputSplits) {
    // The input tensors are concatenated, so the profiled shape is the sum
    // of the splits.
    int64_t sum = 0;
    for (const auto& s : vec) {
      sum += s;
    }
    allShapes.push_back({sum});
  }
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  for (const auto rank : c10::irange(size)) {
    work[rank] = tests[rank].getProcessGroup().alltoall_base(
        outputs[rank], inputs[rank], outputSplits[rank], inputSplits[rank]);
  }

  // Wait for work to complete
  for (const auto i : c10::irange(size)) {
    work[i]->wait();
  }

  auto event_lists = disableProfilerLegacy();
  checkProfiledEvents(std::move(event_lists), GLOO_A2A_STR, size, allShapes);
  // Verify outputs
  std::vector<std::vector<int32_t>> expected = {
      {0, 1, 10, 11, 12, 20, 21, 30, 31},
      {2, 3, 13, 14, 22, 32, 33},
      {4, 15, 16, 23, 34, 35},
      {5, 17, 18, 24, 36},
  };
  for (const auto rank : c10::irange(size)) {
    at::Tensor tensor = outputs[rank].cpu();
    EXPECT_EQ(tensor.numel(), expected[rank].size());
    auto data = tensor.data_ptr<int32_t>();
    for (const auto j : c10::irange(tensor.numel())) {
      EXPECT_EQ(data[j], expected[rank][j]);
    }
  }
}

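// Runs barrier across all ranks and checks that the profiler recorded the
// barrier events (without shape verification).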
void testBarrier(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size);

  // Kick off work
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  std::vector<c10::intrusive_ptr<::c10d::Work>> work(size);
  for (const auto i : c10::irange(size)) {
    work[i] = tests[i].getProcessGroup().barrier();
  }

  // Wait for work to complete
  waitFuture(work);

  auto event_lists = disableProfilerLegacy();
  const char* GLOO_STR = "gloo:barrier";
  std::vector<std::vector<int64_t>> allShapes;
  // Barrier does not use tensors, so skip shape checking.
  checkProfiledEvents(
      std::move(event_lists),
      GLOO_STR,
      size,
      allShapes,
      /* verify_shapes */ false);
}

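// Exercises monitoredBarrier(): first the success path with all ranks
// participating, then the failure path where only rank 0 calls it and must
// report the missing rank.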
void testMonitoredBarrier(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size);
  // Non-failure case: all ranks pass the blocking monitored barrier.
  auto runMonitoredBarrier = [&](int i) {
    tests[i].getProcessGroup().monitoredBarrier();
  };
  std::vector<std::thread> threads;
  threads.reserve(size);
  for (const auto r : c10::irange(size)) {
    threads.emplace_back(std::thread([=]() { runMonitoredBarrier(r); }));
  }
  for (auto& t : threads) {
    t.join();
  }
  // Failure case: only rank 0 calls into the monitored barrier, which should
  // result in an error that names the missing rank.
  auto runMonitoredBarrierWithException = [&](int i) {
    if (i != 0) {
      return;
    }

    try {
      tests[i].getProcessGroup().monitoredBarrier();
      FAIL() << "Exception should have been thrown.";
    } catch (const std::exception& e) {
      auto pos = std::string(e.what()).find("Rank 1");
      EXPECT_TRUE(pos != std::string::npos);
    }
  };
  threads.clear();
  for (const auto r : c10::irange(size)) {
    threads.emplace_back(
        std::thread([=]() { runMonitoredBarrierWithException(r); }));
  }
  for (auto& t : threads) {
    t.join();
  }
}

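// Checks that after setSequenceNumberForGroup() all ranks agree on the same
// sequence number.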
void testSequenceNumInit(const std::string& path) {
  const auto size = 4;
  auto tests = CollectiveTest::initialize(path, size);
  for (const auto i : c10::irange(size)) {
    tests[i].getProcessGroup().setSequenceNumberForGroup();
  }

  std::unordered_set<uint64_t> nums;
  for (const auto i : c10::irange(size)) {
    auto seqNum = tests[i].getProcessGroup().getSequenceNumberForGroup();
    nums.insert(seqNum);
  }
  EXPECT_EQ(nums.size(), 1);
}

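// Uses the delayed process group so a send stays outstanding long enough for
// wait() with a short timeout to throw.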
void testWaitDelay(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size, /* delayed */ true);

  constexpr uint64_t tag = 0x1337;
  // Test that waiting on a delayed send throws once kWaitTimeout expires.
  auto selfRank = 0;
  auto dstRank = 1;
  std::vector<at::Tensor> tensors = {
      at::ones({16, 16}),
  };
  auto& pg = tests[selfRank].getProcessGroup();
  auto sendWork = pg.send(tensors, dstRank, tag);
  EXPECT_THROW(sendWork->wait(kWaitTimeout), std::exception);
}

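// Verifies that a pending send can be aborted (wait() returns false) and
// then completes once a matching recv arrives on the peer rank.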
void testSend(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size);

  constexpr uint64_t tag = 0x1337;
  // Test that waiting for work to be sent can be aborted successfully.
  auto selfRank = 0;
  auto dstRank = 1;
  std::vector<int64_t> shapes{16, 16};
  std::vector<std::vector<int64_t>> allShapes;
  allShapes.push_back(shapes);
  std::vector<at::Tensor> tensors = {
      at::ones(shapes),
  };
  auto& pg = tests[selfRank].getProcessGroup();
  const char* GLOO_SEND_STR = "gloo:send";
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  auto sendWork = pg.send(tensors, dstRank, tag);
  bool sendCompleted = false;
  std::thread waitSendThreadAbort([&]() { sendCompleted = sendWork->wait(); });
  sendWork->abort();
  // Block until the sendWork gets successfully aborted
  waitSendThreadAbort.join();
  EXPECT_FALSE(sendCompleted);
  auto event_lists = disableProfilerLegacy();
  checkProfiledEvents(std::move(event_lists), GLOO_SEND_STR, 1, allShapes);

  // Now create a separate sender thread to ensure that future send waits can
  // complete successfully.

  // Helper receiver to simulate a real recv/send pair
  std::thread recvThread([&]() {
    auto selfRank = 1;
    auto srcRank = 0;
    auto& pg = tests[selfRank].getProcessGroup();
    std::vector<at::Tensor> tensors = {
        at::ones({16, 16}),
    };

    auto recvWork = pg.recv(tensors, srcRank, tag);
    recvWork->wait();
  });

  // Sender thread
  std::thread sendThread([&]() { sendCompleted = sendWork->wait(); });
  sendThread.join();
  recvThread.join();
  EXPECT_TRUE(sendCompleted);
}

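// Mirror of testSend for the receive side: abort a pending recv (wait()
// returns false), then verify it completes once a matching send arrives.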
void testRecv(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size);
  constexpr uint64_t tag = 0x1337;
  // Test that waiting for work to be received can be aborted successfully.
  auto selfRank = 0;
  auto srcRank = 1;
  std::vector<int64_t> shapes = {16, 16};
  std::vector<std::vector<int64_t>> allShapes;
  allShapes.push_back(shapes);
  std::vector<at::Tensor> tensors = {
      at::ones(shapes),
  };
  const char* GLOO_RECV_STR = "gloo:recv";
  auto& pg = tests[selfRank].getProcessGroup();
  enableProfilerLegacy(ProfilerConfig(
      ProfilerState::CPU, /* report_input_shapes */ true, false));
  auto recvWork = pg.recv(tensors, srcRank, tag);
  bool recvCompleted = false;
  std::thread waitRecvThreadAbort([&]() { recvCompleted = recvWork->wait(); });
  recvWork->abort();
  // Block until the first recv gets successfully aborted
  waitRecvThreadAbort.join();
  EXPECT_FALSE(recvCompleted);
  auto event_lists = disableProfilerLegacy();
  checkProfiledEvents(std::move(event_lists), GLOO_RECV_STR, 1, allShapes);

  // Now create a separate receiver thread to ensure that future waits can
  // complete successfully.

  // Helper sender thread to simulate a real recv/send pair.
  std::thread senderThread([&]() {
    auto selfRank = 1;
    auto destRank = 0;

    auto& pg = tests[selfRank].getProcessGroup();

    std::vector<at::Tensor> tensors = {
        at::ones({16, 16}),
    };
    auto sendWork = pg.send(tensors, destRank, tag);
    sendWork->wait();
  });
  // Receiver thread.
  std::thread receiverThread([&]() { recvCompleted = recvWork->wait(); });
  senderThread.join();
  receiverThread.join();
  EXPECT_TRUE(recvCompleted);
}

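// Checks that a value set through one rank's GlooStore is visible through
// another rank's GlooStore.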
void testStoreSetGet(const std::string& path) {
  const auto size = 2;
  auto tests = CollectiveTest::initialize(path, size);
  // test that get() gets the same value as the one that was set()
  std::vector<uint8_t> testVector = {1, 1, 1, 1};
  // Cast to ProcessGroupGloo::GlooStore to test specific GlooStore APIs.
  auto rank_0_glooStore = static_cast<c10d::ProcessGroupGloo::GlooStore*>(
      tests[0].getProcessGroup()._getStore().get());
  auto rank_1_glooStore = static_cast<c10d::ProcessGroupGloo::GlooStore*>(
      tests[1].getProcessGroup()._getStore().get());

  rank_0_glooStore->setUint("testKey", testVector);
  auto value = rank_1_glooStore->getUint("testKey");
  EXPECT_TRUE(value == testVector);
}

#ifndef _WIN32
TEST(ProcessGroupGlooTest, testSIGSTOPException) {
  // test SIGSTOP
  // Fork() and TSAN don't play well together, so skip the test if we're
  // testing with TSAN.
  if (isTSANEnabled()) {
    LOG(INFO) << "Skipping test since Fork() + TSAN is broken";
    return;
  }

  TemporaryFile file;
  auto work = testSignal(file.path, SIGSTOP);
  EXPECT_FALSE(work->isSuccess());
  EXPECT_THROW(std::rethrow_exception(work->exception()), std::exception);
}

TEST(ProcessGroupGlooTest, testSIGKILLException) {
  // test SIGKILL
  // Fork() and TSAN don't play well together, so skip the test if we're
  // testing with TSAN.
  if (isTSANEnabled()) {
    LOG(INFO) << "Skipping test since Fork() + TSAN is broken";
    return;
  }

  TemporaryFile file;
  auto work = testSignal(file.path, SIGKILL);
  EXPECT_FALSE(work->isSuccess());
  EXPECT_THROW(std::rethrow_exception(work->exception()), std::exception);
}
#endif

TEST(ProcessGroupGlooTest, testAllReduceCPU) {
  {
    TemporaryFile file;
    testAllreduce(file.path, at::DeviceType::CPU);
    testAllreduceUsingWorkAPI(file.path, at::DeviceType::CPU);
  }
}

TEST(ProcessGroupGlooTest, testAllReduceBfloatCPU) {
  {
    TemporaryFile file;
    testAllreduce(file.path, at::DeviceType::CPU, at::kBFloat16);
    testAllreduceUsingWorkAPI(file.path, at::DeviceType::CPU);
  }
}

TEST(ProcessGroupGlooTest, testBroadcastCPU) {
  {
    TemporaryFile file;
    testBroadcast(file.path, at::DeviceType::CPU);
  }
}

TEST(ProcessGroupGlooTest, testBroadcastBfloatCPU) {
  {
    TemporaryFile file;
    testBroadcast(file.path, at::DeviceType::CPU, at::kBFloat16);
  }
}

TEST(ProcessGroupGlooTest, testAllToAllCPU) {
  {
    TemporaryFile file;
    testAlltoall(file.path, at::DeviceType::CPU);
  }
}

TEST(ProcessGroupGlooTest, testBarrier) {
  {
    TemporaryFile file;
    testBarrier(file.path);
  }
}

TEST(ProcessGroupGlooTest, testMonitoredBarrier) {
  TemporaryFile file;
  testMonitoredBarrier(file.path);
}

TEST(ProcessGroupGlooTest, testSequenceNumInit) {
  TemporaryFile file;
  testSequenceNumInit(file.path);
}

TEST(ProcessGroupGlooTest, testSend) {
  {
    TemporaryFile file;
    testSend(file.path);
  }
}

TEST(ProcessGroupGlooTest, testRecv) {
  {
    TemporaryFile file;
    testRecv(file.path);
  }
}

TEST(ProcessGroupGlooTest, testStoreSetGet) {
  TemporaryFile file;
  testStoreSetGet(file.path);
}

TEST(ProcessGroupGlooTest, testWaitDelay) {
  {
    TemporaryFile file;
    testWaitDelay(file.path);
  }
}

#ifdef USE_CUDA
// CUDA-only tests
TEST(ProcessGroupGlooTest, testAllReduceCUDA) {
  if (!torch::cuda::is_available()) {
    LOG(INFO) << "Skipping test - requires CUDA";
    return;
  }
  {
    TemporaryFile file;
    testAllreduce(file.path, at::DeviceType::CUDA);
    testAllreduceUsingWorkAPI(file.path, at::DeviceType::CUDA);
  }
}

TEST(ProcessGroupGlooTest, testBroadcastCUDA) {
  if (torch::cuda::device_count() <= 1) {
    LOG(INFO) << "Skipping test - requires multiple CUDA devices";
    return;
  }
  {
    TemporaryFile file;
    testBroadcast(file.path, at::DeviceType::CUDA);
  }
}

TEST(ProcessGroupGlooTest, testAlltoallCUDA) {
  if (!torch::cuda::is_available()) {
    LOG(INFO) << "Skipping test - requires CUDA";
    return;
  }
  {
    TemporaryFile file;
    testAlltoall(file.path, at::DeviceType::CUDA);
  }
}

TEST(ProcessGroupGlooTest, testBackendName) {
  {
    TemporaryFile file;
    const auto size = 2;
    auto tests = CollectiveTest::initialize(file.path, size);

    for (const auto i : c10::irange(size)) {
      EXPECT_EQ(
          tests[i].getProcessGroup().getBackendName(),
          std::string(c10d::GLOO_BACKEND_NAME));
    }
  }
}

#endif