1#!/usr/bin/env python3 2# 3# Copyright (c) 2016, The OpenThread Authors. 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without 7# modification, are permitted provided that the following conditions are met: 8# 1. Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# 2. Redistributions in binary form must reproduce the above copyright 11# notice, this list of conditions and the following disclaimer in the 12# documentation and/or other materials provided with the distribution. 13# 3. Neither the name of the copyright holder nor the 14# names of its contributors may be used to endorse or promote products 15# derived from this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 18# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 21# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 22# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 23# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 24# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 25# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 26# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 27# POSSIBILITY OF SUCH DAMAGE. 28# 29 30import json 31import binascii 32import ipaddress 33import logging 34import os 35import re 36import shlex 37import socket 38import subprocess 39import sys 40import time 41import traceback 42import typing 43import unittest 44from ipaddress import IPv6Address, IPv6Network 45from typing import Union, Dict, Optional, List, Any 46 47import pexpect 48import pexpect.popen_spawn 49 50import config 51import simulator 52import thread_cert 53 54PORT_OFFSET = int(os.getenv('PORT_OFFSET', "0")) 55 56INFRA_DNS64 = int(os.getenv('NAT64', 0)) 57 58 59class OtbrDocker: 60 RESET_DELAY = 3 61 62 _socat_proc = None 63 _ot_rcp_proc = None 64 _docker_proc = None 65 _border_routing_counters = None 66 67 def __init__(self, nodeid: int, backbone_network: str, **kwargs): 68 self.verbose = int(float(os.getenv('VERBOSE', 0))) 69 70 assert backbone_network is not None 71 self.backbone_network = backbone_network 72 try: 73 self._docker_name = config.OTBR_DOCKER_NAME_PREFIX + str(nodeid) 74 self._prepare_ot_rcp_sim(nodeid) 75 self._launch_docker() 76 except Exception: 77 traceback.print_exc() 78 self.destroy() 79 raise 80 81 def _prepare_ot_rcp_sim(self, nodeid: int): 82 self._socat_proc = subprocess.Popen(['socat', '-d', '-d', 'pty,raw,echo=0', 'pty,raw,echo=0'], 83 stderr=subprocess.PIPE, 84 stdin=subprocess.DEVNULL, 85 stdout=subprocess.DEVNULL) 86 87 line = self._socat_proc.stderr.readline().decode('ascii').strip() 88 self._rcp_device_pty = rcp_device_pty = line[line.index('PTY is /dev') + 7:] 89 line = self._socat_proc.stderr.readline().decode('ascii').strip() 90 self._rcp_device = rcp_device = line[line.index('PTY is /dev') + 7:] 91 logging.info(f"socat running: device PTY: {rcp_device_pty}, device: {rcp_device}") 92 93 ot_rcp_path = self._get_ot_rcp_path() 94 self._ot_rcp_proc = subprocess.Popen(f"{ot_rcp_path} {nodeid} > {rcp_device_pty} < {rcp_device_pty}", 95 shell=True, 96 stdin=subprocess.DEVNULL, 97 stdout=subprocess.DEVNULL, 98 stderr=subprocess.DEVNULL) 99 100 try: 101 self._ot_rcp_proc.wait(1) 102 except subprocess.TimeoutExpired: 103 # We expect ot-rcp not to quit in 1 second. 104 pass 105 else: 106 raise Exception(f"ot-rcp {nodeid} exited unexpectedly!") 107 108 def _get_ot_rcp_path(self) -> str: 109 srcdir = os.environ['top_builddir'] 110 path = '%s/examples/apps/ncp/ot-rcp' % srcdir 111 logging.info("ot-rcp path: %s", path) 112 return path 113 114 def _launch_docker(self): 115 logging.info(f'Docker image: {config.OTBR_DOCKER_IMAGE}') 116 subprocess.check_call(f"docker rm -f {self._docker_name} || true", shell=True) 117 CI_ENV = os.getenv('CI_ENV', '').split() 118 dns = ['--dns=127.0.0.1'] if INFRA_DNS64 == 1 else ['--dns=8.8.8.8'] 119 nat64_prefix = ['--nat64-prefix', '2001:db8:1:ffff::/96'] if INFRA_DNS64 == 1 else [] 120 os.makedirs('/tmp/coverage/', exist_ok=True) 121 122 cmd = ['docker', 'run'] + CI_ENV + [ 123 '--rm', 124 '--name', 125 self._docker_name, 126 '--network', 127 self.backbone_network, 128 ] + dns + [ 129 '-i', 130 '--sysctl', 131 'net.ipv6.conf.all.disable_ipv6=0 net.ipv4.conf.all.forwarding=1 net.ipv6.conf.all.forwarding=1', 132 '--privileged', 133 '--cap-add=NET_ADMIN', 134 '--volume', 135 f'{self._rcp_device}:/dev/ttyUSB0', 136 '-v', 137 '/tmp/coverage/:/tmp/coverage/', 138 config.OTBR_DOCKER_IMAGE, 139 '-B', 140 config.BACKBONE_IFNAME, 141 '--trel-url', 142 f'trel://{config.BACKBONE_IFNAME}', 143 ] + nat64_prefix 144 logging.info(' '.join(cmd)) 145 self._docker_proc = subprocess.Popen(cmd, 146 stdin=subprocess.DEVNULL, 147 stdout=sys.stdout if self.verbose else subprocess.DEVNULL, 148 stderr=sys.stderr if self.verbose else subprocess.DEVNULL) 149 150 launch_docker_deadline = time.time() + 300 151 launch_ok = False 152 153 while time.time() < launch_docker_deadline: 154 try: 155 subprocess.check_call(f'docker exec -i {self._docker_name} ot-ctl state', shell=True) 156 launch_ok = True 157 logging.info("OTBR Docker %s on %s Is Ready!", self._docker_name, self.backbone_network) 158 break 159 except subprocess.CalledProcessError: 160 time.sleep(5) 161 continue 162 163 assert launch_ok 164 165 self.start_ot_ctl() 166 167 def __repr__(self): 168 return f'OtbrDocker<{self.nodeid}>' 169 170 def start_otbr_service(self): 171 self.bash('service otbr-agent start') 172 self.simulator.go(3) 173 self.start_ot_ctl() 174 175 def stop_otbr_service(self): 176 self.stop_ot_ctl() 177 self.bash('service otbr-agent stop') 178 179 def stop_mdns_service(self): 180 self.bash('service avahi-daemon stop; service mdns stop; !(cat /proc/net/udp | grep -i :14E9)') 181 182 def start_mdns_service(self): 183 self.bash('service avahi-daemon start; service mdns start; cat /proc/net/udp | grep -i :14E9') 184 185 def start_ot_ctl(self): 186 cmd = f'docker exec -i {self._docker_name} ot-ctl' 187 self.pexpect = pexpect.popen_spawn.PopenSpawn(cmd, timeout=30) 188 if self.verbose: 189 self.pexpect.logfile_read = sys.stdout.buffer 190 191 # Add delay to ensure that the process is ready to receive commands. 192 timeout = 0.4 193 while timeout > 0: 194 self.pexpect.send('\r\n') 195 try: 196 self.pexpect.expect('> ', timeout=0.1) 197 break 198 except pexpect.TIMEOUT: 199 timeout -= 0.1 200 201 def stop_ot_ctl(self): 202 self.pexpect.sendeof() 203 self.pexpect.wait() 204 self.pexpect.proc.kill() 205 206 def reserve_udp_port(self, port): 207 self.bash(f'socat -u UDP6-LISTEN:{port},bindtodevice=wpan0 - &') 208 209 def destroy(self): 210 logging.info("Destroying %s", self) 211 self._shutdown_docker() 212 self._shutdown_ot_rcp() 213 self._shutdown_socat() 214 215 def _shutdown_docker(self): 216 if self._docker_proc is None: 217 return 218 219 try: 220 COVERAGE = int(os.getenv('COVERAGE', '0')) 221 OTBR_COVERAGE = int(os.getenv('OTBR_COVERAGE', '0')) 222 test_name = os.getenv('TEST_NAME') 223 unique_node_id = f'{test_name}-{PORT_OFFSET}-{self.nodeid}' 224 225 if COVERAGE or OTBR_COVERAGE: 226 self.bash('service otbr-agent stop') 227 228 cov_file_path = f'/tmp/coverage/coverage-{unique_node_id}.info' 229 # Upload OTBR code coverage if OTBR_COVERAGE=1, otherwise OpenThread code coverage. 230 if OTBR_COVERAGE: 231 codecov_cmd = f'lcov --directory . --capture --output-file {cov_file_path}' 232 else: 233 codecov_cmd = ('lcov --directory build/otbr/third_party/openthread/repo --capture ' 234 f'--output-file {cov_file_path}') 235 236 self.bash(codecov_cmd) 237 238 copyCore = subprocess.run(f'docker cp {self._docker_name}:/core ./coredump_{unique_node_id}', shell=True) 239 if copyCore.returncode == 0: 240 subprocess.check_call( 241 f'docker cp {self._docker_name}:/usr/sbin/otbr-agent ./otbr-agent_{unique_node_id}', shell=True) 242 243 finally: 244 subprocess.check_call(f"docker rm -f {self._docker_name}", shell=True) 245 self._docker_proc.wait() 246 del self._docker_proc 247 248 def _shutdown_ot_rcp(self): 249 if self._ot_rcp_proc is not None: 250 self._ot_rcp_proc.kill() 251 self._ot_rcp_proc.wait() 252 del self._ot_rcp_proc 253 254 def _shutdown_socat(self): 255 if self._socat_proc is not None: 256 self._socat_proc.stderr.close() 257 self._socat_proc.kill() 258 self._socat_proc.wait() 259 del self._socat_proc 260 261 def bash(self, cmd: str, encoding='ascii') -> List[str]: 262 logging.info("%s $ %s", self, cmd) 263 proc = subprocess.Popen(['docker', 'exec', '-i', self._docker_name, 'bash', '-c', cmd], 264 stdin=subprocess.DEVNULL, 265 stdout=subprocess.PIPE, 266 stderr=sys.stderr, 267 encoding=encoding) 268 269 with proc: 270 271 lines = [] 272 273 while True: 274 line = proc.stdout.readline() 275 276 if not line: 277 break 278 279 lines.append(line) 280 logging.info("%s $ %r", self, line.rstrip('\r\n')) 281 282 proc.wait() 283 284 if proc.returncode != 0: 285 raise subprocess.CalledProcessError(proc.returncode, cmd, ''.join(lines)) 286 else: 287 return lines 288 289 def dns_dig(self, server: str, name: str, qtype: str): 290 """ 291 Run dig command to query a DNS server. 292 293 Args: 294 server: the server address. 295 name: the name to query. 296 qtype: the query type (e.g. AAAA, PTR, TXT, SRV). 297 298 Returns: 299 The dig result similar as below: 300 { 301 "opcode": "QUERY", 302 "status": "NOERROR", 303 "id": "64144", 304 "QUESTION": [ 305 ('google.com.', 'IN', 'AAAA') 306 ], 307 "ANSWER": [ 308 ('google.com.', 107, 'IN', 'AAAA', '2404:6800:4008:c00::71'), 309 ('google.com.', 107, 'IN', 'AAAA', '2404:6800:4008:c00::8a'), 310 ('google.com.', 107, 'IN', 'AAAA', '2404:6800:4008:c00::66'), 311 ('google.com.', 107, 'IN', 'AAAA', '2404:6800:4008:c00::8b'), 312 ], 313 "ADDITIONAL": [ 314 ], 315 } 316 """ 317 output = self.bash(f'dig -6 @{server} \'{name}\' {qtype}', encoding='raw_unicode_escape') 318 319 section = None 320 dig_result = { 321 'QUESTION': [], 322 'ANSWER': [], 323 'ADDITIONAL': [], 324 } 325 326 for line in output: 327 line = line.strip() 328 329 if line.startswith(';; ->>HEADER<<- '): 330 headers = line[len(';; ->>HEADER<<- '):].split(', ') 331 for header in headers: 332 key, val = header.split(': ') 333 dig_result[key] = val 334 335 continue 336 337 if line == ';; QUESTION SECTION:': 338 section = 'QUESTION' 339 continue 340 elif line == ';; ANSWER SECTION:': 341 section = 'ANSWER' 342 continue 343 elif line == ';; ADDITIONAL SECTION:': 344 section = 'ADDITIONAL' 345 continue 346 elif section and not line: 347 section = None 348 continue 349 350 if section: 351 assert line 352 353 if section == 'QUESTION': 354 assert line.startswith(';') 355 line = line[1:] 356 record = list(line.split()) 357 358 if section == 'QUESTION': 359 if record[2] in ('SRV', 'TXT'): 360 record[0] = self.__unescape_dns_instance_name(record[0]) 361 else: 362 record[1] = int(record[1]) 363 if record[3] == 'SRV': 364 record[0] = self.__unescape_dns_instance_name(record[0]) 365 record[4], record[5], record[6] = map(int, [record[4], record[5], record[6]]) 366 elif record[3] == 'TXT': 367 record[0] = self.__unescape_dns_instance_name(record[0]) 368 record[4:] = [self.__parse_dns_dig_txt(line)] 369 elif record[3] == 'PTR': 370 record[4] = self.__unescape_dns_instance_name(record[4]) 371 372 dig_result[section].append(tuple(record)) 373 374 return dig_result 375 376 def call_dbus_method(self, *args): 377 args = shlex.join([args[0], args[1], json.dumps(args[2:])]) 378 return json.loads( 379 self.bash(f'python3 /app/third_party/openthread/repo/tests/scripts/thread-cert/call_dbus_method.py {args}') 380 [0]) 381 382 def get_dbus_property(self, property_name): 383 return self.call_dbus_method('org.freedesktop.DBus.Properties', 'Get', 'io.openthread.BorderRouter', 384 property_name) 385 386 def set_dbus_property(self, property_name, property_value): 387 return self.call_dbus_method('org.freedesktop.DBus.Properties', 'Set', 'io.openthread.BorderRouter', 388 property_name, property_value) 389 390 def get_border_routing_counters(self): 391 counters = self.get_dbus_property('BorderRoutingCounters') 392 counters = { 393 'inbound_unicast': counters[0], 394 'inbound_multicast': counters[1], 395 'outbound_unicast': counters[2], 396 'outbound_multicast': counters[3], 397 'ra_rx': counters[4], 398 'ra_tx_success': counters[5], 399 'ra_tx_failure': counters[6], 400 'rs_rx': counters[7], 401 'rs_tx_success': counters[8], 402 'rs_tx_failure': counters[9], 403 } 404 logging.info(f'border routing counters: {counters}') 405 return counters 406 407 def _process_traffic_counters(self, counter): 408 return { 409 '4to6': { 410 'packets': counter[0], 411 'bytes': counter[1], 412 }, 413 '6to4': { 414 'packets': counter[2], 415 'bytes': counter[3], 416 } 417 } 418 419 def _process_packet_counters(self, counter): 420 return {'4to6': {'packets': counter[0]}, '6to4': {'packets': counter[1]}} 421 422 def nat64_set_enabled(self, enable): 423 return self.call_dbus_method('io.openthread.BorderRouter', 'SetNat64Enabled', enable) 424 425 def activate_ephemeral_key_mode(self, lifetime): 426 return self.call_dbus_method('io.openthread.BorderRouter', 'ActivateEphemeralKeyMode', lifetime) 427 428 def deactivate_ephemeral_key_mode(self): 429 return self.call_dbus_method('io.openthread.BorderRouter', 'DeactivateEphemeralKeyMode') 430 431 @property 432 def nat64_cidr(self): 433 self.send_command('nat64 cidr') 434 cidr = self._expect_command_output()[0].strip() 435 return ipaddress.IPv4Network(cidr, strict=False) 436 437 @nat64_cidr.setter 438 def nat64_cidr(self, cidr: ipaddress.IPv4Network): 439 if not isinstance(cidr, ipaddress.IPv4Network): 440 raise ValueError("cidr is expected to be an instance of ipaddress.IPv4Network") 441 self.send_command(f'nat64 cidr {cidr}') 442 self._expect_done() 443 444 @property 445 def nat64_state(self): 446 state = self.get_dbus_property('Nat64State') 447 return {'PrefixManager': state[0], 'Translator': state[1]} 448 449 @property 450 def nat64_mappings(self): 451 return [{ 452 'id': row[0], 453 'ip4': row[1], 454 'ip6': row[2], 455 'expiry': row[3], 456 'counters': { 457 'total': self._process_traffic_counters(row[4][0]), 458 'ICMP': self._process_traffic_counters(row[4][1]), 459 'UDP': self._process_traffic_counters(row[4][2]), 460 'TCP': self._process_traffic_counters(row[4][3]), 461 } 462 } for row in self.get_dbus_property('Nat64Mappings')] 463 464 @property 465 def nat64_counters(self): 466 res_error = self.get_dbus_property('Nat64ErrorCounters') 467 res_proto = self.get_dbus_property('Nat64ProtocolCounters') 468 return { 469 'protocol': { 470 'Total': self._process_traffic_counters(res_proto[0]), 471 'ICMP': self._process_traffic_counters(res_proto[1]), 472 'UDP': self._process_traffic_counters(res_proto[2]), 473 'TCP': self._process_traffic_counters(res_proto[3]), 474 }, 475 'errors': { 476 'Unknown': self._process_packet_counters(res_error[0]), 477 'Illegal Pkt': self._process_packet_counters(res_error[1]), 478 'Unsup Proto': self._process_packet_counters(res_error[2]), 479 'No Mapping': self._process_packet_counters(res_error[3]), 480 } 481 } 482 483 @property 484 def nat64_traffic_counters(self): 485 res = self.get_dbus_property('Nat64TrafficCounters') 486 return { 487 'Total': self._process_traffic_counters(res[0]), 488 'ICMP': self._process_traffic_counters(res[1]), 489 'UDP': self._process_traffic_counters(res[2]), 490 'TCP': self._process_traffic_counters(res[3]), 491 } 492 493 @property 494 def dns_upstream_query_state(self): 495 return bool(self.get_dbus_property('DnsUpstreamQueryState')) 496 497 @dns_upstream_query_state.setter 498 def dns_upstream_query_state(self, value): 499 if type(value) is not bool: 500 raise ValueError("dns_upstream_query_state must be a bool") 501 return self.set_dbus_property('DnsUpstreamQueryState', value) 502 503 @property 504 def ephemeral_key_enabled(self): 505 return bool(self.get_dbus_property('EphemeralKeyEnabled')) 506 507 @ephemeral_key_enabled.setter 508 def ephemeral_key_enabled(self, value): 509 if type(value) is not bool: 510 raise ValueError("ephemeral_key_enabled must be a bool") 511 return self.set_dbus_property('EphemeralKeyEnabled', value) 512 513 def read_border_routing_counters_delta(self): 514 old_counters = self._border_routing_counters 515 new_counters = self.get_border_routing_counters() 516 self._border_routing_counters = new_counters 517 delta_counters = {} 518 if old_counters is None: 519 delta_counters = new_counters 520 else: 521 for i in ('inbound', 'outbound'): 522 for j in ('unicast', 'multicast'): 523 key = f'{i}_{j}' 524 assert (key in old_counters) 525 assert (key in new_counters) 526 value = [new_counters[key][0] - old_counters[key][0], new_counters[key][1] - old_counters[key][1]] 527 delta_counters[key] = value 528 delta_counters = { 529 key: value for key, value in delta_counters.items() if not isinstance(value, int) and value[0] and value[1] 530 } 531 532 return delta_counters 533 534 @staticmethod 535 def __unescape_dns_instance_name(name: str) -> str: 536 new_name = [] 537 i = 0 538 while i < len(name): 539 c = name[i] 540 541 if c == '\\': 542 assert i + 1 < len(name), name 543 if name[i + 1].isdigit(): 544 assert i + 3 < len(name) and name[i + 2].isdigit() and name[i + 3].isdigit(), name 545 new_name.append(chr(int(name[i + 1:i + 4]))) 546 i += 3 547 else: 548 new_name.append(name[i + 1]) 549 i += 1 550 else: 551 new_name.append(c) 552 553 i += 1 554 555 return ''.join(new_name) 556 557 def __parse_dns_dig_txt(self, line: str): 558 # Example TXT entry: 559 # "xp=\\000\\013\\184\\000\\000\\000\\000\\000" 560 txt = {} 561 for entry in re.findall(r'"((?:[^\\]|\\.)*?)"', line): 562 if entry == "": 563 continue 564 565 k, v = entry.split('=', 1) 566 txt[k] = v 567 568 return txt 569 570 def _setup_sysctl(self): 571 self.bash(f'sysctl net.ipv6.conf.{self.ETH_DEV}.accept_ra=2') 572 self.bash(f'sysctl net.ipv6.conf.{self.ETH_DEV}.accept_ra_rt_info_max_plen=64') 573 574 575class OtCli: 576 RESET_DELAY = 0.1 577 578 def __init__(self, nodeid, is_mtd=False, version=None, is_bbr=False, **kwargs): 579 self.verbose = int(float(os.getenv('VERBOSE', 0))) 580 self.node_type = os.getenv('NODE_TYPE', 'sim') 581 self.env_version = os.getenv('THREAD_VERSION', '1.1') 582 self.is_bbr = is_bbr 583 self._initialized = False 584 if os.getenv('COVERAGE', 0) and os.getenv('CC', 'gcc') == 'gcc': 585 self._cmd_prefix = '/usr/bin/env GCOV_PREFIX=%s/ot-run/%s/ot-gcda.%d ' % (os.getenv( 586 'top_srcdir', '.'), sys.argv[0], nodeid) 587 else: 588 self._cmd_prefix = '' 589 590 if version is not None: 591 self.version = version 592 else: 593 self.version = self.env_version 594 595 mode = os.environ.get('USE_MTD') == '1' and is_mtd and 'mtd' or 'ftd' 596 597 if self.node_type == 'soc': 598 self.__init_soc(nodeid) 599 elif self.node_type == 'ncp-sim': 600 # TODO use mode after ncp-mtd is available. 601 self.__init_ncp_sim(nodeid, 'ftd') 602 else: 603 self.__init_sim(nodeid, mode) 604 605 if self.verbose: 606 self.pexpect.logfile_read = sys.stdout.buffer 607 608 self._initialized = True 609 610 def __init_sim(self, nodeid, mode): 611 """ Initialize a simulation node. """ 612 613 # Default command if no match below, will be overridden if below conditions are met. 614 cmd = './ot-cli-%s' % (mode) 615 616 # For Thread 1.2 MTD node, use ot-cli-mtd build regardless of OT_CLI_PATH 617 if self.version != '1.1' and mode == 'mtd' and 'top_builddir' in os.environ: 618 srcdir = os.environ['top_builddir'] 619 cmd = '%s/examples/apps/cli/ot-cli-%s %d' % (srcdir, mode, nodeid) 620 621 # If Thread version of node matches the testing environment version. 622 elif self.version == self.env_version: 623 # Load Thread 1.2 BBR device when testing Thread 1.2 scenarios 624 # which requires device with Backbone functionality. 625 if self.version != '1.1' and self.is_bbr: 626 if 'OT_CLI_PATH_BBR' in os.environ: 627 cmd = os.environ['OT_CLI_PATH_BBR'] 628 elif 'top_builddir_1_3_bbr' in os.environ: 629 srcdir = os.environ['top_builddir_1_3_bbr'] 630 cmd = '%s/examples/apps/cli/ot-cli-%s' % (srcdir, mode) 631 632 # Load Thread device of the testing environment version (may be 1.1 or 1.2) 633 else: 634 if 'OT_CLI_PATH' in os.environ: 635 cmd = os.environ['OT_CLI_PATH'] 636 elif 'top_builddir' in os.environ: 637 srcdir = os.environ['top_builddir'] 638 cmd = '%s/examples/apps/cli/ot-cli-%s' % (srcdir, mode) 639 640 if 'RADIO_DEVICE' in os.environ: 641 cmd += ' --real-time-signal=+1 -v spinel+hdlc+uart://%s?forkpty-arg=%d' % (os.environ['RADIO_DEVICE'], 642 nodeid) 643 self.is_posix = True 644 else: 645 cmd += ' %d' % nodeid 646 647 # Load Thread 1.1 node when testing Thread 1.2 scenarios for interoperability 648 elif self.version == '1.1': 649 # Posix app 650 if 'OT_CLI_PATH_1_1' in os.environ: 651 cmd = os.environ['OT_CLI_PATH_1_1'] 652 elif 'top_builddir_1_1' in os.environ: 653 srcdir = os.environ['top_builddir_1_1'] 654 cmd = '%s/examples/apps/cli/ot-cli-%s' % (srcdir, mode) 655 656 if 'RADIO_DEVICE_1_1' in os.environ: 657 cmd += ' --real-time-signal=+1 -v spinel+hdlc+uart://%s?forkpty-arg=%d' % ( 658 os.environ['RADIO_DEVICE_1_1'], nodeid) 659 self.is_posix = True 660 else: 661 cmd += ' %d' % nodeid 662 663 print("%s" % cmd) 664 665 self.pexpect = pexpect.popen_spawn.PopenSpawn(self._cmd_prefix + cmd, timeout=10) 666 667 # Add delay to ensure that the process is ready to receive commands. 668 timeout = 0.4 669 while timeout > 0: 670 self.pexpect.send('\r\n') 671 try: 672 self.pexpect.expect('> ', timeout=0.1) 673 break 674 except pexpect.TIMEOUT: 675 timeout -= 0.1 676 677 def __init_ncp_sim(self, nodeid, mode): 678 """ Initialize an NCP simulation node. """ 679 680 # Default command if no match below, will be overridden if below conditions are met. 681 cmd = 'spinel-cli.py -p ./ot-ncp-%s -n' % mode 682 683 # If Thread version of node matches the testing environment version. 684 if self.version == self.env_version: 685 if 'RADIO_DEVICE' in os.environ: 686 args = ' --real-time-signal=+1 spinel+hdlc+uart://%s?forkpty-arg=%d' % (os.environ['RADIO_DEVICE'], 687 nodeid) 688 self.is_posix = True 689 else: 690 args = '' 691 692 # Load Thread 1.2 BBR device when testing Thread 1.2 scenarios 693 # which requires device with Backbone functionality. 694 if self.version != '1.1' and self.is_bbr: 695 if 'OT_NCP_PATH_1_3_BBR' in os.environ: 696 cmd = 'spinel-cli.py -p "%s%s" -n' % ( 697 os.environ['OT_NCP_PATH_1_3_BBR'], 698 args, 699 ) 700 elif 'top_builddir_1_3_bbr' in os.environ: 701 srcdir = os.environ['top_builddir_1_3_bbr'] 702 cmd = '%s/examples/apps/ncp/ot-ncp-%s' % (srcdir, mode) 703 cmd = 'spinel-cli.py -p "%s%s" -n' % ( 704 cmd, 705 args, 706 ) 707 708 # Load Thread device of the testing environment version (may be 1.1 or 1.2). 709 else: 710 if 'OT_NCP_PATH' in os.environ: 711 cmd = 'spinel-cli.py -p "%s%s" -n' % ( 712 os.environ['OT_NCP_PATH'], 713 args, 714 ) 715 elif 'top_builddir' in os.environ: 716 srcdir = os.environ['top_builddir'] 717 cmd = '%s/examples/apps/ncp/ot-ncp-%s' % (srcdir, mode) 718 cmd = 'spinel-cli.py -p "%s%s" -n' % ( 719 cmd, 720 args, 721 ) 722 723 # Load Thread 1.1 node when testing Thread 1.2 scenarios for interoperability. 724 elif self.version == '1.1': 725 if 'RADIO_DEVICE_1_1' in os.environ: 726 args = ' --real-time-signal=+1 spinel+hdlc+uart://%s?forkpty-arg=%d' % (os.environ['RADIO_DEVICE_1_1'], 727 nodeid) 728 self.is_posix = True 729 else: 730 args = '' 731 732 if 'OT_NCP_PATH_1_1' in os.environ: 733 cmd = 'spinel-cli.py -p "%s%s" -n' % ( 734 os.environ['OT_NCP_PATH_1_1'], 735 args, 736 ) 737 elif 'top_builddir_1_1' in os.environ: 738 srcdir = os.environ['top_builddir_1_1'] 739 cmd = '%s/examples/apps/ncp/ot-ncp-%s' % (srcdir, mode) 740 cmd = 'spinel-cli.py -p "%s%s" -n' % ( 741 cmd, 742 args, 743 ) 744 745 cmd += ' %d' % nodeid 746 print("%s" % cmd) 747 748 self.pexpect = pexpect.spawn(self._cmd_prefix + cmd, timeout=10) 749 750 # Add delay to ensure that the process is ready to receive commands. 751 time.sleep(0.2) 752 self._expect('spinel-cli >') 753 self.debug(int(os.getenv('DEBUG', '0'))) 754 755 def __init_soc(self, nodeid): 756 """ Initialize a System-on-a-chip node connected via UART. """ 757 import fdpexpect 758 759 serialPort = '/dev/ttyUSB%d' % ((nodeid - 1) * 2) 760 self.pexpect = fdpexpect.fdspawn(os.open(serialPort, os.O_RDWR | os.O_NONBLOCK | os.O_NOCTTY)) 761 762 def destroy(self): 763 if not self._initialized: 764 return 765 766 if (hasattr(self.pexpect, 'proc') and self.pexpect.proc.poll() is None or 767 not hasattr(self.pexpect, 'proc') and self.pexpect.isalive()): 768 print("%d: exit" % self.nodeid) 769 self.pexpect.send('exit\n') 770 self.pexpect.expect(pexpect.EOF) 771 self.pexpect.wait() 772 self._initialized = False 773 774 775class NodeImpl: 776 is_host = False 777 is_otbr = False 778 779 def __init__(self, nodeid, name=None, simulator=None, **kwargs): 780 self.nodeid = nodeid 781 self.name = name or ('Node%d' % nodeid) 782 self.is_posix = False 783 784 self.simulator = simulator 785 if self.simulator: 786 self.simulator.add_node(self) 787 788 super().__init__(nodeid, **kwargs) 789 790 self.set_addr64('%016x' % (thread_cert.EXTENDED_ADDRESS_BASE + nodeid)) 791 792 def _expect(self, pattern, timeout=-1, *args, **kwargs): 793 """ Process simulator events until expected the pattern. """ 794 if timeout == -1: 795 timeout = self.pexpect.timeout 796 797 assert timeout > 0 798 799 while timeout > 0: 800 try: 801 return self.pexpect.expect(pattern, 0.1, *args, **kwargs) 802 except pexpect.TIMEOUT: 803 timeout -= 0.1 804 self.simulator.go(0) 805 if timeout <= 0: 806 raise 807 808 def _expect_done(self, timeout=-1): 809 self._expect('Done', timeout) 810 811 def _expect_result(self, pattern, *args, **kwargs): 812 """Expect a single matching result. 813 814 The arguments are identical to pexpect.expect(). 815 816 Returns: 817 The matched line. 818 """ 819 results = self._expect_results(pattern, *args, **kwargs) 820 assert len(results) == 1, results 821 return results[0] 822 823 def _expect_results(self, pattern, *args, **kwargs): 824 """Expect multiple matching results. 825 826 The arguments are identical to pexpect.expect(). 827 828 Returns: 829 The matched lines. 830 """ 831 output = self._expect_command_output() 832 results = [line for line in output if self._match_pattern(line, pattern)] 833 return results 834 835 def _expect_key_value_pairs(self, pattern, separator=': '): 836 """Expect 'key: value' in multiple lines. 837 838 Returns: 839 Dictionary of the key:value pairs. 840 """ 841 result = {} 842 for line in self._expect_results(pattern): 843 key, val = line.split(separator) 844 result.update({key: val}) 845 return result 846 847 @staticmethod 848 def _match_pattern(line, pattern): 849 if isinstance(pattern, str): 850 pattern = re.compile(pattern) 851 852 if isinstance(pattern, typing.Pattern): 853 return pattern.match(line) 854 else: 855 return any(NodeImpl._match_pattern(line, p) for p in pattern) 856 857 def _expect_command_output(self, ignore_logs=True): 858 lines = [] 859 860 while True: 861 line = self.__readline(ignore_logs=ignore_logs) 862 863 if line == 'Done': 864 break 865 elif line.startswith('Error '): 866 raise Exception(line) 867 else: 868 lines.append(line) 869 870 print(f'_expect_command_output() returns {lines!r}') 871 return lines 872 873 def __is_logging_line(self, line: str) -> bool: 874 return len(line) >= 3 and line[:3] in {'[D]', '[I]', '[N]', '[W]', '[C]', '[-]'} 875 876 def read_cert_messages_in_commissioning_log(self, timeout=-1): 877 """Get the log of the traffic after DTLS handshake. 878 """ 879 format_str = br"=+?\[\[THCI\].*?type=%s.*?\].*?=+?[\s\S]+?-{40,}" 880 join_fin_req = format_str % br"JOIN_FIN\.req" 881 join_fin_rsp = format_str % br"JOIN_FIN\.rsp" 882 dummy_format_str = br"\[THCI\].*?type=%s.*?" 883 join_ent_ntf = dummy_format_str % br"JOIN_ENT\.ntf" 884 join_ent_rsp = dummy_format_str % br"JOIN_ENT\.rsp" 885 pattern = (b"(" + join_fin_req + b")|(" + join_fin_rsp + b")|(" + join_ent_ntf + b")|(" + join_ent_rsp + b")") 886 887 messages = [] 888 # There are at most 4 cert messages both for joiner and commissioner 889 for _ in range(0, 4): 890 try: 891 self._expect(pattern, timeout=timeout) 892 log = self.pexpect.match.group(0) 893 messages.append(self._extract_cert_message(log)) 894 except BaseException: 895 break 896 return messages 897 898 def _extract_cert_message(self, log): 899 res = re.search(br"direction=\w+", log) 900 assert res 901 direction = res.group(0).split(b'=')[1].strip() 902 903 res = re.search(br"type=\S+", log) 904 assert res 905 type = res.group(0).split(b'=')[1].strip() 906 907 payload = bytearray([]) 908 payload_len = 0 909 if type in [b"JOIN_FIN.req", b"JOIN_FIN.rsp"]: 910 res = re.search(br"len=\d+", log) 911 assert res 912 payload_len = int(res.group(0).split(b'=')[1].strip()) 913 914 hex_pattern = br"\|(\s([0-9a-fA-F]{2}|\.\.))+?\s+?\|" 915 while True: 916 res = re.search(hex_pattern, log) 917 if not res: 918 break 919 data = [int(hex, 16) for hex in res.group(0)[1:-1].split(b' ') if hex and hex != b'..'] 920 payload += bytearray(data) 921 log = log[res.end() - 1:] 922 assert len(payload) == payload_len 923 return (direction, type, payload) 924 925 def send_command(self, cmd, go=True, expect_command_echo=True): 926 print("%d: %s" % (self.nodeid, cmd)) 927 self.pexpect.send(cmd + '\n') 928 if go: 929 self.simulator.go(0, nodeid=self.nodeid) 930 sys.stdout.flush() 931 932 if expect_command_echo: 933 self._expect_command_echo(cmd) 934 935 def _expect_command_echo(self, cmd): 936 cmd = cmd.strip() 937 while True: 938 line = self.__readline() 939 if line == cmd: 940 break 941 942 logging.warning("expecting echo %r, but read %r", cmd, line) 943 944 def __readline(self, ignore_logs=True): 945 PROMPT = 'spinel-cli > ' if self.node_type == 'ncp-sim' else '> ' 946 while True: 947 self._expect(r"[^\n]+\n") 948 line = self.pexpect.match.group(0).decode('utf8').strip() 949 while line.startswith(PROMPT): 950 line = line[len(PROMPT):] 951 952 if line == '': 953 continue 954 955 if ignore_logs and self.__is_logging_line(line): 956 continue 957 958 return line 959 960 def get_commands(self): 961 self.send_command('?') 962 self._expect('Commands:') 963 return self._expect_results(r'\S+') 964 965 def set_mode(self, mode): 966 cmd = 'mode %s' % mode 967 self.send_command(cmd) 968 self._expect_done() 969 970 def debug(self, level): 971 # `debug` command will not trigger interaction with simulator 972 self.send_command('debug %d' % level, go=False) 973 974 def start(self): 975 self.interface_up() 976 self.thread_start() 977 978 def stop(self): 979 self.thread_stop() 980 self.interface_down() 981 982 def set_log_level(self, level: int): 983 self.send_command(f'log level {level}') 984 self._expect_done() 985 986 def interface_up(self): 987 self.send_command('ifconfig up') 988 self._expect_done() 989 990 def interface_down(self): 991 self.send_command('ifconfig down') 992 self._expect_done() 993 994 def thread_start(self): 995 self.send_command('thread start') 996 self._expect_done() 997 998 def thread_stop(self): 999 self.send_command('thread stop') 1000 self._expect_done() 1001 1002 def detach(self, is_async=False): 1003 cmd = 'detach' 1004 if is_async: 1005 cmd += ' async' 1006 1007 self.send_command(cmd) 1008 1009 if is_async: 1010 self._expect_done() 1011 return 1012 1013 end = self.simulator.now() + 4 1014 while True: 1015 self.simulator.go(1) 1016 try: 1017 self._expect_done(timeout=0.1) 1018 return 1019 except (pexpect.TIMEOUT, socket.timeout): 1020 if self.simulator.now() > end: 1021 raise 1022 1023 def expect_finished_detaching(self): 1024 self._expect('Finished detaching') 1025 1026 def commissioner_start(self): 1027 cmd = 'commissioner start' 1028 self.send_command(cmd) 1029 self._expect_done() 1030 1031 def commissioner_stop(self): 1032 cmd = 'commissioner stop' 1033 self.send_command(cmd) 1034 self._expect_done() 1035 1036 def commissioner_state(self): 1037 states = [r'disabled', r'petitioning', r'active'] 1038 self.send_command('commissioner state') 1039 return self._expect_result(states) 1040 1041 def commissioner_add_joiner(self, addr, psk): 1042 cmd = 'commissioner joiner add %s %s' % (addr, psk) 1043 self.send_command(cmd) 1044 self._expect_done() 1045 1046 def commissioner_set_provisioning_url(self, provisioning_url=''): 1047 cmd = 'commissioner provisioningurl %s' % provisioning_url 1048 self.send_command(cmd) 1049 self._expect_done() 1050 1051 def joiner_start(self, pskd='', provisioning_url=''): 1052 cmd = 'joiner start %s %s' % (pskd, provisioning_url) 1053 self.send_command(cmd) 1054 self._expect_done() 1055 1056 def clear_allowlist(self): 1057 cmd = 'macfilter addr clear' 1058 self.send_command(cmd) 1059 self._expect_done() 1060 1061 def enable_allowlist(self): 1062 cmd = 'macfilter addr allowlist' 1063 self.send_command(cmd) 1064 self._expect_done() 1065 1066 def disable_allowlist(self): 1067 cmd = 'macfilter addr disable' 1068 self.send_command(cmd) 1069 self._expect_done() 1070 1071 def add_allowlist(self, addr, rssi=None): 1072 cmd = 'macfilter addr add %s' % addr 1073 1074 if rssi is not None: 1075 cmd += ' %s' % rssi 1076 1077 self.send_command(cmd) 1078 self._expect_done() 1079 1080 def radiofilter_is_enabled(self) -> bool: 1081 states = [r'Disabled', r'Enabled'] 1082 self.send_command('radiofilter') 1083 return self._expect_result(states) == 'Enabled' 1084 1085 def radiofilter_enable(self): 1086 cmd = 'radiofilter enable' 1087 self.send_command(cmd) 1088 self._expect_done() 1089 1090 def radiofilter_disable(self): 1091 cmd = 'radiofilter disable' 1092 self.send_command(cmd) 1093 self._expect_done() 1094 1095 def get_bbr_registration_jitter(self): 1096 self.send_command('bbr jitter') 1097 return int(self._expect_result(r'\d+')) 1098 1099 def set_bbr_registration_jitter(self, jitter): 1100 cmd = 'bbr jitter %d' % jitter 1101 self.send_command(cmd) 1102 self._expect_done() 1103 1104 def get_rcp_version(self) -> str: 1105 self.send_command('rcp version') 1106 rcp_version = self._expect_command_output()[0].strip() 1107 return rcp_version 1108 1109 def srp_server_get_state(self): 1110 states = ['disabled', 'running', 'stopped'] 1111 self.send_command('srp server state') 1112 return self._expect_result(states) 1113 1114 def srp_server_get_addr_mode(self): 1115 modes = [r'unicast', r'anycast'] 1116 self.send_command(f'srp server addrmode') 1117 return self._expect_result(modes) 1118 1119 def srp_server_set_addr_mode(self, mode): 1120 self.send_command(f'srp server addrmode {mode}') 1121 self._expect_done() 1122 1123 def srp_server_get_anycast_seq_num(self): 1124 self.send_command(f'srp server seqnum') 1125 return int(self._expect_result(r'\d+')) 1126 1127 def srp_server_set_anycast_seq_num(self, seqnum): 1128 self.send_command(f'srp server seqnum {seqnum}') 1129 self._expect_done() 1130 1131 def srp_server_set_enabled(self, enable): 1132 cmd = f'srp server {"enable" if enable else "disable"}' 1133 self.send_command(cmd) 1134 self._expect_done() 1135 1136 def srp_server_set_lease_range(self, min_lease, max_lease, min_key_lease, max_key_lease): 1137 self.send_command(f'srp server lease {min_lease} {max_lease} {min_key_lease} {max_key_lease}') 1138 self._expect_done() 1139 1140 def srp_server_set_ttl_range(self, min_ttl, max_ttl): 1141 self.send_command(f'srp server ttl {min_ttl} {max_ttl}') 1142 self._expect_done() 1143 1144 def srp_server_get_hosts(self): 1145 """Returns the host list on the SRP server as a list of property 1146 dictionary. 1147 1148 Example output: 1149 [{ 1150 'fullname': 'my-host.default.service.arpa.', 1151 'name': 'my-host', 1152 'deleted': 'false', 1153 'addresses': ['2001::1', '2001::2'] 1154 }] 1155 """ 1156 1157 cmd = 'srp server host' 1158 self.send_command(cmd) 1159 lines = self._expect_command_output() 1160 host_list = [] 1161 while lines: 1162 host = {} 1163 1164 host['fullname'] = lines.pop(0).strip() 1165 host['name'] = host['fullname'].split('.')[0] 1166 1167 host['deleted'] = lines.pop(0).strip().split(':')[1].strip() 1168 if host['deleted'] == 'true': 1169 host_list.append(host) 1170 continue 1171 1172 addresses = lines.pop(0).strip().split('[')[1].strip(' ]').split(',') 1173 map(str.strip, addresses) 1174 host['addresses'] = [addr.strip() for addr in addresses if addr] 1175 1176 host_list.append(host) 1177 1178 return host_list 1179 1180 def srp_server_get_host(self, host_name): 1181 """Returns host on the SRP server that matches given host name. 1182 1183 Example usage: 1184 self.srp_server_get_host("my-host") 1185 """ 1186 1187 for host in self.srp_server_get_hosts(): 1188 if host_name == host['name']: 1189 return host 1190 1191 def srp_server_get_services(self): 1192 """Returns the service list on the SRP server as a list of property 1193 dictionary. 1194 1195 Example output: 1196 [{ 1197 'fullname': 'my-service._ipps._tcp.default.service.arpa.', 1198 'instance': 'my-service', 1199 'name': '_ipps._tcp', 1200 'deleted': 'false', 1201 'port': '12345', 1202 'priority': '0', 1203 'weight': '0', 1204 'ttl': '7200', 1205 'lease': '7200', 1206 'key-lease': '7200', 1207 'TXT': ['abc=010203'], 1208 'host_fullname': 'my-host.default.service.arpa.', 1209 'host': 'my-host', 1210 'addresses': ['2001::1', '2001::2'] 1211 }] 1212 1213 Note that the TXT data is output as a HEX string. 1214 """ 1215 1216 cmd = 'srp server service' 1217 self.send_command(cmd) 1218 lines = self._expect_command_output() 1219 1220 service_list = [] 1221 while lines: 1222 service = {} 1223 1224 service['fullname'] = lines.pop(0).strip() 1225 name_labels = service['fullname'].split('.') 1226 service['instance'] = name_labels[0] 1227 service['name'] = '.'.join(name_labels[1:3]) 1228 1229 service['deleted'] = lines.pop(0).strip().split(':')[1].strip() 1230 if service['deleted'] == 'true': 1231 service_list.append(service) 1232 continue 1233 1234 # 'subtypes', port', 'priority', 'weight', 'ttl', 'lease', and 'key-lease' 1235 for i in range(0, 7): 1236 key_value = lines.pop(0).strip().split(':') 1237 service[key_value[0].strip()] = key_value[1].strip() 1238 1239 txt_entries = lines.pop(0).strip().split('[')[1].strip(' ]').split(',') 1240 txt_entries = map(str.strip, txt_entries) 1241 service['TXT'] = [txt for txt in txt_entries if txt] 1242 1243 service['host_fullname'] = lines.pop(0).strip().split(':')[1].strip() 1244 service['host'] = service['host_fullname'].split('.')[0] 1245 1246 addresses = lines.pop(0).strip().split('[')[1].strip(' ]').split(',') 1247 addresses = map(str.strip, addresses) 1248 service['addresses'] = [addr for addr in addresses if addr] 1249 1250 service_list.append(service) 1251 1252 return service_list 1253 1254 def srp_server_get_service(self, instance_name, service_name): 1255 """Returns service on the SRP server that matches given instance 1256 name and service name. 1257 1258 Example usage: 1259 self.srp_server_get_service("my-service", "_ipps._tcp") 1260 """ 1261 1262 for service in self.srp_server_get_services(): 1263 if (instance_name == service['instance'] and service_name == service['name']): 1264 return service 1265 1266 def get_srp_server_port(self): 1267 """Returns the SRP server UDP port by parsing 1268 the SRP Server Data in Network Data. 1269 """ 1270 1271 for service in self.get_services(): 1272 # TODO: for now, we are using 0xfd as the SRP service data. 1273 # May use a dedicated bit flag for SRP server. 1274 if int(service[1], 16) == 0x5d: 1275 # The SRP server data contains IPv6 address (16 bytes) 1276 # followed by UDP port number. 1277 return int(service[2][2 * 16:], 16) 1278 1279 def srp_client_start(self, server_address, server_port): 1280 self.send_command(f'srp client start {server_address} {server_port}') 1281 self._expect_done() 1282 1283 def srp_client_stop(self): 1284 self.send_command(f'srp client stop') 1285 self._expect_done() 1286 1287 def srp_client_get_state(self): 1288 cmd = 'srp client state' 1289 self.send_command(cmd) 1290 return self._expect_command_output()[0] 1291 1292 def srp_client_get_auto_start_mode(self): 1293 cmd = 'srp client autostart' 1294 self.send_command(cmd) 1295 return self._expect_command_output()[0] 1296 1297 def srp_client_enable_auto_start_mode(self): 1298 self.send_command(f'srp client autostart enable') 1299 self._expect_done() 1300 1301 def srp_client_disable_auto_start_mode(self): 1302 self.send_command(f'srp client autostart disable') 1303 self._expect_done() 1304 1305 def srp_client_get_server_address(self): 1306 cmd = 'srp client server address' 1307 self.send_command(cmd) 1308 return self._expect_command_output()[0] 1309 1310 def srp_client_get_server_port(self): 1311 cmd = 'srp client server port' 1312 self.send_command(cmd) 1313 return int(self._expect_command_output()[0]) 1314 1315 def srp_client_get_host_state(self): 1316 cmd = 'srp client host state' 1317 self.send_command(cmd) 1318 return self._expect_command_output()[0] 1319 1320 def srp_client_set_host_name(self, name): 1321 self.send_command(f'srp client host name {name}') 1322 self._expect_done() 1323 1324 def srp_client_get_host_name(self): 1325 self.send_command(f'srp client host name') 1326 self._expect_done() 1327 1328 def srp_client_remove_host(self, remove_key=False, send_unreg_to_server=False): 1329 self.send_command(f'srp client host remove {int(remove_key)} {int(send_unreg_to_server)}') 1330 self._expect_done() 1331 1332 def srp_client_clear_host(self): 1333 self.send_command(f'srp client host clear') 1334 self._expect_done() 1335 1336 def srp_client_enable_auto_host_address(self): 1337 self.send_command(f'srp client host address auto') 1338 self._expect_done() 1339 1340 def srp_client_set_host_address(self, *addrs: str): 1341 self.send_command(f'srp client host address {" ".join(addrs)}') 1342 self._expect_done() 1343 1344 def srp_client_get_host_address(self): 1345 self.send_command(f'srp client host address') 1346 self._expect_done() 1347 1348 def srp_client_add_service(self, 1349 instance_name, 1350 service_name, 1351 port, 1352 priority=0, 1353 weight=0, 1354 txt_entries=[], 1355 lease=0, 1356 key_lease=0): 1357 txt_record = "".join(self._encode_txt_entry(entry) for entry in txt_entries) 1358 if txt_record == '': 1359 txt_record = '-' 1360 instance_name = self._escape_escapable(instance_name) 1361 self.send_command( 1362 f'srp client service add {instance_name} {service_name} {port} {priority} {weight} {txt_record} {lease} {key_lease}' 1363 ) 1364 self._expect_done() 1365 1366 def srp_client_remove_service(self, instance_name, service_name): 1367 self.send_command(f'srp client service remove {instance_name} {service_name}') 1368 self._expect_done() 1369 1370 def srp_client_clear_service(self, instance_name, service_name): 1371 self.send_command(f'srp client service clear {instance_name} {service_name}') 1372 self._expect_done() 1373 1374 def srp_client_get_services(self): 1375 cmd = 'srp client service' 1376 self.send_command(cmd) 1377 service_lines = self._expect_command_output() 1378 return [self._parse_srp_client_service(line) for line in service_lines] 1379 1380 def srp_client_set_lease_interval(self, leaseinterval: int): 1381 cmd = f'srp client leaseinterval {leaseinterval}' 1382 self.send_command(cmd) 1383 self._expect_done() 1384 1385 def srp_client_get_lease_interval(self) -> int: 1386 cmd = 'srp client leaseinterval' 1387 self.send_command(cmd) 1388 return int(self._expect_result('\d+')) 1389 1390 def srp_client_set_key_lease_interval(self, leaseinterval: int): 1391 cmd = f'srp client keyleaseinterval {leaseinterval}' 1392 self.send_command(cmd) 1393 self._expect_done() 1394 1395 def srp_client_get_key_lease_interval(self) -> int: 1396 cmd = 'srp client keyleaseinterval' 1397 self.send_command(cmd) 1398 return int(self._expect_result('\d+')) 1399 1400 def srp_client_set_ttl(self, ttl: int): 1401 cmd = f'srp client ttl {ttl}' 1402 self.send_command(cmd) 1403 self._expect_done() 1404 1405 def srp_client_get_ttl(self) -> int: 1406 cmd = 'srp client ttl' 1407 self.send_command(cmd) 1408 return int(self._expect_result('\d+')) 1409 1410 # 1411 # TREL utilities 1412 # 1413 1414 def enable_trel(self): 1415 cmd = 'trel enable' 1416 self.send_command(cmd) 1417 self._expect_done() 1418 1419 def is_trel_enabled(self) -> Union[None, bool]: 1420 states = [r'Disabled', r'Enabled'] 1421 self.send_command('trel') 1422 try: 1423 return self._expect_result(states) == 'Enabled' 1424 except Exception as ex: 1425 if 'InvalidCommand' in str(ex): 1426 return None 1427 1428 raise 1429 1430 def get_trel_counters(self): 1431 cmd = 'trel counters' 1432 self.send_command(cmd) 1433 result = self._expect_command_output() 1434 1435 counters = {} 1436 for line in result: 1437 m = re.match(r'(\w+)\:[^\d]+(\d+)[^\d]+(\d+)(?:[^\d]+(\d+))?', line) 1438 if m: 1439 groups = m.groups() 1440 sub_counters = { 1441 'packets': int(groups[1]), 1442 'bytes': int(groups[2]), 1443 } 1444 if groups[3]: 1445 sub_counters['failures'] = int(groups[3]) 1446 counters[groups[0]] = sub_counters 1447 return counters 1448 1449 def reset_trel_counters(self): 1450 cmd = 'trel counters reset' 1451 self.send_command(cmd) 1452 self._expect_done() 1453 1454 def get_trel_port(self): 1455 cmd = 'trel port' 1456 self.send_command(cmd) 1457 return int(self._expect_command_output()[0]) 1458 1459 def set_epskc(self, keystring: str, timeout=120000, port=0): 1460 cmd = 'ba ephemeralkey set ' + keystring + ' ' + str(timeout) + ' ' + str(port) 1461 self.send_command(cmd) 1462 self._expect(r"(Done|Error .*)") 1463 1464 def clear_epskc(self): 1465 cmd = 'ba ephemeralkey clear' 1466 self.send_command(cmd) 1467 self._expect_done() 1468 1469 def get_border_agent_counters(self): 1470 cmd = 'ba counters' 1471 self.send_command(cmd) 1472 result = self._expect_command_output() 1473 1474 counters = {} 1475 for line in result: 1476 m = re.match(r'(\w+)\: (\d+)', line) 1477 if m: 1478 counter_name = m.group(1) 1479 counter_value = m.group(2) 1480 1481 counters[counter_name] = int(counter_value) 1482 return counters 1483 1484 def _encode_txt_entry(self, entry): 1485 """Encodes the TXT entry to the DNS-SD TXT record format as a HEX string. 1486 1487 Example usage: 1488 self._encode_txt_entries(['abc']) -> '03616263' 1489 self._encode_txt_entries(['def=']) -> '046465663d' 1490 self._encode_txt_entries(['xyz=XYZ']) -> '0778797a3d58595a' 1491 """ 1492 return '{:02x}'.format(len(entry)) + "".join("{:02x}".format(ord(c)) for c in entry) 1493 1494 def _parse_srp_client_service(self, line: str): 1495 """Parse one line of srp service list into a dictionary which 1496 maps string keys to string values. 1497 1498 Example output for input 1499 'instance:\"%s\", name:\"%s\", state:%s, port:%d, priority:%d, weight:%d"' 1500 { 1501 'instance': 'my-service', 1502 'name': '_ipps._udp', 1503 'state': 'ToAdd', 1504 'port': '12345', 1505 'priority': '0', 1506 'weight': '0' 1507 } 1508 1509 Note that value of 'port', 'priority' and 'weight' are represented 1510 as strings but not integers. 1511 """ 1512 key_values = [word.strip().split(':') for word in line.split(', ')] 1513 keys = [key_value[0] for key_value in key_values] 1514 values = [key_value[1].strip('"') for key_value in key_values] 1515 return dict(zip(keys, values)) 1516 1517 def locate(self, anycast_addr): 1518 cmd = 'locate ' + anycast_addr 1519 self.send_command(cmd) 1520 self.simulator.go(5) 1521 return self._parse_locate_result(self._expect_command_output()[0]) 1522 1523 def _parse_locate_result(self, line: str): 1524 """Parse anycast locate result as list of ml-eid and rloc16. 1525 1526 Example output for input 1527 'fd00:db8:0:0:acf9:9d0:7f3c:b06e 0xa800' 1528 1529 [ 'fd00:db8:0:0:acf9:9d0:7f3c:b06e', '0xa800' ] 1530 """ 1531 return line.split(' ') 1532 1533 def enable_backbone_router(self): 1534 cmd = 'bbr enable' 1535 self.send_command(cmd) 1536 self._expect_done() 1537 1538 def disable_backbone_router(self): 1539 cmd = 'bbr disable' 1540 self.send_command(cmd) 1541 self._expect_done() 1542 1543 def register_backbone_router(self): 1544 cmd = 'bbr register' 1545 self.send_command(cmd) 1546 self._expect_done() 1547 1548 def get_backbone_router_state(self): 1549 states = [r'Disabled', r'Primary', r'Secondary'] 1550 self.send_command('bbr state') 1551 return self._expect_result(states) 1552 1553 @property 1554 def is_primary_backbone_router(self) -> bool: 1555 return self.get_backbone_router_state() == 'Primary' 1556 1557 def get_backbone_router(self): 1558 cmd = 'bbr config' 1559 self.send_command(cmd) 1560 self._expect(r'(.*)Done') 1561 g = self.pexpect.match.groups() 1562 output = g[0].decode("utf-8") 1563 lines = output.strip().split('\n') 1564 lines = [l.strip() for l in lines] 1565 ret = {} 1566 for l in lines: 1567 z = re.search(r'seqno:\s+([0-9]+)', l) 1568 if z: 1569 ret['seqno'] = int(z.groups()[0]) 1570 1571 z = re.search(r'delay:\s+([0-9]+)', l) 1572 if z: 1573 ret['delay'] = int(z.groups()[0]) 1574 1575 z = re.search(r'timeout:\s+([0-9]+)', l) 1576 if z: 1577 ret['timeout'] = int(z.groups()[0]) 1578 1579 return ret 1580 1581 def set_backbone_router(self, seqno=None, reg_delay=None, mlr_timeout=None): 1582 cmd = 'bbr config' 1583 1584 if seqno is not None: 1585 cmd += ' seqno %d' % seqno 1586 1587 if reg_delay is not None: 1588 cmd += ' delay %d' % reg_delay 1589 1590 if mlr_timeout is not None: 1591 cmd += ' timeout %d' % mlr_timeout 1592 1593 self.send_command(cmd) 1594 self._expect_done() 1595 1596 def set_domain_prefix(self, prefix, flags='prosD'): 1597 self.add_prefix(prefix, flags) 1598 self.register_netdata() 1599 1600 def remove_domain_prefix(self, prefix): 1601 self.remove_prefix(prefix) 1602 self.register_netdata() 1603 1604 def set_next_dua_response(self, status: Union[str, int], iid=None): 1605 # Convert 5.00 to COAP CODE 160 1606 if isinstance(status, str): 1607 assert '.' in status 1608 status = status.split('.') 1609 status = (int(status[0]) << 5) + int(status[1]) 1610 1611 cmd = 'bbr mgmt dua {}'.format(status) 1612 if iid is not None: 1613 cmd += ' ' + str(iid) 1614 self.send_command(cmd) 1615 self._expect_done() 1616 1617 def set_dua_iid(self, iid: str): 1618 assert len(iid) == 16 1619 int(iid, 16) 1620 1621 cmd = 'dua iid {}'.format(iid) 1622 self.send_command(cmd) 1623 self._expect_done() 1624 1625 def clear_dua_iid(self): 1626 cmd = 'dua iid clear' 1627 self.send_command(cmd) 1628 self._expect_done() 1629 1630 def multicast_listener_list(self) -> Dict[IPv6Address, int]: 1631 cmd = 'bbr mgmt mlr listener' 1632 self.send_command(cmd) 1633 1634 table = {} 1635 for line in self._expect_results("\S+ \d+"): 1636 line = line.split() 1637 assert len(line) == 2, line 1638 ip = IPv6Address(line[0]) 1639 timeout = int(line[1]) 1640 assert ip not in table 1641 1642 table[ip] = timeout 1643 1644 return table 1645 1646 def multicast_listener_clear(self): 1647 cmd = f'bbr mgmt mlr listener clear' 1648 self.send_command(cmd) 1649 self._expect_done() 1650 1651 def multicast_listener_add(self, ip: Union[IPv6Address, str], timeout: int = 0): 1652 if not isinstance(ip, IPv6Address): 1653 ip = IPv6Address(ip) 1654 1655 cmd = f'bbr mgmt mlr listener add {ip.compressed} {timeout}' 1656 self.send_command(cmd) 1657 self._expect(r"(Done|Error .*)") 1658 1659 def set_next_mlr_response(self, status: int): 1660 cmd = 'bbr mgmt mlr response {}'.format(status) 1661 self.send_command(cmd) 1662 self._expect_done() 1663 1664 def register_multicast_listener(self, *ipaddrs: Union[IPv6Address, str], timeout=None): 1665 assert len(ipaddrs) > 0, ipaddrs 1666 1667 ipaddrs = map(str, ipaddrs) 1668 cmd = f'mlr reg {" ".join(ipaddrs)}' 1669 if timeout is not None: 1670 cmd += f' {int(timeout)}' 1671 self.send_command(cmd) 1672 self.simulator.go(3) 1673 lines = self._expect_command_output() 1674 m = re.match(r'status (\d+), (\d+) failed', lines[0]) 1675 assert m is not None, lines 1676 status = int(m.group(1)) 1677 failed_num = int(m.group(2)) 1678 assert failed_num == len(lines) - 1 1679 failed_ips = list(map(IPv6Address, lines[1:])) 1680 print(f"register_multicast_listener {ipaddrs} => status: {status}, failed ips: {failed_ips}") 1681 return status, failed_ips 1682 1683 def set_link_quality(self, addr, lqi): 1684 cmd = 'macfilter rss add-lqi %s %s' % (addr, lqi) 1685 self.send_command(cmd) 1686 self._expect_done() 1687 1688 def set_outbound_link_quality(self, lqi): 1689 cmd = 'macfilter rss add-lqi * %s' % (lqi) 1690 self.send_command(cmd) 1691 self._expect_done() 1692 1693 def remove_allowlist(self, addr): 1694 cmd = 'macfilter addr remove %s' % addr 1695 self.send_command(cmd) 1696 self._expect_done() 1697 1698 def get_addr16(self): 1699 self.send_command('rloc16') 1700 rloc16 = self._expect_result(r'[0-9a-fA-F]{4}') 1701 return int(rloc16, 16) 1702 1703 def get_router_id(self): 1704 rloc16 = self.get_addr16() 1705 return rloc16 >> 10 1706 1707 def get_addr64(self): 1708 self.send_command('extaddr') 1709 return self._expect_result('[0-9a-fA-F]{16}') 1710 1711 def set_addr64(self, addr64: str): 1712 # Make sure `addr64` is a hex string of length 16 1713 assert len(addr64) == 16 1714 int(addr64, 16) 1715 self.send_command('extaddr %s' % addr64) 1716 self._expect_done() 1717 1718 def get_eui64(self): 1719 self.send_command('eui64') 1720 return self._expect_result('[0-9a-fA-F]{16}') 1721 1722 def set_extpanid(self, extpanid): 1723 self.send_command('extpanid %s' % extpanid) 1724 self._expect_done() 1725 1726 def get_extpanid(self): 1727 self.send_command('extpanid') 1728 return self._expect_result('[0-9a-fA-F]{16}') 1729 1730 def get_mesh_local_prefix(self): 1731 self.send_command('prefix meshlocal') 1732 return self._expect_command_output()[0] 1733 1734 def set_mesh_local_prefix(self, mesh_local_prefix): 1735 self.send_command('prefix meshlocal %s' % mesh_local_prefix) 1736 self._expect_done() 1737 1738 def get_joiner_id(self): 1739 self.send_command('joiner id') 1740 return self._expect_result('[0-9a-fA-F]{16}') 1741 1742 def get_channel(self): 1743 self.send_command('channel') 1744 return int(self._expect_result(r'\d+')) 1745 1746 def set_channel(self, channel): 1747 cmd = 'channel %d' % channel 1748 self.send_command(cmd) 1749 self._expect_done() 1750 1751 def get_networkkey(self): 1752 self.send_command('networkkey') 1753 return self._expect_result('[0-9a-fA-F]{32}') 1754 1755 def set_networkkey(self, networkkey): 1756 cmd = 'networkkey %s' % networkkey 1757 self.send_command(cmd) 1758 self._expect_done() 1759 1760 def get_key_sequence_counter(self): 1761 self.send_command('keysequence counter') 1762 result = self._expect_result(r'\d+') 1763 return int(result) 1764 1765 def set_key_sequence_counter(self, key_sequence_counter): 1766 cmd = 'keysequence counter %d' % key_sequence_counter 1767 self.send_command(cmd) 1768 self._expect_done() 1769 1770 def get_key_switch_guardtime(self): 1771 self.send_command('keysequence guardtime') 1772 return int(self._expect_result(r'\d+')) 1773 1774 def set_key_switch_guardtime(self, key_switch_guardtime): 1775 cmd = 'keysequence guardtime %d' % key_switch_guardtime 1776 self.send_command(cmd) 1777 self._expect_done() 1778 1779 def set_network_id_timeout(self, network_id_timeout): 1780 cmd = 'networkidtimeout %d' % network_id_timeout 1781 self.send_command(cmd) 1782 self._expect_done() 1783 1784 def _escape_escapable(self, string): 1785 """Escape CLI escapable characters in the given string. 1786 1787 Args: 1788 string (str): UTF-8 input string. 1789 1790 Returns: 1791 [str]: The modified string with escaped characters. 1792 """ 1793 escapable_chars = '\\ \t\r\n' 1794 for char in escapable_chars: 1795 string = string.replace(char, '\\%s' % char) 1796 return string 1797 1798 def get_network_name(self): 1799 self.send_command('networkname') 1800 return self._expect_result([r'\S+']) 1801 1802 def set_network_name(self, network_name): 1803 cmd = 'networkname %s' % self._escape_escapable(network_name) 1804 self.send_command(cmd) 1805 self._expect_done() 1806 1807 def get_panid(self): 1808 self.send_command('panid') 1809 result = self._expect_result('0x[0-9a-fA-F]{4}') 1810 return int(result, 16) 1811 1812 def set_panid(self, panid=config.PANID): 1813 cmd = 'panid %d' % panid 1814 self.send_command(cmd) 1815 self._expect_done() 1816 1817 def set_parent_priority(self, priority): 1818 cmd = 'parentpriority %d' % priority 1819 self.send_command(cmd) 1820 self._expect_done() 1821 1822 def get_partition_id(self): 1823 self.send_command('partitionid') 1824 return self._expect_result(r'\d+') 1825 1826 def get_preferred_partition_id(self): 1827 self.send_command('partitionid preferred') 1828 return self._expect_result(r'\d+') 1829 1830 def set_preferred_partition_id(self, partition_id): 1831 cmd = 'partitionid preferred %d' % partition_id 1832 self.send_command(cmd) 1833 self._expect_done() 1834 1835 def get_pollperiod(self): 1836 self.send_command('pollperiod') 1837 return self._expect_result(r'\d+') 1838 1839 def set_pollperiod(self, pollperiod): 1840 self.send_command('pollperiod %d' % pollperiod) 1841 self._expect_done() 1842 1843 def get_child_supervision_interval(self): 1844 self.send_command('childsupervision interval') 1845 return self._expect_result(r'\d+') 1846 1847 def set_child_supervision_interval(self, interval): 1848 self.send_command('childsupervision interval %d' % interval) 1849 self._expect_done() 1850 1851 def get_child_supervision_check_timeout(self): 1852 self.send_command('childsupervision checktimeout') 1853 return self._expect_result(r'\d+') 1854 1855 def set_child_supervision_check_timeout(self, timeout): 1856 self.send_command('childsupervision checktimeout %d' % timeout) 1857 self._expect_done() 1858 1859 def get_child_supervision_check_failure_counter(self): 1860 self.send_command('childsupervision failcounter') 1861 return self._expect_result(r'\d+') 1862 1863 def reset_child_supervision_check_failure_counter(self): 1864 self.send_command('childsupervision failcounter reset') 1865 self._expect_done() 1866 1867 def get_csl_info(self): 1868 self.send_command('csl') 1869 return self._expect_key_value_pairs(r'\S+') 1870 1871 def set_csl_channel(self, csl_channel): 1872 self.send_command('csl channel %d' % csl_channel) 1873 self._expect_done() 1874 1875 def set_csl_period(self, csl_period): 1876 self.send_command('csl period %d' % csl_period) 1877 self._expect_done() 1878 1879 def set_csl_timeout(self, csl_timeout): 1880 self.send_command('csl timeout %d' % csl_timeout) 1881 self._expect_done() 1882 1883 def send_mac_emptydata(self): 1884 self.send_command('mac send emptydata') 1885 self._expect_done() 1886 1887 def send_mac_datarequest(self): 1888 self.send_command('mac send datarequest') 1889 self._expect_done() 1890 1891 def set_router_upgrade_threshold(self, threshold): 1892 cmd = 'routerupgradethreshold %d' % threshold 1893 self.send_command(cmd) 1894 self._expect_done() 1895 1896 def set_router_downgrade_threshold(self, threshold): 1897 cmd = 'routerdowngradethreshold %d' % threshold 1898 self.send_command(cmd) 1899 self._expect_done() 1900 1901 def get_router_downgrade_threshold(self) -> int: 1902 self.send_command('routerdowngradethreshold') 1903 return int(self._expect_result(r'\d+')) 1904 1905 def set_router_eligible(self, enable: bool): 1906 cmd = f'routereligible {"enable" if enable else "disable"}' 1907 self.send_command(cmd) 1908 self._expect_done() 1909 1910 def get_router_eligible(self) -> bool: 1911 states = [r'Disabled', r'Enabled'] 1912 self.send_command('routereligible') 1913 return self._expect_result(states) == 'Enabled' 1914 1915 def prefer_router_id(self, router_id): 1916 cmd = 'preferrouterid %d' % router_id 1917 self.send_command(cmd) 1918 self._expect_done() 1919 1920 def release_router_id(self, router_id): 1921 cmd = 'releaserouterid %d' % router_id 1922 self.send_command(cmd) 1923 self._expect_done() 1924 1925 def get_state(self): 1926 states = [r'detached', r'child', r'router', r'leader', r'disabled'] 1927 self.send_command('state') 1928 return self._expect_result(states) 1929 1930 def set_state(self, state): 1931 cmd = 'state %s' % state 1932 self.send_command(cmd) 1933 self._expect_done() 1934 1935 def get_ephemeral_key_state(self): 1936 cmd = 'ba ephemeralkey' 1937 states = [r'inactive', r'active'] 1938 self.send_command(cmd) 1939 return self._expect_result(states) 1940 1941 def get_timeout(self): 1942 self.send_command('childtimeout') 1943 return self._expect_result(r'\d+') 1944 1945 def set_timeout(self, timeout): 1946 cmd = 'childtimeout %d' % timeout 1947 self.send_command(cmd) 1948 self._expect_done() 1949 1950 def set_max_children(self, number): 1951 cmd = 'childmax %d' % number 1952 self.send_command(cmd) 1953 self._expect_done() 1954 1955 def get_weight(self): 1956 self.send_command('leaderweight') 1957 return self._expect_result(r'\d+') 1958 1959 def set_weight(self, weight): 1960 cmd = 'leaderweight %d' % weight 1961 self.send_command(cmd) 1962 self._expect_done() 1963 1964 def add_ipaddr(self, ipaddr): 1965 cmd = 'ipaddr add %s' % ipaddr 1966 self.send_command(cmd) 1967 self._expect_done() 1968 1969 def del_ipaddr(self, ipaddr): 1970 cmd = 'ipaddr del %s' % ipaddr 1971 self.send_command(cmd) 1972 self._expect_done() 1973 1974 def add_ipmaddr(self, ipmaddr): 1975 cmd = 'ipmaddr add %s' % ipmaddr 1976 self.send_command(cmd) 1977 self._expect_done() 1978 1979 def del_ipmaddr(self, ipmaddr): 1980 cmd = 'ipmaddr del %s' % ipmaddr 1981 self.send_command(cmd) 1982 self._expect_done() 1983 1984 def get_addrs(self, verbose=False): 1985 self.send_command('ipaddr' + (' -v' if verbose else '')) 1986 1987 return self._expect_results(r'\S+(:\S*)+') 1988 1989 def get_mleid(self): 1990 self.send_command('ipaddr mleid') 1991 return self._expect_result(r'\S+(:\S*)+') 1992 1993 def get_linklocal(self): 1994 self.send_command('ipaddr linklocal') 1995 return self._expect_result(r'\S+(:\S*)+') 1996 1997 def get_rloc(self): 1998 self.send_command('ipaddr rloc') 1999 return self._expect_result(r'\S+(:\S*)+') 2000 2001 def get_addr(self, prefix): 2002 network = ipaddress.ip_network(u'%s' % str(prefix)) 2003 addrs = self.get_addrs() 2004 2005 for addr in addrs: 2006 if isinstance(addr, bytearray): 2007 addr = bytes(addr) 2008 ipv6_address = ipaddress.ip_address(addr) 2009 if ipv6_address in network: 2010 return ipv6_address.exploded 2011 2012 return None 2013 2014 def has_ipaddr(self, address): 2015 ipaddr = ipaddress.ip_address(address) 2016 ipaddrs = self.get_addrs() 2017 for addr in ipaddrs: 2018 if isinstance(addr, bytearray): 2019 addr = bytes(addr) 2020 if ipaddress.ip_address(addr) == ipaddr: 2021 return True 2022 return False 2023 2024 def get_ipmaddrs(self): 2025 self.send_command('ipmaddr') 2026 return self._expect_results(r'\S+(:\S*)+') 2027 2028 def has_ipmaddr(self, address): 2029 ipmaddr = ipaddress.ip_address(address) 2030 ipmaddrs = self.get_ipmaddrs() 2031 for addr in ipmaddrs: 2032 if isinstance(addr, bytearray): 2033 addr = bytes(addr) 2034 if ipaddress.ip_address(addr) == ipmaddr: 2035 return True 2036 return False 2037 2038 def get_addr_leader_aloc(self): 2039 addrs = self.get_addrs() 2040 for addr in addrs: 2041 segs = addr.split(':') 2042 if (segs[4] == '0' and segs[5] == 'ff' and segs[6] == 'fe00' and segs[7] == 'fc00'): 2043 return addr 2044 return None 2045 2046 def get_mleid_iid(self): 2047 ml_eid = IPv6Address(self.get_mleid()) 2048 return ml_eid.packed[8:].hex() 2049 2050 def get_eidcaches(self): 2051 eidcaches = [] 2052 self.send_command('eidcache') 2053 for line in self._expect_results(r'([a-fA-F0-9\:]+) ([a-fA-F0-9]+)'): 2054 eidcaches.append(line.split()) 2055 2056 return eidcaches 2057 2058 def add_service(self, enterpriseNumber, serviceData, serverData): 2059 cmd = 'service add %s %s %s' % ( 2060 enterpriseNumber, 2061 serviceData, 2062 serverData, 2063 ) 2064 self.send_command(cmd) 2065 self._expect_done() 2066 2067 def remove_service(self, enterpriseNumber, serviceData): 2068 cmd = 'service remove %s %s' % (enterpriseNumber, serviceData) 2069 self.send_command(cmd) 2070 self._expect_done() 2071 2072 def get_child_table(self) -> Dict[int, Dict[str, Any]]: 2073 """Get the table of attached children.""" 2074 cmd = 'child table' 2075 self.send_command(cmd) 2076 output = self._expect_command_output() 2077 2078 # 2079 # Example output: 2080 # | ID | RLOC16 | Timeout | Age | LQ In | C_VN |R|D|N|Ver|CSL|QMsgCnt|Suprvsn| Extended MAC | 2081 # +-----+--------+------------+------------+-------+------+-+-+-+---+---+-------+-------+------------------+ 2082 # | 1 | 0xc801 | 240 | 24 | 3 | 131 |1|0|0| 3| 0 | 0 | 129 | 4ecede68435358ac | 2083 # | 2 | 0xc802 | 240 | 2 | 3 | 131 |0|0|0| 3| 1 | 0 | 0 | a672a601d2ce37d8 | 2084 # Done 2085 # 2086 2087 headers = self.__split_table_row(output[0]) 2088 2089 table = {} 2090 for line in output[2:]: 2091 line = line.strip() 2092 if not line: 2093 continue 2094 2095 fields = self.__split_table_row(line) 2096 col = lambda colname: self.__get_table_col(colname, headers, fields) 2097 2098 id = int(col("ID")) 2099 r, d, n = int(col("R")), int(col("D")), int(col("N")) 2100 mode = f'{"r" if r else ""}{"d" if d else ""}{"n" if n else ""}' 2101 2102 table[int(id)] = { 2103 'id': int(id), 2104 'rloc16': int(col('RLOC16'), 16), 2105 'timeout': int(col('Timeout')), 2106 'age': int(col('Age')), 2107 'lq_in': int(col('LQ In')), 2108 'c_vn': int(col('C_VN')), 2109 'mode': mode, 2110 'extaddr': col('Extended MAC'), 2111 'ver': int(col('Ver')), 2112 'csl': bool(int(col('CSL'))), 2113 'qmsgcnt': int(col('QMsgCnt')), 2114 'suprvsn': int(col('Suprvsn')) 2115 } 2116 2117 return table 2118 2119 def __split_table_row(self, row: str) -> List[str]: 2120 if not (row.startswith('|') and row.endswith('|')): 2121 raise ValueError(row) 2122 2123 fields = row.split('|') 2124 fields = [x.strip() for x in fields[1:-1]] 2125 return fields 2126 2127 def __get_table_col(self, colname: str, headers: List[str], fields: List[str]) -> str: 2128 return fields[headers.index(colname)] 2129 2130 def __getOmrAddress(self): 2131 prefixes = [prefix.split('::')[0] for prefix in self.get_prefixes()] 2132 omr_addrs = [] 2133 for addr in self.get_addrs(): 2134 for prefix in prefixes: 2135 if (addr.startswith(prefix)) and (addr != self.__getDua()): 2136 omr_addrs.append(addr) 2137 break 2138 2139 return omr_addrs 2140 2141 def __getLinkLocalAddress(self): 2142 for ip6Addr in self.get_addrs(): 2143 if re.match(config.LINK_LOCAL_REGEX_PATTERN, ip6Addr, re.I): 2144 return ip6Addr 2145 2146 return None 2147 2148 def __getGlobalAddress(self): 2149 global_address = [] 2150 for ip6Addr in self.get_addrs(): 2151 if ((not re.match(config.LINK_LOCAL_REGEX_PATTERN, ip6Addr, re.I)) and 2152 (not re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I)) and 2153 (not re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I))): 2154 global_address.append(ip6Addr) 2155 2156 return global_address 2157 2158 def __getRloc(self): 2159 for ip6Addr in self.get_addrs(): 2160 if (re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I) and 2161 re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I) and 2162 not (re.match(config.ALOC_FLAG_REGEX_PATTERN, ip6Addr, re.I))): 2163 return ip6Addr 2164 return None 2165 2166 def __getAloc(self): 2167 aloc = [] 2168 for ip6Addr in self.get_addrs(): 2169 if (re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, re.I) and 2170 re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I) and 2171 re.match(config.ALOC_FLAG_REGEX_PATTERN, ip6Addr, re.I)): 2172 aloc.append(ip6Addr) 2173 2174 return aloc 2175 2176 def __getMleid(self): 2177 for ip6Addr in self.get_addrs(): 2178 if re.match(config.MESH_LOCAL_PREFIX_REGEX_PATTERN, ip6Addr, 2179 re.I) and not (re.match(config.ROUTING_LOCATOR_REGEX_PATTERN, ip6Addr, re.I)): 2180 return ip6Addr 2181 2182 return None 2183 2184 def __getDua(self) -> Optional[str]: 2185 for ip6Addr in self.get_addrs(): 2186 if re.match(config.DOMAIN_PREFIX_REGEX_PATTERN, ip6Addr, re.I): 2187 return ip6Addr 2188 2189 return None 2190 2191 def get_ip6_address_by_prefix(self, prefix: Union[str, IPv6Network]) -> List[IPv6Address]: 2192 """Get addresses matched with given prefix. 2193 2194 Args: 2195 prefix: the prefix to match against. 2196 Can be either a string or ipaddress.IPv6Network. 2197 2198 Returns: 2199 The IPv6 address list. 2200 """ 2201 if isinstance(prefix, str): 2202 prefix = IPv6Network(prefix) 2203 addrs = map(IPv6Address, self.get_addrs()) 2204 2205 return [addr for addr in addrs if addr in prefix] 2206 2207 def get_ip6_address(self, address_type): 2208 """Get specific type of IPv6 address configured on thread device. 2209 2210 Args: 2211 address_type: the config.ADDRESS_TYPE type of IPv6 address. 2212 2213 Returns: 2214 IPv6 address string. 2215 """ 2216 if address_type == config.ADDRESS_TYPE.LINK_LOCAL: 2217 return self.__getLinkLocalAddress() 2218 elif address_type == config.ADDRESS_TYPE.GLOBAL: 2219 return self.__getGlobalAddress() 2220 elif address_type == config.ADDRESS_TYPE.RLOC: 2221 return self.__getRloc() 2222 elif address_type == config.ADDRESS_TYPE.ALOC: 2223 return self.__getAloc() 2224 elif address_type == config.ADDRESS_TYPE.ML_EID: 2225 return self.__getMleid() 2226 elif address_type == config.ADDRESS_TYPE.DUA: 2227 return self.__getDua() 2228 elif address_type == config.ADDRESS_TYPE.BACKBONE_GUA: 2229 return self._getBackboneGua() 2230 elif address_type == config.ADDRESS_TYPE.OMR: 2231 return self.__getOmrAddress() 2232 else: 2233 return None 2234 2235 def get_context_reuse_delay(self): 2236 self.send_command('contextreusedelay') 2237 return self._expect_result(r'\d+') 2238 2239 def set_context_reuse_delay(self, delay): 2240 cmd = 'contextreusedelay %d' % delay 2241 self.send_command(cmd) 2242 self._expect_done() 2243 2244 def add_prefix(self, prefix, flags='paosr', prf='med'): 2245 cmd = 'prefix add %s %s %s' % (prefix, flags, prf) 2246 self.send_command(cmd) 2247 self._expect_done() 2248 2249 def remove_prefix(self, prefix): 2250 cmd = 'prefix remove %s' % prefix 2251 self.send_command(cmd) 2252 self._expect_done() 2253 2254 # 2255 # BR commands 2256 # 2257 def enable_br(self): 2258 self.send_command('br enable') 2259 self._expect_done() 2260 2261 def disable_br(self): 2262 self.send_command('br disable') 2263 self._expect_done() 2264 2265 def get_br_omr_prefix(self): 2266 cmd = 'br omrprefix local' 2267 self.send_command(cmd) 2268 return self._expect_command_output()[0] 2269 2270 def get_br_peers(self) -> List[str]: 2271 # Example output of `br peers` command: 2272 # rloc16:0xa800 age:00:00:50 2273 # rloc16:0x6800 age:00:00:51 2274 # Done 2275 self.send_command('br peers') 2276 return self._expect_command_output() 2277 2278 def get_br_peers_rloc16s(self) -> List[int]: 2279 """parse `br peers` output and return the list of RLOC16s""" 2280 return [ 2281 int(pair.split(':')[1], 16) 2282 for line in self.get_br_peers() 2283 for pair in line.split() 2284 if pair.split(':')[0] == 'rloc16' 2285 ] 2286 2287 def get_br_routers(self) -> List[str]: 2288 # Example output of `br routers` command: 2289 # fe80:0:0:0:42:acff:fe14:3 (M:0 O:0 Stub:1) ms-since-rx:144160 reachable:yes age:00:17:36 (peer BR) 2290 # fe80:0:0:0:42:acff:fe14:2 (M:0 O:0 Stub:1) ms-since-rx:45179 reachable:yes age:00:17:36 2291 # Done 2292 self.send_command('br routers') 2293 return self._expect_command_output() 2294 2295 def get_br_routers_ip_addresses(self) -> List[IPv6Address]: 2296 """parse `br routers` output and return the list of IPv6 addresses""" 2297 return [IPv6Address(line.split()[0]) for line in self.get_br_routers()] 2298 2299 def get_netdata_omr_prefixes(self): 2300 omr_prefixes = [] 2301 for prefix in self.get_prefixes(): 2302 prefix, flags = prefix.split()[:2] 2303 if 'a' in flags and 'o' in flags and 's' in flags and 'D' not in flags: 2304 omr_prefixes.append(prefix) 2305 2306 return omr_prefixes 2307 2308 def get_br_on_link_prefix(self): 2309 cmd = 'br onlinkprefix local' 2310 self.send_command(cmd) 2311 return self._expect_command_output()[0] 2312 2313 def get_netdata_non_nat64_routes(self): 2314 nat64_routes = [] 2315 routes = self.get_routes() 2316 for route in routes: 2317 if 'n' not in route.split(' ')[1]: 2318 nat64_routes.append(route.split(' ')[0]) 2319 return nat64_routes 2320 2321 def get_netdata_nat64_routes(self): 2322 nat64_routes = [] 2323 routes = self.get_routes() 2324 for route in routes: 2325 if 'n' in route.split(' ')[1]: 2326 nat64_routes.append(route.split(' ')[0]) 2327 return nat64_routes 2328 2329 def get_br_nat64_prefix(self): 2330 cmd = 'br nat64prefix local' 2331 self.send_command(cmd) 2332 return self._expect_command_output()[0] 2333 2334 def get_br_favored_nat64_prefix(self): 2335 cmd = 'br nat64prefix favored' 2336 self.send_command(cmd) 2337 return self._expect_command_output()[0].split(' ')[0] 2338 2339 def enable_nat64(self): 2340 self.send_command(f'nat64 enable') 2341 self._expect_done() 2342 2343 def disable_nat64(self): 2344 self.send_command(f'nat64 disable') 2345 self._expect_done() 2346 2347 def get_nat64_state(self): 2348 self.send_command('nat64 state') 2349 res = {} 2350 for line in self._expect_command_output(): 2351 state = line.split(':') 2352 res[state[0].strip()] = state[1].strip() 2353 return res 2354 2355 def get_nat64_mappings(self): 2356 cmd = 'nat64 mappings' 2357 self.send_command(cmd) 2358 result = self._expect_command_output() 2359 session = None 2360 session_counters = None 2361 sessions = [] 2362 2363 for line in result: 2364 m = re.match( 2365 r'\|\s+([a-f0-9]+)\s+\|\s+(.+)\s+\|\s+(.+)\s+\|\s+(\d+)s\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|', 2366 line) 2367 if m: 2368 groups = m.groups() 2369 if session: 2370 session['counters'] = session_counters 2371 sessions.append(session) 2372 session = { 2373 'id': groups[0], 2374 'ip6': groups[1], 2375 'ip4': groups[2], 2376 'expiry': int(groups[3]), 2377 } 2378 session_counters = {} 2379 session_counters['total'] = { 2380 '4to6': { 2381 'packets': int(groups[4]), 2382 'bytes': int(groups[5]), 2383 }, 2384 '6to4': { 2385 'packets': int(groups[6]), 2386 'bytes': int(groups[7]), 2387 }, 2388 } 2389 continue 2390 if not session: 2391 continue 2392 m = re.match(r'\|\s+\|\s+(.+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|', line) 2393 if m: 2394 groups = m.groups() 2395 session_counters[groups[0]] = { 2396 '4to6': { 2397 'packets': int(groups[1]), 2398 'bytes': int(groups[2]), 2399 }, 2400 '6to4': { 2401 'packets': int(groups[3]), 2402 'bytes': int(groups[4]), 2403 }, 2404 } 2405 if session: 2406 session['counters'] = session_counters 2407 sessions.append(session) 2408 return sessions 2409 2410 def get_nat64_counters(self): 2411 cmd = 'nat64 counters' 2412 self.send_command(cmd) 2413 result = self._expect_command_output() 2414 2415 protocol_counters = {} 2416 error_counters = {} 2417 for line in result: 2418 m = re.match(r'\|\s+(.+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|', line) 2419 if m: 2420 groups = m.groups() 2421 protocol_counters[groups[0]] = { 2422 '4to6': { 2423 'packets': int(groups[1]), 2424 'bytes': int(groups[2]), 2425 }, 2426 '6to4': { 2427 'packets': int(groups[3]), 2428 'bytes': int(groups[4]), 2429 }, 2430 } 2431 continue 2432 m = re.match(r'\|\s+(.+)\s+\|\s+(\d+)\s+\|\s+(\d+)\s+\|', line) 2433 if m: 2434 groups = m.groups() 2435 error_counters[groups[0]] = { 2436 '4to6': { 2437 'packets': int(groups[1]), 2438 }, 2439 '6to4': { 2440 'packets': int(groups[2]), 2441 }, 2442 } 2443 continue 2444 return {'protocol': protocol_counters, 'errors': error_counters} 2445 2446 def get_prefixes(self): 2447 return self.get_netdata()['Prefixes'] 2448 2449 def get_routes(self): 2450 return self.get_netdata()['Routes'] 2451 2452 def get_services(self): 2453 netdata = self.netdata_show() 2454 services = [] 2455 services_section = False 2456 2457 for line in netdata: 2458 if line.startswith('Services:'): 2459 services_section = True 2460 elif line.startswith('Contexts'): 2461 services_section = False 2462 elif services_section: 2463 services.append(line.strip().split(' ')) 2464 return services 2465 2466 def netdata_show(self): 2467 self.send_command('netdata show') 2468 return self._expect_command_output() 2469 2470 def get_netdata(self): 2471 raw_netdata = self.netdata_show() 2472 netdata = {'Prefixes': [], 'Routes': [], 'Services': [], 'Contexts': [], 'Commissioning': []} 2473 key_list = ['Prefixes', 'Routes', 'Services', 'Contexts', 'Commissioning'] 2474 key = None 2475 2476 for i in range(0, len(raw_netdata)): 2477 keys = list(filter(raw_netdata[i].startswith, key_list)) 2478 if keys != []: 2479 key = keys[0] 2480 elif key is not None: 2481 netdata[key].append(raw_netdata[i]) 2482 2483 return netdata 2484 2485 def add_route(self, prefix, stable=False, nat64=False, prf='med'): 2486 cmd = 'route add %s ' % prefix 2487 if stable: 2488 cmd += 's' 2489 if nat64: 2490 cmd += 'n' 2491 cmd += ' %s' % prf 2492 self.send_command(cmd) 2493 self._expect_done() 2494 2495 def remove_route(self, prefix): 2496 cmd = 'route remove %s' % prefix 2497 self.send_command(cmd) 2498 self._expect_done() 2499 2500 def register_netdata(self): 2501 self.send_command('netdata register') 2502 self._expect_done() 2503 2504 def netdata_publish_dnssrp_anycast(self, seqnum): 2505 self.send_command(f'netdata publish dnssrp anycast {seqnum}') 2506 self._expect_done() 2507 2508 def netdata_publish_dnssrp_unicast(self, address, port): 2509 self.send_command(f'netdata publish dnssrp unicast {address} {port}') 2510 self._expect_done() 2511 2512 def netdata_publish_dnssrp_unicast_mleid(self, port): 2513 self.send_command(f'netdata publish dnssrp unicast {port}') 2514 self._expect_done() 2515 2516 def netdata_unpublish_dnssrp(self): 2517 self.send_command('netdata unpublish dnssrp') 2518 self._expect_done() 2519 2520 def netdata_publish_prefix(self, prefix, flags='paosr', prf='med'): 2521 self.send_command(f'netdata publish prefix {prefix} {flags} {prf}') 2522 self._expect_done() 2523 2524 def netdata_publish_route(self, prefix, flags='s', prf='med'): 2525 self.send_command(f'netdata publish route {prefix} {flags} {prf}') 2526 self._expect_done() 2527 2528 def netdata_publish_replace(self, old_prefix, prefix, flags='s', prf='med'): 2529 self.send_command(f'netdata publish replace {old_prefix} {prefix} {flags} {prf}') 2530 self._expect_done() 2531 2532 def netdata_unpublish_prefix(self, prefix): 2533 self.send_command(f'netdata unpublish {prefix}') 2534 self._expect_done() 2535 2536 def send_network_diag_get(self, addr, tlv_types): 2537 self.send_command('networkdiagnostic get %s %s' % (addr, ' '.join([str(t.value) for t in tlv_types]))) 2538 2539 if isinstance(self.simulator, simulator.VirtualTime): 2540 self.simulator.go(8) 2541 timeout = 1 2542 else: 2543 timeout = 8 2544 2545 self._expect_done(timeout=timeout) 2546 2547 def send_network_diag_reset(self, addr, tlv_types): 2548 self.send_command('networkdiagnostic reset %s %s' % (addr, ' '.join([str(t.value) for t in tlv_types]))) 2549 2550 if isinstance(self.simulator, simulator.VirtualTime): 2551 self.simulator.go(8) 2552 timeout = 1 2553 else: 2554 timeout = 8 2555 2556 self._expect_done(timeout=timeout) 2557 2558 def energy_scan(self, mask, count, period, scan_duration, ipaddr): 2559 cmd = 'commissioner energy %d %d %d %d %s' % ( 2560 mask, 2561 count, 2562 period, 2563 scan_duration, 2564 ipaddr, 2565 ) 2566 self.send_command(cmd) 2567 2568 if isinstance(self.simulator, simulator.VirtualTime): 2569 self.simulator.go(8) 2570 timeout = 1 2571 else: 2572 timeout = 8 2573 2574 self._expect('Energy:', timeout=timeout) 2575 2576 def panid_query(self, panid, mask, ipaddr): 2577 cmd = 'commissioner panid %d %d %s' % (panid, mask, ipaddr) 2578 self.send_command(cmd) 2579 2580 if isinstance(self.simulator, simulator.VirtualTime): 2581 self.simulator.go(8) 2582 timeout = 1 2583 else: 2584 timeout = 8 2585 2586 self._expect('Conflict:', timeout=timeout) 2587 2588 def scan(self, result=1, timeout=10): 2589 self.send_command('scan') 2590 2591 self.simulator.go(timeout) 2592 2593 if result == 1: 2594 networks = [] 2595 for line in self._expect_command_output()[2:]: 2596 _, panid, extaddr, channel, dbm, lqi, _ = map(str.strip, line.split('|')) 2597 panid = int(panid, 16) 2598 channel, dbm, lqi = map(int, (channel, dbm, lqi)) 2599 2600 networks.append({ 2601 'panid': panid, 2602 'extaddr': extaddr, 2603 'channel': channel, 2604 'dbm': dbm, 2605 'lqi': lqi, 2606 }) 2607 return networks 2608 2609 def scan_energy(self, timeout=10): 2610 self.send_command('scan energy') 2611 self.simulator.go(timeout) 2612 rssi_list = [] 2613 for line in self._expect_command_output()[2:]: 2614 _, channel, rssi, _ = line.split('|') 2615 rssi_list.append({ 2616 'channel': int(channel.strip()), 2617 'rssi': int(rssi.strip()), 2618 }) 2619 return rssi_list 2620 2621 def ping(self, ipaddr, num_responses=1, size=8, timeout=5, count=1, interval=1, hoplimit=64, interface=None): 2622 args = f'{ipaddr} {size} {count} {interval} {hoplimit} {timeout}' 2623 if interface is not None: 2624 args = f'-I {interface} {args}' 2625 cmd = f'ping {args}' 2626 2627 self.send_command(cmd) 2628 2629 wait_allowance = 3 2630 end = self.simulator.now() + timeout + wait_allowance 2631 2632 responders = {} 2633 2634 result = True 2635 # ncp-sim doesn't print Done 2636 done = (self.node_type == 'ncp-sim') 2637 while len(responders) < num_responses or not done: 2638 self.simulator.go(1) 2639 try: 2640 i = self._expect([r'from (\S+):', r'Done'], timeout=0.1) 2641 except (pexpect.TIMEOUT, socket.timeout): 2642 if self.simulator.now() < end: 2643 continue 2644 result = False 2645 if isinstance(self.simulator, simulator.VirtualTime): 2646 self.simulator.sync_devices() 2647 break 2648 else: 2649 if i == 0: 2650 responders[self.pexpect.match.groups()[0]] = 1 2651 elif i == 1: 2652 done = True 2653 return result 2654 2655 def reset(self): 2656 self._reset('reset') 2657 2658 def factory_reset(self): 2659 self._reset('factoryreset') 2660 2661 def _reset(self, cmd): 2662 self.send_command(cmd, expect_command_echo=False) 2663 time.sleep(self.RESET_DELAY) 2664 # Send a "version" command and drain the CLI output after reset 2665 self.send_command('version', expect_command_echo=False) 2666 while True: 2667 try: 2668 self._expect(r"[^\n]+\n", timeout=0.1) 2669 continue 2670 except pexpect.TIMEOUT: 2671 break 2672 2673 if self.is_otbr: 2674 self.set_log_level(5) 2675 2676 def set_router_selection_jitter(self, jitter): 2677 cmd = 'routerselectionjitter %d' % jitter 2678 self.send_command(cmd) 2679 self._expect_done() 2680 2681 def set_active_dataset( 2682 self, 2683 timestamp=None, 2684 channel=None, 2685 channel_mask=None, 2686 extended_panid=None, 2687 mesh_local_prefix=None, 2688 network_key=None, 2689 network_name=None, 2690 panid=None, 2691 pskc=None, 2692 security_policy=[], 2693 updateExisting=False, 2694 ): 2695 2696 if updateExisting: 2697 self.send_command('dataset init active', go=False) 2698 else: 2699 self.send_command('dataset clear', go=False) 2700 self._expect_done() 2701 2702 if timestamp is not None: 2703 cmd = 'dataset activetimestamp %d' % timestamp 2704 self.send_command(cmd, go=False) 2705 self._expect_done() 2706 2707 if channel is not None: 2708 cmd = 'dataset channel %d' % channel 2709 self.send_command(cmd, go=False) 2710 self._expect_done() 2711 2712 if channel_mask is not None: 2713 cmd = 'dataset channelmask %d' % channel_mask 2714 self.send_command(cmd, go=False) 2715 self._expect_done() 2716 2717 if extended_panid is not None: 2718 cmd = 'dataset extpanid %s' % extended_panid 2719 self.send_command(cmd, go=False) 2720 self._expect_done() 2721 2722 if mesh_local_prefix is not None: 2723 cmd = 'dataset meshlocalprefix %s' % mesh_local_prefix 2724 self.send_command(cmd, go=False) 2725 self._expect_done() 2726 2727 if network_key is not None: 2728 cmd = 'dataset networkkey %s' % network_key 2729 self.send_command(cmd, go=False) 2730 self._expect_done() 2731 2732 if network_name is not None: 2733 cmd = 'dataset networkname %s' % network_name 2734 self.send_command(cmd, go=False) 2735 self._expect_done() 2736 2737 if panid is not None: 2738 cmd = 'dataset panid %d' % panid 2739 self.send_command(cmd, go=False) 2740 self._expect_done() 2741 2742 if pskc is not None: 2743 cmd = 'dataset pskc %s' % pskc 2744 self.send_command(cmd, go=False) 2745 self._expect_done() 2746 2747 if security_policy is not None: 2748 if len(security_policy) >= 2: 2749 cmd = 'dataset securitypolicy %s %s' % ( 2750 str(security_policy[0]), 2751 security_policy[1], 2752 ) 2753 if len(security_policy) >= 3: 2754 cmd += ' %s' % (str(security_policy[2])) 2755 self.send_command(cmd, go=False) 2756 self._expect_done() 2757 2758 self.send_command('dataset commit active', go=False) 2759 self._expect_done() 2760 2761 def set_pending_dataset(self, pendingtimestamp, activetimestamp, panid=None, channel=None, delay=None): 2762 self.send_command('dataset clear') 2763 self._expect_done() 2764 2765 cmd = 'dataset pendingtimestamp %d' % pendingtimestamp 2766 self.send_command(cmd) 2767 self._expect_done() 2768 2769 cmd = 'dataset activetimestamp %d' % activetimestamp 2770 self.send_command(cmd) 2771 self._expect_done() 2772 2773 if panid is not None: 2774 cmd = 'dataset panid %d' % panid 2775 self.send_command(cmd) 2776 self._expect_done() 2777 2778 if channel is not None: 2779 cmd = 'dataset channel %d' % channel 2780 self.send_command(cmd) 2781 self._expect_done() 2782 2783 if delay is not None: 2784 cmd = 'dataset delay %d' % delay 2785 self.send_command(cmd) 2786 self._expect_done() 2787 2788 # Set the meshlocal prefix in config.py 2789 self.send_command('dataset meshlocalprefix %s' % config.MESH_LOCAL_PREFIX.split('/')[0]) 2790 self._expect_done() 2791 2792 self.send_command('dataset commit pending') 2793 self._expect_done() 2794 2795 def start_dataset_updater(self, panid=None, channel=None, security_policy=None, delay=None): 2796 self.send_command('dataset clear') 2797 self._expect_done() 2798 2799 if panid is not None: 2800 cmd = 'dataset panid %d' % panid 2801 self.send_command(cmd) 2802 self._expect_done() 2803 2804 if channel is not None: 2805 cmd = 'dataset channel %d' % channel 2806 self.send_command(cmd) 2807 self._expect_done() 2808 2809 if security_policy is not None: 2810 cmd = 'dataset securitypolicy %d %s ' % (security_policy[0], security_policy[1]) 2811 if (len(security_policy) >= 3): 2812 cmd += '%d ' % (security_policy[2]) 2813 self.send_command(cmd) 2814 self._expect_done() 2815 2816 if delay is not None: 2817 cmd = 'dataset delay %d ' % delay 2818 self.send_command(cmd) 2819 self._expect_done() 2820 2821 self.send_command('dataset updater start') 2822 self._expect_done() 2823 2824 def announce_begin(self, mask, count, period, ipaddr): 2825 cmd = 'commissioner announce %d %d %d %s' % ( 2826 mask, 2827 count, 2828 period, 2829 ipaddr, 2830 ) 2831 self.send_command(cmd) 2832 self._expect_done() 2833 2834 def send_mgmt_active_set( 2835 self, 2836 active_timestamp=None, 2837 channel=None, 2838 channel_mask=None, 2839 extended_panid=None, 2840 panid=None, 2841 network_key=None, 2842 mesh_local=None, 2843 network_name=None, 2844 security_policy=None, 2845 binary=None, 2846 ): 2847 cmd = 'dataset mgmtsetcommand active ' 2848 2849 if active_timestamp is not None: 2850 cmd += 'activetimestamp %d ' % active_timestamp 2851 2852 if channel is not None: 2853 cmd += 'channel %d ' % channel 2854 2855 if channel_mask is not None: 2856 cmd += 'channelmask %d ' % channel_mask 2857 2858 if extended_panid is not None: 2859 cmd += 'extpanid %s ' % extended_panid 2860 2861 if panid is not None: 2862 cmd += 'panid %d ' % panid 2863 2864 if network_key is not None: 2865 cmd += 'networkkey %s ' % network_key 2866 2867 if mesh_local is not None: 2868 cmd += 'localprefix %s ' % mesh_local 2869 2870 if network_name is not None: 2871 cmd += 'networkname %s ' % self._escape_escapable(network_name) 2872 2873 if security_policy is not None: 2874 cmd += 'securitypolicy %d %s ' % (security_policy[0], security_policy[1]) 2875 if (len(security_policy) >= 3): 2876 cmd += '%d ' % (security_policy[2]) 2877 2878 if binary is not None: 2879 cmd += '-x %s ' % binary 2880 2881 self.send_command(cmd) 2882 self._expect_done() 2883 2884 def send_mgmt_active_get(self, addr='', tlvs=[]): 2885 cmd = 'dataset mgmtgetcommand active' 2886 2887 if addr != '': 2888 cmd += ' address ' 2889 cmd += addr 2890 2891 if len(tlvs) != 0: 2892 tlv_str = ''.join('%02x' % tlv for tlv in tlvs) 2893 cmd += ' -x ' 2894 cmd += tlv_str 2895 2896 self.send_command(cmd) 2897 self._expect_done() 2898 2899 def send_mgmt_pending_get(self, addr='', tlvs=[]): 2900 cmd = 'dataset mgmtgetcommand pending' 2901 2902 if addr != '': 2903 cmd += ' address ' 2904 cmd += addr 2905 2906 if len(tlvs) != 0: 2907 tlv_str = ''.join('%02x' % tlv for tlv in tlvs) 2908 cmd += ' -x ' 2909 cmd += tlv_str 2910 2911 self.send_command(cmd) 2912 self._expect_done() 2913 2914 def send_mgmt_pending_set( 2915 self, 2916 pending_timestamp=None, 2917 active_timestamp=None, 2918 delay_timer=None, 2919 channel=None, 2920 panid=None, 2921 network_key=None, 2922 mesh_local=None, 2923 network_name=None, 2924 ): 2925 cmd = 'dataset mgmtsetcommand pending ' 2926 if pending_timestamp is not None: 2927 cmd += 'pendingtimestamp %d ' % pending_timestamp 2928 2929 if active_timestamp is not None: 2930 cmd += 'activetimestamp %d ' % active_timestamp 2931 2932 if delay_timer is not None: 2933 cmd += 'delaytimer %d ' % delay_timer 2934 2935 if channel is not None: 2936 cmd += 'channel %d ' % channel 2937 2938 if panid is not None: 2939 cmd += 'panid %d ' % panid 2940 2941 if network_key is not None: 2942 cmd += 'networkkey %s ' % network_key 2943 2944 if mesh_local is not None: 2945 cmd += 'localprefix %s ' % mesh_local 2946 2947 if network_name is not None: 2948 cmd += 'networkname %s ' % self._escape_escapable(network_name) 2949 2950 self.send_command(cmd) 2951 self._expect_done() 2952 2953 def coap_cancel(self): 2954 """ 2955 Cancel a CoAP subscription. 2956 """ 2957 cmd = 'coap cancel' 2958 self.send_command(cmd) 2959 self._expect_done() 2960 2961 def coap_delete(self, ipaddr, uri, con=False, payload=None): 2962 """ 2963 Send a DELETE request via CoAP. 2964 """ 2965 return self._coap_rq('delete', ipaddr, uri, con, payload) 2966 2967 def coap_get(self, ipaddr, uri, con=False, payload=None): 2968 """ 2969 Send a GET request via CoAP. 2970 """ 2971 return self._coap_rq('get', ipaddr, uri, con, payload) 2972 2973 def coap_get_block(self, ipaddr, uri, size=16, count=0): 2974 """ 2975 Send a GET request via CoAP. 2976 """ 2977 return self._coap_rq_block('get', ipaddr, uri, size, count) 2978 2979 def coap_observe(self, ipaddr, uri, con=False, payload=None): 2980 """ 2981 Send a GET request via CoAP with Observe set. 2982 """ 2983 return self._coap_rq('observe', ipaddr, uri, con, payload) 2984 2985 def coap_post(self, ipaddr, uri, con=False, payload=None): 2986 """ 2987 Send a POST request via CoAP. 2988 """ 2989 return self._coap_rq('post', ipaddr, uri, con, payload) 2990 2991 def coap_post_block(self, ipaddr, uri, size=16, count=0): 2992 """ 2993 Send a POST request via CoAP. 2994 """ 2995 return self._coap_rq_block('post', ipaddr, uri, size, count) 2996 2997 def coap_put(self, ipaddr, uri, con=False, payload=None): 2998 """ 2999 Send a PUT request via CoAP. 3000 """ 3001 return self._coap_rq('put', ipaddr, uri, con, payload) 3002 3003 def coap_put_block(self, ipaddr, uri, size=16, count=0): 3004 """ 3005 Send a PUT request via CoAP. 3006 """ 3007 return self._coap_rq_block('put', ipaddr, uri, size, count) 3008 3009 def _coap_rq(self, method, ipaddr, uri, con=False, payload=None): 3010 """ 3011 Issue a GET/POST/PUT/DELETE/GET OBSERVE request. 3012 """ 3013 cmd = 'coap %s %s %s' % (method, ipaddr, uri) 3014 if con: 3015 cmd += ' con' 3016 else: 3017 cmd += ' non' 3018 3019 if payload is not None: 3020 cmd += ' %s' % payload 3021 3022 self.send_command(cmd) 3023 return self.coap_wait_response() 3024 3025 def _coap_rq_block(self, method, ipaddr, uri, size=16, count=0): 3026 """ 3027 Issue a GET/POST/PUT/DELETE/GET OBSERVE BLOCK request. 3028 """ 3029 cmd = 'coap %s %s %s' % (method, ipaddr, uri) 3030 3031 cmd += ' block-%d' % size 3032 3033 if count != 0: 3034 cmd += ' %d' % count 3035 3036 self.send_command(cmd) 3037 return self.coap_wait_response() 3038 3039 def coap_wait_response(self): 3040 """ 3041 Wait for a CoAP response, and return it. 3042 """ 3043 if isinstance(self.simulator, simulator.VirtualTime): 3044 self.simulator.go(5) 3045 timeout = 1 3046 else: 3047 timeout = 5 3048 3049 self._expect(r'coap response from ([\da-f:]+)(?: OBS=(\d+))?' 3050 r'(?: with payload: ([\da-f]+))?\b', 3051 timeout=timeout) 3052 (source, observe, payload) = self.pexpect.match.groups() 3053 source = source.decode('UTF-8') 3054 3055 if observe is not None: 3056 observe = int(observe, base=10) 3057 3058 if payload is not None: 3059 try: 3060 payload = binascii.a2b_hex(payload).decode('UTF-8') 3061 except UnicodeDecodeError: 3062 pass 3063 3064 # Return the values received 3065 return dict(source=source, observe=observe, payload=payload) 3066 3067 def coap_wait_request(self): 3068 """ 3069 Wait for a CoAP request to be made. 3070 """ 3071 if isinstance(self.simulator, simulator.VirtualTime): 3072 self.simulator.go(5) 3073 timeout = 1 3074 else: 3075 timeout = 5 3076 3077 self._expect(r'coap request from ([\da-f:]+)(?: OBS=(\d+))?' 3078 r'(?: with payload: ([\da-f]+))?\b', 3079 timeout=timeout) 3080 (source, observe, payload) = self.pexpect.match.groups() 3081 source = source.decode('UTF-8') 3082 3083 if observe is not None: 3084 observe = int(observe, base=10) 3085 3086 if payload is not None: 3087 payload = binascii.a2b_hex(payload).decode('UTF-8') 3088 3089 # Return the values received 3090 return dict(source=source, observe=observe, payload=payload) 3091 3092 def coap_wait_subscribe(self): 3093 """ 3094 Wait for a CoAP client to be subscribed. 3095 """ 3096 if isinstance(self.simulator, simulator.VirtualTime): 3097 self.simulator.go(5) 3098 timeout = 1 3099 else: 3100 timeout = 5 3101 3102 self._expect(r'Subscribing client\b', timeout=timeout) 3103 3104 def coap_wait_ack(self): 3105 """ 3106 Wait for a CoAP notification ACK. 3107 """ 3108 if isinstance(self.simulator, simulator.VirtualTime): 3109 self.simulator.go(5) 3110 timeout = 1 3111 else: 3112 timeout = 5 3113 3114 self._expect(r'Received ACK in reply to notification from ([\da-f:]+)\b', timeout=timeout) 3115 (source,) = self.pexpect.match.groups() 3116 source = source.decode('UTF-8') 3117 3118 return source 3119 3120 def coap_set_resource_path(self, path): 3121 """ 3122 Set the path for the CoAP resource. 3123 """ 3124 cmd = 'coap resource %s' % path 3125 self.send_command(cmd) 3126 self._expect_done() 3127 3128 def coap_set_resource_path_block(self, path, count=0): 3129 """ 3130 Set the path for the CoAP resource and how many blocks can be received from this resource. 3131 """ 3132 cmd = 'coap resource %s %d' % (path, count) 3133 self.send_command(cmd) 3134 self._expect('Done') 3135 3136 def coap_set_content(self, content): 3137 """ 3138 Set the content of the CoAP resource. 3139 """ 3140 cmd = 'coap set %s' % content 3141 self.send_command(cmd) 3142 self._expect_done() 3143 3144 def coap_start(self): 3145 """ 3146 Start the CoAP service. 3147 """ 3148 cmd = 'coap start' 3149 self.send_command(cmd) 3150 self._expect_done() 3151 3152 def coap_stop(self): 3153 """ 3154 Stop the CoAP service. 3155 """ 3156 cmd = 'coap stop' 3157 self.send_command(cmd) 3158 3159 if isinstance(self.simulator, simulator.VirtualTime): 3160 self.simulator.go(5) 3161 timeout = 1 3162 else: 3163 timeout = 5 3164 3165 self._expect_done(timeout=timeout) 3166 3167 def coaps_start_psk(self, psk, pskIdentity): 3168 cmd = 'coaps psk %s %s' % (psk, pskIdentity) 3169 self.send_command(cmd) 3170 self._expect_done() 3171 3172 cmd = 'coaps start' 3173 self.send_command(cmd) 3174 self._expect_done() 3175 3176 def coaps_start_x509(self): 3177 cmd = 'coaps x509' 3178 self.send_command(cmd) 3179 self._expect_done() 3180 3181 cmd = 'coaps start' 3182 self.send_command(cmd) 3183 self._expect_done() 3184 3185 def coaps_set_resource_path(self, path): 3186 cmd = 'coaps resource %s' % path 3187 self.send_command(cmd) 3188 self._expect_done() 3189 3190 def coaps_stop(self): 3191 cmd = 'coaps stop' 3192 self.send_command(cmd) 3193 3194 if isinstance(self.simulator, simulator.VirtualTime): 3195 self.simulator.go(5) 3196 timeout = 1 3197 else: 3198 timeout = 5 3199 3200 self._expect_done(timeout=timeout) 3201 3202 def coaps_connect(self, ipaddr): 3203 cmd = 'coaps connect %s' % ipaddr 3204 self.send_command(cmd) 3205 3206 if isinstance(self.simulator, simulator.VirtualTime): 3207 self.simulator.go(5) 3208 timeout = 1 3209 else: 3210 timeout = 5 3211 3212 self._expect('coaps connected', timeout=timeout) 3213 3214 def coaps_disconnect(self): 3215 cmd = 'coaps disconnect' 3216 self.send_command(cmd) 3217 self._expect_done() 3218 self.simulator.go(5) 3219 3220 def coaps_get(self): 3221 cmd = 'coaps get test' 3222 self.send_command(cmd) 3223 3224 if isinstance(self.simulator, simulator.VirtualTime): 3225 self.simulator.go(5) 3226 timeout = 1 3227 else: 3228 timeout = 5 3229 3230 self._expect('coaps response', timeout=timeout) 3231 3232 def commissioner_mgmtget(self, tlvs_binary=None): 3233 cmd = 'commissioner mgmtget' 3234 if tlvs_binary is not None: 3235 cmd += ' -x %s' % tlvs_binary 3236 self.send_command(cmd) 3237 self._expect_done() 3238 3239 def commissioner_mgmtset(self, tlvs_binary): 3240 cmd = 'commissioner mgmtset -x %s' % tlvs_binary 3241 self.send_command(cmd) 3242 self._expect_done() 3243 3244 def bytes_to_hex_str(self, src): 3245 return ''.join(format(x, '02x') for x in src) 3246 3247 def commissioner_mgmtset_with_tlvs(self, tlvs): 3248 payload = bytearray() 3249 for tlv in tlvs: 3250 payload += tlv.to_hex() 3251 self.commissioner_mgmtset(self.bytes_to_hex_str(payload)) 3252 3253 def udp_start(self, local_ipaddr, local_port, bind_unspecified=False): 3254 cmd = 'udp open' 3255 self.send_command(cmd) 3256 self._expect_done() 3257 3258 cmd = 'udp bind %s %s %s' % ("-u" if bind_unspecified else "", local_ipaddr, local_port) 3259 self.send_command(cmd) 3260 self._expect_done() 3261 3262 def udp_stop(self): 3263 cmd = 'udp close' 3264 self.send_command(cmd) 3265 self._expect_done() 3266 3267 def udp_send(self, bytes, ipaddr, port, success=True): 3268 cmd = 'udp send %s %d -s %d ' % (ipaddr, port, bytes) 3269 self.send_command(cmd) 3270 if success: 3271 self._expect_done() 3272 else: 3273 self._expect('Error') 3274 3275 def udp_check_rx(self, bytes_should_rx): 3276 self._expect('%d bytes' % bytes_should_rx) 3277 3278 def set_routereligible(self, enable: bool): 3279 cmd = f'routereligible {"enable" if enable else "disable"}' 3280 self.send_command(cmd) 3281 self._expect_done() 3282 3283 def router_list(self): 3284 cmd = 'router list' 3285 self.send_command(cmd) 3286 self._expect([r'(\d+)((\s\d+)*)']) 3287 3288 g = self.pexpect.match.groups() 3289 router_list = g[0].decode('utf8') + ' ' + g[1].decode('utf8') 3290 router_list = [int(x) for x in router_list.split()] 3291 self._expect_done() 3292 return router_list 3293 3294 def router_table(self): 3295 cmd = 'router table' 3296 self.send_command(cmd) 3297 3298 self._expect(r'(.*)Done') 3299 g = self.pexpect.match.groups() 3300 output = g[0].decode('utf8') 3301 lines = output.strip().split('\n') 3302 lines = [l.strip() for l in lines] 3303 router_table = {} 3304 for i, line in enumerate(lines): 3305 if not line.startswith('|') or not line.endswith('|'): 3306 if i not in (0, 2): 3307 # should not happen 3308 print("unexpected line %d: %s" % (i, line)) 3309 3310 continue 3311 3312 line = line[1:][:-1] 3313 line = [x.strip() for x in line.split('|')] 3314 if len(line) < 9: 3315 print("unexpected line %d: %s" % (i, line)) 3316 continue 3317 3318 try: 3319 int(line[0]) 3320 except ValueError: 3321 if i != 1: 3322 print("unexpected line %d: %s" % (i, line)) 3323 continue 3324 3325 id = int(line[0]) 3326 rloc16 = int(line[1], 16) 3327 nexthop = int(line[2]) 3328 pathcost = int(line[3]) 3329 lqin = int(line[4]) 3330 lqout = int(line[5]) 3331 age = int(line[6]) 3332 emac = str(line[7]) 3333 link = int(line[8]) 3334 3335 router_table[id] = { 3336 'rloc16': rloc16, 3337 'nexthop': nexthop, 3338 'pathcost': pathcost, 3339 'lqin': lqin, 3340 'lqout': lqout, 3341 'age': age, 3342 'emac': emac, 3343 'link': link, 3344 } 3345 3346 return router_table 3347 3348 def link_metrics_request_single_probe(self, dst_addr: str, linkmetrics_flags: str, mode: str = ''): 3349 cmd = 'linkmetrics request %s %s single %s' % (mode, dst_addr, linkmetrics_flags) 3350 self.send_command(cmd) 3351 self.simulator.go(5) 3352 return self._parse_linkmetrics_query_result(self._expect_command_output()) 3353 3354 def link_metrics_request_forward_tracking_series(self, dst_addr: str, series_id: int, mode: str = ''): 3355 cmd = 'linkmetrics request %s %s forward %d' % (mode, dst_addr, series_id) 3356 self.send_command(cmd) 3357 self.simulator.go(5) 3358 return self._parse_linkmetrics_query_result(self._expect_command_output()) 3359 3360 def _parse_linkmetrics_query_result(self, lines): 3361 """Parse link metrics query result""" 3362 3363 # Example of command output: 3364 # ['Received Link Metrics Report from: fe80:0:0:0:146e:a00:0:1', 3365 # '- PDU Counter: 1 (Count/Summation)', 3366 # '- LQI: 0 (Exponential Moving Average)', 3367 # '- Margin: 80 (dB) (Exponential Moving Average)', 3368 # '- RSSI: -20 (dBm) (Exponential Moving Average)'] 3369 # 3370 # Or 'Link Metrics Report, status: {status}' 3371 3372 result = {} 3373 for line in lines: 3374 if line.startswith('- '): 3375 k, v = line[2:].split(': ') 3376 result[k] = v.split(' ')[0] 3377 elif line.startswith('Link Metrics Report, status: '): 3378 result['Status'] = line[29:] 3379 return result 3380 3381 def link_metrics_config_req_enhanced_ack_based_probing(self, 3382 dst_addr: str, 3383 enable: bool, 3384 metrics_flags: str, 3385 ext_flags='', 3386 mode: str = ''): 3387 cmd = "linkmetrics config %s %s enhanced-ack" % (mode, dst_addr) 3388 if enable: 3389 cmd = cmd + (" register %s %s" % (metrics_flags, ext_flags)) 3390 else: 3391 cmd = cmd + " clear" 3392 self.send_command(cmd) 3393 self._expect_done() 3394 3395 def link_metrics_config_req_forward_tracking_series(self, 3396 dst_addr: str, 3397 series_id: int, 3398 series_flags: str, 3399 metrics_flags: str, 3400 mode: str = ''): 3401 cmd = "linkmetrics config %s %s forward %d %s %s" % (mode, dst_addr, series_id, series_flags, metrics_flags) 3402 self.send_command(cmd) 3403 self._expect_done() 3404 3405 def link_metrics_send_link_probe(self, dst_addr: str, series_id: int, length: int): 3406 cmd = "linkmetrics probe %s %d %d" % (dst_addr, series_id, length) 3407 self.send_command(cmd) 3408 self._expect_done() 3409 3410 def link_metrics_mgr_set_enabled(self, enable: bool): 3411 op_str = "enable" if enable else "disable" 3412 cmd = f'linkmetricsmgr {op_str}' 3413 self.send_command(cmd) 3414 self._expect_done() 3415 3416 def send_address_notification(self, dst: str, target: str, mliid: str): 3417 cmd = f'fake /a/an {dst} {target} {mliid}' 3418 self.send_command(cmd) 3419 self._expect_done() 3420 3421 def send_proactive_backbone_notification(self, target: str, mliid: str, ltt: int): 3422 cmd = f'fake /b/ba {target} {mliid} {ltt}' 3423 self.send_command(cmd) 3424 self._expect_done() 3425 3426 def dns_get_config(self): 3427 """ 3428 Returns the DNS config as a list of property dictionary (string key and string value). 3429 3430 Example output: 3431 { 3432 'Server': '[fd00:0:0:0:0:0:0:1]:1234' 3433 'ResponseTimeout': '5000 ms' 3434 'MaxTxAttempts': '2' 3435 'RecursionDesired': 'no' 3436 } 3437 """ 3438 cmd = f'dns config' 3439 self.send_command(cmd) 3440 output = self._expect_command_output() 3441 config = {} 3442 for line in output: 3443 k, v = line.split(': ') 3444 config[k] = v 3445 return config 3446 3447 def dns_set_config(self, config): 3448 cmd = f'dns config {config}' 3449 self.send_command(cmd) 3450 self._expect_done() 3451 3452 def dns_resolve(self, hostname, server=None, port=53): 3453 cmd = f'dns resolve {hostname}' 3454 if server is not None: 3455 cmd += f' {server} {port}' 3456 3457 self.send_command(cmd) 3458 self.simulator.go(10) 3459 output = self._expect_command_output() 3460 dns_resp = output[0] 3461 # example output: "DNS response for host1.default.service.arpa. - fd00:db8:0:0:fd3d:d471:1e8c:b60 TTL:7190 " 3462 # " fd00:db8:0:0:0:ff:fe00:9000 TTL:7190" 3463 addrs = dns_resp.strip().split(' - ')[1].split(' ') 3464 ip = [item.strip() for item in addrs[::2]] 3465 ttl = [int(item.split('TTL:')[1]) for item in addrs[1::2]] 3466 3467 return list(zip(ip, ttl)) 3468 3469 def _parse_dns_service_info(self, output): 3470 # Example of `output` 3471 # Port:22222, Priority:2, Weight:2, TTL:7155 3472 # Host:host2.default.service.arpa. 3473 # HostAddress:0:0:0:0:0:0:0:0 TTL:0 3474 # TXT:[a=00, b=02bb] TTL:7155 3475 3476 m = re.match( 3477 r'.*Port:(\d+), Priority:(\d+), Weight:(\d+), TTL:(\d+)\s+Host:(.*?)\s+HostAddress:(\S+) TTL:(\d+)\s+TXT:\[(.*?)\] TTL:(\d+)', 3478 '\r'.join(output)) 3479 if not m: 3480 return {} 3481 port, priority, weight, srv_ttl, hostname, address, aaaa_ttl, txt_data, txt_ttl = m.groups() 3482 return { 3483 'port': int(port), 3484 'priority': int(priority), 3485 'weight': int(weight), 3486 'host': hostname, 3487 'address': address, 3488 'txt_data': txt_data, 3489 'srv_ttl': int(srv_ttl), 3490 'txt_ttl': int(txt_ttl), 3491 'aaaa_ttl': int(aaaa_ttl), 3492 } 3493 3494 def dns_resolve_service(self, instance, service, server=None, port=53): 3495 """ 3496 Resolves the service instance and returns the instance information as a dict. 3497 3498 Example return value: 3499 { 3500 'port': 12345, 3501 'priority': 0, 3502 'weight': 0, 3503 'host': 'ins1._ipps._tcp.default.service.arpa.', 3504 'address': '2001::1', 3505 'txt_data': 'a=00, b=02bb', 3506 'srv_ttl': 7100, 3507 'txt_ttl': 7100, 3508 'aaaa_ttl': 7100, 3509 } 3510 """ 3511 instance = self._escape_escapable(instance) 3512 cmd = f'dns service {instance} {service}' 3513 if server is not None: 3514 cmd += f' {server} {port}' 3515 3516 self.send_command(cmd) 3517 self.simulator.go(10) 3518 output = self._expect_command_output() 3519 info = self._parse_dns_service_info(output) 3520 if not info: 3521 raise Exception('dns resolve service failed: %s.%s' % (instance, service)) 3522 return info 3523 3524 @staticmethod 3525 def __parse_hex_string(hexstr: str) -> bytes: 3526 assert (len(hexstr) % 2 == 0) 3527 return bytes(int(hexstr[i:i + 2], 16) for i in range(0, len(hexstr), 2)) 3528 3529 def dns_browse(self, service_name, server=None, port=53): 3530 """ 3531 Browse the service and returns the instances. 3532 3533 Example return value: 3534 { 3535 'ins1': { 3536 'port': 12345, 3537 'priority': 1, 3538 'weight': 1, 3539 'host': 'ins1._ipps._tcp.default.service.arpa.', 3540 'address': '2001::1', 3541 'txt_data': 'a=00, b=11cf', 3542 'srv_ttl': 7100, 3543 'txt_ttl': 7100, 3544 'aaaa_ttl': 7100, 3545 }, 3546 'ins2': { 3547 'port': 12345, 3548 'priority': 2, 3549 'weight': 2, 3550 'host': 'ins2._ipps._tcp.default.service.arpa.', 3551 'address': '2001::2', 3552 'txt_data': 'a=01, b=23dd', 3553 'srv_ttl': 7100, 3554 'txt_ttl': 7100, 3555 'aaaa_ttl': 7100, 3556 } 3557 } 3558 """ 3559 cmd = f'dns browse {service_name}' 3560 if server is not None: 3561 cmd += f' {server} {port}' 3562 3563 self.send_command(cmd) 3564 self.simulator.go(10) 3565 output = self._expect_command_output() 3566 3567 # Example output: 3568 # DNS browse response for _ipps._tcp.default.service.arpa. 3569 # ins2 3570 # Port:22222, Priority:2, Weight:2, TTL:7175 3571 # Host:host2.default.service.arpa. 3572 # HostAddress:fd00:db8:0:0:3205:28dd:5b87:6a63 TTL:7175 3573 # TXT:[a=00, b=11cf] TTL:7175 3574 # ins1 3575 # Port:11111, Priority:1, Weight:1, TTL:7170 3576 # Host:host1.default.service.arpa. 3577 # HostAddress:fd00:db8:0:0:39f4:d9:eb4f:778 TTL:7170 3578 # TXT:[a=01, b=23dd] TTL:7170 3579 # Done 3580 3581 result = {} 3582 index = 1 # skip first line 3583 while index < len(output): 3584 ins = output[index].strip() 3585 result[ins] = self._parse_dns_service_info(output[index + 1:index + 6]) 3586 index = index + (5 if result[ins] else 1) 3587 return result 3588 3589 def set_mliid(self, mliid: str): 3590 cmd = f'mliid {mliid}' 3591 self.send_command(cmd) 3592 self._expect_command_output() 3593 3594 def history_netinfo(self, num_entries=0): 3595 """ 3596 Get the `netinfo` history list, parse each entry and return 3597 a list of dictionary (string key and string value) entries. 3598 3599 Example of return value: 3600 [ 3601 { 3602 'age': '00:00:00.000 ago', 3603 'role': 'disabled', 3604 'mode': 'rdn', 3605 'rloc16': '0x7400', 3606 'partition-id': '1318093703' 3607 }, 3608 { 3609 'age': '00:00:02.588 ago', 3610 'role': 'leader', 3611 'mode': 'rdn', 3612 'rloc16': '0x7400', 3613 'partition-id': '1318093703' 3614 } 3615 ] 3616 """ 3617 cmd = f'history netinfo list {num_entries}' 3618 self.send_command(cmd) 3619 output = self._expect_command_output() 3620 netinfos = [] 3621 for entry in output: 3622 netinfo = {} 3623 age, info = entry.split(' -> ') 3624 netinfo['age'] = age 3625 for item in info.split(' '): 3626 k, v = item.split(':') 3627 netinfo[k] = v 3628 netinfos.append(netinfo) 3629 return netinfos 3630 3631 def history_rx(self, num_entries=0): 3632 """ 3633 Get the IPv6 RX history list, parse each entry and return 3634 a list of dictionary (string key and string value) entries. 3635 3636 Example of return value: 3637 [ 3638 { 3639 'age': '00:00:01.999', 3640 'type': 'ICMP6(EchoReqst)', 3641 'len': '16', 3642 'sec': 'yes', 3643 'prio': 'norm', 3644 'rss': '-20', 3645 'from': '0xac00', 3646 'radio': '15.4', 3647 'src': '[fd00:db8:0:0:2cfa:fd61:58a9:f0aa]:0', 3648 'dst': '[fd00:db8:0:0:ed7e:2d04:e543:eba5]:0', 3649 } 3650 ] 3651 """ 3652 cmd = f'history rx list {num_entries}' 3653 self.send_command(cmd) 3654 return self._parse_history_rx_tx_ouput(self._expect_command_output()) 3655 3656 def history_tx(self, num_entries=0): 3657 """ 3658 Get the IPv6 TX history list, parse each entry and return 3659 a list of dictionary (string key and string value) entries. 3660 3661 Example of return value: 3662 [ 3663 { 3664 'age': '00:00:01.999', 3665 'type': 'ICMP6(EchoReply)', 3666 'len': '16', 3667 'sec': 'yes', 3668 'prio': 'norm', 3669 'to': '0xac00', 3670 'tx-success': 'yes', 3671 'radio': '15.4', 3672 'src': '[fd00:db8:0:0:ed7e:2d04:e543:eba5]:0', 3673 'dst': '[fd00:db8:0:0:2cfa:fd61:58a9:f0aa]:0', 3674 3675 } 3676 ] 3677 """ 3678 cmd = f'history tx list {num_entries}' 3679 self.send_command(cmd) 3680 return self._parse_history_rx_tx_ouput(self._expect_command_output()) 3681 3682 def _parse_history_rx_tx_ouput(self, lines): 3683 rxtx_list = [] 3684 for line in lines: 3685 if line.strip().startswith('type:'): 3686 for item in line.strip().split(' '): 3687 k, v = item.split(':') 3688 entry[k] = v 3689 elif line.strip().startswith('src:'): 3690 entry['src'] = line[4:] 3691 elif line.strip().startswith('dst:'): 3692 entry['dst'] = line[4:] 3693 rxtx_list.append(entry) 3694 else: 3695 entry = {} 3696 entry['age'] = line 3697 3698 return rxtx_list 3699 3700 def set_router_id_range(self, min_router_id: int, max_router_id: int): 3701 cmd = f'routeridrange {min_router_id} {max_router_id}' 3702 self.send_command(cmd) 3703 self._expect_command_output() 3704 3705 def get_router_id_range(self): 3706 cmd = 'routeridrange' 3707 self.send_command(cmd) 3708 line = self._expect_command_output()[0] 3709 return [int(item) for item in line.split()] 3710 3711 def get_channel_monitor_info(self) -> Dict: 3712 """ 3713 Returns: 3714 Dict of channel monitor info, e.g. 3715 {'enabled': '1', 3716 'interval': '41000', 3717 'threshold': '-75', 3718 'window': '960', 3719 'count': '985', 3720 'occupancies': { 3721 '11': '0.00%', 3722 '12': '3.50%', 3723 '13': '9.89%', 3724 '14': '15.36%', 3725 '15': '20.02%', 3726 '16': '21.95%', 3727 '17': '32.71%', 3728 '18': '35.76%', 3729 '19': '37.97%', 3730 '20': '43.68%', 3731 '21': '48.95%', 3732 '22': '54.05%', 3733 '23': '58.65%', 3734 '24': '68.26%', 3735 '25': '66.73%', 3736 '26': '73.12%' 3737 } 3738 } 3739 """ 3740 config = {} 3741 self.send_command('channel monitor') 3742 3743 for line in self._expect_results(r'\S+'): 3744 if re.match(r'.*:\s.*', line): 3745 key, val = line.split(':') 3746 config.update({key: val.strip()}) 3747 elif re.match(r'.*:', line): # occupancy 3748 occ_key, val = line.split(':') 3749 val = {} 3750 config.update({occ_key: val}) 3751 elif 'busy' in line: 3752 # channel occupancies 3753 key = line.split()[1] 3754 val = line.split()[3] 3755 config[occ_key].update({key: val}) 3756 return config 3757 3758 def set_channel_manager_auto_enable(self, enable: bool): 3759 self.send_command(f'channel manager auto {int(enable)}') 3760 self._expect_done() 3761 3762 def set_channel_manager_autocsl_enable(self, enable: bool): 3763 self.send_command(f'channel manager autocsl {int(enable)}') 3764 self._expect_done() 3765 3766 def set_channel_manager_supported(self, channel_mask: int): 3767 self.send_command(f'channel manager supported {int(channel_mask)}') 3768 self._expect_done() 3769 3770 def set_channel_manager_favored(self, channel_mask: int): 3771 self.send_command(f'channel manager favored {int(channel_mask)}') 3772 self._expect_done() 3773 3774 def set_channel_manager_interval(self, interval: int): 3775 self.send_command(f'channel manager interval {interval}') 3776 self._expect_done() 3777 3778 def set_channel_manager_cca_threshold(self, hex_value: str): 3779 self.send_command(f'channel manager threshold {hex_value}') 3780 self._expect_done() 3781 3782 def get_channel_manager_config(self): 3783 self.send_command('channel manager') 3784 return self._expect_key_value_pairs(r'\S+') 3785 3786 3787class Node(NodeImpl, OtCli): 3788 pass 3789 3790 3791class LinuxHost(): 3792 PING_RESPONSE_PATTERN = re.compile(r'\d+ bytes from .*:.*') 3793 ETH_DEV = config.BACKBONE_IFNAME 3794 3795 def enable_ether(self): 3796 """Enable the ethernet interface. 3797 """ 3798 3799 self.bash(f'ip link set {self.ETH_DEV} up') 3800 3801 def disable_ether(self): 3802 """Disable the ethernet interface. 3803 """ 3804 3805 self.bash(f'ip link set {self.ETH_DEV} down') 3806 3807 def get_ether_addrs(self, ipv4=False, ipv6=True): 3808 output = self.bash(f'ip addr list dev {self.ETH_DEV}') 3809 3810 addrs = [] 3811 for line in output: 3812 # line examples: 3813 # "inet6 fe80::42:c0ff:fea8:903/64 scope link" 3814 # "inet 192.168.9.1/24 brd 192.168.9.255 scope global eth0" 3815 line = line.strip().split() 3816 3817 if not line or not line[0].startswith('inet'): 3818 continue 3819 if line[0] == 'inet' and not ipv4: 3820 continue 3821 if line[0] == 'inet6' and not ipv6: 3822 continue 3823 3824 addr = line[1] 3825 if '/' in addr: 3826 addr = addr.split('/')[0] 3827 addrs.append(addr) 3828 3829 logging.debug('%s: get_ether_addrs: %r', self, addrs) 3830 return addrs 3831 3832 def get_ether_mac(self): 3833 output = self.bash(f'ip addr list dev {self.ETH_DEV}') 3834 for line in output: 3835 # link/ether 02:42:ac:11:00:02 brd ff:ff:ff:ff:ff:ff link-netnsid 0 3836 line = line.strip().split() 3837 if line and line[0] == 'link/ether': 3838 return line[1] 3839 3840 assert False, output 3841 3842 def add_ipmaddr_ether(self, ip: str): 3843 cmd = f'python3 /app/third_party/openthread/repo/tests/scripts/thread-cert/mcast6.py {self.ETH_DEV} {ip} &' 3844 self.bash(cmd) 3845 3846 def ping_ether(self, ipaddr, num_responses=1, size=None, timeout=5, ttl=None, interface='eth0') -> int: 3847 3848 cmd = f'ping -6 {ipaddr} -I {interface} -c {num_responses} -W {timeout}' 3849 if size is not None: 3850 cmd += f' -s {size}' 3851 3852 if ttl is not None: 3853 cmd += f' -t {ttl}' 3854 3855 resp_count = 0 3856 3857 try: 3858 for line in self.bash(cmd): 3859 if self.PING_RESPONSE_PATTERN.match(line): 3860 resp_count += 1 3861 except subprocess.CalledProcessError: 3862 pass 3863 3864 return resp_count 3865 3866 def get_ip6_address(self, address_type: config.ADDRESS_TYPE): 3867 """Get specific type of IPv6 address configured on thread device. 3868 3869 Args: 3870 address_type: the config.ADDRESS_TYPE type of IPv6 address. 3871 3872 Returns: 3873 IPv6 address string. 3874 """ 3875 if address_type == config.ADDRESS_TYPE.BACKBONE_GUA: 3876 return self._getBackboneGua() 3877 elif address_type == config.ADDRESS_TYPE.BACKBONE_LINK_LOCAL: 3878 return self._getInfraLinkLocalAddress() 3879 elif address_type == config.ADDRESS_TYPE.ONLINK_ULA: 3880 return self._getInfraUla() 3881 elif address_type == config.ADDRESS_TYPE.ONLINK_GUA: 3882 return self._getInfraGua() 3883 else: 3884 raise ValueError(f'unsupported address type: {address_type}') 3885 3886 def _getBackboneGua(self) -> Optional[str]: 3887 for addr in self.get_ether_addrs(): 3888 if re.match(config.BACKBONE_PREFIX_REGEX_PATTERN, addr, re.I): 3889 return addr 3890 3891 return None 3892 3893 def _getInfraUla(self) -> Optional[str]: 3894 """ Returns the ULA addresses autoconfigured on the infra link. 3895 """ 3896 addrs = [] 3897 for addr in self.get_ether_addrs(): 3898 if re.match(config.ONLINK_PREFIX_REGEX_PATTERN, addr, re.I): 3899 addrs.append(addr) 3900 3901 return addrs 3902 3903 def _getInfraGua(self) -> Optional[str]: 3904 """ Returns the GUA addresses autoconfigured on the infra link. 3905 """ 3906 3907 gua_prefix = config.ONLINK_GUA_PREFIX.split('::/')[0] 3908 return [addr for addr in self.get_ether_addrs() if addr.startswith(gua_prefix)] 3909 3910 def _getInfraLinkLocalAddress(self) -> Optional[str]: 3911 """ Returns the link-local address autoconfigured on the infra link, which is started with "fe80". 3912 """ 3913 for addr in self.get_ether_addrs(): 3914 if re.match(config.LINK_LOCAL_REGEX_PATTERN, addr, re.I): 3915 return addr 3916 3917 return None 3918 3919 def ping(self, *args, **kwargs): 3920 backbone = kwargs.pop('backbone', False) 3921 if backbone: 3922 return self.ping_ether(*args, **kwargs) 3923 else: 3924 return super().ping(*args, **kwargs) 3925 3926 def udp_send_host(self, ipaddr, port, data, hop_limit=None): 3927 if hop_limit is None: 3928 if ipaddress.ip_address(ipaddr).is_multicast: 3929 hop_limit = 10 3930 else: 3931 hop_limit = 64 3932 cmd = f'python3 /app/third_party/openthread/repo/tests/scripts/thread-cert/udp_send_host.py {ipaddr} {port} "{data}" {hop_limit}' 3933 self.bash(cmd) 3934 3935 def add_ipmaddr(self, *args, **kwargs): 3936 backbone = kwargs.pop('backbone', False) 3937 if backbone: 3938 return self.add_ipmaddr_ether(*args, **kwargs) 3939 else: 3940 return super().add_ipmaddr(*args, **kwargs) 3941 3942 def ip_neighbors_flush(self): 3943 # clear neigh cache on linux 3944 self.bash(f'ip -6 neigh list dev {self.ETH_DEV}') 3945 self.bash(f'ip -6 neigh flush nud all nud failed nud noarp dev {self.ETH_DEV}') 3946 self.bash('ip -6 neigh list nud all dev %s | cut -d " " -f1 | sudo xargs -I{} ip -6 neigh delete {} dev %s' % 3947 (self.ETH_DEV, self.ETH_DEV)) 3948 self.bash(f'ip -6 neigh list dev {self.ETH_DEV}') 3949 3950 def publish_mdns_service(self, instance_name, service_type, port, host_name, txt): 3951 """Publish an mDNS service on the Ethernet. 3952 3953 :param instance_name: the service instance name. 3954 :param service_type: the service type in format of '<service_type>.<protocol>'. 3955 :param port: the port the service is at. 3956 :param host_name: the host name this service points to. The domain 3957 should not be included. 3958 :param txt: a dictionary containing the key-value pairs of the TXT record. 3959 """ 3960 txt_string = ' '.join([f'{key}={value}' for key, value in txt.items()]) 3961 self.bash(f'avahi-publish -s {instance_name} {service_type} {port} -H {host_name}.local {txt_string} &') 3962 3963 def publish_mdns_host(self, hostname, addresses): 3964 """Publish an mDNS host on the Ethernet 3965 3966 :param host_name: the host name this service points to. The domain 3967 should not be included. 3968 :param addresses: a list of strings representing the addresses to 3969 be registered with the host. 3970 """ 3971 for address in addresses: 3972 self.bash(f'avahi-publish -a {hostname}.local {address} &') 3973 3974 def browse_mdns_services(self, name, timeout=2): 3975 """ Browse mDNS services on the ethernet. 3976 3977 :param name: the service type name in format of '<service-name>.<protocol>'. 3978 :param timeout: timeout value in seconds before returning. 3979 :return: A list of service instance names. 3980 """ 3981 3982 self.bash(f'dns-sd -Z {name} local. > /tmp/{name} 2>&1 &') 3983 time.sleep(timeout) 3984 self.bash('pkill dns-sd') 3985 3986 instances = [] 3987 for line in self.bash(f'cat /tmp/{name}', encoding='raw_unicode_escape'): 3988 elements = line.split() 3989 if len(elements) >= 3 and elements[0] == name and elements[1] == 'PTR': 3990 instances.append(elements[2][:-len('.' + name)]) 3991 return instances 3992 3993 def discover_mdns_service(self, instance, name, host_name, timeout=2): 3994 """ Discover/resolve the mDNS service on ethernet. 3995 3996 :param instance: the service instance name. 3997 :param name: the service name in format of '<service-name>.<protocol>'. 3998 :param host_name: the host name this service points to. The domain 3999 should not be included. 4000 :param timeout: timeout value in seconds before returning. 4001 :return: a dict of service properties or None. 4002 4003 The return value is a dict with the same key/values of srp_server_get_service 4004 except that we don't have a `deleted` field here. 4005 """ 4006 host_name_file = self.bash('mktemp')[0].strip() 4007 service_data_file = self.bash('mktemp')[0].strip() 4008 4009 self.bash(f'dns-sd -Z {name} local. > {service_data_file} 2>&1 &') 4010 time.sleep(timeout) 4011 4012 full_service_name = f'{instance}.{name}' 4013 # When hostname is unspecified, extract hostname from browse result 4014 if host_name is None: 4015 for line in self.bash(f'cat {service_data_file}', encoding='raw_unicode_escape'): 4016 elements = line.split() 4017 if len(elements) >= 6 and elements[0] == full_service_name and elements[1] == 'SRV': 4018 host_name = elements[5].split('.')[0] 4019 break 4020 4021 assert (host_name is not None) 4022 self.bash(f'dns-sd -G v6 {host_name}.local. > {host_name_file} 2>&1 &') 4023 time.sleep(timeout) 4024 4025 self.bash('pkill dns-sd') 4026 addresses = [] 4027 service = {} 4028 4029 logging.debug(self.bash(f'cat {host_name_file}', encoding='raw_unicode_escape')) 4030 logging.debug(self.bash(f'cat {service_data_file}', encoding='raw_unicode_escape')) 4031 4032 # example output in the host file: 4033 # Timestamp A/R Flags if Hostname Address TTL 4034 # 9:38:09.274 Add 23 48 my-host.local. 2001:0000:0000:0000:0000:0000:0000:0002%<0> 120 4035 # 4036 for line in self.bash(f'cat {host_name_file}', encoding='raw_unicode_escape'): 4037 elements = line.split() 4038 fullname = f'{host_name}.local.' 4039 if fullname not in elements: 4040 continue 4041 if 'Add' not in elements: 4042 continue 4043 addresses.append(elements[elements.index(fullname) + 1].split('%')[0]) 4044 4045 logging.debug(f'addresses of {host_name}: {addresses}') 4046 4047 # example output of in the service file: 4048 # _ipps._tcp PTR my-service._ipps._tcp 4049 # my-service._ipps._tcp SRV 0 0 12345 my-host.local. ; Replace with unicast FQDN of target host 4050 # my-service._ipps._tcp TXT "" 4051 # 4052 is_txt = False 4053 txt = '' 4054 for line in self.bash(f'cat {service_data_file}', encoding='raw_unicode_escape'): 4055 elements = line.split() 4056 if len(elements) >= 2 and elements[0] == full_service_name and elements[1] == 'TXT': 4057 is_txt = True 4058 if is_txt: 4059 txt += line.strip() 4060 if line.strip().endswith('"'): 4061 is_txt = False 4062 txt_dict = self.__parse_dns_sd_txt(txt) 4063 logging.info(f'txt = {txt_dict}') 4064 service['txt'] = txt_dict 4065 4066 if not elements or elements[0] != full_service_name: 4067 continue 4068 if elements[1] == 'SRV': 4069 service['fullname'] = elements[0] 4070 service['instance'] = instance 4071 service['name'] = name 4072 service['priority'] = int(elements[2]) 4073 service['weight'] = int(elements[3]) 4074 service['port'] = int(elements[4]) 4075 service['host_fullname'] = elements[5] 4076 assert (service['host_fullname'] == f'{host_name}.local.') 4077 service['host'] = host_name 4078 service['addresses'] = addresses 4079 return service or None 4080 4081 def start_radvd_service(self, prefix, slaac): 4082 self.bash("""cat >/etc/radvd.conf <<EOF 4083interface eth0 4084{ 4085 AdvSendAdvert on; 4086 4087 AdvReachableTime 200; 4088 AdvRetransTimer 200; 4089 AdvDefaultLifetime 1800; 4090 MinRtrAdvInterval 1200; 4091 MaxRtrAdvInterval 1800; 4092 AdvDefaultPreference low; 4093 4094 prefix %s 4095 { 4096 AdvOnLink on; 4097 AdvAutonomous %s; 4098 AdvRouterAddr off; 4099 AdvPreferredLifetime 1800; 4100 AdvValidLifetime 1800; 4101 }; 4102}; 4103EOF 4104""" % (prefix, 'on' if slaac else 'off')) 4105 self.bash('service radvd start') 4106 self.bash('service radvd status') # Make sure radvd service is running 4107 4108 def stop_radvd_service(self): 4109 self.bash('service radvd stop') 4110 4111 def kill_radvd_service(self): 4112 self.bash('pkill radvd') 4113 4114 def __parse_dns_sd_txt(self, line: str): 4115 # Example TXT entry: 4116 # "xp=\\000\\013\\184\\000\\000\\000\\000\\000" 4117 txt = {} 4118 for entry in re.findall(r'"((?:[^\\]|\\.)*?)"', line): 4119 if '=' not in entry: 4120 continue 4121 4122 k, v = entry.split('=', 1) 4123 txt[k] = v 4124 4125 return txt 4126 4127 4128class OtbrNode(LinuxHost, NodeImpl, OtbrDocker): 4129 TUN_DEV = config.THREAD_IFNAME 4130 is_otbr = True 4131 is_bbr = True # OTBR is also BBR 4132 node_type = 'otbr-docker' 4133 4134 def __repr__(self): 4135 return f'Otbr<{self.nodeid}>' 4136 4137 def start(self): 4138 self._setup_sysctl() 4139 self.set_log_level(5) 4140 super().start() 4141 4142 def add_ipaddr(self, addr): 4143 cmd = f'ip -6 addr add {addr}/64 dev {self.TUN_DEV}' 4144 self.bash(cmd) 4145 4146 def add_ipmaddr_tun(self, ip: str): 4147 cmd = f'python3 /app/third_party/openthread/repo/tests/scripts/thread-cert/mcast6.py {self.TUN_DEV} {ip} &' 4148 self.bash(cmd) 4149 4150 def get_ip6_address(self, address_type: config.ADDRESS_TYPE): 4151 try: 4152 return super(OtbrNode, self).get_ip6_address(address_type) 4153 except Exception as e: 4154 return super(LinuxHost, self).get_ip6_address(address_type) 4155 4156 4157class HostNode(LinuxHost, OtbrDocker): 4158 is_host = True 4159 4160 def __init__(self, nodeid, name=None, **kwargs): 4161 self.nodeid = nodeid 4162 self.name = name or ('Host%d' % nodeid) 4163 super().__init__(nodeid, **kwargs) 4164 self.bash('service otbr-agent stop') 4165 4166 def start(self, start_radvd=True, prefix=config.DOMAIN_PREFIX, slaac=False): 4167 self._setup_sysctl() 4168 if start_radvd: 4169 self.start_radvd_service(prefix, slaac) 4170 else: 4171 self.stop_radvd_service() 4172 4173 def stop(self): 4174 self.stop_radvd_service() 4175 4176 def get_addrs(self) -> List[str]: 4177 return self.get_ether_addrs() 4178 4179 def __repr__(self): 4180 return f'Host<{self.nodeid}>' 4181 4182 def get_matched_ula_addresses(self, prefix): 4183 """Get the IPv6 addresses that matches given prefix. 4184 """ 4185 4186 addrs = [] 4187 for addr in self.get_ip6_address(config.ADDRESS_TYPE.ONLINK_ULA): 4188 if IPv6Address(addr) in IPv6Network(prefix): 4189 addrs.append(addr) 4190 4191 return addrs 4192 4193 4194if __name__ == '__main__': 4195 unittest.main() 4196