import json import logging from autotest_lib.client.common_lib import error from autotest_lib.client.cros.enterprise import enterprise_policy_utils from autotest_lib.client.cros.enterprise.policy_group import AllPolicies CHROMEPOLICIES = 'chromePolicies' DEVICELOCALACCOUNT = 'deviceLocalAccountPolicies' EXTENSIONPOLICIES = 'extensionPolicies' CHROMEOS_CLOUDDPC = { 'AudioCaptureAllowed': 'unmuteMicrophoneDisabled', 'DefaultGeolocationSetting': 'shareLocationDisabled', 'DeviceBlockDevmode': 'debuggingFeaturesDisabled', 'DisableScreenshots': 'screenCaptureDisabled', 'ExternalStorageDisabled': 'usbFileTransferDisabled', 'VideoCaptureAllowed': 'cameraDisabled', } class Policy_Manager(object): def __init__(self, username=None, fake_dm_server=None): """ This class is to hanlde: Setting policies to a Fake DM server Obtaining policies from a DUT Obtaining clouddpc policy settings. Obtinaing both set/received policies in many different data formats Verifying policies provided to the DUT are correct. It has been designed so all the features are independent, meaning a fake_dm_server is not required to obtain policies from the DUT, get clouddpc values, etc. @params username: username to be used when creating the fake DM policy. @params fake_dm_server: The fake DM server object. """ self._configured = AllPolicies(True) self._obtained = AllPolicies() self.CHROME = 'chrome' self.LOCAL = 'local' self.EXT = 'ext' self.username = username self.fake_dm_server = fake_dm_server self.autotest_ext = None # If a fake_dm_sever is provided, enabled auto_update by default. # Can be turned off if desired. self._auto_updateDM = bool(fake_dm_server) def configure_policies(self, user={}, suggested_user={}, device={}, extension={}, new=True): """ Used to configure desired policies on the DUT. Will also save the configured policies. If a fake_dm_server was provided on class initialization, and auto_updateDM is not False, this will also update the fake_dm_server. @params user/suggested_user/device/extension: dict of the policies to be set. extension must be provided with the extension id. e.g. {'ExtensionID': {policy_dict}} @param new: bool, if True clear all previously configured policies. """ if new: self._configured = AllPolicies(True) self._configured.set_policy('chrome', user, 'user') self._configured.set_policy('chrome', suggested_user, 'suggested_user') self._configured.set_policy('chrome', device, 'device') self._configured.set_extension_policy(extension) if self.auto_updateDM: self.updateDMServer() def configure_extension_visual_policy(self, ext_policy={}, new=True): """ Extensions are... different. The policy set for them often is not the actual policy, but a pointer to where the policy data is. This makes verifying the extension policy tricky. To help with, this function will allow you to set the 'real' policy. Important things to note: This is only useful for verifying the policy, and getting the policy as a dictionary (which has a flag for which style of extension policies you want to see. The DM server will not be set via this. @param ext_policy: dict, Extension policy must be provided with the extension id: {'ExtensionID': {policy_dict}}. @param new: bool, if True will erase any previously stored VISUAL extension data. """ if new: self._configured.ext_values = {} self._configured.set_extension_policy(ext_policy, True) def remove_policy(self, policy_name, policy_type, extID=None): """ Removes the policy from the configured policies. Useful when you want clear a specific policy, but leave the other policies untouched. If auto_updateDM is True (thus a fake_dm_server has been provided), the dm server will be updated. @param policy_name: The policy name @param policy_type: The type of policy it is. Valid types: "user", "device", "extension", "suggested_user". @param extID: The extension ID, if removing an extension policy. """ if policy_type != 'extension': self._removeChromePolicy(policy_name) else: self._removeExtensionPolicy(policy_name, extID) if self.auto_updateDM: self.updateDMServer() def _removeChromePolicy(self, policy_name): """ Attempts to remove the specified extension policy from specified extension. @rasies error.TestError: If the policy is not in the configured policies. """ try: del self._configured.chrome[policy_name] except KeyError: raise error.TestError('Policy {} missing from chrome policies.' .format(policy_name)) def _removeExtensionPolicy(self, policy_name, extID): """ Attempts to remove the specified extension policy from specified extension. @raises error.TestError: if the policy_type is an 'extension', but the extID is not provided, or the policy is not found in the extension. """ if not extID: raise error.TestError( 'Cannot delete extension policy without extension ID') try: del self._configured.extension_configured_data[extID][policy_name] except KeyError: raise error.TestError( 'Policy {} missing from extension policies.' .format(policy_name)) def obtain_policies_from_device(self, autotest_ext=None): """ Calls the autotest private getAllEnterprisePolicies() API, and saves the response. @param autotest_ext: The autotest browser extension. """ if autotest_ext: self.autotest_ext = autotest_ext if not self.autotest_ext: raise error.TestError('Cannot obtain policies without autotest_ext') self.raw_data = enterprise_policy_utils.get_all_policies( self.autotest_ext) self._obtained.set_policy(self.CHROME, self.raw_data[CHROMEPOLICIES]) self._obtained.set_policy(self.LOCAL, self.raw_data[DEVICELOCALACCOUNT]) self._obtained.set_extension_policy(self.raw_data[EXTENSIONPOLICIES]) def verify_policy(self, policyName, policy_value, extID=None): """Verifies the configured policies are == to the policies obtained.""" recieved_value = self.get_policy_value_from_DUT(policyName=policyName, extID=extID, refresh=True) if not recieved_value == policy_value: raise error.TestError( 'Policy {} value was not set correctly. \nExpected:\t {}' '\nReceived: \t'.format(policyName, policy_value, recieved_value)) logging.info('Policy verification successful') def verify_policies(self): """Verifies the configured policies are == to the policies obtained.""" if not self._configured == self._obtained: raise error.TestError( 'Configured policies did not match policies received from DUT.') logging.info('Policy verification successful') def get_policy_value_from_DUT(self, policyName, extID=None, refresh=False): """ Get the value of a specified policy from the DUT. If the policy is from an extension, an extension ID (extID) must be provided. @param policyName: str, the name of the policy. @param extID: The ID of the extension. @param refresh: bool, if you want to get the policies from the DUT again Note: This does NOT force the DUT to re-obtain the policies from the DM server. @returns: The value of the policy if found, else None. """ if refresh: # Uses the previously provided autotest Extension. self.obtain_policies_from_device() if extID: if policyName in self._obtained.extension_configured_data[extID]: return ( self._obtained.extension_configured_data[extID][policyName] .value) elif policyName in self._obtained.chrome: return self._obtained.chrome[policyName].value return None def updateDMServer(self): """Updates the Fake DM server with the current configured policy.""" fake_dm_json = self.getDMConfig() logging.info('Policy blob {}'.format(fake_dm_json)) if not self.fake_dm_server: raise error.TestError( 'Cannot update DM server. DM server not provided') self.fake_dm_server.setup_policy(fake_dm_json) def getDMConfig(self, refresh=True): """" Creates the DM configuration (aka Json blob) to be used by the fake DM server @param refresh: bool, if True, will clear any previous configuration. If False, return the currently configured DM blob. """ if refresh: self._configured.createNewFakeDMServerJson() self._configured.updateDMJson() self._configured._DMJSON['policy_user'] = self.username return json.dumps(self._configured._DMJSON) def getCloudDpc(self): """Gets the Cloud DPC ARC policy settings.""" expected_cloud_dpc_settings = {} if self._arc_certs(): self._add_arc_certs(expected_cloud_dpc_settings) self._add_shared_policies(expected_cloud_dpc_settings) self._add_shared_arc_policy(expected_cloud_dpc_settings) return expected_cloud_dpc_settings def _arc_certs(self): """ Returns True if ArcCertificatesSyncMode is set in the configured policies and the bool(value) is True, else False. """ if ('ArcCertificatesSyncMode' in self._configured.chrome and self._configured.chrome['ArcCertificatesSyncMode'].value): return True return False def _add_shared_arc_policy(self, dpc): """Adds the shared policies that are subset within the 'ArcPolicy'.""" Arc_Policy = self._configured.chrome.get('ArcPolicy', {}) if Arc_Policy: Arc_Policy = Arc_Policy.value for key in ['applications', 'accountTypesWithManagementDisabled']: if key in Arc_Policy: dpc[key] = Arc_Policy[key] def _add_shared_policies(self, dpc): """ Add all of the configured policies that are shared with arc clouddpc, to the "dpc" dict, with the clouddpc key. If the policy is not set, it will not be added. """ for policy_name, dpc_name in CHROMEOS_CLOUDDPC.items(): if policy_name in self._configured.chrome: dpc[dpc_name] = self._configured.chrome[policy_name].value def _add_arc_certs(self, dpc): open_network_config = 'OpenNetworkConfiguration' if open_network_config in self._configured.chrome: dpc['caCerts'] = self._configured.chrome[open_network_config].value else: dpc['caCerts'] = None def get_configured_policies_as_dict(self, visual=False): """ Returns the configured policies as a dict.""" return self._configured.get_policy_as_dict(visual) def get_obtained_policies_as_dict(self): """ Returns the obtained policies as a dict.""" return self._obtained.get_policy_as_dict(visual=True) @property def auto_updateDM(self): """ Returns the current state of the auto_updateDM setting.""" return self._auto_updateDM @auto_updateDM.setter def auto_updateDM(self, value): """Turns on/off auto updating of the DM server.""" if not isinstance(value, bool): raise error.TestError('Auto Update DM must be bool, got {}' .format(value)) if value and not self.fake_dm_server: raise error.TestError( 'Cannot autoupdate without the Fake DM server configured.') self._auto_updateDM = value