1# Lint as: python2, python3 2# Copyright (c) 2020 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6""" 7This module provides bindings for Hermes. 8 9""" 10import dbus 11import logging 12import dbus.mainloop.glib 13from autotest_lib.client.bin import utils 14from autotest_lib.client.common_lib import error 15from autotest_lib.client.cros.cellular import cellular_logging 16from autotest_lib.client.cros.cellular import hermes_constants 17from autotest_lib.client.cros.cellular import mm1_constants 18 19log = cellular_logging.SetupCellularLogging('Hermes') 20 21def _is_unknown_dbus_binding_exception(e): 22 return (isinstance(e, dbus.exceptions.DBusException) and 23 e.get_dbus_name() in [mm1_constants.DBUS_SERVICE_UNKNOWN, 24 mm1_constants.DBUS_UNKNOWN_METHOD, 25 mm1_constants.DBUS_UNKNOWN_OBJECT, 26 mm1_constants.DBUS_UNKNOWN_INTERFACE]) 27 28class HermesManagerProxyError(Exception): 29 """Exceptions raised by HermesManager1ProxyError and it's children.""" 30 pass 31 32class HermesManagerProxy(object): 33 """A wrapper around a DBus proxy for HermesManager.""" 34 35 @classmethod 36 def get_hermes_manager(cls, bus=None, timeout_seconds=10): 37 """Connect to HermesManager over DBus, retrying if necessary. 38 39 After connecting to HermesManager, this method will verify that 40 HermesManager is answering RPCs. 41 42 @param bus: D-Bus bus to use, or specify None and this object will 43 create a mainloop and bus. 44 @param timeout_seconds: float number of seconds to try connecting 45 A value <= 0 will cause the method to return immediately, 46 without trying to connect. 47 @return a HermesManagerProxy instance if we connected, or None 48 otherwise. 49 @raise HermesManagerProxyError if it fails to connect to 50 HermesManager. 51 52 """ 53 def _connect_to_hermes_manager(bus): 54 try: 55 # We create instance of class on which this classmethod was 56 # called. This way, calling get_hermes_manager 57 # SubclassOfHermesManagerProxy._connect_to_hermes_manager() 58 # will get a proxy of the right type 59 return cls(bus=bus) 60 except dbus.exceptions.DBusException as e: 61 if _is_unknown_dbus_binding_exception(e): 62 return None 63 raise HermesManagerProxyError( 64 'Error connecting to HermesManager. DBus error: |%s|', 65 repr(e)) 66 67 utils.poll_for_condition( 68 condition=lambda: _connect_to_hermes_manager(bus) is not None, 69 exception=HermesManagerProxyError( 70 'Timed out connecting to HermesManager dbus'), 71 desc='Waiting for hermes to start', 72 timeout=timeout_seconds, 73 sleep_interval=hermes_constants.CONNECT_WAIT_INTERVAL_SECONDS) 74 connection = _connect_to_hermes_manager(bus) 75 return connection 76 77 def __init__(self, bus=None): 78 if bus is None: 79 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) 80 bus = dbus.SystemBus() 81 self._bus = bus 82 self._manager = dbus.Interface( 83 self._bus.get_object(hermes_constants.HERMES_SERVICE, 84 hermes_constants.HERMES_MANAGER_OBJECT), 85 hermes_constants.HERMES_MANAGER_IFACE) 86 87 @property 88 def manager(self): 89 """@return the DBus Hermes Manager object.""" 90 return self._manager 91 92 @property 93 def iface_properties(self): 94 """@return org.freedesktop.DBus.Properties DBus interface.""" 95 return dbus.Interface(self._manager, hermes_constants.I_PROPERTIES) 96 97 def properties(self, iface=hermes_constants.HERMES_MANAGER_IFACE): 98 """ 99 Return the properties associated with the specified interface. 100 101 @param iface: Name of interface to retrieve the properties from. 102 @return array of properties. 103 104 """ 105 return self.iface_properties.GetAll(iface) 106 107 def get_available_euiccs(self): 108 """ 109 Return AvailableEuiccs property from manager interface 110 111 @return array of euicc paths 112 113 """ 114 available_euiccs = self.properties() 115 if len(available_euiccs) <= 0: 116 return None 117 118 return available_euiccs.get('AvailableEuiccs') 119 120 def get_first_inactive_euicc(self): 121 """ 122 Read all euiccs objects in loop and get an non active euicc object 123 124 @return non active euicc object 125 126 """ 127 try: 128 euiccs = self.get_available_euiccs() 129 euicc_obj = None 130 for euicc in euiccs: 131 euicc_obj = self.get_euicc(euicc) 132 props = euicc_obj.properties() 133 if not props.get('IsActive'): 134 break 135 return euicc_obj 136 except dbus.DBusException as e: 137 logging.error('get non active euicc failed with error:%s', e) 138 139 def get_first_active_euicc(self): 140 """ 141 Read all euiccs and get an active euicc object 142 by reading isactive property of each euicc object 143 144 @return active euicc dbus object path 145 146 """ 147 try: 148 euiccs = self.get_available_euiccs() 149 euicc_obj = None 150 for euicc in euiccs: 151 euicc_obj = self.get_euicc(euicc) 152 props = euicc_obj.properties() 153 if props.get('IsActive'): 154 break 155 return euicc_obj 156 except dbus.DBusException as e: 157 logging.error('get active euicc failed with error:%s', e) 158 159 def get_euicc(self, euicc_path): 160 """ 161 Create a proxy object for given euicc path 162 163 @param euicc_path: available euicc dbus path as string 164 @return euicc proxy dbus object 165 166 """ 167 if not euicc_path: 168 logging.debug('No euicc path given for %s', euicc_path) 169 raise error.TestFail('No euicc path given for' + euicc_path) 170 171 try: 172 euicc_proxy = EuiccProxy(self._bus, euicc_path) 173 props = euicc_proxy.properties() 174 if not props: 175 raise error.TestFail('No euicc props found for ' + euicc_path) 176 return euicc_proxy 177 except dbus.exceptions.DBusException as e: 178 if _is_unknown_dbus_binding_exception(e): 179 return None 180 raise HermesManagerProxyError( 181 'Failed to obtain dbus object for the euicc. DBus error: ' 182 '|%s|', repr(e)) 183 184 def get_profile_from_iccid(self, iccid): 185 """ 186 Generic function to get profile based on given iccid 187 188 @return euicc object and profile object 189 190 """ 191 logging.debug('Get profile from given iccid:%s', iccid) 192 euiccs = self.get_available_euiccs() 193 for euicc in euiccs: 194 euicc_obj = self.get_euicc(euicc) 195 if euicc_obj.get_profile_from_iccid(iccid) != None: 196 return euicc_obj, euicc.get_profile_from_iccid 197 return None 198 199 def set_debug_logging(self): 200 self.manager.SetLogging('DEBUG') 201 202 def get_profile_iccid(self, profile_path): 203 profile_proxy = ProfileProxy(self._bus, profile_path) 204 props = profile_proxy.properties() 205 return props.get('Iccid') 206 207# End of Manager class 208 209class ProfileProxy(object): 210 """A wrapper around a DBus proxy for Hermes profile object.""" 211 212 # Amount of time to wait for a state transition. 213 STATE_TRANSITION_WAIT_SECONDS = 10 214 215 def __init__(self, bus, path): 216 self._bus = bus 217 self._path = path 218 self._profile = self._bus.get_object( 219 hermes_constants.HERMES_SERVICE, path) 220 221 def enable(self): 222 """ Enables a profile """ 223 profile_interface = dbus.Interface( 224 self.profile, hermes_constants.HERMES_PROFILE_IFACE) 225 logging.debug('ProfileProxy Manager enable_profile') 226 return profile_interface.Enable( 227 timeout=hermes_constants.HERMES_DBUS_METHOD_REPLY_TIMEOUT) 228 229 def disable(self): 230 """ Disables a profile """ 231 profile_interface = dbus.Interface( 232 self.profile, hermes_constants.HERMES_PROFILE_IFACE) 233 logging.debug('ProfileProxy Manager disable_profile') 234 return profile_interface.Disable( 235 timeout=hermes_constants.HERMES_DBUS_METHOD_REPLY_TIMEOUT) 236 237 @property 238 def profile(self): 239 """@return the DBus profiles object.""" 240 return self._profile 241 242 @property 243 def path(self): 244 """@return profile path.""" 245 return self._path 246 247 @property 248 def iface_properties(self): 249 """@return org.freedesktop.DBus.Properties DBus interface.""" 250 return dbus.Interface(self._profile, dbus.PROPERTIES_IFACE) 251 252 def iface_profile(self): 253 """@return org.freedesktop.HermesManager.Profile DBus interface.""" 254 return dbus.Interface(self._profile, 255 hermes_constants.HERMES_PROFILE_IFACE) 256 257 def properties(self, iface=hermes_constants.HERMES_PROFILE_IFACE): 258 """Return the properties associated with the specified interface. 259 @param iface: Name of interface to retrieve the properties from. 260 @return array of properties. 261 """ 262 return self.iface_properties.GetAll(iface) 263 264 # Get functions for each property from properties 265 #"Iccid", "ServiceProvider", "MccMnc", "ActivationCode", "State" 266 #"ProfileClass", "Name", "Nickname" 267 @property 268 def iccid(self): 269 """ @return iccid of profile also confirmation code """ 270 props = self.properties(hermes_constants.HERMES_PROFILE_IFACE) 271 return props.get('Iccid') 272 273 @property 274 def serviceprovider(self): 275 """ @return serviceprovider of profile """ 276 props = self.properties(hermes_constants.HERMES_PROFILE_IFACE) 277 return props.get('ServiceProvider') 278 279 @property 280 def mccmnc(self): 281 """ @return mccmnc of profile """ 282 props = self.properties(hermes_constants.HERMES_PROFILE_IFACE) 283 return props.get('MccMnc') 284 285 @property 286 def activationcode(self): 287 """ @return activationcode of profile """ 288 props = self.properties(hermes_constants.HERMES_PROFILE_IFACE) 289 return props.get('ActivationCode') 290 291 @property 292 def state(self): 293 """ @return state of profile """ 294 props = self.properties(hermes_constants.HERMES_PROFILE_IFACE) 295 return props.get('State') 296 297 @property 298 def profileclass(self): 299 """ @return profileclass of profile """ 300 props = self.properties(hermes_constants.HERMES_PROFILE_IFACE) 301 return props.get('ProfileClass') 302 303 @property 304 def name(self): 305 """ @return name of profile """ 306 props = self.properties(hermes_constants.HERMES_PROFILE_IFACE) 307 return props.get('Name') 308 309 @property 310 def nickname(self): 311 """ @return nickname of profile """ 312 props = self.properties(hermes_constants.HERMES_PROFILE_IFACE) 313 return props.get('Nickname') 314 315class EuiccProxy(object): 316 """A wrapper around a DBus proxy for Hermes euicc object.""" 317 318 def __init__(self, bus, path): 319 self._bus = bus 320 self._euicc = self._bus.get_object( 321 hermes_constants.HERMES_SERVICE, path) 322 323 @property 324 def euicc(self): 325 """@return the DBus Euicc object.""" 326 return self._euicc 327 328 @property 329 def iface_properties(self): 330 """@return org.freedesktop.DBus.Properties DBus interface.""" 331 return dbus.Interface(self._euicc, dbus.PROPERTIES_IFACE) 332 333 @property 334 def iface_euicc(self): 335 """@return org.freedesktop.HermesManager.Euicc DBus interface.""" 336 return dbus.Interface(self._euicc, hermes_constants.HERMES_EUICC_IFACE) 337 338 def properties(self, iface=hermes_constants.HERMES_EUICC_IFACE): 339 """ 340 Return the properties associated with the specified interface. 341 342 @param iface: Name of interface to retrieve the properties from. 343 @return array of properties. 344 345 """ 346 return self.iface_properties.GetAll(iface) 347 348 def request_installed_profiles(self): 349 """Refreshes/Loads current euicc object profiles. 350 """ 351 self.iface_euicc.RequestInstalledProfiles( 352 timeout=hermes_constants.HERMES_DBUS_METHOD_REPLY_TIMEOUT) 353 354 def request_pending_profiles(self, root_smds): 355 """Refreshes/Loads current euicc object pending profiles. 356 @return profile objects 357 """ 358 logging.debug( 359 'Request pending profile call here for %s bus %s', 360 self._euicc, self._bus) 361 return self.iface_euicc.RequestPendingProfiles( 362 dbus.String(root_smds), 363 timeout=hermes_constants.HERMES_DBUS_METHOD_REPLY_TIMEOUT) 364 365 def is_test_euicc(self): 366 """ 367 Returns if the eUICC is a test eSIM. Automatically chooses the correct 368 TLS certs to use for the eUICC 369 """ 370 try: 371 logging.info('Calling Euicc.IsTestEuicc') 372 return self.iface_euicc.IsTestEuicc() 373 except dbus.DBusException as e: 374 logging.error('IsTestEuicc failed with error: %s', e) 375 376 def use_test_certs(self, is_test_certs): 377 """ 378 Sets Hermes daemon to test mode, required to run autotests 379 380 Set to true if downloading profiles from an SMDX with a test 381 certificate. This method used to download profiles to an esim from a 382 test CI. 383 384 @param is_test_certs boolean to set true or false 385 386 """ 387 try: 388 logging.info('Hermes call UseTestCerts') 389 self.iface_euicc.UseTestCerts(dbus.Boolean(is_test_certs)) 390 except dbus.DBusException as e: 391 logging.error('Hermes UseTestCerts failed with error:%s', e) 392 393 def install_profile_from_activation_code(self, act_code, conf_code): 394 """ Install the profile from given act code, confirmation code """ 395 profile = self.iface_euicc.InstallProfileFromActivationCode( 396 act_code, 397 conf_code, 398 timeout=hermes_constants.HERMES_DBUS_METHOD_REPLY_TIMEOUT) 399 return profile 400 401 def install_pending_profile(self, profile_path, conf_code): 402 """ Install the profile from given confirmation code""" 403 profile = self.iface_euicc.InstallPendingProfile( 404 profile_path, 405 conf_code, 406 timeout=hermes_constants.HERMES_DBUS_METHOD_REPLY_TIMEOUT) 407 return profile 408 409 def uninstall_profile(self, profile_path): 410 """ uninstall the given profile""" 411 self.iface_euicc.UninstallProfile( 412 profile_path, 413 timeout=hermes_constants.HERMES_DBUS_METHOD_REPLY_TIMEOUT) 414 415 def get_installed_profiles(self): 416 """ 417 Return all the available profiles objects. 418 419 Every call to |get_installed_profiles| obtains a fresh DBus proxy 420 for the profiles. So, if the profiles DBus object has changed between 421 two calls to this method, the proxy returned will be for the currently 422 available profiles. 423 424 @return a dict of profiles objects. Return None if no profile is found. 425 @raise HermesManagerProxyError if any corrupted profile found. 426 427 """ 428 if self.installedprofiles is None: 429 return None 430 try: 431 profiles_dict = {} 432 for profile in self.installedprofiles: 433 profile_proxy = ProfileProxy(self._bus, profile) 434 profiles_dict[profile] = profile_proxy 435 logging.debug('Get installed profiles for current euicc') 436 return profiles_dict 437 except dbus.exceptions.DBusException as e: 438 if _is_unknown_dbus_binding_exception(e): 439 return None 440 raise HermesManagerProxyError( 441 'Failed to obtain dbus object for the profiles. DBus error: ' 442 '|%s|', repr(e)) 443 444 def get_profile_from_iccid(self, iccid): 445 """@return profile object having given iccid or none if not found""" 446 profiles = self.installedprofiles 447 for profile in profiles: 448 profile_proxy = ProfileProxy(self._bus, profile) 449 props = profile_proxy.properties() 450 if props.get('Iccid') == iccid: 451 return profile_proxy 452 return None 453 454 def get_pending_profiles(self): 455 """ 456 Read all pending profiles of current euicc and create & return dict of 457 all pending profiles 458 459 @return dictionary of pending profiles proxy dbus objects 460 461 """ 462 try: 463 logging.debug('Hermes euicc getting pending profiles') 464 465 if self.pendingprofiles is None: 466 return None 467 468 profiles_dict = {} 469 # Read & Create each profile object and add to dictionary 470 for profile in self.pendingprofiles: 471 profile_proxy = ProfileProxy(self._bus, profile) 472 profiles_dict[profile] = profile_proxy 473 logging.debug('Hermes euicc pending profile: %s', profile) 474 return profiles_dict 475 except dbus.exceptions.DBusException as e: 476 if _is_unknown_dbus_binding_exception(e): 477 return None 478 raise HermesManagerProxyError( 479 'Failed to obtain dbus object for the profiles. DBus error: ' 480 '|%s|', repr(e)) 481 482 @property 483 def get_eid(self): 484 """@return Eid string property of euicc""" 485 props = self.properties() 486 return props.get('Eid') 487 488 @property 489 def installedprofiles(self): 490 """@return the installedprofiles ao property of euicc""" 491 props = self.properties() 492 return props.get('InstalledProfiles') 493 494 @property 495 def isactive(self): 496 """@return the isactive property of euicc""" 497 props = self.properties() 498 return props.get('IsActive') 499 500 @property 501 def pendingprofiles(self): 502 """@return the pendingprofiles ao property of euicc""" 503 props = self.properties() 504 return props.get('PendingProfiles') 505