xref: /aosp_15_r20/external/pytorch/test/distributed/elastic/utils/distributed_test.py (revision da0073e96a02ea20f0ac840b70461e3646d07c45)
1#!/usr/bin/env python3
2# Owner(s): ["oncall: r2p"]
3
4# Copyright (c) Facebook, Inc. and its affiliates.
5# All rights reserved.
6#
7# This source code is licensed under the BSD-style license found in the
8# LICENSE file in the root directory of this source tree.
9
import multiprocessing as mp
import os
import socket
import sys
import unittest
from contextlib import closing
from unittest import mock

from torch.distributed import DistNetworkError, DistStoreError
from torch.distributed.elastic.utils.distributed import (
    create_c10d_store,
    get_socket_with_port,
)
from torch.testing._internal.common_utils import (
    IS_MACOS,
    IS_WINDOWS,
    run_tests,
    TEST_WITH_TSAN,
    TestCase,
)
29
30
def _create_c10d_store_mp(is_server, server_addr, port, world_size, wait_for_workers):
    """Subprocess entry point: join a c10d store and publish one key.

    Writes ``b"test_value"`` under ``test_key/<pid of this process>`` so the
    parent process can check which workers reached the store.

    Raises:
        AssertionError: if ``create_c10d_store`` returned ``None``.
    """
    key = f"test_key/{os.getpid()}"
    c10d_store = create_c10d_store(
        is_server,
        server_addr,
        port,
        world_size,
        timeout=2,
        wait_for_workers=wait_for_workers,
    )
    if c10d_store is not None:
        c10d_store.set(key, b"test_value")
        return
    raise AssertionError
44
45
if IS_WINDOWS or IS_MACOS:
    # This module is not run on Windows or macOS: exit with status 0 at import
    # time so the file is treated as a no-op there rather than a failure.
    print("tests incompatible with Windows or macOS", file=sys.stderr)
    sys.exit(0)
49
50
class DistributedUtilTest(TestCase):
    """Tests for ``create_c10d_store`` and ``get_socket_with_port``.

    All stores are created against ``socket.gethostname()``. Servers that do
    not need a fixed port pass port 0 and read the assigned port back from
    ``store.port``.
    """

    def test_create_store_single_server(self):
        # a lone server is created without an explicit port or world size
        store = create_c10d_store(is_server=True, server_addr=socket.gethostname())
        self.assertIsNotNone(store)

    def test_create_store_no_port_multi(self):
        # omitting the server port with world_size > 1 must raise ValueError
        with self.assertRaises(ValueError):
            create_c10d_store(
                is_server=True, server_addr=socket.gethostname(), world_size=2
            )

    @unittest.skipIf(TEST_WITH_TSAN, "test incompatible with tsan")
    def test_create_store_multi(self):
        world_size = 3
        wait_for_workers = False
        localhost = socket.gethostname()
        # Upper bound on the wait for each worker process. A worker still alive
        # after this is terminated, so it fails the test instead of hanging it
        # (and instead of blocking interpreter exit as a non-daemon child).
        join_timeout_sec = 60

        # start the server on the main process using an available port
        store = create_c10d_store(
            is_server=True,
            server_addr=localhost,
            server_port=0,
            timeout=2,
            world_size=world_size,
            wait_for_workers=wait_for_workers,
        )

        # worker processes will use the port that was assigned to the server
        server_port = store.port

        # the server counts as one member of world_size; spawn one worker
        # process per remaining member
        workers = [
            mp.Process(
                target=_create_c10d_store_mp,
                args=(False, localhost, server_port, world_size, wait_for_workers),
            )
            for _ in range(world_size - 1)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join(timeout=join_timeout_sec)
            if worker.is_alive():
                worker.terminate()
                worker.join()

        # Check exit codes before reading keys so a crashed or hung worker is
        # reported as a bad exit code rather than as a failed key lookup.
        for worker in workers:
            self.assertEqual(0, worker.exitcode)

        # each worker wrote test_key/<its pid> == "test_value"
        for worker in workers:
            self.assertEqual(
                "test_value", store.get(f"test_key/{worker.pid}").decode("UTF-8")
            )

    def test_create_store_timeout_on_server(self):
        with self.assertRaises(DistStoreError):
            # use any available port (port 0) since timeout is expected
            create_c10d_store(
                is_server=True,
                server_addr=socket.gethostname(),
                server_port=0,
                world_size=2,
                timeout=1,
            )

    def test_create_store_timeout_on_worker(self):
        with self.assertRaises(DistNetworkError):
            # use any available port (port 0) since timeout is expected
            create_c10d_store(
                is_server=False,
                server_addr=socket.gethostname(),
                server_port=0,
                world_size=2,
                timeout=1,
            )

    def test_create_store_with_libuv_support(self):
        world_size = 1
        wait_for_workers = False
        localhost = socket.gethostname()

        def create_server_store():
            # Server store on an available port. The test expects its backend
            # to follow the USE_LIBUV environment variable.
            return create_c10d_store(
                is_server=True,
                server_addr=localhost,
                server_port=0,
                timeout=2,
                world_size=world_size,
                wait_for_workers=wait_for_workers,
            )

        # patch.dict restores os.environ on exit, even when an assertion fails,
        # so USE_LIBUV cannot leak into other tests and any pre-existing value
        # is put back afterwards.
        with mock.patch.dict(os.environ, {"USE_LIBUV": "0"}):
            store = create_server_store()
            self.assertFalse(store.libuvBackend)

        # libuv backend is enabled by default (USE_LIBUV unset)
        with mock.patch.dict(os.environ):
            os.environ.pop("USE_LIBUV", None)
            store = create_server_store()
            self.assertTrue(store.libuvBackend)

    def test_port_already_in_use_on_server(self):
        # Try to create the TCPStore server twice on the same port; the second
        # attempt should fail due to a port conflict.
        # The first store binds to a free port (port 0 = any available port).
        server_addr = socket.gethostname()
        pick_free_port = 0
        store1 = create_c10d_store(
            is_server=True,
            server_addr=server_addr,
            server_port=pick_free_port,
            timeout=1,
        )
        # store1 is still referenced here, so the second store is created on
        # the port the first one bound to
        with self.assertRaises(RuntimeError):
            create_c10d_store(
                is_server=True, server_addr=server_addr, server_port=store1.port
            )

    def test_port_already_in_use_on_worker(self):
        sock = get_socket_with_port()
        with closing(sock):
            port = sock.getsockname()[1]
            # on the worker port conflict shouldn't matter, it should just timeout
            # since we never created a server
            with self.assertRaises(DistNetworkError):
                create_c10d_store(
                    is_server=False,
                    server_addr=socket.gethostname(),
                    server_port=port,
                    timeout=1,
                )
190
if __name__ == "__main__":
    # When executed as a script, hand off to run_tests() from
    # torch.testing._internal.common_utils.
    run_tests()
193