1# Copyright 2018 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import os 16 17_AWAIT_THREADS_TIMEOUT_SECONDS = 5 18 19_TRUE_VALUES = ['yes', 'Yes', 'YES', 'true', 'True', 'TRUE', '1'] 20 21# This flag enables experimental support within gRPC Python for applications 22# that will fork() without exec(). When enabled, gRPC Python will attempt to 23# pause all of its internally created threads before the fork syscall proceeds. 24# 25# For this to be successful, the application must not have multiple threads of 26# its own calling into gRPC when fork is invoked. Any callbacks from gRPC 27# Python-spawned threads into user code (e.g., callbacks for asynchronous RPCs) 28# must not block and should execute quickly. 29# 30# This flag is not supported on Windows. 31# This flag is also not supported for non-native IO manager. 32_GRPC_ENABLE_FORK_SUPPORT = ( 33 os.environ.get('GRPC_ENABLE_FORK_SUPPORT', '0') 34 .lower() in _TRUE_VALUES) 35 36_fork_handler_failed = False 37 38cdef void __prefork() noexcept nogil: 39 with gil: 40 global _fork_handler_failed 41 _fork_handler_failed = False 42 with _fork_state.fork_in_progress_condition: 43 _fork_state.fork_in_progress = True 44 if not _fork_state.active_thread_count.await_zero_threads( 45 _AWAIT_THREADS_TIMEOUT_SECONDS): 46 _LOGGER.error( 47 'Failed to shutdown gRPC Python threads prior to fork. ' 48 'Behavior after fork will be undefined.') 49 _fork_handler_failed = True 50 51 52cdef void __postfork_parent() noexcept nogil: 53 with gil: 54 with _fork_state.fork_in_progress_condition: 55 _fork_state.fork_in_progress = False 56 _fork_state.fork_in_progress_condition.notify_all() 57 58 59cdef void __postfork_child() noexcept nogil: 60 with gil: 61 try: 62 if _fork_handler_failed: 63 return 64 # Thread could be holding the fork_in_progress_condition inside of 65 # block_if_fork_in_progress() when fork occurs. Reset the lock here. 66 _fork_state.fork_in_progress_condition = threading.Condition() 67 # A thread in return_from_user_request_generator() may hold this lock 68 # when fork occurs. 69 _fork_state.active_thread_count = _ActiveThreadCount() 70 for state_to_reset in _fork_state.postfork_states_to_reset: 71 state_to_reset.reset_postfork_child() 72 _fork_state.postfork_states_to_reset = [] 73 _fork_state.fork_epoch += 1 74 for channel in _fork_state.channels: 75 channel._close_on_fork() 76 with _fork_state.fork_in_progress_condition: 77 _fork_state.fork_in_progress = False 78 except: 79 _LOGGER.error('Exiting child due to raised exception') 80 _LOGGER.error(sys.exc_info()[0]) 81 os._exit(os.EX_USAGE) 82 83 if grpc_is_initialized() > 0: 84 with gil: 85 _LOGGER.error('Failed to shutdown gRPC Core after fork()') 86 os._exit(os.EX_USAGE) 87 88 89def fork_handlers_and_grpc_init(): 90 grpc_init() 91 if _GRPC_ENABLE_FORK_SUPPORT: 92 with _fork_state.fork_handler_registered_lock: 93 if not _fork_state.fork_handler_registered: 94 os.register_at_fork(before=__prefork, 95 after_in_parent=__postfork_parent, 96 after_in_child=__postfork_child) 97 _fork_state.fork_handler_registered = True 98 99 100 101 102class ForkManagedThread(object): 103 def __init__(self, target, args=()): 104 if _GRPC_ENABLE_FORK_SUPPORT: 105 def managed_target(*args): 106 try: 107 target(*args) 108 finally: 109 _fork_state.active_thread_count.decrement() 110 self._thread = threading.Thread(target=_run_with_context(managed_target), args=args) 111 else: 112 self._thread = threading.Thread(target=_run_with_context(target), args=args) 113 114 def setDaemon(self, daemonic): 115 self._thread.daemon = daemonic 116 117 def start(self): 118 if _GRPC_ENABLE_FORK_SUPPORT: 119 _fork_state.active_thread_count.increment() 120 self._thread.start() 121 122 def join(self): 123 self._thread.join() 124 125 126def block_if_fork_in_progress(postfork_state_to_reset=None): 127 if _GRPC_ENABLE_FORK_SUPPORT: 128 with _fork_state.fork_in_progress_condition: 129 if not _fork_state.fork_in_progress: 130 return 131 if postfork_state_to_reset is not None: 132 _fork_state.postfork_states_to_reset.append(postfork_state_to_reset) 133 _fork_state.active_thread_count.decrement() 134 _fork_state.fork_in_progress_condition.wait() 135 _fork_state.active_thread_count.increment() 136 137 138def enter_user_request_generator(): 139 if _GRPC_ENABLE_FORK_SUPPORT: 140 _fork_state.active_thread_count.decrement() 141 142 143def return_from_user_request_generator(): 144 if _GRPC_ENABLE_FORK_SUPPORT: 145 _fork_state.active_thread_count.increment() 146 block_if_fork_in_progress() 147 148 149def get_fork_epoch(): 150 return _fork_state.fork_epoch 151 152 153def is_fork_support_enabled(): 154 return _GRPC_ENABLE_FORK_SUPPORT 155 156 157def fork_register_channel(channel): 158 if _GRPC_ENABLE_FORK_SUPPORT: 159 _fork_state.channels.add(channel) 160 161 162def fork_unregister_channel(channel): 163 if _GRPC_ENABLE_FORK_SUPPORT: 164 _fork_state.channels.discard(channel) 165 166 167class _ActiveThreadCount(object): 168 def __init__(self): 169 self._num_active_threads = 0 170 self._condition = threading.Condition() 171 172 def increment(self): 173 with self._condition: 174 self._num_active_threads += 1 175 176 def decrement(self): 177 with self._condition: 178 self._num_active_threads -= 1 179 if self._num_active_threads == 0: 180 self._condition.notify_all() 181 182 def await_zero_threads(self, timeout_secs): 183 end_time = time.time() + timeout_secs 184 wait_time = timeout_secs 185 with self._condition: 186 while True: 187 if self._num_active_threads > 0: 188 self._condition.wait(wait_time) 189 if self._num_active_threads == 0: 190 return True 191 # Thread count may have increased before this re-obtains the 192 # lock after a notify(). Wait again until timeout_secs has 193 # elapsed. 194 wait_time = end_time - time.time() 195 if wait_time <= 0: 196 return False 197 198 199class _ForkState(object): 200 def __init__(self): 201 self.fork_in_progress_condition = threading.Condition() 202 self.fork_in_progress = False 203 self.postfork_states_to_reset = [] 204 self.fork_handler_registered_lock = threading.Lock() 205 self.fork_handler_registered = False 206 self.active_thread_count = _ActiveThreadCount() 207 self.fork_epoch = 0 208 self.channels = set() 209 210 211_fork_state = _ForkState() 212