1import collections 2import enum 3import warnings 4try: 5 import ssl 6except ImportError: # pragma: no cover 7 ssl = None 8 9from . import constants 10from . import exceptions 11from . import protocols 12from . import transports 13from .log import logger 14 15if ssl is not None: 16 SSLAgainErrors = (ssl.SSLWantReadError, ssl.SSLSyscallError) 17 18 19class SSLProtocolState(enum.Enum): 20 UNWRAPPED = "UNWRAPPED" 21 DO_HANDSHAKE = "DO_HANDSHAKE" 22 WRAPPED = "WRAPPED" 23 FLUSHING = "FLUSHING" 24 SHUTDOWN = "SHUTDOWN" 25 26 27class AppProtocolState(enum.Enum): 28 # This tracks the state of app protocol (https://git.io/fj59P): 29 # 30 # INIT -cm-> CON_MADE [-dr*->] [-er-> EOF?] -cl-> CON_LOST 31 # 32 # * cm: connection_made() 33 # * dr: data_received() 34 # * er: eof_received() 35 # * cl: connection_lost() 36 37 STATE_INIT = "STATE_INIT" 38 STATE_CON_MADE = "STATE_CON_MADE" 39 STATE_EOF = "STATE_EOF" 40 STATE_CON_LOST = "STATE_CON_LOST" 41 42 43def _create_transport_context(server_side, server_hostname): 44 if server_side: 45 raise ValueError('Server side SSL needs a valid SSLContext') 46 47 # Client side may pass ssl=True to use a default 48 # context; in that case the sslcontext passed is None. 49 # The default is secure for client connections. 50 # Python 3.4+: use up-to-date strong settings. 51 sslcontext = ssl.create_default_context() 52 if not server_hostname: 53 sslcontext.check_hostname = False 54 return sslcontext 55 56 57def add_flowcontrol_defaults(high, low, kb): 58 if high is None: 59 if low is None: 60 hi = kb * 1024 61 else: 62 lo = low 63 hi = 4 * lo 64 else: 65 hi = high 66 if low is None: 67 lo = hi // 4 68 else: 69 lo = low 70 71 if not hi >= lo >= 0: 72 raise ValueError('high (%r) must be >= low (%r) must be >= 0' % 73 (hi, lo)) 74 75 return hi, lo 76 77 78class _SSLProtocolTransport(transports._FlowControlMixin, 79 transports.Transport): 80 81 _start_tls_compatible = True 82 _sendfile_compatible = constants._SendfileMode.FALLBACK 83 84 def __init__(self, loop, ssl_protocol): 85 self._loop = loop 86 self._ssl_protocol = ssl_protocol 87 self._closed = False 88 89 def get_extra_info(self, name, default=None): 90 """Get optional transport information.""" 91 return self._ssl_protocol._get_extra_info(name, default) 92 93 def set_protocol(self, protocol): 94 self._ssl_protocol._set_app_protocol(protocol) 95 96 def get_protocol(self): 97 return self._ssl_protocol._app_protocol 98 99 def is_closing(self): 100 return self._closed 101 102 def close(self): 103 """Close the transport. 104 105 Buffered data will be flushed asynchronously. No more data 106 will be received. After all buffered data is flushed, the 107 protocol's connection_lost() method will (eventually) called 108 with None as its argument. 109 """ 110 if not self._closed: 111 self._closed = True 112 self._ssl_protocol._start_shutdown() 113 else: 114 self._ssl_protocol = None 115 116 def __del__(self, _warnings=warnings): 117 if not self._closed: 118 self._closed = True 119 _warnings.warn( 120 "unclosed transport <asyncio._SSLProtocolTransport " 121 "object>", ResourceWarning) 122 123 def is_reading(self): 124 return not self._ssl_protocol._app_reading_paused 125 126 def pause_reading(self): 127 """Pause the receiving end. 128 129 No data will be passed to the protocol's data_received() 130 method until resume_reading() is called. 131 """ 132 self._ssl_protocol._pause_reading() 133 134 def resume_reading(self): 135 """Resume the receiving end. 136 137 Data received will once again be passed to the protocol's 138 data_received() method. 139 """ 140 self._ssl_protocol._resume_reading() 141 142 def set_write_buffer_limits(self, high=None, low=None): 143 """Set the high- and low-water limits for write flow control. 144 145 These two values control when to call the protocol's 146 pause_writing() and resume_writing() methods. If specified, 147 the low-water limit must be less than or equal to the 148 high-water limit. Neither value can be negative. 149 150 The defaults are implementation-specific. If only the 151 high-water limit is given, the low-water limit defaults to an 152 implementation-specific value less than or equal to the 153 high-water limit. Setting high to zero forces low to zero as 154 well, and causes pause_writing() to be called whenever the 155 buffer becomes non-empty. Setting low to zero causes 156 resume_writing() to be called only once the buffer is empty. 157 Use of zero for either limit is generally sub-optimal as it 158 reduces opportunities for doing I/O and computation 159 concurrently. 160 """ 161 self._ssl_protocol._set_write_buffer_limits(high, low) 162 self._ssl_protocol._control_app_writing() 163 164 def get_write_buffer_limits(self): 165 return (self._ssl_protocol._outgoing_low_water, 166 self._ssl_protocol._outgoing_high_water) 167 168 def get_write_buffer_size(self): 169 """Return the current size of the write buffers.""" 170 return self._ssl_protocol._get_write_buffer_size() 171 172 def set_read_buffer_limits(self, high=None, low=None): 173 """Set the high- and low-water limits for read flow control. 174 175 These two values control when to call the upstream transport's 176 pause_reading() and resume_reading() methods. If specified, 177 the low-water limit must be less than or equal to the 178 high-water limit. Neither value can be negative. 179 180 The defaults are implementation-specific. If only the 181 high-water limit is given, the low-water limit defaults to an 182 implementation-specific value less than or equal to the 183 high-water limit. Setting high to zero forces low to zero as 184 well, and causes pause_reading() to be called whenever the 185 buffer becomes non-empty. Setting low to zero causes 186 resume_reading() to be called only once the buffer is empty. 187 Use of zero for either limit is generally sub-optimal as it 188 reduces opportunities for doing I/O and computation 189 concurrently. 190 """ 191 self._ssl_protocol._set_read_buffer_limits(high, low) 192 self._ssl_protocol._control_ssl_reading() 193 194 def get_read_buffer_limits(self): 195 return (self._ssl_protocol._incoming_low_water, 196 self._ssl_protocol._incoming_high_water) 197 198 def get_read_buffer_size(self): 199 """Return the current size of the read buffer.""" 200 return self._ssl_protocol._get_read_buffer_size() 201 202 @property 203 def _protocol_paused(self): 204 # Required for sendfile fallback pause_writing/resume_writing logic 205 return self._ssl_protocol._app_writing_paused 206 207 def write(self, data): 208 """Write some data bytes to the transport. 209 210 This does not block; it buffers the data and arranges for it 211 to be sent out asynchronously. 212 """ 213 if not isinstance(data, (bytes, bytearray, memoryview)): 214 raise TypeError(f"data: expecting a bytes-like instance, " 215 f"got {type(data).__name__}") 216 if not data: 217 return 218 self._ssl_protocol._write_appdata((data,)) 219 220 def writelines(self, list_of_data): 221 """Write a list (or any iterable) of data bytes to the transport. 222 223 The default implementation concatenates the arguments and 224 calls write() on the result. 225 """ 226 self._ssl_protocol._write_appdata(list_of_data) 227 228 def write_eof(self): 229 """Close the write end after flushing buffered data. 230 231 This raises :exc:`NotImplementedError` right now. 232 """ 233 raise NotImplementedError 234 235 def can_write_eof(self): 236 """Return True if this transport supports write_eof(), False if not.""" 237 return False 238 239 def abort(self): 240 """Close the transport immediately. 241 242 Buffered data will be lost. No more data will be received. 243 The protocol's connection_lost() method will (eventually) be 244 called with None as its argument. 245 """ 246 self._closed = True 247 if self._ssl_protocol is not None: 248 self._ssl_protocol._abort() 249 250 def _force_close(self, exc): 251 self._closed = True 252 self._ssl_protocol._abort(exc) 253 254 def _test__append_write_backlog(self, data): 255 # for test only 256 self._ssl_protocol._write_backlog.append(data) 257 self._ssl_protocol._write_buffer_size += len(data) 258 259 260class SSLProtocol(protocols.BufferedProtocol): 261 max_size = 256 * 1024 # Buffer size passed to read() 262 263 _handshake_start_time = None 264 _handshake_timeout_handle = None 265 _shutdown_timeout_handle = None 266 267 def __init__(self, loop, app_protocol, sslcontext, waiter, 268 server_side=False, server_hostname=None, 269 call_connection_made=True, 270 ssl_handshake_timeout=None, 271 ssl_shutdown_timeout=None): 272 if ssl is None: 273 raise RuntimeError("stdlib ssl module not available") 274 275 self._ssl_buffer = bytearray(self.max_size) 276 self._ssl_buffer_view = memoryview(self._ssl_buffer) 277 278 if ssl_handshake_timeout is None: 279 ssl_handshake_timeout = constants.SSL_HANDSHAKE_TIMEOUT 280 elif ssl_handshake_timeout <= 0: 281 raise ValueError( 282 f"ssl_handshake_timeout should be a positive number, " 283 f"got {ssl_handshake_timeout}") 284 if ssl_shutdown_timeout is None: 285 ssl_shutdown_timeout = constants.SSL_SHUTDOWN_TIMEOUT 286 elif ssl_shutdown_timeout <= 0: 287 raise ValueError( 288 f"ssl_shutdown_timeout should be a positive number, " 289 f"got {ssl_shutdown_timeout}") 290 291 if not sslcontext: 292 sslcontext = _create_transport_context( 293 server_side, server_hostname) 294 295 self._server_side = server_side 296 if server_hostname and not server_side: 297 self._server_hostname = server_hostname 298 else: 299 self._server_hostname = None 300 self._sslcontext = sslcontext 301 # SSL-specific extra info. More info are set when the handshake 302 # completes. 303 self._extra = dict(sslcontext=sslcontext) 304 305 # App data write buffering 306 self._write_backlog = collections.deque() 307 self._write_buffer_size = 0 308 309 self._waiter = waiter 310 self._loop = loop 311 self._set_app_protocol(app_protocol) 312 self._app_transport = None 313 self._app_transport_created = False 314 # transport, ex: SelectorSocketTransport 315 self._transport = None 316 self._ssl_handshake_timeout = ssl_handshake_timeout 317 self._ssl_shutdown_timeout = ssl_shutdown_timeout 318 # SSL and state machine 319 self._incoming = ssl.MemoryBIO() 320 self._outgoing = ssl.MemoryBIO() 321 self._state = SSLProtocolState.UNWRAPPED 322 self._conn_lost = 0 # Set when connection_lost called 323 if call_connection_made: 324 self._app_state = AppProtocolState.STATE_INIT 325 else: 326 self._app_state = AppProtocolState.STATE_CON_MADE 327 self._sslobj = self._sslcontext.wrap_bio( 328 self._incoming, self._outgoing, 329 server_side=self._server_side, 330 server_hostname=self._server_hostname) 331 332 # Flow Control 333 334 self._ssl_writing_paused = False 335 336 self._app_reading_paused = False 337 338 self._ssl_reading_paused = False 339 self._incoming_high_water = 0 340 self._incoming_low_water = 0 341 self._set_read_buffer_limits() 342 self._eof_received = False 343 344 self._app_writing_paused = False 345 self._outgoing_high_water = 0 346 self._outgoing_low_water = 0 347 self._set_write_buffer_limits() 348 self._get_app_transport() 349 350 def _set_app_protocol(self, app_protocol): 351 self._app_protocol = app_protocol 352 # Make fast hasattr check first 353 if (hasattr(app_protocol, 'get_buffer') and 354 isinstance(app_protocol, protocols.BufferedProtocol)): 355 self._app_protocol_get_buffer = app_protocol.get_buffer 356 self._app_protocol_buffer_updated = app_protocol.buffer_updated 357 self._app_protocol_is_buffer = True 358 else: 359 self._app_protocol_is_buffer = False 360 361 def _wakeup_waiter(self, exc=None): 362 if self._waiter is None: 363 return 364 if not self._waiter.cancelled(): 365 if exc is not None: 366 self._waiter.set_exception(exc) 367 else: 368 self._waiter.set_result(None) 369 self._waiter = None 370 371 def _get_app_transport(self): 372 if self._app_transport is None: 373 if self._app_transport_created: 374 raise RuntimeError('Creating _SSLProtocolTransport twice') 375 self._app_transport = _SSLProtocolTransport(self._loop, self) 376 self._app_transport_created = True 377 return self._app_transport 378 379 def connection_made(self, transport): 380 """Called when the low-level connection is made. 381 382 Start the SSL handshake. 383 """ 384 self._transport = transport 385 self._start_handshake() 386 387 def connection_lost(self, exc): 388 """Called when the low-level connection is lost or closed. 389 390 The argument is an exception object or None (the latter 391 meaning a regular EOF is received or the connection was 392 aborted or closed). 393 """ 394 self._write_backlog.clear() 395 self._outgoing.read() 396 self._conn_lost += 1 397 398 # Just mark the app transport as closed so that its __dealloc__ 399 # doesn't complain. 400 if self._app_transport is not None: 401 self._app_transport._closed = True 402 403 if self._state != SSLProtocolState.DO_HANDSHAKE: 404 if ( 405 self._app_state == AppProtocolState.STATE_CON_MADE or 406 self._app_state == AppProtocolState.STATE_EOF 407 ): 408 self._app_state = AppProtocolState.STATE_CON_LOST 409 self._loop.call_soon(self._app_protocol.connection_lost, exc) 410 self._set_state(SSLProtocolState.UNWRAPPED) 411 self._transport = None 412 self._app_transport = None 413 self._app_protocol = None 414 self._wakeup_waiter(exc) 415 416 if self._shutdown_timeout_handle: 417 self._shutdown_timeout_handle.cancel() 418 self._shutdown_timeout_handle = None 419 if self._handshake_timeout_handle: 420 self._handshake_timeout_handle.cancel() 421 self._handshake_timeout_handle = None 422 423 def get_buffer(self, n): 424 want = n 425 if want <= 0 or want > self.max_size: 426 want = self.max_size 427 if len(self._ssl_buffer) < want: 428 self._ssl_buffer = bytearray(want) 429 self._ssl_buffer_view = memoryview(self._ssl_buffer) 430 return self._ssl_buffer_view 431 432 def buffer_updated(self, nbytes): 433 self._incoming.write(self._ssl_buffer_view[:nbytes]) 434 435 if self._state == SSLProtocolState.DO_HANDSHAKE: 436 self._do_handshake() 437 438 elif self._state == SSLProtocolState.WRAPPED: 439 self._do_read() 440 441 elif self._state == SSLProtocolState.FLUSHING: 442 self._do_flush() 443 444 elif self._state == SSLProtocolState.SHUTDOWN: 445 self._do_shutdown() 446 447 def eof_received(self): 448 """Called when the other end of the low-level stream 449 is half-closed. 450 451 If this returns a false value (including None), the transport 452 will close itself. If it returns a true value, closing the 453 transport is up to the protocol. 454 """ 455 self._eof_received = True 456 try: 457 if self._loop.get_debug(): 458 logger.debug("%r received EOF", self) 459 460 if self._state == SSLProtocolState.DO_HANDSHAKE: 461 self._on_handshake_complete(ConnectionResetError) 462 463 elif self._state == SSLProtocolState.WRAPPED: 464 self._set_state(SSLProtocolState.FLUSHING) 465 if self._app_reading_paused: 466 return True 467 else: 468 self._do_flush() 469 470 elif self._state == SSLProtocolState.FLUSHING: 471 self._do_write() 472 self._set_state(SSLProtocolState.SHUTDOWN) 473 self._do_shutdown() 474 475 elif self._state == SSLProtocolState.SHUTDOWN: 476 self._do_shutdown() 477 478 except Exception: 479 self._transport.close() 480 raise 481 482 def _get_extra_info(self, name, default=None): 483 if name in self._extra: 484 return self._extra[name] 485 elif self._transport is not None: 486 return self._transport.get_extra_info(name, default) 487 else: 488 return default 489 490 def _set_state(self, new_state): 491 allowed = False 492 493 if new_state == SSLProtocolState.UNWRAPPED: 494 allowed = True 495 496 elif ( 497 self._state == SSLProtocolState.UNWRAPPED and 498 new_state == SSLProtocolState.DO_HANDSHAKE 499 ): 500 allowed = True 501 502 elif ( 503 self._state == SSLProtocolState.DO_HANDSHAKE and 504 new_state == SSLProtocolState.WRAPPED 505 ): 506 allowed = True 507 508 elif ( 509 self._state == SSLProtocolState.WRAPPED and 510 new_state == SSLProtocolState.FLUSHING 511 ): 512 allowed = True 513 514 elif ( 515 self._state == SSLProtocolState.FLUSHING and 516 new_state == SSLProtocolState.SHUTDOWN 517 ): 518 allowed = True 519 520 if allowed: 521 self._state = new_state 522 523 else: 524 raise RuntimeError( 525 'cannot switch state from {} to {}'.format( 526 self._state, new_state)) 527 528 # Handshake flow 529 530 def _start_handshake(self): 531 if self._loop.get_debug(): 532 logger.debug("%r starts SSL handshake", self) 533 self._handshake_start_time = self._loop.time() 534 else: 535 self._handshake_start_time = None 536 537 self._set_state(SSLProtocolState.DO_HANDSHAKE) 538 539 # start handshake timeout count down 540 self._handshake_timeout_handle = \ 541 self._loop.call_later(self._ssl_handshake_timeout, 542 lambda: self._check_handshake_timeout()) 543 544 self._do_handshake() 545 546 def _check_handshake_timeout(self): 547 if self._state == SSLProtocolState.DO_HANDSHAKE: 548 msg = ( 549 f"SSL handshake is taking longer than " 550 f"{self._ssl_handshake_timeout} seconds: " 551 f"aborting the connection" 552 ) 553 self._fatal_error(ConnectionAbortedError(msg)) 554 555 def _do_handshake(self): 556 try: 557 self._sslobj.do_handshake() 558 except SSLAgainErrors: 559 self._process_outgoing() 560 except ssl.SSLError as exc: 561 self._on_handshake_complete(exc) 562 else: 563 self._on_handshake_complete(None) 564 565 def _on_handshake_complete(self, handshake_exc): 566 if self._handshake_timeout_handle is not None: 567 self._handshake_timeout_handle.cancel() 568 self._handshake_timeout_handle = None 569 570 sslobj = self._sslobj 571 try: 572 if handshake_exc is None: 573 self._set_state(SSLProtocolState.WRAPPED) 574 else: 575 raise handshake_exc 576 577 peercert = sslobj.getpeercert() 578 except Exception as exc: 579 self._set_state(SSLProtocolState.UNWRAPPED) 580 if isinstance(exc, ssl.CertificateError): 581 msg = 'SSL handshake failed on verifying the certificate' 582 else: 583 msg = 'SSL handshake failed' 584 self._fatal_error(exc, msg) 585 self._wakeup_waiter(exc) 586 return 587 588 if self._loop.get_debug(): 589 dt = self._loop.time() - self._handshake_start_time 590 logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3) 591 592 # Add extra info that becomes available after handshake. 593 self._extra.update(peercert=peercert, 594 cipher=sslobj.cipher(), 595 compression=sslobj.compression(), 596 ssl_object=sslobj) 597 if self._app_state == AppProtocolState.STATE_INIT: 598 self._app_state = AppProtocolState.STATE_CON_MADE 599 self._app_protocol.connection_made(self._get_app_transport()) 600 self._wakeup_waiter() 601 self._do_read() 602 603 # Shutdown flow 604 605 def _start_shutdown(self): 606 if ( 607 self._state in ( 608 SSLProtocolState.FLUSHING, 609 SSLProtocolState.SHUTDOWN, 610 SSLProtocolState.UNWRAPPED 611 ) 612 ): 613 return 614 if self._app_transport is not None: 615 self._app_transport._closed = True 616 if self._state == SSLProtocolState.DO_HANDSHAKE: 617 self._abort() 618 else: 619 self._set_state(SSLProtocolState.FLUSHING) 620 self._shutdown_timeout_handle = self._loop.call_later( 621 self._ssl_shutdown_timeout, 622 lambda: self._check_shutdown_timeout() 623 ) 624 self._do_flush() 625 626 def _check_shutdown_timeout(self): 627 if ( 628 self._state in ( 629 SSLProtocolState.FLUSHING, 630 SSLProtocolState.SHUTDOWN 631 ) 632 ): 633 self._transport._force_close( 634 exceptions.TimeoutError('SSL shutdown timed out')) 635 636 def _do_flush(self): 637 self._do_read() 638 self._set_state(SSLProtocolState.SHUTDOWN) 639 self._do_shutdown() 640 641 def _do_shutdown(self): 642 try: 643 if not self._eof_received: 644 self._sslobj.unwrap() 645 except SSLAgainErrors: 646 self._process_outgoing() 647 except ssl.SSLError as exc: 648 self._on_shutdown_complete(exc) 649 else: 650 self._process_outgoing() 651 self._call_eof_received() 652 self._on_shutdown_complete(None) 653 654 def _on_shutdown_complete(self, shutdown_exc): 655 if self._shutdown_timeout_handle is not None: 656 self._shutdown_timeout_handle.cancel() 657 self._shutdown_timeout_handle = None 658 659 if shutdown_exc: 660 self._fatal_error(shutdown_exc) 661 else: 662 self._loop.call_soon(self._transport.close) 663 664 def _abort(self): 665 self._set_state(SSLProtocolState.UNWRAPPED) 666 if self._transport is not None: 667 self._transport.abort() 668 669 # Outgoing flow 670 671 def _write_appdata(self, list_of_data): 672 if ( 673 self._state in ( 674 SSLProtocolState.FLUSHING, 675 SSLProtocolState.SHUTDOWN, 676 SSLProtocolState.UNWRAPPED 677 ) 678 ): 679 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 680 logger.warning('SSL connection is closed') 681 self._conn_lost += 1 682 return 683 684 for data in list_of_data: 685 self._write_backlog.append(data) 686 self._write_buffer_size += len(data) 687 688 try: 689 if self._state == SSLProtocolState.WRAPPED: 690 self._do_write() 691 692 except Exception as ex: 693 self._fatal_error(ex, 'Fatal error on SSL protocol') 694 695 def _do_write(self): 696 try: 697 while self._write_backlog: 698 data = self._write_backlog[0] 699 count = self._sslobj.write(data) 700 data_len = len(data) 701 if count < data_len: 702 self._write_backlog[0] = data[count:] 703 self._write_buffer_size -= count 704 else: 705 del self._write_backlog[0] 706 self._write_buffer_size -= data_len 707 except SSLAgainErrors: 708 pass 709 self._process_outgoing() 710 711 def _process_outgoing(self): 712 if not self._ssl_writing_paused: 713 data = self._outgoing.read() 714 if len(data): 715 self._transport.write(data) 716 self._control_app_writing() 717 718 # Incoming flow 719 720 def _do_read(self): 721 if ( 722 self._state not in ( 723 SSLProtocolState.WRAPPED, 724 SSLProtocolState.FLUSHING, 725 ) 726 ): 727 return 728 try: 729 if not self._app_reading_paused: 730 if self._app_protocol_is_buffer: 731 self._do_read__buffered() 732 else: 733 self._do_read__copied() 734 if self._write_backlog: 735 self._do_write() 736 else: 737 self._process_outgoing() 738 self._control_ssl_reading() 739 except Exception as ex: 740 self._fatal_error(ex, 'Fatal error on SSL protocol') 741 742 def _do_read__buffered(self): 743 offset = 0 744 count = 1 745 746 buf = self._app_protocol_get_buffer(self._get_read_buffer_size()) 747 wants = len(buf) 748 749 try: 750 count = self._sslobj.read(wants, buf) 751 752 if count > 0: 753 offset = count 754 while offset < wants: 755 count = self._sslobj.read(wants - offset, buf[offset:]) 756 if count > 0: 757 offset += count 758 else: 759 break 760 else: 761 self._loop.call_soon(lambda: self._do_read()) 762 except SSLAgainErrors: 763 pass 764 if offset > 0: 765 self._app_protocol_buffer_updated(offset) 766 if not count: 767 # close_notify 768 self._call_eof_received() 769 self._start_shutdown() 770 771 def _do_read__copied(self): 772 chunk = b'1' 773 zero = True 774 one = False 775 776 try: 777 while True: 778 chunk = self._sslobj.read(self.max_size) 779 if not chunk: 780 break 781 if zero: 782 zero = False 783 one = True 784 first = chunk 785 elif one: 786 one = False 787 data = [first, chunk] 788 else: 789 data.append(chunk) 790 except SSLAgainErrors: 791 pass 792 if one: 793 self._app_protocol.data_received(first) 794 elif not zero: 795 self._app_protocol.data_received(b''.join(data)) 796 if not chunk: 797 # close_notify 798 self._call_eof_received() 799 self._start_shutdown() 800 801 def _call_eof_received(self): 802 try: 803 if self._app_state == AppProtocolState.STATE_CON_MADE: 804 self._app_state = AppProtocolState.STATE_EOF 805 keep_open = self._app_protocol.eof_received() 806 if keep_open: 807 logger.warning('returning true from eof_received() ' 808 'has no effect when using ssl') 809 except (KeyboardInterrupt, SystemExit): 810 raise 811 except BaseException as ex: 812 self._fatal_error(ex, 'Error calling eof_received()') 813 814 # Flow control for writes from APP socket 815 816 def _control_app_writing(self): 817 size = self._get_write_buffer_size() 818 if size >= self._outgoing_high_water and not self._app_writing_paused: 819 self._app_writing_paused = True 820 try: 821 self._app_protocol.pause_writing() 822 except (KeyboardInterrupt, SystemExit): 823 raise 824 except BaseException as exc: 825 self._loop.call_exception_handler({ 826 'message': 'protocol.pause_writing() failed', 827 'exception': exc, 828 'transport': self._app_transport, 829 'protocol': self, 830 }) 831 elif size <= self._outgoing_low_water and self._app_writing_paused: 832 self._app_writing_paused = False 833 try: 834 self._app_protocol.resume_writing() 835 except (KeyboardInterrupt, SystemExit): 836 raise 837 except BaseException as exc: 838 self._loop.call_exception_handler({ 839 'message': 'protocol.resume_writing() failed', 840 'exception': exc, 841 'transport': self._app_transport, 842 'protocol': self, 843 }) 844 845 def _get_write_buffer_size(self): 846 return self._outgoing.pending + self._write_buffer_size 847 848 def _set_write_buffer_limits(self, high=None, low=None): 849 high, low = add_flowcontrol_defaults( 850 high, low, constants.FLOW_CONTROL_HIGH_WATER_SSL_WRITE) 851 self._outgoing_high_water = high 852 self._outgoing_low_water = low 853 854 # Flow control for reads to APP socket 855 856 def _pause_reading(self): 857 self._app_reading_paused = True 858 859 def _resume_reading(self): 860 if self._app_reading_paused: 861 self._app_reading_paused = False 862 863 def resume(): 864 if self._state == SSLProtocolState.WRAPPED: 865 self._do_read() 866 elif self._state == SSLProtocolState.FLUSHING: 867 self._do_flush() 868 elif self._state == SSLProtocolState.SHUTDOWN: 869 self._do_shutdown() 870 self._loop.call_soon(resume) 871 872 # Flow control for reads from SSL socket 873 874 def _control_ssl_reading(self): 875 size = self._get_read_buffer_size() 876 if size >= self._incoming_high_water and not self._ssl_reading_paused: 877 self._ssl_reading_paused = True 878 self._transport.pause_reading() 879 elif size <= self._incoming_low_water and self._ssl_reading_paused: 880 self._ssl_reading_paused = False 881 self._transport.resume_reading() 882 883 def _set_read_buffer_limits(self, high=None, low=None): 884 high, low = add_flowcontrol_defaults( 885 high, low, constants.FLOW_CONTROL_HIGH_WATER_SSL_READ) 886 self._incoming_high_water = high 887 self._incoming_low_water = low 888 889 def _get_read_buffer_size(self): 890 return self._incoming.pending 891 892 # Flow control for writes to SSL socket 893 894 def pause_writing(self): 895 """Called when the low-level transport's buffer goes over 896 the high-water mark. 897 """ 898 assert not self._ssl_writing_paused 899 self._ssl_writing_paused = True 900 901 def resume_writing(self): 902 """Called when the low-level transport's buffer drains below 903 the low-water mark. 904 """ 905 assert self._ssl_writing_paused 906 self._ssl_writing_paused = False 907 self._process_outgoing() 908 909 def _fatal_error(self, exc, message='Fatal error on transport'): 910 if self._transport: 911 self._transport._force_close(exc) 912 913 if isinstance(exc, OSError): 914 if self._loop.get_debug(): 915 logger.debug("%r: %s", self, message, exc_info=True) 916 elif not isinstance(exc, exceptions.CancelledError): 917 self._loop.call_exception_handler({ 918 'message': message, 919 'exception': exc, 920 'transport': self._transport, 921 'protocol': self, 922 }) 923