#!/usr/bin/env python3
# Owner(s): ["oncall: r2p"]

# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import multiprocessing as mp
import os
import socket
import sys
import unittest
from contextlib import closing
from unittest import mock

from torch.distributed import DistNetworkError, DistStoreError
from torch.distributed.elastic.utils.distributed import (
    create_c10d_store,
    get_socket_with_port,
)
from torch.testing._internal.common_utils import (
    IS_MACOS,
    IS_WINDOWS,
    run_tests,
    TEST_WITH_TSAN,
    TestCase,
)


def _create_c10d_store_mp(is_server, server_addr, port, world_size, wait_for_workers):
    """Worker-process entry point: connect to the store and publish one key.

    Creates a c10d store with a 2 second timeout and writes
    ``test_key/<pid>`` -> ``b"test_value"`` so the parent process can verify
    that this worker reached the store.

    Raises:
        AssertionError: if ``create_c10d_store`` returns ``None``.
    """
    store = create_c10d_store(
        is_server,
        server_addr,
        port,
        world_size,
        wait_for_workers=wait_for_workers,
        timeout=2,
    )
    if store is None:
        raise AssertionError("create_c10d_store returned None")

    store.set(f"test_key/{os.getpid()}", b"test_value")


if IS_WINDOWS or IS_MACOS:
    # The guard is about the platform, so say so (the old message blamed
    # tsan/asan, which is not what is being checked here).
    print("tests incompatible with Windows or macOS", file=sys.stderr)
    sys.exit(0)


class DistributedUtilTest(TestCase):
    """Tests for ``create_c10d_store`` and ``get_socket_with_port``."""

    def test_create_store_single_server(self):
        """A server store with default port/world size is created successfully."""
        store = create_c10d_store(is_server=True, server_addr=socket.gethostname())
        self.assertIsNotNone(store)

    def test_create_store_no_port_multi(self):
        """Omitting the port with ``world_size=2`` raises ``ValueError``."""
        with self.assertRaises(ValueError):
            create_c10d_store(
                is_server=True, server_addr=socket.gethostname(), world_size=2
            )

    @unittest.skipIf(TEST_WITH_TSAN, "test incompatible with tsan")
    def test_create_store_multi(self):
        """Workers in separate processes can write keys the server can read."""
        world_size = 3
        wait_for_workers = False
        localhost = socket.gethostname()

        # start the server on the main process using an available port
        store = create_c10d_store(
            is_server=True,
            server_addr=localhost,
            server_port=0,
            timeout=2,
            world_size=world_size,
            wait_for_workers=wait_for_workers,
        )

        # worker processes will use the port that was assigned to the server
        server_port = store.port

        # the main process is the server, every other rank is a worker process
        workers = [
            mp.Process(
                target=_create_c10d_store_mp,
                args=(False, localhost, server_port, world_size, wait_for_workers),
            )
            for _ in range(world_size - 1)
        ]

        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        # Check exit codes first: if a worker crashed, its key was never
        # written and the store lookup below would only report a timeout,
        # hiding the real failure.
        for worker in workers:
            self.assertEqual(0, worker.exitcode)

        # check test_key/pid == "test_value"
        for worker in workers:
            self.assertEqual(
                "test_value", store.get(f"test_key/{worker.pid}").decode("UTF-8")
            )

    def test_create_store_timeout_on_server(self):
        """A server whose workers never join raises ``DistStoreError``."""
        with self.assertRaises(DistStoreError):
            # use any available port (port 0) since timeout is expected
            create_c10d_store(
                is_server=True,
                server_addr=socket.gethostname(),
                server_port=0,
                world_size=2,
                timeout=1,
            )

    def test_create_store_timeout_on_worker(self):
        """A worker with no server to connect to raises ``DistNetworkError``."""
        with self.assertRaises(DistNetworkError):
            # use any available port (port 0) since timeout is expected
            create_c10d_store(
                is_server=False,
                server_addr=socket.gethostname(),
                server_port=0,
                world_size=2,
                timeout=1,
            )

    def test_create_store_with_libuv_support(self):
        """``USE_LIBUV=0`` disables the libuv backend; unset enables it."""
        world_size = 1
        wait_for_workers = False
        localhost = socket.gethostname()

        # patch.dict restores os.environ on exit, even when an assertion
        # fails, so the override cannot leak into other tests.
        with mock.patch.dict(os.environ, {"USE_LIBUV": "0"}):
            store = create_c10d_store(
                is_server=True,
                server_addr=localhost,
                server_port=0,
                timeout=2,
                world_size=world_size,
                wait_for_workers=wait_for_workers,
            )
            self.assertFalse(store.libuvBackend)

        # Remove the variable only for the duration of this check; any value
        # that was set before the test started is restored afterwards.
        with mock.patch.dict(os.environ):
            os.environ.pop("USE_LIBUV", None)
            self.assertNotIn("USE_LIBUV", os.environ)

            # libuv backend is enabled by default
            store = create_c10d_store(
                is_server=True,
                server_addr=localhost,
                server_port=0,
                timeout=2,
                world_size=world_size,
                wait_for_workers=wait_for_workers,
            )
            self.assertTrue(store.libuvBackend)

    def test_port_already_in_use_on_server(self):
        """Creating a second server on an occupied port raises ``RuntimeError``."""
        # try to create the TCPStore server twice on the same port
        # the second should fail due to a port conflict
        # first store binds onto a free port
        # try creating the second store on the port that the first store bound to
        server_addr = socket.gethostname()
        pick_free_port = 0
        store1 = create_c10d_store(
            is_server=True,
            server_addr=server_addr,
            server_port=pick_free_port,
            timeout=1,
        )
        with self.assertRaises(RuntimeError):
            create_c10d_store(
                is_server=True, server_addr=server_addr, server_port=store1.port
            )

    def test_port_already_in_use_on_worker(self):
        """A worker pointed at a port with no store server raises ``DistNetworkError``."""
        sock = get_socket_with_port()
        with closing(sock):
            port = sock.getsockname()[1]
            # on the worker port conflict shouldn't matter, it should just timeout
            # since we never created a server
            with self.assertRaises(DistNetworkError):
                create_c10d_store(
                    is_server=False,
                    server_addr=socket.gethostname(),
                    server_port=port,
                    timeout=1,
                )


if __name__ == "__main__":
    run_tests()