xref: /aosp_15_r20/external/autotest/site_utils/lxc/utils.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Copyright 2015 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""This module provides some utilities used by LXC and its tools.
6"""
7
8import logging
9import os
10import re
11import shutil
12import tempfile
13import unittest
14from contextlib import contextmanager
15
16import common
17from autotest_lib.client.bin import utils
18from autotest_lib.client.common_lib import error
19from autotest_lib.client.common_lib.cros.network import interface
20from autotest_lib.client.common_lib import global_config
21from autotest_lib.site_utils.lxc import constants
22from autotest_lib.site_utils.lxc import unittest_setup
23
24
25def path_exists(path):
26    """Check if path exists.
27
28    If the process is not running with root user, os.path.exists may fail to
29    check if a path owned by root user exists. This function uses command
30    `test -e` to check if path exists.
31
32    @param path: Path to check if it exists.
33
34    @return: True if path exists, otherwise False.
35    """
36    try:
37        utils.run('sudo test -e "%s"' % path)
38        return True
39    except error.CmdError:
40        return False
41
42
43def get_host_ip():
44    """Get the IP address of the host running containers on lxcbr*.
45
46    This function gets the IP address on network interface lxcbr*. The
47    assumption is that lxc uses the network interface started with "lxcbr".
48
49    @return: IP address of the host running containers.
50    """
51    # The kernel publishes symlinks to various network devices in /sys.
52    result = utils.run('ls /sys/class/net', ignore_status=True)
53    # filter out empty strings
54    interface_names = [x for x in result.stdout.split() if x]
55
56    lxc_network = None
57    for name in interface_names:
58        if name.startswith('lxcbr'):
59            lxc_network = name
60            break
61    if not lxc_network:
62        raise error.ContainerError('Failed to find network interface used by '
63                                   'lxc. All existing interfaces are: %s' %
64                                   interface_names)
65    netif = interface.Interface(lxc_network)
66    return netif.ipv4_address
67
68def is_vm():
69    """Check if the process is running in a virtual machine.
70
71    @return: True if the process is running in a virtual machine, otherwise
72             return False.
73    """
74    try:
75        virt = utils.run('sudo -n virt-what').stdout.strip()
76        logging.debug('virt-what output: %s', virt)
77        return bool(virt)
78    except error.CmdError:
79        logging.warning('Package virt-what is not installed, default to assume '
80                     'it is not a virtual machine.')
81        return False
82
83
84def destroy(path, name,
85            force=True, snapshots=False, ignore_status=False, timeout=-1):
86    """
87  Destroy an LXC container.
88
89  @param force: Destroy even if running. Default true.
90  @param snapshots: Destroy all snapshots based on the container. Default false.
91  @param ignore_status: Ignore return code of command. Default false.
92  @param timeout: Seconds to wait for completion. No timeout imposed if the
93    value is negative. Default -1 (no timeout).
94
95  @returns: CmdResult object from the shell command
96  """
97    cmd = 'sudo lxc-destroy -P %s -n %s' % (path, name)
98    if force:
99        cmd += ' -f'
100    if snapshots:
101        cmd += ' -s'
102    if timeout >= 0:
103        return utils.run(cmd, ignore_status=ignore_status, timeout=timeout)
104    else:
105        return utils.run(cmd, ignore_status=ignore_status)
106
107
108def clone(lxc_path, src_name, new_path, dst_name, snapshot):
109    """Clones a container.
110
111    @param lxc_path: The LXC path of the source container.
112    @param src_name: The name of the source container.
113    @param new_path: The LXC path of the destination container.
114    @param dst_name: The name of the destination container.
115    @param snapshot: Whether or not to create a snapshot clone.
116    """
117    snapshot_arg = '-s' if snapshot and constants.SUPPORT_SNAPSHOT_CLONE else ''
118    # overlayfs is the default clone backend storage. However it is not
119    # supported in Ganeti yet. Use aufs as the alternative.
120    aufs_arg = '-B aufs' if is_vm() and snapshot else ''
121    cmd = (('sudo lxc-copy --lxcpath {lxcpath} --newpath {newpath} '
122                    '--name {name} --newname {newname} {snapshot} {backing}')
123           .format(
124               lxcpath = lxc_path,
125               newpath = new_path,
126               name = src_name,
127               newname = dst_name,
128               snapshot = snapshot_arg,
129               backing = aufs_arg
130           ))
131    utils.run(cmd)
132
133
134@contextmanager
135def TempDir(*args, **kwargs):
136    """Context manager for creating a temporary directory."""
137    tmpdir = tempfile.mkdtemp(*args, **kwargs)
138    try:
139        yield tmpdir
140    finally:
141        shutil.rmtree(tmpdir)
142
143
144class BindMount(object):
145    """Manages setup and cleanup of bind-mounts."""
146    def __init__(self, spec):
147        """Sets up a new bind mount.
148
149        Do not call this directly, use the create or from_existing class
150        methods.
151
152        @param spec: A two-element tuple (dir, mountpoint) where dir is the
153                     location of an existing directory, and mountpoint is the
154                     path under that directory to the desired mount point.
155        """
156        self.spec = spec
157
158
159    def __eq__(self, rhs):
160        if isinstance(rhs, self.__class__):
161            return self.spec == rhs.spec
162        return NotImplemented
163
164
165    def __ne__(self, rhs):
166        return not (self == rhs)
167
168
169    @classmethod
170    def create(cls, src, dst, rename=None, readonly=False):
171        """Creates a new bind mount.
172
173        @param src: The path of the source file/dir.
174        @param dst: The destination directory.  The new mount point will be
175                    ${dst}/${src} unless renamed.  If the mount point does not
176                    already exist, it will be created.
177        @param rename: An optional path to rename the mount.  If provided, the
178                       mount point will be ${dst}/${rename} instead of
179                       ${dst}/${src}.
180        @param readonly: If True, the mount will be read-only.  False by
181                         default.
182
183        @return An object representing the bind-mount, which can be used to
184                clean it up later.
185        """
186        spec = (dst, (rename if rename else src).lstrip(os.path.sep))
187        full_dst = os.path.join(*list(spec))
188
189        if not path_exists(full_dst):
190            utils.run('sudo mkdir -p %s' % full_dst)
191
192        utils.run('sudo mount --bind %s %s' % (src, full_dst))
193        if readonly:
194            utils.run('sudo mount -o remount,ro,bind %s' % full_dst)
195
196        return cls(spec)
197
198
199    @classmethod
200    def from_existing(cls, host_dir, mount_point):
201        """Creates a BindMount for an existing mount point.
202
203        @param host_dir: Path of the host dir hosting the bind-mount.
204        @param mount_point: Full path to the mount point (including the host
205                            dir).
206
207        @return An object representing the bind-mount, which can be used to
208                clean it up later.
209        """
210        spec = (host_dir, os.path.relpath(mount_point, host_dir))
211        return cls(spec)
212
213
214    def cleanup(self):
215        """Cleans up the bind-mount.
216
217        Unmounts the destination, and deletes it if possible. If it was mounted
218        alongside important files, it will not be deleted.
219        """
220        full_dst = os.path.join(*list(self.spec))
221        utils.run('sudo umount %s' % full_dst)
222        # Ignore errors because bind mount locations are sometimes nested
223        # alongside actual file content (e.g. SSPs install into
224        # /usr/local/autotest so rmdir -p will fail for any mounts located in
225        # /usr/local/autotest).
226        utils.run('sudo bash -c "cd %s; rmdir -p --ignore-fail-on-non-empty %s"'
227                  % self.spec)
228
229
230def is_subdir(parent, subdir):
231    """Determines whether the given subdir exists under the given parent dir.
232
233    @param parent: The parent directory.
234    @param subdir: The subdirectory.
235
236    @return True if the subdir exists under the parent dir, False otherwise.
237    """
238    # Append a trailing path separator because commonprefix basically just
239    # performs a prefix string comparison.
240    parent = os.path.join(parent, '')
241    return os.path.commonprefix([parent, subdir]) == parent
242
243
244def sudo_commands(commands):
245    """Takes a list of bash commands and executes them all with one invocation
246    of sudo. Saves ~400 ms per command.
247
248    @param commands: The bash commands, as strings.
249
250    @return The return code of the sudo call.
251    """
252
253    combine = global_config.global_config.get_config_value(
254        'LXC_POOL','combine_sudos', type=bool, default=False)
255
256    if combine:
257        with tempfile.NamedTemporaryFile() as temp:
258            temp.write(b"set -e\n")
259            temp.writelines([command+"\n" for command in commands])
260            logging.info("Commands to run: %s", str(commands))
261            return utils.run("sudo bash %s" % temp.name)
262    else:
263        for command in commands:
264            result = utils.run("sudo %s" % command)
265
266
267def get_lxc_version():
268    """Gets the current version of lxc if available."""
269    cmd = 'sudo lxc-info --version'
270    result = utils.run(cmd)
271    if result and result.exit_status == 0:
272        version = re.split("[.-]", result.stdout.strip())
273        if len(version) < 3:
274            logging.error("LXC version is not expected format %s.",
275                          result.stdout.strip())
276            return None
277        return_value = []
278        for a in version[:3]:
279            try:
280                return_value.append(int(a))
281            except ValueError:
282                logging.error(("LXC version contains non numerical version "
283                               "number %s (%s)."), a, result.stdout.strip())
284                return None
285        return return_value
286    else:
287        logging.error("Unable to determine LXC version.")
288        return None
289
290class LXCTests(unittest.TestCase):
291    """Thin wrapper to call correct setup for LXC tests."""
292
293    @classmethod
294    def setUpClass(cls):
295        unittest_setup.setup()
296