#include <chrono>
#include <iostream>

#include <torch/csrc/distributed/c10d/FileStore.hpp>
#include <torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp>
#include "CUDATest.hpp"
#include "TestUtils.hpp"
#include "c10d/Types.hpp"

#include <c10/cuda/CUDAGuard.h>
#include <c10/cuda/CUDAStream.h>
#include <c10/util/irange.h>

#include <gtest/gtest.h>
#include <torch/csrc/autograd/profiler.h>

using namespace c10d::test;

using at::cuda::CUDAStream;

class NCCLTestBase {
 public:
  NCCLTestBase(
      const std::string& path,
      const std::chrono::milliseconds pgTimeout =
          c10d::kProcessGroupNCCLDefaultTimeout)
      : path_(path), pgTimeout_(pgTimeout) {}

  NCCLTestBase(NCCLTestBase&& other) {
    path_ = std::move(other.path_);
    pg_ = std::move(other.pg_);
  }

  std::shared_ptr<::c10d::ProcessGroupNCCL> getProcessGroup() {
    return pg_;
  }

  ::c10::intrusive_ptr<::c10d::Store>& getProcessGroupStore() {
    return store_;
  }

  void initialize(
      int rank,
      int size,
      std::optional<::std::shared_ptr<::c10d::ProcessGroupNCCL>> split_from =
          std::nullopt) {
    store_ = c10::make_intrusive<::c10d::FileStore>(path_, size);

    c10::intrusive_ptr<c10d::ProcessGroupNCCL::Options> opts =
        c10::make_intrusive<c10d::ProcessGroupNCCL::Options>();
    opts->timeout = pgTimeout_;
#ifdef NCCL_HAS_COMM_SPLIT
    if (split_from) {
      opts->split_from = *split_from;
      opts->split_color = ++color_;
    }
#endif
    pg_ = std::unique_ptr<::c10d::ProcessGroupNCCL>(
        new ::c10d::ProcessGroupNCCL(store_, rank, size, std::move(opts)));
  }

 protected:
  std::string path_;
  std::shared_ptr<::c10d::ProcessGroupNCCL> pg_;
  std::chrono::milliseconds pgTimeout_;
  ::c10::intrusive_ptr<::c10d::Store> store_;
  int color_{1};
};

class NCCLTest : public NCCLTestBase {
 public:
  NCCLTest(
      const std::string& path,
      int rank,
      int worldSize,
      std::chrono::milliseconds pgTimeout =
          c10d::kProcessGroupNCCLDefaultTimeout,
      int inputDim = 3)
      : NCCLTestBase(path, pgTimeout),
        numDevices_(1), // one device per rank (thread)
        rank_(rank),
        worldSize_(worldSize) {
    // Each device has a single tensor on which to perform the NCCL op
    ::at::globalContext().lazyInitCUDA();
    tensors_.resize(numDevices_);
    inputs_.resize(numDevices_);
    outputs_.resize(numDevices_);
    at::cuda::OptionalCUDAGuard deviceGuard;
    assert(numDevices_ == 1);
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(rank_);
      tensors_[i] = at::empty({inputDim, inputDim}, at::kCUDA);
      inputs_[i].resize(worldSize_ * numDevices_);
      outputs_[i].resize(worldSize_ * numDevices_);
      for (auto j = 0; j < worldSize_ * numDevices_; ++j) {
        inputs_[i][j] = at::empty({inputDim, inputDim}, at::kCUDA);
        outputs_[i][j] = at::empty({inputDim, inputDim}, at::kCUDA);
      }
    }

    // Allocate a stream per device.
    //
    // The "current stream" is set globally per device in THC, so we
    // can't make two tensors on the same device use different streams
    // and pass this along to the collective (since it uses the THC
    // getters to retrieve the current stream).
    //
    // 1 device only, hence 1 stream only
    deviceGuard.set_index(rank_);
    streams_.push_back(at::cuda::getStreamFromPool());
  }

  void wait(
      c10::intrusive_ptr<c10d::Work>& work,
      std::chrono::milliseconds timeout = kNoTimeout) {
    c10::cuda::CUDAMultiStreamGuard guard(streams_);
    work->wait(timeout);
  }

  std::vector<at::Tensor> getTensors() {
    std::vector<at::Tensor> outputs(numDevices_);

    // For the duration of this function, make THC use our streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    // Copy inputs to outputs
    for (const auto i : c10::irange(numDevices_)) {
      C10_CUDA_CHECK(cudaStreamSynchronize(streams_[i].stream()));
      outputs[i] = tensors_[i].cpu();
    }

    return outputs;
  }

  std::vector<std::vector<at::Tensor>> getInputTensors() {
    return getTensorLists(inputs_);
  }
  std::vector<std::vector<at::Tensor>> getOutputTensors() {
    return getTensorLists(outputs_);
  }

  int numDevices() const {
    return numDevices_;
  }

 private:
  std::vector<std::vector<at::Tensor>> getTensorLists(
      std::vector<std::vector<at::Tensor>>& tensor_lists) {
    std::vector<std::vector<at::Tensor>> outputs(numDevices_);
    for (auto& output : outputs) {
      output = std::vector<at::Tensor>(worldSize_ * numDevices_);
    }

    // For the duration of this function, make THC use our streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    // Copy inputs to outputs
    for (const auto i : c10::irange(numDevices_)) {
      C10_CUDA_CHECK(cudaStreamSynchronize(streams_[i].stream()));
      for (auto j = 0; j < worldSize_ * numDevices_; ++j) {
        outputs[i][j] = tensor_lists[i][j].cpu();
      }
    }
    return outputs;
  }

 protected:
  // Launches sleep on every CUDA device
  void launchDeviceSleep() {
    at::cuda::OptionalCUDAGuard deviceGuard;
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(rank_);
      cudaSleep(streams_[i], 2000 * 1000 * 1000);
    }
  }

  // Launches value initialization for every tensor
  void valueInitialization() {
    at::cuda::OptionalCUDAGuard deviceGuard;
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(rank_);
      tensors_[i].fill_(pg_->getRank() * numDevices_ + i);
    }
  }

  at::Tensor to_sparse_row_indices_format(at::Tensor& tensor) {
    // Get the indices of all non-zero elements in the dense tensor
    // Get the unique row indices of the non-zero elements
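    // Illustrative example (not exercised by the tests directly): for a 3x3
    // dense tensor whose rows 0 and 2 are non-zero, row_indices becomes
    // [0, 2], sparse_values has shape [2, 3], and the result is a COO tensor
    // indexed by row only.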
    auto row_indices = std::get<0>(
        at::_unique(tensor.nonzero().select(/*dim=*/1, /*index=*/0)));
    at::Tensor sparse_values = tensor.index_select(
        /*dim=*/0, row_indices); // get the values at the non-zero indices
    return at::sparse_coo_tensor(
               row_indices.unsqueeze(0), sparse_values, tensor.sizes())
        .to(tensor.device());
  }

  // Launches value initialization for every sparse tensor
  void valueInitializationForSparse() {
    at::cuda::OptionalCUDAGuard deviceGuard;
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(rank_);
      tensors_[i].fill_(pg_->getRank() * numDevices_ + i + 1);
      // Convert the dense tensor to a sparse tensor in COO row format
      tensors_[i] = to_sparse_row_indices_format(tensors_[i]);
    }
  }

  const int numDevices_;
  int rank_;
  int worldSize_;
  std::vector<at::Tensor> tensors_;
  std::vector<std::vector<at::Tensor>> inputs_;
  std::vector<std::vector<at::Tensor>> outputs_;
  std::vector<CUDAStream> streams_;
};

class AllreduceNCCLTest : public NCCLTest {
 public:
  AllreduceNCCLTest(const std::string& path, int rank, int worldSize)
      : NCCLTest(path, rank, worldSize) {}

  c10::intrusive_ptr<c10d::Work> run() {
    // For the duration of this function, make THC use our streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    launchDeviceSleep();
    valueInitialization();

    using namespace torch::autograd::profiler;
    // Make sure that enabling the profiler does not cause any issues. Note
    // that in single-process multi-device mode we do not expect any events to
    // be populated for collective operations, since profiling is not
    // supported for that mode.
    enableProfilerLegacy(ProfilerConfig(ProfilerState::CPU));
    auto results = pg_->allreduce(tensors_);
    disableProfilerLegacy();
    return results;
  }
};

class SparseAllreduceNCCLTest : public NCCLTest {
 public:
  SparseAllreduceNCCLTest(
      const std::string& path,
      int rank,
      int worldSize,
      int inputDim)
      : NCCLTest(
            path,
            rank,
            worldSize,
            c10d::kProcessGroupNCCLDefaultTimeout,
            inputDim) {}

  c10::intrusive_ptr<c10d::Work> run() {
    // For the duration of this function, make THC use our streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);
    launchDeviceSleep();
    valueInitializationForSparse();
    auto results = pg_->allreduce_sparse(tensors_);
    return results;
  }
};

class BroadcastNCCLTest : public NCCLTest {
 public:
  BroadcastNCCLTest(const std::string& path, int rank, int worldSize)
      : NCCLTest(path, rank, worldSize) {}

  c10::intrusive_ptr<c10d::Work> run(int rootRank, int rootTensor) {
    // For the duration of this function, make THC use our streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    launchDeviceSleep();
    valueInitialization();

    ::c10d::BroadcastOptions options;
    options.rootRank = rootRank;
    options.rootTensor = rootTensor;
    return pg_->broadcast(tensors_, options);
  }
};

class ReduceNCCLTest : public NCCLTest {
 public:
  ReduceNCCLTest(const std::string& path, int rank, int worldSize)
      : NCCLTest(path, rank, worldSize) {}

  c10::intrusive_ptr<c10d::Work> run(int rootRank, int rootTensor) {
    // For the duration of this function, make THC use our streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    launchDeviceSleep();
    valueInitialization();

    ::c10d::ReduceOptions options;
    options.rootRank = rootRank;
    options.rootTensor = rootTensor;
    return pg_->reduce(tensors_, options);
  }
};

class AllgatherNCCLTest : public NCCLTest {
 public:
  AllgatherNCCLTest(const std::string& path, int rank, int worldSize)
      : NCCLTest(path, rank, worldSize) {}

  c10::intrusive_ptr<c10d::Work> run() {
    // For the duration of this function, make THC use our streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    launchDeviceSleep();
    valueInitialization();

    return pg_->allgather(outputs_, tensors_);
  }
};

class AllgatherBaseNCCLTest : public NCCLTest {
 public:
  AllgatherBaseNCCLTest(const std::string& path, int rank, int worldSize)
      : NCCLTest(path, rank, worldSize) {
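    // The flattened allgather gathers one [3, 3] tensor (inputDim defaults to
    // 3) from each rank, so the output is [worldSize_, 3, 3].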
    output_tensor_ = at::empty({worldSize_, 3, 3}, at::kCUDA);
  }

  c10::intrusive_ptr<c10d::Work> run() {
    // For the duration of this function, make THC use our streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    launchDeviceSleep();
    valueInitialization();
    // tensors_ contains at least one element, otherwise this wouldn't run.
    // This is a flattened allgather, so each rank contributes only one
    // tensor, regardless of the number of devices.
    return pg_->_allgather_base(output_tensor_, tensors_[0]);
  }

  at::Tensor getOutputTensor() {
    c10::cuda::CUDAMultiStreamGuard guard(streams_);
    return output_tensor_.cpu();
  }

  at::Tensor getInputTensor() {
    c10::cuda::CUDAMultiStreamGuard guard(streams_);
    return tensors_[0].cpu();
  }

 private:
  at::Tensor output_tensor_;
};

struct ReduceScatterNCCLTest : NCCLTest {
  ReduceScatterNCCLTest(const std::string& path, int rank, int worldSize)
      : NCCLTest(path, rank, worldSize) {}

  c10::intrusive_ptr<c10d::Work> run() {
    // For the duration of this function, make THC use our streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    at::cuda::OptionalCUDAGuard deviceGuard;
    launchDeviceSleep();

    // Launch value initialization for every tensor
    for (auto j = 0; j < worldSize_; ++j) {
      inputs_[0][j].fill_(rank_ * worldSize_ + j);
    }

    return pg_->reduce_scatter(tensors_, inputs_);
  }
};

class ReduceScatterBaseNCCLTest : public NCCLTest {
 public:
  ReduceScatterBaseNCCLTest(const std::string& path, int rank, int worldSize)
      : NCCLTest(path, rank, worldSize) {
    at::cuda::OptionalCUDAGuard deviceGuard;
    deviceGuard.set_index(rank_);
    output_tensor_ = at::empty({1}, at::kCUDA);
    input_tensor_ = at::empty({worldSize}, at::kCUDA);
    for (const auto i : c10::irange(worldSize)) {
      input_tensor_[i] = i;
    }
  }

  c10::intrusive_ptr<c10d::Work> run() {
    // For the duration of this function, make THC use our streams
    at::cuda::CUDAMultiStreamGuard guard(streams_);

    launchDeviceSleep();
    return pg_->_reduce_scatter_base(output_tensor_, input_tensor_);
  }

  at::Tensor getOutputTensor() {
    at::cuda::CUDAMultiStreamGuard guard(streams_);
    return output_tensor_.cpu();
  }

  at::Tensor getInputTensor() {
    at::cuda::CUDAMultiStreamGuard guard(streams_);
    return input_tensor_.cpu();
  }

 private:
  at::Tensor output_tensor_;
  at::Tensor input_tensor_;
};

void testAllreduce(const std::string& path, int rank, int size) {
  auto test = AllreduceNCCLTest(path, rank, size);
  test.initialize(rank, size);
  auto work = test.run();
  // Wait for work to finish
  test.wait(work);

  // Validation
  const int totalNumGPUs = test.numDevices() * size;
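  // Each rank fills its tensor with its rank (see valueInitialization), so
  // the allreduce sum is 0 + 1 + ... + (totalNumGPUs - 1).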
  const auto expected = (totalNumGPUs * (totalNumGPUs - 1)) / 2;
  const auto tensors = test.getTensors();
  for (const auto& tensor : tensors) {
    const auto* const data = tensor.const_data_ptr<float>();
    for (const auto k : c10::irange(tensor.numel())) {
      EXPECT_EQ(data[k], expected)
          << "Allreduce outputs do not match expected outputs";
    }
  }
}

void testSparseAllreduce(const std::string& path, int rank, int size) {
  const int inputDim = 3;
  auto test = SparseAllreduceNCCLTest(path, rank, size, inputDim);
  test.initialize(rank, size);
  auto work = test.run();
  // Wait for work to finish
  test.wait(work);

  const auto input_tensors = test.getTensors();

  // validate the work output is same as tensor
  auto output_tensor = work->result();
  // Validation
  int totalNumGPUs = test.numDevices() * size;
  // Add one since we are seeding with an additional 1 to prevent empty tensors
  totalNumGPUs++;
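  // Ranks are seeded with rank + 1, so the reduced values are
  // 1 + 2 + ... + size, which the formula below yields after the increment.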
  const auto expected = (totalNumGPUs * (totalNumGPUs - 1)) / 2;
  for (const auto i : c10::irange(input_tensors.size())) {
    const auto& tensor = input_tensors[i];

    // validate the tensor is sparse
    EXPECT_EQ(tensor.is_sparse(), true);

    auto indices = tensor._indices();
    auto values = tensor._values();

    // validate indices are expected size
    auto sizes = indices.sizes();
    EXPECT_EQ(sizes.size(), 2);
    if (sizes[0] == 1) {
      // row indices
      EXPECT_EQ(sizes[1], inputDim);
    } else if (sizes[0] == 2) {
      // coordinate indices
      EXPECT_EQ(sizes[1], inputDim * inputDim);
    }

    // validate all tensor values are expected value
    const auto* const data = values.const_data_ptr<float>();
    for (const auto k : c10::irange(values.numel())) {
      EXPECT_EQ(data[k], expected)
          << "Allreduce outputs do not match expected outputs";
    }

    // expect the input and output tensors should be the same
    auto input_dense = tensor.to_dense();
    auto output_dense = output_tensor[i].to(input_dense.device()).to_dense();
    EXPECT_TRUE(input_dense.allclose(output_dense));
  }
}

void testSparseAllreduceLarge(const std::string& path, int rank, int size) {
  const int inputDim = 2500;
  auto test = SparseAllreduceNCCLTest(path, rank, size, inputDim);
  test.initialize(rank, size);
  auto work = test.run();
  // Wait for work to finish
  test.wait(work);

  const auto input_tensors = test.getTensors();

  // validate the work output is same as tensor
  auto output_tensor = work->result();
  // Validation
  int totalNumGPUs = test.numDevices() * size;
  // Add one since we are seeding with an additional 1 to prevent empty tensors
  totalNumGPUs++;
  const auto expected = (totalNumGPUs * (totalNumGPUs - 1)) / 2;
  for (const auto i : c10::irange(input_tensors.size())) {
    const auto& tensor = input_tensors[i];

    // validate the tensor is sparse
    EXPECT_EQ(tensor.is_sparse(), true);

    auto indices = tensor._indices();
    auto values = tensor._values();

    // validate indices are expected size
    auto sizes = indices.sizes();
    EXPECT_EQ(sizes.size(), 2);
    if (sizes[0] == 1) {
      // row indices
      EXPECT_EQ(sizes[1], inputDim);
    } else if (sizes[0] == 2) {
      // coordinate indices
      EXPECT_EQ(sizes[1], inputDim * inputDim);
    }

    // validate all tensor values are expected value
    const auto* const data = values.const_data_ptr<float>();
    for (const auto k : c10::irange(values.numel())) {
      EXPECT_EQ(data[k], expected)
          << "Allreduce outputs do not match expected outputs";
    }

    // expect the input and output tensors should be the same
    auto input_dense = tensor.to_dense();
    auto output_dense = output_tensor[i].to(input_dense.device()).to_dense();
    EXPECT_TRUE(input_dense.allclose(output_dense));
  }
}

void testBroadcast(const std::string& path, int rank, int size) {
  auto test = BroadcastNCCLTest(path, rank, size);
  test.initialize(rank, size);

  const int numDevices = test.numDevices();
  // try every permutation of root rank and root tensor
  for (const auto rootRank : c10::irange(size)) {
    for (const auto rootTensor : c10::irange(numDevices)) {
      auto work = test.run(rootRank, rootTensor);

      // wait for work to complete
      test.wait(work);

      // Check results
      const auto expected = (rootRank * numDevices + rootTensor);
      const auto tensors = test.getTensors();
      for (const auto& tensor : tensors) {
        const auto* const data = tensor.const_data_ptr<float>();
        for (const auto k : c10::irange(tensor.numel())) {
          EXPECT_EQ(data[k], expected)
              << "Broadcast outputs do not match expected outputs";
        }
      }
    }
  }
}

void testReduce(const std::string& path, int rank, int size) {
  auto test = ReduceNCCLTest(path, rank, size);
  test.initialize(rank, size);

  const int numDevices = test.numDevices();
  // try every permutation of root rank and root tensor
  for (const auto rootRank : c10::irange(size)) {
    for (const auto rootTensor : c10::irange(numDevices)) {
      auto work = test.run(rootRank, rootTensor);

      // wait for work to complete
      test.wait(work);

      // Validation
      const int totalNumGPUs = numDevices * size;
      const auto expected = (totalNumGPUs * (totalNumGPUs - 1)) / 2;
      auto tensors = test.getTensors();
      if (rank == rootRank) {
        auto& tensor = tensors[rootTensor];
        auto data = tensor.data_ptr<float>();
        for (const auto k : c10::irange(tensor.numel())) {
          EXPECT_EQ(data[k], expected)
              << "Reduce outputs do not match expected outputs";
        }
      }
    }
  }
}

void testAllgather(const std::string& path, int rank, int size) {
  auto test = AllgatherNCCLTest(path, rank, size);
  test.initialize(rank, size);
  auto work = test.run();
  // Wait for work to finish
  test.wait(work);

  // Validation
  auto tensors = test.getOutputTensors();
  // device index
  for (auto& device : tensors) {
    // rank index
    for (const auto j : c10::irange(device.size())) {
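      // Rank j filled its tensor with j (rank * numDevices with
      // numDevices == 1), so slot j of the gathered output should hold j.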
      const auto expected = j;
      auto& tensor = device[j];
      auto data = tensor.data_ptr<float>();
      for (const auto k : c10::irange(tensor.numel())) {
        EXPECT_EQ(data[k], expected)
            << "Allgather outputs do not match expected outputs";
      }
    }
  }
}

void testAllgatherBase(const std::string& path, int rank, int size) {
  auto test = AllgatherBaseNCCLTest(path, rank, size);
  test.initialize(rank, size);
  auto work = test.run();
  // Wait for work to finish
  test.wait(work);
  // Validation
  auto output_tensor = test.getOutputTensor();
  auto input_tensor = test.getInputTensor();

  auto data = output_tensor.data_ptr<float>();

  // Rank index
  for (const auto i : c10::irange(output_tensor.numel())) {
    // The rank that contributed element i is i / input_tensor.numel(), and
    // each rank filled its tensor with rank * numDevices.
    const auto expected = (i / input_tensor.numel()) * test.numDevices();
    EXPECT_EQ(data[i], expected)
        << "Allgather_base outputs do not match expected outputs";
  }
}

void testReduceScatterBase(const std::string& path, int rank, int size) {
  auto test = ReduceScatterBaseNCCLTest(path, rank, size);
  test.initialize(rank, size);
  auto work = test.run();
  // Wait for work to finish
  test.wait(work);
  // Validation
  auto output_tensor = test.getOutputTensor();
  auto input_tensor = test.getInputTensor();

  auto data = output_tensor.data_ptr<float>();

  // Rank index
  for (const auto i : c10::irange(output_tensor.numel())) {
    // The single output element is the sum over all ranks of element `rank`
    // of their inputs, i.e. size * rank (with numDevices == 1).
    const auto expected = size * rank * test.numDevices();
    EXPECT_EQ(data[i], expected)
        << "Reducescatter_base outputs do not match expected outputs";
  }
}

void testReduceScatter(const std::string& path, int rank, int size) {
  auto test = ReduceScatterNCCLTest(path, rank, size);
  test.initialize(rank, size);
  auto work = test.run();
  // Wait for work to finish
  test.wait(work);

  const auto participants = size;
  const auto base = (participants * (participants - 1)) / 2;

  // Validation
  auto tensors = test.getTensors();
  const auto modifier = rank * participants;
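  // Rank s fills slot j with s * size + j, so the sum over all ranks of
  // slot `rank` is base * participants + rank * participants.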
  const auto expected = base * participants + modifier;
  auto& tensor = tensors[0];
  auto data = tensor.data_ptr<float>();
  for (const auto j : c10::irange(tensor.numel())) {
    EXPECT_EQ(data[j], expected)
        << "ReduceScatter outputs do not match expected outputs!";
  }
}

void testSequenceNumInit(const std::string& path, int rank, int size) {
  NCCLTest test(path, rank, size);
  test.initialize(rank, size);
  test.getProcessGroup()->setSequenceNumberForGroup();
  auto seqNum = test.getProcessGroup()->getSequenceNumberForGroup();
  EXPECT_EQ(seqNum, 0);
}

void testSplittingCommunicator(const std::string& path, int rank, int size) {
  auto test1 = BroadcastNCCLTest(path, rank, size);
  test1.initialize(rank, size);

  auto test2 = BroadcastNCCLTest(path, rank, size);
  test2.initialize(rank, size, test1.getProcessGroup());

  // Steal the broadcast test and issue it for both of our groups.
  // This ensures consistent full collective communication. TODO:
  // maybe refactor the guts rather than copy-pasta, but it may not be
  // worth it.
  for (auto test : {&test1, &test2}) {
    const int numDevices = test->numDevices();
    // try every permutation of root rank and root tensor
    for (const auto rootRank : c10::irange(size)) {
      for (const auto rootTensor : c10::irange(numDevices)) {
        auto work = test->run(rootRank, rootTensor);
        test->wait(work);

        // Check results
        const auto expected = (rootRank * numDevices + rootTensor);
        const auto tensors = test->getTensors();
        for (const auto& tensor : tensors) {
          const auto* const data = tensor.const_data_ptr<float>();
          for (const auto k : c10::irange(tensor.numel())) {
            EXPECT_EQ(data[k], expected)
                << "Broadcast outputs do not match expected outputs";
          }
        }
      }
    }
  }

  // Now that we've run full operations on both the original and split process
  // group, ensure we saw exactly as many splits as we expected: none in the
  // second (split) process group, and one per device in the original.
  EXPECT_EQ(test2.getProcessGroup()->getCommSplitCounter(), 0);
  EXPECT_EQ(test1.getProcessGroup()->getCommSplitCounter(), test1.numDevices());
}

// All testAbc's use this signature
using FuncType = void (*)(const std::string&, int, int);

class ProcessGroupNCCLTest : public ::testing::Test {
 protected:
  void SetUp() override {
    c10::initLogging();
    // Use WORLD_SIZE and RANK environment variables to do multi-node
    // distributed testing
    auto sizeEnv = std::getenv("WORLD_SIZE");
    if (sizeEnv) {
      size_ = std::stoi(std::string(sizeEnv));
    }
    LOG(INFO) << "ProcessGroupNCCLTest world size: " << size_;
  }

  void TearDown() override {
    // Reset TORCH_NCCL_BLOCKING_WAIT environment variable after each run.
    ASSERT_TRUE(setenv(c10d::TORCH_NCCL_BLOCKING_WAIT[0].c_str(), "0", 1) == 0);
  }

  bool skipTest() {
    // Skip tests if CUDA is not available.
    if (!at::cuda::is_available()) {
      LOG(INFO) << "CUDA not available, skipping test";
      return true;
    }
    return false;
  }

  void multiThreadRun(FuncType testFunc) {
    TemporaryFile file;
    std::vector<std::thread> threads;
    threads.reserve(size_);
    for (const auto rank : c10::irange(size_)) {
      threads.emplace_back(std::thread(testFunc, file.path, rank, size_));
    }
    for (const auto rank : c10::irange(size_)) {
      threads[rank].join();
    }
  }

  int size_{1};
};

TEST_F(ProcessGroupNCCLTest, testAllreduce) {
  if (skipTest()) {
    return;
  }
  multiThreadRun(testAllreduce);
}

TEST_F(ProcessGroupNCCLTest, testBroadcast) {
  if (skipTest()) {
    return;
  }
  multiThreadRun(testBroadcast);
}

TEST_F(ProcessGroupNCCLTest, testReduce) {
  if (skipTest()) {
    return;
  }
  multiThreadRun(testReduce);
}

TEST_F(ProcessGroupNCCLTest, testAllgather) {
  if (skipTest()) {
    return;
  }
  multiThreadRun(testAllgather);
}

TEST_F(ProcessGroupNCCLTest, testAllgatherBase) {
  if (skipTest()) {
    return;
  }
  multiThreadRun(testAllgatherBase);
}

TEST_F(ProcessGroupNCCLTest, testReduceScatter) {
  if (skipTest()) {
    return;
  }
  multiThreadRun(testReduceScatter);
}

TEST_F(ProcessGroupNCCLTest, testSequenceNumInit) {
  if (skipTest()) {
    return;
  }
  multiThreadRun(testSequenceNumInit);
}

TEST_F(ProcessGroupNCCLTest, testReduceScatterBase) {
  if (skipTest()) {
    return;
  }
  multiThreadRun(testReduceScatterBase);
}

TEST_F(ProcessGroupNCCLTest, testBackendName) {
  if (skipTest()) {
    return;
  }
  TemporaryFile file;
  auto test = NCCLTestBase(file.path);
  test.initialize(/*rank=*/0, /*world_size=*/1);
  EXPECT_EQ(
      test.getProcessGroup()->getBackendName(),
      std::string(c10d::NCCL_BACKEND_NAME));
}

TEST_F(ProcessGroupNCCLTest, testSplittingCommunicator) {
  if (skipTest()) {
    return;
  }
  multiThreadRun(testSplittingCommunicator);
}

#ifdef IS_NCCLX
TEST_F(ProcessGroupNCCLTest, testSparseAllreduce) {
  if (skipTest()) {
    return;
  }
  multiThreadRun(testSparseAllreduce);
  multiThreadRun(testSparseAllreduceLarge);
}
#endif