1# Copyright 2016 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Invocation-side implementation of gRPC Python.""" 15 16import copy 17import functools 18import logging 19import os 20import sys 21import threading 22import time 23import types 24from typing import ( 25 Any, 26 Callable, 27 Dict, 28 Iterator, 29 List, 30 Optional, 31 Sequence, 32 Set, 33 Tuple, 34 Union, 35) 36 37import grpc # pytype: disable=pyi-error 38from grpc import _common # pytype: disable=pyi-error 39from grpc import _compression # pytype: disable=pyi-error 40from grpc import _grpcio_metadata # pytype: disable=pyi-error 41from grpc import _observability # pytype: disable=pyi-error 42from grpc._cython import cygrpc 43from grpc._typing import ChannelArgumentType 44from grpc._typing import DeserializingFunction 45from grpc._typing import IntegratedCallFactory 46from grpc._typing import MetadataType 47from grpc._typing import NullaryCallbackType 48from grpc._typing import ResponseType 49from grpc._typing import SerializingFunction 50from grpc._typing import UserTag 51import grpc.experimental # pytype: disable=pyi-error 52 53_LOGGER = logging.getLogger(__name__) 54 55_USER_AGENT = "grpc-python/{}".format(_grpcio_metadata.__version__) 56 57_EMPTY_FLAGS = 0 58 59# NOTE(rbellevi): No guarantees are given about the maintenance of this 60# environment variable. 61_DEFAULT_SINGLE_THREADED_UNARY_STREAM = ( 62 os.getenv("GRPC_SINGLE_THREADED_UNARY_STREAM") is not None 63) 64 65_UNARY_UNARY_INITIAL_DUE = ( 66 cygrpc.OperationType.send_initial_metadata, 67 cygrpc.OperationType.send_message, 68 cygrpc.OperationType.send_close_from_client, 69 cygrpc.OperationType.receive_initial_metadata, 70 cygrpc.OperationType.receive_message, 71 cygrpc.OperationType.receive_status_on_client, 72) 73_UNARY_STREAM_INITIAL_DUE = ( 74 cygrpc.OperationType.send_initial_metadata, 75 cygrpc.OperationType.send_message, 76 cygrpc.OperationType.send_close_from_client, 77 cygrpc.OperationType.receive_initial_metadata, 78 cygrpc.OperationType.receive_status_on_client, 79) 80_STREAM_UNARY_INITIAL_DUE = ( 81 cygrpc.OperationType.send_initial_metadata, 82 cygrpc.OperationType.receive_initial_metadata, 83 cygrpc.OperationType.receive_message, 84 cygrpc.OperationType.receive_status_on_client, 85) 86_STREAM_STREAM_INITIAL_DUE = ( 87 cygrpc.OperationType.send_initial_metadata, 88 cygrpc.OperationType.receive_initial_metadata, 89 cygrpc.OperationType.receive_status_on_client, 90) 91 92_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = ( 93 "Exception calling channel subscription callback!" 94) 95 96_OK_RENDEZVOUS_REPR_FORMAT = ( 97 '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n>' 98) 99 100_NON_OK_RENDEZVOUS_REPR_FORMAT = ( 101 "<{} of RPC that terminated with:\n" 102 "\tstatus = {}\n" 103 '\tdetails = "{}"\n' 104 '\tdebug_error_string = "{}"\n' 105 ">" 106) 107 108 109def _deadline(timeout: Optional[float]) -> Optional[float]: 110 return None if timeout is None else time.time() + timeout 111 112 113def _unknown_code_details( 114 unknown_cygrpc_code: Optional[grpc.StatusCode], details: Optional[str] 115) -> str: 116 return 'Server sent unknown code {} and details "{}"'.format( 117 unknown_cygrpc_code, details 118 ) 119 120 121class _RPCState(object): 122 condition: threading.Condition 123 due: Set[cygrpc.OperationType] 124 initial_metadata: Optional[MetadataType] 125 response: Any 126 trailing_metadata: Optional[MetadataType] 127 code: Optional[grpc.StatusCode] 128 details: Optional[str] 129 debug_error_string: Optional[str] 130 cancelled: bool 131 callbacks: List[NullaryCallbackType] 132 fork_epoch: Optional[int] 133 rpc_start_time: Optional[float] # In relative seconds 134 rpc_end_time: Optional[float] # In relative seconds 135 method: Optional[str] 136 target: Optional[str] 137 138 def __init__( 139 self, 140 due: Sequence[cygrpc.OperationType], 141 initial_metadata: Optional[MetadataType], 142 trailing_metadata: Optional[MetadataType], 143 code: Optional[grpc.StatusCode], 144 details: Optional[str], 145 ): 146 # `condition` guards all members of _RPCState. `notify_all` is called on 147 # `condition` when the state of the RPC has changed. 148 self.condition = threading.Condition() 149 150 # The cygrpc.OperationType objects representing events due from the RPC's 151 # completion queue. If an operation is in `due`, it is guaranteed that 152 # `operate()` has been called on a corresponding operation. But the 153 # converse is not true. That is, in the case of failed `operate()` 154 # calls, there may briefly be events in `due` that do not correspond to 155 # operations submitted to Core. 156 self.due = set(due) 157 self.initial_metadata = initial_metadata 158 self.response = None 159 self.trailing_metadata = trailing_metadata 160 self.code = code 161 self.details = details 162 self.debug_error_string = None 163 # The following three fields are used for observability. 164 # Updates to those fields do not trigger self.condition. 165 self.rpc_start_time = None 166 self.rpc_end_time = None 167 self.method = None 168 self.target = None 169 170 # The semantics of grpc.Future.cancel and grpc.Future.cancelled are 171 # slightly wonky, so they have to be tracked separately from the rest of the 172 # result of the RPC. This field tracks whether cancellation was requested 173 # prior to termination of the RPC. 174 self.cancelled = False 175 self.callbacks = [] 176 self.fork_epoch = cygrpc.get_fork_epoch() 177 178 def reset_postfork_child(self): 179 self.condition = threading.Condition() 180 181 182def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None: 183 if state.code is None: 184 state.code = code 185 state.details = details 186 if state.initial_metadata is None: 187 state.initial_metadata = () 188 state.trailing_metadata = () 189 190 191def _handle_event( 192 event: cygrpc.BaseEvent, 193 state: _RPCState, 194 response_deserializer: Optional[DeserializingFunction], 195) -> List[NullaryCallbackType]: 196 callbacks = [] 197 for batch_operation in event.batch_operations: 198 operation_type = batch_operation.type() 199 state.due.remove(operation_type) 200 if operation_type == cygrpc.OperationType.receive_initial_metadata: 201 state.initial_metadata = batch_operation.initial_metadata() 202 elif operation_type == cygrpc.OperationType.receive_message: 203 serialized_response = batch_operation.message() 204 if serialized_response is not None: 205 response = _common.deserialize( 206 serialized_response, response_deserializer 207 ) 208 if response is None: 209 details = "Exception deserializing response!" 210 _abort(state, grpc.StatusCode.INTERNAL, details) 211 else: 212 state.response = response 213 elif operation_type == cygrpc.OperationType.receive_status_on_client: 214 state.trailing_metadata = batch_operation.trailing_metadata() 215 if state.code is None: 216 code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get( 217 batch_operation.code() 218 ) 219 if code is None: 220 state.code = grpc.StatusCode.UNKNOWN 221 state.details = _unknown_code_details( 222 code, batch_operation.details() 223 ) 224 else: 225 state.code = code 226 state.details = batch_operation.details() 227 state.debug_error_string = batch_operation.error_string() 228 state.rpc_end_time = time.perf_counter() 229 _observability.maybe_record_rpc_latency(state) 230 callbacks.extend(state.callbacks) 231 state.callbacks = None 232 return callbacks 233 234 235def _event_handler( 236 state: _RPCState, response_deserializer: Optional[DeserializingFunction] 237) -> UserTag: 238 def handle_event(event): 239 with state.condition: 240 callbacks = _handle_event(event, state, response_deserializer) 241 state.condition.notify_all() 242 done = not state.due 243 for callback in callbacks: 244 try: 245 callback() 246 except Exception as e: # pylint: disable=broad-except 247 # NOTE(rbellevi): We suppress but log errors here so as not to 248 # kill the channel spin thread. 249 logging.error( 250 "Exception in callback %s: %s", repr(callback.func), repr(e) 251 ) 252 return done and state.fork_epoch >= cygrpc.get_fork_epoch() 253 254 return handle_event 255 256 257# TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall. 258# pylint: disable=too-many-statements 259def _consume_request_iterator( 260 request_iterator: Iterator, 261 state: _RPCState, 262 call: Union[cygrpc.IntegratedCall, cygrpc.SegregatedCall], 263 request_serializer: SerializingFunction, 264 event_handler: Optional[UserTag], 265) -> None: 266 """Consume a request supplied by the user.""" 267 268 def consume_request_iterator(): # pylint: disable=too-many-branches 269 # Iterate over the request iterator until it is exhausted or an error 270 # condition is encountered. 271 while True: 272 return_from_user_request_generator_invoked = False 273 try: 274 # The thread may die in user-code. Do not block fork for this. 275 cygrpc.enter_user_request_generator() 276 request = next(request_iterator) 277 except StopIteration: 278 break 279 except Exception: # pylint: disable=broad-except 280 cygrpc.return_from_user_request_generator() 281 return_from_user_request_generator_invoked = True 282 code = grpc.StatusCode.UNKNOWN 283 details = "Exception iterating requests!" 284 _LOGGER.exception(details) 285 call.cancel( 286 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details 287 ) 288 _abort(state, code, details) 289 return 290 finally: 291 if not return_from_user_request_generator_invoked: 292 cygrpc.return_from_user_request_generator() 293 serialized_request = _common.serialize(request, request_serializer) 294 with state.condition: 295 if state.code is None and not state.cancelled: 296 if serialized_request is None: 297 code = grpc.StatusCode.INTERNAL 298 details = "Exception serializing request!" 299 call.cancel( 300 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], 301 details, 302 ) 303 _abort(state, code, details) 304 return 305 else: 306 state.due.add(cygrpc.OperationType.send_message) 307 operations = ( 308 cygrpc.SendMessageOperation( 309 serialized_request, _EMPTY_FLAGS 310 ), 311 ) 312 operating = call.operate(operations, event_handler) 313 if not operating: 314 state.due.remove(cygrpc.OperationType.send_message) 315 return 316 317 def _done(): 318 return ( 319 state.code is not None 320 or cygrpc.OperationType.send_message 321 not in state.due 322 ) 323 324 _common.wait( 325 state.condition.wait, 326 _done, 327 spin_cb=functools.partial( 328 cygrpc.block_if_fork_in_progress, state 329 ), 330 ) 331 if state.code is not None: 332 return 333 else: 334 return 335 with state.condition: 336 if state.code is None: 337 state.due.add(cygrpc.OperationType.send_close_from_client) 338 operations = ( 339 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), 340 ) 341 operating = call.operate(operations, event_handler) 342 if not operating: 343 state.due.remove( 344 cygrpc.OperationType.send_close_from_client 345 ) 346 347 consumption_thread = cygrpc.ForkManagedThread( 348 target=consume_request_iterator 349 ) 350 consumption_thread.setDaemon(True) 351 consumption_thread.start() 352 353 354def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str: 355 """Calculates error string for RPC.""" 356 with rpc_state.condition: 357 if rpc_state.code is None: 358 return "<{} object>".format(class_name) 359 elif rpc_state.code is grpc.StatusCode.OK: 360 return _OK_RENDEZVOUS_REPR_FORMAT.format( 361 class_name, rpc_state.code, rpc_state.details 362 ) 363 else: 364 return _NON_OK_RENDEZVOUS_REPR_FORMAT.format( 365 class_name, 366 rpc_state.code, 367 rpc_state.details, 368 rpc_state.debug_error_string, 369 ) 370 371 372class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future): 373 """An RPC error not tied to the execution of a particular RPC. 374 375 The RPC represented by the state object must not be in-progress or 376 cancelled. 377 378 Attributes: 379 _state: An instance of _RPCState. 380 """ 381 382 _state: _RPCState 383 384 def __init__(self, state: _RPCState): 385 with state.condition: 386 self._state = _RPCState( 387 (), 388 copy.deepcopy(state.initial_metadata), 389 copy.deepcopy(state.trailing_metadata), 390 state.code, 391 copy.deepcopy(state.details), 392 ) 393 self._state.response = copy.copy(state.response) 394 self._state.debug_error_string = copy.copy(state.debug_error_string) 395 396 def initial_metadata(self) -> Optional[MetadataType]: 397 return self._state.initial_metadata 398 399 def trailing_metadata(self) -> Optional[MetadataType]: 400 return self._state.trailing_metadata 401 402 def code(self) -> Optional[grpc.StatusCode]: 403 return self._state.code 404 405 def details(self) -> Optional[str]: 406 return _common.decode(self._state.details) 407 408 def debug_error_string(self) -> Optional[str]: 409 return _common.decode(self._state.debug_error_string) 410 411 def _repr(self) -> str: 412 return _rpc_state_string(self.__class__.__name__, self._state) 413 414 def __repr__(self) -> str: 415 return self._repr() 416 417 def __str__(self) -> str: 418 return self._repr() 419 420 def cancel(self) -> bool: 421 """See grpc.Future.cancel.""" 422 return False 423 424 def cancelled(self) -> bool: 425 """See grpc.Future.cancelled.""" 426 return False 427 428 def running(self) -> bool: 429 """See grpc.Future.running.""" 430 return False 431 432 def done(self) -> bool: 433 """See grpc.Future.done.""" 434 return True 435 436 def result( 437 self, timeout: Optional[float] = None 438 ) -> Any: # pylint: disable=unused-argument 439 """See grpc.Future.result.""" 440 raise self 441 442 def exception( 443 self, timeout: Optional[float] = None # pylint: disable=unused-argument 444 ) -> Optional[Exception]: 445 """See grpc.Future.exception.""" 446 return self 447 448 def traceback( 449 self, timeout: Optional[float] = None # pylint: disable=unused-argument 450 ) -> Optional[types.TracebackType]: 451 """See grpc.Future.traceback.""" 452 try: 453 raise self 454 except grpc.RpcError: 455 return sys.exc_info()[2] 456 457 def add_done_callback( 458 self, 459 fn: Callable[[grpc.Future], None], 460 timeout: Optional[float] = None, # pylint: disable=unused-argument 461 ) -> None: 462 """See grpc.Future.add_done_callback.""" 463 fn(self) 464 465 466class _Rendezvous(grpc.RpcError, grpc.RpcContext): 467 """An RPC iterator. 468 469 Attributes: 470 _state: An instance of _RPCState. 471 _call: An instance of SegregatedCall or IntegratedCall. 472 In either case, the _call object is expected to have operate, cancel, 473 and next_event methods. 474 _response_deserializer: A callable taking bytes and return a Python 475 object. 476 _deadline: A float representing the deadline of the RPC in seconds. Or 477 possibly None, to represent an RPC with no deadline at all. 478 """ 479 480 _state: _RPCState 481 _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall] 482 _response_deserializer: Optional[DeserializingFunction] 483 _deadline: Optional[float] 484 485 def __init__( 486 self, 487 state: _RPCState, 488 call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall], 489 response_deserializer: Optional[DeserializingFunction], 490 deadline: Optional[float], 491 ): 492 super(_Rendezvous, self).__init__() 493 self._state = state 494 self._call = call 495 self._response_deserializer = response_deserializer 496 self._deadline = deadline 497 498 def is_active(self) -> bool: 499 """See grpc.RpcContext.is_active""" 500 with self._state.condition: 501 return self._state.code is None 502 503 def time_remaining(self) -> Optional[float]: 504 """See grpc.RpcContext.time_remaining""" 505 with self._state.condition: 506 if self._deadline is None: 507 return None 508 else: 509 return max(self._deadline - time.time(), 0) 510 511 def cancel(self) -> bool: 512 """See grpc.RpcContext.cancel""" 513 with self._state.condition: 514 if self._state.code is None: 515 code = grpc.StatusCode.CANCELLED 516 details = "Locally cancelled by application!" 517 self._call.cancel( 518 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details 519 ) 520 self._state.cancelled = True 521 _abort(self._state, code, details) 522 self._state.condition.notify_all() 523 return True 524 else: 525 return False 526 527 def add_callback(self, callback: NullaryCallbackType) -> bool: 528 """See grpc.RpcContext.add_callback""" 529 with self._state.condition: 530 if self._state.callbacks is None: 531 return False 532 else: 533 self._state.callbacks.append(callback) 534 return True 535 536 def __iter__(self): 537 return self 538 539 def next(self): 540 return self._next() 541 542 def __next__(self): 543 return self._next() 544 545 def _next(self): 546 raise NotImplementedError() 547 548 def debug_error_string(self) -> Optional[str]: 549 raise NotImplementedError() 550 551 def _repr(self) -> str: 552 return _rpc_state_string(self.__class__.__name__, self._state) 553 554 def __repr__(self) -> str: 555 return self._repr() 556 557 def __str__(self) -> str: 558 return self._repr() 559 560 def __del__(self) -> None: 561 with self._state.condition: 562 if self._state.code is None: 563 self._state.code = grpc.StatusCode.CANCELLED 564 self._state.details = "Cancelled upon garbage collection!" 565 self._state.cancelled = True 566 self._call.cancel( 567 _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code], 568 self._state.details, 569 ) 570 self._state.condition.notify_all() 571 572 573class _SingleThreadedRendezvous( 574 _Rendezvous, grpc.Call, grpc.Future 575): # pylint: disable=too-many-ancestors 576 """An RPC iterator operating entirely on a single thread. 577 578 The __next__ method of _SingleThreadedRendezvous does not depend on the 579 existence of any other thread, including the "channel spin thread". 580 However, this means that its interface is entirely synchronous. So this 581 class cannot completely fulfill the grpc.Future interface. The result, 582 exception, and traceback methods will never block and will instead raise 583 an exception if calling the method would result in blocking. 584 585 This means that these methods are safe to call from add_done_callback 586 handlers. 587 """ 588 589 _state: _RPCState 590 591 def _is_complete(self) -> bool: 592 return self._state.code is not None 593 594 def cancelled(self) -> bool: 595 with self._state.condition: 596 return self._state.cancelled 597 598 def running(self) -> bool: 599 with self._state.condition: 600 return self._state.code is None 601 602 def done(self) -> bool: 603 with self._state.condition: 604 return self._state.code is not None 605 606 def result(self, timeout: Optional[float] = None) -> Any: 607 """Returns the result of the computation or raises its exception. 608 609 This method will never block. Instead, it will raise an exception 610 if calling this method would otherwise result in blocking. 611 612 Since this method will never block, any `timeout` argument passed will 613 be ignored. 614 """ 615 del timeout 616 with self._state.condition: 617 if not self._is_complete(): 618 raise grpc.experimental.UsageError( 619 "_SingleThreadedRendezvous only supports result() when the" 620 " RPC is complete." 621 ) 622 if self._state.code is grpc.StatusCode.OK: 623 return self._state.response 624 elif self._state.cancelled: 625 raise grpc.FutureCancelledError() 626 else: 627 raise self 628 629 def exception(self, timeout: Optional[float] = None) -> Optional[Exception]: 630 """Return the exception raised by the computation. 631 632 This method will never block. Instead, it will raise an exception 633 if calling this method would otherwise result in blocking. 634 635 Since this method will never block, any `timeout` argument passed will 636 be ignored. 637 """ 638 del timeout 639 with self._state.condition: 640 if not self._is_complete(): 641 raise grpc.experimental.UsageError( 642 "_SingleThreadedRendezvous only supports exception() when" 643 " the RPC is complete." 644 ) 645 if self._state.code is grpc.StatusCode.OK: 646 return None 647 elif self._state.cancelled: 648 raise grpc.FutureCancelledError() 649 else: 650 return self 651 652 def traceback( 653 self, timeout: Optional[float] = None 654 ) -> Optional[types.TracebackType]: 655 """Access the traceback of the exception raised by the computation. 656 657 This method will never block. Instead, it will raise an exception 658 if calling this method would otherwise result in blocking. 659 660 Since this method will never block, any `timeout` argument passed will 661 be ignored. 662 """ 663 del timeout 664 with self._state.condition: 665 if not self._is_complete(): 666 raise grpc.experimental.UsageError( 667 "_SingleThreadedRendezvous only supports traceback() when" 668 " the RPC is complete." 669 ) 670 if self._state.code is grpc.StatusCode.OK: 671 return None 672 elif self._state.cancelled: 673 raise grpc.FutureCancelledError() 674 else: 675 try: 676 raise self 677 except grpc.RpcError: 678 return sys.exc_info()[2] 679 680 def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None: 681 with self._state.condition: 682 if self._state.code is None: 683 self._state.callbacks.append(functools.partial(fn, self)) 684 return 685 686 fn(self) 687 688 def initial_metadata(self) -> Optional[MetadataType]: 689 """See grpc.Call.initial_metadata""" 690 with self._state.condition: 691 # NOTE(gnossen): Based on our initial call batch, we are guaranteed 692 # to receive initial metadata before any messages. 693 while self._state.initial_metadata is None: 694 self._consume_next_event() 695 return self._state.initial_metadata 696 697 def trailing_metadata(self) -> Optional[MetadataType]: 698 """See grpc.Call.trailing_metadata""" 699 with self._state.condition: 700 if self._state.trailing_metadata is None: 701 raise grpc.experimental.UsageError( 702 "Cannot get trailing metadata until RPC is completed." 703 ) 704 return self._state.trailing_metadata 705 706 def code(self) -> Optional[grpc.StatusCode]: 707 """See grpc.Call.code""" 708 with self._state.condition: 709 if self._state.code is None: 710 raise grpc.experimental.UsageError( 711 "Cannot get code until RPC is completed." 712 ) 713 return self._state.code 714 715 def details(self) -> Optional[str]: 716 """See grpc.Call.details""" 717 with self._state.condition: 718 if self._state.details is None: 719 raise grpc.experimental.UsageError( 720 "Cannot get details until RPC is completed." 721 ) 722 return _common.decode(self._state.details) 723 724 def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]: 725 event = self._call.next_event() 726 with self._state.condition: 727 callbacks = _handle_event( 728 event, self._state, self._response_deserializer 729 ) 730 for callback in callbacks: 731 # NOTE(gnossen): We intentionally allow exceptions to bubble up 732 # to the user when running on a single thread. 733 callback() 734 return event 735 736 def _next_response(self) -> Any: 737 while True: 738 self._consume_next_event() 739 with self._state.condition: 740 if self._state.response is not None: 741 response = self._state.response 742 self._state.response = None 743 return response 744 elif ( 745 cygrpc.OperationType.receive_message not in self._state.due 746 ): 747 if self._state.code is grpc.StatusCode.OK: 748 raise StopIteration() 749 elif self._state.code is not None: 750 raise self 751 752 def _next(self) -> Any: 753 with self._state.condition: 754 if self._state.code is None: 755 # We tentatively add the operation as expected and remove 756 # it if the enqueue operation fails. This allows us to guarantee that 757 # if an event has been submitted to the core completion queue, 758 # it is in `due`. If we waited until after a successful 759 # enqueue operation then a signal could interrupt this 760 # thread between the enqueue operation and the addition of the 761 # operation to `due`. This would cause an exception on the 762 # channel spin thread when the operation completes and no 763 # corresponding operation would be present in state.due. 764 # Note that, since `condition` is held through this block, there is 765 # no data race on `due`. 766 self._state.due.add(cygrpc.OperationType.receive_message) 767 operating = self._call.operate( 768 (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None 769 ) 770 if not operating: 771 self._state.due.remove(cygrpc.OperationType.receive_message) 772 elif self._state.code is grpc.StatusCode.OK: 773 raise StopIteration() 774 else: 775 raise self 776 return self._next_response() 777 778 def debug_error_string(self) -> Optional[str]: 779 with self._state.condition: 780 if self._state.debug_error_string is None: 781 raise grpc.experimental.UsageError( 782 "Cannot get debug error string until RPC is completed." 783 ) 784 return _common.decode(self._state.debug_error_string) 785 786 787class _MultiThreadedRendezvous( 788 _Rendezvous, grpc.Call, grpc.Future 789): # pylint: disable=too-many-ancestors 790 """An RPC iterator that depends on a channel spin thread. 791 792 This iterator relies upon a per-channel thread running in the background, 793 dequeueing events from the completion queue, and notifying threads waiting 794 on the threading.Condition object in the _RPCState object. 795 796 This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface 797 and to mediate a bidirection streaming RPC. 798 """ 799 800 _state: _RPCState 801 802 def initial_metadata(self) -> Optional[MetadataType]: 803 """See grpc.Call.initial_metadata""" 804 with self._state.condition: 805 806 def _done(): 807 return self._state.initial_metadata is not None 808 809 _common.wait(self._state.condition.wait, _done) 810 return self._state.initial_metadata 811 812 def trailing_metadata(self) -> Optional[MetadataType]: 813 """See grpc.Call.trailing_metadata""" 814 with self._state.condition: 815 816 def _done(): 817 return self._state.trailing_metadata is not None 818 819 _common.wait(self._state.condition.wait, _done) 820 return self._state.trailing_metadata 821 822 def code(self) -> Optional[grpc.StatusCode]: 823 """See grpc.Call.code""" 824 with self._state.condition: 825 826 def _done(): 827 return self._state.code is not None 828 829 _common.wait(self._state.condition.wait, _done) 830 return self._state.code 831 832 def details(self) -> Optional[str]: 833 """See grpc.Call.details""" 834 with self._state.condition: 835 836 def _done(): 837 return self._state.details is not None 838 839 _common.wait(self._state.condition.wait, _done) 840 return _common.decode(self._state.details) 841 842 def debug_error_string(self) -> Optional[str]: 843 with self._state.condition: 844 845 def _done(): 846 return self._state.debug_error_string is not None 847 848 _common.wait(self._state.condition.wait, _done) 849 return _common.decode(self._state.debug_error_string) 850 851 def cancelled(self) -> bool: 852 with self._state.condition: 853 return self._state.cancelled 854 855 def running(self) -> bool: 856 with self._state.condition: 857 return self._state.code is None 858 859 def done(self) -> bool: 860 with self._state.condition: 861 return self._state.code is not None 862 863 def _is_complete(self) -> bool: 864 return self._state.code is not None 865 866 def result(self, timeout: Optional[float] = None) -> Any: 867 """Returns the result of the computation or raises its exception. 868 869 See grpc.Future.result for the full API contract. 870 """ 871 with self._state.condition: 872 timed_out = _common.wait( 873 self._state.condition.wait, self._is_complete, timeout=timeout 874 ) 875 if timed_out: 876 raise grpc.FutureTimeoutError() 877 else: 878 if self._state.code is grpc.StatusCode.OK: 879 return self._state.response 880 elif self._state.cancelled: 881 raise grpc.FutureCancelledError() 882 else: 883 raise self 884 885 def exception(self, timeout: Optional[float] = None) -> Optional[Exception]: 886 """Return the exception raised by the computation. 887 888 See grpc.Future.exception for the full API contract. 889 """ 890 with self._state.condition: 891 timed_out = _common.wait( 892 self._state.condition.wait, self._is_complete, timeout=timeout 893 ) 894 if timed_out: 895 raise grpc.FutureTimeoutError() 896 else: 897 if self._state.code is grpc.StatusCode.OK: 898 return None 899 elif self._state.cancelled: 900 raise grpc.FutureCancelledError() 901 else: 902 return self 903 904 def traceback( 905 self, timeout: Optional[float] = None 906 ) -> Optional[types.TracebackType]: 907 """Access the traceback of the exception raised by the computation. 908 909 See grpc.future.traceback for the full API contract. 910 """ 911 with self._state.condition: 912 timed_out = _common.wait( 913 self._state.condition.wait, self._is_complete, timeout=timeout 914 ) 915 if timed_out: 916 raise grpc.FutureTimeoutError() 917 else: 918 if self._state.code is grpc.StatusCode.OK: 919 return None 920 elif self._state.cancelled: 921 raise grpc.FutureCancelledError() 922 else: 923 try: 924 raise self 925 except grpc.RpcError: 926 return sys.exc_info()[2] 927 928 def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None: 929 with self._state.condition: 930 if self._state.code is None: 931 self._state.callbacks.append(functools.partial(fn, self)) 932 return 933 934 fn(self) 935 936 def _next(self) -> Any: 937 with self._state.condition: 938 if self._state.code is None: 939 event_handler = _event_handler( 940 self._state, self._response_deserializer 941 ) 942 self._state.due.add(cygrpc.OperationType.receive_message) 943 operating = self._call.operate( 944 (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), 945 event_handler, 946 ) 947 if not operating: 948 self._state.due.remove(cygrpc.OperationType.receive_message) 949 elif self._state.code is grpc.StatusCode.OK: 950 raise StopIteration() 951 else: 952 raise self 953 954 def _response_ready(): 955 return self._state.response is not None or ( 956 cygrpc.OperationType.receive_message not in self._state.due 957 and self._state.code is not None 958 ) 959 960 _common.wait(self._state.condition.wait, _response_ready) 961 if self._state.response is not None: 962 response = self._state.response 963 self._state.response = None 964 return response 965 elif cygrpc.OperationType.receive_message not in self._state.due: 966 if self._state.code is grpc.StatusCode.OK: 967 raise StopIteration() 968 elif self._state.code is not None: 969 raise self 970 971 972def _start_unary_request( 973 request: Any, 974 timeout: Optional[float], 975 request_serializer: SerializingFunction, 976) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]: 977 deadline = _deadline(timeout) 978 serialized_request = _common.serialize(request, request_serializer) 979 if serialized_request is None: 980 state = _RPCState( 981 (), 982 (), 983 (), 984 grpc.StatusCode.INTERNAL, 985 "Exception serializing request!", 986 ) 987 error = _InactiveRpcError(state) 988 return deadline, None, error 989 else: 990 return deadline, serialized_request, None 991 992 993def _end_unary_response_blocking( 994 state: _RPCState, 995 call: cygrpc.SegregatedCall, 996 with_call: bool, 997 deadline: Optional[float], 998) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]: 999 if state.code is grpc.StatusCode.OK: 1000 if with_call: 1001 rendezvous = _MultiThreadedRendezvous(state, call, None, deadline) 1002 return state.response, rendezvous 1003 else: 1004 return state.response 1005 else: 1006 raise _InactiveRpcError(state) # pytype: disable=not-instantiable 1007 1008 1009def _stream_unary_invocation_operations( 1010 metadata: Optional[MetadataType], initial_metadata_flags: int 1011) -> Sequence[Sequence[cygrpc.Operation]]: 1012 return ( 1013 ( 1014 cygrpc.SendInitialMetadataOperation( 1015 metadata, initial_metadata_flags 1016 ), 1017 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), 1018 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 1019 ), 1020 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 1021 ) 1022 1023 1024def _stream_unary_invocation_operations_and_tags( 1025 metadata: Optional[MetadataType], initial_metadata_flags: int 1026) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]: 1027 return tuple( 1028 ( 1029 operations, 1030 None, 1031 ) 1032 for operations in _stream_unary_invocation_operations( 1033 metadata, initial_metadata_flags 1034 ) 1035 ) 1036 1037 1038def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]: 1039 parent_deadline = cygrpc.get_deadline_from_context() 1040 if parent_deadline is None and user_deadline is None: 1041 return None 1042 elif parent_deadline is not None and user_deadline is None: 1043 return parent_deadline 1044 elif user_deadline is not None and parent_deadline is None: 1045 return user_deadline 1046 else: 1047 return min(parent_deadline, user_deadline) 1048 1049 1050class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): 1051 _channel: cygrpc.Channel 1052 _managed_call: IntegratedCallFactory 1053 _method: bytes 1054 _target: bytes 1055 _request_serializer: Optional[SerializingFunction] 1056 _response_deserializer: Optional[DeserializingFunction] 1057 _context: Any 1058 _registered_call_handle: Optional[int] 1059 1060 __slots__ = [ 1061 "_channel", 1062 "_managed_call", 1063 "_method", 1064 "_target", 1065 "_request_serializer", 1066 "_response_deserializer", 1067 "_context", 1068 ] 1069 1070 # pylint: disable=too-many-arguments 1071 def __init__( 1072 self, 1073 channel: cygrpc.Channel, 1074 managed_call: IntegratedCallFactory, 1075 method: bytes, 1076 target: bytes, 1077 request_serializer: Optional[SerializingFunction], 1078 response_deserializer: Optional[DeserializingFunction], 1079 _registered_call_handle: Optional[int], 1080 ): 1081 self._channel = channel 1082 self._managed_call = managed_call 1083 self._method = method 1084 self._target = target 1085 self._request_serializer = request_serializer 1086 self._response_deserializer = response_deserializer 1087 self._context = cygrpc.build_census_context() 1088 self._registered_call_handle = _registered_call_handle 1089 1090 def _prepare( 1091 self, 1092 request: Any, 1093 timeout: Optional[float], 1094 metadata: Optional[MetadataType], 1095 wait_for_ready: Optional[bool], 1096 compression: Optional[grpc.Compression], 1097 ) -> Tuple[ 1098 Optional[_RPCState], 1099 Optional[Sequence[cygrpc.Operation]], 1100 Optional[float], 1101 Optional[grpc.RpcError], 1102 ]: 1103 deadline, serialized_request, rendezvous = _start_unary_request( 1104 request, timeout, self._request_serializer 1105 ) 1106 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 1107 wait_for_ready 1108 ) 1109 augmented_metadata = _compression.augment_metadata( 1110 metadata, compression 1111 ) 1112 if serialized_request is None: 1113 return None, None, None, rendezvous 1114 else: 1115 state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None) 1116 operations = ( 1117 cygrpc.SendInitialMetadataOperation( 1118 augmented_metadata, initial_metadata_flags 1119 ), 1120 cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), 1121 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), 1122 cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), 1123 cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), 1124 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 1125 ) 1126 return state, operations, deadline, None 1127 1128 def _blocking( 1129 self, 1130 request: Any, 1131 timeout: Optional[float] = None, 1132 metadata: Optional[MetadataType] = None, 1133 credentials: Optional[grpc.CallCredentials] = None, 1134 wait_for_ready: Optional[bool] = None, 1135 compression: Optional[grpc.Compression] = None, 1136 ) -> Tuple[_RPCState, cygrpc.SegregatedCall]: 1137 state, operations, deadline, rendezvous = self._prepare( 1138 request, timeout, metadata, wait_for_ready, compression 1139 ) 1140 if state is None: 1141 raise rendezvous # pylint: disable-msg=raising-bad-type 1142 else: 1143 state.rpc_start_time = time.perf_counter() 1144 state.method = _common.decode(self._method) 1145 state.target = _common.decode(self._target) 1146 call = self._channel.segregated_call( 1147 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 1148 self._method, 1149 None, 1150 _determine_deadline(deadline), 1151 metadata, 1152 None if credentials is None else credentials._credentials, 1153 ( 1154 ( 1155 operations, 1156 None, 1157 ), 1158 ), 1159 self._context, 1160 self._registered_call_handle, 1161 ) 1162 event = call.next_event() 1163 _handle_event(event, state, self._response_deserializer) 1164 return state, call 1165 1166 def __call__( 1167 self, 1168 request: Any, 1169 timeout: Optional[float] = None, 1170 metadata: Optional[MetadataType] = None, 1171 credentials: Optional[grpc.CallCredentials] = None, 1172 wait_for_ready: Optional[bool] = None, 1173 compression: Optional[grpc.Compression] = None, 1174 ) -> Any: 1175 ( 1176 state, 1177 call, 1178 ) = self._blocking( 1179 request, timeout, metadata, credentials, wait_for_ready, compression 1180 ) 1181 return _end_unary_response_blocking(state, call, False, None) 1182 1183 def with_call( 1184 self, 1185 request: Any, 1186 timeout: Optional[float] = None, 1187 metadata: Optional[MetadataType] = None, 1188 credentials: Optional[grpc.CallCredentials] = None, 1189 wait_for_ready: Optional[bool] = None, 1190 compression: Optional[grpc.Compression] = None, 1191 ) -> Tuple[Any, grpc.Call]: 1192 ( 1193 state, 1194 call, 1195 ) = self._blocking( 1196 request, timeout, metadata, credentials, wait_for_ready, compression 1197 ) 1198 return _end_unary_response_blocking(state, call, True, None) 1199 1200 def future( 1201 self, 1202 request: Any, 1203 timeout: Optional[float] = None, 1204 metadata: Optional[MetadataType] = None, 1205 credentials: Optional[grpc.CallCredentials] = None, 1206 wait_for_ready: Optional[bool] = None, 1207 compression: Optional[grpc.Compression] = None, 1208 ) -> _MultiThreadedRendezvous: 1209 state, operations, deadline, rendezvous = self._prepare( 1210 request, timeout, metadata, wait_for_ready, compression 1211 ) 1212 if state is None: 1213 raise rendezvous # pylint: disable-msg=raising-bad-type 1214 else: 1215 event_handler = _event_handler(state, self._response_deserializer) 1216 state.rpc_start_time = time.perf_counter() 1217 state.method = _common.decode(self._method) 1218 state.target = _common.decode(self._target) 1219 call = self._managed_call( 1220 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 1221 self._method, 1222 None, 1223 deadline, 1224 metadata, 1225 None if credentials is None else credentials._credentials, 1226 (operations,), 1227 event_handler, 1228 self._context, 1229 self._registered_call_handle, 1230 ) 1231 return _MultiThreadedRendezvous( 1232 state, call, self._response_deserializer, deadline 1233 ) 1234 1235 1236class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): 1237 _channel: cygrpc.Channel 1238 _method: bytes 1239 _target: bytes 1240 _request_serializer: Optional[SerializingFunction] 1241 _response_deserializer: Optional[DeserializingFunction] 1242 _context: Any 1243 _registered_call_handle: Optional[int] 1244 1245 __slots__ = [ 1246 "_channel", 1247 "_method", 1248 "_target", 1249 "_request_serializer", 1250 "_response_deserializer", 1251 "_context", 1252 ] 1253 1254 # pylint: disable=too-many-arguments 1255 def __init__( 1256 self, 1257 channel: cygrpc.Channel, 1258 method: bytes, 1259 target: bytes, 1260 request_serializer: SerializingFunction, 1261 response_deserializer: DeserializingFunction, 1262 _registered_call_handle: Optional[int], 1263 ): 1264 self._channel = channel 1265 self._method = method 1266 self._target = target 1267 self._request_serializer = request_serializer 1268 self._response_deserializer = response_deserializer 1269 self._context = cygrpc.build_census_context() 1270 self._registered_call_handle = _registered_call_handle 1271 1272 def __call__( # pylint: disable=too-many-locals 1273 self, 1274 request: Any, 1275 timeout: Optional[float] = None, 1276 metadata: Optional[MetadataType] = None, 1277 credentials: Optional[grpc.CallCredentials] = None, 1278 wait_for_ready: Optional[bool] = None, 1279 compression: Optional[grpc.Compression] = None, 1280 ) -> _SingleThreadedRendezvous: 1281 deadline = _deadline(timeout) 1282 serialized_request = _common.serialize( 1283 request, self._request_serializer 1284 ) 1285 if serialized_request is None: 1286 state = _RPCState( 1287 (), 1288 (), 1289 (), 1290 grpc.StatusCode.INTERNAL, 1291 "Exception serializing request!", 1292 ) 1293 raise _InactiveRpcError(state) 1294 1295 state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) 1296 call_credentials = ( 1297 None if credentials is None else credentials._credentials 1298 ) 1299 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 1300 wait_for_ready 1301 ) 1302 augmented_metadata = _compression.augment_metadata( 1303 metadata, compression 1304 ) 1305 operations = ( 1306 ( 1307 cygrpc.SendInitialMetadataOperation( 1308 augmented_metadata, initial_metadata_flags 1309 ), 1310 cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), 1311 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), 1312 ), 1313 (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),), 1314 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 1315 ) 1316 operations_and_tags = tuple((ops, None) for ops in operations) 1317 state.rpc_start_time = time.perf_counter() 1318 state.method = _common.decode(self._method) 1319 state.target = _common.decode(self._target) 1320 call = self._channel.segregated_call( 1321 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 1322 self._method, 1323 None, 1324 _determine_deadline(deadline), 1325 metadata, 1326 call_credentials, 1327 operations_and_tags, 1328 self._context, 1329 self._registered_call_handle, 1330 ) 1331 return _SingleThreadedRendezvous( 1332 state, call, self._response_deserializer, deadline 1333 ) 1334 1335 1336class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): 1337 _channel: cygrpc.Channel 1338 _managed_call: IntegratedCallFactory 1339 _method: bytes 1340 _target: bytes 1341 _request_serializer: Optional[SerializingFunction] 1342 _response_deserializer: Optional[DeserializingFunction] 1343 _context: Any 1344 _registered_call_handle: Optional[int] 1345 1346 __slots__ = [ 1347 "_channel", 1348 "_managed_call", 1349 "_method", 1350 "_target", 1351 "_request_serializer", 1352 "_response_deserializer", 1353 "_context", 1354 ] 1355 1356 # pylint: disable=too-many-arguments 1357 def __init__( 1358 self, 1359 channel: cygrpc.Channel, 1360 managed_call: IntegratedCallFactory, 1361 method: bytes, 1362 target: bytes, 1363 request_serializer: SerializingFunction, 1364 response_deserializer: DeserializingFunction, 1365 _registered_call_handle: Optional[int], 1366 ): 1367 self._channel = channel 1368 self._managed_call = managed_call 1369 self._method = method 1370 self._target = target 1371 self._request_serializer = request_serializer 1372 self._response_deserializer = response_deserializer 1373 self._context = cygrpc.build_census_context() 1374 self._registered_call_handle = _registered_call_handle 1375 1376 def __call__( # pylint: disable=too-many-locals 1377 self, 1378 request: Any, 1379 timeout: Optional[float] = None, 1380 metadata: Optional[MetadataType] = None, 1381 credentials: Optional[grpc.CallCredentials] = None, 1382 wait_for_ready: Optional[bool] = None, 1383 compression: Optional[grpc.Compression] = None, 1384 ) -> _MultiThreadedRendezvous: 1385 deadline, serialized_request, rendezvous = _start_unary_request( 1386 request, timeout, self._request_serializer 1387 ) 1388 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 1389 wait_for_ready 1390 ) 1391 if serialized_request is None: 1392 raise rendezvous # pylint: disable-msg=raising-bad-type 1393 else: 1394 augmented_metadata = _compression.augment_metadata( 1395 metadata, compression 1396 ) 1397 state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None) 1398 operations = ( 1399 ( 1400 cygrpc.SendInitialMetadataOperation( 1401 augmented_metadata, initial_metadata_flags 1402 ), 1403 cygrpc.SendMessageOperation( 1404 serialized_request, _EMPTY_FLAGS 1405 ), 1406 cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), 1407 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 1408 ), 1409 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 1410 ) 1411 state.rpc_start_time = time.perf_counter() 1412 state.method = _common.decode(self._method) 1413 state.target = _common.decode(self._target) 1414 call = self._managed_call( 1415 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 1416 self._method, 1417 None, 1418 _determine_deadline(deadline), 1419 metadata, 1420 None if credentials is None else credentials._credentials, 1421 operations, 1422 _event_handler(state, self._response_deserializer), 1423 self._context, 1424 self._registered_call_handle, 1425 ) 1426 return _MultiThreadedRendezvous( 1427 state, call, self._response_deserializer, deadline 1428 ) 1429 1430 1431class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): 1432 _channel: cygrpc.Channel 1433 _managed_call: IntegratedCallFactory 1434 _method: bytes 1435 _target: bytes 1436 _request_serializer: Optional[SerializingFunction] 1437 _response_deserializer: Optional[DeserializingFunction] 1438 _context: Any 1439 _registered_call_handle: Optional[int] 1440 1441 __slots__ = [ 1442 "_channel", 1443 "_managed_call", 1444 "_method", 1445 "_target", 1446 "_request_serializer", 1447 "_response_deserializer", 1448 "_context", 1449 ] 1450 1451 # pylint: disable=too-many-arguments 1452 def __init__( 1453 self, 1454 channel: cygrpc.Channel, 1455 managed_call: IntegratedCallFactory, 1456 method: bytes, 1457 target: bytes, 1458 request_serializer: Optional[SerializingFunction], 1459 response_deserializer: Optional[DeserializingFunction], 1460 _registered_call_handle: Optional[int], 1461 ): 1462 self._channel = channel 1463 self._managed_call = managed_call 1464 self._method = method 1465 self._target = target 1466 self._request_serializer = request_serializer 1467 self._response_deserializer = response_deserializer 1468 self._context = cygrpc.build_census_context() 1469 self._registered_call_handle = _registered_call_handle 1470 1471 def _blocking( 1472 self, 1473 request_iterator: Iterator, 1474 timeout: Optional[float], 1475 metadata: Optional[MetadataType], 1476 credentials: Optional[grpc.CallCredentials], 1477 wait_for_ready: Optional[bool], 1478 compression: Optional[grpc.Compression], 1479 ) -> Tuple[_RPCState, cygrpc.SegregatedCall]: 1480 deadline = _deadline(timeout) 1481 state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) 1482 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 1483 wait_for_ready 1484 ) 1485 augmented_metadata = _compression.augment_metadata( 1486 metadata, compression 1487 ) 1488 state.rpc_start_time = time.perf_counter() 1489 state.method = _common.decode(self._method) 1490 state.target = _common.decode(self._target) 1491 call = self._channel.segregated_call( 1492 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 1493 self._method, 1494 None, 1495 _determine_deadline(deadline), 1496 augmented_metadata, 1497 None if credentials is None else credentials._credentials, 1498 _stream_unary_invocation_operations_and_tags( 1499 augmented_metadata, initial_metadata_flags 1500 ), 1501 self._context, 1502 self._registered_call_handle, 1503 ) 1504 _consume_request_iterator( 1505 request_iterator, state, call, self._request_serializer, None 1506 ) 1507 while True: 1508 event = call.next_event() 1509 with state.condition: 1510 _handle_event(event, state, self._response_deserializer) 1511 state.condition.notify_all() 1512 if not state.due: 1513 break 1514 return state, call 1515 1516 def __call__( 1517 self, 1518 request_iterator: Iterator, 1519 timeout: Optional[float] = None, 1520 metadata: Optional[MetadataType] = None, 1521 credentials: Optional[grpc.CallCredentials] = None, 1522 wait_for_ready: Optional[bool] = None, 1523 compression: Optional[grpc.Compression] = None, 1524 ) -> Any: 1525 ( 1526 state, 1527 call, 1528 ) = self._blocking( 1529 request_iterator, 1530 timeout, 1531 metadata, 1532 credentials, 1533 wait_for_ready, 1534 compression, 1535 ) 1536 return _end_unary_response_blocking(state, call, False, None) 1537 1538 def with_call( 1539 self, 1540 request_iterator: Iterator, 1541 timeout: Optional[float] = None, 1542 metadata: Optional[MetadataType] = None, 1543 credentials: Optional[grpc.CallCredentials] = None, 1544 wait_for_ready: Optional[bool] = None, 1545 compression: Optional[grpc.Compression] = None, 1546 ) -> Tuple[Any, grpc.Call]: 1547 ( 1548 state, 1549 call, 1550 ) = self._blocking( 1551 request_iterator, 1552 timeout, 1553 metadata, 1554 credentials, 1555 wait_for_ready, 1556 compression, 1557 ) 1558 return _end_unary_response_blocking(state, call, True, None) 1559 1560 def future( 1561 self, 1562 request_iterator: Iterator, 1563 timeout: Optional[float] = None, 1564 metadata: Optional[MetadataType] = None, 1565 credentials: Optional[grpc.CallCredentials] = None, 1566 wait_for_ready: Optional[bool] = None, 1567 compression: Optional[grpc.Compression] = None, 1568 ) -> _MultiThreadedRendezvous: 1569 deadline = _deadline(timeout) 1570 state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None) 1571 event_handler = _event_handler(state, self._response_deserializer) 1572 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 1573 wait_for_ready 1574 ) 1575 augmented_metadata = _compression.augment_metadata( 1576 metadata, compression 1577 ) 1578 state.rpc_start_time = time.perf_counter() 1579 state.method = _common.decode(self._method) 1580 state.target = _common.decode(self._target) 1581 call = self._managed_call( 1582 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 1583 self._method, 1584 None, 1585 deadline, 1586 augmented_metadata, 1587 None if credentials is None else credentials._credentials, 1588 _stream_unary_invocation_operations( 1589 metadata, initial_metadata_flags 1590 ), 1591 event_handler, 1592 self._context, 1593 self._registered_call_handle, 1594 ) 1595 _consume_request_iterator( 1596 request_iterator, 1597 state, 1598 call, 1599 self._request_serializer, 1600 event_handler, 1601 ) 1602 return _MultiThreadedRendezvous( 1603 state, call, self._response_deserializer, deadline 1604 ) 1605 1606 1607class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): 1608 _channel: cygrpc.Channel 1609 _managed_call: IntegratedCallFactory 1610 _method: bytes 1611 _target: bytes 1612 _request_serializer: Optional[SerializingFunction] 1613 _response_deserializer: Optional[DeserializingFunction] 1614 _context: Any 1615 _registered_call_handle: Optional[int] 1616 1617 __slots__ = [ 1618 "_channel", 1619 "_managed_call", 1620 "_method", 1621 "_target", 1622 "_request_serializer", 1623 "_response_deserializer", 1624 "_context", 1625 ] 1626 1627 # pylint: disable=too-many-arguments 1628 def __init__( 1629 self, 1630 channel: cygrpc.Channel, 1631 managed_call: IntegratedCallFactory, 1632 method: bytes, 1633 target: bytes, 1634 request_serializer: Optional[SerializingFunction], 1635 response_deserializer: Optional[DeserializingFunction], 1636 _registered_call_handle: Optional[int], 1637 ): 1638 self._channel = channel 1639 self._managed_call = managed_call 1640 self._method = method 1641 self._target = target 1642 self._request_serializer = request_serializer 1643 self._response_deserializer = response_deserializer 1644 self._context = cygrpc.build_census_context() 1645 self._registered_call_handle = _registered_call_handle 1646 1647 def __call__( 1648 self, 1649 request_iterator: Iterator, 1650 timeout: Optional[float] = None, 1651 metadata: Optional[MetadataType] = None, 1652 credentials: Optional[grpc.CallCredentials] = None, 1653 wait_for_ready: Optional[bool] = None, 1654 compression: Optional[grpc.Compression] = None, 1655 ) -> _MultiThreadedRendezvous: 1656 deadline = _deadline(timeout) 1657 state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None) 1658 initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready( 1659 wait_for_ready 1660 ) 1661 augmented_metadata = _compression.augment_metadata( 1662 metadata, compression 1663 ) 1664 operations = ( 1665 ( 1666 cygrpc.SendInitialMetadataOperation( 1667 augmented_metadata, initial_metadata_flags 1668 ), 1669 cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), 1670 ), 1671 (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), 1672 ) 1673 event_handler = _event_handler(state, self._response_deserializer) 1674 state.rpc_start_time = time.perf_counter() 1675 state.method = _common.decode(self._method) 1676 state.target = _common.decode(self._target) 1677 call = self._managed_call( 1678 cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, 1679 self._method, 1680 None, 1681 _determine_deadline(deadline), 1682 augmented_metadata, 1683 None if credentials is None else credentials._credentials, 1684 operations, 1685 event_handler, 1686 self._context, 1687 self._registered_call_handle, 1688 ) 1689 _consume_request_iterator( 1690 request_iterator, 1691 state, 1692 call, 1693 self._request_serializer, 1694 event_handler, 1695 ) 1696 return _MultiThreadedRendezvous( 1697 state, call, self._response_deserializer, deadline 1698 ) 1699 1700 1701class _InitialMetadataFlags(int): 1702 """Stores immutable initial metadata flags""" 1703 1704 def __new__(cls, value: int = _EMPTY_FLAGS): 1705 value &= cygrpc.InitialMetadataFlags.used_mask 1706 return super(_InitialMetadataFlags, cls).__new__(cls, value) 1707 1708 def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int: 1709 if wait_for_ready is not None: 1710 if wait_for_ready: 1711 return self.__class__( 1712 self 1713 | cygrpc.InitialMetadataFlags.wait_for_ready 1714 | cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set 1715 ) 1716 elif not wait_for_ready: 1717 return self.__class__( 1718 self & ~cygrpc.InitialMetadataFlags.wait_for_ready 1719 | cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set 1720 ) 1721 return self 1722 1723 1724class _ChannelCallState(object): 1725 channel: cygrpc.Channel 1726 managed_calls: int 1727 threading: bool 1728 1729 def __init__(self, channel: cygrpc.Channel): 1730 self.lock = threading.Lock() 1731 self.channel = channel 1732 self.managed_calls = 0 1733 self.threading = False 1734 1735 def reset_postfork_child(self) -> None: 1736 self.managed_calls = 0 1737 1738 def __del__(self): 1739 try: 1740 self.channel.close( 1741 cygrpc.StatusCode.cancelled, "Channel deallocated!" 1742 ) 1743 except (TypeError, AttributeError): 1744 pass 1745 1746 1747def _run_channel_spin_thread(state: _ChannelCallState) -> None: 1748 def channel_spin(): 1749 while True: 1750 cygrpc.block_if_fork_in_progress(state) 1751 event = state.channel.next_call_event() 1752 if event.completion_type == cygrpc.CompletionType.queue_timeout: 1753 continue 1754 call_completed = event.tag(event) 1755 if call_completed: 1756 with state.lock: 1757 state.managed_calls -= 1 1758 if state.managed_calls == 0: 1759 return 1760 1761 channel_spin_thread = cygrpc.ForkManagedThread(target=channel_spin) 1762 channel_spin_thread.setDaemon(True) 1763 channel_spin_thread.start() 1764 1765 1766def _channel_managed_call_management(state: _ChannelCallState): 1767 # pylint: disable=too-many-arguments 1768 def create( 1769 flags: int, 1770 method: bytes, 1771 host: Optional[str], 1772 deadline: Optional[float], 1773 metadata: Optional[MetadataType], 1774 credentials: Optional[cygrpc.CallCredentials], 1775 operations: Sequence[Sequence[cygrpc.Operation]], 1776 event_handler: UserTag, 1777 context: Any, 1778 _registered_call_handle: Optional[int], 1779 ) -> cygrpc.IntegratedCall: 1780 """Creates a cygrpc.IntegratedCall. 1781 1782 Args: 1783 flags: An integer bitfield of call flags. 1784 method: The RPC method. 1785 host: A host string for the created call. 1786 deadline: A float to be the deadline of the created call or None if 1787 the call is to have an infinite deadline. 1788 metadata: The metadata for the call or None. 1789 credentials: A cygrpc.CallCredentials or None. 1790 operations: A sequence of sequences of cygrpc.Operations to be 1791 started on the call. 1792 event_handler: A behavior to call to handle the events resultant from 1793 the operations on the call. 1794 context: Context object for distributed tracing. 1795 _registered_call_handle: An int representing the call handle of the 1796 method, or None if the method is not registered. 1797 Returns: 1798 A cygrpc.IntegratedCall with which to conduct an RPC. 1799 """ 1800 operations_and_tags = tuple( 1801 ( 1802 operation, 1803 event_handler, 1804 ) 1805 for operation in operations 1806 ) 1807 with state.lock: 1808 call = state.channel.integrated_call( 1809 flags, 1810 method, 1811 host, 1812 deadline, 1813 metadata, 1814 credentials, 1815 operations_and_tags, 1816 context, 1817 _registered_call_handle, 1818 ) 1819 if state.managed_calls == 0: 1820 state.managed_calls = 1 1821 _run_channel_spin_thread(state) 1822 else: 1823 state.managed_calls += 1 1824 return call 1825 1826 return create 1827 1828 1829class _ChannelConnectivityState(object): 1830 lock: threading.RLock 1831 channel: grpc.Channel 1832 polling: bool 1833 connectivity: grpc.ChannelConnectivity 1834 try_to_connect: bool 1835 # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704 1836 callbacks_and_connectivities: List[ 1837 Sequence[ 1838 Union[ 1839 Callable[[grpc.ChannelConnectivity], None], 1840 Optional[grpc.ChannelConnectivity], 1841 ] 1842 ] 1843 ] 1844 delivering: bool 1845 1846 def __init__(self, channel: grpc.Channel): 1847 self.lock = threading.RLock() 1848 self.channel = channel 1849 self.polling = False 1850 self.connectivity = None 1851 self.try_to_connect = False 1852 self.callbacks_and_connectivities = [] 1853 self.delivering = False 1854 1855 def reset_postfork_child(self) -> None: 1856 self.polling = False 1857 self.connectivity = None 1858 self.try_to_connect = False 1859 self.callbacks_and_connectivities = [] 1860 self.delivering = False 1861 1862 1863def _deliveries( 1864 state: _ChannelConnectivityState, 1865) -> List[Callable[[grpc.ChannelConnectivity], None]]: 1866 callbacks_needing_update = [] 1867 for callback_and_connectivity in state.callbacks_and_connectivities: 1868 ( 1869 callback, 1870 callback_connectivity, 1871 ) = callback_and_connectivity 1872 if callback_connectivity is not state.connectivity: 1873 callbacks_needing_update.append(callback) 1874 callback_and_connectivity[1] = state.connectivity 1875 return callbacks_needing_update 1876 1877 1878def _deliver( 1879 state: _ChannelConnectivityState, 1880 initial_connectivity: grpc.ChannelConnectivity, 1881 initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]], 1882) -> None: 1883 connectivity = initial_connectivity 1884 callbacks = initial_callbacks 1885 while True: 1886 for callback in callbacks: 1887 cygrpc.block_if_fork_in_progress(state) 1888 try: 1889 callback(connectivity) 1890 except Exception: # pylint: disable=broad-except 1891 _LOGGER.exception( 1892 _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE 1893 ) 1894 with state.lock: 1895 callbacks = _deliveries(state) 1896 if callbacks: 1897 connectivity = state.connectivity 1898 else: 1899 state.delivering = False 1900 return 1901 1902 1903def _spawn_delivery( 1904 state: _ChannelConnectivityState, 1905 callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]], 1906) -> None: 1907 delivering_thread = cygrpc.ForkManagedThread( 1908 target=_deliver, 1909 args=( 1910 state, 1911 state.connectivity, 1912 callbacks, 1913 ), 1914 ) 1915 delivering_thread.setDaemon(True) 1916 delivering_thread.start() 1917 state.delivering = True 1918 1919 1920# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll. 1921def _poll_connectivity( 1922 state: _ChannelConnectivityState, 1923 channel: grpc.Channel, 1924 initial_try_to_connect: bool, 1925) -> None: 1926 try_to_connect = initial_try_to_connect 1927 connectivity = channel.check_connectivity_state(try_to_connect) 1928 with state.lock: 1929 state.connectivity = ( 1930 _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[ 1931 connectivity 1932 ] 1933 ) 1934 callbacks = tuple( 1935 callback for callback, _ in state.callbacks_and_connectivities 1936 ) 1937 for callback_and_connectivity in state.callbacks_and_connectivities: 1938 callback_and_connectivity[1] = state.connectivity 1939 if callbacks: 1940 _spawn_delivery(state, callbacks) 1941 while True: 1942 event = channel.watch_connectivity_state( 1943 connectivity, time.time() + 0.2 1944 ) 1945 cygrpc.block_if_fork_in_progress(state) 1946 with state.lock: 1947 if ( 1948 not state.callbacks_and_connectivities 1949 and not state.try_to_connect 1950 ): 1951 state.polling = False 1952 state.connectivity = None 1953 break 1954 try_to_connect = state.try_to_connect 1955 state.try_to_connect = False 1956 if event.success or try_to_connect: 1957 connectivity = channel.check_connectivity_state(try_to_connect) 1958 with state.lock: 1959 state.connectivity = ( 1960 _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[ 1961 connectivity 1962 ] 1963 ) 1964 if not state.delivering: 1965 callbacks = _deliveries(state) 1966 if callbacks: 1967 _spawn_delivery(state, callbacks) 1968 1969 1970def _subscribe( 1971 state: _ChannelConnectivityState, 1972 callback: Callable[[grpc.ChannelConnectivity], None], 1973 try_to_connect: bool, 1974) -> None: 1975 with state.lock: 1976 if not state.callbacks_and_connectivities and not state.polling: 1977 polling_thread = cygrpc.ForkManagedThread( 1978 target=_poll_connectivity, 1979 args=(state, state.channel, bool(try_to_connect)), 1980 ) 1981 polling_thread.setDaemon(True) 1982 polling_thread.start() 1983 state.polling = True 1984 state.callbacks_and_connectivities.append([callback, None]) 1985 elif not state.delivering and state.connectivity is not None: 1986 _spawn_delivery(state, (callback,)) 1987 state.try_to_connect |= bool(try_to_connect) 1988 state.callbacks_and_connectivities.append( 1989 [callback, state.connectivity] 1990 ) 1991 else: 1992 state.try_to_connect |= bool(try_to_connect) 1993 state.callbacks_and_connectivities.append([callback, None]) 1994 1995 1996def _unsubscribe( 1997 state: _ChannelConnectivityState, 1998 callback: Callable[[grpc.ChannelConnectivity], None], 1999) -> None: 2000 with state.lock: 2001 for index, (subscribed_callback, unused_connectivity) in enumerate( 2002 state.callbacks_and_connectivities 2003 ): 2004 if callback == subscribed_callback: 2005 state.callbacks_and_connectivities.pop(index) 2006 break 2007 2008 2009def _augment_options( 2010 base_options: Sequence[ChannelArgumentType], 2011 compression: Optional[grpc.Compression], 2012) -> Sequence[ChannelArgumentType]: 2013 compression_option = _compression.create_channel_option(compression) 2014 return ( 2015 tuple(base_options) 2016 + compression_option 2017 + ( 2018 ( 2019 cygrpc.ChannelArgKey.primary_user_agent_string, 2020 _USER_AGENT, 2021 ), 2022 ) 2023 ) 2024 2025 2026def _separate_channel_options( 2027 options: Sequence[ChannelArgumentType], 2028) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]: 2029 """Separates core channel options from Python channel options.""" 2030 core_options = [] 2031 python_options = [] 2032 for pair in options: 2033 if ( 2034 pair[0] 2035 == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream 2036 ): 2037 python_options.append(pair) 2038 else: 2039 core_options.append(pair) 2040 return python_options, core_options 2041 2042 2043class Channel(grpc.Channel): 2044 """A cygrpc.Channel-backed implementation of grpc.Channel.""" 2045 2046 _single_threaded_unary_stream: bool 2047 _channel: cygrpc.Channel 2048 _call_state: _ChannelCallState 2049 _connectivity_state: _ChannelConnectivityState 2050 _target: str 2051 _registered_call_handles: Dict[str, int] 2052 2053 def __init__( 2054 self, 2055 target: str, 2056 options: Sequence[ChannelArgumentType], 2057 credentials: Optional[grpc.ChannelCredentials], 2058 compression: Optional[grpc.Compression], 2059 ): 2060 """Constructor. 2061 2062 Args: 2063 target: The target to which to connect. 2064 options: Configuration options for the channel. 2065 credentials: A cygrpc.ChannelCredentials or None. 2066 compression: An optional value indicating the compression method to be 2067 used over the lifetime of the channel. 2068 """ 2069 python_options, core_options = _separate_channel_options(options) 2070 self._single_threaded_unary_stream = ( 2071 _DEFAULT_SINGLE_THREADED_UNARY_STREAM 2072 ) 2073 self._process_python_options(python_options) 2074 self._channel = cygrpc.Channel( 2075 _common.encode(target), 2076 _augment_options(core_options, compression), 2077 credentials, 2078 ) 2079 self._target = target 2080 self._call_state = _ChannelCallState(self._channel) 2081 self._connectivity_state = _ChannelConnectivityState(self._channel) 2082 cygrpc.fork_register_channel(self) 2083 if cygrpc.g_gevent_activated: 2084 cygrpc.gevent_increment_channel_count() 2085 2086 def _get_registered_call_handle(self, method: str) -> int: 2087 """ 2088 Get the registered call handle for a method. 2089 2090 This is a semi-private method. It is intended for use only by gRPC generated code. 2091 2092 This method is not thread-safe. 2093 2094 Args: 2095 method: Required, the method name for the RPC. 2096 2097 Returns: 2098 The registered call handle pointer in the form of a Python Long. 2099 """ 2100 return self._channel.get_registered_call_handle(_common.encode(method)) 2101 2102 def _process_python_options( 2103 self, python_options: Sequence[ChannelArgumentType] 2104 ) -> None: 2105 """Sets channel attributes according to python-only channel options.""" 2106 for pair in python_options: 2107 if ( 2108 pair[0] 2109 == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream 2110 ): 2111 self._single_threaded_unary_stream = True 2112 2113 def subscribe( 2114 self, 2115 callback: Callable[[grpc.ChannelConnectivity], None], 2116 try_to_connect: Optional[bool] = None, 2117 ) -> None: 2118 _subscribe(self._connectivity_state, callback, try_to_connect) 2119 2120 def unsubscribe( 2121 self, callback: Callable[[grpc.ChannelConnectivity], None] 2122 ) -> None: 2123 _unsubscribe(self._connectivity_state, callback) 2124 2125 # pylint: disable=arguments-differ 2126 def unary_unary( 2127 self, 2128 method: str, 2129 request_serializer: Optional[SerializingFunction] = None, 2130 response_deserializer: Optional[DeserializingFunction] = None, 2131 _registered_method: Optional[bool] = False, 2132 ) -> grpc.UnaryUnaryMultiCallable: 2133 _registered_call_handle = None 2134 if _registered_method: 2135 _registered_call_handle = self._get_registered_call_handle(method) 2136 return _UnaryUnaryMultiCallable( 2137 self._channel, 2138 _channel_managed_call_management(self._call_state), 2139 _common.encode(method), 2140 _common.encode(self._target), 2141 request_serializer, 2142 response_deserializer, 2143 _registered_call_handle, 2144 ) 2145 2146 # pylint: disable=arguments-differ 2147 def unary_stream( 2148 self, 2149 method: str, 2150 request_serializer: Optional[SerializingFunction] = None, 2151 response_deserializer: Optional[DeserializingFunction] = None, 2152 _registered_method: Optional[bool] = False, 2153 ) -> grpc.UnaryStreamMultiCallable: 2154 _registered_call_handle = None 2155 if _registered_method: 2156 _registered_call_handle = self._get_registered_call_handle(method) 2157 # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC 2158 # on a single Python thread results in an appreciable speed-up. However, 2159 # due to slight differences in capability, the multi-threaded variant 2160 # remains the default. 2161 if self._single_threaded_unary_stream: 2162 return _SingleThreadedUnaryStreamMultiCallable( 2163 self._channel, 2164 _common.encode(method), 2165 _common.encode(self._target), 2166 request_serializer, 2167 response_deserializer, 2168 _registered_call_handle, 2169 ) 2170 else: 2171 return _UnaryStreamMultiCallable( 2172 self._channel, 2173 _channel_managed_call_management(self._call_state), 2174 _common.encode(method), 2175 _common.encode(self._target), 2176 request_serializer, 2177 response_deserializer, 2178 _registered_call_handle, 2179 ) 2180 2181 # pylint: disable=arguments-differ 2182 def stream_unary( 2183 self, 2184 method: str, 2185 request_serializer: Optional[SerializingFunction] = None, 2186 response_deserializer: Optional[DeserializingFunction] = None, 2187 _registered_method: Optional[bool] = False, 2188 ) -> grpc.StreamUnaryMultiCallable: 2189 _registered_call_handle = None 2190 if _registered_method: 2191 _registered_call_handle = self._get_registered_call_handle(method) 2192 return _StreamUnaryMultiCallable( 2193 self._channel, 2194 _channel_managed_call_management(self._call_state), 2195 _common.encode(method), 2196 _common.encode(self._target), 2197 request_serializer, 2198 response_deserializer, 2199 _registered_call_handle, 2200 ) 2201 2202 # pylint: disable=arguments-differ 2203 def stream_stream( 2204 self, 2205 method: str, 2206 request_serializer: Optional[SerializingFunction] = None, 2207 response_deserializer: Optional[DeserializingFunction] = None, 2208 _registered_method: Optional[bool] = False, 2209 ) -> grpc.StreamStreamMultiCallable: 2210 _registered_call_handle = None 2211 if _registered_method: 2212 _registered_call_handle = self._get_registered_call_handle(method) 2213 return _StreamStreamMultiCallable( 2214 self._channel, 2215 _channel_managed_call_management(self._call_state), 2216 _common.encode(method), 2217 _common.encode(self._target), 2218 request_serializer, 2219 response_deserializer, 2220 _registered_call_handle, 2221 ) 2222 2223 def _unsubscribe_all(self) -> None: 2224 state = self._connectivity_state 2225 if state: 2226 with state.lock: 2227 del state.callbacks_and_connectivities[:] 2228 2229 def _close(self) -> None: 2230 self._unsubscribe_all() 2231 self._channel.close(cygrpc.StatusCode.cancelled, "Channel closed!") 2232 cygrpc.fork_unregister_channel(self) 2233 if cygrpc.g_gevent_activated: 2234 cygrpc.gevent_decrement_channel_count() 2235 2236 def _close_on_fork(self) -> None: 2237 self._unsubscribe_all() 2238 self._channel.close_on_fork( 2239 cygrpc.StatusCode.cancelled, "Channel closed due to fork" 2240 ) 2241 2242 def __enter__(self): 2243 return self 2244 2245 def __exit__(self, exc_type, exc_val, exc_tb): 2246 self._close() 2247 return False 2248 2249 def close(self) -> None: 2250 self._close() 2251 2252 def __del__(self): 2253 # TODO(https://github.com/grpc/grpc/issues/12531): Several releases 2254 # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call 2255 # here (or more likely, call self._close() here). We don't do this today 2256 # because many valid use cases today allow the channel to be deleted 2257 # immediately after stubs are created. After a sufficient period of time 2258 # has passed for all users to be trusted to freeze out to their channels 2259 # for as long as they are in use and to close them after using them, 2260 # then deletion of this grpc._channel.Channel instance can be made to 2261 # effect closure of the underlying cygrpc.Channel instance. 2262 try: 2263 self._unsubscribe_all() 2264 except: # pylint: disable=bare-except 2265 # Exceptions in __del__ are ignored by Python anyway, but they can 2266 # keep spamming logs. Just silence them. 2267 pass 2268