xref: /aosp_15_r20/external/openthread/tests/scripts/thread-cert/node.py (revision cfb92d1480a9e65faed56933e9c12405f45898b4)
1#!/usr/bin/env python3
2#
3#  Copyright (c) 2016, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28#
29
30import json
31import binascii
32import ipaddress
33import logging
34import os
35import re
36import shlex
37import socket
38import subprocess
39import sys
40import time
41import traceback
42import typing
43import unittest
44from ipaddress import IPv6Address, IPv6Network
45from typing import Union, Dict, Optional, List, Any
46
47import pexpect
48import pexpect.popen_spawn
49
50import config
51import simulator
52import thread_cert
53
# Integer read from the PORT_OFFSET environment variable (0 when unset); used
# in this file to build the per-node suffix of coverage and core-dump file names.
PORT_OFFSET = int(os.getenv('PORT_OFFSET', "0"))

# Integer read from the NAT64 environment variable (0 when unset); when it is 1
# the OTBR container is started with --dns=127.0.0.1 and a fixed NAT64 prefix.
INFRA_DNS64 = int(os.getenv('NAT64', 0))
57
58
59class OtbrDocker:
60    RESET_DELAY = 3
61
62    _socat_proc = None
63    _ot_rcp_proc = None
64    _docker_proc = None
65    _border_routing_counters = None
66
67    def __init__(self, nodeid: int, backbone_network: str, **kwargs):
68        self.verbose = int(float(os.getenv('VERBOSE', 0)))
69
70        assert backbone_network is not None
71        self.backbone_network = backbone_network
72        try:
73            self._docker_name = config.OTBR_DOCKER_NAME_PREFIX + str(nodeid)
74            self._prepare_ot_rcp_sim(nodeid)
75            self._launch_docker()
76        except Exception:
77            traceback.print_exc()
78            self.destroy()
79            raise
80
81    def _prepare_ot_rcp_sim(self, nodeid: int):
82        self._socat_proc = subprocess.Popen(['socat', '-d', '-d', 'pty,raw,echo=0', 'pty,raw,echo=0'],
83                                            stderr=subprocess.PIPE,
84                                            stdin=subprocess.DEVNULL,
85                                            stdout=subprocess.DEVNULL)
86
87        line = self._socat_proc.stderr.readline().decode('ascii').strip()
88        self._rcp_device_pty = rcp_device_pty = line[line.index('PTY is /dev') + 7:]
89        line = self._socat_proc.stderr.readline().decode('ascii').strip()
90        self._rcp_device = rcp_device = line[line.index('PTY is /dev') + 7:]
91        logging.info(f"socat running: device PTY: {rcp_device_pty}, device: {rcp_device}")
92
93        ot_rcp_path = self._get_ot_rcp_path()
94        self._ot_rcp_proc = subprocess.Popen(f"{ot_rcp_path} {nodeid} > {rcp_device_pty} < {rcp_device_pty}",
95                                             shell=True,
96                                             stdin=subprocess.DEVNULL,
97                                             stdout=subprocess.DEVNULL,
98                                             stderr=subprocess.DEVNULL)
99
100        try:
101            self._ot_rcp_proc.wait(1)
102        except subprocess.TimeoutExpired:
103            # We expect ot-rcp not to quit in 1 second.
104            pass
105        else:
106            raise Exception(f"ot-rcp {nodeid} exited unexpectedly!")
107
108    def _get_ot_rcp_path(self) -> str:
109        srcdir = os.environ['top_builddir']
110        path = '%s/examples/apps/ncp/ot-rcp' % srcdir
111        logging.info("ot-rcp path: %s", path)
112        return path
113
114    def _launch_docker(self):
115        logging.info(f'Docker image: {config.OTBR_DOCKER_IMAGE}')
116        subprocess.check_call(f"docker rm -f {self._docker_name} || true", shell=True)
117        CI_ENV = os.getenv('CI_ENV', '').split()
118        dns = ['--dns=127.0.0.1'] if INFRA_DNS64 == 1 else ['--dns=8.8.8.8']
119        nat64_prefix = ['--nat64-prefix', '2001:db8:1:ffff::/96'] if INFRA_DNS64 == 1 else []
120        os.makedirs('/tmp/coverage/', exist_ok=True)
121
122        cmd = ['docker', 'run'] + CI_ENV + [
123            '--rm',
124            '--name',
125            self._docker_name,
126            '--network',
127            self.backbone_network,
128        ] + dns + [
129            '-i',
130            '--sysctl',
131            'net.ipv6.conf.all.disable_ipv6=0 net.ipv4.conf.all.forwarding=1 net.ipv6.conf.all.forwarding=1',
132            '--privileged',
133            '--cap-add=NET_ADMIN',
134            '--volume',
135            f'{self._rcp_device}:/dev/ttyUSB0',
136            '-v',
137            '/tmp/coverage/:/tmp/coverage/',
138            config.OTBR_DOCKER_IMAGE,
139            '-B',
140            config.BACKBONE_IFNAME,
141            '--trel-url',
142            f'trel://{config.BACKBONE_IFNAME}',
143        ] + nat64_prefix
144        logging.info(' '.join(cmd))
145        self._docker_proc = subprocess.Popen(cmd,
146                                             stdin=subprocess.DEVNULL,
147                                             stdout=sys.stdout if self.verbose else subprocess.DEVNULL,
148                                             stderr=sys.stderr if self.verbose else subprocess.DEVNULL)
149
150        launch_docker_deadline = time.time() + 300
151        launch_ok = False
152
153        while time.time() < launch_docker_deadline:
154            try:
155                subprocess.check_call(f'docker exec -i {self._docker_name} ot-ctl state', shell=True)
156                launch_ok = True
157                logging.info("OTBR Docker %s on %s Is Ready!", self._docker_name, self.backbone_network)
158                break
159            except subprocess.CalledProcessError:
160                time.sleep(5)
161                continue
162
163        assert launch_ok
164
165        self.start_ot_ctl()
166
167    def __repr__(self):
168        return f'OtbrDocker<{self.nodeid}>'
169
170    def start_otbr_service(self):
171        self.bash('service otbr-agent start')
172        self.simulator.go(3)
173        self.start_ot_ctl()
174
175    def stop_otbr_service(self):
176        self.stop_ot_ctl()
177        self.bash('service otbr-agent stop')
178
179    def stop_mdns_service(self):
180        self.bash('service avahi-daemon stop; service mdns stop; !(cat /proc/net/udp | grep -i :14E9)')
181
182    def start_mdns_service(self):
183        self.bash('service avahi-daemon start; service mdns start; cat /proc/net/udp | grep -i :14E9')
184
185    def start_ot_ctl(self):
186        cmd = f'docker exec -i {self._docker_name} ot-ctl'
187        self.pexpect = pexpect.popen_spawn.PopenSpawn(cmd, timeout=30)
188        if self.verbose:
189            self.pexpect.logfile_read = sys.stdout.buffer
190
191        # Add delay to ensure that the process is ready to receive commands.
192        timeout = 0.4
193        while timeout > 0:
194            self.pexpect.send('\r\n')
195            try:
196                self.pexpect.expect('> ', timeout=0.1)
197                break
198            except pexpect.TIMEOUT:
199                timeout -= 0.1
200
201    def stop_ot_ctl(self):
202        self.pexpect.sendeof()
203        self.pexpect.wait()
204        self.pexpect.proc.kill()
205
206    def reserve_udp_port(self, port):
207        self.bash(f'socat -u UDP6-LISTEN:{port},bindtodevice=wpan0 - &')
208
209    def destroy(self):
210        logging.info("Destroying %s", self)
211        self._shutdown_docker()
212        self._shutdown_ot_rcp()
213        self._shutdown_socat()
214
215    def _shutdown_docker(self):
216        if self._docker_proc is None:
217            return
218
219        try:
220            COVERAGE = int(os.getenv('COVERAGE', '0'))
221            OTBR_COVERAGE = int(os.getenv('OTBR_COVERAGE', '0'))
222            test_name = os.getenv('TEST_NAME')
223            unique_node_id = f'{test_name}-{PORT_OFFSET}-{self.nodeid}'
224
225            if COVERAGE or OTBR_COVERAGE:
226                self.bash('service otbr-agent stop')
227
228                cov_file_path = f'/tmp/coverage/coverage-{unique_node_id}.info'
229                # Upload OTBR code coverage if OTBR_COVERAGE=1, otherwise OpenThread code coverage.
230                if OTBR_COVERAGE:
231                    codecov_cmd = f'lcov --directory . --capture --output-file {cov_file_path}'
232                else:
233                    codecov_cmd = ('lcov --directory build/otbr/third_party/openthread/repo --capture '
234                                   f'--output-file {cov_file_path}')
235
236                self.bash(codecov_cmd)
237
238            copyCore = subprocess.run(f'docker cp {self._docker_name}:/core ./coredump_{unique_node_id}', shell=True)
239            if copyCore.returncode == 0:
240                subprocess.check_call(
241                    f'docker cp {self._docker_name}:/usr/sbin/otbr-agent ./otbr-agent_{unique_node_id}', shell=True)
242
243        finally:
244            subprocess.check_call(f"docker rm -f {self._docker_name}", shell=True)
245            self._docker_proc.wait()
246            del self._docker_proc
247
248    def _shutdown_ot_rcp(self):
249        if self._ot_rcp_proc is not None:
250            self._ot_rcp_proc.kill()
251            self._ot_rcp_proc.wait()
252            del self._ot_rcp_proc
253
254    def _shutdown_socat(self):
255        if self._socat_proc is not None:
256            self._socat_proc.stderr.close()
257            self._socat_proc.kill()
258            self._socat_proc.wait()
259            del self._socat_proc
260
261    def bash(self, cmd: str, encoding='ascii') -> List[str]:
262        logging.info("%s $ %s", self, cmd)
263        proc = subprocess.Popen(['docker', 'exec', '-i', self._docker_name, 'bash', '-c', cmd],
264                                stdin=subprocess.DEVNULL,
265                                stdout=subprocess.PIPE,
266                                stderr=sys.stderr,
267                                encoding=encoding)
268
269        with proc:
270
271            lines = []
272
273            while True:
274                line = proc.stdout.readline()
275
276                if not line:
277                    break
278
279                lines.append(line)
280                logging.info("%s $ %r", self, line.rstrip('\r\n'))
281
282            proc.wait()
283
284            if proc.returncode != 0:
285                raise subprocess.CalledProcessError(proc.returncode, cmd, ''.join(lines))
286            else:
287                return lines
288
    def dns_dig(self, server: str, name: str, qtype: str):
        """
        Run dig command to query a DNS server.

        The command is executed inside the container through `bash()`, and its
        textual output is parsed into a dictionary.

        Args:
            server: the server address.
            name: the name to query.
            qtype: the query type (e.g. AAAA, PTR, TXT, SRV).

        Returns:
            A dict holding the key/value pairs of dig's HEADER line plus one
            list of record tuples per section. Record TTLs are converted to
            int, SRV records get their three numeric fields converted to int,
            and the data of a TXT record is replaced by a single dict of its
            key/value entries. The dig result is similar to the one below:
            {
                "opcode": "QUERY",
                "status": "NOERROR",
                "id": "64144",
                "QUESTION": [
                    ('google.com.', 'IN', 'AAAA')
                ],
                "ANSWER": [
                    ('google.com.', 107, 'IN', 'AAAA', '2404:6800:4008:c00::71'),
                    ('google.com.', 107, 'IN', 'AAAA', '2404:6800:4008:c00::8a'),
                    ('google.com.', 107, 'IN', 'AAAA', '2404:6800:4008:c00::66'),
                    ('google.com.', 107, 'IN', 'AAAA', '2404:6800:4008:c00::8b'),
                ],
                "ADDITIONAL": [
                ],
            }

        Raises:
            subprocess.CalledProcessError: if the dig command exits with a non-zero status.
        """
        output = self.bash(f'dig -6 @{server} \'{name}\' {qtype}', encoding='raw_unicode_escape')

        # Name of the section currently being parsed, or None when between sections.
        section = None
        dig_result = {
            'QUESTION': [],
            'ANSWER': [],
            'ADDITIONAL': [],
        }

        for line in output:
            line = line.strip()

            # Header line: a comma-separated list of "key: value" pairs.
            if line.startswith(';; ->>HEADER<<- '):
                headers = line[len(';; ->>HEADER<<- '):].split(', ')
                for header in headers:
                    key, val = header.split(': ')
                    dig_result[key] = val

                continue

            # Section markers switch the parser state; a blank line ends the section.
            if line == ';; QUESTION SECTION:':
                section = 'QUESTION'
                continue
            elif line == ';; ANSWER SECTION:':
                section = 'ANSWER'
                continue
            elif line == ';; ADDITIONAL SECTION:':
                section = 'ADDITIONAL'
                continue
            elif section and not line:
                section = None
                continue

            if section:
                assert line

                # Question lines carry a leading ';' that is dropped before splitting.
                if section == 'QUESTION':
                    assert line.startswith(';')
                    line = line[1:]
                record = list(line.split())

                if section == 'QUESTION':
                    # Question records are (name, class, type).
                    if record[2] in ('SRV', 'TXT'):
                        record[0] = self.__unescape_dns_instance_name(record[0])
                else:
                    # Other records are (name, ttl, class, type, data...).
                    record[1] = int(record[1])
                    if record[3] == 'SRV':
                        record[0] = self.__unescape_dns_instance_name(record[0])
                        record[4], record[5], record[6] = map(int, [record[4], record[5], record[6]])
                    elif record[3] == 'TXT':
                        record[0] = self.__unescape_dns_instance_name(record[0])
                        # TXT data is re-parsed from the whole line so that
                        # quoted entries containing spaces are kept intact.
                        record[4:] = [self.__parse_dns_dig_txt(line)]
                    elif record[3] == 'PTR':
                        record[4] = self.__unescape_dns_instance_name(record[4])

                dig_result[section].append(tuple(record))

        return dig_result
375
376    def call_dbus_method(self, *args):
377        args = shlex.join([args[0], args[1], json.dumps(args[2:])])
378        return json.loads(
379            self.bash(f'python3 /app/third_party/openthread/repo/tests/scripts/thread-cert/call_dbus_method.py {args}')
380            [0])
381
382    def get_dbus_property(self, property_name):
383        return self.call_dbus_method('org.freedesktop.DBus.Properties', 'Get', 'io.openthread.BorderRouter',
384                                     property_name)
385
386    def set_dbus_property(self, property_name, property_value):
387        return self.call_dbus_method('org.freedesktop.DBus.Properties', 'Set', 'io.openthread.BorderRouter',
388                                     property_name, property_value)
389
390    def get_border_routing_counters(self):
391        counters = self.get_dbus_property('BorderRoutingCounters')
392        counters = {
393            'inbound_unicast': counters[0],
394            'inbound_multicast': counters[1],
395            'outbound_unicast': counters[2],
396            'outbound_multicast': counters[3],
397            'ra_rx': counters[4],
398            'ra_tx_success': counters[5],
399            'ra_tx_failure': counters[6],
400            'rs_rx': counters[7],
401            'rs_tx_success': counters[8],
402            'rs_tx_failure': counters[9],
403        }
404        logging.info(f'border routing counters: {counters}')
405        return counters
406
407    def _process_traffic_counters(self, counter):
408        return {
409            '4to6': {
410                'packets': counter[0],
411                'bytes': counter[1],
412            },
413            '6to4': {
414                'packets': counter[2],
415                'bytes': counter[3],
416            }
417        }
418
419    def _process_packet_counters(self, counter):
420        return {'4to6': {'packets': counter[0]}, '6to4': {'packets': counter[1]}}
421
422    def nat64_set_enabled(self, enable):
423        return self.call_dbus_method('io.openthread.BorderRouter', 'SetNat64Enabled', enable)
424
425    def activate_ephemeral_key_mode(self, lifetime):
426        return self.call_dbus_method('io.openthread.BorderRouter', 'ActivateEphemeralKeyMode', lifetime)
427
428    def deactivate_ephemeral_key_mode(self):
429        return self.call_dbus_method('io.openthread.BorderRouter', 'DeactivateEphemeralKeyMode')
430
431    @property
432    def nat64_cidr(self):
433        self.send_command('nat64 cidr')
434        cidr = self._expect_command_output()[0].strip()
435        return ipaddress.IPv4Network(cidr, strict=False)
436
437    @nat64_cidr.setter
438    def nat64_cidr(self, cidr: ipaddress.IPv4Network):
439        if not isinstance(cidr, ipaddress.IPv4Network):
440            raise ValueError("cidr is expected to be an instance of ipaddress.IPv4Network")
441        self.send_command(f'nat64 cidr {cidr}')
442        self._expect_done()
443
444    @property
445    def nat64_state(self):
446        state = self.get_dbus_property('Nat64State')
447        return {'PrefixManager': state[0], 'Translator': state[1]}
448
449    @property
450    def nat64_mappings(self):
451        return [{
452            'id': row[0],
453            'ip4': row[1],
454            'ip6': row[2],
455            'expiry': row[3],
456            'counters': {
457                'total': self._process_traffic_counters(row[4][0]),
458                'ICMP': self._process_traffic_counters(row[4][1]),
459                'UDP': self._process_traffic_counters(row[4][2]),
460                'TCP': self._process_traffic_counters(row[4][3]),
461            }
462        } for row in self.get_dbus_property('Nat64Mappings')]
463
464    @property
465    def nat64_counters(self):
466        res_error = self.get_dbus_property('Nat64ErrorCounters')
467        res_proto = self.get_dbus_property('Nat64ProtocolCounters')
468        return {
469            'protocol': {
470                'Total': self._process_traffic_counters(res_proto[0]),
471                'ICMP': self._process_traffic_counters(res_proto[1]),
472                'UDP': self._process_traffic_counters(res_proto[2]),
473                'TCP': self._process_traffic_counters(res_proto[3]),
474            },
475            'errors': {
476                'Unknown': self._process_packet_counters(res_error[0]),
477                'Illegal Pkt': self._process_packet_counters(res_error[1]),
478                'Unsup Proto': self._process_packet_counters(res_error[2]),
479                'No Mapping': self._process_packet_counters(res_error[3]),
480            }
481        }
482
483    @property
484    def nat64_traffic_counters(self):
485        res = self.get_dbus_property('Nat64TrafficCounters')
486        return {
487            'Total': self._process_traffic_counters(res[0]),
488            'ICMP': self._process_traffic_counters(res[1]),
489            'UDP': self._process_traffic_counters(res[2]),
490            'TCP': self._process_traffic_counters(res[3]),
491        }
492
493    @property
494    def dns_upstream_query_state(self):
495        return bool(self.get_dbus_property('DnsUpstreamQueryState'))
496
497    @dns_upstream_query_state.setter
498    def dns_upstream_query_state(self, value):
499        if type(value) is not bool:
500            raise ValueError("dns_upstream_query_state must be a bool")
501        return self.set_dbus_property('DnsUpstreamQueryState', value)
502
503    @property
504    def ephemeral_key_enabled(self):
505        return bool(self.get_dbus_property('EphemeralKeyEnabled'))
506
507    @ephemeral_key_enabled.setter
508    def ephemeral_key_enabled(self, value):
509        if type(value) is not bool:
510            raise ValueError("ephemeral_key_enabled must be a bool")
511        return self.set_dbus_property('EphemeralKeyEnabled', value)
512
513    def read_border_routing_counters_delta(self):
514        old_counters = self._border_routing_counters
515        new_counters = self.get_border_routing_counters()
516        self._border_routing_counters = new_counters
517        delta_counters = {}
518        if old_counters is None:
519            delta_counters = new_counters
520        else:
521            for i in ('inbound', 'outbound'):
522                for j in ('unicast', 'multicast'):
523                    key = f'{i}_{j}'
524                    assert (key in old_counters)
525                    assert (key in new_counters)
526                    value = [new_counters[key][0] - old_counters[key][0], new_counters[key][1] - old_counters[key][1]]
527                    delta_counters[key] = value
528        delta_counters = {
529            key: value for key, value in delta_counters.items() if not isinstance(value, int) and value[0] and value[1]
530        }
531
532        return delta_counters
533
534    @staticmethod
535    def __unescape_dns_instance_name(name: str) -> str:
536        new_name = []
537        i = 0
538        while i < len(name):
539            c = name[i]
540
541            if c == '\\':
542                assert i + 1 < len(name), name
543                if name[i + 1].isdigit():
544                    assert i + 3 < len(name) and name[i + 2].isdigit() and name[i + 3].isdigit(), name
545                    new_name.append(chr(int(name[i + 1:i + 4])))
546                    i += 3
547                else:
548                    new_name.append(name[i + 1])
549                    i += 1
550            else:
551                new_name.append(c)
552
553            i += 1
554
555        return ''.join(new_name)
556
557    def __parse_dns_dig_txt(self, line: str):
558        # Example TXT entry:
559        # "xp=\\000\\013\\184\\000\\000\\000\\000\\000"
560        txt = {}
561        for entry in re.findall(r'"((?:[^\\]|\\.)*?)"', line):
562            if entry == "":
563                continue
564
565            k, v = entry.split('=', 1)
566            txt[k] = v
567
568        return txt
569
570    def _setup_sysctl(self):
571        self.bash(f'sysctl net.ipv6.conf.{self.ETH_DEV}.accept_ra=2')
572        self.bash(f'sysctl net.ipv6.conf.{self.ETH_DEV}.accept_ra_rt_info_max_plen=64')
573
574
575class OtCli:
576    RESET_DELAY = 0.1
577
578    def __init__(self, nodeid, is_mtd=False, version=None, is_bbr=False, **kwargs):
579        self.verbose = int(float(os.getenv('VERBOSE', 0)))
580        self.node_type = os.getenv('NODE_TYPE', 'sim')
581        self.env_version = os.getenv('THREAD_VERSION', '1.1')
582        self.is_bbr = is_bbr
583        self._initialized = False
584        if os.getenv('COVERAGE', 0) and os.getenv('CC', 'gcc') == 'gcc':
585            self._cmd_prefix = '/usr/bin/env GCOV_PREFIX=%s/ot-run/%s/ot-gcda.%d ' % (os.getenv(
586                'top_srcdir', '.'), sys.argv[0], nodeid)
587        else:
588            self._cmd_prefix = ''
589
590        if version is not None:
591            self.version = version
592        else:
593            self.version = self.env_version
594
595        mode = os.environ.get('USE_MTD') == '1' and is_mtd and 'mtd' or 'ftd'
596
597        if self.node_type == 'soc':
598            self.__init_soc(nodeid)
599        elif self.node_type == 'ncp-sim':
600            # TODO use mode after ncp-mtd is available.
601            self.__init_ncp_sim(nodeid, 'ftd')
602        else:
603            self.__init_sim(nodeid, mode)
604
605        if self.verbose:
606            self.pexpect.logfile_read = sys.stdout.buffer
607
608        self._initialized = True
609
    def __init_sim(self, nodeid, mode):
        """Initialize a simulation node.

        Builds the ot-cli command line from the node's Thread version, the
        environment's Thread version and several environment variables, spawns
        it, and stores the session in `self.pexpect`.

        Args:
            nodeid: node id appended to the command line (or passed as the
                forkpty argument when a radio device is configured).
            mode: build flavour used in the binary name, e.g. 'ftd' or 'mtd'.
        """

        # Default command if no match below, will be overridden if below conditions are met.
        cmd = './ot-cli-%s' % (mode)

        # For Thread 1.2 MTD node, use ot-cli-mtd build regardless of OT_CLI_PATH
        if self.version != '1.1' and mode == 'mtd' and 'top_builddir' in os.environ:
            srcdir = os.environ['top_builddir']
            cmd = '%s/examples/apps/cli/ot-cli-%s %d' % (srcdir, mode, nodeid)

        # If Thread version of node matches the testing environment version.
        elif self.version == self.env_version:
            # Load Thread 1.2 BBR device when testing Thread 1.2 scenarios
            # which requires device with Backbone functionality.
            if self.version != '1.1' and self.is_bbr:
                if 'OT_CLI_PATH_BBR' in os.environ:
                    cmd = os.environ['OT_CLI_PATH_BBR']
                elif 'top_builddir_1_3_bbr' in os.environ:
                    srcdir = os.environ['top_builddir_1_3_bbr']
                    cmd = '%s/examples/apps/cli/ot-cli-%s' % (srcdir, mode)

            # Load Thread device of the testing environment version (may be 1.1 or 1.2)
            else:
                if 'OT_CLI_PATH' in os.environ:
                    cmd = os.environ['OT_CLI_PATH']
                elif 'top_builddir' in os.environ:
                    srcdir = os.environ['top_builddir']
                    cmd = '%s/examples/apps/cli/ot-cli-%s' % (srcdir, mode)

            # With RADIO_DEVICE set, the node id travels in the radio URL and
            # the node is flagged as POSIX; otherwise it is a plain argument.
            if 'RADIO_DEVICE' in os.environ:
                cmd += ' --real-time-signal=+1 -v spinel+hdlc+uart://%s?forkpty-arg=%d' % (os.environ['RADIO_DEVICE'],
                                                                                           nodeid)
                self.is_posix = True
            else:
                cmd += ' %d' % nodeid

        # Load Thread 1.1 node when testing Thread 1.2 scenarios for interoperability
        elif self.version == '1.1':
            # Posix app
            if 'OT_CLI_PATH_1_1' in os.environ:
                cmd = os.environ['OT_CLI_PATH_1_1']
            elif 'top_builddir_1_1' in os.environ:
                srcdir = os.environ['top_builddir_1_1']
                cmd = '%s/examples/apps/cli/ot-cli-%s' % (srcdir, mode)

            if 'RADIO_DEVICE_1_1' in os.environ:
                cmd += ' --real-time-signal=+1 -v spinel+hdlc+uart://%s?forkpty-arg=%d' % (
                    os.environ['RADIO_DEVICE_1_1'], nodeid)
                self.is_posix = True
            else:
                cmd += ' %d' % nodeid

        # NOTE(review): when none of the branches above matches, the default
        # command is spawned without a node id argument - confirm this is intended.
        print("%s" % cmd)

        self.pexpect = pexpect.popen_spawn.PopenSpawn(self._cmd_prefix + cmd, timeout=10)

        # Add delay to ensure that the process is ready to receive commands.
        # Each round sends an empty line and waits 0.1s for the prompt; if the
        # budget runs out, the method returns anyway without raising.
        timeout = 0.4
        while timeout > 0:
            self.pexpect.send('\r\n')
            try:
                self.pexpect.expect('> ', timeout=0.1)
                break
            except pexpect.TIMEOUT:
                timeout -= 0.1
676
    def __init_ncp_sim(self, nodeid, mode):
        """Initialize an NCP simulation node.

        Builds a `spinel-cli.py` command line wrapping the ot-ncp binary chosen
        from the node's Thread version and the environment, spawns it, waits
        for the `spinel-cli >` prompt and applies the DEBUG level.

        Args:
            nodeid: node id appended to the command line and, when a radio
                device is configured, passed as the forkpty argument.
            mode: build flavour used in the ot-ncp binary name, e.g. 'ftd'.
        """

        # Default command if no match below, will be overridden if below conditions are met.
        cmd = 'spinel-cli.py -p ./ot-ncp-%s -n' % mode

        # If Thread version of node matches the testing environment version.
        if self.version == self.env_version:
            # `args` is appended to the binary path inside the quoted -p value.
            if 'RADIO_DEVICE' in os.environ:
                args = ' --real-time-signal=+1 spinel+hdlc+uart://%s?forkpty-arg=%d' % (os.environ['RADIO_DEVICE'],
                                                                                        nodeid)
                self.is_posix = True
            else:
                args = ''

            # Load Thread 1.2 BBR device when testing Thread 1.2 scenarios
            # which requires device with Backbone functionality.
            if self.version != '1.1' and self.is_bbr:
                if 'OT_NCP_PATH_1_3_BBR' in os.environ:
                    cmd = 'spinel-cli.py -p "%s%s" -n' % (
                        os.environ['OT_NCP_PATH_1_3_BBR'],
                        args,
                    )
                elif 'top_builddir_1_3_bbr' in os.environ:
                    srcdir = os.environ['top_builddir_1_3_bbr']
                    cmd = '%s/examples/apps/ncp/ot-ncp-%s' % (srcdir, mode)
                    cmd = 'spinel-cli.py -p "%s%s" -n' % (
                        cmd,
                        args,
                    )

            # Load Thread device of the testing environment version (may be 1.1 or 1.2).
            else:
                if 'OT_NCP_PATH' in os.environ:
                    cmd = 'spinel-cli.py -p "%s%s" -n' % (
                        os.environ['OT_NCP_PATH'],
                        args,
                    )
                elif 'top_builddir' in os.environ:
                    srcdir = os.environ['top_builddir']
                    cmd = '%s/examples/apps/ncp/ot-ncp-%s' % (srcdir, mode)
                    cmd = 'spinel-cli.py -p "%s%s" -n' % (
                        cmd,
                        args,
                    )

        # Load Thread 1.1 node when testing Thread 1.2 scenarios for interoperability.
        elif self.version == '1.1':
            if 'RADIO_DEVICE_1_1' in os.environ:
                args = ' --real-time-signal=+1 spinel+hdlc+uart://%s?forkpty-arg=%d' % (os.environ['RADIO_DEVICE_1_1'],
                                                                                        nodeid)
                self.is_posix = True
            else:
                args = ''

            if 'OT_NCP_PATH_1_1' in os.environ:
                cmd = 'spinel-cli.py -p "%s%s" -n' % (
                    os.environ['OT_NCP_PATH_1_1'],
                    args,
                )
            elif 'top_builddir_1_1' in os.environ:
                srcdir = os.environ['top_builddir_1_1']
                cmd = '%s/examples/apps/ncp/ot-ncp-%s' % (srcdir, mode)
                cmd = 'spinel-cli.py -p "%s%s" -n' % (
                    cmd,
                    args,
                )

        # The node id is the value of the trailing -n option in every variant.
        cmd += ' %d' % nodeid
        print("%s" % cmd)

        self.pexpect = pexpect.spawn(self._cmd_prefix + cmd, timeout=10)

        # Add delay to ensure that the process is ready to receive commands.
        time.sleep(0.2)
        self._expect('spinel-cli >')
        self.debug(int(os.getenv('DEBUG', '0')))
754
755    def __init_soc(self, nodeid):
756        """ Initialize a System-on-a-chip node connected via UART. """
757        import fdpexpect
758
759        serialPort = '/dev/ttyUSB%d' % ((nodeid - 1) * 2)
760        self.pexpect = fdpexpect.fdspawn(os.open(serialPort, os.O_RDWR | os.O_NONBLOCK | os.O_NOCTTY))
761
762    def destroy(self):
763        if not self._initialized:
764            return
765
766        if (hasattr(self.pexpect, 'proc') and self.pexpect.proc.poll() is None or
767                not hasattr(self.pexpect, 'proc') and self.pexpect.isalive()):
768            print("%d: exit" % self.nodeid)
769            self.pexpect.send('exit\n')
770            self.pexpect.expect(pexpect.EOF)
771            self.pexpect.wait()
772            self._initialized = False
773
774
775class NodeImpl:
776    is_host = False
777    is_otbr = False
778
779    def __init__(self, nodeid, name=None, simulator=None, **kwargs):
780        self.nodeid = nodeid
781        self.name = name or ('Node%d' % nodeid)
782        self.is_posix = False
783
784        self.simulator = simulator
785        if self.simulator:
786            self.simulator.add_node(self)
787
788        super().__init__(nodeid, **kwargs)
789
790        self.set_addr64('%016x' % (thread_cert.EXTENDED_ADDRESS_BASE + nodeid))
791
792    def _expect(self, pattern, timeout=-1, *args, **kwargs):
793        """ Process simulator events until expected the pattern. """
794        if timeout == -1:
795            timeout = self.pexpect.timeout
796
797        assert timeout > 0
798
799        while timeout > 0:
800            try:
801                return self.pexpect.expect(pattern, 0.1, *args, **kwargs)
802            except pexpect.TIMEOUT:
803                timeout -= 0.1
804                self.simulator.go(0)
805                if timeout <= 0:
806                    raise
807
808    def _expect_done(self, timeout=-1):
809        self._expect('Done', timeout)
810
811    def _expect_result(self, pattern, *args, **kwargs):
812        """Expect a single matching result.
813
814        The arguments are identical to pexpect.expect().
815
816        Returns:
817            The matched line.
818        """
819        results = self._expect_results(pattern, *args, **kwargs)
820        assert len(results) == 1, results
821        return results[0]
822
823    def _expect_results(self, pattern, *args, **kwargs):
824        """Expect multiple matching results.
825
826        The arguments are identical to pexpect.expect().
827
828        Returns:
829            The matched lines.
830        """
831        output = self._expect_command_output()
832        results = [line for line in output if self._match_pattern(line, pattern)]
833        return results
834
835    def _expect_key_value_pairs(self, pattern, separator=': '):
836        """Expect 'key: value' in multiple lines.
837
838        Returns:
839            Dictionary of the key:value pairs.
840        """
841        result = {}
842        for line in self._expect_results(pattern):
843            key, val = line.split(separator)
844            result.update({key: val})
845        return result
846
847    @staticmethod
848    def _match_pattern(line, pattern):
849        if isinstance(pattern, str):
850            pattern = re.compile(pattern)
851
852        if isinstance(pattern, typing.Pattern):
853            return pattern.match(line)
854        else:
855            return any(NodeImpl._match_pattern(line, p) for p in pattern)
856
857    def _expect_command_output(self, ignore_logs=True):
858        lines = []
859
860        while True:
861            line = self.__readline(ignore_logs=ignore_logs)
862
863            if line == 'Done':
864                break
865            elif line.startswith('Error '):
866                raise Exception(line)
867            else:
868                lines.append(line)
869
870        print(f'_expect_command_output() returns {lines!r}')
871        return lines
872
873    def __is_logging_line(self, line: str) -> bool:
874        return len(line) >= 3 and line[:3] in {'[D]', '[I]', '[N]', '[W]', '[C]', '[-]'}
875
876    def read_cert_messages_in_commissioning_log(self, timeout=-1):
877        """Get the log of the traffic after DTLS handshake.
878        """
879        format_str = br"=+?\[\[THCI\].*?type=%s.*?\].*?=+?[\s\S]+?-{40,}"
880        join_fin_req = format_str % br"JOIN_FIN\.req"
881        join_fin_rsp = format_str % br"JOIN_FIN\.rsp"
882        dummy_format_str = br"\[THCI\].*?type=%s.*?"
883        join_ent_ntf = dummy_format_str % br"JOIN_ENT\.ntf"
884        join_ent_rsp = dummy_format_str % br"JOIN_ENT\.rsp"
885        pattern = (b"(" + join_fin_req + b")|(" + join_fin_rsp + b")|(" + join_ent_ntf + b")|(" + join_ent_rsp + b")")
886
887        messages = []
888        # There are at most 4 cert messages both for joiner and commissioner
889        for _ in range(0, 4):
890            try:
891                self._expect(pattern, timeout=timeout)
892                log = self.pexpect.match.group(0)
893                messages.append(self._extract_cert_message(log))
894            except BaseException:
895                break
896        return messages
897
898    def _extract_cert_message(self, log):
899        res = re.search(br"direction=\w+", log)
900        assert res
901        direction = res.group(0).split(b'=')[1].strip()
902
903        res = re.search(br"type=\S+", log)
904        assert res
905        type = res.group(0).split(b'=')[1].strip()
906
907        payload = bytearray([])
908        payload_len = 0
909        if type in [b"JOIN_FIN.req", b"JOIN_FIN.rsp"]:
910            res = re.search(br"len=\d+", log)
911            assert res
912            payload_len = int(res.group(0).split(b'=')[1].strip())
913
914            hex_pattern = br"\|(\s([0-9a-fA-F]{2}|\.\.))+?\s+?\|"
915            while True:
916                res = re.search(hex_pattern, log)
917                if not res:
918                    break
919                data = [int(hex, 16) for hex in res.group(0)[1:-1].split(b' ') if hex and hex != b'..']
920                payload += bytearray(data)
921                log = log[res.end() - 1:]
922        assert len(payload) == payload_len
923        return (direction, type, payload)
924
925    def send_command(self, cmd, go=True, expect_command_echo=True):
926        print("%d: %s" % (self.nodeid, cmd))
927        self.pexpect.send(cmd + '\n')
928        if go:
929            self.simulator.go(0, nodeid=self.nodeid)
930        sys.stdout.flush()
931
932        if expect_command_echo:
933            self._expect_command_echo(cmd)
934
935    def _expect_command_echo(self, cmd):
936        cmd = cmd.strip()
937        while True:
938            line = self.__readline()
939            if line == cmd:
940                break
941
942            logging.warning("expecting echo %r, but read %r", cmd, line)
943
944    def __readline(self, ignore_logs=True):
945        PROMPT = 'spinel-cli > ' if self.node_type == 'ncp-sim' else '> '
946        while True:
947            self._expect(r"[^\n]+\n")
948            line = self.pexpect.match.group(0).decode('utf8').strip()
949            while line.startswith(PROMPT):
950                line = line[len(PROMPT):]
951
952            if line == '':
953                continue
954
955            if ignore_logs and self.__is_logging_line(line):
956                continue
957
958            return line
959
960    def get_commands(self):
961        self.send_command('?')
962        self._expect('Commands:')
963        return self._expect_results(r'\S+')
964
965    def set_mode(self, mode):
966        cmd = 'mode %s' % mode
967        self.send_command(cmd)
968        self._expect_done()
969
970    def debug(self, level):
971        # `debug` command will not trigger interaction with simulator
972        self.send_command('debug %d' % level, go=False)
973
974    def start(self):
975        self.interface_up()
976        self.thread_start()
977
978    def stop(self):
979        self.thread_stop()
980        self.interface_down()
981
982    def set_log_level(self, level: int):
983        self.send_command(f'log level {level}')
984        self._expect_done()
985
986    def interface_up(self):
987        self.send_command('ifconfig up')
988        self._expect_done()
989
990    def interface_down(self):
991        self.send_command('ifconfig down')
992        self._expect_done()
993
994    def thread_start(self):
995        self.send_command('thread start')
996        self._expect_done()
997
998    def thread_stop(self):
999        self.send_command('thread stop')
1000        self._expect_done()
1001
1002    def detach(self, is_async=False):
1003        cmd = 'detach'
1004        if is_async:
1005            cmd += ' async'
1006
1007        self.send_command(cmd)
1008
1009        if is_async:
1010            self._expect_done()
1011            return
1012
1013        end = self.simulator.now() + 4
1014        while True:
1015            self.simulator.go(1)
1016            try:
1017                self._expect_done(timeout=0.1)
1018                return
1019            except (pexpect.TIMEOUT, socket.timeout):
1020                if self.simulator.now() > end:
1021                    raise
1022
1023    def expect_finished_detaching(self):
1024        self._expect('Finished detaching')
1025
1026    def commissioner_start(self):
1027        cmd = 'commissioner start'
1028        self.send_command(cmd)
1029        self._expect_done()
1030
1031    def commissioner_stop(self):
1032        cmd = 'commissioner stop'
1033        self.send_command(cmd)
1034        self._expect_done()
1035
1036    def commissioner_state(self):
1037        states = [r'disabled', r'petitioning', r'active']
1038        self.send_command('commissioner state')
1039        return self._expect_result(states)
1040
1041    def commissioner_add_joiner(self, addr, psk):
1042        cmd = 'commissioner joiner add %s %s' % (addr, psk)
1043        self.send_command(cmd)
1044        self._expect_done()
1045
1046    def commissioner_set_provisioning_url(self, provisioning_url=''):
1047        cmd = 'commissioner provisioningurl %s' % provisioning_url
1048        self.send_command(cmd)
1049        self._expect_done()
1050
1051    def joiner_start(self, pskd='', provisioning_url=''):
1052        cmd = 'joiner start %s %s' % (pskd, provisioning_url)
1053        self.send_command(cmd)
1054        self._expect_done()
1055
1056    def clear_allowlist(self):
1057        cmd = 'macfilter addr clear'
1058        self.send_command(cmd)
1059        self._expect_done()
1060
1061    def enable_allowlist(self):
1062        cmd = 'macfilter addr allowlist'
1063        self.send_command(cmd)
1064        self._expect_done()
1065
1066    def disable_allowlist(self):
1067        cmd = 'macfilter addr disable'
1068        self.send_command(cmd)
1069        self._expect_done()
1070
1071    def add_allowlist(self, addr, rssi=None):
1072        cmd = 'macfilter addr add %s' % addr
1073
1074        if rssi is not None:
1075            cmd += ' %s' % rssi
1076
1077        self.send_command(cmd)
1078        self._expect_done()
1079
1080    def radiofilter_is_enabled(self) -> bool:
1081        states = [r'Disabled', r'Enabled']
1082        self.send_command('radiofilter')
1083        return self._expect_result(states) == 'Enabled'
1084
1085    def radiofilter_enable(self):
1086        cmd = 'radiofilter enable'
1087        self.send_command(cmd)
1088        self._expect_done()
1089
1090    def radiofilter_disable(self):
1091        cmd = 'radiofilter disable'
1092        self.send_command(cmd)
1093        self._expect_done()
1094
1095    def get_bbr_registration_jitter(self):
1096        self.send_command('bbr jitter')
1097        return int(self._expect_result(r'\d+'))
1098
1099    def set_bbr_registration_jitter(self, jitter):
1100        cmd = 'bbr jitter %d' % jitter
1101        self.send_command(cmd)
1102        self._expect_done()
1103
1104    def get_rcp_version(self) -> str:
1105        self.send_command('rcp version')
1106        rcp_version = self._expect_command_output()[0].strip()
1107        return rcp_version
1108
1109    def srp_server_get_state(self):
1110        states = ['disabled', 'running', 'stopped']
1111        self.send_command('srp server state')
1112        return self._expect_result(states)
1113
1114    def srp_server_get_addr_mode(self):
1115        modes = [r'unicast', r'anycast']
1116        self.send_command(f'srp server addrmode')
1117        return self._expect_result(modes)
1118
1119    def srp_server_set_addr_mode(self, mode):
1120        self.send_command(f'srp server addrmode {mode}')
1121        self._expect_done()
1122
1123    def srp_server_get_anycast_seq_num(self):
1124        self.send_command(f'srp server seqnum')
1125        return int(self._expect_result(r'\d+'))
1126
1127    def srp_server_set_anycast_seq_num(self, seqnum):
1128        self.send_command(f'srp server seqnum {seqnum}')
1129        self._expect_done()
1130
1131    def srp_server_set_enabled(self, enable):
1132        cmd = f'srp server {"enable" if enable else "disable"}'
1133        self.send_command(cmd)
1134        self._expect_done()
1135
1136    def srp_server_set_lease_range(self, min_lease, max_lease, min_key_lease, max_key_lease):
1137        self.send_command(f'srp server lease {min_lease} {max_lease} {min_key_lease} {max_key_lease}')
1138        self._expect_done()
1139
1140    def srp_server_set_ttl_range(self, min_ttl, max_ttl):
1141        self.send_command(f'srp server ttl {min_ttl} {max_ttl}')
1142        self._expect_done()
1143
1144    def srp_server_get_hosts(self):
1145        """Returns the host list on the SRP server as a list of property
1146           dictionary.
1147
1148           Example output:
1149           [{
1150               'fullname': 'my-host.default.service.arpa.',
1151               'name': 'my-host',
1152               'deleted': 'false',
1153               'addresses': ['2001::1', '2001::2']
1154           }]
1155        """
1156
1157        cmd = 'srp server host'
1158        self.send_command(cmd)
1159        lines = self._expect_command_output()
1160        host_list = []
1161        while lines:
1162            host = {}
1163
1164            host['fullname'] = lines.pop(0).strip()
1165            host['name'] = host['fullname'].split('.')[0]
1166
1167            host['deleted'] = lines.pop(0).strip().split(':')[1].strip()
1168            if host['deleted'] == 'true':
1169                host_list.append(host)
1170                continue
1171
1172            addresses = lines.pop(0).strip().split('[')[1].strip(' ]').split(',')
1173            map(str.strip, addresses)
1174            host['addresses'] = [addr.strip() for addr in addresses if addr]
1175
1176            host_list.append(host)
1177
1178        return host_list
1179
1180    def srp_server_get_host(self, host_name):
1181        """Returns host on the SRP server that matches given host name.
1182
1183           Example usage:
1184           self.srp_server_get_host("my-host")
1185        """
1186
1187        for host in self.srp_server_get_hosts():
1188            if host_name == host['name']:
1189                return host
1190
1191    def srp_server_get_services(self):
1192        """Returns the service list on the SRP server as a list of property
1193           dictionary.
1194
1195           Example output:
1196           [{
1197               'fullname': 'my-service._ipps._tcp.default.service.arpa.',
1198               'instance': 'my-service',
1199               'name': '_ipps._tcp',
1200               'deleted': 'false',
1201               'port': '12345',
1202               'priority': '0',
1203               'weight': '0',
1204               'ttl': '7200',
1205               'lease': '7200',
1206               'key-lease': '7200',
1207               'TXT': ['abc=010203'],
1208               'host_fullname': 'my-host.default.service.arpa.',
1209               'host': 'my-host',
1210               'addresses': ['2001::1', '2001::2']
1211           }]
1212
1213           Note that the TXT data is output as a HEX string.
1214        """
1215
1216        cmd = 'srp server service'
1217        self.send_command(cmd)
1218        lines = self._expect_command_output()
1219
1220        service_list = []
1221        while lines:
1222            service = {}
1223
1224            service['fullname'] = lines.pop(0).strip()
1225            name_labels = service['fullname'].split('.')
1226            service['instance'] = name_labels[0]
1227            service['name'] = '.'.join(name_labels[1:3])
1228
1229            service['deleted'] = lines.pop(0).strip().split(':')[1].strip()
1230            if service['deleted'] == 'true':
1231                service_list.append(service)
1232                continue
1233
1234            # 'subtypes', port', 'priority', 'weight', 'ttl', 'lease', and 'key-lease'
1235            for i in range(0, 7):
1236                key_value = lines.pop(0).strip().split(':')
1237                service[key_value[0].strip()] = key_value[1].strip()
1238
1239            txt_entries = lines.pop(0).strip().split('[')[1].strip(' ]').split(',')
1240            txt_entries = map(str.strip, txt_entries)
1241            service['TXT'] = [txt for txt in txt_entries if txt]
1242
1243            service['host_fullname'] = lines.pop(0).strip().split(':')[1].strip()
1244            service['host'] = service['host_fullname'].split('.')[0]
1245
1246            addresses = lines.pop(0).strip().split('[')[1].strip(' ]').split(',')
1247            addresses = map(str.strip, addresses)
1248            service['addresses'] = [addr for addr in addresses if addr]
1249
1250            service_list.append(service)
1251
1252        return service_list
1253
1254    def srp_server_get_service(self, instance_name, service_name):
1255        """Returns service on the SRP server that matches given instance
1256           name and service name.
1257
1258           Example usage:
1259           self.srp_server_get_service("my-service", "_ipps._tcp")
1260        """
1261
1262        for service in self.srp_server_get_services():
1263            if (instance_name == service['instance'] and service_name == service['name']):
1264                return service
1265
1266    def get_srp_server_port(self):
1267        """Returns the SRP server UDP port by parsing
1268           the SRP Server Data in Network Data.
1269        """
1270
1271        for service in self.get_services():
1272            # TODO: for now, we are using 0xfd as the SRP service data.
1273            #       May use a dedicated bit flag for SRP server.
1274            if int(service[1], 16) == 0x5d:
1275                # The SRP server data contains IPv6 address (16 bytes)
1276                # followed by UDP port number.
1277                return int(service[2][2 * 16:], 16)
1278
1279    def srp_client_start(self, server_address, server_port):
1280        self.send_command(f'srp client start {server_address} {server_port}')
1281        self._expect_done()
1282
1283    def srp_client_stop(self):
1284        self.send_command(f'srp client stop')
1285        self._expect_done()
1286
1287    def srp_client_get_state(self):
1288        cmd = 'srp client state'
1289        self.send_command(cmd)
1290        return self._expect_command_output()[0]
1291
1292    def srp_client_get_auto_start_mode(self):
1293        cmd = 'srp client autostart'
1294        self.send_command(cmd)
1295        return self._expect_command_output()[0]
1296
1297    def srp_client_enable_auto_start_mode(self):
1298        self.send_command(f'srp client autostart enable')
1299        self._expect_done()
1300
1301    def srp_client_disable_auto_start_mode(self):
1302        self.send_command(f'srp client autostart disable')
1303        self._expect_done()
1304
1305    def srp_client_get_server_address(self):
1306        cmd = 'srp client server address'
1307        self.send_command(cmd)
1308        return self._expect_command_output()[0]
1309
1310    def srp_client_get_server_port(self):
1311        cmd = 'srp client server port'
1312        self.send_command(cmd)
1313        return int(self._expect_command_output()[0])
1314
1315    def srp_client_get_host_state(self):
1316        cmd = 'srp client host state'
1317        self.send_command(cmd)
1318        return self._expect_command_output()[0]
1319
1320    def srp_client_set_host_name(self, name):
1321        self.send_command(f'srp client host name {name}')
1322        self._expect_done()
1323
1324    def srp_client_get_host_name(self):
1325        self.send_command(f'srp client host name')
1326        self._expect_done()
1327
1328    def srp_client_remove_host(self, remove_key=False, send_unreg_to_server=False):
1329        self.send_command(f'srp client host remove {int(remove_key)} {int(send_unreg_to_server)}')
1330        self._expect_done()
1331
1332    def srp_client_clear_host(self):
1333        self.send_command(f'srp client host clear')
1334        self._expect_done()
1335
1336    def srp_client_enable_auto_host_address(self):
1337        self.send_command(f'srp client host address auto')
1338        self._expect_done()
1339
1340    def srp_client_set_host_address(self, *addrs: str):
1341        self.send_command(f'srp client host address {" ".join(addrs)}')
1342        self._expect_done()
1343
1344    def srp_client_get_host_address(self):
1345        self.send_command(f'srp client host address')
1346        self._expect_done()
1347
1348    def srp_client_add_service(self,
1349                               instance_name,
1350                               service_name,
1351                               port,
1352                               priority=0,
1353                               weight=0,
1354                               txt_entries=[],
1355                               lease=0,
1356                               key_lease=0):
1357        txt_record = "".join(self._encode_txt_entry(entry) for entry in txt_entries)
1358        if txt_record == '':
1359            txt_record = '-'
1360        instance_name = self._escape_escapable(instance_name)
1361        self.send_command(
1362            f'srp client service add {instance_name} {service_name} {port} {priority} {weight} {txt_record} {lease} {key_lease}'
1363        )
1364        self._expect_done()
1365
1366    def srp_client_remove_service(self, instance_name, service_name):
1367        self.send_command(f'srp client service remove {instance_name} {service_name}')
1368        self._expect_done()
1369
1370    def srp_client_clear_service(self, instance_name, service_name):
1371        self.send_command(f'srp client service clear {instance_name} {service_name}')
1372        self._expect_done()
1373
1374    def srp_client_get_services(self):
1375        cmd = 'srp client service'
1376        self.send_command(cmd)
1377        service_lines = self._expect_command_output()
1378        return [self._parse_srp_client_service(line) for line in service_lines]
1379
1380    def srp_client_set_lease_interval(self, leaseinterval: int):
1381        cmd = f'srp client leaseinterval {leaseinterval}'
1382        self.send_command(cmd)
1383        self._expect_done()
1384
1385    def srp_client_get_lease_interval(self) -> int:
1386        cmd = 'srp client leaseinterval'
1387        self.send_command(cmd)
1388        return int(self._expect_result('\d+'))
1389
1390    def srp_client_set_key_lease_interval(self, leaseinterval: int):
1391        cmd = f'srp client keyleaseinterval {leaseinterval}'
1392        self.send_command(cmd)
1393        self._expect_done()
1394
1395    def srp_client_get_key_lease_interval(self) -> int:
1396        cmd = 'srp client keyleaseinterval'
1397        self.send_command(cmd)
1398        return int(self._expect_result('\d+'))
1399
1400    def srp_client_set_ttl(self, ttl: int):
1401        cmd = f'srp client ttl {ttl}'
1402        self.send_command(cmd)
1403        self._expect_done()
1404
1405    def srp_client_get_ttl(self) -> int:
1406        cmd = 'srp client ttl'
1407        self.send_command(cmd)
1408        return int(self._expect_result('\d+'))
1409
1410    #
1411    # TREL utilities
1412    #
1413
1414    def enable_trel(self):
1415        cmd = 'trel enable'
1416        self.send_command(cmd)
1417        self._expect_done()
1418
1419    def is_trel_enabled(self) -> Union[None, bool]:
1420        states = [r'Disabled', r'Enabled']
1421        self.send_command('trel')
1422        try:
1423            return self._expect_result(states) == 'Enabled'
1424        except Exception as ex:
1425            if 'InvalidCommand' in str(ex):
1426                return None
1427
1428            raise
1429
1430    def get_trel_counters(self):
1431        cmd = 'trel counters'
1432        self.send_command(cmd)
1433        result = self._expect_command_output()
1434
1435        counters = {}
1436        for line in result:
1437            m = re.match(r'(\w+)\:[^\d]+(\d+)[^\d]+(\d+)(?:[^\d]+(\d+))?', line)
1438            if m:
1439                groups = m.groups()
1440                sub_counters = {
1441                    'packets': int(groups[1]),
1442                    'bytes': int(groups[2]),
1443                }
1444                if groups[3]:
1445                    sub_counters['failures'] = int(groups[3])
1446                counters[groups[0]] = sub_counters
1447        return counters
1448
1449    def reset_trel_counters(self):
1450        cmd = 'trel counters reset'
1451        self.send_command(cmd)
1452        self._expect_done()
1453
1454    def get_trel_port(self):
1455        cmd = 'trel port'
1456        self.send_command(cmd)
1457        return int(self._expect_command_output()[0])
1458
1459    def set_epskc(self, keystring: str, timeout=120000, port=0):
1460        cmd = 'ba ephemeralkey set ' + keystring + ' ' + str(timeout) + ' ' + str(port)
1461        self.send_command(cmd)
1462        self._expect(r"(Done|Error .*)")
1463
1464    def clear_epskc(self):
1465        cmd = 'ba ephemeralkey clear'
1466        self.send_command(cmd)
1467        self._expect_done()
1468
1469    def get_border_agent_counters(self):
1470        cmd = 'ba counters'
1471        self.send_command(cmd)
1472        result = self._expect_command_output()
1473
1474        counters = {}
1475        for line in result:
1476            m = re.match(r'(\w+)\: (\d+)', line)
1477            if m:
1478                counter_name = m.group(1)
1479                counter_value = m.group(2)
1480
1481                counters[counter_name] = int(counter_value)
1482        return counters
1483
1484    def _encode_txt_entry(self, entry):
1485        """Encodes the TXT entry to the DNS-SD TXT record format as a HEX string.
1486
1487           Example usage:
1488           self._encode_txt_entries(['abc'])     -> '03616263'
1489           self._encode_txt_entries(['def='])    -> '046465663d'
1490           self._encode_txt_entries(['xyz=XYZ']) -> '0778797a3d58595a'
1491        """
1492        return '{:02x}'.format(len(entry)) + "".join("{:02x}".format(ord(c)) for c in entry)
1493
1494    def _parse_srp_client_service(self, line: str):
1495        """Parse one line of srp service list into a dictionary which
1496           maps string keys to string values.
1497
1498           Example output for input
1499           'instance:\"%s\", name:\"%s\", state:%s, port:%d, priority:%d, weight:%d"'
1500           {
1501               'instance': 'my-service',
1502               'name': '_ipps._udp',
1503               'state': 'ToAdd',
1504               'port': '12345',
1505               'priority': '0',
1506               'weight': '0'
1507           }
1508
1509           Note that value of 'port', 'priority' and 'weight' are represented
1510           as strings but not integers.
1511        """
1512        key_values = [word.strip().split(':') for word in line.split(', ')]
1513        keys = [key_value[0] for key_value in key_values]
1514        values = [key_value[1].strip('"') for key_value in key_values]
1515        return dict(zip(keys, values))
1516
1517    def locate(self, anycast_addr):
1518        cmd = 'locate ' + anycast_addr
1519        self.send_command(cmd)
1520        self.simulator.go(5)
1521        return self._parse_locate_result(self._expect_command_output()[0])
1522
1523    def _parse_locate_result(self, line: str):
1524        """Parse anycast locate result as list of ml-eid and rloc16.
1525
1526           Example output for input
1527           'fd00:db8:0:0:acf9:9d0:7f3c:b06e 0xa800'
1528
1529           [ 'fd00:db8:0:0:acf9:9d0:7f3c:b06e', '0xa800' ]
1530        """
1531        return line.split(' ')
1532
1533    def enable_backbone_router(self):
1534        cmd = 'bbr enable'
1535        self.send_command(cmd)
1536        self._expect_done()
1537
1538    def disable_backbone_router(self):
1539        cmd = 'bbr disable'
1540        self.send_command(cmd)
1541        self._expect_done()
1542
1543    def register_backbone_router(self):
1544        cmd = 'bbr register'
1545        self.send_command(cmd)
1546        self._expect_done()
1547
1548    def get_backbone_router_state(self):
1549        states = [r'Disabled', r'Primary', r'Secondary']
1550        self.send_command('bbr state')
1551        return self._expect_result(states)
1552
1553    @property
1554    def is_primary_backbone_router(self) -> bool:
1555        return self.get_backbone_router_state() == 'Primary'
1556
1557    def get_backbone_router(self):
1558        cmd = 'bbr config'
1559        self.send_command(cmd)
1560        self._expect(r'(.*)Done')
1561        g = self.pexpect.match.groups()
1562        output = g[0].decode("utf-8")
1563        lines = output.strip().split('\n')
1564        lines = [l.strip() for l in lines]
1565        ret = {}
1566        for l in lines:
1567            z = re.search(r'seqno:\s+([0-9]+)', l)
1568            if z:
1569                ret['seqno'] = int(z.groups()[0])
1570
1571            z = re.search(r'delay:\s+([0-9]+)', l)
1572            if z:
1573                ret['delay'] = int(z.groups()[0])
1574
1575            z = re.search(r'timeout:\s+([0-9]+)', l)
1576            if z:
1577                ret['timeout'] = int(z.groups()[0])
1578
1579        return ret
1580
1581    def set_backbone_router(self, seqno=None, reg_delay=None, mlr_timeout=None):
1582        cmd = 'bbr config'
1583
1584        if seqno is not None:
1585            cmd += ' seqno %d' % seqno
1586
1587        if reg_delay is not None:
1588            cmd += ' delay %d' % reg_delay
1589
1590        if mlr_timeout is not None:
1591            cmd += ' timeout %d' % mlr_timeout
1592
1593        self.send_command(cmd)
1594        self._expect_done()
1595
1596    def set_domain_prefix(self, prefix, flags='prosD'):
1597        self.add_prefix(prefix, flags)
1598        self.register_netdata()
1599
1600    def remove_domain_prefix(self, prefix):
1601        self.remove_prefix(prefix)
1602        self.register_netdata()
1603
1604    def set_next_dua_response(self, status: Union[str, int], iid=None):
1605        # Convert 5.00 to COAP CODE 160
1606        if isinstance(status, str):
1607            assert '.' in status
1608            status = status.split('.')
1609            status = (int(status[0]) << 5) + int(status[1])
1610
1611        cmd = 'bbr mgmt dua {}'.format(status)
1612        if iid is not None:
1613            cmd += ' ' + str(iid)
1614        self.send_command(cmd)
1615        self._expect_done()
1616
1617    def set_dua_iid(self, iid: str):
1618        assert len(iid) == 16
1619        int(iid, 16)
1620
1621        cmd = 'dua iid {}'.format(iid)
1622        self.send_command(cmd)
1623        self._expect_done()
1624
1625    def clear_dua_iid(self):
1626        cmd = 'dua iid clear'
1627        self.send_command(cmd)
1628        self._expect_done()
1629
1630    def multicast_listener_list(self) -> Dict[IPv6Address, int]:
1631        cmd = 'bbr mgmt mlr listener'
1632        self.send_command(cmd)
1633
1634        table = {}
1635        for line in self._expect_results("\S+ \d+"):
1636            line = line.split()
1637            assert len(line) == 2, line
1638            ip = IPv6Address(line[0])
1639            timeout = int(line[1])
1640            assert ip not in table
1641
1642            table[ip] = timeout
1643
1644        return table
1645
1646    def multicast_listener_clear(self):
1647        cmd = f'bbr mgmt mlr listener clear'
1648        self.send_command(cmd)
1649        self._expect_done()
1650
1651    def multicast_listener_add(self, ip: Union[IPv6Address, str], timeout: int = 0):
1652        if not isinstance(ip, IPv6Address):
1653            ip = IPv6Address(ip)
1654
1655        cmd = f'bbr mgmt mlr listener add {ip.compressed} {timeout}'
1656        self.send_command(cmd)
1657        self._expect(r"(Done|Error .*)")
1658
1659    def set_next_mlr_response(self, status: int):
1660        cmd = 'bbr mgmt mlr response {}'.format(status)
1661        self.send_command(cmd)
1662        self._expect_done()
1663
1664    def register_multicast_listener(self, *ipaddrs: Union[IPv6Address, str], timeout=None):
1665        assert len(ipaddrs) > 0, ipaddrs
1666
1667        ipaddrs = map(str, ipaddrs)
1668        cmd = f'mlr reg {" ".join(ipaddrs)}'
1669        if timeout is not None:
1670            cmd += f' {int(timeout)}'
1671        self.send_command(cmd)
1672        self.simulator.go(3)
1673        lines = self._expect_command_output()
1674        m = re.match(r'status (\d+), (\d+) failed', lines[0])
1675        assert m is not None, lines
1676        status = int(m.group(1))
1677        failed_num = int(m.group(2))
1678        assert failed_num == len(lines) - 1
1679        failed_ips = list(map(IPv6Address, lines[1:]))
1680        print(f"register_multicast_listener {ipaddrs} => status: {status}, failed ips: {failed_ips}")
1681        return status, failed_ips
1682
1683    def set_link_quality(self, addr, lqi):
1684        cmd = 'macfilter rss add-lqi %s %s' % (addr, lqi)
1685        self.send_command(cmd)
1686        self._expect_done()
1687
1688    def set_outbound_link_quality(self, lqi):
1689        cmd = 'macfilter rss add-lqi * %s' % (lqi)
1690        self.send_command(cmd)
1691        self._expect_done()
1692
1693    def remove_allowlist(self, addr):
1694        cmd = 'macfilter addr remove %s' % addr
1695        self.send_command(cmd)
1696        self._expect_done()
1697
1698    def get_addr16(self):
1699        self.send_command('rloc16')
1700        rloc16 = self._expect_result(r'[0-9a-fA-F]{4}')
1701        return int(rloc16, 16)
1702
1703    def get_router_id(self):
1704        rloc16 = self.get_addr16()
1705        return rloc16 >> 10
1706
1707    def get_addr64(self):
1708        self.send_command('extaddr')
1709        return self._expect_result('[0-9a-fA-F]{16}')
1710
1711    def set_addr64(self, addr64: str):
1712        # Make sure `addr64` is a hex string of length 16
1713        assert len(addr64) == 16
1714        int(addr64, 16)
1715        self.send_command('extaddr %s' % addr64)
1716        self._expect_done()
1717
1718    def get_eui64(self):
1719        self.send_command('eui64')
1720        return self._expect_result('[0-9a-fA-F]{16}')
1721
1722    def set_extpanid(self, extpanid):
1723        self.send_command('extpanid %s' % extpanid)
1724        self._expect_done()
1725
1726    def get_extpanid(self):
1727        self.send_command('extpanid')
1728        return self._expect_result('[0-9a-fA-F]{16}')
1729
1730    def get_mesh_local_prefix(self):
1731        self.send_command('prefix meshlocal')
1732        return self._expect_command_output()[0]
1733
1734    def set_mesh_local_prefix(self, mesh_local_prefix):
1735        self.send_command('prefix meshlocal %s' % mesh_local_prefix)
1736        self._expect_done()
1737
1738    def get_joiner_id(self):
1739        self.send_command('joiner id')
1740        return self._expect_result('[0-9a-fA-F]{16}')
1741
1742    def get_channel(self):
1743        self.send_command('channel')
1744        return int(self._expect_result(r'\d+'))
1745
1746    def set_channel(self, channel):
1747        cmd = 'channel %d' % channel
1748        self.send_command(cmd)
1749        self._expect_done()
1750
1751    def get_networkkey(self):
1752        self.send_command('networkkey')
1753        return self._expect_result('[0-9a-fA-F]{32}')
1754
1755    def set_networkkey(self, networkkey):
1756        cmd = 'networkkey %s' % networkkey
1757        self.send_command(cmd)
1758        self._expect_done()
1759
1760    def get_key_sequence_counter(self):
1761        self.send_command('keysequence counter')
1762        result = self._expect_result(r'\d+')
1763        return int(result)
1764
1765    def set_key_sequence_counter(self, key_sequence_counter):
1766        cmd = 'keysequence counter %d' % key_sequence_counter
1767        self.send_command(cmd)
1768        self._expect_done()
1769
1770    def get_key_switch_guardtime(self):
1771        self.send_command('keysequence guardtime')
1772        return int(self._expect_result(r'\d+'))
1773
1774    def set_key_switch_guardtime(self, key_switch_guardtime):
1775        cmd = 'keysequence guardtime %d' % key_switch_guardtime
1776        self.send_command(cmd)
1777        self._expect_done()
1778
1779    def set_network_id_timeout(self, network_id_timeout):
1780        cmd = 'networkidtimeout %d' % network_id_timeout
1781        self.send_command(cmd)
1782        self._expect_done()
1783
1784    def _escape_escapable(self, string):
1785        """Escape CLI escapable characters in the given string.
1786
1787        Args:
1788            string (str): UTF-8 input string.
1789
1790        Returns:
1791            [str]: The modified string with escaped characters.
1792        """
1793        escapable_chars = '\\ \t\r\n'
1794        for char in escapable_chars:
1795            string = string.replace(char, '\\%s' % char)
1796        return string
1797
1798    def get_network_name(self):
1799        self.send_command('networkname')
1800        return self._expect_result([r'\S+'])
1801
1802    def set_network_name(self, network_name):
1803        cmd = 'networkname %s' % self._escape_escapable(network_name)
1804        self.send_command(cmd)
1805        self._expect_done()
1806
1807    def get_panid(self):
1808        self.send_command('panid')
1809        result = self._expect_result('0x[0-9a-fA-F]{4}')
1810        return int(result, 16)
1811
1812    def set_panid(self, panid=config.PANID):
1813        cmd = 'panid %d' % panid
1814        self.send_command(cmd)
1815        self._expect_done()
1816
1817    def set_parent_priority(self, priority):
1818        cmd = 'parentpriority %d' % priority
1819        self.send_command(cmd)
1820        self._expect_done()
1821
1822    def get_partition_id(self):
1823        self.send_command('partitionid')
1824        return self._expect_result(r'\d+')
1825
1826    def get_preferred_partition_id(self):
1827        self.send_command('partitionid preferred')
1828        return self._expect_result(r'\d+')
1829
1830    def set_preferred_partition_id(self, partition_id):
1831        cmd = 'partitionid preferred %d' % partition_id
1832        self.send_command(cmd)
1833        self._expect_done()
1834
1835    def get_pollperiod(self):
1836        self.send_command('pollperiod')
1837        return self._expect_result(r'\d+')
1838
1839    def set_pollperiod(self, pollperiod):
1840        self.send_command('pollperiod %d' % pollperiod)
1841        self._expect_done()
1842
1843    def get_child_supervision_interval(self):
1844        self.send_command('childsupervision interval')
1845        return self._expect_result(r'\d+')
1846
1847    def set_child_supervision_interval(self, interval):
1848        self.send_command('childsupervision interval %d' % interval)
1849        self._expect_done()
1850
1851    def get_child_supervision_check_timeout(self):
1852        self.send_command('childsupervision checktimeout')
1853        return self._expect_result(r'\d+')
1854
1855    def set_child_supervision_check_timeout(self, timeout):
1856        self.send_command('childsupervision checktimeout %d' % timeout)
1857        self._expect_done()
1858
1859    def get_child_supervision_check_failure_counter(self):
1860        self.send_command('childsupervision failcounter')
1861        return self._expect_result(r'\d+')
1862
1863    def reset_child_supervision_check_failure_counter(self):
1864        self.send_command('childsupervision failcounter reset')
1865        self._expect_done()
1866
1867    def get_csl_info(self):
1868        self.send_command('csl')
1869        return self._expect_key_value_pairs(r'\S+')
1870
1871    def set_csl_channel(self, csl_channel):
1872        self.send_command('csl channel %d' % csl_channel)
1873        self._expect_done()
1874
1875    def set_csl_period(self, csl_period):
1876        self.send_command('csl period %d' % csl_period)
1877        self._expect_done()
1878
1879    def set_csl_timeout(self, csl_timeout):
1880        self.send_command('csl timeout %d' % csl_timeout)
1881        self._expect_done()
1882
1883    def send_mac_emptydata(self):
1884        self.send_command('mac send emptydata')
1885        self._expect_done()
1886
1887    def send_mac_datarequest(self):
1888        self.send_command('mac send datarequest')
1889        self._expect_done()
1890
1891    def set_router_upgrade_threshold(self, threshold):
1892        cmd = 'routerupgradethreshold %d' % threshold
1893        self.send_command(cmd)
1894        self._expect_done()
1895
1896    def set_router_downgrade_threshold(self, threshold):
1897        cmd = 'routerdowngradethreshold %d' % threshold
1898        self.send_command(cmd)
1899        self._expect_done()
1900
1901    def get_router_downgrade_threshold(self) -> int:
1902        self.send_command('routerdowngradethreshold')
1903        return int(self._expect_result(r'\d+'))
1904
1905    def set_router_eligible(self, enable: bool):
1906        cmd = f'routereligible {"enable" if enable else "disable"}'
1907        self.send_command(cmd)
1908        self._expect_done()
1909
1910    def get_router_eligible(self) -> bool:
1911        states = [r'Disabled', r'Enabled']
1912        self.send_command('routereligible')
1913        return self._expect_result(states) == 'Enabled'
1914
1915    def prefer_router_id(self, router_id):
1916        cmd = 'preferrouterid %d' % router_id
1917        self.send_command(cmd)
1918        self._expect_done()
1919
1920    def release_router_id(self, router_id):
1921        cmd = 'releaserouterid %d' % router_id
1922        self.send_command(cmd)
1923        self._expect_done()
1924
1925    def get_state(self):
1926        states = [r'detached', r'child', r'router', r'leader', r'disabled']
1927        self.send_command('state')
1928        return self._expect_result(states)
1929
1930    def set_state(self, state):
1931        cmd = 'state %s' % state
1932        self.send_command(cmd)
1933        self._expect_done()
1934
1935    def get_ephemeral_key_state(self):
1936        cmd = 'ba ephemeralkey'
1937        states = [r'inactive', r'active']
1938        self.send_command(cmd)
1939        return self._expect_result(states)
1940
1941    def get_timeout(self):
1942        self.send_command('childtimeout')
1943        return self._expect_result(r'\d+')
1944
1945    def set_timeout(self, timeout):
1946        cmd = 'childtimeout %d' % timeout
1947        self.send_command(cmd)
1948        self._expect_done()
1949
1950    def set_max_children(self, number):
1951        cmd = 'childmax %d' % number
1952        self.send_command(cmd)
1953        self._expect_done()
1954
1955    def get_weight(self):
1956        self.send_command('leaderweight')
1957        return self._expect_result(r'\d+')
1958
1959    def set_weight(self, weight):
1960        cmd = 'leaderweight %d' % weight
1961        self.send_command(cmd)
1962        self._expect_done()
1963
1964    def add_ipaddr(self, ipaddr):
1965        cmd = 'ipaddr add %s' % ipaddr
1966        self.send_command(cmd)
1967        self._expect_done()
1968
1969    def del_ipaddr(self, ipaddr):
1970        cmd = 'ipaddr del %s' % ipaddr
1971        self.send_command(cmd)
1972        self._expect_done()
1973
1974    def add_ipmaddr(self, ipmaddr):
1975        cmd = 'ipmaddr add %s' % ipmaddr
1976        self.send_command(cmd)
1977        self._expect_done()
1978
1979    def del_ipmaddr(self, ipmaddr):
1980        cmd = 'ipmaddr del %s' % ipmaddr
1981        self.send_command(cmd)
1982        self._expect_done()
1983
1984    def get_addrs(self, verbose=False):
1985        self.send_command('ipaddr' + (' -v' if verbose else ''))
1986
1987        return self._expect_results(r'\S+(:\S*)+')
1988
1989    def get_mleid(self):
1990        self.send_command('ipaddr mleid')
1991        return self._expect_result(r'\S+(:\S*)+')
1992
1993    def get_linklocal(self):
1994        self.send_command('ipaddr linklocal')
1995        return self._expect_result(r'\S+(:\S*)+')
1996
1997    def get_rloc(self):
1998        self.send_command('ipaddr rloc')
1999        return self._expect_result(r'\S+(:\S*)+')
2000
2001    def get_addr(self, prefix):
2002        network = ipaddress.ip_network(u'%s' % str(prefix))
2003        addrs = self.get_addrs()
2004
2005        for addr in addrs:
2006            if isinstance(addr, bytearray):
2007                addr = bytes(addr)
2008            ipv6_address = ipaddress.ip_address(addr)
2009            if ipv6_address in network:
2010                return ipv6_address.exploded
2011
2012        return None
2013
2014    def has_ipaddr(self, address):
2015        ipaddr = ipaddress.ip_address(address)
2016        ipaddrs = self.get_addrs()
2017        for addr in ipaddrs:
2018            if isinstance(addr, bytearray):
2019                addr = bytes(addr)
2020            if ipaddress.ip_address(addr) == ipaddr:
2021                return True
2022        return False
2023
2024    def get_ipmaddrs(self):
2025        self.send_command('ipmaddr')
2026        return self._expect_results(r'\S+(:\S*)+')
2027
2028    def has_ipmaddr(self, address):
2029        ipmaddr = ipaddress.ip_address(address)
2030        ipmaddrs = self.get_ipmaddrs()
2031        for addr in ipmaddrs:
2032            if isinstance(addr, bytearray):
2033                addr = bytes(addr)
2034            if ipaddress.ip_address(addr) == ipmaddr:
2035                return True
2036        return False
2037
2038    def get_addr_leader_aloc(self):
2039        addrs = self.get_addrs()
2040        for addr in addrs:
2041            segs = addr.split(':')
2042            if (segs[4] == '0' and segs[5] == 'ff' and segs[6] == 'fe00' and segs[7] == 'fc00'):
2043                return addr
2044        return None
2045
2046    def get_mleid_iid(self):
2047        ml_eid = IPv6Address(self.get_mleid())
2048        return ml_eid.packed[8:].hex()
2049
2050    def get_eidcaches(self):
2051        eidcaches = []
2052        self.send_command('eidcache')
2053        for line in self._expect_results(r'([a-fA-F0-9\:]+) ([a-fA-F0-9]+)'):
2054            eidcaches.append(line.split())
2055
2056        return eidcaches
2057
2058    def add_service(self, enterpriseNumber, serviceData, serverData):
2059        cmd = 'service add %s %s %s' % (
2060            enterpriseNumber,
2061            serviceData,
2062            serverData,
2063        )
2064        self.send_command(cmd)
2065        self._expect_done()
2066
2067    def remove_service(self, enterpriseNumber, serviceData):
2068        cmd = 'service remove %s %s' % (enterpriseNumber, serviceData)
2069        self.send_command(cmd)
2070        self._expect_done()
2071
2072    def get_child_table(self) -> Dict[int, Dict[str, Any]]:
2073        """Get the table of attached children."""
2074        cmd = 'child table'
2075        self.send_command(cmd)
2076        output = self._expect_command_output()
2077
2078        #
2079        # Example output:
2080        # | ID  | RLOC16 | Timeout    | Age        | LQ In | C_VN |R|D|N|Ver|CSL|QMsgCnt|Suprvsn| Extended MAC     |
2081        # +-----+--------+------------+------------+-------+------+-+-+-+---+---+-------+-------+------------------+
2082        # |   1 | 0xc801 |        240 |         24 |     3 |  131 |1|0|0|  3| 0 |     0 |   129 | 4ecede68435358ac |
2083        # |   2 | 0xc802 |        240 |          2 |     3 |  131 |0|0|0|  3| 1 |     0 |     0 | a672a601d2ce37d8 |
2084        # Done
2085        #
2086
2087        headers = self.__split_table_row(output[0])
2088
2089        table = {}
2090        for line in output[2:]:
2091            line = line.strip()
2092            if not line:
2093                continue
2094
2095            fields = self.__split_table_row(line)
2096            col = lambda colname: self.__get_table_col(colname, headers, fields)
2097
2098            id = int(col("ID"))
2099            r, d, n = int(col("R")), int(col("D")), int(col("N"))
2100            mode = f'{"r" if r else ""}{"d" if d else ""}{"n" if n else ""}'
2101
2102            table[int(id)] = {
2103                'id': int(id),
2104                'rloc16': int(col('RLOC16'), 16),
2105                'timeout': int(col('Timeout')),
2106                'age': int(col('Age')),
2107                'lq_in': int(col('LQ In')),
2108                'c_vn': int(col('C_VN')),
2109                'mode': mode,
2110                'extaddr': col('Extended MAC'),
2111                'ver': int(col('Ver')),
2112                'csl': bool(int(col('CSL'))),
2113                'qmsgcnt': int(col('QMsgCnt')),
2114                'suprvsn': int(col('Suprvsn'))
2115            }
2116
2117        return table
2118
2119    def __split_table_row(self, row: str) -> List[str]:
2120        if not (row.startswith('|') and row.endswith('|')):
2121            raise ValueError(row)
2122
2123        fields = row.split('|')
2124        fields = [x.strip() for x in fields[1:-1]]
2125        return fields
2126
2127    def __get_table_col(self, colname: str, headers: List[str], fields: List[str]) -> str:
2128        return fields[headers.index(colname)]
2129
2130    def __getOmrAddress(self):
2131        prefixes = [prefix.split('::')[0] for prefix in self.get_prefixes()]
2132        omr_addrs = []
2133        for addr in self.get_addrs():
2134            for prefix in prefixes:
2135                if (addr.startswith(prefix)) and (addr != self.__getDua()):
2136                    omr_addrs.append(addr)
2137                    break
2138
2139        return omr_addrs
2140
2141    def __getLinkLocalAddress(self):
2142        for ip6Addr in self.get_addrs():
2143            if re.match(config.LINK_LOCAL_REGEX_PATTERN, ip6Addr, re.I):
2144                return ip6Addr
2145
2146        return None
2147
2148    def __getGlobalAddress(self):
2149        global_address = []
2150        for ip6Addr in self.get_addrs():
2151            if ((not re.match(config.LINK_LOCAL_REGEX_PATTERN, ip6Addr, re.I)) and
2152                (not re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I)) and
2153                (not re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I))):
2154                global_address.append(ip6Addr)
2155
2156        return global_address
2157
2158    def __getRloc(self):
2159        for ip6Addr in self.get_addrs():
2160            if (re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I) and
2161                    re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I) and
2162                    not (re.match(config.ALOC_FLAG_REGEX_PATTERN, ip6Addr, re.I))):
2163                return ip6Addr
2164        return None
2165
2166    def __getAloc(self):
2167        aloc = []
2168        for ip6Addr in self.get_addrs():
2169            if (re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I) and
2170                    re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I) and
2171                    re.match(config.ALOC_FLAG_REGEX_PATTERN, ip6Addr, re.I)):
2172                aloc.append(ip6Addr)
2173
2174        return aloc
2175
2176    def __getMleid(self):
2177        for ip6Addr in self.get_addrs():
2178            if re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr,
2179                        re.I) and not (re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I)):
2180                return ip6Addr
2181
2182        return None
2183
2184    def __getDua(self) -> Optional[str]:
2185        for ip6Addr in self.get_addrs():
2186            if re.match(config.DOMAIN_PREFIX_REGEX_PATTERN, ip6Addr, re.I):
2187                return ip6Addr
2188
2189        return None
2190
2191    def get_ip6_address_by_prefix(self, prefix: Union[str, IPv6Network]) -> List[IPv6Address]:
2192        """Get addresses matched with given prefix.
2193
2194        Args:
2195            prefix: the prefix to match against.
2196                    Can be either a string or ipaddress.IPv6Network.
2197
2198        Returns:
2199            The IPv6 address list.
2200        """
2201        if isinstance(prefix, str):
2202            prefix = IPv6Network(prefix)
2203        addrs = map(IPv6Address, self.get_addrs())
2204
2205        return [addr for addr in addrs if addr in prefix]
2206
2207    def get_ip6_address(self, address_type):
2208        """Get specific type of IPv6 address configured on thread device.
2209
2210        Args:
2211            address_type: the config.ADDRESS_TYPE type of IPv6 address.
2212
2213        Returns:
2214            IPv6 address string.
2215        """
2216        if address_type == config.ADDRESS_TYPE.LINK_LOCAL:
2217            return self.__getLinkLocalAddress()
2218        elif address_type == config.ADDRESS_TYPE.GLOBAL:
2219            return self.__getGlobalAddress()
2220        elif address_type == config.ADDRESS_TYPE.RLOC:
2221            return self.__getRloc()
2222        elif address_type == config.ADDRESS_TYPE.ALOC:
2223            return self.__getAloc()
2224        elif address_type == config.ADDRESS_TYPE.ML_EID:
2225            return self.__getMleid()
2226        elif address_type == config.ADDRESS_TYPE.DUA:
2227            return self.__getDua()
2228        elif address_type == config.ADDRESS_TYPE.BACKBONE_GUA:
2229            return self._getBackboneGua()
2230        elif address_type == config.ADDRESS_TYPE.OMR:
2231            return self.__getOmrAddress()
2232        else:
2233            return None
2234
2235    def get_context_reuse_delay(self):
2236        self.send_command('contextreusedelay')
2237        return self._expect_result(r'\d+')
2238
2239    def set_context_reuse_delay(self, delay):
2240        cmd = 'contextreusedelay %d' % delay
2241        self.send_command(cmd)
2242        self._expect_done()
2243
2244    def add_prefix(self, prefix, flags='paosr', prf='med'):
2245        cmd = 'prefix add %s %s %s' % (prefix, flags, prf)
2246        self.send_command(cmd)
2247        self._expect_done()
2248
2249    def remove_prefix(self, prefix):
2250        cmd = 'prefix remove %s' % prefix
2251        self.send_command(cmd)
2252        self._expect_done()
2253
2254    #
2255    # BR commands
2256    #
2257    def enable_br(self):
2258        self.send_command('br enable')
2259        self._expect_done()
2260
2261    def disable_br(self):
2262        self.send_command('br disable')
2263        self._expect_done()
2264
2265    def get_br_omr_prefix(self):
2266        cmd = 'br omrprefix local'
2267        self.send_command(cmd)
2268        return self._expect_command_output()[0]
2269
2270    def get_br_peers(self) -> List[str]:
2271        # Example output of `br peers` command:
2272        #   rloc16:0xa800 age:00:00:50
2273        #   rloc16:0x6800 age:00:00:51
2274        #   Done
2275        self.send_command('br peers')
2276        return self._expect_command_output()
2277
2278    def get_br_peers_rloc16s(self) -> List[int]:
2279        """parse `br peers` output and return the list of RLOC16s"""
2280        return [
2281            int(pair.split(':')[1], 16)
2282            for line in self.get_br_peers()
2283            for pair in line.split()
2284            if pair.split(':')[0] == 'rloc16'
2285        ]
2286
2287    def get_br_routers(self) -> List[str]:
2288        # Example output of `br routers` command:
2289        #   fe80:0:0:0:42:acff:fe14:3 (M:0 O:0 Stub:1) ms-since-rx:144160 reachable:yes age:00:17:36 (peer BR)
2290        #   fe80:0:0:0:42:acff:fe14:2 (M:0 O:0 Stub:1) ms-since-rx:45179 reachable:yes age:00:17:36
2291        #   Done
2292        self.send_command('br routers')
2293        return self._expect_command_output()
2294
2295    def get_br_routers_ip_addresses(self) -> List[IPv6Address]:
2296        """parse `br routers` output and return the list of IPv6 addresses"""
2297        return [IPv6Address(line.split()[0]) for line in self.get_br_routers()]
2298
2299    def get_netdata_omr_prefixes(self):
2300        omr_prefixes = []
2301        for prefix in self.get_prefixes():
2302            prefix, flags = prefix.split()[:2]
2303            if 'a' in flags and 'o' in flags and 's' in flags and 'D' not in flags:
2304                omr_prefixes.append(prefix)
2305
2306        return omr_prefixes
2307
2308    def get_br_on_link_prefix(self):
2309        cmd = 'br onlinkprefix local'
2310        self.send_command(cmd)
2311        return self._expect_command_output()[0]
2312
2313    def get_netdata_non_nat64_routes(self):
2314        nat64_routes = []
2315        routes = self.get_routes()
2316        for route in routes:
2317            if 'n' not in route.split(' ')[1]:
2318                nat64_routes.append(route.split(' ')[0])
2319        return nat64_routes
2320
2321    def get_netdata_nat64_routes(self):
2322        nat64_routes = []
2323        routes = self.get_routes()
2324        for route in routes:
2325            if 'n' in route.split(' ')[1]:
2326                nat64_routes.append(route.split(' ')[0])
2327        return nat64_routes
2328
2329    def get_br_nat64_prefix(self):
2330        cmd = 'br nat64prefix local'
2331        self.send_command(cmd)
2332        return self._expect_command_output()[0]
2333
2334    def get_br_favored_nat64_prefix(self):
2335        cmd = 'br nat64prefix favored'
2336        self.send_command(cmd)
2337        return self._expect_command_output()[0].split(' ')[0]
2338
2339    def enable_nat64(self):
2340        self.send_command(f'nat64 enable')
2341        self._expect_done()
2342
2343    def disable_nat64(self):
2344        self.send_command(f'nat64 disable')
2345        self._expect_done()
2346
2347    def get_nat64_state(self):
2348        self.send_command('nat64 state')
2349        res = {}
2350        for line in self._expect_command_output():
2351            state = line.split(':')
2352            res[state[0].strip()] = state[1].strip()
2353        return res
2354
    def get_nat64_mappings(self):
        """Parse the `nat64 mappings` table into a list of session dicts.

        Returns:
            A list with one dict per session row, holding 'id', 'ip6', 'ip4',
            'expiry' (int) and 'counters'. 'counters' maps 'total' and each
            name found in the rows that follow the session row to
            {'4to6': {'packets', 'bytes'}, '6to4': {'packets', 'bytes'}}.
        """
        cmd = 'nat64 mappings'
        self.send_command(cmd)
        result = self._expect_command_output()
        # The session currently being collected and its counters; a session is
        # appended to `sessions` when the next session row (or the end of the
        # output) is reached.
        session = None
        session_counters = None
        sessions = []

        for line in result:
            # Session row: hex id, two free-form columns, an `<n>s` expiry and
            # four numeric counter columns.
            m = re.match(
                r'\|\s+([a-f0-9]+)\s+\|\s+(.+)\s+\|\s+(.+)\s+\|\s+(\d+)s\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|',
                line)
            if m:
                groups = m.groups()
                # A new session row closes the previous session, if any.
                if session:
                    session['counters'] = session_counters
                    sessions.append(session)
                session = {
                    'id': groups[0],
                    'ip6': groups[1],
                    'ip4': groups[2],
                    'expiry': int(groups[3]),
                }
                session_counters = {}
                session_counters['total'] = {
                    '4to6': {
                        'packets': int(groups[4]),
                        'bytes': int(groups[5]),
                    },
                    '6to4': {
                        'packets': int(groups[6]),
                        'bytes': int(groups[7]),
                    },
                }
                continue
            # Ignore everything that precedes the first session row.
            if not session:
                continue
            # Counter row belonging to the current session: an empty first
            # column, then a name and four numeric counter columns.
            m = re.match(r'\|\s+\|\s+(.+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|', line)
            if m:
                groups = m.groups()
                session_counters[groups[0]] = {
                    '4to6': {
                        'packets': int(groups[1]),
                        'bytes': int(groups[2]),
                    },
                    '6to4': {
                        'packets': int(groups[3]),
                        'bytes': int(groups[4]),
                    },
                }
        # Close the last session, which no later session row terminated.
        if session:
            session['counters'] = session_counters
            sessions.append(session)
        return sessions
2409
2410    def get_nat64_counters(self):
2411        cmd = 'nat64 counters'
2412        self.send_command(cmd)
2413        result = self._expect_command_output()
2414
2415        protocol_counters = {}
2416        error_counters = {}
2417        for line in result:
2418            m = re.match(r'\|\s+(.+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|', line)
2419            if m:
2420                groups = m.groups()
2421                protocol_counters[groups[0]] = {
2422                    '4to6': {
2423                        'packets': int(groups[1]),
2424                        'bytes': int(groups[2]),
2425                    },
2426                    '6to4': {
2427                        'packets': int(groups[3]),
2428                        'bytes': int(groups[4]),
2429                    },
2430                }
2431                continue
2432            m = re.match(r'\|\s+(.+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|', line)
2433            if m:
2434                groups = m.groups()
2435                error_counters[groups[0]] = {
2436                    '4to6': {
2437                        'packets': int(groups[1]),
2438                    },
2439                    '6to4': {
2440                        'packets': int(groups[2]),
2441                    },
2442                }
2443                continue
2444        return {'protocol': protocol_counters, 'errors': error_counters}
2445
2446    def get_prefixes(self):
2447        return self.get_netdata()['Prefixes']
2448
2449    def get_routes(self):
2450        return self.get_netdata()['Routes']
2451
2452    def get_services(self):
2453        netdata = self.netdata_show()
2454        services = []
2455        services_section = False
2456
2457        for line in netdata:
2458            if line.startswith('Services:'):
2459                services_section = True
2460            elif line.startswith('Contexts'):
2461                services_section = False
2462            elif services_section:
2463                services.append(line.strip().split(' '))
2464        return services
2465
2466    def netdata_show(self):
2467        self.send_command('netdata show')
2468        return self._expect_command_output()
2469
2470    def get_netdata(self):
2471        raw_netdata = self.netdata_show()
2472        netdata = {'Prefixes': [], 'Routes': [], 'Services': [], 'Contexts': [], 'Commissioning': []}
2473        key_list = ['Prefixes', 'Routes', 'Services', 'Contexts', 'Commissioning']
2474        key = None
2475
2476        for i in range(0, len(raw_netdata)):
2477            keys = list(filter(raw_netdata[i].startswith, key_list))
2478            if keys != []:
2479                key = keys[0]
2480            elif key is not None:
2481                netdata[key].append(raw_netdata[i])
2482
2483        return netdata
2484
2485    def add_route(self, prefix, stable=False, nat64=False, prf='med'):
2486        cmd = 'route add %s ' % prefix
2487        if stable:
2488            cmd += 's'
2489        if nat64:
2490            cmd += 'n'
2491        cmd += ' %s' % prf
2492        self.send_command(cmd)
2493        self._expect_done()
2494
2495    def remove_route(self, prefix):
2496        cmd = 'route remove %s' % prefix
2497        self.send_command(cmd)
2498        self._expect_done()
2499
2500    def register_netdata(self):
2501        self.send_command('netdata register')
2502        self._expect_done()
2503
2504    def netdata_publish_dnssrp_anycast(self, seqnum):
2505        self.send_command(f'netdata publish dnssrp anycast {seqnum}')
2506        self._expect_done()
2507
2508    def netdata_publish_dnssrp_unicast(self, address, port):
2509        self.send_command(f'netdata publish dnssrp unicast {address} {port}')
2510        self._expect_done()
2511
2512    def netdata_publish_dnssrp_unicast_mleid(self, port):
2513        self.send_command(f'netdata publish dnssrp unicast {port}')
2514        self._expect_done()
2515
2516    def netdata_unpublish_dnssrp(self):
2517        self.send_command('netdata unpublish dnssrp')
2518        self._expect_done()
2519
2520    def netdata_publish_prefix(self, prefix, flags='paosr', prf='med'):
2521        self.send_command(f'netdata publish prefix {prefix} {flags} {prf}')
2522        self._expect_done()
2523
2524    def netdata_publish_route(self, prefix, flags='s', prf='med'):
2525        self.send_command(f'netdata publish route {prefix} {flags} {prf}')
2526        self._expect_done()
2527
2528    def netdata_publish_replace(self, old_prefix, prefix, flags='s', prf='med'):
2529        self.send_command(f'netdata publish replace {old_prefix} {prefix} {flags} {prf}')
2530        self._expect_done()
2531
2532    def netdata_unpublish_prefix(self, prefix):
2533        self.send_command(f'netdata unpublish {prefix}')
2534        self._expect_done()
2535
2536    def send_network_diag_get(self, addr, tlv_types):
2537        self.send_command('networkdiagnostic get %s %s' % (addr, ' '.join([str(t.value) for t in tlv_types])))
2538
2539        if isinstance(self.simulator, simulator.VirtualTime):
2540            self.simulator.go(8)
2541            timeout = 1
2542        else:
2543            timeout = 8
2544
2545        self._expect_done(timeout=timeout)
2546
2547    def send_network_diag_reset(self, addr, tlv_types):
2548        self.send_command('networkdiagnostic reset %s %s' % (addr, ' '.join([str(t.value) for t in tlv_types])))
2549
2550        if isinstance(self.simulator, simulator.VirtualTime):
2551            self.simulator.go(8)
2552            timeout = 1
2553        else:
2554            timeout = 8
2555
2556        self._expect_done(timeout=timeout)
2557
2558    def energy_scan(self, mask, count, period, scan_duration, ipaddr):
2559        cmd = 'commissioner energy %d %d %d %d %s' % (
2560            mask,
2561            count,
2562            period,
2563            scan_duration,
2564            ipaddr,
2565        )
2566        self.send_command(cmd)
2567
2568        if isinstance(self.simulator, simulator.VirtualTime):
2569            self.simulator.go(8)
2570            timeout = 1
2571        else:
2572            timeout = 8
2573
2574        self._expect('Energy:', timeout=timeout)
2575
2576    def panid_query(self, panid, mask, ipaddr):
2577        cmd = 'commissioner panid %d %d %s' % (panid, mask, ipaddr)
2578        self.send_command(cmd)
2579
2580        if isinstance(self.simulator, simulator.VirtualTime):
2581            self.simulator.go(8)
2582            timeout = 1
2583        else:
2584            timeout = 8
2585
2586        self._expect('Conflict:', timeout=timeout)
2587
2588    def scan(self, result=1, timeout=10):
2589        self.send_command('scan')
2590
2591        self.simulator.go(timeout)
2592
2593        if result == 1:
2594            networks = []
2595            for line in self._expect_command_output()[2:]:
2596                _, panid, extaddr, channel, dbm, lqi, _ = map(str.strip, line.split('|'))
2597                panid = int(panid, 16)
2598                channel, dbm, lqi = map(int, (channel, dbm, lqi))
2599
2600                networks.append({
2601                    'panid': panid,
2602                    'extaddr': extaddr,
2603                    'channel': channel,
2604                    'dbm': dbm,
2605                    'lqi': lqi,
2606                })
2607            return networks
2608
2609    def scan_energy(self, timeout=10):
2610        self.send_command('scan energy')
2611        self.simulator.go(timeout)
2612        rssi_list = []
2613        for line in self._expect_command_output()[2:]:
2614            _, channel, rssi, _ = line.split('|')
2615            rssi_list.append({
2616                'channel': int(channel.strip()),
2617                'rssi': int(rssi.strip()),
2618            })
2619        return rssi_list
2620
    def ping(self, ipaddr, num_responses=1, size=8, timeout=5, count=1, interval=1, hoplimit=64, interface=None):
        """
        Send a `ping` command and wait for the expected number of distinct responders.

        The command is `ping [-I <interface>] <ipaddr> <size> <count> <interval>
        <hoplimit> <timeout>`. The simulator is advanced one step at a time while
        the output is polled for 'from <addr>:' lines and for 'Done'.

        Returns True once at least `num_responses` distinct responder addresses
        have been seen and 'Done' was printed (for 'ncp-sim' nodes 'Done' is not
        awaited). Returns False if the simulator clock passes
        `timeout + wait_allowance` after the command was sent without that
        happening.
        """
        args = f'{ipaddr} {size} {count} {interval} {hoplimit} {timeout}'
        if interface is not None:
            args = f'-I {interface} {args}'
        cmd = f'ping {args}'

        self.send_command(cmd)

        # Extra slack on top of the ping timeout before giving up.
        wait_allowance = 3
        end = self.simulator.now() + timeout + wait_allowance

        # Keyed by responder address so repeated replies from one node count once.
        responders = {}

        result = True
        # ncp-sim doesn't print Done
        done = (self.node_type == 'ncp-sim')
        while len(responders) < num_responses or not done:
            self.simulator.go(1)
            try:
                i = self._expect([r'from (\S+):', r'Done'], timeout=0.1)
            except (pexpect.TIMEOUT, socket.timeout):
                # Nothing matched in this step: keep polling until the deadline passes.
                if self.simulator.now() < end:
                    continue
                result = False
                if isinstance(self.simulator, simulator.VirtualTime):
                    self.simulator.sync_devices()
                break
            else:
                if i == 0:
                    # Matched a 'from <addr>:' reply line; record the responder.
                    responders[self.pexpect.match.groups()[0]] = 1
                elif i == 1:
                    done = True
        return result
2654
2655    def reset(self):
2656        self._reset('reset')
2657
2658    def factory_reset(self):
2659        self._reset('factoryreset')
2660
2661    def _reset(self, cmd):
2662        self.send_command(cmd, expect_command_echo=False)
2663        time.sleep(self.RESET_DELAY)
2664        # Send a "version" command and drain the CLI output after reset
2665        self.send_command('version', expect_command_echo=False)
2666        while True:
2667            try:
2668                self._expect(r"[^\n]+\n", timeout=0.1)
2669                continue
2670            except pexpect.TIMEOUT:
2671                break
2672
2673        if self.is_otbr:
2674            self.set_log_level(5)
2675
2676    def set_router_selection_jitter(self, jitter):
2677        cmd = 'routerselectionjitter %d' % jitter
2678        self.send_command(cmd)
2679        self._expect_done()
2680
2681    def set_active_dataset(
2682        self,
2683        timestamp=None,
2684        channel=None,
2685        channel_mask=None,
2686        extended_panid=None,
2687        mesh_local_prefix=None,
2688        network_key=None,
2689        network_name=None,
2690        panid=None,
2691        pskc=None,
2692        security_policy=[],
2693        updateExisting=False,
2694    ):
2695
2696        if updateExisting:
2697            self.send_command('dataset init active', go=False)
2698        else:
2699            self.send_command('dataset clear', go=False)
2700        self._expect_done()
2701
2702        if timestamp is not None:
2703            cmd = 'dataset activetimestamp %d' % timestamp
2704            self.send_command(cmd, go=False)
2705            self._expect_done()
2706
2707        if channel is not None:
2708            cmd = 'dataset channel %d' % channel
2709            self.send_command(cmd, go=False)
2710            self._expect_done()
2711
2712        if channel_mask is not None:
2713            cmd = 'dataset channelmask %d' % channel_mask
2714            self.send_command(cmd, go=False)
2715            self._expect_done()
2716
2717        if extended_panid is not None:
2718            cmd = 'dataset extpanid %s' % extended_panid
2719            self.send_command(cmd, go=False)
2720            self._expect_done()
2721
2722        if mesh_local_prefix is not None:
2723            cmd = 'dataset meshlocalprefix %s' % mesh_local_prefix
2724            self.send_command(cmd, go=False)
2725            self._expect_done()
2726
2727        if network_key is not None:
2728            cmd = 'dataset networkkey %s' % network_key
2729            self.send_command(cmd, go=False)
2730            self._expect_done()
2731
2732        if network_name is not None:
2733            cmd = 'dataset networkname %s' % network_name
2734            self.send_command(cmd, go=False)
2735            self._expect_done()
2736
2737        if panid is not None:
2738            cmd = 'dataset panid %d' % panid
2739            self.send_command(cmd, go=False)
2740            self._expect_done()
2741
2742        if pskc is not None:
2743            cmd = 'dataset pskc %s' % pskc
2744            self.send_command(cmd, go=False)
2745            self._expect_done()
2746
2747        if security_policy is not None:
2748            if len(security_policy) >= 2:
2749                cmd = 'dataset securitypolicy %s %s' % (
2750                    str(security_policy[0]),
2751                    security_policy[1],
2752                )
2753            if len(security_policy) >= 3:
2754                cmd += ' %s' % (str(security_policy[2]))
2755            self.send_command(cmd, go=False)
2756            self._expect_done()
2757
2758        self.send_command('dataset commit active', go=False)
2759        self._expect_done()
2760
2761    def set_pending_dataset(self, pendingtimestamp, activetimestamp, panid=None, channel=None, delay=None):
2762        self.send_command('dataset clear')
2763        self._expect_done()
2764
2765        cmd = 'dataset pendingtimestamp %d' % pendingtimestamp
2766        self.send_command(cmd)
2767        self._expect_done()
2768
2769        cmd = 'dataset activetimestamp %d' % activetimestamp
2770        self.send_command(cmd)
2771        self._expect_done()
2772
2773        if panid is not None:
2774            cmd = 'dataset panid %d' % panid
2775            self.send_command(cmd)
2776            self._expect_done()
2777
2778        if channel is not None:
2779            cmd = 'dataset channel %d' % channel
2780            self.send_command(cmd)
2781            self._expect_done()
2782
2783        if delay is not None:
2784            cmd = 'dataset delay %d' % delay
2785            self.send_command(cmd)
2786            self._expect_done()
2787
2788        # Set the meshlocal prefix in config.py
2789        self.send_command('dataset meshlocalprefix %s' % config.MESH_LOCAL_PREFIX.split('/')[0])
2790        self._expect_done()
2791
2792        self.send_command('dataset commit pending')
2793        self._expect_done()
2794
2795    def start_dataset_updater(self, panid=None, channel=None, security_policy=None, delay=None):
2796        self.send_command('dataset clear')
2797        self._expect_done()
2798
2799        if panid is not None:
2800            cmd = 'dataset panid %d' % panid
2801            self.send_command(cmd)
2802            self._expect_done()
2803
2804        if channel is not None:
2805            cmd = 'dataset channel %d' % channel
2806            self.send_command(cmd)
2807            self._expect_done()
2808
2809        if security_policy is not None:
2810            cmd = 'dataset securitypolicy %d %s ' % (security_policy[0], security_policy[1])
2811            if (len(security_policy) >= 3):
2812                cmd += '%d ' % (security_policy[2])
2813            self.send_command(cmd)
2814            self._expect_done()
2815
2816        if delay is not None:
2817            cmd = 'dataset delay %d ' % delay
2818            self.send_command(cmd)
2819            self._expect_done()
2820
2821        self.send_command('dataset updater start')
2822        self._expect_done()
2823
2824    def announce_begin(self, mask, count, period, ipaddr):
2825        cmd = 'commissioner announce %d %d %d %s' % (
2826            mask,
2827            count,
2828            period,
2829            ipaddr,
2830        )
2831        self.send_command(cmd)
2832        self._expect_done()
2833
2834    def send_mgmt_active_set(
2835        self,
2836        active_timestamp=None,
2837        channel=None,
2838        channel_mask=None,
2839        extended_panid=None,
2840        panid=None,
2841        network_key=None,
2842        mesh_local=None,
2843        network_name=None,
2844        security_policy=None,
2845        binary=None,
2846    ):
2847        cmd = 'dataset mgmtsetcommand active '
2848
2849        if active_timestamp is not None:
2850            cmd += 'activetimestamp %d ' % active_timestamp
2851
2852        if channel is not None:
2853            cmd += 'channel %d ' % channel
2854
2855        if channel_mask is not None:
2856            cmd += 'channelmask %d ' % channel_mask
2857
2858        if extended_panid is not None:
2859            cmd += 'extpanid %s ' % extended_panid
2860
2861        if panid is not None:
2862            cmd += 'panid %d ' % panid
2863
2864        if network_key is not None:
2865            cmd += 'networkkey %s ' % network_key
2866
2867        if mesh_local is not None:
2868            cmd += 'localprefix %s ' % mesh_local
2869
2870        if network_name is not None:
2871            cmd += 'networkname %s ' % self._escape_escapable(network_name)
2872
2873        if security_policy is not None:
2874            cmd += 'securitypolicy %d %s ' % (security_policy[0], security_policy[1])
2875            if (len(security_policy) >= 3):
2876                cmd += '%d ' % (security_policy[2])
2877
2878        if binary is not None:
2879            cmd += '-x %s ' % binary
2880
2881        self.send_command(cmd)
2882        self._expect_done()
2883
2884    def send_mgmt_active_get(self, addr='', tlvs=[]):
2885        cmd = 'dataset mgmtgetcommand active'
2886
2887        if addr != '':
2888            cmd += ' address '
2889            cmd += addr
2890
2891        if len(tlvs) != 0:
2892            tlv_str = ''.join('%02x' % tlv for tlv in tlvs)
2893            cmd += ' -x '
2894            cmd += tlv_str
2895
2896        self.send_command(cmd)
2897        self._expect_done()
2898
2899    def send_mgmt_pending_get(self, addr='', tlvs=[]):
2900        cmd = 'dataset mgmtgetcommand pending'
2901
2902        if addr != '':
2903            cmd += ' address '
2904            cmd += addr
2905
2906        if len(tlvs) != 0:
2907            tlv_str = ''.join('%02x' % tlv for tlv in tlvs)
2908            cmd += ' -x '
2909            cmd += tlv_str
2910
2911        self.send_command(cmd)
2912        self._expect_done()
2913
2914    def send_mgmt_pending_set(
2915        self,
2916        pending_timestamp=None,
2917        active_timestamp=None,
2918        delay_timer=None,
2919        channel=None,
2920        panid=None,
2921        network_key=None,
2922        mesh_local=None,
2923        network_name=None,
2924    ):
2925        cmd = 'dataset mgmtsetcommand pending '
2926        if pending_timestamp is not None:
2927            cmd += 'pendingtimestamp %d ' % pending_timestamp
2928
2929        if active_timestamp is not None:
2930            cmd += 'activetimestamp %d ' % active_timestamp
2931
2932        if delay_timer is not None:
2933            cmd += 'delaytimer %d ' % delay_timer
2934
2935        if channel is not None:
2936            cmd += 'channel %d ' % channel
2937
2938        if panid is not None:
2939            cmd += 'panid %d ' % panid
2940
2941        if network_key is not None:
2942            cmd += 'networkkey %s ' % network_key
2943
2944        if mesh_local is not None:
2945            cmd += 'localprefix %s ' % mesh_local
2946
2947        if network_name is not None:
2948            cmd += 'networkname %s ' % self._escape_escapable(network_name)
2949
2950        self.send_command(cmd)
2951        self._expect_done()
2952
2953    def coap_cancel(self):
2954        """
2955        Cancel a CoAP subscription.
2956        """
2957        cmd = 'coap cancel'
2958        self.send_command(cmd)
2959        self._expect_done()
2960
2961    def coap_delete(self, ipaddr, uri, con=False, payload=None):
2962        """
2963        Send a DELETE request via CoAP.
2964        """
2965        return self._coap_rq('delete', ipaddr, uri, con, payload)
2966
2967    def coap_get(self, ipaddr, uri, con=False, payload=None):
2968        """
2969        Send a GET request via CoAP.
2970        """
2971        return self._coap_rq('get', ipaddr, uri, con, payload)
2972
2973    def coap_get_block(self, ipaddr, uri, size=16, count=0):
2974        """
2975        Send a GET request via CoAP.
2976        """
2977        return self._coap_rq_block('get', ipaddr, uri, size, count)
2978
2979    def coap_observe(self, ipaddr, uri, con=False, payload=None):
2980        """
2981        Send a GET request via CoAP with Observe set.
2982        """
2983        return self._coap_rq('observe', ipaddr, uri, con, payload)
2984
2985    def coap_post(self, ipaddr, uri, con=False, payload=None):
2986        """
2987        Send a POST request via CoAP.
2988        """
2989        return self._coap_rq('post', ipaddr, uri, con, payload)
2990
2991    def coap_post_block(self, ipaddr, uri, size=16, count=0):
2992        """
2993        Send a POST request via CoAP.
2994        """
2995        return self._coap_rq_block('post', ipaddr, uri, size, count)
2996
2997    def coap_put(self, ipaddr, uri, con=False, payload=None):
2998        """
2999        Send a PUT request via CoAP.
3000        """
3001        return self._coap_rq('put', ipaddr, uri, con, payload)
3002
3003    def coap_put_block(self, ipaddr, uri, size=16, count=0):
3004        """
3005        Send a PUT request via CoAP.
3006        """
3007        return self._coap_rq_block('put', ipaddr, uri, size, count)
3008
3009    def _coap_rq(self, method, ipaddr, uri, con=False, payload=None):
3010        """
3011        Issue a GET/POST/PUT/DELETE/GET OBSERVE request.
3012        """
3013        cmd = 'coap %s %s %s' % (method, ipaddr, uri)
3014        if con:
3015            cmd += ' con'
3016        else:
3017            cmd += ' non'
3018
3019        if payload is not None:
3020            cmd += ' %s' % payload
3021
3022        self.send_command(cmd)
3023        return self.coap_wait_response()
3024
3025    def _coap_rq_block(self, method, ipaddr, uri, size=16, count=0):
3026        """
3027        Issue a GET/POST/PUT/DELETE/GET OBSERVE BLOCK request.
3028        """
3029        cmd = 'coap %s %s %s' % (method, ipaddr, uri)
3030
3031        cmd += ' block-%d' % size
3032
3033        if count != 0:
3034            cmd += ' %d' % count
3035
3036        self.send_command(cmd)
3037        return self.coap_wait_response()
3038
3039    def coap_wait_response(self):
3040        """
3041        Wait for a CoAP response, and return it.
3042        """
3043        if isinstance(self.simulator, simulator.VirtualTime):
3044            self.simulator.go(5)
3045            timeout = 1
3046        else:
3047            timeout = 5
3048
3049        self._expect(r'coap response from ([\da-f:]+)(?: OBS=(\d+))?'
3050                     r'(?: with payload: ([\da-f]+))?\b',
3051                     timeout=timeout)
3052        (source, observe, payload) = self.pexpect.match.groups()
3053        source = source.decode('UTF-8')
3054
3055        if observe is not None:
3056            observe = int(observe, base=10)
3057
3058        if payload is not None:
3059            try:
3060                payload = binascii.a2b_hex(payload).decode('UTF-8')
3061            except UnicodeDecodeError:
3062                pass
3063
3064        # Return the values received
3065        return dict(source=source, observe=observe, payload=payload)
3066
3067    def coap_wait_request(self):
3068        """
3069        Wait for a CoAP request to be made.
3070        """
3071        if isinstance(self.simulator, simulator.VirtualTime):
3072            self.simulator.go(5)
3073            timeout = 1
3074        else:
3075            timeout = 5
3076
3077        self._expect(r'coap request from ([\da-f:]+)(?: OBS=(\d+))?'
3078                     r'(?: with payload: ([\da-f]+))?\b',
3079                     timeout=timeout)
3080        (source, observe, payload) = self.pexpect.match.groups()
3081        source = source.decode('UTF-8')
3082
3083        if observe is not None:
3084            observe = int(observe, base=10)
3085
3086        if payload is not None:
3087            payload = binascii.a2b_hex(payload).decode('UTF-8')
3088
3089        # Return the values received
3090        return dict(source=source, observe=observe, payload=payload)
3091
3092    def coap_wait_subscribe(self):
3093        """
3094        Wait for a CoAP client to be subscribed.
3095        """
3096        if isinstance(self.simulator, simulator.VirtualTime):
3097            self.simulator.go(5)
3098            timeout = 1
3099        else:
3100            timeout = 5
3101
3102        self._expect(r'Subscribing client\b', timeout=timeout)
3103
3104    def coap_wait_ack(self):
3105        """
3106        Wait for a CoAP notification ACK.
3107        """
3108        if isinstance(self.simulator, simulator.VirtualTime):
3109            self.simulator.go(5)
3110            timeout = 1
3111        else:
3112            timeout = 5
3113
3114        self._expect(r'Received ACK in reply to notification from ([\da-f:]+)\b', timeout=timeout)
3115        (source,) = self.pexpect.match.groups()
3116        source = source.decode('UTF-8')
3117
3118        return source
3119
3120    def coap_set_resource_path(self, path):
3121        """
3122        Set the path for the CoAP resource.
3123        """
3124        cmd = 'coap resource %s' % path
3125        self.send_command(cmd)
3126        self._expect_done()
3127
3128    def coap_set_resource_path_block(self, path, count=0):
3129        """
3130        Set the path for the CoAP resource and how many blocks can be received from this resource.
3131        """
3132        cmd = 'coap resource %s %d' % (path, count)
3133        self.send_command(cmd)
3134        self._expect('Done')
3135
3136    def coap_set_content(self, content):
3137        """
3138        Set the content of the CoAP resource.
3139        """
3140        cmd = 'coap set %s' % content
3141        self.send_command(cmd)
3142        self._expect_done()
3143
3144    def coap_start(self):
3145        """
3146        Start the CoAP service.
3147        """
3148        cmd = 'coap start'
3149        self.send_command(cmd)
3150        self._expect_done()
3151
3152    def coap_stop(self):
3153        """
3154        Stop the CoAP service.
3155        """
3156        cmd = 'coap stop'
3157        self.send_command(cmd)
3158
3159        if isinstance(self.simulator, simulator.VirtualTime):
3160            self.simulator.go(5)
3161            timeout = 1
3162        else:
3163            timeout = 5
3164
3165        self._expect_done(timeout=timeout)
3166
3167    def coaps_start_psk(self, psk, pskIdentity):
3168        cmd = 'coaps psk %s %s' % (psk, pskIdentity)
3169        self.send_command(cmd)
3170        self._expect_done()
3171
3172        cmd = 'coaps start'
3173        self.send_command(cmd)
3174        self._expect_done()
3175
3176    def coaps_start_x509(self):
3177        cmd = 'coaps x509'
3178        self.send_command(cmd)
3179        self._expect_done()
3180
3181        cmd = 'coaps start'
3182        self.send_command(cmd)
3183        self._expect_done()
3184
3185    def coaps_set_resource_path(self, path):
3186        cmd = 'coaps resource %s' % path
3187        self.send_command(cmd)
3188        self._expect_done()
3189
3190    def coaps_stop(self):
3191        cmd = 'coaps stop'
3192        self.send_command(cmd)
3193
3194        if isinstance(self.simulator, simulator.VirtualTime):
3195            self.simulator.go(5)
3196            timeout = 1
3197        else:
3198            timeout = 5
3199
3200        self._expect_done(timeout=timeout)
3201
3202    def coaps_connect(self, ipaddr):
3203        cmd = 'coaps connect %s' % ipaddr
3204        self.send_command(cmd)
3205
3206        if isinstance(self.simulator, simulator.VirtualTime):
3207            self.simulator.go(5)
3208            timeout = 1
3209        else:
3210            timeout = 5
3211
3212        self._expect('coaps connected', timeout=timeout)
3213
3214    def coaps_disconnect(self):
3215        cmd = 'coaps disconnect'
3216        self.send_command(cmd)
3217        self._expect_done()
3218        self.simulator.go(5)
3219
3220    def coaps_get(self):
3221        cmd = 'coaps get test'
3222        self.send_command(cmd)
3223
3224        if isinstance(self.simulator, simulator.VirtualTime):
3225            self.simulator.go(5)
3226            timeout = 1
3227        else:
3228            timeout = 5
3229
3230        self._expect('coaps response', timeout=timeout)
3231
3232    def commissioner_mgmtget(self, tlvs_binary=None):
3233        cmd = 'commissioner mgmtget'
3234        if tlvs_binary is not None:
3235            cmd += ' -x %s' % tlvs_binary
3236        self.send_command(cmd)
3237        self._expect_done()
3238
3239    def commissioner_mgmtset(self, tlvs_binary):
3240        cmd = 'commissioner mgmtset -x %s' % tlvs_binary
3241        self.send_command(cmd)
3242        self._expect_done()
3243
3244    def bytes_to_hex_str(self, src):
3245        return ''.join(format(x, '02x') for x in src)
3246
3247    def commissioner_mgmtset_with_tlvs(self, tlvs):
3248        payload = bytearray()
3249        for tlv in tlvs:
3250            payload += tlv.to_hex()
3251        self.commissioner_mgmtset(self.bytes_to_hex_str(payload))
3252
3253    def udp_start(self, local_ipaddr, local_port, bind_unspecified=False):
3254        cmd = 'udp open'
3255        self.send_command(cmd)
3256        self._expect_done()
3257
3258        cmd = 'udp bind %s %s %s' % ("-u" if bind_unspecified else "", local_ipaddr, local_port)
3259        self.send_command(cmd)
3260        self._expect_done()
3261
3262    def udp_stop(self):
3263        cmd = 'udp close'
3264        self.send_command(cmd)
3265        self._expect_done()
3266
3267    def udp_send(self, bytes, ipaddr, port, success=True):
3268        cmd = 'udp send %s %d -s %d ' % (ipaddr, port, bytes)
3269        self.send_command(cmd)
3270        if success:
3271            self._expect_done()
3272        else:
3273            self._expect('Error')
3274
3275    def udp_check_rx(self, bytes_should_rx):
3276        self._expect('%d bytes' % bytes_should_rx)
3277
3278    def set_routereligible(self, enable: bool):
3279        cmd = f'routereligible {"enable" if enable else "disable"}'
3280        self.send_command(cmd)
3281        self._expect_done()
3282
3283    def router_list(self):
3284        cmd = 'router list'
3285        self.send_command(cmd)
3286        self._expect([r'(\d+)((\s\d+)*)'])
3287
3288        g = self.pexpect.match.groups()
3289        router_list = g[0].decode('utf8') + ' ' + g[1].decode('utf8')
3290        router_list = [int(x) for x in router_list.split()]
3291        self._expect_done()
3292        return router_list
3293
3294    def router_table(self):
3295        cmd = 'router table'
3296        self.send_command(cmd)
3297
3298        self._expect(r'(.*)Done')
3299        g = self.pexpect.match.groups()
3300        output = g[0].decode('utf8')
3301        lines = output.strip().split('\n')
3302        lines = [l.strip() for l in lines]
3303        router_table = {}
3304        for i, line in enumerate(lines):
3305            if not line.startswith('|') or not line.endswith('|'):
3306                if i not in (0, 2):
3307                    # should not happen
3308                    print("unexpected line %d: %s" % (i, line))
3309
3310                continue
3311
3312            line = line[1:][:-1]
3313            line = [x.strip() for x in line.split('|')]
3314            if len(line) < 9:
3315                print("unexpected line %d: %s" % (i, line))
3316                continue
3317
3318            try:
3319                int(line[0])
3320            except ValueError:
3321                if i != 1:
3322                    print("unexpected line %d: %s" % (i, line))
3323                continue
3324
3325            id = int(line[0])
3326            rloc16 = int(line[1], 16)
3327            nexthop = int(line[2])
3328            pathcost = int(line[3])
3329            lqin = int(line[4])
3330            lqout = int(line[5])
3331            age = int(line[6])
3332            emac = str(line[7])
3333            link = int(line[8])
3334
3335            router_table[id] = {
3336                'rloc16': rloc16,
3337                'nexthop': nexthop,
3338                'pathcost': pathcost,
3339                'lqin': lqin,
3340                'lqout': lqout,
3341                'age': age,
3342                'emac': emac,
3343                'link': link,
3344            }
3345
3346        return router_table
3347
3348    def link_metrics_request_single_probe(self, dst_addr: str, linkmetrics_flags: str, mode: str = ''):
3349        cmd = 'linkmetrics request %s %s single %s' % (mode, dst_addr, linkmetrics_flags)
3350        self.send_command(cmd)
3351        self.simulator.go(5)
3352        return self._parse_linkmetrics_query_result(self._expect_command_output())
3353
3354    def link_metrics_request_forward_tracking_series(self, dst_addr: str, series_id: int, mode: str = ''):
3355        cmd = 'linkmetrics request %s %s forward %d' % (mode, dst_addr, series_id)
3356        self.send_command(cmd)
3357        self.simulator.go(5)
3358        return self._parse_linkmetrics_query_result(self._expect_command_output())
3359
3360    def _parse_linkmetrics_query_result(self, lines):
3361        """Parse link metrics query result"""
3362
3363        # Example of command output:
3364        # ['Received Link Metrics Report from: fe80:0:0:0:146e:a00:0:1',
3365        #  '- PDU Counter: 1 (Count/Summation)',
3366        #  '- LQI: 0 (Exponential Moving Average)',
3367        #  '- Margin: 80 (dB) (Exponential Moving Average)',
3368        #  '- RSSI: -20 (dBm) (Exponential Moving Average)']
3369        #
3370        # Or 'Link Metrics Report, status: {status}'
3371
3372        result = {}
3373        for line in lines:
3374            if line.startswith('- '):
3375                k, v = line[2:].split(': ')
3376                result[k] = v.split(' ')[0]
3377            elif line.startswith('Link Metrics Report, status: '):
3378                result['Status'] = line[29:]
3379        return result
3380
3381    def link_metrics_config_req_enhanced_ack_based_probing(self,
3382                                                           dst_addr: str,
3383                                                           enable: bool,
3384                                                           metrics_flags: str,
3385                                                           ext_flags='',
3386                                                           mode: str = ''):
3387        cmd = "linkmetrics config %s %s enhanced-ack" % (mode, dst_addr)
3388        if enable:
3389            cmd = cmd + (" register %s %s" % (metrics_flags, ext_flags))
3390        else:
3391            cmd = cmd + " clear"
3392        self.send_command(cmd)
3393        self._expect_done()
3394
3395    def link_metrics_config_req_forward_tracking_series(self,
3396                                                        dst_addr: str,
3397                                                        series_id: int,
3398                                                        series_flags: str,
3399                                                        metrics_flags: str,
3400                                                        mode: str = ''):
3401        cmd = "linkmetrics config %s %s forward %d %s %s" % (mode, dst_addr, series_id, series_flags, metrics_flags)
3402        self.send_command(cmd)
3403        self._expect_done()
3404
3405    def link_metrics_send_link_probe(self, dst_addr: str, series_id: int, length: int):
3406        cmd = "linkmetrics probe %s %d %d" % (dst_addr, series_id, length)
3407        self.send_command(cmd)
3408        self._expect_done()
3409
3410    def link_metrics_mgr_set_enabled(self, enable: bool):
3411        op_str = "enable" if enable else "disable"
3412        cmd = f'linkmetricsmgr {op_str}'
3413        self.send_command(cmd)
3414        self._expect_done()
3415
3416    def send_address_notification(self, dst: str, target: str, mliid: str):
3417        cmd = f'fake /a/an {dst} {target} {mliid}'
3418        self.send_command(cmd)
3419        self._expect_done()
3420
3421    def send_proactive_backbone_notification(self, target: str, mliid: str, ltt: int):
3422        cmd = f'fake /b/ba {target} {mliid} {ltt}'
3423        self.send_command(cmd)
3424        self._expect_done()
3425
3426    def dns_get_config(self):
3427        """
3428        Returns the DNS config as a list of property dictionary (string key and string value).
3429
3430        Example output:
3431        {
3432            'Server': '[fd00:0:0:0:0:0:0:1]:1234'
3433            'ResponseTimeout': '5000 ms'
3434            'MaxTxAttempts': '2'
3435            'RecursionDesired': 'no'
3436        }
3437        """
3438        cmd = f'dns config'
3439        self.send_command(cmd)
3440        output = self._expect_command_output()
3441        config = {}
3442        for line in output:
3443            k, v = line.split(': ')
3444            config[k] = v
3445        return config
3446
3447    def dns_set_config(self, config):
3448        cmd = f'dns config {config}'
3449        self.send_command(cmd)
3450        self._expect_done()
3451
3452    def dns_resolve(self, hostname, server=None, port=53):
3453        cmd = f'dns resolve {hostname}'
3454        if server is not None:
3455            cmd += f' {server} {port}'
3456
3457        self.send_command(cmd)
3458        self.simulator.go(10)
3459        output = self._expect_command_output()
3460        dns_resp = output[0]
3461        # example output: "DNS response for host1.default.service.arpa. - fd00:db8:0:0:fd3d:d471:1e8c:b60 TTL:7190 "
3462        #                 " fd00:db8:0:0:0:ff:fe00:9000 TTL:7190"
3463        addrs = dns_resp.strip().split(' - ')[1].split(' ')
3464        ip = [item.strip() for item in addrs[::2]]
3465        ttl = [int(item.split('TTL:')[1]) for item in addrs[1::2]]
3466
3467        return list(zip(ip, ttl))
3468
3469    def _parse_dns_service_info(self, output):
3470        # Example of `output`
3471        #   Port:22222, Priority:2, Weight:2, TTL:7155
3472        #   Host:host2.default.service.arpa.
3473        #   HostAddress:0:0:0:0:0:0:0:0 TTL:0
3474        #   TXT:[a=00, b=02bb] TTL:7155
3475
3476        m = re.match(
3477            r'.*Port:(\d+), Priority:(\d+), Weight:(\d+), TTL:(\d+)\s+Host:(.*?)\s+HostAddress:(\S+) TTL:(\d+)\s+TXT:\[(.*?)\] TTL:(\d+)',
3478            '\r'.join(output))
3479        if not m:
3480            return {}
3481        port, priority, weight, srv_ttl, hostname, address, aaaa_ttl, txt_data, txt_ttl = m.groups()
3482        return {
3483            'port': int(port),
3484            'priority': int(priority),
3485            'weight': int(weight),
3486            'host': hostname,
3487            'address': address,
3488            'txt_data': txt_data,
3489            'srv_ttl': int(srv_ttl),
3490            'txt_ttl': int(txt_ttl),
3491            'aaaa_ttl': int(aaaa_ttl),
3492        }
3493
3494    def dns_resolve_service(self, instance, service, server=None, port=53):
3495        """
3496        Resolves the service instance and returns the instance information as a dict.
3497
3498        Example return value:
3499            {
3500                'port': 12345,
3501                'priority': 0,
3502                'weight': 0,
3503                'host': 'ins1._ipps._tcp.default.service.arpa.',
3504                'address': '2001::1',
3505                'txt_data': 'a=00, b=02bb',
3506                'srv_ttl': 7100,
3507                'txt_ttl': 7100,
3508                'aaaa_ttl': 7100,
3509            }
3510        """
3511        instance = self._escape_escapable(instance)
3512        cmd = f'dns service {instance} {service}'
3513        if server is not None:
3514            cmd += f' {server} {port}'
3515
3516        self.send_command(cmd)
3517        self.simulator.go(10)
3518        output = self._expect_command_output()
3519        info = self._parse_dns_service_info(output)
3520        if not info:
3521            raise Exception('dns resolve service failed: %s.%s' % (instance, service))
3522        return info
3523
3524    @staticmethod
3525    def __parse_hex_string(hexstr: str) -> bytes:
3526        assert (len(hexstr) % 2 == 0)
3527        return bytes(int(hexstr[i:i + 2], 16) for i in range(0, len(hexstr), 2))
3528
3529    def dns_browse(self, service_name, server=None, port=53):
3530        """
3531        Browse the service and returns the instances.
3532
3533        Example return value:
3534            {
3535                'ins1': {
3536                    'port': 12345,
3537                    'priority': 1,
3538                    'weight': 1,
3539                    'host': 'ins1._ipps._tcp.default.service.arpa.',
3540                    'address': '2001::1',
3541                    'txt_data': 'a=00, b=11cf',
3542                    'srv_ttl': 7100,
3543                    'txt_ttl': 7100,
3544                    'aaaa_ttl': 7100,
3545                },
3546                'ins2': {
3547                    'port': 12345,
3548                    'priority': 2,
3549                    'weight': 2,
3550                    'host': 'ins2._ipps._tcp.default.service.arpa.',
3551                    'address': '2001::2',
3552                    'txt_data': 'a=01, b=23dd',
3553                    'srv_ttl': 7100,
3554                    'txt_ttl': 7100,
3555                    'aaaa_ttl': 7100,
3556                }
3557            }
3558        """
3559        cmd = f'dns browse {service_name}'
3560        if server is not None:
3561            cmd += f' {server} {port}'
3562
3563        self.send_command(cmd)
3564        self.simulator.go(10)
3565        output = self._expect_command_output()
3566
3567        # Example output:
3568        # DNS browse response for _ipps._tcp.default.service.arpa.
3569        # ins2
3570        #     Port:22222, Priority:2, Weight:2, TTL:7175
3571        #     Host:host2.default.service.arpa.
3572        #     HostAddress:fd00:db8:0:0:3205:28dd:5b87:6a63 TTL:7175
3573        #     TXT:[a=00, b=11cf] TTL:7175
3574        # ins1
3575        #     Port:11111, Priority:1, Weight:1, TTL:7170
3576        #     Host:host1.default.service.arpa.
3577        #     HostAddress:fd00:db8:0:0:39f4:d9:eb4f:778 TTL:7170
3578        #     TXT:[a=01, b=23dd] TTL:7170
3579        # Done
3580
3581        result = {}
3582        index = 1  # skip first line
3583        while index < len(output):
3584            ins = output[index].strip()
3585            result[ins] = self._parse_dns_service_info(output[index + 1:index + 6])
3586            index = index + (5 if result[ins] else 1)
3587        return result
3588
3589    def set_mliid(self, mliid: str):
3590        cmd = f'mliid {mliid}'
3591        self.send_command(cmd)
3592        self._expect_command_output()
3593
3594    def history_netinfo(self, num_entries=0):
3595        """
3596        Get the `netinfo` history list, parse each entry and return
3597        a list of dictionary (string key and string value) entries.
3598
3599        Example of return value:
3600        [
3601            {
3602                'age': '00:00:00.000 ago',
3603                'role': 'disabled',
3604                'mode': 'rdn',
3605                'rloc16': '0x7400',
3606                'partition-id': '1318093703'
3607            },
3608            {
3609                'age': '00:00:02.588 ago',
3610                'role': 'leader',
3611                'mode': 'rdn',
3612                'rloc16': '0x7400',
3613                'partition-id': '1318093703'
3614            }
3615        ]
3616        """
3617        cmd = f'history netinfo list {num_entries}'
3618        self.send_command(cmd)
3619        output = self._expect_command_output()
3620        netinfos = []
3621        for entry in output:
3622            netinfo = {}
3623            age, info = entry.split(' -> ')
3624            netinfo['age'] = age
3625            for item in info.split(' '):
3626                k, v = item.split(':')
3627                netinfo[k] = v
3628            netinfos.append(netinfo)
3629        return netinfos
3630
3631    def history_rx(self, num_entries=0):
3632        """
3633        Get the IPv6 RX history list, parse each entry and return
3634        a list of dictionary (string key and string value) entries.
3635
3636        Example of return value:
3637        [
3638            {
3639                'age': '00:00:01.999',
3640                'type': 'ICMP6(EchoReqst)',
3641                'len': '16',
3642                'sec': 'yes',
3643                'prio': 'norm',
3644                'rss': '-20',
3645                'from': '0xac00',
3646                'radio': '15.4',
3647                'src': '[fd00:db8:0:0:2cfa:fd61:58a9:f0aa]:0',
3648                'dst': '[fd00:db8:0:0:ed7e:2d04:e543:eba5]:0',
3649            }
3650        ]
3651        """
3652        cmd = f'history rx list {num_entries}'
3653        self.send_command(cmd)
3654        return self._parse_history_rx_tx_ouput(self._expect_command_output())
3655
3656    def history_tx(self, num_entries=0):
3657        """
3658        Get the IPv6 TX history list, parse each entry and return
3659        a list of dictionary (string key and string value) entries.
3660
3661        Example of return value:
3662        [
3663            {
3664                'age': '00:00:01.999',
3665                'type': 'ICMP6(EchoReply)',
3666                'len': '16',
3667                'sec': 'yes',
3668                'prio': 'norm',
3669                'to': '0xac00',
3670                'tx-success': 'yes',
3671                'radio': '15.4',
3672                'src': '[fd00:db8:0:0:ed7e:2d04:e543:eba5]:0',
3673                'dst': '[fd00:db8:0:0:2cfa:fd61:58a9:f0aa]:0',
3674
3675            }
3676        ]
3677        """
3678        cmd = f'history tx list {num_entries}'
3679        self.send_command(cmd)
3680        return self._parse_history_rx_tx_ouput(self._expect_command_output())
3681
3682    def _parse_history_rx_tx_ouput(self, lines):
3683        rxtx_list = []
3684        for line in lines:
3685            if line.strip().startswith('type:'):
3686                for item in line.strip().split(' '):
3687                    k, v = item.split(':')
3688                    entry[k] = v
3689            elif line.strip().startswith('src:'):
3690                entry['src'] = line[4:]
3691            elif line.strip().startswith('dst:'):
3692                entry['dst'] = line[4:]
3693                rxtx_list.append(entry)
3694            else:
3695                entry = {}
3696                entry['age'] = line
3697
3698        return rxtx_list
3699
3700    def set_router_id_range(self, min_router_id: int, max_router_id: int):
3701        cmd = f'routeridrange {min_router_id} {max_router_id}'
3702        self.send_command(cmd)
3703        self._expect_command_output()
3704
3705    def get_router_id_range(self):
3706        cmd = 'routeridrange'
3707        self.send_command(cmd)
3708        line = self._expect_command_output()[0]
3709        return [int(item) for item in line.split()]
3710
3711    def get_channel_monitor_info(self) -> Dict:
3712        """
3713        Returns:
3714            Dict of channel monitor info, e.g.
3715                {'enabled': '1',
3716                 'interval': '41000',
3717                 'threshold': '-75',
3718                 'window': '960',
3719                 'count': '985',
3720                 'occupancies': {
3721                    '11': '0.00%',
3722                    '12': '3.50%',
3723                    '13': '9.89%',
3724                    '14': '15.36%',
3725                    '15': '20.02%',
3726                    '16': '21.95%',
3727                    '17': '32.71%',
3728                    '18': '35.76%',
3729                    '19': '37.97%',
3730                    '20': '43.68%',
3731                    '21': '48.95%',
3732                    '22': '54.05%',
3733                    '23': '58.65%',
3734                    '24': '68.26%',
3735                    '25': '66.73%',
3736                    '26': '73.12%'
3737                    }
3738                }
3739        """
3740        config = {}
3741        self.send_command('channel monitor')
3742
3743        for line in self._expect_results(r'\S+'):
3744            if re.match(r'.*:\s.*', line):
3745                key, val = line.split(':')
3746                config.update({key: val.strip()})
3747            elif re.match(r'.*:', line):  # occupancy
3748                occ_key, val = line.split(':')
3749                val = {}
3750                config.update({occ_key: val})
3751            elif 'busy' in line:
3752                # channel occupancies
3753                key = line.split()[1]
3754                val = line.split()[3]
3755                config[occ_key].update({key: val})
3756        return config
3757
3758    def set_channel_manager_auto_enable(self, enable: bool):
3759        self.send_command(f'channel manager auto {int(enable)}')
3760        self._expect_done()
3761
3762    def set_channel_manager_autocsl_enable(self, enable: bool):
3763        self.send_command(f'channel manager autocsl {int(enable)}')
3764        self._expect_done()
3765
3766    def set_channel_manager_supported(self, channel_mask: int):
3767        self.send_command(f'channel manager supported {int(channel_mask)}')
3768        self._expect_done()
3769
3770    def set_channel_manager_favored(self, channel_mask: int):
3771        self.send_command(f'channel manager favored {int(channel_mask)}')
3772        self._expect_done()
3773
3774    def set_channel_manager_interval(self, interval: int):
3775        self.send_command(f'channel manager interval {interval}')
3776        self._expect_done()
3777
3778    def set_channel_manager_cca_threshold(self, hex_value: str):
3779        self.send_command(f'channel manager threshold {hex_value}')
3780        self._expect_done()
3781
3782    def get_channel_manager_config(self):
3783        self.send_command('channel manager')
3784        return self._expect_key_value_pairs(r'\S+')
3785
3786
class Node(NodeImpl, OtCli):
    """Concrete node type that combines `NodeImpl` with `OtCli`; it adds nothing of its own."""
3789
3790
class LinuxHost():
    """Helpers for nodes that live on a Linux host and drive its network stack through `self.bash()`.

    The class is used as a mixin: it expects the concrete node class to provide `bash()`,
    and `ping()` / `add_ipmaddr()` delegate to the next class in the MRO when the
    `backbone` keyword is not set.
    """
    PING_RESPONSE_PATTERN = re.compile(r'\d+ bytes from .*:.*')
    ETH_DEV = config.BACKBONE_IFNAME

    def enable_ether(self):
        """Enable the ethernet interface.
        """

        self.bash(f'ip link set {self.ETH_DEV} up')

    def disable_ether(self):
        """Disable the ethernet interface.
        """

        self.bash(f'ip link set {self.ETH_DEV} down')

    def get_ether_addrs(self, ipv4=False, ipv6=True):
        """Return the addresses listed by `ip addr list dev <ETH_DEV>`, without prefix length.

        :param ipv4: include `inet` (IPv4) addresses.
        :param ipv6: include `inet6` (IPv6) addresses.
        :return: a list of address strings.
        """
        output = self.bash(f'ip addr list dev {self.ETH_DEV}')

        addrs = []
        for line in output:
            # line examples:
            # "inet6 fe80::42:c0ff:fea8:903/64 scope link"
            # "inet 192.168.9.1/24 brd 192.168.9.255 scope global eth0"
            line = line.strip().split()

            if not line or not line[0].startswith('inet'):
                continue
            if line[0] == 'inet' and not ipv4:
                continue
            if line[0] == 'inet6' and not ipv6:
                continue

            addr = line[1]
            if '/' in addr:
                addr = addr.split('/')[0]
            addrs.append(addr)

        logging.debug('%s: get_ether_addrs: %r', self, addrs)
        return addrs

    def get_ether_mac(self):
        """Return the MAC address from the `link/ether` line of `ip addr list dev <ETH_DEV>`.

        :raises AssertionError: if the output has no `link/ether` line; the output is
                                attached to the error.
        """
        output = self.bash(f'ip addr list dev {self.ETH_DEV}')
        for line in output:
            # link/ether 02:42:ac:11:00:02 brd ff:ff:ff:ff:ff:ff link-netnsid 0
            line = line.strip().split()
            if line and line[0] == 'link/ether':
                return line[1]

        # Raise explicitly instead of `assert False`, which is stripped under `python -O`
        # and would then silently return None.
        raise AssertionError(output)

    def add_ipmaddr_ether(self, ip: str):
        """Launch the `mcast6.py` helper script in the background for `ETH_DEV` and `ip`.
        """
        cmd = f'python3 /app/third_party/openthread/repo/tests/scripts/thread-cert/mcast6.py {self.ETH_DEV} {ip} &'
        self.bash(cmd)

    def ping_ether(self, ipaddr, num_responses=1, size=None, timeout=5, ttl=None, interface='eth0') -> int:
        """Run `ping -6` from the host and count the responses.

        :param ipaddr: the address to ping.
        :param num_responses: passed to ping as `-c`.
        :param size: passed to ping as `-s` when not None.
        :param timeout: passed to ping as `-W`.
        :param ttl: passed to ping as `-t` when not None.
        :param interface: passed to ping as `-I`.
        :return: the number of output lines matching `PING_RESPONSE_PATTERN`.

        A `subprocess.CalledProcessError` raised while running the command is ignored
        and the count gathered so far is returned.
        """

        cmd = f'ping -6 {ipaddr} -I {interface} -c {num_responses} -W {timeout}'
        if size is not None:
            cmd += f' -s {size}'

        if ttl is not None:
            cmd += f' -t {ttl}'

        resp_count = 0

        try:
            for line in self.bash(cmd):
                if self.PING_RESPONSE_PATTERN.match(line):
                    resp_count += 1
        except subprocess.CalledProcessError:
            pass

        return resp_count

    def get_ip6_address(self, address_type: config.ADDRESS_TYPE):
        """Get specific type of IPv6 address configured on thread device.

        Args:
            address_type: the config.ADDRESS_TYPE type of IPv6 address.

        Returns:
            For BACKBONE_GUA and BACKBONE_LINK_LOCAL: an IPv6 address string, or None
            when no matching address is configured.
            For ONLINK_ULA and ONLINK_GUA: a list of IPv6 address strings.

        Raises:
            ValueError: if `address_type` is none of the types above.
        """
        if address_type == config.ADDRESS_TYPE.BACKBONE_GUA:
            return self._getBackboneGua()
        elif address_type == config.ADDRESS_TYPE.BACKBONE_LINK_LOCAL:
            return self._getInfraLinkLocalAddress()
        elif address_type == config.ADDRESS_TYPE.ONLINK_ULA:
            return self._getInfraUla()
        elif address_type == config.ADDRESS_TYPE.ONLINK_GUA:
            return self._getInfraGua()
        else:
            raise ValueError(f'unsupported address type: {address_type}')

    def _getBackboneGua(self) -> Optional[str]:
        """ Returns the first ethernet address matching `config.BACKBONE_PREFIX_REGEX_PATTERN`, or None.
        """
        for addr in self.get_ether_addrs():
            if re.match(config.BACKBONE_PREFIX_REGEX_PATTERN, addr, re.I):
                return addr

        return None

    def _getInfraUla(self) -> List[str]:
        """ Returns the ULA addresses autoconfigured on the infra link.
        """
        addrs = []
        for addr in self.get_ether_addrs():
            if re.match(config.ONLINK_PREFIX_REGEX_PATTERN, addr, re.I):
                addrs.append(addr)

        return addrs

    def _getInfraGua(self) -> List[str]:
        """ Returns the GUA addresses autoconfigured on the infra link.
        """

        gua_prefix = config.ONLINK_GUA_PREFIX.split('::/')[0]
        return [addr for addr in self.get_ether_addrs() if addr.startswith(gua_prefix)]

    def _getInfraLinkLocalAddress(self) -> Optional[str]:
        """ Returns the link-local address autoconfigured on the infra link, which is started with "fe80".
        """
        for addr in self.get_ether_addrs():
            if re.match(config.LINK_LOCAL_REGEX_PATTERN, addr, re.I):
                return addr

        return None

    def ping(self, *args, **kwargs):
        """Ping via `ping_ether()` when `backbone=True` is passed, otherwise via the next class in the MRO.

        The `backbone` keyword is removed before the arguments are forwarded.
        """
        backbone = kwargs.pop('backbone', False)
        if backbone:
            return self.ping_ether(*args, **kwargs)
        else:
            return super().ping(*args, **kwargs)

    def udp_send_host(self, ipaddr, port, data, hop_limit=None):
        """Run the `udp_send_host.py` helper script with the given destination, port and data.

        When `hop_limit` is None it defaults to 10 for a multicast `ipaddr` and to 64 otherwise.
        """
        if hop_limit is None:
            if ipaddress.ip_address(ipaddr).is_multicast:
                hop_limit = 10
            else:
                hop_limit = 64
        cmd = f'python3 /app/third_party/openthread/repo/tests/scripts/thread-cert/udp_send_host.py {ipaddr} {port} "{data}" {hop_limit}'
        self.bash(cmd)

    def add_ipmaddr(self, *args, **kwargs):
        """Use `add_ipmaddr_ether()` when `backbone=True` is passed, otherwise the next class in the MRO.

        The `backbone` keyword is removed before the arguments are forwarded.
        """
        backbone = kwargs.pop('backbone', False)
        if backbone:
            return self.add_ipmaddr_ether(*args, **kwargs)
        else:
            return super().add_ipmaddr(*args, **kwargs)

    def ip_neighbors_flush(self):
        """Flush and delete the IPv6 neighbor entries of `ETH_DEV`, listing them before and after.
        """
        # clear neigh cache on linux
        self.bash(f'ip -6 neigh list dev {self.ETH_DEV}')
        self.bash(f'ip -6 neigh flush nud all nud failed nud noarp dev {self.ETH_DEV}')
        self.bash('ip -6 neigh list nud all dev %s | cut -d " " -f1 | sudo xargs -I{} ip -6 neigh delete {} dev %s' %
                  (self.ETH_DEV, self.ETH_DEV))
        self.bash(f'ip -6 neigh list dev {self.ETH_DEV}')

    def publish_mdns_service(self, instance_name, service_type, port, host_name, txt):
        """Publish an mDNS service on the Ethernet.

        :param instance_name: the service instance name.
        :param service_type: the service type in format of '<service_type>.<protocol>'.
        :param port: the port the service is at.
        :param host_name: the host name this service points to. The domain
                          should not be included.
        :param txt: a dictionary containing the key-value pairs of the TXT record.
        """
        txt_string = ' '.join([f'{key}={value}' for key, value in txt.items()])
        self.bash(f'avahi-publish -s {instance_name}  {service_type} {port} -H {host_name}.local {txt_string} &')

    def publish_mdns_host(self, hostname, addresses):
        """Publish an mDNS host on the Ethernet

        :param hostname: the host name to publish. The domain
                         should not be included.
        :param addresses: a list of strings representing the addresses to
                          be registered with the host.
        """
        for address in addresses:
            self.bash(f'avahi-publish -a {hostname}.local {address} &')

    def browse_mdns_services(self, name, timeout=2):
        """ Browse mDNS services on the ethernet.

        :param name: the service type name in format of '<service-name>.<protocol>'.
        :param timeout: timeout value in seconds before returning.
        :return: A list of service instance names.
        """

        self.bash(f'dns-sd -Z {name} local. > /tmp/{name} 2>&1 &')
        time.sleep(timeout)
        self.bash('pkill dns-sd')

        instances = []
        for line in self.bash(f'cat /tmp/{name}', encoding='raw_unicode_escape'):
            elements = line.split()
            if len(elements) >= 3 and elements[0] == name and elements[1] == 'PTR':
                # Drop the trailing '.<name>' to keep only the instance label.
                instances.append(elements[2][:-len('.' + name)])
        return instances

    def discover_mdns_service(self, instance, name, host_name, timeout=2):
        """ Discover/resolve the mDNS service on ethernet.

        :param instance: the service instance name.
        :param name: the service name in format of '<service-name>.<protocol>'.
        :param host_name: the host name this service points to. The domain
                          should not be included. When None, the host name is
                          taken from the SRV record of the browse result.
        :param timeout: timeout value in seconds before returning.
        :return: a dict of service properties or None.

        The return value is a dict with the same key/values of srp_server_get_service
        except that we don't have a `deleted` field here.
        """
        host_name_file = self.bash('mktemp')[0].strip()
        service_data_file = self.bash('mktemp')[0].strip()

        self.bash(f'dns-sd -Z {name} local. > {service_data_file} 2>&1 &')
        time.sleep(timeout)

        full_service_name = f'{instance}.{name}'
        # When hostname is unspecified, extract hostname from browse result
        if host_name is None:
            for line in self.bash(f'cat {service_data_file}', encoding='raw_unicode_escape'):
                elements = line.split()
                if len(elements) >= 6 and elements[0] == full_service_name and elements[1] == 'SRV':
                    host_name = elements[5].split('.')[0]
                    break

        assert (host_name is not None)
        self.bash(f'dns-sd -G v6 {host_name}.local. > {host_name_file} 2>&1 &')
        time.sleep(timeout)

        self.bash('pkill dns-sd')
        addresses = []
        service = {}

        logging.debug(self.bash(f'cat {host_name_file}', encoding='raw_unicode_escape'))
        logging.debug(self.bash(f'cat {service_data_file}', encoding='raw_unicode_escape'))

        # example output in the host file:
        # Timestamp     A/R Flags if Hostname                               Address                                     TTL
        # 9:38:09.274  Add     23 48 my-host.local.                         2001:0000:0000:0000:0000:0000:0000:0002%<0>  120
        #
        for line in self.bash(f'cat {host_name_file}', encoding='raw_unicode_escape'):
            elements = line.split()
            fullname = f'{host_name}.local.'
            if fullname not in elements:
                continue
            if 'Add' not in elements:
                continue
            # The address is the column right after the host name; drop the '%<scope>' suffix.
            addresses.append(elements[elements.index(fullname) + 1].split('%')[0])

        logging.debug(f'addresses of {host_name}: {addresses}')

        # example output of in the service file:
        # _ipps._tcp                                      PTR     my-service._ipps._tcp
        # my-service._ipps._tcp                           SRV     0 0 12345 my-host.local. ; Replace with unicast FQDN of target host
        # my-service._ipps._tcp                           TXT     ""
        #
        is_txt = False
        txt = ''
        for line in self.bash(f'cat {service_data_file}', encoding='raw_unicode_escape'):
            elements = line.split()
            if len(elements) >= 2 and elements[0] == full_service_name and elements[1] == 'TXT':
                is_txt = True
            if is_txt:
                # A TXT record may span several lines; accumulate until a line ends with '"'.
                txt += line.strip()
                if line.strip().endswith('"'):
                    is_txt = False
                    txt_dict = self.__parse_dns_sd_txt(txt)
                    logging.info(f'txt = {txt_dict}')
                    service['txt'] = txt_dict

            if not elements or elements[0] != full_service_name:
                continue
            # Same length guard as the host-name lookup above: a shorter line cannot be a
            # complete SRV record and must not be indexed.
            if len(elements) >= 6 and elements[1] == 'SRV':
                service['fullname'] = elements[0]
                service['instance'] = instance
                service['name'] = name
                service['priority'] = int(elements[2])
                service['weight'] = int(elements[3])
                service['port'] = int(elements[4])
                service['host_fullname'] = elements[5]
                assert (service['host_fullname'] == f'{host_name}.local.')
                service['host'] = host_name
                service['addresses'] = addresses
        return service or None

    def start_radvd_service(self, prefix, slaac):
        """Write `/etc/radvd.conf` for interface eth0 advertising `prefix`, then start radvd.

        :param prefix: the prefix placed in the `prefix` section of the configuration.
        :param slaac: when true `AdvAutonomous` is set to `on`, otherwise to `off`.
        """
        self.bash("""cat >/etc/radvd.conf <<EOF
interface eth0
{
    AdvSendAdvert on;

    AdvReachableTime 200;
    AdvRetransTimer 200;
    AdvDefaultLifetime 1800;
    MinRtrAdvInterval 1200;
    MaxRtrAdvInterval 1800;
    AdvDefaultPreference low;

    prefix %s
    {
        AdvOnLink on;
        AdvAutonomous %s;
        AdvRouterAddr off;
        AdvPreferredLifetime 1800;
        AdvValidLifetime 1800;
    };
};
EOF
""" % (prefix, 'on' if slaac else 'off'))
        self.bash('service radvd start')
        self.bash('service radvd status')  # Make sure radvd service is running

    def stop_radvd_service(self):
        """Run `service radvd stop`.
        """
        self.bash('service radvd stop')

    def kill_radvd_service(self):
        """Run `pkill radvd`.
        """
        self.bash('pkill radvd')

    def __parse_dns_sd_txt(self, line: str):
        """Parse the double-quoted `key=value` entries of a dns-sd TXT line into a dict.

        Quoted entries without '=' are skipped; only the first '=' separates key and value.
        """
        # Example TXT entry:
        # "xp=\\000\\013\\184\\000\\000\\000\\000\\000"
        txt = {}
        for entry in re.findall(r'"((?:[^\\]|\\.)*?)"', line):
            if '=' not in entry:
                continue

            k, v = entry.split('=', 1)
            txt[k] = v

        return txt
4126
4127
class OtbrNode(LinuxHost, NodeImpl, OtbrDocker):
    """Node type 'otbr-docker': combines `LinuxHost`, `NodeImpl` and `OtbrDocker`."""
    TUN_DEV = config.THREAD_IFNAME
    is_otbr = True
    is_bbr = True  # OTBR is also BBR
    node_type = 'otbr-docker'

    def __repr__(self):
        return f'Otbr<{self.nodeid}>'

    def start(self):
        """Run `_setup_sysctl()`, set the log level to 5 and then start via the parent classes."""
        self._setup_sysctl()
        self.set_log_level(5)
        super().start()

    def add_ipaddr(self, addr):
        """Add `<addr>/64` to the `TUN_DEV` interface with `ip -6 addr add`."""
        self.bash(f'ip -6 addr add {addr}/64 dev {self.TUN_DEV}')

    def add_ipmaddr_tun(self, ip: str):
        """Launch the `mcast6.py` helper script in the background for `TUN_DEV` and `ip`."""
        self.bash(
            f'python3 /app/third_party/openthread/repo/tests/scripts/thread-cert/mcast6.py {self.TUN_DEV} {ip} &')

    def get_ip6_address(self, address_type: config.ADDRESS_TYPE):
        """Look the address up via `LinuxHost` first; on any exception fall back to the
        implementation that follows `LinuxHost` in the MRO."""
        try:
            return super().get_ip6_address(address_type)
        except Exception:
            # Skip LinuxHost explicitly so the lookup continues with the next class in the MRO.
            return super(LinuxHost, self).get_ip6_address(address_type)
4155
4156
class HostNode(LinuxHost, OtbrDocker):
    """Host node combining `LinuxHost` and `OtbrDocker`; `otbr-agent` is stopped on creation."""
    is_host = True

    def __init__(self, nodeid, name=None, **kwargs):
        """Store `nodeid` and `name` (default 'Host<nodeid>'), initialise the parent
        classes and stop the `otbr-agent` service."""
        self.nodeid = nodeid
        self.name = name if name else 'Host%d' % nodeid
        super().__init__(nodeid, **kwargs)
        self.bash('service otbr-agent stop')

    def start(self, start_radvd=True, prefix=config.DOMAIN_PREFIX, slaac=False):
        """Run `_setup_sysctl()`, then start radvd with `prefix`/`slaac` or stop it,
        depending on `start_radvd`."""
        self._setup_sysctl()
        if not start_radvd:
            self.stop_radvd_service()
            return
        self.start_radvd_service(prefix, slaac)

    def stop(self):
        """Stop the radvd service."""
        self.stop_radvd_service()

    def get_addrs(self) -> List[str]:
        """Return the addresses reported by `get_ether_addrs()`."""
        return self.get_ether_addrs()

    def __repr__(self):
        return f'Host<{self.nodeid}>'

    def get_matched_ula_addresses(self, prefix):
        """Get the IPv6 addresses that matches given prefix.
        """

        candidates = self.get_ip6_address(config.ADDRESS_TYPE.ONLINK_ULA)
        return [candidate for candidate in candidates if IPv6Address(candidate) in IPv6Network(prefix)]
4192
4193
# Run the unittest command-line entry point when this file is executed as a script.
if '__main__' == __name__:
    unittest.main()
4196