# xref: /aosp_15_r20/external/autotest/client/cros/enterprise/policy_group.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import copy
import json

from autotest_lib.client.cros.enterprise.policy import Policy as Policy
from autotest_lib.client.cros.enterprise.device_policy_lookup import DEVICE_POLICY_DICT

# Top-level keys of the dictionary returned by
# AllPolicies.get_policy_as_dict(); one key per policy bucket.
CHROMEPOLICIES = 'chromePolicies'
DEVICELOCALACCOUNT = 'deviceLocalAccountPolicies'
EXTENSIONPOLICIES = 'extensionPolicies'


class AllPolicies(object):
    """
    Container for Chrome, extension, and device-local-account policies.

    Policies are stored as Policy objects in per-type buckets: self.chrome,
    self.local, self.extension_configured_data and
    self.extension_displayed_values (the latter two keyed by extension ID).

    When created with isConfiguredPolicies=True the object also owns a blob
    for the fake DM server (self._DMJSON), which updateDMJson() refreshes
    from the chrome and extension buckets.
    """

    def __init__(self, isConfiguredPolicies=False):
        """
        @param isConfiguredPolicies: bool, True when this group represents
            the policies being SET for a test. In that case a fresh fake DM
            server blob is created immediately.

        """
        self.extension_configured_data = {}
        # Because Extensions have a different "displayed" value than the
        # configured, the extension_displayed_values will be used when the
        # policy group is "configured" to represent what the "displayed"
        # policy values SHOULD be.
        self.extension_displayed_values = {}
        self.local = {}
        self.chrome = {}

        self.policy_dict = {CHROMEPOLICIES: {},
                            EXTENSIONPOLICIES: {},
                            DEVICELOCALACCOUNT: {}}
        self.isConfiguredPolicies = isConfiguredPolicies
        if self.isConfiguredPolicies:
            self.createNewFakeDMServerJson()

    def createNewFakeDMServerJson(self):
        """Creates a fresh DM blob that will be used by the Fake DM server."""

        self._DMJSON = {
            'managed_users': ['*'],
            'policy_user': None,
            'current_key_index': 0,
            'invalidation_source': 16,
            'invalidation_name': 'test_policy',
            'google/chromeos/user': {'mandatory': {}, 'recommended': {}},
            'google/chromeos/device': {},
            'google/chrome/extension': {}
        }
        # Aliases into the blob: updating one of these dicts updates
        # self._DMJSON in place.
        user_policies = self._DMJSON['google/chromeos/user']
        self._DM_MANDATORY = user_policies['mandatory']
        self._DM_RECOMMENDED = user_policies['recommended']
        self._DM_DEVICE = self._DMJSON['google/chromeos/device']
        self._DM_EXTENSION = self._DMJSON['google/chrome/extension']

    def get_policy_as_dict(self, visual=False):
        """
        Returns the policies as a dictionary.

        @param visual: bool, if True (and displayed extension values have
            been set) the displayed extension values are reported instead
            of the configured ones.

        @returns: self.policy_dict, refreshed in place. Entries written by
            earlier calls are updated but never removed.

        """
        self._update_policy_dict(visual)
        return self.policy_dict

    def set_extension_policy(self, policies, visual=False):
        """
        Sets the extension policies

        @param policies: Dict formatted as follows:
            {'extID': {pol1: v1}, extid2: {p2:v2}}

        @param visual: bool, If the extension policy provided is what should
            be displayed via the api/chrome page. If False, then the
            'policies' should be the actual value that will be provided to
            the DMServer.

        """
        # If the policy is configured (ie this policy group object
        # represents the policies being SET for testing) add the
        # 'policy_group' value.
        policy_group = 'extension' if self.isConfiguredPolicies else None
        # Work on a copy so the caller's dict is never shared with the
        # stored Policy objects.
        extension_policies = copy.deepcopy(policies)

        for extension_ID, extension_policy in extension_policies.items():
            # (Re)create the bucket for this extension ID; any policies
            # previously stored for the same ID are dropped.
            if visual:
                self.extension_displayed_values[extension_ID] = {}
                key = 'displayed_ext_values'
            else:
                self.extension_configured_data[extension_ID] = {}
                key = 'ext_values'
            self.set_policy(key, extension_policy, policy_group, extension_ID)

    def set_policy(self,
                   policy_type,
                   policies,
                   group=None,
                   extension_key=None):
        """
        Create the policy objects, and set them in the corresponding group.

        @param policy_type: str of the policy type. Must be:
            'chrome', 'ext_values', 'displayed_ext_values', or 'local'.
        @param policies: dict of policy values.
        @param group: str, group key for the Policy object setter.
        @param extension_key: optional, key for extensionID.

        @raises ValueError: if policy_type is not one of the known types.

        """
        policy_group = self._get_policy_group(policy_type, extension_key)
        for name, value in policies.items():
            policy_group[name] = self._create_pol_obj(name, value, group)

    def _get_policy_group(self, policy_type, extension_key=None):
        """
        Simple lookup to put the policies in the correct bucket.

        @param policy_type: str, one of 'chrome', 'ext_values',
            'displayed_ext_values', or 'local'.
        @param extension_key: extension ID, used only by the two extension
            types. The bucket for that ID must already exist.

        @returns: the dict the policies of this type are stored in.

        @raises ValueError: if policy_type is not a known type.

        """
        if policy_type == 'chrome':
            return self.chrome
        if policy_type == 'local':
            return self.local
        if policy_type == 'ext_values':
            return self.extension_configured_data[extension_key]
        if policy_type == 'displayed_ext_values':
            return self.extension_displayed_values[extension_key]
        # Fail with a clear message instead of falling through with an
        # unbound local variable.
        raise ValueError('Unknown policy_type: %r' % (policy_type,))

    def updateDMJson(self):
        """
        Update the ._DMJSON with the values currently set in
        self.chrome and self.extension_configured_data.

        self.local is not written to the blob by this method.

        """

        self._populateChromeData()
        self._populateExtensionData()

    def _populateChromeData(self):
        """
        Update the DM_JSON's chrome values.

        Machine-scoped policies go to the device section, under the name
        looked up in DEVICE_POLICY_DICT. Otherwise recommended policies go
        to the recommended user section, and mandatory user-scoped policies
        to the mandatory user section. Anything else is not written.

        """
        for policy_name, policy_object in self.chrome.items():
            if policy_object.scope == 'machine':
                dm_name = DEVICE_POLICY_DICT[policy_name]
                self._DM_DEVICE.update(
                    self._clean_pol({dm_name: policy_object.value}))

            elif policy_object.level == 'recommended':
                self._DM_RECOMMENDED.update(
                    self._clean_pol({policy_name: policy_object.value}))

            elif (policy_object.level == 'mandatory' and
                    policy_object.scope == 'user'):
                self._DM_MANDATORY.update(
                    self._clean_pol({policy_name: policy_object.value}))

    def _populateExtensionData(self):
        """Updates the DM_JSON's extension values."""
        for extension, ext_pol in self.extension_configured_data.items():
            # Extension values are stored raw: unlike chrome policies they
            # are not passed through _clean_pol.
            self._DM_EXTENSION[extension] = dict(
                (polname, polItem.value)
                for polname, polItem in ext_pol.items())

    def _clean_pol(self, policies):
        """
        Cleans the policies to be set on the fake DM server.

        @param policies: dict of policy name -> raw value.

        @returns: new dict without the None-valued policies, and with the
            remaining values passed through _jsonify.

        """
        cleaned = {}
        for policy, value in policies.items():
            if value is None:
                continue
            cleaned[policy] = self._jsonify(policy, value)
        return cleaned

    def _jsonify(self, policy, value):
        """
        Jsonify policy if its a dict or list that is not kiosk policy.

        @param policy: str, name of the policy.
        @param value: value of the policy.

        @returns: a JSON string when value is a dict, or a non-empty list
            whose first item is a dict (kiosk policy excluded); otherwise
            the value unchanged.

        """
        if isinstance(value, dict):
            return json.dumps(value)
        # Kiosk Policy, aka "account", is the only policy not formatted.
        is_kiosk = (policy == 'device_local_accounts.account')
        if (isinstance(value, list) and not is_kiosk and
                value and isinstance(value[0], dict)):
            return json.dumps(value)
        return value

    def _create_pol_obj(self, name, data, group=None):
        """
        Create a policy object from Policy.Policy().

        @param name: str, name of the policy
        @param data: data value of the policy
        @param group: optional, group of the policy.

        @returns: Policy object, representing the policy args provided.
        """
        policy_obj = Policy()
        policy_obj.name = name
        if policy_obj.is_formatted_value(data):
            policy_obj.set_policy_from_dict(data)
        else:
            policy_obj.value = data
        policy_obj.group = group
        return policy_obj

    def _update_policy_dict(self, secondary_ext_policies):
        """
        Update the local .policy_dict with the most current values.

        @param secondary_ext_policies: bool, forwarded to
            _select_ext_group to pick which extension bucket is reported.

        """
        chrome_dict = self.policy_dict[CHROMEPOLICIES]
        for policy_obj in self.chrome.values():
            chrome_dict.update(policy_obj.get_policy_as_dict())

        ext_item = self._select_ext_group(secondary_ext_policies)
        for ext_name, ext_group in ext_item.items():
            # Each extension entry is rebuilt from scratch and replaces
            # whatever was stored under the same extension ID.
            ext_dict = {}
            for policy_obj in ext_group.values():
                ext_dict.update(policy_obj.get_policy_as_dict())
            self.policy_dict[EXTENSIONPOLICIES][ext_name] = ext_dict

        local_dict = self.policy_dict[DEVICELOCALACCOUNT]
        for policy_obj in self.local.values():
            local_dict.update(policy_obj.get_policy_as_dict())

    def _select_ext_group(self, secondary_ext_policies):
        """Determine which extension group to use for the configured
        dictionary formatting. If the secondary_ext_policies flag has been
        set, and the self.extension_displayed_values is not empty, use
        self.extension_displayed_values,
        else: use the original configured

        @param secondary_ext_policies: bool

        """
        if secondary_ext_policies and self.extension_displayed_values:
            return self.extension_displayed_values
        return self.extension_configured_data

    def __ne__(self, other):
        """Inverse of __eq__, propagating NotImplemented untouched."""
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __eq__(self, other):
        """
        Override the == to check a policy group object vs another.

        The check is one-directional: only the policies held by self are
        looked up in other. Extra policies in other are ignored.

        Will return False if:
            A policy from self is missing in other,
                when the policy is not None.
            An Extension from self is missing in other.
            If the policy values in self are not equal to the other
                (less obfuscation).

        Will return NotImplemented if other is not an AllPolicies object.

        Else: True
        """
        if not isinstance(other, AllPolicies):
            return NotImplemented

        # Prefer the displayed extension values when they have been set;
        # they are what the other side's values are expected to match.
        own_ext = (self.extension_displayed_values or
                   self.extension_configured_data)
        other_ext = other.extension_configured_data
        for ext_name, ext_group in own_ext.items():
            if ext_name not in other_ext:
                return False
            if not self._check(ext_group, other_ext[ext_name]):
                return False

        return (self._check(self.chrome, other.chrome) and
                self._check(self.local, other.local))

    def _check(self, policy_group, other_policy_group):
        """
        Check if the policy_group is ==.

        Policies whose value is None are skipped; the remaining policies
        are still compared.

        Will return False if:
            policy is missing from other policy object
            policy objects != (per the Policy object __eq__ override)
        Will return True if:
            There is no policies
            If no other conditions are violated

        @param policy_group: dict of policy name -> policy object.
        @param other_policy_group: dict of policy name -> policy object to
            compare against.

        """
        if not policy_group:  # No object
            return True
        for policy_name, policy_obj in policy_group.items():
            if policy_obj.value is None:
                # An unset policy places no requirement on the other group,
                # but must not end the comparison of the remaining ones.
                continue
            if policy_name not in other_policy_group:
                return False
            if policy_obj != other_policy_group[policy_name]:
                return False
        return True
281