#include <c10/cuda/CUDAGuard.h>
#include <c10/cuda/CUDAStream.h>
#include <c10/util/Logging.h>
#include <c10/util/irange.h>

#include <gtest/gtest.h>
#include <torch/csrc/distributed/c10d/FileStore.hpp>
#include <torch/csrc/distributed/c10d/ProcessGroupGloo.hpp>
#include "CUDATest.hpp"
#include "TestUtils.hpp"

using namespace c10d::test;

using at::cuda::CUDAStream;

template <typename T, typename... Args>
std::vector<T> initialize(const std::string& path, int N, Args&&... args) {
  std::vector<T> tests;
  for (C10_UNUSED const auto i : c10::irange(N)) {
    tests.push_back(std::move(T(path, std::forward<Args>(args)...)));
  }

  std::vector<std::thread> threads;
  for (C10_UNUSED const auto i : c10::irange(N)) {
    threads.push_back(std::thread([i, N, &tests] { tests[i].start(i, N); }));
  }

  for (auto& thread : threads) {
    thread.join();
  }

  return tests;
}

class AsyncTest {
 public:
  AsyncTest(std::string path) : path_(std::move(path)) {}

  AsyncTest(AsyncTest&& other) {
    path_ = std::move(other.path_);
    pg_ = std::move(other.pg_);
  }

  ::c10d::ProcessGroupGloo& getProcessGroup() {
    return *pg_;
  }

  void start(int rank, int size) {
    auto store = c10::make_intrusive<::c10d::FileStore>(path_, size);

    // Use tiny timeout to make this test run fast
    auto options = ::c10d::ProcessGroupGloo::Options::create();
    options->timeout = std::chrono::milliseconds(50);
    options->devices.push_back(
        ::c10d::ProcessGroupGloo::createDeviceForHostname("127.0.0.1"));

    pg_ = std::unique_ptr<::c10d::ProcessGroupGloo>(
        new ::c10d::ProcessGroupGloo(store, rank, size, options));
  }

 protected:
  std::string path_;
  std::unique_ptr<::c10d::ProcessGroupGloo> pg_;
};

class AsyncInputIsOutputTest : public AsyncTest {
 public:
  AsyncInputIsOutputTest(const std::string& path, int numTensors)
      : AsyncTest(path),
        numTensors_(numTensors),
        numDevices_(cudaNumDevices()) {
    // Allocate inputs on available devices in a round robin fashion.
    ::at::globalContext().lazyInitCUDA();
    inputs_.resize(numTensors_);
    for (const auto i : c10::irange(numTensors_)) {
      inputs_[i] = at::empty(
          {16, 16},
          at::device(
              {at::kCUDA, static_cast<c10::DeviceIndex>(i % numDevices_)}));
    }

    // Allocate a stream per device.
    //
    // The "current stream" is set globally per device in THC, so we
    // can't make two tensors on the same device use different streams
    // and pass this along to the collective (since it uses the THC
    // getters to retrieve the current stream).
    //
    at::cuda::OptionalCUDAGuard deviceGuard;
    streams_.reserve(numDevices_);
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(i);
      streams_.push_back(at::cuda::getStreamFromPool());
    }
  }
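  // The methods below run under a CUDAMultiStreamGuard so that the
  // per-device streams allocated above are the "current" streams while we
  // synchronize with the collective and copy results back to the host.
  // Without the guard, those operations would presumably be issued against
  // the default streams and might not be ordered after the work queued on
  // streams_.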
  void wait(c10::intrusive_ptr<c10d::Work>& work) {
    c10::cuda::CUDAMultiStreamGuard guard(streams_);
    work->wait();
  }

  std::vector<at::Tensor> getCpuTensors(
      const std::vector<at::Tensor>& gpu_tensors) {
    std::vector<at::Tensor> outputs(gpu_tensors.size());

    // For the duration of this function, make THC use our streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    // Copy inputs to outputs
    for (unsigned i = 0; i < gpu_tensors.size(); i++) {
      outputs[i] = gpu_tensors[i].cpu();
    }

    return outputs;
  }

  std::vector<at::Tensor> getTensors() {
    return getCpuTensors(inputs_);
  }

 protected:
  const int numTensors_;
  const int numDevices_;
  std::vector<at::Tensor> inputs_;
  std::vector<CUDAStream> streams_;
};

class AsyncAllreduceTest : public AsyncInputIsOutputTest {
 public:
  AsyncAllreduceTest(const std::string& path, int numTensors)
      : AsyncInputIsOutputTest(path, numTensors) {}

  c10::intrusive_ptr<c10d::Work> run() {
    // For the duration of this function, make THC use our streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    // Launch sleep on every stream
    at::cuda::OptionalCUDAGuard deviceGuard;
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(i);
      cudaSleep(streams_[i], 10 * 1000 * 1000);
    }

    // Launch value initialization for every tensor
    for (const auto i : c10::irange(numTensors_)) {
      deviceGuard.set_index(i % numDevices_);
      inputs_[i].fill_(pg_->getRank() * numTensors_ + i);
    }

    return pg_->allreduce(inputs_);
  }
};

class AsyncBroadcastTest : public AsyncInputIsOutputTest {
 public:
  AsyncBroadcastTest(const std::string& path, int numTensors)
      : AsyncInputIsOutputTest(path, numTensors) {}

  c10::intrusive_ptr<c10d::Work> run(int rootRank, int rootTensor) {
    // For the duration of this function, make THC use our streams
    c10::cuda::CUDAMultiStreamGuard guard(streams_);

    // Launch sleep on every stream
    at::cuda::OptionalCUDAGuard deviceGuard;
    for (const auto i : c10::irange(numDevices_)) {
      deviceGuard.set_index(i);
      cudaSleep(streams_[i], 10 * 1000 * 1000);
    }

    // Launch value initialization for every tensor
    for (const auto i : c10::irange(numTensors_)) {
      deviceGuard.set_index(i % numDevices_);
      inputs_[i].fill_(pg_->getRank() * numTensors_ + i);
    }

    ::c10d::BroadcastOptions options;
    options.rootRank = rootRank;
    options.rootTensor = rootTensor;
    return pg_->broadcast(inputs_, options);
  }
};

void runAsyncAllreduceTest(
    const std::string& path,
    size_t numProcesses = 4,
    size_t numTensors = 2) {
  auto tests = initialize<AsyncAllreduceTest>(path, numProcesses, numTensors);
  std::vector<c10::intrusive_ptr<c10d::Work>> work(numProcesses);
  for (const auto i : c10::irange(numProcesses)) {
    work[i] = tests[i].run();
  }

  // Wait for work to complete
  for (const auto i : c10::irange(numProcesses)) {
    tests[i].wait(work[i]);
  }

  // Check results
  for (const auto i : c10::irange(numProcesses)) {
    // Across all ranks and tensors, the inputs were filled with
    // rank * numTensors + j, i.e. the values 0 .. size - 1, so every
    // element of the allreduce result is size * (size - 1) / 2.
    const auto size = numProcesses * numTensors;
    const auto expected = (size * (size - 1)) / 2;
    auto tensors = tests[i].getTensors();
    auto results = tests[i].getCpuTensors(work[i]->result());
    EXPECT_EQ(tensors.size(), results.size());

    for (const auto j : c10::irange(tensors.size())) {
      auto& tensor = tensors[j];
      auto data = tensor.data_ptr<float>();

      auto& result_tensor = results[j];
      auto result_data = result_tensor.data_ptr<float>();

      EXPECT_EQ(tensor.numel(), result_tensor.numel());

      for (const auto k : c10::irange(tensor.numel())) {
        EXPECT_EQ(data[k], expected);
        EXPECT_EQ(result_data[k], expected);
      }
    }
  }
}
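// For the broadcast test, every rank fills tensor j with
// rank * numTensors + j before broadcasting from (rootRank, rootTensor),
// so afterwards every tensor on every rank should hold
// rootRank * numTensors + rootTensor. For example, with the default
// numProcesses = 4 and numTensors = 1, broadcasting from rank 2 should
// leave every element equal to 2.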
void runAsyncBroadcastTest(
    const std::string& path,
    size_t numProcesses = 4,
    size_t numTensors = 1) {
  auto tests = initialize<AsyncBroadcastTest>(path, numProcesses, numTensors);

  // Try every permutation of root rank and root tensor
  for (const auto rootRank : c10::irange(numProcesses)) {
    for (const auto rootTensor : c10::irange(numTensors)) {
      std::vector<c10::intrusive_ptr<c10d::Work>> work(numProcesses);
      for (const auto i : c10::irange(numProcesses)) {
        work[i] = tests[i].run(rootRank, rootTensor);
      }

      // Wait for work to complete
      for (const auto i : c10::irange(numProcesses)) {
        tests[i].wait(work[i]);
      }

      // Check results: every tensor should now hold the value the root
      // rank filled into its root tensor.
      const auto expected = (rootRank * numTensors + rootTensor);
      for (const auto i : c10::irange(numProcesses)) {
        auto tensors = tests[i].getTensors();
        for (const auto& tensor : tensors) {
          const auto* const data = tensor.const_data_ptr<float>();
          for (const auto k : c10::irange(tensor.numel())) {
            EXPECT_EQ(data[k], expected);
          }
        }
      }
    }
  }
}

#ifdef USE_CUDA
TEST(ProcessGroupGlooAsyncTest, testAsyncAllreduce) {
  if (!at::cuda::is_available()) {
    LOG(INFO) << "CUDA not available, skipping testAsyncAllreduce";
    return;
  }
  TemporaryFile file;
  runAsyncAllreduceTest(file.path);
}

TEST(ProcessGroupGlooAsyncTest, testAsyncBroadcast) {
  if (!at::cuda::is_available()) {
    LOG(INFO) << "CUDA not available, skipping testAsyncBroadcast";
    return;
  }
  TemporaryFile file;
  runAsyncBroadcastTest(file.path);
}
#endif