1"""Unit tests for socket timeout feature."""
2
3import functools
4import unittest
5from test import support
6from test.support import socket_helper
7
8import time
9import errno
10import socket
11
12
13@functools.lru_cache()
14def resolve_address(host, port):
15    """Resolve an (host, port) to an address.
16
17    We must perform name resolution before timeout tests, otherwise it will be
18    performed by connect().
19    """
20    with socket_helper.transient_internet(host):
21        return socket.getaddrinfo(host, port, socket.AF_INET,
22                                  socket.SOCK_STREAM)[0][4]
23
24
25class CreationTestCase(unittest.TestCase):
26    """Test case for socket.gettimeout() and socket.settimeout()"""
27
28    def setUp(self):
29        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
30
31    def tearDown(self):
32        self.sock.close()
33
34    def testObjectCreation(self):
35        # Test Socket creation
36        self.assertEqual(self.sock.gettimeout(), None,
37                         "timeout not disabled by default")
38
39    def testFloatReturnValue(self):
40        # Test return value of gettimeout()
41        self.sock.settimeout(7.345)
42        self.assertEqual(self.sock.gettimeout(), 7.345)
43
44        self.sock.settimeout(3)
45        self.assertEqual(self.sock.gettimeout(), 3)
46
47        self.sock.settimeout(None)
48        self.assertEqual(self.sock.gettimeout(), None)
49
50    def testReturnType(self):
51        # Test return type of gettimeout()
52        self.sock.settimeout(1)
53        self.assertEqual(type(self.sock.gettimeout()), type(1.0))
54
55        self.sock.settimeout(3.9)
56        self.assertEqual(type(self.sock.gettimeout()), type(1.0))
57
58    def testTypeCheck(self):
59        # Test type checking by settimeout()
60        self.sock.settimeout(0)
61        self.sock.settimeout(0)
62        self.sock.settimeout(0.0)
63        self.sock.settimeout(None)
64        self.assertRaises(TypeError, self.sock.settimeout, "")
65        self.assertRaises(TypeError, self.sock.settimeout, "")
66        self.assertRaises(TypeError, self.sock.settimeout, ())
67        self.assertRaises(TypeError, self.sock.settimeout, [])
68        self.assertRaises(TypeError, self.sock.settimeout, {})
69        self.assertRaises(TypeError, self.sock.settimeout, 0j)
70
71    def testRangeCheck(self):
72        # Test range checking by settimeout()
73        self.assertRaises(ValueError, self.sock.settimeout, -1)
74        self.assertRaises(ValueError, self.sock.settimeout, -1)
75        self.assertRaises(ValueError, self.sock.settimeout, -1.0)
76
77    def testTimeoutThenBlocking(self):
78        # Test settimeout() followed by setblocking()
79        self.sock.settimeout(10)
80        self.sock.setblocking(True)
81        self.assertEqual(self.sock.gettimeout(), None)
82        self.sock.setblocking(False)
83        self.assertEqual(self.sock.gettimeout(), 0.0)
84
85        self.sock.settimeout(10)
86        self.sock.setblocking(False)
87        self.assertEqual(self.sock.gettimeout(), 0.0)
88        self.sock.setblocking(True)
89        self.assertEqual(self.sock.gettimeout(), None)
90
91    def testBlockingThenTimeout(self):
92        # Test setblocking() followed by settimeout()
93        self.sock.setblocking(False)
94        self.sock.settimeout(1)
95        self.assertEqual(self.sock.gettimeout(), 1)
96
97        self.sock.setblocking(True)
98        self.sock.settimeout(1)
99        self.assertEqual(self.sock.gettimeout(), 1)
100
101
102class TimeoutTestCase(unittest.TestCase):
103    # There are a number of tests here trying to make sure that an operation
104    # doesn't take too much longer than expected.  But competing machine
105    # activity makes it inevitable that such tests will fail at times.
106    # When fuzz was at 1.0, I (tim) routinely saw bogus failures on Win2K
107    # and Win98SE.  Boosting it to 2.0 helped a lot, but isn't a real
108    # solution.
109    fuzz = 2.0
110
111    localhost = socket_helper.HOST
112
113    def setUp(self):
114        raise NotImplementedError()
115
116    tearDown = setUp
117
118    def _sock_operation(self, count, timeout, method, *args):
119        """
120        Test the specified socket method.
121
122        The method is run at most `count` times and must raise a TimeoutError
123        within `timeout` + self.fuzz seconds.
124        """
125        self.sock.settimeout(timeout)
126        method = getattr(self.sock, method)
127        for i in range(count):
128            t1 = time.monotonic()
129            try:
130                method(*args)
131            except TimeoutError as e:
132                delta = time.monotonic() - t1
133                break
134        else:
135            self.fail('TimeoutError was not raised')
136        # These checks should account for timing unprecision
137        self.assertLess(delta, timeout + self.fuzz)
138        self.assertGreater(delta, timeout - 1.0)
139
140
141class TCPTimeoutTestCase(TimeoutTestCase):
142    """TCP test case for socket.socket() timeout functions"""
143
144    def setUp(self):
145        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
146        self.addr_remote = resolve_address('www.python.org.', 80)
147
148    def tearDown(self):
149        self.sock.close()
150
151    @unittest.skipIf(True, 'need to replace these hosts; see bpo-35518')
152    def testConnectTimeout(self):
153        # Testing connect timeout is tricky: we need to have IP connectivity
154        # to a host that silently drops our packets.  We can't simulate this
155        # from Python because it's a function of the underlying TCP/IP stack.
156        # So, the following Snakebite host has been defined:
157        blackhole = resolve_address('blackhole.snakebite.net', 56666)
158
159        # Blackhole has been configured to silently drop any incoming packets.
160        # No RSTs (for TCP) or ICMP UNREACH (for UDP/ICMP) will be sent back
161        # to hosts that attempt to connect to this address: which is exactly
162        # what we need to confidently test connect timeout.
163
164        # However, we want to prevent false positives.  It's not unreasonable
165        # to expect certain hosts may not be able to reach the blackhole, due
166        # to firewalling or general network configuration.  In order to improve
167        # our confidence in testing the blackhole, a corresponding 'whitehole'
168        # has also been set up using one port higher:
169        whitehole = resolve_address('whitehole.snakebite.net', 56667)
170
171        # This address has been configured to immediately drop any incoming
172        # packets as well, but it does it respectfully with regards to the
173        # incoming protocol.  RSTs are sent for TCP packets, and ICMP UNREACH
174        # is sent for UDP/ICMP packets.  This means our attempts to connect to
175        # it should be met immediately with ECONNREFUSED.  The test case has
176        # been structured around this premise: if we get an ECONNREFUSED from
177        # the whitehole, we proceed with testing connect timeout against the
178        # blackhole.  If we don't, we skip the test (with a message about not
179        # getting the required RST from the whitehole within the required
180        # timeframe).
181
182        # For the records, the whitehole/blackhole configuration has been set
183        # up using the 'pf' firewall (available on BSDs), using the following:
184        #
185        #   ext_if="bge0"
186        #
187        #   blackhole_ip="35.8.247.6"
188        #   whitehole_ip="35.8.247.6"
189        #   blackhole_port="56666"
190        #   whitehole_port="56667"
191        #
192        #   block return in log quick on $ext_if proto { tcp udp } \
193        #       from any to $whitehole_ip port $whitehole_port
194        #   block drop in log quick on $ext_if proto { tcp udp } \
195        #       from any to $blackhole_ip port $blackhole_port
196        #
197
198        skip = True
199        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
200        timeout = support.LOOPBACK_TIMEOUT
201        sock.settimeout(timeout)
202        try:
203            sock.connect((whitehole))
204        except TimeoutError:
205            pass
206        except OSError as err:
207            if err.errno == errno.ECONNREFUSED:
208                skip = False
209        finally:
210            sock.close()
211            del sock
212
213        if skip:
214            self.skipTest(
215                "We didn't receive a connection reset (RST) packet from "
216                "{}:{} within {} seconds, so we're unable to test connect "
217                "timeout against the corresponding {}:{} (which is "
218                "configured to silently drop packets)."
219                    .format(
220                        whitehole[0],
221                        whitehole[1],
222                        timeout,
223                        blackhole[0],
224                        blackhole[1],
225                    )
226            )
227
228        # All that hard work just to test if connect times out in 0.001s ;-)
229        self.addr_remote = blackhole
230        with socket_helper.transient_internet(self.addr_remote[0]):
231            self._sock_operation(1, 0.001, 'connect', self.addr_remote)
232
233    def testRecvTimeout(self):
234        # Test recv() timeout
235        with socket_helper.transient_internet(self.addr_remote[0]):
236            self.sock.connect(self.addr_remote)
237            self._sock_operation(1, 1.5, 'recv', 1024)
238
239    def testAcceptTimeout(self):
240        # Test accept() timeout
241        socket_helper.bind_port(self.sock, self.localhost)
242        self.sock.listen()
243        self._sock_operation(1, 1.5, 'accept')
244
245    def testSend(self):
246        # Test send() timeout
247        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv:
248            socket_helper.bind_port(serv, self.localhost)
249            serv.listen()
250            self.sock.connect(serv.getsockname())
251            # Send a lot of data in order to bypass buffering in the TCP stack.
252            self._sock_operation(100, 1.5, 'send', b"X" * 200000)
253
254    def testSendto(self):
255        # Test sendto() timeout
256        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv:
257            socket_helper.bind_port(serv, self.localhost)
258            serv.listen()
259            self.sock.connect(serv.getsockname())
260            # The address argument is ignored since we already connected.
261            self._sock_operation(100, 1.5, 'sendto', b"X" * 200000,
262                                 serv.getsockname())
263
264    def testSendall(self):
265        # Test sendall() timeout
266        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as serv:
267            socket_helper.bind_port(serv, self.localhost)
268            serv.listen()
269            self.sock.connect(serv.getsockname())
270            # Send a lot of data in order to bypass buffering in the TCP stack.
271            self._sock_operation(100, 1.5, 'sendall', b"X" * 200000)
272
273
274class UDPTimeoutTestCase(TimeoutTestCase):
275    """UDP test case for socket.socket() timeout functions"""
276
277    def setUp(self):
278        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
279
280    def tearDown(self):
281        self.sock.close()
282
283    def testRecvfromTimeout(self):
284        # Test recvfrom() timeout
285        # Prevent "Address already in use" socket exceptions
286        socket_helper.bind_port(self.sock, self.localhost)
287        self._sock_operation(1, 1.5, 'recvfrom', 1024)
288
289
290def setUpModule():
291    support.requires('network')
292    support.requires_working_socket(module=True)
293
294
295if __name__ == "__main__":
296    unittest.main()
297