1#!/usr/bin/env python3 2# 3# Perform Security Manager Test Cases using two BTstack instances 4# 5# Copyright 2018 BlueKitchen GmbH 6# 7 8import os 9import subprocess 10import sys 11import time 12import signal 13import select 14import fcntl 15import csv 16import shutil 17import datetime 18 19io_capabilities = [ 20'IO_CAPABILITY_DISPLAY_ONLY', 21'IO_CAPABILITY_DISPLAY_YES_NO', 22'IO_CAPABILITY_KEYBOARD_ONLY', 23'IO_CAPABILITY_NO_INPUT_NO_OUTPUT', 24'IO_CAPABILITY_KEYBOARD_DISPLAY'] 25 26SM_AUTHREQ_NO_BONDING = 0x00 27SM_AUTHREQ_BONDING = 0x01 28SM_AUTHREQ_MITM_PROTECTION = 0x04 29SM_AUTHREQ_SECURE_CONNECTION = 0x08 30SM_AUTHREQ_KEYPRESS = 0x10 31 32failures = [ 33'', 34'PASSKEY_ENTRY_FAILED', 35'OOB_NOT_AVAILABLE', 36'AUTHENTHICATION_REQUIREMENTS', 37'CONFIRM_VALUE_FAILED', 38'PAIRING_NOT_SUPPORTED', 39'ENCRYPTION_KEY_SIZE', 40'COMMAND_NOT_SUPPORTED', 41'UNSPECIFIED_REASON', 42'REPEATED_ATTEMPTS', 43'INVALID_PARAMETERS', 44'DHKEY_CHECK_FAILED', 45'NUMERIC_COMPARISON_FAILED', 46] 47 48# tester config 49debug = False 50regenerate = False 51# usb_paths = ['4', '6'] 52usb_paths = ['1', '2'] 53 54class Node: 55 56 def __init__(self): 57 self.name = 'node' 58 self._got_line = False 59 self.peer_addr = None 60 self.failure = None 61 62 def get_name(self): 63 return self.name 64 65 def set_name(self, name): 66 self.name = name 67 68 def set_auth_req(self, auth_req): 69 self.auth_req = auth_req 70 71 def set_io_capabilities(self, io_capabilities): 72 self.io_capabilities = io_capabilities 73 74 def set_oob_data(self, oob_data): 75 self.oob_data = oob_data 76 77 def set_failure(self, failure): 78 self.failure = failure 79 80 def set_usb_path(self, path): 81 self.usb_path = path 82 83 def get_stdout_fd(self): 84 return self.stdout.fileno() 85 86 def read_stdout(self): 87 c = os.read(self.stdout.fileno(), 1).decode("utf-8") 88 if len(c) == 0: 89 return 90 if c in '\n\r': 91 if len(self.linebuffer) > 0: 92 self._got_line = True 93 else: 94 self.linebuffer += c 95 96 def got_line(self): 97 return self._got_line 98 99 def fetch_line(self): 100 line = self.linebuffer 101 self.linebuffer = '' 102 self._got_line = False 103 return line 104 105 def start_process(self): 106 args = ['./sm_test', '-u', self.usb_path] 107 if self.peer_addr != None: 108 args.append('-a') 109 args.append(self.peer_addr) 110 if self.failure != None: 111 args.append('-f') 112 args.append(self.failure) 113 args.append('-i') 114 args.append(self.io_capabilities) 115 args.append('-r') 116 args.append(self.auth_req) 117 args.append('-o') 118 args.append(self.oob_data) 119 print('%s - "%s"' % (self.name, ' '.join(args))) 120 self.p = subprocess.Popen(args, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) 121 (self.stdin, self.stdout) = (self.p.stdin, self.p.stdout) 122 self.linebuffer = '' 123 124 def set_packet_log(self, path): 125 self.packet_log = path 126 127 def get_packet_log(self): 128 return self.packet_log 129 130 def set_bd_addr(self, addr): 131 self.bd_addr = addr 132 133 def get_bd_addr(self): 134 return self.bd_addr 135 136 def set_peer_addr(self, addr): 137 self.peer_addr = addr 138 139 def write(self, string): 140 self.stdin.write(string.encode('utf-8')) 141 142 def terminate(self): 143 self.write('x') 144 # wait for 'EXIT' message indicating coverage data was written 145 while not self.got_line(): 146 self.read_stdout() 147 self.p.terminate() 148 149def run(test_descriptor, nodes): 150 state = 'W4_SLAVE_BD_ADDR' 151 pairing_complete = [] 152 while True: 153 # create map fd -> node 154 nodes_by_fd = { node.get_stdout_fd():node for node in nodes} 155 read_fds = nodes_by_fd.keys() 156 (read_ready, write_ready, exception_ready) = select.select(read_fds,[],[]) 157 for fd in read_ready: 158 node = nodes_by_fd[fd] 159 node.read_stdout() 160 if node.got_line(): 161 line = node.fetch_line() 162 if debug: 163 print('%s: %s' % (node.get_name(), line)) 164 if line.startswith('Packet Log: '): 165 path = line.split(': ')[1] 166 node.set_packet_log(path) 167 print('%s log %s' % (node.get_name(), path)) 168 elif line.startswith('BD_ADDR: '): 169 addr = line.split(': ')[1] 170 node.set_bd_addr(addr) 171 print('%s started' % node.get_name()) 172 if state == 'W4_SLAVE_BD_ADDR': 173 # peripheral started, start central 174 state = 'W4_MASTER_BD_ADDR' 175 master_role = test_descriptor['master_role'] 176 master = Node() 177 # configure master 178 master.set_name(master_role) 179 master.usb_path = usb_paths[1] 180 master.set_peer_addr(addr) 181 master.set_auth_req(test_descriptor[master_role + '_auth_req']) 182 master.set_io_capabilities(test_descriptor[master_role + '_io_capabilities']) 183 master.set_oob_data(test_descriptor[master_role + '_oob_data']) 184 if master_role == 'tester': 185 master.set_failure(test_descriptor['tester_failure']) 186 master.start_process() 187 nodes.append(master) 188 # 189 if node.get_name() == 'iut': 190 iut_node = node 191 tester_node = master 192 else: 193 iut_node = master 194 tester_node = node 195 elif state == 'W4_MASTER_BD_ADDR': 196 # central started, start connecting 197 node.write('c') 198 print('start to connect') 199 state = 'W4_CONNECTED' 200 elif line.startswith('LOCAL_OOB_CONFIRM:'): 201 confirm = line.split('OOB_CONFIRM: ')[1] 202 test_descriptor[node.get_name()+'_oob_confirm'] = confirm 203 elif line.startswith('LOCAL_OOB_RANDOM:'): 204 random = line.split('OOB_RANDOM: ')[1] 205 test_descriptor[node.get_name()+'_oob_random'] = random 206 elif line.startswith('CONNECTED:'): 207 print('%s connected' % node.get_name()) 208 if state == 'W4_CONNECTED' and node == nodes[1]: 209 # simulate OOK exchange if requested 210 if test_descriptor['tester_oob_data'] == '1': 211 print('Simulate IUT -> Tester OOB') 212 tester_node.write('o' + test_descriptor['iut_oob_confirm']) 213 tester_node.write('r' + test_descriptor['iut_oob_random']) 214 test_descriptor['method'] = 'OOB' 215 if test_descriptor['iut_oob_data'] == '1': 216 print('Simulate Tester -> IUT OOB') 217 iut_node.write('o' + test_descriptor['tester_oob_confirm']) 218 iut_node.write('r' + test_descriptor['tester_oob_random']) 219 test_descriptor['method'] = 'OOB' 220 node.write('p') 221 state = 'W4_PAIRING' 222 elif line.startswith('JUST_WORKS_REQUEST'): 223 print('%s just works requested' % node.get_name()) 224 test_descriptor['method'] = 'Just Works' 225 if node.get_name() == 'tester' and test_descriptor['tester_failure'] == '12': 226 print('Decline bonding') 227 node.write('d') 228 else: 229 print('Accept bonding') 230 node.write('a') 231 elif line.startswith('NUMERIC_COMPARISON_REQUEST'): 232 print('%s numeric comparison requested' % node.get_name()) 233 test_descriptor['method'] = 'Numeric Comparison' 234 if node.get_name() == 'tester' and test_descriptor['tester_failure'] == '12': 235 print('Decline bonding') 236 node.write('d') 237 else: 238 print('Accept bonding') 239 node.write('a') 240 elif line.startswith('PASSKEY_DISPLAY_NUMBER'): 241 passkey = line.split(': ')[1] 242 print('%s passkey display %s' % (node.get_name(), passkey)) 243 test_descriptor['passkey'] = passkey 244 if node.get_name() == 'tester' and test_descriptor['tester_failure'] == '1': 245 print('Decline bonding') 246 node.write('d') 247 if state == 'W4_PAIRING': 248 state = 'W4_PASSKEY_INPUT' 249 else: 250 test_descriptor['waiting_node'].write(test_descriptor['passkey']) 251 elif line.startswith('PASSKEY_INPUT_NUMBER'): 252 test_descriptor['method'] = 'Passkey Entry' 253 if node.get_name() == 'tester' and test_descriptor['tester_failure'] == '1': 254 print('Decline bonding') 255 node.write('d') 256 elif state == 'W4_PASSKEY_INPUT': 257 node.write(test_descriptor['passkey']) 258 else: 259 test_descriptor['waiting_node'] = node 260 state = 'W4_PASSKEY_DISPLAY' 261 elif line.startswith('PAIRING_COMPLETE'): 262 result = line.split(': ')[1] 263 (status,reason) = result.split(',') 264 test_descriptor[node.get_name()+'_pairing_complete_status'] = status 265 test_descriptor[node.get_name()+'_pairing_complete_reason'] = reason 266 print('%s pairing complete: status %s, reason %s' % (node.get_name(), status, reason)) 267 pairing_complete.append(node.get_name()) 268 # pairing complete? 269 if len(pairing_complete) == 2: 270 # on error, test is finished, else wait for notify 271 if status != '0': 272 return 273 elif line.startswith('COUNTER'): 274 print('%s notification received' % node.get_name()) 275 return; 276 277def write_config(fout, test_descriptor): 278 attributes = [ 279 'header', 280 '---', 281 'bd_addr', 282 'role', 283 'failure', 284 'io_capabilities', 285 'mitm', 286 'secure_connection', 287 'keypress', 288 'rfu', 289 'oob_data', 290 'method', 291 'passkey', 292 'pairing_complete_status', 293 'pairing_complete_reason'] 294 295 # header 296 fout.write('Test: %s\n' % test_descriptor['name']) 297 fout.write('Date: %s\n' % str(datetime.datetime.now())) 298 fout.write('\n') 299 attribute_len = 28 300 value_len = 35 301 format_string = '%%-%us|%%-%us|%%-%us\n' % (attribute_len, value_len, value_len) 302 for attribute in attributes: 303 name = attribute 304 if attribute == 'header': 305 name = 'Attribute' 306 iut = 'IUT' 307 tester = 'Tester' 308 elif attribute == '---': 309 name = '-' * attribute_len 310 iut = '-' * value_len 311 tester = '-' * value_len 312 elif attribute == 'io_capabilities': 313 iut = io_capabilities[int(test_descriptor['iut_io_capabilities' ])] 314 tester = io_capabilities[int(test_descriptor['tester_io_capabilities'])] 315 elif attribute == 'mitm': 316 iut = (int(test_descriptor['iut_auth_req' ]) & SM_AUTHREQ_MITM_PROTECTION) >> 2 317 tester = (int(test_descriptor['tester_auth_req']) & SM_AUTHREQ_MITM_PROTECTION) >> 2 318 elif attribute == 'secure_connection': 319 iut = (int(test_descriptor['iut_auth_req' ]) & SM_AUTHREQ_SECURE_CONNECTION) >> 3 320 tester = (int(test_descriptor['tester_auth_req']) & SM_AUTHREQ_SECURE_CONNECTION) >> 3 321 elif attribute == 'keypress': 322 iut = (int(test_descriptor['iut_auth_req' ]) & SM_AUTHREQ_KEYPRESS) >> 4 323 tester = (int(test_descriptor['tester_auth_req']) & SM_AUTHREQ_KEYPRESS) >> 4 324 elif attribute == 'rfu': 325 iut = (int(test_descriptor['iut_auth_req' ]) & 192) >> 6 326 tester = (int(test_descriptor['tester_auth_req']) & 192) >> 6 327 elif attribute == 'passkey': 328 if not 'passkey' in test_descriptor: 329 continue 330 iut = test_descriptor['passkey'] 331 tester = test_descriptor['passkey'] 332 elif attribute == 'method': 333 if not 'method' in test_descriptor: 334 continue 335 iut = test_descriptor['method'] 336 tester = test_descriptor['method'] 337 elif attribute == 'failure': 338 iut = '' 339 tester = failures[int(test_descriptor['tester_failure'])] 340 else: 341 iut = test_descriptor['iut_' + attribute] 342 tester = test_descriptor['tester_' + attribute] 343 fout.write(format_string % (name, iut, tester)) 344 345def run_test(test_descriptor): 346 # shutdown previous sm_test instances 347 try: 348 subprocess.call("killall sm_test", shell = True) 349 except: 350 pass 351 352 # trash all bonding informatino 353 try: 354 subprocess.call("rm /tmp/btstack_*", shell = True) 355 except: 356 pass 357 358 test_name = test_descriptor['name'] 359 print('Test: %s' % test_name) 360 361 if '/SLA/' in test_descriptor['name']: 362 iut_role = 'responder' 363 tester_role = 'initiator' 364 slave_role = 'iut' 365 master_role = 'tester' 366 else: 367 iut_role = 'initiator' 368 tester_role = 'responder' 369 slave_role = 'tester' 370 master_role = 'iut' 371 372 test_descriptor['iut_role' ] = iut_role 373 test_descriptor['tester_role'] = tester_role 374 test_descriptor['master_role'] = master_role 375 test_descriptor['slave_role'] = slave_role 376 377 slave = Node() 378 379 # configure slave 380 slave.set_name(slave_role) 381 slave.usb_path = usb_paths[0] 382 slave.set_auth_req(test_descriptor[slave_role + '_auth_req']) 383 slave.set_io_capabilities(test_descriptor[slave_role + '_io_capabilities']) 384 slave.set_oob_data(test_descriptor[slave_role + '_oob_data']) 385 if slave_role == 'tester': 386 slave.set_failure(test_descriptor['tester_failure']) 387 388 # start up slave 389 slave.start_process() 390 391 nodes = [slave] 392 393 # run test 394 try: 395 run(test_descriptor, nodes) 396 397 # identify iut and tester 398 if iut_role == 'responder': 399 iut = nodes[0] 400 tester = nodes[1] 401 else: 402 iut = nodes[1] 403 tester = nodes[0] 404 405 test_folder = test_descriptor['test_folder'] 406 407 # check result 408 test_ok = True 409 if test_descriptor['tester_failure'] != '0': 410 # expect status != 0 if tester_failure set 411 test_ok &= test_descriptor['iut_pairing_complete_status'] != '0' 412 test_ok &= test_descriptor['iut_pairing_complete_reason'] == test_descriptor['tester_failure'] 413 else: 414 test_ok &= test_descriptor['iut_pairing_complete_status'] == '0' 415 416 # check pairing method 417 if 'method' in test_descriptor: 418 method = test_descriptor['method'] 419 if 'SCJW' in test_name and (method != 'Just Works' and method != 'Numeric Comparison'): 420 test_ok = False 421 if 'SCPK' in test_name and method != 'Passkey Entry': 422 test_ok = False 423 if 'SCOB' in test_name and method != 'OOB': 424 test_ok = False 425 426 # rename folder if test not ok 427 if not test_ok: 428 test_folder = 'TEST_FAIL-' + test_folder 429 430 # move hci logs into result folder 431 os.makedirs(test_folder) 432 shutil.move(iut.get_packet_log(), test_folder + '/iut.pklg') 433 shutil.move(tester.get_packet_log(), test_folder + '/tester.pklg') 434 435 # write config 436 with open (test_folder + '/config.txt', "wt") as fout: 437 test_descriptor['iut_bd_addr'] = iut.get_bd_addr() 438 test_descriptor['tester_bd_addr'] = tester.get_bd_addr() 439 write_config(fout, test_descriptor) 440 441 except KeyboardInterrupt: 442 print('Interrupted') 443 test_descriptor['interrupted'] = 'EXIT' 444 445 # shutdown 446 for node in nodes: 447 node.terminate() 448 print("Done\n") 449 450 451# read tests 452with open('sm_test.csv') as csvfile: 453 reader = csv.DictReader(csvfile) 454 for test_descriptor in reader: 455 test_name = test_descriptor['name'] 456 457 if test_name.startswith('#'): 458 continue 459 if len(test_name) == 0: 460 continue 461 462 test_folder = test_name.replace('/', '_') 463 test_descriptor['test_folder'] = test_folder 464 465 # skip test if regenerate not requested 466 if os.path.exists(test_folder): 467 if regenerate: 468 shutil.rmtree(test_folder) 469 else: 470 print('Test: %s (completed)' % test_name) 471 continue 472 473 # run test 474 print(test_descriptor) 475 run_test(test_descriptor) 476 477 if 'interrupted' in test_descriptor: 478 break 479