xref: /aosp_15_r20/external/grpc-grpc/examples/python/multiprocessing/client.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2019 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""An example of multiprocessing concurrency with gRPC."""
15
16from __future__ import absolute_import
17from __future__ import division
18from __future__ import print_function
19
20import argparse
21import atexit
22import logging
23import multiprocessing
24import operator
25import sys
26
27import grpc
28import prime_pb2
29import prime_pb2_grpc
30
31_PROCESS_COUNT = 8
32_MAXIMUM_CANDIDATE = 10000
33
34# Each worker process initializes a single channel after forking.
35# It's regrettable, but to ensure that each subprocess only has to instantiate
36# a single channel to be reused across all RPCs, we use globals.
37_worker_channel_singleton = None
38_worker_stub_singleton = None
39
40_LOGGER = logging.getLogger(__name__)
41
42
43def _shutdown_worker():
44    _LOGGER.info("Shutting worker process down.")
45    if _worker_channel_singleton is not None:
46        _worker_channel_singleton.close()
47
48
49def _initialize_worker(server_address):
50    global _worker_channel_singleton  # pylint: disable=global-statement
51    global _worker_stub_singleton  # pylint: disable=global-statement
52    _LOGGER.info("Initializing worker process.")
53    _worker_channel_singleton = grpc.insecure_channel(server_address)
54    _worker_stub_singleton = prime_pb2_grpc.PrimeCheckerStub(
55        _worker_channel_singleton
56    )
57    atexit.register(_shutdown_worker)
58
59
60def _run_worker_query(primality_candidate):
61    _LOGGER.info("Checking primality of %s.", primality_candidate)
62    return _worker_stub_singleton.check(
63        prime_pb2.PrimeCandidate(candidate=primality_candidate)
64    )
65
66
67def _calculate_primes(server_address):
68    worker_pool = multiprocessing.Pool(
69        processes=_PROCESS_COUNT,
70        initializer=_initialize_worker,
71        initargs=(server_address,),
72    )
73    check_range = range(2, _MAXIMUM_CANDIDATE)
74    primality = worker_pool.map(_run_worker_query, check_range)
75    primes = zip(check_range, map(operator.attrgetter("isPrime"), primality))
76    return tuple(primes)
77
78
79def main():
80    msg = "Determine the primality of the first {} integers.".format(
81        _MAXIMUM_CANDIDATE
82    )
83    parser = argparse.ArgumentParser(description=msg)
84    parser.add_argument(
85        "server_address",
86        help="The address of the server (e.g. localhost:50051)",
87    )
88    args = parser.parse_args()
89    primes = _calculate_primes(args.server_address)
90    print(primes)
91
92
93if __name__ == "__main__":
94    handler = logging.StreamHandler(sys.stdout)
95    formatter = logging.Formatter("[PID %(process)d] %(message)s")
96    handler.setFormatter(formatter)
97    _LOGGER.addHandler(handler)
98    _LOGGER.setLevel(logging.INFO)
99    main()
100