xref: /aosp_15_r20/external/autotest/client/common_lib/cros/policy.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1*9c5db199SXin Li# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be
3*9c5db199SXin Li# found in the LICENSE file.
4*9c5db199SXin Li
5*9c5db199SXin Liimport dbus, os, sys
6*9c5db199SXin Li# AU tests use ToT client code, but ToT -3 client version.
7*9c5db199SXin Litry:
8*9c5db199SXin Li    from gi.repository import GObject
9*9c5db199SXin Liexcept ImportError:
10*9c5db199SXin Li    import gobject as GObject
11*9c5db199SXin Li
12*9c5db199SXin Liimport common
13*9c5db199SXin Lifrom autotest_lib.client.common_lib import error
14*9c5db199SXin Lifrom autotest_lib.client.common_lib.cros import session_manager
15*9c5db199SXin Lifrom autotest_lib.client.cros import ownership
16*9c5db199SXin Li
"""Utility functions for tests that generate, push and fetch policies.

As the python bindings for the protobufs used in policies are built as a part
of the tests that use them, callers must install them (see install_protobufs())
before calling any function here that imports a policy pb2 module."""
21*9c5db199SXin Li
22*9c5db199SXin Li
def install_protobufs(autodir, job):
    """Installs policy protobuf dependencies and sets the import path.

    After calling this, you can simply import any policy pb2.py file directly,
    e.g. import chrome_device_policy_pb2.

    The dependency directory is added to sys.path at most once, so calling
    this repeatedly does not grow sys.path with duplicate entries.

    @param autodir: Autotest directory (usually the caller's self.autodir).
    @param job: Job instance (usually the caller's self.job).
    """
    # TODO(crbug.com/807950): Change the installation process so that policy
    #                         proto imports can be moved to the top.
    dep = 'policy_protos'
    dep_dir = os.path.join(autodir, 'deps', dep)
    # Always (re)install; the package step is delegated to the job.
    job.install_pkg(dep, 'dep', dep_dir)
    # Only extend the import path the first time this directory is seen.
    if dep_dir not in sys.path:
        sys.path.append(dep_dir)
38*9c5db199SXin Li
39*9c5db199SXin Li
def compare_policy_response(policy_response, owner=None, guests=None,
                            new_users=None, roaming=None):
    """Check the contents of |policy_response| against given args.

    Deserializes |policy_response| into a PolicyFetchResponse protobuf,
    with an embedded (serialized) PolicyData protobuf that embeds a
    (serialized) ChromeDeviceSettingsProto, and checks to see if this
    protobuf turducken contains the information passed in.

    Arguments left as None are not checked.  The boolean arguments are
    compared against None (not by truthiness) so that an explicit False is
    verified too.

    @param policy_response: string serialization of a PolicyFetchResponse
                            protobuf.
    @param owner: string representing the owner's name/account.
    @param guests: boolean indicating whether guests should be allowed.
    @param new_users: boolean indicating if user pods are on login screen.
    @param roaming: boolean indicating whether data roaming is enabled.

    @return True if every requested check passed.  A mismatch is reported
            by the ownership.assert_* helpers (presumably by raising
            error.TestFail -- confirm against the ownership module) rather
            than by returning False.
    """
    import chrome_device_policy_pb2
    import device_management_backend_pb2

    # Outer layer: the fetch response that wraps the serialized policy data.
    response_proto = device_management_backend_pb2.PolicyFetchResponse()
    response_proto.ParseFromString(policy_response)
    ownership.assert_has_policy_data(response_proto)

    # Middle layer: the policy data, which carries the username and the
    # serialized device settings.
    data_proto = device_management_backend_pb2.PolicyData()
    data_proto.ParseFromString(response_proto.policy_data)
    ownership.assert_has_device_settings(data_proto)
    if owner:
        ownership.assert_username(data_proto, owner)

    # Inner layer: the device settings themselves.
    settings = chrome_device_policy_pb2.ChromeDeviceSettingsProto()
    settings.ParseFromString(data_proto.policy_value)
    # "is not None" instead of truthiness: False is a legitimate expected
    # value and must not be silently skipped.
    if guests is not None:
        ownership.assert_guest_setting(settings, guests)
    if new_users is not None:
        ownership.assert_show_users(settings, new_users)
    if roaming is not None:
        ownership.assert_roaming(settings, roaming)
    return True
74*9c5db199SXin Li
75*9c5db199SXin Li
def build_policy_data():
    """Build and serialize a minimal device policy protobuf.

    Wraps a default-constructed ChromeDeviceSettingsProto (no fields set
    here) inside a PolicyData protobuf whose policy_type is
    ownership.POLICY_TYPE.

    @return serialization of the PolicyData proto that we build.
    """
    import chrome_device_policy_pb2
    import device_management_backend_pb2

    # Inner payload: device settings with nothing populated.
    serialized_settings = (
            chrome_device_policy_pb2.ChromeDeviceSettingsProto()
            .SerializeToString())

    # Outer envelope carrying the policy type and the serialized settings.
    envelope = device_management_backend_pb2.PolicyData()
    envelope.policy_type = ownership.POLICY_TYPE
    envelope.policy_value = serialized_settings
    return envelope.SerializeToString()
94*9c5db199SXin Li
95*9c5db199SXin Li
def generate_policy(key, pubkey, policy, old_key=None):
    """Generate and serialize a populated, signed device policy protobuffer.

    Creates a protobuf containing the device policy |policy|, signed with
    |key|.  Also includes the public key |pubkey|, signed with |old_key|
    if provided.  If not, |pubkey| is signed with |key|.  The protobuf
    is serialized to a string and returned.

    @param key: new policy signing key.
    @param pubkey: new public key to be signed and embedded in generated
                   PolicyFetchResponse.
    @param policy: policy data to be embedded in generated PolicyFetchResponse.
    @param old_key: if provided, this implies the generated PolicyFetchResponse
                    is intended to represent a key rotation.  pubkey will be
                    signed with this key before embedding.

    @return serialization of the PolicyFetchResponse proto that we build.
    """
    import device_management_backend_pb2

    # Identity check against None: equality could be overridden by the key
    # object.  Without a rotation key the new key signs its own public half.
    pubkey_signing_key = key if old_key is None else old_key

    policy_proto = device_management_backend_pb2.PolicyFetchResponse()
    policy_proto.policy_data = policy
    policy_proto.policy_data_signature = ownership.sign(key, policy)
    policy_proto.new_public_key = pubkey
    policy_proto.new_public_key_signature = ownership.sign(pubkey_signing_key,
                                                           pubkey)
    return policy_proto.SerializeToString()
124*9c5db199SXin Li
125*9c5db199SXin Li
def push_policy_and_verify(policy_string, sm):
    """Store a device policy in the session manager and confirm it stuck.

    |policy_string| is handed to the session manager through the
    StorePolicyEx DBus call.  After waiting for the listener's signals,
    the policy is read back with RetrievePolicyEx and compared with what
    was sent.

    @param policy_string: serialized policy to push to the session manager.
    @param sm: a connected SessionManagerInterface.

    @raises error.TestFail if the retrieved policy differs from
            |policy_string|.
    """
    # Arm the listener before storing so the resulting signal is not missed.
    main_loop = GObject.MainLoop()
    signal_listener = session_manager.OwnershipSignalListener(main_loop)
    signal_listener.listen_for_new_policy()

    policy_descriptor = session_manager.make_device_policy_descriptor()
    policy_bytes = dbus.ByteArray(policy_string)
    sm.StorePolicyEx(policy_descriptor, policy_bytes, byte_arrays=True)
    signal_listener.wait_for_signals(desc='Policy push.')

    # Read the policy back using the same descriptor and compare.
    stored_policy = sm.RetrievePolicyEx(policy_descriptor, byte_arrays=True)
    if stored_policy != policy_string:
        raise error.TestFail('Policy should not be %s' % stored_policy)
148*9c5db199SXin Li
149*9c5db199SXin Li
def get_policy(sm):
    """Fetch the device policy from the session manager over DBus.

    Counterpart to push_policy_and_verify(): issues RetrievePolicyEx with
    a device policy descriptor.

    @param sm: a connected SessionManagerInterface.

    @return Serialized PolicyFetchResponse.
    """
    device_descriptor = session_manager.make_device_policy_descriptor()
    return sm.RetrievePolicyEx(device_descriptor, byte_arrays=True)
161