xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/zipimport.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1"""zipimport provides support for importing Python modules from Zip archives.
2
3This module exports three objects:
4- zipimporter: a class; its constructor takes a path to a Zip archive.
5- ZipImportError: exception raised by zipimporter objects. It's a
6  subclass of ImportError, so it can be caught as ImportError, too.
7- _zip_directory_cache: a dict, mapping archive paths to zip directory
8  info dicts, as used in zipimporter._files.
9
10It is usually not needed to use the zipimport module explicitly; it is
11used by the builtin import mechanism for sys.path items that are paths
12to Zip archives.
13"""
14
15#from importlib import _bootstrap_external
16#from importlib import _bootstrap  # for _verbose_message
17import _frozen_importlib_external as _bootstrap_external
18from _frozen_importlib_external import _unpack_uint16, _unpack_uint32
19import _frozen_importlib as _bootstrap  # for _verbose_message
20import _imp  # for check_hash_based_pycs
21import _io  # for open
22import marshal  # for loads
23import sys  # for modules
24import time  # for mktime
25import _warnings  # For warn()
26
# Names made available by ``from zipimport import *``.
__all__ = [
    'ZipImportError',
    'zipimporter',
]
28
29
# Path separators are taken from the import machinery so this module
# agrees with it.  alt_path_sep is whatever follows the first entry of
# path_separators; callers normalise paths by replacing it with path_sep.
alt_path_sep = _bootstrap_external.path_separators[1:]
path_sep = _bootstrap_external.path_sep
32
33
class ZipImportError(ImportError):
    """Exception raised by zipimporter objects.

    It is a subclass of ImportError, so it can be caught as ImportError too.
    """
36
# Cache of _read_directory() results, keyed by archive path.
_zip_directory_cache = {}

# The type of module objects, obtained from an already imported module.
_module_type = type(sys)

# Layout constants for the Zip "end of central directory" record.
END_CENTRAL_DIR_SIZE = 22             # size of the record without a comment
STRING_END_ARCHIVE = b'PK\x05\x06'    # signature that starts the record
MAX_COMMENT_LEN = 0xFFFF              # the comment length is a 16-bit field
45
class zipimporter(_bootstrap_external._LoaderBasics):
    """zipimporter(archivepath) -> zipimporter object

    Create a new zipimporter instance. 'archivepath' must be a path to
    a zipfile, or to a specific path inside a zipfile. For example, it can be
    '/tmp/myimport.zip', or '/tmp/myimport.zip/mydirectory', if mydirectory is a
    valid directory inside the archive.

    'ZipImportError' is raised if 'archivepath' doesn't point to a valid Zip
    archive.

    The 'archive' attribute of zipimporter objects contains the name of the
    zipfile targeted.
    """

    # Split the "subdirectory" from the Zip archive path, lookup a matching
    # entry in _zip_directory_cache, fetch the file directory from there
    # if found, or else read it from the archive.
    def __init__(self, path):
        if not isinstance(path, str):
            raise TypeError(f"expected str, not {type(path)!r}")
        if not path:
            raise ZipImportError('archive path is empty', path=path)
        if alt_path_sep:
            path = path.replace(alt_path_sep, path_sep)

        # Path components stripped off the end of 'path' (innermost first)
        # while looking for the part that names an existing file.
        prefix = []
        while True:
            try:
                st = _bootstrap_external._path_stat(path)
            except (OSError, ValueError):
                # On Windows a ValueError is raised for too long paths.
                # Back up one path element.
                dirname, basename = _bootstrap_external._path_split(path)
                if dirname == path:
                    # Nothing left to strip: no component of the path exists.
                    raise ZipImportError('not a Zip file', path=path)
                path = dirname
                prefix.append(basename)
            else:
                # it exists
                if (st.st_mode & 0o170000) != 0o100000:  # stat.S_ISREG
                    # it's not a regular file
                    raise ZipImportError('not a Zip file', path=path)
                break

        try:
            files = _zip_directory_cache[path]
        except KeyError:
            files = _read_directory(path)
            _zip_directory_cache[path] = files
        self._files = files
        self.archive = path
        # a prefix directory following the ZIP file path.
        self.prefix = _bootstrap_external._path_join(*prefix[::-1])
        if self.prefix:
            self.prefix += path_sep


    # Check whether we can satisfy the import of the module named by
    # 'fullname', or whether it could be a portion of a namespace
    # package. Return (self, []) if we can load it, (None, [path]) with
    # the full path if it's a possible namespace portion, (None, []) if
    # we can't load it.
    def find_loader(self, fullname, path=None):
        """find_loader(fullname, path=None) -> (loader, portions).

        Search for a module specified by 'fullname'. 'fullname' must be the
        fully qualified (dotted) module name. It returns a 2-tuple:
        (self, []) if the module was found, (None, [path]) where 'path' is
        the full path name if it's possibly a portion of a namespace
        package, or (None, []) otherwise. The optional 'path' argument is
        ignored -- it's there for compatibility with the importer protocol.

        Deprecated since Python 3.10. Use find_spec() instead.
        """
        _warnings.warn("zipimporter.find_loader() is deprecated and slated for "
                       "removal in Python 3.12; use find_spec() instead",
                       DeprecationWarning)
        mi = _get_module_info(self, fullname)
        if mi is not None:
            # This is a module or package.
            return self, []

        # Not a module or regular package. See if this is a directory, and
        # therefore possibly a portion of a namespace package.

        # We're only interested in the last path component of fullname;
        # earlier components are recorded in self.prefix.
        modpath = _get_module_path(self, fullname)
        if _is_dir(self, modpath):
            # This is possibly a portion of a namespace
            # package. Return the string representing its path,
            # without a trailing separator.
            return None, [f'{self.archive}{path_sep}{modpath}']

        return None, []


    # Check whether we can satisfy the import of the module named by
    # 'fullname'. Return self if we can, None if we can't.
    def find_module(self, fullname, path=None):
        """find_module(fullname, path=None) -> self or None.

        Search for a module specified by 'fullname'. 'fullname' must be the
        fully qualified (dotted) module name. It returns the zipimporter
        instance itself if the module was found, or None if it wasn't.
        The optional 'path' argument is ignored -- it's there for compatibility
        with the importer protocol.

        Deprecated since Python 3.10. Use find_spec() instead.
        """
        _warnings.warn("zipimporter.find_module() is deprecated and slated for "
                       "removal in Python 3.12; use find_spec() instead",
                       DeprecationWarning)
        return self.find_loader(fullname, path)[0]

    def find_spec(self, fullname, target=None):
        """Create a ModuleSpec for the specified module.

        Returns None if the module cannot be found.
        """
        module_info = _get_module_info(self, fullname)
        if module_info is not None:
            return _bootstrap.spec_from_loader(fullname, self, is_package=module_info)
        else:
            # Not a module or regular package. See if this is a directory, and
            # therefore possibly a portion of a namespace package.

            # We're only interested in the last path component of fullname;
            # earlier components are recorded in self.prefix.
            modpath = _get_module_path(self, fullname)
            if _is_dir(self, modpath):
                # This is possibly a portion of a namespace
                # package. Build a loader-less spec whose search location
                # is its path, without a trailing separator.
                path = f'{self.archive}{path_sep}{modpath}'
                spec = _bootstrap.ModuleSpec(name=fullname, loader=None,
                                             is_package=True)
                spec.submodule_search_locations.append(path)
                return spec
            else:
                return None

    def get_code(self, fullname):
        """get_code(fullname) -> code object.

        Return the code object for the specified module. Raise ZipImportError
        if the module couldn't be imported.
        """
        code, ispackage, modpath = _get_module_code(self, fullname)
        return code


    def get_data(self, pathname):
        """get_data(pathname) -> bytes with file data.

        Return the data associated with 'pathname'. Raise OSError if
        the file wasn't found.
        """
        if alt_path_sep:
            pathname = pathname.replace(alt_path_sep, path_sep)

        # Accept both archive-relative names and full paths that start
        # with the archive path; the directory is keyed by the former.
        key = pathname
        if pathname.startswith(self.archive + path_sep):
            key = pathname[len(self.archive + path_sep):]

        try:
            toc_entry = self._files[key]
        except KeyError:
            raise OSError(0, '', key)
        return _get_data(self.archive, toc_entry)


    # Return a string matching __file__ for the named module
    def get_filename(self, fullname):
        """get_filename(fullname) -> filename string.

        Return the filename for the specified module or raise ZipImportError
        if it couldn't be imported.
        """
        # Deciding the filename requires working out where the code
        # would come from if the module was actually loaded
        code, ispackage, modpath = _get_module_code(self, fullname)
        return modpath


    def get_source(self, fullname):
        """get_source(fullname) -> source string.

        Return the source code for the specified module. Raise ZipImportError
        if the module couldn't be found, return None if the archive does
        contain the module, but has no source for it.
        """
        mi = _get_module_info(self, fullname)
        if mi is None:
            raise ZipImportError(f"can't find module {fullname!r}", name=fullname)

        path = _get_module_path(self, fullname)
        if mi:
            # A package: its source lives in the package's __init__.py.
            fullpath = _bootstrap_external._path_join(path, '__init__.py')
        else:
            fullpath = f'{path}.py'

        try:
            toc_entry = self._files[fullpath]
        except KeyError:
            # we have the module, but no source
            return None
        return _get_data(self.archive, toc_entry).decode()


    # Return a bool signifying whether the module is a package or not.
    def is_package(self, fullname):
        """is_package(fullname) -> bool.

        Return True if the module specified by fullname is a package.
        Raise ZipImportError if the module couldn't be found.
        """
        mi = _get_module_info(self, fullname)
        if mi is None:
            raise ZipImportError(f"can't find module {fullname!r}", name=fullname)
        return mi


    # Load and return the module named by 'fullname'.
    def load_module(self, fullname):
        """load_module(fullname) -> module.

        Load the module specified by 'fullname'. 'fullname' must be the
        fully qualified (dotted) module name. It returns the imported
        module, or raises ZipImportError if it could not be imported.

        Deprecated since Python 3.10. Use exec_module() instead.
        """
        msg = ("zipimport.zipimporter.load_module() is deprecated and slated for "
               "removal in Python 3.12; use exec_module() instead")
        _warnings.warn(msg, DeprecationWarning)
        code, ispackage, modpath = _get_module_code(self, fullname)
        # Reuse an existing module object (reload) or register a new one
        # before executing the code.
        mod = sys.modules.get(fullname)
        if mod is None or not isinstance(mod, _module_type):
            mod = _module_type(fullname)
            sys.modules[fullname] = mod
        mod.__loader__ = self

        try:
            if ispackage:
                # add __path__ to the module *before* the code gets
                # executed
                path = _get_module_path(self, fullname)
                fullpath = _bootstrap_external._path_join(self.archive, path)
                mod.__path__ = [fullpath]

            if not hasattr(mod, '__builtins__'):
                mod.__builtins__ = __builtins__
            _bootstrap_external._fix_up_module(mod.__dict__, fullname, modpath)
            exec(code, mod.__dict__)
        except:
            # Undo the sys.modules registration before propagating.  The
            # executed code may already have removed the entry itself, so
            # tolerate its absence instead of letting a KeyError replace
            # the exception that actually caused the failure.
            sys.modules.pop(fullname, None)
            raise

        # The executed code may have replaced its own sys.modules entry;
        # return whatever is registered now.
        try:
            mod = sys.modules[fullname]
        except KeyError:
            raise ImportError(f'Loaded module {fullname!r} not found in sys.modules')
        _bootstrap._verbose_message('import {} # loaded from Zip {}', fullname, modpath)
        return mod


    def get_resource_reader(self, fullname):
        """Return the ResourceReader for a package in a zip file.

        If 'fullname' is a package within the zip file, return the
        'ResourceReader' object for the package.  Otherwise return None.
        """
        try:
            if not self.is_package(fullname):
                return None
        except ZipImportError:
            return None
        from importlib.readers import ZipReader
        return ZipReader(self, fullname)


    def invalidate_caches(self):
        """Reload the file data of the archive path."""
        try:
            self._files = _read_directory(self.archive)
            _zip_directory_cache[self.archive] = self._files
        except ZipImportError:
            # The archive can no longer be read: forget its directory.
            _zip_directory_cache.pop(self.archive, None)
            self._files = {}


    def __repr__(self):
        return f'<zipimporter object "{self.archive}{path_sep}{self.prefix}">'
341
342
# _zip_searchorder lists, in priority order, the archive entries probed
# for a module: a package's __init__.pyc and __init__.py first, then a
# plain module's .pyc and .py.  Each row is
# (suffix, isbytecode, ispackage).  The package suffixes begin with
# path_sep because they name a file inside the package's directory.
_zip_searchorder = (
    (f'{path_sep}__init__.pyc', True, True),
    (f'{path_sep}__init__.py', False, True),
    ('.pyc', True, False),
    ('.py', False, False),
)
354
355# Given a module name, return the potential file path in the
356# archive (without extension).
357def _get_module_path(self, fullname):
358    return self.prefix + fullname.rpartition('.')[2]
359
# Report whether 'path' names a directory entry of the archive.
def _is_dir(self, path):
    # Directory entries are recorded with a trailing path separator, so
    # 'path' is a directory -- and therefore eligible to be a portion of
    # a namespace package -- exactly when that form is a known name.
    return (path + path_sep) in self._files
368
# Look the module up in the archive directory.  Returns True when it is
# found as a package, False when it is found as a plain module, and None
# when no candidate entry exists.
def _get_module_info(self, fullname):
    base = _get_module_path(self, fullname)
    for suffix, _isbytecode, found_as_package in _zip_searchorder:
        if base + suffix in self._files:
            return found_as_package
    return None
377
378
379# implementation
380
# _read_directory(archive) -> files dict
#
# Given a path to a Zip archive, build a dict, mapping file names
# (local to the archive, using path_sep as a separator) to toc entries.
#
# A toc_entry is a tuple:
#
# (__file__,        # value to use for __file__, available for all files:
#                   # the archive path joined with the entry name
#  compress,        # compression kind; 0 for uncompressed
#  data_size,       # size of compressed data on disk
#  file_size,       # size of decompressed data
#  file_offset,     # offset of file header from start of archive
#  time,            # mod time of file (in dos format)
#  date,            # mod date of file (in dos format)
#  crc,             # crc checksum of the data
# )
#
# Directories can be recognized by the trailing path_sep in the name,
# data_size and file_offset are 0.
#
# Raises ZipImportError when the archive cannot be opened or read or is
# not a valid Zip file, and EOFError when the central directory ends
# unexpectedly.
def _read_directory(archive):
    try:
        fp = _io.open_code(archive)
    except OSError:
        raise ZipImportError(f"can't open Zip file: {archive!r}", path=archive)

    with fp:
        # GH-87235: On macOS all file descriptors for /dev/fd/N share the same
        # file offset, reset the file offset after scanning the zipfile directory
        # to not cause problems when someone runs 'python3 /dev/fd/9 9<some_script'
        start_offset = fp.tell()
        try:
            # First guess: the archive has no comment, so the end of
            # central directory record is the last 22 bytes of the file.
            try:
                fp.seek(-END_CENTRAL_DIR_SIZE, 2)
                header_position = fp.tell()
                buffer = fp.read(END_CENTRAL_DIR_SIZE)
            except OSError:
                raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
            if len(buffer) != END_CENTRAL_DIR_SIZE:
                raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
            if buffer[:4] != STRING_END_ARCHIVE:
                # Bad: End of Central Dir signature
                # Check if there's a comment.
                try:
                    fp.seek(0, 2)
                    file_size = fp.tell()
                except OSError:
                    raise ZipImportError(f"can't read Zip file: {archive!r}",
                                         path=archive)
                # The record can start at most MAX_COMMENT_LEN bytes
                # earlier; read that tail and search it backwards.
                max_comment_start = max(file_size - MAX_COMMENT_LEN -
                                        END_CENTRAL_DIR_SIZE, 0)
                try:
                    fp.seek(max_comment_start)
                    data = fp.read()
                except OSError:
                    raise ZipImportError(f"can't read Zip file: {archive!r}",
                                         path=archive)
                pos = data.rfind(STRING_END_ARCHIVE)
                if pos < 0:
                    raise ZipImportError(f'not a Zip file: {archive!r}',
                                         path=archive)
                buffer = data[pos:pos+END_CENTRAL_DIR_SIZE]
                if len(buffer) != END_CENTRAL_DIR_SIZE:
                    raise ZipImportError(f"corrupt Zip file: {archive!r}",
                                         path=archive)
                header_position = file_size - len(data) + pos

            # header_position is the file offset of the end of central
            # directory record; the two fields below are the recorded
            # size and offset of the central directory itself.
            header_size = _unpack_uint32(buffer[12:16])
            header_offset = _unpack_uint32(buffer[16:20])
            if header_position < header_size:
                raise ZipImportError(f'bad central directory size: {archive!r}', path=archive)
            if header_position < header_offset:
                raise ZipImportError(f'bad central directory offset: {archive!r}', path=archive)
            header_position -= header_size
            # Difference between where the central directory actually
            # starts and where the archive says it starts; it is added to
            # every recorded local header offset below.
            arc_offset = header_position - header_offset
            if arc_offset < 0:
                raise ZipImportError(f'bad central directory size or offset: {archive!r}', path=archive)

            files = {}
            # Start of Central Directory
            count = 0
            try:
                fp.seek(header_position)
            except OSError:
                raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
            while True:
                # Each central directory entry starts with a fixed
                # 46-byte header.
                buffer = fp.read(46)
                if len(buffer) < 4:
                    raise EOFError('EOF read where not expected')
                # Start of file header
                if buffer[:4] != b'PK\x01\x02':
                    # Not a central directory file header: the listing
                    # of entries is over.
                    break
                if len(buffer) != 46:
                    raise EOFError('EOF read where not expected')
                flags = _unpack_uint16(buffer[8:10])
                compress = _unpack_uint16(buffer[10:12])
                time = _unpack_uint16(buffer[12:14])
                date = _unpack_uint16(buffer[14:16])
                crc = _unpack_uint32(buffer[16:20])
                data_size = _unpack_uint32(buffer[20:24])
                file_size = _unpack_uint32(buffer[24:28])
                name_size = _unpack_uint16(buffer[28:30])
                extra_size = _unpack_uint16(buffer[30:32])
                comment_size = _unpack_uint16(buffer[32:34])
                file_offset = _unpack_uint32(buffer[42:46])
                # Total length of the variable-size fields that follow
                # the fixed header (name, extra field, comment).
                header_size = name_size + extra_size + comment_size
                if file_offset > header_offset:
                    raise ZipImportError(f'bad local header offset: {archive!r}', path=archive)
                file_offset += arc_offset

                try:
                    name = fp.read(name_size)
                except OSError:
                    raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
                if len(name) != name_size:
                    raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
                # On Windows, calling fseek to skip over the fields we don't use is
                # slower than reading the data because fseek flushes stdio's
                # internal buffers.    See issue #8745.
                try:
                    if len(fp.read(header_size - name_size)) != header_size - name_size:
                        raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
                except OSError:
                    raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)

                if flags & 0x800:
                    # UTF-8 file names extension
                    name = name.decode()
                else:
                    # Historical ZIP filename encoding
                    try:
                        name = name.decode('ascii')
                    except UnicodeDecodeError:
                        # Decode byte-for-byte via latin1, then map each
                        # character through the cp437 table.
                        name = name.decode('latin1').translate(cp437_table)

                name = name.replace('/', path_sep)
                path = _bootstrap_external._path_join(archive, name)
                t = (path, compress, data_size, file_size, file_offset, time, date, crc)
                files[name] = t
                count += 1
        finally:
            # Restore the offset the file had when it was opened (see
            # the GH-87235 note above).
            fp.seek(start_offset)
    _bootstrap._verbose_message('zipimport: found {} names in {!r}', count, archive)
    return files
525
# During bootstrap, we may need to load the encodings
# package from a ZIP file. But the cp437 encoding is implemented
# in Python in the encodings package.
#
# Break out of this dependency by using the translation table for
# the cp437 encoding.  The table is a 256-character string used with
# str.translate() by _read_directory() on names that were first decoded
# as latin1, so the character at index i stands for byte value i.
cp437_table = (
    # ASCII part, 8 rows x 16 chars
    '\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f'
    '\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f'
    ' !"#$%&\'()*+,-./'
    '0123456789:;<=>?'
    '@ABCDEFGHIJKLMNO'
    'PQRSTUVWXYZ[\\]^_'
    '`abcdefghijklmno'
    'pqrstuvwxyz{|}~\x7f'
    # non-ASCII part, 16 rows x 8 chars
    '\xc7\xfc\xe9\xe2\xe4\xe0\xe5\xe7'
    '\xea\xeb\xe8\xef\xee\xec\xc4\xc5'
    '\xc9\xe6\xc6\xf4\xf6\xf2\xfb\xf9'
    '\xff\xd6\xdc\xa2\xa3\xa5\u20a7\u0192'
    '\xe1\xed\xf3\xfa\xf1\xd1\xaa\xba'
    '\xbf\u2310\xac\xbd\xbc\xa1\xab\xbb'
    '\u2591\u2592\u2593\u2502\u2524\u2561\u2562\u2556'
    '\u2555\u2563\u2551\u2557\u255d\u255c\u255b\u2510'
    '\u2514\u2534\u252c\u251c\u2500\u253c\u255e\u255f'
    '\u255a\u2554\u2569\u2566\u2560\u2550\u256c\u2567'
    '\u2568\u2564\u2565\u2559\u2558\u2552\u2553\u256b'
    '\u256a\u2518\u250c\u2588\u2584\u258c\u2590\u2580'
    '\u03b1\xdf\u0393\u03c0\u03a3\u03c3\xb5\u03c4'
    '\u03a6\u0398\u03a9\u03b4\u221e\u03c6\u03b5\u2229'
    '\u2261\xb1\u2265\u2264\u2320\u2321\xf7\u2248'
    '\xb0\u2219\xb7\u221a\u207f\xb2\u25a0\xa0'
)
560
561_importing_zlib = False
562
563# Return the zlib.decompress function object, or NULL if zlib couldn't
564# be imported. The function is cached when found, so subsequent calls
565# don't import zlib again.
566def _get_decompress_func():
567    global _importing_zlib
568    if _importing_zlib:
569        # Someone has a zlib.py[co] in their Zip file
570        # let's avoid a stack overflow.
571        _bootstrap._verbose_message('zipimport: zlib UNAVAILABLE')
572        raise ZipImportError("can't decompress data; zlib not available")
573
574    _importing_zlib = True
575    try:
576        from zlib import decompress
577    except Exception:
578        _bootstrap._verbose_message('zipimport: zlib UNAVAILABLE')
579        raise ZipImportError("can't decompress data; zlib not available")
580    finally:
581        _importing_zlib = False
582
583    _bootstrap._verbose_message('zipimport: zlib available')
584    return decompress
585
586# Given a path to a Zip file and a toc_entry, return the (uncompressed) data.
587def _get_data(archive, toc_entry):
588    datapath, compress, data_size, file_size, file_offset, time, date, crc = toc_entry
589    if data_size < 0:
590        raise ZipImportError('negative data size')
591
592    with _io.open_code(archive) as fp:
593        # Check to make sure the local file header is correct
594        try:
595            fp.seek(file_offset)
596        except OSError:
597            raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
598        buffer = fp.read(30)
599        if len(buffer) != 30:
600            raise EOFError('EOF read where not expected')
601
602        if buffer[:4] != b'PK\x03\x04':
603            # Bad: Local File Header
604            raise ZipImportError(f'bad local file header: {archive!r}', path=archive)
605
606        name_size = _unpack_uint16(buffer[26:28])
607        extra_size = _unpack_uint16(buffer[28:30])
608        header_size = 30 + name_size + extra_size
609        file_offset += header_size  # Start of file data
610        try:
611            fp.seek(file_offset)
612        except OSError:
613            raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
614        raw_data = fp.read(data_size)
615        if len(raw_data) != data_size:
616            raise OSError("zipimport: can't read data")
617
618    if compress == 0:
619        # data is not compressed
620        return raw_data
621
622    # Decompress with zlib
623    try:
624        decompress = _get_decompress_func()
625    except Exception:
626        raise ZipImportError("can't decompress data; zlib not available")
627    return decompress(raw_data, -15)
628
629
630# Lenient date/time comparison function. The precision of the mtime
631# in the archive is lower than the mtime stored in a .pyc: we
632# must allow a difference of at most one second.
633def _eq_mtime(t1, t2):
634    # dostime only stores even seconds, so be lenient
635    return abs(t1 - t2) <= 1
636
637
638# Given the contents of a .py[co] file, unmarshal the data
639# and return the code object. Raises ImportError it the magic word doesn't
640# match, or if the recorded .py[co] metadata does not match the source.
641def _unmarshal_code(self, pathname, fullpath, fullname, data):
642    exc_details = {
643        'name': fullname,
644        'path': fullpath,
645    }
646
647    flags = _bootstrap_external._classify_pyc(data, fullname, exc_details)
648
649    hash_based = flags & 0b1 != 0
650    if hash_based:
651        check_source = flags & 0b10 != 0
652        if (_imp.check_hash_based_pycs != 'never' and
653                (check_source or _imp.check_hash_based_pycs == 'always')):
654            source_bytes = _get_pyc_source(self, fullpath)
655            if source_bytes is not None:
656                source_hash = _imp.source_hash(
657                    _bootstrap_external._RAW_MAGIC_NUMBER,
658                    source_bytes,
659                )
660
661                _bootstrap_external._validate_hash_pyc(
662                    data, source_hash, fullname, exc_details)
663    else:
664        source_mtime, source_size = \
665            _get_mtime_and_size_of_source(self, fullpath)
666
667        if source_mtime:
668            # We don't use _bootstrap_external._validate_timestamp_pyc
669            # to allow for a more lenient timestamp check.
670            if (not _eq_mtime(_unpack_uint32(data[8:12]), source_mtime) or
671                    _unpack_uint32(data[12:16]) != source_size):
672                _bootstrap._verbose_message(
673                    f'bytecode is stale for {fullname!r}')
674                return None
675
676    code = marshal.loads(data[16:])
677    if not isinstance(code, _code_type):
678        raise TypeError(f'compiled module {pathname!r} is not a code object')
679    return code
680
681_code_type = type(_unmarshal_code.__code__)
682
683
684# Replace any occurrences of '\r\n?' in the input string with '\n'.
685# This converts DOS and Mac line endings to Unix line endings.
686def _normalize_line_endings(source):
687    source = source.replace(b'\r\n', b'\n')
688    source = source.replace(b'\r', b'\n')
689    return source
690
# Given a buffer containing Python source code, normalize its line
# endings, compile it for 'pathname' and return the code object.
def _compile_source(pathname, source):
    return compile(_normalize_line_endings(source), pathname, 'exec',
                   dont_inherit=True)
696
697# Convert the date/time values found in the Zip archive to a value
698# that's compatible with the time stamp stored in .pyc files.
699def _parse_dostime(d, t):
700    return time.mktime((
701        (d >> 9) + 1980,    # bits 9..15: year
702        (d >> 5) & 0xF,     # bits 5..8: month
703        d & 0x1F,           # bits 0..4: day
704        t >> 11,            # bits 11..15: hours
705        (t >> 5) & 0x3F,    # bits 8..10: minutes
706        (t & 0x1F) * 2,     # bits 0..7: seconds / 2
707        -1, -1, -1))
708
709# Given a path to a .pyc file in the archive, return the
710# modification time of the matching .py file and its size,
711# or (0, 0) if no source is available.
712def _get_mtime_and_size_of_source(self, path):
713    try:
714        # strip 'c' or 'o' from *.py[co]
715        assert path[-1:] in ('c', 'o')
716        path = path[:-1]
717        toc_entry = self._files[path]
718        # fetch the time stamp of the .py file for comparison
719        # with an embedded pyc time stamp
720        time = toc_entry[5]
721        date = toc_entry[6]
722        uncompressed_size = toc_entry[3]
723        return _parse_dostime(date, time), uncompressed_size
724    except (KeyError, IndexError, TypeError):
725        return 0, 0
726
727
728# Given a path to a .pyc file in the archive, return the
729# contents of the matching .py file, or None if no source
730# is available.
731def _get_pyc_source(self, path):
732    # strip 'c' or 'o' from *.py[co]
733    assert path[-1:] in ('c', 'o')
734    path = path[:-1]
735
736    try:
737        toc_entry = self._files[path]
738    except KeyError:
739        return None
740    else:
741        return _get_data(self.archive, toc_entry)
742
743
# Get the code object associated with the module specified by
# 'fullname'.  Returns (code, ispackage, modpath), where modpath is the
# first item of the toc entry the code came from.  Raises ZipImportError
# if no candidate entry yields a code object.
def _get_module_code(self, fullname):
    base = _get_module_path(self, fullname)
    last_import_error = None
    for suffix, isbytecode, ispackage in _zip_searchorder:
        fullpath = base + suffix
        _bootstrap._verbose_message('trying {}{}{}', self.archive, path_sep, fullpath, verbosity=2)
        try:
            toc_entry = self._files[fullpath]
        except KeyError:
            continue

        modpath = toc_entry[0]
        data = _get_data(self.archive, toc_entry)
        if isbytecode:
            try:
                code = _unmarshal_code(self, modpath, fullpath, fullname, data)
            except ImportError as exc:
                # Remember the failure, but keep trying later candidates.
                last_import_error = exc
                code = None
        else:
            code = _compile_source(modpath, data)

        # A None code object means bad magic number or non-matching
        # mtime in byte code: try the next candidate.
        if code is not None:
            return code, ispackage, modpath

    if last_import_error:
        msg = f"module load failed: {last_import_error}"
        raise ZipImportError(msg, name=fullname) from last_import_error
    raise ZipImportError(f"can't find module {fullname!r}", name=fullname)
779