# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Example client that places one streaming phone call over gRPC."""

from concurrent.futures import ThreadPoolExecutor
import logging
import threading
from typing import Iterator

import grpc

import phone_pb2
import phone_pb2_grpc


class CallMaker:
    """Drive a single ``StreamCall`` RPC and track how the call progresses.

    The response stream is consumed on a worker thread taken from the supplied
    executor. Two events let other threads wait for the milestones of the
    call: ``_peer_responded`` (the call became ACTIVE, ENDED, or the stream
    terminated) and ``_call_finished`` (the call ENDED or the stream
    terminated).
    """

    def __init__(
        self,
        executor: ThreadPoolExecutor,
        channel: grpc.Channel,
        phone_number: str,
    ) -> None:
        """Prepare a call toward ``phone_number``; no RPC is started yet.

        Args:
            executor: Pool on which the response stream will be consumed.
            channel: Channel used to build the ``Phone`` stub.
            phone_number: Number placed in the outgoing request.
        """
        self._executor = executor
        self._channel = channel
        self._stub = phone_pb2_grpc.PhoneStub(self._channel)
        self._phone_number = phone_number
        self._session_id = None
        self._audio_session_link = None
        self._call_state = None
        self._peer_responded = threading.Event()
        self._call_finished = threading.Event()
        self._consumer_future = None
        # Set by the watcher thread just before it propagates an exception, so
        # waiters can tell a failure apart from a normal wake-up even though
        # the future is not marked done until the watcher has fully returned.
        self._watcher_failed = False

    def _response_watcher(
        self, response_iterator: Iterator[phone_pb2.StreamCallResponse]
    ) -> None:
        """Consume the response stream and dispatch every message.

        Runs on the executor thread. Whatever way the stream terminates
        (exhausted, RPC error, malformed message), both events are set on the
        way out so that no waiter is left blocked forever.

        Raises:
            RuntimeError: A response carries neither ``call_info`` nor
                ``call_state``.
            Exception: Anything raised while iterating the stream is re-raised
                and therefore stored on the consumer future.
        """
        try:
            for response in response_iterator:
                # NOTE: All fields in Proto3 are optional. This is the
                # recommended way to check if a field is present or not, or to
                # examine which one-of field is fulfilled by this message.
                if response.HasField("call_info"):
                    self._on_call_info(response.call_info)
                elif response.HasField("call_state"):
                    self._on_call_state(response.call_state.state)
                else:
                    raise RuntimeError(
                        "Received StreamCallResponse without call_info and"
                        " call_state"
                    )
        except Exception:
            # Record the failure BEFORE the events are set in ``finally`` so a
            # woken waiter reliably observes it.
            self._watcher_failed = True
            raise
        finally:
            # The stream is over: release anyone waiting on either milestone.
            self._peer_responded.set()
            self._call_finished.set()

    def _on_call_info(self, call_info: phone_pb2.CallInfo) -> None:
        """Remember the session id and media link carried by ``call_info``."""
        self._session_id = call_info.session_id
        self._audio_session_link = call_info.media

    def _on_call_state(self, call_state: phone_pb2.CallState.State) -> None:
        """Log and store the new state, signalling waiters on ACTIVE/ENDED."""
        logging.info(
            "Call toward [%s] enters [%s] state",
            self._phone_number,
            phone_pb2.CallState.State.Name(call_state),
        )
        self._call_state = call_state
        if call_state == phone_pb2.CallState.State.ACTIVE:
            self._peer_responded.set()
        if call_state == phone_pb2.CallState.State.ENDED:
            # ENDED also releases ``wait_peer`` in case ACTIVE was never seen.
            self._peer_responded.set()
            self._call_finished.set()

    def _forward_watcher_failure(self) -> None:
        """Re-raise, on the calling thread, any exception from the watcher.

        When the watcher flagged a failure, ``Future.result()`` blocks for the
        short time the worker still needs to return, then raises the stored
        exception. For a future that completed normally it returns ``None``.
        """
        if self._watcher_failed or self._consumer_future.done():
            self._consumer_future.result()

    def call(self) -> None:
        """Start the ``StreamCall`` RPC and begin watching its responses."""
        request = phone_pb2.StreamCallRequest()
        request.phone_number = self._phone_number
        response_iterator = self._stub.StreamCall(iter((request,)))
        # Instead of consuming the response on current thread, spawn a
        # consumption thread.
        self._consumer_future = self._executor.submit(
            self._response_watcher, response_iterator
        )

    def wait_peer(self) -> bool:
        """Block until the peer responds or the stream terminates.

        Must be called after ``call()``.

        Returns:
            True if the last observed call state is ACTIVE, False otherwise.

        Raises:
            Exception: Whatever the response watcher raised, if it failed.
        """
        logging.info("Waiting for peer to connect [%s]...", self._phone_number)
        self._peer_responded.wait(timeout=None)
        # If the future raises, forwards the exception here
        self._forward_watcher_failure()
        return self._call_state == phone_pb2.CallState.State.ACTIVE

    def audio_session(self) -> None:
        """Block until the call finishes, logging the media link in use.

        Raises:
            AssertionError: No ``call_info`` has been received yet.
            Exception: Whatever the response watcher raised, if the stream
                failed before the call ended.
        """
        assert self._audio_session_link is not None
        logging.info("Consuming audio resource [%s]", self._audio_session_link)
        self._call_finished.wait(timeout=None)
        # A broken stream also wakes this wait; surface it instead of
        # reporting a clean finish.
        self._forward_watcher_failure()
        logging.info("Audio session finished [%s]", self._audio_session_link)


def process_call(
    executor: ThreadPoolExecutor, channel: grpc.Channel, phone_number: str
) -> None:
    """Place one call to ``phone_number`` and follow it until it is over.

    Args:
        executor: Pool used to consume the response stream.
        channel: Channel on which the RPC is issued.
        phone_number: Number to call.
    """
    call_maker = CallMaker(executor, channel, phone_number)
    call_maker.call()
    if call_maker.wait_peer():
        call_maker.audio_session()
        logging.info("Call finished!")
    else:
        logging.info("Call failed: peer didn't answer")


def run() -> None:
    """Call a fixed number through the server at ``localhost:50051``."""
    # Context managers exit in reverse order: the channel is closed first,
    # then the executor is shut down and its worker threads are joined.
    with ThreadPoolExecutor() as executor, grpc.insecure_channel(
        "localhost:50051"
    ) as channel:
        future = executor.submit(
            process_call, executor, channel, "555-0100-XXXX"
        )
        future.result()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    run()