#!/usr/bin/env python3
#
# Perform Security Manager Test Cases using two BTstack instances
#
# Copyright 2018 BlueKitchen GmbH
#

import codecs
import csv
import datetime
import fcntl
import glob
import os
import select
import shutil
import signal
import subprocess
import sys
import time

# Names of the IO capability values, indexed by the numeric value used in the
# '<role>_io_capabilities' column of the test descriptor.
io_capabilities = [
    'IO_CAPABILITY_DISPLAY_ONLY',
    'IO_CAPABILITY_DISPLAY_YES_NO',
    'IO_CAPABILITY_KEYBOARD_ONLY',
    'IO_CAPABILITY_NO_INPUT_NO_OUTPUT',
    'IO_CAPABILITY_KEYBOARD_DISPLAY',
]

SM_AUTHREQ_NO_BONDING = 0x00
SM_AUTHREQ_BONDING = 0x01
SM_AUTHREQ_MITM_PROTECTION = 0x04
SM_AUTHREQ_SECURE_CONNECTION = 0x08
SM_AUTHREQ_KEYPRESS = 0x10

# Names of the pairing failure reasons, indexed by the numeric value used in
# the 'tester_failure' column (index 0 == no failure requested).
# NOTE: these strings are written verbatim into config.txt - do not "fix" them.
failures = [
    '',
    'PASSKEY_ENTRY_FAILED',
    'OOB_NOT_AVAILABLE',
    'AUTHENTHICATION_REQUIREMENTS',
    'CONFIRM_VALUE_FAILED',
    'PAIRING_NOT_SUPPORTED',
    'ENCRYPTION_KEY_SIZE',
    'COMMAND_NOT_SUPPORTED',
    'UNSPECIFIED_REASON',
    'REPEATED_ATTEMPTS',
    'INVALID_PARAMETERS',
    'DHKEY_CHECK_FAILED',
    'NUMERIC_COMPARISON_FAILED',
]

# tester config
debug = False
regenerate = False
usb_paths = ['07', '02-03']

# glob pattern matching the bonding information removed before each test
BONDING_INFO_GLOB = '/tmp/btstack_*'

# auth_req sub-fields reported by write_config(): attribute -> (mask, shift)
_AUTH_REQ_FIELDS = {
    'mitm': (SM_AUTHREQ_MITM_PROTECTION, 2),
    'secure_connection': (SM_AUTHREQ_SECURE_CONNECTION, 3),
    'keypress': (SM_AUTHREQ_KEYPRESS, 4),
    'rfu': (192, 6),
}


class Node:
    """One './sm_test' child process plus the configuration used to start it.

    The process is driven through its stdin (single-character commands,
    optionally followed by data) and observed through its stdout, which is
    consumed one byte at a time and assembled into lines.
    """

    def __init__(self):
        self.name = 'node'
        self._got_line = False
        self.peer_addr = None
        self.failure = None
        self.linebuffer = ''
        # set once stdout of the child reported end-of-file
        self._eof = False
        # stdout is read byte-wise, so multi-byte UTF-8 sequences have to be
        # decoded incrementally
        self._decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')

    def get_name(self):
        return self.name

    def set_name(self, name):
        self.name = name

    def set_auth_req(self, auth_req):
        self.auth_req = auth_req

    def set_io_capabilities(self, io_capabilities):
        self.io_capabilities = io_capabilities

    def set_oob_data(self, oob_data):
        self.oob_data = oob_data

    def set_failure(self, failure):
        self.failure = failure

    def set_usb_path(self, path):
        self.usb_path = path

    def get_stdout_fd(self):
        return self.stdout.fileno()

    def read_stdout(self):
        """Read a single byte from the child's stdout and update the line buffer.

        A '\\n' or '\\r' completes the current line if it is non-empty (see
        got_line() / fetch_line()); any other character is appended to it.
        End-of-file is recorded and can be queried with at_eof().
        """
        data = os.read(self.stdout.fileno(), 1)
        if not data:
            self._eof = True
            return
        # the decoder yields '' while a multi-byte sequence is incomplete and
        # may yield more than one character after an invalid sequence
        for c in self._decoder.decode(data):
            if c in '\n\r':
                if len(self.linebuffer) > 0:
                    self._got_line = True
            else:
                self.linebuffer += c

    def at_eof(self):
        """Return True once the child's stdout has reported end-of-file."""
        return self._eof

    def got_line(self):
        return self._got_line

    def fetch_line(self):
        """Return the buffered line and reset the buffer for the next one."""
        line = self.linebuffer
        self.linebuffer = ''
        self._got_line = False
        return line

    def start_process(self):
        """Start './sm_test' with the configured options and attach to its pipes."""
        args = ['./sm_test', '-u', self.usb_path]
        if self.peer_addr is not None:
            args.append('-a')
            args.append(self.peer_addr)
        if self.failure is not None:
            args.append('-f')
            args.append(self.failure)
        args.append('-i')
        args.append(self.io_capabilities)
        args.append('-r')
        args.append(self.auth_req)
        args.append('-o')
        args.append(self.oob_data)
        print('%s - "%s"' % (self.name, ' '.join(args)))
        # unbuffered pipes: commands are sent immediately, output is read byte-wise
        self.p = subprocess.Popen(args, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        (self.stdin, self.stdout) = (self.p.stdin, self.p.stdout)
        self.linebuffer = ''
        self._eof = False
        self._decoder.reset()

    def set_packet_log(self, path):
        self.packet_log = path

    def get_packet_log(self):
        return self.packet_log

    def set_bd_addr(self, addr):
        self.bd_addr = addr

    def get_bd_addr(self):
        return self.bd_addr

    def set_peer_addr(self, addr):
        self.peer_addr = addr

    def write(self, string):
        """Send a command string to the child's stdin (and echo it to the console)."""
        print("CMD -> %s: %s" % (self.name, string))
        self.stdin.write(string.encode('utf-8'))

    def terminate(self):
        """Ask the child to exit ('x'), wait for its reply, then terminate it."""
        try:
            self.write('x')
        except OSError:
            # child is already gone (e.g. broken pipe) - nothing to wait for
            pass
        else:
            # wait for 'EXIT' message indicating coverage data was written
            # (any complete line ends the wait; stop as well if the child
            # closed stdout, otherwise this loop would spin forever)
            while not self.got_line() and not self.at_eof():
                self.read_stdout()
        self.p.terminate()


def run(test_descriptor, nodes):
    """Drive a single pairing test until it completes or fails.

    nodes initially holds the started slave (peripheral) node. Once the slave
    has reported its OOB random, the master (central) node is created, started
    and appended to nodes. Results ('method', 'passkey',
    '<name>_pairing_complete_status', ...) are stored in test_descriptor. On an
    unexpected disconnect, or if a child process ends prematurely,
    test_descriptor['error'] is set.
    """
    state = 'W4_SLAVE_OOB_RANDOM'
    pairing_complete = []
    while True:
        # create map fd -> node
        nodes_by_fd = {node.get_stdout_fd(): node for node in nodes}
        read_ready, _, _ = select.select(list(nodes_by_fd), [], [])
        for fd in read_ready:
            node = nodes_by_fd[fd]
            node.read_stdout()
            if node.at_eof():
                # a closed stdout stays "readable" forever - abort instead of
                # busy-looping on a dead process
                print('%s terminated unexpectedly' % node.get_name())
                test_descriptor['error'] = 'EOF'
                return
            if not node.got_line():
                continue
            line = node.fetch_line()
            if debug:
                print('%s: %s' % (node.get_name(), line))
            if line.startswith('Packet Log: '):
                path = line.split(': ')[1]
                node.set_packet_log(path)
                print('%s log %s' % (node.get_name(), path))
            elif line.startswith('BD_ADDR: '):
                # NOTE: addr is deliberately kept across loop iterations, it is
                # used as peer address when the master is started below
                addr = line.split(': ')[1]
                node.set_bd_addr(addr)
                print('%s started' % node.get_name())
            elif line.startswith('LOCAL_OOB_CONFIRM:'):
                confirm = line.split('OOB_CONFIRM: ')[1]
                test_descriptor[node.get_name() + '_oob_confirm'] = confirm
            elif line.startswith('LOCAL_OOB_RANDOM:'):
                random = line.split('OOB_RANDOM: ')[1]
                test_descriptor[node.get_name() + '_oob_random'] = random
                print('%s OOB Random: %s' % (node.get_name(), random))
                if state == 'W4_SLAVE_OOB_RANDOM':
                    # peripheral started, start central
                    state = 'W4_MASTER_OOB_RANDOM'
                    master_role = test_descriptor['master_role']
                    master = Node()
                    # configure master
                    master.set_name(master_role)
                    master.set_usb_path(usb_paths[1])
                    master.set_peer_addr(addr)
                    master.set_auth_req(test_descriptor[master_role + '_auth_req'])
                    master.set_io_capabilities(test_descriptor[master_role + '_io_capabilities'])
                    master.set_oob_data(test_descriptor[master_role + '_oob_data'])
                    if master_role == 'tester':
                        master.set_failure(test_descriptor['tester_failure'])
                    master.start_process()
                    nodes.append(master)
                    # remember which node plays which part for the OOB exchange
                    if node.get_name() == 'iut':
                        iut_node = node
                        tester_node = master
                    else:
                        iut_node = master
                        tester_node = node
                elif state == 'W4_MASTER_OOB_RANDOM':
                    # central started, start connecting
                    node.write('c')
                    print('start to connect')
                    state = 'W4_CONNECTED'
            elif line.startswith('CONNECTED:'):
                print('%s connected' % node.get_name())
                if state == 'W4_CONNECTED' and node == nodes[1]:
                    # simulate OOB exchange if requested
                    if test_descriptor['tester_oob_data'] == '1':
                        print('Simulate IUT -> Tester OOB')
                        tester_node.write('o' + test_descriptor['iut_oob_confirm'])
                        tester_node.write('r' + test_descriptor['iut_oob_random'])
                        test_descriptor['method'] = 'OOB'
                    if test_descriptor['iut_oob_data'] == '1':
                        print('Simulate Tester -> IUT OOB')
                        iut_node.write('o' + test_descriptor['tester_oob_confirm'])
                        iut_node.write('r' + test_descriptor['tester_oob_random'])
                        test_descriptor['method'] = 'OOB'
                    node.write('p')
                    state = 'W4_PAIRING'
            elif line.startswith('JUST_WORKS_REQUEST'):
                print('%s just works requested' % node.get_name())
                test_descriptor['method'] = 'Just Works'
                if node.get_name() == 'tester' and test_descriptor['tester_failure'] == '12':
                    print('Decline bonding')
                    node.write('d')
                else:
                    print('Accept bonding')
                    node.write('a')
            elif line.startswith('NUMERIC_COMPARISON_REQUEST'):
                print('%s numeric comparison requested' % node.get_name())
                test_descriptor['method'] = 'Numeric Comparison'
                if node.get_name() == 'tester' and test_descriptor['tester_failure'] == '12':
                    print('Decline bonding')
                    node.write('d')
                else:
                    print('Accept bonding')
                    node.write('a')
            elif line.startswith('PASSKEY_DISPLAY_NUMBER'):
                passkey = line.split(': ')[1]
                print('%s passkey display %s' % (node.get_name(), passkey))
                test_descriptor['passkey'] = passkey
                if node.get_name() == 'tester' and test_descriptor['tester_failure'] == '1':
                    print('Decline bonding')
                    node.write('d')
                if state == 'W4_PAIRING':
                    # input request not seen yet, passkey is sent when it arrives
                    state = 'W4_PASSKEY_INPUT'
                else:
                    # the other node is already waiting for the passkey
                    test_descriptor['waiting_node'].write(test_descriptor['passkey'])
            elif line.startswith('PASSKEY_INPUT_NUMBER'):
                test_descriptor['method'] = 'Passkey Entry'
                if node.get_name() == 'tester' and test_descriptor['tester_failure'] == '1':
                    print('Decline bonding')
                    node.write('d')
                elif state == 'W4_PASSKEY_INPUT':
                    node.write(test_descriptor['passkey'])
                else:
                    # passkey not displayed yet, remember who is waiting for it
                    test_descriptor['waiting_node'] = node
                    state = 'W4_PASSKEY_DISPLAY'
            elif line.startswith('PAIRING_COMPLETE'):
                result = line.split(': ')[1]
                (status, reason) = result.split(',')
                test_descriptor[node.get_name() + '_pairing_complete_status'] = status
                test_descriptor[node.get_name() + '_pairing_complete_reason'] = reason
                print('%s pairing complete: status %s, reason %s' % (node.get_name(), status, reason))
                pairing_complete.append(node.get_name())
                # pairing complete?
                if len(pairing_complete) == 2:
                    # on error, test is finished, else wait for notify
                    if status != '0':
                        return
            elif line.startswith('DISCONNECTED'):
                # Abort on unexpected disconnect
                print('%s unexpected disconnect' % node.get_name())
                test_descriptor['error'] = 'DISCONNECTED'
                return
            elif line.startswith('COUNTER'):
                print('%s notification received' % node.get_name())
                return


def write_config(fout, test_descriptor):
    """Write a human-readable summary table of the test to fout.

    The output starts with the test name and the current date, followed by one
    'attribute|IUT value|Tester value' row per attribute. The 'passkey' and
    'method' rows are omitted if the test did not record them.
    """
    attributes = [
        'header',
        '---',
        'bd_addr',
        'role',
        'failure',
        'io_capabilities',
        'mitm',
        'secure_connection',
        'keypress',
        'rfu',
        'oob_data',
        'method',
        'passkey',
        'pairing_complete_status',
        'pairing_complete_reason',
    ]

    # header
    fout.write('Test: %s\n' % test_descriptor['name'])
    fout.write('Date: %s\n' % str(datetime.datetime.now()))
    fout.write('\n')
    attribute_len = 28
    value_len = 35
    format_string = '%%-%us|%%-%us|%%-%us\n' % (attribute_len, value_len, value_len)
    for attribute in attributes:
        name = attribute
        if attribute == 'header':
            name = 'Attribute'
            iut = 'IUT'
            tester = 'Tester'
        elif attribute == '---':
            name = '-' * attribute_len
            iut = '-' * value_len
            tester = '-' * value_len
        elif attribute == 'io_capabilities':
            iut = io_capabilities[int(test_descriptor['iut_io_capabilities'])]
            tester = io_capabilities[int(test_descriptor['tester_io_capabilities'])]
        elif attribute in _AUTH_REQ_FIELDS:
            # bit field extracted from the numeric auth_req value
            mask, shift = _AUTH_REQ_FIELDS[attribute]
            iut = (int(test_descriptor['iut_auth_req']) & mask) >> shift
            tester = (int(test_descriptor['tester_auth_req']) & mask) >> shift
        elif attribute in ('passkey', 'method'):
            # shared by both sides, only present if recorded during the test
            if attribute not in test_descriptor:
                continue
            iut = test_descriptor[attribute]
            tester = test_descriptor[attribute]
        elif attribute == 'failure':
            # only the tester can be configured to provoke a failure
            iut = ''
            tester = failures[int(test_descriptor['tester_failure'])]
        else:
            iut = test_descriptor['iut_' + attribute]
            tester = test_descriptor['tester_' + attribute]
        fout.write(format_string % (name, iut, tester))


def remove_bonding_info(pattern=BONDING_INFO_GLOB):
    """Delete all files matching pattern (best effort) and return how many were removed.

    The pattern is expanded here: passing it to 'rm' without a shell would
    hand the literal '*' to rm and delete nothing.
    """
    removed = 0
    for path in glob.glob(pattern):
        try:
            os.remove(path)
        except OSError:
            # best effort, like 'rm -f'
            continue
        removed += 1
    return removed


def run_test(test_descriptor):
    """Run one test case and store its logs and summary in the result folder.

    Starts the slave node, lets run() drive the test, evaluates the result and
    moves the packet logs plus a config.txt summary into
    test_descriptor['test_folder'] (prefixed with 'TEST_FAIL-' if the test
    failed). If run() reported an error, no results are stored and 'error'
    stays set in test_descriptor so the caller can retry. On CTRL-C,
    test_descriptor['interrupted'] is set.
    """
    # shutdown previous sm_test instances
    try:
        subprocess.run(['killall', 'sm_test'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except OSError:
        # killall not available - nothing we can do
        pass

    # trash all bonding information
    remove_bonding_info()

    test_name = test_descriptor['name']
    print('Test: %s' % test_name, file=sys.stderr)

    if '/SLA/' in test_descriptor['name']:
        iut_role = 'responder'
        tester_role = 'initiator'
        slave_role = 'iut'
        master_role = 'tester'
    else:
        iut_role = 'initiator'
        tester_role = 'responder'
        slave_role = 'tester'
        master_role = 'iut'

    test_descriptor['iut_role'] = iut_role
    test_descriptor['tester_role'] = tester_role
    test_descriptor['master_role'] = master_role
    test_descriptor['slave_role'] = slave_role

    slave = Node()

    # configure slave
    slave.set_name(slave_role)
    slave.set_usb_path(usb_paths[0])
    slave.set_auth_req(test_descriptor[slave_role + '_auth_req'])
    slave.set_io_capabilities(test_descriptor[slave_role + '_io_capabilities'])
    slave.set_oob_data(test_descriptor[slave_role + '_oob_data'])
    if slave_role == 'tester':
        slave.set_failure(test_descriptor['tester_failure'])

    # start up slave
    slave.start_process()

    nodes = [slave]

    # run test
    try:
        run(test_descriptor, nodes)
        if 'error' in test_descriptor:
            # skip result handling; nodes are shut down below and the caller
            # repeats the test as long as 'error' is set
            raise NameError(test_descriptor['error'])

        # identify iut and tester
        if iut_role == 'responder':
            iut = nodes[0]
            tester = nodes[1]
        else:
            iut = nodes[1]
            tester = nodes[0]

        test_folder = test_descriptor['test_folder']

        # check result
        test_ok = True
        if test_descriptor['tester_failure'] != '0':
            # expect status != 0 if tester_failure set
            test_ok &= test_descriptor['iut_pairing_complete_status'] != '0'
            test_ok &= test_descriptor['iut_pairing_complete_reason'] == test_descriptor['tester_failure']
        else:
            test_ok &= test_descriptor['iut_pairing_complete_status'] == '0'

        # check pairing method
        if 'method' in test_descriptor:
            method = test_descriptor['method']
            if 'SCJW' in test_name and (method != 'Just Works' and method != 'Numeric Comparison'):
                test_ok = False
            if 'SCPK' in test_name and method != 'Passkey Entry':
                test_ok = False
            if 'SCOB' in test_name and method != 'OOB':
                test_ok = False

        # rename folder if test not ok
        if not test_ok:
            test_folder = 'TEST_FAIL-' + test_folder

        # move hci logs into result folder
        # (a TEST_FAIL-* folder may be left over from an earlier failed run)
        os.makedirs(test_folder, exist_ok=True)
        shutil.move(iut.get_packet_log(), test_folder + '/iut.pklg')
        shutil.move(tester.get_packet_log(), test_folder + '/tester.pklg')

        # write config
        with open(test_folder + '/config.txt', "wt") as fout:
            test_descriptor['iut_bd_addr'] = iut.get_bd_addr()
            test_descriptor['tester_bd_addr'] = tester.get_bd_addr()
            write_config(fout, test_descriptor)

    except KeyboardInterrupt:
        print('Interrupted')
        test_descriptor['interrupted'] = 'EXIT'

    except NameError:
        print('Run-time error')

    # shutdown
    for node in nodes:
        node.terminate()
    print("Done\n")


def main():
    """Run all tests listed in sm_test.csv that do not have a result folder yet."""
    # read tests
    with open('sm_test.csv', newline='') as csvfile:
        reader = csv.DictReader(csvfile)
        for test_descriptor in reader:
            test_name = test_descriptor['name']

            if test_name.startswith('#'):
                continue
            if len(test_name) == 0:
                continue

            test_folder = test_name.replace('/', '_')
            test_descriptor['test_folder'] = test_folder

            # skip test if regenerate not requested
            if os.path.exists(test_folder):
                if regenerate:
                    shutil.rmtree(test_folder)
                else:
                    print('Test: %s (completed)' % test_name)
                    continue

            # run test (max 10 times)
            tries = 10
            done = False
            while not done:
                print(test_descriptor)
                run_test(test_descriptor)
                tries = tries - 1
                done = True

                # escalate CTRL-C
                if 'interrupted' in test_descriptor:
                    break

                # repeat on 'error'
                if 'error' in test_descriptor:
                    del test_descriptor['error']
                    if tries > 0:
                        done = False

            if 'interrupted' in test_descriptor:
                break


if __name__ == '__main__':
    main()