// xref: /aosp_15_r20/external/pytorch/test/cpp/c10d/ProcessGroupNCCLTest.cpp (revision da0073e96a02ea20f0ac840b70461e3646d07c45)
#include <chrono>
#include <iostream>

#include <torch/csrc/distributed/c10d/FileStore.hpp>
#include <torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp>
#include "CUDATest.hpp"
#include "TestUtils.hpp"
#include "c10d/Types.hpp"

#include <c10/cuda/CUDAGuard.h>
#include <c10/cuda/CUDAStream.h>
#include <c10/util/irange.h>

#include <gtest/gtest.h>
#include <torch/csrc/autograd/profiler.h>

using namespace c10d::test;

using at::cuda::CUDAStream;

class NCCLTestBase {
 public:
  NCCLTestBase(
      const std::string& path,
      const std::chrono::milliseconds pgTimeout =
          c10d::kProcessGroupNCCLDefaultTimeout)
      : path_(path), pgTimeout_(pgTimeout) {}

  NCCLTestBase(NCCLTestBase&& other) {
    path_ = std::move(other.path_);
    pg_ = std::move(other.pg_);
  }

  std::shared_ptr<::c10d::ProcessGroupNCCL> getProcessGroup() {
    return pg_;
  }

  ::c10::intrusive_ptr<::c10d::Store>& getProcessGroupStore() {
    return store_;
  }

  void initialize(
      int rank,
      int size,
      std::optional<::std::shared_ptr<::c10d::ProcessGroupNCCL>> split_from =
          std::nullopt) {
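    // All ranks rendezvous through a FileStore backed by the file at path_.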
    store_ = c10::make_intrusive<::c10d::FileStore>(path_, size);

    c10::intrusive_ptr<c10d::ProcessGroupNCCL::Options> opts =
        c10::make_intrusive<c10d::ProcessGroupNCCL::Options>();
    opts->timeout = pgTimeout_;
#ifdef NCCL_HAS_COMM_SPLIT
    if (split_from) {
      opts->split_from = *split_from;
      opts->split_color = ++color_;
    }
#endif
    pg_ = std::unique_ptr<::c10d::ProcessGroupNCCL>(
        new ::c10d::ProcessGroupNCCL(store_, rank, size, std::move(opts)));
  }

 protected:
  std::string path_;
  std::shared_ptr<::c10d::ProcessGroupNCCL> pg_;
  std::chrono::milliseconds pgTimeout_;
  ::c10::intrusive_ptr<::c10d::Store> store_;
  int color_{1};
};

class NCCLTest : public NCCLTestBase {
 public:
  NCCLTest(
      const std::string& path,
      int rank,
      int worldSize,
      std::chrono::milliseconds pgTimeout =
          c10d::kProcessGroupNCCLDefaultTimeout,
      int inputDim = 3)
      : NCCLTestBase(path, pgTimeout),
        numDevices_(1), // one device per rank (thread)
        rank_(rank),
        worldSize_(worldSize) {
    // Each device has a single tensor to perform the NCCL op
    ::at::globalContext().lazyInitCUDA();
    tensors_.resize(numDevices_);
    inputs_.resize(numDevices_);
    outputs_.resize(numDevices_);
    at::cuda::OptionalCUDAGuard deviceGuard;
    assert(numDevices_ == 1);
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(rank_);
      tensors_[i] = at::empty({inputDim, inputDim}, at::kCUDA);
      inputs_[i].resize(worldSize_ * numDevices_);
      outputs_[i].resize(worldSize_ * numDevices_);
      for (auto j = 0; j < worldSize_ * numDevices_; ++j) {
        inputs_[i][j] = at::empty({inputDim, inputDim}, at::kCUDA);
        outputs_[i][j] = at::empty({inputDim, inputDim}, at::kCUDA);
      }
    }

    // Allocate a stream per device.
    //
    // The "current stream" is set globally per device in THC, so we
    // can't make two tensors on the same device use different streams
    // and pass this along to the collective (since it uses the THC
    // getters to retrieve the current stream).
    //
    // 1 device only, hence 1 stream only
    deviceGuard.set_index(rank_);
    streams_.push_back(at::cuda::getStreamFromPool());
  }

  void wait(
      c10::intrusive_ptr<c10d::Work>& work,
      std::chrono::milliseconds timeout = kNoTimeout) {
    c10::cuda::CUDAMultiStreamGuard guard(streams_);
    work->wait(timeout);
  }

  std::vector<at::Tensor> getTensors() {
    std::vector<at::Tensor> outputs(numDevices_);

    // For the duration of this function, make THC use our streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    // Copy inputs to outputs
    for (const auto i : c10::irange(numDevices_)) {
      C10_CUDA_CHECK(cudaStreamSynchronize(streams_[i].stream()));
      outputs[i] = tensors_[i].cpu();
    }

    return outputs;
  }

  std::vector<std::vector<at::Tensor>> getInputTensors() {
    return getTensorLists(inputs_);
  }
  std::vector<std::vector<at::Tensor>> getOutputTensors() {
    return getTensorLists(outputs_);
  }

  int numDevices() const {
    return numDevices_;
  }

 private:
  std::vector<std::vector<at::Tensor>> getTensorLists(
      std::vector<std::vector<at::Tensor>>& tensor_lists) {
    std::vector<std::vector<at::Tensor>> outputs(numDevices_);
    for (auto& output : outputs) {
      output = std::vector<at::Tensor>(worldSize_ * numDevices_);
    }

    // For the duration of this function, make THC use our streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    // Copy inputs to outputs
    for (const auto i : c10::irange(numDevices_)) {
      C10_CUDA_CHECK(cudaStreamSynchronize(streams_[i].stream()));
      for (auto j = 0; j < worldSize_ * numDevices_; ++j) {
        outputs[i][j] = tensor_lists[i][j].cpu();
      }
    }
    return outputs;
  }

 protected:
  // Launches sleep on every CUDA device
  void launchDeviceSleep() {
    at::cuda::OptionalCUDAGuard deviceGuard;
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(rank_);
      cudaSleep(streams_[i], 2000 * 1000 * 1000);
    }
  }

  // Launches value initialization for every tensor
  void valueInitialization() {
    at::cuda::OptionalCUDAGuard deviceGuard;
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(rank_);
      tensors_[i].fill_(pg_->getRank() * numDevices_ + i);
    }
  }

  at::Tensor to_sparse_row_indices_format(at::Tensor& tensor) {
    // Get the indices of all non-zero elements in the dense tensor
    // Get the unique row indices of the non-zero elements
    auto row_indices = std::get<0>(
        at::_unique(tensor.nonzero().select(/*dim=*/1, /*index=*/0)));
    at::Tensor sparse_values = tensor.index_select(
        /*dim=*/0, row_indices); // get the values at the non-zero indices
    return at::sparse_coo_tensor(
               row_indices.unsqueeze(0), sparse_values, tensor.sizes())
        .to(tensor.device());
  }

  // Launches value initialization for every sparse tensor
  void valueInitializationForSparse() {
    at::cuda::OptionalCUDAGuard deviceGuard;
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(rank_);
      tensors_[i].fill_(pg_->getRank() * numDevices_ + i + 1);
      // Convert the dense tensor to a sparse tensor in COO row format
      tensors_[i] = to_sparse_row_indices_format(tensors_[i]);
    }
  }

  const int numDevices_;
  int rank_;
  int worldSize_;
  std::vector<at::Tensor> tensors_;
  std::vector<std::vector<at::Tensor>> inputs_;
  std::vector<std::vector<at::Tensor>> outputs_;
  std::vector<CUDAStream> streams_;
};

class AllreduceNCCLTest : public NCCLTest {
 public:
  AllreduceNCCLTest(const std::string& path, int rank, int worldSize)
      : NCCLTest(path, rank, worldSize) {}

  c10::intrusive_ptr<c10d::Work> run() {
    // For the duration of this function, make THC use our streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    launchDeviceSleep();
    valueInitialization();

    using namespace torch::autograd::profiler;
    // Make sure that enabling the profiler does not cause any issues. Note that
    // in single-process multi-device mode we do not expect any events to be
    // populated for collective operations, since profiling for that mode is
    // not supported.
    enableProfilerLegacy(ProfilerConfig(ProfilerState::CPU));
    auto results = pg_->allreduce(tensors_);
    disableProfilerLegacy();
    return results;
  }
};

class SparseAllreduceNCCLTest : public NCCLTest {
 public:
  SparseAllreduceNCCLTest(
      const std::string& path,
      int rank,
      int worldSize,
      int inputDim)
      : NCCLTest(
            path,
            rank,
            worldSize,
            c10d::kProcessGroupNCCLDefaultTimeout,
            inputDim) {}

  c10::intrusive_ptr<c10d::Work> run() {
    // For the duration of this function, make THC use our streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);
    launchDeviceSleep();
    valueInitializationForSparse();
    auto results = pg_->allreduce_sparse(tensors_);
    return results;
  }
};

class BroadcastNCCLTest : public NCCLTest {
 public:
  BroadcastNCCLTest(const std::string& path, int rank, int worldSize)
      : NCCLTest(path, rank, worldSize) {}

  c10::intrusive_ptr<c10d::Work> run(int rootRank, int rootTensor) {
    // For the duration of this function, make THC use our streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    launchDeviceSleep();
    valueInitialization();

    ::c10d::BroadcastOptions options;
    options.rootRank = rootRank;
    options.rootTensor = rootTensor;
    return pg_->broadcast(tensors_, options);
  }
};

class ReduceNCCLTest : public NCCLTest {
 public:
  ReduceNCCLTest(const std::string& path, int rank, int worldSize)
      : NCCLTest(path, rank, worldSize) {}

  c10::intrusive_ptr<c10d::Work> run(int rootRank, int rootTensor) {
    // For the duration of this function, make THC use our streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    launchDeviceSleep();
    valueInitialization();

    ::c10d::ReduceOptions options;
    options.rootRank = rootRank;
    options.rootTensor = rootTensor;
    return pg_->reduce(tensors_, options);
  }
};

class AllgatherNCCLTest : public NCCLTest {
 public:
  AllgatherNCCLTest(const std::string& path, int rank, int worldSize)
      : NCCLTest(path, rank, worldSize) {}

  c10::intrusive_ptr<c10d::Work> run() {
    // For the duration of this function, make THC use our streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    launchDeviceSleep();
    valueInitialization();

    return pg_->allgather(outputs_, tensors_);
  }
};

class AllgatherBaseNCCLTest : public NCCLTest {
 public:
  AllgatherBaseNCCLTest(const std::string& path, int rank, int worldSize)
      : NCCLTest(path, rank, worldSize) {
    output_tensor_ = at::empty({worldSize_, 3, 3}, at::kCUDA);
  }

  c10::intrusive_ptr<c10d::Work> run() {
    // For the duration of this function, make THC use our streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    launchDeviceSleep();
    valueInitialization();
    // tensors_ contains at least one element; otherwise this wouldn't run.
    // This is a flattened allgather, so each rank contributes only one
    // tensor, regardless of the number of devices.
    return pg_->_allgather_base(output_tensor_, tensors_[0]);
  }

  at::Tensor getOutputTensor() {
    c10::cuda::CUDAMultiStreamGuard guard(streams_);
    return output_tensor_.cpu();
  }

  at::Tensor getInputTensor() {
    c10::cuda::CUDAMultiStreamGuard guard(streams_);
    return tensors_[0].cpu();
  }

 private:
  at::Tensor output_tensor_;
};

struct ReduceScatterNCCLTest : NCCLTest {
  ReduceScatterNCCLTest(const std::string& path, int rank, int worldSize)
      : NCCLTest(path, rank, worldSize) {}

  c10::intrusive_ptr<c10d::Work> run() {
    // For the duration of this function, make THC use our streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    at::cuda::OptionalCUDAGuard deviceGuard;
    launchDeviceSleep();

    // Launch value initialization for every tensor
    for (auto j = 0; j < worldSize_; ++j) {
      inputs_[0][j].fill_(rank_ * worldSize_ + j);
    }

    return pg_->reduce_scatter(tensors_, inputs_);
  }
};

class ReduceScatterBaseNCCLTest : public NCCLTest {
 public:
  ReduceScatterBaseNCCLTest(const std::string& path, int rank, int worldSize)
      : NCCLTest(path, rank, worldSize) {
    at::cuda::OptionalCUDAGuard deviceGuard;
    deviceGuard.set_index(rank_);
    output_tensor_ = at::empty({1}, at::kCUDA);
    input_tensor_ = at::empty({worldSize}, at::kCUDA);
    for (const auto i : c10::irange(worldSize)) {
      input_tensor_[i] = i;
    }
  }

  c10::intrusive_ptr<c10d::Work> run() {
    // For the duration of this function, make THC use our streams
    at::cuda::CUDAMultiStreamGuard guard(streams_);

    launchDeviceSleep();
    return pg_->_reduce_scatter_base(output_tensor_, input_tensor_);
  }

  at::Tensor getOutputTensor() {
    at::cuda::CUDAMultiStreamGuard guard(streams_);
    return output_tensor_.cpu();
  }

  at::Tensor getInputTensor() {
    at::cuda::CUDAMultiStreamGuard guard(streams_);
    return input_tensor_.cpu();
  }

 private:
  at::Tensor output_tensor_;
  at::Tensor input_tensor_;
};

void testAllreduce(const std::string& path, int rank, int size) {
  auto test = AllreduceNCCLTest(path, rank, size);
  test.initialize(rank, size);
  auto work = test.run();
  // Wait for work to finish
  test.wait(work);

  // Validation
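  // Each rank fills its single tensor with its global GPU index
  // (rank * numDevices + i), so the allreduce sum over all ranks is
  // 0 + 1 + ... + (totalNumGPUs - 1).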
  const int totalNumGPUs = test.numDevices() * size;
  const auto expected = (totalNumGPUs * (totalNumGPUs - 1)) / 2;
  const auto tensors = test.getTensors();
  for (const auto& tensor : tensors) {
    const auto* const data = tensor.const_data_ptr<float>();
    for (const auto k : c10::irange(tensor.numel())) {
      EXPECT_EQ(data[k], expected)
          << "Allreduce outputs do not match expected outputs";
    }
  }
}

void testSparseAllreduce(const std::string& path, int rank, int size) {
  const int inputDim = 3;
  auto test = SparseAllreduceNCCLTest(path, rank, size, inputDim);
  test.initialize(rank, size);
  auto work = test.run();
  // Wait for work to finish
  test.wait(work);

  const auto input_tensors = test.getTensors();

  // validate that the work output is the same as the input tensor
  auto output_tensor = work->result();
  // Validation
  int totalNumGPUs = test.numDevices() * size;
  // Add one since we are seeding with an additional 1 to prevent empty tensors
  totalNumGPUs++;
  const auto expected = (totalNumGPUs * (totalNumGPUs - 1)) / 2;
  for (const auto i : c10::irange(input_tensors.size())) {
    const auto& tensor = input_tensors[i];

    // validate that the tensor is sparse
    EXPECT_EQ(tensor.is_sparse(), true);

    auto indices = tensor._indices();
    auto values = tensor._values();

    // validate that the indices are the expected size
    auto sizes = indices.sizes();
    EXPECT_EQ(sizes.size(), 2);
    if (sizes[0] == 1) {
      // row indices
      EXPECT_EQ(sizes[1], inputDim);
    } else if (sizes[0] == 2) {
      // coordinate indices
      EXPECT_EQ(sizes[1], inputDim * inputDim);
    }

    // validate that all tensor values are the expected value
    const auto* const data = values.const_data_ptr<float>();
    for (const auto k : c10::irange(values.numel())) {
      EXPECT_EQ(data[k], expected)
          << "Allreduce outputs do not match expected outputs";
    }

    // the input and output tensors should be the same
    auto input_dense = tensor.to_dense();
    auto output_dense = output_tensor[i].to(input_dense.device()).to_dense();
    EXPECT_TRUE(input_dense.allclose(output_dense));
  }
}

void testSparseAllreduceLarge(const std::string& path, int rank, int size) {
  const int inputDim = 2500;
  auto test = SparseAllreduceNCCLTest(path, rank, size, inputDim);
  test.initialize(rank, size);
  auto work = test.run();
  // Wait for work to finish
  test.wait(work);

  const auto input_tensors = test.getTensors();

  // validate that the work output is the same as the input tensor
  auto output_tensor = work->result();
  // Validation
  int totalNumGPUs = test.numDevices() * size;
  // Add one since we are seeding with an additional 1 to prevent empty tensors
  totalNumGPUs++;
  const auto expected = (totalNumGPUs * (totalNumGPUs - 1)) / 2;
  for (const auto i : c10::irange(input_tensors.size())) {
    const auto& tensor = input_tensors[i];

    // validate that the tensor is sparse
    EXPECT_EQ(tensor.is_sparse(), true);

    auto indices = tensor._indices();
    auto values = tensor._values();

    // validate that the indices are the expected size
    auto sizes = indices.sizes();
    EXPECT_EQ(sizes.size(), 2);
    if (sizes[0] == 1) {
      // row indices
      EXPECT_EQ(sizes[1], inputDim);
    } else if (sizes[0] == 2) {
      // coordinate indices
      EXPECT_EQ(sizes[1], inputDim * inputDim);
    }

    // validate that all tensor values are the expected value
    const auto* const data = values.const_data_ptr<float>();
    for (const auto k : c10::irange(values.numel())) {
      EXPECT_EQ(data[k], expected)
          << "Allreduce outputs do not match expected outputs";
    }

    // the input and output tensors should be the same
    auto input_dense = tensor.to_dense();
    auto output_dense = output_tensor[i].to(input_dense.device()).to_dense();
    EXPECT_TRUE(input_dense.allclose(output_dense));
  }
}

void testBroadcast(const std::string& path, int rank, int size) {
  auto test = BroadcastNCCLTest(path, rank, size);
  test.initialize(rank, size);

  const int numDevices = test.numDevices();
  // try every permutation of root rank and root tensor
  for (const auto rootRank : c10::irange(size)) {
    for (const auto rootTensor : c10::irange(numDevices)) {
      auto work = test.run(rootRank, rootTensor);

      // wait for work to complete
      test.wait(work);

      // Check results
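      // The root filled its tensors with rank * numDevices + i, so after the
      // broadcast every rank should hold rootRank * numDevices + rootTensor.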
      const auto expected = (rootRank * numDevices + rootTensor);
      const auto tensors = test.getTensors();
      for (const auto& tensor : tensors) {
        const auto* const data = tensor.const_data_ptr<float>();
        for (const auto k : c10::irange(tensor.numel())) {
          EXPECT_EQ(data[k], expected)
              << "Broadcast outputs do not match expected outputs";
        }
      }
    }
  }
}

void testReduce(const std::string& path, int rank, int size) {
  auto test = ReduceNCCLTest(path, rank, size);
  test.initialize(rank, size);

  const int numDevices = test.numDevices();
  // try every permutation of root rank and root tensor
  for (const auto rootRank : c10::irange(size)) {
    for (const auto rootTensor : c10::irange(numDevices)) {
      auto work = test.run(rootRank, rootTensor);

      // wait for work to complete
      test.wait(work);

      // Validation
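      // The default reduce op is SUM, so the root's tensor should hold
      // 0 + 1 + ... + (totalNumGPUs - 1).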
      const int totalNumGPUs = numDevices * size;
      const auto expected = (totalNumGPUs * (totalNumGPUs - 1)) / 2;
      auto tensors = test.getTensors();
      if (rank == rootRank) {
        auto& tensor = tensors[rootTensor];
        auto data = tensor.data_ptr<float>();
        for (const auto k : c10::irange(tensor.numel())) {
          EXPECT_EQ(data[k], expected)
              << "Reduce outputs do not match expected outputs";
        }
      }
    }
  }
}

void testAllgather(const std::string& path, int rank, int size) {
  auto test = AllgatherNCCLTest(path, rank, size);
  test.initialize(rank, size);
  auto work = test.run();
  // Wait for work to finish
  test.wait(work);

  // Validation
  auto tensors = test.getOutputTensors();
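  // Rank j filled its tensor with j (numDevices is 1), so the j-th gathered
  // tensor on every rank should contain the value j everywhere.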
  // device index
  for (auto& device : tensors) {
    // rank index
    for (const auto j : c10::irange(device.size())) {
      const auto expected = j;
      auto& tensor = device[j];
      auto data = tensor.data_ptr<float>();
      for (const auto k : c10::irange(tensor.numel())) {
        EXPECT_EQ(data[k], expected)
            << "Allgather outputs do not match expected outputs";
      }
    }
  }
}

void testAllgatherBase(const std::string& path, int rank, int size) {
  auto test = AllgatherBaseNCCLTest(path, rank, size);
  test.initialize(rank, size);
  auto work = test.run();
  // Wait for work to finish
  test.wait(work);
  // Validation
  auto output_tensor = test.getOutputTensor();
  auto input_tensor = test.getInputTensor();

  auto data = output_tensor.data_ptr<float>();

  // Rank index
  for (const auto i : c10::irange(output_tensor.numel())) {
    // Element i of the flattened output belongs to rank i / input.numel(),
    // and that rank filled its input with rank * numDevices.
    const auto expected = (i / input_tensor.numel()) * test.numDevices();
    EXPECT_EQ(data[i], expected)
        << "Allgather_base outputs do not match expected outputs";
  }
}
void testReduceScatterBase(const std::string& path, int rank, int size) {
  auto test = ReduceScatterBaseNCCLTest(path, rank, size);
  test.initialize(rank, size);
  auto work = test.run();
  // Wait for work to finish
  test.wait(work);
  // Validation
  auto output_tensor = test.getOutputTensor();
  auto input_tensor = test.getInputTensor();

  auto data = output_tensor.data_ptr<float>();

  // Rank index
  for (const auto i : c10::irange(output_tensor.numel())) {
    // Every rank contributes the same input [0, 1, ..., size - 1], so after
    // the reduction index j holds size * j, and rank r is scattered element
    // r, i.e. size * rank (scaled by numDevices, which is 1 here).
    const auto expected = size * rank * test.numDevices();
    EXPECT_EQ(data[i], expected)
        << "Reducescatter_base outputs do not match expected outputs";
  }
}

void testReduceScatter(const std::string& path, int rank, int size) {
  auto test = ReduceScatterNCCLTest(path, rank, size);
  test.initialize(rank, size);
  auto work = test.run();
  // Wait for work to finish
  test.wait(work);

  const auto participants = size;
  const auto base = (participants * (participants - 1)) / 2;

  // Validation
  auto tensors = test.getTensors();
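  // Rank p fills its j-th input with p * worldSize + j. Summed over all
  // ranks, the chunk scattered to rank r therefore holds
  // base * participants + rank * participants.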
  const auto modifier = rank * participants;
  const auto expected = base * participants + modifier;
  auto& tensor = tensors[0];
  auto data = tensor.data_ptr<float>();
  for (const auto j : c10::irange(tensor.numel())) {
    EXPECT_EQ(data[j], expected)
        << "ReduceScatter outputs do not match expected outputs!";
  }
}

void testSequenceNumInit(const std::string& path, int rank, int size) {
  NCCLTest test(path, rank, size);
  test.initialize(rank, size);
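  // A newly initialized process group should start with sequence number 0.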
  test.getProcessGroup()->setSequenceNumberForGroup();
  auto seqNum = test.getProcessGroup()->getSequenceNumberForGroup();
  EXPECT_EQ(seqNum, 0);
}

void testSplittingCommunicator(const std::string& path, int rank, int size) {
  auto test1 = BroadcastNCCLTest(path, rank, size);
  test1.initialize(rank, size);

  auto test2 = BroadcastNCCLTest(path, rank, size);
  test2.initialize(rank, size, test1.getProcessGroup());

  // Steal the broadcast test and issue it for both of our groups.
  // This ensures consistent full collective communication.  TODO:
  // maybe refactor the guts rather than copy-pasta, but it may not be
  // worth it.
  for (auto test : {&test1, &test2}) {
    const int numDevices = test->numDevices();
    // try every permutation of root rank and root tensor
    for (const auto rootRank : c10::irange(size)) {
      for (const auto rootTensor : c10::irange(numDevices)) {
        auto work = test->run(rootRank, rootTensor);
        test->wait(work);

        // Check results
        const auto expected = (rootRank * numDevices + rootTensor);
        const auto tensors = test->getTensors();
        for (const auto& tensor : tensors) {
          const auto* const data = tensor.const_data_ptr<float>();
          for (const auto k : c10::irange(tensor.numel())) {
            EXPECT_EQ(data[k], expected)
                << "Broadcast outputs do not match expected outputs";
          }
        }
      }
    }
  }

  // Now that we've run full operations on both the original and split process
  // group, ensure we saw exactly as many splits as we expected: none in the
  // split group, and one per device in the original group it was split from.
  EXPECT_EQ(test2.getProcessGroup()->getCommSplitCounter(), 0);
  EXPECT_EQ(test1.getProcessGroup()->getCommSplitCounter(), test1.numDevices());
}

// All testAbc's use this signature
using FuncType = void (*)(const std::string&, int, int);

class ProcessGroupNCCLTest : public ::testing::Test {
 protected:
  void SetUp() override {
    c10::initLogging();
    // Use the WORLD_SIZE and RANK environment variables to do multi-node
    // distributed testing
    auto sizeEnv = std::getenv("WORLD_SIZE");
    if (sizeEnv) {
      size_ = std::stoi(std::string(sizeEnv));
    }
    LOG(INFO) << "ProcessGroupNCCLTest world size: " << size_;
  }

  void TearDown() override {
    // Reset TORCH_NCCL_BLOCKING_WAIT environment variable after each run.
    ASSERT_TRUE(setenv(c10d::TORCH_NCCL_BLOCKING_WAIT[0].c_str(), "0", 1) == 0);
  }

  bool skipTest() {
    // Skip tests if CUDA is not available.
    if (!at::cuda::is_available()) {
      LOG(INFO) << "CUDA not available, skipping test";
      return true;
    }
    return false;
  }

  void multiThreadRun(FuncType testFunc) {
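    // Run testFunc once per rank, each on its own thread; the ranks
    // rendezvous through the FileStore backed by this temporary file.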
    TemporaryFile file;
    std::vector<std::thread> threads;
    threads.reserve(size_);
    for (const auto rank : c10::irange(size_)) {
      threads.emplace_back(std::thread(testFunc, file.path, rank, size_));
    }
    for (const auto rank : c10::irange(size_)) {
      threads[rank].join();
    }
  }

  int size_{1};
};

TEST_F(ProcessGroupNCCLTest, testAllreduce) {
  if (skipTest()) {
    return;
  }
  multiThreadRun(testAllreduce);
}

TEST_F(ProcessGroupNCCLTest, testBroadcast) {
  if (skipTest()) {
    return;
  }
  multiThreadRun(testBroadcast);
}

TEST_F(ProcessGroupNCCLTest, testReduce) {
  if (skipTest()) {
    return;
  }
  multiThreadRun(testReduce);
}

TEST_F(ProcessGroupNCCLTest, testAllgather) {
  if (skipTest()) {
    return;
  }
  multiThreadRun(testAllgather);
}

TEST_F(ProcessGroupNCCLTest, testAllgatherBase) {
  if (skipTest()) {
    return;
  }
  multiThreadRun(testAllgatherBase);
}

TEST_F(ProcessGroupNCCLTest, testReduceScatter) {
  if (skipTest()) {
    return;
  }
  multiThreadRun(testReduceScatter);
}

TEST_F(ProcessGroupNCCLTest, testSequenceNumInit) {
  if (skipTest()) {
    return;
  }
  multiThreadRun(testSequenceNumInit);
}

TEST_F(ProcessGroupNCCLTest, testReduceScatterBase) {
  if (skipTest()) {
    return;
  }
  multiThreadRun(testReduceScatterBase);
}

TEST_F(ProcessGroupNCCLTest, testBackendName) {
  if (skipTest()) {
    return;
  }
  TemporaryFile file;
  auto test = NCCLTestBase(file.path);
  test.initialize(/*rank=*/0, /*world_size=*/1);
  EXPECT_EQ(
      test.getProcessGroup()->getBackendName(),
      std::string(c10d::NCCL_BACKEND_NAME));
}

TEST_F(ProcessGroupNCCLTest, testSplittingCommunicator) {
  if (skipTest()) {
    return;
  }
  multiThreadRun(testSplittingCommunicator);
}

#ifdef IS_NCCLX
TEST_F(ProcessGroupNCCLTest, testSparseAllreduce) {
  if (skipTest()) {
    return;
  }
  multiThreadRun(testSparseAllreduce);
  multiThreadRun(testSparseAllreduceLarge);
}
#endif