xref: /aosp_15_r20/external/grpc-grpc/examples/python/multiprocessing/client.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1*cc02d7e2SAndroid Build Coastguard Worker# Copyright 2019 gRPC authors.
2*cc02d7e2SAndroid Build Coastguard Worker#
3*cc02d7e2SAndroid Build Coastguard Worker# Licensed under the Apache License, Version 2.0 (the "License");
4*cc02d7e2SAndroid Build Coastguard Worker# you may not use this file except in compliance with the License.
5*cc02d7e2SAndroid Build Coastguard Worker# You may obtain a copy of the License at
6*cc02d7e2SAndroid Build Coastguard Worker#
7*cc02d7e2SAndroid Build Coastguard Worker#     http://www.apache.org/licenses/LICENSE-2.0
8*cc02d7e2SAndroid Build Coastguard Worker#
9*cc02d7e2SAndroid Build Coastguard Worker# Unless required by applicable law or agreed to in writing, software
10*cc02d7e2SAndroid Build Coastguard Worker# distributed under the License is distributed on an "AS IS" BASIS,
11*cc02d7e2SAndroid Build Coastguard Worker# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12*cc02d7e2SAndroid Build Coastguard Worker# See the License for the specific language governing permissions and
13*cc02d7e2SAndroid Build Coastguard Worker# limitations under the License.
14*cc02d7e2SAndroid Build Coastguard Worker"""An example of multiprocessing concurrency with gRPC."""
15*cc02d7e2SAndroid Build Coastguard Worker
16*cc02d7e2SAndroid Build Coastguard Workerfrom __future__ import absolute_import
17*cc02d7e2SAndroid Build Coastguard Workerfrom __future__ import division
18*cc02d7e2SAndroid Build Coastguard Workerfrom __future__ import print_function
19*cc02d7e2SAndroid Build Coastguard Worker
20*cc02d7e2SAndroid Build Coastguard Workerimport argparse
21*cc02d7e2SAndroid Build Coastguard Workerimport atexit
22*cc02d7e2SAndroid Build Coastguard Workerimport logging
23*cc02d7e2SAndroid Build Coastguard Workerimport multiprocessing
24*cc02d7e2SAndroid Build Coastguard Workerimport operator
25*cc02d7e2SAndroid Build Coastguard Workerimport sys
26*cc02d7e2SAndroid Build Coastguard Worker
27*cc02d7e2SAndroid Build Coastguard Workerimport grpc
28*cc02d7e2SAndroid Build Coastguard Workerimport prime_pb2
29*cc02d7e2SAndroid Build Coastguard Workerimport prime_pb2_grpc
30*cc02d7e2SAndroid Build Coastguard Worker
31*cc02d7e2SAndroid Build Coastguard Worker_PROCESS_COUNT = 8
32*cc02d7e2SAndroid Build Coastguard Worker_MAXIMUM_CANDIDATE = 10000
33*cc02d7e2SAndroid Build Coastguard Worker
34*cc02d7e2SAndroid Build Coastguard Worker# Each worker process initializes a single channel after forking.
35*cc02d7e2SAndroid Build Coastguard Worker# It's regrettable, but to ensure that each subprocess only has to instantiate
36*cc02d7e2SAndroid Build Coastguard Worker# a single channel to be reused across all RPCs, we use globals.
37*cc02d7e2SAndroid Build Coastguard Worker_worker_channel_singleton = None
38*cc02d7e2SAndroid Build Coastguard Worker_worker_stub_singleton = None
39*cc02d7e2SAndroid Build Coastguard Worker
40*cc02d7e2SAndroid Build Coastguard Worker_LOGGER = logging.getLogger(__name__)
41*cc02d7e2SAndroid Build Coastguard Worker
42*cc02d7e2SAndroid Build Coastguard Worker
43*cc02d7e2SAndroid Build Coastguard Workerdef _shutdown_worker():
44*cc02d7e2SAndroid Build Coastguard Worker    _LOGGER.info("Shutting worker process down.")
45*cc02d7e2SAndroid Build Coastguard Worker    if _worker_channel_singleton is not None:
46*cc02d7e2SAndroid Build Coastguard Worker        _worker_channel_singleton.close()
47*cc02d7e2SAndroid Build Coastguard Worker
48*cc02d7e2SAndroid Build Coastguard Worker
49*cc02d7e2SAndroid Build Coastguard Workerdef _initialize_worker(server_address):
50*cc02d7e2SAndroid Build Coastguard Worker    global _worker_channel_singleton  # pylint: disable=global-statement
51*cc02d7e2SAndroid Build Coastguard Worker    global _worker_stub_singleton  # pylint: disable=global-statement
52*cc02d7e2SAndroid Build Coastguard Worker    _LOGGER.info("Initializing worker process.")
53*cc02d7e2SAndroid Build Coastguard Worker    _worker_channel_singleton = grpc.insecure_channel(server_address)
54*cc02d7e2SAndroid Build Coastguard Worker    _worker_stub_singleton = prime_pb2_grpc.PrimeCheckerStub(
55*cc02d7e2SAndroid Build Coastguard Worker        _worker_channel_singleton
56*cc02d7e2SAndroid Build Coastguard Worker    )
57*cc02d7e2SAndroid Build Coastguard Worker    atexit.register(_shutdown_worker)
58*cc02d7e2SAndroid Build Coastguard Worker
59*cc02d7e2SAndroid Build Coastguard Worker
60*cc02d7e2SAndroid Build Coastguard Workerdef _run_worker_query(primality_candidate):
61*cc02d7e2SAndroid Build Coastguard Worker    _LOGGER.info("Checking primality of %s.", primality_candidate)
62*cc02d7e2SAndroid Build Coastguard Worker    return _worker_stub_singleton.check(
63*cc02d7e2SAndroid Build Coastguard Worker        prime_pb2.PrimeCandidate(candidate=primality_candidate)
64*cc02d7e2SAndroid Build Coastguard Worker    )
65*cc02d7e2SAndroid Build Coastguard Worker
66*cc02d7e2SAndroid Build Coastguard Worker
67*cc02d7e2SAndroid Build Coastguard Workerdef _calculate_primes(server_address):
68*cc02d7e2SAndroid Build Coastguard Worker    worker_pool = multiprocessing.Pool(
69*cc02d7e2SAndroid Build Coastguard Worker        processes=_PROCESS_COUNT,
70*cc02d7e2SAndroid Build Coastguard Worker        initializer=_initialize_worker,
71*cc02d7e2SAndroid Build Coastguard Worker        initargs=(server_address,),
72*cc02d7e2SAndroid Build Coastguard Worker    )
73*cc02d7e2SAndroid Build Coastguard Worker    check_range = range(2, _MAXIMUM_CANDIDATE)
74*cc02d7e2SAndroid Build Coastguard Worker    primality = worker_pool.map(_run_worker_query, check_range)
75*cc02d7e2SAndroid Build Coastguard Worker    primes = zip(check_range, map(operator.attrgetter("isPrime"), primality))
76*cc02d7e2SAndroid Build Coastguard Worker    return tuple(primes)
77*cc02d7e2SAndroid Build Coastguard Worker
78*cc02d7e2SAndroid Build Coastguard Worker
79*cc02d7e2SAndroid Build Coastguard Workerdef main():
80*cc02d7e2SAndroid Build Coastguard Worker    msg = "Determine the primality of the first {} integers.".format(
81*cc02d7e2SAndroid Build Coastguard Worker        _MAXIMUM_CANDIDATE
82*cc02d7e2SAndroid Build Coastguard Worker    )
83*cc02d7e2SAndroid Build Coastguard Worker    parser = argparse.ArgumentParser(description=msg)
84*cc02d7e2SAndroid Build Coastguard Worker    parser.add_argument(
85*cc02d7e2SAndroid Build Coastguard Worker        "server_address",
86*cc02d7e2SAndroid Build Coastguard Worker        help="The address of the server (e.g. localhost:50051)",
87*cc02d7e2SAndroid Build Coastguard Worker    )
88*cc02d7e2SAndroid Build Coastguard Worker    args = parser.parse_args()
89*cc02d7e2SAndroid Build Coastguard Worker    primes = _calculate_primes(args.server_address)
90*cc02d7e2SAndroid Build Coastguard Worker    print(primes)
91*cc02d7e2SAndroid Build Coastguard Worker
92*cc02d7e2SAndroid Build Coastguard Worker
93*cc02d7e2SAndroid Build Coastguard Workerif __name__ == "__main__":
94*cc02d7e2SAndroid Build Coastguard Worker    handler = logging.StreamHandler(sys.stdout)
95*cc02d7e2SAndroid Build Coastguard Worker    formatter = logging.Formatter("[PID %(process)d] %(message)s")
96*cc02d7e2SAndroid Build Coastguard Worker    handler.setFormatter(formatter)
97*cc02d7e2SAndroid Build Coastguard Worker    _LOGGER.addHandler(handler)
98*cc02d7e2SAndroid Build Coastguard Worker    _LOGGER.setLevel(logging.INFO)
99*cc02d7e2SAndroid Build Coastguard Worker    main()
100