xref: /btstack/test/security_manager_sc/sm_test.py (revision 169aa8795fb494e8711bb6e918319bf2b13fa72f)
1#!/usr/bin/env python3
2#
3# Perform Security Manager Test Cases using two BTstack instances
4#
5# Copyright 2018 BlueKitchen GmbH
6#
7
8import os
9import subprocess
10import sys
11import time
12import signal
13import select
14import fcntl
15import csv
16import shutil
17import datetime
18
19io_capabilities = [
20'IO_CAPABILITY_DISPLAY_ONLY',
21'IO_CAPABILITY_DISPLAY_YES_NO',
22'IO_CAPABILITY_KEYBOARD_ONLY',
23'IO_CAPABILITY_NO_INPUT_NO_OUTPUT',
24'IO_CAPABILITY_KEYBOARD_DISPLAY']
25
26SM_AUTHREQ_NO_BONDING        = 0x00
27SM_AUTHREQ_BONDING           = 0x01
28SM_AUTHREQ_MITM_PROTECTION   = 0x04
29SM_AUTHREQ_SECURE_CONNECTION = 0x08
30SM_AUTHREQ_KEYPRESS          = 0x10
31
32failures = [
33'',
34'PASSKEY_ENTRY_FAILED',
35'OOB_NOT_AVAILABLE',
36'AUTHENTHICATION_REQUIREMENTS',
37'CONFIRM_VALUE_FAILED',
38'PAIRING_NOT_SUPPORTED',
39'ENCRYPTION_KEY_SIZE',
40'COMMAND_NOT_SUPPORTED',
41'UNSPECIFIED_REASON',
42'REPEATED_ATTEMPTS',
43'INVALID_PARAMETERS',
44'DHKEY_CHECK_FAILED',
45'NUMERIC_COMPARISON_FAILED',
46]
47
48# tester config
49debug      = False
50regenerate = False
51usb_paths = ['02-04', '02-03']
52
53class Node:
54
55    def __init__(self):
56        self.name = 'node'
57        self._got_line = False
58        self.peer_addr = None
59        self.failure   = None
60
61    def get_name(self):
62        return self.name
63
64    def set_name(self, name):
65        self.name = name
66
67    def set_auth_req(self, auth_req):
68        self.auth_req = auth_req
69
70    def set_io_capabilities(self, io_capabilities):
71        self.io_capabilities = io_capabilities
72
73    def set_oob_data(self, oob_data):
74        self.oob_data = oob_data
75
76    def set_failure(self, failure):
77        self.failure = failure
78
79    def set_usb_path(self, path):
80        self.usb_path = path
81
82    def get_stdout_fd(self):
83        return self.stdout.fileno()
84
85    def read_stdout(self):
86        c = os.read(self.stdout.fileno(), 1).decode("utf-8")
87        if len(c) == 0:
88            return
89        if c in '\n\r':
90            if len(self.linebuffer) > 0:
91                self._got_line = True
92        else:
93            self.linebuffer += c
94
95    def got_line(self):
96        return self._got_line
97
98    def fetch_line(self):
99        line = self.linebuffer
100        self.linebuffer = ''
101        self._got_line = False
102        return line
103
104    def start_process(self):
105        args = ['build-coverage/sm_test', '-u', self.usb_path, '-c']
106        if self.peer_addr != None:
107            args.append('-a')
108            args.append(self.peer_addr)
109        if self.failure != None:
110            args.append('-f')
111            args.append(self.failure)
112        args.append('-i')
113        args.append(self.io_capabilities)
114        args.append('-r')
115        args.append(self.auth_req)
116        args.append('-o')
117        args.append(self.oob_data)
118        print('%s - "%s"' % (self.name, ' '.join(args)))
119        self.p = subprocess.Popen(args, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
120        (self.stdin, self.stdout) = (self.p.stdin, self.p.stdout)
121        self.linebuffer = ''
122
123    def set_packet_log(self, path):
124        self.packet_log = path
125
126    def get_packet_log(self):
127        return self.packet_log
128
129    def set_bd_addr(self, addr):
130        self.bd_addr = addr
131
132    def get_bd_addr(self):
133        return self.bd_addr
134
135    def set_peer_addr(self, addr):
136        self.peer_addr = addr
137
138    def write(self, string):
139        print("CMD -> %s: %s" % (self.name, string))
140        self.stdin.write(string.encode('utf-8'))
141
142    def terminate(self):
143        self.write('x')
144        # wait for 'EXIT' message indicating coverage data was written
145        while not self.got_line():
146            self.read_stdout()
147        self.p.terminate()
148
149def run(test_descriptor, nodes):
150    state = 'W4_SLAVE_OOB_RANDOM'
151    pairing_complete = []
152    while True:
153        # create map fd -> node
154        nodes_by_fd = { node.get_stdout_fd():node for node in nodes}
155        read_fds = nodes_by_fd.keys()
156        (read_ready, write_ready, exception_ready) = select.select(read_fds,[],[])
157        for fd in read_ready:
158            node = nodes_by_fd[fd]
159            node.read_stdout()
160            if node.got_line():
161                line = node.fetch_line()
162                if debug:
163                    print('%s: %s' % (node.get_name(), line))
164                if line.startswith('Packet Log: '):
165                    path = line.split(': ')[1]
166                    node.set_packet_log(path)
167                    print('%s log %s' % (node.get_name(), path))
168                elif line.startswith('BD_ADDR: '):
169                    addr = line.split(': ')[1]
170                    node.set_bd_addr(addr)
171                    print('%s started' % node.get_name())
172                elif line.startswith('LOCAL_OOB_CONFIRM:'):
173                    confirm = line.split('OOB_CONFIRM: ')[1]
174                    test_descriptor[node.get_name()+'_oob_confirm'] = confirm
175                elif line.startswith('LOCAL_OOB_RANDOM:'):
176                    random = line.split('OOB_RANDOM: ')[1]
177                    test_descriptor[node.get_name()+'_oob_random'] = random
178                    print('%s OOB Random: %s' % (node.get_name(), random))
179                    if state == 'W4_SLAVE_OOB_RANDOM':
180                        # peripheral started, start central
181                        state = 'W4_MASTER_OOB_RANDOM'
182                        master_role = test_descriptor['master_role']
183                        master = Node()
184                        # configure master
185                        master.set_name(master_role)
186                        master.usb_path = usb_paths[1]
187                        master.set_peer_addr(addr)
188                        master.set_auth_req(test_descriptor[master_role + '_auth_req'])
189                        master.set_io_capabilities(test_descriptor[master_role + '_io_capabilities'])
190                        master.set_oob_data(test_descriptor[master_role + '_oob_data'])
191                        if master_role == 'tester':
192                            master.set_failure(test_descriptor['tester_failure'])
193                        master.start_process()
194                        nodes.append(master)
195                        #
196                        if node.get_name() == 'iut':
197                            iut_node = node
198                            tester_node = master
199                        else:
200                            iut_node = master
201                            tester_node = node
202                    elif state == 'W4_MASTER_OOB_RANDOM':
203                        # central started, start connecting
204                        node.write('c')
205                        print('start to connect')
206                        state = 'W4_CONNECTED'
207                elif line.startswith('CONNECTED:'):
208                    print('%s connected' % node.get_name())
209                    if state == 'W4_CONNECTED' and node == nodes[1]:
210                        # simulate OOK exchange if requested
211                        if test_descriptor['tester_oob_data'] == '1':
212                            print('Simulate IUT -> Tester OOB')
213                            tester_node.write('o' + test_descriptor['iut_oob_confirm'])
214                            tester_node.write('r' + test_descriptor['iut_oob_random'])
215                            test_descriptor['method'] = 'OOB'
216                        if test_descriptor['iut_oob_data'] == '1':
217                            print('Simulate Tester -> IUT OOB')
218                            iut_node.write('o' + test_descriptor['tester_oob_confirm'])
219                            iut_node.write('r' + test_descriptor['tester_oob_random'])
220                            test_descriptor['method'] = 'OOB'
221                        node.write('p')
222                        state = 'W4_PAIRING'
223                elif line.startswith('JUST_WORKS_REQUEST'):
224                    print('%s just works requested' % node.get_name())
225                    test_descriptor['method'] = 'Just Works'
226                    if node.get_name() == 'tester' and  test_descriptor['tester_failure'] == '12':
227                        print('Decline bonding')
228                        node.write('d')
229                    else:
230                        print('Accept bonding')
231                        node.write('a')
232                elif line.startswith('NUMERIC_COMPARISON_REQUEST'):
233                    print('%s numeric comparison requested' % node.get_name())
234                    test_descriptor['method'] = 'Numeric Comparison'
235                    if node.get_name() == 'tester' and  test_descriptor['tester_failure'] == '12':
236                        print('Decline bonding')
237                        node.write('d')
238                    else:
239                        print('Accept bonding')
240                        node.write('a')
241                elif line.startswith('PASSKEY_DISPLAY_NUMBER'):
242                    passkey = line.split(': ')[1]
243                    print('%s passkey display %s' % (node.get_name(), passkey))
244                    test_descriptor['passkey'] = passkey
245                    if node.get_name() == 'tester' and  test_descriptor['tester_failure'] == '1':
246                        print('Decline bonding')
247                        node.write('d')
248                    if state == 'W4_PAIRING':
249                        state = 'W4_PASSKEY_INPUT'
250                    else:
251                        test_descriptor['waiting_node'].write(test_descriptor['passkey'])
252                elif line.startswith('PASSKEY_INPUT_NUMBER'):
253                    test_descriptor['method'] = 'Passkey Entry'
254                    if node.get_name() == 'tester' and  test_descriptor['tester_failure'] == '1':
255                        print('Decline bonding')
256                        node.write('d')
257                    elif state == 'W4_PASSKEY_INPUT':
258                        node.write(test_descriptor['passkey'])
259                    else:
260                        test_descriptor['waiting_node'] = node
261                        state = 'W4_PASSKEY_DISPLAY'
262                elif line.startswith('PAIRING_COMPLETE'):
263                    result = line.split(': ')[1]
264                    (status,reason) = result.split(',')
265                    test_descriptor[node.get_name()+'_pairing_complete_status'] = status
266                    test_descriptor[node.get_name()+'_pairing_complete_reason'] = reason
267                    print('%s pairing complete: status %s, reason %s' % (node.get_name(), status, reason))
268                    pairing_complete.append(node.get_name())
269                    # pairing complete?
270                    if len(pairing_complete) == 2:
271                        # on error, test is finished, else wait for notify
272                        if status != '0':
273                            return
274                elif line.startswith('DISCONNECTED'):
275                    # Abort on unexpected disconnect
276                    print('%s unexpected disconnect' % node.get_name())
277                    test_descriptor['error'] = 'DISCONNECTED'
278                    return
279                elif line.startswith('COUNTER'):
280                    print('%s notification received' % node.get_name())
281                    return;
282
283def write_config(fout, test_descriptor):
284    attributes = [
285        'header',
286        '---',
287        'bd_addr',
288        'role',
289        'failure',
290        'io_capabilities',
291        'mitm',
292        'secure_connection',
293        'keypress',
294        'rfu',
295        'oob_data',
296        'method',
297        'passkey',
298        'pairing_complete_status',
299        'pairing_complete_reason']
300
301    # header
302    fout.write('Test: %s\n' % test_descriptor['name'])
303    fout.write('Date: %s\n' % str(datetime.datetime.now()))
304    fout.write('\n')
305    attribute_len = 28
306    value_len     = 35
307    format_string = '%%-%us|%%-%us|%%-%us\n' % (attribute_len, value_len, value_len)
308    for attribute in attributes:
309        name = attribute
310        if attribute == 'header':
311            name  = 'Attribute'
312            iut   = 'IUT'
313            tester = 'Tester'
314        elif attribute == '---':
315            name   = '-' * attribute_len
316            iut    = '-' * value_len
317            tester = '-' * value_len
318        elif attribute == 'io_capabilities':
319            iut    = io_capabilities[int(test_descriptor['iut_io_capabilities'   ])]
320            tester = io_capabilities[int(test_descriptor['tester_io_capabilities'])]
321        elif attribute == 'mitm':
322            iut    = (int(test_descriptor['iut_auth_req'   ]) & SM_AUTHREQ_MITM_PROTECTION)   >> 2
323            tester = (int(test_descriptor['tester_auth_req']) & SM_AUTHREQ_MITM_PROTECTION)   >> 2
324        elif attribute == 'secure_connection':
325            iut    = (int(test_descriptor['iut_auth_req'   ]) & SM_AUTHREQ_SECURE_CONNECTION) >> 3
326            tester = (int(test_descriptor['tester_auth_req']) & SM_AUTHREQ_SECURE_CONNECTION) >> 3
327        elif attribute == 'keypress':
328            iut    = (int(test_descriptor['iut_auth_req'   ]) & SM_AUTHREQ_KEYPRESS)          >> 4
329            tester = (int(test_descriptor['tester_auth_req']) & SM_AUTHREQ_KEYPRESS)          >> 4
330        elif attribute == 'rfu':
331            iut    = (int(test_descriptor['iut_auth_req'   ]) & 192) >> 6
332            tester = (int(test_descriptor['tester_auth_req']) & 192) >> 6
333        elif attribute == 'passkey':
334            if not 'passkey' in test_descriptor:
335                continue
336            iut    = test_descriptor['passkey']
337            tester = test_descriptor['passkey']
338        elif attribute == 'method':
339            if not 'method' in test_descriptor:
340                continue
341            iut    = test_descriptor['method']
342            tester = test_descriptor['method']
343        elif attribute == 'failure':
344            iut    = ''
345            tester = failures[int(test_descriptor['tester_failure'])]
346        else:
347            iut    = test_descriptor['iut_'    + attribute]
348            tester = test_descriptor['tester_' + attribute]
349        fout.write(format_string % (name, iut, tester))
350
351def run_test(test_descriptor):
352    # shutdown previous sm_test instances
353    try:
354        subprocess.run(['killall', 'sm_test'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
355    except:
356        pass
357
358    # trash all bonding informatino
359    try:
360        subprocess.call(['rm', '-f', '/tmp/btstack_*'])
361    except:
362        pass
363
364    test_name = test_descriptor['name']
365    print('Test: %s' % test_name,  file=sys.stderr)
366
367    if '/SLA/' in test_descriptor['name']:
368        iut_role    = 'responder'
369        tester_role = 'initiator'
370        slave_role  = 'iut'
371        master_role = 'tester'
372    else:
373        iut_role    = 'initiator'
374        tester_role = 'responder'
375        slave_role  = 'tester'
376        master_role = 'iut'
377
378    test_descriptor['iut_role'   ] = iut_role
379    test_descriptor['tester_role'] = tester_role
380    test_descriptor['master_role'] = master_role
381    test_descriptor['slave_role']  = slave_role
382
383    slave = Node()
384
385    # configure slave
386    slave.set_name(slave_role)
387    slave.usb_path = usb_paths[0]
388    slave.set_auth_req(test_descriptor[slave_role + '_auth_req'])
389    slave.set_io_capabilities(test_descriptor[slave_role + '_io_capabilities'])
390    slave.set_oob_data(test_descriptor[slave_role + '_oob_data'])
391    if slave_role == 'tester':
392        slave.set_failure(test_descriptor['tester_failure'])
393
394    # start up slave
395    slave.start_process()
396
397    nodes = [slave]
398
399    # run test
400    try:
401        run(test_descriptor, nodes)
402        if 'error' in test_descriptor:
403            sys.exit()
404            raise NameError(test_descriptor['error'])
405
406        # identify iut and tester
407        if iut_role == 'responder':
408            iut    = nodes[0]
409            tester = nodes[1]
410        else:
411            iut    = nodes[1]
412            tester = nodes[0]
413
414        test_folder =  test_descriptor['test_folder']
415
416        # check result
417        test_ok = True
418        if  test_descriptor['tester_failure'] != '0':
419            # expect status != 0 if tester_failure set
420            test_ok &= test_descriptor['iut_pairing_complete_status'] != '0'
421            test_ok &= test_descriptor['iut_pairing_complete_reason'] == test_descriptor['tester_failure']
422        else:
423            test_ok &= test_descriptor['iut_pairing_complete_status'] == '0'
424
425        # check pairing method
426        if 'method' in test_descriptor:
427            method = test_descriptor['method']
428            if 'SCJW' in test_name and (method != 'Just Works' and method != 'Numeric Comparison'):
429                test_ok = False
430            if 'SCPK' in test_name and method != 'Passkey Entry':
431                test_ok = False
432            if 'SCOB' in test_name and method != 'OOB':
433                test_ok = False
434
435        # rename folder if test not ok
436        if not test_ok:
437            test_folder = 'TEST_FAIL-' + test_folder
438
439        # move hci logs into result folder
440        os.makedirs(test_folder)
441        shutil.move(iut.get_packet_log(),    test_folder + '/iut.pklg')
442        shutil.move(tester.get_packet_log(), test_folder + '/tester.pklg')
443
444        # write config
445        with open (test_folder + '/config.txt', "wt") as fout:
446            test_descriptor['iut_bd_addr']    = iut.get_bd_addr()
447            test_descriptor['tester_bd_addr'] = tester.get_bd_addr()
448            write_config(fout, test_descriptor)
449
450    except KeyboardInterrupt:
451        print('Interrupted')
452        test_descriptor['interrupted'] = 'EXIT'
453
454    except NameError:
455        print('Run-time error')
456
457    # shutdown
458    for node in nodes:
459        node.terminate()
460    print("Done\n")
461
462
463# read tests
464with open('sm_test.csv') as csvfile:
465    reader = csv.DictReader(csvfile)
466    for test_descriptor in reader:
467        test_name = test_descriptor['name']
468
469        if test_name.startswith('#'):
470            continue
471        if len(test_name) == 0:
472            continue
473
474        test_folder = test_name.replace('/', '_')
475        test_descriptor['test_folder'] = test_folder
476
477        # skip test if regenerate not requested
478        if os.path.exists(test_folder):
479            if regenerate:
480                shutil.rmtree(test_folder)
481            else:
482                print('Test: %s (completed)' % test_name)
483                continue
484
485        # run test (max 10 times)
486        tries = 10
487        done = False
488        while not done:
489            print(test_descriptor)
490            run_test(test_descriptor)
491            tries = tries - 1
492            done = True
493
494            # escalate CTRL-C
495            if 'interrupted' in test_descriptor:
496                break
497
498            # repeat on 'error'
499            if 'error' in test_descriptor:
500                del test_descriptor['error']
501                if tries > 0:
502                    done = False
503
504        if 'interrupted' in test_descriptor:
505            break
506