xref: /aosp_15_r20/external/grpc-grpc/examples/python/async_streaming/client.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1*cc02d7e2SAndroid Build Coastguard Worker# Copyright 2020 The gRPC Authors
2*cc02d7e2SAndroid Build Coastguard Worker#
3*cc02d7e2SAndroid Build Coastguard Worker# Licensed under the Apache License, Version 2.0 (the "License");
4*cc02d7e2SAndroid Build Coastguard Worker# you may not use this file except in compliance with the License.
5*cc02d7e2SAndroid Build Coastguard Worker# You may obtain a copy of the License at
6*cc02d7e2SAndroid Build Coastguard Worker#
7*cc02d7e2SAndroid Build Coastguard Worker#     http://www.apache.org/licenses/LICENSE-2.0
8*cc02d7e2SAndroid Build Coastguard Worker#
9*cc02d7e2SAndroid Build Coastguard Worker# Unless required by applicable law or agreed to in writing, software
10*cc02d7e2SAndroid Build Coastguard Worker# distributed under the License is distributed on an "AS IS" BASIS,
11*cc02d7e2SAndroid Build Coastguard Worker# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12*cc02d7e2SAndroid Build Coastguard Worker# See the License for the specific language governing permissions and
13*cc02d7e2SAndroid Build Coastguard Worker# limitations under the License.
14*cc02d7e2SAndroid Build Coastguard Worker
15*cc02d7e2SAndroid Build Coastguard Workerfrom concurrent.futures import ThreadPoolExecutor
16*cc02d7e2SAndroid Build Coastguard Workerimport logging
17*cc02d7e2SAndroid Build Coastguard Workerimport threading
18*cc02d7e2SAndroid Build Coastguard Workerfrom typing import Iterator
19*cc02d7e2SAndroid Build Coastguard Worker
20*cc02d7e2SAndroid Build Coastguard Workerimport grpc
21*cc02d7e2SAndroid Build Coastguard Worker
22*cc02d7e2SAndroid Build Coastguard Workerimport phone_pb2
23*cc02d7e2SAndroid Build Coastguard Workerimport phone_pb2_grpc
24*cc02d7e2SAndroid Build Coastguard Worker
25*cc02d7e2SAndroid Build Coastguard Worker
class CallMaker:
    """Place one outgoing call through the stub's ``StreamCall`` RPC.

    The response stream is consumed on a worker thread taken from the supplied
    executor. The thread that drives the call coordinates with that worker
    through two events:

    * ``_peer_responded`` -- the call became ACTIVE or ENDED, or the response
      stream stopped (normally or with an error).
    * ``_call_finished`` -- the call ENDED, or the response stream stopped.
    """

    def __init__(
        self,
        executor: ThreadPoolExecutor,
        channel: grpc.Channel,
        phone_number: str,
    ) -> None:
        """Bind the call to an executor, a channel and a phone number.

        Args:
            executor: Pool that runs the response-consuming thread.
            channel: Channel used to build the ``PhoneStub``.
            phone_number: Number placed in the ``StreamCallRequest``.
        """
        self._executor = executor
        self._channel = channel
        self._stub = phone_pb2_grpc.PhoneStub(self._channel)
        self._phone_number = phone_number
        self._session_id = None
        self._audio_session_link = None
        self._call_state = None
        self._peer_responded = threading.Event()
        self._call_finished = threading.Event()
        self._consumer_future = None
        # Exception that terminated the response watcher, if any. It is
        # written before the events are set so a woken waiter always sees it.
        self._watcher_error = None

    def _response_watcher(
        self, response_iterator: Iterator[phone_pb2.StreamCallResponse]
    ) -> None:
        """Consume the response stream and dispatch every message.

        Runs on the executor thread started by ``call``. Both events are set
        when this method exits, however it exits, so that ``wait_peer`` and
        ``audio_session`` can never block on a stream that is already over.

        Args:
            response_iterator: Iterator returned by the ``StreamCall`` RPC.

        Raises:
            RuntimeError: A response carries neither ``call_info`` nor
                ``call_state``.
            Exception: Anything raised while iterating the stream is recorded
                in ``_watcher_error`` and re-raised.
        """
        try:
            for response in response_iterator:
                # NOTE: All fields in Proto3 are optional. This is the recommended way
                # to check if a field is present or not, or to examine which one-of
                # field is fulfilled by this message.
                if response.HasField("call_info"):
                    self._on_call_info(response.call_info)
                elif response.HasField("call_state"):
                    self._on_call_state(response.call_state.state)
                else:
                    raise RuntimeError(
                        "Received StreamCallResponse without call_info and"
                        " call_state"
                    )
        except Exception as error:
            # Record the failure before the ``finally`` clause wakes the
            # waiters: the future only becomes "done" after this function has
            # fully unwound, so a waiter polling the future could miss it.
            self._watcher_error = error
            raise
        finally:
            # The stream is over (cleanly or not); nothing further can arrive,
            # so release every thread waiting on this call.
            self._peer_responded.set()
            self._call_finished.set()

    def _on_call_info(self, call_info: phone_pb2.CallInfo) -> None:
        """Store the session id and media link carried by ``call_info``."""
        self._session_id = call_info.session_id
        self._audio_session_link = call_info.media

    def _on_call_state(self, call_state: phone_pb2.CallState.State) -> None:
        """Record a call-state change and wake the threads waiting on it."""
        logging.info(
            "Call toward [%s] enters [%s] state",
            self._phone_number,
            phone_pb2.CallState.State.Name(call_state),
        )
        self._call_state = call_state
        if call_state == phone_pb2.CallState.State.ACTIVE:
            self._peer_responded.set()
        if call_state == phone_pb2.CallState.State.ENDED:
            self._peer_responded.set()
            self._call_finished.set()

    def call(self) -> None:
        """Start the ``StreamCall`` RPC and hand its responses to a worker."""
        request = phone_pb2.StreamCallRequest()
        request.phone_number = self._phone_number
        response_iterator = self._stub.StreamCall(iter((request,)))
        # Instead of consuming the response on current thread, spawn a consumption thread.
        self._consumer_future = self._executor.submit(
            self._response_watcher, response_iterator
        )

    def wait_peer(self) -> bool:
        """Block until the peer answers, the call ends, or the stream stops.

        Returns:
            True when the last recorded call state is ACTIVE, False otherwise.

        Raises:
            Exception: The exception that terminated the response watcher.
        """
        logging.info("Waiting for peer to connect [%s]...", self._phone_number)
        self._peer_responded.wait(timeout=None)
        if self._watcher_error is not None:
            # Forward the watcher's failure to the thread driving the call.
            raise self._watcher_error
        return self._call_state == phone_pb2.CallState.State.ACTIVE

    def audio_session(self) -> None:
        """Block until the call is finished or the response stream stops.

        Raises:
            AssertionError: No ``call_info`` has been received yet.
            Exception: The exception that terminated the response watcher,
                unless the call had already reached the ENDED state.
        """
        assert self._audio_session_link is not None
        logging.info("Consuming audio resource [%s]", self._audio_session_link)
        self._call_finished.wait(timeout=None)
        if (
            self._watcher_error is not None
            and self._call_state != phone_pb2.CallState.State.ENDED
        ):
            # The stream broke mid-call; do not report a clean finish.
            raise self._watcher_error
        logging.info("Audio session finished [%s]", self._audio_session_link)
105*cc02d7e2SAndroid Build Coastguard Worker
def process_call(
    executor: ThreadPoolExecutor, channel: grpc.Channel, phone_number: str
) -> None:
    """Dial ``phone_number`` and sit through the call if the peer answers.

    Args:
        executor: Pool handed to the ``CallMaker`` for its worker thread.
        channel: Channel the ``CallMaker`` issues the RPC on.
        phone_number: Number to call.
    """
    caller = CallMaker(executor, channel, phone_number)
    caller.call()

    # Guard clause: nothing more to do when the peer never picked up.
    if not caller.wait_peer():
        logging.info("Call failed: peer didn't answer")
        return

    caller.audio_session()
    logging.info("Call finished!")
117*cc02d7e2SAndroid Build Coastguard Worker
def run():
    """Call a sample number on the server at ``localhost:50051``.

    The call is processed on a pool thread; this function blocks until it has
    completed and re-raises any exception it produced.
    """
    # The executor is the outer context on purpose: the channel is closed
    # first, and only then is the pool shut down (which waits for its worker
    # threads), so the pool is always released instead of being leaked.
    with ThreadPoolExecutor() as executor:
        with grpc.insecure_channel("localhost:50051") as channel:
            future = executor.submit(
                process_call, executor, channel, "555-0100-XXXX"
            )
            future.result()
126*cc02d7e2SAndroid Build Coastguard Worker
if __name__ == "__main__":
    # Script entry point: show INFO-level progress messages, then place the call.
    logging.basicConfig(level=logging.INFO)
    run()
129*cc02d7e2SAndroid Build Coastguard Worker    run()
130