# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from concurrent.futures import Future
from concurrent.futures import ThreadPoolExecutor
import logging
import threading
from typing import Iterator, Optional

import grpc

import phone_pb2
import phone_pb2_grpc


class CallMaker:
    """Drives one outgoing call over the ``StreamCall`` RPC.

    The response stream is consumed on a worker submitted to the supplied
    executor. The worker records call info and call state on this object and
    signals two events: one when the peer has responded (or the stream is
    over), and one when the call has finished (or the stream is over).
    """

    def __init__(
        self,
        executor: ThreadPoolExecutor,
        channel: grpc.Channel,
        phone_number: str,
    ) -> None:
        """Store the collaborators and initialise the call bookkeeping.

        Args:
            executor: Pool used to run the response-consuming worker.
            channel: gRPC channel the ``PhoneStub`` is built on.
            phone_number: Number to dial; sent in the stream request.
        """
        self._executor = executor
        self._channel = channel
        self._stub = phone_pb2_grpc.PhoneStub(self._channel)
        self._phone_number = phone_number
        self._session_id = None
        self._audio_session_link = None
        self._call_state = None
        # Set once the call is ACTIVE or ENDED, or the response stream is over.
        self._peer_responded = threading.Event()
        # Set once the call is ENDED, or the response stream is over.
        self._call_finished = threading.Event()
        self._consumer_future: Optional[Future] = None
        # Exception raised while consuming the stream, if any. It is written
        # by the worker *before* the events are set, so waiters woken by an
        # event can rely on it being visible.
        self._stream_error: Optional[Exception] = None

    def _response_watcher(
        self, response_iterator: Iterator[phone_pb2.StreamCallResponse]
    ) -> None:
        """Consume the response stream and dispatch each message.

        Runs on the executor. Whatever way the stream ends (normal
        completion or an exception), both events are set so that
        ``wait_peer`` and ``audio_session`` can never block forever.

        Raises:
            RuntimeError: A response carried neither ``call_info`` nor
                ``call_state``.
            Exception: Anything raised while iterating the stream is
                recorded and re-raised (it ends up in the worker's future).
        """
        try:
            for response in response_iterator:
                # NOTE: All fields in Proto3 are optional. This is the
                # recommended way to check if a field is present or not, or
                # to examine which one-of field is fulfilled by this message.
                if response.HasField("call_info"):
                    self._on_call_info(response.call_info)
                elif response.HasField("call_state"):
                    self._on_call_state(response.call_state.state)
                else:
                    raise RuntimeError(
                        "Received StreamCallResponse without call_info and"
                        " call_state"
                    )
        except Exception as error:
            # Record before the events fire (the ``finally`` below runs
            # after this handler) so waiters can surface the failure without
            # racing against the future being marked done.
            self._stream_error = error
            raise
        finally:
            # The stream is over: no further state change can arrive, so
            # release every waiter regardless of the last state seen.
            self._peer_responded.set()
            self._call_finished.set()

    def _on_call_info(self, call_info: phone_pb2.CallInfo) -> None:
        """Remember the session id and media link carried by ``call_info``."""
        self._session_id = call_info.session_id
        self._audio_session_link = call_info.media

    def _on_call_state(self, call_state: phone_pb2.CallState.State) -> None:
        """Record the new call state and signal the matching events."""
        logging.info(
            "Call toward [%s] enters [%s] state",
            self._phone_number,
            phone_pb2.CallState.State.Name(call_state),
        )
        self._call_state = call_state
        if call_state == phone_pb2.CallState.State.ACTIVE:
            self._peer_responded.set()
        if call_state == phone_pb2.CallState.State.ENDED:
            # An ENDED call also releases anyone still waiting for the peer.
            self._peer_responded.set()
            self._call_finished.set()

    def call(self) -> None:
        """Start the ``StreamCall`` RPC and begin consuming its responses."""
        request = phone_pb2.StreamCallRequest()
        request.phone_number = self._phone_number
        response_iterator = self._stub.StreamCall(iter((request,)))
        # Instead of consuming the responses on the current thread, hand the
        # iterator to a worker on the executor.
        self._consumer_future = self._executor.submit(
            self._response_watcher, response_iterator
        )

    def wait_peer(self) -> bool:
        """Block until the peer responds or the response stream is over.

        Returns:
            True if the last recorded call state is ACTIVE, False otherwise.

        Raises:
            Exception: Whatever the response consumer raised, if it failed.
        """
        logging.info("Waiting for peer to connect [%s]...", self._phone_number)
        self._peer_responded.wait(timeout=None)
        if self._stream_error is not None:
            raise self._stream_error
        if self._consumer_future.done():
            # If the future raised, forward the exception here.
            self._consumer_future.result()
        return self._call_state == phone_pb2.CallState.State.ACTIVE

    def audio_session(self) -> None:
        """Block until the call has finished or the response stream is over.

        Raises:
            Exception: Whatever the response consumer raised, if the stream
                failed before the call ended.
        """
        assert self._audio_session_link is not None
        logging.info("Consuming audio resource [%s]", self._audio_session_link)
        self._call_finished.wait(timeout=None)
        if self._stream_error is not None:
            raise self._stream_error
        logging.info("Audio session finished [%s]", self._audio_session_link)


def process_call(
    executor: ThreadPoolExecutor, channel: grpc.Channel, phone_number: str
) -> None:
    """Place a call to ``phone_number`` and log how it went."""
    call_maker = CallMaker(executor, channel, phone_number)
    call_maker.call()
    if call_maker.wait_peer():
        call_maker.audio_session()
        logging.info("Call finished!")
    else:
        logging.info("Call failed: peer didn't answer")


def run() -> None:
    """Open a channel to the local server and process one call."""
    # Context managers exit in reverse order: the channel is closed first,
    # and only then is the executor shut down, so worker threads are always
    # released instead of being leaked.
    with ThreadPoolExecutor() as executor, grpc.insecure_channel(
        "localhost:50051"
    ) as channel:
        future = executor.submit(
            process_call, executor, channel, "555-0100-XXXX"
        )
        future.result()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    run()