xref: /aosp_15_r20/external/autotest/server/cros/network/frame_sender.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright (c) 2014 The Chromium Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from autotest_lib.client.common_lib import error
7import os
8
9class FrameSender(object):
10    """Context manager for sending management frames."""
11
12    _sender_count = 0
13
14    def __init__(self, router, frame_type, channel, ssid_prefix=None,
15                 num_bss=None, frame_count=None, delay=None, dest_addr=None,
16                 probe_resp_footer=None, instance=0):
17        """
18        @param router: LinuxRouter object router to send frames from.
19        @param frame_type: int management frame type.
20        @param channel: int targeted channel.
21        @param ssid_prefix: string SSID prefix for BSSes in the frames.
22        @param num_bss: int number of BSSes configured for sending frames.
23        @param frame_count: int number of frames to send, frame_count of 0
24                implies infinite number of frames.
25        @param delay: int delay in between frames in milliseconds.
26        @param dest_addr: MAC address of the destination address (DA).
27        @param probe_resp_footer: footer bytes for probe responses.
28        @param instance: int hostapd instance on router to send frames from.
29        """
30        if router.board == "panther":
31            raise error.TestNAError('Panther router does not support manual '
32                                    'beacon frame generation')
33        self._router = router
34        self._channel = channel
35        self._frame_type = frame_type
36        self._ssid_prefix = ssid_prefix
37        self._num_bss = num_bss
38        self._frame_count = frame_count
39        self._delay = delay
40        self._dest_addr = dest_addr
41        self._probe_resp_footer = probe_resp_footer
42        self._ap_interface = router.hostapd_instances[instance].interface
43        self._injection_interface = None
44        self._pid = None
45
46        self._index = FrameSender._sender_count
47        FrameSender._sender_count += 1
48
49
50    def __enter__(self):
51        self._injection_interface = self._router.get_configured_interface(
52            'monitor', same_phy_as=self._ap_interface)
53        self._pid = self._router.send_management_frame(
54            self._injection_interface,
55            self._frame_type, self._channel, ssid_prefix=self._ssid_prefix,
56            num_bss=self._num_bss, frame_count=self._frame_count,
57            delay=self._delay, dest_addr=self._dest_addr,
58            probe_resp_footer=self._probe_resp_footer)
59        return self
60
61
62    def __exit__(self, exception, value, traceback):
63        if self._injection_interface:
64            self._router.release_interface(self._injection_interface)
65        if self._pid:
66            # Kill process and wait for termination.
67            self._router.host.run(
68                'kill {pid};'
69                ' for i in $(seq 1 10); do'
70                ' kill -0 {pid} || break; sleep 0.2;'
71                ' done'.format(pid=self._pid), ignore_status=True)
72            self._router.host.get_file(
73                os.path.join(
74                    self._router.logdir, self._router.MGMT_FRAME_SENDER_LOG_FILE),
75                'debug/frame_sender_%d.log' % self._index)
76