xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2018 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import os
16
# Upper bound, in seconds, on how long the pre-fork handler waits for gRPC
# Python's own threads to become idle before it gives up.
_AWAIT_THREADS_TIMEOUT_SECONDS = 5

# Spellings of an affirmative environment-variable value.
_TRUE_VALUES = ['yes', 'Yes', 'YES', 'true', 'True', 'TRUE', '1']

# Experimental opt-in for applications that fork() without a following exec().
# When it is set, gRPC Python tries to pause every thread it created itself
# before the fork syscall goes ahead.
#
# This can only succeed if the application has no threads of its own calling
# into gRPC at the moment fork is invoked. Callbacks that gRPC Python-spawned
# threads make into user code (e.g., callbacks for asynchronous RPCs) must not
# block and should execute quickly.
#
# This flag is not supported on Windows.
# This flag is also not supported for non-native IO manager.
#
# The environment value is lower-cased before the lookup, so the match is
# effectively case-insensitive.
_GRPC_ENABLE_FORK_SUPPORT = (
    os.getenv('GRPC_ENABLE_FORK_SUPPORT', '0').lower() in _TRUE_VALUES
)

# Set by the pre-fork handler when the active threads did not drain within the
# timeout; the post-fork child handler then skips its cleanup.
_fork_handler_failed = False
37
# Pre-fork handler, registered as the `before` callback of
# os.register_at_fork() by fork_handlers_and_grpc_init().
#
# Marks a fork as in progress and then waits up to
# _AWAIT_THREADS_TIMEOUT_SECONDS for the active thread count to reach zero.
# If the wait times out, the failure is logged and recorded in
# _fork_handler_failed, which makes __postfork_child() skip its cleanup.
cdef void __prefork() noexcept nogil:
    # The function is declared nogil, so the GIL is acquired explicitly for
    # the Python-level work below.
    with gil:
        global _fork_handler_failed
        # Clear any failure left over from an earlier fork attempt.
        _fork_handler_failed = False
        # From here on, threads calling block_if_fork_in_progress() will park
        # on the condition until the parent's post-fork handler notifies them.
        with _fork_state.fork_in_progress_condition:
            _fork_state.fork_in_progress = True
        if not _fork_state.active_thread_count.await_zero_threads(
                _AWAIT_THREADS_TIMEOUT_SECONDS):
            _LOGGER.error(
                'Failed to shutdown gRPC Python threads prior to fork. '
                'Behavior after fork will be undefined.')
            _fork_handler_failed = True
50
51
# Post-fork handler for the parent process, registered as the
# `after_in_parent` callback of os.register_at_fork().
#
# Clears the fork-in-progress flag and wakes every thread that is waiting on
# the condition inside block_if_fork_in_progress().
cdef void __postfork_parent() noexcept nogil:
    # The function is declared nogil, so the GIL is acquired explicitly.
    with gil:
        with _fork_state.fork_in_progress_condition:
            _fork_state.fork_in_progress = False
            _fork_state.fork_in_progress_condition.notify_all()
57
58
# Post-fork handler for the child process, registered as the `after_in_child`
# callback of os.register_at_fork().
#
# Unless the pre-fork handler recorded a failure, this rebuilds the
# synchronization primitives that may have been held at fork time, resets
# every state object queued by block_if_fork_in_progress(), bumps the fork
# epoch and closes all registered channels. The child is terminated with
# os._exit(os.EX_USAGE) if any of that raises, or if gRPC Core still reports
# itself as initialized afterwards.
cdef void __postfork_child() noexcept nogil:
    # The function is declared nogil, so the GIL is acquired explicitly for
    # the Python-level work below.
    with gil:
        try:
            if _fork_handler_failed:
                # The pre-fork handler could not quiesce the threads; leave
                # the inherited state untouched.
                return
            # Thread could be holding the fork_in_progress_condition inside of
            # block_if_fork_in_progress() when fork occurs. Reset the lock here.
            _fork_state.fork_in_progress_condition = threading.Condition()
            # A thread in return_from_user_request_generator() may hold this lock
            # when fork occurs.
            _fork_state.active_thread_count = _ActiveThreadCount()
            for state_to_reset in _fork_state.postfork_states_to_reset:
                state_to_reset.reset_postfork_child()
            _fork_state.postfork_states_to_reset = []
            _fork_state.fork_epoch += 1
            for channel in _fork_state.channels:
                channel._close_on_fork()
            with _fork_state.fork_in_progress_condition:
                _fork_state.fork_in_progress = False
        except:
            # This function is noexcept, so nothing may propagate out of it;
            # the catch-all is deliberate. Log the message together with the
            # full traceback (not only the exception class) because the
            # process is about to be terminated and this is the last chance
            # to record what went wrong.
            _LOGGER.exception('Exiting child due to raised exception')
            os._exit(os.EX_USAGE)

    if grpc_is_initialized() > 0:
        with gil:
            _LOGGER.error('Failed to shutdown gRPC Core after fork()')
            os._exit(os.EX_USAGE)
87
88
def fork_handlers_and_grpc_init():
    """Initialize gRPC Core and, if fork support is on, install fork handlers.

    grpc_init() is called on every invocation. The fork handlers are
    registered with os.register_at_fork() at most once; the registration is
    guarded by _fork_state.fork_handler_registered_lock.
    """
    grpc_init()
    if not _GRPC_ENABLE_FORK_SUPPORT:
        return
    with _fork_state.fork_handler_registered_lock:
        if _fork_state.fork_handler_registered:
            return
        os.register_at_fork(
            before=__prefork,
            after_in_parent=__postfork_parent,
            after_in_child=__postfork_child,
        )
        _fork_state.fork_handler_registered = True
98
99
100
101
class ForkManagedThread(object):
    """A threading.Thread wrapper whose activity is tracked for fork support.

    When fork support is enabled, the thread is counted in
    _fork_state.active_thread_count from the moment start() is called until
    its target returns or raises, so that the pre-fork handler can wait for
    it. When fork support is disabled, this is a thin wrapper around
    threading.Thread.

    Args:
        target: Callable to run in the new thread.
        args: Positional arguments passed to target.
    """

    def __init__(self, target, args=()):
        if _GRPC_ENABLE_FORK_SUPPORT:
            def managed_target(*target_args):
                try:
                    target(*target_args)
                finally:
                    # Balance the increment performed in start(), whether
                    # the target returned normally or raised.
                    _fork_state.active_thread_count.decrement()
            self._thread = threading.Thread(target=_run_with_context(managed_target), args=args)
        else:
            self._thread = threading.Thread(target=_run_with_context(target), args=args)

    def setDaemon(self, daemonic):
        """Set the daemon flag of the underlying thread."""
        self._thread.daemon = daemonic

    def start(self):
        """Start the underlying thread, counting it as active if tracked.

        Raises:
            RuntimeError: Propagated from threading.Thread.start(), e.g. when
                the thread was already started.
        """
        if not _GRPC_ENABLE_FORK_SUPPORT:
            self._thread.start()
            return
        # Count the thread before it runs so that its own decrement can never
        # be observed ahead of the increment.
        _fork_state.active_thread_count.increment()
        try:
            self._thread.start()
        except RuntimeError:
            # The thread was not launched by this call, so managed_target will
            # never run its decrement for it. Undo the increment here;
            # otherwise the count stays above zero and every later pre-fork
            # wait times out.
            _fork_state.active_thread_count.decrement()
            raise

    def join(self):
        """Block until the underlying thread terminates."""
        self._thread.join()
124
125
def block_if_fork_in_progress(postfork_state_to_reset=None):
    """Park the calling thread while a fork is in progress.

    Does nothing when fork support is disabled or no fork is in progress.
    Otherwise the calling thread is removed from the active thread count,
    waits on the fork-in-progress condition until it is notified, and is then
    counted as active again.

    Args:
        postfork_state_to_reset: Optional object to queue on
            _fork_state.postfork_states_to_reset; the post-fork child handler
            calls reset_postfork_child() on each queued object.
    """
    if not _GRPC_ENABLE_FORK_SUPPORT:
        return
    fork_condition = _fork_state.fork_in_progress_condition
    with fork_condition:
        if _fork_state.fork_in_progress:
            if postfork_state_to_reset is not None:
                _fork_state.postfork_states_to_reset.append(postfork_state_to_reset)
            # Step out of the active count for the duration of the wait so
            # the pre-fork handler is not held up by this thread.
            _fork_state.active_thread_count.decrement()
            fork_condition.wait()
            _fork_state.active_thread_count.increment()
136
137
def enter_user_request_generator():
    """Remove the calling thread from the active thread count.

    No-op when fork support is disabled. Paired with
    return_from_user_request_generator(), which adds the thread back.
    """
    if not _GRPC_ENABLE_FORK_SUPPORT:
        return
    _fork_state.active_thread_count.decrement()
141
142
def return_from_user_request_generator():
    """Add the calling thread back to the active thread count.

    No-op when fork support is disabled. After re-counting the thread, it
    blocks in block_if_fork_in_progress() if a fork is currently in progress.
    """
    if not _GRPC_ENABLE_FORK_SUPPORT:
        return
    _fork_state.active_thread_count.increment()
    block_if_fork_in_progress()
147
148
def get_fork_epoch():
    """Return the current fork epoch.

    The epoch starts at 0 and is incremented by the post-fork child handler.
    """
    return _fork_state.fork_epoch
151
152
def is_fork_support_enabled():
    """Return whether fork support was enabled via GRPC_ENABLE_FORK_SUPPORT."""
    return _GRPC_ENABLE_FORK_SUPPORT
155
156
def fork_register_channel(channel):
    """Track channel so the post-fork child handler can close it.

    No-op when fork support is disabled.
    """
    if not _GRPC_ENABLE_FORK_SUPPORT:
        return
    _fork_state.channels.add(channel)
160
161
def fork_unregister_channel(channel):
    """Stop tracking channel; unknown channels are ignored.

    No-op when fork support is disabled.
    """
    if not _GRPC_ENABLE_FORK_SUPPORT:
        return
    _fork_state.channels.discard(channel)
165
166
class _ActiveThreadCount(object):
    """Counter of active threads that can be awaited until it reaches zero.

    All access to the counter is serialized through a single
    threading.Condition, which is also used to wake waiters when the count
    drops to zero.
    """

    def __init__(self):
        self._num_active_threads = 0
        self._condition = threading.Condition()

    def increment(self):
        """Record one more active thread."""
        with self._condition:
            self._num_active_threads += 1

    def decrement(self):
        """Record one fewer active thread, waking waiters on reaching zero."""
        with self._condition:
            self._num_active_threads -= 1
            if self._num_active_threads == 0:
                self._condition.notify_all()

    def await_zero_threads(self, timeout_secs):
        """Wait until no threads are active or the timeout elapses.

        Args:
            timeout_secs: Maximum number of seconds to wait.

        Returns:
            True if the count is zero when the wait ends, False if the
            timeout elapsed first.
        """
        with self._condition:
            # Condition.wait_for() re-checks the predicate after every wake-up
            # and tracks the remaining time on a monotonic clock, so system
            # clock adjustments cannot stretch or shorten the timeout. It also
            # releases the lock while waiting whenever the count is non-zero,
            # rather than spinning with the lock held.
            return self._condition.wait_for(
                lambda: self._num_active_threads == 0, timeout_secs)
197
198
class _ForkState(object):
    """Mutable bookkeeping shared by the fork handlers and helper functions."""

    def __init__(self):
        # One-time registration of the os.register_at_fork() handlers.
        self.fork_handler_registered = False
        self.fork_handler_registered_lock = threading.Lock()

        # Flag and condition used to park threads while a fork is underway.
        self.fork_in_progress = False
        self.fork_in_progress_condition = threading.Condition()

        # Threads the pre-fork handler has to wait for.
        self.active_thread_count = _ActiveThreadCount()

        # Objects whose reset_postfork_child() runs in the child, and the
        # channels that get closed there.
        self.postfork_states_to_reset = []
        self.channels = set()

        # Incremented in the child by the post-fork handler.
        self.fork_epoch = 0


# Module-wide singleton used by every function in this file.
_fork_state = _ForkState()
212