1.. currentmodule:: asyncio
2
3
4.. _asyncio-transports-protocols:
5
6
7========================
8Transports and Protocols
9========================
10
11.. rubric:: Preface
12
13Transports and Protocols are used by the **low-level** event loop
14APIs such as :meth:`loop.create_connection`.  They use
15callback-based programming style and enable high-performance
16implementations of network or IPC protocols (e.g. HTTP).
17
18Essentially, transports and protocols should only be used in
19libraries and frameworks and never in high-level asyncio
20applications.
21
22This documentation page covers both `Transports`_ and `Protocols`_.
23
24.. rubric:: Introduction
25
26At the highest level, the transport is concerned with *how* bytes
27are transmitted, while the protocol determines *which* bytes to
28transmit (and to some extent when).
29
30A different way of saying the same thing: a transport is an
31abstraction for a socket (or similar I/O endpoint) while a protocol
32is an abstraction for an application, from the transport's point
33of view.
34
35Yet another view is the transport and protocol interfaces
36together define an abstract interface for using network I/O and
37interprocess I/O.
38
39There is always a 1:1 relationship between transport and protocol
40objects: the protocol calls transport methods to send data,
41while the transport calls protocol methods to pass it data that
42has been received.
43
44Most of connection oriented event loop methods
45(such as :meth:`loop.create_connection`) usually accept a
46*protocol_factory* argument used to create a *Protocol* object
47for an accepted connection, represented by a *Transport* object.
48Such methods usually return a tuple of ``(transport, protocol)``.
49
50.. rubric:: Contents
51
52This documentation page contains the following sections:
53
54* The `Transports`_ section documents asyncio :class:`BaseTransport`,
55  :class:`ReadTransport`, :class:`WriteTransport`, :class:`Transport`,
56  :class:`DatagramTransport`, and :class:`SubprocessTransport`
57  classes.
58
59* The `Protocols`_ section documents asyncio :class:`BaseProtocol`,
60  :class:`Protocol`, :class:`BufferedProtocol`,
61  :class:`DatagramProtocol`, and :class:`SubprocessProtocol` classes.
62
63* The `Examples`_ section showcases how to work with transports,
64  protocols, and low-level event loop APIs.
65
66
67.. _asyncio-transport:
68
69Transports
70==========
71
72**Source code:** :source:`Lib/asyncio/transports.py`
73
74----------------------------------------------------
75
76Transports are classes provided by :mod:`asyncio` in order to abstract
77various kinds of communication channels.
78
79Transport objects are always instantiated by an
80:ref:`asyncio event loop <asyncio-event-loop>`.
81
82asyncio implements transports for TCP, UDP, SSL, and subprocess pipes.
83The methods available on a transport depend on the transport's kind.
84
85The transport classes are :ref:`not thread safe <asyncio-multithreading>`.
86
87
88Transports Hierarchy
89--------------------
90
91.. class:: BaseTransport
92
93   Base class for all transports.  Contains methods that all
94   asyncio transports share.
95
96.. class:: WriteTransport(BaseTransport)
97
98   A base transport for write-only connections.
99
100   Instances of the *WriteTransport* class are returned from
101   the :meth:`loop.connect_write_pipe` event loop method and
102   are also used by subprocess-related methods like
103   :meth:`loop.subprocess_exec`.
104
105.. class:: ReadTransport(BaseTransport)
106
107   A base transport for read-only connections.
108
109   Instances of the *ReadTransport* class are returned from
110   the :meth:`loop.connect_read_pipe` event loop method and
111   are also used by subprocess-related methods like
112   :meth:`loop.subprocess_exec`.
113
114.. class:: Transport(WriteTransport, ReadTransport)
115
116   Interface representing a bidirectional transport, such as a
117   TCP connection.
118
119   The user does not instantiate a transport directly; they call a
120   utility function, passing it a protocol factory and other
121   information necessary to create the transport and protocol.
122
123   Instances of the *Transport* class are returned from or used by
124   event loop methods like :meth:`loop.create_connection`,
125   :meth:`loop.create_unix_connection`,
126   :meth:`loop.create_server`, :meth:`loop.sendfile`, etc.
127
128
129.. class:: DatagramTransport(BaseTransport)
130
131   A transport for datagram (UDP) connections.
132
133   Instances of the *DatagramTransport* class are returned from
134   the :meth:`loop.create_datagram_endpoint` event loop method.
135
136
137.. class:: SubprocessTransport(BaseTransport)
138
139   An abstraction to represent a connection between a parent and its
140   child OS process.
141
142   Instances of the *SubprocessTransport* class are returned from
143   event loop methods :meth:`loop.subprocess_shell` and
144   :meth:`loop.subprocess_exec`.
145
146
147Base Transport
148--------------
149
150.. method:: BaseTransport.close()
151
152   Close the transport.
153
154   If the transport has a buffer for outgoing
155   data, buffered data will be flushed asynchronously.  No more data
156   will be received.  After all buffered data is flushed, the
157   protocol's :meth:`protocol.connection_lost()
158   <BaseProtocol.connection_lost>` method will be called with
159   :const:`None` as its argument. The transport should not be
160   used once it is closed.
161
162.. method:: BaseTransport.is_closing()
163
164   Return ``True`` if the transport is closing or is closed.
165
166.. method:: BaseTransport.get_extra_info(name, default=None)
167
168   Return information about the transport or underlying resources
169   it uses.
170
171   *name* is a string representing the piece of transport-specific
172   information to get.
173
174   *default* is the value to return if the information is not
175   available, or if the transport does not support querying it
176   with the given third-party event loop implementation or on the
177   current platform.
178
179   For example, the following code attempts to get the underlying
180   socket object of the transport::
181
182      sock = transport.get_extra_info('socket')
183      if sock is not None:
184          print(sock.getsockopt(...))
185
186   Categories of information that can be queried on some transports:
187
188   * socket:
189
190     - ``'peername'``: the remote address to which the socket is
191       connected, result of :meth:`socket.socket.getpeername`
192       (``None`` on error)
193
194     - ``'socket'``: :class:`socket.socket` instance
195
196     - ``'sockname'``: the socket's own address,
197       result of :meth:`socket.socket.getsockname`
198
199   * SSL socket:
200
201     - ``'compression'``: the compression algorithm being used as a
202       string, or ``None`` if the connection isn't compressed; result
203       of :meth:`ssl.SSLSocket.compression`
204
205     - ``'cipher'``: a three-value tuple containing the name of the
206       cipher being used, the version of the SSL protocol that defines
207       its use, and the number of secret bits being used; result of
208       :meth:`ssl.SSLSocket.cipher`
209
210     - ``'peercert'``: peer certificate; result of
211       :meth:`ssl.SSLSocket.getpeercert`
212
213     - ``'sslcontext'``: :class:`ssl.SSLContext` instance
214
215     - ``'ssl_object'``: :class:`ssl.SSLObject` or
216       :class:`ssl.SSLSocket` instance
217
218   * pipe:
219
220     - ``'pipe'``: pipe object
221
222   * subprocess:
223
224     - ``'subprocess'``: :class:`subprocess.Popen` instance
225
226.. method:: BaseTransport.set_protocol(protocol)
227
228   Set a new protocol.
229
230   Switching protocol should only be done when both
231   protocols are documented to support the switch.
232
233.. method:: BaseTransport.get_protocol()
234
235   Return the current protocol.
236
237
238Read-only Transports
239--------------------
240
241.. method:: ReadTransport.is_reading()
242
243   Return ``True`` if the transport is receiving new data.
244
245   .. versionadded:: 3.7
246
247.. method:: ReadTransport.pause_reading()
248
249   Pause the receiving end of the transport.  No data will be passed to
250   the protocol's :meth:`protocol.data_received() <Protocol.data_received>`
251   method until :meth:`resume_reading` is called.
252
253   .. versionchanged:: 3.7
254      The method is idempotent, i.e. it can be called when the
255      transport is already paused or closed.
256
257.. method:: ReadTransport.resume_reading()
258
259   Resume the receiving end.  The protocol's
260   :meth:`protocol.data_received() <Protocol.data_received>` method
261   will be called once again if some data is available for reading.
262
263   .. versionchanged:: 3.7
264      The method is idempotent, i.e. it can be called when the
265      transport is already reading.
266
267
268Write-only Transports
269---------------------
270
271.. method:: WriteTransport.abort()
272
273   Close the transport immediately, without waiting for pending operations
274   to complete.  Buffered data will be lost.  No more data will be received.
275   The protocol's :meth:`protocol.connection_lost()
276   <BaseProtocol.connection_lost>` method will eventually be
277   called with :const:`None` as its argument.
278
279.. method:: WriteTransport.can_write_eof()
280
281   Return :const:`True` if the transport supports
282   :meth:`~WriteTransport.write_eof`, :const:`False` if not.
283
284.. method:: WriteTransport.get_write_buffer_size()
285
286   Return the current size of the output buffer used by the transport.
287
288.. method:: WriteTransport.get_write_buffer_limits()
289
290   Get the *high* and *low* watermarks for write flow control. Return a
291   tuple ``(low, high)`` where *low* and *high* are positive number of
292   bytes.
293
294   Use :meth:`set_write_buffer_limits` to set the limits.
295
296   .. versionadded:: 3.4.2
297
298.. method:: WriteTransport.set_write_buffer_limits(high=None, low=None)
299
300   Set the *high* and *low* watermarks for write flow control.
301
302   These two values (measured in number of
303   bytes) control when the protocol's
304   :meth:`protocol.pause_writing() <BaseProtocol.pause_writing>`
305   and :meth:`protocol.resume_writing() <BaseProtocol.resume_writing>`
306   methods are called. If specified, the low watermark must be less
307   than or equal to the high watermark.  Neither *high* nor *low*
308   can be negative.
309
310   :meth:`~BaseProtocol.pause_writing` is called when the buffer size
311   becomes greater than or equal to the *high* value. If writing has
312   been paused, :meth:`~BaseProtocol.resume_writing` is called when
313   the buffer size becomes less than or equal to the *low* value.
314
315   The defaults are implementation-specific.  If only the
316   high watermark is given, the low watermark defaults to an
317   implementation-specific value less than or equal to the
318   high watermark.  Setting *high* to zero forces *low* to zero as
319   well, and causes :meth:`~BaseProtocol.pause_writing` to be called
320   whenever the buffer becomes non-empty.  Setting *low* to zero causes
321   :meth:`~BaseProtocol.resume_writing` to be called only once the
322   buffer is empty. Use of zero for either limit is generally
323   sub-optimal as it reduces opportunities for doing I/O and
324   computation concurrently.
325
326   Use :meth:`~WriteTransport.get_write_buffer_limits`
327   to get the limits.
328
329.. method:: WriteTransport.write(data)
330
331   Write some *data* bytes to the transport.
332
333   This method does not block; it buffers the data and arranges for it
334   to be sent out asynchronously.
335
336.. method:: WriteTransport.writelines(list_of_data)
337
338   Write a list (or any iterable) of data bytes to the transport.
339   This is functionally equivalent to calling :meth:`write` on each
340   element yielded by the iterable, but may be implemented more
341   efficiently.
342
343.. method:: WriteTransport.write_eof()
344
345   Close the write end of the transport after flushing all buffered data.
346   Data may still be received.
347
348   This method can raise :exc:`NotImplementedError` if the transport
349   (e.g. SSL) doesn't support half-closed connections.
350
351
352Datagram Transports
353-------------------
354
355.. method:: DatagramTransport.sendto(data, addr=None)
356
357   Send the *data* bytes to the remote peer given by *addr* (a
358   transport-dependent target address).  If *addr* is :const:`None`,
359   the data is sent to the target address given on transport
360   creation.
361
362   This method does not block; it buffers the data and arranges
363   for it to be sent out asynchronously.
364
365.. method:: DatagramTransport.abort()
366
367   Close the transport immediately, without waiting for pending
368   operations to complete.  Buffered data will be lost.
369   No more data will be received.  The protocol's
370   :meth:`protocol.connection_lost() <BaseProtocol.connection_lost>`
371   method will eventually be called with :const:`None` as its argument.
372
373
374.. _asyncio-subprocess-transports:
375
376Subprocess Transports
377---------------------
378
379.. method:: SubprocessTransport.get_pid()
380
381   Return the subprocess process id as an integer.
382
383.. method:: SubprocessTransport.get_pipe_transport(fd)
384
385   Return the transport for the communication pipe corresponding to the
386   integer file descriptor *fd*:
387
388   * ``0``: readable streaming transport of the standard input (*stdin*),
389     or :const:`None` if the subprocess was not created with ``stdin=PIPE``
390   * ``1``: writable streaming transport of the standard output (*stdout*),
391     or :const:`None` if the subprocess was not created with ``stdout=PIPE``
392   * ``2``: writable streaming transport of the standard error (*stderr*),
393     or :const:`None` if the subprocess was not created with ``stderr=PIPE``
394   * other *fd*: :const:`None`
395
396.. method:: SubprocessTransport.get_returncode()
397
398   Return the subprocess return code as an integer or :const:`None`
399   if it hasn't returned, which is similar to the
400   :attr:`subprocess.Popen.returncode` attribute.
401
402.. method:: SubprocessTransport.kill()
403
404   Kill the subprocess.
405
406   On POSIX systems, the function sends SIGKILL to the subprocess.
407   On Windows, this method is an alias for :meth:`terminate`.
408
409   See also :meth:`subprocess.Popen.kill`.
410
411.. method:: SubprocessTransport.send_signal(signal)
412
413   Send the *signal* number to the subprocess, as in
414   :meth:`subprocess.Popen.send_signal`.
415
416.. method:: SubprocessTransport.terminate()
417
418   Stop the subprocess.
419
420   On POSIX systems, this method sends SIGTERM to the subprocess.
421   On Windows, the Windows API function TerminateProcess() is called to
422   stop the subprocess.
423
424   See also :meth:`subprocess.Popen.terminate`.
425
426.. method:: SubprocessTransport.close()
427
428   Kill the subprocess by calling the :meth:`kill` method.
429
430   If the subprocess hasn't returned yet, and close transports of
431   *stdin*, *stdout*, and *stderr* pipes.
432
433
434.. _asyncio-protocol:
435
436Protocols
437=========
438
439**Source code:** :source:`Lib/asyncio/protocols.py`
440
441---------------------------------------------------
442
443asyncio provides a set of abstract base classes that should be used
444to implement network protocols.  Those classes are meant to be used
445together with :ref:`transports <asyncio-transport>`.
446
447Subclasses of abstract base protocol classes may implement some or
448all methods.  All these methods are callbacks: they are called by
449transports on certain events, for example when some data is received.
450A base protocol method should be called by the corresponding transport.
451
452
453Base Protocols
454--------------
455
456.. class:: BaseProtocol
457
458   Base protocol with methods that all protocols share.
459
460.. class:: Protocol(BaseProtocol)
461
462   The base class for implementing streaming protocols
463   (TCP, Unix sockets, etc).
464
465.. class:: BufferedProtocol(BaseProtocol)
466
467   A base class for implementing streaming protocols with manual
468   control of the receive buffer.
469
470.. class:: DatagramProtocol(BaseProtocol)
471
472   The base class for implementing datagram (UDP) protocols.
473
474.. class:: SubprocessProtocol(BaseProtocol)
475
476   The base class for implementing protocols communicating with child
477   processes (unidirectional pipes).
478
479
480Base Protocol
481-------------
482
483All asyncio protocols can implement Base Protocol callbacks.
484
485.. rubric:: Connection Callbacks
486
487Connection callbacks are called on all protocols, exactly once per
488a successful connection.  All other protocol callbacks can only be
489called between those two methods.
490
491.. method:: BaseProtocol.connection_made(transport)
492
493   Called when a connection is made.
494
495   The *transport* argument is the transport representing the
496   connection.  The protocol is responsible for storing the reference
497   to its transport.
498
499.. method:: BaseProtocol.connection_lost(exc)
500
501   Called when the connection is lost or closed.
502
503   The argument is either an exception object or :const:`None`.
504   The latter means a regular EOF is received, or the connection was
505   aborted or closed by this side of the connection.
506
507
508.. rubric:: Flow Control Callbacks
509
510Flow control callbacks can be called by transports to pause or
511resume writing performed by the protocol.
512
513See the documentation of the :meth:`~WriteTransport.set_write_buffer_limits`
514method for more details.
515
516.. method:: BaseProtocol.pause_writing()
517
518   Called when the transport's buffer goes over the high watermark.
519
520.. method:: BaseProtocol.resume_writing()
521
522   Called when the transport's buffer drains below the low watermark.
523
524If the buffer size equals the high watermark,
525:meth:`~BaseProtocol.pause_writing` is not called: the buffer size must
526go strictly over.
527
528Conversely, :meth:`~BaseProtocol.resume_writing` is called when the
529buffer size is equal or lower than the low watermark.  These end
530conditions are important to ensure that things go as expected when
531either mark is zero.
532
533
534Streaming Protocols
535-------------------
536
537Event methods, such as :meth:`loop.create_server`,
538:meth:`loop.create_unix_server`, :meth:`loop.create_connection`,
539:meth:`loop.create_unix_connection`, :meth:`loop.connect_accepted_socket`,
540:meth:`loop.connect_read_pipe`, and :meth:`loop.connect_write_pipe`
541accept factories that return streaming protocols.
542
543.. method:: Protocol.data_received(data)
544
545   Called when some data is received.  *data* is a non-empty bytes
546   object containing the incoming data.
547
548   Whether the data is buffered, chunked or reassembled depends on
549   the transport.  In general, you shouldn't rely on specific semantics
550   and instead make your parsing generic and flexible. However,
551   data is always received in the correct order.
552
553   The method can be called an arbitrary number of times while
554   a connection is open.
555
556   However, :meth:`protocol.eof_received() <Protocol.eof_received>`
557   is called at most once.  Once ``eof_received()`` is called,
558   ``data_received()`` is not called anymore.
559
560.. method:: Protocol.eof_received()
561
562   Called when the other end signals it won't send any more data
563   (for example by calling :meth:`transport.write_eof()
564   <WriteTransport.write_eof>`, if the other end also uses
565   asyncio).
566
567   This method may return a false value (including ``None``), in which case
568   the transport will close itself.  Conversely, if this method returns a
569   true value, the protocol used determines whether to close the transport.
570   Since the default implementation returns ``None``, it implicitly closes the
571   connection.
572
573   Some transports, including SSL, don't support half-closed connections,
574   in which case returning true from this method will result in the connection
575   being closed.
576
577
578State machine:
579
580.. code-block:: none
581
582    start -> connection_made
583        [-> data_received]*
584        [-> eof_received]?
585    -> connection_lost -> end
586
587
588Buffered Streaming Protocols
589----------------------------
590
591.. versionadded:: 3.7
592
593Buffered Protocols can be used with any event loop method
594that supports `Streaming Protocols`_.
595
596``BufferedProtocol`` implementations allow explicit manual allocation
597and control of the receive buffer.  Event loops can then use the buffer
598provided by the protocol to avoid unnecessary data copies.  This
599can result in noticeable performance improvement for protocols that
600receive big amounts of data.  Sophisticated protocol implementations
601can significantly reduce the number of buffer allocations.
602
603The following callbacks are called on :class:`BufferedProtocol`
604instances:
605
606.. method:: BufferedProtocol.get_buffer(sizehint)
607
608   Called to allocate a new receive buffer.
609
610   *sizehint* is the recommended minimum size for the returned
611   buffer.  It is acceptable to return smaller or larger buffers
612   than what *sizehint* suggests.  When set to -1, the buffer size
613   can be arbitrary. It is an error to return a buffer with a zero size.
614
615   ``get_buffer()`` must return an object implementing the
616   :ref:`buffer protocol <bufferobjects>`.
617
618.. method:: BufferedProtocol.buffer_updated(nbytes)
619
620   Called when the buffer was updated with the received data.
621
622   *nbytes* is the total number of bytes that were written to the buffer.
623
624.. method:: BufferedProtocol.eof_received()
625
626   See the documentation of the :meth:`protocol.eof_received()
627   <Protocol.eof_received>` method.
628
629
630:meth:`~BufferedProtocol.get_buffer` can be called an arbitrary number
631of times during a connection.  However, :meth:`protocol.eof_received()
632<Protocol.eof_received>` is called at most once
633and, if called, :meth:`~BufferedProtocol.get_buffer` and
634:meth:`~BufferedProtocol.buffer_updated` won't be called after it.
635
636State machine:
637
638.. code-block:: none
639
640    start -> connection_made
641        [-> get_buffer
642            [-> buffer_updated]?
643        ]*
644        [-> eof_received]?
645    -> connection_lost -> end
646
647
648Datagram Protocols
649------------------
650
651Datagram Protocol instances should be constructed by protocol
652factories passed to the :meth:`loop.create_datagram_endpoint` method.
653
654.. method:: DatagramProtocol.datagram_received(data, addr)
655
656   Called when a datagram is received.  *data* is a bytes object containing
657   the incoming data.  *addr* is the address of the peer sending the data;
658   the exact format depends on the transport.
659
660.. method:: DatagramProtocol.error_received(exc)
661
662   Called when a previous send or receive operation raises an
663   :class:`OSError`.  *exc* is the :class:`OSError` instance.
664
665   This method is called in rare conditions, when the transport (e.g. UDP)
666   detects that a datagram could not be delivered to its recipient.
667   In many conditions though, undeliverable datagrams will be silently
668   dropped.
669
670.. note::
671
672   On BSD systems (macOS, FreeBSD, etc.) flow control is not supported
673   for datagram protocols, because there is no reliable way to detect send
674   failures caused by writing too many packets.
675
676   The socket always appears 'ready' and excess packets are dropped. An
677   :class:`OSError` with ``errno`` set to :const:`errno.ENOBUFS` may
678   or may not be raised; if it is raised, it will be reported to
679   :meth:`DatagramProtocol.error_received` but otherwise ignored.
680
681
682.. _asyncio-subprocess-protocols:
683
684Subprocess Protocols
685--------------------
686
687Subprocess Protocol instances should be constructed by protocol
688factories passed to the :meth:`loop.subprocess_exec` and
689:meth:`loop.subprocess_shell` methods.
690
691.. method:: SubprocessProtocol.pipe_data_received(fd, data)
692
693   Called when the child process writes data into its stdout or stderr
694   pipe.
695
696   *fd* is the integer file descriptor of the pipe.
697
698   *data* is a non-empty bytes object containing the received data.
699
700.. method:: SubprocessProtocol.pipe_connection_lost(fd, exc)
701
702   Called when one of the pipes communicating with the child process
703   is closed.
704
705   *fd* is the integer file descriptor that was closed.
706
707.. method:: SubprocessProtocol.process_exited()
708
709   Called when the child process has exited.
710
711
712Examples
713========
714
715.. _asyncio_example_tcp_echo_server_protocol:
716
717TCP Echo Server
718---------------
719
720Create a TCP echo server using the :meth:`loop.create_server` method, send back
721received data, and close the connection::
722
723    import asyncio
724
725
726    class EchoServerProtocol(asyncio.Protocol):
727        def connection_made(self, transport):
728            peername = transport.get_extra_info('peername')
729            print('Connection from {}'.format(peername))
730            self.transport = transport
731
732        def data_received(self, data):
733            message = data.decode()
734            print('Data received: {!r}'.format(message))
735
736            print('Send: {!r}'.format(message))
737            self.transport.write(data)
738
739            print('Close the client socket')
740            self.transport.close()
741
742
743    async def main():
744        # Get a reference to the event loop as we plan to use
745        # low-level APIs.
746        loop = asyncio.get_running_loop()
747
748        server = await loop.create_server(
749            lambda: EchoServerProtocol(),
750            '127.0.0.1', 8888)
751
752        async with server:
753            await server.serve_forever()
754
755
756    asyncio.run(main())
757
758
759.. seealso::
760
761   The :ref:`TCP echo server using streams <asyncio-tcp-echo-server-streams>`
762   example uses the high-level :func:`asyncio.start_server` function.
763
764.. _asyncio_example_tcp_echo_client_protocol:
765
766TCP Echo Client
767---------------
768
769A TCP echo client using the :meth:`loop.create_connection` method, sends
770data, and waits until the connection is closed::
771
772    import asyncio
773
774
775    class EchoClientProtocol(asyncio.Protocol):
776        def __init__(self, message, on_con_lost):
777            self.message = message
778            self.on_con_lost = on_con_lost
779
780        def connection_made(self, transport):
781            transport.write(self.message.encode())
782            print('Data sent: {!r}'.format(self.message))
783
784        def data_received(self, data):
785            print('Data received: {!r}'.format(data.decode()))
786
787        def connection_lost(self, exc):
788            print('The server closed the connection')
789            self.on_con_lost.set_result(True)
790
791
792    async def main():
793        # Get a reference to the event loop as we plan to use
794        # low-level APIs.
795        loop = asyncio.get_running_loop()
796
797        on_con_lost = loop.create_future()
798        message = 'Hello World!'
799
800        transport, protocol = await loop.create_connection(
801            lambda: EchoClientProtocol(message, on_con_lost),
802            '127.0.0.1', 8888)
803
804        # Wait until the protocol signals that the connection
805        # is lost and close the transport.
806        try:
807            await on_con_lost
808        finally:
809            transport.close()
810
811
812    asyncio.run(main())
813
814
815.. seealso::
816
817   The :ref:`TCP echo client using streams <asyncio-tcp-echo-client-streams>`
818   example uses the high-level :func:`asyncio.open_connection` function.
819
820
821.. _asyncio-udp-echo-server-protocol:
822
823UDP Echo Server
824---------------
825
826A UDP echo server, using the :meth:`loop.create_datagram_endpoint`
827method, sends back received data::
828
829    import asyncio
830
831
832    class EchoServerProtocol:
833        def connection_made(self, transport):
834            self.transport = transport
835
836        def datagram_received(self, data, addr):
837            message = data.decode()
838            print('Received %r from %s' % (message, addr))
839            print('Send %r to %s' % (message, addr))
840            self.transport.sendto(data, addr)
841
842
843    async def main():
844        print("Starting UDP server")
845
846        # Get a reference to the event loop as we plan to use
847        # low-level APIs.
848        loop = asyncio.get_running_loop()
849
850        # One protocol instance will be created to serve all
851        # client requests.
852        transport, protocol = await loop.create_datagram_endpoint(
853            lambda: EchoServerProtocol(),
854            local_addr=('127.0.0.1', 9999))
855
856        try:
857            await asyncio.sleep(3600)  # Serve for 1 hour.
858        finally:
859            transport.close()
860
861
862    asyncio.run(main())
863
864
865.. _asyncio-udp-echo-client-protocol:
866
867UDP Echo Client
868---------------
869
870A UDP echo client, using the :meth:`loop.create_datagram_endpoint`
871method, sends data and closes the transport when it receives the answer::
872
873    import asyncio
874
875
876    class EchoClientProtocol:
877        def __init__(self, message, on_con_lost):
878            self.message = message
879            self.on_con_lost = on_con_lost
880            self.transport = None
881
882        def connection_made(self, transport):
883            self.transport = transport
884            print('Send:', self.message)
885            self.transport.sendto(self.message.encode())
886
887        def datagram_received(self, data, addr):
888            print("Received:", data.decode())
889
890            print("Close the socket")
891            self.transport.close()
892
893        def error_received(self, exc):
894            print('Error received:', exc)
895
896        def connection_lost(self, exc):
897            print("Connection closed")
898            self.on_con_lost.set_result(True)
899
900
901    async def main():
902        # Get a reference to the event loop as we plan to use
903        # low-level APIs.
904        loop = asyncio.get_running_loop()
905
906        on_con_lost = loop.create_future()
907        message = "Hello World!"
908
909        transport, protocol = await loop.create_datagram_endpoint(
910            lambda: EchoClientProtocol(message, on_con_lost),
911            remote_addr=('127.0.0.1', 9999))
912
913        try:
914            await on_con_lost
915        finally:
916            transport.close()
917
918
919    asyncio.run(main())
920
921
922.. _asyncio_example_create_connection:
923
924Connecting Existing Sockets
925---------------------------
926
927Wait until a socket receives data using the
928:meth:`loop.create_connection` method with a protocol::
929
930    import asyncio
931    import socket
932
933
934    class MyProtocol(asyncio.Protocol):
935
936        def __init__(self, on_con_lost):
937            self.transport = None
938            self.on_con_lost = on_con_lost
939
940        def connection_made(self, transport):
941            self.transport = transport
942
943        def data_received(self, data):
944            print("Received:", data.decode())
945
946            # We are done: close the transport;
947            # connection_lost() will be called automatically.
948            self.transport.close()
949
950        def connection_lost(self, exc):
951            # The socket has been closed
952            self.on_con_lost.set_result(True)
953
954
955    async def main():
956        # Get a reference to the event loop as we plan to use
957        # low-level APIs.
958        loop = asyncio.get_running_loop()
959        on_con_lost = loop.create_future()
960
961        # Create a pair of connected sockets
962        rsock, wsock = socket.socketpair()
963
964        # Register the socket to wait for data.
965        transport, protocol = await loop.create_connection(
966            lambda: MyProtocol(on_con_lost), sock=rsock)
967
968        # Simulate the reception of data from the network.
969        loop.call_soon(wsock.send, 'abc'.encode())
970
971        try:
972            await protocol.on_con_lost
973        finally:
974            transport.close()
975            wsock.close()
976
977    asyncio.run(main())
978
979.. seealso::
980
981   The :ref:`watch a file descriptor for read events
982   <asyncio_example_watch_fd>` example uses the low-level
983   :meth:`loop.add_reader` method to register an FD.
984
985   The :ref:`register an open socket to wait for data using streams
986   <asyncio_example_create_connection-streams>` example uses high-level streams
987   created by the :func:`open_connection` function in a coroutine.
988
989.. _asyncio_example_subprocess_proto:
990
991loop.subprocess_exec() and SubprocessProtocol
992---------------------------------------------
993
994An example of a subprocess protocol used to get the output of a
995subprocess and to wait for the subprocess exit.
996
997The subprocess is created by the :meth:`loop.subprocess_exec` method::
998
999    import asyncio
1000    import sys
1001
1002    class DateProtocol(asyncio.SubprocessProtocol):
1003        def __init__(self, exit_future):
1004            self.exit_future = exit_future
1005            self.output = bytearray()
1006
1007        def pipe_data_received(self, fd, data):
1008            self.output.extend(data)
1009
1010        def process_exited(self):
1011            self.exit_future.set_result(True)
1012
1013    async def get_date():
1014        # Get a reference to the event loop as we plan to use
1015        # low-level APIs.
1016        loop = asyncio.get_running_loop()
1017
1018        code = 'import datetime; print(datetime.datetime.now())'
1019        exit_future = asyncio.Future(loop=loop)
1020
1021        # Create the subprocess controlled by DateProtocol;
1022        # redirect the standard output into a pipe.
1023        transport, protocol = await loop.subprocess_exec(
1024            lambda: DateProtocol(exit_future),
1025            sys.executable, '-c', code,
1026            stdin=None, stderr=None)
1027
1028        # Wait for the subprocess exit using the process_exited()
1029        # method of the protocol.
1030        await exit_future
1031
1032        # Close the stdout pipe.
1033        transport.close()
1034
1035        # Read the output which was collected by the
1036        # pipe_data_received() method of the protocol.
1037        data = bytes(protocol.output)
1038        return data.decode('ascii').rstrip()
1039
1040    date = asyncio.run(get_date())
1041    print(f"Current date: {date}")
1042
1043See also the :ref:`same example <asyncio_example_create_subprocess_exec>`
1044written using high-level APIs.
1045