xref: /aosp_15_r20/external/grpc-grpc/examples/python/multiprocessing/server.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2019 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""An example of multiprocess concurrency with gRPC."""
15
16from __future__ import absolute_import
17from __future__ import division
18from __future__ import print_function
19
20from concurrent import futures
21import contextlib
22import datetime
23import logging
24import math
25import multiprocessing
26import socket
27import sys
28import time
29
30import grpc
31import prime_pb2
32import prime_pb2_grpc
33
# Module-level logger; handlers and level are attached by the script entry point.
_LOGGER = logging.getLogger(__name__)

# Length of a single sleep used while a worker idles waiting for Ctrl-C.
_ONE_DAY = datetime.timedelta(hours=24)
# One worker process per CPU, and the same number of handler threads in each.
_PROCESS_COUNT = _THREAD_CONCURRENCY = multiprocessing.cpu_count()
39
40
def is_prime(n):
    """Return True if the integer ``n`` is prime, False otherwise.

    Uses trial division with integer arithmetic only, so the answer does not
    depend on floating-point rounding of a square root.

    Args:
        n: The integer candidate to test.

    Returns:
        True when ``n`` is a prime number. Values below 2 (including zero
        and negative numbers) are never prime and yield False.
    """
    if n < 2:
        return False
    if n < 4:
        # 2 and 3 are prime.
        return True
    if n % 2 == 0:
        return False
    # Test odd divisors up to and INCLUDING the square root; stopping short
    # of it would wrongly classify squares of primes (9, 25, 49, ...) as prime.
    divisor = 3
    while divisor * divisor <= n:
        if n % divisor == 0:
            return False
        divisor += 2
    return True
47
48
class PrimeChecker(prime_pb2_grpc.PrimeCheckerServicer):
    """Servicer whose ``check`` RPC reports whether a candidate is prime."""

    def check(self, request, context):
        """Handle one ``check`` call.

        Args:
            request: Incoming message; its ``candidate`` field is the number
                to test.
            context: RPC context supplied by gRPC (unused here).

        Returns:
            A ``prime_pb2.Primality`` message with ``isPrime`` set to the
            result of ``is_prime`` for the candidate.
        """
        candidate = request.candidate
        _LOGGER.info("Determining primality of %s", candidate)
        verdict = is_prime(candidate)
        return prime_pb2.Primality(isPrime=verdict)
53
54
def _wait_forever(server):
    """Block the calling thread until interrupted, then stop ``server``.

    Sleeps in day-long intervals indefinitely. When a ``KeyboardInterrupt``
    arrives, ``server.stop(None)`` is called and the function returns.

    Args:
        server: Object exposing a ``stop(grace)`` method.
    """
    try:
        nap_seconds = _ONE_DAY.total_seconds()
        while True:
            time.sleep(nap_seconds)
    except KeyboardInterrupt:
        # No grace period is passed to stop().
        server.stop(None)
61
62
def _run_server(bind_address):
    """Run one gRPC server bound to ``bind_address`` until interrupted.

    Intended as the target of a worker subprocess: it builds the server,
    registers the ``PrimeChecker`` servicer, starts serving and then blocks
    in ``_wait_forever``.

    Args:
        bind_address: Address string handed to ``add_insecure_port``.
    """
    _LOGGER.info("Starting new server.")

    # Enable the so_reuseport channel option on this server.
    channel_options = (("grpc.so_reuseport", 1),)
    executor = futures.ThreadPoolExecutor(max_workers=_THREAD_CONCURRENCY)
    grpc_server = grpc.server(executor, options=channel_options)

    prime_pb2_grpc.add_PrimeCheckerServicer_to_server(
        PrimeChecker(), grpc_server
    )
    grpc_server.add_insecure_port(bind_address)
    grpc_server.start()
    _wait_forever(grpc_server)
78
79
@contextlib.contextmanager
def _reserve_port():
    """Find and reserve a port for all subprocesses to use.

    Opens an IPv6 TCP socket with ``SO_REUSEPORT`` enabled, binds it to an
    OS-assigned port, and keeps it open for the duration of the ``with``
    block.

    Yields:
        The integer port number the socket was bound to.

    Raises:
        RuntimeError: If ``SO_REUSEPORT`` reads back as unset after being
            requested.
    """
    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    # Everything after socket creation lives inside the try block so the
    # descriptor is closed even when configuration or bind fails.
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0:
            raise RuntimeError("Failed to set SO_REUSEPORT.")
        # Port 0 asks the OS to choose a free port.
        sock.bind(("", 0))
        yield sock.getsockname()[1]
    finally:
        sock.close()
92
93
def main():
    """Reserve a port, launch one server subprocess per CPU, and wait.

    The reserved port stays held for as long as the workers run; this
    function returns only after every worker process has exited.
    """
    with _reserve_port() as reserved_port:
        address = "localhost:{}".format(reserved_port)
        _LOGGER.info("Binding to '%s'", address)
        sys.stdout.flush()
        # NOTE: It is imperative that the worker subprocesses be forked before
        # any gRPC servers start up. See
        # https://github.com/grpc/grpc/issues/16001 for more details.
        procs = [
            multiprocessing.Process(target=_run_server, args=(address,))
            for _ in range(_PROCESS_COUNT)
        ]
        for proc in procs:
            proc.start()
        for proc in procs:
            proc.join()
111
112
if __name__ == "__main__":
    # Send this module's log records to stdout, tagged with the emitting
    # process id so output from different workers can be told apart.
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(
        logging.Formatter("[PID %(process)d] %(message)s")
    )
    _LOGGER.setLevel(logging.INFO)
    _LOGGER.addHandler(stdout_handler)
    main()
120