1# Copyright 2019 the gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""An example of cancelling requests in gRPC.""" 15 16from __future__ import absolute_import 17from __future__ import division 18from __future__ import print_function 19 20import argparse 21from concurrent import futures 22import logging 23import threading 24 25import grpc 26import search 27 28from examples.python.cancellation import hash_name_pb2 29from examples.python.cancellation import hash_name_pb2_grpc 30 31_LOGGER = logging.getLogger(__name__) 32_SERVER_HOST = "localhost" 33 34_DESCRIPTION = "A server for finding hashes similar to names." 35 36 37class HashFinder(hash_name_pb2_grpc.HashFinderServicer): 38 def __init__(self, maximum_hashes): 39 super(HashFinder, self).__init__() 40 self._maximum_hashes = maximum_hashes 41 42 def Find(self, request, context): 43 stop_event = threading.Event() 44 45 def on_rpc_done(): 46 _LOGGER.debug("Attempting to regain servicer thread.") 47 stop_event.set() 48 49 context.add_callback(on_rpc_done) 50 candidates = [] 51 try: 52 candidates = list( 53 search.search( 54 request.desired_name, 55 request.ideal_hamming_distance, 56 stop_event, 57 self._maximum_hashes, 58 ) 59 ) 60 except search.ResourceLimitExceededError: 61 _LOGGER.info("Cancelling RPC due to exhausted resources.") 62 context.cancel() 63 _LOGGER.debug("Servicer thread returning.") 64 if not candidates: 65 return hash_name_pb2.HashNameResponse() 66 return candidates[-1] 67 68 def FindRange(self, request, context): 69 stop_event = threading.Event() 70 71 def on_rpc_done(): 72 _LOGGER.debug("Attempting to regain servicer thread.") 73 stop_event.set() 74 75 context.add_callback(on_rpc_done) 76 secret_generator = search.search( 77 request.desired_name, 78 request.ideal_hamming_distance, 79 stop_event, 80 self._maximum_hashes, 81 interesting_hamming_distance=request.interesting_hamming_distance, 82 ) 83 try: 84 for candidate in secret_generator: 85 yield candidate 86 except search.ResourceLimitExceededError: 87 _LOGGER.info("Cancelling RPC due to exhausted resources.") 88 context.cancel() 89 _LOGGER.debug("Regained servicer thread.") 90 91 92def _running_server(port, maximum_hashes): 93 # We use only a single servicer thread here to demonstrate that, if managed 94 # carefully, cancelled RPCs can need not continue occupying servicers 95 # threads. 96 server = grpc.server( 97 futures.ThreadPoolExecutor(max_workers=1), maximum_concurrent_rpcs=1 98 ) 99 hash_name_pb2_grpc.add_HashFinderServicer_to_server( 100 HashFinder(maximum_hashes), server 101 ) 102 address = "{}:{}".format(_SERVER_HOST, port) 103 actual_port = server.add_insecure_port(address) 104 server.start() 105 print("Server listening at '{}'".format(address)) 106 return server 107 108 109def main(): 110 parser = argparse.ArgumentParser(description=_DESCRIPTION) 111 parser.add_argument( 112 "--port", 113 type=int, 114 default=50051, 115 nargs="?", 116 help="The port on which the server will listen.", 117 ) 118 parser.add_argument( 119 "--maximum-hashes", 120 type=int, 121 default=1000000, 122 nargs="?", 123 help="The maximum number of hashes to search before cancelling.", 124 ) 125 args = parser.parse_args() 126 server = _running_server(args.port, args.maximum_hashes) 127 server.wait_for_termination() 128 129 130if __name__ == "__main__": 131 logging.basicConfig() 132 main() 133