xref: /aosp_15_r20/external/autotest/client/cros/cros_disks.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6
7from __future__ import print_function
8from __future__ import division
9from __future__ import absolute_import
10
11import dbus, logging, os, stat
12from dbus.mainloop.glib import DBusGMainLoop
13# AU tests use ToT client code, but ToT -3 client version.
14try:
15    from gi.repository import GObject
16except ImportError:
17    import gobject as GObject
18import six
19from six.moves import zip
20
21import common
22
23from autotest_lib.client.bin import utils
24from autotest_lib.client.common_lib import autotemp, error
25from autotest_lib.client.cros import dbus_util
26from autotest_lib.client.cros.mainloop import ExceptionForward
27from autotest_lib.client.cros.mainloop import GenericTesterMainLoop
28
29
30"""This module contains several helper classes for writing tests to verify the
31CrosDisks DBus interface. In particular, the CrosDisksTester class can be used
32to derive functional tests that interact with the CrosDisks server over DBus.
33"""
34
35
class ExceptionSuppressor(object):
    """Context manager that swallows, and logs, selected exception types.

    The exception classes to suppress are given as positional arguments at
    construction time. Any exception raised inside the with block that is an
    instance of one of those classes (or of a sub-class) is logged and then
    discarded; every other exception propagates unchanged.

    Example:

        with ExceptionSuppressor(OSError, IOError):
            # An OSError or IOError (or a sub-class of either) raised here
            # does not escape the with block.
    """

    def __init__(self, *args):
        # *args is already a tuple, which is the form issubclass() accepts.
        self.__suppressed_exc_types = args

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # No exception at all: nothing to suppress.
        if not exc_type:
            return False

        # An exception of a type this instance was not asked to suppress.
        if not issubclass(exc_type, self.__suppressed_exc_types):
            return False

        # Logging is best effort; a failure to log must not resurrect the
        # exception being suppressed.
        try:
            logging.exception('Suppressed exception: %s(%s)',
                              exc_type, exc_value)
        except Exception:
            pass
        return True
65
66
class DBusClient(object):
    """A base class of a DBus proxy client to test a DBus server.

    This class is expected to be used along with a GObject main loop and
    provides some convenient functions for testing the DBus API exposed by a
    DBus server. The most recent content of every handled signal is kept in a
    dictionary keyed by signal name; None means "handled, but nothing
    received since the content was last cleared or consumed".
    """

    def __init__(self, main_loop, bus, bus_name, object_path, timeout=None):
        """Initializes the instance.

        Args:
            main_loop: The GObject main loop.
            bus: The bus where the DBus server is connected to.
            bus_name: The bus name owned by the DBus server.
            object_path: The object path of the DBus server.
            timeout: Maximum time in seconds to wait for the DBus connection.
        """
        self.__signal_content = {}
        self.main_loop = main_loop
        self.signal_timeout_in_seconds = 10
        logging.debug('Getting D-Bus proxy object on bus "%s" and path "%s"',
                      bus_name, object_path)
        self.proxy_object = dbus_util.get_dbus_object(bus, bus_name,
                                                      object_path, timeout)

    def clear_signal_content(self, signal_name):
        """Clears the content of the signal.

        Signals that are not being handled are left alone, so this never
        registers a new signal name.

        Args:
            signal_name: The name of the signal.
        """
        if signal_name in self.__signal_content:
            self.__signal_content[signal_name] = None

    def get_signal_content(self, signal_name):
        """Gets the content of a signal.

        Args:
            signal_name: The name of the signal.

        Returns:
            The content of a signal or None if the signal is not being handled
            or has no pending content.
        """
        return self.__signal_content.get(signal_name)

    def handle_signal(self, interface, signal_name, argument_names=()):
        """Registers a signal handler to handle a given signal.

        Registering the same signal name more than once is a no-op.

        Args:
            interface: The DBus interface of the signal.
            signal_name: The name of the signal.
            argument_names: A list of argument names that the signal contains.
        """
        if signal_name in self.__signal_content:
            return

        self.__signal_content[signal_name] = None

        def signal_handler(*args):
            # Pair the positional signal arguments with their names; zip()
            # silently drops whichever side is longer.
            self.__signal_content[signal_name] = dict(zip(argument_names, args))

        logging.debug('Handling D-Bus signal "%s(%s)" on interface "%s"',
                      signal_name, ', '.join(argument_names), interface)
        self.proxy_object.connect_to_signal(signal_name, signal_handler,
                                            interface)

    def wait_for_signal(self, signal_name):
        """Waits for the reception of a signal.

        The stored content is consumed: it is reset to None before returning.

        Args:
            signal_name: The name of the signal to wait for.

        Returns:
            The content of the signal, or None if the signal is not being
            handled.
        """
        if signal_name not in self.__signal_content:
            return None

        def check_signal_content():
            # Drain all pending main loop events without blocking so that any
            # queued signal handler gets a chance to run.
            context = self.main_loop.get_context()
            while context.iteration(False):
                pass
            return self.__signal_content[signal_name] is not None

        logging.debug('Waiting for D-Bus signal "%s"', signal_name)
        utils.poll_for_condition(condition=check_signal_content,
                                 desc='%s signal' % signal_name,
                                 timeout=self.signal_timeout_in_seconds)
        content = self.__signal_content[signal_name]
        logging.debug('Received D-Bus signal "%s(%s)"', signal_name, content)
        self.__signal_content[signal_name] = None
        return content

    def expect_signal(self, signal_name, expected_content):
        """Waits for the reception of a signal and verifies its content.

        Args:
            signal_name: The name of the signal to wait for.
            expected_content: The expected content of the signal, which can be
                              partially specified. Only specified fields are
                              compared between the actual and expected content.

        Returns:
            The actual content of the signal.

        Raises:
            error.TestFail: A test failure when there is a mismatch between the
                            actual and expected content of the signal, or when
                            a field is expected from a signal that is not
                            being handled.
        """
        actual_content = self.wait_for_signal(signal_name)
        logging.debug("%s signal: expected=%s actual=%s",
                      signal_name, expected_content, actual_content)
        for argument, expected_value in six.iteritems(expected_content):
            # wait_for_signal() returns None for a signal that was never
            # registered with handle_signal(). Report that as a missing field
            # rather than failing with a TypeError on the membership test.
            if actual_content is None or argument not in actual_content:
                raise error.TestFail(
                    ('%s signal missing "%s": expected=%s, actual=%s') %
                    (signal_name, argument, expected_content, actual_content))

            if actual_content[argument] != expected_value:
                raise error.TestFail(
                    ('%s signal not matched on "%s": expected=%s, actual=%s') %
                    (signal_name, argument, expected_content, actual_content))
        return actual_content
190
191
class CrosDisksClient(DBusClient):
    """DBus proxy client used by tests to talk to the CrosDisks server.

    On construction the client registers handlers for the FormatCompleted,
    MountCompleted and RenameCompleted signals, so tests can wait for, or
    verify, the outcome of the corresponding operations.
    """

    CROS_DISKS_BUS_NAME = 'org.chromium.CrosDisks'
    CROS_DISKS_INTERFACE = 'org.chromium.CrosDisks'
    CROS_DISKS_OBJECT_PATH = '/org/chromium/CrosDisks'
    DBUS_PROPERTIES_INTERFACE = 'org.freedesktop.DBus.Properties'
    FORMAT_COMPLETED_SIGNAL = 'FormatCompleted'
    FORMAT_COMPLETED_SIGNAL_ARGUMENTS = ('status', 'path')
    MOUNT_COMPLETED_SIGNAL = 'MountCompleted'
    MOUNT_COMPLETED_SIGNAL_ARGUMENTS = (
        'status', 'source_path', 'source_type', 'mount_path')
    RENAME_COMPLETED_SIGNAL = 'RenameCompleted'
    RENAME_COMPLETED_SIGNAL_ARGUMENTS = ('status', 'path')

    def __init__(self, main_loop, bus, timeout_seconds=None):
        """Connects to CrosDisks and subscribes to its completion signals.

        Args:
            main_loop: The GObject main loop.
            bus: The bus where the DBus server is connected to.
            timeout_seconds: Maximum time in seconds to wait for the DBus
                             connection.
        """
        super(CrosDisksClient, self).__init__(
            main_loop, bus, self.CROS_DISKS_BUS_NAME,
            self.CROS_DISKS_OBJECT_PATH, timeout_seconds)

        proxy = self.proxy_object
        self.interface = dbus.Interface(proxy, self.CROS_DISKS_INTERFACE)
        self.properties = dbus.Interface(proxy,
                                         self.DBUS_PROPERTIES_INTERFACE)

        handled_signals = (
            (self.FORMAT_COMPLETED_SIGNAL,
             self.FORMAT_COMPLETED_SIGNAL_ARGUMENTS),
            (self.MOUNT_COMPLETED_SIGNAL,
             self.MOUNT_COMPLETED_SIGNAL_ARGUMENTS),
            (self.RENAME_COMPLETED_SIGNAL,
             self.RENAME_COMPLETED_SIGNAL_ARGUMENTS),
        )
        for signal_name, argument_names in handled_signals:
            self.handle_signal(self.CROS_DISKS_INTERFACE, signal_name,
                               argument_names)

    def enumerate_devices(self):
        """Invokes the CrosDisks EnumerateDevices method.

        Returns:
            A list of sysfs paths of devices that are recognized by
            CrosDisks.
        """
        devices = self.interface.EnumerateDevices()
        return devices

    def get_device_properties(self, path):
        """Invokes the CrosDisks GetDeviceProperties method.

        Args:
            path: The device path.

        Returns:
            The properties of the device in a dictionary.
        """
        properties = self.interface.GetDeviceProperties(path)
        return properties

    def format(self, path, filesystem_type=None, options=None):
        """Invokes the CrosDisks Format method.

        Any previously received FormatCompleted content is cleared first so
        that a later wait only sees the signal for this request.

        Args:
            path: The device path to format.
            filesystem_type: The filesystem type used for formatting the
                             device. None is sent as an empty string.
            options: A list of options used for formatting the device. None
                     is sent as an empty list.
        """
        fs_type = '' if filesystem_type is None else filesystem_type
        format_options = [] if options is None else options
        self.clear_signal_content(self.FORMAT_COMPLETED_SIGNAL)
        self.interface.Format(path, fs_type,
                              dbus.Array(format_options, signature='s'))

    def wait_for_format_completion(self):
        """Waits for the CrosDisks FormatCompleted signal.

        Returns:
            The content of the FormatCompleted signal.
        """
        return self.wait_for_signal(self.FORMAT_COMPLETED_SIGNAL)

    def expect_format_completion(self, expected_content):
        """Waits for the FormatCompleted signal and verifies its content.

        Args:
            expected_content: The expected content of the FormatCompleted
                              signal, which can be partially specified.
                              Only specified fields are compared between the
                              actual and expected content.

        Returns:
            The actual content of the FormatCompleted signal.

        Raises:
            error.TestFail: A test failure when there is a mismatch between the
                            actual and expected content of the FormatCompleted
                            signal.
        """
        return self.expect_signal(self.FORMAT_COMPLETED_SIGNAL,
                                  expected_content)

    def rename(self, path, volume_name=None):
        """Invokes the CrosDisks Rename method.

        Any previously received RenameCompleted content is cleared first.

        Args:
            path: The device path to rename.
            volume_name: The new name used for renaming. None is sent as an
                         empty string.
        """
        new_name = '' if volume_name is None else volume_name
        self.clear_signal_content(self.RENAME_COMPLETED_SIGNAL)
        self.interface.Rename(path, new_name)

    def wait_for_rename_completion(self):
        """Waits for the CrosDisks RenameCompleted signal.

        Returns:
            The content of the RenameCompleted signal.
        """
        return self.wait_for_signal(self.RENAME_COMPLETED_SIGNAL)

    def expect_rename_completion(self, expected_content):
        """Waits for the RenameCompleted signal and verifies its content.

        Args:
            expected_content: The expected content of the RenameCompleted
                              signal, which can be partially specified.
                              Only specified fields are compared between the
                              actual and expected content.

        Returns:
            The actual content of the RenameCompleted signal.

        Raises:
            error.TestFail: A test failure when there is a mismatch between the
                            actual and expected content of the RenameCompleted
                            signal.
        """
        return self.expect_signal(self.RENAME_COMPLETED_SIGNAL,
                                  expected_content)

    def mount(self, path, filesystem_type=None, options=None):
        """Invokes the CrosDisks Mount method.

        Any previously received MountCompleted content is cleared first.

        Args:
            path: The device path to mount.
            filesystem_type: The filesystem type used for mounting the device.
                             None is sent as an empty string.
            options: A list of options used for mounting the device. None is
                     sent as an empty list.
        """
        fs_type = '' if filesystem_type is None else filesystem_type
        mount_options = [] if options is None else options
        self.clear_signal_content(self.MOUNT_COMPLETED_SIGNAL)
        self.interface.Mount(path, fs_type,
                             dbus.Array(mount_options, signature='s'))

    def unmount(self, path, options=None):
        """Invokes the CrosDisks Unmount method.

        Args:
            path: The device or mount path to unmount.
            options: A list of options used for unmounting the path. None is
                     sent as an empty list.

        Returns:
            The mount error code.
        """
        unmount_options = [] if options is None else options
        return self.interface.Unmount(
            path, dbus.Array(unmount_options, signature='s'))

    def wait_for_mount_completion(self):
        """Waits for the CrosDisks MountCompleted signal.

        Returns:
            The content of the MountCompleted signal.
        """
        return self.wait_for_signal(self.MOUNT_COMPLETED_SIGNAL)

    def expect_mount_completion(self, expected_content):
        """Waits for the MountCompleted signal and verifies its content.

        Args:
            expected_content: The expected content of the MountCompleted
                              signal, which can be partially specified.
                              Only specified fields are compared between the
                              actual and expected content.

        Returns:
            The actual content of the MountCompleted signal.

        Raises:
            error.TestFail: A test failure when there is a mismatch between the
                            actual and expected content of the MountCompleted
                            signal.
        """
        return self.expect_signal(self.MOUNT_COMPLETED_SIGNAL,
                                  expected_content)

    def add_loopback_to_allowlist(self, path):
        """Adds a loop device to the CrosDisks allowlist for testing.

        The device is identified to CrosDisks by the path built from
        '/sys/devices/virtual/block/' and the base name of the given path.

        Args:
            path: path to the /dev/loopX device.
        """
        device_name = os.path.basename(path)
        self.interface.AddDeviceToAllowlist(
            '/sys/devices/virtual/block/' + device_name)

    def remove_loopback_from_allowlist(self, path):
        """Removes a loop device from the CrosDisks allowlist for testing.

        The device is identified to CrosDisks by the path built from
        '/sys/devices/virtual/block/' and the base name of the given path.

        Args:
            path: path to the /dev/loopX device.
        """
        device_name = os.path.basename(path)
        self.interface.RemoveDeviceFromAllowlist(
            '/sys/devices/virtual/block/' + device_name)
419
420
class CrosDisksTester(GenericTesterMainLoop):
    """Base tester class for tests that exercise the CrosDisks server.

    Sub-classes override get_tests() to supply the test methods;
    perform_one_test() then runs them in order and marks each one as a
    completed requirement, keyed by the method's __name__.
    """

    def __init__(self, test):
        # The system bus is attached to a GLib-based DBus main loop that is
        # also installed as the default one.
        self.bus = dbus.SystemBus(mainloop=DBusGMainLoop(set_as_default=True))
        self.main_loop = GObject.MainLoop()
        super(CrosDisksTester, self).__init__(test, self.main_loop)
        self.cros_disks = CrosDisksClient(self.main_loop, self.bus)

    def get_tests(self):
        """Returns the test methods that perform_one_test should invoke.

        The base implementation has no tests; a derived class should override
        this method.

        Returns:
            A list of test methods.
        """
        return []

    @ExceptionForward
    def perform_one_test(self):
        """Runs every test method returned by get_tests, in order."""
        test_methods = self.get_tests()
        self.remaining_requirements = {
            method.__name__ for method in test_methods}
        for method in test_methods:
            method()
            self.requirement_completed(method.__name__)

    def reconnect_client(self, timeout_seconds=None):
        """Reconnects the CrosDisks DBus client.

        The existing client is replaced by a newly constructed one on the
        same main loop and bus.

        Args:
            timeout_seconds: Maximum time in seconds to wait for the DBus
                            connection.
        """
        self.cros_disks = CrosDisksClient(self.main_loop, self.bus,
                                          timeout_seconds)
464
465
class FilesystemTestObject(object):
    """Base class for an object (file, directory, symlink) used in tests.

    The public create() and verify() methods add logging around the
    _create() and _verify() hooks. A derived class overrides those hooks to
    define how the object is created on, and checked against, a filesystem;
    the base hooks always report failure.
    """

    def __init__(self, path, content, mode):
        """Initializes the instance.

        Args:
            path: The relative path of the test object.
            content: The content of the test object.
            mode: The file permissions given to the test object.
        """
        self._path, self._content, self._mode = path, content, mode

    def create(self, base_dir):
        """Creates the test object in a base directory.

        Args:
            base_dir: The base directory where the test object is created.

        Returns:
            True if the test object is created successfully or False otherwise.
        """
        if self._create(base_dir):
            return True

        logging.debug('Failed to create filesystem test object at "%s"',
                      os.path.join(base_dir, self._path))
        return False

    def verify(self, base_dir):
        """Verifies the test object in a base directory.

        Args:
            base_dir: The base directory where the test object is expected to be
                      found.

        Returns:
            True if the test object is found in the base directory and matches
            the expected content, or False otherwise.
        """
        if self._verify(base_dir):
            return True

        logging.error('Mismatched filesystem object at "%s"',
                      os.path.join(base_dir, self._path))
        return False

    def _create(self, base_dir):
        """Creation hook; the base implementation always fails."""
        return False

    def _verify(self, base_dir):
        """Verification hook; the base implementation always fails."""
        return False
523
524
class FilesystemTestDirectory(FilesystemTestObject):
    """A filesystem test object that represents a directory."""

    def __init__(self, path, content,
                 mode=(stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP |
                       stat.S_IROTH | stat.S_IXOTH),
                 strict=False):
        """Initializes the directory.

        Args:
            path: The name of this directory. An empty name makes the object
                  stand for the base directory itself.
            content: The list of items in this directory.
            mode: The file permissions given to this directory.
            strict: Whether verify() strictly compares directory contents for
                    equality. This flag only applies to this directory, and not
                    to any child directories.
        """
        super(FilesystemTestDirectory, self).__init__(path, content, mode)
        self._strict = strict

    def _create(self, base_dir):
        """Creates this directory, then every item it contains.

        Creation stops at the first item that fails.
        """
        if self._path:
            path = os.path.join(base_dir, self._path)
            # An OSError (for instance because the directory already exists)
            # is tolerated here; the isdir() check below decides the outcome.
            with ExceptionSuppressor(OSError):
                os.makedirs(path)
                os.chmod(path, self._mode)
        else:
            path = base_dir

        if not os.path.isdir(path):
            return False

        return all(child.create(path) for child in self._content)

    def _verify(self, base_dir):
        """Verifies this directory and every item it is expected to contain.

        All items are checked even after a mismatch, so that every problem
        gets logged.
        """
        path = os.path.join(base_dir, self._path) if self._path else base_dir
        if not os.path.isdir(path):
            return False

        matched = True
        expected_names = set()
        for child in self._content:
            if not child.verify(path):
                matched = False
            expected_names.add(child._path)

        if not self._strict:
            return matched

        # Strict mode: any entry on disk that was not listed is a mismatch.
        for entry in os.listdir(path):
            if entry in expected_names:
                continue
            logging.error('Unexpected filesystem entry "%s"',
                          os.path.join(path, entry))
            matched = False

        return matched
581
582
class FilesystemTestFile(FilesystemTestObject):
    """A filesystem test object that represents a file."""

    def __init__(self,
                 path,
                 content,
                 mode=stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP \
                 | stat.S_IROTH,
                 mtime=None):
        """Initializes the file.

        Args:
            path: The name of this file.
            content: A byte string with the expected file contents. A text
                     string is also accepted and is treated as its UTF-8
                     encoding. None skips the content check in verify().
            mode: The file permissions given to this file.
            mtime: If set, the expected file modification timestamp.
        """
        super(FilesystemTestFile, self).__init__(path, content, mode)
        self._mtime = mtime

    def _content_as_bytes(self):
        """Returns the expected content in the form used for binary file I/O.

        The file is always opened in binary mode. On Python 3 a text string
        can neither be written to such a file (TypeError) nor ever compare
        equal to the bytes read back, so text content is encoded as UTF-8.
        Any other content is returned unchanged.
        """
        if isinstance(self._content, six.text_type):
            return self._content.encode('utf-8')
        return self._content

    def _create(self, base_dir):
        """Writes the file and applies its mode.

        Returns:
            True once the content has been written, even if changing the mode
            failed with an OSError; False if writing raised an IOError.
        """
        path = os.path.join(base_dir, self._path)
        with ExceptionSuppressor(IOError):
            with open(path, 'wb+') as f:
                f.write(self._content_as_bytes())
            # Failing to set the permissions is logged but not fatal.
            with ExceptionSuppressor(OSError):
                os.chmod(path, self._mode)
            return True
        # Only reached when an IOError was suppressed above.
        return False

    def _verify(self, base_dir):
        """Checks the file contents and, if requested, its modification time.

        Returns:
            True if every requested check passes; False on a mismatch or if
            an IOError was raised while checking.
        """
        path = os.path.join(base_dir, self._path)
        with ExceptionSuppressor(IOError):
            result = True

            if self._content is not None:
                expected = self._content_as_bytes()
                with open(path, 'rb') as f:
                    if f.read() != expected:
                        logging.error('Mismatched file contents for "%s"',
                                      path)
                        result = False

            if self._mtime is not None:
                st = os.stat(path)
                if st.st_mtime != self._mtime:
                    logging.error(
                            'Mismatched file modification time for "%s": ' +
                            'want %d, got %d', path, self._mtime, st.st_mtime)
                    result = False

            return result

        # Only reached when an IOError was suppressed above.
        return False
636
637
class DefaultFilesystemTestContent(FilesystemTestDirectory):
    """A small default tree of files and directories for filesystem tests.

    The tree is rooted at the base directory itself (empty path) and holds
    one file plus a two-level directory hierarchy, including an empty file
    and a 64 KiB file.
    """

    def __init__(self):
        # File contents are byte strings because FilesystemTestFile reads and
        # writes its file in binary mode; a native str would raise TypeError
        # on write under Python 3. On Python 2 the b'' prefix is a no-op.
        super(DefaultFilesystemTestContent, self).__init__('', [
            FilesystemTestFile('file1', b'0123456789'),
            FilesystemTestDirectory('dir1', [
                FilesystemTestFile('file1', b''),
                FilesystemTestFile('file2', b'abcdefg'),
                FilesystemTestDirectory('dir2', [
                    FilesystemTestFile('file3', b'abcdefg'),
                    FilesystemTestFile('file4', b'a' * 65536),
                ]),
            ]),
        ], stat.S_IRWXU|stat.S_IRGRP|stat.S_IXGRP|stat.S_IROTH|stat.S_IXOTH)
651
652
653class VirtualFilesystemImage(object):
654    def __init__(self, block_size, block_count, filesystem_type,
655                 *args, **kwargs):
656        """Initializes the instance.
657
658        Args:
659            block_size: The number of bytes of each block in the image.
660            block_count: The number of blocks in the image.
661            filesystem_type: The filesystem type to be given to the mkfs
662                             program for formatting the image.
663
664        Keyword Args:
665            mount_filesystem_type: The filesystem type to be given to the
666                                   mount program for mounting the image.
667            mkfs_options: A list of options to be given to the mkfs program.
668        """
669        self._block_size = block_size
670        self._block_count = block_count
671        self._filesystem_type = filesystem_type
672        self._mount_filesystem_type = kwargs.get('mount_filesystem_type')
673        if self._mount_filesystem_type is None:
674            self._mount_filesystem_type = filesystem_type
675        self._mkfs_options = kwargs.get('mkfs_options')
676        if self._mkfs_options is None:
677            self._mkfs_options = []
678        self._image_file = None
679        self._loop_device = None
680        self._loop_device_stat = None
681        self._mount_dir = None
682
683    def __del__(self):
684        with ExceptionSuppressor(Exception):
685            self.clean()
686
    def __enter__(self):
        """Creates the image file on entering a with block.

        Returns:
            This instance.
        """
        self.create()
        return self
690
    def __exit__(self, exc_type, exc_value, traceback):
        """Cleans up on leaving a with block.

        Returns:
            False, so an exception raised inside the with block is never
            suppressed.
        """
        self.clean()
        return False
694
695    def _remove_temp_path(self, temp_path):
696        """Removes a temporary file or directory created using autotemp."""
697        if temp_path:
698            with ExceptionSuppressor(Exception):
699                path = temp_path.name
700                temp_path.clean()
701                logging.debug('Removed "%s"', path)
702
    def _remove_image_file(self):
        """Removes the image file if one has been created.

        Afterwards no image file is recorded on this instance.
        """
        self._remove_temp_path(self._image_file)
        self._image_file = None
706        self._image_file = None
707
    def _remove_mount_dir(self):
        """Removes the mount directory if one has been created.

        Afterwards no mount directory is recorded on this instance.
        """
        self._remove_temp_path(self._mount_dir)
        self._mount_dir = None
711        self._mount_dir = None
712
713    @property
714    def image_file(self):
715        """Gets the path of the image file.
716
717        Returns:
718            The path of the image file or None if no image file has been
719            created.
720        """
721        return self._image_file.name if self._image_file else None
722
    @property
    def loop_device(self):
        """Gets the loop device that the image file is attached to.

        Returns:
            The path of the loop device that the image file is attached to, or
            None if the image file is not attached to a loop device.
        """
        return self._loop_device
732
733    @property
734    def mount_dir(self):
735        """Gets the directory where the image file is mounted to.
736
737        Returns:
738            The directory where the image file is mounted to or None if no
739            mount directory has been created.
740        """
741        return self._mount_dir.name if self._mount_dir else None
742
743    def create(self):
744        """Creates a zero-filled image file with the specified size.
745
746        The created image file is temporary and removed when clean()
747        is called.
748        """
749        self.clean()
750        self._image_file = autotemp.tempfile(unique_id='fsImage')
751        try:
752            logging.debug('Creating zero-filled image file at "%s"',
753                          self._image_file.name)
754            utils.run('dd if=/dev/zero of=%s bs=%s count=%s' %
755                      (self._image_file.name, self._block_size,
756                       self._block_count))
757        except error.CmdError as exc:
758            self._remove_image_file()
759            message = 'Failed to create filesystem image: %s' % exc
760            raise RuntimeError(message)
761
    def clean(self):
        """Removes the image file if one has been created.

        Before removal, the image file is detached from the loop device that
        it is attached to, if any.
        """
        # Detach first, then delete the image file.
        self.detach_from_loop_device()
        self._remove_image_file()
770
771    def attach_to_loop_device(self):
772        """Attaches the created image file to a loop device.
773
774        Creates the image file, if one has not been created, by calling
775        create().
776
777        Returns:
778            The path of the loop device where the image file is attached to.
779        """
780        if self._loop_device:
781            return self._loop_device
782
783        if not self._image_file:
784            self.create()
785
786        logging.debug('Attaching image file "%s" to loop device',
787                      self._image_file.name)
788        utils.run('losetup -f %s' % self._image_file.name)
789        output = utils.system_output('losetup -j %s' % self._image_file.name)
790        # output should look like: "/dev/loop0: [000d]:6329 (/tmp/test.img)"
791        self._loop_device = output.split(':')[0]
792        logging.debug('Attached image file "%s" to loop device "%s"',
793                      self._image_file.name, self._loop_device)
794
795        self._loop_device_stat = os.stat(self._loop_device)
796        logging.debug('Loop device "%s" (uid=%d, gid=%d, permissions=%04o)',
797                      self._loop_device,
798                      self._loop_device_stat.st_uid,
799                      self._loop_device_stat.st_gid,
800                      stat.S_IMODE(self._loop_device_stat.st_mode))
801        return self._loop_device
802
    def detach_from_loop_device(self):
        """Detaches the image file from the loop device.

        Does nothing if no loop device is currently recorded. Otherwise, in
        order: unmount() is called, remaining mount points of the loop device
        are force-unmounted, the permissions and ownership recorded by
        attach_to_loop_device() are restored, and the loop device is released
        with `losetup -d`.
        """
        if not self._loop_device:
            return

        self.unmount()

        logging.debug('Cleaning up remaining mount points of loop device "%s"',
                      self._loop_device)
        # ignore_status=True is passed, so presumably a failing umount (for
        # example when nothing is mounted) does not abort the cleanup.
        utils.run('umount -f %s' % self._loop_device, ignore_status=True)

        logging.debug('Restore ownership/permissions of loop device "%s"',
                      self._loop_device)
        # Put back the mode and owner captured when the device was attached.
        os.chmod(self._loop_device,
                 stat.S_IMODE(self._loop_device_stat.st_mode))
        os.chown(self._loop_device,
                 self._loop_device_stat.st_uid, self._loop_device_stat.st_gid)

        logging.debug('Detaching image file "%s" from loop device "%s"',
                      self._image_file.name, self._loop_device)
        utils.run('losetup -d %s' % self._loop_device)
        # Only forget the loop device once every step above has completed.
        self._loop_device = None
824        self._loop_device = None
825
826    def format(self):
827        """Formats the image file as the specified filesystem."""
828        self.attach_to_loop_device()
829        try:
830            logging.debug('Formatting image file at "%s" as "%s" filesystem',
831                          self._image_file.name, self._filesystem_type)
832            utils.run('yes | mkfs -t %s %s %s' %
833                      (self._filesystem_type, ' '.join(self._mkfs_options),
834                       self._loop_device))
835            logging.debug('blkid: %s', utils.system_output(
836                'blkid -c /dev/null %s' % self._loop_device,
837                ignore_status=True))
838        except error.CmdError as exc:
839            message = 'Failed to format filesystem image: %s' % exc
840            raise RuntimeError(message)
841
842    def mount(self, options=None):
843        """Mounts the image file to a directory.
844
845        Args:
846            options: An optional list of mount options.
847        """
848        if self._mount_dir:
849            return self._mount_dir.name
850
851        if options is None:
852            options = []
853
854        options_arg = ','.join(options)
855        if options_arg:
856            options_arg = '-o ' + options_arg
857
858        self.attach_to_loop_device()
859        self._mount_dir = autotemp.tempdir(unique_id='fsImage')
860        try:
861            logging.debug('Mounting image file "%s" (%s) to directory "%s"',
862                          self._image_file.name, self._loop_device,
863                          self._mount_dir.name)
864            utils.run('mount -t %s %s %s %s' %
865                      (self._mount_filesystem_type, options_arg,
866                       self._loop_device, self._mount_dir.name))
867        except error.CmdError as exc:
868            self._remove_mount_dir()
869            message = ('Failed to mount virtual filesystem image "%s": %s' %
870                       (self._image_file.name, exc))
871            raise RuntimeError(message)
872        return self._mount_dir.name
873
874    def unmount(self):
875        """Unmounts the image file from the mounted directory."""
876        if not self._mount_dir:
877            return
878
879        try:
880            logging.debug('Unmounting image file "%s" (%s) from directory "%s"',
881                          self._image_file.name, self._loop_device,
882                          self._mount_dir.name)
883            utils.run('umount %s' % self._mount_dir.name)
884        except error.CmdError as exc:
885            message = ('Failed to unmount virtual filesystem image "%s": %s' %
886                       (self._image_file.name, exc))
887            raise RuntimeError(message)
888        finally:
889            self._remove_mount_dir()
890
891    def get_volume_label(self):
892        """Gets volume name information of |self._loop_device|
893
894        @return a string with volume name if it exists.
895        """
896        # This script is run as root in a normal autotest run,
897        # so this works: It doesn't have access to the necessary info
898        # when run as a non-privileged user
899        cmd = "blkid -c /dev/null -o udev %s" % self._loop_device
900        output = utils.system_output(cmd, ignore_status=True)
901
902        for line in output.splitlines():
903            udev_key, udev_val = line.split('=')
904
905            if udev_key == 'ID_FS_LABEL':
906                return udev_val
907
908        return None
909