1"""Tests for events.py.""" 2 3import collections.abc 4import concurrent.futures 5import functools 6import io 7import os 8import platform 9import re 10import signal 11import socket 12try: 13 import ssl 14except ImportError: 15 ssl = None 16import subprocess 17import sys 18import threading 19import time 20import types 21import errno 22import unittest 23from unittest import mock 24import weakref 25 26if sys.platform not in ('win32', 'vxworks'): 27 import tty 28 29import asyncio 30from asyncio import coroutines 31from asyncio import events 32from asyncio import proactor_events 33from asyncio import selector_events 34from test.test_asyncio import utils as test_utils 35from test import support 36from test.support import socket_helper 37from test.support import threading_helper 38from test.support import ALWAYS_EQ, LARGEST, SMALLEST 39 40 41def tearDownModule(): 42 asyncio.set_event_loop_policy(None) 43 44 45def broken_unix_getsockname(): 46 """Return True if the platform is Mac OS 10.4 or older.""" 47 if sys.platform.startswith("aix"): 48 return True 49 elif sys.platform != 'darwin': 50 return False 51 version = platform.mac_ver()[0] 52 version = tuple(map(int, version.split('.'))) 53 return version < (10, 5) 54 55 56def _test_get_event_loop_new_process__sub_proc(): 57 async def doit(): 58 return 'hello' 59 60 loop = asyncio.new_event_loop() 61 asyncio.set_event_loop(loop) 62 return loop.run_until_complete(doit()) 63 64 65class CoroLike: 66 def send(self, v): 67 pass 68 69 def throw(self, *exc): 70 pass 71 72 def close(self): 73 pass 74 75 def __await__(self): 76 pass 77 78 79class MyBaseProto(asyncio.Protocol): 80 connected = None 81 done = None 82 83 def __init__(self, loop=None): 84 self.transport = None 85 self.state = 'INITIAL' 86 self.nbytes = 0 87 if loop is not None: 88 self.connected = loop.create_future() 89 self.done = loop.create_future() 90 91 def _assert_state(self, *expected): 92 if self.state not in expected: 93 raise AssertionError(f'state: 
{self.state!r}, expected: {expected!r}') 94 95 def connection_made(self, transport): 96 self.transport = transport 97 self._assert_state('INITIAL') 98 self.state = 'CONNECTED' 99 if self.connected: 100 self.connected.set_result(None) 101 102 def data_received(self, data): 103 self._assert_state('CONNECTED') 104 self.nbytes += len(data) 105 106 def eof_received(self): 107 self._assert_state('CONNECTED') 108 self.state = 'EOF' 109 110 def connection_lost(self, exc): 111 self._assert_state('CONNECTED', 'EOF') 112 self.state = 'CLOSED' 113 if self.done: 114 self.done.set_result(None) 115 116 117class MyProto(MyBaseProto): 118 def connection_made(self, transport): 119 super().connection_made(transport) 120 transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n') 121 122 123class MyDatagramProto(asyncio.DatagramProtocol): 124 done = None 125 126 def __init__(self, loop=None): 127 self.state = 'INITIAL' 128 self.nbytes = 0 129 if loop is not None: 130 self.done = loop.create_future() 131 132 def _assert_state(self, expected): 133 if self.state != expected: 134 raise AssertionError(f'state: {self.state!r}, expected: {expected!r}') 135 136 def connection_made(self, transport): 137 self.transport = transport 138 self._assert_state('INITIAL') 139 self.state = 'INITIALIZED' 140 141 def datagram_received(self, data, addr): 142 self._assert_state('INITIALIZED') 143 self.nbytes += len(data) 144 145 def error_received(self, exc): 146 self._assert_state('INITIALIZED') 147 148 def connection_lost(self, exc): 149 self._assert_state('INITIALIZED') 150 self.state = 'CLOSED' 151 if self.done: 152 self.done.set_result(None) 153 154 155class MyReadPipeProto(asyncio.Protocol): 156 done = None 157 158 def __init__(self, loop=None): 159 self.state = ['INITIAL'] 160 self.nbytes = 0 161 self.transport = None 162 if loop is not None: 163 self.done = loop.create_future() 164 165 def _assert_state(self, expected): 166 if self.state != expected: 167 raise AssertionError(f'state: 
{self.state!r}, expected: {expected!r}') 168 169 def connection_made(self, transport): 170 self.transport = transport 171 self._assert_state(['INITIAL']) 172 self.state.append('CONNECTED') 173 174 def data_received(self, data): 175 self._assert_state(['INITIAL', 'CONNECTED']) 176 self.nbytes += len(data) 177 178 def eof_received(self): 179 self._assert_state(['INITIAL', 'CONNECTED']) 180 self.state.append('EOF') 181 182 def connection_lost(self, exc): 183 if 'EOF' not in self.state: 184 self.state.append('EOF') # It is okay if EOF is missed. 185 self._assert_state(['INITIAL', 'CONNECTED', 'EOF']) 186 self.state.append('CLOSED') 187 if self.done: 188 self.done.set_result(None) 189 190 191class MyWritePipeProto(asyncio.BaseProtocol): 192 done = None 193 194 def __init__(self, loop=None): 195 self.state = 'INITIAL' 196 self.transport = None 197 if loop is not None: 198 self.done = loop.create_future() 199 200 def _assert_state(self, expected): 201 if self.state != expected: 202 raise AssertionError(f'state: {self.state!r}, expected: {expected!r}') 203 204 def connection_made(self, transport): 205 self.transport = transport 206 self._assert_state('INITIAL') 207 self.state = 'CONNECTED' 208 209 def connection_lost(self, exc): 210 self._assert_state('CONNECTED') 211 self.state = 'CLOSED' 212 if self.done: 213 self.done.set_result(None) 214 215 216class MySubprocessProtocol(asyncio.SubprocessProtocol): 217 218 def __init__(self, loop): 219 self.state = 'INITIAL' 220 self.transport = None 221 self.connected = loop.create_future() 222 self.completed = loop.create_future() 223 self.disconnects = {fd: loop.create_future() for fd in range(3)} 224 self.data = {1: b'', 2: b''} 225 self.returncode = None 226 self.got_data = {1: asyncio.Event(), 227 2: asyncio.Event()} 228 229 def _assert_state(self, expected): 230 if self.state != expected: 231 raise AssertionError(f'state: {self.state!r}, expected: {expected!r}') 232 233 def connection_made(self, transport): 234 self.transport = 
transport 235 self._assert_state('INITIAL') 236 self.state = 'CONNECTED' 237 self.connected.set_result(None) 238 239 def connection_lost(self, exc): 240 self._assert_state('CONNECTED') 241 self.state = 'CLOSED' 242 self.completed.set_result(None) 243 244 def pipe_data_received(self, fd, data): 245 self._assert_state('CONNECTED') 246 self.data[fd] += data 247 self.got_data[fd].set() 248 249 def pipe_connection_lost(self, fd, exc): 250 self._assert_state('CONNECTED') 251 if exc: 252 self.disconnects[fd].set_exception(exc) 253 else: 254 self.disconnects[fd].set_result(exc) 255 256 def process_exited(self): 257 self._assert_state('CONNECTED') 258 self.returncode = self.transport.get_returncode() 259 260 261class EventLoopTestsMixin: 262 263 def setUp(self): 264 super().setUp() 265 self.loop = self.create_event_loop() 266 self.set_event_loop(self.loop) 267 268 def tearDown(self): 269 # just in case if we have transport close callbacks 270 if not self.loop.is_closed(): 271 test_utils.run_briefly(self.loop) 272 273 self.doCleanups() 274 support.gc_collect() 275 super().tearDown() 276 277 def test_run_until_complete_nesting(self): 278 async def coro1(): 279 await asyncio.sleep(0) 280 281 async def coro2(): 282 self.assertTrue(self.loop.is_running()) 283 self.loop.run_until_complete(coro1()) 284 285 with self.assertWarnsRegex( 286 RuntimeWarning, 287 r"coroutine \S+ was never awaited" 288 ): 289 self.assertRaises( 290 RuntimeError, self.loop.run_until_complete, coro2()) 291 292 # Note: because of the default Windows timing granularity of 293 # 15.6 msec, we use fairly long sleep times here (~100 msec). 
294 295 def test_run_until_complete(self): 296 t0 = self.loop.time() 297 self.loop.run_until_complete(asyncio.sleep(0.1)) 298 t1 = self.loop.time() 299 self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0) 300 301 def test_run_until_complete_stopped(self): 302 303 async def cb(): 304 self.loop.stop() 305 await asyncio.sleep(0.1) 306 task = cb() 307 self.assertRaises(RuntimeError, 308 self.loop.run_until_complete, task) 309 310 def test_call_later(self): 311 results = [] 312 313 def callback(arg): 314 results.append(arg) 315 self.loop.stop() 316 317 self.loop.call_later(0.1, callback, 'hello world') 318 self.loop.run_forever() 319 self.assertEqual(results, ['hello world']) 320 321 def test_call_soon(self): 322 results = [] 323 324 def callback(arg1, arg2): 325 results.append((arg1, arg2)) 326 self.loop.stop() 327 328 self.loop.call_soon(callback, 'hello', 'world') 329 self.loop.run_forever() 330 self.assertEqual(results, [('hello', 'world')]) 331 332 def test_call_soon_threadsafe(self): 333 results = [] 334 lock = threading.Lock() 335 336 def callback(arg): 337 results.append(arg) 338 if len(results) >= 2: 339 self.loop.stop() 340 341 def run_in_thread(): 342 self.loop.call_soon_threadsafe(callback, 'hello') 343 lock.release() 344 345 lock.acquire() 346 t = threading.Thread(target=run_in_thread) 347 t.start() 348 349 with lock: 350 self.loop.call_soon(callback, 'world') 351 self.loop.run_forever() 352 t.join() 353 self.assertEqual(results, ['hello', 'world']) 354 355 def test_call_soon_threadsafe_same_thread(self): 356 results = [] 357 358 def callback(arg): 359 results.append(arg) 360 if len(results) >= 2: 361 self.loop.stop() 362 363 self.loop.call_soon_threadsafe(callback, 'hello') 364 self.loop.call_soon(callback, 'world') 365 self.loop.run_forever() 366 self.assertEqual(results, ['hello', 'world']) 367 368 def test_run_in_executor(self): 369 def run(arg): 370 return (arg, threading.get_ident()) 371 f2 = self.loop.run_in_executor(None, run, 'yo') 372 res, thread_id = 
self.loop.run_until_complete(f2) 373 self.assertEqual(res, 'yo') 374 self.assertNotEqual(thread_id, threading.get_ident()) 375 376 def test_run_in_executor_cancel(self): 377 called = False 378 379 def patched_call_soon(*args): 380 nonlocal called 381 called = True 382 383 def run(): 384 time.sleep(0.05) 385 386 f2 = self.loop.run_in_executor(None, run) 387 f2.cancel() 388 self.loop.run_until_complete( 389 self.loop.shutdown_default_executor()) 390 self.loop.close() 391 self.loop.call_soon = patched_call_soon 392 self.loop.call_soon_threadsafe = patched_call_soon 393 time.sleep(0.4) 394 self.assertFalse(called) 395 396 def test_reader_callback(self): 397 r, w = socket.socketpair() 398 r.setblocking(False) 399 bytes_read = bytearray() 400 401 def reader(): 402 try: 403 data = r.recv(1024) 404 except BlockingIOError: 405 # Spurious readiness notifications are possible 406 # at least on Linux -- see man select. 407 return 408 if data: 409 bytes_read.extend(data) 410 else: 411 self.assertTrue(self.loop.remove_reader(r.fileno())) 412 r.close() 413 414 self.loop.add_reader(r.fileno(), reader) 415 self.loop.call_soon(w.send, b'abc') 416 test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3) 417 self.loop.call_soon(w.send, b'def') 418 test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6) 419 self.loop.call_soon(w.close) 420 self.loop.call_soon(self.loop.stop) 421 self.loop.run_forever() 422 self.assertEqual(bytes_read, b'abcdef') 423 424 def test_writer_callback(self): 425 r, w = socket.socketpair() 426 w.setblocking(False) 427 428 def writer(data): 429 w.send(data) 430 self.loop.stop() 431 432 data = b'x' * 1024 433 self.loop.add_writer(w.fileno(), writer, data) 434 self.loop.run_forever() 435 436 self.assertTrue(self.loop.remove_writer(w.fileno())) 437 self.assertFalse(self.loop.remove_writer(w.fileno())) 438 439 w.close() 440 read = r.recv(len(data) * 2) 441 r.close() 442 self.assertEqual(read, data) 443 444 @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 
'No SIGKILL') 445 def test_add_signal_handler(self): 446 caught = 0 447 448 def my_handler(): 449 nonlocal caught 450 caught += 1 451 452 # Check error behavior first. 453 self.assertRaises( 454 TypeError, self.loop.add_signal_handler, 'boom', my_handler) 455 self.assertRaises( 456 TypeError, self.loop.remove_signal_handler, 'boom') 457 self.assertRaises( 458 ValueError, self.loop.add_signal_handler, signal.NSIG+1, 459 my_handler) 460 self.assertRaises( 461 ValueError, self.loop.remove_signal_handler, signal.NSIG+1) 462 self.assertRaises( 463 ValueError, self.loop.add_signal_handler, 0, my_handler) 464 self.assertRaises( 465 ValueError, self.loop.remove_signal_handler, 0) 466 self.assertRaises( 467 ValueError, self.loop.add_signal_handler, -1, my_handler) 468 self.assertRaises( 469 ValueError, self.loop.remove_signal_handler, -1) 470 self.assertRaises( 471 RuntimeError, self.loop.add_signal_handler, signal.SIGKILL, 472 my_handler) 473 # Removing SIGKILL doesn't raise, since we don't call signal(). 474 self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL)) 475 # Now set a handler and handle it. 476 self.loop.add_signal_handler(signal.SIGINT, my_handler) 477 478 os.kill(os.getpid(), signal.SIGINT) 479 test_utils.run_until(self.loop, lambda: caught) 480 481 # Removing it should restore the default handler. 482 self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT)) 483 self.assertEqual(signal.getsignal(signal.SIGINT), 484 signal.default_int_handler) 485 # Removing again returns False. 486 self.assertFalse(self.loop.remove_signal_handler(signal.SIGINT)) 487 488 @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM') 489 @unittest.skipUnless(hasattr(signal, 'setitimer'), 490 'need signal.setitimer()') 491 def test_signal_handling_while_selecting(self): 492 # Test with a signal actually arriving during a select() call. 
493 caught = 0 494 495 def my_handler(): 496 nonlocal caught 497 caught += 1 498 self.loop.stop() 499 500 self.loop.add_signal_handler(signal.SIGALRM, my_handler) 501 502 signal.setitimer(signal.ITIMER_REAL, 0.01, 0) # Send SIGALRM once. 503 self.loop.call_later(60, self.loop.stop) 504 self.loop.run_forever() 505 self.assertEqual(caught, 1) 506 507 @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM') 508 @unittest.skipUnless(hasattr(signal, 'setitimer'), 509 'need signal.setitimer()') 510 def test_signal_handling_args(self): 511 some_args = (42,) 512 caught = 0 513 514 def my_handler(*args): 515 nonlocal caught 516 caught += 1 517 self.assertEqual(args, some_args) 518 self.loop.stop() 519 520 self.loop.add_signal_handler(signal.SIGALRM, my_handler, *some_args) 521 522 signal.setitimer(signal.ITIMER_REAL, 0.1, 0) # Send SIGALRM once. 523 self.loop.call_later(60, self.loop.stop) 524 self.loop.run_forever() 525 self.assertEqual(caught, 1) 526 527 def _basetest_create_connection(self, connection_fut, check_sockname=True): 528 tr, pr = self.loop.run_until_complete(connection_fut) 529 self.assertIsInstance(tr, asyncio.Transport) 530 self.assertIsInstance(pr, asyncio.Protocol) 531 self.assertIs(pr.transport, tr) 532 if check_sockname: 533 self.assertIsNotNone(tr.get_extra_info('sockname')) 534 self.loop.run_until_complete(pr.done) 535 self.assertGreater(pr.nbytes, 0) 536 tr.close() 537 538 def test_create_connection(self): 539 with test_utils.run_test_server() as httpd: 540 conn_fut = self.loop.create_connection( 541 lambda: MyProto(loop=self.loop), *httpd.address) 542 self._basetest_create_connection(conn_fut) 543 544 @socket_helper.skip_unless_bind_unix_socket 545 def test_create_unix_connection(self): 546 # Issue #20682: On Mac OS X Tiger, getsockname() returns a 547 # zero-length address for UNIX socket. 
548 check_sockname = not broken_unix_getsockname() 549 550 with test_utils.run_test_unix_server() as httpd: 551 conn_fut = self.loop.create_unix_connection( 552 lambda: MyProto(loop=self.loop), httpd.address) 553 self._basetest_create_connection(conn_fut, check_sockname) 554 555 def check_ssl_extra_info(self, client, check_sockname=True, 556 peername=None, peercert={}): 557 if check_sockname: 558 self.assertIsNotNone(client.get_extra_info('sockname')) 559 if peername: 560 self.assertEqual(peername, 561 client.get_extra_info('peername')) 562 else: 563 self.assertIsNotNone(client.get_extra_info('peername')) 564 self.assertEqual(peercert, 565 client.get_extra_info('peercert')) 566 567 # test SSL cipher 568 cipher = client.get_extra_info('cipher') 569 self.assertIsInstance(cipher, tuple) 570 self.assertEqual(len(cipher), 3, cipher) 571 self.assertIsInstance(cipher[0], str) 572 self.assertIsInstance(cipher[1], str) 573 self.assertIsInstance(cipher[2], int) 574 575 # test SSL object 576 sslobj = client.get_extra_info('ssl_object') 577 self.assertIsNotNone(sslobj) 578 self.assertEqual(sslobj.compression(), 579 client.get_extra_info('compression')) 580 self.assertEqual(sslobj.cipher(), 581 client.get_extra_info('cipher')) 582 self.assertEqual(sslobj.getpeercert(), 583 client.get_extra_info('peercert')) 584 self.assertEqual(sslobj.compression(), 585 client.get_extra_info('compression')) 586 587 def _basetest_create_ssl_connection(self, connection_fut, 588 check_sockname=True, 589 peername=None): 590 tr, pr = self.loop.run_until_complete(connection_fut) 591 self.assertIsInstance(tr, asyncio.Transport) 592 self.assertIsInstance(pr, asyncio.Protocol) 593 self.assertTrue('ssl' in tr.__class__.__name__.lower()) 594 self.check_ssl_extra_info(tr, check_sockname, peername) 595 self.loop.run_until_complete(pr.done) 596 self.assertGreater(pr.nbytes, 0) 597 tr.close() 598 599 def _test_create_ssl_connection(self, httpd, create_connection, 600 check_sockname=True, peername=None): 601 
conn_fut = create_connection(ssl=test_utils.dummy_ssl_context()) 602 self._basetest_create_ssl_connection(conn_fut, check_sockname, 603 peername) 604 605 # ssl.Purpose was introduced in Python 3.4 606 if hasattr(ssl, 'Purpose'): 607 def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *, 608 cafile=None, capath=None, 609 cadata=None): 610 """ 611 A ssl.create_default_context() replacement that doesn't enable 612 cert validation. 613 """ 614 self.assertEqual(purpose, ssl.Purpose.SERVER_AUTH) 615 return test_utils.dummy_ssl_context() 616 617 # With ssl=True, ssl.create_default_context() should be called 618 with mock.patch('ssl.create_default_context', 619 side_effect=_dummy_ssl_create_context) as m: 620 conn_fut = create_connection(ssl=True) 621 self._basetest_create_ssl_connection(conn_fut, check_sockname, 622 peername) 623 self.assertEqual(m.call_count, 1) 624 625 # With the real ssl.create_default_context(), certificate 626 # validation will fail 627 with self.assertRaises(ssl.SSLError) as cm: 628 conn_fut = create_connection(ssl=True) 629 # Ignore the "SSL handshake failed" log in debug mode 630 with test_utils.disable_logger(): 631 self._basetest_create_ssl_connection(conn_fut, check_sockname, 632 peername) 633 634 self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 635 636 @unittest.skipIf(ssl is None, 'No ssl module') 637 def test_create_ssl_connection(self): 638 with test_utils.run_test_server(use_ssl=True) as httpd: 639 create_connection = functools.partial( 640 self.loop.create_connection, 641 lambda: MyProto(loop=self.loop), 642 *httpd.address) 643 self._test_create_ssl_connection(httpd, create_connection, 644 peername=httpd.address) 645 646 @socket_helper.skip_unless_bind_unix_socket 647 @unittest.skipIf(ssl is None, 'No ssl module') 648 def test_create_ssl_unix_connection(self): 649 # Issue #20682: On Mac OS X Tiger, getsockname() returns a 650 # zero-length address for UNIX socket. 
651 check_sockname = not broken_unix_getsockname() 652 653 with test_utils.run_test_unix_server(use_ssl=True) as httpd: 654 create_connection = functools.partial( 655 self.loop.create_unix_connection, 656 lambda: MyProto(loop=self.loop), httpd.address, 657 server_hostname='127.0.0.1') 658 659 self._test_create_ssl_connection(httpd, create_connection, 660 check_sockname, 661 peername=httpd.address) 662 663 def test_create_connection_local_addr(self): 664 with test_utils.run_test_server() as httpd: 665 port = socket_helper.find_unused_port() 666 f = self.loop.create_connection( 667 lambda: MyProto(loop=self.loop), 668 *httpd.address, local_addr=(httpd.address[0], port)) 669 tr, pr = self.loop.run_until_complete(f) 670 expected = pr.transport.get_extra_info('sockname')[1] 671 self.assertEqual(port, expected) 672 tr.close() 673 674 def test_create_connection_local_addr_skip_different_family(self): 675 # See https://github.com/python/cpython/issues/86508 676 port1 = socket_helper.find_unused_port() 677 port2 = socket_helper.find_unused_port() 678 getaddrinfo_orig = self.loop.getaddrinfo 679 680 async def getaddrinfo(host, port, *args, **kwargs): 681 if port == port2: 682 return [(socket.AF_INET6, socket.SOCK_STREAM, 0, '', ('::1', 0, 0, 0)), 683 (socket.AF_INET, socket.SOCK_STREAM, 0, '', ('127.0.0.1', 0))] 684 return await getaddrinfo_orig(host, port, *args, **kwargs) 685 686 self.loop.getaddrinfo = getaddrinfo 687 688 f = self.loop.create_connection( 689 lambda: MyProto(loop=self.loop), 690 'localhost', port1, local_addr=('localhost', port2)) 691 692 with self.assertRaises(OSError): 693 self.loop.run_until_complete(f) 694 695 def test_create_connection_local_addr_nomatch_family(self): 696 # See https://github.com/python/cpython/issues/86508 697 port1 = socket_helper.find_unused_port() 698 port2 = socket_helper.find_unused_port() 699 getaddrinfo_orig = self.loop.getaddrinfo 700 701 async def getaddrinfo(host, port, *args, **kwargs): 702 if port == port2: 703 return 
[(socket.AF_INET6, socket.SOCK_STREAM, 0, '', ('::1', 0, 0, 0))] 704 return await getaddrinfo_orig(host, port, *args, **kwargs) 705 706 self.loop.getaddrinfo = getaddrinfo 707 708 f = self.loop.create_connection( 709 lambda: MyProto(loop=self.loop), 710 'localhost', port1, local_addr=('localhost', port2)) 711 712 with self.assertRaises(OSError): 713 self.loop.run_until_complete(f) 714 715 def test_create_connection_local_addr_in_use(self): 716 with test_utils.run_test_server() as httpd: 717 f = self.loop.create_connection( 718 lambda: MyProto(loop=self.loop), 719 *httpd.address, local_addr=httpd.address) 720 with self.assertRaises(OSError) as cm: 721 self.loop.run_until_complete(f) 722 self.assertEqual(cm.exception.errno, errno.EADDRINUSE) 723 self.assertIn(str(httpd.address), cm.exception.strerror) 724 725 def test_connect_accepted_socket(self, server_ssl=None, client_ssl=None): 726 loop = self.loop 727 728 class MyProto(MyBaseProto): 729 730 def connection_lost(self, exc): 731 super().connection_lost(exc) 732 loop.call_soon(loop.stop) 733 734 def data_received(self, data): 735 super().data_received(data) 736 self.transport.write(expected_response) 737 738 lsock = socket.create_server(('127.0.0.1', 0), backlog=1) 739 addr = lsock.getsockname() 740 741 message = b'test data' 742 response = None 743 expected_response = b'roger' 744 745 def client(): 746 nonlocal response 747 try: 748 csock = socket.socket() 749 if client_ssl is not None: 750 csock = client_ssl.wrap_socket(csock) 751 csock.connect(addr) 752 csock.sendall(message) 753 response = csock.recv(99) 754 csock.close() 755 except Exception as exc: 756 print( 757 "Failure in client thread in test_connect_accepted_socket", 758 exc) 759 760 thread = threading.Thread(target=client, daemon=True) 761 thread.start() 762 763 conn, _ = lsock.accept() 764 proto = MyProto(loop=loop) 765 proto.loop = loop 766 loop.run_until_complete( 767 loop.connect_accepted_socket( 768 (lambda: proto), conn, ssl=server_ssl)) 769 
loop.run_forever() 770 proto.transport.close() 771 lsock.close() 772 773 threading_helper.join_thread(thread) 774 self.assertFalse(thread.is_alive()) 775 self.assertEqual(proto.state, 'CLOSED') 776 self.assertEqual(proto.nbytes, len(message)) 777 self.assertEqual(response, expected_response) 778 779 @unittest.skipIf(ssl is None, 'No ssl module') 780 def test_ssl_connect_accepted_socket(self): 781 server_context = test_utils.simple_server_sslcontext() 782 client_context = test_utils.simple_client_sslcontext() 783 784 self.test_connect_accepted_socket(server_context, client_context) 785 786 def test_connect_accepted_socket_ssl_timeout_for_plain_socket(self): 787 sock = socket.socket() 788 self.addCleanup(sock.close) 789 coro = self.loop.connect_accepted_socket( 790 MyProto, sock, ssl_handshake_timeout=support.LOOPBACK_TIMEOUT) 791 with self.assertRaisesRegex( 792 ValueError, 793 'ssl_handshake_timeout is only meaningful with ssl'): 794 self.loop.run_until_complete(coro) 795 796 @mock.patch('asyncio.base_events.socket') 797 def create_server_multiple_hosts(self, family, hosts, mock_sock): 798 async def getaddrinfo(host, port, *args, **kw): 799 if family == socket.AF_INET: 800 return [(family, socket.SOCK_STREAM, 6, '', (host, port))] 801 else: 802 return [(family, socket.SOCK_STREAM, 6, '', (host, port, 0, 0))] 803 804 def getaddrinfo_task(*args, **kwds): 805 return self.loop.create_task(getaddrinfo(*args, **kwds)) 806 807 unique_hosts = set(hosts) 808 809 if family == socket.AF_INET: 810 mock_sock.socket().getsockbyname.side_effect = [ 811 (host, 80) for host in unique_hosts] 812 else: 813 mock_sock.socket().getsockbyname.side_effect = [ 814 (host, 80, 0, 0) for host in unique_hosts] 815 self.loop.getaddrinfo = getaddrinfo_task 816 self.loop._start_serving = mock.Mock() 817 self.loop._stop_serving = mock.Mock() 818 f = self.loop.create_server(lambda: MyProto(self.loop), hosts, 80) 819 server = self.loop.run_until_complete(f) 820 self.addCleanup(server.close) 821 
server_hosts = {sock.getsockbyname()[0] for sock in server.sockets} 822 self.assertEqual(server_hosts, unique_hosts) 823 824 def test_create_server_multiple_hosts_ipv4(self): 825 self.create_server_multiple_hosts(socket.AF_INET, 826 ['1.2.3.4', '5.6.7.8', '1.2.3.4']) 827 828 def test_create_server_multiple_hosts_ipv6(self): 829 self.create_server_multiple_hosts(socket.AF_INET6, 830 ['::1', '::2', '::1']) 831 832 def test_create_server(self): 833 proto = MyProto(self.loop) 834 f = self.loop.create_server(lambda: proto, '0.0.0.0', 0) 835 server = self.loop.run_until_complete(f) 836 self.assertEqual(len(server.sockets), 1) 837 sock = server.sockets[0] 838 host, port = sock.getsockname() 839 self.assertEqual(host, '0.0.0.0') 840 client = socket.socket() 841 client.connect(('127.0.0.1', port)) 842 client.sendall(b'xxx') 843 844 self.loop.run_until_complete(proto.connected) 845 self.assertEqual('CONNECTED', proto.state) 846 847 test_utils.run_until(self.loop, lambda: proto.nbytes > 0) 848 self.assertEqual(3, proto.nbytes) 849 850 # extra info is available 851 self.assertIsNotNone(proto.transport.get_extra_info('sockname')) 852 self.assertEqual('127.0.0.1', 853 proto.transport.get_extra_info('peername')[0]) 854 855 # close connection 856 proto.transport.close() 857 self.loop.run_until_complete(proto.done) 858 859 self.assertEqual('CLOSED', proto.state) 860 861 # the client socket must be closed after to avoid ECONNRESET upon 862 # recv()/send() on the serving socket 863 client.close() 864 865 # close server 866 server.close() 867 868 @unittest.skipUnless(hasattr(socket, 'SO_REUSEPORT'), 'No SO_REUSEPORT') 869 def test_create_server_reuse_port(self): 870 proto = MyProto(self.loop) 871 f = self.loop.create_server( 872 lambda: proto, '0.0.0.0', 0) 873 server = self.loop.run_until_complete(f) 874 self.assertEqual(len(server.sockets), 1) 875 sock = server.sockets[0] 876 self.assertFalse( 877 sock.getsockopt( 878 socket.SOL_SOCKET, socket.SO_REUSEPORT)) 879 server.close() 880 
881 test_utils.run_briefly(self.loop) 882 883 proto = MyProto(self.loop) 884 f = self.loop.create_server( 885 lambda: proto, '0.0.0.0', 0, reuse_port=True) 886 server = self.loop.run_until_complete(f) 887 self.assertEqual(len(server.sockets), 1) 888 sock = server.sockets[0] 889 self.assertTrue( 890 sock.getsockopt( 891 socket.SOL_SOCKET, socket.SO_REUSEPORT)) 892 server.close() 893 894 def _make_unix_server(self, factory, **kwargs): 895 path = test_utils.gen_unix_socket_path() 896 self.addCleanup(lambda: os.path.exists(path) and os.unlink(path)) 897 898 f = self.loop.create_unix_server(factory, path, **kwargs) 899 server = self.loop.run_until_complete(f) 900 901 return server, path 902 903 @socket_helper.skip_unless_bind_unix_socket 904 def test_create_unix_server(self): 905 proto = MyProto(loop=self.loop) 906 server, path = self._make_unix_server(lambda: proto) 907 self.assertEqual(len(server.sockets), 1) 908 909 client = socket.socket(socket.AF_UNIX) 910 client.connect(path) 911 client.sendall(b'xxx') 912 913 self.loop.run_until_complete(proto.connected) 914 self.assertEqual('CONNECTED', proto.state) 915 test_utils.run_until(self.loop, lambda: proto.nbytes > 0) 916 self.assertEqual(3, proto.nbytes) 917 918 # close connection 919 proto.transport.close() 920 self.loop.run_until_complete(proto.done) 921 922 self.assertEqual('CLOSED', proto.state) 923 924 # the client socket must be closed after to avoid ECONNRESET upon 925 # recv()/send() on the serving socket 926 client.close() 927 928 # close server 929 server.close() 930 931 @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') 932 def test_create_unix_server_path_socket_error(self): 933 proto = MyProto(loop=self.loop) 934 sock = socket.socket() 935 with sock: 936 f = self.loop.create_unix_server(lambda: proto, '/test', sock=sock) 937 with self.assertRaisesRegex(ValueError, 938 'path and sock can not be specified ' 939 'at the same time'): 940 self.loop.run_until_complete(f) 941 942 def 
_create_ssl_context(self, certfile, keyfile=None): 943 sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 944 sslcontext.options |= ssl.OP_NO_SSLv2 945 sslcontext.load_cert_chain(certfile, keyfile) 946 return sslcontext 947 948 def _make_ssl_server(self, factory, certfile, keyfile=None): 949 sslcontext = self._create_ssl_context(certfile, keyfile) 950 951 f = self.loop.create_server(factory, '127.0.0.1', 0, ssl=sslcontext) 952 server = self.loop.run_until_complete(f) 953 954 sock = server.sockets[0] 955 host, port = sock.getsockname() 956 self.assertEqual(host, '127.0.0.1') 957 return server, host, port 958 959 def _make_ssl_unix_server(self, factory, certfile, keyfile=None): 960 sslcontext = self._create_ssl_context(certfile, keyfile) 961 return self._make_unix_server(factory, ssl=sslcontext) 962 963 @unittest.skipIf(ssl is None, 'No ssl module') 964 def test_create_server_ssl(self): 965 proto = MyProto(loop=self.loop) 966 server, host, port = self._make_ssl_server( 967 lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY) 968 969 f_c = self.loop.create_connection(MyBaseProto, host, port, 970 ssl=test_utils.dummy_ssl_context()) 971 client, pr = self.loop.run_until_complete(f_c) 972 973 client.write(b'xxx') 974 self.loop.run_until_complete(proto.connected) 975 self.assertEqual('CONNECTED', proto.state) 976 977 test_utils.run_until(self.loop, lambda: proto.nbytes > 0) 978 self.assertEqual(3, proto.nbytes) 979 980 # extra info is available 981 self.check_ssl_extra_info(client, peername=(host, port)) 982 983 # close connection 984 proto.transport.close() 985 self.loop.run_until_complete(proto.done) 986 self.assertEqual('CLOSED', proto.state) 987 988 # the client socket must be closed after to avoid ECONNRESET upon 989 # recv()/send() on the serving socket 990 client.close() 991 992 # stop serving 993 server.close() 994 995 @socket_helper.skip_unless_bind_unix_socket 996 @unittest.skipIf(ssl is None, 'No ssl module') 997 def test_create_unix_server_ssl(self): 998 
proto = MyProto(loop=self.loop) 999 server, path = self._make_ssl_unix_server( 1000 lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY) 1001 1002 f_c = self.loop.create_unix_connection( 1003 MyBaseProto, path, ssl=test_utils.dummy_ssl_context(), 1004 server_hostname='') 1005 1006 client, pr = self.loop.run_until_complete(f_c) 1007 1008 client.write(b'xxx') 1009 self.loop.run_until_complete(proto.connected) 1010 self.assertEqual('CONNECTED', proto.state) 1011 test_utils.run_until(self.loop, lambda: proto.nbytes > 0) 1012 self.assertEqual(3, proto.nbytes) 1013 1014 # close connection 1015 proto.transport.close() 1016 self.loop.run_until_complete(proto.done) 1017 self.assertEqual('CLOSED', proto.state) 1018 1019 # the client socket must be closed after to avoid ECONNRESET upon 1020 # recv()/send() on the serving socket 1021 client.close() 1022 1023 # stop serving 1024 server.close() 1025 1026 @unittest.skipIf(ssl is None, 'No ssl module') 1027 def test_create_server_ssl_verify_failed(self): 1028 proto = MyProto(loop=self.loop) 1029 server, host, port = self._make_ssl_server( 1030 lambda: proto, test_utils.SIGNED_CERTFILE) 1031 1032 sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1033 sslcontext_client.options |= ssl.OP_NO_SSLv2 1034 sslcontext_client.verify_mode = ssl.CERT_REQUIRED 1035 if hasattr(sslcontext_client, 'check_hostname'): 1036 sslcontext_client.check_hostname = True 1037 1038 1039 # no CA loaded 1040 f_c = self.loop.create_connection(MyProto, host, port, 1041 ssl=sslcontext_client) 1042 with mock.patch.object(self.loop, 'call_exception_handler'): 1043 with test_utils.disable_logger(): 1044 with self.assertRaisesRegex(ssl.SSLError, 1045 '(?i)certificate.verify.failed'): 1046 self.loop.run_until_complete(f_c) 1047 1048 # execute the loop to log the connection error 1049 test_utils.run_briefly(self.loop) 1050 1051 # close connection 1052 self.assertIsNone(proto.transport) 1053 server.close() 1054 1055 
@socket_helper.skip_unless_bind_unix_socket
@unittest.skipIf(ssl is None, 'No ssl module')
def test_create_unix_server_ssl_verify_failed(self):
    # The client demands a verified certificate (CERT_REQUIRED) but has no
    # CA loaded, so the TLS handshake over the UNIX socket must fail with a
    # certificate verification error.
    proto = MyProto(loop=self.loop)
    server, path = self._make_ssl_unix_server(
        lambda: proto, test_utils.SIGNED_CERTFILE)

    sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    sslcontext_client.options |= ssl.OP_NO_SSLv2
    sslcontext_client.verify_mode = ssl.CERT_REQUIRED
    if hasattr(sslcontext_client, 'check_hostname'):
        sslcontext_client.check_hostname = True

    # no CA loaded
    f_c = self.loop.create_unix_connection(MyProto, path,
                                           ssl=sslcontext_client,
                                           server_hostname='invalid')
    # The exception handler is patched out and logging disabled so the
    # expected handshake failure does not produce noise in the test output.
    with mock.patch.object(self.loop, 'call_exception_handler'):
        with test_utils.disable_logger():
            with self.assertRaisesRegex(ssl.SSLError,
                                        '(?i)certificate.verify.failed'):
                self.loop.run_until_complete(f_c)

    # execute the loop to log the connection error
    test_utils.run_briefly(self.loop)

    # close connection
    # The server-side protocol must never have been connected.
    self.assertIsNone(proto.transport)
    server.close()


@unittest.skipIf(ssl is None, 'No ssl module')
def test_create_server_ssl_match_failed(self):
    # The client trusts the signing CA, but connects by IP address without
    # passing server_hostname, so certificate matching must fail with an
    # "IP address mismatch" CertificateError.
    proto = MyProto(loop=self.loop)
    server, host, port = self._make_ssl_server(
        lambda: proto, test_utils.SIGNED_CERTFILE)

    sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    sslcontext_client.options |= ssl.OP_NO_SSLv2
    sslcontext_client.verify_mode = ssl.CERT_REQUIRED
    sslcontext_client.load_verify_locations(
        cafile=test_utils.SIGNING_CA)
    if hasattr(sslcontext_client, 'check_hostname'):
        sslcontext_client.check_hostname = True

    # incorrect server_hostname
    f_c = self.loop.create_connection(MyProto, host, port,
                                      ssl=sslcontext_client)
    with mock.patch.object(self.loop, 'call_exception_handler'):
        with test_utils.disable_logger():
            with self.assertRaisesRegex(
                    ssl.CertificateError,
                    "IP address mismatch, certificate is not valid for "
                    "'127.0.0.1'"):
                self.loop.run_until_complete(f_c)

    # close connection
    # transport is None because TLS ALERT aborted the handshake
    self.assertIsNone(proto.transport)
    server.close()


@socket_helper.skip_unless_bind_unix_socket
@unittest.skipIf(ssl is None, 'No ssl module')
def test_create_unix_server_ssl_verified(self):
    # Positive counterpart of the verify_failed test: with the signing CA
    # loaded and a matching server_hostname the handshake over the UNIX
    # socket must complete.
    proto = MyProto(loop=self.loop)
    server, path = self._make_ssl_unix_server(
        lambda: proto, test_utils.SIGNED_CERTFILE)

    sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    sslcontext_client.options |= ssl.OP_NO_SSLv2
    sslcontext_client.verify_mode = ssl.CERT_REQUIRED
    sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA)
    if hasattr(sslcontext_client, 'check_hostname'):
        sslcontext_client.check_hostname = True

    # Connection succeeds with correct CA and server hostname.
    f_c = self.loop.create_unix_connection(MyProto, path,
                                           ssl=sslcontext_client,
                                           server_hostname='localhost')
    client, pr = self.loop.run_until_complete(f_c)
    self.loop.run_until_complete(proto.connected)

    # close connection
    proto.transport.close()
    client.close()
    server.close()
    self.loop.run_until_complete(proto.done)


@unittest.skipIf(ssl is None, 'No ssl module')
def test_create_server_ssl_verified(self):
    # TCP variant of the verified handshake; additionally checks the SSL
    # extra info exposed by the client transport.
    proto = MyProto(loop=self.loop)
    server, host, port = self._make_ssl_server(
        lambda: proto, test_utils.SIGNED_CERTFILE)

    sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    sslcontext_client.options |= ssl.OP_NO_SSLv2
    sslcontext_client.verify_mode = ssl.CERT_REQUIRED
    sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA)
    if hasattr(sslcontext_client, 'check_hostname'):
        sslcontext_client.check_hostname = True

    # Connection succeeds with correct CA and server hostname.
    f_c = self.loop.create_connection(MyProto, host, port,
                                      ssl=sslcontext_client,
                                      server_hostname='localhost')
    client, pr = self.loop.run_until_complete(f_c)
    self.loop.run_until_complete(proto.connected)

    # extra info is available
    self.check_ssl_extra_info(client, peername=(host, port),
                              peercert=test_utils.PEERCERT)

    # close connection
    proto.transport.close()
    client.close()
    server.close()
    self.loop.run_until_complete(proto.done)


def test_create_server_sock(self):
    # create_server() must accept an already bound/listening socket via
    # sock= and serve on that very socket (same file descriptor).
    proto = self.loop.create_future()

    class TestMyProto(MyProto):
        def connection_made(self, transport):
            super().connection_made(transport)
            proto.set_result(self)

    sock_ob = socket.create_server(('0.0.0.0', 0))

    f = self.loop.create_server(TestMyProto, sock=sock_ob)
    server = self.loop.run_until_complete(f)
    sock = server.sockets[0]
    self.assertEqual(sock.fileno(), sock_ob.fileno())

    host, port = sock.getsockname()
    self.assertEqual(host, '0.0.0.0')
    client = socket.socket()
    client.connect(('127.0.0.1', port))
    client.send(b'xxx')
    client.close()
    server.close()


def test_create_server_addr_in_use(self):
    # Binding a second server to an address that is already being served
    # must fail with OSError(EADDRINUSE).
    sock_ob = socket.create_server(('0.0.0.0', 0))

    f = self.loop.create_server(MyProto, sock=sock_ob)
    server = self.loop.run_until_complete(f)
    sock = server.sockets[0]
    host, port = sock.getsockname()

    f = self.loop.create_server(MyProto, host=host, port=port)
    with self.assertRaises(OSError) as cm:
        self.loop.run_until_complete(f)
    self.assertEqual(cm.exception.errno, errno.EADDRINUSE)

    server.close()


@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
def test_create_server_dual_stack(self):
    # A server created with host=None must accept connections on both
    # 127.0.0.1 and ::1 using the same port.
    f_proto = self.loop.create_future()

    class TestMyProto(MyProto):
        def connection_made(self, transport):
            super().connection_made(transport)
            # f_proto is looked up at call time, so the future re-created
            # below for the IPv6 round is the one that gets resolved.
            f_proto.set_result(self)

    # The port picked by find_unused_port() may turn out to be in use when
    # the server binds; retry on EADDRINUSE, failing the test once more
    # than 5 attempts have been needed.
    try_count = 0
    while True:
        try:
            port = socket_helper.find_unused_port()
            f = self.loop.create_server(TestMyProto, host=None, port=port)
            server = self.loop.run_until_complete(f)
        except OSError as ex:
            if ex.errno == errno.EADDRINUSE:
                try_count += 1
                self.assertGreaterEqual(5, try_count)
                continue
            else:
                raise
        else:
            break

    # IPv4 round
    client = socket.socket()
    client.connect(('127.0.0.1', port))
    client.send(b'xxx')
    proto = self.loop.run_until_complete(f_proto)
    proto.transport.close()
    client.close()

    # IPv6 round
    f_proto = self.loop.create_future()
    client = socket.socket(socket.AF_INET6)
    client.connect(('::1', port))
    client.send(b'xxx')
    proto = self.loop.run_until_complete(f_proto)
    proto.transport.close()
    client.close()

    server.close()


def test_server_close(self):
    # After Server.close() the listening port must refuse new connections.
    f = self.loop.create_server(MyProto, '0.0.0.0', 0)
    server = self.loop.run_until_complete(f)
    sock = server.sockets[0]
    host, port = sock.getsockname()

    client = socket.socket()
    client.connect(('127.0.0.1', port))
    client.send(b'xxx')
    client.close()

    server.close()

    client = socket.socket()
    self.assertRaises(
        ConnectionRefusedError, client.connect, ('127.0.0.1', port))
    client.close()


def _test_create_datagram_endpoint(self, local_addr, family):
    # Shared body of the IPv4/IPv6 datagram endpoint tests.
    #
    # Starts an echoing datagram "server" endpoint bound to *local_addr*
    # (address family *family*), connects a client endpoint to it, sends
    # b'xxx' and checks the byte counters on both sides.
    class TestMyDatagramProto(MyDatagramProto):
        # The first parameter is named inner_self so that `self` still
        # refers to the enclosing test case (needed for self.loop).
        def __init__(inner_self):
            super().__init__(loop=self.loop)

        def datagram_received(self, data, addr):
            super().datagram_received(data, addr)
            # Echo the payload back with a 5-byte prefix.
            self.transport.sendto(b'resp:'+data, addr)

    coro = self.loop.create_datagram_endpoint(
        TestMyDatagramProto, local_addr=local_addr, family=family)
    s_transport, server = self.loop.run_until_complete(coro)
    sockname = s_transport.get_extra_info('sockname')
    # Resolve the bound address to a numeric (host, port) pair.
    host, port = socket.getnameinfo(
        sockname, socket.NI_NUMERICHOST|socket.NI_NUMERICSERV)

    self.assertIsInstance(s_transport, asyncio.Transport)
    self.assertIsInstance(server, TestMyDatagramProto)
    self.assertEqual('INITIALIZED', server.state)
    self.assertIs(server.transport, s_transport)

    coro = self.loop.create_datagram_endpoint(
        lambda: MyDatagramProto(loop=self.loop),
        remote_addr=(host, port))
    transport, client = self.loop.run_until_complete(coro)

    self.assertIsInstance(transport, asyncio.Transport)
    self.assertIsInstance(client, MyDatagramProto)
    self.assertEqual('INITIALIZED', client.state)
    self.assertIs(client.transport, transport)

    transport.sendto(b'xxx')
    test_utils.run_until(self.loop, lambda: server.nbytes)
    self.assertEqual(3, server.nbytes)
    test_utils.run_until(self.loop, lambda: client.nbytes)

    # received
    # 8 == len(b'resp:') + len(b'xxx')
    self.assertEqual(8, client.nbytes)

    # extra info is available
    self.assertIsNotNone(transport.get_extra_info('sockname'))

    # close connection
    transport.close()
    self.loop.run_until_complete(client.done)
    self.assertEqual('CLOSED', client.state)
    server.transport.close()


def test_create_datagram_endpoint(self):
    # IPv4 flavour of the datagram endpoint round trip.
    self._test_create_datagram_endpoint(('127.0.0.1', 0), socket.AF_INET)


@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
def test_create_datagram_endpoint_ipv6(self):
    # IPv6 flavour of the datagram endpoint round trip.
    self._test_create_datagram_endpoint(('::1', 0), socket.AF_INET6)


def test_create_datagram_endpoint_sock(self):
    # create_datagram_endpoint() must accept an already bound socket via
    # sock= and hand back a working transport/protocol pair.
    sock = None
    local_address = ('127.0.0.1', 0)
    infos = self.loop.run_until_complete(
        self.loop.getaddrinfo(
            *local_address, type=socket.SOCK_DGRAM))
    # Use the first address candidate for which a socket can be created
    # and bound; fail the test if none works.
    for family, sock_type, proto, _cname, address in infos:
        try:
            sock = socket.socket(family=family, type=sock_type, proto=proto)
        except OSError:
            # This candidate cannot even be created; try the next one.
            continue
        try:
            sock.setblocking(False)
            sock.bind(address)
        except OSError:
            # Do not leak the half-initialised socket before trying the
            # next candidate.
            sock.close()
            sock = None
        else:
            break
    else:
        self.fail('Can not create socket.')

    f = self.loop.create_datagram_endpoint(
        lambda: MyDatagramProto(loop=self.loop), sock=sock)
    tr, pr = self.loop.run_until_complete(f)
    self.assertIsInstance(tr, asyncio.Transport)
    self.assertIsInstance(pr, MyDatagramProto)
    tr.close()
    self.loop.run_until_complete(pr.done)


def test_internal_fds(self):
    # A selector-based loop reports exactly one internal fd while open and
    # none after close(), at which point its _csock/_ssock are cleared.
    loop = self.create_event_loop()
    if not isinstance(loop, selector_events.BaseSelectorEventLoop):
        # Close the freshly created loop before skipping so it is not
        # left open.
        loop.close()
        self.skipTest('loop is not a BaseSelectorEventLoop')

    self.assertEqual(1, loop._internal_fds)
    loop.close()
    self.assertEqual(0, loop._internal_fds)
    self.assertIsNone(loop._csock)
    self.assertIsNone(loop._ssock)


@unittest.skipUnless(sys.platform != 'win32',
                     "Don't support pipes for Windows")
def test_read_pipe(self):
    # Data written to the write end of an os.pipe() must be delivered to
    # the protocol attached with connect_read_pipe(); closing the write
    # end must drive the protocol through EOF to CLOSED.
    proto = MyReadPipeProto(loop=self.loop)

    rpipe, wpipe = os.pipe()
    pipeobj = io.open(rpipe, 'rb', 1024)

    async def connect():
        t, p = await self.loop.connect_read_pipe(
            lambda: proto, pipeobj)
        self.assertIs(p, proto)
        self.assertIs(t, proto.transport)
        self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
        self.assertEqual(0, proto.nbytes)

    self.loop.run_until_complete(connect())

    os.write(wpipe, b'1')
    test_utils.run_until(self.loop, lambda: proto.nbytes >= 1)
    self.assertEqual(1, proto.nbytes)

    os.write(wpipe, b'2345')
    test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
    self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
    self.assertEqual(5, proto.nbytes)

    os.close(wpipe)
    self.loop.run_until_complete(proto.done)
    self.assertEqual(
        ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
    # extra info is available
    self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
1396 @unittest.skipUnless(sys.platform != 'win32', 1397 "Don't support pipes for Windows") 1398 def test_unclosed_pipe_transport(self): 1399 # This test reproduces the issue #314 on GitHub 1400 loop = self.create_event_loop() 1401 read_proto = MyReadPipeProto(loop=loop) 1402 write_proto = MyWritePipeProto(loop=loop) 1403 1404 rpipe, wpipe = os.pipe() 1405 rpipeobj = io.open(rpipe, 'rb', 1024) 1406 wpipeobj = io.open(wpipe, 'w', 1024, encoding="utf-8") 1407 1408 async def connect(): 1409 read_transport, _ = await loop.connect_read_pipe( 1410 lambda: read_proto, rpipeobj) 1411 write_transport, _ = await loop.connect_write_pipe( 1412 lambda: write_proto, wpipeobj) 1413 return read_transport, write_transport 1414 1415 # Run and close the loop without closing the transports 1416 read_transport, write_transport = loop.run_until_complete(connect()) 1417 loop.close() 1418 1419 # These 'repr' calls used to raise an AttributeError 1420 # See Issue #314 on GitHub 1421 self.assertIn('open', repr(read_transport)) 1422 self.assertIn('open', repr(write_transport)) 1423 1424 # Clean up (avoid ResourceWarning) 1425 rpipeobj.close() 1426 wpipeobj.close() 1427 read_transport._pipe = None 1428 write_transport._pipe = None 1429 1430 @unittest.skipUnless(sys.platform != 'win32', 1431 "Don't support pipes for Windows") 1432 @unittest.skipUnless(hasattr(os, 'openpty'), 'need os.openpty()') 1433 def test_read_pty_output(self): 1434 proto = MyReadPipeProto(loop=self.loop) 1435 1436 master, slave = os.openpty() 1437 master_read_obj = io.open(master, 'rb', 0) 1438 1439 async def connect(): 1440 t, p = await self.loop.connect_read_pipe(lambda: proto, 1441 master_read_obj) 1442 self.assertIs(p, proto) 1443 self.assertIs(t, proto.transport) 1444 self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) 1445 self.assertEqual(0, proto.nbytes) 1446 1447 self.loop.run_until_complete(connect()) 1448 1449 os.write(slave, b'1') 1450 test_utils.run_until(self.loop, lambda: proto.nbytes) 1451 
self.assertEqual(1, proto.nbytes) 1452 1453 os.write(slave, b'2345') 1454 test_utils.run_until(self.loop, lambda: proto.nbytes >= 5) 1455 self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) 1456 self.assertEqual(5, proto.nbytes) 1457 1458 os.close(slave) 1459 proto.transport.close() 1460 self.loop.run_until_complete(proto.done) 1461 self.assertEqual( 1462 ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state) 1463 # extra info is available 1464 self.assertIsNotNone(proto.transport.get_extra_info('pipe')) 1465 1466 @unittest.skipUnless(sys.platform != 'win32', 1467 "Don't support pipes for Windows") 1468 def test_write_pipe(self): 1469 rpipe, wpipe = os.pipe() 1470 pipeobj = io.open(wpipe, 'wb', 1024) 1471 1472 proto = MyWritePipeProto(loop=self.loop) 1473 connect = self.loop.connect_write_pipe(lambda: proto, pipeobj) 1474 transport, p = self.loop.run_until_complete(connect) 1475 self.assertIs(p, proto) 1476 self.assertIs(transport, proto.transport) 1477 self.assertEqual('CONNECTED', proto.state) 1478 1479 transport.write(b'1') 1480 1481 data = bytearray() 1482 def reader(data): 1483 chunk = os.read(rpipe, 1024) 1484 data += chunk 1485 return len(data) 1486 1487 test_utils.run_until(self.loop, lambda: reader(data) >= 1) 1488 self.assertEqual(b'1', data) 1489 1490 transport.write(b'2345') 1491 test_utils.run_until(self.loop, lambda: reader(data) >= 5) 1492 self.assertEqual(b'12345', data) 1493 self.assertEqual('CONNECTED', proto.state) 1494 1495 os.close(rpipe) 1496 1497 # extra info is available 1498 self.assertIsNotNone(proto.transport.get_extra_info('pipe')) 1499 1500 # close connection 1501 proto.transport.close() 1502 self.loop.run_until_complete(proto.done) 1503 self.assertEqual('CLOSED', proto.state) 1504 1505 @unittest.skipUnless(sys.platform != 'win32', 1506 "Don't support pipes for Windows") 1507 def test_write_pipe_disconnect_on_close(self): 1508 rsock, wsock = socket.socketpair() 1509 rsock.setblocking(False) 1510 pipeobj = io.open(wsock.detach(), 
'wb', 1024) 1511 1512 proto = MyWritePipeProto(loop=self.loop) 1513 connect = self.loop.connect_write_pipe(lambda: proto, pipeobj) 1514 transport, p = self.loop.run_until_complete(connect) 1515 self.assertIs(p, proto) 1516 self.assertIs(transport, proto.transport) 1517 self.assertEqual('CONNECTED', proto.state) 1518 1519 transport.write(b'1') 1520 data = self.loop.run_until_complete(self.loop.sock_recv(rsock, 1024)) 1521 self.assertEqual(b'1', data) 1522 1523 rsock.close() 1524 1525 self.loop.run_until_complete(proto.done) 1526 self.assertEqual('CLOSED', proto.state) 1527 1528 @unittest.skipUnless(sys.platform != 'win32', 1529 "Don't support pipes for Windows") 1530 @unittest.skipUnless(hasattr(os, 'openpty'), 'need os.openpty()') 1531 # select, poll and kqueue don't support character devices (PTY) on Mac OS X 1532 # older than 10.6 (Snow Leopard) 1533 @support.requires_mac_ver(10, 6) 1534 def test_write_pty(self): 1535 master, slave = os.openpty() 1536 slave_write_obj = io.open(slave, 'wb', 0) 1537 1538 proto = MyWritePipeProto(loop=self.loop) 1539 connect = self.loop.connect_write_pipe(lambda: proto, slave_write_obj) 1540 transport, p = self.loop.run_until_complete(connect) 1541 self.assertIs(p, proto) 1542 self.assertIs(transport, proto.transport) 1543 self.assertEqual('CONNECTED', proto.state) 1544 1545 transport.write(b'1') 1546 1547 data = bytearray() 1548 def reader(data): 1549 chunk = os.read(master, 1024) 1550 data += chunk 1551 return len(data) 1552 1553 test_utils.run_until(self.loop, lambda: reader(data) >= 1, 1554 timeout=support.SHORT_TIMEOUT) 1555 self.assertEqual(b'1', data) 1556 1557 transport.write(b'2345') 1558 test_utils.run_until(self.loop, lambda: reader(data) >= 5, 1559 timeout=support.SHORT_TIMEOUT) 1560 self.assertEqual(b'12345', data) 1561 self.assertEqual('CONNECTED', proto.state) 1562 1563 os.close(master) 1564 1565 # extra info is available 1566 self.assertIsNotNone(proto.transport.get_extra_info('pipe')) 1567 1568 # close connection 
1569 proto.transport.close() 1570 self.loop.run_until_complete(proto.done) 1571 self.assertEqual('CLOSED', proto.state) 1572 1573 @unittest.skipUnless(sys.platform != 'win32', 1574 "Don't support pipes for Windows") 1575 @unittest.skipUnless(hasattr(os, 'openpty'), 'need os.openpty()') 1576 # select, poll and kqueue don't support character devices (PTY) on Mac OS X 1577 # older than 10.6 (Snow Leopard) 1578 @support.requires_mac_ver(10, 6) 1579 def test_bidirectional_pty(self): 1580 master, read_slave = os.openpty() 1581 write_slave = os.dup(read_slave) 1582 tty.setraw(read_slave) 1583 1584 slave_read_obj = io.open(read_slave, 'rb', 0) 1585 read_proto = MyReadPipeProto(loop=self.loop) 1586 read_connect = self.loop.connect_read_pipe(lambda: read_proto, 1587 slave_read_obj) 1588 read_transport, p = self.loop.run_until_complete(read_connect) 1589 self.assertIs(p, read_proto) 1590 self.assertIs(read_transport, read_proto.transport) 1591 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1592 self.assertEqual(0, read_proto.nbytes) 1593 1594 1595 slave_write_obj = io.open(write_slave, 'wb', 0) 1596 write_proto = MyWritePipeProto(loop=self.loop) 1597 write_connect = self.loop.connect_write_pipe(lambda: write_proto, 1598 slave_write_obj) 1599 write_transport, p = self.loop.run_until_complete(write_connect) 1600 self.assertIs(p, write_proto) 1601 self.assertIs(write_transport, write_proto.transport) 1602 self.assertEqual('CONNECTED', write_proto.state) 1603 1604 data = bytearray() 1605 def reader(data): 1606 chunk = os.read(master, 1024) 1607 data += chunk 1608 return len(data) 1609 1610 write_transport.write(b'1') 1611 test_utils.run_until(self.loop, lambda: reader(data) >= 1, 1612 timeout=support.SHORT_TIMEOUT) 1613 self.assertEqual(b'1', data) 1614 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1615 self.assertEqual('CONNECTED', write_proto.state) 1616 1617 os.write(master, b'a') 1618 test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1, 
1619 timeout=support.SHORT_TIMEOUT) 1620 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1621 self.assertEqual(1, read_proto.nbytes) 1622 self.assertEqual('CONNECTED', write_proto.state) 1623 1624 write_transport.write(b'2345') 1625 test_utils.run_until(self.loop, lambda: reader(data) >= 5, 1626 timeout=support.SHORT_TIMEOUT) 1627 self.assertEqual(b'12345', data) 1628 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1629 self.assertEqual('CONNECTED', write_proto.state) 1630 1631 os.write(master, b'bcde') 1632 test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5, 1633 timeout=support.SHORT_TIMEOUT) 1634 self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state) 1635 self.assertEqual(5, read_proto.nbytes) 1636 self.assertEqual('CONNECTED', write_proto.state) 1637 1638 os.close(master) 1639 1640 read_transport.close() 1641 self.loop.run_until_complete(read_proto.done) 1642 self.assertEqual( 1643 ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state) 1644 1645 write_transport.close() 1646 self.loop.run_until_complete(write_proto.done) 1647 self.assertEqual('CLOSED', write_proto.state) 1648 1649 def test_prompt_cancellation(self): 1650 r, w = socket.socketpair() 1651 r.setblocking(False) 1652 f = self.loop.create_task(self.loop.sock_recv(r, 1)) 1653 ov = getattr(f, 'ov', None) 1654 if ov is not None: 1655 self.assertTrue(ov.pending) 1656 1657 async def main(): 1658 try: 1659 self.loop.call_soon(f.cancel) 1660 await f 1661 except asyncio.CancelledError: 1662 res = 'cancelled' 1663 else: 1664 res = None 1665 finally: 1666 self.loop.stop() 1667 return res 1668 1669 start = time.monotonic() 1670 t = self.loop.create_task(main()) 1671 self.loop.run_forever() 1672 elapsed = time.monotonic() - start 1673 1674 self.assertLess(elapsed, 0.1) 1675 self.assertEqual(t.result(), 'cancelled') 1676 self.assertRaises(asyncio.CancelledError, f.result) 1677 if ov is not None: 1678 self.assertFalse(ov.pending) 1679 self.loop._stop_serving(r) 1680 
1681 r.close() 1682 w.close() 1683 1684 def test_timeout_rounding(self): 1685 def _run_once(): 1686 self.loop._run_once_counter += 1 1687 orig_run_once() 1688 1689 orig_run_once = self.loop._run_once 1690 self.loop._run_once_counter = 0 1691 self.loop._run_once = _run_once 1692 1693 async def wait(): 1694 loop = self.loop 1695 await asyncio.sleep(1e-2) 1696 await asyncio.sleep(1e-4) 1697 await asyncio.sleep(1e-6) 1698 await asyncio.sleep(1e-8) 1699 await asyncio.sleep(1e-10) 1700 1701 self.loop.run_until_complete(wait()) 1702 # The ideal number of call is 12, but on some platforms, the selector 1703 # may sleep at little bit less than timeout depending on the resolution 1704 # of the clock used by the kernel. Tolerate a few useless calls on 1705 # these platforms. 1706 self.assertLessEqual(self.loop._run_once_counter, 20, 1707 {'clock_resolution': self.loop._clock_resolution, 1708 'selector': self.loop._selector.__class__.__name__}) 1709 1710 def test_remove_fds_after_closing(self): 1711 loop = self.create_event_loop() 1712 callback = lambda: None 1713 r, w = socket.socketpair() 1714 self.addCleanup(r.close) 1715 self.addCleanup(w.close) 1716 loop.add_reader(r, callback) 1717 loop.add_writer(w, callback) 1718 loop.close() 1719 self.assertFalse(loop.remove_reader(r)) 1720 self.assertFalse(loop.remove_writer(w)) 1721 1722 def test_add_fds_after_closing(self): 1723 loop = self.create_event_loop() 1724 callback = lambda: None 1725 r, w = socket.socketpair() 1726 self.addCleanup(r.close) 1727 self.addCleanup(w.close) 1728 loop.close() 1729 with self.assertRaises(RuntimeError): 1730 loop.add_reader(r, callback) 1731 with self.assertRaises(RuntimeError): 1732 loop.add_writer(w, callback) 1733 1734 def test_close_running_event_loop(self): 1735 async def close_loop(loop): 1736 self.loop.close() 1737 1738 coro = close_loop(self.loop) 1739 with self.assertRaises(RuntimeError): 1740 self.loop.run_until_complete(coro) 1741 1742 def test_close(self): 1743 self.loop.close() 1744 
1745 async def test(): 1746 pass 1747 1748 func = lambda: False 1749 coro = test() 1750 self.addCleanup(coro.close) 1751 1752 # operation blocked when the loop is closed 1753 with self.assertRaises(RuntimeError): 1754 self.loop.run_forever() 1755 with self.assertRaises(RuntimeError): 1756 fut = self.loop.create_future() 1757 self.loop.run_until_complete(fut) 1758 with self.assertRaises(RuntimeError): 1759 self.loop.call_soon(func) 1760 with self.assertRaises(RuntimeError): 1761 self.loop.call_soon_threadsafe(func) 1762 with self.assertRaises(RuntimeError): 1763 self.loop.call_later(1.0, func) 1764 with self.assertRaises(RuntimeError): 1765 self.loop.call_at(self.loop.time() + .0, func) 1766 with self.assertRaises(RuntimeError): 1767 self.loop.create_task(coro) 1768 with self.assertRaises(RuntimeError): 1769 self.loop.add_signal_handler(signal.SIGTERM, func) 1770 1771 # run_in_executor test is tricky: the method is a coroutine, 1772 # but run_until_complete cannot be called on closed loop. 1773 # Thus iterate once explicitly. 
1774 with self.assertRaises(RuntimeError): 1775 it = self.loop.run_in_executor(None, func).__await__() 1776 next(it) 1777 1778 1779class SubprocessTestsMixin: 1780 1781 def check_terminated(self, returncode): 1782 if sys.platform == 'win32': 1783 self.assertIsInstance(returncode, int) 1784 # expect 1 but sometimes get 0 1785 else: 1786 self.assertEqual(-signal.SIGTERM, returncode) 1787 1788 def check_killed(self, returncode): 1789 if sys.platform == 'win32': 1790 self.assertIsInstance(returncode, int) 1791 # expect 1 but sometimes get 0 1792 else: 1793 self.assertEqual(-signal.SIGKILL, returncode) 1794 1795 def test_subprocess_exec(self): 1796 prog = os.path.join(os.path.dirname(__file__), 'echo.py') 1797 1798 connect = self.loop.subprocess_exec( 1799 functools.partial(MySubprocessProtocol, self.loop), 1800 sys.executable, prog) 1801 1802 transp, proto = self.loop.run_until_complete(connect) 1803 self.assertIsInstance(proto, MySubprocessProtocol) 1804 self.loop.run_until_complete(proto.connected) 1805 self.assertEqual('CONNECTED', proto.state) 1806 1807 stdin = transp.get_pipe_transport(0) 1808 stdin.write(b'Python The Winner') 1809 self.loop.run_until_complete(proto.got_data[1].wait()) 1810 with test_utils.disable_logger(): 1811 transp.close() 1812 self.loop.run_until_complete(proto.completed) 1813 self.check_killed(proto.returncode) 1814 self.assertEqual(b'Python The Winner', proto.data[1]) 1815 1816 def test_subprocess_interactive(self): 1817 prog = os.path.join(os.path.dirname(__file__), 'echo.py') 1818 1819 connect = self.loop.subprocess_exec( 1820 functools.partial(MySubprocessProtocol, self.loop), 1821 sys.executable, prog) 1822 1823 transp, proto = self.loop.run_until_complete(connect) 1824 self.assertIsInstance(proto, MySubprocessProtocol) 1825 self.loop.run_until_complete(proto.connected) 1826 self.assertEqual('CONNECTED', proto.state) 1827 1828 stdin = transp.get_pipe_transport(0) 1829 stdin.write(b'Python ') 1830 
self.loop.run_until_complete(proto.got_data[1].wait()) 1831 proto.got_data[1].clear() 1832 self.assertEqual(b'Python ', proto.data[1]) 1833 1834 stdin.write(b'The Winner') 1835 self.loop.run_until_complete(proto.got_data[1].wait()) 1836 self.assertEqual(b'Python The Winner', proto.data[1]) 1837 1838 with test_utils.disable_logger(): 1839 transp.close() 1840 self.loop.run_until_complete(proto.completed) 1841 self.check_killed(proto.returncode) 1842 1843 def test_subprocess_shell(self): 1844 connect = self.loop.subprocess_shell( 1845 functools.partial(MySubprocessProtocol, self.loop), 1846 'echo Python') 1847 transp, proto = self.loop.run_until_complete(connect) 1848 self.assertIsInstance(proto, MySubprocessProtocol) 1849 self.loop.run_until_complete(proto.connected) 1850 1851 transp.get_pipe_transport(0).close() 1852 self.loop.run_until_complete(proto.completed) 1853 self.assertEqual(0, proto.returncode) 1854 self.assertTrue(all(f.done() for f in proto.disconnects.values())) 1855 self.assertEqual(proto.data[1].rstrip(b'\r\n'), b'Python') 1856 self.assertEqual(proto.data[2], b'') 1857 transp.close() 1858 1859 def test_subprocess_exitcode(self): 1860 connect = self.loop.subprocess_shell( 1861 functools.partial(MySubprocessProtocol, self.loop), 1862 'exit 7', stdin=None, stdout=None, stderr=None) 1863 1864 transp, proto = self.loop.run_until_complete(connect) 1865 self.assertIsInstance(proto, MySubprocessProtocol) 1866 self.loop.run_until_complete(proto.completed) 1867 self.assertEqual(7, proto.returncode) 1868 transp.close() 1869 1870 def test_subprocess_close_after_finish(self): 1871 connect = self.loop.subprocess_shell( 1872 functools.partial(MySubprocessProtocol, self.loop), 1873 'exit 7', stdin=None, stdout=None, stderr=None) 1874 1875 transp, proto = self.loop.run_until_complete(connect) 1876 self.assertIsInstance(proto, MySubprocessProtocol) 1877 self.assertIsNone(transp.get_pipe_transport(0)) 1878 self.assertIsNone(transp.get_pipe_transport(1)) 1879 
self.assertIsNone(transp.get_pipe_transport(2)) 1880 self.loop.run_until_complete(proto.completed) 1881 self.assertEqual(7, proto.returncode) 1882 self.assertIsNone(transp.close()) 1883 1884 def test_subprocess_kill(self): 1885 prog = os.path.join(os.path.dirname(__file__), 'echo.py') 1886 1887 connect = self.loop.subprocess_exec( 1888 functools.partial(MySubprocessProtocol, self.loop), 1889 sys.executable, prog) 1890 1891 transp, proto = self.loop.run_until_complete(connect) 1892 self.assertIsInstance(proto, MySubprocessProtocol) 1893 self.loop.run_until_complete(proto.connected) 1894 1895 transp.kill() 1896 self.loop.run_until_complete(proto.completed) 1897 self.check_killed(proto.returncode) 1898 transp.close() 1899 1900 def test_subprocess_terminate(self): 1901 prog = os.path.join(os.path.dirname(__file__), 'echo.py') 1902 1903 connect = self.loop.subprocess_exec( 1904 functools.partial(MySubprocessProtocol, self.loop), 1905 sys.executable, prog) 1906 1907 transp, proto = self.loop.run_until_complete(connect) 1908 self.assertIsInstance(proto, MySubprocessProtocol) 1909 self.loop.run_until_complete(proto.connected) 1910 1911 transp.terminate() 1912 self.loop.run_until_complete(proto.completed) 1913 self.check_terminated(proto.returncode) 1914 transp.close() 1915 1916 @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP") 1917 def test_subprocess_send_signal(self): 1918 # bpo-31034: Make sure that we get the default signal handler (killing 1919 # the process). The parent process may have decided to ignore SIGHUP, 1920 # and signal handlers are inherited. 
1921 old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL) 1922 try: 1923 prog = os.path.join(os.path.dirname(__file__), 'echo.py') 1924 1925 connect = self.loop.subprocess_exec( 1926 functools.partial(MySubprocessProtocol, self.loop), 1927 sys.executable, prog) 1928 1929 1930 transp, proto = self.loop.run_until_complete(connect) 1931 self.assertIsInstance(proto, MySubprocessProtocol) 1932 self.loop.run_until_complete(proto.connected) 1933 1934 transp.send_signal(signal.SIGHUP) 1935 self.loop.run_until_complete(proto.completed) 1936 self.assertEqual(-signal.SIGHUP, proto.returncode) 1937 transp.close() 1938 finally: 1939 signal.signal(signal.SIGHUP, old_handler) 1940 1941 def test_subprocess_stderr(self): 1942 prog = os.path.join(os.path.dirname(__file__), 'echo2.py') 1943 1944 connect = self.loop.subprocess_exec( 1945 functools.partial(MySubprocessProtocol, self.loop), 1946 sys.executable, prog) 1947 1948 transp, proto = self.loop.run_until_complete(connect) 1949 self.assertIsInstance(proto, MySubprocessProtocol) 1950 self.loop.run_until_complete(proto.connected) 1951 1952 stdin = transp.get_pipe_transport(0) 1953 stdin.write(b'test') 1954 1955 self.loop.run_until_complete(proto.completed) 1956 1957 transp.close() 1958 self.assertEqual(b'OUT:test', proto.data[1]) 1959 self.assertTrue(proto.data[2].startswith(b'ERR:test'), proto.data[2]) 1960 self.assertEqual(0, proto.returncode) 1961 1962 def test_subprocess_stderr_redirect_to_stdout(self): 1963 prog = os.path.join(os.path.dirname(__file__), 'echo2.py') 1964 1965 connect = self.loop.subprocess_exec( 1966 functools.partial(MySubprocessProtocol, self.loop), 1967 sys.executable, prog, stderr=subprocess.STDOUT) 1968 1969 1970 transp, proto = self.loop.run_until_complete(connect) 1971 self.assertIsInstance(proto, MySubprocessProtocol) 1972 self.loop.run_until_complete(proto.connected) 1973 1974 stdin = transp.get_pipe_transport(0) 1975 self.assertIsNotNone(transp.get_pipe_transport(1)) 1976 
self.assertIsNone(transp.get_pipe_transport(2)) 1977 1978 stdin.write(b'test') 1979 self.loop.run_until_complete(proto.completed) 1980 self.assertTrue(proto.data[1].startswith(b'OUT:testERR:test'), 1981 proto.data[1]) 1982 self.assertEqual(b'', proto.data[2]) 1983 1984 transp.close() 1985 self.assertEqual(0, proto.returncode) 1986 1987 def test_subprocess_close_client_stream(self): 1988 prog = os.path.join(os.path.dirname(__file__), 'echo3.py') 1989 1990 connect = self.loop.subprocess_exec( 1991 functools.partial(MySubprocessProtocol, self.loop), 1992 sys.executable, prog) 1993 1994 transp, proto = self.loop.run_until_complete(connect) 1995 self.assertIsInstance(proto, MySubprocessProtocol) 1996 self.loop.run_until_complete(proto.connected) 1997 1998 stdin = transp.get_pipe_transport(0) 1999 stdout = transp.get_pipe_transport(1) 2000 stdin.write(b'test') 2001 self.loop.run_until_complete(proto.got_data[1].wait()) 2002 self.assertEqual(b'OUT:test', proto.data[1]) 2003 2004 stdout.close() 2005 self.loop.run_until_complete(proto.disconnects[1]) 2006 stdin.write(b'xxx') 2007 self.loop.run_until_complete(proto.got_data[2].wait()) 2008 if sys.platform != 'win32': 2009 self.assertEqual(b'ERR:BrokenPipeError', proto.data[2]) 2010 else: 2011 # After closing the read-end of a pipe, writing to the 2012 # write-end using os.write() fails with errno==EINVAL and 2013 # GetLastError()==ERROR_INVALID_NAME on Windows!?! (Using 2014 # WriteFile() we get ERROR_BROKEN_PIPE as expected.) 
2015 self.assertEqual(b'ERR:OSError', proto.data[2]) 2016 with test_utils.disable_logger(): 2017 transp.close() 2018 self.loop.run_until_complete(proto.completed) 2019 self.check_killed(proto.returncode) 2020 2021 def test_subprocess_wait_no_same_group(self): 2022 # start the new process in a new session 2023 connect = self.loop.subprocess_shell( 2024 functools.partial(MySubprocessProtocol, self.loop), 2025 'exit 7', stdin=None, stdout=None, stderr=None, 2026 start_new_session=True) 2027 transp, proto = self.loop.run_until_complete(connect) 2028 self.assertIsInstance(proto, MySubprocessProtocol) 2029 self.loop.run_until_complete(proto.completed) 2030 self.assertEqual(7, proto.returncode) 2031 transp.close() 2032 2033 def test_subprocess_exec_invalid_args(self): 2034 async def connect(**kwds): 2035 await self.loop.subprocess_exec( 2036 asyncio.SubprocessProtocol, 2037 'pwd', **kwds) 2038 2039 with self.assertRaises(ValueError): 2040 self.loop.run_until_complete(connect(universal_newlines=True)) 2041 with self.assertRaises(ValueError): 2042 self.loop.run_until_complete(connect(bufsize=4096)) 2043 with self.assertRaises(ValueError): 2044 self.loop.run_until_complete(connect(shell=True)) 2045 2046 def test_subprocess_shell_invalid_args(self): 2047 2048 async def connect(cmd=None, **kwds): 2049 if not cmd: 2050 cmd = 'pwd' 2051 await self.loop.subprocess_shell( 2052 asyncio.SubprocessProtocol, 2053 cmd, **kwds) 2054 2055 with self.assertRaises(ValueError): 2056 self.loop.run_until_complete(connect(['ls', '-l'])) 2057 with self.assertRaises(ValueError): 2058 self.loop.run_until_complete(connect(universal_newlines=True)) 2059 with self.assertRaises(ValueError): 2060 self.loop.run_until_complete(connect(bufsize=4096)) 2061 with self.assertRaises(ValueError): 2062 self.loop.run_until_complete(connect(shell=False)) 2063 2064 2065if sys.platform == 'win32': 2066 2067 class SelectEventLoopTests(EventLoopTestsMixin, 2068 test_utils.TestCase): 2069 2070 def 
create_event_loop(self): 2071 return asyncio.SelectorEventLoop() 2072 2073 class ProactorEventLoopTests(EventLoopTestsMixin, 2074 SubprocessTestsMixin, 2075 test_utils.TestCase): 2076 2077 def create_event_loop(self): 2078 return asyncio.ProactorEventLoop() 2079 2080 def test_reader_callback(self): 2081 raise unittest.SkipTest("IocpEventLoop does not have add_reader()") 2082 2083 def test_reader_callback_cancel(self): 2084 raise unittest.SkipTest("IocpEventLoop does not have add_reader()") 2085 2086 def test_writer_callback(self): 2087 raise unittest.SkipTest("IocpEventLoop does not have add_writer()") 2088 2089 def test_writer_callback_cancel(self): 2090 raise unittest.SkipTest("IocpEventLoop does not have add_writer()") 2091 2092 def test_remove_fds_after_closing(self): 2093 raise unittest.SkipTest("IocpEventLoop does not have add_reader()") 2094else: 2095 import selectors 2096 2097 class UnixEventLoopTestsMixin(EventLoopTestsMixin): 2098 def setUp(self): 2099 super().setUp() 2100 watcher = asyncio.SafeChildWatcher() 2101 watcher.attach_loop(self.loop) 2102 asyncio.set_child_watcher(watcher) 2103 2104 def tearDown(self): 2105 asyncio.set_child_watcher(None) 2106 super().tearDown() 2107 2108 2109 if hasattr(selectors, 'KqueueSelector'): 2110 class KqueueEventLoopTests(UnixEventLoopTestsMixin, 2111 SubprocessTestsMixin, 2112 test_utils.TestCase): 2113 2114 def create_event_loop(self): 2115 return asyncio.SelectorEventLoop( 2116 selectors.KqueueSelector()) 2117 2118 # kqueue doesn't support character devices (PTY) on Mac OS X older 2119 # than 10.9 (Maverick) 2120 @support.requires_mac_ver(10, 9) 2121 # Issue #20667: KqueueEventLoopTests.test_read_pty_output() 2122 # hangs on OpenBSD 5.5 2123 @unittest.skipIf(sys.platform.startswith('openbsd'), 2124 'test hangs on OpenBSD') 2125 def test_read_pty_output(self): 2126 super().test_read_pty_output() 2127 2128 # kqueue doesn't support character devices (PTY) on Mac OS X older 2129 # than 10.9 (Maverick) 2130 
@support.requires_mac_ver(10, 9) 2131 def test_write_pty(self): 2132 super().test_write_pty() 2133 2134 if hasattr(selectors, 'EpollSelector'): 2135 class EPollEventLoopTests(UnixEventLoopTestsMixin, 2136 SubprocessTestsMixin, 2137 test_utils.TestCase): 2138 2139 def create_event_loop(self): 2140 return asyncio.SelectorEventLoop(selectors.EpollSelector()) 2141 2142 if hasattr(selectors, 'PollSelector'): 2143 class PollEventLoopTests(UnixEventLoopTestsMixin, 2144 SubprocessTestsMixin, 2145 test_utils.TestCase): 2146 2147 def create_event_loop(self): 2148 return asyncio.SelectorEventLoop(selectors.PollSelector()) 2149 2150 # Should always exist. 2151 class SelectEventLoopTests(UnixEventLoopTestsMixin, 2152 SubprocessTestsMixin, 2153 test_utils.TestCase): 2154 2155 def create_event_loop(self): 2156 return asyncio.SelectorEventLoop(selectors.SelectSelector()) 2157 2158 2159def noop(*args, **kwargs): 2160 pass 2161 2162 2163class HandleTests(test_utils.TestCase): 2164 2165 def setUp(self): 2166 super().setUp() 2167 self.loop = mock.Mock() 2168 self.loop.get_debug.return_value = True 2169 2170 def test_handle(self): 2171 def callback(*args): 2172 return args 2173 2174 args = () 2175 h = asyncio.Handle(callback, args, self.loop) 2176 self.assertIs(h._callback, callback) 2177 self.assertIs(h._args, args) 2178 self.assertFalse(h.cancelled()) 2179 2180 h.cancel() 2181 self.assertTrue(h.cancelled()) 2182 2183 def test_callback_with_exception(self): 2184 def callback(): 2185 raise ValueError() 2186 2187 self.loop = mock.Mock() 2188 self.loop.call_exception_handler = mock.Mock() 2189 2190 h = asyncio.Handle(callback, (), self.loop) 2191 h._run() 2192 2193 self.loop.call_exception_handler.assert_called_with({ 2194 'message': test_utils.MockPattern('Exception in callback.*'), 2195 'exception': mock.ANY, 2196 'handle': h, 2197 'source_traceback': h._source_traceback, 2198 }) 2199 2200 def test_handle_weakref(self): 2201 wd = weakref.WeakValueDictionary() 2202 h = 
asyncio.Handle(lambda: None, (), self.loop) 2203 wd['h'] = h # Would fail without __weakref__ slot. 2204 2205 def test_handle_repr(self): 2206 self.loop.get_debug.return_value = False 2207 2208 # simple function 2209 h = asyncio.Handle(noop, (1, 2), self.loop) 2210 filename, lineno = test_utils.get_function_source(noop) 2211 self.assertEqual(repr(h), 2212 '<Handle noop(1, 2) at %s:%s>' 2213 % (filename, lineno)) 2214 2215 # cancelled handle 2216 h.cancel() 2217 self.assertEqual(repr(h), 2218 '<Handle cancelled>') 2219 2220 # decorated function 2221 cb = types.coroutine(noop) 2222 h = asyncio.Handle(cb, (), self.loop) 2223 self.assertEqual(repr(h), 2224 '<Handle noop() at %s:%s>' 2225 % (filename, lineno)) 2226 2227 # partial function 2228 cb = functools.partial(noop, 1, 2) 2229 h = asyncio.Handle(cb, (3,), self.loop) 2230 regex = (r'^<Handle noop\(1, 2\)\(3\) at %s:%s>$' 2231 % (re.escape(filename), lineno)) 2232 self.assertRegex(repr(h), regex) 2233 2234 # partial function with keyword args 2235 cb = functools.partial(noop, x=1) 2236 h = asyncio.Handle(cb, (2, 3), self.loop) 2237 regex = (r'^<Handle noop\(x=1\)\(2, 3\) at %s:%s>$' 2238 % (re.escape(filename), lineno)) 2239 self.assertRegex(repr(h), regex) 2240 2241 # partial method 2242 method = HandleTests.test_handle_repr 2243 cb = functools.partialmethod(method) 2244 filename, lineno = test_utils.get_function_source(method) 2245 h = asyncio.Handle(cb, (), self.loop) 2246 2247 cb_regex = r'<function HandleTests.test_handle_repr .*>' 2248 cb_regex = fr'functools.partialmethod\({cb_regex}, , \)\(\)' 2249 regex = fr'^<Handle {cb_regex} at {re.escape(filename)}:{lineno}>$' 2250 self.assertRegex(repr(h), regex) 2251 2252 def test_handle_repr_debug(self): 2253 self.loop.get_debug.return_value = True 2254 2255 # simple function 2256 create_filename = __file__ 2257 create_lineno = sys._getframe().f_lineno + 1 2258 h = asyncio.Handle(noop, (1, 2), self.loop) 2259 filename, lineno = test_utils.get_function_source(noop) 
2260 self.assertEqual(repr(h), 2261 '<Handle noop(1, 2) at %s:%s created at %s:%s>' 2262 % (filename, lineno, create_filename, create_lineno)) 2263 2264 # cancelled handle 2265 h.cancel() 2266 self.assertEqual( 2267 repr(h), 2268 '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>' 2269 % (filename, lineno, create_filename, create_lineno)) 2270 2271 # double cancellation won't overwrite _repr 2272 h.cancel() 2273 self.assertEqual( 2274 repr(h), 2275 '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>' 2276 % (filename, lineno, create_filename, create_lineno)) 2277 2278 def test_handle_source_traceback(self): 2279 loop = asyncio.get_event_loop_policy().new_event_loop() 2280 loop.set_debug(True) 2281 self.set_event_loop(loop) 2282 2283 def check_source_traceback(h): 2284 lineno = sys._getframe(1).f_lineno - 1 2285 self.assertIsInstance(h._source_traceback, list) 2286 self.assertEqual(h._source_traceback[-1][:3], 2287 (__file__, 2288 lineno, 2289 'test_handle_source_traceback')) 2290 2291 # call_soon 2292 h = loop.call_soon(noop) 2293 check_source_traceback(h) 2294 2295 # call_soon_threadsafe 2296 h = loop.call_soon_threadsafe(noop) 2297 check_source_traceback(h) 2298 2299 # call_later 2300 h = loop.call_later(0, noop) 2301 check_source_traceback(h) 2302 2303 # call_at 2304 h = loop.call_later(0, noop) 2305 check_source_traceback(h) 2306 2307 @unittest.skipUnless(hasattr(collections.abc, 'Coroutine'), 2308 'No collections.abc.Coroutine') 2309 def test_coroutine_like_object_debug_formatting(self): 2310 # Test that asyncio can format coroutines that are instances of 2311 # collections.abc.Coroutine, but lack cr_core or gi_code attributes 2312 # (such as ones compiled with Cython). 
class TimerTests(unittest.TestCase):
    """Tests for asyncio.TimerHandle, using a Mock in place of the loop."""

    def setUp(self):
        super().setUp()
        self.loop = mock.Mock()

    def test_hash(self):
        """hash() of a TimerHandle equals hash() of its scheduled time."""
        deadline = time.monotonic()
        handle = asyncio.TimerHandle(deadline, lambda: False, (),
                                     mock.Mock())
        self.assertEqual(hash(handle), hash(deadline))

    def test_when(self):
        """when() returns the time the handle was created with."""
        deadline = time.monotonic()
        handle = asyncio.TimerHandle(deadline, lambda: False, (),
                                     mock.Mock())
        self.assertEqual(deadline, handle.when())

    def test_timer(self):
        """A TimerHandle keeps its callback and args until it is cancelled."""
        def callback(*args):
            return args

        payload = (1, 2, 3)
        deadline = time.monotonic()
        handle = asyncio.TimerHandle(deadline, callback, payload, mock.Mock())
        self.assertIs(handle._callback, callback)
        self.assertIs(handle._args, payload)
        self.assertFalse(handle.cancelled())

        # cancel
        handle.cancel()
        self.assertTrue(handle.cancelled())
        self.assertIsNone(handle._callback)
        self.assertIsNone(handle._args)

    def test_timer_repr(self):
        """Check repr() of a TimerHandle when the loop is not in debug mode."""
        self.loop.get_debug.return_value = False
        location = test_utils.get_function_source(noop)

        # simple function
        handle = asyncio.TimerHandle(123, noop, (), self.loop)
        self.assertEqual(repr(handle),
                         '<TimerHandle when=123 noop() at %s:%s>' % location)

        # cancelled handle
        handle.cancel()
        self.assertEqual(repr(handle),
                         '<TimerHandle cancelled when=123>')

    def test_timer_repr_debug(self):
        """Check repr() of a TimerHandle when the loop is in debug mode."""
        self.loop.get_debug.return_value = True
        src_file, src_line = test_utils.get_function_source(noop)

        # simple function
        created_file = __file__
        # The handle must be created on the very next line: its creation
        # line is computed as "current line + 1".
        created_line = sys._getframe().f_lineno + 1
        handle = asyncio.TimerHandle(123, noop, (), self.loop)
        where = (src_file, src_line, created_file, created_line)
        self.assertEqual(repr(handle),
                         '<TimerHandle when=123 noop() '
                         'at %s:%s created at %s:%s>'
                         % where)

        # cancelled handle
        handle.cancel()
        self.assertEqual(repr(handle),
                         '<TimerHandle cancelled when=123 noop() '
                         'at %s:%s created at %s:%s>'
                         % where)

    def test_timer_comparison(self):
        """Exercise every rich comparison operator of TimerHandle."""
        def callback(*args):
            return args

        now = time.monotonic()

        # Two handles scheduled for the same time with the same callback.
        left = asyncio.TimerHandle(now, callback, (), self.loop)
        right = asyncio.TimerHandle(now, callback, (), self.loop)
        # TODO: Use assertLess etc.
        self.assertFalse(left < right)
        self.assertFalse(right < left)
        self.assertTrue(left <= right)
        self.assertTrue(right <= left)
        self.assertFalse(left > right)
        self.assertFalse(right > left)
        self.assertTrue(left >= right)
        self.assertTrue(right >= left)
        self.assertTrue(left == right)
        self.assertFalse(left != right)

        right.cancel()
        self.assertFalse(left == right)

        # Two handles whose scheduled times differ by ten seconds.
        earlier = asyncio.TimerHandle(now, callback, (), self.loop)
        later = asyncio.TimerHandle(now + 10.0, callback, (), self.loop)
        self.assertTrue(earlier < later)
        self.assertFalse(later < earlier)
        self.assertTrue(earlier <= later)
        self.assertFalse(later <= earlier)
        self.assertFalse(earlier > later)
        self.assertTrue(later > earlier)
        self.assertFalse(earlier >= later)
        self.assertTrue(later >= earlier)
        self.assertFalse(earlier == later)
        self.assertTrue(earlier != later)

        # Comparison with a plain Handle is not implemented.
        plain = asyncio.Handle(callback, (), self.loop)
        self.assertIs(NotImplemented, earlier.__eq__(plain))
        self.assertIs(NotImplemented, earlier.__ne__(plain))

        # Ordering against an unrelated type raises; equality does not.
        with self.assertRaises(TypeError):
            earlier < ()
        with self.assertRaises(TypeError):
            earlier > ()
        with self.assertRaises(TypeError):
            earlier <= ()
        with self.assertRaises(TypeError):
            earlier >= ()
        self.assertFalse(earlier == ())
        self.assertTrue(earlier != ())

        # Comparison against the special helper objects from test.support.
        self.assertTrue(earlier == ALWAYS_EQ)
        self.assertFalse(earlier != ALWAYS_EQ)
        self.assertTrue(earlier < LARGEST)
        self.assertFalse(earlier > LARGEST)
        self.assertTrue(earlier <= LARGEST)
        self.assertFalse(earlier >= LARGEST)
        self.assertFalse(earlier < SMALLEST)
        self.assertTrue(earlier > SMALLEST)
        self.assertFalse(earlier <= SMALLEST)
        self.assertTrue(earlier >= SMALLEST)
2486 self.assertRaises( 2487 NotImplementedError, loop.is_running) 2488 self.assertRaises( 2489 NotImplementedError, loop.is_closed) 2490 self.assertRaises( 2491 NotImplementedError, loop.close) 2492 self.assertRaises( 2493 NotImplementedError, loop.create_task, None) 2494 self.assertRaises( 2495 NotImplementedError, loop.call_later, None, None) 2496 self.assertRaises( 2497 NotImplementedError, loop.call_at, f, f) 2498 self.assertRaises( 2499 NotImplementedError, loop.call_soon, None) 2500 self.assertRaises( 2501 NotImplementedError, loop.time) 2502 self.assertRaises( 2503 NotImplementedError, loop.call_soon_threadsafe, None) 2504 self.assertRaises( 2505 NotImplementedError, loop.set_default_executor, f) 2506 self.assertRaises( 2507 NotImplementedError, loop.add_reader, 1, f) 2508 self.assertRaises( 2509 NotImplementedError, loop.remove_reader, 1) 2510 self.assertRaises( 2511 NotImplementedError, loop.add_writer, 1, f) 2512 self.assertRaises( 2513 NotImplementedError, loop.remove_writer, 1) 2514 self.assertRaises( 2515 NotImplementedError, loop.add_signal_handler, 1, f) 2516 self.assertRaises( 2517 NotImplementedError, loop.remove_signal_handler, 1) 2518 self.assertRaises( 2519 NotImplementedError, loop.remove_signal_handler, 1) 2520 self.assertRaises( 2521 NotImplementedError, loop.set_exception_handler, f) 2522 self.assertRaises( 2523 NotImplementedError, loop.default_exception_handler, f) 2524 self.assertRaises( 2525 NotImplementedError, loop.call_exception_handler, f) 2526 self.assertRaises( 2527 NotImplementedError, loop.get_debug) 2528 self.assertRaises( 2529 NotImplementedError, loop.set_debug, f) 2530 2531 def test_not_implemented_async(self): 2532 2533 async def inner(): 2534 f = mock.Mock() 2535 loop = asyncio.AbstractEventLoop() 2536 2537 with self.assertRaises(NotImplementedError): 2538 await loop.run_in_executor(f, f) 2539 with self.assertRaises(NotImplementedError): 2540 await loop.getaddrinfo('localhost', 8080) 2541 with 
self.assertRaises(NotImplementedError): 2542 await loop.getnameinfo(('localhost', 8080)) 2543 with self.assertRaises(NotImplementedError): 2544 await loop.create_connection(f) 2545 with self.assertRaises(NotImplementedError): 2546 await loop.create_server(f) 2547 with self.assertRaises(NotImplementedError): 2548 await loop.create_datagram_endpoint(f) 2549 with self.assertRaises(NotImplementedError): 2550 await loop.sock_recv(f, 10) 2551 with self.assertRaises(NotImplementedError): 2552 await loop.sock_recv_into(f, 10) 2553 with self.assertRaises(NotImplementedError): 2554 await loop.sock_sendall(f, 10) 2555 with self.assertRaises(NotImplementedError): 2556 await loop.sock_connect(f, f) 2557 with self.assertRaises(NotImplementedError): 2558 await loop.sock_accept(f) 2559 with self.assertRaises(NotImplementedError): 2560 await loop.sock_sendfile(f, f) 2561 with self.assertRaises(NotImplementedError): 2562 await loop.sendfile(f, f) 2563 with self.assertRaises(NotImplementedError): 2564 await loop.connect_read_pipe(f, mock.sentinel.pipe) 2565 with self.assertRaises(NotImplementedError): 2566 await loop.connect_write_pipe(f, mock.sentinel.pipe) 2567 with self.assertRaises(NotImplementedError): 2568 await loop.subprocess_shell(f, mock.sentinel) 2569 with self.assertRaises(NotImplementedError): 2570 await loop.subprocess_exec(f) 2571 2572 loop = asyncio.new_event_loop() 2573 loop.run_until_complete(inner()) 2574 loop.close() 2575 2576 2577class PolicyTests(unittest.TestCase): 2578 2579 def test_event_loop_policy(self): 2580 policy = asyncio.AbstractEventLoopPolicy() 2581 self.assertRaises(NotImplementedError, policy.get_event_loop) 2582 self.assertRaises(NotImplementedError, policy.set_event_loop, object()) 2583 self.assertRaises(NotImplementedError, policy.new_event_loop) 2584 self.assertRaises(NotImplementedError, policy.get_child_watcher) 2585 self.assertRaises(NotImplementedError, policy.set_child_watcher, 2586 object()) 2587 2588 def test_get_event_loop(self): 2589 
policy = asyncio.DefaultEventLoopPolicy() 2590 self.assertIsNone(policy._local._loop) 2591 loop = policy.get_event_loop() 2592 self.assertIsInstance(loop, asyncio.AbstractEventLoop) 2593 2594 self.assertIs(policy._local._loop, loop) 2595 self.assertIs(loop, policy.get_event_loop()) 2596 loop.close() 2597 2598 def test_get_event_loop_calls_set_event_loop(self): 2599 policy = asyncio.DefaultEventLoopPolicy() 2600 2601 with mock.patch.object( 2602 policy, "set_event_loop", 2603 wraps=policy.set_event_loop) as m_set_event_loop: 2604 2605 loop = policy.get_event_loop() 2606 self.addCleanup(loop.close) 2607 2608 # policy._local._loop must be set through .set_event_loop() 2609 # (the unix DefaultEventLoopPolicy needs this call to attach 2610 # the child watcher correctly) 2611 m_set_event_loop.assert_called_with(loop) 2612 2613 loop.close() 2614 2615 def test_get_event_loop_after_set_none(self): 2616 policy = asyncio.DefaultEventLoopPolicy() 2617 policy.set_event_loop(None) 2618 self.assertRaises(RuntimeError, policy.get_event_loop) 2619 2620 @mock.patch('asyncio.events.threading.current_thread') 2621 def test_get_event_loop_thread(self, m_current_thread): 2622 2623 def f(): 2624 policy = asyncio.DefaultEventLoopPolicy() 2625 self.assertRaises(RuntimeError, policy.get_event_loop) 2626 2627 th = threading.Thread(target=f) 2628 th.start() 2629 th.join() 2630 2631 def test_new_event_loop(self): 2632 policy = asyncio.DefaultEventLoopPolicy() 2633 2634 loop = policy.new_event_loop() 2635 self.assertIsInstance(loop, asyncio.AbstractEventLoop) 2636 loop.close() 2637 2638 def test_set_event_loop(self): 2639 policy = asyncio.DefaultEventLoopPolicy() 2640 old_loop = policy.new_event_loop() 2641 policy.set_event_loop(old_loop) 2642 2643 self.assertRaises(TypeError, policy.set_event_loop, object()) 2644 2645 loop = policy.new_event_loop() 2646 policy.set_event_loop(loop) 2647 self.assertIs(loop, policy.get_event_loop()) 2648 self.assertIsNot(old_loop, policy.get_event_loop()) 2649 
loop.close() 2650 old_loop.close() 2651 2652 def test_get_event_loop_policy(self): 2653 policy = asyncio.get_event_loop_policy() 2654 self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy) 2655 self.assertIs(policy, asyncio.get_event_loop_policy()) 2656 2657 def test_set_event_loop_policy(self): 2658 self.assertRaises( 2659 TypeError, asyncio.set_event_loop_policy, object()) 2660 2661 old_policy = asyncio.get_event_loop_policy() 2662 2663 policy = asyncio.DefaultEventLoopPolicy() 2664 asyncio.set_event_loop_policy(policy) 2665 self.assertIs(policy, asyncio.get_event_loop_policy()) 2666 self.assertIsNot(policy, old_policy) 2667 2668 2669class GetEventLoopTestsMixin: 2670 2671 _get_running_loop_impl = None 2672 _set_running_loop_impl = None 2673 get_running_loop_impl = None 2674 get_event_loop_impl = None 2675 2676 def setUp(self): 2677 self._get_running_loop_saved = events._get_running_loop 2678 self._set_running_loop_saved = events._set_running_loop 2679 self.get_running_loop_saved = events.get_running_loop 2680 self.get_event_loop_saved = events.get_event_loop 2681 2682 events._get_running_loop = type(self)._get_running_loop_impl 2683 events._set_running_loop = type(self)._set_running_loop_impl 2684 events.get_running_loop = type(self).get_running_loop_impl 2685 events.get_event_loop = type(self).get_event_loop_impl 2686 2687 asyncio._get_running_loop = type(self)._get_running_loop_impl 2688 asyncio._set_running_loop = type(self)._set_running_loop_impl 2689 asyncio.get_running_loop = type(self).get_running_loop_impl 2690 asyncio.get_event_loop = type(self).get_event_loop_impl 2691 2692 super().setUp() 2693 2694 self.loop = asyncio.new_event_loop() 2695 asyncio.set_event_loop(self.loop) 2696 2697 if sys.platform != 'win32': 2698 watcher = asyncio.SafeChildWatcher() 2699 watcher.attach_loop(self.loop) 2700 asyncio.set_child_watcher(watcher) 2701 2702 def tearDown(self): 2703 try: 2704 if sys.platform != 'win32': 2705 asyncio.set_child_watcher(None) 2706 2707 
super().tearDown() 2708 finally: 2709 self.loop.close() 2710 asyncio.set_event_loop(None) 2711 2712 events._get_running_loop = self._get_running_loop_saved 2713 events._set_running_loop = self._set_running_loop_saved 2714 events.get_running_loop = self.get_running_loop_saved 2715 events.get_event_loop = self.get_event_loop_saved 2716 2717 asyncio._get_running_loop = self._get_running_loop_saved 2718 asyncio._set_running_loop = self._set_running_loop_saved 2719 asyncio.get_running_loop = self.get_running_loop_saved 2720 asyncio.get_event_loop = self.get_event_loop_saved 2721 2722 if sys.platform != 'win32': 2723 2724 def test_get_event_loop_new_process(self): 2725 # bpo-32126: The multiprocessing module used by 2726 # ProcessPoolExecutor is not functional when the 2727 # multiprocessing.synchronize module cannot be imported. 2728 support.skip_if_broken_multiprocessing_synchronize() 2729 2730 async def main(): 2731 pool = concurrent.futures.ProcessPoolExecutor() 2732 result = await self.loop.run_in_executor( 2733 pool, _test_get_event_loop_new_process__sub_proc) 2734 pool.shutdown() 2735 return result 2736 2737 self.assertEqual( 2738 self.loop.run_until_complete(main()), 2739 'hello') 2740 2741 def test_get_event_loop_returns_running_loop(self): 2742 class TestError(Exception): 2743 pass 2744 2745 class Policy(asyncio.DefaultEventLoopPolicy): 2746 def get_event_loop(self): 2747 raise TestError 2748 2749 old_policy = asyncio.get_event_loop_policy() 2750 try: 2751 asyncio.set_event_loop_policy(Policy()) 2752 loop = asyncio.new_event_loop() 2753 2754 with self.assertRaises(TestError): 2755 asyncio.get_event_loop() 2756 asyncio.set_event_loop(None) 2757 with self.assertRaises(TestError): 2758 asyncio.get_event_loop() 2759 2760 with self.assertRaisesRegex(RuntimeError, 'no running'): 2761 asyncio.get_running_loop() 2762 self.assertIs(asyncio._get_running_loop(), None) 2763 2764 async def func(): 2765 self.assertIs(asyncio.get_event_loop(), loop) 2766 
self.assertIs(asyncio.get_running_loop(), loop) 2767 self.assertIs(asyncio._get_running_loop(), loop) 2768 2769 loop.run_until_complete(func()) 2770 2771 asyncio.set_event_loop(loop) 2772 with self.assertRaises(TestError): 2773 asyncio.get_event_loop() 2774 asyncio.set_event_loop(None) 2775 with self.assertRaises(TestError): 2776 asyncio.get_event_loop() 2777 2778 finally: 2779 asyncio.set_event_loop_policy(old_policy) 2780 if loop is not None: 2781 loop.close() 2782 2783 with self.assertRaisesRegex(RuntimeError, 'no running'): 2784 asyncio.get_running_loop() 2785 2786 self.assertIs(asyncio._get_running_loop(), None) 2787 2788 def test_get_event_loop_returns_running_loop2(self): 2789 old_policy = asyncio.get_event_loop_policy() 2790 try: 2791 asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy()) 2792 loop = asyncio.new_event_loop() 2793 self.addCleanup(loop.close) 2794 2795 loop2 = asyncio.get_event_loop() 2796 self.addCleanup(loop2.close) 2797 asyncio.set_event_loop(None) 2798 with self.assertRaisesRegex(RuntimeError, 'no current'): 2799 asyncio.get_event_loop() 2800 2801 with self.assertRaisesRegex(RuntimeError, 'no running'): 2802 asyncio.get_running_loop() 2803 self.assertIs(asyncio._get_running_loop(), None) 2804 2805 async def func(): 2806 self.assertIs(asyncio.get_event_loop(), loop) 2807 self.assertIs(asyncio.get_running_loop(), loop) 2808 self.assertIs(asyncio._get_running_loop(), loop) 2809 2810 loop.run_until_complete(func()) 2811 2812 asyncio.set_event_loop(loop) 2813 self.assertIs(asyncio.get_event_loop(), loop) 2814 2815 asyncio.set_event_loop(None) 2816 with self.assertRaisesRegex(RuntimeError, 'no current'): 2817 asyncio.get_event_loop() 2818 2819 finally: 2820 asyncio.set_event_loop_policy(old_policy) 2821 if loop is not None: 2822 loop.close() 2823 2824 with self.assertRaisesRegex(RuntimeError, 'no running'): 2825 asyncio.get_running_loop() 2826 2827 self.assertIs(asyncio._get_running_loop(), None) 2828 2829 2830class 
class TestServer(unittest.TestCase):
    """Tests for the server object returned by loop.create_server()."""

    def test_get_loop(self):
        """Server.get_loop() returns the loop that created the server."""
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)
        proto = MyProto(loop)
        # Bind to the loopback interface only; the test does not need the
        # listening socket to be reachable from other hosts.
        server = loop.run_until_complete(
            loop.create_server(lambda: proto, '127.0.0.1', 0))
        try:
            self.assertIs(server.get_loop(), loop)
        finally:
            # Close the server even when the assertion fails so the
            # listening socket is not left open.
            server.close()
            loop.run_until_complete(server.wait_closed())