1#!/usr/bin/env python3 2# 3# Perform Security Manager Test Cases using two BTstack instances 4# 5# Copyright 2018 BlueKitchen GmbH 6# 7 8import os 9import subprocess 10import sys 11import time 12import signal 13import select 14import fcntl 15import csv 16import shutil 17import datetime 18 19io_capabilities = [ 20'IO_CAPABILITY_DISPLAY_ONLY', 21'IO_CAPABILITY_DISPLAY_YES_NO', 22'IO_CAPABILITY_KEYBOARD_ONLY', 23'IO_CAPABILITY_NO_INPUT_NO_OUTPUT', 24'IO_CAPABILITY_KEYBOARD_DISPLAY'] 25 26SM_AUTHREQ_NO_BONDING = 0x00 27SM_AUTHREQ_BONDING = 0x01 28SM_AUTHREQ_MITM_PROTECTION = 0x04 29SM_AUTHREQ_SECURE_CONNECTION = 0x08 30SM_AUTHREQ_KEYPRESS = 0x10 31 32failures = [ 33'', 34'PASSKEY_ENTRY_FAILED', 35'OOB_NOT_AVAILABLE', 36'AUTHENTHICATION_REQUIREMENTS', 37'CONFIRM_VALUE_FAILED', 38'PAIRING_NOT_SUPPORTED', 39'ENCRYPTION_KEY_SIZE', 40'COMMAND_NOT_SUPPORTED', 41'UNSPECIFIED_REASON', 42'REPEATED_ATTEMPTS', 43'INVALID_PARAMETERS', 44'DHKEY_CHECK_FAILED', 45'NUMERIC_COMPARISON_FAILED', 46] 47 48# tester config 49debug = False 50regenerate = False 51usb_paths = ['07', '02-03'] 52 53class Node: 54 55 def __init__(self): 56 self.name = 'node' 57 self._got_line = False 58 self.peer_addr = None 59 self.failure = None 60 61 def get_name(self): 62 return self.name 63 64 def set_name(self, name): 65 self.name = name 66 67 def set_auth_req(self, auth_req): 68 self.auth_req = auth_req 69 70 def set_io_capabilities(self, io_capabilities): 71 self.io_capabilities = io_capabilities 72 73 def set_oob_data(self, oob_data): 74 self.oob_data = oob_data 75 76 def set_failure(self, failure): 77 self.failure = failure 78 79 def set_usb_path(self, path): 80 self.usb_path = path 81 82 def get_stdout_fd(self): 83 return self.stdout.fileno() 84 85 def read_stdout(self): 86 c = os.read(self.stdout.fileno(), 1).decode("utf-8") 87 if len(c) == 0: 88 return 89 if c in '\n\r': 90 if len(self.linebuffer) > 0: 91 self._got_line = True 92 else: 93 self.linebuffer += c 94 95 def got_line(self): 96 return self._got_line 97 98 def fetch_line(self): 99 line = self.linebuffer 100 self.linebuffer = '' 101 self._got_line = False 102 return line 103 104 def start_process(self): 105 args = ['./sm_test', '-u', self.usb_path] 106 if self.peer_addr != None: 107 args.append('-a') 108 args.append(self.peer_addr) 109 if self.failure != None: 110 args.append('-f') 111 args.append(self.failure) 112 args.append('-i') 113 args.append(self.io_capabilities) 114 args.append('-r') 115 args.append(self.auth_req) 116 args.append('-o') 117 args.append(self.oob_data) 118 print('%s - "%s"' % (self.name, ' '.join(args))) 119 self.p = subprocess.Popen(args, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) 120 (self.stdin, self.stdout) = (self.p.stdin, self.p.stdout) 121 self.linebuffer = '' 122 123 def set_packet_log(self, path): 124 self.packet_log = path 125 126 def get_packet_log(self): 127 return self.packet_log 128 129 def set_bd_addr(self, addr): 130 self.bd_addr = addr 131 132 def get_bd_addr(self): 133 return self.bd_addr 134 135 def set_peer_addr(self, addr): 136 self.peer_addr = addr 137 138 def write(self, string): 139 print("CMD -> %s: %s" % (self.name, string)) 140 self.stdin.write(string.encode('utf-8')) 141 142 def terminate(self): 143 self.write('x') 144 # wait for 'EXIT' message indicating coverage data was written 145 while not self.got_line(): 146 self.read_stdout() 147 self.p.terminate() 148 149def run(test_descriptor, nodes): 150 state = 'W4_SLAVE_OOB_RANDOM' 151 pairing_complete = [] 152 while True: 153 # create map fd -> node 154 nodes_by_fd = { node.get_stdout_fd():node for node in nodes} 155 read_fds = nodes_by_fd.keys() 156 (read_ready, write_ready, exception_ready) = select.select(read_fds,[],[]) 157 for fd in read_ready: 158 node = nodes_by_fd[fd] 159 node.read_stdout() 160 if node.got_line(): 161 line = node.fetch_line() 162 if debug: 163 print('%s: %s' % (node.get_name(), line)) 164 if line.startswith('Packet Log: '): 165 path = line.split(': ')[1] 166 node.set_packet_log(path) 167 print('%s log %s' % (node.get_name(), path)) 168 elif line.startswith('BD_ADDR: '): 169 addr = line.split(': ')[1] 170 node.set_bd_addr(addr) 171 print('%s started' % node.get_name()) 172 elif line.startswith('LOCAL_OOB_CONFIRM:'): 173 confirm = line.split('OOB_CONFIRM: ')[1] 174 test_descriptor[node.get_name()+'_oob_confirm'] = confirm 175 elif line.startswith('LOCAL_OOB_RANDOM:'): 176 random = line.split('OOB_RANDOM: ')[1] 177 test_descriptor[node.get_name()+'_oob_random'] = random 178 print('%s OOB Random: %s' % (node.get_name(), random)) 179 if state == 'W4_SLAVE_OOB_RANDOM': 180 # peripheral started, start central 181 state = 'W4_MASTER_OOB_RANDOM' 182 master_role = test_descriptor['master_role'] 183 master = Node() 184 # configure master 185 master.set_name(master_role) 186 master.usb_path = usb_paths[1] 187 master.set_peer_addr(addr) 188 master.set_auth_req(test_descriptor[master_role + '_auth_req']) 189 master.set_io_capabilities(test_descriptor[master_role + '_io_capabilities']) 190 master.set_oob_data(test_descriptor[master_role + '_oob_data']) 191 if master_role == 'tester': 192 master.set_failure(test_descriptor['tester_failure']) 193 master.start_process() 194 nodes.append(master) 195 # 196 if node.get_name() == 'iut': 197 iut_node = node 198 tester_node = master 199 else: 200 iut_node = master 201 tester_node = node 202 elif state == 'W4_MASTER_OOB_RANDOM': 203 # central started, start connecting 204 node.write('c') 205 print('start to connect') 206 state = 'W4_CONNECTED' 207 elif line.startswith('CONNECTED:'): 208 print('%s connected' % node.get_name()) 209 if state == 'W4_CONNECTED' and node == nodes[1]: 210 # simulate OOK exchange if requested 211 if test_descriptor['tester_oob_data'] == '1': 212 print('Simulate IUT -> Tester OOB') 213 tester_node.write('o' + test_descriptor['iut_oob_confirm']) 214 tester_node.write('r' + test_descriptor['iut_oob_random']) 215 test_descriptor['method'] = 'OOB' 216 if test_descriptor['iut_oob_data'] == '1': 217 print('Simulate Tester -> IUT OOB') 218 iut_node.write('o' + test_descriptor['tester_oob_confirm']) 219 iut_node.write('r' + test_descriptor['tester_oob_random']) 220 test_descriptor['method'] = 'OOB' 221 node.write('p') 222 state = 'W4_PAIRING' 223 elif line.startswith('JUST_WORKS_REQUEST'): 224 print('%s just works requested' % node.get_name()) 225 test_descriptor['method'] = 'Just Works' 226 if node.get_name() == 'tester' and test_descriptor['tester_failure'] == '12': 227 print('Decline bonding') 228 node.write('d') 229 else: 230 print('Accept bonding') 231 node.write('a') 232 elif line.startswith('NUMERIC_COMPARISON_REQUEST'): 233 print('%s numeric comparison requested' % node.get_name()) 234 test_descriptor['method'] = 'Numeric Comparison' 235 if node.get_name() == 'tester' and test_descriptor['tester_failure'] == '12': 236 print('Decline bonding') 237 node.write('d') 238 else: 239 print('Accept bonding') 240 node.write('a') 241 elif line.startswith('PASSKEY_DISPLAY_NUMBER'): 242 passkey = line.split(': ')[1] 243 print('%s passkey display %s' % (node.get_name(), passkey)) 244 test_descriptor['passkey'] = passkey 245 if node.get_name() == 'tester' and test_descriptor['tester_failure'] == '1': 246 print('Decline bonding') 247 node.write('d') 248 if state == 'W4_PAIRING': 249 state = 'W4_PASSKEY_INPUT' 250 else: 251 test_descriptor['waiting_node'].write(test_descriptor['passkey']) 252 elif line.startswith('PASSKEY_INPUT_NUMBER'): 253 test_descriptor['method'] = 'Passkey Entry' 254 if node.get_name() == 'tester' and test_descriptor['tester_failure'] == '1': 255 print('Decline bonding') 256 node.write('d') 257 elif state == 'W4_PASSKEY_INPUT': 258 node.write(test_descriptor['passkey']) 259 else: 260 test_descriptor['waiting_node'] = node 261 state = 'W4_PASSKEY_DISPLAY' 262 elif line.startswith('PAIRING_COMPLETE'): 263 result = line.split(': ')[1] 264 (status,reason) = result.split(',') 265 test_descriptor[node.get_name()+'_pairing_complete_status'] = status 266 test_descriptor[node.get_name()+'_pairing_complete_reason'] = reason 267 print('%s pairing complete: status %s, reason %s' % (node.get_name(), status, reason)) 268 pairing_complete.append(node.get_name()) 269 # pairing complete? 270 if len(pairing_complete) == 2: 271 # on error, test is finished, else wait for notify 272 if status != '0': 273 return 274 elif line.startswith('COUNTER'): 275 print('%s notification received' % node.get_name()) 276 return; 277 278def write_config(fout, test_descriptor): 279 attributes = [ 280 'header', 281 '---', 282 'bd_addr', 283 'role', 284 'failure', 285 'io_capabilities', 286 'mitm', 287 'secure_connection', 288 'keypress', 289 'rfu', 290 'oob_data', 291 'method', 292 'passkey', 293 'pairing_complete_status', 294 'pairing_complete_reason'] 295 296 # header 297 fout.write('Test: %s\n' % test_descriptor['name']) 298 fout.write('Date: %s\n' % str(datetime.datetime.now())) 299 fout.write('\n') 300 attribute_len = 28 301 value_len = 35 302 format_string = '%%-%us|%%-%us|%%-%us\n' % (attribute_len, value_len, value_len) 303 for attribute in attributes: 304 name = attribute 305 if attribute == 'header': 306 name = 'Attribute' 307 iut = 'IUT' 308 tester = 'Tester' 309 elif attribute == '---': 310 name = '-' * attribute_len 311 iut = '-' * value_len 312 tester = '-' * value_len 313 elif attribute == 'io_capabilities': 314 iut = io_capabilities[int(test_descriptor['iut_io_capabilities' ])] 315 tester = io_capabilities[int(test_descriptor['tester_io_capabilities'])] 316 elif attribute == 'mitm': 317 iut = (int(test_descriptor['iut_auth_req' ]) & SM_AUTHREQ_MITM_PROTECTION) >> 2 318 tester = (int(test_descriptor['tester_auth_req']) & SM_AUTHREQ_MITM_PROTECTION) >> 2 319 elif attribute == 'secure_connection': 320 iut = (int(test_descriptor['iut_auth_req' ]) & SM_AUTHREQ_SECURE_CONNECTION) >> 3 321 tester = (int(test_descriptor['tester_auth_req']) & SM_AUTHREQ_SECURE_CONNECTION) >> 3 322 elif attribute == 'keypress': 323 iut = (int(test_descriptor['iut_auth_req' ]) & SM_AUTHREQ_KEYPRESS) >> 4 324 tester = (int(test_descriptor['tester_auth_req']) & SM_AUTHREQ_KEYPRESS) >> 4 325 elif attribute == 'rfu': 326 iut = (int(test_descriptor['iut_auth_req' ]) & 192) >> 6 327 tester = (int(test_descriptor['tester_auth_req']) & 192) >> 6 328 elif attribute == 'passkey': 329 if not 'passkey' in test_descriptor: 330 continue 331 iut = test_descriptor['passkey'] 332 tester = test_descriptor['passkey'] 333 elif attribute == 'method': 334 if not 'method' in test_descriptor: 335 continue 336 iut = test_descriptor['method'] 337 tester = test_descriptor['method'] 338 elif attribute == 'failure': 339 iut = '' 340 tester = failures[int(test_descriptor['tester_failure'])] 341 else: 342 iut = test_descriptor['iut_' + attribute] 343 tester = test_descriptor['tester_' + attribute] 344 fout.write(format_string % (name, iut, tester)) 345 346def run_test(test_descriptor): 347 # shutdown previous sm_test instances 348 try: 349 subprocess.run(['killall', 'sm_test'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) 350 except: 351 pass 352 353 # trash all bonding informatino 354 try: 355 subprocess.call(['rm', '-f', '/tmp/btstack_*']) 356 except: 357 pass 358 359 test_name = test_descriptor['name'] 360 print('Test: %s' % test_name) 361 362 if '/SLA/' in test_descriptor['name']: 363 iut_role = 'responder' 364 tester_role = 'initiator' 365 slave_role = 'iut' 366 master_role = 'tester' 367 else: 368 iut_role = 'initiator' 369 tester_role = 'responder' 370 slave_role = 'tester' 371 master_role = 'iut' 372 373 test_descriptor['iut_role' ] = iut_role 374 test_descriptor['tester_role'] = tester_role 375 test_descriptor['master_role'] = master_role 376 test_descriptor['slave_role'] = slave_role 377 378 slave = Node() 379 380 # configure slave 381 slave.set_name(slave_role) 382 slave.usb_path = usb_paths[0] 383 slave.set_auth_req(test_descriptor[slave_role + '_auth_req']) 384 slave.set_io_capabilities(test_descriptor[slave_role + '_io_capabilities']) 385 slave.set_oob_data(test_descriptor[slave_role + '_oob_data']) 386 if slave_role == 'tester': 387 slave.set_failure(test_descriptor['tester_failure']) 388 389 # start up slave 390 slave.start_process() 391 392 nodes = [slave] 393 394 # run test 395 try: 396 run(test_descriptor, nodes) 397 398 # identify iut and tester 399 if iut_role == 'responder': 400 iut = nodes[0] 401 tester = nodes[1] 402 else: 403 iut = nodes[1] 404 tester = nodes[0] 405 406 test_folder = test_descriptor['test_folder'] 407 408 # check result 409 test_ok = True 410 if test_descriptor['tester_failure'] != '0': 411 # expect status != 0 if tester_failure set 412 test_ok &= test_descriptor['iut_pairing_complete_status'] != '0' 413 test_ok &= test_descriptor['iut_pairing_complete_reason'] == test_descriptor['tester_failure'] 414 else: 415 test_ok &= test_descriptor['iut_pairing_complete_status'] == '0' 416 417 # check pairing method 418 if 'method' in test_descriptor: 419 method = test_descriptor['method'] 420 if 'SCJW' in test_name and (method != 'Just Works' and method != 'Numeric Comparison'): 421 test_ok = False 422 if 'SCPK' in test_name and method != 'Passkey Entry': 423 test_ok = False 424 if 'SCOB' in test_name and method != 'OOB': 425 test_ok = False 426 427 # rename folder if test not ok 428 if not test_ok: 429 test_folder = 'TEST_FAIL-' + test_folder 430 431 # move hci logs into result folder 432 os.makedirs(test_folder) 433 shutil.move(iut.get_packet_log(), test_folder + '/iut.pklg') 434 shutil.move(tester.get_packet_log(), test_folder + '/tester.pklg') 435 436 # write config 437 with open (test_folder + '/config.txt', "wt") as fout: 438 test_descriptor['iut_bd_addr'] = iut.get_bd_addr() 439 test_descriptor['tester_bd_addr'] = tester.get_bd_addr() 440 write_config(fout, test_descriptor) 441 442 except KeyboardInterrupt: 443 print('Interrupted') 444 test_descriptor['interrupted'] = 'EXIT' 445 446 # shutdown 447 for node in nodes: 448 node.terminate() 449 print("Done\n") 450 451 452# read tests 453with open('sm_test.csv') as csvfile: 454 reader = csv.DictReader(csvfile) 455 for test_descriptor in reader: 456 test_name = test_descriptor['name'] 457 458 if test_name.startswith('#'): 459 continue 460 if len(test_name) == 0: 461 continue 462 463 test_folder = test_name.replace('/', '_') 464 test_descriptor['test_folder'] = test_folder 465 466 # skip test if regenerate not requested 467 if os.path.exists(test_folder): 468 if regenerate: 469 shutil.rmtree(test_folder) 470 else: 471 print('Test: %s (completed)' % test_name) 472 continue 473 474 # run test 475 print(test_descriptor) 476 run_test(test_descriptor) 477 478 if 'interrupted' in test_descriptor: 479 break 480