# Copyright 2019 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import warnings

from cpython.version cimport PY_MAJOR_VERSION, PY_MINOR_VERSION

# Human-readable description of the accepted metadata shape; only used in
# the TypeError messages raised by raise_if_not_valid_trailing_metadata.
TYPE_METADATA_STRING = "Tuple[Tuple[str, Union[str, bytes]]...]"


cdef grpc_status_code get_status_code(object code) except *:
    """Converts a Python-level status code into a grpc_status_code.

    Args:
      code: Either an int, or an object exposing a subscriptable ``value``
        attribute whose first element is the numeric code (presumably a
        grpc.StatusCode member; this file does not show the enum).

    Returns:
      The int itself when it lies within [StatusCode.ok,
      StatusCode.data_loss]; ``code.value[0]`` for non-int objects;
      StatusCode.unknown whenever the input cannot be interpreted.
    """
    if isinstance(code, int):
        if StatusCode.ok <= code <= StatusCode.data_loss:
            return code
        return StatusCode.unknown
    try:
        return code.value[0]
    except (KeyError, AttributeError, TypeError, IndexError):
        # TypeError/IndexError cover a `value` that is not a non-empty
        # sequence; such inputs are unrecognized codes, not fatal errors.
        return StatusCode.unknown


cdef object deserialize(object deserializer, bytes raw_message):
    """Perform deserialization on raw bytes.

    Failure to deserialize is a fatal error.

    Args:
      deserializer: A callable taking the raw bytes, or a falsy value to
        pass the bytes through untouched.
      raw_message: The bytes to deserialize.

    Returns:
      The result of ``deserializer(raw_message)``, or ``raw_message`` itself
      when no deserializer is given.
    """
    if deserializer:
        return deserializer(raw_message)
    return raw_message


cdef bytes serialize(object serializer, object message):
    """Perform serialization on a message.

    Failure to serialize is a fatal error.

    Args:
      serializer: A callable producing bytes, or a falsy value to pass the
        message through untouched.
      message: The message to serialize. A ``str`` is encoded as UTF-8
        before anything else happens.

    Returns:
      The result of ``serializer(message)``, or the (possibly UTF-8
      encoded) message itself when no serializer is given.
    """
    if isinstance(message, str):
        message = message.encode('utf-8')
    if serializer:
        return serializer(message)
    return message


class _EOF:
    """Falsy, zero-length sentinel type marking the end of a stream."""

    def __bool__(self):
        return False

    def __len__(self):
        return 0

    def _repr(self) -> str:
        return '<grpc.aio.EOF>'

    def __repr__(self) -> str:
        return self._repr()

    def __str__(self) -> str:
        return self._repr()


# Singleton sentinel; consumers in this file compare against it with `is`.
EOF = _EOF()

# Maps each compression algorithm to the string name used for it in metadata.
_COMPRESSION_METADATA_STRING_MAPPING = {
    CompressionAlgorithm.none: 'identity',
    CompressionAlgorithm.deflate: 'deflate',
    CompressionAlgorithm.gzip: 'gzip',
}


class BaseError(Exception):
    """The base class for exceptions generated by gRPC AsyncIO stack."""


class UsageError(BaseError):
    """Raised when the usage of API by applications is inappropriate.

    For example, trying to invoke RPC on a closed channel, mixing two styles
    of streaming API on the client side. This exception should not be
    suppressed.
    """


class AbortError(BaseError):
    """Raised when calling abort in servicer methods.

    This exception should not be suppressed. Applications may catch it to
    perform certain clean-up logic, and then re-raise it.
    """


class InternalError(BaseError):
    """Raised upon unexpected errors in native code."""


def schedule_coro_threadsafe(object coro, object loop):
    """Schedules a coroutine on the given loop.

    First tries ``loop.create_task``. If that raises a RuntimeError whose
    message contains 'Non-thread-safe operation', falls back to
    ``asyncio.run_coroutine_threadsafe``. Any other RuntimeError is
    re-raised unchanged.

    Args:
      coro: The coroutine object to schedule.
      loop: The event loop to schedule it on.

    Returns:
      Whatever ``loop.create_task`` returns, or the future returned by
      ``asyncio.run_coroutine_threadsafe`` on the fallback path.
    """
    try:
        return loop.create_task(coro)
    except RuntimeError as runtime_error:
        if 'Non-thread-safe operation' in str(runtime_error):
            return asyncio.run_coroutine_threadsafe(
                coro,
                loop,
            )
        else:
            raise


def async_generator_to_generator(object agen, object loop):
    """Converts an async generator into generator.

    Each item is fetched by submitting ``agen.__anext__()`` to ``loop`` and
    blocking on the result, so this must be driven from a thread other than
    the one running ``loop``.

    Args:
      agen: The async generator to drain.
      loop: The event loop on which ``agen`` is advanced.

    Yields:
      Items produced by ``agen`` until it is exhausted or yields EOF.
    """
    try:
        while True:
            future = asyncio.run_coroutine_threadsafe(
                agen.__anext__(),
                loop
            )
            response = future.result()
            if response is EOF:
                break
            else:
                yield response
    except StopAsyncIteration:
        # If StopAsyncIteration is raised, end this generator.
        pass


async def generator_to_async_generator(object gen, object loop, object thread_pool):
    """Converts a generator into async generator.

    The generator might block, so we need to delegate the iteration to thread
    pool. Also, we can't simply delegate __next__ to the thread pool, otherwise
    we will see following error:

        TypeError: StopIteration interacts badly with generators and cannot be
            raised into a Future

    Args:
      gen: The (possibly blocking) generator to iterate.
      loop: The event loop that owns the hand-off queue.
      thread_pool: The executor in which ``gen`` is iterated.

    Yields:
      Items produced by ``gen``, in order.
    """
    # maxsize=1 makes the producer thread wait until each item is consumed.
    queue = asyncio.Queue(maxsize=1)

    def yield_to_queue():
        try:
            for item in gen:
                asyncio.run_coroutine_threadsafe(queue.put(item), loop).result()
        finally:
            # Always enqueue EOF so the consumer loop below terminates even
            # when iterating `gen` raises.
            asyncio.run_coroutine_threadsafe(queue.put(EOF), loop).result()

    future = loop.run_in_executor(
        thread_pool,
        yield_to_queue,
    )

    while True:
        response = await queue.get()
        if response is EOF:
            break
        else:
            yield response

    # Port the exception if there is any
    await future


# Compare (major, minor) as a tuple: testing the two components separately
# would wrongly pick the legacy branch for a major version above 3 whose
# minor version is below 7.
if (PY_MAJOR_VERSION, PY_MINOR_VERSION) >= (3, 7):
    def get_working_loop():
        """Returns the running event loop, or a usable one if none is running.

        Due to a defect of asyncio.get_event_loop, its returned event loop might
        not be set as the default event loop for the main thread.

        Resolution order:
          1. The loop running in the current thread, if any.
          2. The loop returned by the event loop policy, provided obtaining
             it does not emit a DeprecationWarning.
          3. A brand-new loop created by the event loop policy.
        """
        try:
            return asyncio.get_running_loop()
        except RuntimeError:
            pass

        with warnings.catch_warnings():
            # Convert DeprecationWarning to errors so we can capture them with except
            warnings.simplefilter("error", DeprecationWarning)
            try:
                return asyncio.get_event_loop_policy().get_event_loop()
            # Since version 3.12, DeprecationWarning is emitted if there is no
            # current event loop.
            except DeprecationWarning:
                pass

        # The fallback deliberately runs after the warnings-as-errors scope
        # has exited, so a deprecation emitted while creating the new loop
        # stays a warning instead of escaping as an uncaught exception.
        return asyncio.get_event_loop_policy().new_event_loop()
else:
    def get_working_loop():
        """Returns the event loop reported by asyncio.get_event_loop."""
        return asyncio.get_event_loop()


def raise_if_not_valid_trailing_metadata(object metadata):
    """Validates the shape of trailing metadata.

    Accepted input is a non-dict iterable whose every item is a 2-tuple of
    ``(str, str)`` or ``(str, bytes)``. Iteration stops at the first
    offending item.

    Args:
      metadata: The candidate trailing metadata.

    Raises:
      TypeError: If ``metadata`` or any inspected item has the wrong shape.
    """
    valid = hasattr(metadata, '__iter__') and not isinstance(metadata, dict)
    if valid:
        for item in metadata:
            if not (
                isinstance(item, tuple)
                and len(item) == 2
                and isinstance(item[0], str)
                and isinstance(item[1], (str, bytes))
            ):
                valid = False
                break
    if not valid:
        raise TypeError(f'Invalid trailing metadata type, expected {TYPE_METADATA_STRING}: {metadata}')