xref: /btstack/tool/dump_gatt.py (revision 17348a8fb53ed95e14ea128a031a642ffcdad990)
1*17348a8fSMatthias Ringwald#!/usr/bin/env python3
2*17348a8fSMatthias Ringwald# BlueKitchen GmbH (c) 2022
3*17348a8fSMatthias Ringwald
4*17348a8fSMatthias Ringwald# parse PacketLogger and reconstruct GATT DB
5*17348a8fSMatthias Ringwald
6*17348a8fSMatthias Ringwaldimport sys
7*17348a8fSMatthias Ringwaldimport datetime
8*17348a8fSMatthias Ringwaldimport struct
9*17348a8fSMatthias Ringwald
def as_hex(data):
    """Return every byte of *data* as two lowercase hex digits plus a space.

    The result keeps a trailing space after the last byte; an empty input
    yields an empty string.
    """
    return ''.join("{0:02x} ".format(octet) for octet in data)
15*17348a8fSMatthias Ringwald
def as_bd_addr(data):
    """Return the byte values in *data* as colon-separated lowercase hex pairs.

    The bytes are printed in the order given; callers are responsible for
    any byte-order reversal.
    """
    return ':'.join(format(octet, "02x") for octet in data)
21*17348a8fSMatthias Ringwald
def read_header(f):
    """Read one record header from the binary file object *f*.

    The header is unpacked as big-endian ``>IIIB`` (13 bytes) and returned
    as the tuple ``(entry_len, ts_sec, ts_usec, type)``.

    Returns ``(-1, 0, 0, 0)`` when no complete header is available, i.e. at
    end of file or when the file ends in the middle of a header. A partial
    header used to raise ``struct.error``; it is now reported like EOF so
    the caller's ``entry_len < 0`` check ends parsing cleanly.
    """
    header_format = ">IIIB"
    header_size = struct.calcsize(header_format)
    bytes_read = f.read(header_size)
    if len(bytes_read) < header_size:
        return (-1, 0, 0, 0)
    return struct.unpack(header_format, bytes_read)
28*17348a8fSMatthias Ringwald
def uuid16_at_offset(data, offset):
    """Return the little-endian 16-bit value at *offset* as 4 lowercase hex digits."""
    (value,) = struct.unpack_from("<H", data, offset)
    return "{:04x}".format(value)
31*17348a8fSMatthias Ringwald
def uuid128_at_offset(data, offset):
    """Return the 16 bytes at *offset* as a UUID string.

    The bytes are reversed before printing and grouped as
    8-4-4-4-12 hex digits separated by dashes.
    """
    digits = bytes(reversed(data[offset:offset + 16])).hex()
    groups = (digits[0:8], digits[8:12], digits[12:16], digits[16:20], digits[20:])
    return "-".join(groups)
35*17348a8fSMatthias Ringwald
def handle_at_offset(data, offset):
    """Return the little-endian unsigned 16-bit integer stored at *offset* in *data*."""
    (handle,) = struct.unpack_from("<H", data, offset)
    return handle
38*17348a8fSMatthias Ringwald
def bd_addr_at_offset(data, offset):
    """Return the 6-byte address stored at *offset* in *data* as a string.

    The six bytes are reversed and printed as colon-separated lowercase hex
    pairs. Previously the *offset* argument was ignored and the address was
    always read from byte 8; it is now honoured, which gives the same
    result for callers that pass 8.
    """
    peer_addr = reversed(data[offset:offset + 6])
    # Formatted inline so this function does not depend on another helper.
    return ':'.join("{0:02x}".format(octet) for octet in peer_addr)
42*17348a8fSMatthias Ringwald
class gatt_characteristic:
    """Plain record describing one characteristic: UUID, properties and two handles."""

    def __init__(self, uuid, properties, characteristic_handle, value_handle):
        self.uuid, self.properties = uuid, properties
        self.characteristic_handle, self.value_handle = characteristic_handle, value_handle

    def report(self, prefix):
        """Print a one-line summary of this characteristic, preceded by *prefix*."""
        summary = "{!s}UUID {!s:<36}, Handle {:04x}, Properties {:02x}, Value Handle {:04x}".format(
            prefix,
            self.uuid,
            self.characteristic_handle,
            self.properties,
            self.value_handle,
        )
        print(summary)
51*17348a8fSMatthias Ringwald
class gatt_service:
    """A service: UUID, inclusive handle range and the characteristics found in it."""

    def __init__(self, uuid, start_handle, end_handle):
        self.uuid = uuid
        self.start_handle, self.end_handle = start_handle, end_handle
        # Filled in by whoever discovers characteristics inside the handle range.
        self.characteristics = []

    def report(self, prefix):
        """Print this service and then each of its characteristics.

        The service line starts with *prefix*; the characteristics are
        reported with a prefix indented three spaces further.
        """
        heading = "{!s}UUID: {!s:<36}, Start Handle {:04x}, End Handle {:04x}".format(
            prefix, self.uuid, self.start_handle, self.end_handle
        )
        print(heading)
        print("  {!s}Characteristics:".format(prefix))
        nested_prefix = "   " + prefix
        for entry in self.characteristics:
            entry.report(nested_prefix)
65*17348a8fSMatthias Ringwald
66*17348a8fSMatthias Ringwaldclass gatt_server:
67*17348a8fSMatthias Ringwald
68*17348a8fSMatthias Ringwald	primary_services = []
69*17348a8fSMatthias Ringwald
70*17348a8fSMatthias Ringwald	client_opcode = 0
71*17348a8fSMatthias Ringwald	group_type = 0
72*17348a8fSMatthias Ringwald	read_type = 0
73*17348a8fSMatthias Ringwald	mtu = 23
74*17348a8fSMatthias Ringwald
75*17348a8fSMatthias Ringwald	def __init__(self, bd_addr):
76*17348a8fSMatthias Ringwald		self.bd_addr = bd_addr
77*17348a8fSMatthias Ringwald
78*17348a8fSMatthias Ringwald	def service_for_handle(self, handle):
79*17348a8fSMatthias Ringwald		for service in self.primary_services:
80*17348a8fSMatthias Ringwald			if service.start_handle <= handle and handle <= service.end_handle:
81*17348a8fSMatthias Ringwald				return service
82*17348a8fSMatthias Ringwald		return None
83*17348a8fSMatthias Ringwald
84*17348a8fSMatthias Ringwald	def handle_pdu(self, pdu):
85*17348a8fSMatthias Ringwald		opcode = pdu[0]
86*17348a8fSMatthias Ringwald		if opcode == 0x01:
87*17348a8fSMatthias Ringwald			pass
88*17348a8fSMatthias Ringwald		elif opcode == 0x02:
89*17348a8fSMatthias Ringwald			# exchange mtu
90*17348a8fSMatthias Ringwald			pass
91*17348a8fSMatthias Ringwald		elif opcode == 0x03:
92*17348a8fSMatthias Ringwald			# exchange mtu
93*17348a8fSMatthias Ringwald			self.mtu = struct.unpack_from("<H", pdu, 1)[0]
94*17348a8fSMatthias Ringwald		elif opcode == 0x08:
95*17348a8fSMatthias Ringwald			# read by type request
96*17348a8fSMatthias Ringwald			if len(pdu) == 7:
97*17348a8fSMatthias Ringwald				(_,_,self.read_type) = struct.unpack_from("<HHH", pdu, 1)
98*17348a8fSMatthias Ringwald		elif opcode == 0x09:
99*17348a8fSMatthias Ringwald			# read by type response
100*17348a8fSMatthias Ringwald			if self.read_type == 0x2803:
101*17348a8fSMatthias Ringwald				item_len = pdu[1]
102*17348a8fSMatthias Ringwald				pos = 2
103*17348a8fSMatthias Ringwald				while pos < len(pdu):
104*17348a8fSMatthias Ringwald					(characteristic_handle, properties, value_handle) = struct.unpack_from("<HBH", pdu, pos)
105*17348a8fSMatthias Ringwald					if item_len == 11:
106*17348a8fSMatthias Ringwald						uuid = uuid16_at_offset(pdu, pos + 5)
107*17348a8fSMatthias Ringwald					elif item_len == 21:
108*17348a8fSMatthias Ringwald						uuid = uuid128_at_offset(pdu, pos + 5)
109*17348a8fSMatthias Ringwald					service = self.service_for_handle(characteristic_handle)
110*17348a8fSMatthias Ringwald					if service:
111*17348a8fSMatthias Ringwald						service.characteristics.append(gatt_characteristic(uuid, properties, characteristic_handle, value_handle))
112*17348a8fSMatthias Ringwald					pos += item_len
113*17348a8fSMatthias Ringwald		elif opcode == 0x10:
114*17348a8fSMatthias Ringwald			# read by group type request
115*17348a8fSMatthias Ringwald			if len(pdu) == 7:
116*17348a8fSMatthias Ringwald				(_,_,self.group_type) = struct.unpack_from("<HHH", pdu, 1)
117*17348a8fSMatthias Ringwald		elif opcode == 0x11:
118*17348a8fSMatthias Ringwald			# read by group type response
119*17348a8fSMatthias Ringwald			item_len = pdu[1]
120*17348a8fSMatthias Ringwald			pos = 2
121*17348a8fSMatthias Ringwald			while pos < len(pdu):
122*17348a8fSMatthias Ringwald				(start, end) = struct.unpack_from("<HH", pdu, pos)
123*17348a8fSMatthias Ringwald				if self.group_type == 0x2800:
124*17348a8fSMatthias Ringwald					# primary service
125*17348a8fSMatthias Ringwald					if item_len == 6:
126*17348a8fSMatthias Ringwald						uuid = uuid16_at_offset(pdu, pos+4)
127*17348a8fSMatthias Ringwald					elif item_len == 20:
128*17348a8fSMatthias Ringwald						uuid = uuid128_at_offset(pdu, pos+4)
129*17348a8fSMatthias Ringwald					self.primary_services.append(gatt_service(uuid, start, end))
130*17348a8fSMatthias Ringwald				pos += item_len
131*17348a8fSMatthias Ringwald		else:
132*17348a8fSMatthias Ringwald			# print(self.bd_addr, "ATT PDU:", as_hex(pdu))
133*17348a8fSMatthias Ringwald			pass
134*17348a8fSMatthias Ringwald
135*17348a8fSMatthias Ringwald	def report(self):
136*17348a8fSMatthias Ringwald		print("GATT Server on", self.bd_addr)
137*17348a8fSMatthias Ringwald		print("- MTU", self.mtu)
138*17348a8fSMatthias Ringwald		print("- Primary Services:")
139*17348a8fSMatthias Ringwald		for service in self.primary_services:
140*17348a8fSMatthias Ringwald			service.report("  - ")
141*17348a8fSMatthias Ringwald
class l2cap_reassembler:
    """Reassembles one L2CAP packet from a sequence of ACL fragments.

    A start fragment (pb 0 or 2) begins with the 4-byte header
    ``<payload length, channel>`` followed by payload; a continuation
    fragment (pb 1) carries payload only.
    """

    def __init__(self):
        self.payload_data = bytes()
        self.payload_len = 0
        self.channel = 0

    def _reset(self):
        """Forget any partially received packet."""
        self.payload_data = bytes()
        self.payload_len = 0
        self.channel = 0

    def handle_acl(self, pb, data):
        """Consume one ACL fragment.

        *pb* is the packet boundary flag, *data* the ACL payload with the
        ACL header already removed by the caller.
        """
        if pb in (0, 2):
            if len(data) < 4:
                # The header itself is incomplete: drop the packet rather
                # than raise struct.error.
                self._reset()
                return
            (self.payload_len, self.channel) = struct.unpack_from("<HH", data, 0)
            self.payload_data = data[4:]
        elif pb == 0x01:
            # Continuation fragments have no header, so all of *data* is
            # payload; skipping 4 bytes here would lose part of the packet.
            self.payload_data += data

    def l2cap_complete(self):
        """Return True when exactly payload_len bytes have been collected."""
        return len(self.payload_data) == self.payload_len

    def l2cap_packet(self):
        """Return ``(channel, payload)`` for the data collected so far."""
        return (self.channel, self.payload_data)
160*17348a8fSMatthias Ringwald
class hci_connection:
    """State of one connection: peer address, handle, reassemblers, remote GATT server."""

    def __init__(self, bd_addr, con_handle):
        self.bd_addr = bd_addr
        self.con_handle = con_handle
        self.remote_gatt_server = gatt_server(bd_addr)
        # One reassembler per direction and per connection. As class
        # attributes they were shared by all connections, so fragments of
        # different links would have been merged into one packet.
        self.l2cap_in = l2cap_reassembler()
        self.l2cap_out = l2cap_reassembler()

    def handle_att_pdu(self, direction_in, pdu):
        """Route one ATT PDU to the remote or the local GATT server.

        PDUs with an odd opcode that were received, and PDUs with an even
        opcode that were sent, go to the remote server; everything else
        goes to the module-level local_gatt_server. Empty PDUs are ignored.
        """
        if len(pdu) == 0:
            return
        opcode = pdu[0]
        odd_opcode = (opcode & 1) == 1
        if odd_opcode == direction_in:
            self.remote_gatt_server.handle_pdu(pdu)
        else:
            local_gatt_server.handle_pdu(pdu)

    def handle_acl(self, direction_in, pb, data):
        """Feed one ACL fragment to the reassembler for its direction.

        Once a packet is complete and was sent on channel 0x0004 it is
        passed on to handle_att_pdu().
        """
        reassembler = self.l2cap_in if direction_in else self.l2cap_out
        reassembler.handle_acl(pb, data)
        if not reassembler.l2cap_complete():
            return
        (channel, l2cap_data) = reassembler.l2cap_packet()
        if channel == 0x004:
            self.handle_att_pdu(direction_in, l2cap_data)
192*17348a8fSMatthias Ringwald
def connection_for_handle(con_handle):
    """Look up *con_handle* in the module-level connections dict; None if absent."""
    return connections.get(con_handle)
198*17348a8fSMatthias Ringwald
def handle_cmd(packet):
    """Accept a command packet and do nothing with it."""
    return None
201*17348a8fSMatthias Ringwald
def handle_evt(event):
    """Track connections from HCI events.

    *event* is the raw event: code, length, then parameters.

    - 0x05 (Disconnection Complete): the connection is removed from the
      module-level connections dict and its remote GATT server is reported.
      Unknown handles are only logged.
    - 0x3e with subevent 0x01 (LE Connection Complete): a new
      hci_connection is registered under its connection handle, unless the
      status byte signals failure.

    Events too short to hold the fields read here are ignored.
    """
    if len(event) < 3:
        return
    event_code = event[0]

    if event_code == 0x05:
        # Disconnection Complete
        if len(event) < 5:
            return
        con_handle = handle_at_offset(event, 3)
        print("Disconnection Complete: handle 0x%04x" % con_handle)
        connection = connections.pop(con_handle, None)
        # The handle may be untracked (e.g. the connection was set up before
        # the trace started), so there may be nothing to report.
        if connection is not None:
            connection.remote_gatt_server.report()
        return

    if event_code == 0x3e and event[2] == 0x01:
        # LE Connection Complete
        if len(event) < 14:
            return
        if event[3] != 0x00:
            # Non-zero status: no connection was established.
            return
        con_handle = handle_at_offset(event, 4)
        peer_addr = bd_addr_at_offset(event, 8)
        connection = hci_connection(peer_addr, con_handle)
        connections[con_handle] = connection
        print("LE Connection Complete: %s handle 0x%04x" % (peer_addr, con_handle))
218*17348a8fSMatthias Ringwald
def handle_acl(data, direction_in):
    """Dispatch one ACL packet to the connection it belongs to.

    *data* starts with the 4-byte little-endian ACL header: a 16-bit word
    holding the connection handle (bits 0-11) and the packet boundary flag
    (bits 12-13), followed by a 16-bit payload length. *direction_in* is
    True for received packets.

    Packets that are too short for the header, or whose connection handle
    is not tracked, are ignored instead of raising.
    """
    if len(data) < 4:
        return
    (header, hci_len) = struct.unpack_from("<HH", data, 0)
    pb = (header >> 12) & 0x03
    con_handle = header & 0x0FFF
    connection = connection_for_handle(con_handle)
    if connection is None:
        # No connection event was seen for this handle.
        return
    # Pass on at most hci_len payload bytes, as announced by the header.
    connection.handle_acl(direction_in, pb, data[4:4 + hci_len])
224*17348a8fSMatthias Ringwald
# globals
# Open connections, keyed by connection handle.
connections = {}
# Receives every ATT PDU that is not routed to a connection's remote server.
local_gatt_server = gatt_server("00:00:00:00:00:00")

if len(sys.argv) == 1:
    print ('Reconstruct GATT interactions from PacketLogger trace file')
    print ('Copyright 2022, BlueKitchen GmbH')
    print ('')
    print ('Usage: ', sys.argv[0], 'hci_dump.pklg')
    sys.exit(0)

infile = sys.argv[1]

with open(infile, 'rb') as fin:
    pos = 0
    try:
        while True:
            (entry_len, ts_sec, ts_usec, packet_type) = read_header(fin)
            if entry_len < 0:
                break
            # entry_len counts the 9 header bytes that follow the length field.
            packet_len = entry_len - 9
            # A negative length would make read() consume the rest of the file.
            if packet_len < 0 or packet_len > 66000:
                print ("Error parsing pklg at offset %u (%x)." % (pos, pos))
                break
            packet = fin.read(packet_len)
            if len(packet) < packet_len:
                # The file ends in the middle of a record.
                print ("Error parsing pklg at offset %u (%x)." % (pos, pos))
                break
            pos = pos + 4 + entry_len
            if packet_type == 0x00:
                handle_cmd(packet)
            elif packet_type == 0x01:
                handle_evt(packet)
            elif packet_type == 0x02:
                handle_acl(packet, False)
            elif packet_type == 0x03:
                handle_acl(packet, True)

    except (TypeError, IndexError, struct.error) as e:
        # Malformed records surface as struct/index errors, not only TypeError.
        print(e)
        print ("Error parsing pklg at offset %u (%x)." % (pos, pos))

# Report connections that were still open when the trace ended. Iterating
# the dict itself would yield the integer handles, not the connections.
for connection in connections.values():
    connection.remote_gatt_server.report()
266