xref: /aosp_15_r20/external/openthread/tests/scripts/thread-cert/test_coap_block.py (revision cfb92d1480a9e65faed56933e9c12405f45898b4)
#!/usr/bin/env python3
#
#  Copyright (c) 2020, The OpenThread Authors.
#  All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#  1. Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#  3. Neither the name of the copyright holder nor the
#     names of its contributors may be used to endorse or promote products
#     derived from this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
#  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
#  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
#  ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
#  LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
#  CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
#  SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
#  INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
#  CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
#  ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
#  POSSIBILITY OF SUCH DAMAGE.
#
29
30import unittest
31
32import pexpect
33import config
34import thread_cert
35
# Node ids used as keys of TestCoapBlockTransfer.TOPOLOGY and as indexes
# into self.nodes inside the test methods.
LEADER = 1
ROUTER = 2
38
39
class TestCoapBlockTransfer(thread_cert.TestCase):
    """
    Test suite for CoAP Block-Wise Transfers (RFC7959).

    The LEADER node registers the block resource 'test' and the ROUTER node
    issues block-wise GET, PUT and POST requests against the LEADER's ML-EID
    address.
    """

    SUPPORT_NCP = False

    # How many times a transfer scenario is attempted before the failure of
    # the last attempt is reported.
    _MAX_TRIALS = 3

    TOPOLOGY = {
        LEADER: {
            'mode': 'rdn',
            'whitelist': [ROUTER]
        },
        ROUTER: {
            'mode': 'rdn',
            'whitelist': [LEADER]
        },
    }

    def _do_transfer_test(self, method):
        """
        Run one block-wise transfer from ROUTER to LEADER.

        Starts both nodes and checks that they reach the 'leader' and
        'router' states, starts CoAP on both, registers the block resource
        'test' on the LEADER, performs the transfer selected by `method`,
        and finally stops CoAP on both nodes.

        Args:
            method: 'GET', 'PUT' or 'POST'. Any other value skips the
                transfer step; setup and teardown still run.

        Raises:
            AssertionError: if a node does not reach the expected state or
                the transferred payload is None.
        """
        self.nodes[LEADER].start()
        self.simulator.go(config.LEADER_STARTUP_DELAY)
        self.assertEqual(self.nodes[LEADER].get_state(), 'leader')

        self.nodes[ROUTER].start()
        self.simulator.go(config.ROUTER_STARTUP_DELAY)
        self.assertEqual(self.nodes[ROUTER].get_state(), 'router')

        mleid = self.nodes[LEADER].get_ip6_address(config.ADDRESS_TYPE.ML_EID)

        self.nodes[LEADER].coap_start()
        self.nodes[LEADER].coap_set_resource_path_block('test', 10)

        self.nodes[ROUTER].coap_start()

        # The three method names are mutually exclusive, so at most one
        # branch runs.
        if method == 'GET':
            response = self.nodes[ROUTER].coap_get_block(mleid, 'test', size=32)
            self.assertIsNotNone(response['payload'])
        elif method == 'PUT':
            self.nodes[ROUTER].coap_put_block(mleid, 'test', size=256, count=2)
            # NOTE(review): the request is awaited on ROUTER, the node that
            # sent it, rather than on LEADER which hosts the resource --
            # confirm this is the intended node.
            request = self.nodes[ROUTER].coap_wait_request()
            self.assertIsNotNone(request['payload'])
        elif method == 'POST':
            self.nodes[ROUTER].coap_post_block(mleid, 'test', size=1024, count=2)
            # NOTE(review): same question as for PUT -- confirm that ROUTER
            # is the node expected to report the request.
            request = self.nodes[ROUTER].coap_wait_request()
            self.assertIsNotNone(request['payload'])

        self.simulator.go(10)

        self.nodes[ROUTER].coap_stop()
        self.nodes[LEADER].coap_stop()

    def _run_transfer_with_retries(self, method):
        """
        Run `_do_transfer_test` up to `_MAX_TRIALS` times.

        Returns as soon as one trial succeeds. Unlike a plain
        "catch and continue" loop, the error of the final trial is re-raised
        so that a transfer which never succeeds fails the test instead of
        passing silently.

        NOTE(review): a retry re-enters `_do_transfer_test` without stopping
        CoAP or the nodes left running by the failed attempt -- confirm that
        the node helpers tolerate being started twice.

        Args:
            method: 'GET', 'PUT' or 'POST', forwarded to `_do_transfer_test`.

        Raises:
            AssertionError, pexpect.exceptions.TIMEOUT: whatever the last
                trial raised when every trial failed.
        """
        final_trial = self._MAX_TRIALS - 1
        for trial in range(self._MAX_TRIALS):
            try:
                self._do_transfer_test(method=method)
                return
            except (AssertionError, pexpect.exceptions.TIMEOUT):
                if trial == final_trial:
                    # Out of attempts: surface the failure to the runner.
                    raise

    def test_get(self):
        """
        Test block-wise transfer using GET method.
        """
        self._run_transfer_with_retries('GET')

    def test_put(self):
        """
        Test block-wise transfer using PUT method.
        """
        self._run_transfer_with_retries('PUT')

    def test_post(self):
        """
        Test block-wise transfer using POST method.
        """
        self._run_transfer_with_retries('POST')
128
129
# Run the test cases defined in this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()
132