xref: /btstack/test/security_manager_sc/sm_test.py (revision f5228c62dfa305f00bfdabbb48dafdc0b8919667)
1#!/usr/bin/env python3
2#
3# Perform Security Manager Test Cases using two BTstack instances
4#
5# Copyright 2018 BlueKitchen GmbH
6#
7
8import os
9import subprocess
10import sys
11import time
12import signal
13import select
14import fcntl
15import csv
16import shutil
17import datetime
18
19io_capabilities = [
20'IO_CAPABILITY_DISPLAY_ONLY',
21'IO_CAPABILITY_DISPLAY_YES_NO',
22'IO_CAPABILITY_KEYBOARD_ONLY',
23'IO_CAPABILITY_NO_INPUT_NO_OUTPUT',
24'IO_CAPABILITY_KEYBOARD_DISPLAY']
25
26SM_AUTHREQ_NO_BONDING        = 0x00
27SM_AUTHREQ_BONDING           = 0x01
28SM_AUTHREQ_MITM_PROTECTION   = 0x04
29SM_AUTHREQ_SECURE_CONNECTION = 0x08
30SM_AUTHREQ_KEYPRESS          = 0x10
31
32failures = [
33'',
34'PASSKEY_ENTRY_FAILED',
35'OOB_NOT_AVAILABLE',
36'AUTHENTHICATION_REQUIREMENTS',
37'CONFIRM_VALUE_FAILED',
38'PAIRING_NOT_SUPPORTED',
39'ENCRYPTION_KEY_SIZE',
40'COMMAND_NOT_SUPPORTED',
41'UNSPECIFIED_REASON',
42'REPEATED_ATTEMPTS',
43'INVALID_PARAMETERS',
44'DHKEY_CHECK_FAILED',
45'NUMERIC_COMPARISON_FAILED',
46]
47
48# tester config
49debug      = False
50regenerate = False
51# usb_paths = ['4', '6']
52usb_paths = ['1', '2']
53
54class Node:
55
56    def __init__(self):
57        self.name = 'node'
58        self._got_line = False
59        self.peer_addr = None
60        self.failure   = None
61
62    def get_name(self):
63        return self.name
64
65    def set_name(self, name):
66        self.name = name
67
68    def set_auth_req(self, auth_req):
69        self.auth_req = auth_req
70
71    def set_io_capabilities(self, io_capabilities):
72        self.io_capabilities = io_capabilities
73
74    def set_oob_data(self, oob_data):
75        self.oob_data = oob_data
76
77    def set_failure(self, failure):
78        self.failure = failure
79
80    def set_usb_path(self, path):
81        self.usb_path = path
82
83    def get_stdout_fd(self):
84        return self.stdout.fileno()
85
86    def read_stdout(self):
87        c = os.read(self.stdout.fileno(), 1).decode("utf-8")
88        if len(c) == 0:
89            return
90        if c in '\n\r':
91            if len(self.linebuffer) > 0:
92                self._got_line = True
93        else:
94            self.linebuffer += c
95
96    def got_line(self):
97        return self._got_line
98
99    def fetch_line(self):
100        line = self.linebuffer
101        self.linebuffer = ''
102        self._got_line = False
103        return line
104
105    def start_process(self):
106        args = ['./sm_test', '-u', self.usb_path]
107        if self.peer_addr != None:
108            args.append('-a')
109            args.append(self.peer_addr)
110        if self.failure != None:
111            args.append('-f')
112            args.append(self.failure)
113        args.append('-i')
114        args.append(self.io_capabilities)
115        args.append('-r')
116        args.append(self.auth_req)
117        args.append('-o')
118        args.append(self.oob_data)
119        print('%s - "%s"' % (self.name, ' '.join(args)))
120        self.p = subprocess.Popen(args, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
121        (self.stdin, self.stdout) = (self.p.stdin, self.p.stdout)
122        self.linebuffer = ''
123
124    def set_packet_log(self, path):
125        self.packet_log = path
126
127    def get_packet_log(self):
128        return self.packet_log
129
130    def set_bd_addr(self, addr):
131        self.bd_addr = addr
132
133    def get_bd_addr(self):
134        return self.bd_addr
135
136    def set_peer_addr(self, addr):
137        self.peer_addr = addr
138
139    def write(self, string):
140        self.stdin.write(string.encode('utf-8'))
141
142    def terminate(self):
143        self.write('x')
144        # wait for 'EXIT' message indicating coverage data was written
145        while not self.got_line():
146            self.read_stdout()
147        self.p.terminate()
148
149def run(test_descriptor, nodes):
150    state = 'W4_SLAVE_BD_ADDR'
151    pairing_complete = []
152    while True:
153        # create map fd -> node
154        nodes_by_fd = { node.get_stdout_fd():node for node in nodes}
155        read_fds = nodes_by_fd.keys()
156        (read_ready, write_ready, exception_ready) = select.select(read_fds,[],[])
157        for fd in read_ready:
158            node = nodes_by_fd[fd]
159            node.read_stdout()
160            if node.got_line():
161                line = node.fetch_line()
162                if debug:
163                    print('%s: %s' % (node.get_name(), line))
164                if line.startswith('Packet Log: '):
165                    path = line.split(': ')[1]
166                    node.set_packet_log(path)
167                    print('%s log %s' % (node.get_name(), path))
168                elif line.startswith('BD_ADDR: '):
169                    addr = line.split(': ')[1]
170                    node.set_bd_addr(addr)
171                    print('%s started' % node.get_name())
172                    if state == 'W4_SLAVE_BD_ADDR':
173                        # peripheral started, start central
174                        state = 'W4_MASTER_BD_ADDR'
175                        master_role = test_descriptor['master_role']
176                        master = Node()
177                        # configure master
178                        master.set_name(master_role)
179                        master.usb_path = usb_paths[1]
180                        master.set_peer_addr(addr)
181                        master.set_auth_req(test_descriptor[master_role + '_auth_req'])
182                        master.set_io_capabilities(test_descriptor[master_role + '_io_capabilities'])
183                        master.set_oob_data(test_descriptor[master_role + '_oob_data'])
184                        if master_role == 'tester':
185                            master.set_failure(test_descriptor['tester_failure'])
186                        master.start_process()
187                        nodes.append(master)
188                        #
189                        if node.get_name() == 'iut':
190                            iut_node = node
191                            tester_node = master
192                        else:
193                            iut_node = master
194                            tester_node = node
195                    elif state == 'W4_MASTER_BD_ADDR':
196                        # central started, start connecting
197                        node.write('c')
198                        print('start to connect')
199                        state = 'W4_CONNECTED'
200                elif line.startswith('LOCAL_OOB_CONFIRM:'):
201                    confirm = line.split('OOB_CONFIRM: ')[1]
202                    test_descriptor[node.get_name()+'_oob_confirm'] = confirm
203                elif line.startswith('LOCAL_OOB_RANDOM:'):
204                    random = line.split('OOB_RANDOM: ')[1]
205                    test_descriptor[node.get_name()+'_oob_random'] = random
206                elif line.startswith('CONNECTED:'):
207                    print('%s connected' % node.get_name())
208                    if state == 'W4_CONNECTED' and node == nodes[1]:
209                        # simulate OOK exchange if requested
210                        if test_descriptor['tester_oob_data'] == '1':
211                            print('Simulate IUT -> Tester OOB')
212                            tester_node.write('o' + test_descriptor['iut_oob_confirm'])
213                            tester_node.write('r' + test_descriptor['iut_oob_random'])
214                            test_descriptor['method'] = 'OOB'
215                        if test_descriptor['iut_oob_data'] == '1':
216                            print('Simulate Tester -> IUT OOB')
217                            iut_node.write('o' + test_descriptor['tester_oob_confirm'])
218                            iut_node.write('r' + test_descriptor['tester_oob_random'])
219                            test_descriptor['method'] = 'OOB'
220                        node.write('p')
221                        state = 'W4_PAIRING'
222                elif line.startswith('JUST_WORKS_REQUEST'):
223                    print('%s just works requested' % node.get_name())
224                    test_descriptor['method'] = 'Just Works'
225                    if node.get_name() == 'tester' and  test_descriptor['tester_failure'] == '12':
226                        print('Decline bonding')
227                        node.write('d')
228                    else:
229                        print('Accept bonding')
230                        node.write('a')
231                elif line.startswith('NUMERIC_COMPARISON_REQUEST'):
232                    print('%s numeric comparison requested' % node.get_name())
233                    test_descriptor['method'] = 'Numeric Comparison'
234                    if node.get_name() == 'tester' and  test_descriptor['tester_failure'] == '12':
235                        print('Decline bonding')
236                        node.write('d')
237                    else:
238                        print('Accept bonding')
239                        node.write('a')
240                elif line.startswith('PASSKEY_DISPLAY_NUMBER'):
241                    passkey = line.split(': ')[1]
242                    print('%s passkey display %s' % (node.get_name(), passkey))
243                    test_descriptor['passkey'] = passkey
244                    if node.get_name() == 'tester' and  test_descriptor['tester_failure'] == '1':
245                        print('Decline bonding')
246                        node.write('d')
247                    if state == 'W4_PAIRING':
248                        state = 'W4_PASSKEY_INPUT'
249                    else:
250                        test_descriptor['waiting_node'].write(test_descriptor['passkey'])
251                elif line.startswith('PASSKEY_INPUT_NUMBER'):
252                    test_descriptor['method'] = 'Passkey Entry'
253                    if node.get_name() == 'tester' and  test_descriptor['tester_failure'] == '1':
254                        print('Decline bonding')
255                        node.write('d')
256                    elif state == 'W4_PASSKEY_INPUT':
257                        node.write(test_descriptor['passkey'])
258                    else:
259                        test_descriptor['waiting_node'] = node
260                        state = 'W4_PASSKEY_DISPLAY'
261                elif line.startswith('PAIRING_COMPLETE'):
262                    result = line.split(': ')[1]
263                    (status,reason) = result.split(',')
264                    test_descriptor[node.get_name()+'_pairing_complete_status'] = status
265                    test_descriptor[node.get_name()+'_pairing_complete_reason'] = reason
266                    print('%s pairing complete: status %s, reason %s' % (node.get_name(), status, reason))
267                    pairing_complete.append(node.get_name())
268                    # pairing complete?
269                    if len(pairing_complete) == 2:
270                        # on error, test is finished, else wait for notify
271                        if status != '0':
272                            return
273                elif line.startswith('COUNTER'):
274                    print('%s notification received' % node.get_name())
275                    return;
276
277def write_config(fout, test_descriptor):
278    attributes = [
279        'header',
280        '---',
281        'bd_addr',
282        'role',
283        'failure',
284        'io_capabilities',
285        'mitm',
286        'secure_connection',
287        'keypress',
288        'rfu',
289        'oob_data',
290        'method',
291        'passkey',
292        'pairing_complete_status',
293        'pairing_complete_reason']
294
295    # header
296    fout.write('Test: %s\n' % test_descriptor['name'])
297    fout.write('Date: %s\n' % str(datetime.datetime.now()))
298    fout.write('\n')
299    attribute_len = 28
300    value_len     = 35
301    format_string = '%%-%us|%%-%us|%%-%us\n' % (attribute_len, value_len, value_len)
302    for attribute in attributes:
303        name = attribute
304        if attribute == 'header':
305            name  = 'Attribute'
306            iut   = 'IUT'
307            tester = 'Tester'
308        elif attribute == '---':
309            name   = '-' * attribute_len
310            iut    = '-' * value_len
311            tester = '-' * value_len
312        elif attribute == 'io_capabilities':
313            iut    = io_capabilities[int(test_descriptor['iut_io_capabilities'   ])]
314            tester = io_capabilities[int(test_descriptor['tester_io_capabilities'])]
315        elif attribute == 'mitm':
316            iut    = (int(test_descriptor['iut_auth_req'   ]) & SM_AUTHREQ_MITM_PROTECTION)   >> 2
317            tester = (int(test_descriptor['tester_auth_req']) & SM_AUTHREQ_MITM_PROTECTION)   >> 2
318        elif attribute == 'secure_connection':
319            iut    = (int(test_descriptor['iut_auth_req'   ]) & SM_AUTHREQ_SECURE_CONNECTION) >> 3
320            tester = (int(test_descriptor['tester_auth_req']) & SM_AUTHREQ_SECURE_CONNECTION) >> 3
321        elif attribute == 'keypress':
322            iut    = (int(test_descriptor['iut_auth_req'   ]) & SM_AUTHREQ_KEYPRESS)          >> 4
323            tester = (int(test_descriptor['tester_auth_req']) & SM_AUTHREQ_KEYPRESS)          >> 4
324        elif attribute == 'rfu':
325            iut    = (int(test_descriptor['iut_auth_req'   ]) & 192) >> 6
326            tester = (int(test_descriptor['tester_auth_req']) & 192) >> 6
327        elif attribute == 'passkey':
328            if not 'passkey' in test_descriptor:
329                continue
330            iut    = test_descriptor['passkey']
331            tester = test_descriptor['passkey']
332        elif attribute == 'method':
333            if not 'method' in test_descriptor:
334                continue
335            iut    = test_descriptor['method']
336            tester = test_descriptor['method']
337        elif attribute == 'failure':
338            iut    = ''
339            tester = failures[int(test_descriptor['tester_failure'])]
340        else:
341            iut    = test_descriptor['iut_'    + attribute]
342            tester = test_descriptor['tester_' + attribute]
343        fout.write(format_string % (name, iut, tester))
344
345def run_test(test_descriptor):
346    # shutdown previous sm_test instances
347    try:
348        subprocess.call("killall sm_test", shell = True)
349    except:
350        pass
351
352    # trash all bonding informatino
353    try:
354        subprocess.call("rm /tmp/btstack_*", shell = True)
355    except:
356        pass
357
358    test_name = test_descriptor['name']
359    print('Test: %s' % test_name)
360
361    if '/SLA/' in test_descriptor['name']:
362        iut_role    = 'responder'
363        tester_role = 'initiator'
364        slave_role  = 'iut'
365        master_role = 'tester'
366    else:
367        iut_role    = 'initiator'
368        tester_role = 'responder'
369        slave_role  = 'tester'
370        master_role = 'iut'
371
372    test_descriptor['iut_role'   ] = iut_role
373    test_descriptor['tester_role'] = tester_role
374    test_descriptor['master_role'] = master_role
375    test_descriptor['slave_role']  = slave_role
376
377    slave = Node()
378
379    # configure slave
380    slave.set_name(slave_role)
381    slave.usb_path = usb_paths[0]
382    slave.set_auth_req(test_descriptor[slave_role + '_auth_req'])
383    slave.set_io_capabilities(test_descriptor[slave_role + '_io_capabilities'])
384    slave.set_oob_data(test_descriptor[slave_role + '_oob_data'])
385    if slave_role == 'tester':
386        slave.set_failure(test_descriptor['tester_failure'])
387
388    # start up slave
389    slave.start_process()
390
391    nodes = [slave]
392
393    # run test
394    try:
395        run(test_descriptor, nodes)
396
397        # identify iut and tester
398        if iut_role == 'responder':
399            iut    = nodes[0]
400            tester = nodes[1]
401        else:
402            iut    = nodes[1]
403            tester = nodes[0]
404
405        test_folder =  test_descriptor['test_folder']
406
407        # check result
408        test_ok = True
409        if  test_descriptor['tester_failure'] != '0':
410            # expect status != 0 if tester_failure set
411            test_ok &= test_descriptor['iut_pairing_complete_status'] != '0'
412            test_ok &= test_descriptor['iut_pairing_complete_reason'] == test_descriptor['tester_failure']
413        else:
414            test_ok &= test_descriptor['iut_pairing_complete_status'] == '0'
415
416        # check pairing method
417        if 'method' in test_descriptor:
418            method = test_descriptor['method']
419            if 'SCJW' in test_name and (method != 'Just Works' and method != 'Numeric Comparison'):
420                test_ok = False
421            if 'SCPK' in test_name and method != 'Passkey Entry':
422                test_ok = False
423            if 'SCOB' in test_name and method != 'OOB':
424                test_ok = False
425
426        # rename folder if test not ok
427        if not test_ok:
428            test_folder = 'TEST_FAIL-' + test_folder
429
430        # move hci logs into result folder
431        os.makedirs(test_folder)
432        shutil.move(iut.get_packet_log(),    test_folder + '/iut.pklg')
433        shutil.move(tester.get_packet_log(), test_folder + '/tester.pklg')
434
435        # write config
436        with open (test_folder + '/config.txt', "wt") as fout:
437            test_descriptor['iut_bd_addr']    = iut.get_bd_addr()
438            test_descriptor['tester_bd_addr'] = tester.get_bd_addr()
439            write_config(fout, test_descriptor)
440
441    except KeyboardInterrupt:
442        print('Interrupted')
443        test_descriptor['interrupted'] = 'EXIT'
444
445    # shutdown
446    for node in nodes:
447        node.terminate()
448    print("Done\n")
449
450
451# read tests
452with open('sm_test.csv') as csvfile:
453    reader = csv.DictReader(csvfile)
454    for test_descriptor in reader:
455        test_name = test_descriptor['name']
456
457        if test_name.startswith('#'):
458            continue
459        if len(test_name) == 0:
460            continue
461
462        test_folder = test_name.replace('/', '_')
463        test_descriptor['test_folder'] = test_folder
464
465        # skip test if regenerate not requested
466        if os.path.exists(test_folder):
467            if regenerate:
468                shutil.rmtree(test_folder)
469            else:
470                print('Test: %s (completed)' % test_name)
471                continue
472
473        # run test
474        print(test_descriptor)
475        run_test(test_descriptor)
476
477        if 'interrupted' in test_descriptor:
478            break
479