# xref: /aosp_15_r20/external/toolchain-utils/crosperf/download_images.py (revision 760c253c1ed00ce9abd48f8546f08516e57485fe)
# -*- coding: utf-8 -*-
# Copyright 2014-2015 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Download images from Cloud Storage."""
7
8
9import ast
10import os
11import shlex
12
13from cros_utils import command_executer
14from cros_utils import misc
15import test_flag
16
17
# Location of the gsutil.py wrapper relative to the ChromeOS source root;
# it is joined onto chromeos_root whenever a gsutil command line is built.
GS_UTIL = "src/chromium/depot_tools/gsutil.py"
19
20
class MissingImage(Exception):
    """Raised when the requested image is not available from gs://.

    This covers an image that cannot be found in Cloud Storage as well as
    one that fails to download or to uncompress.
    """
23
24
class MissingFile(Exception):
    """Raised when a requested file is not available from gs://.

    This covers a file that cannot be found in Cloud Storage as well as one
    that fails to download, to uncompress or to be moved into place.
    """
27
28
class RunCommandExceptionHandler(object):
    """Handle Exceptions from calls to RunCommand.

    An instance bundles a cleanup shell command with the executer that
    should run it.  HandleException() is meant to be handed to RunCommand as
    its except_handler callback.
    """

    def __init__(self, logger_to_use, log_level, cmd_exec, command):
        """Remember how to log and run the cleanup command.

        Args:
            logger_to_use: Object providing LogOutput(str).
            log_level: Log level string; the cleanup command is echoed to the
                logger for every level except "verbose".
            cmd_exec: Object providing RunCommand(str).
            command: Cleanup shell command, or None to skip the cleanup.
        """
        self.logger = logger_to_use
        self.log_level = log_level
        self.ce = cmd_exec
        self.cleanup_command = command

    def HandleException(self, _, e):
        """Run the cleanup command (if any), then re-raise the exception.

        Args:
            _: Ignored first callback argument.
            e: The exception being handled.

        Raises:
            Always re-raises e once the cleanup command has been run.
        """
        if self.cleanup_command is not None:
            if self.log_level != "verbose":
                self.logger.LogOutput("CMD: %s" % self.cleanup_command)
            # The exit status of the cleanup is deliberately ignored.
            self.ce.RunCommand(self.cleanup_command)
        # Raise exception again
        raise e
46
47
class ImageDownloader(object):
    """Download images from Cloud Storage.

    Every artifact of a build is copied with gsutil into the directory that
    misc.GetOutsideChrootPath() reports for the chroot path /tmp/<build_id>
    and is unpacked in place there.
    """

    def __init__(self, logger_to_use=None, log_level="verbose", cmd_exec=None):
        """Store the logger and log level and pick a command executer.

        Args:
            logger_to_use: Object providing LogOutput(str).
                NOTE(review): several methods call it without a None check
                (e.g. Run() whenever log_level is not "quiet"), so the None
                default only works on code paths that never log.
            log_level: One of the level strings compared against in this
                class ("verbose", "average", "quiet").
            cmd_exec: Command executer to use; when falsy, one is obtained
                from command_executer.GetCommandExecuter().
        """
        self._logger = logger_to_use
        self.log_level = log_level
        self._ce = cmd_exec or command_executer.GetCommandExecuter(
            self._logger, log_level=self.log_level
        )

    @staticmethod
    def _DownloadPath(chromeos_root, build_id):
        """Return the local directory used for the artifacts of build_id."""
        return misc.GetOutsideChrootPath(
            chromeos_root, os.path.join("/tmp", build_id)
        )

    @staticmethod
    def _GsutilCommand(chromeos_root, *args):
        """Build a shell-quoted gsutil command line from args."""
        gsutil_cmd = os.path.join(chromeos_root, GS_UTIL)
        # shlex.quote() leaves ordinary paths and gs:// URLs untouched, so the
        # command only changes for values a shell would otherwise split.
        return " ".join(shlex.quote(part) for part in (gsutil_cmd,) + args)

    def _LogCommand(self, command, note=None):
        """Echo command (and an optional console note) unless verbose."""
        if self.log_level == "verbose":
            return
        self._logger.LogOutput("CMD: %s" % command)
        if note is not None:
            print(note)

    def _RemoveDebugFiles(self, download_path):
        """Best-effort removal of a partially populated debug_files tree."""
        command = "rm -rf %s" % shlex.quote(
            os.path.join(download_path, "debug_files")
        )
        self._LogCommand(
            command, "(Removing incomplete debug_files directory.)"
        )
        if self._ce.RunCommand(command) != 0:
            print(
                "(Warning: Could not remove incomplete debug_files directory.)"
            )

    def GetBuildID(self, chromeos_root, xbuddy_label):
        """Translate an xbuddy label into the Google Storage build id.

        translate_xbuddy.py is run inside the chroot and its output is parsed
        as a Python literal; the first element of that literal is the id.

        Args:
            chromeos_root: ChromeOS root passed to the chroot executer.
            xbuddy_label: Label handed to translate_xbuddy.py.

        Returns:
            The first element of the literal printed by the translator.

        Raises:
            MissingImage: The translator printed nothing, or its output could
                not be parsed into a non-empty sequence.
        """
        command = (
            "cd /mnt/host/source/src/third_party/toolchain-utils/crosperf; "
            "./translate_xbuddy.py '%s'" % xbuddy_label
        )
        _, raw_output, _ = self._ce.ChrootRunCommandWOutput(
            chromeos_root, command
        )
        build_id_tuple_str = raw_output.strip() if raw_output else ""
        if not build_id_tuple_str:
            raise MissingImage("Unable to find image for '%s'" % xbuddy_label)

        try:
            build_id = ast.literal_eval(build_id_tuple_str)[0]
        except (
            ValueError,
            SyntaxError,
            TypeError,
            IndexError,
            KeyError,
        ) as err:
            # Report unparsable translator output as a missing image rather
            # than leaking a bare parser error to the caller.
            raise MissingImage(
                "Unable to find image for '%s': unexpected translator "
                "output %r" % (xbuddy_label, build_id_tuple_str)
            ) from err

        return build_id

    def DownloadImage(self, chromeos_root, build_id, image_name):
        """Fetch the compressed test image unless the .bin already exists.

        Args:
            chromeos_root: ChromeOS root used to locate gsutil and the
                download directory.
            build_id: Build id naming the download directory.
            image_name: gs:// URL of chromiumos_test_image.tar.xz.

        Returns:
            Path of chromiumos_test_image.bin in the download directory.  The
            file itself is only produced later by UncompressImage().

        Raises:
            MissingImage: gsutil failed or the archive is absent afterwards.
        """
        if self.log_level == "average":
            self._logger.LogOutput(
                "Preparing to download %s image to local "
                "directory." % build_id
            )

        # Make sure the directory for downloading the image exists.
        download_path = self._DownloadPath(chromeos_root, build_id)
        image_path = os.path.join(download_path, "chromiumos_test_image.bin")
        if not os.path.exists(download_path):
            os.makedirs(download_path)

        # Nothing to fetch when the unpacked image is already present.
        if os.path.exists(image_path):
            return image_path

        command = self._GsutilCommand(
            chromeos_root, "cp", image_name, download_path
        )
        self._LogCommand(command)
        status = self._ce.RunCommand(command)
        downloaded_image_name = os.path.join(
            download_path, "chromiumos_test_image.tar.xz"
        )
        if status != 0 or not os.path.exists(downloaded_image_name):
            raise MissingImage(
                "Cannot download image: %s." % downloaded_image_name
            )

        return image_path

    def UncompressImage(self, chromeos_root, build_id):
        """Unpack chromiumos_test_image.tar.xz and delete the archive.

        Does nothing when chromiumos_test_image.bin already exists.

        Args:
            chromeos_root: ChromeOS root used to locate the download
                directory.
            build_id: Build id naming the download directory.

        Raises:
            MissingImage: tar exited with a non-zero status; the partially
                written .bin file is removed first.
        """
        download_path = self._DownloadPath(chromeos_root, build_id)
        # Check to see if the file has already been uncompressed.
        if os.path.exists(
            os.path.join(download_path, "chromiumos_test_image.bin")
        ):
            return

        quoted_dir = shlex.quote(download_path)
        # Uncompress and untar the downloaded image.
        command = "cd %s ; tar -Jxf chromiumos_test_image.tar.xz " % quoted_dir
        # Cleanup command, shared by the exception handler and the
        # non-zero-exit path below.
        clean_cmd = "cd %s ; rm -f chromiumos_test_image.bin " % quoted_dir
        exception_handler = RunCommandExceptionHandler(
            self._logger, self.log_level, self._ce, clean_cmd
        )
        self._LogCommand(
            command,
            "(Uncompressing and un-tarring may take a couple of minutes..."
            "please be patient.)",
        )
        retval = self._ce.RunCommand(
            command, except_handler=exception_handler.HandleException
        )
        if retval != 0:
            self._LogCommand(
                clean_cmd, "(Removing file chromiumos_test_image.bin.)"
            )
            # Remove partially uncompressed file
            self._ce.RunCommand(clean_cmd)
            # Raise exception for failure to uncompress
            raise MissingImage("Cannot uncompress image: %s." % build_id)

        # Remove compressed image
        command = "cd %s ; rm -f chromiumos_test_image.tar.xz; " % quoted_dir
        self._LogCommand(
            command, "(Removing file chromiumos_test_image.tar.xz.)"
        )
        # A failed removal is tolerated; it is only reported.
        if self._ce.RunCommand(command) != 0:
            print(
                "(Warning: Could not remove file chromiumos_test_image.tar.xz .)"
            )

    def DownloadSingleFile(self, chromeos_root, build_id, package_file_name):
        """Copy one file of a build from gs:// into the download directory.

        Args:
            chromeos_root: ChromeOS root used to locate gsutil and the
                download directory.
            build_id: Build id; also the gs://chromeos-image-archive subpath.
            package_file_name: Name of the file to fetch.

        Raises:
            MissingFile: The file is not listed on the server, or the copy
                failed or left no file behind.
        """
        gs_package_name = "gs://chromeos-image-archive/%s/%s" % (
            build_id,
            package_file_name,
        )
        # Verify that the file exists; the check is skipped in test mode.
        status = 0
        if not test_flag.GetTestMode():
            status = self._ce.RunCommand(
                self._GsutilCommand(chromeos_root, "ls", gs_package_name)
            )
        if status != 0:
            raise MissingFile(
                "Cannot find package file: %s." % package_file_name
            )

        if self.log_level == "average":
            self._logger.LogOutput(
                "Preparing to download %s package to local "
                "directory." % package_file_name
            )

        # Make sure the directory for downloading the package exists.
        download_path = self._DownloadPath(chromeos_root, build_id)
        package_path = os.path.join(download_path, package_file_name)
        if not os.path.exists(download_path):
            os.makedirs(download_path)

        # Nothing to do when the file has already been downloaded.
        if os.path.exists(package_path):
            return

        command = self._GsutilCommand(
            chromeos_root, "cp", gs_package_name, download_path
        )
        self._LogCommand(command)
        status = self._ce.RunCommand(command)
        if status != 0 or not os.path.exists(package_path):
            raise MissingFile("Cannot download package: %s ." % package_path)

    def UncompressSingleFile(
        self, chromeos_root, build_id, package_file_name, uncompress_cmd
    ):
        """Unpack a downloaded file in place and delete the archive.

        Args:
            chromeos_root: ChromeOS root used to locate the download
                directory.
            build_id: Build id naming the download directory.
            package_file_name: Archive name inside the download directory.
            uncompress_cmd: Shell command prefix (e.g. "tar -xf ") that is
                placed verbatim in front of the archive name.

        Raises:
            MissingFile: The uncompress command exited non-zero.
        """
        quoted_dir = shlex.quote(self._DownloadPath(chromeos_root, build_id))
        quoted_file = shlex.quote(package_file_name)

        # Uncompress file
        command = "cd %s ; %s %s" % (quoted_dir, uncompress_cmd, quoted_file)
        self._LogCommand(
            command, "(Uncompressing file %s .)" % package_file_name
        )
        if self._ce.RunCommand(command) != 0:
            raise MissingFile("Cannot uncompress file: %s." % package_file_name)

        # Remove the archive now that its contents have been extracted.
        command = "cd %s ; rm -f %s" % (quoted_dir, quoted_file)
        self._LogCommand(
            command, "(Removing processed file %s .)" % package_file_name
        )
        # A failed removal is tolerated; it is only reported.
        if self._ce.RunCommand(command) != 0:
            print("(Warning: Could not remove file %s .)" % package_file_name)

    def VerifyFileExists(self, chromeos_root, build_id, package_file):
        """Check with "gsutil ls" that a file of a build is on the server.

        Args:
            chromeos_root: ChromeOS root used to locate gsutil.
            build_id: Build id; also the gs://chromeos-image-archive subpath.
            package_file: Name of the file to look for.

        Returns:
            0 when the file is listed (or when running in test mode, where
            the check is skipped), 1 otherwise.
        """
        if test_flag.GetTestMode():
            return 0

        gs_package_name = "gs://chromeos-image-archive/%s/%s" % (
            build_id,
            package_file,
        )
        cmd = self._GsutilCommand(chromeos_root, "ls", gs_package_name)
        self._LogCommand(cmd)
        if self._ce.RunCommand(cmd) != 0:
            print("(Warning: Could not find file %s )" % gs_package_name)
            return 1
        # Package exists on server
        return 0

    def DownloadAutotestFiles(self, chromeos_root, build_id):
        """Fetch and unpack the three autotest archives of a build.

        Args:
            chromeos_root: ChromeOS root used to locate gsutil and the
                download directory.
            build_id: Build id naming the download directory.

        Returns:
            /tmp/<build_id>/autotest_files (the path as seen inside the
            chroot), or the default autotest directory under /mnt/host/source
            when the packages are not on the server.

        Raises:
            MissingFile: A download, an extraction or the final rename failed.
        """
        autotest_packages_name = "autotest_packages.tar"
        autotest_server_package_name = "autotest_server_package.tar.bz2"
        autotest_control_files_name = "control_files.tar"

        download_path = self._DownloadPath(chromeos_root, build_id)
        # Autotest directory relative path wrt chroot
        autotest_rel_path = os.path.join("/tmp", build_id, "autotest_files")
        # Absolute Path to download files
        autotest_path = os.path.join(download_path, "autotest_files")

        # A previous run already produced the directory.
        if os.path.exists(autotest_path):
            return autotest_rel_path

        # Quickly verify if the files are present on server.  If not, fall
        # back to the in-tree autotest directory with a warning.
        status = self.VerifyFileExists(
            chromeos_root, build_id, autotest_packages_name
        )
        if status != 0:
            default_autotest_dir = (
                "/mnt/host/source/src/third_party/autotest/files"
            )
            print(
                "(Warning: Could not find autotest packages .)\n"
                "(Warning: Defaulting autotest path to %s .)"
                % default_autotest_dir
            )
            return default_autotest_dir

        # Files exist on server: download all of them, then unpack them in
        # the same order.
        archives = (
            (autotest_packages_name, "tar -xf "),
            (autotest_server_package_name, "tar -jxf "),
            (autotest_control_files_name, "tar -xf "),
        )
        for archive_name, _ in archives:
            self.DownloadSingleFile(chromeos_root, build_id, archive_name)
        for archive_name, untar_cmd in archives:
            self.UncompressSingleFile(
                chromeos_root, build_id, archive_name, untar_cmd
            )

        # Rename created autotest directory to autotest_files.  Doing this
        # last means autotest_files only exists after a complete extraction.
        command = "cd %s ; mv autotest autotest_files" % shlex.quote(
            download_path
        )
        self._LogCommand(
            command, "(Moving downloaded autotest files to autotest_files)"
        )
        if self._ce.RunCommand(command) != 0:
            raise MissingFile("Could not create directory autotest_files")

        return autotest_rel_path

    def DownloadDebugFile(self, chromeos_root, build_id):
        """Fetch debug.tgz and unpack it under debug_files/usr/lib/debug.

        Args:
            chromeos_root: ChromeOS root used to locate gsutil and the
                download directory.
            build_id: Build id naming the download directory.

        Returns:
            /tmp/<build_id>/debug_files (the path as seen inside the chroot),
            or "" when the archive is not on the server.

        Raises:
            MissingFile: The download, the directory creation or the
                extraction failed.  A partially populated debug_files tree is
                removed first so that a later call does not mistake it for a
                finished download.
        """
        # Download the debug archive (a single file)
        debug_archive_name = "debug.tgz"

        download_path = self._DownloadPath(chromeos_root, build_id)
        # Debug directory relative path wrt chroot
        debug_rel_path = os.path.join("/tmp", build_id, "debug_files")
        # Debug path to download files
        debug_path = misc.GetOutsideChrootPath(
            chromeos_root, os.path.join("/tmp", build_id, "debug_files")
        )

        # A previous run already produced the directory.
        if os.path.exists(debug_path):
            return debug_rel_path

        # Quickly verify if the file is present on server.  If not, just
        # exit with a warning.
        status = self.VerifyFileExists(
            chromeos_root, build_id, debug_archive_name
        )
        if status != 0:
            self._logger.LogOutput(
                "WARNING: Could not find debug archive on gs"
            )
            return ""

        # File exists on server, download and uncompress it
        self.DownloadSingleFile(chromeos_root, build_id, debug_archive_name)

        # Extract and move debug files into the proper location.
        debug_dir = "debug_files/usr/lib/debug"
        command = (
            f"cd {shlex.quote(download_path)}; "
            f"mkdir -p {shlex.quote(debug_dir)}"
        )
        self._LogCommand(
            command, "Moving downloaded debug files to %s" % debug_dir
        )
        if self._ce.RunCommand(command) != 0:
            self._RemoveDebugFiles(download_path)
            raise MissingFile("Could not create directory %s" % debug_dir)

        try:
            self.UncompressSingleFile(
                chromeos_root,
                build_id,
                debug_archive_name,
                f"tar -C {shlex.quote(debug_dir)} -xf ",
            )
        except MissingFile:
            # debug_files doubles as the "already downloaded" marker, so a
            # failed extraction must not leave it behind.
            self._RemoveDebugFiles(download_path)
            raise

        return debug_rel_path

    def Run(
        self,
        chromeos_root,
        xbuddy_label,
        autotest_path,
        debug_path,
        download_debug,
    ):
        """Resolve a label, then download and unpack everything requested.

        Args:
            chromeos_root: ChromeOS root used for all commands and paths.
            xbuddy_label: Label to translate into a build id.
            autotest_path: Autotest path to use; "" requests a download.
            debug_path: Debug path to use; "" requests a download when
                download_debug is also truthy.
            download_debug: Whether debug files may be downloaded.

        Returns:
            Tuple (image_path, autotest_path, debug_path).

        Raises:
            MissingImage: The image is not on the server or cannot be
                downloaded or unpacked.
            MissingFile: An autotest or debug file cannot be obtained.
        """
        build_id = self.GetBuildID(chromeos_root, xbuddy_label)
        image_name = (
            "gs://chromeos-image-archive/%s/chromiumos_test_image.tar.xz"
            % build_id
        )

        # Verify that image exists for build_id, before attempting to
        # download it.  The check is skipped in test mode.
        status = 0
        if not test_flag.GetTestMode():
            cmd = self._GsutilCommand(chromeos_root, "ls", image_name)
            status = self._ce.RunCommand(cmd)
        if status != 0:
            raise MissingImage("Cannot find official image: %s." % image_name)

        image_path = self.DownloadImage(chromeos_root, build_id, image_name)
        self.UncompressImage(chromeos_root, build_id)

        if self.log_level != "quiet":
            self._logger.LogOutput("Using image from %s." % image_path)

        if autotest_path == "":
            autotest_path = self.DownloadAutotestFiles(chromeos_root, build_id)

        if debug_path == "" and download_debug:
            debug_path = self.DownloadDebugFile(chromeos_root, build_id)

        return image_path, autotest_path, debug_path
413