1*17348a8fSMatthias Ringwald#!/usr/bin/env python3 2*17348a8fSMatthias Ringwald# BlueKitchen GmbH (c) 2022 3*17348a8fSMatthias Ringwald 4*17348a8fSMatthias Ringwald# parse PacketLogger and reconstruct GATT DB 5*17348a8fSMatthias Ringwald 6*17348a8fSMatthias Ringwaldimport sys 7*17348a8fSMatthias Ringwaldimport datetime 8*17348a8fSMatthias Ringwaldimport struct 9*17348a8fSMatthias Ringwald 10*17348a8fSMatthias Ringwalddef as_hex(data): 11*17348a8fSMatthias Ringwald str_list = [] 12*17348a8fSMatthias Ringwald for byte in data: 13*17348a8fSMatthias Ringwald str_list.append("{0:02x} ".format(byte)) 14*17348a8fSMatthias Ringwald return ''.join(str_list) 15*17348a8fSMatthias Ringwald 16*17348a8fSMatthias Ringwalddef as_bd_addr(data): 17*17348a8fSMatthias Ringwald str_list = [] 18*17348a8fSMatthias Ringwald for byte in data: 19*17348a8fSMatthias Ringwald str_list.append("{0:02x}".format(byte)) 20*17348a8fSMatthias Ringwald return ':'.join(str_list) 21*17348a8fSMatthias Ringwald 22*17348a8fSMatthias Ringwalddef read_header(f): 23*17348a8fSMatthias Ringwald bytes_read = f.read(13) 24*17348a8fSMatthias Ringwald if bytes_read: 25*17348a8fSMatthias Ringwald return struct.unpack(">IIIB", bytes_read) 26*17348a8fSMatthias Ringwald else: 27*17348a8fSMatthias Ringwald return (-1, 0, 0, 0) 28*17348a8fSMatthias Ringwald 29*17348a8fSMatthias Ringwalddef uuid16_at_offset(data, offset): 30*17348a8fSMatthias Ringwald return "%04x" % struct.unpack_from("<H", data, offset)[0] 31*17348a8fSMatthias Ringwald 32*17348a8fSMatthias Ringwalddef uuid128_at_offset(data, offset): 33*17348a8fSMatthias Ringwald uuid128 = bytes(reversed(data[offset:offset+16])) 34*17348a8fSMatthias Ringwald return uuid128[0:4].hex() + "-" + uuid128[4:6].hex() + "-" + uuid128[6:8].hex() + "-" + uuid128[8:10].hex() + "-" + uuid128[10:].hex() 35*17348a8fSMatthias Ringwald 36*17348a8fSMatthias Ringwalddef handle_at_offset(data, offset): 37*17348a8fSMatthias Ringwald return struct.unpack_from("<H", data, offset)[0]; 38*17348a8fSMatthias Ringwald 39*17348a8fSMatthias Ringwalddef bd_addr_at_offset(data, offset): 40*17348a8fSMatthias Ringwald peer_addr = reversed(data[8:8 + 6]) 41*17348a8fSMatthias Ringwald return as_bd_addr(peer_addr) 42*17348a8fSMatthias Ringwald 43*17348a8fSMatthias Ringwaldclass gatt_characteristic: 44*17348a8fSMatthias Ringwald def __init__(self, uuid, properties, characteristic_handle, value_handle): 45*17348a8fSMatthias Ringwald self.uuid = uuid 46*17348a8fSMatthias Ringwald self.properties = properties 47*17348a8fSMatthias Ringwald self.characteristic_handle = characteristic_handle 48*17348a8fSMatthias Ringwald self.value_handle = value_handle 49*17348a8fSMatthias Ringwald def report(self, prefix): 50*17348a8fSMatthias Ringwald print("%sUUID %-36s, Handle %04x, Properties %02x, Value Handle %04x" % (prefix, self.uuid, self.characteristic_handle, self.properties, self.value_handle)) 51*17348a8fSMatthias Ringwald 52*17348a8fSMatthias Ringwaldclass gatt_service: 53*17348a8fSMatthias Ringwald 54*17348a8fSMatthias Ringwald def __init__(self, uuid, start_handle, end_handle): 55*17348a8fSMatthias Ringwald self.uuid = uuid 56*17348a8fSMatthias Ringwald self.start_handle = start_handle 57*17348a8fSMatthias Ringwald self.end_handle = end_handle 58*17348a8fSMatthias Ringwald self.characteristics = [] 59*17348a8fSMatthias Ringwald 60*17348a8fSMatthias Ringwald def report(self, prefix): 61*17348a8fSMatthias Ringwald print("%sUUID: %-36s, Start Handle %04x, End Handle %04x" % (prefix, self.uuid, self.start_handle, self.end_handle)) 62*17348a8fSMatthias Ringwald print(" %sCharacteristics:" % prefix) 63*17348a8fSMatthias Ringwald for characteristic in self.characteristics: 64*17348a8fSMatthias Ringwald characteristic.report(" " + prefix) 65*17348a8fSMatthias Ringwald 66*17348a8fSMatthias Ringwaldclass gatt_server: 67*17348a8fSMatthias Ringwald 68*17348a8fSMatthias Ringwald primary_services = [] 69*17348a8fSMatthias Ringwald 70*17348a8fSMatthias Ringwald client_opcode = 0 71*17348a8fSMatthias Ringwald group_type = 0 72*17348a8fSMatthias Ringwald read_type = 0 73*17348a8fSMatthias Ringwald mtu = 23 74*17348a8fSMatthias Ringwald 75*17348a8fSMatthias Ringwald def __init__(self, bd_addr): 76*17348a8fSMatthias Ringwald self.bd_addr = bd_addr 77*17348a8fSMatthias Ringwald 78*17348a8fSMatthias Ringwald def service_for_handle(self, handle): 79*17348a8fSMatthias Ringwald for service in self.primary_services: 80*17348a8fSMatthias Ringwald if service.start_handle <= handle and handle <= service.end_handle: 81*17348a8fSMatthias Ringwald return service 82*17348a8fSMatthias Ringwald return None 83*17348a8fSMatthias Ringwald 84*17348a8fSMatthias Ringwald def handle_pdu(self, pdu): 85*17348a8fSMatthias Ringwald opcode = pdu[0] 86*17348a8fSMatthias Ringwald if opcode == 0x01: 87*17348a8fSMatthias Ringwald pass 88*17348a8fSMatthias Ringwald elif opcode == 0x02: 89*17348a8fSMatthias Ringwald # exchange mtu 90*17348a8fSMatthias Ringwald pass 91*17348a8fSMatthias Ringwald elif opcode == 0x03: 92*17348a8fSMatthias Ringwald # exchange mtu 93*17348a8fSMatthias Ringwald self.mtu = struct.unpack_from("<H", pdu, 1)[0] 94*17348a8fSMatthias Ringwald elif opcode == 0x08: 95*17348a8fSMatthias Ringwald # read by type request 96*17348a8fSMatthias Ringwald if len(pdu) == 7: 97*17348a8fSMatthias Ringwald (_,_,self.read_type) = struct.unpack_from("<HHH", pdu, 1) 98*17348a8fSMatthias Ringwald elif opcode == 0x09: 99*17348a8fSMatthias Ringwald # read by type response 100*17348a8fSMatthias Ringwald if self.read_type == 0x2803: 101*17348a8fSMatthias Ringwald item_len = pdu[1] 102*17348a8fSMatthias Ringwald pos = 2 103*17348a8fSMatthias Ringwald while pos < len(pdu): 104*17348a8fSMatthias Ringwald (characteristic_handle, properties, value_handle) = struct.unpack_from("<HBH", pdu, pos) 105*17348a8fSMatthias Ringwald if item_len == 11: 106*17348a8fSMatthias Ringwald uuid = uuid16_at_offset(pdu, pos + 5) 107*17348a8fSMatthias Ringwald elif item_len == 21: 108*17348a8fSMatthias Ringwald uuid = uuid128_at_offset(pdu, pos + 5) 109*17348a8fSMatthias Ringwald service = self.service_for_handle(characteristic_handle) 110*17348a8fSMatthias Ringwald if service: 111*17348a8fSMatthias Ringwald service.characteristics.append(gatt_characteristic(uuid, properties, characteristic_handle, value_handle)) 112*17348a8fSMatthias Ringwald pos += item_len 113*17348a8fSMatthias Ringwald elif opcode == 0x10: 114*17348a8fSMatthias Ringwald # read by group type request 115*17348a8fSMatthias Ringwald if len(pdu) == 7: 116*17348a8fSMatthias Ringwald (_,_,self.group_type) = struct.unpack_from("<HHH", pdu, 1) 117*17348a8fSMatthias Ringwald elif opcode == 0x11: 118*17348a8fSMatthias Ringwald # read by group type response 119*17348a8fSMatthias Ringwald item_len = pdu[1] 120*17348a8fSMatthias Ringwald pos = 2 121*17348a8fSMatthias Ringwald while pos < len(pdu): 122*17348a8fSMatthias Ringwald (start, end) = struct.unpack_from("<HH", pdu, pos) 123*17348a8fSMatthias Ringwald if self.group_type == 0x2800: 124*17348a8fSMatthias Ringwald # primary service 125*17348a8fSMatthias Ringwald if item_len == 6: 126*17348a8fSMatthias Ringwald uuid = uuid16_at_offset(pdu, pos+4) 127*17348a8fSMatthias Ringwald elif item_len == 20: 128*17348a8fSMatthias Ringwald uuid = uuid128_at_offset(pdu, pos+4) 129*17348a8fSMatthias Ringwald self.primary_services.append(gatt_service(uuid, start, end)) 130*17348a8fSMatthias Ringwald pos += item_len 131*17348a8fSMatthias Ringwald else: 132*17348a8fSMatthias Ringwald # print(self.bd_addr, "ATT PDU:", as_hex(pdu)) 133*17348a8fSMatthias Ringwald pass 134*17348a8fSMatthias Ringwald 135*17348a8fSMatthias Ringwald def report(self): 136*17348a8fSMatthias Ringwald print("GATT Server on", self.bd_addr) 137*17348a8fSMatthias Ringwald print("- MTU", self.mtu) 138*17348a8fSMatthias Ringwald print("- Primary Services:") 139*17348a8fSMatthias Ringwald for service in self.primary_services: 140*17348a8fSMatthias Ringwald service.report(" - ") 141*17348a8fSMatthias Ringwald 142*17348a8fSMatthias Ringwaldclass l2cap_reassembler: 143*17348a8fSMatthias Ringwald 144*17348a8fSMatthias Ringwald payload_data = bytes() 145*17348a8fSMatthias Ringwald payload_len = 0 146*17348a8fSMatthias Ringwald channel = 0; 147*17348a8fSMatthias Ringwald 148*17348a8fSMatthias Ringwald def handle_acl(self, pb, data): 149*17348a8fSMatthias Ringwald if pb in [0, 2]: 150*17348a8fSMatthias Ringwald (self.payload_len, self.channel) = struct.unpack("<HH", data[0:4]) 151*17348a8fSMatthias Ringwald self.payload_data = data[4:] 152*17348a8fSMatthias Ringwald if pb == 0x01: 153*17348a8fSMatthias Ringwald self.payload_data += data[4:] 154*17348a8fSMatthias Ringwald 155*17348a8fSMatthias Ringwald def l2cap_complete(self): 156*17348a8fSMatthias Ringwald return len(self.payload_data) == self.payload_len 157*17348a8fSMatthias Ringwald 158*17348a8fSMatthias Ringwald def l2cap_packet(self): 159*17348a8fSMatthias Ringwald return (self.channel, self.payload_data) 160*17348a8fSMatthias Ringwald 161*17348a8fSMatthias Ringwaldclass hci_connection: 162*17348a8fSMatthias Ringwald 163*17348a8fSMatthias Ringwald l2cap_in = l2cap_reassembler() 164*17348a8fSMatthias Ringwald l2cap_out = l2cap_reassembler() 165*17348a8fSMatthias Ringwald 166*17348a8fSMatthias Ringwald def __init__(self, bd_addr, con_handle): 167*17348a8fSMatthias Ringwald self.bd_addr = bd_addr 168*17348a8fSMatthias Ringwald self.con_handle = con_handle; 169*17348a8fSMatthias Ringwald self.remote_gatt_server = gatt_server(bd_addr) 170*17348a8fSMatthias Ringwald 171*17348a8fSMatthias Ringwald def handle_att_pdu(self, direction_in, pdu): 172*17348a8fSMatthias Ringwald opcode = pdu[0] 173*17348a8fSMatthias Ringwald remote_server = ((opcode & 1) == 1) == direction_in 174*17348a8fSMatthias Ringwald if (remote_server): 175*17348a8fSMatthias Ringwald self.remote_gatt_server.handle_pdu(pdu) 176*17348a8fSMatthias Ringwald else: 177*17348a8fSMatthias Ringwald local_gatt_server.handle_pdu(pdu) 178*17348a8fSMatthias Ringwald 179*17348a8fSMatthias Ringwald def handle_acl(self, direction_in, pb, data): 180*17348a8fSMatthias Ringwald if direction_in: 181*17348a8fSMatthias Ringwald self.l2cap_in.handle_acl(pb, data) 182*17348a8fSMatthias Ringwald if self.l2cap_in.l2cap_complete(): 183*17348a8fSMatthias Ringwald (channel, l2cap_data) = self.l2cap_in.l2cap_packet() 184*17348a8fSMatthias Ringwald if channel == 0x004: 185*17348a8fSMatthias Ringwald self.handle_att_pdu(direction_in, l2cap_data) 186*17348a8fSMatthias Ringwald else: 187*17348a8fSMatthias Ringwald self.l2cap_out.handle_acl(pb, data) 188*17348a8fSMatthias Ringwald if self.l2cap_out.l2cap_complete(): 189*17348a8fSMatthias Ringwald (channel, l2cap_data) = self.l2cap_out.l2cap_packet() 190*17348a8fSMatthias Ringwald if channel == 0x004: 191*17348a8fSMatthias Ringwald self.handle_att_pdu(direction_in, l2cap_data) 192*17348a8fSMatthias Ringwald 193*17348a8fSMatthias Ringwalddef connection_for_handle(con_handle): 194*17348a8fSMatthias Ringwald if con_handle in connections: 195*17348a8fSMatthias Ringwald return connections[con_handle] 196*17348a8fSMatthias Ringwald else: 197*17348a8fSMatthias Ringwald return None 198*17348a8fSMatthias Ringwald 199*17348a8fSMatthias Ringwalddef handle_cmd(packet): 200*17348a8fSMatthias Ringwald pass 201*17348a8fSMatthias Ringwald 202*17348a8fSMatthias Ringwalddef handle_evt(event): 203*17348a8fSMatthias Ringwald if event[0] == 0x05: 204*17348a8fSMatthias Ringwald # Disconnection Complete 205*17348a8fSMatthias Ringwald con_handle = handle_at_offset(event, 3) 206*17348a8fSMatthias Ringwald print("Disconnection Complete: handle 0x%04x" % con_handle) 207*17348a8fSMatthias Ringwald connection = connections.pop(con_handle, None) 208*17348a8fSMatthias Ringwald connection.remote_gatt_server.report() 209*17348a8fSMatthias Ringwald 210*17348a8fSMatthias Ringwald if event[0] == 0x3e: 211*17348a8fSMatthias Ringwald if event[2] == 0x01: 212*17348a8fSMatthias Ringwald # LE Connection Complete 213*17348a8fSMatthias Ringwald con_handle = handle_at_offset(event, 4); 214*17348a8fSMatthias Ringwald peer_addr = bd_addr_at_offset(event, 8) 215*17348a8fSMatthias Ringwald connection = hci_connection(peer_addr, con_handle) 216*17348a8fSMatthias Ringwald connections[con_handle] = connection 217*17348a8fSMatthias Ringwald print("LE Connection Complete: %s handle 0x%04x" % (peer_addr, con_handle)) 218*17348a8fSMatthias Ringwald 219*17348a8fSMatthias Ringwalddef handle_acl(data, direction_in): 220*17348a8fSMatthias Ringwald (header, hci_len) = struct.unpack("<HH", data[0:4]) 221*17348a8fSMatthias Ringwald pb = (header >> 12) & 0x03 222*17348a8fSMatthias Ringwald con_handle = header & 0x0FFF 223*17348a8fSMatthias Ringwald connection_for_handle(con_handle).handle_acl(direction_in, pb, data[4:]) 224*17348a8fSMatthias Ringwald 225*17348a8fSMatthias Ringwald# globals 226*17348a8fSMatthias Ringwaldconnections = {} 227*17348a8fSMatthias Ringwaldlocal_gatt_server = gatt_server("00:00:00:00:00:00") 228*17348a8fSMatthias Ringwald 229*17348a8fSMatthias Ringwaldif len(sys.argv) == 1: 230*17348a8fSMatthias Ringwald print ('Reconstruct GATT interactions from PacketLogger trace file') 231*17348a8fSMatthias Ringwald print ('Copyright 2022, BlueKitchen GmbH') 232*17348a8fSMatthias Ringwald print ('') 233*17348a8fSMatthias Ringwald print ('Usage: ', sys.argv[0], 'hci_dump.pklg') 234*17348a8fSMatthias Ringwald exit(0) 235*17348a8fSMatthias Ringwald 236*17348a8fSMatthias Ringwaldinfile = sys.argv[1] 237*17348a8fSMatthias Ringwald 238*17348a8fSMatthias Ringwaldwith open (infile, 'rb') as fin: 239*17348a8fSMatthias Ringwald pos = 0 240*17348a8fSMatthias Ringwald try: 241*17348a8fSMatthias Ringwald while True: 242*17348a8fSMatthias Ringwald (entry_len, ts_sec, ts_usec, type) = read_header(fin) 243*17348a8fSMatthias Ringwald if entry_len < 0: 244*17348a8fSMatthias Ringwald break 245*17348a8fSMatthias Ringwald packet_len = entry_len - 9; 246*17348a8fSMatthias Ringwald if (packet_len > 66000): 247*17348a8fSMatthias Ringwald print ("Error parsing pklg at offset %u (%x)." % (pos, pos)) 248*17348a8fSMatthias Ringwald break 249*17348a8fSMatthias Ringwald packet = fin.read(packet_len) 250*17348a8fSMatthias Ringwald pos = pos + 4 + entry_len 251*17348a8fSMatthias Ringwald if type == 0x00: 252*17348a8fSMatthias Ringwald handle_cmd(packet) 253*17348a8fSMatthias Ringwald elif type == 0x01: 254*17348a8fSMatthias Ringwald handle_evt(packet) 255*17348a8fSMatthias Ringwald elif type == 0x02: 256*17348a8fSMatthias Ringwald handle_acl(packet, False) 257*17348a8fSMatthias Ringwald elif type == 0x03: 258*17348a8fSMatthias Ringwald handle_acl(packet, True) 259*17348a8fSMatthias Ringwald 260*17348a8fSMatthias Ringwald except TypeError as e: 261*17348a8fSMatthias Ringwald print(e) 262*17348a8fSMatthias Ringwald print ("Error parsing pklg at offset %u (%x)." % (pos, pos)) 263*17348a8fSMatthias Ringwald 264*17348a8fSMatthias Ringwaldfor connection in connections: 265*17348a8fSMatthias Ringwald connection.remote_gatt_server.report() 266