#!/usr/bin/env python3
# BlueKitchen GmbH (c) 2022

# parse PacketLogger and reconstruct GATT DB

import sys
import datetime
import struct

# Size of the record header consumed by read_header(): three big-endian
# 32-bit fields followed by a one-byte packet type.
PKLG_HEADER_LEN = 13

# L2CAP channel id on which ATT PDUs are expected.
L2CAP_CID_ATT = 0x0004


def as_hex(data):
    """Return the bytes of `data` as two-digit hex values, each followed by a space."""
    return ''.join("{0:02x} ".format(byte) for byte in data)


def as_bd_addr(data):
    """Return the bytes of `data` as colon-separated two-digit hex values."""
    return ':'.join("{0:02x}".format(byte) for byte in data)


def read_header(f):
    """Read one record header from the binary file object `f`.

    Returns the tuple (entry_len, ts_sec, ts_usec, type). At end of file, or
    when only a truncated header is left, returns (-1, 0, 0, 0) so the caller
    can stop cleanly instead of failing inside struct.unpack.
    """
    bytes_read = f.read(PKLG_HEADER_LEN)
    if len(bytes_read) < PKLG_HEADER_LEN:
        return (-1, 0, 0, 0)
    return struct.unpack(">IIIB", bytes_read)


def uuid16_at_offset(data, offset):
    """Return the little-endian 16-bit UUID at `offset` as four hex digits."""
    return "%04x" % struct.unpack_from("<H", data, offset)[0]


def uuid128_at_offset(data, offset):
    """Return the 16 bytes at `offset`, byte-reversed, in 8-4-4-4-12 UUID notation."""
    uuid128 = bytes(reversed(data[offset:offset + 16]))
    return "-".join([
        uuid128[0:4].hex(),
        uuid128[4:6].hex(),
        uuid128[6:8].hex(),
        uuid128[8:10].hex(),
        uuid128[10:].hex(),
    ])


def handle_at_offset(data, offset):
    """Return the little-endian 16-bit handle stored at `offset`."""
    return struct.unpack_from("<H", data, offset)[0]


def bd_addr_at_offset(data, offset):
    """Return the 6-byte address at `offset`, byte-reversed, as aa:bb:cc:dd:ee:ff."""
    # The address is read from the requested offset (previously a fixed
    # offset of 8 was used regardless of the argument).
    return as_bd_addr(reversed(data[offset:offset + 6]))


class gatt_characteristic:
    """A characteristic discovered from a Read By Type Response."""

    def __init__(self, uuid, properties, characteristic_handle, value_handle):
        self.uuid = uuid
        self.properties = properties
        self.characteristic_handle = characteristic_handle
        self.value_handle = value_handle

    def report(self, prefix):
        """Print a one-line summary of this characteristic, prefixed with `prefix`."""
        print("%sUUID %-36s, Handle %04x, Properties %02x, Value Handle %04x" % (prefix, self.uuid, self.characteristic_handle, self.properties, self.value_handle))


class gatt_service:
    """A primary service and the characteristics found inside its handle range."""

    def __init__(self, uuid, start_handle, end_handle):
        self.uuid = uuid
        self.start_handle = start_handle
        self.end_handle = end_handle
        self.characteristics = []

    def report(self, prefix):
        """Print this service followed by all of its characteristics."""
        print("%sUUID: %-36s, Start Handle %04x, End Handle %04x" % (prefix, self.uuid, self.start_handle, self.end_handle))
        print(" %sCharacteristics:" % prefix)
        for characteristic in self.characteristics:
            characteristic.report(" " + prefix)


class gatt_server:
    """GATT database of one server, reconstructed from the ATT PDUs it exchanged."""

    def __init__(self, bd_addr):
        self.bd_addr = bd_addr
        # Per-instance list: a class-level list would be shared by every
        # server and mix the databases of different peers.
        self.primary_services = []
        self.client_opcode = 0
        # Attribute type of the most recent Read By Group Type / Read By Type
        # request; decides how the matching response is interpreted.
        self.group_type = 0
        self.read_type = 0
        self.mtu = 23

    def service_for_handle(self, handle):
        """Return the known service whose handle range contains `handle`, or None."""
        for service in self.primary_services:
            if service.start_handle <= handle <= service.end_handle:
                return service
        return None

    def handle_pdu(self, pdu):
        """Update the reconstructed database from a single ATT PDU.

        Only the MTU response, Read By Type and Read By Group Type PDUs are
        evaluated; every other opcode, and an empty PDU, is ignored.
        """
        if not pdu:
            return
        opcode = pdu[0]
        if opcode == 0x03:
            # exchange mtu response
            if len(pdu) >= 3:
                self.mtu = struct.unpack_from("<H", pdu, 1)[0]
        elif opcode == 0x08:
            # read by type request: remember the 16-bit type, forget it for
            # any other request length so that a stale type is not applied to
            # an unrelated response
            if len(pdu) == 7:
                (_, _, self.read_type) = struct.unpack_from("<HHH", pdu, 1)
            else:
                self.read_type = 0
        elif opcode == 0x09:
            # read by type response
            if self.read_type == 0x2803:
                self._add_characteristics(pdu)
        elif opcode == 0x10:
            # read by group type request
            if len(pdu) == 7:
                (_, _, self.group_type) = struct.unpack_from("<HHH", pdu, 1)
            else:
                self.group_type = 0
        elif opcode == 0x11:
            # read by group type response
            if self.group_type == 0x2800:
                self._add_primary_services(pdu)

    @staticmethod
    def _item_offsets(pdu, item_len):
        """Return the offsets of all complete `item_len`-sized items after the 2-byte header."""
        # Stepping by item_len over complete items only avoids both the
        # endless loop on item_len == 0 and reads past a truncated PDU.
        return range(2, len(pdu) - item_len + 1, item_len)

    def _add_characteristics(self, pdu):
        """Parse characteristic declarations from a Read By Type Response."""
        if len(pdu) < 2:
            return
        item_len = pdu[1]
        # item = handle (2) + properties (1) + value handle (2) + UUID (2 or 16)
        if item_len == 7:
            uuid_at_offset = uuid16_at_offset
        elif item_len == 21:
            uuid_at_offset = uuid128_at_offset
        else:
            return
        for pos in self._item_offsets(pdu, item_len):
            (characteristic_handle, properties, value_handle) = struct.unpack_from("<HBH", pdu, pos)
            uuid = uuid_at_offset(pdu, pos + 5)
            service = self.service_for_handle(characteristic_handle)
            if service:
                service.characteristics.append(gatt_characteristic(uuid, properties, characteristic_handle, value_handle))

    def _add_primary_services(self, pdu):
        """Parse primary services from a Read By Group Type Response."""
        if len(pdu) < 2:
            return
        item_len = pdu[1]
        # item = start handle (2) + end handle (2) + UUID (2 or 16)
        if item_len == 6:
            uuid_at_offset = uuid16_at_offset
        elif item_len == 20:
            uuid_at_offset = uuid128_at_offset
        else:
            return
        for pos in self._item_offsets(pdu, item_len):
            (start, end) = struct.unpack_from("<HH", pdu, pos)
            uuid = uuid_at_offset(pdu, pos + 4)
            self.primary_services.append(gatt_service(uuid, start, end))

    def report(self):
        """Print the MTU and all reconstructed primary services."""
        print("GATT Server on", self.bd_addr)
        print("- MTU", self.mtu)
        print("- Primary Services:")
        for service in self.primary_services:
            service.report(" - ")


class l2cap_reassembler:
    """Reassembles one L2CAP packet from the ACL fragments of one direction."""

    def __init__(self):
        self.payload_data = bytes()
        self.payload_len = 0
        self.channel = 0

    def handle_acl(self, pb, data):
        """Process one ACL payload `data` with packet boundary flag `pb`.

        pb 0 or 2 starts a new packet: `data` begins with the 4-byte L2CAP
        header (length, channel). pb 1 continues the current packet.
        """
        if pb in [0, 2]:
            if len(data) < 4:
                # Too short to hold an L2CAP header: drop any partial state.
                self.payload_data = bytes()
                self.payload_len = 0
                self.channel = 0
                return
            (self.payload_len, self.channel) = struct.unpack_from("<HH", data, 0)
            self.payload_data = data[4:]
        elif pb == 0x01:
            # Continuation fragments carry no L2CAP header, so the whole
            # fragment is payload.
            self.payload_data += data

    def l2cap_complete(self):
        """Return True once as many payload bytes as announced have been collected."""
        return len(self.payload_data) == self.payload_len

    def l2cap_packet(self):
        """Return (channel, payload) of the packet collected so far."""
        return (self.channel, self.payload_data)


class hci_connection:
    """State of one connection: L2CAP reassembly per direction and the remote GATT server."""

    def __init__(self, bd_addr, con_handle):
        self.bd_addr = bd_addr
        self.con_handle = con_handle
        self.remote_gatt_server = gatt_server(bd_addr)
        # One reassembler per direction and per connection; class-level
        # instances would interleave fragments of different connections.
        self.l2cap_in = l2cap_reassembler()
        self.l2cap_out = l2cap_reassembler()

    def handle_att_pdu(self, direction_in, pdu):
        """Route an ATT PDU to the remote or the local GATT server model."""
        if not pdu:
            return
        opcode = pdu[0]
        # Incoming PDUs with an odd opcode and outgoing PDUs with an even
        # opcode are attributed to the remote server, all others to the
        # local one.
        remote_server = ((opcode & 1) == 1) == direction_in
        if remote_server:
            self.remote_gatt_server.handle_pdu(pdu)
        else:
            local_gatt_server.handle_pdu(pdu)

    def handle_acl(self, direction_in, pb, data):
        """Feed one ACL fragment into reassembly and dispatch a completed ATT PDU."""
        reassembler = self.l2cap_in if direction_in else self.l2cap_out
        reassembler.handle_acl(pb, data)
        if reassembler.l2cap_complete():
            (channel, l2cap_data) = reassembler.l2cap_packet()
            if channel == L2CAP_CID_ATT:
                self.handle_att_pdu(direction_in, l2cap_data)


def connection_for_handle(con_handle):
    """Return the tracked connection for `con_handle`, or None."""
    return connections.get(con_handle)


def handle_cmd(packet):
    """HCI commands are not evaluated."""
    pass


def handle_evt(event):
    """Track connections based on HCI events.

    Evaluates Disconnection Complete (0x05) and the LE Meta event (0x3e) with
    subevent LE Connection Complete (0x01); other events are ignored.
    """
    if not event:
        return

    if event[0] == 0x05:
        # Disconnection Complete
        con_handle = handle_at_offset(event, 3)
        print("Disconnection Complete: handle 0x%04x" % con_handle)
        connection = connections.pop(con_handle, None)
        # The connection is unknown when its setup was not part of the trace.
        if connection is not None:
            connection.remote_gatt_server.report()

    if event[0] == 0x3e:
        if event[2] == 0x01:
            # LE Connection Complete
            con_handle = handle_at_offset(event, 4)
            peer_addr = bd_addr_at_offset(event, 8)
            connection = hci_connection(peer_addr, con_handle)
            connections[con_handle] = connection
            print("LE Connection Complete: %s handle 0x%04x" % (peer_addr, con_handle))


def handle_acl(data, direction_in):
    """Dispatch one HCI ACL packet to the connection named in its header."""
    (header, hci_len) = struct.unpack_from("<HH", data, 0)
    pb = (header >> 12) & 0x03
    con_handle = header & 0x0FFF
    connection = connection_for_handle(con_handle)
    if connection is None:
        # ACL data for a connection whose setup is not part of the trace.
        return
    connection.handle_acl(direction_in, pb, data[4:])


# globals
connections = {}
local_gatt_server = gatt_server("00:00:00:00:00:00")


def parse_pklg(fin):
    """Read all records from the binary file object `fin` and dispatch them by type."""
    pos = 0
    try:
        while True:
            (entry_len, ts_sec, ts_usec, packet_type) = read_header(fin)
            if entry_len < 0:
                break
            packet_len = entry_len - 9
            # A negative length would make read() consume the rest of the file.
            if packet_len < 0 or packet_len > 66000:
                print("Error parsing pklg at offset %u (%x)." % (pos, pos))
                break
            packet = fin.read(packet_len)
            if len(packet) < packet_len:
                # truncated final record
                print("Error parsing pklg at offset %u (%x)." % (pos, pos))
                break
            pos = pos + 4 + entry_len
            if packet_type == 0x00:
                handle_cmd(packet)
            elif packet_type == 0x01:
                handle_evt(packet)
            elif packet_type == 0x02:
                handle_acl(packet, False)
            elif packet_type == 0x03:
                handle_acl(packet, True)

    except (TypeError, struct.error, IndexError) as e:
        # struct.error / IndexError are what malformed packets actually raise.
        print(e)
        print("Error parsing pklg at offset %u (%x)." % (pos, pos))


def main(argv):
    """Parse the trace named in `argv` and report the GATT servers still connected at its end."""
    if len(argv) == 1:
        print('Reconstruct GATT interactions from PacketLogger trace file')
        print('Copyright 2022, BlueKitchen GmbH')
        print('')
        print('Usage: ', argv[0], 'hci_dump.pklg')
        sys.exit(0)

    infile = argv[1]

    with open(infile, 'rb') as fin:
        parse_pklg(fin)

    # Iterate the connection objects, not the dict's integer keys.
    for connection in connections.values():
        connection.remote_gatt_server.report()


if __name__ == "__main__":
    main(sys.argv)