1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import asyncio 19import logging 20import os 21import struct 22import pytest 23from unittest.mock import AsyncMock, Mock, ANY 24 25from bumble.controller import Controller 26from bumble.gatt_client import CharacteristicProxy 27from bumble.link import LocalLink 28from bumble.device import Device, Peer 29from bumble.host import Host 30from bumble.gatt import ( 31 GATT_BATTERY_LEVEL_CHARACTERISTIC, 32 GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR, 33 CharacteristicAdapter, 34 DelegatedCharacteristicAdapter, 35 PackedCharacteristicAdapter, 36 MappedCharacteristicAdapter, 37 UTF8CharacteristicAdapter, 38 Service, 39 Characteristic, 40 CharacteristicValue, 41 Descriptor, 42) 43from bumble.transport import AsyncPipeSink 44from bumble.core import UUID 45from bumble.att import ( 46 Attribute, 47 ATT_EXCHANGE_MTU_REQUEST, 48 ATT_ATTRIBUTE_NOT_FOUND_ERROR, 49 ATT_PDU, 50 ATT_Error, 51 ATT_Error_Response, 52 ATT_Read_By_Group_Type_Request, 53 ErrorCode, 54) 55from .test_utils import async_barrier 56 57 58# ----------------------------------------------------------------------------- 59def basic_check(x): 60 pdu = x.to_bytes() 61 parsed = ATT_PDU.from_bytes(pdu) 62 x_str = str(x) 63 parsed_str = str(parsed) 64 assert x_str == parsed_str 65 66 67# ----------------------------------------------------------------------------- 68def test_UUID(): 69 u = UUID.from_16_bits(0x7788) 70 assert str(u) == 'UUID-16:7788' 71 u = UUID.from_32_bits(0x11223344) 72 assert str(u) == 'UUID-32:11223344' 73 u = UUID('61A3512C-09BE-4DDC-A6A6-0B03667AAFC6') 74 assert str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' 75 v = UUID(str(u)) 76 assert str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' 77 w = UUID.from_bytes(v.to_bytes()) 78 assert str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' 79 80 u1 = UUID.from_16_bits(0x1234) 81 b1 = u1.to_bytes(force_128=True) 82 u2 = UUID.from_bytes(b1) 83 assert u1 == u2 84 85 u3 = UUID.from_16_bits(0x180A) 86 assert str(u3) == 'UUID-16:180A (Device Information)' 87 88 89# ----------------------------------------------------------------------------- 90def test_ATT_Error_Response(): 91 pdu = ATT_Error_Response( 92 request_opcode_in_error=ATT_EXCHANGE_MTU_REQUEST, 93 attribute_handle_in_error=0x0000, 94 error_code=ATT_ATTRIBUTE_NOT_FOUND_ERROR, 95 ) 96 basic_check(pdu) 97 98 99# ----------------------------------------------------------------------------- 100def test_ATT_Read_By_Group_Type_Request(): 101 pdu = ATT_Read_By_Group_Type_Request( 102 starting_handle=0x0001, 103 ending_handle=0xFFFF, 104 attribute_group_type=UUID.from_16_bits(0x2800), 105 ) 106 basic_check(pdu) 107 108 109# ----------------------------------------------------------------------------- 110@pytest.mark.asyncio 111async def test_characteristic_encoding(): 112 class Foo(Characteristic): 113 def encode_value(self, value): 114 return bytes([value]) 115 116 def decode_value(self, value_bytes): 117 return value_bytes[0] 118 119 c = Foo( 120 GATT_BATTERY_LEVEL_CHARACTERISTIC, 121 Characteristic.Properties.READ, 122 Characteristic.READABLE, 123 123, 124 ) 125 x = await c.read_value(None) 126 assert x == bytes([123]) 127 await c.write_value(None, bytes([122])) 128 assert c.value == 122 129 130 class FooProxy(CharacteristicProxy): 131 def __init__(self, characteristic): 132 super().__init__( 133 characteristic.client, 134 characteristic.handle, 135 characteristic.end_group_handle, 136 characteristic.uuid, 137 characteristic.properties, 138 ) 139 140 def encode_value(self, value): 141 return bytes([value]) 142 143 def decode_value(self, value_bytes): 144 return value_bytes[0] 145 146 [client, server] = LinkedDevices().devices[:2] 147 148 characteristic = Characteristic( 149 'FDB159DB-036C-49E3-B3DB-6325AC750806', 150 Characteristic.Properties.READ 151 | Characteristic.Properties.WRITE 152 | Characteristic.Properties.NOTIFY, 153 Characteristic.READABLE | Characteristic.WRITEABLE, 154 bytes([123]), 155 ) 156 157 async def async_read(connection): 158 return 0x05060708 159 160 async_characteristic = PackedCharacteristicAdapter( 161 Characteristic( 162 '2AB7E91B-43E8-4F73-AC3B-80C1683B47F9', 163 Characteristic.Properties.READ, 164 Characteristic.READABLE, 165 CharacteristicValue(read=async_read), 166 ), 167 '>I', 168 ) 169 170 service = Service( 171 '3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic, async_characteristic] 172 ) 173 server.add_service(service) 174 175 await client.power_on() 176 await server.power_on() 177 connection = await client.connect(server.random_address) 178 peer = Peer(connection) 179 180 await peer.discover_services() 181 await peer.discover_characteristics() 182 c = peer.get_characteristics_by_uuid(characteristic.uuid) 183 assert len(c) == 1 184 c = c[0] 185 cp = FooProxy(c) 186 187 v = await cp.read_value() 188 assert v == 123 189 await cp.write_value(124) 190 await async_barrier() 191 assert characteristic.value == bytes([124]) 192 193 v = await cp.read_value() 194 assert v == 124 195 await cp.write_value(125, with_response=True) 196 await async_barrier() 197 assert characteristic.value == bytes([125]) 198 199 cd = DelegatedCharacteristicAdapter(c, encode=lambda x: bytes([x // 2])) 200 await cd.write_value(100, with_response=True) 201 await async_barrier() 202 assert characteristic.value == bytes([50]) 203 204 c2 = peer.get_characteristics_by_uuid(async_characteristic.uuid) 205 assert len(c2) == 1 206 c2 = c2[0] 207 cd2 = PackedCharacteristicAdapter(c2, ">I") 208 cd2v = await cd2.read_value() 209 assert cd2v == 0x05060708 210 211 last_change = None 212 213 def on_change(value): 214 nonlocal last_change 215 last_change = value 216 217 await c.subscribe(on_change) 218 await server.notify_subscribers(characteristic) 219 await async_barrier() 220 assert last_change == characteristic.value 221 last_change = None 222 223 await server.notify_subscribers(characteristic, value=bytes([125])) 224 await async_barrier() 225 assert last_change == bytes([125]) 226 last_change = None 227 228 await c.unsubscribe(on_change) 229 await server.notify_subscribers(characteristic) 230 await async_barrier() 231 assert last_change is None 232 233 await cp.subscribe(on_change) 234 await server.notify_subscribers(characteristic) 235 await async_barrier() 236 assert last_change == characteristic.value[0] 237 last_change = None 238 239 await server.notify_subscribers(characteristic, value=bytes([126])) 240 await async_barrier() 241 assert last_change == 126 242 last_change = None 243 244 await cp.unsubscribe(on_change) 245 await server.notify_subscribers(characteristic) 246 await async_barrier() 247 assert last_change is None 248 249 cd = DelegatedCharacteristicAdapter(c, decode=lambda x: x[0]) 250 await cd.subscribe(on_change) 251 await server.notify_subscribers(characteristic) 252 await async_barrier() 253 assert last_change == characteristic.value[0] 254 last_change = None 255 256 await cd.unsubscribe(on_change) 257 await server.notify_subscribers(characteristic) 258 await async_barrier() 259 assert last_change is None 260 261 262# ----------------------------------------------------------------------------- 263@pytest.mark.asyncio 264async def test_attribute_getters(): 265 [client, server] = LinkedDevices().devices[:2] 266 267 characteristic_uuid = UUID('FDB159DB-036C-49E3-B3DB-6325AC750806') 268 characteristic = Characteristic( 269 characteristic_uuid, 270 Characteristic.Properties.READ 271 | Characteristic.Properties.WRITE 272 | Characteristic.Properties.NOTIFY, 273 Characteristic.READABLE | Characteristic.WRITEABLE, 274 bytes([123]), 275 ) 276 277 service_uuid = UUID('3A657F47-D34F-46B3-B1EC-698E29B6B829') 278 service = Service(service_uuid, [characteristic]) 279 server.add_service(service) 280 281 service_attr = server.gatt_server.get_service_attribute(service_uuid) 282 assert service_attr 283 284 ( 285 char_decl_attr, 286 char_value_attr, 287 ) = server.gatt_server.get_characteristic_attributes( 288 service_uuid, characteristic_uuid 289 ) 290 assert char_decl_attr and char_value_attr 291 292 desc_attr = server.gatt_server.get_descriptor_attribute( 293 service_uuid, 294 characteristic_uuid, 295 GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR, 296 ) 297 assert desc_attr 298 299 # assert all handles are in expected order 300 assert ( 301 service_attr.handle 302 < char_decl_attr.handle 303 < char_value_attr.handle 304 < desc_attr.handle 305 == service_attr.end_group_handle 306 ) 307 # assert characteristic declarations attribute is followed by characteristic value attribute 308 assert char_decl_attr.handle + 1 == char_value_attr.handle 309 310 311# ----------------------------------------------------------------------------- 312@pytest.mark.asyncio 313async def test_CharacteristicAdapter(): 314 # Check that the CharacteristicAdapter base class is transparent 315 v = bytes([1, 2, 3]) 316 c = Characteristic( 317 GATT_BATTERY_LEVEL_CHARACTERISTIC, 318 Characteristic.Properties.READ, 319 Characteristic.READABLE, 320 v, 321 ) 322 a = CharacteristicAdapter(c) 323 324 value = await a.read_value(None) 325 assert value == v 326 327 v = bytes([3, 4, 5]) 328 await a.write_value(None, v) 329 assert c.value == v 330 331 # Simple delegated adapter 332 a = DelegatedCharacteristicAdapter( 333 c, lambda x: bytes(reversed(x)), lambda x: bytes(reversed(x)) 334 ) 335 336 value = await a.read_value(None) 337 assert value == bytes(reversed(v)) 338 339 v = bytes([3, 4, 5]) 340 await a.write_value(None, v) 341 assert a.value == bytes(reversed(v)) 342 343 # Packed adapter with single element format 344 v = 1234 345 pv = struct.pack('>H', v) 346 c.value = v 347 a = PackedCharacteristicAdapter(c, '>H') 348 349 value = await a.read_value(None) 350 assert value == pv 351 c.value = None 352 await a.write_value(None, pv) 353 assert a.value == v 354 355 # Packed adapter with multi-element format 356 v1 = 1234 357 v2 = 5678 358 pv = struct.pack('>HH', v1, v2) 359 c.value = (v1, v2) 360 a = PackedCharacteristicAdapter(c, '>HH') 361 362 value = await a.read_value(None) 363 assert value == pv 364 c.value = None 365 await a.write_value(None, pv) 366 assert a.value == (v1, v2) 367 368 # Mapped adapter 369 v1 = 1234 370 v2 = 5678 371 pv = struct.pack('>HH', v1, v2) 372 mapped = {'v1': v1, 'v2': v2} 373 c.value = mapped 374 a = MappedCharacteristicAdapter(c, '>HH', ('v1', 'v2')) 375 376 value = await a.read_value(None) 377 assert value == pv 378 c.value = None 379 await a.write_value(None, pv) 380 assert a.value == mapped 381 382 # UTF-8 adapter 383 v = 'Hello π' 384 ev = v.encode('utf-8') 385 c.value = v 386 a = UTF8CharacteristicAdapter(c) 387 388 value = await a.read_value(None) 389 assert value == ev 390 c.value = None 391 await a.write_value(None, ev) 392 assert a.value == v 393 394 395# ----------------------------------------------------------------------------- 396@pytest.mark.asyncio 397async def test_CharacteristicValue(): 398 b = bytes([1, 2, 3]) 399 400 async def read_value(connection): 401 return b 402 403 c = CharacteristicValue(read=read_value) 404 x = await c.read(None) 405 assert x == b 406 407 m = Mock() 408 c = CharacteristicValue(write=m) 409 z = object() 410 c.write(z, b) 411 m.assert_called_once_with(z, b) 412 413 414# ----------------------------------------------------------------------------- 415@pytest.mark.asyncio 416async def test_CharacteristicValue_async(): 417 b = bytes([1, 2, 3]) 418 419 async def read_value(connection): 420 return b 421 422 c = CharacteristicValue(read=read_value) 423 x = await c.read(None) 424 assert x == b 425 426 m = AsyncMock() 427 c = CharacteristicValue(write=m) 428 z = object() 429 await c.write(z, b) 430 m.assert_called_once_with(z, b) 431 432 433# ----------------------------------------------------------------------------- 434class LinkedDevices: 435 def __init__(self): 436 self.connections = [None, None, None] 437 438 self.link = LocalLink() 439 self.controllers = [ 440 Controller('C1', link=self.link), 441 Controller('C2', link=self.link), 442 Controller('C3', link=self.link), 443 ] 444 self.devices = [ 445 Device( 446 address='F0:F1:F2:F3:F4:F5', 447 host=Host(self.controllers[0], AsyncPipeSink(self.controllers[0])), 448 ), 449 Device( 450 address='F1:F2:F3:F4:F5:F6', 451 host=Host(self.controllers[1], AsyncPipeSink(self.controllers[1])), 452 ), 453 Device( 454 address='F2:F3:F4:F5:F6:F7', 455 host=Host(self.controllers[2], AsyncPipeSink(self.controllers[2])), 456 ), 457 ] 458 459 self.paired = [None, None, None] 460 461 462# ----------------------------------------------------------------------------- 463@pytest.mark.asyncio 464async def test_read_write(): 465 [client, server] = LinkedDevices().devices[:2] 466 467 characteristic1 = Characteristic( 468 'FDB159DB-036C-49E3-B3DB-6325AC750806', 469 Characteristic.Properties.READ | Characteristic.Properties.WRITE, 470 Characteristic.READABLE | Characteristic.WRITEABLE, 471 ) 472 473 def on_characteristic1_write(connection, value): 474 characteristic1._last_value = (connection, value) 475 476 characteristic1.on('write', on_characteristic1_write) 477 478 def on_characteristic2_read(connection): 479 return bytes(str(connection.peer_address)) 480 481 def on_characteristic2_write(connection, value): 482 characteristic2._last_value = (connection, value) 483 484 characteristic2 = Characteristic( 485 '66DE9057-C848-4ACA-B993-D675644EBB85', 486 Characteristic.Properties.READ | Characteristic.Properties.WRITE, 487 Characteristic.READABLE | Characteristic.WRITEABLE, 488 CharacteristicValue( 489 read=on_characteristic2_read, write=on_characteristic2_write 490 ), 491 ) 492 493 service1 = Service( 494 '3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1, characteristic2] 495 ) 496 server.add_services([service1]) 497 498 await client.power_on() 499 await server.power_on() 500 connection = await client.connect(server.random_address) 501 peer = Peer(connection) 502 503 await peer.discover_services() 504 await peer.discover_characteristics() 505 c = peer.get_characteristics_by_uuid(characteristic1.uuid) 506 assert len(c) == 1 507 c1 = c[0] 508 c = peer.get_characteristics_by_uuid(characteristic2.uuid) 509 assert len(c) == 1 510 c2 = c[0] 511 512 v1 = await peer.read_value(c1) 513 assert v1 == b'' 514 b = bytes([1, 2, 3]) 515 await peer.write_value(c1, b) 516 await async_barrier() 517 assert characteristic1.value == b 518 v1 = await peer.read_value(c1) 519 assert v1 == b 520 assert type(characteristic1._last_value is tuple) 521 assert len(characteristic1._last_value) == 2 522 assert str(characteristic1._last_value[0].peer_address) == str( 523 client.random_address 524 ) 525 assert characteristic1._last_value[1] == b 526 bb = bytes([3, 4, 5, 6]) 527 characteristic1.value = bb 528 v1 = await peer.read_value(c1) 529 assert v1 == bb 530 531 await peer.write_value(c2, b) 532 await async_barrier() 533 assert type(characteristic2._last_value is tuple) 534 assert len(characteristic2._last_value) == 2 535 assert str(characteristic2._last_value[0].peer_address) == str( 536 client.random_address 537 ) 538 assert characteristic2._last_value[1] == b 539 540 541# ----------------------------------------------------------------------------- 542@pytest.mark.asyncio 543async def test_read_write2(): 544 [client, server] = LinkedDevices().devices[:2] 545 546 v = bytes([0x11, 0x22, 0x33, 0x44]) 547 characteristic1 = Characteristic( 548 'FDB159DB-036C-49E3-B3DB-6325AC750806', 549 Characteristic.Properties.READ | Characteristic.Properties.WRITE, 550 Characteristic.READABLE | Characteristic.WRITEABLE, 551 value=v, 552 ) 553 554 service1 = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1]) 555 server.add_services([service1]) 556 557 await client.power_on() 558 await server.power_on() 559 connection = await client.connect(server.random_address) 560 peer = Peer(connection) 561 562 await peer.discover_services() 563 c = peer.get_services_by_uuid(service1.uuid) 564 assert len(c) == 1 565 s = c[0] 566 await s.discover_characteristics() 567 c = s.get_characteristics_by_uuid(characteristic1.uuid) 568 assert len(c) == 1 569 c1 = c[0] 570 571 v1 = await c1.read_value() 572 assert v1 == v 573 574 a1 = PackedCharacteristicAdapter(c1, '>I') 575 v1 = await a1.read_value() 576 assert v1 == struct.unpack('>I', v)[0] 577 578 b = bytes([0x55, 0x66, 0x77, 0x88]) 579 await a1.write_value(struct.unpack('>I', b)[0]) 580 await async_barrier() 581 assert characteristic1.value == b 582 v1 = await a1.read_value() 583 assert v1 == struct.unpack('>I', b)[0] 584 585 586# ----------------------------------------------------------------------------- 587@pytest.mark.asyncio 588async def test_subscribe_notify(): 589 [client, server] = LinkedDevices().devices[:2] 590 591 characteristic1 = Characteristic( 592 'FDB159DB-036C-49E3-B3DB-6325AC750806', 593 Characteristic.Properties.READ | Characteristic.Properties.NOTIFY, 594 Characteristic.READABLE, 595 bytes([1, 2, 3]), 596 ) 597 598 def on_characteristic1_subscription(connection, notify_enabled, indicate_enabled): 599 characteristic1._last_subscription = ( 600 connection, 601 notify_enabled, 602 indicate_enabled, 603 ) 604 605 characteristic1.on('subscription', on_characteristic1_subscription) 606 607 characteristic2 = Characteristic( 608 '66DE9057-C848-4ACA-B993-D675644EBB85', 609 Characteristic.Properties.READ | Characteristic.Properties.INDICATE, 610 Characteristic.READABLE, 611 bytes([4, 5, 6]), 612 ) 613 614 def on_characteristic2_subscription(connection, notify_enabled, indicate_enabled): 615 characteristic2._last_subscription = ( 616 connection, 617 notify_enabled, 618 indicate_enabled, 619 ) 620 621 characteristic2.on('subscription', on_characteristic2_subscription) 622 623 characteristic3 = Characteristic( 624 'AB5E639C-40C1-4238-B9CB-AF41F8B806E4', 625 Characteristic.Properties.READ 626 | Characteristic.Properties.NOTIFY 627 | Characteristic.Properties.INDICATE, 628 Characteristic.READABLE, 629 bytes([7, 8, 9]), 630 ) 631 632 def on_characteristic3_subscription(connection, notify_enabled, indicate_enabled): 633 characteristic3._last_subscription = ( 634 connection, 635 notify_enabled, 636 indicate_enabled, 637 ) 638 639 characteristic3.on('subscription', on_characteristic3_subscription) 640 641 service1 = Service( 642 '3A657F47-D34F-46B3-B1EC-698E29B6B829', 643 [characteristic1, characteristic2, characteristic3], 644 ) 645 server.add_services([service1]) 646 647 def on_characteristic_subscription( 648 connection, characteristic, notify_enabled, indicate_enabled 649 ): 650 server._last_subscription = ( 651 connection, 652 characteristic, 653 notify_enabled, 654 indicate_enabled, 655 ) 656 657 server.on('characteristic_subscription', on_characteristic_subscription) 658 659 await client.power_on() 660 await server.power_on() 661 connection = await client.connect(server.random_address) 662 peer = Peer(connection) 663 664 await peer.discover_services() 665 await peer.discover_characteristics() 666 c = peer.get_characteristics_by_uuid(characteristic1.uuid) 667 assert len(c) == 1 668 c1 = c[0] 669 c = peer.get_characteristics_by_uuid(characteristic2.uuid) 670 assert len(c) == 1 671 c2 = c[0] 672 c = peer.get_characteristics_by_uuid(characteristic3.uuid) 673 assert len(c) == 1 674 c3 = c[0] 675 676 c1._called = False 677 c1._last_update = None 678 679 def on_c1_update(value): 680 c1._called = True 681 c1._last_update = value 682 683 c1.on('update', on_c1_update) 684 await peer.subscribe(c1) 685 await async_barrier() 686 assert server._last_subscription[1] == characteristic1 687 assert server._last_subscription[2] 688 assert not server._last_subscription[3] 689 assert characteristic1._last_subscription[1] 690 assert not characteristic1._last_subscription[2] 691 await server.indicate_subscribers(characteristic1) 692 await async_barrier() 693 assert not c1._called 694 await server.notify_subscribers(characteristic1) 695 await async_barrier() 696 assert c1._called 697 assert c1._last_update == characteristic1.value 698 699 c1._called = False 700 c1._last_update = None 701 c1_value = characteristic1.value 702 await server.notify_subscribers(characteristic1, bytes([0, 1, 2])) 703 await async_barrier() 704 assert c1._called 705 assert c1._last_update == bytes([0, 1, 2]) 706 assert characteristic1.value == c1_value 707 708 c1._called = False 709 await peer.unsubscribe(c1) 710 await server.notify_subscribers(characteristic1) 711 assert not c1._called 712 713 c2._called = False 714 c2._last_update = None 715 716 def on_c2_update(value): 717 c2._called = True 718 c2._last_update = value 719 720 await peer.subscribe(c2, on_c2_update) 721 await async_barrier() 722 await server.notify_subscriber( 723 characteristic2._last_subscription[0], characteristic2 724 ) 725 await async_barrier() 726 assert not c2._called 727 await server.indicate_subscriber( 728 characteristic2._last_subscription[0], characteristic2 729 ) 730 await async_barrier() 731 assert c2._called 732 assert c2._last_update == characteristic2.value 733 734 c2._called = False 735 await peer.unsubscribe(c2, on_c2_update) 736 await server.indicate_subscriber( 737 characteristic2._last_subscription[0], characteristic2 738 ) 739 await async_barrier() 740 assert not c2._called 741 742 c3._called = False 743 c3._called_2 = False 744 c3._called_3 = False 745 c3._last_update = None 746 c3._last_update_2 = None 747 c3._last_update_3 = None 748 749 def on_c3_update(value): 750 c3._called = True 751 c3._last_update = value 752 753 def on_c3_update_2(value): # for notify 754 c3._called_2 = True 755 c3._last_update_2 = value 756 757 def on_c3_update_3(value): # for indicate 758 c3._called_3 = True 759 c3._last_update_3 = value 760 761 c3.on('update', on_c3_update) 762 await peer.subscribe(c3, on_c3_update_2) 763 await async_barrier() 764 await server.notify_subscriber( 765 characteristic3._last_subscription[0], characteristic3 766 ) 767 await async_barrier() 768 assert c3._called 769 assert c3._last_update == characteristic3.value 770 assert c3._called_2 771 assert c3._last_update_2 == characteristic3.value 772 assert not c3._called_3 773 774 c3._called = False 775 c3._called_2 = False 776 c3._called_3 = False 777 await peer.unsubscribe(c3) 778 await peer.subscribe(c3, on_c3_update_3, prefer_notify=False) 779 await async_barrier() 780 characteristic3.value = bytes([1, 2, 3]) 781 await server.indicate_subscriber( 782 characteristic3._last_subscription[0], characteristic3 783 ) 784 await async_barrier() 785 assert c3._called 786 assert c3._last_update == characteristic3.value 787 assert not c3._called_2 788 assert c3._called_3 789 assert c3._last_update_3 == characteristic3.value 790 791 c3._called = False 792 c3._called_2 = False 793 c3._called_3 = False 794 await peer.unsubscribe(c3) 795 await server.notify_subscriber( 796 characteristic3._last_subscription[0], characteristic3 797 ) 798 await server.indicate_subscriber( 799 characteristic3._last_subscription[0], characteristic3 800 ) 801 await async_barrier() 802 assert not c3._called 803 assert not c3._called_2 804 assert not c3._called_3 805 806 807# ----------------------------------------------------------------------------- 808@pytest.mark.asyncio 809async def test_unsubscribe(): 810 [client, server] = LinkedDevices().devices[:2] 811 812 characteristic1 = Characteristic( 813 'FDB159DB-036C-49E3-B3DB-6325AC750806', 814 Characteristic.Properties.READ | Characteristic.Properties.NOTIFY, 815 Characteristic.READABLE, 816 bytes([1, 2, 3]), 817 ) 818 characteristic2 = Characteristic( 819 '3234C4F4-3F34-4616-8935-45A50EE05DEB', 820 Characteristic.Properties.READ | Characteristic.Properties.NOTIFY, 821 Characteristic.READABLE, 822 bytes([1, 2, 3]), 823 ) 824 825 service1 = Service( 826 '3A657F47-D34F-46B3-B1EC-698E29B6B829', 827 [characteristic1, characteristic2], 828 ) 829 server.add_services([service1]) 830 831 mock1 = Mock() 832 characteristic1.on('subscription', mock1) 833 mock2 = Mock() 834 characteristic2.on('subscription', mock2) 835 836 await client.power_on() 837 await server.power_on() 838 connection = await client.connect(server.random_address) 839 peer = Peer(connection) 840 841 await peer.discover_services() 842 await peer.discover_characteristics() 843 c = peer.get_characteristics_by_uuid(characteristic1.uuid) 844 assert len(c) == 1 845 c1 = c[0] 846 c = peer.get_characteristics_by_uuid(characteristic2.uuid) 847 assert len(c) == 1 848 c2 = c[0] 849 850 await c1.subscribe() 851 await async_barrier() 852 mock1.assert_called_once_with(ANY, True, False) 853 854 await c2.subscribe() 855 await async_barrier() 856 mock2.assert_called_once_with(ANY, True, False) 857 858 mock1.reset_mock() 859 await c1.unsubscribe() 860 await async_barrier() 861 mock1.assert_called_once_with(ANY, False, False) 862 863 mock2.reset_mock() 864 await c2.unsubscribe() 865 await async_barrier() 866 mock2.assert_called_once_with(ANY, False, False) 867 868 mock1.reset_mock() 869 await c1.unsubscribe() 870 await async_barrier() 871 mock1.assert_not_called() 872 873 mock2.reset_mock() 874 await c2.unsubscribe() 875 await async_barrier() 876 mock2.assert_not_called() 877 878 mock1.reset_mock() 879 await c1.unsubscribe(force=True) 880 await async_barrier() 881 mock1.assert_called_once_with(ANY, False, False) 882 883 884# ----------------------------------------------------------------------------- 885@pytest.mark.asyncio 886async def test_discover_all(): 887 [client, server] = LinkedDevices().devices[:2] 888 889 characteristic1 = Characteristic( 890 'FDB159DB-036C-49E3-B3DB-6325AC750806', 891 Characteristic.Properties.READ | Characteristic.Properties.NOTIFY, 892 Characteristic.READABLE, 893 bytes([1, 2, 3]), 894 ) 895 896 descriptor1 = Descriptor('2902', 'READABLE,WRITEABLE') 897 descriptor2 = Descriptor('AAAA', 'READABLE,WRITEABLE') 898 characteristic2 = Characteristic( 899 '3234C4F4-3F34-4616-8935-45A50EE05DEB', 900 Characteristic.Properties.READ | Characteristic.Properties.NOTIFY, 901 Characteristic.READABLE, 902 bytes([1, 2, 3]), 903 descriptors=[descriptor1, descriptor2], 904 ) 905 906 service1 = Service( 907 '3A657F47-D34F-46B3-B1EC-698E29B6B829', 908 [characteristic1, characteristic2], 909 ) 910 service2 = Service('1111', []) 911 server.add_services([service1, service2]) 912 913 await client.power_on() 914 await server.power_on() 915 connection = await client.connect(server.random_address) 916 peer = Peer(connection) 917 918 await peer.discover_all() 919 assert len(peer.gatt_client.services) == 3 920 # service 1800 gets added automatically 921 assert peer.gatt_client.services[0].uuid == UUID('1800') 922 assert peer.gatt_client.services[1].uuid == service1.uuid 923 assert peer.gatt_client.services[2].uuid == service2.uuid 924 s = peer.get_services_by_uuid(service1.uuid) 925 assert len(s) == 1 926 assert len(s[0].characteristics) == 2 927 c = peer.get_characteristics_by_uuid(uuid=characteristic2.uuid, service=s[0]) 928 assert len(c) == 1 929 assert len(c[0].descriptors) == 2 930 s = peer.get_services_by_uuid(service2.uuid) 931 assert len(s) == 1 932 assert len(s[0].characteristics) == 0 933 934 935# ----------------------------------------------------------------------------- 936@pytest.mark.asyncio 937async def test_mtu_exchange(): 938 [d1, d2, d3] = LinkedDevices().devices[:3] 939 940 d3.gatt_server.max_mtu = 100 941 942 d3_connections = [] 943 944 @d3.on('connection') 945 def on_d3_connection(connection): 946 d3_connections.append(connection) 947 948 await d1.power_on() 949 await d2.power_on() 950 await d3.power_on() 951 952 d1_connection = await d1.connect(d3.random_address) 953 assert len(d3_connections) == 1 954 assert d3_connections[0] is not None 955 956 d2_connection = await d2.connect(d3.random_address) 957 assert len(d3_connections) == 2 958 assert d3_connections[1] is not None 959 960 d1_peer = Peer(d1_connection) 961 d2_peer = Peer(d2_connection) 962 963 d1_client_mtu = await d1_peer.request_mtu(220) 964 assert d1_client_mtu == 100 965 assert d1_connection.att_mtu == 100 966 967 d2_client_mtu = await d2_peer.request_mtu(50) 968 assert d2_client_mtu == 50 969 assert d2_connection.att_mtu == 50 970 971 972# ----------------------------------------------------------------------------- 973def test_char_property_to_string(): 974 # single 975 assert str(Characteristic.Properties(0x01)) == "BROADCAST" 976 assert str(Characteristic.Properties.BROADCAST) == "BROADCAST" 977 978 # double 979 assert str(Characteristic.Properties(0x03)) == "BROADCAST|READ" 980 assert ( 981 str(Characteristic.Properties.BROADCAST | Characteristic.Properties.READ) 982 == "BROADCAST|READ" 983 ) 984 985 986# ----------------------------------------------------------------------------- 987def test_characteristic_property_from_string(): 988 # single 989 assert ( 990 Characteristic.Properties.from_string("BROADCAST") 991 == Characteristic.Properties.BROADCAST 992 ) 993 994 # double 995 assert ( 996 Characteristic.Properties.from_string("BROADCAST,READ") 997 == Characteristic.Properties.BROADCAST | Characteristic.Properties.READ 998 ) 999 assert ( 1000 Characteristic.Properties.from_string("READ,BROADCAST") 1001 == Characteristic.Properties.BROADCAST | Characteristic.Properties.READ 1002 ) 1003 assert ( 1004 Characteristic.Properties.from_string("BROADCAST|READ") 1005 == Characteristic.Properties.BROADCAST | Characteristic.Properties.READ 1006 ) 1007 1008 1009# ----------------------------------------------------------------------------- 1010def test_characteristic_property_from_string_assert(): 1011 with pytest.raises(TypeError) as e_info: 1012 Characteristic.Properties.from_string("BROADCAST,HELLO") 1013 1014 assert ( 1015 str(e_info.value) 1016 == """Characteristic.Properties::from_string() error: 1017Expected a string containing any of the keys, separated by , or |: BROADCAST,READ,WRITE_WITHOUT_RESPONSE,WRITE,NOTIFY,INDICATE,AUTHENTICATED_SIGNED_WRITES,EXTENDED_PROPERTIES 1018Got: BROADCAST,HELLO""" 1019 ) 1020 1021 1022# ----------------------------------------------------------------------------- 1023@pytest.mark.asyncio 1024async def test_server_string(): 1025 [_, server] = LinkedDevices().devices[:2] 1026 1027 characteristic = Characteristic( 1028 'FDB159DB-036C-49E3-B3DB-6325AC750806', 1029 Characteristic.Properties.READ 1030 | Characteristic.Properties.WRITE 1031 | Characteristic.Properties.NOTIFY, 1032 Characteristic.READABLE | Characteristic.WRITEABLE, 1033 bytes([123]), 1034 ) 1035 1036 service = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic]) 1037 server.add_service(service) 1038 1039 assert ( 1040 str(server.gatt_server) 1041 == """Service(handle=0x0001, end=0x0005, uuid=UUID-16:1800 (Generic Access)) 1042CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00 (Device Name), READ) 1043Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), READ) 1044CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), READ) 1045Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), READ) 1046Service(handle=0x0006, end=0x0009, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829) 1047CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, READ|WRITE|NOTIFY) 1048Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, READ|WRITE|NOTIFY) 1049Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)""" 1050 ) 1051 1052 1053# ----------------------------------------------------------------------------- 1054async def async_main(): 1055 test_UUID() 1056 test_ATT_Error_Response() 1057 test_ATT_Read_By_Group_Type_Request() 1058 await test_read_write() 1059 await test_read_write2() 1060 await test_subscribe_notify() 1061 await test_unsubscribe() 1062 await test_characteristic_encoding() 1063 await test_mtu_exchange() 1064 await test_CharacteristicValue() 1065 await test_CharacteristicValue_async() 1066 await test_CharacteristicAdapter() 1067 1068 1069# ----------------------------------------------------------------------------- 1070def test_permissions_from_string(): 1071 assert Attribute.Permissions.from_string('READABLE') == 1 1072 assert Attribute.Permissions.from_string('WRITEABLE') == 2 1073 assert Attribute.Permissions.from_string('READABLE,WRITEABLE') == 3 1074 1075 1076# ----------------------------------------------------------------------------- 1077def test_characteristic_permissions(): 1078 characteristic = Characteristic( 1079 'FDB159DB-036C-49E3-B3DB-6325AC750806', 1080 Characteristic.Properties.READ 1081 | Characteristic.Properties.WRITE 1082 | Characteristic.Properties.NOTIFY, 1083 'READABLE,WRITEABLE', 1084 ) 1085 assert characteristic.permissions == 3 1086 1087 1088# ----------------------------------------------------------------------------- 1089def test_characteristic_has_properties(): 1090 characteristic = Characteristic( 1091 'FDB159DB-036C-49E3-B3DB-6325AC750806', 1092 Characteristic.Properties.READ 1093 | Characteristic.Properties.WRITE 1094 | Characteristic.Properties.NOTIFY, 1095 'READABLE,WRITEABLE', 1096 ) 1097 assert characteristic.has_properties(Characteristic.Properties.READ) 1098 assert characteristic.has_properties( 1099 Characteristic.Properties.READ | Characteristic.Properties.WRITE 1100 ) 1101 assert not characteristic.has_properties( 1102 Characteristic.Properties.READ 1103 | Characteristic.Properties.WRITE 1104 | Characteristic.Properties.INDICATE 1105 ) 1106 assert not characteristic.has_properties(Characteristic.Properties.INDICATE) 1107 1108 1109# ----------------------------------------------------------------------------- 1110def test_descriptor_permissions(): 1111 descriptor = Descriptor('2902', 'READABLE,WRITEABLE') 1112 assert descriptor.permissions == 3 1113 1114 1115# ----------------------------------------------------------------------------- 1116def test_get_attribute_group(): 1117 device = Device() 1118 1119 # add some services / characteristics to the gatt server 1120 characteristic1 = Characteristic( 1121 '1111', 1122 Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY, 1123 Characteristic.READABLE | Characteristic.WRITEABLE, 1124 bytes([123]), 1125 ) 1126 characteristic2 = Characteristic( 1127 '2222', 1128 Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY, 1129 Characteristic.READABLE | Characteristic.WRITEABLE, 1130 bytes([123]), 1131 ) 1132 services = [Service('1212', [characteristic1]), Service('3233', [characteristic2])] 1133 device.gatt_server.add_services(services) 1134 1135 # get the handles from gatt server 1136 characteristic_attributes1 = device.gatt_server.get_characteristic_attributes( 1137 UUID('1212'), UUID('1111') 1138 ) 1139 assert characteristic_attributes1 is not None 1140 characteristic_attributes2 = device.gatt_server.get_characteristic_attributes( 1141 UUID('3233'), UUID('2222') 1142 ) 1143 assert characteristic_attributes2 is not None 1144 descriptor1 = device.gatt_server.get_descriptor_attribute( 1145 UUID('1212'), UUID('1111'), UUID('2902') 1146 ) 1147 assert descriptor1 is not None 1148 descriptor2 = device.gatt_server.get_descriptor_attribute( 1149 UUID('3233'), UUID('2222'), UUID('2902') 1150 ) 1151 assert descriptor2 is not None 1152 1153 # confirm the handles map back to the service 1154 assert ( 1155 UUID('1212') 1156 == device.gatt_server.get_attribute_group( 1157 characteristic_attributes1[0].handle, Service 1158 ).uuid 1159 ) 1160 assert ( 1161 UUID('1212') 1162 == device.gatt_server.get_attribute_group( 1163 characteristic_attributes1[1].handle, Service 1164 ).uuid 1165 ) 1166 assert ( 1167 UUID('1212') 1168 == device.gatt_server.get_attribute_group(descriptor1.handle, Service).uuid 1169 ) 1170 assert ( 1171 UUID('3233') 1172 == device.gatt_server.get_attribute_group( 1173 characteristic_attributes2[0].handle, Service 1174 ).uuid 1175 ) 1176 assert ( 1177 UUID('3233') 1178 == device.gatt_server.get_attribute_group( 1179 characteristic_attributes2[1].handle, Service 1180 ).uuid 1181 ) 1182 assert ( 1183 UUID('3233') 1184 == device.gatt_server.get_attribute_group(descriptor2.handle, Service).uuid 1185 ) 1186 1187 # confirm the handles map back to the characteristic 1188 assert ( 1189 UUID('1111') 1190 == device.gatt_server.get_attribute_group( 1191 descriptor1.handle, Characteristic 1192 ).uuid 1193 ) 1194 assert ( 1195 UUID('2222') 1196 == device.gatt_server.get_attribute_group( 1197 descriptor2.handle, Characteristic 1198 ).uuid 1199 ) 1200 1201 1202# ----------------------------------------------------------------------------- 1203@pytest.mark.asyncio 1204async def test_get_characteristics_by_uuid(): 1205 [client, server] = LinkedDevices().devices[:2] 1206 1207 characteristic1 = Characteristic( 1208 '1234', 1209 Characteristic.Properties.READ | Characteristic.Properties.NOTIFY, 1210 Characteristic.READABLE, 1211 bytes([1, 2, 3]), 1212 ) 1213 characteristic2 = Characteristic( 1214 '5678', 1215 Characteristic.Properties.READ | Characteristic.Properties.NOTIFY, 1216 Characteristic.READABLE, 1217 bytes([1, 2, 3]), 1218 ) 1219 service1 = Service( 1220 'ABCD', 1221 [characteristic1, characteristic2], 1222 ) 1223 service2 = Service( 1224 'FFFF', 1225 [characteristic1], 1226 ) 1227 1228 server.add_services([service1, service2]) 1229 1230 await client.power_on() 1231 await server.power_on() 1232 connection = await client.connect(server.random_address) 1233 peer = Peer(connection) 1234 1235 await peer.discover_services() 1236 await peer.discover_characteristics() 1237 c = peer.get_characteristics_by_uuid(uuid=UUID('1234')) 1238 assert len(c) == 2 1239 assert isinstance(c[0], CharacteristicProxy) 1240 c = peer.get_characteristics_by_uuid(uuid=UUID('1234'), service=UUID('ABCD')) 1241 assert len(c) == 1 1242 assert isinstance(c[0], CharacteristicProxy) 1243 c = peer.get_characteristics_by_uuid(uuid=UUID('1234'), service=UUID('AAAA')) 1244 assert len(c) == 0 1245 1246 s = peer.get_services_by_uuid(uuid=UUID('ABCD')) 1247 assert len(s) == 1 1248 c = peer.get_characteristics_by_uuid(uuid=UUID('1234'), service=s[0]) 1249 assert len(s) == 1 1250 1251 1252# ----------------------------------------------------------------------------- 1253@pytest.mark.asyncio 1254async def test_write_return_error(): 1255 [client, server] = LinkedDevices().devices[:2] 1256 1257 on_write = Mock(side_effect=ATT_Error(error_code=ErrorCode.VALUE_NOT_ALLOWED)) 1258 characteristic = Characteristic( 1259 '1234', 1260 Characteristic.Properties.WRITE, 1261 Characteristic.Permissions.WRITEABLE, 1262 CharacteristicValue(write=on_write), 1263 ) 1264 service = Service('ABCD', [characteristic]) 1265 server.add_service(service) 1266 1267 await client.power_on() 1268 await server.power_on() 1269 connection = await client.connect(server.random_address) 1270 1271 async with Peer(connection) as peer: 1272 c = peer.get_characteristics_by_uuid(uuid=UUID('1234'))[0] 1273 with pytest.raises(ATT_Error) as e: 1274 await c.write_value(b'', with_response=True) 1275 assert e.value.error_code == ErrorCode.VALUE_NOT_ALLOWED 1276 1277 1278# ----------------------------------------------------------------------------- 1279if __name__ == '__main__': 1280 logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) 1281 asyncio.run(async_main()) 1282