xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio/grpc/_cython/_cygrpc/aio/common.pyx.pxi (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2019 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import warnings
16
17from cpython.version cimport PY_MAJOR_VERSION, PY_MINOR_VERSION
18
19TYPE_METADATA_STRING = "Tuple[Tuple[str, Union[str, bytes]]...]"
20
21
22cdef grpc_status_code get_status_code(object code) except *:
23    if isinstance(code, int):
24        if code >= StatusCode.ok and code <= StatusCode.data_loss:
25            return code
26        else:
27            return StatusCode.unknown
28    else:
29        try:
30            return code.value[0]
31        except (KeyError, AttributeError):
32            return StatusCode.unknown
33
34
35cdef object deserialize(object deserializer, bytes raw_message):
36    """Perform deserialization on raw bytes.
37
38    Failure to deserialize is a fatal error.
39    """
40    if deserializer:
41        return deserializer(raw_message)
42    else:
43        return raw_message
44
45
46cdef bytes serialize(object serializer, object message):
47    """Perform serialization on a message.
48
49    Failure to serialize is a fatal error.
50    """
51    if isinstance(message, str):
52        message = message.encode('utf-8')
53    if serializer:
54        return serializer(message)
55    else:
56        return message
57
58
59class _EOF:
60
61    def __bool__(self):
62        return False
63
64    def __len__(self):
65        return 0
66
67    def _repr(self) -> str:
68        return '<grpc.aio.EOF>'
69
70    def __repr__(self) -> str:
71        return self._repr()
72
73    def __str__(self) -> str:
74        return self._repr()
75
76
77EOF = _EOF()
78
79_COMPRESSION_METADATA_STRING_MAPPING = {
80    CompressionAlgorithm.none: 'identity',
81    CompressionAlgorithm.deflate: 'deflate',
82    CompressionAlgorithm.gzip: 'gzip',
83}
84
85class BaseError(Exception):
86    """The base class for exceptions generated by gRPC AsyncIO stack."""
87
88
89class UsageError(BaseError):
90    """Raised when the usage of API by applications is inappropriate.
91
92    For example, trying to invoke RPC on a closed channel, mixing two styles
93    of streaming API on the client side. This exception should not be
94    suppressed.
95    """
96
97
98class AbortError(BaseError):
99    """Raised when calling abort in servicer methods.
100
101    This exception should not be suppressed. Applications may catch it to
102    perform certain clean-up logic, and then re-raise it.
103    """
104
105
106class InternalError(BaseError):
107    """Raised upon unexpected errors in native code."""
108
109
110def schedule_coro_threadsafe(object coro, object loop):
111    try:
112        return loop.create_task(coro)
113    except RuntimeError as runtime_error:
114        if 'Non-thread-safe operation' in str(runtime_error):
115            return asyncio.run_coroutine_threadsafe(
116                coro,
117                loop,
118            )
119        else:
120            raise
121
122
123def async_generator_to_generator(object agen, object loop):
124    """Converts an async generator into generator."""
125    try:
126        while True:
127            future = asyncio.run_coroutine_threadsafe(
128                agen.__anext__(),
129                loop
130            )
131            response = future.result()
132            if response is EOF:
133                break
134            else:
135                yield response
136    except StopAsyncIteration:
137        # If StopAsyncIteration is raised, end this generator.
138        pass
139
140
141async def generator_to_async_generator(object gen, object loop, object thread_pool):
142    """Converts a generator into async generator.
143
144    The generator might block, so we need to delegate the iteration to thread
145    pool. Also, we can't simply delegate __next__ to the thread pool, otherwise
146    we will see following error:
147
148        TypeError: StopIteration interacts badly with generators and cannot be
149            raised into a Future
150    """
151    queue = asyncio.Queue(maxsize=1)
152
153    def yield_to_queue():
154        try:
155            for item in gen:
156                asyncio.run_coroutine_threadsafe(queue.put(item), loop).result()
157        finally:
158            asyncio.run_coroutine_threadsafe(queue.put(EOF), loop).result()
159
160    future = loop.run_in_executor(
161        thread_pool,
162        yield_to_queue,
163    )
164
165    while True:
166        response = await queue.get()
167        if response is EOF:
168            break
169        else:
170            yield response
171
172    # Port the exception if there is any
173    await future
174
175
176if PY_MAJOR_VERSION >= 3 and PY_MINOR_VERSION >= 7:
177    def get_working_loop():
178        """Returns a running event loop.
179
180        Due to a defect of asyncio.get_event_loop, its returned event loop might
181        not be set as the default event loop for the main thread.
182        """
183        try:
184            return asyncio.get_running_loop()
185        except RuntimeError:
186            with warnings.catch_warnings():
187                # Convert DeprecationWarning to errors so we can capture them with except
188                warnings.simplefilter("error", DeprecationWarning)
189                try:
190                    return asyncio.get_event_loop_policy().get_event_loop()
191                # Since version 3.12, DeprecationWarning is emitted if there is no
192                # current event loop.
193                except DeprecationWarning:
194                    return asyncio.get_event_loop_policy().new_event_loop()
195else:
196    def get_working_loop():
197        """Returns a running event loop."""
198        return asyncio.get_event_loop()
199
200
201def raise_if_not_valid_trailing_metadata(object metadata):
202    if not hasattr(metadata, '__iter__') or isinstance(metadata, dict):
203        raise TypeError(f'Invalid trailing metadata type, expected {TYPE_METADATA_STRING}: {metadata}')
204    for item in metadata:
205        if not isinstance(item, tuple):
206            raise TypeError(f'Invalid trailing metadata type, expected {TYPE_METADATA_STRING}: {metadata}')
207        if len(item) != 2:
208            raise TypeError(f'Invalid trailing metadata type, expected {TYPE_METADATA_STRING}: {metadata}')
209        if not isinstance(item[0], str):
210            raise TypeError(f'Invalid trailing metadata type, expected {TYPE_METADATA_STRING}: {metadata}')
211        if not isinstance(item[1], str) and not isinstance(item[1], bytes):
212            raise TypeError(f'Invalid trailing metadata type, expected {TYPE_METADATA_STRING}: {metadata}')
213