1#!/usr/bin/env python
2# Copyright 2021 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15#
16#
# This script extracts LE Audio audio data from a btsnoop log.
# It generates an audio dump file in which each frame consists of a two-byte
# frame length followed by the coded frame.
20#
21# Audio File Name Format:
22# [Context]_sf[Sample frequency]_fd[Frame duration]_[Channel allocation]_
23# frame[Octets per frame]_[Stream start timestamp]_[Direction].bin
24#
25#
26# Usage:
27# ./dump_le_audio.py BTSNOOP.cfa [-v] [--header] [--ase_handle ASE_HANDLE]
28#
29# -v, --verbose: to enable the verbose log
30# --header: Add the header for LC3 Conformance Interoperability Test Software V.1.0.3 from LC3 test specification.
31#  --ase_handle ASE_HANDLE: Set the ASE handle manually.
32#
33# NOTE:
# Please make sure your HCI snoop data file includes the following frames:
# 1. GATT service discovery for the "ASE Control Point" characteristic (this step can be skipped if you pass the ASE handle with --ase_handle)
# 2. GATT config codec via ASE Control Point
# 3. HCI create CIS to point out the "Start stream", and the data frames.
# After all HCI packets have been parsed, any remaining audio data is dumped as well.
39#
# Corresponding specifications:
41# ASCS_1.0
42# PACS_1.0
43# BAP_1.0
44# LC3.TS V1.0.3
45#
46from collections import defaultdict
47from os import X_OK
48
49import argparse
50import struct
51import sys
52import time
53
# NOTE(review): main() assigns a *local* variable with this name (there is no
# `global` statement for it), so this module-level value appears to stay "".
BTSNOOP_FILE_NAME = ""
# The first 16 bytes of the capture must equal this value; main() exits otherwise.
BTSNOOP_HEADER = b'btsnoop\x00\x00\x00\x00\x01\x00\x00\x03\xea'

# Packet type: the last byte of the 25-byte record header read by
# parse_next_packet(), used to pick the parser for the record.
COMMADN_PACKET = 1
ACL_PACKET = 2
SCO_PACKET = 3
EVENT_PACKET = 4
ISO_PACKET = 5

# Direction values compared against the record "flags" field for ACL/ISO data.
SENT = 0
RECEIVED = 1

# L2CAP channel carrying ATT PDUs (see parse_acl_packet).
L2CAP_ATT_CID = 0x0004
# L2CAP channel on which the credit based connection request/response are
# parsed to learn the EATT channel ids (see parse_acl_packet).
L2CAP_CID = 0x0005

# SPSM value identifying an EATT credit based connection request.
PSM_EATT = 0x0027

# opcode for att protocol
OPCODE_ATT_READ_BY_TYPE_RSP = 0x09
OPCODE_ATT_WRITE_CMD = 0x52

# 16-bit characteristic UUID looked for in Read By Type responses.
UUID_ASE_CONTROL_POINT = 0x2BC6

# opcode for ase control
# (OPCODE_RELEASE is defined but not referenced elsewhere in this file.)
OPCODE_CONFIG_CODEC = 0x01
OPCODE_ENABLE = 0x03
OPCODE_UPDATE_METADATA = 0x07
OPCODE_RELEASE = 0x08

# opcode for hci command
OPCODE_HCI_CREATE_CIS = 0x2064
OPCODE_REMOVE_ISO_DATA_PATH = 0x206F
OPCODE_LE_SET_PERIODIC_ADVERTISING_DATA = 0x203F
OPCODE_LE_CREATE_BIG = 0x2068
OPCODE_LE_SETUP_ISO_DATA_PATH = 0x206E

# opcode for L2CAP channel
OPCODE_L2CAP_CREDIT_BASED_CONNECTION_REQ = 0x17
OPCODE_L2CAP_CREDIT_BASED_CONNECTION_RSP = 0x18

# HCI event
EVENT_CODE_LE_META_EVENT = 0x3E
SUBEVENT_CODE_LE_CREATE_BIG_COMPLETE = 0x1B

# Metadata LTV type whose 2-byte value is stored as Connection.context.
TYPE_STREAMING_AUDIO_CONTEXTS = 0x02

# Codec configuration LTV types recognised by the codec parsers.
TYPE_SAMPLING_FREQUENCIES = 0x01
TYPE_FRAME_DURATION = 0x02
TYPE_CHANNEL_ALLOCATION = 0x03
TYPE_OCTETS_PER_FRAME = 0x04

# Context type values mapped to the "[Context]" part of the unicast file name.
CONTEXT_TYPE_UNSPECIFIED = 0x0001
CONTEXT_TYPE_CONVERSATIONAL = 0x0002
CONTEXT_TYPE_MEDIA = 0x0004
CONTEXT_TYPE_GAME = 0x0008
CONTEXT_TYPE_VOICEASSISTANTS = 0x0020
CONTEXT_TYPE_LIVE = 0x0040
CONTEXT_TYPE_RINGTONE = 0x0200

# sample frequency
# Codes carried in the TYPE_SAMPLING_FREQUENCIES value; the number in each
# name is the frequency in Hz.
SAMPLE_FREQUENCY_8000 = 0x01
SAMPLE_FREQUENCY_11025 = 0x02
SAMPLE_FREQUENCY_16000 = 0x03
SAMPLE_FREQUENCY_22050 = 0x04
SAMPLE_FREQUENCY_24000 = 0x05
SAMPLE_FREQUENCY_32000 = 0x06
SAMPLE_FREQUENCY_44100 = 0x07
SAMPLE_FREQUENCY_48000 = 0x08
SAMPLE_FREQUENCY_88200 = 0x09
SAMPLE_FREQUENCY_96000 = 0x0a
SAMPLE_FREQUENCY_176400 = 0x0b
SAMPLE_FREQUENCY_192000 = 0x0c
SAMPLE_FREQUENCY_384000 = 0x0d

# Codes carried in the TYPE_FRAME_DURATION value (7.5 ms and 10 ms).
FRAME_DURATION_7_5 = 0x00
FRAME_DURATION_10 = 0x01

# Values of the TYPE_CHANNEL_ALLOCATION field that the dump code knows how to
# name; any other value is not in the lookup tables used when dumping.
AUDIO_LOCATION_MONO = 0x00
AUDIO_LOCATION_LEFT = 0x01
AUDIO_LOCATION_RIGHT = 0x02
AUDIO_LOCATION_CENTER = 0x04

# AD type / service UUID pair that marks periodic advertising data as a basic
# audio announcement (checked in parse_command_packet).
AD_TYPE_SERVICE_DATA_16_BIT = 0x16
BASIC_AUDIO_ANNOUNCEMENT_SERVICE = 0x1851

# Index of the record currently being parsed; used as the log prefix.
packet_number = 0
# Set from the -v/--verbose and --header command line flags in main().
debug_enable = False
add_header = False
# ASE Control Point handle given with --ase_handle; 0xFFFF means "not given".
ase_handle = 0xFFFF

# Identifiers of pending EATT connection requests, and the channel ids learnt
# from the EATT connection request (source) and response (destination).
l2cap_identifier_set = set()
source_cid = set()
destinate_cid = set()
147
148
class Connection:
    """State collected for one ACL connection while walking the capture.

    Fields holding an all-ones value (0xFFFF / 0xFFFFFFFF) have not been
    learnt from the capture yet.
    """

    def __init__(self):
        # Handle of the ASE Control Point characteristic on this connection.
        self.ase_handle = 0xFFFF
        # CIS handle associated with this ACL connection.
        self.cis_handle = 0xFFFF
        # Streaming audio context taken from the ASE metadata.
        self.context = 0xFFFF
        # Timestamp of the Enable operation.
        self.start_time = 0xFFFFFFFF
        # Codec configuration per ASE id.
        self.number_of_ases = 0
        self.ase = defaultdict(AseStream)
        # Length-prefixed frames collected per direction.
        self.input_dump = []
        self.output_dump = []

    def dump(self):
        """Print the connection summary followed by every ASE configuration."""
        summary = (
            ("start_time: ", self.start_time),
            ("ase_handle: ", self.ase_handle),
            ("context type: ", self.context),
            ("number_of_ases:  ", self.number_of_ases),
            ("cis_handle:  ", self.cis_handle),
        )
        for label, value in summary:
            print(label + str(value))

        for ase_id, stream in self.ase.items():
            print("ase id: " + str(ase_id))
            stream.dump()
170
171
class AseStream:
    """Codec configuration of a single ASE; all-ones values mean "not set"."""

    def __init__(self):
        self.sampling_frequencies = 0xFF
        self.frame_duration = 0xFF
        self.octets_per_frame = 0xFFFF
        self.channel_allocation = 0xFFFFFFFF

    def dump(self):
        """Print every codec field on its own line."""
        rows = (
            ("sampling_frequencies: ", self.sampling_frequencies),
            ("frame_duration: ", self.frame_duration),
            ("channel_allocation: ", self.channel_allocation),
            ("octets_per_frame: ", self.octets_per_frame),
        )
        for label, value in rows:
            print(label + str(value))
185
186
class Broadcast:
    """Broadcast description, keyed by subgroup and BIS index."""

    def __init__(self):
        # BIS indexes in the order they were parsed.
        self.bis_index_list = []
        # subgroup -> number of BIS in that subgroup
        self.num_of_bis = defaultdict(int)
        # bis_index -> codec configuration
        self.bis = defaultdict(BisStream)
        # bis_index -> bis_handle
        self.bis_index_handle_map = defaultdict(int)

    def dump(self):
        """Print each BIS index with its handle, then the stream details."""
        for bis_index in self.bis:
            handle = self.bis_index_handle_map[bis_index]
            print("bis_index: " + str(bis_index) + " bis handle: " + str(handle))
            self.bis[bis_index].dump()
199
200
class BisStream:
    """Codec configuration and collected audio of one BIS.

    All-ones values mean the field has not been learnt from the capture.
    """

    def __init__(self):
        self.start_time = 0xFFFFFFFF
        self.sampling_frequencies = 0xFF
        self.frame_duration = 0xFF
        self.octets_per_frame = 0xFFFF
        self.channel_allocation = 0xFFFFFFFF
        # Length-prefixed frames collected for this stream.
        self.output_dump = []

    def dump(self):
        """Print the start time and every codec field on its own line."""
        rows = (
            ("start_time: ", self.start_time),
            ("sampling_frequencies: ", self.sampling_frequencies),
            ("frame_duration: ", self.frame_duration),
            ("channel_allocation: ", self.channel_allocation),
            ("octets_per_frame: ", self.octets_per_frame),
        )
        for label, value in rows:
            print(label + str(value))
217
218
# ACL connection handle -> Connection (an entry is created on first access).
connection_map = defaultdict(Connection)
# CIS handle -> ACL connection handle, filled from the HCI Create CIS command.
cis_acl_map = defaultdict(int)
# Advertising handle -> Broadcast parsed from the periodic advertising data.
broadcast_map = defaultdict(Broadcast)
# BIG handle -> advertising handle, filled from the HCI LE Create BIG command.
big_adv_map = defaultdict(int)
# BIS handle -> BisStream, filled from the LE Create BIG Complete event.
bis_stream_map = defaultdict(BisStream)
224
225
def generate_header(file, stream, is_cis):
    """Write the LC3 test-software header for *stream* to *file*.

    The header starts with the bytes 1c cc 12 00 and is followed by these
    little-endian fields: sampling frequency / 100, bitrate / 100, channel
    count, frame duration in ms * 100, a zero field and the constant 48000000.

    Args:
        file: binary file object; only its write() method is used.
        stream: a Connection when is_cis is true (the first entry of its
            `ase` mapping supplies the codec values), otherwise an object
            exposing the codec attributes directly (a BisStream).
        is_cis: selects how the codec values are obtained from *stream*.

    Raises:
        KeyError: if a codec value is not present in the lookup tables.
    """
    sf_case = {
        SAMPLE_FREQUENCY_8000: 80,
        SAMPLE_FREQUENCY_11025: 110,
        SAMPLE_FREQUENCY_16000: 160,
        SAMPLE_FREQUENCY_22050: 220,
        SAMPLE_FREQUENCY_24000: 240,
        SAMPLE_FREQUENCY_32000: 320,
        SAMPLE_FREQUENCY_44100: 441,
        SAMPLE_FREQUENCY_48000: 480,
        SAMPLE_FREQUENCY_88200: 882,
        SAMPLE_FREQUENCY_96000: 960,
        SAMPLE_FREQUENCY_176400: 1764,
        SAMPLE_FREQUENCY_192000: 1920,
        # 384000 Hz / 100 (the table previously held 2840).
        SAMPLE_FREQUENCY_384000: 3840,
    }
    fd_case = {FRAME_DURATION_7_5: 7.5, FRAME_DURATION_10: 10}
    # NOTE(review): "center" maps to 2 channels while the other single
    # locations map to 1 - kept as found, confirm the intent.
    al_case = {AUDIO_LOCATION_MONO: 1, AUDIO_LOCATION_LEFT: 1, AUDIO_LOCATION_RIGHT: 1, AUDIO_LOCATION_CENTER: 2}

    if is_cis:
        # Only the first ASE of the connection describes the header; with no
        # ASE at all just the 4-byte prefix is written.
        codec = next(iter(stream.ase.values()), None)
    else:
        codec = stream

    header = bytearray.fromhex('1ccc1200')
    if codec is not None:
        frame_duration_ms = fd_case[codec.frame_duration]
        header += struct.pack("<H", sf_case[codec.sampling_frequencies])
        header += struct.pack("<H", int(codec.octets_per_frame * 8 * 10 / frame_duration_ms))
        # 7.5 * 100 is the float 750.0; struct.pack("H", ...) only accepts
        # integers, so convert explicitly.
        header += struct.pack("<HHHL", al_case[codec.channel_allocation], int(frame_duration_ms * 100), 0,
                              48000000)
    file.write(header)
259
260
def parse_codec_information(connection_handle, ase_id, packet):
    """Store the codec configuration LTV entries of one ASE.

    *packet* starts with a one-byte total length followed by entries of the
    form length / type / value. Recognised types are written to the AseStream
    kept for (connection_handle, ase_id); other types are skipped.

    Returns the bytes remaining after the entries. When the announced length
    exceeds the available data only the length byte is consumed.
    """
    remaining, packet = unpack_data(packet, 1, False)
    if remaining > len(packet):
        debug_print("Invalid codec configuration length")
        return packet

    attribute_for_type = {
        TYPE_SAMPLING_FREQUENCIES: "sampling_frequencies",
        TYPE_FRAME_DURATION: "frame_duration",
        TYPE_CHANNEL_ALLOCATION: "channel_allocation",
        TYPE_OCTETS_PER_FRAME: "octets_per_frame",
    }
    stream = connection_map[connection_handle].ase[ase_id]
    while remaining > 0:
        entry_length, packet = unpack_data(packet, 1, False)
        entry_type, packet = unpack_data(packet, 1, False)
        # The entry length counts the type byte as well.
        entry_value, packet = unpack_data(packet, entry_length - 1, False)
        attribute = attribute_for_type.get(entry_type)
        if attribute is not None:
            setattr(stream, attribute, entry_value)
        remaining -= entry_length + 1

    return packet
282
283
def parse_att_read_by_type_rsp(packet, connection_handle):
    """Look for the ASE Control Point characteristic in a Read By Type response.

    *packet* starts with the per-entry length followed by a list of entries.
    Only 7-byte entries are of interest: attribute handle (2), one ignored
    byte, value handle (2) and a 16-bit UUID (2). Every complete entry in the
    list is examined, not just the first one, so the characteristic is found
    wherever it sits in the response. On a match the value handle is stored
    as the connection's ase_handle.
    """
    entry_length, packet = unpack_data(packet, 1, False)
    if entry_length != 7:
        #ignore the packet, we're only interested in this packet for the characteristic type UUID
        return

    if entry_length > len(packet):
        debug_print("Invalid att packet length")
        return

    # Walk all complete entries; a trailing partial entry is ignored.
    while len(packet) >= entry_length:
        entry, packet = packet[:entry_length], packet[entry_length:]
        attribute_handle, entry = unpack_data(entry, 2, False)
        if debug_enable:
            debug_print("attribute_handle - " + str(attribute_handle))
        entry = unpack_data(entry, 1, True)
        value_handle, entry = unpack_data(entry, 2, False)
        characteristic_uuid, entry = unpack_data(entry, 2, False)
        if characteristic_uuid == UUID_ASE_CONTROL_POINT:
            debug_print("ASE Control point found!")
            connection_map[connection_handle].ase_handle = value_handle
            return
303
304
def parse_att_write_cmd(packet, connection_handle, timestamp):
    """Handle an ATT Write Command addressed to the ASE Control Point.

    Writes to any other attribute handle are ignored. For Config Codec the
    codec configuration of every listed ASE is stored. For Enable and Update
    Metadata the metadata of each ASE is scanned, entry by entry, for the
    streaming audio contexts value, which is stored as the connection context
    (the first one found wins). Enable additionally records *timestamp* as
    the stream start time.

    Malformed metadata lengths are logged and abort the handling of the
    packet before the start time is recorded.
    """
    attribute_handle, packet = unpack_data(packet, 2, False)
    connection = connection_map[connection_handle]
    # A handle supplied on the command line overrides the discovered one.
    if ase_handle != 0xFFFF:
        connection.ase_handle = ase_handle

    if connection.ase_handle != attribute_handle:
        return

    if debug_enable:
        debug_print("Action with ASE Control point")
    opcode, packet = unpack_data(packet, 1, False)
    if opcode == OPCODE_CONFIG_CODEC:
        debug_print("config_codec")
        connection.number_of_ases, packet = unpack_data(packet, 1, False)
        for _ in range(connection.number_of_ases):
            ase_id, packet = unpack_data(packet, 1, False)
            # ignore target_latency, target_phy, codec_id
            packet = unpack_data(packet, 7, True)
            packet = parse_codec_information(connection_handle, ase_id, packet)
    elif opcode == OPCODE_ENABLE or opcode == OPCODE_UPDATE_METADATA:
        if debug_enable:
            debug_print("enable or update metadata")
        numbers_of_ases, packet = unpack_data(packet, 1, False)
        context_found = False
        for _ in range(numbers_of_ases):
            # The ASE id itself is not needed here.
            _, packet = unpack_data(packet, 1, False)
            metadata_length, packet = unpack_data(packet, 1, False)
            if metadata_length > len(packet):
                debug_print("Invalid metadata length")
                return
            # Cut out exactly this ASE's metadata so that the next ASE is read
            # from the right offset whatever the metadata contains (including
            # nothing at all).
            metadata, packet = packet[:metadata_length], packet[metadata_length:]
            while metadata:
                length, metadata = unpack_data(metadata, 1, False)
                if length > len(metadata):
                    debug_print("Invalid metadata value length")
                    return
                if length == 0:
                    # An entry without type and value carries nothing.
                    continue
                metadata_type, metadata = unpack_data(metadata, 1, False)
                value, metadata = metadata[:length - 1], metadata[length - 1:]
                if metadata_type == TYPE_STREAMING_AUDIO_CONTEXTS and len(value) >= 2:
                    connection.context, _ = unpack_data(value, 2, False)
                    context_found = True
                    break
            if context_found:
                break

        if opcode == OPCODE_ENABLE:
            debug_print("enable, set timestamp")
            connection.start_time = timestamp

        if debug_enable:
            connection.dump()
348
349
def parse_att_packet(packet, connection_handle, flags, timestamp):
    """Dispatch an ATT PDU on its opcode and direction.

    Only received Read By Type responses and sent Write Commands are
    handled; every other opcode/direction combination is ignored.
    """
    opcode, payload = unpack_data(packet, 1, False)
    if flags == RECEIVED and opcode == OPCODE_ATT_READ_BY_TYPE_RSP:
        parse_att_read_by_type_rsp(payload, connection_handle)
    elif flags == SENT and opcode == OPCODE_ATT_WRITE_CMD:
        parse_att_write_cmd(payload, connection_handle, timestamp)
357
358
def _parse_codec_ltv(packet, length, unknown_message):
    """Parse *length* bytes of length/type/value codec entries.

    Returns ({type: value} for the recognised codec types, remaining packet).
    *unknown_message* is printed for every entry of another type.
    """
    known_types = (TYPE_SAMPLING_FREQUENCIES, TYPE_FRAME_DURATION, TYPE_CHANNEL_ALLOCATION,
                   TYPE_OCTETS_PER_FRAME)
    config = {}
    while length > 0:
        config_length, packet = unpack_data(packet, 1, False)
        config_type, packet = unpack_data(packet, 1, False)
        # The entry length counts the type byte as well.
        value, packet = unpack_data(packet, config_length - 1, False)
        if config_type in known_types:
            config[config_type] = value
        else:
            print(unknown_message)
        length -= (config_length + 1)
    return config, packet


def parse_big_codec_information(adv_handle, packet):
    """Parse the broadcast codec description found in periodic advertising data.

    For every subgroup the subgroup-wide codec entries are read first, then
    the entries of each BIS, which take precedence over the subgroup values.
    The result is stored in broadcast_map[adv_handle]: the number of BIS per
    subgroup, the BIS indexes in parse order, and one BisStream per index.

    Codec values that appear at neither level leave the BisStream field
    untouched, except the channel allocation which falls back to
    AUDIO_LOCATION_MONO (assumption: a missing allocation means a single
    unlocated channel - confirm against the BAP specification).

    Returns the remaining packet, or None when a length field is invalid.
    """
    attribute_for_type = (
        (TYPE_SAMPLING_FREQUENCIES, "sampling_frequencies"),
        (TYPE_FRAME_DURATION, "frame_duration"),
        (TYPE_OCTETS_PER_FRAME, "octets_per_frame"),
        (TYPE_CHANNEL_ALLOCATION, "channel_allocation"),
    )
    broadcast = broadcast_map[adv_handle]

    # Ignore presentation delay
    packet = unpack_data(packet, 3, True)
    number_of_subgroup, packet = unpack_data(packet, 1, False)
    for subgroup in range(number_of_subgroup):
        num_of_bis, packet = unpack_data(packet, 1, False)
        broadcast.num_of_bis[subgroup] = num_of_bis
        # Ignore codec id
        packet = unpack_data(packet, 5, True)
        length, packet = unpack_data(packet, 1, False)
        if len(packet) < length:
            print("Invalid subgroup codec information length")
            return

        # Values are collected per subgroup so that nothing leaks from a
        # previous subgroup and a missing entry can never be an unbound name.
        subgroup_config, packet = _parse_codec_ltv(packet, length, "Unknown config type")

        # Ignore metadata
        metadata_length, packet = unpack_data(packet, 1, False)
        packet = unpack_data(packet, metadata_length, True)

        for _ in range(num_of_bis):
            bis_index, packet = unpack_data(packet, 1, False)
            broadcast.bis_index_list.append(bis_index)
            length, packet = unpack_data(packet, 1, False)
            if len(packet) < length:
                print("Invalid level 3 codec information length")
                return

            bis_config, packet = _parse_codec_ltv(packet, length, "Ignored config type")

            # BIS specific entries override the subgroup-wide ones.
            config = dict(subgroup_config)
            config.update(bis_config)
            config.setdefault(TYPE_CHANNEL_ALLOCATION, AUDIO_LOCATION_MONO)

            bis = broadcast.bis[bis_index]
            for config_type, attribute in attribute_for_type:
                if config_type in config:
                    setattr(bis, attribute, config[config_type])

    return packet
415
416
def debug_print(log):
    """Print *log* prefixed with '#<number of the packet being parsed>: '."""
    prefix = "#" + str(packet_number) + ": "
    print(prefix + log)
420
421
def unpack_data(data, byte, ignore):
    """Consume *byte* bytes from the front of *data*.

    Args:
        data: bytes-like buffer to read from.
        byte: number of bytes to consume; a negative count is treated as 0.
        ignore: when true the bytes are skipped without being decoded.

    Returns:
        The remaining data when *ignore* is true, otherwise a tuple
        (value, remaining data) where value is the consumed bytes decoded as
        an unsigned little-endian integer (0 when nothing is consumed).

    Raises:
        struct.error: when a 1, 2 or 4 byte value is requested and fewer
            bytes are available.
    """
    # A negative count (e.g. a zero-length LTV entry minus its type byte)
    # would otherwise slice from the end and throw away most of the buffer.
    if byte < 0:
        byte = 0

    if ignore:
        return data[byte:]

    chunk = data[:byte]
    fixed_format = {1: "<B", 2: "<H", 4: "<I"}.get(byte)
    if fixed_format is not None:
        value = struct.unpack(fixed_format, chunk)[0]
    else:
        # Any other width (3 bytes, 8 bytes, ...) is decoded as well instead
        # of silently yielding 0.
        value = int.from_bytes(chunk, "little")
    return value, data[byte:]
434
435
def parse_command_packet(packet, timestamp):
    """Handle the HCI commands that describe LE Audio streams.

    - Create CIS: remembers which ACL connection each CIS handle belongs to.
    - Remove ISO Data Path: writes the audio collected for that stream.
    - LE Set Periodic Advertising Data: parses a basic audio announcement.
    - LE Create BIG: remembers the advertising handle of the BIG.
    - LE Setup ISO Data Path: records *timestamp* as the BIS start time.

    Commands whose length field does not match the data are logged and
    ignored; all other opcodes are ignored silently.
    """
    opcode, packet = unpack_data(packet, 2, False)
    if opcode == OPCODE_HCI_CREATE_CIS:
        debug_print("OPCODE_HCI_CREATE_CIS")

        length, packet = unpack_data(packet, 1, False)
        if length != len(packet):
            debug_print("Invalid cmd length")
            return
        cis_count, packet = unpack_data(packet, 1, False)
        acl_handle = None
        for _ in range(cis_count):
            cis_handle, packet = unpack_data(packet, 2, False)
            # NOTE(review): 0x0EFF clears bit 8 of the handle, so handles that
            # differ only in that bit collide. It is kept because
            # parse_iso_packet applies the same mask before looking the
            # handle up; both places have to change together.
            cis_handle &= 0x0EFF
            acl_handle, packet = unpack_data(packet, 2, False)
            connection_map[acl_handle].cis_handle = cis_handle
            cis_acl_map[cis_handle] = acl_handle

        # With an empty CIS list there is no connection to show.
        if debug_enable and acl_handle is not None:
            connection_map[acl_handle].dump()
    elif opcode == OPCODE_REMOVE_ISO_DATA_PATH:
        debug_print("OPCODE_REMOVE_ISO_DATA_PATH")

        length, packet = unpack_data(packet, 1, False)
        if length != len(packet):
            debug_print("Invalid cmd length")
            return

        iso_handle, packet = unpack_data(packet, 2, False)
        # CIS stream
        if iso_handle in cis_acl_map:
            acl_handle = cis_acl_map[iso_handle]
            dump_cis_audio_data_to_file(acl_handle)
        # BIS stream
        elif iso_handle in bis_stream_map:
            dump_bis_audio_data_to_file(iso_handle)
    elif opcode == OPCODE_LE_SET_PERIODIC_ADVERTISING_DATA:
        debug_print("OPCODE_LE_SET_PERIODIC_ADVERTISING_DATA")

        length, packet = unpack_data(packet, 1, False)
        if length != len(packet):
            debug_print("Invalid cmd length")
            return

        if length < 21:
            debug_print("Ignored. Not basic audio announcement")
            return

        adv_hdl, packet = unpack_data(packet, 1, False)
        #ignore operation, advertising_data_length
        packet = unpack_data(packet, 2, True)
        length, packet = unpack_data(packet, 1, False)
        if length != len(packet):
            debug_print("Invalid AD element length")
            return

        ad_type, packet = unpack_data(packet, 1, False)
        service, packet = unpack_data(packet, 2, False)
        if ad_type != AD_TYPE_SERVICE_DATA_16_BIT or service != BASIC_AUDIO_ANNOUNCEMENT_SERVICE:
            debug_print("Ignored. Not basic audio announcement")
            return

        parse_big_codec_information(adv_hdl, packet)
    elif opcode == OPCODE_LE_CREATE_BIG:
        debug_print("OPCODE_LE_CREATE_BIG")

        length, packet = unpack_data(packet, 1, False)
        # Either problem alone makes the command unusable; the previous
        # `and` only rejected commands that were both mismatched and short.
        if length != len(packet) or length < 31:
            debug_print("Invalid Create BIG command length")
            return

        big_handle, packet = unpack_data(packet, 1, False)
        adv_handle, packet = unpack_data(packet, 1, False)
        big_adv_map[big_handle] = adv_handle
    elif opcode == OPCODE_LE_SETUP_ISO_DATA_PATH:
        debug_print("OPCODE_LE_SETUP_ISO_DATA_PATH")
        length, packet = unpack_data(packet, 1, False)
        if len(packet) != length:
            debug_print("Invalid LE SETUP ISO DATA PATH command length")
            return

        iso_handle, packet = unpack_data(packet, 2, False)
        if iso_handle in bis_stream_map:
            bis_stream_map[iso_handle].start_time = timestamp
519
520
def parse_event_packet(packet):
    """Handle the LE Create BIG Complete event.

    Every BIS handle listed in the event is paired, by position, with the BIS
    index parsed earlier from the periodic advertising data of the same
    advertising handle. The pairing is stored in the Broadcast object and the
    matching BisStream is registered in bis_stream_map under the BIS handle.

    Any other event, a failed status or an unknown BIG handle is ignored. BIS
    handles for which no BIS index is known are logged and skipped.
    """
    event_code, packet = unpack_data(packet, 1, False)
    if event_code != EVENT_CODE_LE_META_EVENT:
        return

    length, packet = unpack_data(packet, 1, False)
    if len(packet) != length:
        print("Invalid LE mata event length")
        return

    subevent_code, packet = unpack_data(packet, 1, False)
    if subevent_code != SUBEVENT_CODE_LE_CREATE_BIG_COMPLETE:
        return

    status, packet = unpack_data(packet, 1, False)
    if status != 0x00:
        debug_print("Create_BIG failed")
        return

    big_handle, packet = unpack_data(packet, 1, False)
    if big_handle not in big_adv_map:
        print("Invalid BIG handle")
        return
    broadcast = broadcast_map[big_adv_map[big_handle]]
    # Ignore, we don't care these parameter
    packet = unpack_data(packet, 15, True)
    num_of_bis, packet = unpack_data(packet, 1, False)
    for count in range(num_of_bis):
        bis_handle, packet = unpack_data(packet, 2, False)
        # The announcement may be missing from the capture or list fewer BIS
        # than the event; indexing blindly would raise IndexError.
        if count >= len(broadcast.bis_index_list):
            debug_print("No BIS index known for BIS handle " + str(bis_handle))
            continue
        bis_index = broadcast.bis_index_list[count]
        broadcast.bis_index_handle_map[bis_index] = bis_handle
        bis_stream_map[bis_handle] = broadcast.bis[bis_index]
553
554
def convert_time_str(timestamp):
    """Format a microsecond timestamp as 'MM_DD__HH_MM_SS_uuuuuu' (local time).

    btsnoop record timestamps count microseconds from a much earlier epoch
    than the Unix one, so values at or above that offset are rebased to the
    Unix epoch first; smaller values (such as the 0xFFFFFFFF "not set"
    marker) are used unchanged. The computation uses integers only, because
    a float cannot hold such a timestamp with microsecond precision.
    """
    # Offset between the btsnoop epoch and 1970-01-01, in microseconds.
    btsnoop_epoch_delta = 0x00dcddb30f2f8000

    timestamp = int(timestamp)
    if timestamp >= btsnoop_epoch_delta:
        timestamp -= btsnoop_epoch_delta

    seconds, microseconds = divmod(timestamp, 1000000)
    local_timestamp = time.localtime(seconds)
    ms_str = "{0:06}".format(microseconds)

    str_format = time.strftime("%m_%d__%H_%M_%S", local_timestamp)
    return str_format + "_" + ms_str
566
567
def dump_cis_audio_data_to_file(acl_handle):
    """Write the audio collected for one ACL connection to '.bin' files.

    The file name is built from the context, the codec configuration of the
    first ASE and the stream start time, followed by '_input.bin' or
    '_output.bin'. A direction is written only when it holds data, and its
    buffer is emptied afterwards so later audio starts a new dump. Codec
    values without a known name appear as 'unknown' in the file name instead
    of aborting the dump.
    """
    connection = connection_map[acl_handle]
    if debug_enable:
        connection.dump()

    context_case = {
        CONTEXT_TYPE_UNSPECIFIED: "Unspecified",
        CONTEXT_TYPE_CONVERSATIONAL: "Conversational",
        CONTEXT_TYPE_MEDIA: "Media",
        CONTEXT_TYPE_GAME: "Game",
        CONTEXT_TYPE_VOICEASSISTANTS: "VoiceAssistants",
        CONTEXT_TYPE_LIVE: "Live",
        CONTEXT_TYPE_RINGTONE: "Ringtone"
    }
    sf_case = {
        SAMPLE_FREQUENCY_8000: "8000",
        SAMPLE_FREQUENCY_11025: "11025",
        SAMPLE_FREQUENCY_16000: "16000",
        SAMPLE_FREQUENCY_22050: "22050",
        SAMPLE_FREQUENCY_24000: "24000",
        SAMPLE_FREQUENCY_32000: "32000",
        SAMPLE_FREQUENCY_44100: "44100",
        SAMPLE_FREQUENCY_48000: "48000",
        SAMPLE_FREQUENCY_88200: "88200",
        SAMPLE_FREQUENCY_96000: "96000",
        SAMPLE_FREQUENCY_176400: "176400",
        SAMPLE_FREQUENCY_192000: "192000",
        # Was labelled "284000".
        SAMPLE_FREQUENCY_384000: "384000"
    }
    fd_case = {FRAME_DURATION_7_5: "7_5", FRAME_DURATION_10: "10"}
    al_case = {
        AUDIO_LOCATION_MONO: "mono",
        AUDIO_LOCATION_LEFT: "left",
        AUDIO_LOCATION_RIGHT: "right",
        AUDIO_LOCATION_CENTER: "center"
    }

    file_name = context_case.get(connection.context, "Unknown")
    # Only the first ASE contributes to the name.
    for ase in connection.ase.values():
        file_name += ("_sf" + sf_case.get(ase.sampling_frequencies, "unknown"))
        file_name += ("_fd" + fd_case.get(ase.frame_duration, "unknown"))
        file_name += ("_" + al_case.get(ase.channel_allocation, "unknown"))
        file_name += ("_frame" + str(ase.octets_per_frame))
        file_name += ("_" + convert_time_str(connection.start_time))
        break

    for direction in ("input", "output"):
        attribute = direction + "_dump"
        frames = getattr(connection, attribute)
        if not frames:
            continue
        debug_print("Dump unicast " + direction + "...")
        # The context manager closes the file even if writing fails.
        with open(file_name + "_" + direction + ".bin", 'wb') as f:
            if add_header:
                generate_header(f, connection, True)
            f.write(bytearray(frames))
        setattr(connection, attribute, [])

    return
633
634
def dump_bis_audio_data_to_file(iso_handle):
    """Write the audio collected for one BIS to a 'broadcast_..._output.bin' file.

    The file name is built from the codec configuration and the start time of
    the stream. Nothing is written when no audio was collected; after a
    successful write the buffer is emptied so later audio starts a new dump.
    Codec values without a known name appear as 'unknown' in the file name
    instead of aborting the dump.
    """
    stream = bis_stream_map[iso_handle]
    if debug_enable:
        stream.dump()

    sf_case = {
        SAMPLE_FREQUENCY_8000: "8000",
        SAMPLE_FREQUENCY_11025: "11025",
        SAMPLE_FREQUENCY_16000: "16000",
        SAMPLE_FREQUENCY_22050: "22050",
        SAMPLE_FREQUENCY_24000: "24000",
        SAMPLE_FREQUENCY_32000: "32000",
        SAMPLE_FREQUENCY_44100: "44100",
        SAMPLE_FREQUENCY_48000: "48000",
        SAMPLE_FREQUENCY_88200: "88200",
        SAMPLE_FREQUENCY_96000: "96000",
        SAMPLE_FREQUENCY_176400: "176400",
        SAMPLE_FREQUENCY_192000: "192000",
        # Was labelled "284000".
        SAMPLE_FREQUENCY_384000: "384000"
    }
    fd_case = {FRAME_DURATION_7_5: "7_5", FRAME_DURATION_10: "10"}
    al_case = {
        AUDIO_LOCATION_MONO: "mono",
        AUDIO_LOCATION_LEFT: "left",
        AUDIO_LOCATION_RIGHT: "right",
        AUDIO_LOCATION_CENTER: "center"
    }

    file_name = "broadcast"
    file_name += ("_sf" + sf_case.get(stream.sampling_frequencies, "unknown"))
    file_name += ("_fd" + fd_case.get(stream.frame_duration, "unknown"))
    file_name += ("_" + al_case.get(stream.channel_allocation, "unknown"))
    file_name += ("_frame" + str(stream.octets_per_frame))
    file_name += ("_" + convert_time_str(stream.start_time))

    if stream.output_dump:
        debug_print("Dump broadcast output...")
        # The context manager closes the file even if writing fails.
        with open(file_name + "_output.bin", 'wb') as f:
            if add_header:
                generate_header(f, stream, False)
            f.write(bytearray(stream.output_dump))
        stream.output_dump = []

    return
678
679
def parse_acl_packet(packet, flags, timestamp):
    """Parse one ACL data packet.

    After the handle and length checks the payload is routed by channel id:
    the signalling channel is watched for EATT credit based connection
    requests/responses to learn the EATT channel ids, and ATT PDUs (on the
    fixed ATT channel or on a learnt EATT channel) go to parse_att_packet.
    Packets failing a check are logged and dropped.
    """
    # HCI header (4 bytes) + L2CAP header (4 bytes) is the minimum size.
    if len(packet) < 8:
        debug_print("Invalid acl data length.")
        return

    raw_handle, packet = unpack_data(packet, 2, False)
    connection_handle = raw_handle & 0x0FFF
    if connection_handle > 0x0EFF:
        debug_print("Invalid packet handle, skip")
        return

    total_length, packet = unpack_data(packet, 2, False)
    if len(packet) != total_length:
        debug_print("Invalid total length, skip")
        return

    pdu_length, packet = unpack_data(packet, 2, False)
    channel_id, packet = unpack_data(packet, 2, False)
    if len(packet) != pdu_length:
        debug_print("Invalid pdu length, skip")
        return

    if debug_enable:
        debug_print("ACL connection_handle - " + str(connection_handle) + " channel id - " + (str(channel_id)))

    # Gather EATT CID
    if channel_id == L2CAP_CID:
        opcode, packet = unpack_data(packet, 1, False)
        identifier, packet = unpack_data(packet, 1, False)
        l2cap_length, packet = unpack_data(packet, 2, False)
        # Both PDUs carry 8 bytes of fixed fields before the 2-byte CID list.
        cid_count = len(range(0, l2cap_length - 8, 2))
        if opcode == OPCODE_L2CAP_CREDIT_BASED_CONNECTION_REQ:
            spsm, packet = unpack_data(packet, 2, False)
            if spsm == PSM_EATT:
                # Remember the request so that its response can be matched.
                l2cap_identifier_set.add(identifier)
                packet = unpack_data(packet, 6, True)
                for _ in range(cid_count):
                    cid, packet = unpack_data(packet, 2, False)
                    source_cid.add(cid)
        elif opcode == OPCODE_L2CAP_CREDIT_BASED_CONNECTION_RSP:
            if identifier in l2cap_identifier_set:
                l2cap_identifier_set.remove(identifier)
                packet = unpack_data(packet, 8, True)
                for _ in range(cid_count):
                    cid, packet = unpack_data(packet, 2, False)
                    destinate_cid.add(cid)

    # Parse ATT protocol
    if channel_id == L2CAP_ATT_CID:
        if debug_enable:
            debug_print("parse_att_packet")
        parse_att_packet(packet, connection_handle, flags, timestamp)

    if channel_id in source_cid or channel_id in destinate_cid:
        if debug_enable:
            debug_print("parse_eatt_packet")
        # Skip the two bytes that precede the ATT PDU on an EATT channel.
        packet = unpack_data(packet, 2, True)
        parse_att_packet(packet, connection_handle, flags, timestamp)
742
743
def parse_iso_packet(packet, flags):
    """Append the SDU of one HCI ISO data packet to the matching stream dump.

    Each SDU is stored as a two-byte little-endian length followed by the SDU
    bytes. For a CIS the ACL connection's output (sent) or input (received)
    buffer is used; for a BIS the stream's output buffer. Packets with
    inconsistent length fields or an unknown handle are dropped.

    NOTE(review): the header layout used here (12-bit handle, TS_Flag in bit
    14, optional 4-byte time stamp before the sequence number) follows the
    HCI ISO Data packet format; fragmented SDUs are not reassembled.
    """
    header, packet = unpack_data(packet, 2, False)
    # CIS handles are stored by parse_command_packet masked with 0x0EFF, so
    # they are looked up the same way. BIS handles are stored unmasked from
    # the Create BIG Complete event, so the full 12-bit handle is used.
    cis_key = header & 0x0EFF
    bis_key = header & 0x0FFF
    has_timestamp = bool(header & 0x4000)

    iso_data_load_length, packet = unpack_data(packet, 2, False)
    if iso_data_load_length != len(packet):
        debug_print("Invalid iso data load length")
        return

    # Ignore the sequence number, and the time stamp when one is present.
    # Skipping 6 bytes unconditionally misreads packets sent without a time
    # stamp.
    packet = unpack_data(packet, 6 if has_timestamp else 2, True)
    iso_sdu_length, packet = unpack_data(packet, 2, False)
    if len(packet) == 0:
        debug_print("The iso data is empty")
    elif iso_sdu_length != len(packet):
        debug_print("Invalid iso sdu length")
        return

    frame = struct.pack("<H", len(packet)) + bytes(packet)

    # CIS stream
    if cis_key in cis_acl_map:
        connection = connection_map[cis_acl_map[cis_key]]
        if flags == SENT:
            connection.output_dump.extend(frame)
        elif flags == RECEIVED:
            connection.input_dump.extend(frame)
    elif bis_key in bis_stream_map:
        bis_stream_map[bis_key].output_dump.extend(frame)
773
774
def parse_next_packet(btsnoop_file):
    """Read one record from *btsnoop_file* and hand it to the right parser.

    A record is a 25-byte big-endian header (original length, captured
    length, flags, dropped packets, timestamp, packet type) followed by the
    packet data. Returns True when a record was processed, False at the end
    of the file or when the record cannot be used (truncated capture, short
    read, dropped packets).
    """
    global packet_number
    packet_number += 1

    record_header = btsnoop_file.read(25)
    if len(record_header) != 25:
        return False

    fields = struct.unpack(">IIIIqB", record_header)
    length_original, length_captured, flags, dropped_packets, timestamp, packet_type = fields

    if length_captured != length_original:
        debug_print("Filtered btnsoop, can not be parsed")
        return False

    # The packet type byte was already read as part of the header.
    packet = btsnoop_file.read(length_captured - 1)
    if len(packet) != length_original - 1:
        debug_print("Invalid packet length!")
        return False

    if dropped_packets:
        debug_print("Invalid droped value")
        return False

    # SCO packets and unknown packet types are skipped.
    if packet_type == COMMADN_PACKET:
        parse_command_packet(packet, timestamp)
    elif packet_type == ACL_PACKET:
        parse_acl_packet(packet, flags, timestamp)
    elif packet_type == EVENT_PACKET:
        parse_event_packet(packet)
    elif packet_type == ISO_PACKET:
        parse_iso_packet(packet, flags)
    return True
807
808
def main():
    """Parse the command line, walk the btsnoop file and dump all audio.

    Exits with status 1 when the file does not start with the expected
    btsnoop header. Audio still buffered after the last record is written
    out at the end.
    """

    def parse_handle(text):
        # Accept plain decimal as before, and additionally prefixed forms
        # such as 0x2a, which is how attribute handles are usually shown.
        try:
            return int(text)
        except ValueError:
            return int(text, 0)

    parser = argparse.ArgumentParser()
    parser.add_argument("btsnoop_file", help="btsnoop file contains LE audio start procedure")
    parser.add_argument("-v", "--verbose", help="Enable verbose log.", action="store_true")
    parser.add_argument("--header",
                        help="Add the header for LC3 Conformance Interoperability Test Software V.1.0.3.",
                        action="store_true")
    parser.add_argument("--ase_handle", help="Set the ASE handle manually.", type=parse_handle)

    argv = parser.parse_args()

    global debug_enable
    global add_header
    global ase_handle
    if argv.verbose:
        debug_enable = True

    if argv.header:
        add_header = True

    if argv.ase_handle:
        ase_handle = int(argv.ase_handle)

    with open(argv.btsnoop_file, "rb") as btsnoop_file:
        if btsnoop_file.read(16) != BTSNOOP_HEADER:
            print("Invalid btsnoop header")
            # sys.exit is always available; the bare exit() helper is only
            # installed by the site module.
            sys.exit(1)

        while parse_next_packet(btsnoop_file):
            pass

    # Flush whatever is still buffered once the capture has been read.
    for handle in list(connection_map):
        dump_cis_audio_data_to_file(handle)

    for handle in list(bis_stream_map):
        dump_bis_audio_data_to_file(handle)
848
849if __name__ == "__main__":
850    main()
851