# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Utility functions for tests that generate, push and fetch policies.

The python bindings for the protobufs used in policies are built as a part
of the tests that use them, so they are not importable when this module is
loaded. Callers must call install_protobufs() first; the functions below
import the generated *_pb2 modules lazily for that reason.
"""

import os
import sys

import dbus
# AU tests use ToT client code, but ToT -3 client version.
try:
    from gi.repository import GObject
except ImportError:
    import gobject as GObject

import common
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import session_manager
from autotest_lib.client.cros import ownership


def install_protobufs(autodir, job):
    """Installs policy protobuf dependencies and sets the import path.

    After calling this, you can simply import any policy pb2.py file directly,
    e.g. import chrome_device_policy_pb2.

    @param autodir: Autotest directory (usually the caller's self.autodir).
    @param job: Job instance (usually the caller's self.job).
    """
    # TODO(crbug.com/807950): Change the installation process so that policy
    #                         proto imports can be moved to the top.
    dep = 'policy_protos'
    dep_dir = os.path.join(autodir, 'deps', dep)
    job.install_pkg(dep, 'dep', dep_dir)
    # Avoid growing sys.path with duplicates when several tests in the same
    # process install the protobufs.
    if dep_dir not in sys.path:
        sys.path.append(dep_dir)


def compare_policy_response(policy_response, owner=None, guests=None,
                            new_users=None, roaming=None):
    """Check the contents of |policy_response| against given args.

    Deserializes |policy_response| into a PolicyFetchResponse protobuf,
    with an embedded (serialized) PolicyData protobuf that embeds a
    (serialized) ChromeDeviceSettingsProto, and checks to see if this
    protobuf turducken contains the information passed in.

    Each optional argument is only verified when it is provided. For the
    boolean arguments "provided" means "not None", so an expectation of
    False is verified as well.

    This function does not return a value; all verification is delegated to
    the ownership.assert_* helpers, which are expected to raise on a
    mismatch (presumably error.TestFail -- see ownership module).

    @param policy_response: string serialization of a PolicyFetchResponse
                            protobuf.
    @param owner: string representing the owner's name/account.
    @param guests: boolean indicating whether guests should be allowed.
    @param new_users: boolean indicating if user pods are on login screen.
    @param roaming: boolean indicating whether data roaming is enabled.
    """
    import chrome_device_policy_pb2
    import device_management_backend_pb2

    # Outer layer: the fetch response that wraps the serialized policy data.
    response_proto = device_management_backend_pb2.PolicyFetchResponse()
    response_proto.ParseFromString(policy_response)
    ownership.assert_has_policy_data(response_proto)

    # Middle layer: the policy data that wraps the serialized settings.
    data_proto = device_management_backend_pb2.PolicyData()
    data_proto.ParseFromString(response_proto.policy_data)
    ownership.assert_has_device_settings(data_proto)
    # An empty owner string is treated the same as "not provided".
    if owner:
        ownership.assert_username(data_proto, owner)

    # Inner layer: the actual device settings.
    settings = chrome_device_policy_pb2.ChromeDeviceSettingsProto()
    settings.ParseFromString(data_proto.policy_value)
    # Compare against None rather than relying on truthiness: False is a
    # legitimate expected value and must not cause the check to be skipped.
    if guests is not None:
        ownership.assert_guest_setting(settings, guests)
    if new_users is not None:
        ownership.assert_show_users(settings, new_users)
    if roaming is not None:
        ownership.assert_roaming(settings, roaming)


def build_policy_data():
    """Generate and serialize a device policy protobuffer.

    Creates a PolicyData protobuf of type ownership.POLICY_TYPE whose policy
    value is a serialized, default-constructed ChromeDeviceSettingsProto.

    @return serialization of the PolicyData proto that we build.
    """
    import chrome_device_policy_pb2
    import device_management_backend_pb2

    settings = chrome_device_policy_pb2.ChromeDeviceSettingsProto()

    data_proto = device_management_backend_pb2.PolicyData()
    data_proto.policy_type = ownership.POLICY_TYPE
    data_proto.policy_value = settings.SerializeToString()
    return data_proto.SerializeToString()


def generate_policy(key, pubkey, policy, old_key=None):
    """Generate and serialize a populated, signed device policy protobuffer.

    Creates a protobuf containing the device policy |policy|, signed with
    |key|.  Also includes the public key |pubkey|, signed with |old_key|
    if provided.  If not, |pubkey| is signed with |key|.  The protobuf
    is serialized to a string and returned.

    @param key: new policy signing key.
    @param pubkey: new public key to be signed and embedded in generated
                   PolicyFetchResponse.
    @param policy: policy data to be embedded in generated PolicyFetchResponse.
    @param old_key: if provided, this implies the generated PolicyFetchResponse
                    is intended to represent a key rotation.  pubkey will be
                    signed with this key before embedding.

    @return serialization of the PolicyFetchResponse proto that we build.
    """
    import device_management_backend_pb2

    # Without a key rotation the new public key is signed by the same key
    # that signs the policy itself.
    if old_key is None:
        old_key = key

    policy_proto = device_management_backend_pb2.PolicyFetchResponse()
    policy_proto.policy_data = policy
    policy_proto.policy_data_signature = ownership.sign(key, policy)
    policy_proto.new_public_key = pubkey
    policy_proto.new_public_key_signature = ownership.sign(old_key, pubkey)
    return policy_proto.SerializeToString()


def push_policy_and_verify(policy_string, sm):
    """Push a device policy to the session manager over DBus.

    The serialized device policy |policy_string| is sent to the session
    manager with the StorePolicyEx DBus call.  Success of the store is
    validated by fetching the policy again and comparing.

    @param policy_string: serialized policy to push to the session manager.
    @param sm: a connected SessionManagerInterface.

    @raises error.TestFail if policy push failed.
    """
    # Start listening before the store call so the signal cannot be missed.
    listener = session_manager.OwnershipSignalListener(GObject.MainLoop())
    listener.listen_for_new_policy()

    descriptor = session_manager.make_device_policy_descriptor()
    sm.StorePolicyEx(descriptor,
                     dbus.ByteArray(policy_string), byte_arrays=True)
    listener.wait_for_signals(desc='Policy push.')

    # Read the policy back and make sure it is exactly what was pushed.
    retrieved_policy = sm.RetrievePolicyEx(descriptor, byte_arrays=True)
    if retrieved_policy != policy_string:
        raise error.TestFail('Policy should not be %s' % retrieved_policy)


def get_policy(sm):
    """Get a device policy from the session manager over DBus.

    Provided mainly for symmetry with push_policy_and_verify().

    @param sm: a connected SessionManagerInterface.

    @return Serialized PolicyFetchResponse.
    """
    descriptor = session_manager.make_device_policy_descriptor()
    return sm.RetrievePolicyEx(descriptor, byte_arrays=True)