#include <c10/cuda/CUDAGuard.h>
#include <c10/util/irange.h>

#include <ATen/cuda/CUDAContext.h>
#include <gtest/gtest.h>
#include <torch/csrc/distributed/c10d/FileStore.hpp>
#include <torch/csrc/distributed/c10d/ProcessGroupGloo.hpp>
#include "CUDATest.hpp"
#include "TestUtils.hpp"

using namespace c10d::test;

using at::cuda::CUDAStream;

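// Helper that constructs N test fixtures sharing the same FileStore path and
// then calls start() on each of them from a separate thread. The concurrent
// start matters because ProcessGroupGloo construction is a rendezvous: every
// rank has to join before any constructor can return.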
template <typename T, typename... Args>
std::vector<T> initialize(const std::string& path, int N, Args&&... args) {
  std::vector<T> tests;
  for (C10_UNUSED const auto i : c10::irange(N)) {
    // Construct in place; the extra arguments are deliberately not moved so
    // they remain valid for every iteration.
    tests.emplace_back(path, args...);
  }

  std::vector<std::thread> threads;
  for (const auto i : c10::irange(N)) {
    threads.emplace_back([i, N, &tests] { tests[i].start(i, N); });
  }

  for (auto& thread : threads) {
    thread.join();
  }

  return tests;
}

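// Base fixture: owns the FileStore path and the ProcessGroupGloo instance.
// start() performs the rendezvous through a FileStore and builds the process
// group with a deliberately tiny timeout to keep the test fast.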
class AsyncTest {
 public:
  AsyncTest(std::string path) : path_(std::move(path)) {}

  AsyncTest(AsyncTest&& other) {
    path_ = std::move(other.path_);
    pg_ = std::move(other.pg_);
  }

  ::c10d::ProcessGroupGloo& getProcessGroup() {
    return *pg_;
  }

  void start(int rank, int size) {
    auto store = c10::make_intrusive<::c10d::FileStore>(path_, size);

    // Use tiny timeout to make this test run fast
    auto options = ::c10d::ProcessGroupGloo::Options::create();
    options->timeout = std::chrono::milliseconds(50);
    options->devices.push_back(
        ::c10d::ProcessGroupGloo::createDeviceForHostname("127.0.0.1"));

    pg_ = std::unique_ptr<::c10d::ProcessGroupGloo>(
        new ::c10d::ProcessGroupGloo(store, rank, size, options));
  }

 protected:
  std::string path_;
  std::unique_ptr<::c10d::ProcessGroupGloo> pg_;
};

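// Fixture that allocates one 16x16 CUDA tensor per requested tensor, spread
// round-robin over the visible devices, plus one non-default stream per
// device. The collectives operate directly on inputs_ (input is output).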
class AsyncInputIsOutputTest : public AsyncTest {
 public:
  AsyncInputIsOutputTest(const std::string& path, int numTensors)
      : AsyncTest(path),
        numTensors_(numTensors),
        numDevices_(cudaNumDevices()) {
    // Allocate inputs on available devices in a round robin fashion.
    ::at::globalContext().lazyInitCUDA();
    inputs_.resize(numTensors_);
    for (const auto i : c10::irange(numTensors_)) {
      inputs_[i] = at::empty(
          {16, 16},
          at::device(
              {at::kCUDA, static_cast<c10::DeviceIndex>(i % numDevices_)}));
    }

    // Allocate a stream per device.
    //
    // The "current stream" is set globally per device, so we can't
    // make two tensors on the same device use different streams and
    // pass those along to the collective (it retrieves whatever the
    // current stream is at launch time).
    //
    at::cuda::OptionalCUDAGuard deviceGuard;
    streams_.reserve(numDevices_);
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(i);
      streams_.push_back(at::cuda::getStreamFromPool());
    }
  }

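  // Wait for the collective with our streams set as the current streams, so
  // that any stream synchronization performed by wait() applies to them.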
  void wait(c10::intrusive_ptr<c10d::Work>& work) {
    c10::cuda::CUDAMultiStreamGuard guard(streams_);
    work->wait();
  }

  std::vector<at::Tensor> getCpuTensors(
      const std::vector<at::Tensor>& gpu_tensors) {
    std::vector<at::Tensor> outputs(gpu_tensors.size());

    // For the duration of this function, make our streams the current streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    // Copy inputs to outputs
    for (const auto i : c10::irange(gpu_tensors.size())) {
      outputs[i] = gpu_tensors[i].cpu();
    }

    return outputs;
  }

  std::vector<at::Tensor> getTensors() {
    return getCpuTensors(inputs_);
  }

 protected:
  const int numTensors_;
  const int numDevices_;
  std::vector<at::Tensor> inputs_;
  std::vector<CUDAStream> streams_;
};

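// Allreduce across all ranks. Rank r fills its j-th tensor with
// r * numTensors + j, so after the allreduce every element holds the sum of
// all fill values. The cudaSleep kernels keep the streams busy so the
// collective really is queued behind pending GPU work.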
class AsyncAllreduceTest : public AsyncInputIsOutputTest {
 public:
  AsyncAllreduceTest(const std::string& path, int numTensors)
      : AsyncInputIsOutputTest(path, numTensors) {}

  c10::intrusive_ptr<c10d::Work> run() {
    // For the duration of this function, make our streams the current streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    // Launch sleep on every stream
    at::cuda::OptionalCUDAGuard deviceGuard;
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(i);
      cudaSleep(streams_[i], 10 * 1000 * 1000);
    }

    // Launch value initialization for every tensor
    for (const auto i : c10::irange(numTensors_)) {
      deviceGuard.set_index(i % numDevices_);
      inputs_[i].fill_(pg_->getRank() * numTensors_ + i);
    }

    return pg_->allreduce(inputs_);
  }
};

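// Broadcast from a chosen (rank, tensor) pair. Every rank first fills its
// tensors with distinct values and then expects all of them to be overwritten
// with the root's value, rootRank * numTensors + rootTensor.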
class AsyncBroadcastTest : public AsyncInputIsOutputTest {
 public:
  AsyncBroadcastTest(const std::string& path, int numTensors)
      : AsyncInputIsOutputTest(path, numTensors) {}

  c10::intrusive_ptr<c10d::Work> run(int rootRank, int rootTensor) {
    // For the duration of this function, make our streams the current streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    // Launch sleep on every stream
    at::cuda::OptionalCUDAGuard deviceGuard;
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(i);
      cudaSleep(streams_[i], 10 * 1000 * 1000);
    }

    // Launch value initialization for every tensor
    for (const auto i : c10::irange(numTensors_)) {
      deviceGuard.set_index(i % numDevices_);
      inputs_[i].fill_(pg_->getRank() * numTensors_ + i);
    }

    ::c10d::BroadcastOptions options;
    options.rootRank = rootRank;
    options.rootTensor = rootTensor;
    return pg_->broadcast(inputs_, options);
  }
};

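// With size = numProcesses * numTensors distinct fill values 0..size-1, the
// summed result in every element is size * (size - 1) / 2. Both the input
// tensors (input is output) and the tensors returned by Work::result() are
// checked against this value.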
void runAsyncAllreduceTest(
    const std::string& path,
    size_t numProcesses = 4,
    size_t numTensors = 2) {
  auto tests = initialize<AsyncAllreduceTest>(path, numProcesses, numTensors);
  std::vector<c10::intrusive_ptr<c10d::Work>> work(numProcesses);
  for (const auto i : c10::irange(numProcesses)) {
    work[i] = tests[i].run();
  }

  // Wait for work to complete
  for (const auto i : c10::irange(numProcesses)) {
    tests[i].wait(work[i]);
  }

  // Check results
  for (const auto i : c10::irange(numProcesses)) {
    const auto size = numProcesses * numTensors;
    const auto expected = (size * (size - 1)) / 2;
    auto tensors = tests[i].getTensors();
    auto results = tests[i].getCpuTensors(work[i]->result());
    EXPECT_EQ(tensors.size(), results.size());

    for (const auto j : c10::irange(tensors.size())) {
      auto& tensor = tensors[j];
      auto data = tensor.data_ptr<float>();

      auto& result_tensor = results[j];
      auto result_data = result_tensor.data_ptr<float>();

      EXPECT_EQ(tensor.numel(), result_tensor.numel());

      for (const auto k : c10::irange(tensor.numel())) {
        EXPECT_EQ(data[k], expected);
        EXPECT_EQ(result_data[k], expected);
      }
    }
  }
}

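// Runs the broadcast for every (rootRank, rootTensor) combination and checks
// that all ranks end up with the root's fill value.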
void runAsyncBroadcastTest(
    const std::string& path,
    size_t numProcesses = 4,
    size_t numTensors = 1) {
  auto tests = initialize<AsyncBroadcastTest>(path, numProcesses, numTensors);

  // Try every combination of root rank and root tensor
  for (const auto rootRank : c10::irange(numProcesses)) {
    for (const auto rootTensor : c10::irange(numTensors)) {
      std::vector<c10::intrusive_ptr<c10d::Work>> work(numProcesses);
      for (const auto i : c10::irange(numProcesses)) {
        work[i] = tests[i].run(rootRank, rootTensor);
      }

      // Wait for work to complete
      for (const auto i : c10::irange(numProcesses)) {
        tests[i].wait(work[i]);
      }

      // Check results
      const auto expected = (rootRank * numTensors + rootTensor);
      for (const auto i : c10::irange(numProcesses)) {
        auto tensors = tests[i].getTensors();
        for (const auto& tensor : tensors) {
          const auto* const data = tensor.const_data_ptr<float>();
          for (const auto k : c10::irange(tensor.numel())) {
            EXPECT_EQ(data[k], expected);
          }
        }
      }
    }
  }
}

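// The tests below are compiled only when USE_CUDA is defined and are skipped
// at runtime when no CUDA device is available.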
#ifdef USE_CUDA
TEST(ProcessGroupGlooAsyncTest, testAsyncAllreduce) {
  if (!at::cuda::is_available()) {
    LOG(INFO) << "CUDA not available, skipping testAsyncAllreduce";
    return;
  }
  TemporaryFile file;
  runAsyncAllreduceTest(file.path);
}

TEST(ProcessGroupGlooAsyncTest, testAsyncBroadcast) {
  if (!at::cuda::is_available()) {
    LOG(INFO) << "CUDA not available, skipping testAsyncBroadcast";
    return;
  }
  TemporaryFile file;
  runAsyncBroadcastTest(file.path);
}
#endif