xref: /aosp_15_r20/external/openthread/tests/toranj/ncp/test-021-address-cache-table.py (revision cfb92d1480a9e65faed56933e9c12405f45898b4)
1#!/usr/bin/env python3
2#
3#  Copyright (c) 2018, The OpenThread Authors.
4#  All rights reserved.
5#
6#  Redistribution and use in source and binary forms, with or without
7#  modification, are permitted provided that the following conditions are met:
8#  1. Redistributions of source code must retain the above copyright
9#     notice, this list of conditions and the following disclaimer.
10#  2. Redistributions in binary form must reproduce the above copyright
11#     notice, this list of conditions and the following disclaimer in the
12#     documentation and/or other materials provided with the distribution.
13#  3. Neither the name of the copyright holder nor the
14#     names of its contributors may be used to endorse or promote products
15#     derived from this software without specific prior written permission.
16#
17#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27#  POSSIBILITY OF SUCH DAMAGE.
28
29import wpan
30from wpan import verify
31
32# -----------------------------------------------------------------------------------------------------------------------
33# Test description: Address Cache Table
34#
35# This test verifies the behavior of `AddressResolver` module and entries in
36# address cache table. It also tests the behavior of nodes when there are
37# topology changes in the network (e.g., a child switches parent). In
38# particular, the test covers the address cache update through snooping, i.e.,
39# the logic which inspects forwarded frames to update address cache table if
40# source RLOC16 on a received frame differs from an existing entry in the
41# address cache table.
42
# Name of this test: the script path with a trailing `.py` removed (when present).
if __file__.endswith('.py'):
    test_name = __file__[:-3]
else:
    test_name = __file__

# Announce the start of the test below a 120-character separator line.
print('-' * 120)
print('Starting \'{}\''.format(test_name))
46
47# -----------------------------------------------------------------------------------------------------------------------
48# Creating `wpan.Nodes` instances
49
# Time speed-up factor handed to all nodes; the wait times computed later in
# this script are divided by the same factor.
speedup = 4
wpan.Node.set_time_speedup_factor(speedup)

# Three router nodes followed by three child nodes. The creation order
# (r1, r2, r3, c1, c2, c3) is the same as creating them one per line.
r1, r2, r3 = wpan.Node(), wpan.Node(), wpan.Node()
c1, c2, c3 = wpan.Node(), wpan.Node(), wpan.Node()

# -----------------------------------------------------------------------------------------------------------------------
# Init all nodes

wpan.Node.init_all_nodes()
64
65# -----------------------------------------------------------------------------------------------------------------------
66# Build network topology
67#
68#     r1 ---- r2 ---- r3
69#     |       |       |
70#     |       |       |
71#     c1      c2(s)   c3
72#
73# c1 and c3 are FED children, c2 is an SED which is first attached to r2 and
74# then forced to switch to r3.
75
PREFIX = "fd00:1234::"
POLL_INTERVAL = 400

# r1 forms the network and adds the on-mesh SLAAC prefix used by all test addresses.
r1.form("addr-cache-tbl")
r1.add_prefix(PREFIX, stable=True, on_mesh=True, slaac=True, preferred=True)

# r1 <-> c1: c1 joins r1 as an end device.
r1.allowlist_node(c1)
c1.allowlist_node(r1)
c1.join_node(r1, wpan.JOIN_TYPE_END_DEVICE)

# r1 <-> r2: r2 joins through r1 as a router.
r1.allowlist_node(r2)
r2.allowlist_node(r1)
r2.join_node(r1, wpan.JOIN_TYPE_ROUTER)

# r2 <-> c2: c2 joins r2 as a sleepy end device and then gets its poll interval set.
c2.allowlist_node(r2)
r2.allowlist_node(c2)
c2.join_node(r2, wpan.JOIN_TYPE_SLEEPY_END_DEVICE)
c2.set(wpan.WPAN_POLL_INTERVAL, str(POLL_INTERVAL))

# r2 <-> r3: r3 joins through r2 as a router.
r2.allowlist_node(r3)
r3.allowlist_node(r2)
r3.join_node(r2, wpan.JOIN_TYPE_ROUTER)

# r3 <-> c3: c3 joins r3 as an end device.
c3.allowlist_node(r3)
r3.allowlist_node(c3)
c3.join_node(r3, wpan.JOIN_TYPE_END_DEVICE)
103
104# -----------------------------------------------------------------------------------------------------------------------
105# Test implementation
106#
107
# Next-hop value that marks a router table entry as not (yet) having a valid route.
INVALID_ROUTER_ID = 63

# Upper bound on how long to wait for r1's router table to settle, scaled by the speed-up factor.
ROUTER_TABLE_WAIT_TIME = 30 / speedup + 5

# Every node must report the role it was set up with (checked in the order r1, r2, r3, c1, c2, c3).
for node, expected_type in (
    (r1, wpan.NODE_TYPE_LEADER),
    (r2, wpan.NODE_TYPE_ROUTER),
    (r3, wpan.NODE_TYPE_ROUTER),
    (c1, wpan.NODE_TYPE_END_DEVICE),
    (c2, wpan.NODE_TYPE_SLEEPY_END_DEVICE),
    (c3, wpan.NODE_TYPE_END_DEVICE),
):
    verify(node.get(wpan.WPAN_NODE_TYPE) == expected_type)

# RLOC16 values are reported as hexadecimal text; keep them as integers for later comparisons.
r2_rloc, r3_rloc, c3_rloc = (int(n.get(wpan.WPAN_THREAD_RLOC16), 16) for n in (r2, r3, c3))
122
123# Wait till we have a valid "next hop" route on r1 towards r3
124
125
def check_r1_router_table():
    """Check that r1 has three router table entries and a valid next hop towards r3.

    The function takes no arguments and returns nothing; every expectation is
    reported through `verify`. It is passed to `wpan.verify_within`, which is
    presumably what retries it until the router table has settled (confirm
    against the `wpan` module).

    The checks are:
      * the router table read from r1 has exactly three entries,
      * at least one entry carries r3's RLOC16 (otherwise the next-hop check
        below would pass without having inspected anything),
      * every such entry has a next hop different from `INVALID_ROUTER_ID`.
    """
    router_table = wpan.parse_router_table_result(r1.get(wpan.WPAN_THREAD_ROUTER_TABLE))
    verify(len(router_table) == 3)

    r3_entries = [entry for entry in router_table if entry.rloc16 == r3_rloc]

    # Guard against a vacuous pass: an r3 entry must actually be present.
    verify(len(r3_entries) > 0)

    for entry in r3_entries:
        verify(entry.next_hop != INVALID_ROUTER_ID)
132
133
wpan.verify_within(check_r1_router_table, ROUTER_TABLE_WAIT_TIME)

# Look up the address under the test prefix on each node that sends or receives traffic below.
r1_address = r1.find_ip6_address_with_prefix(PREFIX)
c1_address = c1.find_ip6_address_with_prefix(PREFIX)
c2_address = c2.find_ip6_address_with_prefix(PREFIX)
c3_address = c3.find_ip6_address_with_prefix(PREFIX)

# Send a single UDP message from r1 to c2, then a single UDP message from r1 to c3.
for recv_node, dst_address, text in (
    (c2, c2_address, "Hi from r1 to c2"),
    (c3, c3_address, "Hi from r1 to c3"),
):
    sender = r1.prepare_tx(r1_address, dst_address, text)
    recver = recv_node.prepare_rx(sender)
    wpan.Node.perform_async_tx_rx()
    verify(sender.was_successful and recver.was_successful)
154
# After the two transmissions, r1's address cache table should hold exactly
# two entries: one for c2's address and one for c3's address.

addr_cache_table = wpan.parse_address_cache_table_result(r1.get(wpan.WPAN_THREAD_ADDRESS_CACHE_TABLE))
verify(len(addr_cache_table) == 2)

# (address, expected RLOC16) pairs:
#   - c2's entry should point towards its parent r2,
#   - c3's entry should point towards c3 itself.
expected_entries = (
    (c2_address, r2_rloc),
    (c3_address, c3_rloc),
)

for entry in addr_cache_table:
    for address, rloc16 in expected_entries:
        if entry.address == address:
            verify(entry.rloc16 == rloc16)
            break
    else:
        raise wpan.VerifyError("Unknown entry in the address cache table")
170
171# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
172
173# Force c2 to switch its parent from r2 to r3
174
# Supervision settings used to make c2 notice the loss of its parent
# (units are presumably seconds -- confirm against the property definitions).
PARENT_SUPERVISION_INTERVAL = 1
CHILD_SUPERVISION_CHECK_TIMEOUT = 2

# Time allowed for c2 to detach and re-attach, scaled by the speed-up factor.
REATTACH_WAIT_TIME = CHILD_SUPERVISION_CHECK_TIMEOUT / speedup + 6

c2.set(wpan.WPAN_CHILD_SUPERVISION_CHECK_TIMEOUT, str(CHILD_SUPERVISION_CHECK_TIMEOUT))
r3.set(wpan.WPAN_CHILD_SUPERVISION_INTERVAL, str(PARENT_SUPERVISION_INTERVAL))

# Drop c2 from r2's allowlist, then allowlist r3 and c2 with each other.
r2.un_allowlist_node(c2)
r3.allowlist_node(c2)
c2.allowlist_node(r3)
189
190# Wait for c2 to detach from r2 and attach to r3.
191#
192# Upon re-attach, previous parent r2 is notified and should remove c2 from
193# its child table.
194
195
def check_c2_is_removed_from_r2_child_table():
    """Check (through `verify`) that the child table read from r2 has no entries."""
    r2_children = wpan.parse_list(r2.get(wpan.WPAN_THREAD_CHILD_TABLE))
    remaining = len(r2_children)
    verify(remaining == 0)
199
200
wpan.verify_within(check_c2_is_removed_from_r2_child_table, REATTACH_WAIT_TIME)

# r3 should now list two children (c2 and c3).

child_table = wpan.parse_list(r3.get(wpan.WPAN_THREAD_CHILD_TABLE))
verify(2 == len(child_table))
207
208# New network topology
209#
210#     r1 ---- r2 ---- r3
211#     |               /\
212#     |              /  \
213#     c1           c2(s) c3
214
215# From r1 send again to c2 (which is now a child of r3).
216#
217# Note that r1 still has r2 as the destination for c2's address in its address
218# cache table.  But since r2 is aware that c2 is no longer its child, when it
219# receives the IPv6 message with c2's address, r2 itself would do an address
220# query for the address and forward the IPv6 message.
221
sender = r1.prepare_tx(r1_address, c2_address, "Hi again c2")
recver = c2.prepare_rx(sender)
wpan.Node.perform_async_tx_rx()
verify(sender.was_successful and recver.was_successful)

# r1's address cache table should no longer contain c2's address, leaving a
# single entry.

addr_cache_table = wpan.parse_address_cache_table_result(r1.get(wpan.WPAN_THREAD_ADDRESS_CACHE_TABLE))
verify(len(addr_cache_table) == 1)

# (address, expected RLOC16) pairs: the entry for c3 should still point towards c3.
expected_entries = ((c3_address, c3_rloc),)

for entry in addr_cache_table:
    for address, rloc16 in expected_entries:
        if entry.address == address:
            verify(entry.rloc16 == rloc16)
            break
    else:
        raise wpan.VerifyError("Unknown entry in the address cache table")
238
# Send one more UDP message from r1 to c2.

sender = r1.prepare_tx(r1_address, c2_address, "Hi again c2")
recver = c2.prepare_rx(sender)
wpan.Node.perform_async_tx_rx()
verify(sender.was_successful and recver.was_successful)

# The address cache table on r1 should now have entries for both c2 and c3.

addr_cache_table = wpan.parse_address_cache_table_result(r1.get(wpan.WPAN_THREAD_ADDRESS_CACHE_TABLE))
verify(len(addr_cache_table) == 2)

# (address, expected RLOC16) pairs:
#   - c2's entry should point towards r3,
#   - c3's entry should still point towards c3.
expected_entries = (
    (c2_address, r3_rloc),
    (c3_address, c3_rloc),
)

for entry in addr_cache_table:
    for address, rloc16 in expected_entries:
        if entry.address == address:
            verify(entry.rloc16 == rloc16)
            break
    else:
        raise wpan.VerifyError("Unknown entry in the address cache table")
260
261# Force c2 to switch its parent from r3 to r2
262
# Re-apply c2's supervision check timeout and give r2 the supervision interval
# before moving c2 back.
c2.set(wpan.WPAN_CHILD_SUPERVISION_CHECK_TIMEOUT, str(CHILD_SUPERVISION_CHECK_TIMEOUT))
r2.set(wpan.WPAN_CHILD_SUPERVISION_INTERVAL, str(PARENT_SUPERVISION_INTERVAL))

# Drop c2 from r3's allowlist, then allowlist r2 and c2 with each other.
r3.un_allowlist_node(c2)
r2.allowlist_node(c2)
c2.allowlist_node(r2)
272
273# Wait for c2 to detach from r3 and attach to r2.
274#
275# Upon re-attach, previous parent r3 is notified and should remove c2 from
276# its child table.
277
278
def check_c2_is_removed_from_r3_child_table():
    """Check (through `verify`) that the child table read from r3 has exactly one entry."""
    r3_children = wpan.parse_list(r3.get(wpan.WPAN_THREAD_CHILD_TABLE))
    remaining = len(r3_children)
    verify(remaining == 1)
282
283
wpan.verify_within(check_c2_is_removed_from_r3_child_table, REATTACH_WAIT_TIME)

# Verify that c2 is a child of r2 (r2's child table has a single entry).

child_table = wpan.parse_list(r2.get(wpan.WPAN_THREAD_CHILD_TABLE))
verify(1 == len(child_table))
290
291# New network topology
292#
293#     r1 ---- r2 ---- r3
294#     |       |       |
295#     |       |       |
296#     c1      c2(s)   c3
297
298# Send a UDP message from c2 to c1.
299# This message will be forwarded by r1 to its FED child c1.
300
sender = c2.prepare_tx(c2_address, c1_address, "Hi c1 child of r1")
recver = c1.prepare_rx(sender)
wpan.Node.perform_async_tx_rx()
verify(sender.was_successful and recver.was_successful)

# On receiving and forwarding the message from c2 (which now arrives through
# r2), r1 should update its address cache entry for c2. This is the address
# cache update through snooping.
#
# Verify that the address cache table is updated correctly.

addr_cache_table = wpan.parse_address_cache_table_result(r1.get(wpan.WPAN_THREAD_ADDRESS_CACHE_TABLE))
verify(len(addr_cache_table) == 2)

# (address, expected RLOC16) pairs:
#   - c2's entry should now point to r2,
#   - c3's entry should still point to c3.
expected_entries = (
    (c2_address, r2_rloc),
    (c3_address, c3_rloc),
)

for entry in addr_cache_table:
    for address, rloc16 in expected_entries:
        if entry.address == address:
            verify(entry.rloc16 == rloc16)
            break
    else:
        raise wpan.VerifyError("Unknown entry in the address cache table")
323
324# Force c2 to switch its parent from r2 to r3
325
# Supervision settings for this parent switch
# (units are presumably seconds -- confirm against the property definitions).
PARENT_SUPERVISION_INTERVAL = 1
CHILD_SUPERVISION_CHECK_TIMEOUT = 2

# Time allowed for c2 to detach and re-attach, scaled by the speed-up factor.
REATTACH_WAIT_TIME = CHILD_SUPERVISION_CHECK_TIMEOUT / speedup + 6

c2.set(wpan.WPAN_CHILD_SUPERVISION_CHECK_TIMEOUT, str(CHILD_SUPERVISION_CHECK_TIMEOUT))
r3.set(wpan.WPAN_CHILD_SUPERVISION_INTERVAL, str(PARENT_SUPERVISION_INTERVAL))

# Drop c2 from r2's allowlist, then allowlist r3 and c2 with each other.
r2.un_allowlist_node(c2)
r3.allowlist_node(c2)
c2.allowlist_node(r3)
340
341# Wait for c2 to detach from r2 and attach to r3.
342#
343# Upon re-attach, previous parent r2 is notified and should remove c2 from
344# its child table.
345
346
def check_c2_is_removed_from_r2_child_table():
    """Check (through `verify`) that the child table read from r2 has no entries."""
    r2_children = wpan.parse_list(r2.get(wpan.WPAN_THREAD_CHILD_TABLE))
    remaining = len(r2_children)
    verify(remaining == 0)
350
351
wpan.verify_within(check_c2_is_removed_from_r2_child_table, REATTACH_WAIT_TIME)

# r3 should now list two children (c2 and c3).

child_table = wpan.parse_list(r3.get(wpan.WPAN_THREAD_CHILD_TABLE))
verify(2 == len(child_table))
358
359# New network topology
360#
361#     r1 ---- r2 ---- r3
362#     |               /\
363#     |              /  \
364#     c1           c2(s) c3
365
366# Send a UDP message from c2 to c1.
367# This message will be forwarded by r1 to its FED child c1.
368
sender = c2.prepare_tx(c2_address, c1_address, "Hi c1 child of r1")
recver = c1.prepare_rx(sender)
wpan.Node.perform_async_tx_rx()
verify(sender.was_successful and recver.was_successful)

# On receiving and forwarding the message from c2 (which is now attached to
# r3), r1 should update its address cache entry for c2. This is the address
# cache update through snooping.
#
# Verify that the address cache table is updated correctly.

addr_cache_table = wpan.parse_address_cache_table_result(r1.get(wpan.WPAN_THREAD_ADDRESS_CACHE_TABLE))
verify(len(addr_cache_table) == 2)

# (address, expected RLOC16) pairs:
#   - c2's entry should now point to r3,
#   - c3's entry should still point to c3.
expected_entries = (
    (c2_address, r3_rloc),
    (c3_address, c3_rloc),
)

for entry in addr_cache_table:
    for address, rloc16 in expected_entries:
        if entry.address == address:
            verify(entry.rloc16 == rloc16)
            break
    else:
        raise wpan.VerifyError("Unknown entry in the address cache table")
391
392# -----------------------------------------------------------------------------------------------------------------------
393# Test finished
394
# Finalize every node before reporting the result.
wpan.Node.finalize_all_nodes()

# Reaching this line means no verification above failed.
print('\'{}\' passed.'.format(test_name))
398