xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio/grpc/_channel.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2016 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Invocation-side implementation of gRPC Python."""
15
16import copy
17import functools
18import logging
19import os
20import sys
21import threading
22import time
23import types
24from typing import (
25    Any,
26    Callable,
27    Dict,
28    Iterator,
29    List,
30    Optional,
31    Sequence,
32    Set,
33    Tuple,
34    Union,
35)
36
37import grpc  # pytype: disable=pyi-error
38from grpc import _common  # pytype: disable=pyi-error
39from grpc import _compression  # pytype: disable=pyi-error
40from grpc import _grpcio_metadata  # pytype: disable=pyi-error
41from grpc import _observability  # pytype: disable=pyi-error
42from grpc._cython import cygrpc
43from grpc._typing import ChannelArgumentType
44from grpc._typing import DeserializingFunction
45from grpc._typing import IntegratedCallFactory
46from grpc._typing import MetadataType
47from grpc._typing import NullaryCallbackType
48from grpc._typing import ResponseType
49from grpc._typing import SerializingFunction
50from grpc._typing import UserTag
51import grpc.experimental  # pytype: disable=pyi-error
52
# Module-level logger for this module.
_LOGGER = logging.getLogger(__name__)

# User-agent string built from the grpcio package version.
_USER_AGENT = "grpc-python/{}".format(_grpcio_metadata.__version__)

# Flags value passed to cygrpc operations when no flags are requested.
_EMPTY_FLAGS = 0

# NOTE(rbellevi): No guarantees are given about the maintenance of this
# environment variable.
# True whenever GRPC_SINGLE_THREADED_UNARY_STREAM is present in the
# environment, whatever its value (an empty string still enables it).
_DEFAULT_SINGLE_THREADED_UNARY_STREAM = (
    os.getenv("GRPC_SINGLE_THREADED_UNARY_STREAM") is not None
)

# The four tuples below list cygrpc operation types, one tuple per RPC arity.
# NOTE(review): presumably each one seeds `_RPCState.due` for the first batch
# of the corresponding kind of RPC; the call sites are not in this section, so
# confirm before relying on that.
_UNARY_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_UNARY_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)

# Message logged when a channel subscription callback raises.
# NOTE(review): the logging site is outside this section.
_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
    "Exception calling channel subscription callback!"
)

# repr() template for an RPC that terminated with StatusCode.OK.
# Positional fields: class name, status code, details.
_OK_RENDEZVOUS_REPR_FORMAT = (
    '<{} of RPC that terminated with:\n\tstatus = {}\n\tdetails = "{}"\n>'
)

# repr() template for an RPC that terminated with a non-OK status.
# Positional fields: class name, status code, details, debug error string.
_NON_OK_RENDEZVOUS_REPR_FORMAT = (
    "<{} of RPC that terminated with:\n"
    "\tstatus = {}\n"
    '\tdetails = "{}"\n'
    '\tdebug_error_string = "{}"\n'
    ">"
)
107
108
def _deadline(timeout: Optional[float]) -> Optional[float]:
    """Turn a relative timeout into an absolute wall-clock deadline.

    Args:
      timeout: Number of seconds from now, or None for "no deadline".

    Returns:
      `time.time()` plus `timeout`, or None when `timeout` is None.
    """
    if timeout is None:
        return None
    return time.time() + timeout
111
112
def _unknown_code_details(
    unknown_cygrpc_code: Optional[grpc.StatusCode], details: Optional[str]
) -> str:
    """Build the details text used when a status code cannot be mapped.

    Args:
      unknown_cygrpc_code: The code value to report in the message.
      details: The details that accompanied the status, if any.

    Returns:
      A human-readable message embedding both values.
    """
    template = 'Server sent unknown code {} and details "{}"'
    return template.format(unknown_cygrpc_code, details)
119
120
class _RPCState(object):
    """Mutable record of one RPC's outstanding operations and outcome.

    Access to the members is coordinated through `condition`, which is also
    notified whenever the state of the RPC changes. The observability fields
    (`rpc_start_time`, `rpc_end_time`, `method`, `target`) are the exception:
    updating them does not trigger `condition`.
    """

    condition: threading.Condition
    due: Set[cygrpc.OperationType]
    initial_metadata: Optional[MetadataType]
    response: Any
    trailing_metadata: Optional[MetadataType]
    code: Optional[grpc.StatusCode]
    details: Optional[str]
    debug_error_string: Optional[str]
    cancelled: bool
    # Set to None once the status has been received and the callbacks have
    # been handed off for invocation (see `_handle_event`).
    callbacks: Optional[List[NullaryCallbackType]]
    fork_epoch: Optional[int]
    rpc_start_time: Optional[float]  # In relative seconds
    rpc_end_time: Optional[float]  # In relative seconds
    method: Optional[str]
    target: Optional[str]

    def __init__(
        self,
        due: Sequence[cygrpc.OperationType],
        initial_metadata: Optional[MetadataType],
        trailing_metadata: Optional[MetadataType],
        code: Optional[grpc.StatusCode],
        details: Optional[str],
    ) -> None:
        """Initializes the state.

        Args:
          due: Operation types already expected from the completion queue;
            copied into a set.
          initial_metadata: Initial metadata known so far, or None.
          trailing_metadata: Trailing metadata known so far, or None.
          code: Final status code, or None while the RPC has no outcome.
          details: Status details, or None while the RPC has no outcome.
        """
        # `condition` guards all members of _RPCState. `notify_all` is called on
        # `condition` when the state of the RPC has changed.
        self.condition = threading.Condition()

        # The cygrpc.OperationType objects representing events due from the RPC's
        # completion queue. If an operation is in `due`, it is guaranteed that
        # `operate()` has been called on a corresponding operation. But the
        # converse is not true. That is, in the case of failed `operate()`
        # calls, there may briefly be events in `due` that do not correspond to
        # operations submitted to Core.
        self.due = set(due)
        self.initial_metadata = initial_metadata
        self.response = None
        self.trailing_metadata = trailing_metadata
        self.code = code
        self.details = details
        self.debug_error_string = None
        # The following four fields are used for observability.
        # Updates to those fields do not trigger self.condition.
        self.rpc_start_time = None
        self.rpc_end_time = None
        self.method = None
        self.target = None

        # The semantics of grpc.Future.cancel and grpc.Future.cancelled are
        # slightly wonky, so they have to be tracked separately from the rest of the
        # result of the RPC. This field tracks whether cancellation was requested
        # prior to termination of the RPC.
        self.cancelled = False
        self.callbacks = []
        # Fork epoch at creation time; `_event_handler` compares it with the
        # current epoch when deciding whether the RPC is finished.
        self.fork_epoch = cygrpc.get_fork_epoch()

    def reset_postfork_child(self) -> None:
        """Replaces `condition` with a fresh, unheld Condition object."""
        self.condition = threading.Condition()
180
181
def _abort(state: _RPCState, code: grpc.StatusCode, details: str) -> None:
    """Record a locally decided outcome on `state` unless one already exists.

    Does nothing when `state.code` is already set. Otherwise stores `code`
    and `details`, fills in empty initial metadata if none was received, and
    sets the trailing metadata to an empty tuple.

    Args:
      state: The RPC state to terminate.
      code: The status code to record.
      details: The status details to record.
    """
    if state.code is not None:
        # The RPC already has an outcome; the first one wins.
        return
    state.code, state.details = code, details
    if state.initial_metadata is None:
        state.initial_metadata = ()
    state.trailing_metadata = ()
189
190
def _handle_event(
    event: cygrpc.BaseEvent,
    state: _RPCState,
    response_deserializer: Optional[DeserializingFunction],
) -> List[NullaryCallbackType]:
    """Apply every batch operation carried by `event` to `state`.

    The callers in this module invoke this function while holding
    `state.condition`.

    Args:
      event: A completion-queue event exposing `batch_operations`.
      state: The RPC state to update. Each operation's type is removed from
        `state.due`.
      response_deserializer: Deserializer applied to a received message, or
        None.

    Returns:
      The callbacks registered on `state` if this event carried the RPC's
      final status (in which case `state.callbacks` is set to None so no
      further callbacks can be registered); otherwise an empty list. The
      caller is responsible for invoking them.
    """
    callbacks = []
    for batch_operation in event.batch_operations:
        operation_type = batch_operation.type()
        state.due.remove(operation_type)
        if operation_type == cygrpc.OperationType.receive_initial_metadata:
            state.initial_metadata = batch_operation.initial_metadata()
        elif operation_type == cygrpc.OperationType.receive_message:
            serialized_response = batch_operation.message()
            if serialized_response is not None:
                response = _common.deserialize(
                    serialized_response, response_deserializer
                )
                if response is None:
                    details = "Exception deserializing response!"
                    _abort(state, grpc.StatusCode.INTERNAL, details)
                else:
                    state.response = response
        elif operation_type == cygrpc.OperationType.receive_status_on_client:
            state.trailing_metadata = batch_operation.trailing_metadata()
            # A locally recorded outcome (e.g. from a cancellation or an
            # abort) takes precedence over the status delivered by Core.
            if state.code is None:
                cygrpc_code = batch_operation.code()
                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
                    cygrpc_code
                )
                if code is None:
                    state.code = grpc.StatusCode.UNKNOWN
                    # Report the raw code that could not be mapped; the
                    # mapped value is always None on this branch.
                    state.details = _unknown_code_details(
                        cygrpc_code, batch_operation.details()
                    )
                else:
                    state.code = code
                    state.details = batch_operation.details()
                    state.debug_error_string = batch_operation.error_string()
            state.rpc_end_time = time.perf_counter()
            _observability.maybe_record_rpc_latency(state)
            callbacks.extend(state.callbacks)
            state.callbacks = None
    return callbacks
233
234
def _event_handler(
    state: _RPCState, response_deserializer: Optional[DeserializingFunction]
) -> UserTag:
    """Create the completion-queue tag that feeds events into `state`.

    Args:
      state: The RPC state updated by each event.
      response_deserializer: Deserializer for received messages, or None.

    Returns:
      A callable taking an event. It applies the event to `state` under
      `state.condition`, notifies waiters, runs any callbacks released by the
      event outside the lock, and returns True when no operations remain due
      and the state's fork epoch is not older than the current one.
    """

    def handle_event(event):
        with state.condition:
            callbacks = _handle_event(event, state, response_deserializer)
            state.condition.notify_all()
            done = not state.due
        for callback in callbacks:
            try:
                callback()
            except Exception as e:  # pylint: disable=broad-except
                # NOTE(rbellevi): We suppress but log errors here so as not to
                # kill the channel spin thread.
                # Only functools.partial callbacks carry `.func`; callbacks
                # registered through add_callback() may be arbitrary
                # callables, so fall back to the callback itself rather than
                # raising AttributeError from inside this handler.
                logging.error(
                    "Exception in callback %s: %s",
                    repr(getattr(callback, "func", callback)),
                    repr(e),
                )
        return done and state.fork_epoch >= cygrpc.get_fork_epoch()

    return handle_event
255
256
257# TODO(xuanwn): Create a base class for IntegratedCall and SegregatedCall.
258# pylint: disable=too-many-statements
def _consume_request_iterator(
    request_iterator: Iterator,
    state: _RPCState,
    call: Union[cygrpc.IntegratedCall, cygrpc.SegregatedCall],
    request_serializer: SerializingFunction,
    event_handler: Optional[UserTag],
) -> None:
    """Consume a request supplied by the user.

    Starts a daemon `cygrpc.ForkManagedThread` that pulls requests from
    `request_iterator` one at a time, serializes each and submits it to
    `call` as a send-message operation, then waits until that send is no
    longer in `state.due` (or the RPC has a status code) before fetching the
    next request. Once the iterator is exhausted and the RPC still has no
    status code, a send-close-from-client operation is submitted.

    This function returns as soon as the thread has been started.

    Args:
      request_iterator: The user's iterator of request values.
      state: The state of the RPC the requests belong to.
      call: The call on which operations are submitted and, on failure,
        cancelled.
      request_serializer: Serializer applied to every request.
      event_handler: Tag passed to `call.operate` with each operation.
    """

    def consume_request_iterator():  # pylint: disable=too-many-branches
        # Iterate over the request iterator until it is exhausted or an error
        # condition is encountered.
        while True:
            # Tracks whether the except-branch already balanced the
            # enter_user_request_generator() call, so `finally` does not
            # balance it a second time.
            return_from_user_request_generator_invoked = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                break
            except Exception:  # pylint: disable=broad-except
                cygrpc.return_from_user_request_generator()
                return_from_user_request_generator_invoked = True
                code = grpc.StatusCode.UNKNOWN
                details = "Exception iterating requests!"
                _LOGGER.exception(details)
                call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
                )
                _abort(state, code, details)
                return
            finally:
                if not return_from_user_request_generator_invoked:
                    cygrpc.return_from_user_request_generator()
            # Serialization happens before the state lock is taken.
            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                if state.code is None and not state.cancelled:
                    if serialized_request is None:
                        code = grpc.StatusCode.INTERNAL
                        details = "Exception serializing request!"
                        call.cancel(
                            _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details,
                        )
                        _abort(state, code, details)
                        return
                    else:
                        # Mark the send as due before submitting it and undo
                        # the mark if submission fails.
                        state.due.add(cygrpc.OperationType.send_message)
                        operations = (
                            cygrpc.SendMessageOperation(
                                serialized_request, _EMPTY_FLAGS
                            ),
                        )
                        operating = call.operate(operations, event_handler)
                        if not operating:
                            state.due.remove(cygrpc.OperationType.send_message)
                            return

                        def _done():
                            return (
                                state.code is not None
                                or cygrpc.OperationType.send_message
                                not in state.due
                            )

                        # Wait for the send to complete or the RPC to end
                        # before asking the user's iterator for more.
                        _common.wait(
                            state.condition.wait,
                            _done,
                            spin_cb=functools.partial(
                                cygrpc.block_if_fork_in_progress, state
                            ),
                        )
                        if state.code is not None:
                            return
                else:
                    # The RPC already terminated or was cancelled; stop
                    # consuming requests.
                    return
        # The iterator is exhausted: half-close the call if it is still live.
        with state.condition:
            if state.code is None:
                state.due.add(cygrpc.OperationType.send_close_from_client)
                operations = (
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
                )
                operating = call.operate(operations, event_handler)
                if not operating:
                    state.due.remove(
                        cygrpc.OperationType.send_close_from_client
                    )

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator
    )
    consumption_thread.setDaemon(True)
    consumption_thread.start()
352
353
def _rpc_state_string(class_name: str, rpc_state: _RPCState) -> str:
    """Render `rpc_state` as the repr text for an object named `class_name`.

    Args:
      class_name: Name of the class the text is produced for.
      rpc_state: The RPC state to describe; read under its condition.

    Returns:
      A short placeholder while the RPC has no status code, otherwise a
      multi-line summary of the status (with the debug error string for
      non-OK statuses).
    """
    with rpc_state.condition:
        status = rpc_state.code
        if status is None:
            return "<{} object>".format(class_name)
        if status is grpc.StatusCode.OK:
            return _OK_RENDEZVOUS_REPR_FORMAT.format(
                class_name, status, rpc_state.details
            )
        return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
            class_name,
            status,
            rpc_state.details,
            rpc_state.debug_error_string,
        )
370
371
class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
    """An RPC error not tied to the execution of a particular RPC.

    The instance holds its own copy of the outcome of the RPC it was built
    from. The RPC represented by the state object must not be in-progress or
    cancelled.

    Attributes:
      _state: An instance of _RPCState.
    """

    _state: _RPCState

    def __init__(self, state: _RPCState):
        """Snapshots `state` while holding its condition."""
        with state.condition:
            snapshot = _RPCState(
                (),
                copy.deepcopy(state.initial_metadata),
                copy.deepcopy(state.trailing_metadata),
                state.code,
                copy.deepcopy(state.details),
            )
            snapshot.response = copy.copy(state.response)
            snapshot.debug_error_string = copy.copy(state.debug_error_string)
            self._state = snapshot

    # grpc.Call accessors: all answered from the snapshot, never blocking.

    def initial_metadata(self) -> Optional[MetadataType]:
        """Returns the snapshotted initial metadata."""
        return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """Returns the snapshotted trailing metadata."""
        return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """Returns the snapshotted status code."""
        return self._state.code

    def details(self) -> Optional[str]:
        """Returns the snapshotted status details, decoded."""
        raw_details = self._state.details
        return _common.decode(raw_details)

    def debug_error_string(self) -> Optional[str]:
        """Returns the snapshotted debug error string, decoded."""
        raw_debug = self._state.debug_error_string
        return _common.decode(raw_debug)

    def _repr(self) -> str:
        return _rpc_state_string(type(self).__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    # grpc.Future interface: this object is always already finished.

    def cancel(self) -> bool:
        """See grpc.Future.cancel."""
        return False

    def cancelled(self) -> bool:
        """See grpc.Future.cancelled."""
        return False

    def running(self) -> bool:
        """See grpc.Future.running."""
        return False

    def done(self) -> bool:
        """See grpc.Future.done."""
        return True

    def result(
        self, timeout: Optional[float] = None
    ) -> Any:  # pylint: disable=unused-argument
        """See grpc.Future.result."""
        raise self

    def exception(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[Exception]:
        """See grpc.Future.exception."""
        return self

    def traceback(
        self, timeout: Optional[float] = None  # pylint: disable=unused-argument
    ) -> Optional[types.TracebackType]:
        """See grpc.Future.traceback."""
        # Raising and immediately catching ourselves yields a traceback
        # object for this error.
        try:
            raise self
        except grpc.RpcError as raised:
            return raised.__traceback__

    def add_done_callback(
        self,
        fn: Callable[[grpc.Future], None],
        timeout: Optional[float] = None,  # pylint: disable=unused-argument
    ) -> None:
        """See grpc.Future.add_done_callback."""
        # Already done, so the callback runs right away on this thread.
        fn(self)
464
465
class _Rendezvous(grpc.RpcError, grpc.RpcContext):
    """An RPC iterator.

    Base class holding the pieces shared by the concrete rendezvous types;
    `_next` and `debug_error_string` are left to subclasses.

    Attributes:
      _state: An instance of _RPCState.
      _call: An instance of SegregatedCall or IntegratedCall.
        In either case, the _call object is expected to have operate, cancel,
        and next_event methods.
      _response_deserializer: A callable taking bytes and return a Python
        object.
      _deadline: A float representing the deadline of the RPC in seconds. Or
        possibly None, to represent an RPC with no deadline at all.
    """

    _state: _RPCState
    _call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall]
    _response_deserializer: Optional[DeserializingFunction]
    _deadline: Optional[float]

    def __init__(
        self,
        state: _RPCState,
        call: Union[cygrpc.SegregatedCall, cygrpc.IntegratedCall],
        response_deserializer: Optional[DeserializingFunction],
        deadline: Optional[float],
    ):
        super().__init__()
        self._state = state
        self._call = call
        self._response_deserializer = response_deserializer
        self._deadline = deadline

    def is_active(self) -> bool:
        """See grpc.RpcContext.is_active"""
        with self._state.condition:
            no_status_yet = self._state.code is None
            return no_status_yet

    def time_remaining(self) -> Optional[float]:
        """See grpc.RpcContext.time_remaining"""
        with self._state.condition:
            if self._deadline is None:
                return None
            # Never report a negative amount once the deadline has passed.
            return max(self._deadline - time.time(), 0)

    def cancel(self) -> bool:
        """See grpc.RpcContext.cancel"""
        with self._state.condition:
            if self._state.code is not None:
                # The RPC already has an outcome; nothing to cancel.
                return False
            code = grpc.StatusCode.CANCELLED
            details = "Locally cancelled by application!"
            self._call.cancel(
                _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details
            )
            self._state.cancelled = True
            _abort(self._state, code, details)
            self._state.condition.notify_all()
            return True

    def add_callback(self, callback: NullaryCallbackType) -> bool:
        """See grpc.RpcContext.add_callback"""
        with self._state.condition:
            registered = self._state.callbacks
            if registered is None:
                # Callbacks were already handed off for invocation.
                return False
            registered.append(callback)
            return True

    def __iter__(self):
        return self

    def next(self):
        return self._next()

    def __next__(self):
        return self._next()

    def _next(self):
        raise NotImplementedError()

    def debug_error_string(self) -> Optional[str]:
        raise NotImplementedError()

    def _repr(self) -> str:
        return _rpc_state_string(type(self).__name__, self._state)

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()

    def __del__(self) -> None:
        # Cancel an RPC that is still in flight when its last reference goes
        # away.
        with self._state.condition:
            if self._state.code is not None:
                return
            self._state.code = grpc.StatusCode.CANCELLED
            self._state.details = "Cancelled upon garbage collection!"
            self._state.cancelled = True
            self._call.cancel(
                _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code],
                self._state.details,
            )
            self._state.condition.notify_all()
571
572
class _SingleThreadedRendezvous(
    _Rendezvous, grpc.Call, grpc.Future
):  # pylint: disable=too-many-ancestors
    """An RPC iterator operating entirely on a single thread.

    The __next__ method of _SingleThreadedRendezvous does not depend on the
    existence of any other thread, including the "channel spin thread".
    However, this means that its interface is entirely synchronous. So this
    class cannot completely fulfill the grpc.Future interface. The result,
    exception, and traceback methods will never block and will instead raise
    an exception if calling the method would result in blocking.

    This means that these methods are safe to call from add_done_callback
    handlers.
    """

    _state: _RPCState

    def _is_complete(self) -> bool:
        """Returns True once the RPC has a status code.

        Does not take the state condition itself; the callers in this class
        already hold it.
        """
        return self._state.code is not None

    def cancelled(self) -> bool:
        """Returns whether cancellation was requested for this RPC."""
        with self._state.condition:
            return self._state.cancelled

    def running(self) -> bool:
        """Returns True while the RPC has no status code."""
        with self._state.condition:
            return self._state.code is None

    def done(self) -> bool:
        """Returns True once the RPC has a status code."""
        with self._state.condition:
            return self._state.code is not None

    def result(self, timeout: Optional[float] = None) -> Any:
        """Returns the result of the computation or raises its exception.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports result() when the"
                    " RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return self._state.response
            elif self._state.cancelled:
                raise grpc.FutureCancelledError()
            else:
                raise self

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports exception() when"
                    " the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            elif self._state.cancelled:
                raise grpc.FutureCancelledError()
            else:
                return self

    def traceback(
        self, timeout: Optional[float] = None
    ) -> Optional[types.TracebackType]:
        """Access the traceback of the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports traceback() when"
                    " the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            elif self._state.cancelled:
                raise grpc.FutureCancelledError()
            else:
                # Raise and catch ourselves to obtain a traceback object.
                try:
                    raise self
                except grpc.RpcError:
                    return sys.exc_info()[2]

    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
        """Registers `fn` to be called with this object when the RPC ends.

        While the RPC has no status code the callback is queued on the state;
        otherwise it is invoked immediately, outside the state condition.
        """
        with self._state.condition:
            if self._state.code is None:
                self._state.callbacks.append(functools.partial(fn, self))
                return

        fn(self)

    def initial_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.initial_metadata"""
        with self._state.condition:
            # NOTE(gnossen): Based on our initial call batch, we are guaranteed
            # to receive initial metadata before any messages.
            # Events are consumed on the calling thread until the metadata
            # has been recorded.
            while self._state.initial_metadata is None:
                self._consume_next_event()
            return self._state.initial_metadata

    def trailing_metadata(self) -> Optional[MetadataType]:
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:
            if self._state.trailing_metadata is None:
                raise grpc.experimental.UsageError(
                    "Cannot get trailing metadata until RPC is completed."
                )
            return self._state.trailing_metadata

    def code(self) -> Optional[grpc.StatusCode]:
        """See grpc.Call.code"""
        with self._state.condition:
            if self._state.code is None:
                raise grpc.experimental.UsageError(
                    "Cannot get code until RPC is completed."
                )
            return self._state.code

    def details(self) -> Optional[str]:
        """See grpc.Call.details"""
        with self._state.condition:
            if self._state.details is None:
                raise grpc.experimental.UsageError(
                    "Cannot get details until RPC is completed."
                )
            return _common.decode(self._state.details)

    def _consume_next_event(self) -> Optional[cygrpc.BaseEvent]:
        """Takes one event from the call and applies it to the RPC state.

        Any callbacks released by the event are run on the calling thread
        while the state condition is held.

        Returns:
          The event obtained from `self._call.next_event()`.
        """
        event = self._call.next_event()
        with self._state.condition:
            callbacks = _handle_event(
                event, self._state, self._response_deserializer
            )
            for callback in callbacks:
                # NOTE(gnossen): We intentionally allow exceptions to bubble up
                # to the user when running on a single thread.
                callback()
        return event

    def _next_response(self) -> Any:
        """Consumes events until a response or the end of the RPC is reached.

        Returns:
          The next response; it is cleared from the state before returning.

        Raises:
          StopIteration: No receive is outstanding and the RPC ended with
            StatusCode.OK.
          _SingleThreadedRendezvous: No receive is outstanding and the RPC
            ended with any other status (this object itself is raised).
        """
        while True:
            self._consume_next_event()
            with self._state.condition:
                if self._state.response is not None:
                    response = self._state.response
                    self._state.response = None
                    return response
                elif (
                    cygrpc.OperationType.receive_message not in self._state.due
                ):
                    if self._state.code is grpc.StatusCode.OK:
                        raise StopIteration()
                    elif self._state.code is not None:
                        raise self
                    # No status code yet: keep consuming events.

    def _next(self) -> Any:
        """Requests the next message and returns it once it has arrived.

        Raises:
          StopIteration: The RPC already ended with StatusCode.OK.
          _SingleThreadedRendezvous: The RPC already ended with any other
            status (this object itself is raised).
        """
        with self._state.condition:
            if self._state.code is None:
                # We tentatively add the operation as expected and remove
                # it if the enqueue operation fails. This allows us to guarantee that
                # if an event has been submitted to the core completion queue,
                # it is in `due`. If we waited until after a successful
                # enqueue operation then a signal could interrupt this
                # thread between the enqueue operation and the addition of the
                # operation to `due`. This would cause an exception on the
                # channel spin thread when the operation completes and no
                # corresponding operation would be present in state.due.
                # Note that, since `condition` is held through this block, there is
                # no data race on `due`.
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
        return self._next_response()

    def debug_error_string(self) -> Optional[str]:
        """Returns the decoded debug error string.

        Raises:
          grpc.experimental.UsageError: No debug error string has been
            recorded on the RPC state yet.
        """
        with self._state.condition:
            if self._state.debug_error_string is None:
                raise grpc.experimental.UsageError(
                    "Cannot get debug error string until RPC is completed."
                )
            return _common.decode(self._state.debug_error_string)
785
786
787class _MultiThreadedRendezvous(
788    _Rendezvous, grpc.Call, grpc.Future
789):  # pylint: disable=too-many-ancestors
790    """An RPC iterator that depends on a channel spin thread.
791
792    This iterator relies upon a per-channel thread running in the background,
793    dequeueing events from the completion queue, and notifying threads waiting
794    on the threading.Condition object in the _RPCState object.
795
    This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface
    and to mediate a bidirectional streaming RPC.
798    """
799
800    _state: _RPCState
801
802    def initial_metadata(self) -> Optional[MetadataType]:
803        """See grpc.Call.initial_metadata"""
804        with self._state.condition:
805
806            def _done():
807                return self._state.initial_metadata is not None
808
809            _common.wait(self._state.condition.wait, _done)
810            return self._state.initial_metadata
811
812    def trailing_metadata(self) -> Optional[MetadataType]:
813        """See grpc.Call.trailing_metadata"""
814        with self._state.condition:
815
816            def _done():
817                return self._state.trailing_metadata is not None
818
819            _common.wait(self._state.condition.wait, _done)
820            return self._state.trailing_metadata
821
822    def code(self) -> Optional[grpc.StatusCode]:
823        """See grpc.Call.code"""
824        with self._state.condition:
825
826            def _done():
827                return self._state.code is not None
828
829            _common.wait(self._state.condition.wait, _done)
830            return self._state.code
831
832    def details(self) -> Optional[str]:
833        """See grpc.Call.details"""
834        with self._state.condition:
835
836            def _done():
837                return self._state.details is not None
838
839            _common.wait(self._state.condition.wait, _done)
840            return _common.decode(self._state.details)
841
842    def debug_error_string(self) -> Optional[str]:
843        with self._state.condition:
844
845            def _done():
846                return self._state.debug_error_string is not None
847
848            _common.wait(self._state.condition.wait, _done)
849            return _common.decode(self._state.debug_error_string)
850
851    def cancelled(self) -> bool:
852        with self._state.condition:
853            return self._state.cancelled
854
855    def running(self) -> bool:
856        with self._state.condition:
857            return self._state.code is None
858
859    def done(self) -> bool:
860        with self._state.condition:
861            return self._state.code is not None
862
863    def _is_complete(self) -> bool:
864        return self._state.code is not None
865
866    def result(self, timeout: Optional[float] = None) -> Any:
867        """Returns the result of the computation or raises its exception.
868
869        See grpc.Future.result for the full API contract.
870        """
871        with self._state.condition:
872            timed_out = _common.wait(
873                self._state.condition.wait, self._is_complete, timeout=timeout
874            )
875            if timed_out:
876                raise grpc.FutureTimeoutError()
877            else:
878                if self._state.code is grpc.StatusCode.OK:
879                    return self._state.response
880                elif self._state.cancelled:
881                    raise grpc.FutureCancelledError()
882                else:
883                    raise self
884
885    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
886        """Return the exception raised by the computation.
887
888        See grpc.Future.exception for the full API contract.
889        """
890        with self._state.condition:
891            timed_out = _common.wait(
892                self._state.condition.wait, self._is_complete, timeout=timeout
893            )
894            if timed_out:
895                raise grpc.FutureTimeoutError()
896            else:
897                if self._state.code is grpc.StatusCode.OK:
898                    return None
899                elif self._state.cancelled:
900                    raise grpc.FutureCancelledError()
901                else:
902                    return self
903
904    def traceback(
905        self, timeout: Optional[float] = None
906    ) -> Optional[types.TracebackType]:
907        """Access the traceback of the exception raised by the computation.
908
909        See grpc.future.traceback for the full API contract.
910        """
911        with self._state.condition:
912            timed_out = _common.wait(
913                self._state.condition.wait, self._is_complete, timeout=timeout
914            )
915            if timed_out:
916                raise grpc.FutureTimeoutError()
917            else:
918                if self._state.code is grpc.StatusCode.OK:
919                    return None
920                elif self._state.cancelled:
921                    raise grpc.FutureCancelledError()
922                else:
923                    try:
924                        raise self
925                    except grpc.RpcError:
926                        return sys.exc_info()[2]
927
928    def add_done_callback(self, fn: Callable[[grpc.Future], None]) -> None:
929        with self._state.condition:
930            if self._state.code is None:
931                self._state.callbacks.append(functools.partial(fn, self))
932                return
933
934        fn(self)
935
936    def _next(self) -> Any:
937        with self._state.condition:
938            if self._state.code is None:
939                event_handler = _event_handler(
940                    self._state, self._response_deserializer
941                )
942                self._state.due.add(cygrpc.OperationType.receive_message)
943                operating = self._call.operate(
944                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
945                    event_handler,
946                )
947                if not operating:
948                    self._state.due.remove(cygrpc.OperationType.receive_message)
949            elif self._state.code is grpc.StatusCode.OK:
950                raise StopIteration()
951            else:
952                raise self
953
954            def _response_ready():
955                return self._state.response is not None or (
956                    cygrpc.OperationType.receive_message not in self._state.due
957                    and self._state.code is not None
958                )
959
960            _common.wait(self._state.condition.wait, _response_ready)
961            if self._state.response is not None:
962                response = self._state.response
963                self._state.response = None
964                return response
965            elif cygrpc.OperationType.receive_message not in self._state.due:
966                if self._state.code is grpc.StatusCode.OK:
967                    raise StopIteration()
968                elif self._state.code is not None:
969                    raise self
970
971
def _start_unary_request(
    request: Any,
    timeout: Optional[float],
    request_serializer: SerializingFunction,
) -> Tuple[Optional[float], Optional[bytes], Optional[grpc.RpcError]]:
    """Compute the deadline and serialize the request of a unary call.

    Returns:
      A `(deadline, serialized_request, error)` tuple. When serialization
      yields None, `serialized_request` is None and `error` is an
      _InactiveRpcError with code INTERNAL; otherwise `error` is None.
    """
    deadline = _deadline(timeout)
    serialized_request = _common.serialize(request, request_serializer)
    if serialized_request is not None:
        return deadline, serialized_request, None

    # Serialization failed: report it through an already-terminated state.
    failed_state = _RPCState(
        (),
        (),
        (),
        grpc.StatusCode.INTERNAL,
        "Exception serializing request!",
    )
    return deadline, None, _InactiveRpcError(failed_state)
991
992
def _end_unary_response_blocking(
    state: _RPCState,
    call: cygrpc.SegregatedCall,
    with_call: bool,
    deadline: Optional[float],
) -> Union[ResponseType, Tuple[ResponseType, grpc.Call]]:
    """Turn a finished blocking unary-response RPC into its return value.

    Returns:
      `state.response`, or `(state.response, rendezvous)` when `with_call`
      is true.

    Raises:
      _InactiveRpcError: If the state's code is not StatusCode.OK.
    """
    if state.code is not grpc.StatusCode.OK:
        raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
    if not with_call:
        return state.response
    call_view = _MultiThreadedRendezvous(state, call, None, deadline)
    return state.response, call_view
1007
1008
def _stream_unary_invocation_operations(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Sequence[cygrpc.Operation]]:
    """Build the two operation batches that start a stream-unary RPC.

    The first batch sends the initial metadata and receives the message and
    the status; the second batch receives the initial metadata.
    """
    send_and_receive_batch = (
        cygrpc.SendInitialMetadataOperation(metadata, initial_metadata_flags),
        cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    )
    initial_metadata_batch = (
        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
    )
    return (send_and_receive_batch, initial_metadata_batch)
1022
1023
def _stream_unary_invocation_operations_and_tags(
    metadata: Optional[MetadataType], initial_metadata_flags: int
) -> Sequence[Tuple[Sequence[cygrpc.Operation], Optional[UserTag]]]:
    """Pair every stream-unary operation batch with a None tag."""
    batches = _stream_unary_invocation_operations(
        metadata, initial_metadata_flags
    )
    return tuple([(batch, None) for batch in batches])
1036
1037
def _determine_deadline(user_deadline: Optional[float]) -> Optional[float]:
    """Combine the caller's deadline with the one found in the context.

    Returns:
      None when neither deadline is set, the one that is set when only one
      is, and the smaller of the two when both are.
    """
    parent_deadline = cygrpc.get_deadline_from_context()
    if user_deadline is None:
        # Covers both "neither set" (None) and "only the parent set".
        return parent_deadline
    if parent_deadline is None:
        return user_deadline
    return min(parent_deadline, user_deadline)
1048
1049
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
    """Invokes a unary-request, unary-response method on a cygrpc channel."""

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Store the channel, call factory, method and (de)serializers."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def _prepare(
        self,
        request: Any,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[
        Optional[_RPCState],
        Optional[Sequence[cygrpc.Operation]],
        Optional[float],
        Optional[grpc.RpcError],
    ]:
        """Serialize the request and assemble the single operation batch.

        Returns:
          `(state, operations, deadline, None)` on success, or
          `(None, None, None, error)` when the request failed to serialize.
        """
        deadline, serialized_request, serialization_error = (
            _start_unary_request(request, timeout, self._request_serializer)
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        if serialized_request is None:
            return None, None, None, serialization_error

        state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
        # A unary-unary RPC is carried out as one batch of six operations.
        operations = (
            cygrpc.SendInitialMetadataOperation(
                augmented_metadata, initial_metadata_flags
            ),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        return state, operations, deadline, None

    def _blocking(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Run the RPC on a segregated call and handle its single event.

        Raises:
          grpc.RpcError: The error produced by `_prepare` when the request
            could not be serialized.
        """
        state, operations, deadline, serialization_error = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise serialization_error  # pylint: disable-msg=raising-bad-type

        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        effective_deadline = _determine_deadline(deadline)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            effective_deadline,
            metadata,
            call_credentials,
            ((operations, None),),
            self._context,
            self._registered_call_handle,
        )
        _handle_event(call.next_event(), state, self._response_deserializer)
        return state, call

    def __call__(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invoke the RPC synchronously and return only the response."""
        state, call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC synchronously; return the response and a Call."""
        state, call = self._blocking(
            request, timeout, metadata, credentials, wait_for_ready, compression
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC through the managed call factory; return a future.

        Raises:
          grpc.RpcError: The error produced by `_prepare` when the request
            could not be serialized.
        """
        state, operations, deadline, serialization_error = self._prepare(
            request, timeout, metadata, wait_for_ready, compression
        )
        if state is None:
            raise serialization_error  # pylint: disable-msg=raising-bad-type

        event_handler = _event_handler(state, self._response_deserializer)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        # NOTE(review): unlike _blocking, the deadline is passed here without
        # going through _determine_deadline -- confirm this is intentional.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            metadata,
            call_credentials,
            (operations,),
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
1234
1235
class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Invokes a unary-stream method on a segregated call."""

    _channel: cygrpc.Channel
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    __slots__ = [
        "_channel",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        method: bytes,
        target: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        _registered_call_handle: Optional[int],
    ):
        """Store the channel, method, target and (de)serializers."""
        self._channel = channel
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _SingleThreadedRendezvous:
        """Start the RPC and return a _SingleThreadedRendezvous for it.

        Raises:
          _InactiveRpcError: With code INTERNAL, when serializing the
            request yields None.
        """
        deadline = _deadline(timeout)
        serialized_request = _common.serialize(
            request, self._request_serializer
        )
        if serialized_request is None:
            failed_state = _RPCState(
                (),
                (),
                (),
                grpc.StatusCode.INTERNAL,
                "Exception serializing request!",
            )
            raise _InactiveRpcError(failed_state)

        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Three batches: the sends, the status receive, and the initial
        # metadata receive. Each one is paired with a None tag below.
        send_batch = (
            cygrpc.SendInitialMetadataOperation(
                augmented_metadata, initial_metadata_flags
            ),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
        )
        status_batch = (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
        )
        operations_and_tags = tuple(
            [
                (batch, None)
                for batch in (send_batch, status_batch, initial_metadata_batch)
            ]
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        effective_deadline = _determine_deadline(deadline)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            effective_deadline,
            metadata,
            call_credentials,
            operations_and_tags,
            self._context,
            self._registered_call_handle,
        )
        return _SingleThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
1334
1335
class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Invokes a unary-stream method through the managed call factory."""

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: SerializingFunction,
        response_deserializer: DeserializingFunction,
        _registered_call_handle: Optional[int],
    ):
        """Store the channel, call factory, method and (de)serializers."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(  # pylint: disable=too-many-locals
        self,
        request: Any,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC and return a _MultiThreadedRendezvous for it.

        Raises:
          grpc.RpcError: The error produced by `_start_unary_request` when
            the request could not be serialized.
        """
        deadline, serialized_request, serialization_error = (
            _start_unary_request(request, timeout, self._request_serializer)
        )
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        if serialized_request is None:
            raise serialization_error  # pylint: disable-msg=raising-bad-type

        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        # Two batches: the sends together with the status receive, then the
        # initial metadata receive.
        send_and_status_batch = (
            cygrpc.SendInitialMetadataOperation(
                augmented_metadata, initial_metadata_flags
            ),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
        )
        operations = (send_and_status_batch, initial_metadata_batch)
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        effective_deadline = _determine_deadline(deadline)
        call_credentials = (
            None if credentials is None else credentials._credentials
        )
        event_handler = _event_handler(state, self._response_deserializer)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            effective_deadline,
            metadata,
            call_credentials,
            operations,
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
1429
1430
class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
    """Invokes a stream-request, unary-response method on a cygrpc channel.

    The blocking entry points (`__call__`, `with_call`) run on a segregated
    call and drain its events inline; `future` goes through the managed call
    factory and returns a _MultiThreadedRendezvous.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Store the channel, call factory, method and (de)serializers."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def _blocking(
        self,
        request_iterator: Iterator,
        timeout: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[grpc.CallCredentials],
        wait_for_ready: Optional[bool],
        compression: Optional[grpc.Compression],
    ) -> Tuple[_RPCState, cygrpc.SegregatedCall]:
        """Run the RPC on a segregated call until no operation is due.

        Returns:
          The `(state, call)` pair once `state.due` is empty.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            _stream_unary_invocation_operations_and_tags(
                augmented_metadata, initial_metadata_flags
            ),
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator, state, call, self._request_serializer, None
        )
        # Handle events one at a time, waking any waiter on the state
        # condition after each, until nothing remains due.
        while True:
            event = call.next_event()
            with state.condition:
                _handle_event(event, state, self._response_deserializer)
                state.condition.notify_all()
                if not state.due:
                    break
        return state, call

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Any:
        """Invoke the RPC synchronously and return only the response."""
        (
            state,
            call,
        ) = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> Tuple[Any, grpc.Call]:
        """Invoke the RPC synchronously; return the response and a Call."""
        (
            state,
            call,
        ) = self._blocking(
            request_iterator,
            timeout,
            metadata,
            credentials,
            wait_for_ready,
            compression,
        )
        return _end_unary_response_blocking(state, call, True, None)

    def future(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Start the RPC through the managed call factory; return a future.

        Returns:
          A _MultiThreadedRendezvous built from the new state and call.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        event_handler = _event_handler(state, self._response_deserializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        # NOTE(review): unlike _blocking, the deadline is passed here without
        # going through _determine_deadline -- confirm this is intentional.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            deadline,
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            # The send-initial-metadata operation must carry the augmented
            # metadata (as in _blocking); building it from the bare
            # `metadata` dropped the per-call compression entry.
            _stream_unary_invocation_operations(
                augmented_metadata, initial_metadata_flags
            ),
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
1605
1606
class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
    """Multi-callable that starts stream-stream RPCs through a managed call.

    Each invocation creates a fresh _RPCState, starts the call via the
    injected managed-call factory, feeds the request iterator to the call and
    returns a _MultiThreadedRendezvous wrapping the in-flight RPC.
    """

    _channel: cygrpc.Channel
    _managed_call: IntegratedCallFactory
    _method: bytes
    _target: bytes
    _request_serializer: Optional[SerializingFunction]
    _response_deserializer: Optional[DeserializingFunction]
    _context: Any
    _registered_call_handle: Optional[int]

    # Every attribute assigned in __init__ is listed here, including
    # "_registered_call_handle", so that the slot list stays in sync with the
    # annotations above.
    __slots__ = [
        "_channel",
        "_managed_call",
        "_method",
        "_target",
        "_request_serializer",
        "_response_deserializer",
        "_context",
        "_registered_call_handle",
    ]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        channel: cygrpc.Channel,
        managed_call: IntegratedCallFactory,
        method: bytes,
        target: bytes,
        request_serializer: Optional[SerializingFunction],
        response_deserializer: Optional[DeserializingFunction],
        _registered_call_handle: Optional[int],
    ):
        """Stores the collaborators needed to start calls later.

        Args:
          channel: The cygrpc.Channel the RPCs are associated with.
          managed_call: Factory used by __call__ to create the call.
          method: The encoded RPC method name.
          target: The encoded channel target.
          request_serializer: Optional serializer applied to requests.
          response_deserializer: Optional deserializer applied to responses.
          _registered_call_handle: Call handle of the registered method, or
            None.
        """
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._target = target
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()
        self._registered_call_handle = _registered_call_handle

    def __call__(
        self,
        request_iterator: Iterator,
        timeout: Optional[float] = None,
        metadata: Optional[MetadataType] = None,
        credentials: Optional[grpc.CallCredentials] = None,
        wait_for_ready: Optional[bool] = None,
        compression: Optional[grpc.Compression] = None,
    ) -> _MultiThreadedRendezvous:
        """Starts a stream-stream RPC.

        Args:
          request_iterator: Iterator yielding the request messages.
          timeout: Optional timeout, converted to a deadline by _deadline.
          metadata: Optional metadata; augmented with the compression setting
            before being sent.
          credentials: Optional grpc.CallCredentials for this call.
          wait_for_ready: Optional wait-for-ready setting folded into the
            initial metadata flags.
          compression: Optional per-call compression setting.

        Returns:
          A _MultiThreadedRendezvous for the started RPC.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready
        )
        augmented_metadata = _compression.augment_metadata(
            metadata, compression
        )
        # Two batches: the first sends the initial metadata and asks for the
        # final status, the second asks for the peer's initial metadata.
        operations = (
            (
                cygrpc.SendInitialMetadataOperation(
                    augmented_metadata, initial_metadata_flags
                ),
                cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
            ),
            (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
        )
        event_handler = _event_handler(state, self._response_deserializer)
        # Bookkeeping fields recorded on the state before the call is created.
        state.rpc_start_time = time.perf_counter()
        state.method = _common.decode(self._method)
        state.target = _common.decode(self._target)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,
            self._method,
            None,
            # The call receives the deadline as processed by
            # _determine_deadline; the rendezvous below receives the value
            # computed from the caller's timeout.
            _determine_deadline(deadline),
            augmented_metadata,
            None if credentials is None else credentials._credentials,
            operations,
            event_handler,
            self._context,
            self._registered_call_handle,
        )
        _consume_request_iterator(
            request_iterator,
            state,
            call,
            self._request_serializer,
            event_handler,
        )
        return _MultiThreadedRendezvous(
            state, call, self._response_deserializer, deadline
        )
1699
1700
class _InitialMetadataFlags(int):
    """Immutable integer holding initial metadata flag bits."""

    def __new__(cls, value: int = _EMPTY_FLAGS):
        # Only bits covered by the used mask are retained.
        masked = value & cygrpc.InitialMetadataFlags.used_mask
        return super().__new__(cls, masked)

    def with_wait_for_ready(self, wait_for_ready: Optional[bool]) -> int:
        """Returns flags reflecting the requested wait-for-ready setting.

        Args:
          wait_for_ready: None to leave the flags untouched; otherwise a
            value whose truthiness sets or clears the wait_for_ready bit.

        Returns:
          This same object when wait_for_ready is None, otherwise a new
          instance with the wait_for_ready bit updated and the
          wait_for_ready_explicitly_set bit turned on.
        """
        if wait_for_ready is None:
            return self
        bits = cygrpc.InitialMetadataFlags
        if wait_for_ready:
            updated = self | bits.wait_for_ready
        else:
            updated = self & ~bits.wait_for_ready
        return self.__class__(updated | bits.wait_for_ready_explicitly_set)
1722
1723
class _ChannelCallState(object):
    """Bookkeeping shared by the managed calls of one cygrpc.Channel.

    The lock guards managed_calls, the number of calls currently tracked for
    the channel spin thread.
    """

    channel: cygrpc.Channel
    managed_calls: int
    threading: bool

    def __init__(self, channel: cygrpc.Channel):
        self.lock = threading.Lock()
        self.channel = channel
        self.threading = False
        # A new state starts with the same call count as a post-fork reset.
        self.reset_postfork_child()

    def reset_postfork_child(self) -> None:
        """Drops the managed call count back to zero."""
        self.managed_calls = 0

    def __del__(self):
        # Best effort: TypeError and AttributeError raised while closing the
        # channel are deliberately ignored.
        try:
            self.channel.close(
                cygrpc.StatusCode.cancelled, "Channel deallocated!"
            )
        except (TypeError, AttributeError):
            pass
1745
1746
def _run_channel_spin_thread(state: _ChannelCallState) -> None:
    """Starts a daemon-flagged thread that drains the channel's call events.

    The thread keeps pulling events from state.channel and dispatching each
    one to its tag. Whenever a tag reports a completed call the managed call
    count is decremented under state.lock; the thread exits once that count
    reaches zero.

    Args:
      state: The call state whose channel is to be spun.
    """

    def channel_spin():
        while True:
            cygrpc.block_if_fork_in_progress(state)
            event = state.channel.next_call_event()
            # Queue timeouts carry nothing to dispatch; poll again.
            if event.completion_type == cygrpc.CompletionType.queue_timeout:
                continue
            # The tag is invoked with its own event; a falsy result means
            # the call is still in progress.
            if not event.tag(event):
                continue
            with state.lock:
                state.managed_calls -= 1
                if state.managed_calls == 0:
                    return

    spinner = cygrpc.ForkManagedThread(target=channel_spin)
    spinner.setDaemon(True)
    spinner.start()
1764
1765
def _channel_managed_call_management(state: _ChannelCallState):
    """Builds the call factory bound to the given channel call state.

    Args:
      state: The _ChannelCallState whose channel and call count the returned
        factory operates on.

    Returns:
      A function that creates cygrpc.IntegratedCall objects.
    """

    # pylint: disable=too-many-arguments
    def create(
        flags: int,
        method: bytes,
        host: Optional[str],
        deadline: Optional[float],
        metadata: Optional[MetadataType],
        credentials: Optional[cygrpc.CallCredentials],
        operations: Sequence[Sequence[cygrpc.Operation]],
        event_handler: UserTag,
        context: Any,
        _registered_call_handle: Optional[int],
    ) -> cygrpc.IntegratedCall:
        """Creates a cygrpc.IntegratedCall.

        Args:
          flags: An integer bitfield of call flags.
          method: The RPC method.
          host: A host string for the created call.
          deadline: A float to be the deadline of the created call or None if
            the call is to have an infinite deadline.
          metadata: The metadata for the call or None.
          credentials: A cygrpc.CallCredentials or None.
          operations: A sequence of sequences of cygrpc.Operations to be
            started on the call.
          event_handler: A behavior to call to handle the events resultant from
            the operations on the call.
          context: Context object for distributed tracing.
          _registered_call_handle: An int representing the call handle of the
            method, or None if the method is not registered.
        Returns:
          A cygrpc.IntegratedCall with which to conduct an RPC.
        """
        # Every batch of operations is tagged with the same event handler.
        tagged_batches = tuple(
            (batch, event_handler) for batch in operations
        )
        with state.lock:
            call = state.channel.integrated_call(
                flags,
                method,
                host,
                deadline,
                metadata,
                credentials,
                tagged_batches,
                context,
                _registered_call_handle,
            )
            # The spin thread is only started when no managed call was being
            # tracked before this one; the count is bumped first so the
            # thread sees this call.
            needs_spin_thread = state.managed_calls == 0
            state.managed_calls += 1
            if needs_spin_thread:
                _run_channel_spin_thread(state)
            return call

    return create
1827
1828
class _ChannelConnectivityState(object):
    """Mutable state behind connectivity subscriptions of one channel.

    NOTE(review): Channel.__init__ in this file constructs this object with
    its cygrpc.Channel, so the `channel` annotation below looks broader than
    what is passed in practice -- confirm before relying on it.
    """

    lock: threading.RLock
    channel: grpc.Channel
    polling: bool
    connectivity: grpc.ChannelConnectivity
    try_to_connect: bool
    # TODO(xuanwn): Refactor this: https://github.com/grpc/grpc/issues/31704
    callbacks_and_connectivities: List[
        Sequence[
            Union[
                Callable[[grpc.ChannelConnectivity], None],
                Optional[grpc.ChannelConnectivity],
            ]
        ]
    ]
    delivering: bool

    def __init__(self, channel: grpc.Channel):
        self.lock = threading.RLock()
        self.channel = channel
        # The remaining fields start out exactly as after a post-fork reset.
        self.reset_postfork_child()

    def reset_postfork_child(self) -> None:
        """Restores every field except lock and channel to its initial value."""
        self.polling = False
        self.connectivity = None
        self.try_to_connect = False
        # Entries are two-item lists: [callback, last delivered connectivity].
        self.callbacks_and_connectivities = []
        self.delivering = False
1861
1862
def _deliveries(
    state: _ChannelConnectivityState,
) -> List[Callable[[grpc.ChannelConnectivity], None]]:
    """Collects callbacks whose recorded connectivity is out of date.

    Each subscription entry whose recorded connectivity is not the current
    state.connectivity is stamped with the current value and its callback is
    added to the result.

    Args:
      state: The connectivity state holding the subscription entries.

    Returns:
      The callbacks that still have to be told about state.connectivity.
    """
    stale_callbacks = []
    for entry in state.callbacks_and_connectivities:
        callback, recorded = entry
        if recorded is state.connectivity:
            continue
        stale_callbacks.append(callback)
        entry[1] = state.connectivity
    return stale_callbacks
1876
1877
def _deliver(
    state: _ChannelConnectivityState,
    initial_connectivity: grpc.ChannelConnectivity,
    initial_callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Invokes callbacks until no subscription is out of date any more.

    After each round the remaining out-of-date callbacks are recomputed under
    state.lock; when there are none, state.delivering is cleared and the
    function returns.

    Args:
      state: The connectivity state being served.
      initial_connectivity: The connectivity passed to the first round.
      initial_callbacks: The callbacks invoked in the first round.
    """
    pending = initial_callbacks
    observed = initial_connectivity
    while True:
        for notify in pending:
            cygrpc.block_if_fork_in_progress(state)
            try:
                notify(observed)
            except Exception:  # pylint: disable=broad-except
                # A failing subscriber is logged and must not stop delivery
                # to the others.
                _LOGGER.exception(
                    _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE
                )
        with state.lock:
            pending = _deliveries(state)
            if not pending:
                state.delivering = False
                return
            observed = state.connectivity
1901
1902
def _spawn_delivery(
    state: _ChannelConnectivityState,
    callbacks: Sequence[Callable[[grpc.ChannelConnectivity], None]],
) -> None:
    """Starts a daemon-flagged thread running _deliver for the callbacks.

    The thread is handed the value of state.connectivity at the time of this
    call. The callers in this file invoke this function while holding
    state.lock.

    Args:
      state: The connectivity state being served.
      callbacks: The callbacks for the first delivery round.
    """
    delivery_args = (state, state.connectivity, callbacks)
    worker = cygrpc.ForkManagedThread(target=_deliver, args=delivery_args)
    worker.setDaemon(True)
    worker.start()
    # The flag is raised only after the thread has been started.
    state.delivering = True
1918
1919
1920# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
def _poll_connectivity(
    state: _ChannelConnectivityState,
    channel: grpc.Channel,
    initial_try_to_connect: bool,
) -> None:
    """Polls the channel's connectivity and schedules callback deliveries.

    The loop ends, clearing state.polling and state.connectivity, once there
    are no subscriptions left and no connection attempt has been requested.

    Args:
      state: The connectivity state updated by this poller.
      channel: The channel whose connectivity is checked and watched.
      initial_try_to_connect: Passed to the first connectivity check.
    """
    to_public = _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY
    wants_connect = initial_try_to_connect
    raw_connectivity = channel.check_connectivity_state(wants_connect)
    with state.lock:
        state.connectivity = to_public[raw_connectivity]
        # Every current subscriber gets the first reading, so all entries
        # are stamped with it up front.
        subscribers = tuple(
            subscriber for subscriber, _ in state.callbacks_and_connectivities
        )
        for entry in state.callbacks_and_connectivities:
            entry[1] = state.connectivity
        if subscribers:
            _spawn_delivery(state, subscribers)
    while True:
        # Watch for a change from the last raw reading, using a deadline
        # 0.2 seconds from now.
        event = channel.watch_connectivity_state(
            raw_connectivity, time.time() + 0.2
        )
        cygrpc.block_if_fork_in_progress(state)
        with state.lock:
            if not (
                state.callbacks_and_connectivities or state.try_to_connect
            ):
                state.polling = False
                state.connectivity = None
                break
            # Consume the pending connection request, if any.
            wants_connect = state.try_to_connect
            state.try_to_connect = False
        if not (event.success or wants_connect):
            continue
        raw_connectivity = channel.check_connectivity_state(wants_connect)
        with state.lock:
            state.connectivity = to_public[raw_connectivity]
            # A delivery already in flight picks up the new value itself.
            if state.delivering:
                continue
            outdated = _deliveries(state)
            if outdated:
                _spawn_delivery(state, outdated)
1968
1969
def _subscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
    try_to_connect: bool,
) -> None:
    """Registers a connectivity callback.

    Args:
      state: The connectivity state to register with.
      callback: The callback to add to the subscriptions.
      try_to_connect: Coerced with bool(); whether a connection attempt is
        requested.
    """
    connect_requested = bool(try_to_connect)
    with state.lock:
        if not (state.callbacks_and_connectivities or state.polling):
            # Nothing is subscribed and nothing is polling: start the poller.
            poller = cygrpc.ForkManagedThread(
                target=_poll_connectivity,
                args=(state, state.channel, connect_requested),
            )
            poller.setDaemon(True)
            poller.start()
            state.polling = True
            state.callbacks_and_connectivities.append([callback, None])
        elif not state.delivering and state.connectivity is not None:
            # A connectivity value is known and no delivery is running, so
            # the new callback is told right away and recorded as current.
            _spawn_delivery(state, (callback,))
            state.try_to_connect |= connect_requested
            state.callbacks_and_connectivities.append(
                [callback, state.connectivity]
            )
        else:
            # Recorded with no delivered connectivity yet.
            state.try_to_connect |= connect_requested
            state.callbacks_and_connectivities.append([callback, None])
1994
1995
def _unsubscribe(
    state: _ChannelConnectivityState,
    callback: Callable[[grpc.ChannelConnectivity], None],
) -> None:
    """Removes the first subscription entry whose callback equals `callback`.

    Nothing happens when no entry matches.

    Args:
      state: The connectivity state holding the subscriptions.
      callback: The callback to remove.
    """
    with state.lock:
        subscriptions = state.callbacks_and_connectivities
        for position, (subscribed, _unused) in enumerate(subscriptions):
            if callback == subscribed:
                del subscriptions[position]
                break
2007
2008
def _augment_options(
    base_options: Sequence[ChannelArgumentType],
    compression: Optional[grpc.Compression],
) -> Sequence[ChannelArgumentType]:
    """Appends the compression and user-agent options to the base options.

    Args:
      base_options: The options to start from.
      compression: Passed to _compression.create_channel_option to obtain
        the compression option(s).

    Returns:
      A tuple of the base options, followed by the compression option(s),
      followed by the primary user agent string option.
    """
    compression_option = _compression.create_channel_option(compression)
    user_agent_option = (
        (cygrpc.ChannelArgKey.primary_user_agent_string, _USER_AGENT),
    )
    return tuple(base_options) + compression_option + user_agent_option
2024
2025
def _separate_channel_options(
    options: Sequence[ChannelArgumentType],
) -> Tuple[Sequence[ChannelArgumentType], Sequence[ChannelArgumentType]]:
    """Separates core channel options from Python channel options.

    Args:
      options: Option pairs whose first item is the option key.

    Returns:
      A (python_options, core_options) pair of lists, in that order; an
      option is a Python option when its key is
      grpc.experimental.ChannelOptions.SingleThreadedUnaryStream.
    """
    python_only_key = (
        grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
    )
    python_options, core_options = [], []
    for option in options:
        bucket = python_options if option[0] == python_only_key else core_options
        bucket.append(option)
    return python_options, core_options
2041
2042
class Channel(grpc.Channel):
    """A cygrpc.Channel-backed implementation of grpc.Channel."""

    _single_threaded_unary_stream: bool
    _channel: cygrpc.Channel
    _call_state: _ChannelCallState
    _connectivity_state: _ChannelConnectivityState
    _target: str
    _registered_call_handles: Dict[str, int]

    def __init__(
        self,
        target: str,
        options: Sequence[ChannelArgumentType],
        credentials: Optional[grpc.ChannelCredentials],
        compression: Optional[grpc.Compression],
    ):
        """Constructor.

        Args:
          target: The target to which to connect.
          options: Configuration options for the channel. Python-only
            options are handled here; the others are forwarded to the
            cygrpc.Channel.
          credentials: A cygrpc.ChannelCredentials or None.
          compression: An optional value indicating the compression method to be
            used over the lifetime of the channel.
        """
        python_options, core_options = _separate_channel_options(options)
        self._single_threaded_unary_stream = (
            _DEFAULT_SINGLE_THREADED_UNARY_STREAM
        )
        self._process_python_options(python_options)
        encoded_target = _common.encode(target)
        channel_arguments = _augment_options(core_options, compression)
        self._channel = cygrpc.Channel(
            encoded_target, channel_arguments, credentials
        )
        self._target = target
        self._call_state = _ChannelCallState(self._channel)
        self._connectivity_state = _ChannelConnectivityState(self._channel)
        cygrpc.fork_register_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_increment_channel_count()

    def _get_registered_call_handle(self, method: str) -> int:
        """
        Get the registered call handle for a method.

        This is a semi-private method. It is intended for use only by gRPC generated code.

        This method is not thread-safe.

        Args:
          method: Required, the method name for the RPC.

        Returns:
          The registered call handle pointer in the form of a Python Long.
        """
        encoded_method = _common.encode(method)
        return self._channel.get_registered_call_handle(encoded_method)

    def _process_python_options(
        self, python_options: Sequence[ChannelArgumentType]
    ) -> None:
        """Sets channel attributes according to python-only channel options."""
        single_threaded_key = (
            grpc.experimental.ChannelOptions.SingleThreadedUnaryStream
        )
        # Only the presence of the key matters; the paired value is not
        # inspected.
        if any(pair[0] == single_threaded_key for pair in python_options):
            self._single_threaded_unary_stream = True

    def subscribe(
        self,
        callback: Callable[[grpc.ChannelConnectivity], None],
        try_to_connect: Optional[bool] = None,
    ) -> None:
        """Subscribes `callback` to this channel's connectivity changes."""
        _subscribe(self._connectivity_state, callback, try_to_connect)

    def unsubscribe(
        self, callback: Callable[[grpc.ChannelConnectivity], None]
    ) -> None:
        """Removes `callback` from this channel's connectivity subscriptions."""
        _unsubscribe(self._connectivity_state, callback)

    # pylint: disable=arguments-differ
    def unary_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryUnaryMultiCallable:
        """Creates the multi-callable for a unary-unary method."""
        # The handle is only looked up for registered methods.
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        return _UnaryUnaryMultiCallable(
            self._channel,
            _channel_managed_call_management(self._call_state),
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def unary_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.UnaryStreamMultiCallable:
        """Creates the multi-callable for a unary-stream method."""
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
        # on a single Python thread results in an appreciable speed-up. However,
        # due to slight differences in capability, the multi-threaded variant
        # remains the default.
        if self._single_threaded_unary_stream:
            return _SingleThreadedUnaryStreamMultiCallable(
                self._channel,
                _common.encode(method),
                _common.encode(self._target),
                request_serializer,
                response_deserializer,
                call_handle,
            )
        return _UnaryStreamMultiCallable(
            self._channel,
            _channel_managed_call_management(self._call_state),
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def stream_unary(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamUnaryMultiCallable:
        """Creates the multi-callable for a stream-unary method."""
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        return _StreamUnaryMultiCallable(
            self._channel,
            _channel_managed_call_management(self._call_state),
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    # pylint: disable=arguments-differ
    def stream_stream(
        self,
        method: str,
        request_serializer: Optional[SerializingFunction] = None,
        response_deserializer: Optional[DeserializingFunction] = None,
        _registered_method: Optional[bool] = False,
    ) -> grpc.StreamStreamMultiCallable:
        """Creates the multi-callable for a stream-stream method."""
        call_handle = (
            self._get_registered_call_handle(method)
            if _registered_method
            else None
        )
        return _StreamStreamMultiCallable(
            self._channel,
            _channel_managed_call_management(self._call_state),
            _common.encode(method),
            _common.encode(self._target),
            request_serializer,
            response_deserializer,
            call_handle,
        )

    def _unsubscribe_all(self) -> None:
        """Empties the list of connectivity subscriptions in place."""
        state = self._connectivity_state
        if not state:
            return
        with state.lock:
            del state.callbacks_and_connectivities[:]

    def _close(self) -> None:
        """Drops subscriptions, closes the cygrpc channel and unregisters it."""
        self._unsubscribe_all()
        self._channel.close(cygrpc.StatusCode.cancelled, "Channel closed!")
        cygrpc.fork_unregister_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_decrement_channel_count()

    def _close_on_fork(self) -> None:
        """Drops subscriptions and closes the cygrpc channel for a fork."""
        self._unsubscribe_all()
        self._channel.close_on_fork(
            cygrpc.StatusCode.cancelled, "Channel closed due to fork"
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._close()
        # Returning False lets any exception from the with-body propagate.
        return False

    def close(self) -> None:
        """Closes the channel."""
        self._close()

    def __del__(self):
        # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
        # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
        # here (or more likely, call self._close() here). We don't do this today
        # because many valid use cases today allow the channel to be deleted
        # immediately after stubs are created. After a sufficient period of time
        # has passed for all users to be trusted to hold on to their channels
        # for as long as they are in use and to close them after using them,
        # then deletion of this grpc._channel.Channel instance can be made to
        # effect closure of the underlying cygrpc.Channel instance.
        try:
            self._unsubscribe_all()
        except:  # pylint: disable=bare-except
            # Exceptions in __del__ are ignored by Python anyway, but they can
            # keep spamming logs.  Just silence them.
            pass
2268