xref: /aosp_15_r20/external/autotest/client/cros/bluetooth/advertisement.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1*9c5db199SXin Li# Lint as: python2, python3
2*9c5db199SXin Li# Copyright 2016 The Chromium OS Authors. All rights reserved.
3*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be
4*9c5db199SXin Li# found in the LICENSE file.
5*9c5db199SXin Li"""Construction of an Advertisement object from an advertisement data
6*9c5db199SXin Lidictionary.
7*9c5db199SXin Li
8*9c5db199SXin LiMuch of this module refers to the code of test/example-advertisement in
9*9c5db199SXin Libluez project.
10*9c5db199SXin Li"""
11*9c5db199SXin Li
12*9c5db199SXin Lifrom __future__ import absolute_import
13*9c5db199SXin Lifrom __future__ import division
14*9c5db199SXin Lifrom __future__ import print_function
15*9c5db199SXin Lifrom gi.repository import GLib
16*9c5db199SXin Li
17*9c5db199SXin Li# TODO(b/215715213) - Wait until ebuild runs as python3 to remove this try
18*9c5db199SXin Litry:
19*9c5db199SXin Li    import pydbus
20*9c5db199SXin Liexcept:
21*9c5db199SXin Li    pydbus = {}
22*9c5db199SXin Li
23*9c5db199SXin Liimport logging
24*9c5db199SXin Li
25*9c5db199SXin LiLE_ADVERTISEMENT_IFACE = 'org.bluez.LEAdvertisement1'
26*9c5db199SXin Li
27*9c5db199SXin Li
28*9c5db199SXin Lidef InvalidArgsException():
29*9c5db199SXin Li    return GLib.gerror_new_literal(0, 'org.freedesktop.DBus.Error.InvalidArgs',
30*9c5db199SXin Li                                   0)
31*9c5db199SXin Li
32*9c5db199SXin Li
33*9c5db199SXin Liclass Advertisement:
34*9c5db199SXin Li    """An advertisement object."""
35*9c5db199SXin Li    def __init__(self, bus, advertisement_data):
36*9c5db199SXin Li        """Construction of an Advertisement object.
37*9c5db199SXin Li
38*9c5db199SXin Li        @param bus: a dbus system bus.
39*9c5db199SXin Li        @param advertisement_data: advertisement data dictionary.
40*9c5db199SXin Li        """
41*9c5db199SXin Li        self._get_advertising_data(advertisement_data)
42*9c5db199SXin Li
43*9c5db199SXin Li        # Register self on bus and hold object for unregister
44*9c5db199SXin Li        self.obj = bus.register_object(self.path, self, None)
45*9c5db199SXin Li
46*9c5db199SXin Li    # D-Bus service definition (required by pydbus).
47*9c5db199SXin Li    dbus = """
48*9c5db199SXin Li    <node>
49*9c5db199SXin Li        <interface name="org.bluez.LEAdvertisement1">
50*9c5db199SXin Li            <method name="Release" />
51*9c5db199SXin Li        </interface>
52*9c5db199SXin Li        <interface name="org.freedesktop.DBus.Properties">
53*9c5db199SXin Li            <method name="Set">
54*9c5db199SXin Li                <arg type="s" name="interface" direction="in" />
55*9c5db199SXin Li                <arg type="s" name="prop" direction="in" />
56*9c5db199SXin Li                <arg type="v" name="value" direction="in" />
57*9c5db199SXin Li            </method>
58*9c5db199SXin Li            <method name="GetAll">
59*9c5db199SXin Li                <arg type="s" name="interface" direction="in" />
60*9c5db199SXin Li                <arg type="a{sv}" name="properties" direction="out" />
61*9c5db199SXin Li            </method>
62*9c5db199SXin Li        </interface>
63*9c5db199SXin Li    </node>
64*9c5db199SXin Li    """
65*9c5db199SXin Li
66*9c5db199SXin Li    def unregister(self):
67*9c5db199SXin Li        """Unregister self from bus."""
68*9c5db199SXin Li        self.obj.unregister()
69*9c5db199SXin Li
70*9c5db199SXin Li    def _get_advertising_data(self, advertisement_data):
71*9c5db199SXin Li        """Get advertising data from the advertisement_data dictionary.
72*9c5db199SXin Li
73*9c5db199SXin Li        @param bus: a dbus system bus.
74*9c5db199SXin Li
75*9c5db199SXin Li        """
76*9c5db199SXin Li        self.path = advertisement_data.get('Path')
77*9c5db199SXin Li        self.type = advertisement_data.get('Type')
78*9c5db199SXin Li        self.service_uuids = advertisement_data.get('ServiceUUIDs', [])
79*9c5db199SXin Li        self.solicit_uuids = advertisement_data.get('SolicitUUIDs', [])
80*9c5db199SXin Li
81*9c5db199SXin Li        # The xmlrpclib library requires that only string keys are allowed in
82*9c5db199SXin Li        # python dictionary. Hence, we need to define the manufacturer data
83*9c5db199SXin Li        # in an advertisement dictionary like
84*9c5db199SXin Li        #    'ManufacturerData': {'0xff00': [0xa1, 0xa2, 0xa3, 0xa4, 0xa5]},
85*9c5db199SXin Li        # in order to let autotest server transmit the advertisement to
86*9c5db199SXin Li        # a client DUT for testing.
87*9c5db199SXin Li        # On the other hand, the dbus method of advertising requires that
88*9c5db199SXin Li        # the signature of the manufacturer data to be 'qv' where 'q' stands
89*9c5db199SXin Li        # for unsigned 16-bit integer. Hence, we need to convert the key
90*9c5db199SXin Li        # from a string, e.g., '0xff00', to its hex value, 0xff00.
91*9c5db199SXin Li        # For signatures of the advertising properties, refer to
92*9c5db199SXin Li        #     device_properties in src/third_party/bluez/src/device.c
93*9c5db199SXin Li        # For explanation about signature types, refer to
94*9c5db199SXin Li        #     https://dbus.freedesktop.org/doc/dbus-specification.html
95*9c5db199SXin Li        self.manufacturer_data = {}  # Signature = a{qv}
96*9c5db199SXin Li        manufacturer_data = advertisement_data.get('ManufacturerData', {})
97*9c5db199SXin Li        for key, value in manufacturer_data.items():
98*9c5db199SXin Li            self.manufacturer_data[int(key, 16)] = GLib.Variant('ay', value)
99*9c5db199SXin Li
100*9c5db199SXin Li        self.service_data = {}  # Signature = a{sv}
101*9c5db199SXin Li        service_data = advertisement_data.get('ServiceData', {})
102*9c5db199SXin Li        for uuid, data in service_data.items():
103*9c5db199SXin Li            self.service_data[uuid] = GLib.Variant('ay', data)
104*9c5db199SXin Li
105*9c5db199SXin Li        self.include_tx_power = advertisement_data.get('IncludeTxPower')
106*9c5db199SXin Li
107*9c5db199SXin Li        self.discoverable = advertisement_data.get('Discoverable')
108*9c5db199SXin Li
109*9c5db199SXin Li        self.scan_response = advertisement_data.get('ScanResponseData')
110*9c5db199SXin Li
111*9c5db199SXin Li        self.min_interval = advertisement_data.get('MinInterval')
112*9c5db199SXin Li        self.max_interval = advertisement_data.get('MaxInterval')
113*9c5db199SXin Li
114*9c5db199SXin Li        self.tx_power = advertisement_data.get('TxPower')
115*9c5db199SXin Li
116*9c5db199SXin Li    def get_path(self):
117*9c5db199SXin Li        """Get the dbus object path of the advertisement.
118*9c5db199SXin Li
119*9c5db199SXin Li        @returns: the advertisement object path.
120*9c5db199SXin Li
121*9c5db199SXin Li        """
122*9c5db199SXin Li        return self.path
123*9c5db199SXin Li
124*9c5db199SXin Li    def Set(self, interface, prop, value):
125*9c5db199SXin Li        """Called when bluetoothd Sets a property on our advertising object
126*9c5db199SXin Li
127*9c5db199SXin Li        @param interface: String interface, i.e. org.bluez.LEAdvertisement1
128*9c5db199SXin Li        @param prop: String name of the property being set
129*9c5db199SXin Li        @param value: Value of the property being set
130*9c5db199SXin Li        """
131*9c5db199SXin Li        logging.info('Setting prop {} value to {}'.format(prop, value))
132*9c5db199SXin Li
133*9c5db199SXin Li        if prop == 'TxPower':
134*9c5db199SXin Li            self.tx_power = value
135*9c5db199SXin Li
136*9c5db199SXin Li    def GetAll(self, interface):
137*9c5db199SXin Li        """Get the properties dictionary of the advertisement.
138*9c5db199SXin Li
139*9c5db199SXin Li        @param interface: the bluetooth dbus interface.
140*9c5db199SXin Li
141*9c5db199SXin Li        @returns: the advertisement properties dictionary.
142*9c5db199SXin Li
143*9c5db199SXin Li        """
144*9c5db199SXin Li        if interface != LE_ADVERTISEMENT_IFACE:
145*9c5db199SXin Li            raise InvalidArgsException()
146*9c5db199SXin Li
147*9c5db199SXin Li        properties = dict()
148*9c5db199SXin Li        properties['Type'] = GLib.Variant('s', self.type)
149*9c5db199SXin Li
150*9c5db199SXin Li        if self.service_uuids is not None:
151*9c5db199SXin Li            properties['ServiceUUIDs'] = GLib.Variant('as', self.service_uuids)
152*9c5db199SXin Li        if self.solicit_uuids is not None:
153*9c5db199SXin Li            properties['SolicitUUIDs'] = GLib.Variant('as', self.solicit_uuids)
154*9c5db199SXin Li        if self.manufacturer_data is not None:
155*9c5db199SXin Li            properties['ManufacturerData'] = GLib.Variant(
156*9c5db199SXin Li                    'a{qv}', self.manufacturer_data)
157*9c5db199SXin Li
158*9c5db199SXin Li        if self.service_data is not None:
159*9c5db199SXin Li            properties['ServiceData'] = GLib.Variant('a{sv}',
160*9c5db199SXin Li                                                     self.service_data)
161*9c5db199SXin Li        if self.discoverable is not None:
162*9c5db199SXin Li            properties['Discoverable'] = GLib.Variant('b', self.discoverable)
163*9c5db199SXin Li
164*9c5db199SXin Li        if self.include_tx_power is not None:
165*9c5db199SXin Li            properties['IncludeTxPower'] = GLib.Variant(
166*9c5db199SXin Li                    'b', self.include_tx_power)
167*9c5db199SXin Li
168*9c5db199SXin Li        # Note here: Scan response data is an int (tag) -> array (value) mapping
169*9c5db199SXin Li        # but autotest's xmlrpc server can only accept string keys. For this
170*9c5db199SXin Li        # reason, the scan response key is encoded as a hex string, and then
171*9c5db199SXin Li        # re-mapped here before the advertisement is registered.
172*9c5db199SXin Li        if self.scan_response is not None:
173*9c5db199SXin Li            scan_rsp = {}
174*9c5db199SXin Li            for key, value in self.scan_response.items():
175*9c5db199SXin Li                scan_rsp[int(key, 16)] = GLib.Variant('ay', value)
176*9c5db199SXin Li
177*9c5db199SXin Li            properties['ScanResponseData'] = GLib.Variant('a{yv}', scan_rsp)
178*9c5db199SXin Li
179*9c5db199SXin Li        if self.min_interval is not None:
180*9c5db199SXin Li            properties['MinInterval'] = GLib.Variant('u', self.min_interval)
181*9c5db199SXin Li
182*9c5db199SXin Li        if self.max_interval is not None:
183*9c5db199SXin Li            properties['MaxInterval'] = GLib.Variant('u', self.max_interval)
184*9c5db199SXin Li
185*9c5db199SXin Li        if self.tx_power is not None:
186*9c5db199SXin Li            properties['TxPower'] = GLib.Variant('n', self.tx_power)
187*9c5db199SXin Li
188*9c5db199SXin Li        return properties
189*9c5db199SXin Li
190*9c5db199SXin Li    def Release(self):
191*9c5db199SXin Li        """The method callback at release."""
192*9c5db199SXin Li        logging.info('%s: Advertisement Release() called.', self.path)
193*9c5db199SXin Li
194*9c5db199SXin Li
195*9c5db199SXin Lidef example_advertisement(bus):
196*9c5db199SXin Li    """A demo example of creating an Advertisement object.
197*9c5db199SXin Li
198*9c5db199SXin Li    @param bus: a dbus system bus.
199*9c5db199SXin Li    @returns: the Advertisement object.
200*9c5db199SXin Li
201*9c5db199SXin Li    """
202*9c5db199SXin Li    ADVERTISEMENT_DATA = {
203*9c5db199SXin Li            'Path': '/org/bluez/test/advertisement1',
204*9c5db199SXin Li
205*9c5db199SXin Li            # Could be 'central' or 'peripheral'.
206*9c5db199SXin Li            'Type': 'peripheral',
207*9c5db199SXin Li
208*9c5db199SXin Li            # Refer to the specification for a list of service assigned numbers:
209*9c5db199SXin Li            # https://www.bluetooth.com/specifications/gatt/services
210*9c5db199SXin Li            # e.g., 180D represents "Heart Reate" service, and
211*9c5db199SXin Li            #       180F "Battery Service".
212*9c5db199SXin Li            'ServiceUUIDs': ['180D', '180F'],
213*9c5db199SXin Li
214*9c5db199SXin Li            # Service solicitation UUIDs.
215*9c5db199SXin Li            'SolicitUUIDs': [],
216*9c5db199SXin Li
217*9c5db199SXin Li            # Two bytes of manufacturer id followed by manufacturer specific data.
218*9c5db199SXin Li            'ManufacturerData': {
219*9c5db199SXin Li                    '0xff00': [0xa1, 0xa2, 0xa3, 0xa4, 0xa5]
220*9c5db199SXin Li            },
221*9c5db199SXin Li
222*9c5db199SXin Li            # service UUID followed by additional service data.
223*9c5db199SXin Li            'ServiceData': {
224*9c5db199SXin Li                    '9999': [0x10, 0x20, 0x30, 0x40, 0x50]
225*9c5db199SXin Li            },
226*9c5db199SXin Li
227*9c5db199SXin Li            # Does it include transmit power level?
228*9c5db199SXin Li            'IncludeTxPower': True
229*9c5db199SXin Li    }
230*9c5db199SXin Li
231*9c5db199SXin Li    return Advertisement(bus, ADVERTISEMENT_DATA)
232*9c5db199SXin Li
233*9c5db199SXin Li
234*9c5db199SXin Liif __name__ == '__main__':
235*9c5db199SXin Li    bus = pydbus.SystemBus()
236*9c5db199SXin Li    adv = example_advertisement(bus)
237*9c5db199SXin Li    print(adv.GetAll(LE_ADVERTISEMENT_IFACE))
238