1import collections
2import enum
3import warnings
4try:
5    import ssl
6except ImportError:  # pragma: no cover
7    ssl = None
8
9from . import constants
10from . import exceptions
11from . import protocols
12from . import transports
13from .log import logger
14
15if ssl is not None:
16    SSLAgainErrors = (ssl.SSLWantReadError, ssl.SSLSyscallError)
17
18
19class SSLProtocolState(enum.Enum):
20    UNWRAPPED = "UNWRAPPED"
21    DO_HANDSHAKE = "DO_HANDSHAKE"
22    WRAPPED = "WRAPPED"
23    FLUSHING = "FLUSHING"
24    SHUTDOWN = "SHUTDOWN"
25
26
27class AppProtocolState(enum.Enum):
28    # This tracks the state of app protocol (https://git.io/fj59P):
29    #
30    #     INIT -cm-> CON_MADE [-dr*->] [-er-> EOF?] -cl-> CON_LOST
31    #
32    # * cm: connection_made()
33    # * dr: data_received()
34    # * er: eof_received()
35    # * cl: connection_lost()
36
37    STATE_INIT = "STATE_INIT"
38    STATE_CON_MADE = "STATE_CON_MADE"
39    STATE_EOF = "STATE_EOF"
40    STATE_CON_LOST = "STATE_CON_LOST"
41
42
43def _create_transport_context(server_side, server_hostname):
44    if server_side:
45        raise ValueError('Server side SSL needs a valid SSLContext')
46
47    # Client side may pass ssl=True to use a default
48    # context; in that case the sslcontext passed is None.
49    # The default is secure for client connections.
50    # Python 3.4+: use up-to-date strong settings.
51    sslcontext = ssl.create_default_context()
52    if not server_hostname:
53        sslcontext.check_hostname = False
54    return sslcontext
55
56
57def add_flowcontrol_defaults(high, low, kb):
58    if high is None:
59        if low is None:
60            hi = kb * 1024
61        else:
62            lo = low
63            hi = 4 * lo
64    else:
65        hi = high
66    if low is None:
67        lo = hi // 4
68    else:
69        lo = low
70
71    if not hi >= lo >= 0:
72        raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
73                         (hi, lo))
74
75    return hi, lo
76
77
78class _SSLProtocolTransport(transports._FlowControlMixin,
79                            transports.Transport):
80
81    _start_tls_compatible = True
82    _sendfile_compatible = constants._SendfileMode.FALLBACK
83
84    def __init__(self, loop, ssl_protocol):
85        self._loop = loop
86        self._ssl_protocol = ssl_protocol
87        self._closed = False
88
89    def get_extra_info(self, name, default=None):
90        """Get optional transport information."""
91        return self._ssl_protocol._get_extra_info(name, default)
92
93    def set_protocol(self, protocol):
94        self._ssl_protocol._set_app_protocol(protocol)
95
96    def get_protocol(self):
97        return self._ssl_protocol._app_protocol
98
99    def is_closing(self):
100        return self._closed
101
102    def close(self):
103        """Close the transport.
104
105        Buffered data will be flushed asynchronously.  No more data
106        will be received.  After all buffered data is flushed, the
107        protocol's connection_lost() method will (eventually) called
108        with None as its argument.
109        """
110        if not self._closed:
111            self._closed = True
112            self._ssl_protocol._start_shutdown()
113        else:
114            self._ssl_protocol = None
115
116    def __del__(self, _warnings=warnings):
117        if not self._closed:
118            self._closed = True
119            _warnings.warn(
120                "unclosed transport <asyncio._SSLProtocolTransport "
121                "object>", ResourceWarning)
122
123    def is_reading(self):
124        return not self._ssl_protocol._app_reading_paused
125
126    def pause_reading(self):
127        """Pause the receiving end.
128
129        No data will be passed to the protocol's data_received()
130        method until resume_reading() is called.
131        """
132        self._ssl_protocol._pause_reading()
133
134    def resume_reading(self):
135        """Resume the receiving end.
136
137        Data received will once again be passed to the protocol's
138        data_received() method.
139        """
140        self._ssl_protocol._resume_reading()
141
142    def set_write_buffer_limits(self, high=None, low=None):
143        """Set the high- and low-water limits for write flow control.
144
145        These two values control when to call the protocol's
146        pause_writing() and resume_writing() methods.  If specified,
147        the low-water limit must be less than or equal to the
148        high-water limit.  Neither value can be negative.
149
150        The defaults are implementation-specific.  If only the
151        high-water limit is given, the low-water limit defaults to an
152        implementation-specific value less than or equal to the
153        high-water limit.  Setting high to zero forces low to zero as
154        well, and causes pause_writing() to be called whenever the
155        buffer becomes non-empty.  Setting low to zero causes
156        resume_writing() to be called only once the buffer is empty.
157        Use of zero for either limit is generally sub-optimal as it
158        reduces opportunities for doing I/O and computation
159        concurrently.
160        """
161        self._ssl_protocol._set_write_buffer_limits(high, low)
162        self._ssl_protocol._control_app_writing()
163
164    def get_write_buffer_limits(self):
165        return (self._ssl_protocol._outgoing_low_water,
166                self._ssl_protocol._outgoing_high_water)
167
168    def get_write_buffer_size(self):
169        """Return the current size of the write buffers."""
170        return self._ssl_protocol._get_write_buffer_size()
171
172    def set_read_buffer_limits(self, high=None, low=None):
173        """Set the high- and low-water limits for read flow control.
174
175        These two values control when to call the upstream transport's
176        pause_reading() and resume_reading() methods.  If specified,
177        the low-water limit must be less than or equal to the
178        high-water limit.  Neither value can be negative.
179
180        The defaults are implementation-specific.  If only the
181        high-water limit is given, the low-water limit defaults to an
182        implementation-specific value less than or equal to the
183        high-water limit.  Setting high to zero forces low to zero as
184        well, and causes pause_reading() to be called whenever the
185        buffer becomes non-empty.  Setting low to zero causes
186        resume_reading() to be called only once the buffer is empty.
187        Use of zero for either limit is generally sub-optimal as it
188        reduces opportunities for doing I/O and computation
189        concurrently.
190        """
191        self._ssl_protocol._set_read_buffer_limits(high, low)
192        self._ssl_protocol._control_ssl_reading()
193
194    def get_read_buffer_limits(self):
195        return (self._ssl_protocol._incoming_low_water,
196                self._ssl_protocol._incoming_high_water)
197
198    def get_read_buffer_size(self):
199        """Return the current size of the read buffer."""
200        return self._ssl_protocol._get_read_buffer_size()
201
202    @property
203    def _protocol_paused(self):
204        # Required for sendfile fallback pause_writing/resume_writing logic
205        return self._ssl_protocol._app_writing_paused
206
207    def write(self, data):
208        """Write some data bytes to the transport.
209
210        This does not block; it buffers the data and arranges for it
211        to be sent out asynchronously.
212        """
213        if not isinstance(data, (bytes, bytearray, memoryview)):
214            raise TypeError(f"data: expecting a bytes-like instance, "
215                            f"got {type(data).__name__}")
216        if not data:
217            return
218        self._ssl_protocol._write_appdata((data,))
219
220    def writelines(self, list_of_data):
221        """Write a list (or any iterable) of data bytes to the transport.
222
223        The default implementation concatenates the arguments and
224        calls write() on the result.
225        """
226        self._ssl_protocol._write_appdata(list_of_data)
227
228    def write_eof(self):
229        """Close the write end after flushing buffered data.
230
231        This raises :exc:`NotImplementedError` right now.
232        """
233        raise NotImplementedError
234
235    def can_write_eof(self):
236        """Return True if this transport supports write_eof(), False if not."""
237        return False
238
239    def abort(self):
240        """Close the transport immediately.
241
242        Buffered data will be lost.  No more data will be received.
243        The protocol's connection_lost() method will (eventually) be
244        called with None as its argument.
245        """
246        self._closed = True
247        if self._ssl_protocol is not None:
248            self._ssl_protocol._abort()
249
250    def _force_close(self, exc):
251        self._closed = True
252        self._ssl_protocol._abort(exc)
253
254    def _test__append_write_backlog(self, data):
255        # for test only
256        self._ssl_protocol._write_backlog.append(data)
257        self._ssl_protocol._write_buffer_size += len(data)
258
259
260class SSLProtocol(protocols.BufferedProtocol):
261    max_size = 256 * 1024   # Buffer size passed to read()
262
263    _handshake_start_time = None
264    _handshake_timeout_handle = None
265    _shutdown_timeout_handle = None
266
267    def __init__(self, loop, app_protocol, sslcontext, waiter,
268                 server_side=False, server_hostname=None,
269                 call_connection_made=True,
270                 ssl_handshake_timeout=None,
271                 ssl_shutdown_timeout=None):
272        if ssl is None:
273            raise RuntimeError("stdlib ssl module not available")
274
275        self._ssl_buffer = bytearray(self.max_size)
276        self._ssl_buffer_view = memoryview(self._ssl_buffer)
277
278        if ssl_handshake_timeout is None:
279            ssl_handshake_timeout = constants.SSL_HANDSHAKE_TIMEOUT
280        elif ssl_handshake_timeout <= 0:
281            raise ValueError(
282                f"ssl_handshake_timeout should be a positive number, "
283                f"got {ssl_handshake_timeout}")
284        if ssl_shutdown_timeout is None:
285            ssl_shutdown_timeout = constants.SSL_SHUTDOWN_TIMEOUT
286        elif ssl_shutdown_timeout <= 0:
287            raise ValueError(
288                f"ssl_shutdown_timeout should be a positive number, "
289                f"got {ssl_shutdown_timeout}")
290
291        if not sslcontext:
292            sslcontext = _create_transport_context(
293                server_side, server_hostname)
294
295        self._server_side = server_side
296        if server_hostname and not server_side:
297            self._server_hostname = server_hostname
298        else:
299            self._server_hostname = None
300        self._sslcontext = sslcontext
301        # SSL-specific extra info. More info are set when the handshake
302        # completes.
303        self._extra = dict(sslcontext=sslcontext)
304
305        # App data write buffering
306        self._write_backlog = collections.deque()
307        self._write_buffer_size = 0
308
309        self._waiter = waiter
310        self._loop = loop
311        self._set_app_protocol(app_protocol)
312        self._app_transport = None
313        self._app_transport_created = False
314        # transport, ex: SelectorSocketTransport
315        self._transport = None
316        self._ssl_handshake_timeout = ssl_handshake_timeout
317        self._ssl_shutdown_timeout = ssl_shutdown_timeout
318        # SSL and state machine
319        self._incoming = ssl.MemoryBIO()
320        self._outgoing = ssl.MemoryBIO()
321        self._state = SSLProtocolState.UNWRAPPED
322        self._conn_lost = 0  # Set when connection_lost called
323        if call_connection_made:
324            self._app_state = AppProtocolState.STATE_INIT
325        else:
326            self._app_state = AppProtocolState.STATE_CON_MADE
327        self._sslobj = self._sslcontext.wrap_bio(
328            self._incoming, self._outgoing,
329            server_side=self._server_side,
330            server_hostname=self._server_hostname)
331
332        # Flow Control
333
334        self._ssl_writing_paused = False
335
336        self._app_reading_paused = False
337
338        self._ssl_reading_paused = False
339        self._incoming_high_water = 0
340        self._incoming_low_water = 0
341        self._set_read_buffer_limits()
342        self._eof_received = False
343
344        self._app_writing_paused = False
345        self._outgoing_high_water = 0
346        self._outgoing_low_water = 0
347        self._set_write_buffer_limits()
348        self._get_app_transport()
349
350    def _set_app_protocol(self, app_protocol):
351        self._app_protocol = app_protocol
352        # Make fast hasattr check first
353        if (hasattr(app_protocol, 'get_buffer') and
354                isinstance(app_protocol, protocols.BufferedProtocol)):
355            self._app_protocol_get_buffer = app_protocol.get_buffer
356            self._app_protocol_buffer_updated = app_protocol.buffer_updated
357            self._app_protocol_is_buffer = True
358        else:
359            self._app_protocol_is_buffer = False
360
361    def _wakeup_waiter(self, exc=None):
362        if self._waiter is None:
363            return
364        if not self._waiter.cancelled():
365            if exc is not None:
366                self._waiter.set_exception(exc)
367            else:
368                self._waiter.set_result(None)
369        self._waiter = None
370
371    def _get_app_transport(self):
372        if self._app_transport is None:
373            if self._app_transport_created:
374                raise RuntimeError('Creating _SSLProtocolTransport twice')
375            self._app_transport = _SSLProtocolTransport(self._loop, self)
376            self._app_transport_created = True
377        return self._app_transport
378
379    def connection_made(self, transport):
380        """Called when the low-level connection is made.
381
382        Start the SSL handshake.
383        """
384        self._transport = transport
385        self._start_handshake()
386
387    def connection_lost(self, exc):
388        """Called when the low-level connection is lost or closed.
389
390        The argument is an exception object or None (the latter
391        meaning a regular EOF is received or the connection was
392        aborted or closed).
393        """
394        self._write_backlog.clear()
395        self._outgoing.read()
396        self._conn_lost += 1
397
398        # Just mark the app transport as closed so that its __dealloc__
399        # doesn't complain.
400        if self._app_transport is not None:
401            self._app_transport._closed = True
402
403        if self._state != SSLProtocolState.DO_HANDSHAKE:
404            if (
405                self._app_state == AppProtocolState.STATE_CON_MADE or
406                self._app_state == AppProtocolState.STATE_EOF
407            ):
408                self._app_state = AppProtocolState.STATE_CON_LOST
409                self._loop.call_soon(self._app_protocol.connection_lost, exc)
410        self._set_state(SSLProtocolState.UNWRAPPED)
411        self._transport = None
412        self._app_transport = None
413        self._app_protocol = None
414        self._wakeup_waiter(exc)
415
416        if self._shutdown_timeout_handle:
417            self._shutdown_timeout_handle.cancel()
418            self._shutdown_timeout_handle = None
419        if self._handshake_timeout_handle:
420            self._handshake_timeout_handle.cancel()
421            self._handshake_timeout_handle = None
422
423    def get_buffer(self, n):
424        want = n
425        if want <= 0 or want > self.max_size:
426            want = self.max_size
427        if len(self._ssl_buffer) < want:
428            self._ssl_buffer = bytearray(want)
429            self._ssl_buffer_view = memoryview(self._ssl_buffer)
430        return self._ssl_buffer_view
431
432    def buffer_updated(self, nbytes):
433        self._incoming.write(self._ssl_buffer_view[:nbytes])
434
435        if self._state == SSLProtocolState.DO_HANDSHAKE:
436            self._do_handshake()
437
438        elif self._state == SSLProtocolState.WRAPPED:
439            self._do_read()
440
441        elif self._state == SSLProtocolState.FLUSHING:
442            self._do_flush()
443
444        elif self._state == SSLProtocolState.SHUTDOWN:
445            self._do_shutdown()
446
447    def eof_received(self):
448        """Called when the other end of the low-level stream
449        is half-closed.
450
451        If this returns a false value (including None), the transport
452        will close itself.  If it returns a true value, closing the
453        transport is up to the protocol.
454        """
455        self._eof_received = True
456        try:
457            if self._loop.get_debug():
458                logger.debug("%r received EOF", self)
459
460            if self._state == SSLProtocolState.DO_HANDSHAKE:
461                self._on_handshake_complete(ConnectionResetError)
462
463            elif self._state == SSLProtocolState.WRAPPED:
464                self._set_state(SSLProtocolState.FLUSHING)
465                if self._app_reading_paused:
466                    return True
467                else:
468                    self._do_flush()
469
470            elif self._state == SSLProtocolState.FLUSHING:
471                self._do_write()
472                self._set_state(SSLProtocolState.SHUTDOWN)
473                self._do_shutdown()
474
475            elif self._state == SSLProtocolState.SHUTDOWN:
476                self._do_shutdown()
477
478        except Exception:
479            self._transport.close()
480            raise
481
482    def _get_extra_info(self, name, default=None):
483        if name in self._extra:
484            return self._extra[name]
485        elif self._transport is not None:
486            return self._transport.get_extra_info(name, default)
487        else:
488            return default
489
490    def _set_state(self, new_state):
491        allowed = False
492
493        if new_state == SSLProtocolState.UNWRAPPED:
494            allowed = True
495
496        elif (
497            self._state == SSLProtocolState.UNWRAPPED and
498            new_state == SSLProtocolState.DO_HANDSHAKE
499        ):
500            allowed = True
501
502        elif (
503            self._state == SSLProtocolState.DO_HANDSHAKE and
504            new_state == SSLProtocolState.WRAPPED
505        ):
506            allowed = True
507
508        elif (
509            self._state == SSLProtocolState.WRAPPED and
510            new_state == SSLProtocolState.FLUSHING
511        ):
512            allowed = True
513
514        elif (
515            self._state == SSLProtocolState.FLUSHING and
516            new_state == SSLProtocolState.SHUTDOWN
517        ):
518            allowed = True
519
520        if allowed:
521            self._state = new_state
522
523        else:
524            raise RuntimeError(
525                'cannot switch state from {} to {}'.format(
526                    self._state, new_state))
527
528    # Handshake flow
529
530    def _start_handshake(self):
531        if self._loop.get_debug():
532            logger.debug("%r starts SSL handshake", self)
533            self._handshake_start_time = self._loop.time()
534        else:
535            self._handshake_start_time = None
536
537        self._set_state(SSLProtocolState.DO_HANDSHAKE)
538
539        # start handshake timeout count down
540        self._handshake_timeout_handle = \
541            self._loop.call_later(self._ssl_handshake_timeout,
542                                  lambda: self._check_handshake_timeout())
543
544        self._do_handshake()
545
546    def _check_handshake_timeout(self):
547        if self._state == SSLProtocolState.DO_HANDSHAKE:
548            msg = (
549                f"SSL handshake is taking longer than "
550                f"{self._ssl_handshake_timeout} seconds: "
551                f"aborting the connection"
552            )
553            self._fatal_error(ConnectionAbortedError(msg))
554
555    def _do_handshake(self):
556        try:
557            self._sslobj.do_handshake()
558        except SSLAgainErrors:
559            self._process_outgoing()
560        except ssl.SSLError as exc:
561            self._on_handshake_complete(exc)
562        else:
563            self._on_handshake_complete(None)
564
565    def _on_handshake_complete(self, handshake_exc):
566        if self._handshake_timeout_handle is not None:
567            self._handshake_timeout_handle.cancel()
568            self._handshake_timeout_handle = None
569
570        sslobj = self._sslobj
571        try:
572            if handshake_exc is None:
573                self._set_state(SSLProtocolState.WRAPPED)
574            else:
575                raise handshake_exc
576
577            peercert = sslobj.getpeercert()
578        except Exception as exc:
579            self._set_state(SSLProtocolState.UNWRAPPED)
580            if isinstance(exc, ssl.CertificateError):
581                msg = 'SSL handshake failed on verifying the certificate'
582            else:
583                msg = 'SSL handshake failed'
584            self._fatal_error(exc, msg)
585            self._wakeup_waiter(exc)
586            return
587
588        if self._loop.get_debug():
589            dt = self._loop.time() - self._handshake_start_time
590            logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)
591
592        # Add extra info that becomes available after handshake.
593        self._extra.update(peercert=peercert,
594                           cipher=sslobj.cipher(),
595                           compression=sslobj.compression(),
596                           ssl_object=sslobj)
597        if self._app_state == AppProtocolState.STATE_INIT:
598            self._app_state = AppProtocolState.STATE_CON_MADE
599            self._app_protocol.connection_made(self._get_app_transport())
600        self._wakeup_waiter()
601        self._do_read()
602
603    # Shutdown flow
604
605    def _start_shutdown(self):
606        if (
607            self._state in (
608                SSLProtocolState.FLUSHING,
609                SSLProtocolState.SHUTDOWN,
610                SSLProtocolState.UNWRAPPED
611            )
612        ):
613            return
614        if self._app_transport is not None:
615            self._app_transport._closed = True
616        if self._state == SSLProtocolState.DO_HANDSHAKE:
617            self._abort()
618        else:
619            self._set_state(SSLProtocolState.FLUSHING)
620            self._shutdown_timeout_handle = self._loop.call_later(
621                self._ssl_shutdown_timeout,
622                lambda: self._check_shutdown_timeout()
623            )
624            self._do_flush()
625
626    def _check_shutdown_timeout(self):
627        if (
628            self._state in (
629                SSLProtocolState.FLUSHING,
630                SSLProtocolState.SHUTDOWN
631            )
632        ):
633            self._transport._force_close(
634                exceptions.TimeoutError('SSL shutdown timed out'))
635
636    def _do_flush(self):
637        self._do_read()
638        self._set_state(SSLProtocolState.SHUTDOWN)
639        self._do_shutdown()
640
641    def _do_shutdown(self):
642        try:
643            if not self._eof_received:
644                self._sslobj.unwrap()
645        except SSLAgainErrors:
646            self._process_outgoing()
647        except ssl.SSLError as exc:
648            self._on_shutdown_complete(exc)
649        else:
650            self._process_outgoing()
651            self._call_eof_received()
652            self._on_shutdown_complete(None)
653
654    def _on_shutdown_complete(self, shutdown_exc):
655        if self._shutdown_timeout_handle is not None:
656            self._shutdown_timeout_handle.cancel()
657            self._shutdown_timeout_handle = None
658
659        if shutdown_exc:
660            self._fatal_error(shutdown_exc)
661        else:
662            self._loop.call_soon(self._transport.close)
663
664    def _abort(self):
665        self._set_state(SSLProtocolState.UNWRAPPED)
666        if self._transport is not None:
667            self._transport.abort()
668
669    # Outgoing flow
670
671    def _write_appdata(self, list_of_data):
672        if (
673            self._state in (
674                SSLProtocolState.FLUSHING,
675                SSLProtocolState.SHUTDOWN,
676                SSLProtocolState.UNWRAPPED
677            )
678        ):
679            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
680                logger.warning('SSL connection is closed')
681            self._conn_lost += 1
682            return
683
684        for data in list_of_data:
685            self._write_backlog.append(data)
686            self._write_buffer_size += len(data)
687
688        try:
689            if self._state == SSLProtocolState.WRAPPED:
690                self._do_write()
691
692        except Exception as ex:
693            self._fatal_error(ex, 'Fatal error on SSL protocol')
694
695    def _do_write(self):
696        try:
697            while self._write_backlog:
698                data = self._write_backlog[0]
699                count = self._sslobj.write(data)
700                data_len = len(data)
701                if count < data_len:
702                    self._write_backlog[0] = data[count:]
703                    self._write_buffer_size -= count
704                else:
705                    del self._write_backlog[0]
706                    self._write_buffer_size -= data_len
707        except SSLAgainErrors:
708            pass
709        self._process_outgoing()
710
711    def _process_outgoing(self):
712        if not self._ssl_writing_paused:
713            data = self._outgoing.read()
714            if len(data):
715                self._transport.write(data)
716        self._control_app_writing()
717
718    # Incoming flow
719
720    def _do_read(self):
721        if (
722            self._state not in (
723                SSLProtocolState.WRAPPED,
724                SSLProtocolState.FLUSHING,
725            )
726        ):
727            return
728        try:
729            if not self._app_reading_paused:
730                if self._app_protocol_is_buffer:
731                    self._do_read__buffered()
732                else:
733                    self._do_read__copied()
734                if self._write_backlog:
735                    self._do_write()
736                else:
737                    self._process_outgoing()
738            self._control_ssl_reading()
739        except Exception as ex:
740            self._fatal_error(ex, 'Fatal error on SSL protocol')
741
742    def _do_read__buffered(self):
743        offset = 0
744        count = 1
745
746        buf = self._app_protocol_get_buffer(self._get_read_buffer_size())
747        wants = len(buf)
748
749        try:
750            count = self._sslobj.read(wants, buf)
751
752            if count > 0:
753                offset = count
754                while offset < wants:
755                    count = self._sslobj.read(wants - offset, buf[offset:])
756                    if count > 0:
757                        offset += count
758                    else:
759                        break
760                else:
761                    self._loop.call_soon(lambda: self._do_read())
762        except SSLAgainErrors:
763            pass
764        if offset > 0:
765            self._app_protocol_buffer_updated(offset)
766        if not count:
767            # close_notify
768            self._call_eof_received()
769            self._start_shutdown()
770
771    def _do_read__copied(self):
772        chunk = b'1'
773        zero = True
774        one = False
775
776        try:
777            while True:
778                chunk = self._sslobj.read(self.max_size)
779                if not chunk:
780                    break
781                if zero:
782                    zero = False
783                    one = True
784                    first = chunk
785                elif one:
786                    one = False
787                    data = [first, chunk]
788                else:
789                    data.append(chunk)
790        except SSLAgainErrors:
791            pass
792        if one:
793            self._app_protocol.data_received(first)
794        elif not zero:
795            self._app_protocol.data_received(b''.join(data))
796        if not chunk:
797            # close_notify
798            self._call_eof_received()
799            self._start_shutdown()
800
801    def _call_eof_received(self):
802        try:
803            if self._app_state == AppProtocolState.STATE_CON_MADE:
804                self._app_state = AppProtocolState.STATE_EOF
805                keep_open = self._app_protocol.eof_received()
806                if keep_open:
807                    logger.warning('returning true from eof_received() '
808                                   'has no effect when using ssl')
809        except (KeyboardInterrupt, SystemExit):
810            raise
811        except BaseException as ex:
812            self._fatal_error(ex, 'Error calling eof_received()')
813
814    # Flow control for writes from APP socket
815
816    def _control_app_writing(self):
817        size = self._get_write_buffer_size()
818        if size >= self._outgoing_high_water and not self._app_writing_paused:
819            self._app_writing_paused = True
820            try:
821                self._app_protocol.pause_writing()
822            except (KeyboardInterrupt, SystemExit):
823                raise
824            except BaseException as exc:
825                self._loop.call_exception_handler({
826                    'message': 'protocol.pause_writing() failed',
827                    'exception': exc,
828                    'transport': self._app_transport,
829                    'protocol': self,
830                })
831        elif size <= self._outgoing_low_water and self._app_writing_paused:
832            self._app_writing_paused = False
833            try:
834                self._app_protocol.resume_writing()
835            except (KeyboardInterrupt, SystemExit):
836                raise
837            except BaseException as exc:
838                self._loop.call_exception_handler({
839                    'message': 'protocol.resume_writing() failed',
840                    'exception': exc,
841                    'transport': self._app_transport,
842                    'protocol': self,
843                })
844
845    def _get_write_buffer_size(self):
846        return self._outgoing.pending + self._write_buffer_size
847
848    def _set_write_buffer_limits(self, high=None, low=None):
849        high, low = add_flowcontrol_defaults(
850            high, low, constants.FLOW_CONTROL_HIGH_WATER_SSL_WRITE)
851        self._outgoing_high_water = high
852        self._outgoing_low_water = low
853
854    # Flow control for reads to APP socket
855
856    def _pause_reading(self):
857        self._app_reading_paused = True
858
859    def _resume_reading(self):
860        if self._app_reading_paused:
861            self._app_reading_paused = False
862
863            def resume():
864                if self._state == SSLProtocolState.WRAPPED:
865                    self._do_read()
866                elif self._state == SSLProtocolState.FLUSHING:
867                    self._do_flush()
868                elif self._state == SSLProtocolState.SHUTDOWN:
869                    self._do_shutdown()
870            self._loop.call_soon(resume)
871
872    # Flow control for reads from SSL socket
873
874    def _control_ssl_reading(self):
875        size = self._get_read_buffer_size()
876        if size >= self._incoming_high_water and not self._ssl_reading_paused:
877            self._ssl_reading_paused = True
878            self._transport.pause_reading()
879        elif size <= self._incoming_low_water and self._ssl_reading_paused:
880            self._ssl_reading_paused = False
881            self._transport.resume_reading()
882
883    def _set_read_buffer_limits(self, high=None, low=None):
884        high, low = add_flowcontrol_defaults(
885            high, low, constants.FLOW_CONTROL_HIGH_WATER_SSL_READ)
886        self._incoming_high_water = high
887        self._incoming_low_water = low
888
889    def _get_read_buffer_size(self):
890        return self._incoming.pending
891
892    # Flow control for writes to SSL socket
893
894    def pause_writing(self):
895        """Called when the low-level transport's buffer goes over
896        the high-water mark.
897        """
898        assert not self._ssl_writing_paused
899        self._ssl_writing_paused = True
900
901    def resume_writing(self):
902        """Called when the low-level transport's buffer drains below
903        the low-water mark.
904        """
905        assert self._ssl_writing_paused
906        self._ssl_writing_paused = False
907        self._process_outgoing()
908
909    def _fatal_error(self, exc, message='Fatal error on transport'):
910        if self._transport:
911            self._transport._force_close(exc)
912
913        if isinstance(exc, OSError):
914            if self._loop.get_debug():
915                logger.debug("%r: %s", self, message, exc_info=True)
916        elif not isinstance(exc, exceptions.CancelledError):
917            self._loop.call_exception_handler({
918                'message': message,
919                'exception': exc,
920                'transport': self._transport,
921                'protocol': self,
922            })
923