1#!/usr/bin/env python 2# Copyright 2021 The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); you may not 5# use this file except in compliance with the License. You may obtain a copy of 6# the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations under 14# the License. 15# 16# 17# This script extracts LE Audio audio data from btsnoop. 18# Generates a audio dump file where each frame consists of a two-byte frame 19# length information and the coded frame 20# 21# Audio File Name Format: 22# [Context]_sf[Sample frequency]_fd[Frame duration]_[Channel allocation]_ 23# frame[Octets per frame]_[Stream start timestamp]_[Direction].bin 24# 25# 26# Usage: 27# ./dump_le_audio.py BTSNOOP.cfa [-v] [--header] [--ase_handle ASE_HANDLE] 28# 29# -v, --verbose: to enable the verbose log 30# --header: Add the header for LC3 Conformance Interoperability Test Software V.1.0.3 from LC3 test specification. 31# --ase_handle ASE_HANDLE: Set the ASE handle manually. 32# 33# NOTE: 34# Please make sure you HCI Snoop data file includes the following frames: 35# 1. GATT service discovery for "ASE Control Point" chracteristic (if you give the ase_handle via command, the flow could be skipped) 36# 2. GATT config codec via ASE Control Point 37# 3. HCI create CIS to point out the "Start stream", and the data frames. 38# After all hci packet parse finished, would dump all remain audio data as well 39# 40# Correspondsing Spec. 41# ASCS_1.0 42# PACS_1.0 43# BAP_1.0 44# LC3.TS V1.0.3 45# 46from collections import defaultdict 47from os import X_OK 48 49import argparse 50import struct 51import sys 52import time 53 54BTSNOOP_FILE_NAME = "" 55BTSNOOP_HEADER = b'btsnoop\x00\x00\x00\x00\x01\x00\x00\x03\xea' 56 57COMMADN_PACKET = 1 58ACL_PACKET = 2 59SCO_PACKET = 3 60EVENT_PACKET = 4 61ISO_PACKET = 5 62 63SENT = 0 64RECEIVED = 1 65 66L2CAP_ATT_CID = 0x0004 67L2CAP_CID = 0x0005 68 69PSM_EATT = 0x0027 70 71# opcode for att protocol 72OPCODE_ATT_READ_BY_TYPE_RSP = 0x09 73OPCODE_ATT_WRITE_CMD = 0x52 74 75UUID_ASE_CONTROL_POINT = 0x2BC6 76 77# opcode for ase control 78OPCODE_CONFIG_CODEC = 0x01 79OPCODE_ENABLE = 0x03 80OPCODE_UPDATE_METADATA = 0x07 81OPCODE_RELEASE = 0x08 82 83# opcode for hci command 84OPCODE_HCI_CREATE_CIS = 0x2064 85OPCODE_REMOVE_ISO_DATA_PATH = 0x206F 86OPCODE_LE_SET_PERIODIC_ADVERTISING_DATA = 0x203F 87OPCODE_LE_CREATE_BIG = 0x2068 88OPCODE_LE_SETUP_ISO_DATA_PATH = 0x206E 89 90# opcode for L2CAP channel 91OPCODE_L2CAP_CREDIT_BASED_CONNECTION_REQ = 0x17 92OPCODE_L2CAP_CREDIT_BASED_CONNECTION_RSP = 0x18 93 94# HCI event 95EVENT_CODE_LE_META_EVENT = 0x3E 96SUBEVENT_CODE_LE_CREATE_BIG_COMPLETE = 0x1B 97 98TYPE_STREAMING_AUDIO_CONTEXTS = 0x02 99 100TYPE_SAMPLING_FREQUENCIES = 0x01 101TYPE_FRAME_DURATION = 0x02 102TYPE_CHANNEL_ALLOCATION = 0x03 103TYPE_OCTETS_PER_FRAME = 0x04 104 105CONTEXT_TYPE_UNSPECIFIED = 0x0001 106CONTEXT_TYPE_CONVERSATIONAL = 0x0002 107CONTEXT_TYPE_MEDIA = 0x0004 108CONTEXT_TYPE_GAME = 0x0008 109CONTEXT_TYPE_VOICEASSISTANTS = 0x0020 110CONTEXT_TYPE_LIVE = 0x0040 111CONTEXT_TYPE_RINGTONE = 0x0200 112 113# sample frequency 114SAMPLE_FREQUENCY_8000 = 0x01 115SAMPLE_FREQUENCY_11025 = 0x02 116SAMPLE_FREQUENCY_16000 = 0x03 117SAMPLE_FREQUENCY_22050 = 0x04 118SAMPLE_FREQUENCY_24000 = 0x05 119SAMPLE_FREQUENCY_32000 = 0x06 120SAMPLE_FREQUENCY_44100 = 0x07 121SAMPLE_FREQUENCY_48000 = 0x08 122SAMPLE_FREQUENCY_88200 = 0x09 123SAMPLE_FREQUENCY_96000 = 0x0a 124SAMPLE_FREQUENCY_176400 = 0x0b 125SAMPLE_FREQUENCY_192000 = 0x0c 126SAMPLE_FREQUENCY_384000 = 0x0d 127 128FRAME_DURATION_7_5 = 0x00 129FRAME_DURATION_10 = 0x01 130 131AUDIO_LOCATION_MONO = 0x00 132AUDIO_LOCATION_LEFT = 0x01 133AUDIO_LOCATION_RIGHT = 0x02 134AUDIO_LOCATION_CENTER = 0x04 135 136AD_TYPE_SERVICE_DATA_16_BIT = 0x16 137BASIC_AUDIO_ANNOUNCEMENT_SERVICE = 0x1851 138 139packet_number = 0 140debug_enable = False 141add_header = False 142ase_handle = 0xFFFF 143 144l2cap_identifier_set = set() 145source_cid = set() 146destinate_cid = set() 147 148 149class Connection: 150 151 def __init__(self): 152 self.ase_handle = 0xFFFF 153 self.number_of_ases = 0 154 self.ase = defaultdict(AseStream) 155 self.context = 0xFFFF 156 self.cis_handle = 0xFFFF 157 self.input_dump = [] 158 self.output_dump = [] 159 self.start_time = 0xFFFFFFFF 160 161 def dump(self): 162 print("start_time: " + str(self.start_time)) 163 print("ase_handle: " + str(self.ase_handle)) 164 print("context type: " + str(self.context)) 165 print("number_of_ases: " + str(self.number_of_ases)) 166 print("cis_handle: " + str(self.cis_handle)) 167 for id, ase_stream in self.ase.items(): 168 print("ase id: " + str(id)) 169 ase_stream.dump() 170 171 172class AseStream: 173 174 def __init__(self): 175 self.sampling_frequencies = 0xFF 176 self.frame_duration = 0xFF 177 self.channel_allocation = 0xFFFFFFFF 178 self.octets_per_frame = 0xFFFF 179 180 def dump(self): 181 print("sampling_frequencies: " + str(self.sampling_frequencies)) 182 print("frame_duration: " + str(self.frame_duration)) 183 print("channel_allocation: " + str(self.channel_allocation)) 184 print("octets_per_frame: " + str(self.octets_per_frame)) 185 186 187class Broadcast: 188 189 def __init__(self): 190 self.num_of_bis = defaultdict(int) # subgroup - num_of_bis 191 self.bis = defaultdict(BisStream) # bis_index - codec_config 192 self.bis_index_handle_map = defaultdict(int) # bis_index - bis_handle 193 self.bis_index_list = [] 194 195 def dump(self): 196 for bis_index, iso_stream in self.bis.items(): 197 print("bis_index: " + str(bis_index) + " bis handle: " + str(self.bis_index_handle_map[bis_index])) 198 iso_stream.dump() 199 200 201class BisStream: 202 203 def __init__(self): 204 self.sampling_frequencies = 0xFF 205 self.frame_duration = 0xFF 206 self.channel_allocation = 0xFFFFFFFF 207 self.octets_per_frame = 0xFFFF 208 self.output_dump = [] 209 self.start_time = 0xFFFFFFFF 210 211 def dump(self): 212 print("start_time: " + str(self.start_time)) 213 print("sampling_frequencies: " + str(self.sampling_frequencies)) 214 print("frame_duration: " + str(self.frame_duration)) 215 print("channel_allocation: " + str(self.channel_allocation)) 216 print("octets_per_frame: " + str(self.octets_per_frame)) 217 218 219connection_map = defaultdict(Connection) 220cis_acl_map = defaultdict(int) 221broadcast_map = defaultdict(Broadcast) 222big_adv_map = defaultdict(int) 223bis_stream_map = defaultdict(BisStream) 224 225 226def generate_header(file, stream, is_cis): 227 sf_case = { 228 SAMPLE_FREQUENCY_8000: 80, 229 SAMPLE_FREQUENCY_11025: 110, 230 SAMPLE_FREQUENCY_16000: 160, 231 SAMPLE_FREQUENCY_22050: 220, 232 SAMPLE_FREQUENCY_24000: 240, 233 SAMPLE_FREQUENCY_32000: 320, 234 SAMPLE_FREQUENCY_44100: 441, 235 SAMPLE_FREQUENCY_48000: 480, 236 SAMPLE_FREQUENCY_88200: 882, 237 SAMPLE_FREQUENCY_96000: 960, 238 SAMPLE_FREQUENCY_176400: 1764, 239 SAMPLE_FREQUENCY_192000: 1920, 240 SAMPLE_FREQUENCY_384000: 2840, 241 } 242 fd_case = {FRAME_DURATION_7_5: 7.5, FRAME_DURATION_10: 10} 243 al_case = {AUDIO_LOCATION_MONO: 1, AUDIO_LOCATION_LEFT: 1, AUDIO_LOCATION_RIGHT: 1, AUDIO_LOCATION_CENTER: 2} 244 245 header = bytearray.fromhex('1ccc1200') 246 if is_cis: 247 for ase in stream.ase.values(): 248 header = header + struct.pack("<H", sf_case[ase.sampling_frequencies]) 249 header = header + struct.pack("<H", int(ase.octets_per_frame * 8 * 10 / fd_case[ase.frame_duration])) 250 header = header + struct.pack("<HHHL", al_case[ase.channel_allocation], fd_case[ase.frame_duration] * 100, 251 0, 48000000) 252 break 253 else: 254 header = header + struct.pack("<H", sf_case[stream.sampling_frequencies]) 255 header = header + struct.pack("<H", int(stream.octets_per_frame * 8 * 10 / fd_case[stream.frame_duration])) 256 header = header + struct.pack("<HHHL", al_case[stream.channel_allocation], fd_case[stream.frame_duration] * 100, 257 0, 48000000) 258 file.write(header) 259 260 261def parse_codec_information(connection_handle, ase_id, packet): 262 length, packet = unpack_data(packet, 1, False) 263 if len(packet) < length: 264 debug_print("Invalid codec configuration length") 265 return packet 266 ase = connection_map[connection_handle].ase[ase_id] 267 while length > 0: 268 config_length, packet = unpack_data(packet, 1, False) 269 config_type, packet = unpack_data(packet, 1, False) 270 value, packet = unpack_data(packet, config_length - 1, False) 271 if config_type == TYPE_SAMPLING_FREQUENCIES: 272 ase.sampling_frequencies = value 273 elif config_type == TYPE_FRAME_DURATION: 274 ase.frame_duration = value 275 elif config_type == TYPE_CHANNEL_ALLOCATION: 276 ase.channel_allocation = value 277 elif config_type == TYPE_OCTETS_PER_FRAME: 278 ase.octets_per_frame = value 279 length -= (config_length + 1) 280 281 return packet 282 283 284def parse_att_read_by_type_rsp(packet, connection_handle): 285 length, packet = unpack_data(packet, 1, False) 286 if length != 7: 287 #ignore the packet, we're only interested in this packet for the characteristic type UUID 288 return 289 290 if length > len(packet): 291 debug_print("Invalid att packet length") 292 return 293 294 attribute_handle, packet = unpack_data(packet, 2, False) 295 if debug_enable: 296 debug_print("attribute_handle - " + str(attribute_handle)) 297 packet = unpack_data(packet, 1, True) 298 value_handle, packet = unpack_data(packet, 2, False) 299 characteristic_uuid, packet = unpack_data(packet, 2, False) 300 if characteristic_uuid == UUID_ASE_CONTROL_POINT: 301 debug_print("ASE Control point found!") 302 connection_map[connection_handle].ase_handle = value_handle 303 304 305def parse_att_write_cmd(packet, connection_handle, timestamp): 306 attribute_handle, packet = unpack_data(packet, 2, False) 307 global ase_handle 308 if ase_handle != 0xFFFF: 309 connection_map[connection_handle].ase_handle = ase_handle 310 311 if connection_map[connection_handle].ase_handle == attribute_handle: 312 if debug_enable: 313 debug_print("Action with ASE Control point") 314 opcode, packet = unpack_data(packet, 1, False) 315 if opcode == OPCODE_CONFIG_CODEC: 316 debug_print("config_codec") 317 (connection_map[connection_handle].number_of_ases, packet) = unpack_data(packet, 1, False) 318 for i in range(connection_map[connection_handle].number_of_ases): 319 ase_id, packet = unpack_data(packet, 1, False) 320 # ignore target_latency, target_phy, codec_id 321 packet = unpack_data(packet, 7, True) 322 packet = parse_codec_information(connection_handle, ase_id, packet) 323 elif opcode == OPCODE_ENABLE or opcode == OPCODE_UPDATE_METADATA: 324 if debug_enable: 325 debug_print("enable or update metadata") 326 numbers_of_ases, packet = unpack_data(packet, 1, False) 327 for i in range(numbers_of_ases): 328 ase_id, packet = unpack_data(packet, 1, False) 329 metadata_length, packet = unpack_data(packet, 1, False) 330 if metadata_length > len(packet): 331 debug_print("Invalid metadata length") 332 return 333 length, packet = unpack_data(packet, 1, False) 334 if length > len(packet): 335 debug_print("Invalid metadata value length") 336 return 337 metadata_type, packet = unpack_data(packet, 1, False) 338 if metadata_type == TYPE_STREAMING_AUDIO_CONTEXTS: 339 (connection_map[connection_handle].context, packet) = unpack_data(packet, 2, False) 340 break 341 342 if opcode == OPCODE_ENABLE: 343 debug_print("enable, set timestamp") 344 connection_map[connection_handle].start_time = timestamp 345 346 if debug_enable: 347 connection_map[connection_handle].dump() 348 349 350def parse_att_packet(packet, connection_handle, flags, timestamp): 351 opcode, packet = unpack_data(packet, 1, False) 352 packet_handle = { 353 (OPCODE_ATT_READ_BY_TYPE_RSP, RECEIVED): (lambda x, y, z: parse_att_read_by_type_rsp(x, y)), 354 (OPCODE_ATT_WRITE_CMD, SENT): (lambda x, y, z: parse_att_write_cmd(x, y, z)) 355 } 356 packet_handle.get((opcode, flags), lambda x, y, z: None)(packet, connection_handle, timestamp) 357 358 359def parse_big_codec_information(adv_handle, packet): 360 # Ignore presentation delay 361 packet = unpack_data(packet, 3, True) 362 number_of_subgroup, packet = unpack_data(packet, 1, False) 363 for subgroup in range(number_of_subgroup): 364 num_of_bis, packet = unpack_data(packet, 1, False) 365 broadcast_map[adv_handle].num_of_bis[subgroup] = num_of_bis 366 # Ignore codec id 367 packet = unpack_data(packet, 5, True) 368 length, packet = unpack_data(packet, 1, False) 369 if len(packet) < length: 370 print("Invalid subgroup codec information length") 371 return 372 373 while length > 0: 374 config_length, packet = unpack_data(packet, 1, False) 375 config_type, packet = unpack_data(packet, 1, False) 376 value, packet = unpack_data(packet, config_length - 1, False) 377 if config_type == TYPE_SAMPLING_FREQUENCIES: 378 sampling_frequencies = value 379 elif config_type == TYPE_FRAME_DURATION: 380 frame_duration = value 381 elif config_type == TYPE_OCTETS_PER_FRAME: 382 octets_per_frame = value 383 else: 384 print("Unknown config type") 385 length -= (config_length + 1) 386 387 # Ignore metadata 388 metadata_length, packet = unpack_data(packet, 1, False) 389 packet = unpack_data(packet, metadata_length, True) 390 391 for count in range(num_of_bis): 392 bis_index, packet = unpack_data(packet, 1, False) 393 broadcast_map[adv_handle].bis_index_list.append(bis_index) 394 length, packet = unpack_data(packet, 1, False) 395 if len(packet) < length: 396 print("Invalid level 3 codec information length") 397 return 398 399 while length > 0: 400 config_length, packet = unpack_data(packet, 1, False) 401 config_type, packet = unpack_data(packet, 1, False) 402 value, packet = unpack_data(packet, config_length - 1, False) 403 if config_type == TYPE_CHANNEL_ALLOCATION: 404 channel_allocation = value 405 else: 406 print("Ignored config type") 407 length -= (config_length + 1) 408 409 broadcast_map[adv_handle].bis[bis_index].sampling_frequencies = sampling_frequencies 410 broadcast_map[adv_handle].bis[bis_index].frame_duration = frame_duration 411 broadcast_map[adv_handle].bis[bis_index].octets_per_frame = octets_per_frame 412 broadcast_map[adv_handle].bis[bis_index].channel_allocation = channel_allocation 413 414 return packet 415 416 417def debug_print(log): 418 global packet_number 419 print("#" + str(packet_number) + ": " + log) 420 421 422def unpack_data(data, byte, ignore): 423 if ignore: 424 return data[byte:] 425 426 value = 0 427 if byte == 1: 428 value = struct.unpack("<B", data[:byte])[0] 429 elif byte == 2: 430 value = struct.unpack("<H", data[:byte])[0] 431 elif byte == 4: 432 value = struct.unpack("<I", data[:byte])[0] 433 return value, data[byte:] 434 435 436def parse_command_packet(packet, timestamp): 437 opcode, packet = unpack_data(packet, 2, False) 438 if opcode == OPCODE_HCI_CREATE_CIS: 439 debug_print("OPCODE_HCI_CREATE_CIS") 440 441 length, packet = unpack_data(packet, 1, False) 442 if length != len(packet): 443 debug_print("Invalid cmd length") 444 return 445 cis_count, packet = unpack_data(packet, 1, False) 446 for i in range(cis_count): 447 cis_handle, packet = unpack_data(packet, 2, False) 448 cis_handle &= 0x0EFF 449 acl_handle, packet = unpack_data(packet, 2, False) 450 connection_map[acl_handle].cis_handle = cis_handle 451 cis_acl_map[cis_handle] = acl_handle 452 453 if debug_enable: 454 connection_map[acl_handle].dump() 455 elif opcode == OPCODE_REMOVE_ISO_DATA_PATH: 456 debug_print("OPCODE_REMOVE_ISO_DATA_PATH") 457 458 length, packet = unpack_data(packet, 1, False) 459 if length != len(packet): 460 debug_print("Invalid cmd length") 461 return 462 463 iso_handle, packet = unpack_data(packet, 2, False) 464 # CIS stream 465 if iso_handle in cis_acl_map: 466 acl_handle = cis_acl_map[iso_handle] 467 dump_cis_audio_data_to_file(acl_handle) 468 # To Do: BIS stream 469 elif iso_handle in bis_stream_map: 470 dump_bis_audio_data_to_file(iso_handle) 471 elif opcode == OPCODE_LE_SET_PERIODIC_ADVERTISING_DATA: 472 debug_print("OPCODE_LE_SET_PERIODIC_ADVERTISING_DATA") 473 474 length, packet = unpack_data(packet, 1, False) 475 if length != len(packet): 476 debug_print("Invalid cmd length") 477 return 478 479 if length < 21: 480 debug_print("Ignored. Not basic audio announcement") 481 return 482 483 adv_hdl, packet = unpack_data(packet, 1, False) 484 #ignore operation, advertising_data_length 485 packet = unpack_data(packet, 2, True) 486 length, packet = unpack_data(packet, 1, False) 487 if length != len(packet): 488 debug_print("Invalid AD element length") 489 return 490 491 ad_type, packet = unpack_data(packet, 1, False) 492 service, packet = unpack_data(packet, 2, False) 493 if ad_type != AD_TYPE_SERVICE_DATA_16_BIT or service != BASIC_AUDIO_ANNOUNCEMENT_SERVICE: 494 debug_print("Ignored. Not basic audio announcement") 495 return 496 497 packet = parse_big_codec_information(adv_hdl, packet) 498 elif opcode == OPCODE_LE_CREATE_BIG: 499 debug_print("OPCODE_LE_CREATE_BIG") 500 501 length, packet = unpack_data(packet, 1, False) 502 if length != len(packet) and length < 31: 503 debug_print("Invalid Create BIG command length") 504 return 505 506 big_handle, packet = unpack_data(packet, 1, False) 507 adv_handle, packet = unpack_data(packet, 1, False) 508 big_adv_map[big_handle] = adv_handle 509 elif opcode == OPCODE_LE_SETUP_ISO_DATA_PATH: 510 debug_print("OPCODE_LE_SETUP_ISO_DATA_PATH") 511 length, packet = unpack_data(packet, 1, False) 512 if len(packet) != length: 513 debug_print("Invalid LE SETUP ISO DATA PATH command length") 514 return 515 516 iso_handle, packet = unpack_data(packet, 2, False) 517 if iso_handle in bis_stream_map: 518 bis_stream_map[iso_handle].start_time = timestamp 519 520 521def parse_event_packet(packet): 522 event_code, packet = unpack_data(packet, 1, False) 523 if event_code != EVENT_CODE_LE_META_EVENT: 524 return 525 526 length, packet = unpack_data(packet, 1, False) 527 if len(packet) != length: 528 print("Invalid LE mata event length") 529 return 530 531 subevent_code, packet = unpack_data(packet, 1, False) 532 if subevent_code != SUBEVENT_CODE_LE_CREATE_BIG_COMPLETE: 533 return 534 535 status, packet = unpack_data(packet, 1, False) 536 if status != 0x00: 537 debug_print("Create_BIG failed") 538 return 539 540 big_handle, packet = unpack_data(packet, 1, False) 541 if big_handle not in big_adv_map: 542 print("Invalid BIG handle") 543 return 544 adv_handle = big_adv_map[big_handle] 545 # Ignore, we don't care these parameter 546 packet = unpack_data(packet, 15, True) 547 num_of_bis, packet = unpack_data(packet, 1, False) 548 for count in range(num_of_bis): 549 bis_handle, packet = unpack_data(packet, 2, False) 550 bis_index = broadcast_map[adv_handle].bis_index_list[count] 551 broadcast_map[adv_handle].bis_index_handle_map[bis_index] = bis_handle 552 bis_stream_map[bis_handle] = broadcast_map[adv_handle].bis[bis_index] 553 554 555def convert_time_str(timestamp): 556 """This function converts time to string format.""" 557 timestamp_sec = float(timestamp) / 1000000 558 local_timestamp = time.localtime(timestamp_sec) 559 ms = timestamp_sec - int(timestamp_sec) 560 ms_str = "{0:06}".format(int(round(ms * 1000000))) 561 562 str_format = time.strftime("%m_%d__%H_%M_%S", local_timestamp) 563 full_str_format = str_format + "_" + ms_str 564 565 return full_str_format 566 567 568def dump_cis_audio_data_to_file(acl_handle): 569 if debug_enable: 570 connection_map[acl_handle].dump() 571 file_name = "" 572 context_case = { 573 CONTEXT_TYPE_UNSPECIFIED: "Unspecified", 574 CONTEXT_TYPE_CONVERSATIONAL: "Conversational", 575 CONTEXT_TYPE_MEDIA: "Media", 576 CONTEXT_TYPE_GAME: "Game", 577 CONTEXT_TYPE_VOICEASSISTANTS: "VoiceAssistants", 578 CONTEXT_TYPE_LIVE: "Live", 579 CONTEXT_TYPE_RINGTONE: "Ringtone" 580 } 581 file_name += context_case.get(connection_map[acl_handle].context, "Unknown") 582 for ase in connection_map[acl_handle].ase.values(): 583 sf_case = { 584 SAMPLE_FREQUENCY_8000: "8000", 585 SAMPLE_FREQUENCY_11025: "11025", 586 SAMPLE_FREQUENCY_16000: "16000", 587 SAMPLE_FREQUENCY_22050: "22050", 588 SAMPLE_FREQUENCY_24000: "24000", 589 SAMPLE_FREQUENCY_32000: "32000", 590 SAMPLE_FREQUENCY_44100: "44100", 591 SAMPLE_FREQUENCY_48000: "48000", 592 SAMPLE_FREQUENCY_88200: "88200", 593 SAMPLE_FREQUENCY_96000: "96000", 594 SAMPLE_FREQUENCY_176400: "176400", 595 SAMPLE_FREQUENCY_192000: "192000", 596 SAMPLE_FREQUENCY_384000: "284000" 597 } 598 file_name += ("_sf" + sf_case[ase.sampling_frequencies]) 599 fd_case = {FRAME_DURATION_7_5: "7_5", FRAME_DURATION_10: "10"} 600 file_name += ("_fd" + fd_case[ase.frame_duration]) 601 al_case = { 602 AUDIO_LOCATION_MONO: "mono", 603 AUDIO_LOCATION_LEFT: "left", 604 AUDIO_LOCATION_RIGHT: "right", 605 AUDIO_LOCATION_CENTER: "center" 606 } 607 file_name += ("_" + al_case[ase.channel_allocation]) 608 file_name += ("_frame" + str(ase.octets_per_frame)) 609 file_name += ("_" + convert_time_str(connection_map[acl_handle].start_time)) 610 break 611 612 if connection_map[acl_handle].input_dump != []: 613 debug_print("Dump unicast input...") 614 f = open(file_name + "_input.bin", 'wb') 615 if add_header == True: 616 generate_header(f, connection_map[acl_handle], True) 617 arr = bytearray(connection_map[acl_handle].input_dump) 618 f.write(arr) 619 f.close() 620 connection_map[acl_handle].input_dump = [] 621 622 if connection_map[acl_handle].output_dump != []: 623 debug_print("Dump unicast output...") 624 f = open(file_name + "_output.bin", 'wb') 625 if add_header == True: 626 generate_header(f, connection_map[acl_handle], True) 627 arr = bytearray(connection_map[acl_handle].output_dump) 628 f.write(arr) 629 f.close() 630 connection_map[acl_handle].output_dump = [] 631 632 return 633 634 635def dump_bis_audio_data_to_file(iso_handle): 636 if debug_enable: 637 bis_stream_map[iso_handle].dump() 638 file_name = "broadcast" 639 sf_case = { 640 SAMPLE_FREQUENCY_8000: "8000", 641 SAMPLE_FREQUENCY_11025: "11025", 642 SAMPLE_FREQUENCY_16000: "16000", 643 SAMPLE_FREQUENCY_22050: "22050", 644 SAMPLE_FREQUENCY_24000: "24000", 645 SAMPLE_FREQUENCY_32000: "32000", 646 SAMPLE_FREQUENCY_44100: "44100", 647 SAMPLE_FREQUENCY_48000: "48000", 648 SAMPLE_FREQUENCY_88200: "88200", 649 SAMPLE_FREQUENCY_96000: "96000", 650 SAMPLE_FREQUENCY_176400: "176400", 651 SAMPLE_FREQUENCY_192000: "192000", 652 SAMPLE_FREQUENCY_384000: "284000" 653 } 654 file_name += ("_sf" + sf_case[bis_stream_map[iso_handle].sampling_frequencies]) 655 fd_case = {FRAME_DURATION_7_5: "7_5", FRAME_DURATION_10: "10"} 656 file_name += ("_fd" + fd_case[bis_stream_map[iso_handle].frame_duration]) 657 al_case = { 658 AUDIO_LOCATION_MONO: "mono", 659 AUDIO_LOCATION_LEFT: "left", 660 AUDIO_LOCATION_RIGHT: "right", 661 AUDIO_LOCATION_CENTER: "center" 662 } 663 file_name += ("_" + al_case[bis_stream_map[iso_handle].channel_allocation]) 664 file_name += ("_frame" + str(bis_stream_map[iso_handle].octets_per_frame)) 665 file_name += ("_" + convert_time_str(bis_stream_map[iso_handle].start_time)) 666 667 if bis_stream_map[iso_handle].output_dump != []: 668 debug_print("Dump broadcast output...") 669 f = open(file_name + "_output.bin", 'wb') 670 if add_header == True: 671 generate_header(f, bis_stream_map[iso_handle], False) 672 arr = bytearray(bis_stream_map[iso_handle].output_dump) 673 f.write(arr) 674 f.close() 675 bis_stream_map[iso_handle].output_dump = [] 676 677 return 678 679 680def parse_acl_packet(packet, flags, timestamp): 681 # Check the minimum acl length, HCI leader (4 bytes) 682 # + L2CAP header (4 bytes) 683 if len(packet) < 8: 684 debug_print("Invalid acl data length.") 685 return 686 687 connection_handle, packet = unpack_data(packet, 2, False) 688 connection_handle = connection_handle & 0x0FFF 689 if connection_handle > 0x0EFF: 690 debug_print("Invalid packet handle, skip") 691 return 692 total_length, packet = unpack_data(packet, 2, False) 693 if total_length != len(packet): 694 debug_print("Invalid total length, skip") 695 return 696 pdu_length, packet = unpack_data(packet, 2, False) 697 channel_id, packet = unpack_data(packet, 2, False) 698 if pdu_length != len(packet): 699 debug_print("Invalid pdu length, skip") 700 return 701 702 if debug_enable: 703 debug_print("ACL connection_handle - " + str(connection_handle) + " channel id - " + (str(channel_id))) 704 705 # Gather EATT CID 706 if channel_id == L2CAP_CID: 707 global l2cap_identifier_set 708 global source_cid 709 global destinate_cid 710 opcode, packet = unpack_data(packet, 1, False) 711 identifier, packet = unpack_data(packet, 1, False) 712 l2cap_length, packet = unpack_data(packet, 2, False) 713 if opcode == OPCODE_L2CAP_CREDIT_BASED_CONNECTION_REQ: 714 spsm, packet = unpack_data(packet, 2, False) 715 if spsm == PSM_EATT: 716 if opcode == OPCODE_L2CAP_CREDIT_BASED_CONNECTION_REQ: 717 l2cap_identifier_set.add(identifier) 718 packet = unpack_data(packet, 6, True) 719 for i in range(0, l2cap_length - 8, 2): 720 cid, packet = unpack_data(packet, 2, False) 721 source_cid.add(cid) 722 723 if opcode == OPCODE_L2CAP_CREDIT_BASED_CONNECTION_RSP: 724 if identifier in l2cap_identifier_set: 725 l2cap_identifier_set.remove(identifier) 726 packet = unpack_data(packet, 8, True) 727 for i in range(0, l2cap_length - 8, 2): 728 cid, packet = unpack_data(packet, 2, False) 729 destinate_cid.add(cid) 730 731 # Parse ATT protocol 732 if channel_id == L2CAP_ATT_CID: 733 if debug_enable: 734 debug_print("parse_att_packet") 735 parse_att_packet(packet, connection_handle, flags, timestamp) 736 737 if channel_id in source_cid or channel_id in destinate_cid: 738 if debug_enable: 739 debug_print("parse_eatt_packet") 740 packet = unpack_data(packet, 2, True) 741 parse_att_packet(packet, connection_handle, flags, timestamp) 742 743 744def parse_iso_packet(packet, flags): 745 iso_handle, packet = unpack_data(packet, 2, False) 746 iso_handle &= 0x0EFF 747 iso_data_load_length, packet = unpack_data(packet, 2, False) 748 if iso_data_load_length != len(packet): 749 debug_print("Invalid iso data load length") 750 return 751 752 # Ignore timestamp, sequence number 753 packet = unpack_data(packet, 6, True) 754 iso_sdu_length, packet = unpack_data(packet, 2, False) 755 if len(packet) == 0: 756 debug_print("The iso data is empty") 757 elif iso_sdu_length != len(packet): 758 debug_print("Invalid iso sdu length") 759 return 760 761 # CIS stream 762 if iso_handle in cis_acl_map: 763 acl_handle = cis_acl_map[iso_handle] 764 if flags == SENT: 765 connection_map[acl_handle].output_dump.extend(struct.pack("<H", len(packet))) 766 connection_map[acl_handle].output_dump.extend(list(packet)) 767 elif flags == RECEIVED: 768 connection_map[acl_handle].input_dump.extend(struct.pack("<H", len(packet))) 769 connection_map[acl_handle].input_dump.extend(list(packet)) 770 elif iso_handle in bis_stream_map: 771 bis_stream_map[iso_handle].output_dump.extend(struct.pack("<H", len(packet))) 772 bis_stream_map[iso_handle].output_dump.extend(list(packet)) 773 774 775def parse_next_packet(btsnoop_file): 776 global packet_number 777 packet_number += 1 778 packet_header = btsnoop_file.read(25) 779 if len(packet_header) != 25: 780 return False 781 782 (length_original, length_captured, flags, dropped_packets, timestamp, 783 type) = struct.unpack(">IIIIqB", packet_header) 784 785 if length_original != length_captured: 786 debug_print("Filtered btnsoop, can not be parsed") 787 return False 788 789 packet = btsnoop_file.read(length_captured - 1) 790 if len(packet) != length_original - 1: 791 debug_print("Invalid packet length!") 792 return False 793 794 if dropped_packets: 795 debug_print("Invalid droped value") 796 return False 797 798 packet_handle = { 799 COMMADN_PACKET: (lambda x, y, z: parse_command_packet(x, z)), 800 ACL_PACKET: (lambda x, y, z: parse_acl_packet(x, y, z)), 801 SCO_PACKET: (lambda x, y, z: None), 802 EVENT_PACKET: (lambda x, y, z: parse_event_packet(x)), 803 ISO_PACKET: (lambda x, y, z: parse_iso_packet(x, y)) 804 } 805 packet_handle.get(type, lambda x, y, z: None)(packet, flags, timestamp) 806 return True 807 808 809def main(): 810 parser = argparse.ArgumentParser() 811 parser.add_argument("btsnoop_file", help="btsnoop file contains LE audio start procedure") 812 parser.add_argument("-v", "--verbose", help="Enable verbose log.", action="store_true") 813 parser.add_argument("--header", 814 help="Add the header for LC3 Conformance Interoperability Test Software V.1.0.3.", 815 action="store_true") 816 parser.add_argument("--ase_handle", help="Set the ASE handle manually.", type=int) 817 818 argv = parser.parse_args() 819 BTSNOOP_FILE_NAME = argv.btsnoop_file 820 821 global debug_enable 822 global add_header 823 global ase_handle 824 if argv.verbose: 825 debug_enable = True 826 827 if argv.header: 828 add_header = True 829 830 if argv.ase_handle: 831 ase_handle = int(argv.ase_handle) 832 833 with open(BTSNOOP_FILE_NAME, "rb") as btsnoop_file: 834 if btsnoop_file.read(16) != BTSNOOP_HEADER: 835 print("Invalid btsnoop header") 836 exit(1) 837 838 while True: 839 if not parse_next_packet(btsnoop_file): 840 break 841 842 for handle in connection_map.keys(): 843 dump_cis_audio_data_to_file(handle) 844 845 for handle in bis_stream_map.keys(): 846 dump_bis_audio_data_to_file(handle) 847 848 849if __name__ == "__main__": 850 main() 851