"""Network simulator Python gRPC client."""

import logging
import os
import platform
from typing import Dict, Optional

from google.protobuf import empty_pb2
import grpc
from netsim_grpc.proto.netsim import (
    common_pb2 as common,
    frontend_pb2_grpc as frontend_grpc,
    frontend_pb2 as frontend,
    model_pb2 as model,
)

_Empty = empty_pb2.Empty
_Channel = grpc.Channel

# Name of the file in which the gRPC port of the netsim daemon is looked up.
NETSIM_INI = 'netsim.ini'


class SetupError(Exception):
    """Class for exceptions related to netsim setup."""


class NetsimClient(object):
    """Network simulator client."""

    def __init__(self):
        """Create a NetsimClient.

        The server address is discovered from the netsim .ini file and an
        insecure gRPC channel to the frontend service is opened.

        Raises:
          SetupError: If the .ini file or the gRPC port entry cannot be found.
        """
        self._server_addr = _get_grpc_server_addr()
        self._channel = _create_frontend_grpc_channel(self._server_addr)
        self._stub = frontend_grpc.FrontendServiceStub(self._channel)

    def get_version(self) -> str:
        """Get the version of the netsim daemon.

        Returns:
          The netsim daemon version.
        """
        return self._stub.GetVersion(_Empty()).version

    def get_devices(self) -> Dict[str, model.Device]:
        """Get info for all devices connected to netsim.

        Returns:
          A dict mapping each connected device name to its netsim properties.
        """
        response = self._stub.ListDevice(_Empty())
        return {device.name: device for device in response.devices}

    def set_position(
        self,
        device_name: str,
        position: Optional[model.Position] = None,
        orientation: Optional[model.Orientation] = None,
    ) -> bool:
        """Set the position and/or orientation of the specified device.

        NOTE: Leaving the position/orientation unset would reset the device's
        position/orientation to zero.

        Args:
          device_name: The avd name of the specified device.
          position: The desired (x, y, z) position of the device.
          orientation: The desired (yaw, pitch, roll) orientation of the device.

        Returns:
          bool indicating whether device position was successfully set.

        Raises:
          KeyError: If no device named `device_name` is listed after the patch.
        """
        # Compare against None explicitly: the arguments are Optional, and the
        # decision must not depend on how a message object evaluates as a bool.
        has_position = position is not None
        has_orientation = orientation is not None

        request = frontend.PatchDeviceRequest()
        request.device.name = device_name
        if has_position:
            logging.info(
                'Setting new position for device %s: %s', device_name, position
            )
            request.device.position.x = position.x
            request.device.position.y = position.y
            request.device.position.z = position.z
        if has_orientation:
            logging.info(
                'Setting new orientation for device %s: %s',
                device_name,
                orientation,
            )
            request.device.orientation.yaw = orientation.yaw
            request.device.orientation.pitch = orientation.pitch
            request.device.orientation.roll = orientation.roll
        self._stub.PatchDevice(request)

        # Read the device back and verify that the requested values were applied.
        device_info = self.get_devices()[device_name]
        success = True
        if has_position and device_info.position != position:
            logging.error(
                'Device %s position not set as expected. Current position: %s',
                device_name,
                device_info.position,
            )
            success = False
        if has_orientation and device_info.orientation != orientation:
            logging.error(
                'Device %s orientation not set as expected. Current orientation: %s',
                device_name,
                device_info.orientation,
            )
            success = False
        return success

    def set_radio(
        self, device_name: str, radio: model.PhyKind, state: bool
    ) -> None:
        """Set the radio state of the specified device.

        Args:
          device_name: The avd name of the specified device.
          radio: The specified radio, e.g. BLUETOOTH_LOW_ENERGY, WIFI
          state: Set radio state UP if True, DOWN if False.
        """
        chip = model.Chip()

        if radio == model.PhyKind.WIFI:
            chip.wifi.state = state
            chip.kind = common.ChipKind.WIFI
        elif radio == model.PhyKind.UWB:
            chip.uwb.state = state
            chip.kind = common.ChipKind.UWB
        else:
            # Every remaining radio value is sent as a BLUETOOTH chip; a state is
            # only filled in for the two Bluetooth radios handled below.
            if radio == model.PhyKind.BLUETOOTH_LOW_ENERGY:
                chip.bt.low_energy.state = state
            elif radio == model.PhyKind.BLUETOOTH_CLASSIC:
                chip.bt.classic.state = state
            chip.kind = common.ChipKind.BLUETOOTH

        request = frontend.PatchDeviceRequest()
        request.device.name = device_name
        request.device.chips.append(chip)
        self._stub.PatchDevice(request)

    def get_captures(self) -> list[model.Capture]:
        """Get info for all capture information in netsim.

        Returns:
          A List of all captures where capture is netsim.model.Capture.
        """
        return self._stub.ListCapture(_Empty()).captures

    def set_capture(
        self, device_name: str, radio: common.ChipKind, state: bool
    ) -> None:
        """Set the capture state of the specific device and radio.

        Every capture whose chip kind and device name match is patched; nothing
        is sent when no capture matches.

        Args:
          device_name: The avd name of the specified device.
          radio: The specified radio ChipKind, e.g. BLUETOOTH, WIFI, UWB
          state: Set capture state UP if True, Down if False.
        """
        for capture in self.get_captures():
            if capture.chip_kind != radio or capture.device_name != device_name:
                continue
            request = frontend.PatchCaptureRequest()
            request.id = capture.id
            request.patch.state = state
            logging.info(
                'Setting capture state of radio %s for device %s to %s',
                common.ChipKind.Name(radio),
                device_name,
                state,
            )
            self._stub.PatchCapture(request)

    def set_capture_all(self, state: bool) -> None:
        """Set the capture state of every capture known to netsim.

        Args:
          state: The capture state to apply to each capture.
        """
        logging.info('Setting capture state for all devices: %s', state)
        for capture in self.get_captures():
            request = frontend.PatchCaptureRequest()
            request.id = capture.id
            request.patch.state = state
            self._stub.PatchCapture(request)

    def reset(self) -> None:
        """Reset all devices."""
        self._stub.Reset(_Empty())

    def close(self) -> None:
        """Close the netsim client connection."""
        # __init__ may have raised before the channel was created, and close()
        # is also reached from __del__, so the attribute may be missing.
        if hasattr(self, '_channel'):
            self._channel.close()

    def __del__(self) -> None:
        self.close()


def _get_grpc_server_addr() -> str:
    """Locate the grpc server address from netsim's .ini file.

    The .ini file is searched for in TMPDIR (when the file exists there), then
    in a platform specific directory, and finally in /tmp.

    Returns:
      The server address in the form 'localhost:<port>'.

    Raises:
      SetupError: If the .ini file does not exist or has no 'grpc.port' entry.
    """
    # TMPDIR is set on buildbots
    file_path = os.path.join('/tmp', NETSIM_INI)
    if 'TMPDIR' in os.environ and os.path.exists(
        os.path.join(os.environ['TMPDIR'], NETSIM_INI)
    ):
        file_path = os.path.join(os.environ['TMPDIR'], NETSIM_INI)
    # XDG_RUNTIME_DIR for Linux local discovery env
    elif platform.system() == 'Linux' and 'XDG_RUNTIME_DIR' in os.environ:
        file_path = os.path.join(os.environ['XDG_RUNTIME_DIR'], NETSIM_INI)
    # HOME for Mac local discovery
    elif platform.system() == 'Darwin' and 'HOME' in os.environ:
        file_path = os.path.join(
            os.environ['HOME'], 'Library/Caches/TemporaryItems', NETSIM_INI
        )
    # LOCALAPPDATA for Windows local discovery
    elif platform.system() == 'Windows' and 'LOCALAPPDATA' in os.environ:
        file_path = os.path.join(os.environ['LOCALAPPDATA'], 'Temp', NETSIM_INI)
    else:
        logging.warning(
            'TMPDIR, XDG_RUNTIME_DIR, HOME, or LOCALAPPDATA environment variable'
            ' not set.Using /tmp. Is netsimd running?'
        )

    # Open directly instead of checking for existence first, so the file
    # cannot disappear between the check and the read.
    try:
        ini_file = open(file_path, 'r')
    except FileNotFoundError as err:
        raise SetupError(
            f'Unable to find the netsim.ini file at {file_path}. Is netsimd'
            ' running?',
        ) from err

    with ini_file:
        for line in ini_file:
            # partition() never raises: blank lines and lines without '=' are
            # skipped, and a value that itself contains '=' is kept intact.
            key, separator, value = line.strip().partition('=')
            if not separator:
                continue
            if key.strip() == 'grpc.port':
                value = value.strip()
                logging.info('Found netsim server gRPC port: %s.', value)
                return f'localhost:{value}'
    raise SetupError(
        'Unable to find the netsim server address from the .ini file.'
    )


def _create_frontend_grpc_channel(
    server_addr: str,
) -> _Channel:
    """Creates a gRPC channel to communicate with netsim FE service.

    Args:
      server_addr: Endpoint address of the netsim server.

    Returns:
      gRPC channel
    """
    logging.info(
        'Creating gRPC channel for netsim frontend service at %s.', server_addr
    )
    return grpc.insecure_channel(server_addr)