1*cc02d7e2SAndroid Build Coastguard Worker# Copyright 2019 gRPC authors. 2*cc02d7e2SAndroid Build Coastguard Worker# 3*cc02d7e2SAndroid Build Coastguard Worker# Licensed under the Apache License, Version 2.0 (the "License"); 4*cc02d7e2SAndroid Build Coastguard Worker# you may not use this file except in compliance with the License. 5*cc02d7e2SAndroid Build Coastguard Worker# You may obtain a copy of the License at 6*cc02d7e2SAndroid Build Coastguard Worker# 7*cc02d7e2SAndroid Build Coastguard Worker# http://www.apache.org/licenses/LICENSE-2.0 8*cc02d7e2SAndroid Build Coastguard Worker# 9*cc02d7e2SAndroid Build Coastguard Worker# Unless required by applicable law or agreed to in writing, software 10*cc02d7e2SAndroid Build Coastguard Worker# distributed under the License is distributed on an "AS IS" BASIS, 11*cc02d7e2SAndroid Build Coastguard Worker# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12*cc02d7e2SAndroid Build Coastguard Worker# See the License for the specific language governing permissions and 13*cc02d7e2SAndroid Build Coastguard Worker# limitations under the License. 14*cc02d7e2SAndroid Build Coastguard Worker"""An example of multiprocessing concurrency with gRPC.""" 15*cc02d7e2SAndroid Build Coastguard Worker 16*cc02d7e2SAndroid Build Coastguard Workerfrom __future__ import absolute_import 17*cc02d7e2SAndroid Build Coastguard Workerfrom __future__ import division 18*cc02d7e2SAndroid Build Coastguard Workerfrom __future__ import print_function 19*cc02d7e2SAndroid Build Coastguard Worker 20*cc02d7e2SAndroid Build Coastguard Workerimport argparse 21*cc02d7e2SAndroid Build Coastguard Workerimport atexit 22*cc02d7e2SAndroid Build Coastguard Workerimport logging 23*cc02d7e2SAndroid Build Coastguard Workerimport multiprocessing 24*cc02d7e2SAndroid Build Coastguard Workerimport operator 25*cc02d7e2SAndroid Build Coastguard Workerimport sys 26*cc02d7e2SAndroid Build Coastguard Worker 27*cc02d7e2SAndroid Build Coastguard Workerimport grpc 28*cc02d7e2SAndroid Build Coastguard Workerimport prime_pb2 29*cc02d7e2SAndroid Build Coastguard Workerimport prime_pb2_grpc 30*cc02d7e2SAndroid Build Coastguard Worker 31*cc02d7e2SAndroid Build Coastguard Worker_PROCESS_COUNT = 8 32*cc02d7e2SAndroid Build Coastguard Worker_MAXIMUM_CANDIDATE = 10000 33*cc02d7e2SAndroid Build Coastguard Worker 34*cc02d7e2SAndroid Build Coastguard Worker# Each worker process initializes a single channel after forking. 35*cc02d7e2SAndroid Build Coastguard Worker# It's regrettable, but to ensure that each subprocess only has to instantiate 36*cc02d7e2SAndroid Build Coastguard Worker# a single channel to be reused across all RPCs, we use globals. 37*cc02d7e2SAndroid Build Coastguard Worker_worker_channel_singleton = None 38*cc02d7e2SAndroid Build Coastguard Worker_worker_stub_singleton = None 39*cc02d7e2SAndroid Build Coastguard Worker 40*cc02d7e2SAndroid Build Coastguard Worker_LOGGER = logging.getLogger(__name__) 41*cc02d7e2SAndroid Build Coastguard Worker 42*cc02d7e2SAndroid Build Coastguard Worker 43*cc02d7e2SAndroid Build Coastguard Workerdef _shutdown_worker(): 44*cc02d7e2SAndroid Build Coastguard Worker _LOGGER.info("Shutting worker process down.") 45*cc02d7e2SAndroid Build Coastguard Worker if _worker_channel_singleton is not None: 46*cc02d7e2SAndroid Build Coastguard Worker _worker_channel_singleton.close() 47*cc02d7e2SAndroid Build Coastguard Worker 48*cc02d7e2SAndroid Build Coastguard Worker 49*cc02d7e2SAndroid Build Coastguard Workerdef _initialize_worker(server_address): 50*cc02d7e2SAndroid Build Coastguard Worker global _worker_channel_singleton # pylint: disable=global-statement 51*cc02d7e2SAndroid Build Coastguard Worker global _worker_stub_singleton # pylint: disable=global-statement 52*cc02d7e2SAndroid Build Coastguard Worker _LOGGER.info("Initializing worker process.") 53*cc02d7e2SAndroid Build Coastguard Worker _worker_channel_singleton = grpc.insecure_channel(server_address) 54*cc02d7e2SAndroid Build Coastguard Worker _worker_stub_singleton = prime_pb2_grpc.PrimeCheckerStub( 55*cc02d7e2SAndroid Build Coastguard Worker _worker_channel_singleton 56*cc02d7e2SAndroid Build Coastguard Worker ) 57*cc02d7e2SAndroid Build Coastguard Worker atexit.register(_shutdown_worker) 58*cc02d7e2SAndroid Build Coastguard Worker 59*cc02d7e2SAndroid Build Coastguard Worker 60*cc02d7e2SAndroid Build Coastguard Workerdef _run_worker_query(primality_candidate): 61*cc02d7e2SAndroid Build Coastguard Worker _LOGGER.info("Checking primality of %s.", primality_candidate) 62*cc02d7e2SAndroid Build Coastguard Worker return _worker_stub_singleton.check( 63*cc02d7e2SAndroid Build Coastguard Worker prime_pb2.PrimeCandidate(candidate=primality_candidate) 64*cc02d7e2SAndroid Build Coastguard Worker ) 65*cc02d7e2SAndroid Build Coastguard Worker 66*cc02d7e2SAndroid Build Coastguard Worker 67*cc02d7e2SAndroid Build Coastguard Workerdef _calculate_primes(server_address): 68*cc02d7e2SAndroid Build Coastguard Worker worker_pool = multiprocessing.Pool( 69*cc02d7e2SAndroid Build Coastguard Worker processes=_PROCESS_COUNT, 70*cc02d7e2SAndroid Build Coastguard Worker initializer=_initialize_worker, 71*cc02d7e2SAndroid Build Coastguard Worker initargs=(server_address,), 72*cc02d7e2SAndroid Build Coastguard Worker ) 73*cc02d7e2SAndroid Build Coastguard Worker check_range = range(2, _MAXIMUM_CANDIDATE) 74*cc02d7e2SAndroid Build Coastguard Worker primality = worker_pool.map(_run_worker_query, check_range) 75*cc02d7e2SAndroid Build Coastguard Worker primes = zip(check_range, map(operator.attrgetter("isPrime"), primality)) 76*cc02d7e2SAndroid Build Coastguard Worker return tuple(primes) 77*cc02d7e2SAndroid Build Coastguard Worker 78*cc02d7e2SAndroid Build Coastguard Worker 79*cc02d7e2SAndroid Build Coastguard Workerdef main(): 80*cc02d7e2SAndroid Build Coastguard Worker msg = "Determine the primality of the first {} integers.".format( 81*cc02d7e2SAndroid Build Coastguard Worker _MAXIMUM_CANDIDATE 82*cc02d7e2SAndroid Build Coastguard Worker ) 83*cc02d7e2SAndroid Build Coastguard Worker parser = argparse.ArgumentParser(description=msg) 84*cc02d7e2SAndroid Build Coastguard Worker parser.add_argument( 85*cc02d7e2SAndroid Build Coastguard Worker "server_address", 86*cc02d7e2SAndroid Build Coastguard Worker help="The address of the server (e.g. localhost:50051)", 87*cc02d7e2SAndroid Build Coastguard Worker ) 88*cc02d7e2SAndroid Build Coastguard Worker args = parser.parse_args() 89*cc02d7e2SAndroid Build Coastguard Worker primes = _calculate_primes(args.server_address) 90*cc02d7e2SAndroid Build Coastguard Worker print(primes) 91*cc02d7e2SAndroid Build Coastguard Worker 92*cc02d7e2SAndroid Build Coastguard Worker 93*cc02d7e2SAndroid Build Coastguard Workerif __name__ == "__main__": 94*cc02d7e2SAndroid Build Coastguard Worker handler = logging.StreamHandler(sys.stdout) 95*cc02d7e2SAndroid Build Coastguard Worker formatter = logging.Formatter("[PID %(process)d] %(message)s") 96*cc02d7e2SAndroid Build Coastguard Worker handler.setFormatter(formatter) 97*cc02d7e2SAndroid Build Coastguard Worker _LOGGER.addHandler(handler) 98*cc02d7e2SAndroid Build Coastguard Worker _LOGGER.setLevel(logging.INFO) 99*cc02d7e2SAndroid Build Coastguard Worker main() 100