xref: /aosp_15_r20/system/apex/tools/deapexer.py (revision 33f3758387333dbd2962d7edbd98681940d895da)
1#!/usr/bin/env python
2#
3# Copyright (C) 2019 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16"""deapexer is a tool that prints out content of an APEX.
17
18To print content of an APEX to stdout:
19  deapexer list foo.apex
20
21To extract content of an APEX to the given directory:
22  deapexer extract foo.apex dest
23"""
24from __future__ import print_function
25
26import argparse
27import apex_manifest
28import enum
29import os
30import re
31import shutil
32import sys
33import subprocess
34import tempfile
35import zipfile
36
# Size in bytes of one filesystem block; the extent parsing in this file
# multiplies the block numbers and block counts printed by debugfs by it.
BLOCK_SIZE = 4 * 1024

# Known payload filesystems, one (name, offset of the magic, magic bytes) tuple
# each. The list is probed in order and the first matching magic wins.
# See apexd/apex_file.cpp#RetrieveFsType
FS_TYPES = [
    ('f2fs', 0x400, b'\x10\x20\xf5\xf2'),
    ('ext4', 0x400 + 0x38, b'\x53\xef'),
    ('erofs', 0x400, b'\xe2\xe1\xf5\xe0'),
]
45
46
def RetrieveFileSystemType(file):
  """Identifies the filesystem of an image by probing for known magic numbers.

  Args:
    file: path to the filesystem image to inspect.

  Returns:
    The name of the first FS_TYPES entry whose magic bytes are found at that
    entry's offset in the image.

  Raises:
    ValueError: if none of the known magic numbers matches.
  """
  with open(file, 'rb') as image:
    for candidate in FS_TYPES:
      name, magic_offset, expected = candidate
      # A short read leaves the tail of the probe zero-filled, so a truncated
      # image simply fails the comparison below.
      probe = bytearray(len(expected))
      image.seek(magic_offset, os.SEEK_SET)
      image.readinto(probe)
      if probe != expected:
        continue
      return name
  raise ValueError('Failed to retrieve filesystem type')
57
class ApexImageEntry(object):
  """Represents an entry (regular file, directory or symlink) in an APEX payload.

  An entry is a read-only record: the constructor arguments are exposed through
  properties, and `entries` is a list the tree builder fills with the children
  of a directory entry.
  """

  def __init__(self, name, *, base_dir, permissions, size, ino, extents,
               is_directory, is_symlink, security_context):
    """Creates an entry.

    Args:
      name: name of the entry inside base_dir ('.' for the directory itself).
      base_dir: path of the containing directory, e.g. './' or './lib/'.
      permissions: permission bits as an int, e.g. 0o644.
      size: size of the entry. Kept exactly as given; it may be an int or a
        string.
      ino: inode identifier of the entry.
      extents: list of (offset, length) pairs locating the file content.
      is_directory: True if the entry is a directory.
      is_symlink: True if the entry is a symbolic link.
      security_context: security context (SELinux label) of the entry.
    """
    self._name = name
    self._base_dir = base_dir
    self._permissions = permissions
    self._size = size
    self._is_directory = is_directory
    self._is_symlink = is_symlink
    self._ino = ino
    self._extents = extents
    self._security_context = security_context
    self._entries = []

  @property
  def name(self):
    """Name of the entry inside its base directory."""
    return self._name

  @property
  def root(self):
    """True only for the '.' entry of the top-level './' directory."""
    return self._base_dir == './' and self._name == '.'

  @property
  def full_path(self):
    """Path of the entry from the payload root; directories end with '/'."""
    if self.root:
      return self._base_dir  # './'
    path = os.path.join(self._base_dir, self._name)
    return path + '/' if self.is_directory else path

  @property
  def is_directory(self):
    """True if the entry is a directory."""
    return self._is_directory

  @property
  def is_symlink(self):
    """True if the entry is a symbolic link."""
    return self._is_symlink

  @property
  def is_regular_file(self):
    """True if the entry is neither a directory nor a symbolic link."""
    return not (self.is_directory or self.is_symlink)

  @property
  def permissions(self):
    """Permission bits as an int."""
    return self._permissions

  @property
  def size(self):
    """Size of the entry, exactly as passed to the constructor."""
    return self._size

  @property
  def ino(self):
    """Inode identifier of the entry."""
    return self._ino

  @property
  def entries(self):
    """Mutable list of child entries; empty until a caller fills it."""
    return self._entries

  @property
  def extents(self):
    """List of (offset, length) pairs locating the file content."""
    return self._extents

  @property
  def security_context(self):
    """Security context (SELinux label) of the entry."""
    return self._security_context

  def __str__(self):
    """Returns an `ls -l`-like line: type char and rwx triplets, size, name."""
    if self._is_directory:
      kind = 'd'
    elif self._is_symlink:
      kind = 'l'
    else:
      kind = '-'

    def MaskAsString(m):
      # Only the three low bits (r=4, w=2, x=1) of m are looked at.
      return ''.join(flag if m & bit else '-'
                     for flag, bit in (('r', 4), ('w', 2), ('x', 1)))

    # Owner, group and other triplets, in that order.
    mode = ''.join(MaskAsString(self._permissions >> shift) for shift in (6, 3, 0))

    # Formatting (rather than concatenating) keeps this working when size was
    # given as an int instead of a string.
    return f'{kind}{mode} {self._size} {self._name}'
146
147
class Apex(object):
  """Represents an APEX file.

  The payload image is unpacked into a private temporary directory on
  construction. That directory is removed when the object is finalized
  (`__del__`); leaving a `with` block does not release it, so the object stays
  usable afterwards.
  """

  def __init__(self, args):
    """Unpacks the payload image of an uncompressed APEX and detects its filesystem.

    Args:
      args: object with `debugfs_path`, `fsckerofs_path` and `apex` attributes
        (typically the parsed command line).

    Raises:
      KeyError: if the zip has no 'apex_payload.img' member.
      Whatever RetrieveFileSystemType raises for an unrecognized payload.
    """
    self._debugfs = args.debugfs_path
    self._fsckerofs = args.fsckerofs_path
    self._apex = args.apex
    self._tempdir = tempfile.mkdtemp()
    with zipfile.ZipFile(self._apex, 'r') as zip_ref:
      self._payload = zip_ref.extract('apex_payload.img', path=self._tempdir)
    self._payload_fs_type = RetrieveFileSystemType(self._payload)

  def __del__(self):
    """Removes the temporary directory holding the unpacked payload."""
    # __init__ may have failed before the directory was created, and __del__
    # may run more than once; both cases must be harmless.
    tempdir = getattr(self, '_tempdir', None)
    if tempdir is None:
      return
    self._tempdir = None
    shutil.rmtree(tempdir)

  def __enter__(self):
    return self

  def __exit__(self, ex_type, value, traceback):
    # Intentionally a no-op: cleanup happens in __del__.
    pass

  def _run_debugfs(self, request):
    """Runs one debugfs request against the payload and returns its stdout.

    stderr is discarded. subprocess.CalledProcessError is raised if debugfs
    exits with a non-zero status.
    """
    return subprocess.check_output([self._debugfs, '-R', request, self._payload],
                                   text=True, stderr=subprocess.DEVNULL)

  @staticmethod
  def _parse_extents(dump_extents_output, size):
    """Turns `dump_extents` output into a list of (byte offset, byte length).

    Returns [] when the file location could not be retrieved reliably.
    """
    # Output of dump_extents for an inode fragmented in 3 blocks (length and addresses represent
    # block-sized sections):
    # Level Entries       Logical      Physical Length Flags
    # 0/ 0   1/  3     0 -     0    18 -    18      1
    # 0/ 0   2/  3     1 -    15    20 -    34     15
    # 0/ 0   3/  3    16 -  1863    37 -  1884   1848
    left_length = int(size)
    extents = []
    try:  # dump_extents sometimes has an unexpected output
      # The first line contains only column names. Slicing (instead of pop)
      # also copes with debugfs printing nothing at all.
      for row in dump_extents_output.splitlines()[1:]:
        tokens = row.split()
        offset = int(tokens[7]) * BLOCK_SIZE
        length = min(int(tokens[-1]) * BLOCK_SIZE, left_length)
        left_length -= length
        extents.append((offset, length))
    except (ValueError, IndexError):
      return []
    if left_length != 0:  # dump_extents sometimes fails to display "hole" blocks
      return []
    return extents

  def list(self):
    """Generator over all payload entries; only ext4 payloads are supported."""
    if self._payload_fs_type not in ['ext4']:
      sys.exit(f'{self._payload_fs_type} is not supported for `list`.')

    yield from self.entries()

  def read_dir(self, path) -> 'ApexImageEntry':
    """Reads the directory `path` (and, recursively, its sub-directories).

    Args:
      path: directory path inside the payload; must end with '/'.

    Returns:
      The entry of the directory itself (its '.' record). Its `entries` list
      holds the children sorted by name; directory children carry their own
      children in turn.
    """
    assert path.endswith('/')
    assert self.payload_fs_type == 'ext4'

    listing = self._run_debugfs(f'ls -l -p {path}')
    dir_entry = None
    children = []
    for record in listing.split('\n'):
      # A usable record splits on '/' into exactly 8 fields; the ones read here
      # are the inode (1), the mode bits (2), the name (5) and the size (6).
      fields = record.split('/')
      if len(fields) != 8:
        continue
      name = fields[5]
      if not name or name == '..':
        continue
      if name == 'lost+found' and path == './':
        continue
      ino, bits, size = fields[1], fields[2], fields[6]
      # The second digit of the mode string tells the entry type apart.
      is_symlink = bits[1] == '2'
      is_directory = bits[1] == '4'

      extents = []  # [] means that we failed to retrieve the file location successfully
      if not is_symlink and not is_directory:
        extents = self._parse_extents(self._run_debugfs(f'dump_extents <{ino}>'), size)

      # get 'security.selinux' attribute
      entry_path = os.path.join(path, name)
      security_context = self._run_debugfs(
          f'ea_get -V {entry_path} security.selinux').rstrip('\n\x00')

      entry = ApexImageEntry(name,
                             base_dir=path,
                             permissions=int(bits[3:], 8),
                             size=size,
                             is_directory=is_directory,
                             is_symlink=is_symlink,
                             ino=ino,
                             extents=extents,
                             security_context=security_context)
      if name == '.':
        dir_entry = entry
        continue
      if is_directory:
        sub_dir_entry = self.read_dir(path + name + '/')
        # sub_dir_entry should be the same inode
        assert entry.ino == sub_dir_entry.ino
        entry.entries.extend(sub_dir_entry.entries)
      children.append(entry)

    assert dir_entry
    dir_entry.entries.extend(sorted(children, key=lambda e: e.name))
    return dir_entry

  def extract(self, dest):
    """Recursively dumps contents of the payload with retaining mode bits, but not owner/group"""
    if self._payload_fs_type == 'erofs':
      subprocess.run([self._fsckerofs, f'--extract={dest}', '--overwrite',
                     '--no-preserve-owner', self._payload], stdout=subprocess.DEVNULL, check=True)
    elif self._payload_fs_type == 'ext4':
      # Extract entries one by one using `dump` because `rdump` doesn't support
      # "no-preserve" mode
      for entry in self.entries():
        self.write_entry(entry, dest)
    else:
      # TODO(b/279688635) f2fs is not supported yet.
      sys.exit(f'{self._payload_fs_type} is not supported for `extract`.')

  @property
  def payload_fs_type(self) -> str:
    """Filesystem type of the payload as detected on construction."""
    return self._payload_fs_type

  def entries(self):
    """Generator to visit all entries in the payload starting from root(./)"""

    def TopDown(entry):
      # Pre-order walk: a directory is yielded before its children.
      yield entry
      for child in entry.entries:
        yield from TopDown(child)

    root = self.read_dir('./')
    yield from TopDown(root)

  def read_symlink(self, entry):
    """Returns the target of a symlink entry; exits if it cannot be read."""
    assert entry.is_symlink
    assert self.payload_fs_type == 'ext4'

    stdout = self._run_debugfs(f'stat {entry.full_path}')
    # Output of stat for a symlink should have the following line:
    #   Fast link dest: \"%.*s\"
    m = re.search(r'\bFast link dest: \"(.+)\"\n', stdout)
    if not m:
      sys.exit('failed to read symlink target')
    return m.group(1)

  def write_entry(self, entry, out_dir):
    """Materializes one payload entry below out_dir.

    Directories are created if missing, symlinks are recreated with their
    original target, and regular files are dumped and then chmod'ed to the
    entry's permission bits.
    """
    dest = os.path.normpath(os.path.join(out_dir, entry.full_path))
    if entry.is_directory:
      if not os.path.exists(dest):
        os.makedirs(dest, mode=0o755)
    elif entry.is_symlink:
      os.symlink(self.read_symlink(entry), dest)
    else:
      self._run_debugfs(f'dump {entry.full_path} {dest}')
      # retain mode bits
      os.chmod(dest, entry.permissions)
314
315
def RunList(args):
  """Prints one line per entry of the payload of args.apex to stdout.

  A compressed APEX is first decompressed into a temporary directory and
  args.apex is redirected to the decompressed file before listing it.

  Args:
    args: parsed command line; `apex`, `size`, `extents` and `contexts` are
      read here, and the object is handed on to Apex().
  """
  if GetType(args.apex) == ApexType.COMPRESSED:
    with tempfile.TemporaryDirectory() as scratch_dir:
      inner_apex = os.path.join(scratch_dir, 'temp.apex')
      Decompress(args.apex, inner_apex)
      args.apex = inner_apex

      RunList(args)
    return

  with Apex(args) as apex:
    for entry in apex.list():
      # dot(., ..) directories: only the payload root itself is printed
      is_dot_dir = entry.name in ('.', '..')
      if is_dot_dir and not entry.root:
        continue
      columns = []
      if args.size:
        columns.append(entry.size)
      columns.append(entry.full_path)
      if args.extents:
        columns.append('[' + '-'.join(str(extent) for extent in entry.extents) + ']')
      if args.contexts:
        columns.append(entry.security_context)
      print(' '.join(columns))
340
341
def RunExtract(args):
  """Extracts the payload of args.apex into the directory args.dest.

  A compressed APEX is first decompressed into a temporary directory and
  args.apex is redirected to the decompressed file before extracting it. The
  destination directory is created if missing, and a top-level 'lost+found'
  directory left by the extraction is removed.

  Args:
    args: parsed command line; `apex` and `dest` are read here, and the object
      is handed on to Apex().
  """
  if GetType(args.apex) != ApexType.COMPRESSED:
    with Apex(args) as apex:
      if not os.path.exists(args.dest):
        os.makedirs(args.dest, mode=0o755)
      apex.extract(args.dest)
      lost_found = os.path.join(args.dest, 'lost+found')
      if os.path.isdir(lost_found):
        shutil.rmtree(lost_found)
    return

  with tempfile.TemporaryDirectory() as scratch_dir:
    inner_apex = os.path.join(scratch_dir, 'temp.apex')
    Decompress(args.apex, inner_apex)
    args.apex = inner_apex

    RunExtract(args)
357      shutil.rmtree(os.path.join(args.dest, 'lost+found'))
358
@enum.unique
class ApexType(enum.Enum):
  """Kind of APEX container, judged by which members its zip holds."""
  # Neither or both of 'apex_payload.img' and 'original_apex' are present.
  INVALID = 0
  # Only 'apex_payload.img' is present.
  UNCOMPRESSED = 1
  # Only 'original_apex' is present.
  COMPRESSED = 2
363
364
def GetType(apex_path):
  """Classifies an APEX file by the members of its zip container.

  Args:
    apex_path: path to the APEX (zip) file.

  Returns:
    ApexType.UNCOMPRESSED if only 'apex_payload.img' is present,
    ApexType.COMPRESSED if only 'original_apex' is present, and
    ApexType.INVALID if both or neither are present.
  """
  with zipfile.ZipFile(apex_path, 'r') as zip_file:
    members = zip_file.namelist()

  has_payload = 'apex_payload.img' in members
  has_original_apex = 'original_apex' in members
  # Exactly one of the two markers must be present for a valid APEX.
  if has_payload == has_original_apex:
    return ApexType.INVALID
  return ApexType.UNCOMPRESSED if has_payload else ApexType.COMPRESSED
377
378
def RunInfo(args):
  """Prints information about args.apex to stdout.

  Depending on the flags this is the APEX type (--print-type; exits with
  status 1 for an invalid APEX), the filesystem type of the payload
  (--print-payload-type), or otherwise the APEX manifest as JSON.

  Args:
    args: parsed command line; `apex`, `print_type` and `print_payload_type`
      are read here, and the object may be handed on to Apex().
  """
  if args.print_type:
    apex_type = GetType(args.apex)
    if apex_type == ApexType.INVALID:
      print(args.apex + ' is not a valid apex')
      sys.exit(1)
    print(apex_type.name)
    return

  if args.print_payload_type:
    print(Apex(args).payload_fs_type)
    return

  print(apex_manifest.toJsonString(apex_manifest.fromApex(args.apex)))
391
392
def RunDecompress(args):
  """Decompresses a compressed APEX into the original uncompressed APEX.

  When the input is already an uncompressed APEX and --copy-if-uncompressed
  was given, the input is copied to the output path instead.

  See apex_compression_tool.py#RunCompress for details on compressed APEX
  structure.

  Args:
      args.input: file path to compressed APEX
      args.output: file path to where decompressed APEX will be placed
      args.copy_if_uncompressed: copy an uncompressed input instead of
        decompressing it
  """
  already_uncompressed = GetType(args.input) == ApexType.UNCOMPRESSED
  if already_uncompressed and args.copy_if_uncompressed:
    shutil.copyfile(args.input, args.output)
    return None

  return Decompress(args.input, args.output)
411
412
def Decompress(compressed_apex_fp, decompressed_apex_fp):
  """Extracts the 'original_apex' member of a compressed APEX to a new file.

  Prints a message and exits with status 1 if the output path already exists
  or if the input has no 'original_apex' member.

  Args:
    compressed_apex_fp: file path to the compressed APEX (a zip file).
    decompressed_apex_fp: file path the original APEX is written to.
  """
  if os.path.exists(decompressed_apex_fp):
    print("Output path '" + decompressed_apex_fp + "' already exists")
    sys.exit(1)

  out_dir = os.path.dirname(decompressed_apex_fp)
  out_name = os.path.basename(decompressed_apex_fp)

  with zipfile.ZipFile(compressed_apex_fp, 'r') as archive:
    try:
      member = archive.getinfo('original_apex')
    except KeyError:
      member = None
    if member is None:
      print(compressed_apex_fp +
            " is not a compressed APEX. Missing 'original_apex' file inside it.")
      sys.exit(1)
    # Renaming the member in memory makes extract() write it under the file
    # name the caller asked for.
    member.filename = out_name
    archive.extract(member, path=out_dir)
429
430
def main(argv):
  """Parses the command line and runs the selected sub-command.

  The sub-commands are `list`, `extract`, `info` and `decompress`. The default
  tool paths are derived from ANDROID_HOST_OUT when that variable is set.
  Prints a message to stderr and exits with status 1 when a tool path needed
  by the chosen sub-command is missing.

  Args:
    argv: command-line arguments without the program name.
  """
  host_out = os.environ.get('ANDROID_HOST_OUT')
  if host_out is None:
    default_debugfs = None
    default_fsckerofs = None
  else:
    default_debugfs = os.path.join(host_out, 'bin/debugfs_static')
    default_fsckerofs = os.path.join(host_out, 'bin/fsck.erofs')

  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--debugfs_path', default=default_debugfs, help='The path to debugfs binary')
  parser.add_argument(
      '--fsckerofs_path', default=default_fsckerofs, help='The path to fsck.erofs binary')
  # TODO(b/279858383) remove the argument
  parser.add_argument('--blkid_path', help='NOT USED')

  commands = parser.add_subparsers(required=True, dest='cmd')

  # Sub-command "list"
  list_cmd = commands.add_parser('list', help='prints content of an APEX to stdout')
  list_cmd.add_argument('apex', type=str, help='APEX file')
  list_cmd.add_argument(
      '--size', action='store_true', help='also show the size of the files')
  list_cmd.add_argument(
      '--extents', action='store_true', help='also show the location of the files')
  list_cmd.add_argument(
      '-Z', '--contexts', action='store_true',
      help='also show the security context of the files')
  list_cmd.set_defaults(func=RunList)

  # Sub-command "extract"
  extract_cmd = commands.add_parser(
      'extract', help='extracts content of an APEX to the given directory')
  extract_cmd.add_argument('apex', type=str, help='APEX file')
  extract_cmd.add_argument('dest', type=str, help='Directory to extract content of APEX to')
  extract_cmd.set_defaults(func=RunExtract)

  # Sub-command "info"
  info_cmd = commands.add_parser('info', help='prints APEX manifest')
  info_cmd.add_argument('apex', type=str, help='APEX file')
  info_cmd.add_argument(
      '--print-type', action='store_true',
      help='Prints type of the apex (COMPRESSED or UNCOMPRESSED)')
  info_cmd.add_argument(
      '--print-payload-type', action='store_true',
      help='Prints filesystem type of the apex payload')
  info_cmd.set_defaults(func=RunInfo)

  # Sub-command "decompress"
  decompress_cmd = commands.add_parser('decompress', help='decompresses a compressed APEX')
  decompress_cmd.add_argument(
      '--input', type=str, required=True,
      help='path to compressed APEX file that will be decompressed')
  decompress_cmd.add_argument(
      '--output', type=str, required=True, help='path to the output APEX file')
  decompress_cmd.add_argument(
      '--copy-if-uncompressed', action='store_true',
      help='just copy the input if not compressed')
  decompress_cmd.set_defaults(func=RunDecompress)

  args = parser.parse_args(argv)

  def Fail(message):
    # Reports a usage problem on stderr and aborts with status 1.
    print(message, file=sys.stderr)
    sys.exit(1)

  if args.cmd in ('list', 'extract') and not args.debugfs_path:
    Fail('ANDROID_HOST_OUT environment variable is not defined, --debugfs_path must be set')

  if args.cmd == 'extract':
    if not args.fsckerofs_path:
      Fail('ANDROID_HOST_OUT environment variable is not defined, --fsckerofs_path must be set')
    if not os.path.isfile(args.fsckerofs_path):
      Fail(f'Cannot find fsck.erofs specified at {args.fsckerofs_path}')

  args.func(args)
509  args.func(args)
510
511
# Script entry point: pass everything after the program name to main(), which
# parses it and dispatches to the chosen sub-command.
if __name__ == '__main__':
  main(sys.argv[1:])
514