#!/usr/bin/env python3
#
# Copyright (c) 2018, The OpenThread Authors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the
#    names of its contributors may be used to endorse or promote products
#    derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import wpan
from wpan import verify

# -----------------------------------------------------------------------------------------------------------------------
# Test description: Verify that a router and then the leader each recover after being reset one after the other.
#

if __file__.endswith('.py'):
    test_name = __file__[:-3]
else:
    test_name = __file__
print('-' * 120)
print('Starting \'{}\''.format(test_name))

# -----------------------------------------------------------------------------------------------------------------------
# Utility functions


def verify_neighbor_table(node, neighbors):
    """
    Check that every node in the `neighbors` list has an entry in the neighbor table of the given `node`.

    The neighbor table is read from `node` once. Each expected neighbor is matched against the table entries by
    its extended address (the value read from the neighbor with its first and last characters dropped). A
    `wpan.VerifyError` is raised for the first neighbor that has no matching entry.
    """
    table_entries = wpan.parse_neighbor_table_result(node.get(wpan.WPAN_THREAD_NEIGHBOR_TABLE))
    for expected in neighbors:
        expected_addr = expected.get(wpan.WPAN_EXT_ADDRESS)[1:-1]
        if not any(entry.ext_address == expected_addr for entry in table_entries):
            raise wpan.VerifyError(
                'Failed to find a neighbor entry for extended address {} in table'.format(expected_addr))


# -----------------------------------------------------------------------------------------------------------------------
# Creating `wpan.Nodes` instances

speedup = 4
wpan.Node.set_time_speedup_factor(speedup)

# r1 forms the network and r2 joins it as a router; c2 is only here to force r2 to become a router.
r1, r2, c2 = wpan.Node(), wpan.Node(), wpan.Node()

# -----------------------------------------------------------------------------------------------------------------------
# Init all nodes

wpan.Node.init_all_nodes()

# -----------------------------------------------------------------------------------------------------------------------
# Build network topology
#

r1.form("RtrLderRst")

# r1 <-> r2: allowlist each other, then r2 joins through r1 as a router.
r1.allowlist_node(r2)
r2.allowlist_node(r1)
r2.join_node(r1, wpan.JOIN_TYPE_ROUTER)

# r2 <-> c2: allowlist each other, then c2 joins through r2 as an end device.
c2.allowlist_node(r2)
r2.allowlist_node(c2)
c2.join_node(r2, wpan.JOIN_TYPE_END_DEVICE)
c2.set(wpan.WPAN_POLL_INTERVAL, '8000')

# -----------------------------------------------------------------------------------------------------------------------
# Test implementation

# Wait allowance handed to `wpan.verify_within` for every check below.
WAIT_INTERVAL = 6


def check_neighbor_tables():
    """Check that r1 and r2 are each present in the other's neighbor table."""
    for node, peer in ((r1, r2), (r2, r1)):
        verify_neighbor_table(node, [peer])


def check_r2_neighbor_table():
    """Check that r2 is associated and has r1 in its neighbor table."""
    verify(r2.is_associated())
    verify_neighbor_table(r2, [r1])


def check_r1_neighbor_table():
    """Check that r1 is associated and has r2 in its neighbor table."""
    verify(r1.is_associated())
    verify_neighbor_table(r1, [r2])


# Initial state: both routers see each other, r1 is the leader and r2 is a router.
wpan.verify_within(check_neighbor_tables, WAIT_INTERVAL)

verify(r1.get(wpan.WPAN_NODE_TYPE) == wpan.NODE_TYPE_LEADER)
verify(r2.get(wpan.WPAN_NODE_TYPE) == wpan.NODE_TYPE_ROUTER)

# Reset r2 and wait for it to be associated again with r1 back in its neighbor table.
r2.reset()

wpan.verify_within(check_r2_neighbor_table, WAIT_INTERVAL)
verify(r1.get(wpan.WPAN_NODE_TYPE) == wpan.NODE_TYPE_LEADER)
verify(r2.get(wpan.WPAN_NODE_TYPE) == wpan.NODE_TYPE_ROUTER)

# Now reset r1 and check that everything recovers correctly.
r1.reset()

wpan.verify_within(check_r1_neighbor_table, WAIT_INTERVAL)
verify(r1.get(wpan.WPAN_NODE_TYPE) == wpan.NODE_TYPE_LEADER)
verify(r2.get(wpan.WPAN_NODE_TYPE) == wpan.NODE_TYPE_ROUTER)

# r2 must also still be associated and see r1 after the r1 reset.
wpan.verify_within(check_r2_neighbor_table, WAIT_INTERVAL)

# -----------------------------------------------------------------------------------------------------------------------
# Test finished

wpan.Node.finalize_all_nodes()

print('\'{}\' passed.'.format(test_name))