1.. currentmodule:: asyncio 2 3 4.. _asyncio-transports-protocols: 5 6 7======================== 8Transports and Protocols 9======================== 10 11.. rubric:: Preface 12 13Transports and Protocols are used by the **low-level** event loop 14APIs such as :meth:`loop.create_connection`. They use 15callback-based programming style and enable high-performance 16implementations of network or IPC protocols (e.g. HTTP). 17 18Essentially, transports and protocols should only be used in 19libraries and frameworks and never in high-level asyncio 20applications. 21 22This documentation page covers both `Transports`_ and `Protocols`_. 23 24.. rubric:: Introduction 25 26At the highest level, the transport is concerned with *how* bytes 27are transmitted, while the protocol determines *which* bytes to 28transmit (and to some extent when). 29 30A different way of saying the same thing: a transport is an 31abstraction for a socket (or similar I/O endpoint) while a protocol 32is an abstraction for an application, from the transport's point 33of view. 34 35Yet another view is the transport and protocol interfaces 36together define an abstract interface for using network I/O and 37interprocess I/O. 38 39There is always a 1:1 relationship between transport and protocol 40objects: the protocol calls transport methods to send data, 41while the transport calls protocol methods to pass it data that 42has been received. 43 44Most of connection oriented event loop methods 45(such as :meth:`loop.create_connection`) usually accept a 46*protocol_factory* argument used to create a *Protocol* object 47for an accepted connection, represented by a *Transport* object. 48Such methods usually return a tuple of ``(transport, protocol)``. 49 50.. rubric:: Contents 51 52This documentation page contains the following sections: 53 54* The `Transports`_ section documents asyncio :class:`BaseTransport`, 55 :class:`ReadTransport`, :class:`WriteTransport`, :class:`Transport`, 56 :class:`DatagramTransport`, and :class:`SubprocessTransport` 57 classes. 58 59* The `Protocols`_ section documents asyncio :class:`BaseProtocol`, 60 :class:`Protocol`, :class:`BufferedProtocol`, 61 :class:`DatagramProtocol`, and :class:`SubprocessProtocol` classes. 62 63* The `Examples`_ section showcases how to work with transports, 64 protocols, and low-level event loop APIs. 65 66 67.. _asyncio-transport: 68 69Transports 70========== 71 72**Source code:** :source:`Lib/asyncio/transports.py` 73 74---------------------------------------------------- 75 76Transports are classes provided by :mod:`asyncio` in order to abstract 77various kinds of communication channels. 78 79Transport objects are always instantiated by an 80:ref:`asyncio event loop <asyncio-event-loop>`. 81 82asyncio implements transports for TCP, UDP, SSL, and subprocess pipes. 83The methods available on a transport depend on the transport's kind. 84 85The transport classes are :ref:`not thread safe <asyncio-multithreading>`. 86 87 88Transports Hierarchy 89-------------------- 90 91.. class:: BaseTransport 92 93 Base class for all transports. Contains methods that all 94 asyncio transports share. 95 96.. class:: WriteTransport(BaseTransport) 97 98 A base transport for write-only connections. 99 100 Instances of the *WriteTransport* class are returned from 101 the :meth:`loop.connect_write_pipe` event loop method and 102 are also used by subprocess-related methods like 103 :meth:`loop.subprocess_exec`. 104 105.. class:: ReadTransport(BaseTransport) 106 107 A base transport for read-only connections. 108 109 Instances of the *ReadTransport* class are returned from 110 the :meth:`loop.connect_read_pipe` event loop method and 111 are also used by subprocess-related methods like 112 :meth:`loop.subprocess_exec`. 113 114.. class:: Transport(WriteTransport, ReadTransport) 115 116 Interface representing a bidirectional transport, such as a 117 TCP connection. 118 119 The user does not instantiate a transport directly; they call a 120 utility function, passing it a protocol factory and other 121 information necessary to create the transport and protocol. 122 123 Instances of the *Transport* class are returned from or used by 124 event loop methods like :meth:`loop.create_connection`, 125 :meth:`loop.create_unix_connection`, 126 :meth:`loop.create_server`, :meth:`loop.sendfile`, etc. 127 128 129.. class:: DatagramTransport(BaseTransport) 130 131 A transport for datagram (UDP) connections. 132 133 Instances of the *DatagramTransport* class are returned from 134 the :meth:`loop.create_datagram_endpoint` event loop method. 135 136 137.. class:: SubprocessTransport(BaseTransport) 138 139 An abstraction to represent a connection between a parent and its 140 child OS process. 141 142 Instances of the *SubprocessTransport* class are returned from 143 event loop methods :meth:`loop.subprocess_shell` and 144 :meth:`loop.subprocess_exec`. 145 146 147Base Transport 148-------------- 149 150.. method:: BaseTransport.close() 151 152 Close the transport. 153 154 If the transport has a buffer for outgoing 155 data, buffered data will be flushed asynchronously. No more data 156 will be received. After all buffered data is flushed, the 157 protocol's :meth:`protocol.connection_lost() 158 <BaseProtocol.connection_lost>` method will be called with 159 :const:`None` as its argument. The transport should not be 160 used once it is closed. 161 162.. method:: BaseTransport.is_closing() 163 164 Return ``True`` if the transport is closing or is closed. 165 166.. method:: BaseTransport.get_extra_info(name, default=None) 167 168 Return information about the transport or underlying resources 169 it uses. 170 171 *name* is a string representing the piece of transport-specific 172 information to get. 173 174 *default* is the value to return if the information is not 175 available, or if the transport does not support querying it 176 with the given third-party event loop implementation or on the 177 current platform. 178 179 For example, the following code attempts to get the underlying 180 socket object of the transport:: 181 182 sock = transport.get_extra_info('socket') 183 if sock is not None: 184 print(sock.getsockopt(...)) 185 186 Categories of information that can be queried on some transports: 187 188 * socket: 189 190 - ``'peername'``: the remote address to which the socket is 191 connected, result of :meth:`socket.socket.getpeername` 192 (``None`` on error) 193 194 - ``'socket'``: :class:`socket.socket` instance 195 196 - ``'sockname'``: the socket's own address, 197 result of :meth:`socket.socket.getsockname` 198 199 * SSL socket: 200 201 - ``'compression'``: the compression algorithm being used as a 202 string, or ``None`` if the connection isn't compressed; result 203 of :meth:`ssl.SSLSocket.compression` 204 205 - ``'cipher'``: a three-value tuple containing the name of the 206 cipher being used, the version of the SSL protocol that defines 207 its use, and the number of secret bits being used; result of 208 :meth:`ssl.SSLSocket.cipher` 209 210 - ``'peercert'``: peer certificate; result of 211 :meth:`ssl.SSLSocket.getpeercert` 212 213 - ``'sslcontext'``: :class:`ssl.SSLContext` instance 214 215 - ``'ssl_object'``: :class:`ssl.SSLObject` or 216 :class:`ssl.SSLSocket` instance 217 218 * pipe: 219 220 - ``'pipe'``: pipe object 221 222 * subprocess: 223 224 - ``'subprocess'``: :class:`subprocess.Popen` instance 225 226.. method:: BaseTransport.set_protocol(protocol) 227 228 Set a new protocol. 229 230 Switching protocol should only be done when both 231 protocols are documented to support the switch. 232 233.. method:: BaseTransport.get_protocol() 234 235 Return the current protocol. 236 237 238Read-only Transports 239-------------------- 240 241.. method:: ReadTransport.is_reading() 242 243 Return ``True`` if the transport is receiving new data. 244 245 .. versionadded:: 3.7 246 247.. method:: ReadTransport.pause_reading() 248 249 Pause the receiving end of the transport. No data will be passed to 250 the protocol's :meth:`protocol.data_received() <Protocol.data_received>` 251 method until :meth:`resume_reading` is called. 252 253 .. versionchanged:: 3.7 254 The method is idempotent, i.e. it can be called when the 255 transport is already paused or closed. 256 257.. method:: ReadTransport.resume_reading() 258 259 Resume the receiving end. The protocol's 260 :meth:`protocol.data_received() <Protocol.data_received>` method 261 will be called once again if some data is available for reading. 262 263 .. versionchanged:: 3.7 264 The method is idempotent, i.e. it can be called when the 265 transport is already reading. 266 267 268Write-only Transports 269--------------------- 270 271.. method:: WriteTransport.abort() 272 273 Close the transport immediately, without waiting for pending operations 274 to complete. Buffered data will be lost. No more data will be received. 275 The protocol's :meth:`protocol.connection_lost() 276 <BaseProtocol.connection_lost>` method will eventually be 277 called with :const:`None` as its argument. 278 279.. method:: WriteTransport.can_write_eof() 280 281 Return :const:`True` if the transport supports 282 :meth:`~WriteTransport.write_eof`, :const:`False` if not. 283 284.. method:: WriteTransport.get_write_buffer_size() 285 286 Return the current size of the output buffer used by the transport. 287 288.. method:: WriteTransport.get_write_buffer_limits() 289 290 Get the *high* and *low* watermarks for write flow control. Return a 291 tuple ``(low, high)`` where *low* and *high* are positive number of 292 bytes. 293 294 Use :meth:`set_write_buffer_limits` to set the limits. 295 296 .. versionadded:: 3.4.2 297 298.. method:: WriteTransport.set_write_buffer_limits(high=None, low=None) 299 300 Set the *high* and *low* watermarks for write flow control. 301 302 These two values (measured in number of 303 bytes) control when the protocol's 304 :meth:`protocol.pause_writing() <BaseProtocol.pause_writing>` 305 and :meth:`protocol.resume_writing() <BaseProtocol.resume_writing>` 306 methods are called. If specified, the low watermark must be less 307 than or equal to the high watermark. Neither *high* nor *low* 308 can be negative. 309 310 :meth:`~BaseProtocol.pause_writing` is called when the buffer size 311 becomes greater than or equal to the *high* value. If writing has 312 been paused, :meth:`~BaseProtocol.resume_writing` is called when 313 the buffer size becomes less than or equal to the *low* value. 314 315 The defaults are implementation-specific. If only the 316 high watermark is given, the low watermark defaults to an 317 implementation-specific value less than or equal to the 318 high watermark. Setting *high* to zero forces *low* to zero as 319 well, and causes :meth:`~BaseProtocol.pause_writing` to be called 320 whenever the buffer becomes non-empty. Setting *low* to zero causes 321 :meth:`~BaseProtocol.resume_writing` to be called only once the 322 buffer is empty. Use of zero for either limit is generally 323 sub-optimal as it reduces opportunities for doing I/O and 324 computation concurrently. 325 326 Use :meth:`~WriteTransport.get_write_buffer_limits` 327 to get the limits. 328 329.. method:: WriteTransport.write(data) 330 331 Write some *data* bytes to the transport. 332 333 This method does not block; it buffers the data and arranges for it 334 to be sent out asynchronously. 335 336.. method:: WriteTransport.writelines(list_of_data) 337 338 Write a list (or any iterable) of data bytes to the transport. 339 This is functionally equivalent to calling :meth:`write` on each 340 element yielded by the iterable, but may be implemented more 341 efficiently. 342 343.. method:: WriteTransport.write_eof() 344 345 Close the write end of the transport after flushing all buffered data. 346 Data may still be received. 347 348 This method can raise :exc:`NotImplementedError` if the transport 349 (e.g. SSL) doesn't support half-closed connections. 350 351 352Datagram Transports 353------------------- 354 355.. method:: DatagramTransport.sendto(data, addr=None) 356 357 Send the *data* bytes to the remote peer given by *addr* (a 358 transport-dependent target address). If *addr* is :const:`None`, 359 the data is sent to the target address given on transport 360 creation. 361 362 This method does not block; it buffers the data and arranges 363 for it to be sent out asynchronously. 364 365.. method:: DatagramTransport.abort() 366 367 Close the transport immediately, without waiting for pending 368 operations to complete. Buffered data will be lost. 369 No more data will be received. The protocol's 370 :meth:`protocol.connection_lost() <BaseProtocol.connection_lost>` 371 method will eventually be called with :const:`None` as its argument. 372 373 374.. _asyncio-subprocess-transports: 375 376Subprocess Transports 377--------------------- 378 379.. method:: SubprocessTransport.get_pid() 380 381 Return the subprocess process id as an integer. 382 383.. method:: SubprocessTransport.get_pipe_transport(fd) 384 385 Return the transport for the communication pipe corresponding to the 386 integer file descriptor *fd*: 387 388 * ``0``: readable streaming transport of the standard input (*stdin*), 389 or :const:`None` if the subprocess was not created with ``stdin=PIPE`` 390 * ``1``: writable streaming transport of the standard output (*stdout*), 391 or :const:`None` if the subprocess was not created with ``stdout=PIPE`` 392 * ``2``: writable streaming transport of the standard error (*stderr*), 393 or :const:`None` if the subprocess was not created with ``stderr=PIPE`` 394 * other *fd*: :const:`None` 395 396.. method:: SubprocessTransport.get_returncode() 397 398 Return the subprocess return code as an integer or :const:`None` 399 if it hasn't returned, which is similar to the 400 :attr:`subprocess.Popen.returncode` attribute. 401 402.. method:: SubprocessTransport.kill() 403 404 Kill the subprocess. 405 406 On POSIX systems, the function sends SIGKILL to the subprocess. 407 On Windows, this method is an alias for :meth:`terminate`. 408 409 See also :meth:`subprocess.Popen.kill`. 410 411.. method:: SubprocessTransport.send_signal(signal) 412 413 Send the *signal* number to the subprocess, as in 414 :meth:`subprocess.Popen.send_signal`. 415 416.. method:: SubprocessTransport.terminate() 417 418 Stop the subprocess. 419 420 On POSIX systems, this method sends SIGTERM to the subprocess. 421 On Windows, the Windows API function TerminateProcess() is called to 422 stop the subprocess. 423 424 See also :meth:`subprocess.Popen.terminate`. 425 426.. method:: SubprocessTransport.close() 427 428 Kill the subprocess by calling the :meth:`kill` method. 429 430 If the subprocess hasn't returned yet, and close transports of 431 *stdin*, *stdout*, and *stderr* pipes. 432 433 434.. _asyncio-protocol: 435 436Protocols 437========= 438 439**Source code:** :source:`Lib/asyncio/protocols.py` 440 441--------------------------------------------------- 442 443asyncio provides a set of abstract base classes that should be used 444to implement network protocols. Those classes are meant to be used 445together with :ref:`transports <asyncio-transport>`. 446 447Subclasses of abstract base protocol classes may implement some or 448all methods. All these methods are callbacks: they are called by 449transports on certain events, for example when some data is received. 450A base protocol method should be called by the corresponding transport. 451 452 453Base Protocols 454-------------- 455 456.. class:: BaseProtocol 457 458 Base protocol with methods that all protocols share. 459 460.. class:: Protocol(BaseProtocol) 461 462 The base class for implementing streaming protocols 463 (TCP, Unix sockets, etc). 464 465.. class:: BufferedProtocol(BaseProtocol) 466 467 A base class for implementing streaming protocols with manual 468 control of the receive buffer. 469 470.. class:: DatagramProtocol(BaseProtocol) 471 472 The base class for implementing datagram (UDP) protocols. 473 474.. class:: SubprocessProtocol(BaseProtocol) 475 476 The base class for implementing protocols communicating with child 477 processes (unidirectional pipes). 478 479 480Base Protocol 481------------- 482 483All asyncio protocols can implement Base Protocol callbacks. 484 485.. rubric:: Connection Callbacks 486 487Connection callbacks are called on all protocols, exactly once per 488a successful connection. All other protocol callbacks can only be 489called between those two methods. 490 491.. method:: BaseProtocol.connection_made(transport) 492 493 Called when a connection is made. 494 495 The *transport* argument is the transport representing the 496 connection. The protocol is responsible for storing the reference 497 to its transport. 498 499.. method:: BaseProtocol.connection_lost(exc) 500 501 Called when the connection is lost or closed. 502 503 The argument is either an exception object or :const:`None`. 504 The latter means a regular EOF is received, or the connection was 505 aborted or closed by this side of the connection. 506 507 508.. rubric:: Flow Control Callbacks 509 510Flow control callbacks can be called by transports to pause or 511resume writing performed by the protocol. 512 513See the documentation of the :meth:`~WriteTransport.set_write_buffer_limits` 514method for more details. 515 516.. method:: BaseProtocol.pause_writing() 517 518 Called when the transport's buffer goes over the high watermark. 519 520.. method:: BaseProtocol.resume_writing() 521 522 Called when the transport's buffer drains below the low watermark. 523 524If the buffer size equals the high watermark, 525:meth:`~BaseProtocol.pause_writing` is not called: the buffer size must 526go strictly over. 527 528Conversely, :meth:`~BaseProtocol.resume_writing` is called when the 529buffer size is equal or lower than the low watermark. These end 530conditions are important to ensure that things go as expected when 531either mark is zero. 532 533 534Streaming Protocols 535------------------- 536 537Event methods, such as :meth:`loop.create_server`, 538:meth:`loop.create_unix_server`, :meth:`loop.create_connection`, 539:meth:`loop.create_unix_connection`, :meth:`loop.connect_accepted_socket`, 540:meth:`loop.connect_read_pipe`, and :meth:`loop.connect_write_pipe` 541accept factories that return streaming protocols. 542 543.. method:: Protocol.data_received(data) 544 545 Called when some data is received. *data* is a non-empty bytes 546 object containing the incoming data. 547 548 Whether the data is buffered, chunked or reassembled depends on 549 the transport. In general, you shouldn't rely on specific semantics 550 and instead make your parsing generic and flexible. However, 551 data is always received in the correct order. 552 553 The method can be called an arbitrary number of times while 554 a connection is open. 555 556 However, :meth:`protocol.eof_received() <Protocol.eof_received>` 557 is called at most once. Once ``eof_received()`` is called, 558 ``data_received()`` is not called anymore. 559 560.. method:: Protocol.eof_received() 561 562 Called when the other end signals it won't send any more data 563 (for example by calling :meth:`transport.write_eof() 564 <WriteTransport.write_eof>`, if the other end also uses 565 asyncio). 566 567 This method may return a false value (including ``None``), in which case 568 the transport will close itself. Conversely, if this method returns a 569 true value, the protocol used determines whether to close the transport. 570 Since the default implementation returns ``None``, it implicitly closes the 571 connection. 572 573 Some transports, including SSL, don't support half-closed connections, 574 in which case returning true from this method will result in the connection 575 being closed. 576 577 578State machine: 579 580.. code-block:: none 581 582 start -> connection_made 583 [-> data_received]* 584 [-> eof_received]? 585 -> connection_lost -> end 586 587 588Buffered Streaming Protocols 589---------------------------- 590 591.. versionadded:: 3.7 592 593Buffered Protocols can be used with any event loop method 594that supports `Streaming Protocols`_. 595 596``BufferedProtocol`` implementations allow explicit manual allocation 597and control of the receive buffer. Event loops can then use the buffer 598provided by the protocol to avoid unnecessary data copies. This 599can result in noticeable performance improvement for protocols that 600receive big amounts of data. Sophisticated protocol implementations 601can significantly reduce the number of buffer allocations. 602 603The following callbacks are called on :class:`BufferedProtocol` 604instances: 605 606.. method:: BufferedProtocol.get_buffer(sizehint) 607 608 Called to allocate a new receive buffer. 609 610 *sizehint* is the recommended minimum size for the returned 611 buffer. It is acceptable to return smaller or larger buffers 612 than what *sizehint* suggests. When set to -1, the buffer size 613 can be arbitrary. It is an error to return a buffer with a zero size. 614 615 ``get_buffer()`` must return an object implementing the 616 :ref:`buffer protocol <bufferobjects>`. 617 618.. method:: BufferedProtocol.buffer_updated(nbytes) 619 620 Called when the buffer was updated with the received data. 621 622 *nbytes* is the total number of bytes that were written to the buffer. 623 624.. method:: BufferedProtocol.eof_received() 625 626 See the documentation of the :meth:`protocol.eof_received() 627 <Protocol.eof_received>` method. 628 629 630:meth:`~BufferedProtocol.get_buffer` can be called an arbitrary number 631of times during a connection. However, :meth:`protocol.eof_received() 632<Protocol.eof_received>` is called at most once 633and, if called, :meth:`~BufferedProtocol.get_buffer` and 634:meth:`~BufferedProtocol.buffer_updated` won't be called after it. 635 636State machine: 637 638.. code-block:: none 639 640 start -> connection_made 641 [-> get_buffer 642 [-> buffer_updated]? 643 ]* 644 [-> eof_received]? 645 -> connection_lost -> end 646 647 648Datagram Protocols 649------------------ 650 651Datagram Protocol instances should be constructed by protocol 652factories passed to the :meth:`loop.create_datagram_endpoint` method. 653 654.. method:: DatagramProtocol.datagram_received(data, addr) 655 656 Called when a datagram is received. *data* is a bytes object containing 657 the incoming data. *addr* is the address of the peer sending the data; 658 the exact format depends on the transport. 659 660.. method:: DatagramProtocol.error_received(exc) 661 662 Called when a previous send or receive operation raises an 663 :class:`OSError`. *exc* is the :class:`OSError` instance. 664 665 This method is called in rare conditions, when the transport (e.g. UDP) 666 detects that a datagram could not be delivered to its recipient. 667 In many conditions though, undeliverable datagrams will be silently 668 dropped. 669 670.. note:: 671 672 On BSD systems (macOS, FreeBSD, etc.) flow control is not supported 673 for datagram protocols, because there is no reliable way to detect send 674 failures caused by writing too many packets. 675 676 The socket always appears 'ready' and excess packets are dropped. An 677 :class:`OSError` with ``errno`` set to :const:`errno.ENOBUFS` may 678 or may not be raised; if it is raised, it will be reported to 679 :meth:`DatagramProtocol.error_received` but otherwise ignored. 680 681 682.. _asyncio-subprocess-protocols: 683 684Subprocess Protocols 685-------------------- 686 687Subprocess Protocol instances should be constructed by protocol 688factories passed to the :meth:`loop.subprocess_exec` and 689:meth:`loop.subprocess_shell` methods. 690 691.. method:: SubprocessProtocol.pipe_data_received(fd, data) 692 693 Called when the child process writes data into its stdout or stderr 694 pipe. 695 696 *fd* is the integer file descriptor of the pipe. 697 698 *data* is a non-empty bytes object containing the received data. 699 700.. method:: SubprocessProtocol.pipe_connection_lost(fd, exc) 701 702 Called when one of the pipes communicating with the child process 703 is closed. 704 705 *fd* is the integer file descriptor that was closed. 706 707.. method:: SubprocessProtocol.process_exited() 708 709 Called when the child process has exited. 710 711 712Examples 713======== 714 715.. _asyncio_example_tcp_echo_server_protocol: 716 717TCP Echo Server 718--------------- 719 720Create a TCP echo server using the :meth:`loop.create_server` method, send back 721received data, and close the connection:: 722 723 import asyncio 724 725 726 class EchoServerProtocol(asyncio.Protocol): 727 def connection_made(self, transport): 728 peername = transport.get_extra_info('peername') 729 print('Connection from {}'.format(peername)) 730 self.transport = transport 731 732 def data_received(self, data): 733 message = data.decode() 734 print('Data received: {!r}'.format(message)) 735 736 print('Send: {!r}'.format(message)) 737 self.transport.write(data) 738 739 print('Close the client socket') 740 self.transport.close() 741 742 743 async def main(): 744 # Get a reference to the event loop as we plan to use 745 # low-level APIs. 746 loop = asyncio.get_running_loop() 747 748 server = await loop.create_server( 749 lambda: EchoServerProtocol(), 750 '127.0.0.1', 8888) 751 752 async with server: 753 await server.serve_forever() 754 755 756 asyncio.run(main()) 757 758 759.. seealso:: 760 761 The :ref:`TCP echo server using streams <asyncio-tcp-echo-server-streams>` 762 example uses the high-level :func:`asyncio.start_server` function. 763 764.. _asyncio_example_tcp_echo_client_protocol: 765 766TCP Echo Client 767--------------- 768 769A TCP echo client using the :meth:`loop.create_connection` method, sends 770data, and waits until the connection is closed:: 771 772 import asyncio 773 774 775 class EchoClientProtocol(asyncio.Protocol): 776 def __init__(self, message, on_con_lost): 777 self.message = message 778 self.on_con_lost = on_con_lost 779 780 def connection_made(self, transport): 781 transport.write(self.message.encode()) 782 print('Data sent: {!r}'.format(self.message)) 783 784 def data_received(self, data): 785 print('Data received: {!r}'.format(data.decode())) 786 787 def connection_lost(self, exc): 788 print('The server closed the connection') 789 self.on_con_lost.set_result(True) 790 791 792 async def main(): 793 # Get a reference to the event loop as we plan to use 794 # low-level APIs. 795 loop = asyncio.get_running_loop() 796 797 on_con_lost = loop.create_future() 798 message = 'Hello World!' 799 800 transport, protocol = await loop.create_connection( 801 lambda: EchoClientProtocol(message, on_con_lost), 802 '127.0.0.1', 8888) 803 804 # Wait until the protocol signals that the connection 805 # is lost and close the transport. 806 try: 807 await on_con_lost 808 finally: 809 transport.close() 810 811 812 asyncio.run(main()) 813 814 815.. seealso:: 816 817 The :ref:`TCP echo client using streams <asyncio-tcp-echo-client-streams>` 818 example uses the high-level :func:`asyncio.open_connection` function. 819 820 821.. _asyncio-udp-echo-server-protocol: 822 823UDP Echo Server 824--------------- 825 826A UDP echo server, using the :meth:`loop.create_datagram_endpoint` 827method, sends back received data:: 828 829 import asyncio 830 831 832 class EchoServerProtocol: 833 def connection_made(self, transport): 834 self.transport = transport 835 836 def datagram_received(self, data, addr): 837 message = data.decode() 838 print('Received %r from %s' % (message, addr)) 839 print('Send %r to %s' % (message, addr)) 840 self.transport.sendto(data, addr) 841 842 843 async def main(): 844 print("Starting UDP server") 845 846 # Get a reference to the event loop as we plan to use 847 # low-level APIs. 848 loop = asyncio.get_running_loop() 849 850 # One protocol instance will be created to serve all 851 # client requests. 852 transport, protocol = await loop.create_datagram_endpoint( 853 lambda: EchoServerProtocol(), 854 local_addr=('127.0.0.1', 9999)) 855 856 try: 857 await asyncio.sleep(3600) # Serve for 1 hour. 858 finally: 859 transport.close() 860 861 862 asyncio.run(main()) 863 864 865.. _asyncio-udp-echo-client-protocol: 866 867UDP Echo Client 868--------------- 869 870A UDP echo client, using the :meth:`loop.create_datagram_endpoint` 871method, sends data and closes the transport when it receives the answer:: 872 873 import asyncio 874 875 876 class EchoClientProtocol: 877 def __init__(self, message, on_con_lost): 878 self.message = message 879 self.on_con_lost = on_con_lost 880 self.transport = None 881 882 def connection_made(self, transport): 883 self.transport = transport 884 print('Send:', self.message) 885 self.transport.sendto(self.message.encode()) 886 887 def datagram_received(self, data, addr): 888 print("Received:", data.decode()) 889 890 print("Close the socket") 891 self.transport.close() 892 893 def error_received(self, exc): 894 print('Error received:', exc) 895 896 def connection_lost(self, exc): 897 print("Connection closed") 898 self.on_con_lost.set_result(True) 899 900 901 async def main(): 902 # Get a reference to the event loop as we plan to use 903 # low-level APIs. 904 loop = asyncio.get_running_loop() 905 906 on_con_lost = loop.create_future() 907 message = "Hello World!" 908 909 transport, protocol = await loop.create_datagram_endpoint( 910 lambda: EchoClientProtocol(message, on_con_lost), 911 remote_addr=('127.0.0.1', 9999)) 912 913 try: 914 await on_con_lost 915 finally: 916 transport.close() 917 918 919 asyncio.run(main()) 920 921 922.. _asyncio_example_create_connection: 923 924Connecting Existing Sockets 925--------------------------- 926 927Wait until a socket receives data using the 928:meth:`loop.create_connection` method with a protocol:: 929 930 import asyncio 931 import socket 932 933 934 class MyProtocol(asyncio.Protocol): 935 936 def __init__(self, on_con_lost): 937 self.transport = None 938 self.on_con_lost = on_con_lost 939 940 def connection_made(self, transport): 941 self.transport = transport 942 943 def data_received(self, data): 944 print("Received:", data.decode()) 945 946 # We are done: close the transport; 947 # connection_lost() will be called automatically. 948 self.transport.close() 949 950 def connection_lost(self, exc): 951 # The socket has been closed 952 self.on_con_lost.set_result(True) 953 954 955 async def main(): 956 # Get a reference to the event loop as we plan to use 957 # low-level APIs. 958 loop = asyncio.get_running_loop() 959 on_con_lost = loop.create_future() 960 961 # Create a pair of connected sockets 962 rsock, wsock = socket.socketpair() 963 964 # Register the socket to wait for data. 965 transport, protocol = await loop.create_connection( 966 lambda: MyProtocol(on_con_lost), sock=rsock) 967 968 # Simulate the reception of data from the network. 969 loop.call_soon(wsock.send, 'abc'.encode()) 970 971 try: 972 await protocol.on_con_lost 973 finally: 974 transport.close() 975 wsock.close() 976 977 asyncio.run(main()) 978 979.. seealso:: 980 981 The :ref:`watch a file descriptor for read events 982 <asyncio_example_watch_fd>` example uses the low-level 983 :meth:`loop.add_reader` method to register an FD. 984 985 The :ref:`register an open socket to wait for data using streams 986 <asyncio_example_create_connection-streams>` example uses high-level streams 987 created by the :func:`open_connection` function in a coroutine. 988 989.. _asyncio_example_subprocess_proto: 990 991loop.subprocess_exec() and SubprocessProtocol 992--------------------------------------------- 993 994An example of a subprocess protocol used to get the output of a 995subprocess and to wait for the subprocess exit. 996 997The subprocess is created by the :meth:`loop.subprocess_exec` method:: 998 999 import asyncio 1000 import sys 1001 1002 class DateProtocol(asyncio.SubprocessProtocol): 1003 def __init__(self, exit_future): 1004 self.exit_future = exit_future 1005 self.output = bytearray() 1006 1007 def pipe_data_received(self, fd, data): 1008 self.output.extend(data) 1009 1010 def process_exited(self): 1011 self.exit_future.set_result(True) 1012 1013 async def get_date(): 1014 # Get a reference to the event loop as we plan to use 1015 # low-level APIs. 1016 loop = asyncio.get_running_loop() 1017 1018 code = 'import datetime; print(datetime.datetime.now())' 1019 exit_future = asyncio.Future(loop=loop) 1020 1021 # Create the subprocess controlled by DateProtocol; 1022 # redirect the standard output into a pipe. 1023 transport, protocol = await loop.subprocess_exec( 1024 lambda: DateProtocol(exit_future), 1025 sys.executable, '-c', code, 1026 stdin=None, stderr=None) 1027 1028 # Wait for the subprocess exit using the process_exited() 1029 # method of the protocol. 1030 await exit_future 1031 1032 # Close the stdout pipe. 1033 transport.close() 1034 1035 # Read the output which was collected by the 1036 # pipe_data_received() method of the protocol. 1037 data = bytes(protocol.output) 1038 return data.decode('ascii').rstrip() 1039 1040 date = asyncio.run(get_date()) 1041 print(f"Current date: {date}") 1042 1043See also the :ref:`same example <asyncio_example_create_subprocess_exec>` 1044written using high-level APIs. 1045