xref: /aosp_15_r20/external/autotest/client/cros/enterprise/policy_manager.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1*9c5db199SXin Liimport json
2*9c5db199SXin Liimport logging
3*9c5db199SXin Li
4*9c5db199SXin Lifrom autotest_lib.client.common_lib import error
5*9c5db199SXin Lifrom autotest_lib.client.cros.enterprise import enterprise_policy_utils
6*9c5db199SXin Lifrom autotest_lib.client.cros.enterprise.policy_group import AllPolicies
7*9c5db199SXin Li
8*9c5db199SXin Li
9*9c5db199SXin LiCHROMEPOLICIES = 'chromePolicies'
10*9c5db199SXin LiDEVICELOCALACCOUNT = 'deviceLocalAccountPolicies'
11*9c5db199SXin LiEXTENSIONPOLICIES = 'extensionPolicies'
12*9c5db199SXin Li
13*9c5db199SXin LiCHROMEOS_CLOUDDPC = {
14*9c5db199SXin Li    'AudioCaptureAllowed': 'unmuteMicrophoneDisabled',
15*9c5db199SXin Li    'DefaultGeolocationSetting': 'shareLocationDisabled',
16*9c5db199SXin Li    'DeviceBlockDevmode': 'debuggingFeaturesDisabled',
17*9c5db199SXin Li    'DisableScreenshots': 'screenCaptureDisabled',
18*9c5db199SXin Li    'ExternalStorageDisabled': 'usbFileTransferDisabled',
19*9c5db199SXin Li    'VideoCaptureAllowed': 'cameraDisabled',
20*9c5db199SXin Li}
21*9c5db199SXin Li
22*9c5db199SXin Li
23*9c5db199SXin Liclass Policy_Manager(object):
24*9c5db199SXin Li
25*9c5db199SXin Li    def __init__(self, username=None, fake_dm_server=None):
26*9c5db199SXin Li        """
27*9c5db199SXin Li        This class is to hanlde:
28*9c5db199SXin Li            Setting policies to a Fake DM server
29*9c5db199SXin Li            Obtaining policies from a DUT
30*9c5db199SXin Li            Obtaining clouddpc policy settings.
31*9c5db199SXin Li            Obtinaing both set/received policies in many different data formats
32*9c5db199SXin Li            Verifying policies provided to the DUT are correct.
33*9c5db199SXin Li
34*9c5db199SXin Li        It has been designed so all the features are independent, meaning a
35*9c5db199SXin Li        fake_dm_server is not required to obtain policies from the DUT, get
36*9c5db199SXin Li        clouddpc values, etc.
37*9c5db199SXin Li
38*9c5db199SXin Li        @params username: username to be used when creating the fake DM policy.
39*9c5db199SXin Li        @params fake_dm_server: The fake DM server object.
40*9c5db199SXin Li
41*9c5db199SXin Li        """
42*9c5db199SXin Li        self._configured = AllPolicies(True)
43*9c5db199SXin Li        self._obtained = AllPolicies()
44*9c5db199SXin Li        self.CHROME = 'chrome'
45*9c5db199SXin Li        self.LOCAL = 'local'
46*9c5db199SXin Li        self.EXT = 'ext'
47*9c5db199SXin Li        self.username = username
48*9c5db199SXin Li        self.fake_dm_server = fake_dm_server
49*9c5db199SXin Li        self.autotest_ext = None
50*9c5db199SXin Li
51*9c5db199SXin Li        # If a fake_dm_sever is provided, enabled auto_update by default.
52*9c5db199SXin Li        # Can be turned off if desired.
53*9c5db199SXin Li        self._auto_updateDM = bool(fake_dm_server)
54*9c5db199SXin Li
55*9c5db199SXin Li    def configure_policies(self,
56*9c5db199SXin Li                           user={},
57*9c5db199SXin Li                           suggested_user={},
58*9c5db199SXin Li                           device={},
59*9c5db199SXin Li                           extension={},
60*9c5db199SXin Li                           new=True):
61*9c5db199SXin Li        """
62*9c5db199SXin Li        Used to configure desired policies on the DUT. Will also save the
63*9c5db199SXin Li        configured policies.
64*9c5db199SXin Li
65*9c5db199SXin Li        If a fake_dm_server was provided on class initialization, and
66*9c5db199SXin Li        auto_updateDM is not False, this will also update the fake_dm_server.
67*9c5db199SXin Li
68*9c5db199SXin Li        @params user/suggested_user/device/extension: dict of the policies to
69*9c5db199SXin Li            be set. extension must be provided with the extension id. e.g.
70*9c5db199SXin Li            {'ExtensionID': {policy_dict}}
71*9c5db199SXin Li        @param new: bool, if True clear all previously configured policies.
72*9c5db199SXin Li
73*9c5db199SXin Li        """
74*9c5db199SXin Li        if new:
75*9c5db199SXin Li            self._configured = AllPolicies(True)
76*9c5db199SXin Li
77*9c5db199SXin Li        self._configured.set_policy('chrome', user, 'user')
78*9c5db199SXin Li        self._configured.set_policy('chrome', suggested_user, 'suggested_user')
79*9c5db199SXin Li        self._configured.set_policy('chrome', device, 'device')
80*9c5db199SXin Li
81*9c5db199SXin Li        self._configured.set_extension_policy(extension)
82*9c5db199SXin Li
83*9c5db199SXin Li        if self.auto_updateDM:
84*9c5db199SXin Li            self.updateDMServer()
85*9c5db199SXin Li
86*9c5db199SXin Li    def configure_extension_visual_policy(self,
87*9c5db199SXin Li                                          ext_policy={},
88*9c5db199SXin Li                                          new=True):
89*9c5db199SXin Li        """
90*9c5db199SXin Li        Extensions are... different. The policy set for them often is not the
91*9c5db199SXin Li        actual policy, but a pointer to where the policy data is. This makes
92*9c5db199SXin Li        verifying the extension policy tricky.
93*9c5db199SXin Li
94*9c5db199SXin Li        To help with, this function will allow you to set the 'real' policy.
95*9c5db199SXin Li        Important things to note: This is only useful for verifying the policy,
96*9c5db199SXin Li        and getting the policy as a dictionary (which has a flag for which
97*9c5db199SXin Li        style of extension policies you want to see. The DM server will not be
98*9c5db199SXin Li        set via this.
99*9c5db199SXin Li
100*9c5db199SXin Li        @param ext_policy: dict, Extension policy must be provided with the
101*9c5db199SXin Li            extension id:
102*9c5db199SXin Li                {'ExtensionID': {policy_dict}}.
103*9c5db199SXin Li        @param new: bool, if True will erase any previously stored VISUAL
104*9c5db199SXin Li            extension data.
105*9c5db199SXin Li
106*9c5db199SXin Li        """
107*9c5db199SXin Li        if new:
108*9c5db199SXin Li            self._configured.ext_values = {}
109*9c5db199SXin Li
110*9c5db199SXin Li        self._configured.set_extension_policy(ext_policy, True)
111*9c5db199SXin Li
112*9c5db199SXin Li    def remove_policy(self,
113*9c5db199SXin Li                      policy_name,
114*9c5db199SXin Li                      policy_type,
115*9c5db199SXin Li                      extID=None):
116*9c5db199SXin Li        """
117*9c5db199SXin Li        Removes the policy from the configured policies. Useful when you want
118*9c5db199SXin Li        clear a specific policy, but leave the other policies untouched.
119*9c5db199SXin Li        If auto_updateDM is True (thus a fake_dm_server has been provided), the
120*9c5db199SXin Li        dm server will be updated.
121*9c5db199SXin Li
122*9c5db199SXin Li        @param policy_name: The policy name
123*9c5db199SXin Li        @param policy_type: The type of policy it is. Valid types:
124*9c5db199SXin Li            "user", "device", "extension", "suggested_user".
125*9c5db199SXin Li        @param extID: The extension ID, if removing an extension policy.
126*9c5db199SXin Li
127*9c5db199SXin Li        """
128*9c5db199SXin Li        if policy_type != 'extension':
129*9c5db199SXin Li            self._removeChromePolicy(policy_name)
130*9c5db199SXin Li        else:
131*9c5db199SXin Li            self._removeExtensionPolicy(policy_name, extID)
132*9c5db199SXin Li
133*9c5db199SXin Li        if self.auto_updateDM:
134*9c5db199SXin Li            self.updateDMServer()
135*9c5db199SXin Li
136*9c5db199SXin Li    def _removeChromePolicy(self, policy_name):
137*9c5db199SXin Li        """
138*9c5db199SXin Li        Attempts to remove the specified extension policy from specified
139*9c5db199SXin Li        extension.
140*9c5db199SXin Li
141*9c5db199SXin Li        @rasies error.TestError: If the policy is not in the configured
142*9c5db199SXin Li            policies.
143*9c5db199SXin Li
144*9c5db199SXin Li        """
145*9c5db199SXin Li        try:
146*9c5db199SXin Li            del self._configured.chrome[policy_name]
147*9c5db199SXin Li        except KeyError:
148*9c5db199SXin Li            raise error.TestError('Policy {} missing from chrome policies.'
149*9c5db199SXin Li                                  .format(policy_name))
150*9c5db199SXin Li
151*9c5db199SXin Li    def _removeExtensionPolicy(self, policy_name, extID):
152*9c5db199SXin Li        """
153*9c5db199SXin Li        Attempts to remove the specified extension policy from specified
154*9c5db199SXin Li        extension.
155*9c5db199SXin Li
156*9c5db199SXin Li        @raises error.TestError: if the policy_type is an 'extension', but the
157*9c5db199SXin Li            extID is not provided, or the policy is not found in the extension.
158*9c5db199SXin Li
159*9c5db199SXin Li        """
160*9c5db199SXin Li        if not extID:
161*9c5db199SXin Li            raise error.TestError(
162*9c5db199SXin Li                'Cannot delete extension policy without extension ID')
163*9c5db199SXin Li        try:
164*9c5db199SXin Li            del self._configured.extension_configured_data[extID][policy_name]
165*9c5db199SXin Li        except KeyError:
166*9c5db199SXin Li            raise error.TestError(
167*9c5db199SXin Li                'Policy {} missing from extension policies.'
168*9c5db199SXin Li                .format(policy_name))
169*9c5db199SXin Li
170*9c5db199SXin Li    def obtain_policies_from_device(self, autotest_ext=None):
171*9c5db199SXin Li        """
172*9c5db199SXin Li        Calls the autotest private getAllEnterprisePolicies() API, and saves
173*9c5db199SXin Li        the response.
174*9c5db199SXin Li
175*9c5db199SXin Li        @param autotest_ext: The autotest browser extension.
176*9c5db199SXin Li
177*9c5db199SXin Li        """
178*9c5db199SXin Li        if autotest_ext:
179*9c5db199SXin Li            self.autotest_ext = autotest_ext
180*9c5db199SXin Li        if not self.autotest_ext:
181*9c5db199SXin Li            raise error.TestError('Cannot obtain policies without autotest_ext')
182*9c5db199SXin Li        self.raw_data = enterprise_policy_utils.get_all_policies(
183*9c5db199SXin Li            self.autotest_ext)
184*9c5db199SXin Li        self._obtained.set_policy(self.CHROME, self.raw_data[CHROMEPOLICIES])
185*9c5db199SXin Li        self._obtained.set_policy(self.LOCAL, self.raw_data[DEVICELOCALACCOUNT])
186*9c5db199SXin Li        self._obtained.set_extension_policy(self.raw_data[EXTENSIONPOLICIES])
187*9c5db199SXin Li
188*9c5db199SXin Li    def verify_policy(self, policyName, policy_value, extID=None):
189*9c5db199SXin Li        """Verifies the configured policies are == to the policies obtained."""
190*9c5db199SXin Li        recieved_value = self.get_policy_value_from_DUT(policyName=policyName,
191*9c5db199SXin Li                                                        extID=extID,
192*9c5db199SXin Li                                                        refresh=True)
193*9c5db199SXin Li        if not recieved_value == policy_value:
194*9c5db199SXin Li            raise error.TestError(
195*9c5db199SXin Li                'Policy {} value was not set correctly. \nExpected:\t {}'
196*9c5db199SXin Li                '\nReceived: \t'.format(policyName,
197*9c5db199SXin Li                                        policy_value,
198*9c5db199SXin Li                                        recieved_value))
199*9c5db199SXin Li        logging.info('Policy verification successful')
200*9c5db199SXin Li
201*9c5db199SXin Li    def verify_policies(self):
202*9c5db199SXin Li        """Verifies the configured policies are == to the policies obtained."""
203*9c5db199SXin Li        if not self._configured == self._obtained:
204*9c5db199SXin Li            raise error.TestError(
205*9c5db199SXin Li                'Configured policies did not match policies received from DUT.')
206*9c5db199SXin Li        logging.info('Policy verification successful')
207*9c5db199SXin Li
208*9c5db199SXin Li    def get_policy_value_from_DUT(self, policyName, extID=None, refresh=False):
209*9c5db199SXin Li        """
210*9c5db199SXin Li        Get the value of a specified policy from the DUT. If the policy is from
211*9c5db199SXin Li        an extension, an extension ID (extID) must be provided.
212*9c5db199SXin Li
213*9c5db199SXin Li        @param policyName: str, the name of the policy.
214*9c5db199SXin Li        @param extID: The ID of the extension.
215*9c5db199SXin Li        @param refresh: bool, if you want to get the policies from the DUT again
216*9c5db199SXin Li            Note: This does NOT force the DUT to re-obtain the policies from the
217*9c5db199SXin Li            DM server.
218*9c5db199SXin Li
219*9c5db199SXin Li        @returns: The value of the policy if found, else None.
220*9c5db199SXin Li        """
221*9c5db199SXin Li        if refresh:
222*9c5db199SXin Li            # Uses the previously provided autotest Extension.
223*9c5db199SXin Li            self.obtain_policies_from_device()
224*9c5db199SXin Li
225*9c5db199SXin Li        if extID:
226*9c5db199SXin Li            if policyName in self._obtained.extension_configured_data[extID]:
227*9c5db199SXin Li                return (
228*9c5db199SXin Li                    self._obtained.extension_configured_data[extID][policyName]
229*9c5db199SXin Li                        .value)
230*9c5db199SXin Li        elif policyName in self._obtained.chrome:
231*9c5db199SXin Li            return self._obtained.chrome[policyName].value
232*9c5db199SXin Li        return None
233*9c5db199SXin Li
234*9c5db199SXin Li    def updateDMServer(self):
235*9c5db199SXin Li        """Updates the Fake DM server with the current configured policy."""
236*9c5db199SXin Li        fake_dm_json = self.getDMConfig()
237*9c5db199SXin Li        logging.info('Policy blob {}'.format(fake_dm_json))
238*9c5db199SXin Li        if not self.fake_dm_server:
239*9c5db199SXin Li            raise error.TestError(
240*9c5db199SXin Li                'Cannot update DM server. DM server not provided')
241*9c5db199SXin Li        self.fake_dm_server.setup_policy(fake_dm_json)
242*9c5db199SXin Li
243*9c5db199SXin Li    def getDMConfig(self, refresh=True):
244*9c5db199SXin Li        """"
245*9c5db199SXin Li        Creates the DM configuration (aka Json blob) to be used by the
246*9c5db199SXin Li        fake DM server
247*9c5db199SXin Li
248*9c5db199SXin Li        @param refresh: bool, if True, will clear any previous configuration.
249*9c5db199SXin Li            If False, return the currently configured DM blob.
250*9c5db199SXin Li
251*9c5db199SXin Li        """
252*9c5db199SXin Li        if refresh:
253*9c5db199SXin Li            self._configured.createNewFakeDMServerJson()
254*9c5db199SXin Li        self._configured.updateDMJson()
255*9c5db199SXin Li        self._configured._DMJSON['policy_user'] = self.username
256*9c5db199SXin Li        return json.dumps(self._configured._DMJSON)
257*9c5db199SXin Li
258*9c5db199SXin Li    def getCloudDpc(self):
259*9c5db199SXin Li        """Gets the Cloud DPC ARC policy settings."""
260*9c5db199SXin Li        expected_cloud_dpc_settings = {}
261*9c5db199SXin Li        if self._arc_certs():
262*9c5db199SXin Li            self._add_arc_certs(expected_cloud_dpc_settings)
263*9c5db199SXin Li        self._add_shared_policies(expected_cloud_dpc_settings)
264*9c5db199SXin Li        self._add_shared_arc_policy(expected_cloud_dpc_settings)
265*9c5db199SXin Li
266*9c5db199SXin Li        return expected_cloud_dpc_settings
267*9c5db199SXin Li
268*9c5db199SXin Li    def _arc_certs(self):
269*9c5db199SXin Li        """
270*9c5db199SXin Li        Returns True if ArcCertificatesSyncMode is set in the configured
271*9c5db199SXin Li        policies and the bool(value) is True, else False.
272*9c5db199SXin Li
273*9c5db199SXin Li        """
274*9c5db199SXin Li        if ('ArcCertificatesSyncMode' in self._configured.chrome and
275*9c5db199SXin Li                self._configured.chrome['ArcCertificatesSyncMode'].value):
276*9c5db199SXin Li            return True
277*9c5db199SXin Li        return False
278*9c5db199SXin Li
279*9c5db199SXin Li    def _add_shared_arc_policy(self, dpc):
280*9c5db199SXin Li        """Adds the shared policies that are subset within the 'ArcPolicy'."""
281*9c5db199SXin Li        Arc_Policy = self._configured.chrome.get('ArcPolicy', {})
282*9c5db199SXin Li        if Arc_Policy:
283*9c5db199SXin Li            Arc_Policy = Arc_Policy.value
284*9c5db199SXin Li
285*9c5db199SXin Li        for key in ['applications', 'accountTypesWithManagementDisabled']:
286*9c5db199SXin Li            if key in Arc_Policy:
287*9c5db199SXin Li                dpc[key] = Arc_Policy[key]
288*9c5db199SXin Li
289*9c5db199SXin Li    def _add_shared_policies(self, dpc):
290*9c5db199SXin Li        """
291*9c5db199SXin Li        Add all of the configured policies that are shared with arc clouddpc,
292*9c5db199SXin Li        to the "dpc" dict, with the clouddpc key. If the policy is not set, it
293*9c5db199SXin Li        will not be added.
294*9c5db199SXin Li
295*9c5db199SXin Li        """
296*9c5db199SXin Li        for policy_name, dpc_name in CHROMEOS_CLOUDDPC.items():
297*9c5db199SXin Li            if policy_name in self._configured.chrome:
298*9c5db199SXin Li                dpc[dpc_name] = self._configured.chrome[policy_name].value
299*9c5db199SXin Li
300*9c5db199SXin Li    def _add_arc_certs(self, dpc):
301*9c5db199SXin Li        open_network_config = 'OpenNetworkConfiguration'
302*9c5db199SXin Li        if open_network_config in self._configured.chrome:
303*9c5db199SXin Li            dpc['caCerts'] = self._configured.chrome[open_network_config].value
304*9c5db199SXin Li        else:
305*9c5db199SXin Li            dpc['caCerts'] = None
306*9c5db199SXin Li
307*9c5db199SXin Li    def get_configured_policies_as_dict(self, visual=False):
308*9c5db199SXin Li        """ Returns the configured policies as a dict."""
309*9c5db199SXin Li        return self._configured.get_policy_as_dict(visual)
310*9c5db199SXin Li
311*9c5db199SXin Li    def get_obtained_policies_as_dict(self):
312*9c5db199SXin Li        """ Returns the obtained policies as a dict."""
313*9c5db199SXin Li        return self._obtained.get_policy_as_dict(visual=True)
314*9c5db199SXin Li
315*9c5db199SXin Li    @property
316*9c5db199SXin Li    def auto_updateDM(self):
317*9c5db199SXin Li        """ Returns the current state of the auto_updateDM setting."""
318*9c5db199SXin Li        return self._auto_updateDM
319*9c5db199SXin Li
320*9c5db199SXin Li    @auto_updateDM.setter
321*9c5db199SXin Li    def auto_updateDM(self, value):
322*9c5db199SXin Li        """Turns on/off auto updating of the DM server."""
323*9c5db199SXin Li        if not isinstance(value, bool):
324*9c5db199SXin Li            raise error.TestError('Auto Update DM must be bool, got {}'
325*9c5db199SXin Li                                   .format(value))
326*9c5db199SXin Li        if value and not self.fake_dm_server:
327*9c5db199SXin Li            raise error.TestError(
328*9c5db199SXin Li                'Cannot autoupdate without the Fake DM server configured.')
329*9c5db199SXin Li        self._auto_updateDM = value
330