# Copyright 2021 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from concurrent import futures

import grpc

from chromiumos.test.api import callbox_service_pb2 as cbp
from chromiumos.test.api import callbox_service_pb2_grpc as cbs


class CallBoxServer(cbs.CallboxServiceServicer):
    """Implements the callbox_service.proto API"""

    def CheckHealth(self, request, context):
        """Basic endpoint to check the service is up.

        Args:
            request: The incoming health-check request message (unused).
            context: The gRPC context for this call (unused).

        Returns:
            A default-constructed cbp.CheckHealthResponse.
        """
        return cbp.CheckHealthResponse()


def serve(address='[::]:50051', max_workers=1):
    """Start the callbox gRPC server and return it without blocking.

    Args:
        address: The 'host:port' string to listen on. The port is opened
            without transport security (add_insecure_port). Defaults to
            port 50051 on '[::]'.
        max_workers: Number of threads in the pool that handles RPCs.
            Defaults to a single worker thread.

    Returns:
        The started grpc server. Callers that want to block until shutdown
        should call wait_for_termination() on it.

    Raises:
        RuntimeError: If the server could not bind to `address`.
    """
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
    cbs.add_CallboxServiceServicer_to_server(CallBoxServer(), server)

    # Depending on the grpcio version, a failed bind either raises
    # RuntimeError or returns 0. Normalize the latter so that we never
    # "start" a server that is not actually listening anywhere.
    bound_port = server.add_insecure_port(address)
    if not bound_port:
        raise RuntimeError('Failed to bind callbox server to %s' % address)

    server.start()
    return server


if __name__ == '__main__':
    # serve() returns immediately after starting the server, so block here
    # to keep the process alive until the server terminates.
    server = serve()
    server.wait_for_termination()