1# Copyright 2016 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Defines behavior for WHEN clients send requests. 15 16Each client exposes a non-blocking send_request() method that the 17ClientRunner invokes either periodically or in response to some event. 18""" 19 20import abc 21import threading 22import time 23 24 25class ClientRunner: 26 """Abstract interface for sending requests from clients.""" 27 28 __metaclass__ = abc.ABCMeta 29 30 def __init__(self, client): 31 self._client = client 32 33 @abc.abstractmethod 34 def start(self): 35 raise NotImplementedError() 36 37 @abc.abstractmethod 38 def stop(self): 39 raise NotImplementedError() 40 41 42class OpenLoopClientRunner(ClientRunner): 43 def __init__(self, client, interval_generator): 44 super(OpenLoopClientRunner, self).__init__(client) 45 self._is_running = False 46 self._interval_generator = interval_generator 47 self._dispatch_thread = threading.Thread( 48 target=self._dispatch_requests, args=() 49 ) 50 51 def start(self): 52 self._is_running = True 53 self._client.start() 54 self._dispatch_thread.start() 55 56 def stop(self): 57 self._is_running = False 58 self._client.stop() 59 self._dispatch_thread.join() 60 self._client = None 61 62 def _dispatch_requests(self): 63 while self._is_running: 64 self._client.send_request() 65 time.sleep(next(self._interval_generator)) 66 67 68class ClosedLoopClientRunner(ClientRunner): 69 def __init__(self, client, request_count, no_ping_pong): 70 super(ClosedLoopClientRunner, self).__init__(client) 71 self._is_running = False 72 self._request_count = request_count 73 # For server-streaming RPC, don't spawn new RPC after each responses. 74 # This yield at most ~17% for single RPC scenarios. 75 if not no_ping_pong: 76 # Send a new request on each response for closed loop 77 self._client.add_response_callback(self._send_request) 78 79 def start(self): 80 self._is_running = True 81 self._client.start() 82 for _ in range(self._request_count): 83 self._client.send_request() 84 85 def stop(self): 86 self._is_running = False 87 self._client.stop() 88 self._client = None 89 90 def _send_request(self, client, unused_response_time): 91 if self._is_running: 92 client.send_request() 93