xref: /aosp_15_r20/external/grpc-grpc/examples/python/async_streaming/client.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2020 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from concurrent.futures import ThreadPoolExecutor
16import logging
17import threading
18from typing import Iterator
19
20import grpc
21
22import phone_pb2
23import phone_pb2_grpc
24
25
26class CallMaker:
27    def __init__(
28        self,
29        executor: ThreadPoolExecutor,
30        channel: grpc.Channel,
31        phone_number: str,
32    ) -> None:
33        self._executor = executor
34        self._channel = channel
35        self._stub = phone_pb2_grpc.PhoneStub(self._channel)
36        self._phone_number = phone_number
37        self._session_id = None
38        self._audio_session_link = None
39        self._call_state = None
40        self._peer_responded = threading.Event()
41        self._call_finished = threading.Event()
42        self._consumer_future = None
43
44    def _response_watcher(
45        self, response_iterator: Iterator[phone_pb2.StreamCallResponse]
46    ) -> None:
47        try:
48            for response in response_iterator:
49                # NOTE: All fields in Proto3 are optional. This is the recommended way
50                # to check if a field is present or not, or to exam which one-of field is
51                # fulfilled by this message.
52                if response.HasField("call_info"):
53                    self._on_call_info(response.call_info)
54                elif response.HasField("call_state"):
55                    self._on_call_state(response.call_state.state)
56                else:
57                    raise RuntimeError(
58                        "Received StreamCallResponse without call_info and"
59                        " call_state"
60                    )
61        except Exception as e:
62            self._peer_responded.set()
63            raise
64
65    def _on_call_info(self, call_info: phone_pb2.CallInfo) -> None:
66        self._session_id = call_info.session_id
67        self._audio_session_link = call_info.media
68
69    def _on_call_state(self, call_state: phone_pb2.CallState.State) -> None:
70        logging.info(
71            "Call toward [%s] enters [%s] state",
72            self._phone_number,
73            phone_pb2.CallState.State.Name(call_state),
74        )
75        self._call_state = call_state
76        if call_state == phone_pb2.CallState.State.ACTIVE:
77            self._peer_responded.set()
78        if call_state == phone_pb2.CallState.State.ENDED:
79            self._peer_responded.set()
80            self._call_finished.set()
81
82    def call(self) -> None:
83        request = phone_pb2.StreamCallRequest()
84        request.phone_number = self._phone_number
85        response_iterator = self._stub.StreamCall(iter((request,)))
86        # Instead of consuming the response on current thread, spawn a consumption thread.
87        self._consumer_future = self._executor.submit(
88            self._response_watcher, response_iterator
89        )
90
91    def wait_peer(self) -> bool:
92        logging.info("Waiting for peer to connect [%s]...", self._phone_number)
93        self._peer_responded.wait(timeout=None)
94        if self._consumer_future.done():
95            # If the future raises, forwards the exception here
96            self._consumer_future.result()
97        return self._call_state == phone_pb2.CallState.State.ACTIVE
98
99    def audio_session(self) -> None:
100        assert self._audio_session_link is not None
101        logging.info("Consuming audio resource [%s]", self._audio_session_link)
102        self._call_finished.wait(timeout=None)
103        logging.info("Audio session finished [%s]", self._audio_session_link)
104
105
106def process_call(
107    executor: ThreadPoolExecutor, channel: grpc.Channel, phone_number: str
108) -> None:
109    call_maker = CallMaker(executor, channel, phone_number)
110    call_maker.call()
111    if call_maker.wait_peer():
112        call_maker.audio_session()
113        logging.info("Call finished!")
114    else:
115        logging.info("Call failed: peer didn't answer")
116
117
118def run():
119    executor = ThreadPoolExecutor()
120    with grpc.insecure_channel("localhost:50051") as channel:
121        future = executor.submit(
122            process_call, executor, channel, "555-0100-XXXX"
123        )
124        future.result()
125
126
127if __name__ == "__main__":
128    logging.basicConfig(level=logging.INFO)
129    run()
130