// /aosp_15_r20/external/pytorch/test/cpp/c10d/ProcessGroupGlooAsyncTest.cpp
// (revision da0073e96a02ea20f0ac840b70461e3646d07c45)
#include <c10/cuda/CUDAGuard.h>
#include <c10/util/irange.h>

#include <ATen/cuda/CUDAContext.h>
#include <gtest/gtest.h>
#include <torch/csrc/distributed/c10d/FileStore.hpp>
#include <torch/csrc/distributed/c10d/ProcessGroupGloo.hpp>
#include "CUDATest.hpp"
#include "TestUtils.hpp"

using namespace c10d::test;

using at::cuda::CUDAStream;

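// Constructs N test fixtures that share the same FileStore path, then starts
// each one on its own thread (start() performs that rank's rendezvous) and
// joins all threads before handing the fixtures back to the caller.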
template <typename T, typename... Args>
std::vector<T> initialize(const std::string& path, int N, Args&&... args) {
  std::vector<T> tests;
  for (C10_UNUSED const auto i : c10::irange(N)) {
    tests.push_back(std::move(T(path, std::forward<Args>(args)...)));
  }

  std::vector<std::thread> threads;
  for (C10_UNUSED const auto i : c10::irange(N)) {
    threads.push_back(std::thread([i, N, &tests] { tests[i].start(i, N); }));
  }

  for (auto& thread : threads) {
    thread.join();
  }

  return tests;
}

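// Owns a single ProcessGroupGloo instance. Each rank's fixture rendezvouses
// with the others through a FileStore rooted at the shared temporary file.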
class AsyncTest {
 public:
  AsyncTest(std::string path) : path_(std::move(path)) {}

  AsyncTest(AsyncTest&& other) {
    path_ = std::move(other.path_);
    pg_ = std::move(other.pg_);
  }

  ::c10d::ProcessGroupGloo& getProcessGroup() {
    return *pg_;
  }

  void start(int rank, int size) {
    auto store = c10::make_intrusive<::c10d::FileStore>(path_, size);

    // Use tiny timeout to make this test run fast
    auto options = ::c10d::ProcessGroupGloo::Options::create();
    options->timeout = std::chrono::milliseconds(50);
    options->devices.push_back(
        ::c10d::ProcessGroupGloo::createDeviceForHostname("127.0.0.1"));

    pg_ = std::unique_ptr<::c10d::ProcessGroupGloo>(
        new ::c10d::ProcessGroupGloo(store, rank, size, options));
  }

 protected:
  std::string path_;
  std::unique_ptr<::c10d::ProcessGroupGloo> pg_;
};

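// Base fixture for the CUDA collectives below: allocates numTensors 16x16
// CUDA tensors round-robin across the visible devices and grabs one
// non-default stream per device so work can be queued asynchronously.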
class AsyncInputIsOutputTest : public AsyncTest {
 public:
  AsyncInputIsOutputTest(const std::string& path, int numTensors)
      : AsyncTest(path),
        numTensors_(numTensors),
        numDevices_(cudaNumDevices()) {
    // Allocate inputs on available devices in a round-robin fashion.
    ::at::globalContext().lazyInitCUDA();
    inputs_.resize(numTensors_);
    for (const auto i : c10::irange(numTensors_)) {
      inputs_[i] = at::empty(
          {16, 16},
          at::device(
              {at::kCUDA, static_cast<c10::DeviceIndex>(i % numDevices_)}));
    }

    // Allocate a stream per device.
    //
    // The "current stream" is a global per-device setting, so we can't make
    // two tensors on the same device use different streams and pass that
    // along to the collective (it looks up the current stream for each
    // device instead).
    //
    at::cuda::OptionalCUDAGuard deviceGuard;
    streams_.reserve(numDevices_);
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(i);
      streams_.push_back(at::cuda::getStreamFromPool());
    }
  }

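  // Waits for the collective to finish. The fixture's streams are made
  // current first so that any stream synchronization performed by wait()
  // applies to the streams this test issues work on.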
  void wait(c10::intrusive_ptr<c10d::Work>& work) {
    c10::cuda::CUDAMultiStreamGuard guard(streams_);
    work->wait();
  }

  std::vector<at::Tensor> getCpuTensors(
      const std::vector<at::Tensor>& gpu_tensors) {
    std::vector<at::Tensor> outputs(gpu_tensors.size());

    // For the duration of this function, make these the current streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    // Copy inputs to outputs
    for (unsigned i = 0; i < gpu_tensors.size(); i++) {
      outputs[i] = gpu_tensors[i].cpu();
    }

    return outputs;
  }

  std::vector<at::Tensor> getTensors() {
    return getCpuTensors(inputs_);
  }

 protected:
  const int numTensors_;
  const int numDevices_;
  std::vector<at::Tensor> inputs_;
  std::vector<CUDAStream> streams_;
};

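// Allreduce variant: queues a sleep kernel (cudaSleep) on every stream so the
// collective lands behind pending GPU work, fills each input with
// rank * numTensors + i, and issues the allreduce asynchronously.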
class AsyncAllreduceTest : public AsyncInputIsOutputTest {
 public:
  AsyncAllreduceTest(const std::string& path, int numTensors)
      : AsyncInputIsOutputTest(path, numTensors) {}

  c10::intrusive_ptr<c10d::Work> run() {
    // For the duration of this function, make these the current streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    // Launch sleep on every stream
    at::cuda::OptionalCUDAGuard deviceGuard;
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(i);
      cudaSleep(streams_[i], 10 * 1000 * 1000);
    }

    // Launch value initialization for every tensor
    for (const auto i : c10::irange(numTensors_)) {
      deviceGuard.set_index(i % numDevices_);
      inputs_[i].fill_(pg_->getRank() * numTensors_ + i);
    }

    return pg_->allreduce(inputs_);
  }
};

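// Broadcast variant: same setup as the allreduce test, but broadcasts the
// tensor identified by (rootRank, rootTensor) to every rank.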
class AsyncBroadcastTest : public AsyncInputIsOutputTest {
 public:
  AsyncBroadcastTest(const std::string& path, int numTensors)
      : AsyncInputIsOutputTest(path, numTensors) {}

  c10::intrusive_ptr<c10d::Work> run(int rootRank, int rootTensor) {
    // For the duration of this function, make these the current streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    // Launch sleep on every stream
    at::cuda::OptionalCUDAGuard deviceGuard;
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(i);
      cudaSleep(streams_[i], 10 * 1000 * 1000);
    }

    // Launch value initialization for every tensor
    for (const auto i : c10::irange(numTensors_)) {
      deviceGuard.set_index(i % numDevices_);
      inputs_[i].fill_(pg_->getRank() * numTensors_ + i);
    }

    ::c10d::BroadcastOptions options;
    options.rootRank = rootRank;
    options.rootTensor = rootTensor;
    return pg_->broadcast(inputs_, options);
  }
};

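// Spins up numProcesses ranks in one process, runs an asynchronous allreduce
// on each, waits for completion, and verifies both the in-place inputs and
// work->result() against the expected sum.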
void runAsyncAllreduceTest(
    const std::string& path,
    size_t numProcesses = 4,
    size_t numTensors = 2) {
  auto tests = initialize<AsyncAllreduceTest>(path, numProcesses, numTensors);
  std::vector<c10::intrusive_ptr<c10d::Work>> work(numProcesses);
  for (const auto i : c10::irange(numProcesses)) {
    work[i] = tests[i].run();
  }

  // Wait for work to complete
  for (const auto i : c10::irange(numProcesses)) {
    tests[i].wait(work[i]);
  }

  // Check results
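  // Every tensor across all ranks was filled with a distinct value in
  // [0, numProcesses * numTensors), so after the allreduce every element
  // should equal the sum 0 + 1 + ... + (size - 1) = size * (size - 1) / 2.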
  for (const auto i : c10::irange(numProcesses)) {
    const auto size = numProcesses * numTensors;
    const auto expected = (size * (size - 1)) / 2;
    auto tensors = tests[i].getTensors();
    auto results = tests[i].getCpuTensors(work[i]->result());
    EXPECT_EQ(tensors.size(), results.size());

    for (const auto j : c10::irange(tensors.size())) {
      auto& tensor = tensors[j];
      auto data = tensor.data_ptr<float>();

      auto& result_tensor = results[j];
      auto result_data = result_tensor.data_ptr<float>();

      EXPECT_EQ(tensor.numel(), result_tensor.numel());

      for (const auto k : c10::irange(tensor.numel())) {
        EXPECT_EQ(data[k], expected);
        EXPECT_EQ(result_data[k], expected);
      }
    }
  }
}

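// Exercises broadcast for every (rootRank, rootTensor) combination and checks
// that all ranks end up with the root tensor's fill value.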
void runAsyncBroadcastTest(
    const std::string& path,
    size_t numProcesses = 4,
    size_t numTensors = 1) {
  auto tests = initialize<AsyncBroadcastTest>(path, numProcesses, numTensors);

  // Try every combination of root rank and root tensor
  for (const auto rootRank : c10::irange(numProcesses)) {
    for (const auto rootTensor : c10::irange(numTensors)) {
      std::vector<c10::intrusive_ptr<c10d::Work>> work(numProcesses);
      for (const auto i : c10::irange(numProcesses)) {
        work[i] = tests[i].run(rootRank, rootTensor);
      }

      // Wait for work to complete
      for (const auto i : c10::irange(numProcesses)) {
        tests[i].wait(work[i]);
      }

      // Check results
      const auto expected = (rootRank * numTensors + rootTensor);
      for (const auto i : c10::irange(numProcesses)) {
        auto tensors = tests[i].getTensors();
        for (const auto& tensor : tensors) {
          const auto* const data = tensor.const_data_ptr<float>();
          for (const auto k : c10::irange(tensor.numel())) {
            EXPECT_EQ(data[k], expected);
          }
        }
      }
    }
  }
}

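// The tests below are only built when USE_CUDA is defined and skip themselves
// at runtime if no CUDA device is available.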
#ifdef USE_CUDA
TEST(ProcessGroupGlooAsyncTest, testAsyncAllreduce) {
  if (!at::cuda::is_available()) {
    LOG(INFO) << "CUDA not available, skipping testAsyncAllreduce";
    return;
  }
  TemporaryFile file;
  runAsyncAllreduceTest(file.path);
}

TEST(ProcessGroupGlooAsyncTest, testAsyncBroadcast) {
  if (!at::cuda::is_available()) {
    LOG(INFO) << "CUDA not available, skipping testAsyncBroadcast";
    return;
  }
  TemporaryFile file;
  runAsyncBroadcastTest(file.path);
}
#endif