xref: /aosp_15_r20/tools/netsim/testing/netsim-grpc/src/netsim_grpc/netsim_client.py (revision cf78ab8cffb8fc9207af348f23af247fb04370a6)
1"""Network simulator Python gRPC client."""
2
3import logging
4import os
5import platform
6from typing import Dict, Optional
7
8from google.protobuf import empty_pb2
9import grpc
10from netsim_grpc.proto.netsim import (
11    common_pb2 as common,
12    frontend_pb2_grpc as frontend_grpc,
13    frontend_pb2 as frontend,
14    model_pb2 as model,
15)
16
17_Empty = empty_pb2.Empty
18_Channel = grpc.Channel
19
20NETSIM_INI = 'netsim.ini'
21
22
23class SetupError(Exception):
24  """Class for exceptions related to netsim setup."""
25
26
27class NetsimClient(object):
28  """Network simulator client."""
29
30  def __init__(self):
31    """Create a NetsimClient.
32
33    Args:
34      local_creds: Use local credentials for gRPC channel.
35    """
36    self._server_addr = _get_grpc_server_addr()
37    self._channel = _create_frontend_grpc_channel(self._server_addr)
38    self._stub = frontend_grpc.FrontendServiceStub(self._channel)
39
40  def get_version(self) -> str:
41    """Get the version of the netsim daemon.
42
43    Returns:
44      The netsim daemon version.
45    """
46    return self._stub.GetVersion(_Empty()).version
47
48  def get_devices(self) -> Dict[str, model.Device]:
49    """Get info for all devices connected to netsim.
50
51    Returns:
52      A dict mapping each connected device to its netsim properties.
53    """
54    response = self._stub.ListDevice(_Empty())
55    return {device.name: device for device in response.devices}
56
57  def set_position(
58      self,
59      device_name: str,
60      position: Optional[model.Position] = None,
61      orientation: Optional[model.Orientation] = None,
62  ) -> bool:
63    """Set the position and/or orientation of the specified device.
64
65    NOTE: Leaving the position/orientation unset would reset the device's
66      position/orientation to zero.
67
68    Args:
69      device_name: The avd name of the specified device.
70      position: The desired (x, y, z) position of the device.
71      orientation: The desired (yaw, pitch, roll) orientation of the device.
72
73    Returns:
74      bool indicating whether device position was successfully set.
75    """
76    request = frontend.PatchDeviceRequest()
77    request.device.name = device_name
78    if position:
79      logging.info(
80          'Setting new position for device %s: %s', device_name, position
81      )
82      request.device.position.x = position.x
83      request.device.position.y = position.y
84      request.device.position.z = position.z
85    if orientation:
86      logging.info(
87          'Setting new orientation for device %s: %s', device_name, orientation
88      )
89      request.device.orientation.yaw = orientation.yaw
90      request.device.orientation.pitch = orientation.pitch
91      request.device.orientation.roll = orientation.roll
92    self._stub.PatchDevice(request)
93    device_info = self.get_devices()[device_name]
94    success = True
95    if position and device_info.position != position:
96      logging.error(
97          'Device %s position not set as expected. Current position: %s',
98          device_name,
99          device_info.position,
100      )
101      success = False
102    if orientation and device_info.orientation != orientation:
103      logging.error(
104          'Device %s orientation not set as expected. Current orientation: %s',
105          device_name,
106          device_info.orientation,
107      )
108      success = False
109    return success
110
111  def set_radio(
112      self, device_name: str, radio: model.PhyKind, state: bool
113  ) -> None:
114    """Set the radio state of the specified device.
115
116    Args:
117      device_name: The avd name of the specified device.
118      radio: The specified radio, e.g. BLUETOOTH_LOW_ENERGY, WIFI
119      state: Set radio state UP if True, DOWN if False.
120    """
121    chip = model.Chip()
122
123    if radio == model.PhyKind.WIFI:
124      chip.wifi.state = state
125      chip.kind = common.ChipKind.WIFI
126    elif radio == model.PhyKind.UWB:
127      chip.uwb.state = state
128      chip.kind = common.ChipKind.UWB
129    else:
130      if radio == model.PhyKind.BLUETOOTH_LOW_ENERGY:
131        chip.bt.low_energy.state = state
132      elif radio == model.PhyKind.BLUETOOTH_CLASSIC:
133        chip.bt.classic.state = state
134      chip.kind = common.ChipKind.BLUETOOTH
135
136    request = frontend.PatchDeviceRequest()
137    request.device.name = device_name
138    request.device.chips.append(chip)
139    self._stub.PatchDevice(request)
140
141  def get_captures(self) -> list[model.Capture]:
142    """Get info for all capture information in netsim.
143
144    Returns:
145      A List of all captures where capture is netsim.model.Capture.
146    """
147    return self._stub.ListCapture(_Empty()).captures
148
149  def set_capture(
150      self, device_name: str, radio: common.ChipKind, state: bool
151  ) -> None:
152    """Set the capture state of the specific device and radio.
153
154    Args:
155      device_name: The avd name of the specified device.
156      radio: The specified radio ChipKind, e.g. BLUETOOTH, WIFI, UWB
157      state: Set capture state UP if True, Down if False.
158    """
159    for capture in self.get_captures():
160      if capture.chip_kind == radio and capture.device_name == device_name:
161        request = frontend.PatchCaptureRequest()
162        request.id = capture.id
163        request.patch.state = state
164        logging.info(
165            'Setting capture state of radio %s for device %s to %s',
166            common.ChipKind.Name(radio),
167            device_name,
168            state,
169        )
170        self._stub.PatchCapture(request)
171
172  def set_capture_all(self, state: bool) -> None:
173    logging.info('Setting capture state for all devices: %s', state)
174    for capture in self.get_captures():
175      request = frontend.PatchCaptureRequest()
176      request.id = capture.id
177      request.patch.state = state
178      self._stub.PatchCapture(request)
179
180  def reset(self) -> None:
181    """Reset all devices."""
182    self._stub.Reset(_Empty())
183
184  def close(self) -> None:
185    """Close the netsim client connection."""
186    if hasattr(self, '_channel'):
187      self._channel.close()
188
189  def __del__(self) -> None:
190    self.close()
191
192
193def _get_grpc_server_addr() -> str:
194  """Locate the grpc server address from netsim's .ini file."""
195  # TMPDIR is set on buildbots
196  file_path = os.path.join('/tmp', NETSIM_INI)
197  if 'TMPDIR' in os.environ and os.path.exists(
198      os.path.join(os.environ['TMPDIR'], NETSIM_INI)
199  ):
200    file_path = os.path.join(os.environ['TMPDIR'], NETSIM_INI)
201  # XDG_RUNTIME_DIR for Linux local discovery env
202  elif platform.system() == 'Linux' and 'XDG_RUNTIME_DIR' in os.environ:
203    file_path = os.path.join(os.environ['XDG_RUNTIME_DIR'], NETSIM_INI)
204  # HOME for Mac local discovery
205  elif platform.system() == 'Darwin' and 'HOME' in os.environ:
206    file_path = os.path.join(
207        os.environ['HOME'], 'Library/Caches/TemporaryItems', NETSIM_INI
208    )
209  # LOCALAPPDATA for Windows local discovery
210  elif platform.system() == 'Windows' and 'LOCALAPPDATA' in os.environ:
211    file_path = os.path.join(os.environ['LOCALAPPDATA'], 'Temp', NETSIM_INI)
212  else:
213    logging.warning(
214        'TMPDIR, XDG_RUNTIME_DIR, HOME, or LOCALAPPDATA environment variable'
215        ' not set.Using /tmp. Is netsimd running?'
216    )
217  if not os.path.exists(file_path):
218    raise SetupError(
219        f'Unable to find the netsim.ini file at {file_path}. Is netsimd'
220        ' running?',
221    )
222  with open(file_path, 'r') as f:
223    for line in f:
224      key, value = line.strip().split('=')
225      if key == 'grpc.port':
226        logging.info('Found netsim server gRPC port: %s.', value)
227        return f'localhost:{value}'
228  raise SetupError(
229      'Unable to find the netsim server address from the .ini file.'
230  )
231
232
233def _create_frontend_grpc_channel(
234    server_addr: str,
235) -> _Channel:
236  """Creates a gRPC channel to communicate with netsim FE service.
237
238  Args:
239    server_addr: Endpoint address of the netsim server.
240
241  Returns:
242    gRPC channel
243  """
244  logging.info(
245      'Creating gRPC channel for netsim frontend service at %s.', server_addr
246  )
247  return grpc.insecure_channel(server_addr)
248