#include <unistd.h>

#include <c10/util/irange.h>
#include <torch/csrc/distributed/c10d/ProcessGroupMPI.hpp>

#include <cstdlib>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>

#define STR_HELPER(x) #x
#define STR(x) STR_HELPER(x)

// Wait for work to complete
std::vector<std::vector<at::Tensor>> waitWork(
    c10::intrusive_ptr<::c10d::ProcessGroupMPI> pg,
    std::vector<c10::intrusive_ptr<::c10d::Work>> works) {
  std::vector<std::vector<at::Tensor>> outputTensors;
  for (auto& work : works) {
    try {
      work->wait();
    } catch (const std::exception& ex) {
      std::cerr << "Exception received: " << ex.what() << std::endl;
      pg->abort();
    }
    outputTensors.emplace_back(work->result());
  }
  return outputTensors;
}

// Wait using Futures
std::vector<std::vector<at::Tensor>> waitFuture(
    c10::intrusive_ptr<::c10d::ProcessGroupMPI> pg,
    std::vector<c10::intrusive_ptr<::c10d::Work>> works) {
  std::vector<std::vector<at::Tensor>> outputTensors;
  for (auto& work : works) {
    auto fut = work->getFuture();
    try {
      fut->wait();
    } catch (const std::exception& ex) {
      std::cerr << "Exception received: " << ex.what() << std::endl;
      pg->abort();
    }
    auto result = fut->value();
    if (result.isNone()) {
      outputTensors.emplace_back();
    } else if (result.isTensorList()) {
      outputTensors.emplace_back(result.toTensorVector());
    } else {
      TORCH_CHECK(false, "future result should be tensor list or none");
    }
  }
  return outputTensors;
}

void testAllreduce(int iter = 1000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();

  // Generate inputs
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;
  for (const auto i : c10::irange(iter)) {
    auto tensor = at::ones({16, 16}) * i;
    std::vector<at::Tensor> tensors = {tensor};

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->allreduce(tensors);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Get the world size
  auto worldSize = pg->getSize();

  // Verify outputs: every rank contributed a tensor filled with i, so the
  // allreduce sum is worldSize * i.
  for (const auto i : c10::irange(iter)) {
    const auto expected = worldSize * i;
    auto data = outputTensors[i][0].data_ptr<float>();
    for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
      if (data[j] != expected) {
        TORCH_CHECK(false, "BOOM!");
      }
    }
  }
}

void testBroadcast(int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;
  for (const auto i : c10::irange(iter)) {
    auto tensors = std::vector<at::Tensor>();
    if (pg->getRank() == 0) {
      auto tensor = at::ones({16, 16}) * i;
      tensors = std::vector<at::Tensor>({tensor});
    } else {
      auto tensor = at::zeros({16, 16});
      tensors = std::vector<at::Tensor>({tensor});
    }

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->broadcast(tensors);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Verify outputs: every rank should end up with rank 0's tensor, filled with i.
  for (const auto i : c10::irange(iter)) {
    const auto expected = i;
    auto data = outputTensors[i][0].data_ptr<float>();
    for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
      if (data[j] != expected) {
        TORCH_CHECK(false, "BOOM!");
      }
    }
  }
}

void testReduce(int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;
  for (const auto i : c10::irange(iter)) {
    auto tensor = at::ones({16, 16}) * i;
    auto tensors = std::vector<at::Tensor>({tensor});

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->reduce(tensors);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Get the world size
  auto worldSize = pg->getSize();

  if (pg->getRank() == 0) {
    // Verify outputs: only the root (rank 0) receives the reduced result.
    for (const auto i : c10::irange(iter)) {
      const auto expected = worldSize * i;
      auto data = outputTensors[i][0].data_ptr<float>();
      for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
        if (data[j] != expected) {
          TORCH_CHECK(false, "BOOM!");
        }
      }
    }
  }
}

void testAllgather(int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;

  // Get the world size
  auto worldSize = pg->getSize();
  auto rank = pg->getRank();

  // Generate inputs
  for (const auto i : c10::irange(iter)) {
    auto tensor = at::ones({16, 16}) * i * rank;
    auto tensors = std::vector<at::Tensor>({tensor});
    auto outputs = std::vector<std::vector<at::Tensor>>(1);
    outputs[0].resize(worldSize);
    for (const auto j : c10::irange(worldSize)) {
      outputs[0][j] = at::zeros({16, 16});
    }

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->allgather(outputs, tensors);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Verify outputs: slot j of every gathered list holds rank j's tensor, i * j.
  for (const auto i : c10::irange(iter)) {
    for (const auto j : c10::irange(worldSize)) {
      const auto expected = i * j;
      auto data = outputTensors[i][j].data_ptr<float>();
      for (auto k = 0; k < outputTensors[i][j].numel(); ++k) {
        if (data[k] != expected) {
          TORCH_CHECK(false, "BOOM!");
        }
      }
    }
  }
}

void testGather(int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;

  // Get the world size
  auto worldSize = pg->getSize();
  auto rank = pg->getRank();

  // Generate inputs
  for (const auto i : c10::irange(iter)) {
    auto tensor = at::ones({16, 16}) * i * rank;
    auto tensors = std::vector<at::Tensor>({tensor});
    auto outputs = std::vector<std::vector<at::Tensor>>(0);
    if (rank == 0) {
      outputs = std::vector<std::vector<at::Tensor>>(1);
      outputs[0].resize(worldSize);
      for (const auto j : c10::irange(worldSize)) {
        outputs[0][j] = at::zeros({16, 16});
      }
    }

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->gather(outputs, tensors);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Verify outputs
  if (rank == 0) {
    // Only the root (rank 0) receives the gathered tensors.
    for (const auto i : c10::irange(iter)) {
      for (const auto j : c10::irange(worldSize)) {
        const auto expected = i * j;
        auto data = outputTensors[i][j].data_ptr<float>();
        for (auto k = 0; k < outputTensors[i][j].numel(); ++k) {
          if (data[k] != expected) {
            TORCH_CHECK(false, "BOOM!");
          }
        }
      }
    }
  } else {
    // Non-root ranks should not receive any output.
    for (const auto i : c10::irange(iter)) {
      if (outputTensors[i].size() != 0) {
        TORCH_CHECK(false, "BOOM!");
      }
    }
  }
}

void testScatter(int iter = 1) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;

  // Get the world size
  auto worldSize = pg->getSize();
  auto rank = pg->getRank();

  // Generate inputs
  for (const auto i : c10::irange(iter)) {
    auto tensor = at::zeros({16, 16});
    auto tensors = std::vector<at::Tensor>({tensor});
    auto inputs = std::vector<std::vector<at::Tensor>>(0);
    if (rank == 0) {
      inputs = std::vector<std::vector<at::Tensor>>(1);
      inputs[0].resize(worldSize);
      for (const auto j : c10::irange(worldSize)) {
        inputs[0][j] = at::ones({16, 16}) * i * j;
      }
    }

    // Queue the work.
    c10::intrusive_ptr<::c10d::Work> work = pg->scatter(tensors, inputs);
    works.push_back(std::move(work));
  }

  auto outputTensors = waitFuture(pg, works);

  // Verify outputs
  for (const auto i : c10::irange(iter)) {
    for (const auto j : c10::irange(worldSize)) {
      const auto expected = i * j;
      auto data = outputTensors[i][0].data_ptr<float>();
      for (auto k = 0; k < outputTensors[i][0].numel(); ++k) {
        if (data[k] != expected) {
          TORCH_CHECK(false, "BOOM!");
        }
      }
    }
  }
}

void testSendRecv(bool recvAnysource, int iter = 10000) {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();

  // Generate inputs
  std::vector<c10::intrusive_ptr<::c10d::Work>> works;

  // pg->send does not keep sent tensors alive, so we need to.
  std::vector<std::vector<at::Tensor>> sendTensors(iter);
  auto rank = pg->getRank();
  for (const auto i : c10::irange(iter)) {
    if (rank == 0) {
      auto tensor = at::ones({16, 16}) * i;
      sendTensors[i] = std::vector<at::Tensor>({tensor});

      // Queue the work.
      c10::intrusive_ptr<::c10d::Work> work = pg->send(sendTensors[i], 1, 0);
      works.push_back(std::move(work));
    } else {
      auto tensor = at::zeros({16, 16});
      auto recvTensors = std::vector<at::Tensor>({tensor});

      // Queue the work.
      if (!recvAnysource) {
        c10::intrusive_ptr<::c10d::Work> work = pg->recv(recvTensors, 0, 0);
        works.push_back(std::move(work));
      } else {
        c10::intrusive_ptr<::c10d::Work> work =
            pg->recvAnysource(recvTensors, 0);
        works.push_back(std::move(work));
      }
    }
  }

  auto outputTensors = waitWork(pg, works);

  if (rank == 0) {
    return;
  }

  std::vector<int> srcRanks;
  if (recvAnysource) {
    for (const auto& work : works) {
      srcRanks.push_back(work->sourceRank());
    }
  }

  // Verify outputs
  for (const auto i : c10::irange(iter)) {
    if (recvAnysource && srcRanks[i] != 0) {
      TORCH_CHECK(false, "src rank is wrong for recvAnysource");
    }
    const auto expected = i;
    auto data = outputTensors[i][0].data_ptr<float>();
    for (auto j = 0; j < outputTensors[i][0].numel(); ++j) {
      if (data[j] != expected) {
        TORCH_CHECK(false, "BOOM!");
      }
    }
  }
}

void testBackendName() {
  auto pg = c10d::ProcessGroupMPI::createProcessGroupMPI();
  if (pg->getBackendName() != std::string(c10d::MPI_BACKEND_NAME)) {
    TORCH_CHECK(false, "BOOM!");
  }
}

int main(int argc, char** argv) {
#ifdef MPIEXEC
  // If we are within an openmpi mpirun, then skip the exec
  if (!std::getenv("OMPI_COMM_WORLD_SIZE")) {
    std::cout << "Execute mpiexec from: " << STR(MPIEXEC) << std::endl;
    execl(STR(MPIEXEC), "-np 2", argv[0], (char*)nullptr);
  }

  testAllreduce();
  testBroadcast();
  testReduce();
  testAllgather();
  testGather();
  testScatter();
  testSendRecv(false);
  testSendRecv(true);
  testBackendName();

  std::cout << "Test successful" << std::endl;
#else
  std::cout << "MPI executable not found, skipping test" << std::endl;
#endif
  return EXIT_SUCCESS;
}
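// Usage sketch (not part of the original test; the launcher invocation and
// binary name below are assumptions): the send/recv test pairs rank 0 with
// rank 1, so the program expects a world size of two, e.g.
//   mpirun -np 2 ./ProcessGroupMPITest
// Alternatively, when the build defines MPIEXEC to point at an mpiexec binary
// and OMPI_COMM_WORLD_SIZE is unset, main() above re-execs that launcher with
// two ranks itself.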