#
# Copyright 2008 Google Inc. All Rights Reserved.
#

from __future__ import print_function

import os
from autotest_lib.frontend.afe import rpc_client_lib
from autotest_lib.frontend.afe.json_rpc import proxy
from autotest_lib.client.common_lib import global_config, utils

GLOBAL_CONFIG = global_config.global_config
DEFAULT_SERVER = 'autotest'
AFE_RPC_PATH = '/afe/server/noauth/rpc/'
TKO_RPC_PATH = '/new_tko/server/noauth/rpc/'


class AuthError(Exception):
    """Raised by rpc_comm when rpc_client_lib reports an AuthError."""


def get_autotest_server(web_server=None):
    """Return the server address to use, run through add_protocol().

    Lookup order when no (truthy) web_server is given:
      1. the AUTOTEST_WEB environment variable, if it is set at all;
      2. the 'hostname' value of the 'SERVER' section of the global config,
         falling back to DEFAULT_SERVER.

    @param web_server: explicit server address, or None/empty to look it up.

    @return: whatever rpc_client_lib.add_protocol() returns for the address.
    """
    if not web_server:
        # Environment values are always strings, so None can only mean the
        # variable is absent; an empty-but-set value is still honoured.
        web_server = os.environ.get('AUTOTEST_WEB')
        if web_server is None:
            web_server = GLOBAL_CONFIG.get_config_value(
                    'SERVER', 'hostname', default=DEFAULT_SERVER)

    return rpc_client_lib.add_protocol(web_server)


class rpc_comm(object):
    """Shared AFE/TKO RPC class stuff"""

    def __init__(self, web_server, rpc_path, username):
        """Resolve the server address and build the RPC proxy.

        @param web_server: server address; resolved via get_autotest_server().
        @param rpc_path: path appended to the server address to form the
                RPC endpoint.
        @param username: user name handed to
                rpc_client_lib.authorization_headers().

        @raises AuthError: if connecting raises rpc_client_lib.AuthError.
        """
        self.username = username
        self.web_server = get_autotest_server(web_server)
        try:
            self.proxy = self._connect(rpc_path)
        except rpc_client_lib.AuthError as auth_failure:
            # Re-raise as this module's own AuthError type.
            raise AuthError(auth_failure)


    def _connect(self, rpc_path):
        """Build and return a proxy for the endpoint at rpc_path.

        @param rpc_path: path appended to self.web_server.

        @return: the object returned by rpc_client_lib.get_proxy().
        """
        # This does not fail even if the address is wrong.
        # We need to wait for an actual RPC to fail
        auth_headers = rpc_client_lib.authorization_headers(
                self.username, self.web_server)
        endpoint = self.web_server + rpc_path
        return rpc_client_lib.get_proxy(endpoint, headers=auth_headers)


    def run(self, op, *args, **data):
        """Invoke the RPC named op on the proxy and return its result.

        When AUTOTEST_CLI_DEBUG is present in the environment, the call and
        its result are printed to stdout.

        @param op: name of the proxy attribute to call.
        @param args: positional arguments forwarded to the RPC.
        @param data: keyword arguments forwarded to the RPC.

        @return: whatever the RPC call returns.
        """
        if 'AUTOTEST_CLI_DEBUG' in os.environ:
            print(self.web_server, op, args, data)

        rpc_method = getattr(self.proxy, op)
        outcome = rpc_method(*args, **data)

        if 'AUTOTEST_CLI_DEBUG' in os.environ:
            print('result:', outcome)
        return outcome


class afe_comm(rpc_comm):
    """Handles the AFE setup and communication through RPC"""

    def __init__(self, web_server=None, rpc_path=AFE_RPC_PATH, username=None):
        """Same as rpc_comm.__init__, with rpc_path defaulting to AFE_RPC_PATH."""
        super(afe_comm, self).__init__(web_server=web_server,
                                       rpc_path=rpc_path,
                                       username=username)


class tko_comm(rpc_comm):
    """Handles the TKO setup and communication through RPC"""

    def __init__(self, web_server=None, rpc_path=TKO_RPC_PATH, username=None):
        """Same as rpc_comm.__init__, with rpc_path defaulting to TKO_RPC_PATH."""
        super(tko_comm, self).__init__(web_server=web_server,
                                       rpc_path=rpc_path,
                                       username=username)