1# Copyright 2019 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""An example of multiprocess concurrency with gRPC.""" 15 16from __future__ import absolute_import 17from __future__ import division 18from __future__ import print_function 19 20from concurrent import futures 21import contextlib 22import datetime 23import logging 24import math 25import multiprocessing 26import socket 27import sys 28import time 29 30import grpc 31import prime_pb2 32import prime_pb2_grpc 33 34_LOGGER = logging.getLogger(__name__) 35 36_ONE_DAY = datetime.timedelta(days=1) 37_PROCESS_COUNT = multiprocessing.cpu_count() 38_THREAD_CONCURRENCY = _PROCESS_COUNT 39 40 41def is_prime(n): 42 for i in range(2, int(math.ceil(math.sqrt(n)))): 43 if n % i == 0: 44 return False 45 else: 46 return True 47 48 49class PrimeChecker(prime_pb2_grpc.PrimeCheckerServicer): 50 def check(self, request, context): 51 _LOGGER.info("Determining primality of %s", request.candidate) 52 return prime_pb2.Primality(isPrime=is_prime(request.candidate)) 53 54 55def _wait_forever(server): 56 try: 57 while True: 58 time.sleep(_ONE_DAY.total_seconds()) 59 except KeyboardInterrupt: 60 server.stop(None) 61 62 63def _run_server(bind_address): 64 """Start a server in a subprocess.""" 65 _LOGGER.info("Starting new server.") 66 options = (("grpc.so_reuseport", 1),) 67 68 server = grpc.server( 69 futures.ThreadPoolExecutor( 70 max_workers=_THREAD_CONCURRENCY, 71 ), 72 options=options, 73 ) 74 prime_pb2_grpc.add_PrimeCheckerServicer_to_server(PrimeChecker(), server) 75 server.add_insecure_port(bind_address) 76 server.start() 77 _wait_forever(server) 78 79 80@contextlib.contextmanager 81def _reserve_port(): 82 """Find and reserve a port for all subprocesses to use.""" 83 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 84 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 85 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0: 86 raise RuntimeError("Failed to set SO_REUSEPORT.") 87 sock.bind(("", 0)) 88 try: 89 yield sock.getsockname()[1] 90 finally: 91 sock.close() 92 93 94def main(): 95 with _reserve_port() as port: 96 bind_address = "localhost:{}".format(port) 97 _LOGGER.info("Binding to '%s'", bind_address) 98 sys.stdout.flush() 99 workers = [] 100 for _ in range(_PROCESS_COUNT): 101 # NOTE: It is imperative that the worker subprocesses be forked before 102 # any gRPC servers start up. See 103 # https://github.com/grpc/grpc/issues/16001 for more details. 104 worker = multiprocessing.Process( 105 target=_run_server, args=(bind_address,) 106 ) 107 worker.start() 108 workers.append(worker) 109 for worker in workers: 110 worker.join() 111 112 113if __name__ == "__main__": 114 handler = logging.StreamHandler(sys.stdout) 115 formatter = logging.Formatter("[PID %(process)d] %(message)s") 116 handler.setFormatter(formatter) 117 _LOGGER.addHandler(handler) 118 _LOGGER.setLevel(logging.INFO) 119 main() 120