xref: /aosp_15_r20/external/autotest/client/common_lib/cros/virtual_ethernet_pair.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5"""
6VirtualEthernetPair provides methods for setting up and tearing down a virtual
7ethernet interface for use in tests.  You will probably need to be root on test
8devices to use this class.  The constructor allows you to specify your IP's to
9assign to both ends of the pair, however, if you wish to leave the interface
10unconfigured, simply pass None.  You may also specify the subnet of your ip
11addresses.  Failing to do so leaves them with default in ifconfig.
12
13Example usage:
14vif = virtual_ethernet_pair.VirtualEthernetPair(interface_name="main",
15                                                peer_interface_name="peer",
16                                                interface_ip="10.9.8.1/24",
17                                                peer_interface_ip=None)
18vif.setup()
19if not vif.is_healthy:
20    # bad things happened while creating the interface
21    # ... abort gracefully
22
23interface_name = vif.interface_name
24peer_interface_name = vif.peer_interface_name
25#... do things with your interface
26
27# You must call this if you want to leave the system in a good state.
28vif.teardown()
29
30Alternatively:
31
32with virtual_ethernet_pair.VirtualEthernetPair(...) as vif:
33    if not vif.is_healthy:
34        # bad things happened while creating the interface
35        # ... abort gracefully
36
37    interface_name = vif.interface_name
38    peer_interface_name = vif.peer_interface_name
39    #... do things with your interface
40
41"""
42
43import logging
44import re
45
46from autotest_lib.client.common_lib import utils
47from autotest_lib.client.common_lib.cros.network import interface
48
49class VirtualEthernetPair(object):
50    """ Class for configuring virtual ethernet device pair. """
51
52    def __init__(self,
53                 interface_name='veth_main',
54                 peer_interface_name='veth_secondary',
55                 interface_ip='10.9.8.1/24',
56                 peer_interface_ip='10.9.8.2/24',
57                 interface_ipv6=None,
58                 peer_interface_ipv6=None,
59                 interface_ns=None,
60                 ignore_shutdown_errors=False,
61                 host=None):
62        """
63        Construct a object managing a virtual ethernet pair.  One end of the
64        interface will be called |interface_name|, and the peer end
65        |peer_interface_name|.  You may get the interface names later with
66        VirtualEthernetPair.get_[peer_]interface_name().  The ends of the
67        interface are manually configured with the given IPv4 address strings
68        (like "10.9.8.2/24").  You may skip the IP configuration by passing None
69        as the address for either interface.
70        """
71        super(VirtualEthernetPair, self).__init__()
72        self._is_healthy = True
73        self._interface_name = interface_name
74        self._peer_interface_name = peer_interface_name
75        self._interface_ip = interface_ip
76        self._peer_interface_ip = peer_interface_ip
77        self._interface_ipv6 = interface_ipv6
78        self._peer_interface_ipv6 = peer_interface_ipv6
79        self._interface_ns = interface_ns
80        self._ns_exec = ''
81        if interface_ns:
82            self._ns_exec = 'ip netns exec %s ' % self._interface_ns
83        self._ignore_shutdown_errors = ignore_shutdown_errors
84        self._run = utils.run
85        self._host = host
86        if host is not None:
87            self._run = host.run
88        (self._eth_name, self._eth_ip) = self._get_ipv4_config()
89
90    def _get_ipv4_config(self):
91        """@return Tuple with interface name and IP address used for
92        external communication."""
93        route = utils.system_output("ip route get 8.8.8.8")
94        # Only first line is interesting - match it for interface and
95        # IP address
96        m = re.search(r"dev (\S+) .*? src ((?:\d+\.){3}\d+)",
97                      route[:route.find('\n')])
98        return (m.group(1), m.group(2)) if m else (None, None)
99
100    def setup(self):
101        """
102        Installs a virtual ethernet interface and configures one side with an IP
103        address.  First does some confidence checking and tries to remove an
104        existing interface by the same name, and logs messages on failures.
105        """
106        self._is_healthy = False
107        if self._either_interface_exists():
108            logging.warning('At least one test interface already existed.'
109                            '  Attempting to remove.')
110            self._remove_test_interface()
111            if self._either_interface_exists():
112                logging.error('Failed to remove unexpected test '
113                              'interface.  Aborting.')
114                return
115
116        self._create_test_interface()
117        if not self._interface_exists(self._interface_name,
118                                      self._interface_ns):
119            logging.error('Failed to create main test interface.')
120            return
121
122        if not self._interface_exists(self._peer_interface_name):
123            logging.error('Failed to create peer test interface.')
124            return
125        # Unless you tell the firewall about the interface, you're not going to
126        # get any IP traffic through.  Since this is basically a loopback
127        # device, just allow all traffic.
128        for name in (self._interface_name, self._peer_interface_name):
129            command = 'iptables -w -I INPUT -i %s -j ACCEPT' % name
130            if name == self._interface_name and self._interface_ns:
131                status = self._run(self._ns_exec + command, ignore_status=True)
132            else:
133                status = self._run(command, ignore_status=True)
134            if status.exit_status != 0:
135                logging.error('iptables rule addition failed for interface %s: '
136                              '%s', name, status.stderr)
137        # In addition to INPUT configure also FORWARD'ing for the case
138        # of interface being moved to its own namespace so that there is
139        # contact with "the world" from within that namespace.
140        if self._interface_ns and self._eth_ip:
141            command = 'iptables -w -I FORWARD -i %s -j ACCEPT' \
142                      % self._peer_interface_name
143            status = self._run(command, ignore_status=True)
144            if status.exit_status != 0:
145                logging.warning(
146                        'failed to configure forwarding rule for %s: '
147                        '%s', self._peer_interface_name, status.stderr)
148            command = 'iptables -w -t nat -I POSTROUTING ' \
149                      '--src %s -o %s -j MASQUERADE' % \
150                      (self._interface_ip, self._eth_name)
151            status = self._run(command, ignore_status=True)
152            if status.exit_status != 0:
153                logging.warning('failed to configure nat rule for %s: '
154                                '%s', self._peer_interface_name, status.stderr)
155            # Add default route in namespace to the address used for
156            # outbound traffic
157            commands = [
158                    'ip r add %s dev %s', 'ip route add default via %s dev %s'
159            ]
160            for command in commands:
161                command = command % (self._eth_ip, self._interface_name)
162                status = self._run(self._ns_exec + command, ignore_status=True)
163                if status.exit_status != 0:
164                    logging.warning(
165                            'failed to configure GW route for %s: '
166                            '%s', self._interface_name, status.stderr)
167        self._is_healthy = True
168
169
170    def teardown(self):
171        """
172        Removes the interface installed by VirtualEthernetPair.setup(), with
173        some simple confidence checks that print warnings when either the
174        interface isn't there or fails to be removed.
175        """
176        for name in (self._interface_name, self._peer_interface_name):
177            command = 'iptables -w -D INPUT -i %s -j ACCEPT' % name
178            if name == self._interface_name and self._interface_ns:
179                self._run(self._ns_exec + command, ignore_status=True)
180            else:
181                self._run(command, ignore_status=True)
182        if self._interface_ns and self._eth_ip:
183            self._run('iptables -w -D FORWARD -i %s -j ACCEPT' %
184                      self._peer_interface_name,
185                      ignore_status=True)
186            command = 'iptables -w -t nat -I POSTROUTING ' \
187                      '--src %s -o %s -j MASQUERADE' % \
188                      (self._interface_ip, self._eth_name)
189            self._run(command, ignore_status=True)
190        if not self._either_interface_exists():
191            logging.warning('VirtualEthernetPair.teardown() called, '
192                            'but no interface was found.')
193            return
194
195        self._remove_test_interface()
196        if self._either_interface_exists():
197            logging.error('Failed to destroy test interface.')
198
199
200    @property
201    def is_healthy(self):
202        """@return True if virtual ethernet pair is configured."""
203        return self._is_healthy
204
205
206    @property
207    def interface_name(self):
208        """@return string name of the interface."""
209        return self._interface_name
210
211
212    @property
213    def peer_interface_name(self):
214        """@return string name of the peer interface."""
215        return self._peer_interface_name
216
217
218    @property
219    def interface_ip(self):
220        """@return string IPv4 address of the interface."""
221        return interface.Interface(self.interface_name,
222                                   netns=self._interface_ns).ipv4_address
223
224
225    @property
226    def peer_interface_ip(self):
227        """@return string IPv4 address of the peer interface."""
228        return interface.Interface(self.peer_interface_name).ipv4_address
229
230
231    @property
232    def interface_subnet_mask(self):
233        """@return string IPv4 subnet mask of the interface."""
234        return interface.Interface(self.interface_name,
235                                   netns=self._interface_ns).ipv4_subnet_mask
236
237
238    @property
239    def interface_prefix(self):
240        """@return int IPv4 prefix length."""
241        return interface.Interface(self.interface_name,
242                                   netns=self._interface_ns).ipv4_prefix
243
244
245    @property
246    def peer_interface_subnet_mask(self):
247        """@return string IPv4 subnet mask of the peer interface."""
248        return interface.Interface(self.peer_interface_name).ipv4_subnet_mask
249
250
251    @property
252    def interface_mac(self):
253        """@return string MAC address of the interface."""
254        return interface.Interface(self.interface_name,
255                                   netns=self._interface_ns).mac_address
256
257
258    @property
259    def peer_interface_mac(self):
260        """@return string MAC address of the peer interface."""
261        return interface.Interface(self._peer_interface_name).mac_address
262
263    @property
264    def interface_namespace(self):
265        """@return interface name space if configured, None otherwise."""
266        return self._interface_ns
267
268    def __enter__(self):
269        self.setup()
270        return self
271
272
273    def __exit__(self, exc_type, exc_value, traceback):
274        self.teardown()
275
276
277    def _interface_exists(self, interface_name, netns=None):
278        """
279        Returns True iff we found an interface with name |interface_name|.
280        """
281        return interface.Interface(interface_name,
282                                   host=self._host,
283                                   netns=netns).exists
284
285
286    def _either_interface_exists(self):
287        return (self._interface_exists(self._interface_name,
288                                       self._interface_ns)
289                or self._interface_exists(self._peer_interface_name))
290
291
292    def _remove_test_interface(self):
293        """
294        Remove the virtual ethernet device installed by
295        _create_test_interface().
296        """
297        self._run(self._ns_exec + 'ip link set %s down' % self._interface_name,
298                  ignore_status=self._ignore_shutdown_errors)
299        self._run('ip link set %s down' % self._peer_interface_name,
300                  ignore_status=self._ignore_shutdown_errors)
301        self._run(self._ns_exec +
302                  'ip link delete %s >/dev/null 2>&1' % self._interface_name,
303                  ignore_status=self._ignore_shutdown_errors)
304
305        # Under most normal circumstances a successful deletion of
306        # |_interface_name| should also remove |_peer_interface_name|,
307        # but if we elected to ignore failures above, that may not be
308        # the case.
309        self._run('ip link delete %s >/dev/null 2>&1' %
310                  self._peer_interface_name, ignore_status=True)
311
312        if self._interface_ns:
313            self._run('ip netns del %s' % self._interface_ns,
314                      ignore_status=True)
315
316    def _create_test_interface(self):
317        """
318        Set up a virtual ethernet device and configure the host side with a
319        fake IP address.
320        """
321        self._run('ip link add name %s '
322                  'type veth peer name %s >/dev/null 2>&1' %
323                  (self._interface_name, self._peer_interface_name))
324        if self._interface_ns:
325            self._run('ip netns add %s' % self._interface_ns,
326                      ignore_status=True)
327            self._run('ip link set dev %s netns %s' %
328                      (self._interface_name, self._interface_ns))
329        self._run(self._ns_exec + 'ip link set %s up' % self._interface_name)
330        self._run('ip link set %s up' % self._peer_interface_name)
331        if self._interface_ip is not None:
332            self._run(self._ns_exec + 'ip addr add %s dev %s' %
333                      (self._interface_ip, self._interface_name))
334        if self._peer_interface_ip is not None:
335            self._run('ip addr add %s dev %s' % (self._peer_interface_ip,
336                                                 self._peer_interface_name))
337        if self._interface_ipv6 is not None:
338            self._run(self._ns_exec + 'ip -6 addr add %s dev %s' %
339                      (self._interface_ipv6, self._interface_name))
340        if self._peer_interface_ipv6 is not None:
341            self._run('ip -6 addr add %s dev %s' % (self._peer_interface_ipv6,
342                                                    self._peer_interface_name))
343