xref: /btstack/tool/dump_gatt.py (revision 17348a8fb53ed95e14ea128a031a642ffcdad990)
1#!/usr/bin/env python3
2# BlueKitchen GmbH (c) 2022
3
4# parse PacketLogger and reconstruct GATT DB
5
6import sys
7import datetime
8import struct
9
10def as_hex(data):
11	str_list = []
12	for byte in data:
13	    str_list.append("{0:02x} ".format(byte))
14	return ''.join(str_list)
15
16def as_bd_addr(data):
17	str_list = []
18	for byte in data:
19	    str_list.append("{0:02x}".format(byte))
20	return ':'.join(str_list)
21
22def read_header(f):
23	bytes_read = f.read(13)
24	if bytes_read:
25		return struct.unpack(">IIIB", bytes_read)
26	else:
27		return (-1, 0, 0, 0)
28
29def uuid16_at_offset(data, offset):
30	return "%04x" % struct.unpack_from("<H", data, offset)[0]
31
32def uuid128_at_offset(data, offset):
33	uuid128 = bytes(reversed(data[offset:offset+16]))
34	return uuid128[0:4].hex() + "-" + uuid128[4:6].hex() + "-" + uuid128[6:8].hex() + "-" + uuid128[8:10].hex() + "-" + uuid128[10:].hex()
35
36def handle_at_offset(data, offset):
37	return struct.unpack_from("<H", data, offset)[0];
38
39def bd_addr_at_offset(data, offset):
40	peer_addr = reversed(data[8:8 + 6])
41	return as_bd_addr(peer_addr)
42
43class gatt_characteristic:
44	def __init__(self, uuid, properties, characteristic_handle, value_handle):
45		self.uuid = uuid
46		self.properties = properties
47		self.characteristic_handle = characteristic_handle
48		self.value_handle = value_handle
49	def report(self, prefix):
50		print("%sUUID %-36s, Handle %04x, Properties %02x, Value Handle %04x" % (prefix, self.uuid, self.characteristic_handle, self.properties, self.value_handle))
51
52class gatt_service:
53
54	def __init__(self, uuid, start_handle, end_handle):
55		self.uuid = uuid
56		self.start_handle = start_handle
57		self.end_handle = end_handle
58		self.characteristics = []
59
60	def report(self, prefix):
61		print("%sUUID: %-36s, Start Handle %04x, End Handle %04x" % (prefix, self.uuid, self.start_handle, self.end_handle))
62		print("  %sCharacteristics:" % prefix)
63		for characteristic in self.characteristics:
64			characteristic.report("   " + prefix)
65
66class gatt_server:
67
68	primary_services = []
69
70	client_opcode = 0
71	group_type = 0
72	read_type = 0
73	mtu = 23
74
75	def __init__(self, bd_addr):
76		self.bd_addr = bd_addr
77
78	def service_for_handle(self, handle):
79		for service in self.primary_services:
80			if service.start_handle <= handle and handle <= service.end_handle:
81				return service
82		return None
83
84	def handle_pdu(self, pdu):
85		opcode = pdu[0]
86		if opcode == 0x01:
87			pass
88		elif opcode == 0x02:
89			# exchange mtu
90			pass
91		elif opcode == 0x03:
92			# exchange mtu
93			self.mtu = struct.unpack_from("<H", pdu, 1)[0]
94		elif opcode == 0x08:
95			# read by type request
96			if len(pdu) == 7:
97				(_,_,self.read_type) = struct.unpack_from("<HHH", pdu, 1)
98		elif opcode == 0x09:
99			# read by type response
100			if self.read_type == 0x2803:
101				item_len = pdu[1]
102				pos = 2
103				while pos < len(pdu):
104					(characteristic_handle, properties, value_handle) = struct.unpack_from("<HBH", pdu, pos)
105					if item_len == 11:
106						uuid = uuid16_at_offset(pdu, pos + 5)
107					elif item_len == 21:
108						uuid = uuid128_at_offset(pdu, pos + 5)
109					service = self.service_for_handle(characteristic_handle)
110					if service:
111						service.characteristics.append(gatt_characteristic(uuid, properties, characteristic_handle, value_handle))
112					pos += item_len
113		elif opcode == 0x10:
114			# read by group type request
115			if len(pdu) == 7:
116				(_,_,self.group_type) = struct.unpack_from("<HHH", pdu, 1)
117		elif opcode == 0x11:
118			# read by group type response
119			item_len = pdu[1]
120			pos = 2
121			while pos < len(pdu):
122				(start, end) = struct.unpack_from("<HH", pdu, pos)
123				if self.group_type == 0x2800:
124					# primary service
125					if item_len == 6:
126						uuid = uuid16_at_offset(pdu, pos+4)
127					elif item_len == 20:
128						uuid = uuid128_at_offset(pdu, pos+4)
129					self.primary_services.append(gatt_service(uuid, start, end))
130				pos += item_len
131		else:
132			# print(self.bd_addr, "ATT PDU:", as_hex(pdu))
133			pass
134
135	def report(self):
136		print("GATT Server on", self.bd_addr)
137		print("- MTU", self.mtu)
138		print("- Primary Services:")
139		for service in self.primary_services:
140			service.report("  - ")
141
142class l2cap_reassembler:
143
144	payload_data = bytes()
145	payload_len = 0
146	channel = 0;
147
148	def handle_acl(self, pb, data):
149		if pb in [0, 2]:
150			(self.payload_len, self.channel) = struct.unpack("<HH", data[0:4])
151			self.payload_data = data[4:]
152		if pb == 0x01:
153			self.payload_data += data[4:]
154
155	def l2cap_complete(self):
156		return len(self.payload_data) == self.payload_len
157
158	def l2cap_packet(self):
159		return (self.channel, self.payload_data)
160
161class hci_connection:
162
163	l2cap_in = l2cap_reassembler()
164	l2cap_out = l2cap_reassembler()
165
166	def __init__(self, bd_addr, con_handle):
167		self.bd_addr = bd_addr
168		self.con_handle = con_handle;
169		self.remote_gatt_server = gatt_server(bd_addr)
170
171	def handle_att_pdu(self, direction_in, pdu):
172		opcode = pdu[0]
173		remote_server = ((opcode & 1) == 1) == direction_in
174		if (remote_server):
175			self.remote_gatt_server.handle_pdu(pdu)
176		else:
177			local_gatt_server.handle_pdu(pdu)
178
179	def handle_acl(self, direction_in, pb, data):
180		if direction_in:
181			self.l2cap_in.handle_acl(pb, data)
182			if self.l2cap_in.l2cap_complete():
183				(channel, l2cap_data) = self.l2cap_in.l2cap_packet()
184				if channel == 0x004:
185					self.handle_att_pdu(direction_in, l2cap_data)
186		else:
187			self.l2cap_out.handle_acl(pb, data)
188			if self.l2cap_out.l2cap_complete():
189				(channel, l2cap_data) = self.l2cap_out.l2cap_packet()
190				if channel == 0x004:
191					self.handle_att_pdu(direction_in, l2cap_data)
192
193def connection_for_handle(con_handle):
194	if con_handle in connections:
195		return connections[con_handle]
196	else:
197		return None
198
199def handle_cmd(packet):
200	pass
201
202def handle_evt(event):
203	if event[0] == 0x05:
204		# Disconnection Complete
205		con_handle = handle_at_offset(event, 3)
206		print("Disconnection Complete: handle 0x%04x" % con_handle)
207		connection = connections.pop(con_handle, None)
208		connection.remote_gatt_server.report()
209
210	if event[0] == 0x3e:
211		if event[2] == 0x01:
212			# LE Connection Complete
213			con_handle = handle_at_offset(event, 4);
214			peer_addr = bd_addr_at_offset(event, 8)
215			connection = hci_connection(peer_addr, con_handle)
216			connections[con_handle] = connection
217			print("LE Connection Complete: %s handle 0x%04x" % (peer_addr, con_handle))
218
219def handle_acl(data, direction_in):
220	(header, hci_len) = struct.unpack("<HH", data[0:4])
221	pb = (header >> 12) & 0x03
222	con_handle = header & 0x0FFF
223	connection_for_handle(con_handle).handle_acl(direction_in, pb, data[4:])
224
225# globals
226connections = {}
227local_gatt_server = gatt_server("00:00:00:00:00:00")
228
229if len(sys.argv) == 1:
230	print ('Reconstruct GATT interactions from PacketLogger trace file')
231	print ('Copyright 2022, BlueKitchen GmbH')
232	print ('')
233	print ('Usage: ', sys.argv[0], 'hci_dump.pklg')
234	exit(0)
235
236infile = sys.argv[1]
237
238with open (infile, 'rb') as fin:
239	pos = 0
240	try:
241		while True:
242			(entry_len, ts_sec, ts_usec, type) = read_header(fin)
243			if entry_len < 0:
244				break
245			packet_len = entry_len - 9;
246			if (packet_len > 66000):
247				print ("Error parsing pklg at offset %u (%x)." % (pos, pos))
248				break
249			packet  = fin.read(packet_len)
250			pos     = pos + 4 + entry_len
251			if type == 0x00:
252				handle_cmd(packet)
253			elif type == 0x01:
254				handle_evt(packet)
255			elif type == 0x02:
256				handle_acl(packet, False)
257			elif type == 0x03:
258				handle_acl(packet, True)
259
260	except TypeError as e:
261		print(e)
262		print ("Error parsing pklg at offset %u (%x)." % (pos, pos))
263
264for connection in connections:
265	connection.remote_gatt_server.report()
266