1"""Tests for events.py."""
2
3import collections.abc
4import concurrent.futures
5import functools
6import io
7import os
8import platform
9import re
10import signal
11import socket
12try:
13    import ssl
14except ImportError:
15    ssl = None
16import subprocess
17import sys
18import threading
19import time
20import types
21import errno
22import unittest
23from unittest import mock
24import weakref
25
26if sys.platform not in ('win32', 'vxworks'):
27    import tty
28
29import asyncio
30from asyncio import coroutines
31from asyncio import events
32from asyncio import proactor_events
33from asyncio import selector_events
34from test.test_asyncio import utils as test_utils
35from test import support
36from test.support import socket_helper
37from test.support import threading_helper
38from test.support import ALWAYS_EQ, LARGEST, SMALLEST
39
40
def tearDownModule():
    """Module-level teardown hook: reset the asyncio event loop policy."""
    # Passing None asks asyncio to drop whatever policy is currently set.
    asyncio.set_event_loop_policy(None)
43
44
def broken_unix_getsockname():
    """Return True on AIX and on Mac OS X releases older than 10.5.

    Every other platform yields False.
    """
    if sys.platform.startswith("aix"):
        return True
    if sys.platform != 'darwin':
        return False
    release = platform.mac_ver()[0]
    release_parts = tuple(int(piece) for piece in release.split('.'))
    return release_parts < (10, 5)
54
55
def _test_get_event_loop_new_process__sub_proc():
    """Run a trivial coroutine on a brand-new event loop and return its result.

    A new loop is created, installed as the current event loop and used to
    run a coroutine that returns ``'hello'``.  The loop is closed before
    returning so that it is not leaked.
    """
    async def doit():
        return 'hello'

    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(doit())
    finally:
        # Always close the loop, even if running the coroutine fails.
        loop.close()
63
64
class CoroLike:
    """Bare object exposing send/throw/close/__await__; each one is a no-op."""

    def send(self, v):
        """Ignore *v* and return None."""
        return None

    def throw(self, *exc):
        """Ignore the exception arguments and return None."""
        return None

    def close(self):
        """Do nothing."""
        return None

    def __await__(self):
        """Return None (no iterator is produced)."""
        return None
77
78
class MyBaseProto(asyncio.Protocol):
    """Stream protocol that records its lifecycle state and counts bytes.

    ``state`` goes 'INITIAL' -> 'CONNECTED' -> ('EOF') -> 'CLOSED'; a
    callback arriving in any other state raises AssertionError.  When a
    loop is supplied, ``connected`` and ``done`` are futures resolved by
    connection_made() and connection_lost() respectively.
    """
    # Remain None unless a loop is passed to __init__.
    connected = None
    done = None

    def __init__(self, loop=None):
        self.transport = None
        self.state = 'INITIAL'
        self.nbytes = 0
        if loop is None:
            return
        self.connected = loop.create_future()
        self.done = loop.create_future()

    def _assert_state(self, *expected):
        """Raise AssertionError unless the current state is one of *expected*."""
        if self.state in expected:
            return
        raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')

    def connection_made(self, transport):
        self.transport = transport
        self._assert_state('INITIAL')
        self.state = 'CONNECTED'
        waiter = self.connected
        if waiter:
            waiter.set_result(None)

    def data_received(self, data):
        self._assert_state('CONNECTED')
        self.nbytes = self.nbytes + len(data)

    def eof_received(self):
        self._assert_state('CONNECTED')
        self.state = 'EOF'

    def connection_lost(self, exc):
        self._assert_state('CONNECTED', 'EOF')
        self.state = 'CLOSED'
        waiter = self.done
        if waiter:
            waiter.set_result(None)
115
116
class MyProto(MyBaseProto):
    """MyBaseProto that writes an HTTP/1.0 GET request as soon as it connects."""

    def connection_made(self, transport):
        super().connection_made(transport)
        request = b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
        transport.write(request)
121
122
class MyDatagramProto(asyncio.DatagramProtocol):
    """Datagram protocol that records its lifecycle state and counts bytes.

    ``state`` goes 'INITIAL' -> 'INITIALIZED' -> 'CLOSED'; a callback
    arriving in any other state raises AssertionError.  When a loop is
    supplied, ``done`` is a future resolved by connection_lost().
    """
    # Remains None unless a loop is passed to __init__.
    done = None

    def __init__(self, loop=None):
        # Define the attribute up front, as the other protocols in this
        # module do, so it can be read before connection_made() runs.
        self.transport = None
        self.state = 'INITIAL'
        self.nbytes = 0
        if loop is not None:
            self.done = loop.create_future()

    def _assert_state(self, expected):
        """Raise AssertionError unless the current state equals *expected*."""
        if self.state != expected:
            raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')

    def connection_made(self, transport):
        self.transport = transport
        self._assert_state('INITIAL')
        self.state = 'INITIALIZED'

    def datagram_received(self, data, addr):
        self._assert_state('INITIALIZED')
        self.nbytes += len(data)

    def error_received(self, exc):
        # Errors are only checked against the state; *exc* is not stored.
        self._assert_state('INITIALIZED')

    def connection_lost(self, exc):
        self._assert_state('INITIALIZED')
        self.state = 'CLOSED'
        if self.done:
            self.done.set_result(None)
153
154
class MyReadPipeProto(asyncio.Protocol):
    """Read-pipe protocol whose ``state`` is the list of stages seen so far.

    The list grows from ['INITIAL'] by appending 'CONNECTED', 'EOF' and
    'CLOSED'.  When a loop is supplied, ``done`` is a future resolved by
    connection_lost().
    """
    # Remains None unless a loop is passed to __init__.
    done = None

    def __init__(self, loop=None):
        self.state = ['INITIAL']
        self.nbytes = 0
        self.transport = None
        if loop is None:
            return
        self.done = loop.create_future()

    def _assert_state(self, expected):
        """Raise AssertionError unless the stage list equals *expected*."""
        if self.state == expected:
            return
        raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')

    def connection_made(self, transport):
        self.transport = transport
        self._assert_state(['INITIAL'])
        self.state.append('CONNECTED')

    def data_received(self, data):
        self._assert_state(['INITIAL', 'CONNECTED'])
        self.nbytes = self.nbytes + len(data)

    def eof_received(self):
        self._assert_state(['INITIAL', 'CONNECTED'])
        self.state.append('EOF')

    def connection_lost(self, exc):
        stages = self.state
        if 'EOF' not in stages:
            # A missed EOF is tolerated: record it before checking.
            stages.append('EOF')
        self._assert_state(['INITIAL', 'CONNECTED', 'EOF'])
        stages.append('CLOSED')
        waiter = self.done
        if waiter:
            waiter.set_result(None)
189
190
class MyWritePipeProto(asyncio.BaseProtocol):
    """Write-pipe protocol that records its lifecycle state.

    ``state`` goes 'INITIAL' -> 'CONNECTED' -> 'CLOSED'.  When a loop is
    supplied, ``done`` is a future resolved by connection_lost().
    """
    # Remains None unless a loop is passed to __init__.
    done = None

    def __init__(self, loop=None):
        self.state = 'INITIAL'
        self.transport = None
        if loop is None:
            return
        self.done = loop.create_future()

    def _assert_state(self, expected):
        """Raise AssertionError unless the current state equals *expected*."""
        if self.state == expected:
            return
        raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')

    def connection_made(self, transport):
        self.transport = transport
        self._assert_state('INITIAL')
        self.state = 'CONNECTED'

    def connection_lost(self, exc):
        self._assert_state('CONNECTED')
        self.state = 'CLOSED'
        waiter = self.done
        if waiter:
            waiter.set_result(None)
214
215
class MySubprocessProtocol(asyncio.SubprocessProtocol):
    """Subprocess protocol that records pipe data, disconnects and exit code.

    Futures created from *loop* signal connection (``connected``), final
    close (``completed``) and the loss of each of fds 0-2 (``disconnects``).
    Data for fds 1 and 2 accumulates in ``data``; the matching event in
    ``got_data`` is set whenever data arrives on that fd.
    """

    def __init__(self, loop):
        self.state = 'INITIAL'
        self.transport = None
        self.connected = loop.create_future()
        self.completed = loop.create_future()
        self.disconnects = {}
        for fd in range(3):
            self.disconnects[fd] = loop.create_future()
        self.data = {1: b'', 2: b''}
        self.returncode = None
        self.got_data = {fd: asyncio.Event() for fd in (1, 2)}

    def _assert_state(self, expected):
        """Raise AssertionError unless the current state equals *expected*."""
        if self.state == expected:
            return
        raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')

    def connection_made(self, transport):
        self.transport = transport
        self._assert_state('INITIAL')
        self.state = 'CONNECTED'
        self.connected.set_result(None)

    def connection_lost(self, exc):
        self._assert_state('CONNECTED')
        self.state = 'CLOSED'
        self.completed.set_result(None)

    def pipe_data_received(self, fd, data):
        self._assert_state('CONNECTED')
        self.data[fd] = self.data[fd] + data
        self.got_data[fd].set()

    def pipe_connection_lost(self, fd, exc):
        self._assert_state('CONNECTED')
        if not exc:
            # Clean disconnect: resolve the future with the falsy *exc*.
            self.disconnects[fd].set_result(exc)
            return
        self.disconnects[fd].set_exception(exc)

    def process_exited(self):
        self._assert_state('CONNECTED')
        self.returncode = self.transport.get_returncode()
259
260
261class EventLoopTestsMixin:
262
263    def setUp(self):
264        super().setUp()
265        self.loop = self.create_event_loop()
266        self.set_event_loop(self.loop)
267
268    def tearDown(self):
269        # just in case if we have transport close callbacks
270        if not self.loop.is_closed():
271            test_utils.run_briefly(self.loop)
272
273        self.doCleanups()
274        support.gc_collect()
275        super().tearDown()
276
277    def test_run_until_complete_nesting(self):
278        async def coro1():
279            await asyncio.sleep(0)
280
281        async def coro2():
282            self.assertTrue(self.loop.is_running())
283            self.loop.run_until_complete(coro1())
284
285        with self.assertWarnsRegex(
286            RuntimeWarning,
287            r"coroutine \S+ was never awaited"
288        ):
289            self.assertRaises(
290                RuntimeError, self.loop.run_until_complete, coro2())
291
292    # Note: because of the default Windows timing granularity of
293    # 15.6 msec, we use fairly long sleep times here (~100 msec).
294
295    def test_run_until_complete(self):
296        t0 = self.loop.time()
297        self.loop.run_until_complete(asyncio.sleep(0.1))
298        t1 = self.loop.time()
299        self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0)
300
301    def test_run_until_complete_stopped(self):
302
303        async def cb():
304            self.loop.stop()
305            await asyncio.sleep(0.1)
306        task = cb()
307        self.assertRaises(RuntimeError,
308                          self.loop.run_until_complete, task)
309
310    def test_call_later(self):
311        results = []
312
313        def callback(arg):
314            results.append(arg)
315            self.loop.stop()
316
317        self.loop.call_later(0.1, callback, 'hello world')
318        self.loop.run_forever()
319        self.assertEqual(results, ['hello world'])
320
321    def test_call_soon(self):
322        results = []
323
324        def callback(arg1, arg2):
325            results.append((arg1, arg2))
326            self.loop.stop()
327
328        self.loop.call_soon(callback, 'hello', 'world')
329        self.loop.run_forever()
330        self.assertEqual(results, [('hello', 'world')])
331
332    def test_call_soon_threadsafe(self):
333        results = []
334        lock = threading.Lock()
335
336        def callback(arg):
337            results.append(arg)
338            if len(results) >= 2:
339                self.loop.stop()
340
341        def run_in_thread():
342            self.loop.call_soon_threadsafe(callback, 'hello')
343            lock.release()
344
345        lock.acquire()
346        t = threading.Thread(target=run_in_thread)
347        t.start()
348
349        with lock:
350            self.loop.call_soon(callback, 'world')
351            self.loop.run_forever()
352        t.join()
353        self.assertEqual(results, ['hello', 'world'])
354
355    def test_call_soon_threadsafe_same_thread(self):
356        results = []
357
358        def callback(arg):
359            results.append(arg)
360            if len(results) >= 2:
361                self.loop.stop()
362
363        self.loop.call_soon_threadsafe(callback, 'hello')
364        self.loop.call_soon(callback, 'world')
365        self.loop.run_forever()
366        self.assertEqual(results, ['hello', 'world'])
367
368    def test_run_in_executor(self):
369        def run(arg):
370            return (arg, threading.get_ident())
371        f2 = self.loop.run_in_executor(None, run, 'yo')
372        res, thread_id = self.loop.run_until_complete(f2)
373        self.assertEqual(res, 'yo')
374        self.assertNotEqual(thread_id, threading.get_ident())
375
376    def test_run_in_executor_cancel(self):
377        called = False
378
379        def patched_call_soon(*args):
380            nonlocal called
381            called = True
382
383        def run():
384            time.sleep(0.05)
385
386        f2 = self.loop.run_in_executor(None, run)
387        f2.cancel()
388        self.loop.run_until_complete(
389                self.loop.shutdown_default_executor())
390        self.loop.close()
391        self.loop.call_soon = patched_call_soon
392        self.loop.call_soon_threadsafe = patched_call_soon
393        time.sleep(0.4)
394        self.assertFalse(called)
395
396    def test_reader_callback(self):
397        r, w = socket.socketpair()
398        r.setblocking(False)
399        bytes_read = bytearray()
400
401        def reader():
402            try:
403                data = r.recv(1024)
404            except BlockingIOError:
405                # Spurious readiness notifications are possible
406                # at least on Linux -- see man select.
407                return
408            if data:
409                bytes_read.extend(data)
410            else:
411                self.assertTrue(self.loop.remove_reader(r.fileno()))
412                r.close()
413
414        self.loop.add_reader(r.fileno(), reader)
415        self.loop.call_soon(w.send, b'abc')
416        test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3)
417        self.loop.call_soon(w.send, b'def')
418        test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6)
419        self.loop.call_soon(w.close)
420        self.loop.call_soon(self.loop.stop)
421        self.loop.run_forever()
422        self.assertEqual(bytes_read, b'abcdef')
423
424    def test_writer_callback(self):
425        r, w = socket.socketpair()
426        w.setblocking(False)
427
428        def writer(data):
429            w.send(data)
430            self.loop.stop()
431
432        data = b'x' * 1024
433        self.loop.add_writer(w.fileno(), writer, data)
434        self.loop.run_forever()
435
436        self.assertTrue(self.loop.remove_writer(w.fileno()))
437        self.assertFalse(self.loop.remove_writer(w.fileno()))
438
439        w.close()
440        read = r.recv(len(data) * 2)
441        r.close()
442        self.assertEqual(read, data)
443
444    @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'No SIGKILL')
445    def test_add_signal_handler(self):
446        caught = 0
447
448        def my_handler():
449            nonlocal caught
450            caught += 1
451
452        # Check error behavior first.
453        self.assertRaises(
454            TypeError, self.loop.add_signal_handler, 'boom', my_handler)
455        self.assertRaises(
456            TypeError, self.loop.remove_signal_handler, 'boom')
457        self.assertRaises(
458            ValueError, self.loop.add_signal_handler, signal.NSIG+1,
459            my_handler)
460        self.assertRaises(
461            ValueError, self.loop.remove_signal_handler, signal.NSIG+1)
462        self.assertRaises(
463            ValueError, self.loop.add_signal_handler, 0, my_handler)
464        self.assertRaises(
465            ValueError, self.loop.remove_signal_handler, 0)
466        self.assertRaises(
467            ValueError, self.loop.add_signal_handler, -1, my_handler)
468        self.assertRaises(
469            ValueError, self.loop.remove_signal_handler, -1)
470        self.assertRaises(
471            RuntimeError, self.loop.add_signal_handler, signal.SIGKILL,
472            my_handler)
473        # Removing SIGKILL doesn't raise, since we don't call signal().
474        self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL))
475        # Now set a handler and handle it.
476        self.loop.add_signal_handler(signal.SIGINT, my_handler)
477
478        os.kill(os.getpid(), signal.SIGINT)
479        test_utils.run_until(self.loop, lambda: caught)
480
481        # Removing it should restore the default handler.
482        self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT))
483        self.assertEqual(signal.getsignal(signal.SIGINT),
484                         signal.default_int_handler)
485        # Removing again returns False.
486        self.assertFalse(self.loop.remove_signal_handler(signal.SIGINT))
487
488    @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
489    @unittest.skipUnless(hasattr(signal, 'setitimer'),
490                         'need signal.setitimer()')
491    def test_signal_handling_while_selecting(self):
492        # Test with a signal actually arriving during a select() call.
493        caught = 0
494
495        def my_handler():
496            nonlocal caught
497            caught += 1
498            self.loop.stop()
499
500        self.loop.add_signal_handler(signal.SIGALRM, my_handler)
501
502        signal.setitimer(signal.ITIMER_REAL, 0.01, 0)  # Send SIGALRM once.
503        self.loop.call_later(60, self.loop.stop)
504        self.loop.run_forever()
505        self.assertEqual(caught, 1)
506
507    @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
508    @unittest.skipUnless(hasattr(signal, 'setitimer'),
509                         'need signal.setitimer()')
510    def test_signal_handling_args(self):
511        some_args = (42,)
512        caught = 0
513
514        def my_handler(*args):
515            nonlocal caught
516            caught += 1
517            self.assertEqual(args, some_args)
518            self.loop.stop()
519
520        self.loop.add_signal_handler(signal.SIGALRM, my_handler, *some_args)
521
522        signal.setitimer(signal.ITIMER_REAL, 0.1, 0)  # Send SIGALRM once.
523        self.loop.call_later(60, self.loop.stop)
524        self.loop.run_forever()
525        self.assertEqual(caught, 1)
526
527    def _basetest_create_connection(self, connection_fut, check_sockname=True):
528        tr, pr = self.loop.run_until_complete(connection_fut)
529        self.assertIsInstance(tr, asyncio.Transport)
530        self.assertIsInstance(pr, asyncio.Protocol)
531        self.assertIs(pr.transport, tr)
532        if check_sockname:
533            self.assertIsNotNone(tr.get_extra_info('sockname'))
534        self.loop.run_until_complete(pr.done)
535        self.assertGreater(pr.nbytes, 0)
536        tr.close()
537
538    def test_create_connection(self):
539        with test_utils.run_test_server() as httpd:
540            conn_fut = self.loop.create_connection(
541                lambda: MyProto(loop=self.loop), *httpd.address)
542            self._basetest_create_connection(conn_fut)
543
544    @socket_helper.skip_unless_bind_unix_socket
545    def test_create_unix_connection(self):
546        # Issue #20682: On Mac OS X Tiger, getsockname() returns a
547        # zero-length address for UNIX socket.
548        check_sockname = not broken_unix_getsockname()
549
550        with test_utils.run_test_unix_server() as httpd:
551            conn_fut = self.loop.create_unix_connection(
552                lambda: MyProto(loop=self.loop), httpd.address)
553            self._basetest_create_connection(conn_fut, check_sockname)
554
555    def check_ssl_extra_info(self, client, check_sockname=True,
556                             peername=None, peercert={}):
557        if check_sockname:
558            self.assertIsNotNone(client.get_extra_info('sockname'))
559        if peername:
560            self.assertEqual(peername,
561                             client.get_extra_info('peername'))
562        else:
563            self.assertIsNotNone(client.get_extra_info('peername'))
564        self.assertEqual(peercert,
565                         client.get_extra_info('peercert'))
566
567        # test SSL cipher
568        cipher = client.get_extra_info('cipher')
569        self.assertIsInstance(cipher, tuple)
570        self.assertEqual(len(cipher), 3, cipher)
571        self.assertIsInstance(cipher[0], str)
572        self.assertIsInstance(cipher[1], str)
573        self.assertIsInstance(cipher[2], int)
574
575        # test SSL object
576        sslobj = client.get_extra_info('ssl_object')
577        self.assertIsNotNone(sslobj)
578        self.assertEqual(sslobj.compression(),
579                         client.get_extra_info('compression'))
580        self.assertEqual(sslobj.cipher(),
581                         client.get_extra_info('cipher'))
582        self.assertEqual(sslobj.getpeercert(),
583                         client.get_extra_info('peercert'))
584        self.assertEqual(sslobj.compression(),
585                         client.get_extra_info('compression'))
586
587    def _basetest_create_ssl_connection(self, connection_fut,
588                                        check_sockname=True,
589                                        peername=None):
590        tr, pr = self.loop.run_until_complete(connection_fut)
591        self.assertIsInstance(tr, asyncio.Transport)
592        self.assertIsInstance(pr, asyncio.Protocol)
593        self.assertTrue('ssl' in tr.__class__.__name__.lower())
594        self.check_ssl_extra_info(tr, check_sockname, peername)
595        self.loop.run_until_complete(pr.done)
596        self.assertGreater(pr.nbytes, 0)
597        tr.close()
598
599    def _test_create_ssl_connection(self, httpd, create_connection,
600                                    check_sockname=True, peername=None):
601        conn_fut = create_connection(ssl=test_utils.dummy_ssl_context())
602        self._basetest_create_ssl_connection(conn_fut, check_sockname,
603                                             peername)
604
605        # ssl.Purpose was introduced in Python 3.4
606        if hasattr(ssl, 'Purpose'):
607            def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *,
608                                          cafile=None, capath=None,
609                                          cadata=None):
610                """
611                A ssl.create_default_context() replacement that doesn't enable
612                cert validation.
613                """
614                self.assertEqual(purpose, ssl.Purpose.SERVER_AUTH)
615                return test_utils.dummy_ssl_context()
616
617            # With ssl=True, ssl.create_default_context() should be called
618            with mock.patch('ssl.create_default_context',
619                            side_effect=_dummy_ssl_create_context) as m:
620                conn_fut = create_connection(ssl=True)
621                self._basetest_create_ssl_connection(conn_fut, check_sockname,
622                                                     peername)
623                self.assertEqual(m.call_count, 1)
624
625        # With the real ssl.create_default_context(), certificate
626        # validation will fail
627        with self.assertRaises(ssl.SSLError) as cm:
628            conn_fut = create_connection(ssl=True)
629            # Ignore the "SSL handshake failed" log in debug mode
630            with test_utils.disable_logger():
631                self._basetest_create_ssl_connection(conn_fut, check_sockname,
632                                                     peername)
633
634        self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
635
636    @unittest.skipIf(ssl is None, 'No ssl module')
637    def test_create_ssl_connection(self):
638        with test_utils.run_test_server(use_ssl=True) as httpd:
639            create_connection = functools.partial(
640                self.loop.create_connection,
641                lambda: MyProto(loop=self.loop),
642                *httpd.address)
643            self._test_create_ssl_connection(httpd, create_connection,
644                                             peername=httpd.address)
645
646    @socket_helper.skip_unless_bind_unix_socket
647    @unittest.skipIf(ssl is None, 'No ssl module')
648    def test_create_ssl_unix_connection(self):
649        # Issue #20682: On Mac OS X Tiger, getsockname() returns a
650        # zero-length address for UNIX socket.
651        check_sockname = not broken_unix_getsockname()
652
653        with test_utils.run_test_unix_server(use_ssl=True) as httpd:
654            create_connection = functools.partial(
655                self.loop.create_unix_connection,
656                lambda: MyProto(loop=self.loop), httpd.address,
657                server_hostname='127.0.0.1')
658
659            self._test_create_ssl_connection(httpd, create_connection,
660                                             check_sockname,
661                                             peername=httpd.address)
662
663    def test_create_connection_local_addr(self):
664        with test_utils.run_test_server() as httpd:
665            port = socket_helper.find_unused_port()
666            f = self.loop.create_connection(
667                lambda: MyProto(loop=self.loop),
668                *httpd.address, local_addr=(httpd.address[0], port))
669            tr, pr = self.loop.run_until_complete(f)
670            expected = pr.transport.get_extra_info('sockname')[1]
671            self.assertEqual(port, expected)
672            tr.close()
673
674    def test_create_connection_local_addr_skip_different_family(self):
675        # See https://github.com/python/cpython/issues/86508
676        port1 = socket_helper.find_unused_port()
677        port2 = socket_helper.find_unused_port()
678        getaddrinfo_orig = self.loop.getaddrinfo
679
680        async def getaddrinfo(host, port, *args, **kwargs):
681            if port == port2:
682                return [(socket.AF_INET6, socket.SOCK_STREAM, 0, '', ('::1', 0, 0, 0)),
683                        (socket.AF_INET, socket.SOCK_STREAM, 0, '', ('127.0.0.1', 0))]
684            return await getaddrinfo_orig(host, port, *args, **kwargs)
685
686        self.loop.getaddrinfo = getaddrinfo
687
688        f = self.loop.create_connection(
689            lambda: MyProto(loop=self.loop),
690            'localhost', port1, local_addr=('localhost', port2))
691
692        with self.assertRaises(OSError):
693            self.loop.run_until_complete(f)
694
695    def test_create_connection_local_addr_nomatch_family(self):
696        # See https://github.com/python/cpython/issues/86508
697        port1 = socket_helper.find_unused_port()
698        port2 = socket_helper.find_unused_port()
699        getaddrinfo_orig = self.loop.getaddrinfo
700
701        async def getaddrinfo(host, port, *args, **kwargs):
702            if port == port2:
703                return [(socket.AF_INET6, socket.SOCK_STREAM, 0, '', ('::1', 0, 0, 0))]
704            return await getaddrinfo_orig(host, port, *args, **kwargs)
705
706        self.loop.getaddrinfo = getaddrinfo
707
708        f = self.loop.create_connection(
709            lambda: MyProto(loop=self.loop),
710            'localhost', port1, local_addr=('localhost', port2))
711
712        with self.assertRaises(OSError):
713            self.loop.run_until_complete(f)
714
715    def test_create_connection_local_addr_in_use(self):
716        with test_utils.run_test_server() as httpd:
717            f = self.loop.create_connection(
718                lambda: MyProto(loop=self.loop),
719                *httpd.address, local_addr=httpd.address)
720            with self.assertRaises(OSError) as cm:
721                self.loop.run_until_complete(f)
722            self.assertEqual(cm.exception.errno, errno.EADDRINUSE)
723            self.assertIn(str(httpd.address), cm.exception.strerror)
724
    def test_connect_accepted_socket(self, server_ssl=None, client_ssl=None):
        # Accept a connection on a plain listening socket, hand the accepted
        # socket to loop.connect_accepted_socket() and exchange one message
        # with a client running in a separate thread.  server_ssl/client_ssl
        # are optional SSL contexts for the two ends of the connection.
        loop = self.loop

        class MyProto(MyBaseProto):

            def connection_lost(self, exc):
                super().connection_lost(exc)
                # Ends the loop.run_forever() call below.
                loop.call_soon(loop.stop)

            def data_received(self, data):
                super().data_received(data)
                # expected_response is defined further down in the enclosing
                # function; it is bound by the time data arrives.
                self.transport.write(expected_response)

        lsock = socket.create_server(('127.0.0.1', 0), backlog=1)
        addr = lsock.getsockname()

        message = b'test data'
        response = None
        expected_response = b'roger'

        def client():
            # Runs in the helper thread: send the message, store the reply.
            nonlocal response
            try:
                csock = socket.socket()
                if client_ssl is not None:
                    csock = client_ssl.wrap_socket(csock)
                csock.connect(addr)
                csock.sendall(message)
                response = csock.recv(99)
                csock.close()
            except Exception as exc:
                # Failures are only printed; the assertions at the end of
                # the test then see response left as None.
                print(
                    "Failure in client thread in test_connect_accepted_socket",
                    exc)

        thread = threading.Thread(target=client, daemon=True)
        thread.start()

        # Blocking accept in the test thread; the client thread connects.
        conn, _ = lsock.accept()
        proto = MyProto(loop=loop)
        proto.loop = loop
        loop.run_until_complete(
            loop.connect_accepted_socket(
                (lambda: proto), conn, ssl=server_ssl))
        # Runs until MyProto.connection_lost() schedules loop.stop().
        loop.run_forever()
        proto.transport.close()
        lsock.close()

        threading_helper.join_thread(thread)
        self.assertFalse(thread.is_alive())
        self.assertEqual(proto.state, 'CLOSED')
        self.assertEqual(proto.nbytes, len(message))
        self.assertEqual(response, expected_response)
778
779    @unittest.skipIf(ssl is None, 'No ssl module')
780    def test_ssl_connect_accepted_socket(self):
781        server_context = test_utils.simple_server_sslcontext()
782        client_context = test_utils.simple_client_sslcontext()
783
784        self.test_connect_accepted_socket(server_context, client_context)
785
786    def test_connect_accepted_socket_ssl_timeout_for_plain_socket(self):
787        sock = socket.socket()
788        self.addCleanup(sock.close)
789        coro = self.loop.connect_accepted_socket(
790            MyProto, sock, ssl_handshake_timeout=support.LOOPBACK_TIMEOUT)
791        with self.assertRaisesRegex(
792                ValueError,
793                'ssl_handshake_timeout is only meaningful with ssl'):
794            self.loop.run_until_complete(coro)
795
796    @mock.patch('asyncio.base_events.socket')
797    def create_server_multiple_hosts(self, family, hosts, mock_sock):
798        async def getaddrinfo(host, port, *args, **kw):
799            if family == socket.AF_INET:
800                return [(family, socket.SOCK_STREAM, 6, '', (host, port))]
801            else:
802                return [(family, socket.SOCK_STREAM, 6, '', (host, port, 0, 0))]
803
804        def getaddrinfo_task(*args, **kwds):
805            return self.loop.create_task(getaddrinfo(*args, **kwds))
806
807        unique_hosts = set(hosts)
808
809        if family == socket.AF_INET:
810            mock_sock.socket().getsockbyname.side_effect = [
811                (host, 80) for host in unique_hosts]
812        else:
813            mock_sock.socket().getsockbyname.side_effect = [
814                (host, 80, 0, 0) for host in unique_hosts]
815        self.loop.getaddrinfo = getaddrinfo_task
816        self.loop._start_serving = mock.Mock()
817        self.loop._stop_serving = mock.Mock()
818        f = self.loop.create_server(lambda: MyProto(self.loop), hosts, 80)
819        server = self.loop.run_until_complete(f)
820        self.addCleanup(server.close)
821        server_hosts = {sock.getsockbyname()[0] for sock in server.sockets}
822        self.assertEqual(server_hosts, unique_hosts)
823
824    def test_create_server_multiple_hosts_ipv4(self):
825        self.create_server_multiple_hosts(socket.AF_INET,
826                                          ['1.2.3.4', '5.6.7.8', '1.2.3.4'])
827
828    def test_create_server_multiple_hosts_ipv6(self):
829        self.create_server_multiple_hosts(socket.AF_INET6,
830                                          ['::1', '::2', '::1'])
831
832    def test_create_server(self):
833        proto = MyProto(self.loop)
834        f = self.loop.create_server(lambda: proto, '0.0.0.0', 0)
835        server = self.loop.run_until_complete(f)
836        self.assertEqual(len(server.sockets), 1)
837        sock = server.sockets[0]
838        host, port = sock.getsockname()
839        self.assertEqual(host, '0.0.0.0')
840        client = socket.socket()
841        client.connect(('127.0.0.1', port))
842        client.sendall(b'xxx')
843
844        self.loop.run_until_complete(proto.connected)
845        self.assertEqual('CONNECTED', proto.state)
846
847        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
848        self.assertEqual(3, proto.nbytes)
849
850        # extra info is available
851        self.assertIsNotNone(proto.transport.get_extra_info('sockname'))
852        self.assertEqual('127.0.0.1',
853                         proto.transport.get_extra_info('peername')[0])
854
855        # close connection
856        proto.transport.close()
857        self.loop.run_until_complete(proto.done)
858
859        self.assertEqual('CLOSED', proto.state)
860
861        # the client socket must be closed after to avoid ECONNRESET upon
862        # recv()/send() on the serving socket
863        client.close()
864
865        # close server
866        server.close()
867
868    @unittest.skipUnless(hasattr(socket, 'SO_REUSEPORT'), 'No SO_REUSEPORT')
869    def test_create_server_reuse_port(self):
870        proto = MyProto(self.loop)
871        f = self.loop.create_server(
872            lambda: proto, '0.0.0.0', 0)
873        server = self.loop.run_until_complete(f)
874        self.assertEqual(len(server.sockets), 1)
875        sock = server.sockets[0]
876        self.assertFalse(
877            sock.getsockopt(
878                socket.SOL_SOCKET, socket.SO_REUSEPORT))
879        server.close()
880
881        test_utils.run_briefly(self.loop)
882
883        proto = MyProto(self.loop)
884        f = self.loop.create_server(
885            lambda: proto, '0.0.0.0', 0, reuse_port=True)
886        server = self.loop.run_until_complete(f)
887        self.assertEqual(len(server.sockets), 1)
888        sock = server.sockets[0]
889        self.assertTrue(
890            sock.getsockopt(
891                socket.SOL_SOCKET, socket.SO_REUSEPORT))
892        server.close()
893
894    def _make_unix_server(self, factory, **kwargs):
895        path = test_utils.gen_unix_socket_path()
896        self.addCleanup(lambda: os.path.exists(path) and os.unlink(path))
897
898        f = self.loop.create_unix_server(factory, path, **kwargs)
899        server = self.loop.run_until_complete(f)
900
901        return server, path
902
903    @socket_helper.skip_unless_bind_unix_socket
904    def test_create_unix_server(self):
905        proto = MyProto(loop=self.loop)
906        server, path = self._make_unix_server(lambda: proto)
907        self.assertEqual(len(server.sockets), 1)
908
909        client = socket.socket(socket.AF_UNIX)
910        client.connect(path)
911        client.sendall(b'xxx')
912
913        self.loop.run_until_complete(proto.connected)
914        self.assertEqual('CONNECTED', proto.state)
915        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
916        self.assertEqual(3, proto.nbytes)
917
918        # close connection
919        proto.transport.close()
920        self.loop.run_until_complete(proto.done)
921
922        self.assertEqual('CLOSED', proto.state)
923
924        # the client socket must be closed after to avoid ECONNRESET upon
925        # recv()/send() on the serving socket
926        client.close()
927
928        # close server
929        server.close()
930
931    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
932    def test_create_unix_server_path_socket_error(self):
933        proto = MyProto(loop=self.loop)
934        sock = socket.socket()
935        with sock:
936            f = self.loop.create_unix_server(lambda: proto, '/test', sock=sock)
937            with self.assertRaisesRegex(ValueError,
938                                        'path and sock can not be specified '
939                                        'at the same time'):
940                self.loop.run_until_complete(f)
941
942    def _create_ssl_context(self, certfile, keyfile=None):
943        sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
944        sslcontext.options |= ssl.OP_NO_SSLv2
945        sslcontext.load_cert_chain(certfile, keyfile)
946        return sslcontext
947
948    def _make_ssl_server(self, factory, certfile, keyfile=None):
949        sslcontext = self._create_ssl_context(certfile, keyfile)
950
951        f = self.loop.create_server(factory, '127.0.0.1', 0, ssl=sslcontext)
952        server = self.loop.run_until_complete(f)
953
954        sock = server.sockets[0]
955        host, port = sock.getsockname()
956        self.assertEqual(host, '127.0.0.1')
957        return server, host, port
958
959    def _make_ssl_unix_server(self, factory, certfile, keyfile=None):
960        sslcontext = self._create_ssl_context(certfile, keyfile)
961        return self._make_unix_server(factory, ssl=sslcontext)
962
963    @unittest.skipIf(ssl is None, 'No ssl module')
964    def test_create_server_ssl(self):
965        proto = MyProto(loop=self.loop)
966        server, host, port = self._make_ssl_server(
967            lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY)
968
969        f_c = self.loop.create_connection(MyBaseProto, host, port,
970                                          ssl=test_utils.dummy_ssl_context())
971        client, pr = self.loop.run_until_complete(f_c)
972
973        client.write(b'xxx')
974        self.loop.run_until_complete(proto.connected)
975        self.assertEqual('CONNECTED', proto.state)
976
977        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
978        self.assertEqual(3, proto.nbytes)
979
980        # extra info is available
981        self.check_ssl_extra_info(client, peername=(host, port))
982
983        # close connection
984        proto.transport.close()
985        self.loop.run_until_complete(proto.done)
986        self.assertEqual('CLOSED', proto.state)
987
988        # the client socket must be closed after to avoid ECONNRESET upon
989        # recv()/send() on the serving socket
990        client.close()
991
992        # stop serving
993        server.close()
994
995    @socket_helper.skip_unless_bind_unix_socket
996    @unittest.skipIf(ssl is None, 'No ssl module')
997    def test_create_unix_server_ssl(self):
998        proto = MyProto(loop=self.loop)
999        server, path = self._make_ssl_unix_server(
1000            lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY)
1001
1002        f_c = self.loop.create_unix_connection(
1003            MyBaseProto, path, ssl=test_utils.dummy_ssl_context(),
1004            server_hostname='')
1005
1006        client, pr = self.loop.run_until_complete(f_c)
1007
1008        client.write(b'xxx')
1009        self.loop.run_until_complete(proto.connected)
1010        self.assertEqual('CONNECTED', proto.state)
1011        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
1012        self.assertEqual(3, proto.nbytes)
1013
1014        # close connection
1015        proto.transport.close()
1016        self.loop.run_until_complete(proto.done)
1017        self.assertEqual('CLOSED', proto.state)
1018
1019        # the client socket must be closed after to avoid ECONNRESET upon
1020        # recv()/send() on the serving socket
1021        client.close()
1022
1023        # stop serving
1024        server.close()
1025
1026    @unittest.skipIf(ssl is None, 'No ssl module')
1027    def test_create_server_ssl_verify_failed(self):
1028        proto = MyProto(loop=self.loop)
1029        server, host, port = self._make_ssl_server(
1030            lambda: proto, test_utils.SIGNED_CERTFILE)
1031
1032        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1033        sslcontext_client.options |= ssl.OP_NO_SSLv2
1034        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1035        if hasattr(sslcontext_client, 'check_hostname'):
1036            sslcontext_client.check_hostname = True
1037
1038
1039        # no CA loaded
1040        f_c = self.loop.create_connection(MyProto, host, port,
1041                                          ssl=sslcontext_client)
1042        with mock.patch.object(self.loop, 'call_exception_handler'):
1043            with test_utils.disable_logger():
1044                with self.assertRaisesRegex(ssl.SSLError,
1045                                            '(?i)certificate.verify.failed'):
1046                    self.loop.run_until_complete(f_c)
1047
1048            # execute the loop to log the connection error
1049            test_utils.run_briefly(self.loop)
1050
1051        # close connection
1052        self.assertIsNone(proto.transport)
1053        server.close()
1054
1055    @socket_helper.skip_unless_bind_unix_socket
1056    @unittest.skipIf(ssl is None, 'No ssl module')
1057    def test_create_unix_server_ssl_verify_failed(self):
1058        proto = MyProto(loop=self.loop)
1059        server, path = self._make_ssl_unix_server(
1060            lambda: proto, test_utils.SIGNED_CERTFILE)
1061
1062        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1063        sslcontext_client.options |= ssl.OP_NO_SSLv2
1064        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1065        if hasattr(sslcontext_client, 'check_hostname'):
1066            sslcontext_client.check_hostname = True
1067
1068        # no CA loaded
1069        f_c = self.loop.create_unix_connection(MyProto, path,
1070                                               ssl=sslcontext_client,
1071                                               server_hostname='invalid')
1072        with mock.patch.object(self.loop, 'call_exception_handler'):
1073            with test_utils.disable_logger():
1074                with self.assertRaisesRegex(ssl.SSLError,
1075                                            '(?i)certificate.verify.failed'):
1076                    self.loop.run_until_complete(f_c)
1077
1078            # execute the loop to log the connection error
1079            test_utils.run_briefly(self.loop)
1080
1081        # close connection
1082        self.assertIsNone(proto.transport)
1083        server.close()
1084
1085    @unittest.skipIf(ssl is None, 'No ssl module')
1086    def test_create_server_ssl_match_failed(self):
1087        proto = MyProto(loop=self.loop)
1088        server, host, port = self._make_ssl_server(
1089            lambda: proto, test_utils.SIGNED_CERTFILE)
1090
1091        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1092        sslcontext_client.options |= ssl.OP_NO_SSLv2
1093        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1094        sslcontext_client.load_verify_locations(
1095            cafile=test_utils.SIGNING_CA)
1096        if hasattr(sslcontext_client, 'check_hostname'):
1097            sslcontext_client.check_hostname = True
1098
1099        # incorrect server_hostname
1100        f_c = self.loop.create_connection(MyProto, host, port,
1101                                          ssl=sslcontext_client)
1102        with mock.patch.object(self.loop, 'call_exception_handler'):
1103            with test_utils.disable_logger():
1104                with self.assertRaisesRegex(
1105                        ssl.CertificateError,
1106                        "IP address mismatch, certificate is not valid for "
1107                        "'127.0.0.1'"):
1108                    self.loop.run_until_complete(f_c)
1109
1110        # close connection
1111        # transport is None because TLS ALERT aborted the handshake
1112        self.assertIsNone(proto.transport)
1113        server.close()
1114
1115    @socket_helper.skip_unless_bind_unix_socket
1116    @unittest.skipIf(ssl is None, 'No ssl module')
1117    def test_create_unix_server_ssl_verified(self):
1118        proto = MyProto(loop=self.loop)
1119        server, path = self._make_ssl_unix_server(
1120            lambda: proto, test_utils.SIGNED_CERTFILE)
1121
1122        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1123        sslcontext_client.options |= ssl.OP_NO_SSLv2
1124        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1125        sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA)
1126        if hasattr(sslcontext_client, 'check_hostname'):
1127            sslcontext_client.check_hostname = True
1128
1129        # Connection succeeds with correct CA and server hostname.
1130        f_c = self.loop.create_unix_connection(MyProto, path,
1131                                               ssl=sslcontext_client,
1132                                               server_hostname='localhost')
1133        client, pr = self.loop.run_until_complete(f_c)
1134        self.loop.run_until_complete(proto.connected)
1135
1136        # close connection
1137        proto.transport.close()
1138        client.close()
1139        server.close()
1140        self.loop.run_until_complete(proto.done)
1141
1142    @unittest.skipIf(ssl is None, 'No ssl module')
1143    def test_create_server_ssl_verified(self):
1144        proto = MyProto(loop=self.loop)
1145        server, host, port = self._make_ssl_server(
1146            lambda: proto, test_utils.SIGNED_CERTFILE)
1147
1148        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
1149        sslcontext_client.options |= ssl.OP_NO_SSLv2
1150        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
1151        sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA)
1152        if hasattr(sslcontext_client, 'check_hostname'):
1153            sslcontext_client.check_hostname = True
1154
1155        # Connection succeeds with correct CA and server hostname.
1156        f_c = self.loop.create_connection(MyProto, host, port,
1157                                          ssl=sslcontext_client,
1158                                          server_hostname='localhost')
1159        client, pr = self.loop.run_until_complete(f_c)
1160        self.loop.run_until_complete(proto.connected)
1161
1162        # extra info is available
1163        self.check_ssl_extra_info(client, peername=(host, port),
1164                                  peercert=test_utils.PEERCERT)
1165
1166        # close connection
1167        proto.transport.close()
1168        client.close()
1169        server.close()
1170        self.loop.run_until_complete(proto.done)
1171
1172    def test_create_server_sock(self):
1173        proto = self.loop.create_future()
1174
1175        class TestMyProto(MyProto):
1176            def connection_made(self, transport):
1177                super().connection_made(transport)
1178                proto.set_result(self)
1179
1180        sock_ob = socket.create_server(('0.0.0.0', 0))
1181
1182        f = self.loop.create_server(TestMyProto, sock=sock_ob)
1183        server = self.loop.run_until_complete(f)
1184        sock = server.sockets[0]
1185        self.assertEqual(sock.fileno(), sock_ob.fileno())
1186
1187        host, port = sock.getsockname()
1188        self.assertEqual(host, '0.0.0.0')
1189        client = socket.socket()
1190        client.connect(('127.0.0.1', port))
1191        client.send(b'xxx')
1192        client.close()
1193        server.close()
1194
1195    def test_create_server_addr_in_use(self):
1196        sock_ob = socket.create_server(('0.0.0.0', 0))
1197
1198        f = self.loop.create_server(MyProto, sock=sock_ob)
1199        server = self.loop.run_until_complete(f)
1200        sock = server.sockets[0]
1201        host, port = sock.getsockname()
1202
1203        f = self.loop.create_server(MyProto, host=host, port=port)
1204        with self.assertRaises(OSError) as cm:
1205            self.loop.run_until_complete(f)
1206        self.assertEqual(cm.exception.errno, errno.EADDRINUSE)
1207
1208        server.close()
1209
1210    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
1211    def test_create_server_dual_stack(self):
1212        f_proto = self.loop.create_future()
1213
1214        class TestMyProto(MyProto):
1215            def connection_made(self, transport):
1216                super().connection_made(transport)
1217                f_proto.set_result(self)
1218
1219        try_count = 0
1220        while True:
1221            try:
1222                port = socket_helper.find_unused_port()
1223                f = self.loop.create_server(TestMyProto, host=None, port=port)
1224                server = self.loop.run_until_complete(f)
1225            except OSError as ex:
1226                if ex.errno == errno.EADDRINUSE:
1227                    try_count += 1
1228                    self.assertGreaterEqual(5, try_count)
1229                    continue
1230                else:
1231                    raise
1232            else:
1233                break
1234        client = socket.socket()
1235        client.connect(('127.0.0.1', port))
1236        client.send(b'xxx')
1237        proto = self.loop.run_until_complete(f_proto)
1238        proto.transport.close()
1239        client.close()
1240
1241        f_proto = self.loop.create_future()
1242        client = socket.socket(socket.AF_INET6)
1243        client.connect(('::1', port))
1244        client.send(b'xxx')
1245        proto = self.loop.run_until_complete(f_proto)
1246        proto.transport.close()
1247        client.close()
1248
1249        server.close()
1250
1251    def test_server_close(self):
1252        f = self.loop.create_server(MyProto, '0.0.0.0', 0)
1253        server = self.loop.run_until_complete(f)
1254        sock = server.sockets[0]
1255        host, port = sock.getsockname()
1256
1257        client = socket.socket()
1258        client.connect(('127.0.0.1', port))
1259        client.send(b'xxx')
1260        client.close()
1261
1262        server.close()
1263
1264        client = socket.socket()
1265        self.assertRaises(
1266            ConnectionRefusedError, client.connect, ('127.0.0.1', port))
1267        client.close()
1268
    def _test_create_datagram_endpoint(self, local_addr, family):
        """Run a datagram request/response round trip through the loop.

        A server endpoint is bound to *local_addr* with *family*; a client
        endpoint is then created with remote_addr pointing at it, sends
        b'xxx' and expects the server's b'resp:'-prefixed reply.
        """
        class TestMyDatagramProto(MyDatagramProto):
            # The first parameter is named inner_self so that ``self`` in
            # the body still refers to the enclosing test case (self.loop).
            def __init__(inner_self):
                super().__init__(loop=self.loop)

            def datagram_received(self, data, addr):
                super().datagram_received(data, addr)
                # Reply to the sender with the payload behind a 5-byte prefix.
                self.transport.sendto(b'resp:'+data, addr)

        coro = self.loop.create_datagram_endpoint(
            TestMyDatagramProto, local_addr=local_addr, family=family)
        s_transport, server = self.loop.run_until_complete(coro)
        # Resolve the bound address to a numeric (host, port) pair for the
        # client to use as its remote address.
        sockname = s_transport.get_extra_info('sockname')
        host, port = socket.getnameinfo(
            sockname, socket.NI_NUMERICHOST|socket.NI_NUMERICSERV)

        self.assertIsInstance(s_transport, asyncio.Transport)
        self.assertIsInstance(server, TestMyDatagramProto)
        self.assertEqual('INITIALIZED', server.state)
        self.assertIs(server.transport, s_transport)

        coro = self.loop.create_datagram_endpoint(
            lambda: MyDatagramProto(loop=self.loop),
            remote_addr=(host, port))
        transport, client = self.loop.run_until_complete(coro)

        self.assertIsInstance(transport, asyncio.Transport)
        self.assertIsInstance(client, MyDatagramProto)
        self.assertEqual('INITIALIZED', client.state)
        self.assertIs(client.transport, transport)

        # Client -> server: 3 bytes.
        transport.sendto(b'xxx')
        test_utils.run_until(self.loop, lambda: server.nbytes)
        self.assertEqual(3, server.nbytes)
        test_utils.run_until(self.loop, lambda: client.nbytes)

        # received: b'resp:' (5 bytes) + b'xxx' (3 bytes)
        self.assertEqual(8, client.nbytes)

        # extra info is available
        self.assertIsNotNone(transport.get_extra_info('sockname'))

        # close connection
        transport.close()
        self.loop.run_until_complete(client.done)
        self.assertEqual('CLOSED', client.state)
        server.transport.close()
1316
1317    def test_create_datagram_endpoint(self):
1318        self._test_create_datagram_endpoint(('127.0.0.1', 0), socket.AF_INET)
1319
1320    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
1321    def test_create_datagram_endpoint_ipv6(self):
1322        self._test_create_datagram_endpoint(('::1', 0), socket.AF_INET6)
1323
1324    def test_create_datagram_endpoint_sock(self):
1325        sock = None
1326        local_address = ('127.0.0.1', 0)
1327        infos = self.loop.run_until_complete(
1328            self.loop.getaddrinfo(
1329                *local_address, type=socket.SOCK_DGRAM))
1330        for family, type, proto, cname, address in infos:
1331            try:
1332                sock = socket.socket(family=family, type=type, proto=proto)
1333                sock.setblocking(False)
1334                sock.bind(address)
1335            except:
1336                pass
1337            else:
1338                break
1339        else:
1340            self.fail('Can not create socket.')
1341
1342        f = self.loop.create_datagram_endpoint(
1343            lambda: MyDatagramProto(loop=self.loop), sock=sock)
1344        tr, pr = self.loop.run_until_complete(f)
1345        self.assertIsInstance(tr, asyncio.Transport)
1346        self.assertIsInstance(pr, MyDatagramProto)
1347        tr.close()
1348        self.loop.run_until_complete(pr.done)
1349
1350    def test_internal_fds(self):
1351        loop = self.create_event_loop()
1352        if not isinstance(loop, selector_events.BaseSelectorEventLoop):
1353            loop.close()
1354            self.skipTest('loop is not a BaseSelectorEventLoop')
1355
1356        self.assertEqual(1, loop._internal_fds)
1357        loop.close()
1358        self.assertEqual(0, loop._internal_fds)
1359        self.assertIsNone(loop._csock)
1360        self.assertIsNone(loop._ssock)
1361
1362    @unittest.skipUnless(sys.platform != 'win32',
1363                         "Don't support pipes for Windows")
1364    def test_read_pipe(self):
1365        proto = MyReadPipeProto(loop=self.loop)
1366
1367        rpipe, wpipe = os.pipe()
1368        pipeobj = io.open(rpipe, 'rb', 1024)
1369
1370        async def connect():
1371            t, p = await self.loop.connect_read_pipe(
1372                lambda: proto, pipeobj)
1373            self.assertIs(p, proto)
1374            self.assertIs(t, proto.transport)
1375            self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1376            self.assertEqual(0, proto.nbytes)
1377
1378        self.loop.run_until_complete(connect())
1379
1380        os.write(wpipe, b'1')
1381        test_utils.run_until(self.loop, lambda: proto.nbytes >= 1)
1382        self.assertEqual(1, proto.nbytes)
1383
1384        os.write(wpipe, b'2345')
1385        test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
1386        self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1387        self.assertEqual(5, proto.nbytes)
1388
1389        os.close(wpipe)
1390        self.loop.run_until_complete(proto.done)
1391        self.assertEqual(
1392            ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
1393        # extra info is available
1394        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1395
1396    @unittest.skipUnless(sys.platform != 'win32',
1397                         "Don't support pipes for Windows")
1398    def test_unclosed_pipe_transport(self):
1399        # This test reproduces the issue #314 on GitHub
1400        loop = self.create_event_loop()
1401        read_proto = MyReadPipeProto(loop=loop)
1402        write_proto = MyWritePipeProto(loop=loop)
1403
1404        rpipe, wpipe = os.pipe()
1405        rpipeobj = io.open(rpipe, 'rb', 1024)
1406        wpipeobj = io.open(wpipe, 'w', 1024, encoding="utf-8")
1407
1408        async def connect():
1409            read_transport, _ = await loop.connect_read_pipe(
1410                lambda: read_proto, rpipeobj)
1411            write_transport, _ = await loop.connect_write_pipe(
1412                lambda: write_proto, wpipeobj)
1413            return read_transport, write_transport
1414
1415        # Run and close the loop without closing the transports
1416        read_transport, write_transport = loop.run_until_complete(connect())
1417        loop.close()
1418
1419        # These 'repr' calls used to raise an AttributeError
1420        # See Issue #314 on GitHub
1421        self.assertIn('open', repr(read_transport))
1422        self.assertIn('open', repr(write_transport))
1423
1424        # Clean up (avoid ResourceWarning)
1425        rpipeobj.close()
1426        wpipeobj.close()
1427        read_transport._pipe = None
1428        write_transport._pipe = None
1429
1430    @unittest.skipUnless(sys.platform != 'win32',
1431                         "Don't support pipes for Windows")
1432    @unittest.skipUnless(hasattr(os, 'openpty'), 'need os.openpty()')
1433    def test_read_pty_output(self):
1434        proto = MyReadPipeProto(loop=self.loop)
1435
1436        master, slave = os.openpty()
1437        master_read_obj = io.open(master, 'rb', 0)
1438
1439        async def connect():
1440            t, p = await self.loop.connect_read_pipe(lambda: proto,
1441                                                     master_read_obj)
1442            self.assertIs(p, proto)
1443            self.assertIs(t, proto.transport)
1444            self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1445            self.assertEqual(0, proto.nbytes)
1446
1447        self.loop.run_until_complete(connect())
1448
1449        os.write(slave, b'1')
1450        test_utils.run_until(self.loop, lambda: proto.nbytes)
1451        self.assertEqual(1, proto.nbytes)
1452
1453        os.write(slave, b'2345')
1454        test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
1455        self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
1456        self.assertEqual(5, proto.nbytes)
1457
1458        os.close(slave)
1459        proto.transport.close()
1460        self.loop.run_until_complete(proto.done)
1461        self.assertEqual(
1462            ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
1463        # extra info is available
1464        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1465
1466    @unittest.skipUnless(sys.platform != 'win32',
1467                         "Don't support pipes for Windows")
1468    def test_write_pipe(self):
1469        rpipe, wpipe = os.pipe()
1470        pipeobj = io.open(wpipe, 'wb', 1024)
1471
1472        proto = MyWritePipeProto(loop=self.loop)
1473        connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
1474        transport, p = self.loop.run_until_complete(connect)
1475        self.assertIs(p, proto)
1476        self.assertIs(transport, proto.transport)
1477        self.assertEqual('CONNECTED', proto.state)
1478
1479        transport.write(b'1')
1480
1481        data = bytearray()
1482        def reader(data):
1483            chunk = os.read(rpipe, 1024)
1484            data += chunk
1485            return len(data)
1486
1487        test_utils.run_until(self.loop, lambda: reader(data) >= 1)
1488        self.assertEqual(b'1', data)
1489
1490        transport.write(b'2345')
1491        test_utils.run_until(self.loop, lambda: reader(data) >= 5)
1492        self.assertEqual(b'12345', data)
1493        self.assertEqual('CONNECTED', proto.state)
1494
1495        os.close(rpipe)
1496
1497        # extra info is available
1498        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1499
1500        # close connection
1501        proto.transport.close()
1502        self.loop.run_until_complete(proto.done)
1503        self.assertEqual('CLOSED', proto.state)
1504
1505    @unittest.skipUnless(sys.platform != 'win32',
1506                         "Don't support pipes for Windows")
1507    def test_write_pipe_disconnect_on_close(self):
1508        rsock, wsock = socket.socketpair()
1509        rsock.setblocking(False)
1510        pipeobj = io.open(wsock.detach(), 'wb', 1024)
1511
1512        proto = MyWritePipeProto(loop=self.loop)
1513        connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
1514        transport, p = self.loop.run_until_complete(connect)
1515        self.assertIs(p, proto)
1516        self.assertIs(transport, proto.transport)
1517        self.assertEqual('CONNECTED', proto.state)
1518
1519        transport.write(b'1')
1520        data = self.loop.run_until_complete(self.loop.sock_recv(rsock, 1024))
1521        self.assertEqual(b'1', data)
1522
1523        rsock.close()
1524
1525        self.loop.run_until_complete(proto.done)
1526        self.assertEqual('CLOSED', proto.state)
1527
1528    @unittest.skipUnless(sys.platform != 'win32',
1529                         "Don't support pipes for Windows")
1530    @unittest.skipUnless(hasattr(os, 'openpty'), 'need os.openpty()')
1531    # select, poll and kqueue don't support character devices (PTY) on Mac OS X
1532    # older than 10.6 (Snow Leopard)
1533    @support.requires_mac_ver(10, 6)
1534    def test_write_pty(self):
1535        master, slave = os.openpty()
1536        slave_write_obj = io.open(slave, 'wb', 0)
1537
1538        proto = MyWritePipeProto(loop=self.loop)
1539        connect = self.loop.connect_write_pipe(lambda: proto, slave_write_obj)
1540        transport, p = self.loop.run_until_complete(connect)
1541        self.assertIs(p, proto)
1542        self.assertIs(transport, proto.transport)
1543        self.assertEqual('CONNECTED', proto.state)
1544
1545        transport.write(b'1')
1546
1547        data = bytearray()
1548        def reader(data):
1549            chunk = os.read(master, 1024)
1550            data += chunk
1551            return len(data)
1552
1553        test_utils.run_until(self.loop, lambda: reader(data) >= 1,
1554                             timeout=support.SHORT_TIMEOUT)
1555        self.assertEqual(b'1', data)
1556
1557        transport.write(b'2345')
1558        test_utils.run_until(self.loop, lambda: reader(data) >= 5,
1559                             timeout=support.SHORT_TIMEOUT)
1560        self.assertEqual(b'12345', data)
1561        self.assertEqual('CONNECTED', proto.state)
1562
1563        os.close(master)
1564
1565        # extra info is available
1566        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1567
1568        # close connection
1569        proto.transport.close()
1570        self.loop.run_until_complete(proto.done)
1571        self.assertEqual('CLOSED', proto.state)
1572
    @unittest.skipUnless(sys.platform != 'win32',
                         "Don't support pipes for Windows")
    @unittest.skipUnless(hasattr(os, 'openpty'), 'need os.openpty()')
    # select, poll and kqueue don't support character devices (PTY) on Mac OS X
    # older than 10.6 (Snow Leopard)
    @support.requires_mac_ver(10, 6)
    def test_bidirectional_pty(self):
        # Attach a read-pipe transport and a write-pipe transport to the
        # slave side of one pty and check that traffic in either direction
        # does not disturb the state of the other transport's protocol.
        master, read_slave = os.openpty()
        # Duplicate the slave fd so the read and write transports each own
        # a separate descriptor.
        write_slave = os.dup(read_slave)
        # NOTE(review): raw mode presumably keeps the terminal layer from
        # transforming or echoing the test bytes -- confirm.
        tty.setraw(read_slave)

        slave_read_obj = io.open(read_slave, 'rb', 0)
        read_proto = MyReadPipeProto(loop=self.loop)
        read_connect = self.loop.connect_read_pipe(lambda: read_proto,
                                                   slave_read_obj)
        read_transport, p = self.loop.run_until_complete(read_connect)
        self.assertIs(p, read_proto)
        self.assertIs(read_transport, read_proto.transport)
        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
        self.assertEqual(0, read_proto.nbytes)


        slave_write_obj = io.open(write_slave, 'wb', 0)
        write_proto = MyWritePipeProto(loop=self.loop)
        write_connect = self.loop.connect_write_pipe(lambda: write_proto,
                                                     slave_write_obj)
        write_transport, p = self.loop.run_until_complete(write_connect)
        self.assertIs(p, write_proto)
        self.assertIs(write_transport, write_proto.transport)
        self.assertEqual('CONNECTED', write_proto.state)

        # Accumulates everything read from the master side; reader()
        # appends one os.read() chunk and returns the running total.
        data = bytearray()
        def reader(data):
            chunk = os.read(master, 1024)
            data += chunk
            return len(data)

        # slave -> master: one byte
        write_transport.write(b'1')
        test_utils.run_until(self.loop, lambda: reader(data) >= 1,
                             timeout=support.SHORT_TIMEOUT)
        self.assertEqual(b'1', data)
        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
        self.assertEqual('CONNECTED', write_proto.state)

        # master -> slave: one byte
        os.write(master, b'a')
        test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1,
                             timeout=support.SHORT_TIMEOUT)
        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
        self.assertEqual(1, read_proto.nbytes)
        self.assertEqual('CONNECTED', write_proto.state)

        # slave -> master: four more bytes (five in total)
        write_transport.write(b'2345')
        test_utils.run_until(self.loop, lambda: reader(data) >= 5,
                             timeout=support.SHORT_TIMEOUT)
        self.assertEqual(b'12345', data)
        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
        self.assertEqual('CONNECTED', write_proto.state)

        # master -> slave: four more bytes (five in total)
        os.write(master, b'bcde')
        test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5,
                             timeout=support.SHORT_TIMEOUT)
        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
        self.assertEqual(5, read_proto.nbytes)
        self.assertEqual('CONNECTED', write_proto.state)

        os.close(master)

        # Close the read side first and wait for its protocol to finish ...
        read_transport.close()
        self.loop.run_until_complete(read_proto.done)
        self.assertEqual(
            ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state)

        # ... then the write side.
        write_transport.close()
        self.loop.run_until_complete(write_proto.done)
        self.assertEqual('CLOSED', write_proto.state)
1648
1649    def test_prompt_cancellation(self):
1650        r, w = socket.socketpair()
1651        r.setblocking(False)
1652        f = self.loop.create_task(self.loop.sock_recv(r, 1))
1653        ov = getattr(f, 'ov', None)
1654        if ov is not None:
1655            self.assertTrue(ov.pending)
1656
1657        async def main():
1658            try:
1659                self.loop.call_soon(f.cancel)
1660                await f
1661            except asyncio.CancelledError:
1662                res = 'cancelled'
1663            else:
1664                res = None
1665            finally:
1666                self.loop.stop()
1667            return res
1668
1669        start = time.monotonic()
1670        t = self.loop.create_task(main())
1671        self.loop.run_forever()
1672        elapsed = time.monotonic() - start
1673
1674        self.assertLess(elapsed, 0.1)
1675        self.assertEqual(t.result(), 'cancelled')
1676        self.assertRaises(asyncio.CancelledError, f.result)
1677        if ov is not None:
1678            self.assertFalse(ov.pending)
1679        self.loop._stop_serving(r)
1680
1681        r.close()
1682        w.close()
1683
1684    def test_timeout_rounding(self):
1685        def _run_once():
1686            self.loop._run_once_counter += 1
1687            orig_run_once()
1688
1689        orig_run_once = self.loop._run_once
1690        self.loop._run_once_counter = 0
1691        self.loop._run_once = _run_once
1692
1693        async def wait():
1694            loop = self.loop
1695            await asyncio.sleep(1e-2)
1696            await asyncio.sleep(1e-4)
1697            await asyncio.sleep(1e-6)
1698            await asyncio.sleep(1e-8)
1699            await asyncio.sleep(1e-10)
1700
1701        self.loop.run_until_complete(wait())
1702        # The ideal number of call is 12, but on some platforms, the selector
1703        # may sleep at little bit less than timeout depending on the resolution
1704        # of the clock used by the kernel. Tolerate a few useless calls on
1705        # these platforms.
1706        self.assertLessEqual(self.loop._run_once_counter, 20,
1707            {'clock_resolution': self.loop._clock_resolution,
1708             'selector': self.loop._selector.__class__.__name__})
1709
1710    def test_remove_fds_after_closing(self):
1711        loop = self.create_event_loop()
1712        callback = lambda: None
1713        r, w = socket.socketpair()
1714        self.addCleanup(r.close)
1715        self.addCleanup(w.close)
1716        loop.add_reader(r, callback)
1717        loop.add_writer(w, callback)
1718        loop.close()
1719        self.assertFalse(loop.remove_reader(r))
1720        self.assertFalse(loop.remove_writer(w))
1721
1722    def test_add_fds_after_closing(self):
1723        loop = self.create_event_loop()
1724        callback = lambda: None
1725        r, w = socket.socketpair()
1726        self.addCleanup(r.close)
1727        self.addCleanup(w.close)
1728        loop.close()
1729        with self.assertRaises(RuntimeError):
1730            loop.add_reader(r, callback)
1731        with self.assertRaises(RuntimeError):
1732            loop.add_writer(w, callback)
1733
1734    def test_close_running_event_loop(self):
1735        async def close_loop(loop):
1736            self.loop.close()
1737
1738        coro = close_loop(self.loop)
1739        with self.assertRaises(RuntimeError):
1740            self.loop.run_until_complete(coro)
1741
1742    def test_close(self):
1743        self.loop.close()
1744
1745        async def test():
1746            pass
1747
1748        func = lambda: False
1749        coro = test()
1750        self.addCleanup(coro.close)
1751
1752        # operation blocked when the loop is closed
1753        with self.assertRaises(RuntimeError):
1754            self.loop.run_forever()
1755        with self.assertRaises(RuntimeError):
1756            fut = self.loop.create_future()
1757            self.loop.run_until_complete(fut)
1758        with self.assertRaises(RuntimeError):
1759            self.loop.call_soon(func)
1760        with self.assertRaises(RuntimeError):
1761            self.loop.call_soon_threadsafe(func)
1762        with self.assertRaises(RuntimeError):
1763            self.loop.call_later(1.0, func)
1764        with self.assertRaises(RuntimeError):
1765            self.loop.call_at(self.loop.time() + .0, func)
1766        with self.assertRaises(RuntimeError):
1767            self.loop.create_task(coro)
1768        with self.assertRaises(RuntimeError):
1769            self.loop.add_signal_handler(signal.SIGTERM, func)
1770
1771        # run_in_executor test is tricky: the method is a coroutine,
1772        # but run_until_complete cannot be called on closed loop.
1773        # Thus iterate once explicitly.
1774        with self.assertRaises(RuntimeError):
1775            it = self.loop.run_in_executor(None, func).__await__()
1776            next(it)
1777
1778
class SubprocessTestsMixin:
    # Subprocess transport tests meant to be mixed into a TestCase that
    # provides an event loop as self.loop.

    def check_terminated(self, returncode):
        # Validate the exit status of a child stopped with terminate().
        if sys.platform != 'win32':
            self.assertEqual(-signal.SIGTERM, returncode)
            return
        # On Windows: expect 1 but sometimes get 0
        self.assertIsInstance(returncode, int)

    def check_killed(self, returncode):
        # Validate the exit status of a child stopped with kill().
        if sys.platform != 'win32':
            self.assertEqual(-signal.SIGKILL, returncode)
            return
        # On Windows: expect 1 but sometimes get 0
        self.assertIsInstance(returncode, int)

    def test_subprocess_exec(self):
        # Send one payload through the child's stdin and read it back from
        # its stdout, then close the transport.
        script = os.path.join(os.path.dirname(__file__), 'echo.py')
        factory = functools.partial(MySubprocessProtocol, self.loop)

        transport, protocol = self.loop.run_until_complete(
            self.loop.subprocess_exec(factory, sys.executable, script))
        self.assertIsInstance(protocol, MySubprocessProtocol)
        self.loop.run_until_complete(protocol.connected)
        self.assertEqual('CONNECTED', protocol.state)

        child_stdin = transport.get_pipe_transport(0)
        child_stdin.write(b'Python The Winner')
        self.loop.run_until_complete(protocol.got_data[1].wait())
        with test_utils.disable_logger():
            transport.close()
        self.loop.run_until_complete(protocol.completed)
        self.check_killed(protocol.returncode)
        self.assertEqual(b'Python The Winner', protocol.data[1])

    def test_subprocess_interactive(self):
        # Same as the exec test, but the payload is sent in two writes and
        # the accumulated output is checked after each one.
        script = os.path.join(os.path.dirname(__file__), 'echo.py')
        factory = functools.partial(MySubprocessProtocol, self.loop)

        transport, protocol = self.loop.run_until_complete(
            self.loop.subprocess_exec(factory, sys.executable, script))
        self.assertIsInstance(protocol, MySubprocessProtocol)
        self.loop.run_until_complete(protocol.connected)
        self.assertEqual('CONNECTED', protocol.state)

        child_stdin = transport.get_pipe_transport(0)
        stdout_event = protocol.got_data[1]

        child_stdin.write(b'Python ')
        self.loop.run_until_complete(stdout_event.wait())
        # Reset the event so the second wait() blocks until new data.
        stdout_event.clear()
        self.assertEqual(b'Python ', protocol.data[1])

        child_stdin.write(b'The Winner')
        self.loop.run_until_complete(stdout_event.wait())
        self.assertEqual(b'Python The Winner', protocol.data[1])

        with test_utils.disable_logger():
            transport.close()
        self.loop.run_until_complete(protocol.completed)
        self.check_killed(protocol.returncode)

    def test_subprocess_shell(self):
        # Run a shell command and check exit status, stdout and stderr.
        factory = functools.partial(MySubprocessProtocol, self.loop)
        transport, protocol = self.loop.run_until_complete(
            self.loop.subprocess_shell(factory, 'echo Python'))
        self.assertIsInstance(protocol, MySubprocessProtocol)
        self.loop.run_until_complete(protocol.connected)

        transport.get_pipe_transport(0).close()
        self.loop.run_until_complete(protocol.completed)
        self.assertEqual(0, protocol.returncode)
        pipes_done = [fut.done() for fut in protocol.disconnects.values()]
        self.assertTrue(all(pipes_done))
        self.assertEqual(protocol.data[1].rstrip(b'\r\n'), b'Python')
        self.assertEqual(protocol.data[2], b'')
        transport.close()

    def test_subprocess_exitcode(self):
        # The shell's exit status must be reported as the return code.
        factory = functools.partial(MySubprocessProtocol, self.loop)
        transport, protocol = self.loop.run_until_complete(
            self.loop.subprocess_shell(
                factory, 'exit 7', stdin=None, stdout=None, stderr=None))

        self.assertIsInstance(protocol, MySubprocessProtocol)
        self.loop.run_until_complete(protocol.completed)
        self.assertEqual(7, protocol.returncode)
        transport.close()

    def test_subprocess_close_after_finish(self):
        # With no pipes requested there are no pipe transports, and close()
        # after the child has exited returns None.
        factory = functools.partial(MySubprocessProtocol, self.loop)
        transport, protocol = self.loop.run_until_complete(
            self.loop.subprocess_shell(
                factory, 'exit 7', stdin=None, stdout=None, stderr=None))

        self.assertIsInstance(protocol, MySubprocessProtocol)
        self.assertIsNone(transport.get_pipe_transport(0))
        self.assertIsNone(transport.get_pipe_transport(1))
        self.assertIsNone(transport.get_pipe_transport(2))
        self.loop.run_until_complete(protocol.completed)
        self.assertEqual(7, protocol.returncode)
        self.assertIsNone(transport.close())

    def test_subprocess_kill(self):
        # kill() must end the child with the "killed" exit status.
        script = os.path.join(os.path.dirname(__file__), 'echo.py')
        factory = functools.partial(MySubprocessProtocol, self.loop)

        transport, protocol = self.loop.run_until_complete(
            self.loop.subprocess_exec(factory, sys.executable, script))
        self.assertIsInstance(protocol, MySubprocessProtocol)
        self.loop.run_until_complete(protocol.connected)

        transport.kill()
        self.loop.run_until_complete(protocol.completed)
        self.check_killed(protocol.returncode)
        transport.close()

    def test_subprocess_terminate(self):
        # terminate() must end the child with the "terminated" exit status.
        script = os.path.join(os.path.dirname(__file__), 'echo.py')
        factory = functools.partial(MySubprocessProtocol, self.loop)

        transport, protocol = self.loop.run_until_complete(
            self.loop.subprocess_exec(factory, sys.executable, script))
        self.assertIsInstance(protocol, MySubprocessProtocol)
        self.loop.run_until_complete(protocol.connected)

        transport.terminate()
        self.loop.run_until_complete(protocol.completed)
        self.check_terminated(protocol.returncode)
        transport.close()

    @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
    def test_subprocess_send_signal(self):
        # bpo-31034: Make sure that we get the default signal handler (killing
        # the process). The parent process may have decided to ignore SIGHUP,
        # and signal handlers are inherited.
        previous_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL)
        try:
            script = os.path.join(os.path.dirname(__file__), 'echo.py')
            factory = functools.partial(MySubprocessProtocol, self.loop)

            transport, protocol = self.loop.run_until_complete(
                self.loop.subprocess_exec(factory, sys.executable, script))
            self.assertIsInstance(protocol, MySubprocessProtocol)
            self.loop.run_until_complete(protocol.connected)

            transport.send_signal(signal.SIGHUP)
            self.loop.run_until_complete(protocol.completed)
            self.assertEqual(-signal.SIGHUP, protocol.returncode)
            transport.close()
        finally:
            # Put back whatever handler was installed before the test.
            signal.signal(signal.SIGHUP, previous_handler)

    def test_subprocess_stderr(self):
        # The helper script is expected to answer on both stdout and stderr.
        script = os.path.join(os.path.dirname(__file__), 'echo2.py')
        factory = functools.partial(MySubprocessProtocol, self.loop)

        transport, protocol = self.loop.run_until_complete(
            self.loop.subprocess_exec(factory, sys.executable, script))
        self.assertIsInstance(protocol, MySubprocessProtocol)
        self.loop.run_until_complete(protocol.connected)

        child_stdin = transport.get_pipe_transport(0)
        child_stdin.write(b'test')

        self.loop.run_until_complete(protocol.completed)

        transport.close()
        self.assertEqual(b'OUT:test', protocol.data[1])
        stderr_data = protocol.data[2]
        self.assertTrue(stderr_data.startswith(b'ERR:test'), stderr_data)
        self.assertEqual(0, protocol.returncode)

    def test_subprocess_stderr_redirect_to_stdout(self):
        # With stderr=STDOUT there is no separate stderr pipe and both
        # streams arrive on fd 1.
        script = os.path.join(os.path.dirname(__file__), 'echo2.py')
        factory = functools.partial(MySubprocessProtocol, self.loop)

        transport, protocol = self.loop.run_until_complete(
            self.loop.subprocess_exec(
                factory, sys.executable, script, stderr=subprocess.STDOUT))
        self.assertIsInstance(protocol, MySubprocessProtocol)
        self.loop.run_until_complete(protocol.connected)

        child_stdin = transport.get_pipe_transport(0)
        self.assertIsNotNone(transport.get_pipe_transport(1))
        self.assertIsNone(transport.get_pipe_transport(2))

        child_stdin.write(b'test')
        self.loop.run_until_complete(protocol.completed)
        stdout_data = protocol.data[1]
        self.assertTrue(stdout_data.startswith(b'OUT:testERR:test'),
                        stdout_data)
        self.assertEqual(b'', protocol.data[2])

        transport.close()
        self.assertEqual(0, protocol.returncode)

    def test_subprocess_close_client_stream(self):
        # Close our end of the child's stdout, then make the child write
        # again and check the error name it reports on stderr.
        script = os.path.join(os.path.dirname(__file__), 'echo3.py')
        factory = functools.partial(MySubprocessProtocol, self.loop)

        transport, protocol = self.loop.run_until_complete(
            self.loop.subprocess_exec(factory, sys.executable, script))
        self.assertIsInstance(protocol, MySubprocessProtocol)
        self.loop.run_until_complete(protocol.connected)

        child_stdin = transport.get_pipe_transport(0)
        child_stdout = transport.get_pipe_transport(1)
        child_stdin.write(b'test')
        self.loop.run_until_complete(protocol.got_data[1].wait())
        self.assertEqual(b'OUT:test', protocol.data[1])

        child_stdout.close()
        self.loop.run_until_complete(protocol.disconnects[1])
        child_stdin.write(b'xxx')
        self.loop.run_until_complete(protocol.got_data[2].wait())
        if sys.platform == 'win32':
            # After closing the read-end of a pipe, writing to the
            # write-end using os.write() fails with errno==EINVAL and
            # GetLastError()==ERROR_INVALID_NAME on Windows!?!  (Using
            # WriteFile() we get ERROR_BROKEN_PIPE as expected.)
            self.assertEqual(b'ERR:OSError', protocol.data[2])
        else:
            self.assertEqual(b'ERR:BrokenPipeError', protocol.data[2])
        with test_utils.disable_logger():
            transport.close()
        self.loop.run_until_complete(protocol.completed)
        self.check_killed(protocol.returncode)

    def test_subprocess_wait_no_same_group(self):
        # start the new process in a new session
        factory = functools.partial(MySubprocessProtocol, self.loop)
        transport, protocol = self.loop.run_until_complete(
            self.loop.subprocess_shell(
                factory, 'exit 7', stdin=None, stdout=None, stderr=None,
                start_new_session=True))
        self.assertIsInstance(protocol, MySubprocessProtocol)
        self.loop.run_until_complete(protocol.completed)
        self.assertEqual(7, protocol.returncode)
        transport.close()

    def test_subprocess_exec_invalid_args(self):
        # subprocess_exec() must reject each of these keyword arguments
        # with ValueError.
        async def connect(**kwds):
            await self.loop.subprocess_exec(
                asyncio.SubprocessProtocol,
                'pwd', **kwds)

        rejected = (
            {'universal_newlines': True},
            {'bufsize': 4096},
            {'shell': True},
        )
        for bad_kwargs in rejected:
            with self.assertRaises(ValueError):
                self.loop.run_until_complete(connect(**bad_kwargs))

    def test_subprocess_shell_invalid_args(self):
        # subprocess_shell() must reject a list command and each of these
        # keyword arguments with ValueError.

        async def connect(cmd=None, **kwds):
            cmd = cmd or 'pwd'
            await self.loop.subprocess_shell(
                asyncio.SubprocessProtocol,
                cmd, **kwds)

        with self.assertRaises(ValueError):
            self.loop.run_until_complete(connect(['ls', '-l']))

        rejected = (
            {'universal_newlines': True},
            {'bufsize': 4096},
            {'shell': False},
        )
        for bad_kwargs in rejected:
            with self.assertRaises(ValueError):
                self.loop.run_until_complete(connect(**bad_kwargs))
2063
2064
2065if sys.platform == 'win32':
2066
2067    class SelectEventLoopTests(EventLoopTestsMixin,
2068                               test_utils.TestCase):
2069
2070        def create_event_loop(self):
2071            return asyncio.SelectorEventLoop()
2072
2073    class ProactorEventLoopTests(EventLoopTestsMixin,
2074                                 SubprocessTestsMixin,
2075                                 test_utils.TestCase):
2076
2077        def create_event_loop(self):
2078            return asyncio.ProactorEventLoop()
2079
2080        def test_reader_callback(self):
2081            raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
2082
2083        def test_reader_callback_cancel(self):
2084            raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
2085
2086        def test_writer_callback(self):
2087            raise unittest.SkipTest("IocpEventLoop does not have add_writer()")
2088
2089        def test_writer_callback_cancel(self):
2090            raise unittest.SkipTest("IocpEventLoop does not have add_writer()")
2091
2092        def test_remove_fds_after_closing(self):
2093            raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
2094else:
2095    import selectors
2096
2097    class UnixEventLoopTestsMixin(EventLoopTestsMixin):
2098        def setUp(self):
2099            super().setUp()
2100            watcher = asyncio.SafeChildWatcher()
2101            watcher.attach_loop(self.loop)
2102            asyncio.set_child_watcher(watcher)
2103
2104        def tearDown(self):
2105            asyncio.set_child_watcher(None)
2106            super().tearDown()
2107
2108
2109    if hasattr(selectors, 'KqueueSelector'):
2110        class KqueueEventLoopTests(UnixEventLoopTestsMixin,
2111                                   SubprocessTestsMixin,
2112                                   test_utils.TestCase):
2113
2114            def create_event_loop(self):
2115                return asyncio.SelectorEventLoop(
2116                    selectors.KqueueSelector())
2117
2118            # kqueue doesn't support character devices (PTY) on Mac OS X older
2119            # than 10.9 (Maverick)
2120            @support.requires_mac_ver(10, 9)
2121            # Issue #20667: KqueueEventLoopTests.test_read_pty_output()
2122            # hangs on OpenBSD 5.5
2123            @unittest.skipIf(sys.platform.startswith('openbsd'),
2124                             'test hangs on OpenBSD')
2125            def test_read_pty_output(self):
2126                super().test_read_pty_output()
2127
2128            # kqueue doesn't support character devices (PTY) on Mac OS X older
2129            # than 10.9 (Maverick)
2130            @support.requires_mac_ver(10, 9)
2131            def test_write_pty(self):
2132                super().test_write_pty()
2133
2134    if hasattr(selectors, 'EpollSelector'):
2135        class EPollEventLoopTests(UnixEventLoopTestsMixin,
2136                                  SubprocessTestsMixin,
2137                                  test_utils.TestCase):
2138
2139            def create_event_loop(self):
2140                return asyncio.SelectorEventLoop(selectors.EpollSelector())
2141
2142    if hasattr(selectors, 'PollSelector'):
2143        class PollEventLoopTests(UnixEventLoopTestsMixin,
2144                                 SubprocessTestsMixin,
2145                                 test_utils.TestCase):
2146
2147            def create_event_loop(self):
2148                return asyncio.SelectorEventLoop(selectors.PollSelector())
2149
2150    # Should always exist.
2151    class SelectEventLoopTests(UnixEventLoopTestsMixin,
2152                               SubprocessTestsMixin,
2153                               test_utils.TestCase):
2154
2155        def create_event_loop(self):
2156            return asyncio.SelectorEventLoop(selectors.SelectSelector())
2157
2158
def noop(*args, **kwargs):
    """Accept any arguments, do nothing and return None."""
    return None
2161
2162
class HandleTests(test_utils.TestCase):
    # Tests for asyncio.Handle: construction, cancellation, repr() in both
    # debug and non-debug mode, source tracebacks, and the formatting of
    # coroutine-like objects.

    def setUp(self):
        super().setUp()
        # Handle only needs a loop-like object here; get_debug() is forced
        # to True so handles are created in debug mode unless a test
        # overrides it.
        self.loop = mock.Mock()
        self.loop.get_debug.return_value = True

    def test_handle(self):
        # A new handle keeps the exact callback/args objects and is not
        # cancelled until cancel() is called.
        def callback(*args):
            return args

        args = ()
        h = asyncio.Handle(callback, args, self.loop)
        self.assertIs(h._callback, callback)
        self.assertIs(h._args, args)
        self.assertFalse(h.cancelled())

        h.cancel()
        self.assertTrue(h.cancelled())

    def test_callback_with_exception(self):
        # An exception raised by the callback must be reported through the
        # loop's call_exception_handler() with the expected context dict.
        def callback():
            raise ValueError()

        self.loop = mock.Mock()
        self.loop.call_exception_handler = mock.Mock()

        h = asyncio.Handle(callback, (), self.loop)
        h._run()

        self.loop.call_exception_handler.assert_called_with({
            'message': test_utils.MockPattern('Exception in callback.*'),
            'exception': mock.ANY,
            'handle': h,
            'source_traceback': h._source_traceback,
        })

    def test_handle_weakref(self):
        # Storing the handle in a WeakValueDictionary only works if Handle
        # instances can be weakly referenced.
        wd = weakref.WeakValueDictionary()
        h = asyncio.Handle(lambda: None, (), self.loop)
        wd['h'] = h  # Would fail without __weakref__ slot.

    def test_handle_repr(self):
        # repr() in non-debug mode for several kinds of callback.
        self.loop.get_debug.return_value = False

        # simple function
        h = asyncio.Handle(noop, (1, 2), self.loop)
        filename, lineno = test_utils.get_function_source(noop)
        self.assertEqual(repr(h),
                        '<Handle noop(1, 2) at %s:%s>'
                        % (filename, lineno))

        # cancelled handle
        h.cancel()
        self.assertEqual(repr(h),
                        '<Handle cancelled>')

        # decorated function
        cb = types.coroutine(noop)
        h = asyncio.Handle(cb, (), self.loop)
        self.assertEqual(repr(h),
                        '<Handle noop() at %s:%s>'
                        % (filename, lineno))

        # partial function
        cb = functools.partial(noop, 1, 2)
        h = asyncio.Handle(cb, (3,), self.loop)
        regex = (r'^<Handle noop\(1, 2\)\(3\) at %s:%s>$'
                 % (re.escape(filename), lineno))
        self.assertRegex(repr(h), regex)

        # partial function with keyword args
        cb = functools.partial(noop, x=1)
        h = asyncio.Handle(cb, (2, 3), self.loop)
        regex = (r'^<Handle noop\(x=1\)\(2, 3\) at %s:%s>$'
                 % (re.escape(filename), lineno))
        self.assertRegex(repr(h), regex)

        # partial method
        method = HandleTests.test_handle_repr
        cb = functools.partialmethod(method)
        filename, lineno = test_utils.get_function_source(method)
        h = asyncio.Handle(cb, (), self.loop)

        cb_regex = r'<function HandleTests.test_handle_repr .*>'
        cb_regex = fr'functools.partialmethod\({cb_regex}, , \)\(\)'
        regex = fr'^<Handle {cb_regex} at {re.escape(filename)}:{lineno}>$'
        self.assertRegex(repr(h), regex)

    def test_handle_repr_debug(self):
        # repr() in debug mode additionally reports where the handle was
        # created.
        self.loop.get_debug.return_value = True

        # simple function
        create_filename = __file__
        # The handle must be created on the line directly below this one:
        # the expected creation line is computed as "current line + 1".
        create_lineno = sys._getframe().f_lineno + 1
        h = asyncio.Handle(noop, (1, 2), self.loop)
        filename, lineno = test_utils.get_function_source(noop)
        self.assertEqual(repr(h),
                        '<Handle noop(1, 2) at %s:%s created at %s:%s>'
                        % (filename, lineno, create_filename, create_lineno))

        # cancelled handle
        h.cancel()
        self.assertEqual(
            repr(h),
            '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
            % (filename, lineno, create_filename, create_lineno))

        # double cancellation won't overwrite _repr
        h.cancel()
        self.assertEqual(
            repr(h),
            '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
            % (filename, lineno, create_filename, create_lineno))

    def test_handle_source_traceback(self):
        # In debug mode every scheduling method must record a source
        # traceback whose last entry is the line of this test that
        # scheduled the handle.
        loop = asyncio.get_event_loop_policy().new_event_loop()
        loop.set_debug(True)
        self.set_event_loop(loop)

        def check_source_traceback(h):
            # Must be called on the line directly after the one that
            # created h: the expected line is "caller's line - 1".
            lineno = sys._getframe(1).f_lineno - 1
            self.assertIsInstance(h._source_traceback, list)
            self.assertEqual(h._source_traceback[-1][:3],
                             (__file__,
                              lineno,
                              'test_handle_source_traceback'))

        # call_soon
        h = loop.call_soon(noop)
        check_source_traceback(h)

        # call_soon_threadsafe
        h = loop.call_soon_threadsafe(noop)
        check_source_traceback(h)

        # call_later
        h = loop.call_later(0, noop)
        check_source_traceback(h)

        # call_at (previously this section called call_later() a second
        # time, so call_at() itself was never covered)
        h = loop.call_at(loop.time(), noop)
        check_source_traceback(h)

    @unittest.skipUnless(hasattr(collections.abc, 'Coroutine'),
                         'No collections.abc.Coroutine')
    def test_coroutine_like_object_debug_formatting(self):
        # Test that asyncio can format coroutines that are instances of
        # collections.abc.Coroutine, but lack cr_code or gi_code attributes
        # (such as ones compiled with Cython).

        coro = CoroLike()
        coro.__name__ = 'AAA'
        self.assertTrue(asyncio.iscoroutine(coro))
        self.assertEqual(coroutines._format_coroutine(coro), 'AAA()')

        # __qualname__ takes precedence over __name__ once it is set.
        coro.__qualname__ = 'BBB'
        self.assertEqual(coroutines._format_coroutine(coro), 'BBB()')

        coro.cr_running = True
        self.assertEqual(coroutines._format_coroutine(coro), 'BBB() running')

        coro.__name__ = coro.__qualname__ = None
        self.assertEqual(coroutines._format_coroutine(coro),
                         '<CoroLike without __name__>() running')

        coro = CoroLike()
        coro.__qualname__ = 'CoroLike'
        # Some coroutines might not have '__name__', such as
        # built-in async_gen.asend().
        self.assertEqual(coroutines._format_coroutine(coro), 'CoroLike()')

        # A cr_code attribute set to None must not break formatting.
        coro = CoroLike()
        coro.__qualname__ = 'AAA'
        coro.cr_code = None
        self.assertEqual(coroutines._format_coroutine(coro), 'AAA()')
2339
2340
class TimerTests(unittest.TestCase):
    """Tests for asyncio.TimerHandle: hash, when(), repr and ordering."""

    def setUp(self):
        super().setUp()
        # The handles are only constructed and inspected here, so a mock
        # stands in for the event loop.
        self.loop = mock.Mock()

    def test_hash(self):
        # hash(handle) must equal the hash of its scheduled time.
        deadline = time.monotonic()
        handle = asyncio.TimerHandle(deadline, lambda: False, (), mock.Mock())
        self.assertEqual(hash(handle), hash(deadline))

    def test_when(self):
        # when() must return the scheduled time passed to the constructor.
        deadline = time.monotonic()
        handle = asyncio.TimerHandle(deadline, lambda: False, (), mock.Mock())
        self.assertEqual(deadline, handle.when())

    def test_timer(self):
        def callback(*args):
            return args

        cb_args = (1, 2, 3)
        deadline = time.monotonic()
        handle = asyncio.TimerHandle(deadline, callback, cb_args, mock.Mock())
        self.assertIs(handle._callback, callback)
        self.assertIs(handle._args, cb_args)
        self.assertFalse(handle.cancelled())

        # After cancel() the callback and its arguments must be cleared.
        handle.cancel()
        self.assertTrue(handle.cancelled())
        self.assertIsNone(handle._callback)
        self.assertIsNone(handle._args)

    def test_timer_repr(self):
        self.loop.get_debug.return_value = False

        # simple function
        handle = asyncio.TimerHandle(123, noop, (), self.loop)
        source = test_utils.get_function_source(noop)
        expected = '<TimerHandle when=123 noop() at %s:%s>' % source
        self.assertEqual(repr(handle), expected)

        # cancelled handle
        handle.cancel()
        self.assertEqual(repr(handle), '<TimerHandle cancelled when=123>')

    def test_timer_repr_debug(self):
        self.loop.get_debug.return_value = True

        # simple function
        # NOTE: the handle must be created on the line immediately after
        # create_lineno is computed, because the expected repr embeds it.
        create_filename = __file__
        create_lineno = sys._getframe().f_lineno + 1
        handle = asyncio.TimerHandle(123, noop, (), self.loop)
        func_file, func_line = test_utils.get_function_source(noop)
        where = (func_file, func_line, create_filename, create_lineno)
        self.assertEqual(
            repr(handle),
            '<TimerHandle when=123 noop() '
            'at %s:%s created at %s:%s>' % where)

        # cancelled handle
        handle.cancel()
        self.assertEqual(
            repr(handle),
            '<TimerHandle cancelled when=123 noop() '
            'at %s:%s created at %s:%s>' % where)

    def test_timer_comparison(self):
        def callback(*args):
            return args

        when = time.monotonic()

        # Two handles scheduled for the same time.
        first = asyncio.TimerHandle(when, callback, (), self.loop)
        second = asyncio.TimerHandle(when, callback, (), self.loop)
        # TODO: Use assertLess etc.
        self.assertFalse(first < second)
        self.assertFalse(second < first)
        self.assertTrue(first <= second)
        self.assertTrue(second <= first)
        self.assertFalse(first > second)
        self.assertFalse(second > first)
        self.assertTrue(first >= second)
        self.assertTrue(second >= first)
        self.assertTrue(first == second)
        self.assertFalse(first != second)

        # A cancelled handle must not compare equal to a live one.
        second.cancel()
        self.assertFalse(first == second)

        # Two handles scheduled ten seconds apart.
        earlier = asyncio.TimerHandle(when, callback, (), self.loop)
        later = asyncio.TimerHandle(when + 10.0, callback, (), self.loop)
        self.assertTrue(earlier < later)
        self.assertFalse(later < earlier)
        self.assertTrue(earlier <= later)
        self.assertFalse(later <= earlier)
        self.assertFalse(earlier > later)
        self.assertTrue(later > earlier)
        self.assertFalse(earlier >= later)
        self.assertTrue(later >= earlier)
        self.assertFalse(earlier == later)
        self.assertTrue(earlier != later)

        # Comparing with a plain Handle is not implemented.
        plain = asyncio.Handle(callback, (), self.loop)
        self.assertIs(NotImplemented, earlier.__eq__(plain))
        self.assertIs(NotImplemented, earlier.__ne__(plain))

        # Ordering against an unrelated type must raise TypeError, while
        # (in)equality simply reports "not equal".
        for compare in (lambda: earlier < (),
                        lambda: earlier > (),
                        lambda: earlier <= (),
                        lambda: earlier >= ()):
            self.assertRaises(TypeError, compare)
        self.assertFalse(earlier == ())
        self.assertTrue(earlier != ())

        # The reflected operations of the test.support sentinels are used.
        self.assertTrue(earlier == ALWAYS_EQ)
        self.assertFalse(earlier != ALWAYS_EQ)
        self.assertTrue(earlier < LARGEST)
        self.assertFalse(earlier > LARGEST)
        self.assertTrue(earlier <= LARGEST)
        self.assertFalse(earlier >= LARGEST)
        self.assertFalse(earlier < SMALLEST)
        self.assertTrue(earlier > SMALLEST)
        self.assertFalse(earlier <= SMALLEST)
        self.assertTrue(earlier >= SMALLEST)
2473
2474
class AbstractEventLoopTests(unittest.TestCase):
    """Check that AbstractEventLoop methods raise NotImplementedError."""

    def test_not_implemented(self):
        f = mock.Mock()
        loop = asyncio.AbstractEventLoop()
        # (method, positional arguments) pairs; every synchronous call
        # listed here must raise NotImplementedError.
        calls = [
            (loop.run_forever, ()),
            (loop.run_until_complete, (None,)),
            (loop.stop, ()),
            (loop.is_running, ()),
            (loop.is_closed, ()),
            (loop.close, ()),
            (loop.create_task, (None,)),
            (loop.call_later, (None, None)),
            (loop.call_at, (f, f)),
            (loop.call_soon, (None,)),
            (loop.time, ()),
            (loop.call_soon_threadsafe, (None,)),
            (loop.set_default_executor, (f,)),
            (loop.add_reader, (1, f)),
            (loop.remove_reader, (1,)),
            (loop.add_writer, (1, f)),
            (loop.remove_writer, (1,)),
            (loop.add_signal_handler, (1, f)),
            (loop.remove_signal_handler, (1,)),
            (loop.set_exception_handler, (f,)),
            (loop.default_exception_handler, (f,)),
            (loop.call_exception_handler, (f,)),
            (loop.get_debug, ()),
            (loop.set_debug, (f,)),
        ]
        for method, args in calls:
            # subTest keeps one failing method from hiding the others.
            with self.subTest(method=method.__name__):
                self.assertRaises(NotImplementedError, method, *args)

    def test_not_implemented_async(self):

        async def inner():
            f = mock.Mock()
            loop = asyncio.AbstractEventLoop()
            # (method, positional arguments) pairs; calling and awaiting
            # each one must raise NotImplementedError.
            calls = [
                (loop.run_in_executor, (f, f)),
                (loop.getaddrinfo, ('localhost', 8080)),
                (loop.getnameinfo, (('localhost', 8080),)),
                (loop.create_connection, (f,)),
                (loop.create_server, (f,)),
                (loop.create_datagram_endpoint, (f,)),
                (loop.sock_recv, (f, 10)),
                (loop.sock_recv_into, (f, 10)),
                (loop.sock_sendall, (f, 10)),
                (loop.sock_connect, (f, f)),
                (loop.sock_accept, (f,)),
                (loop.sock_sendfile, (f, f)),
                (loop.sendfile, (f, f)),
                (loop.connect_read_pipe, (f, mock.sentinel.pipe)),
                (loop.connect_write_pipe, (f, mock.sentinel.pipe)),
                (loop.subprocess_shell, (f, mock.sentinel)),
                (loop.subprocess_exec, (f,)),
            ]
            for method, args in calls:
                with self.subTest(method=method.__name__):
                    # The call itself stays inside assertRaises because
                    # the error may be raised before anything is awaited.
                    with self.assertRaises(NotImplementedError):
                        await method(*args)

        loop = asyncio.new_event_loop()
        # Registered before running so the loop is closed even when one of
        # the checks inside the coroutine fails.
        self.addCleanup(loop.close)
        loop.run_until_complete(inner())
2575
2576
2577class PolicyTests(unittest.TestCase):
2578
2579    def test_event_loop_policy(self):
2580        policy = asyncio.AbstractEventLoopPolicy()
2581        self.assertRaises(NotImplementedError, policy.get_event_loop)
2582        self.assertRaises(NotImplementedError, policy.set_event_loop, object())
2583        self.assertRaises(NotImplementedError, policy.new_event_loop)
2584        self.assertRaises(NotImplementedError, policy.get_child_watcher)
2585        self.assertRaises(NotImplementedError, policy.set_child_watcher,
2586                          object())
2587
2588    def test_get_event_loop(self):
2589        policy = asyncio.DefaultEventLoopPolicy()
2590        self.assertIsNone(policy._local._loop)
2591        loop = policy.get_event_loop()
2592        self.assertIsInstance(loop, asyncio.AbstractEventLoop)
2593
2594        self.assertIs(policy._local._loop, loop)
2595        self.assertIs(loop, policy.get_event_loop())
2596        loop.close()
2597
2598    def test_get_event_loop_calls_set_event_loop(self):
2599        policy = asyncio.DefaultEventLoopPolicy()
2600
2601        with mock.patch.object(
2602                policy, "set_event_loop",
2603                wraps=policy.set_event_loop) as m_set_event_loop:
2604
2605            loop = policy.get_event_loop()
2606            self.addCleanup(loop.close)
2607
2608            # policy._local._loop must be set through .set_event_loop()
2609            # (the unix DefaultEventLoopPolicy needs this call to attach
2610            # the child watcher correctly)
2611            m_set_event_loop.assert_called_with(loop)
2612
2613        loop.close()
2614
2615    def test_get_event_loop_after_set_none(self):
2616        policy = asyncio.DefaultEventLoopPolicy()
2617        policy.set_event_loop(None)
2618        self.assertRaises(RuntimeError, policy.get_event_loop)
2619
2620    @mock.patch('asyncio.events.threading.current_thread')
2621    def test_get_event_loop_thread(self, m_current_thread):
2622
2623        def f():
2624            policy = asyncio.DefaultEventLoopPolicy()
2625            self.assertRaises(RuntimeError, policy.get_event_loop)
2626
2627        th = threading.Thread(target=f)
2628        th.start()
2629        th.join()
2630
2631    def test_new_event_loop(self):
2632        policy = asyncio.DefaultEventLoopPolicy()
2633
2634        loop = policy.new_event_loop()
2635        self.assertIsInstance(loop, asyncio.AbstractEventLoop)
2636        loop.close()
2637
2638    def test_set_event_loop(self):
2639        policy = asyncio.DefaultEventLoopPolicy()
2640        old_loop = policy.new_event_loop()
2641        policy.set_event_loop(old_loop)
2642
2643        self.assertRaises(TypeError, policy.set_event_loop, object())
2644
2645        loop = policy.new_event_loop()
2646        policy.set_event_loop(loop)
2647        self.assertIs(loop, policy.get_event_loop())
2648        self.assertIsNot(old_loop, policy.get_event_loop())
2649        loop.close()
2650        old_loop.close()
2651
2652    def test_get_event_loop_policy(self):
2653        policy = asyncio.get_event_loop_policy()
2654        self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy)
2655        self.assertIs(policy, asyncio.get_event_loop_policy())
2656
2657    def test_set_event_loop_policy(self):
2658        self.assertRaises(
2659            TypeError, asyncio.set_event_loop_policy, object())
2660
2661        old_policy = asyncio.get_event_loop_policy()
2662
2663        policy = asyncio.DefaultEventLoopPolicy()
2664        asyncio.set_event_loop_policy(policy)
2665        self.assertIs(policy, asyncio.get_event_loop_policy())
2666        self.assertIsNot(policy, old_policy)
2667
2668
class GetEventLoopTestsMixin:
    """Shared tests for one set of running-loop accessor functions.

    Subclasses set the four ``*_impl`` attributes.  setUp() installs them
    on both ``asyncio.events`` and ``asyncio``; tearDown() puts back the
    functions that were installed before the test started.
    """

    _get_running_loop_impl = None
    _set_running_loop_impl = None
    get_running_loop_impl = None
    get_event_loop_impl = None

    def setUp(self):
        # Remember the currently installed functions for tearDown().
        self._get_running_loop_saved = events._get_running_loop
        self._set_running_loop_saved = events._set_running_loop
        self.get_running_loop_saved = events.get_running_loop
        self.get_event_loop_saved = events.get_event_loop

        # Read through the class so that the plain functions are stored,
        # not methods bound to this test instance.
        events._get_running_loop = type(self)._get_running_loop_impl
        events._set_running_loop = type(self)._set_running_loop_impl
        events.get_running_loop = type(self).get_running_loop_impl
        events.get_event_loop = type(self).get_event_loop_impl

        asyncio._get_running_loop = type(self)._get_running_loop_impl
        asyncio._set_running_loop = type(self)._set_running_loop_impl
        asyncio.get_running_loop = type(self).get_running_loop_impl
        asyncio.get_event_loop = type(self).get_event_loop_impl

        super().setUp()

        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

        if sys.platform != 'win32':
            watcher = asyncio.SafeChildWatcher()
            watcher.attach_loop(self.loop)
            asyncio.set_child_watcher(watcher)

    def tearDown(self):
        try:
            if sys.platform != 'win32':
                asyncio.set_child_watcher(None)

            super().tearDown()
        finally:
            # Runs even if the code above raised, so the loop is closed and
            # the saved functions are always put back.
            self.loop.close()
            asyncio.set_event_loop(None)

            events._get_running_loop = self._get_running_loop_saved
            events._set_running_loop = self._set_running_loop_saved
            events.get_running_loop = self.get_running_loop_saved
            events.get_event_loop = self.get_event_loop_saved

            asyncio._get_running_loop = self._get_running_loop_saved
            asyncio._set_running_loop = self._set_running_loop_saved
            asyncio.get_running_loop = self.get_running_loop_saved
            asyncio.get_event_loop = self.get_event_loop_saved

    if sys.platform != 'win32':

        def test_get_event_loop_new_process(self):
            # bpo-32126: The multiprocessing module used by
            # ProcessPoolExecutor is not functional when the
            # multiprocessing.synchronize module cannot be imported.
            support.skip_if_broken_multiprocessing_synchronize()

            async def main():
                # The context manager shuts the pool down even when
                # run_in_executor() raises.
                with concurrent.futures.ProcessPoolExecutor() as pool:
                    return await self.loop.run_in_executor(
                        pool, _test_get_event_loop_new_process__sub_proc)

            self.assertEqual(
                self.loop.run_until_complete(main()),
                'hello')

    def test_get_event_loop_returns_running_loop(self):
        class TestError(Exception):
            pass

        class Policy(asyncio.DefaultEventLoopPolicy):
            def get_event_loop(self):
                raise TestError

        old_policy = asyncio.get_event_loop_policy()
        # Bound before the try block so the finally clause can always test
        # it, even if installing the policy or creating the loop fails.
        loop = None
        try:
            asyncio.set_event_loop_policy(Policy())
            loop = asyncio.new_event_loop()

            # Outside a running loop, get_event_loop() goes to the policy.
            with self.assertRaises(TestError):
                asyncio.get_event_loop()
            asyncio.set_event_loop(None)
            with self.assertRaises(TestError):
                asyncio.get_event_loop()

            with self.assertRaisesRegex(RuntimeError, 'no running'):
                asyncio.get_running_loop()
            self.assertIs(asyncio._get_running_loop(), None)

            async def func():
                # Inside a running loop all accessors return that loop
                # and the policy is not consulted.
                self.assertIs(asyncio.get_event_loop(), loop)
                self.assertIs(asyncio.get_running_loop(), loop)
                self.assertIs(asyncio._get_running_loop(), loop)

            loop.run_until_complete(func())

            asyncio.set_event_loop(loop)
            with self.assertRaises(TestError):
                asyncio.get_event_loop()
            asyncio.set_event_loop(None)
            with self.assertRaises(TestError):
                asyncio.get_event_loop()

        finally:
            asyncio.set_event_loop_policy(old_policy)
            if loop is not None:
                loop.close()

        with self.assertRaisesRegex(RuntimeError, 'no running'):
            asyncio.get_running_loop()

        self.assertIs(asyncio._get_running_loop(), None)

    def test_get_event_loop_returns_running_loop2(self):
        old_policy = asyncio.get_event_loop_policy()
        # Bound before the try block so the finally clause can always test
        # it, even if installing the policy or creating the loop fails.
        loop = None
        try:
            asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
            loop = asyncio.new_event_loop()
            self.addCleanup(loop.close)

            loop2 = asyncio.get_event_loop()
            self.addCleanup(loop2.close)
            asyncio.set_event_loop(None)
            with self.assertRaisesRegex(RuntimeError, 'no current'):
                asyncio.get_event_loop()

            with self.assertRaisesRegex(RuntimeError, 'no running'):
                asyncio.get_running_loop()
            self.assertIs(asyncio._get_running_loop(), None)

            async def func():
                self.assertIs(asyncio.get_event_loop(), loop)
                self.assertIs(asyncio.get_running_loop(), loop)
                self.assertIs(asyncio._get_running_loop(), loop)

            loop.run_until_complete(func())

            asyncio.set_event_loop(loop)
            self.assertIs(asyncio.get_event_loop(), loop)

            asyncio.set_event_loop(None)
            with self.assertRaisesRegex(RuntimeError, 'no current'):
                asyncio.get_event_loop()

        finally:
            asyncio.set_event_loop_policy(old_policy)
            if loop is not None:
                loop.close()

        with self.assertRaisesRegex(RuntimeError, 'no running'):
            asyncio.get_running_loop()

        self.assertIs(asyncio._get_running_loop(), None)
2828
2829
class TestPyGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase):
    """Run the mixin's tests against the ``events._py_*`` functions."""

    get_event_loop_impl = events._py_get_event_loop
    get_running_loop_impl = events._py_get_running_loop
    _set_running_loop_impl = events._py__set_running_loop
    _get_running_loop_impl = events._py__get_running_loop
2836
2837
# The ``events._c_*`` variants are only tested when the ``_asyncio``
# module can be imported.
try:
    import _asyncio  # NoQA
except ImportError:
    pass
else:

    class TestCGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase):
        """Run the mixin's tests against the ``events._c_*`` functions."""

        get_event_loop_impl = events._c_get_event_loop
        get_running_loop_impl = events._c_get_running_loop
        _set_running_loop_impl = events._c__set_running_loop
        _get_running_loop_impl = events._c__get_running_loop
2850
2851
class TestServer(unittest.TestCase):
    """Tests for the server object returned by loop.create_server()."""

    def test_get_loop(self):
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)
        proto = MyProto(loop)
        server = loop.run_until_complete(
            loop.create_server(lambda: proto, '0.0.0.0', 0))
        try:
            self.assertEqual(server.get_loop(), loop)
        finally:
            # Close the server even when the assertion fails, so that the
            # listening socket is not leaked.
            server.close()
            loop.run_until_complete(server.wait_closed())
2862
2863
class TestAbstractServer(unittest.TestCase):
    """Check that AbstractServer methods raise NotImplementedError."""

    def test_close(self):
        # The server is built inside the callable so that construction is
        # covered by the assertion as well.
        self.assertRaises(
            NotImplementedError,
            lambda: events.AbstractServer().close())

    def test_wait_closed(self):
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        def run_wait_closed():
            loop.run_until_complete(events.AbstractServer().wait_closed())

        self.assertRaises(NotImplementedError, run_wait_closed)

    def test_get_loop(self):
        self.assertRaises(
            NotImplementedError,
            lambda: events.AbstractServer().get_loop())
2880
2881
# Run the tests when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
2884