1import sys
2import os
3import io
4from hashlib import sha256
5from contextlib import contextmanager
6from random import Random
7import pathlib
8import shutil
9import re
10import warnings
11import stat
12
13import unittest
14import unittest.mock
15import tarfile
16
17from test import support
18from test.support import os_helper
19from test.support import script_helper
20from test.support import warnings_helper
21
22# Check for our compression modules.
23try:
24    import gzip
25except ImportError:
26    gzip = None
27try:
28    import zlib
29except ImportError:
30    zlib = None
31try:
32    import bz2
33except ImportError:
34    bz2 = None
35try:
36    import lzma
37except ImportError:
38    lzma = None
39
def sha256sum(data):
    """Return the hexadecimal SHA-256 digest of the bytes-like *data*."""
    digest = sha256(data)
    return digest.hexdigest()
42
# Scratch directory used by the tests; its name is derived from the
# TESTFN name provided by test.support.os_helper.
TEMPDIR = os.path.abspath(os_helper.TESTFN) + "-tardir"
# Second directory name built from TEMPDIR (not referenced in this part
# of the file).
tarextdir = TEMPDIR + '-extract-test'
# Reference archive, located through support.findfile().
tarname = support.findfile("testtar.tar")
# Paths inside TEMPDIR for the gzip/bz2/xz archives used by the
# compressed test variants.
# NOTE(review): presumably these are created from `tarname` by setup code
# outside this view -- confirm.
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
xzname = os.path.join(TEMPDIR, "testtar.tar.xz")
# Scratch archive path that individual tests create and overwrite.
tmpname = os.path.join(TEMPDIR, "tmp.tar")
# Archive path without a file extension (not referenced in this part of
# the file).
dotlessname = os.path.join(TEMPDIR, "testtar")

# Expected SHA-256 hex digest of the data of the "ustar/regtype" member.
sha256_regtype = (
    "e09e4bc8b3c9d9177e77256353b36c159f5f040531bbd4b024a8f9b9196c71ce"
)
# NOTE(review): presumably the expected digest of the sparse-file members;
# it is not used in this part of the file -- confirm.
sha256_sparse = (
    "4f05a776071146756345ceee937b33fc5644f5a96b9780d1c7d6a32cdf164d7b"
)
58
59
class TarTest:
    # Mixin describing the uncompressed archive variant.  Concrete test
    # classes supply `prefix`; the compression mixins in this file
    # override `tarname`, `suffix`, `open` and `taropen`.
    suffix = ''
    tarname = tarname
    taropen = tarfile.TarFile.taropen
    open = io.FileIO

    @property
    def mode(self):
        # Mode string handed to tarfile.open(): the access prefix followed
        # by the compression suffix.
        prefix, suffix = self.prefix, self.suffix
        return prefix + suffix
69
@support.requires_gzip()
class GzipTest:
    # Mixin selecting the gzip-compressed archive variant.
    suffix = 'gz'
    tarname = gzipname
    taropen = tarfile.TarFile.gzopen
    # `gzip` is None when the module could not be imported.
    open = None if gzip is None else gzip.GzipFile
76
@support.requires_bz2()
class Bz2Test:
    # Mixin selecting the bzip2-compressed archive variant.
    suffix = 'bz2'
    tarname = bz2name
    taropen = tarfile.TarFile.bz2open
    # `bz2` is None when the module could not be imported.
    open = None if bz2 is None else bz2.BZ2File
83
@support.requires_lzma()
class LzmaTest:
    # Mixin selecting the xz (LZMA) compressed archive variant.
    suffix = 'xz'
    tarname = xzname
    taropen = tarfile.TarFile.xzopen
    # `lzma` is None when the module could not be imported.
    open = None if lzma is None else lzma.LZMAFile
90
91
class ReadTest(TarTest):
    # Base for the read tests: every test gets `self.tar`, the archive
    # named by `self.tarname` opened with `self.mode`, and the archive is
    # closed again afterwards.

    prefix = "r:"

    def setUp(self):
        archive = tarfile.open(self.tarname, mode=self.mode,
                               encoding="iso8859-1")
        self.tar = archive

    def tearDown(self):
        archive = self.tar
        archive.close()
102
103
class UstarReadTest(ReadTest, unittest.TestCase):
    # Read tests against the "ustar/" members of the reference archive.
    # Test methods are described with comments rather than docstrings,
    # like the rest of this file.

    def test_fileobj_regular_file(self):
        # extractfile() must return exactly the member's bytes: the length
        # matches the header size and the digest matches the known value.
        tarinfo = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(tarinfo) as fobj:
            data = fobj.read()
            self.assertEqual(len(data), tarinfo.size,
                    "regular file extraction failed")
            self.assertEqual(sha256sum(data), sha256_regtype,
                    "regular file extraction failed")

    def test_fileobj_readlines(self):
        # readlines() on the extracted file object must agree with
        # readlines() on the same member extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR, filter='data')
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()

        with self.tar.extractfile(tarinfo) as fobj:
            fobj2 = io.TextIOWrapper(fobj)
            lines2 = fobj2.readlines()
            self.assertEqual(lines1, lines2,
                    "fileobj.readlines() failed")
            self.assertEqual(len(lines2), 114,
                    "fileobj.readlines() failed")
            self.assertEqual(lines2[83],
                    "I will gladly admit that Python is not the fastest "
                    "running scripting language.\n",
                    "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        # Iterating the extracted file object must yield the same lines as
        # reading the member extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR, filter='data')
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        with self.tar.extractfile(tarinfo) as fobj2:
            lines2 = list(io.TextIOWrapper(fobj2))
            self.assertEqual(lines1, lines2,
                    "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        # Exercise seek()/tell()/read()/readline()/readlines() on the
        # extracted file object, using the on-disk copy as the reference.
        self.tar.extract("ustar/regtype", TEMPDIR,
                         filter='data')
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
            data = fobj.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(tarinfo) as fobj:
            # Read everything first so that the seek(0) below starts from
            # the end of the file; the returned data itself is not needed.
            fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assertEqual(s, data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(tarinfo.size, fobj.tell(),
                         "seek() to file's end failed")
            self.assertEqual(fobj.read(), b"",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assertEqual(0, fobj.tell(),
                         "relative seek() to file's end failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assertEqual(s1, s2,
                         "readlines() after seek failed")
            fobj.seek(0)
            self.assertEqual(len(fobj.readline()), fobj.tell(),
                         "tell() after readline() failed")
            fobj.seek(512)
            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                         "tell() after seek() and readline() failed")
            fobj.seek(0)
            line = fobj.readline()
            self.assertEqual(fobj.read(), data[len(line):],
                         "read() after readline() failed")

    def test_fileobj_text(self):
        # The extracted file object must be usable through TextIOWrapper,
        # including seeking.
        with self.tar.extractfile("ustar/regtype") as fobj:
            fobj = io.TextIOWrapper(fobj)
            data = fobj.read().encode("iso8859-1")
            self.assertEqual(sha256sum(data), sha256_regtype)
            try:
                fobj.seek(100)
            except AttributeError:
                # Issue #13815: seek() complained about a missing
                # flush() method.
                self.fail("seeking failed in text mode")

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
    def _test_fileobj_link(self, lnktype, regtype):
        with self.tar.extractfile(lnktype) as a, \
             self.tar.extractfile(regtype) as b:
            self.assertEqual(a.name, b.name)

    def test_fileobj_link1(self):
        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")

    def test_fileobj_link2(self):
        self._test_fileobj_link("./ustar/linktest2/lnktype",
                                "ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        self._test_fileobj_link("ustar/symtype", "ustar/regtype")

    def test_fileobj_symlink2(self):
        self._test_fileobj_link("./ustar/linktest2/symtype",
                                "ustar/linktest1/regtype")

    def test_issue14160(self):
        self._test_fileobj_link("symtype2", "ustar/regtype")

    def test_add_dir_getmember(self):
        # bpo-21987
        self.add_dir_and_getmember('bar')
        self.add_dir_and_getmember('a'*101)

    # The skip decorator sits on the helper: when the condition is false,
    # calling the helper raises SkipTest inside the calling test.
    @unittest.skipUnless(hasattr(os, "getuid") and hasattr(os, "getgid"),
                         "Missing getuid or getgid implementation")
    def add_dir_and_getmember(self, name):
        # Add an empty directory called `name` to a fresh ustar archive and
        # check that getmember() finds it both with and without a trailing
        # slash.
        def set_owner(tarinfo):
            # Force uid and gid to 100 on the member being added.
            tarinfo.uid = tarinfo.gid = 100
            return tarinfo

        with os_helper.temp_cwd():
            with tarfile.open(tmpname, 'w') as tar:
                tar.format = tarfile.USTAR_FORMAT
                # Create the directory before entering the try block so a
                # failing mkdir() is reported as such instead of being
                # masked by rmdir() failing on a missing directory.
                os.mkdir(name)
                try:
                    tar.add(name, filter=set_owner)
                finally:
                    os.rmdir(name)
            with tarfile.open(tmpname) as tar:
                self.assertEqual(
                    tar.getmember(name),
                    tar.getmember(name + '/')
                )
254
class GzipUstarReadTest(GzipTest, UstarReadTest):
    """Run the UstarReadTest tests against the gzip archive."""

class Bz2UstarReadTest(Bz2Test, UstarReadTest):
    """Run the UstarReadTest tests against the bzip2 archive."""

class LzmaUstarReadTest(LzmaTest, UstarReadTest):
    """Run the UstarReadTest tests against the xz archive."""
263
264
class ListTest(ReadTest, unittest.TestCase):
    # Tests for TarFile.list(), checking what it writes to sys.stdout.

    # Override setUp to use default encoding (UTF-8)
    def setUp(self):
        self.tar = tarfile.open(self.tarname, mode=self.mode)

    def _captured_list(self, **list_kwargs):
        # Call self.tar.list(**list_kwargs) while sys.stdout is replaced by
        # an ASCII text wrapper around a BytesIO, and return the raw bytes
        # that were written.
        stream = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
        with support.swap_attr(sys, 'stdout', stream):
            self.tar.list(**list_kwargs)
        return stream.detach().getvalue()

    def test_list(self):
        out = self._captured_list(verbose=False)

        plain_names = (
            b'ustar/conttype',
            b'ustar/regtype',
            b'ustar/lnktype',
            b'ustar' + (b'/12345' * 40) + b'67/longname',
            b'./ustar/linktest2/symtype',
            b'./ustar/linktest2/lnktype',
            # Make sure it puts trailing slash for directory
            b'ustar/dirtype/',
            b'ustar/dirtype-with-size/',
        )
        for expected in plain_names:
            self.assertIn(expected, out)

        # Make sure it is able to print unencodable characters
        def escaped(raw):
            text = raw.decode(self.tar.encoding, 'surrogateescape')
            return text.encode('ascii', 'backslashreplace')

        non_ascii_names = (
            b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
            b'misc/regtype-hpux-signed-chksum-'
            b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
            b'misc/regtype-old-v7-signed-chksum-'
            b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
            b'pax/bad-pax-\xe4\xf6\xfc',
            b'pax/hdrcharset-\xe4\xf6\xfc',
        )
        for raw_name in non_ascii_names:
            self.assertIn(escaped(raw_name), out)

        # Make sure it prints files separated by one newline without any
        # 'ls -l'-like accessories if verbose flag is not being used
        # ...
        # ustar/conttype
        # ustar/regtype
        # ...
        self.assertRegex(out, br'ustar/conttype ?\r?\n'
                              br'ustar/regtype ?\r?\n')
        # Make sure it does not print the source of link without verbose flag
        for marker in (b'link to', b'->'):
            self.assertNotIn(marker, out)

    def test_list_verbose(self):
        out = self._captured_list(verbose=True)
        # Make sure it prints files separated by one newline with 'ls -l'-like
        # accessories if verbose flag is being used
        # ...
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/conttype
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/regtype
        # ...
        self.assertRegex(out, (br'\?rw-r--r-- tarfile/tarfile\s+7011 '
                               br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d '
                               br'ustar/\w+type ?\r?\n') * 2)
        # Make sure it prints the source of link with verbose flag
        long_dirs = b'/123' * 125
        link_lines = (
            b'ustar/symtype -> regtype',
            b'./ustar/linktest2/symtype -> ../linktest1/regtype',
            b'./ustar/linktest2/lnktype link to '
            b'./ustar/linktest1/regtype',
            b'gnu' + long_dirs + b'/longlink link to gnu' +
            long_dirs + b'/longname',
            b'pax' + long_dirs + b'/longlink link to pax' +
            long_dirs + b'/longname',
        )
        for expected in link_lines:
            self.assertIn(expected, out)

    def test_list_members(self):
        # Only the members handed to list() may be printed.
        def reg_members(archive):
            for info in archive.getmembers():
                if 'reg' in info.name:
                    yield info
        out = self._captured_list(verbose=False,
                                  members=reg_members(self.tar))
        self.assertIn(b'ustar/regtype', out)
        self.assertNotIn(b'ustar/conttype', out)
343
344
class GzipListTest(GzipTest, ListTest):
    """Run the ListTest tests against the gzip archive."""


class Bz2ListTest(Bz2Test, ListTest):
    """Run the ListTest tests against the bzip2 archive."""


class LzmaListTest(LzmaTest, ListTest):
    """Run the ListTest tests against the xz archive."""
355
356
class CommonReadTest(ReadTest):
    # Read tests shared by the seekable and the stream read variants.
    # Test methods are described with comments rather than docstrings,
    # like the rest of this file.

    def test_is_tarfile_erroneous(self):
        # An empty file is not a tar archive, however it is passed in.
        with open(tmpname, "wb"):
            pass

        # is_tarfile works on filenames
        self.assertFalse(tarfile.is_tarfile(tmpname))

        # is_tarfile works on path-like objects
        self.assertFalse(tarfile.is_tarfile(pathlib.Path(tmpname)))

        # is_tarfile works on file objects
        with open(tmpname, "rb") as fobj:
            self.assertFalse(tarfile.is_tarfile(fobj))

        # is_tarfile works on file-like objects
        self.assertFalse(tarfile.is_tarfile(io.BytesIO(b"invalid")))

    def test_is_tarfile_valid(self):
        # is_tarfile works on filenames
        self.assertTrue(tarfile.is_tarfile(self.tarname))

        # is_tarfile works on path-like objects
        self.assertTrue(tarfile.is_tarfile(pathlib.Path(self.tarname)))

        # is_tarfile works on file objects
        with open(self.tarname, "rb") as fobj:
            self.assertTrue(tarfile.is_tarfile(fobj))

        # is_tarfile works on file-like objects
        with open(self.tarname, "rb") as fobj:
            self.assertTrue(tarfile.is_tarfile(io.BytesIO(fobj.read())))

    def test_is_tarfile_keeps_position(self):
        # Test for issue44289: tarfile.is_tarfile() modifies
        # file object's current position
        with open(self.tarname, "rb") as fobj:
            tarfile.is_tarfile(fobj)
            self.assertEqual(fobj.tell(), 0)

        with open(self.tarname, "rb") as fobj:
            file_like = io.BytesIO(fobj.read())
            tarfile.is_tarfile(file_like)
            self.assertEqual(file_like.tell(), 0)

    def test_empty_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test checks if tarfile.open() is able to open an empty tar
        # archive successfully. Note that an empty tar archive is not the
        # same as an empty file!
        with tarfile.open(tmpname, self.mode.replace("r", "w")):
            pass
        # Open outside of any cleanup clause: if open() itself fails there
        # is no archive object to close, and the failure message below must
        # not be replaced by an error about an unbound name.
        try:
            tar = tarfile.open(tmpname, self.mode)
        except tarfile.ReadError:
            self.fail("tarfile.open() failed on empty archive")
        with tar:
            try:
                tar.getnames()
            except tarfile.ReadError:
                self.fail("tarfile.open() failed on empty archive")
            self.assertListEqual(tar.getmembers(), [])

    def test_non_existent_tarfile(self):
        # Test for issue11513: prevent non-existent gzipped tarfiles raising
        # multiple exceptions.
        with self.assertRaisesRegex(FileNotFoundError, "xxx"):
            tarfile.open("xxx", self.mode)

    def test_null_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test guarantees that tarfile.open() does not treat an empty
        # file as an empty tar archive.
        with open(tmpname, "wb"):
            pass
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)

    def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        # generate 512 pseudorandom bytes
        data = Random(0).randbytes(512)
        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            # self.open is the raw file class of the variant under test, so
            # the junk blocks and the member are written in that format.
            with self.open(tmpname, "w") as fobj:
                fobj.write(char * 1024)
                tarinfo = tarfile.TarInfo("foo")
                tarinfo.size = len(data)
                fobj.write(tarinfo.tobuf())
                fobj.write(data)

            with tarfile.open(tmpname, mode="r", ignore_zeros=True) as tar:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" %
                    char)

    def test_premature_end_of_archive(self):
        # An archive truncated at or after the first header block must
        # raise ReadError from iteration, extract() and extractfile().
        for size in (512, 600, 1024, 1200):
            with tarfile.open(tmpname, "w:") as tar:
                t = tarfile.TarInfo("foo")
                t.size = 1024
                tar.addfile(t, io.BytesIO(b"a" * 1024))

            with open(tmpname, "r+b") as fobj:
                fobj.truncate(size)

            with tarfile.open(tmpname) as tar:
                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    for t in tar:
                        pass

            with tarfile.open(tmpname) as tar:
                t = tar.next()

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    tar.extract(t, TEMPDIR, filter='data')

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    # Close the member file object even though read() is
                    # expected to raise.
                    with tar.extractfile(t) as member:
                        member.read()

    def test_length_zero_header(self):
        # bpo-39017 (CVE-2019-20907): reading a zero-length header should fail
        # with an exception
        with self.assertRaisesRegex(tarfile.ReadError, "file could not be opened successfully"):
            with tarfile.open(support.findfile('recursion.tar')):
                pass
487
class MiscReadTestBase(CommonReadTest):
    # Miscellaneous read tests shared by the plain and the compressed
    # variants.  Test methods are described with comments rather than
    # docstrings, like the rest of this file.

    def requires_name_attribute(self):
        # Hook called by tests that need the raw file object to expose a
        # usable `name`; a no-op here, variants may override it to skip.
        pass

    def test_no_name_argument(self):
        # Without a name argument, TarFile.name is taken from fileobj.name.
        self.requires_name_attribute()
        with open(self.tarname, "rb") as fobj:
            self.assertIsInstance(fobj.name, str)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, str)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_no_name_attribute(self):
        # A fileobj without a `name` attribute gives TarFile.name None.
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self.assertRaises(AttributeError, getattr, fobj, "name")
        # Use a context manager so the archive is closed again.
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_empty_name_attribute(self):
        # A fileobj whose `name` is the empty string gives TarFile.name
        # None.
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        fobj.name = ""
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_int_name_attribute(self):
        # Issue 21044: tarfile.open() should handle fileobj with an integer
        # 'name' attribute.
        fd = os.open(self.tarname, os.O_RDONLY)
        with open(fd, 'rb') as fobj:
            self.assertIsInstance(fobj.name, int)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsNone(tar.name)

    def test_bytes_name_attribute(self):
        # A bytes fileobj.name is kept as bytes in TarFile.name.
        self.requires_name_attribute()
        tarname = os.fsencode(self.tarname)
        with open(tarname, 'rb') as fobj:
            self.assertIsInstance(fobj.name, bytes)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, bytes)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_pathlike_name(self):
        # Every way of opening an archive accepts a path-like name and
        # stores it as an absolute str.
        tarname = pathlib.Path(self.tarname)
        with tarfile.open(tarname, mode=self.mode) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
        with self.taropen(tarname) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
        with tarfile.TarFile.open(tarname, mode=self.mode) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
        # The bare TarFile constructor is only tried on the uncompressed
        # variant.
        if self.suffix == '':
            with tarfile.TarFile(tarname, mode='r') as tar:
                self.assertIsInstance(tar.name, str)
                self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))

    def test_illegal_mode_arg(self):
        # Invalid mode strings are rejected with ValueError.
        with open(tmpname, 'wb'):
            pass
        for bad_mode in ('q', 'rw', ''):
            with self.assertRaisesRegex(ValueError, 'mode must be '):
                self.taropen(tmpname, bad_mode)

    def test_fileobj_with_offset(self):
        # Skip the first member and store values from the second member
        # of the testtar.
        with tarfile.open(self.tarname, mode=self.mode) as tar:
            tar.next()
            t = tar.next()
            name = t.name
            offset = t.offset
            with tar.extractfile(t) as f:
                data = f.read()

        # Open the testtar and seek to the offset of the second member.
        with self.open(self.tarname) as fobj:
            fobj.seek(offset)

            # Test if the tarfile starts with the second member.
            with tarfile.open(self.tarname, mode="r:", fileobj=fobj) as tar:
                t = tar.next()
                self.assertEqual(t.name, name)
                # Read to the end of fileobj and test if seeking back to the
                # beginning works.
                tar.getmembers()
                with tar.extractfile(t) as f:
                    self.assertEqual(f.read(), data,
                            "seek back did not work")

    def test_fail_comp(self):
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open,
                              fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_xstar_type(self):
        # The xstar format stores extra atime and ctime fields inside the
        # space reserved for the prefix field. The prefix field must be
        # ignored in this case, otherwise it will mess up the name.
        try:
            self.tar.getmember("misc/regtype-xstar")
        except KeyError:
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")

    def test_check_members(self):
        # Every member carries the expected mtime; members below "ustar/"
        # also carry the expected user name.
        for tarinfo in self.tar:
            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            if not tarinfo.name.startswith("ustar/"):
                continue
            self.assertEqual(tarinfo.uname, "tarfile",
                    "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        # The member list must reach the final "misc/eof" entry.
        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
                "could not find all members")

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    @os_helper.skip_unless_symlink
    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
            tar.extract("ustar/regtype", TEMPDIR, filter='data')
            self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/regtype"))

            tar.extract("ustar/lnktype", TEMPDIR, filter='data')
            self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/lnktype"))
            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
                data = f.read()
            self.assertEqual(sha256sum(data), sha256_regtype)

            tar.extract("ustar/symtype", TEMPDIR, filter='data')
            self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/symtype"))
            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
                data = f.read()
            self.assertEqual(sha256sum(data), sha256_regtype)

    @os_helper.skip_unless_working_chmod
    def test_extractall(self):
        # Test if extractall() correctly restores directory permissions
        # and times (see issue1735).
        def format_mtime(mtime):
            # Render an mtime for the failure message; floats also get
            # their exact hex form.
            if isinstance(mtime, float):
                return "{} ({})".format(mtime, mtime.hex())
            else:
                return "{!r} (int)".format(mtime)

        DIR = os.path.join(TEMPDIR, "extractall")
        # Create the directory before opening the archive so a failing
        # mkdir() cannot leave an open archive behind.
        os.mkdir(DIR)
        try:
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                directories = [t for t in tar if t.isdir()]
                tar.extractall(DIR, directories, filter='fully_trusted')
                for tarinfo in directories:
                    path = os.path.join(DIR, tarinfo.name)
                    if sys.platform != "win32":
                        # Win32 has no support for fine grained permissions.
                        self.assertEqual(tarinfo.mode & 0o777,
                                         os.stat(path).st_mode & 0o777,
                                         tarinfo.name)
                    file_mtime = os.path.getmtime(path)
                    errmsg = "tar mtime {0} != file time {1} of path {2!a}".format(
                        format_mtime(tarinfo.mtime),
                        format_mtime(file_mtime),
                        path)
                    self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
        finally:
            os_helper.rmtree(DIR)

    @os_helper.skip_unless_working_chmod
    def test_extract_directory(self):
        # extract() of a single directory member restores its mtime and,
        # except on win32, its permission bits.
        dirtype = "ustar/dirtype"
        DIR = os.path.join(TEMPDIR, "extractdir")
        os.mkdir(DIR)
        try:
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                tarinfo = tar.getmember(dirtype)
                tar.extract(tarinfo, path=DIR, filter='fully_trusted')
                extracted = os.path.join(DIR, dirtype)
                self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
                if sys.platform != "win32":
                    self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
        finally:
            os_helper.rmtree(DIR)

    def test_extractall_pathlike_name(self):
        # extractall() accepts a path-like destination.
        DIR = pathlib.Path(TEMPDIR) / "extractall"
        with os_helper.temp_dir(DIR), \
             tarfile.open(tarname, encoding="iso8859-1") as tar:
            directories = [t for t in tar if t.isdir()]
            tar.extractall(DIR, directories, filter='fully_trusted')
            for tarinfo in directories:
                path = DIR / tarinfo.name
                self.assertEqual(os.path.getmtime(path), tarinfo.mtime)

    def test_extract_pathlike_name(self):
        # extract() accepts a path-like destination.
        dirtype = "ustar/dirtype"
        DIR = pathlib.Path(TEMPDIR) / "extractall"
        with os_helper.temp_dir(DIR), \
             tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember(dirtype)
            tar.extract(tarinfo, path=DIR, filter='fully_trusted')
            extracted = DIR / dirtype
            self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)

    def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write(b"")

        try:
            # Allocate the instance separately so that it can still be
            # inspected after __init__() has raised.
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            os_helper.unlink(empty)

    def test_parallel_iteration(self):
        # Issue #16601: Restarting iteration over tarfile continued
        # from where it left off.
        with tarfile.open(self.tarname) as tar:
            for m1, m2 in zip(tar, tar):
                self.assertEqual(m1.offset, m2.offset)
                self.assertEqual(m1.get_info(), m2.get_info())

    @unittest.skipIf(zlib is None, "requires zlib")
    def test_zlib_error_does_not_leak(self):
        # bpo-39039: tarfile.open allowed zlib exceptions to bubble up when
        # parsing certain types of invalid data
        with unittest.mock.patch("tarfile.TarInfo.fromtarfile") as mock:
            mock.side_effect = zlib.error
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname)

    def test_next_on_empty_tarfile(self):
        # next() on an archive without members returns None, in stream
        # mode as well as in the default read mode.
        fd = io.BytesIO()
        with tarfile.open(fileobj=fd, mode="w"):
            pass

        fd.seek(0)
        with tarfile.open(fileobj=fd, mode="r|") as tf:
            self.assertIsNone(tf.next())

        fd.seek(0)
        with tarfile.open(fileobj=fd, mode="r") as tf:
            self.assertIsNone(tf.next())
762
763class MiscReadTest(MiscReadTestBase, unittest.TestCase):
764    test_fail_comp = None
765
class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase):
    # Runs the MiscReadTestBase tests with the GzipTest settings (the
    # gzip-compressed archive); nothing is overridden here.
    pass
768
class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase):
    # Runs the MiscReadTestBase tests with the Bz2Test settings.
    def requires_name_attribute(self):
        # Skips the running test unconditionally.
        # NOTE(review): presumably called by base-class tests that need the
        # underlying file object to expose a name -- confirm.
        self.skipTest("BZ2File have no name attribute")
772
class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase):
    # Runs the MiscReadTestBase tests with the LzmaTest settings.
    def requires_name_attribute(self):
        # Skips the running test unconditionally.
        # NOTE(review): presumably called by base-class tests that need the
        # underlying file object to expose a name -- confirm.
        self.skipTest("LZMAFile have no name attribute")
776
777
class StreamReadTest(CommonReadTest, unittest.TestCase):
    # Read tests for archives opened in stream mode ("r|").

    prefix = "r|"

    def test_read_through(self):
        # Issue #11224: A poorly designed _FileInFile.read() method
        # caused seeking errors with stream tar files.  Drain every regular
        # member in 512-byte chunks and fail if a StreamError shows up.
        for member in self.tar:
            if member.isreg():
                with self.tar.extractfile(member) as reader:
                    while True:
                        try:
                            chunk = reader.read(512)
                        except tarfile.StreamError:
                            self.fail("simple read-through using "
                                      "TarFile.extractfile() failed")
                        if not chunk:
                            break

    def test_fileobj_regular_file(self):
        # The first member is "regtype"; getmember() can't be used here.
        member = self.tar.next()
        with self.tar.extractfile(member) as reader:
            payload = reader.read()
        failure = "regular file extraction failed"
        self.assertEqual(len(payload), member.size, failure)
        self.assertEqual(sha256sum(payload), sha256_regtype, failure)

    def test_provoke_stream_error(self):
        # Reading the first member after getmembers() has been called is
        # expected to raise StreamError.
        first_member = self.tar.getmembers()[0]
        with self.tar.extractfile(first_member) as reader:
            with self.assertRaises(tarfile.StreamError):
                reader.read()

    def test_compare_members(self):
        # Step through the plain archive and the stream under test side by
        # side and require the same content for every member.
        with tarfile.open(tarname, encoding="iso8859-1") as tar1:
            tar2 = self.tar

            while True:
                t1, t2 = tar1.next(), tar2.next()
                if t1 is None:
                    break
                self.assertIsNotNone(t2, "stream.next() failed.")

                if t2.islnk() or t2.issym():
                    # Extracting a link from the stream must be refused.
                    self.assertRaises(tarfile.StreamError,
                                      tar2.extractfile, t2)
                    continue

                v1, v2 = tar1.extractfile(t1), tar2.extractfile(t2)
                if v1 is None:
                    # Member carries no data in the reference archive.
                    continue
                self.assertIsNotNone(v2, "stream.extractfile() failed")
                self.assertEqual(v1.read(), v2.read(),
                        "stream extraction failed")
838
class GzipStreamReadTest(GzipTest, StreamReadTest):
    # Runs the StreamReadTest tests with the GzipTest settings; nothing is
    # overridden here.
    pass
841
class Bz2StreamReadTest(Bz2Test, StreamReadTest):
    # Runs the StreamReadTest tests with the Bz2Test settings; nothing is
    # overridden here.
    pass
844
class LzmaStreamReadTest(LzmaTest, StreamReadTest):
    # Runs the StreamReadTest tests with the LzmaTest settings; nothing is
    # overridden here.
    pass
847
848
class DetectReadTest(TarTest, unittest.TestCase):
    # Checks which read modes tarfile.open() accepts for the archive of the
    # current test class, including the auto-detecting "r", "r:*" and "r|*"
    # modes, and that mismatching explicit modes are rejected.

    def _testfunc_file(self, name, mode):
        # Open the archive at path `name` with `mode`; a ReadError fails the
        # test with a message naming the file and mode that were rejected.
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError as exc:
            self.fail("tarfile.open(%r, %r) raised ReadError: %s"
                      % (name, mode, exc))
        else:
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        # Same check as _testfunc_file(), but the archive is handed to
        # tarfile.open() as an already opened binary file object.
        with open(name, "rb") as f:
            try:
                tar = tarfile.open(name, mode, fileobj=f)
            except tarfile.ReadError as exc:
                self.fail("tarfile.open(%r, %r, fileobj=...) raised "
                          "ReadError: %s" % (name, mode, exc))
            else:
                # Close the archive while its underlying file is still open.
                tar.close()

    def _test_modes(self, testfunc):
        # `testfunc` is called as testfunc(name, mode) for every mode that
        # must succeed.
        if self.suffix:
            # An explicit compression mode must reject the plain archive...
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(tarname, mode="r:" + self.suffix)
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(tarname, mode="r|" + self.suffix)
            # ...and an explicit "no compression" mode must reject the
            # compressed archive.
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname, mode="r:")
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname, mode="r|")
        testfunc(self.tarname, "r")
        testfunc(self.tarname, "r:" + self.suffix)
        testfunc(self.tarname, "r:*")
        testfunc(self.tarname, "r|" + self.suffix)
        testfunc(self.tarname, "r|*")

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)
888
class GzipDetectReadTest(GzipTest, DetectReadTest):
    # Runs the DetectReadTest tests with the GzipTest settings; nothing is
    # overridden here.
    pass
891
class Bz2DetectReadTest(Bz2Test, DetectReadTest):
    def test_detect_stream_bz2(self):
        # Originally, tarfile's stream detection looked for the string
        # "BZh91" at the start of the file. This is incorrect because
        # the '9' represents the blocksize (900,000 bytes). If the file was
        # compressed using another blocksize autodetection fails.
        #
        # Recompress the plain archive with blocksize 100,000 bytes, so that
        # the file starts with "BZh11", and let "r|*" detect it.
        with open(tarname, "rb") as plain, \
                bz2.BZ2File(tmpname, "wb", compresslevel=1) as packed:
            packed.write(plain.read())

        self._testfunc_file(tmpname, "r|*")
906
class LzmaDetectReadTest(LzmaTest, DetectReadTest):
    # Runs the DetectReadTest tests with the LzmaTest settings; nothing is
    # overridden here.
    pass
909
910
class MemberReadTest(ReadTest, unittest.TestCase):
    # Looks up individual members of the test archive by name and checks
    # their header fields and, where a digest is given, their content.

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        # Check `tarinfo` against `chksum` (sha256 hex digest of the member
        # data; skipped when None) and against the attribute values given as
        # keyword arguments.
        if chksum is not None:
            with self.tar.extractfile(tarinfo) as payload:
                digest = sha256sum(payload.read())
            self.assertEqual(digest, chksum,
                    "wrong sha256sum for %s" % tarinfo.name)

        # These values are checked for every member on top of the ones the
        # caller asked for.
        expected = dict(kwargs, mtime=0o7606136617, uid=1000, gid=100)
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            expected.update(uname="tarfile", gname="tarfile")
        for field, value in expected.items():
            self.assertEqual(getattr(tarinfo, field), value,
                    "wrong value in %s field of %s" % (field, tarinfo.name))

    def test_find_regtype(self):
        self._test_member(self.tar.getmember("ustar/regtype"),
                          size=7011, chksum=sha256_regtype)

    def test_find_conttype(self):
        self._test_member(self.tar.getmember("ustar/conttype"),
                          size=7011, chksum=sha256_regtype)

    def test_find_dirtype(self):
        self._test_member(self.tar.getmember("ustar/dirtype"), size=0)

    def test_find_dirtype_with_size(self):
        self._test_member(self.tar.getmember("ustar/dirtype-with-size"),
                          size=255)

    def test_find_lnktype(self):
        self._test_member(self.tar.getmember("ustar/lnktype"),
                          size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        self._test_member(self.tar.getmember("ustar/symtype"),
                          size=0, linkname="regtype")

    def test_find_blktype(self):
        self._test_member(self.tar.getmember("ustar/blktype"),
                          size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        self._test_member(self.tar.getmember("ustar/chrtype"),
                          size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        self._test_member(self.tar.getmember("ustar/fifotype"), size=0)

    def test_find_sparse(self):
        self._test_member(self.tar.getmember("ustar/sparse"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse(self):
        self._test_member(self.tar.getmember("gnu/sparse"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_00(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.0"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_01(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.1"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_10(self):
        self._test_member(self.tar.getmember("gnu/sparse-1.0"),
                          size=86016, chksum=sha256_sparse)

    def test_find_umlauts(self):
        member_name = ("ustar/umlauts-"
                       "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(self.tar.getmember(member_name),
                          size=7011, chksum=sha256_regtype)

    def test_find_ustar_longname(self):
        expected_name = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertIn(expected_name, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        self._test_member(self.tar.getmember("misc/regtype-old-v7"),
                          size=7011, chksum=sha256_regtype)

    def test_find_pax_umlauts(self):
        # Reopen the archive with an explicit iso8859-1 encoding before
        # looking the member up.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")
        member_name = ("pax/umlauts-"
                       "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(self.tar.getmember(member_name),
                          size=7011, chksum=sha256_regtype)
1006
1007
class LongnameTest:
    # Mixin with tests for over-long member and link names.  The methods
    # read `self.tar`, `self.subdir`, `self.longnametype` and `self.format`
    # from the class they are mixed into.

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        wanted = "%s/%slongname" % (self.subdir, "123/" * 125)
        try:
            member = self.tar.getmember(wanted)
        except KeyError:
            self.fail("longname not found")
        self.assertNotEqual(member.type, tarfile.DIRTYPE,
                "read longname as dirtype")

    def test_read_longlink(self):
        # The long link member must point at the long name member.
        common = self.subdir + "/" + "123/" * 125
        target_name = common + "longname"
        link_name = common + "longlink"
        try:
            member = self.tar.getmember(link_name)
        except KeyError:
            self.fail("longlink not found")
        self.assertEqual(member.linkname, target_name, "linkname wrong")

    def test_truncated_longname(self):
        # Copy only the first three blocks starting at the member's offset
        # into a buffer; opening that buffer as an archive must raise
        # ReadError.
        wanted = "%s/%slongname" % (self.subdir, "123/" * 125)
        start = self.tar.getmember(wanted).offset
        raw = self.tar.fileobj
        raw.seek(start)
        clipped = io.BytesIO(raw.read(3 * 512))
        self.assertRaises(tarfile.ReadError, tarfile.open,
                          name="foo.tar", fileobj=clipped)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        wanted = "%s/%slongname" % (self.subdir, "123/" * 125)
        start = self.tar.getmember(wanted).offset
        with open(tarname, "rb") as raw:
            raw.seek(start)
            header = tarfile.TarInfo.frombuf(raw.read(512),
                                             "iso8859-1", "strict")
            self.assertEqual(header.type, self.longnametype)

    def test_longname_directory(self):
        # Test reading a longlink directory. Issue #47231.
        dirname = 'a' * 101
        longdir = dirname + '/'
        with os_helper.temp_cwd():
            with tarfile.open(tmpname, 'w') as tar:
                tar.format = self.format
                try:
                    os.mkdir(longdir)
                    tar.add(longdir)
                finally:
                    os.rmdir(dirname)
            with tarfile.open(tmpname) as tar:
                # The directory must be found with and without the
                # trailing slash.
                for lookup in (longdir, dirname):
                    self.assertIsNotNone(tar.getmember(lookup))
1063
class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase):

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME
    format = tarfile.GNU_FORMAT

    # Since 3.2 tarfile is supposed to accurately restore sparse members and
    # produce files with holes. This is what we actually want to test here.
    # Unfortunately, not all platforms/filesystems support sparse files, and
    # even on platforms that do it is non-trivial to make reliable assertions
    # about holes in files. Therefore, we first do one basic test which works
    # on all platforms, and after that a test that will work only on
    # platforms/filesystems that prove to support sparse files.
    def _test_sparse_file(self, name):
        self.tar.extract(name, TEMPDIR, filter='data')
        extracted = os.path.join(TEMPDIR, name)
        with open(extracted, "rb") as fobj:
            digest = sha256sum(fobj.read())
        self.assertEqual(digest, sha256_sparse,
                "wrong sha256sum for %s" % name)

        if not self._fs_supports_holes():
            return
        # With hole support the extracted file must occupy less space on
        # disk than its apparent size.
        info = os.stat(extracted)
        self.assertLess(info.st_blocks * 512, info.st_size)

    def test_sparse_file_old(self):
        self._test_sparse_file("gnu/sparse")

    def test_sparse_file_00(self):
        self._test_sparse_file("gnu/sparse-0.0")

    def test_sparse_file_01(self):
        self._test_sparse_file("gnu/sparse-0.1")

    def test_sparse_file_10(self):
        self._test_sparse_file("gnu/sparse-1.0")

    @staticmethod
    def _fs_supports_holes():
        # Return True if the platform knows the st_blocks stat attribute and
        # uses st_blocks units of 512 bytes, and if the filesystem is able to
        # store holes of 4 KiB in files.
        #
        # The function returns False if page size is larger than 4 KiB.
        # For example, ppc64 uses pages of 64 KiB.
        if not sys.platform.startswith("linux"):
            return False

        # Linux evidently has 512 byte st_blocks units.
        probe = os.path.join(TEMPDIR, "sparse-test")
        with open(probe, "wb") as fobj:
            # Seek to "punch a hole" of 4 KiB
            fobj.seek(4096)
            fobj.write(b'x' * 4096)
            fobj.truncate()
        info = os.stat(probe)
        os_helper.unlink(probe)
        return info.st_blocks * 512 < info.st_size
1122
1123
class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase):

    subdir = "pax"
    longnametype = tarfile.XHDTYPE
    format = tarfile.PAX_FORMAT

    def test_pax_global_headers(self):
        # Each entry is (member name, expected uname, expected gname); all
        # three members must carry the same VENDOR.umlauts pax header.
        umlauts = "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        expectations = (
            ("pax/regtype1", "foo", "bar"),
            ("pax/regtype2", "", "bar"),
            ("pax/regtype3", "tarfile", "tarfile"),
        )
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            for member_name, uname, gname in expectations:
                tarinfo = tar.getmember(member_name)
                self.assertEqual(tarinfo.uname, uname)
                self.assertEqual(tarinfo.gname, gname)
                self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
                                 umlauts)

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        stamp = 1041808783.0
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember("pax/regtype4")
            self.assertEqual(tarinfo.size, 7011)
            self.assertEqual(tarinfo.uid, 123)
            self.assertEqual(tarinfo.gid, 123)
            self.assertEqual(tarinfo.mtime, stamp)
            self.assertEqual(type(tarinfo.mtime), float)
            for key in ("atime", "ctime"):
                self.assertEqual(float(tarinfo.pax_headers[key]), stamp)
1167
1168
class WriteTestBase(TarTest):
    # Put all write tests in here that are supposed to be tested
    # in all possible mode combinations.

    def test_fileobj_no_close(self):
        # A file object supplied by the caller must stay open after the
        # archive is closed, and also after the TarFile object is collected.
        sink = io.BytesIO()
        with tarfile.open(fileobj=sink, mode=self.mode) as archive:
            archive.addfile(tarfile.TarInfo("foo"))
        self.assertFalse(sink.closed, "external fileobjs must never closed")
        # Issue #20238: Incomplete gzip output with mode="w:gz"
        snapshot = sink.getvalue()
        del archive
        support.gc_collect()
        self.assertFalse(sink.closed)
        # Collecting the TarFile must not append anything to the output.
        self.assertEqual(snapshot, sink.getvalue())

    def test_eof_marker(self):
        # Make sure an end of archive marker is written (two zero blocks).
        # tarfile insists on aligning archives to a 20 * 512 byte recordsize.
        # So, we create an archive that has exactly 10240 bytes without the
        # marker, and has 20480 bytes once the marker is written.
        member = tarfile.TarInfo("foo")
        member.size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE
        with tarfile.open(tmpname, self.mode) as archive:
            archive.addfile(member, io.BytesIO(b"a" * member.size))

        with self.open(tmpname, "rb") as written:
            total = len(written.read())
        self.assertEqual(total, tarfile.RECORDSIZE * 2)
1197
1198
1199class WriteTest(WriteTestBase, unittest.TestCase):
1200
1201    prefix = "w:"
1202
1203    def test_100_char_name(self):
1204        # The name field in a tar header stores strings of at most 100 chars.
1205        # If a string is shorter than 100 chars it has to be padded with '\0',
1206        # which implies that a string of exactly 100 chars is stored without
1207        # a trailing '\0'.
1208        name = "0123456789" * 10
1209        tar = tarfile.open(tmpname, self.mode)
1210        try:
1211            t = tarfile.TarInfo(name)
1212            tar.addfile(t)
1213        finally:
1214            tar.close()
1215
1216        tar = tarfile.open(tmpname)
1217        try:
1218            self.assertEqual(tar.getnames()[0], name,
1219                    "failed to store 100 char filename")
1220        finally:
1221            tar.close()
1222
1223    def test_tar_size(self):
1224        # Test for bug #1013882.
1225        tar = tarfile.open(tmpname, self.mode)
1226        try:
1227            path = os.path.join(TEMPDIR, "file")
1228            with open(path, "wb") as fobj:
1229                fobj.write(b"aaa")
1230            tar.add(path)
1231        finally:
1232            tar.close()
1233        self.assertGreater(os.path.getsize(tmpname), 0,
1234                "tarfile is empty")
1235
1236    # The test_*_size tests test for bug #1167128.
1237    def test_file_size(self):
1238        tar = tarfile.open(tmpname, self.mode)
1239        try:
1240            path = os.path.join(TEMPDIR, "file")
1241            with open(path, "wb"):
1242                pass
1243            tarinfo = tar.gettarinfo(path)
1244            self.assertEqual(tarinfo.size, 0)
1245
1246            with open(path, "wb") as fobj:
1247                fobj.write(b"aaa")
1248            tarinfo = tar.gettarinfo(path)
1249            self.assertEqual(tarinfo.size, 3)
1250        finally:
1251            tar.close()
1252
1253    def test_directory_size(self):
1254        path = os.path.join(TEMPDIR, "directory")
1255        os.mkdir(path)
1256        try:
1257            tar = tarfile.open(tmpname, self.mode)
1258            try:
1259                tarinfo = tar.gettarinfo(path)
1260                self.assertEqual(tarinfo.size, 0)
1261            finally:
1262                tar.close()
1263        finally:
1264            os_helper.rmdir(path)
1265
1266    # mock the following:
1267    #  os.listdir: so we know that files are in the wrong order
1268    def test_ordered_recursion(self):
1269        path = os.path.join(TEMPDIR, "directory")
1270        os.mkdir(path)
1271        open(os.path.join(path, "1"), "a").close()
1272        open(os.path.join(path, "2"), "a").close()
1273        try:
1274            tar = tarfile.open(tmpname, self.mode)
1275            try:
1276                with unittest.mock.patch('os.listdir') as mock_listdir:
1277                    mock_listdir.return_value = ["2", "1"]
1278                    tar.add(path)
1279                paths = []
1280                for m in tar.getmembers():
1281                    paths.append(os.path.split(m.name)[-1])
1282                self.assertEqual(paths, ["directory", "1", "2"]);
1283            finally:
1284                tar.close()
1285        finally:
1286            os_helper.unlink(os.path.join(path, "1"))
1287            os_helper.unlink(os.path.join(path, "2"))
1288            os_helper.rmdir(path)
1289
1290    def test_gettarinfo_pathlike_name(self):
1291        with tarfile.open(tmpname, self.mode) as tar:
1292            path = pathlib.Path(TEMPDIR) / "file"
1293            with open(path, "wb") as fobj:
1294                fobj.write(b"aaa")
1295            tarinfo = tar.gettarinfo(path)
1296            tarinfo2 = tar.gettarinfo(os.fspath(path))
1297            self.assertIsInstance(tarinfo.name, str)
1298            self.assertEqual(tarinfo.name, tarinfo2.name)
1299            self.assertEqual(tarinfo.size, 3)
1300
1301    @unittest.skipUnless(hasattr(os, "link"),
1302                         "Missing hardlink implementation")
1303    def test_link_size(self):
1304        link = os.path.join(TEMPDIR, "link")
1305        target = os.path.join(TEMPDIR, "link_target")
1306        with open(target, "wb") as fobj:
1307            fobj.write(b"aaa")
1308        try:
1309            os.link(target, link)
1310        except PermissionError as e:
1311            self.skipTest('os.link(): %s' % e)
1312        try:
1313            tar = tarfile.open(tmpname, self.mode)
1314            try:
1315                # Record the link target in the inodes list.
1316                tar.gettarinfo(target)
1317                tarinfo = tar.gettarinfo(link)
1318                self.assertEqual(tarinfo.size, 0)
1319            finally:
1320                tar.close()
1321        finally:
1322            os_helper.unlink(target)
1323            os_helper.unlink(link)
1324
1325    @os_helper.skip_unless_symlink
1326    def test_symlink_size(self):
1327        path = os.path.join(TEMPDIR, "symlink")
1328        os.symlink("link_target", path)
1329        try:
1330            tar = tarfile.open(tmpname, self.mode)
1331            try:
1332                tarinfo = tar.gettarinfo(path)
1333                self.assertEqual(tarinfo.size, 0)
1334            finally:
1335                tar.close()
1336        finally:
1337            os_helper.unlink(path)
1338
1339    def test_add_self(self):
1340        # Test for #1257255.
1341        dstname = os.path.abspath(tmpname)
1342        tar = tarfile.open(tmpname, self.mode)
1343        try:
1344            self.assertEqual(tar.name, dstname,
1345                    "archive name must be absolute")
1346            tar.add(dstname)
1347            self.assertEqual(tar.getnames(), [],
1348                    "added the archive to itself")
1349
1350            with os_helper.change_cwd(TEMPDIR):
1351                tar.add(dstname)
1352            self.assertEqual(tar.getnames(), [],
1353                    "added the archive to itself")
1354        finally:
1355            tar.close()
1356
1357    def test_filter(self):
1358        tempdir = os.path.join(TEMPDIR, "filter")
1359        os.mkdir(tempdir)
1360        try:
1361            for name in ("foo", "bar", "baz"):
1362                name = os.path.join(tempdir, name)
1363                os_helper.create_empty_file(name)
1364
1365            def filter(tarinfo):
1366                if os.path.basename(tarinfo.name) == "bar":
1367                    return
1368                tarinfo.uid = 123
1369                tarinfo.uname = "foo"
1370                return tarinfo
1371
1372            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
1373            try:
1374                tar.add(tempdir, arcname="empty_dir", filter=filter)
1375            finally:
1376                tar.close()
1377
1378            # Verify that filter is a keyword-only argument
1379            with self.assertRaises(TypeError):
1380                tar.add(tempdir, "empty_dir", True, None, filter)
1381
1382            tar = tarfile.open(tmpname, "r")
1383            try:
1384                for tarinfo in tar:
1385                    self.assertEqual(tarinfo.uid, 123)
1386                    self.assertEqual(tarinfo.uname, "foo")
1387                self.assertEqual(len(tar.getmembers()), 3)
1388            finally:
1389                tar.close()
1390        finally:
1391            os_helper.rmtree(tempdir)
1392
1393    # Guarantee that stored pathnames are not modified. Don't
1394    # remove ./ or ../ or double slashes. Still make absolute
1395    # pathnames relative.
1396    # For details see bug #6054.
1397    def _test_pathname(self, path, cmp_path=None, dir=False):
1398        # Create a tarfile with an empty member named path
1399        # and compare the stored name with the original.
1400        foo = os.path.join(TEMPDIR, "foo")
1401        if not dir:
1402            os_helper.create_empty_file(foo)
1403        else:
1404            os.mkdir(foo)
1405
1406        tar = tarfile.open(tmpname, self.mode)
1407        try:
1408            tar.add(foo, arcname=path)
1409        finally:
1410            tar.close()
1411
1412        tar = tarfile.open(tmpname, "r")
1413        try:
1414            t = tar.next()
1415        finally:
1416            tar.close()
1417
1418        if not dir:
1419            os_helper.unlink(foo)
1420        else:
1421            os_helper.rmdir(foo)
1422
1423        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))
1424
1425
1426    @os_helper.skip_unless_symlink
1427    def test_extractall_symlinks(self):
1428        # Test if extractall works properly when tarfile contains symlinks
1429        tempdir = os.path.join(TEMPDIR, "testsymlinks")
1430        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
1431        os.mkdir(tempdir)
1432        try:
1433            source_file = os.path.join(tempdir,'source')
1434            target_file = os.path.join(tempdir,'symlink')
1435            with open(source_file,'w') as f:
1436                f.write('something\n')
1437            os.symlink(source_file, target_file)
1438            with tarfile.open(temparchive, 'w') as tar:
1439                tar.add(source_file, arcname="source")
1440                tar.add(target_file, arcname="symlink")
1441            # Let's extract it to the location which contains the symlink
1442            with tarfile.open(temparchive, errorlevel=2) as tar:
1443                # this should not raise OSError: [Errno 17] File exists
1444                try:
1445                    tar.extractall(path=tempdir,
1446                                   filter='fully_trusted')
1447                except OSError:
1448                    self.fail("extractall failed with symlinked files")
1449        finally:
1450            os_helper.unlink(temparchive)
1451            os_helper.rmtree(tempdir)
1452
1453    def test_pathnames(self):
1454        self._test_pathname("foo")
1455        self._test_pathname(os.path.join("foo", ".", "bar"))
1456        self._test_pathname(os.path.join("foo", "..", "bar"))
1457        self._test_pathname(os.path.join(".", "foo"))
1458        self._test_pathname(os.path.join(".", "foo", "."))
1459        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
1460        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
1461        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
1462        self._test_pathname(os.path.join("..", "foo"))
1463        self._test_pathname(os.path.join("..", "foo", ".."))
1464        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
1465        self._test_pathname(os.path.join("..", "foo", "..", "bar"))
1466
1467        self._test_pathname("foo" + os.sep + os.sep + "bar")
1468        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)
1469
1470    def test_abs_pathnames(self):
1471        if sys.platform == "win32":
1472            self._test_pathname("C:\\foo", "foo")
1473        else:
1474            self._test_pathname("/foo", "foo")
1475            self._test_pathname("///foo", "foo")
1476
1477    def test_cwd(self):
1478        # Test adding the current working directory.
1479        with os_helper.change_cwd(TEMPDIR):
1480            tar = tarfile.open(tmpname, self.mode)
1481            try:
1482                tar.add(".")
1483            finally:
1484                tar.close()
1485
1486            tar = tarfile.open(tmpname, "r")
1487            try:
1488                for t in tar:
1489                    if t.name != ".":
1490                        self.assertTrue(t.name.startswith("./"), t.name)
1491            finally:
1492                tar.close()
1493
1494    def test_open_nonwritable_fileobj(self):
1495        for exctype in OSError, EOFError, RuntimeError:
1496            class BadFile(io.BytesIO):
1497                first = True
1498                def write(self, data):
1499                    if self.first:
1500                        self.first = False
1501                        raise exctype
1502
1503            f = BadFile()
1504            with self.assertRaises(exctype):
1505                tar = tarfile.open(tmpname, self.mode, fileobj=f,
1506                                   format=tarfile.PAX_FORMAT,
1507                                   pax_headers={'non': 'empty'})
1508            self.assertFalse(f.closed)
1509
1510
class GzipWriteTest(GzipTest, WriteTest):
    # Runs the WriteTest tests with the GzipTest settings; nothing is
    # overridden here.
    pass
1513
1514
class Bz2WriteTest(Bz2Test, WriteTest):
    # Runs the WriteTest tests with the Bz2Test settings; nothing is
    # overridden here.
    pass
1517
1518
class LzmaWriteTest(LzmaTest, WriteTest):
    # Runs the WriteTest tests with the LzmaTest settings; nothing is
    # overridden here.
    pass
1521
1522
class StreamWriteTest(WriteTestBase, unittest.TestCase):
    # Write tests for archives opened in stream mode ("w|").

    prefix = "w|"
    # Subclasses set this to a decompressor class; when it is None the
    # archive is read back through self.open() instead.
    decompressor = None

    def test_stream_padding(self):
        # Test for bug #1543303.
        # An archive without members must consist of exactly RECORDSIZE
        # zero bytes once decompressed.
        tarfile.open(tmpname, self.mode).close()
        if not self.decompressor:
            with self.open(tmpname) as fobj:
                data = fobj.read()
        else:
            dec = self.decompressor()
            with open(tmpname, "rb") as fobj:
                compressed = fobj.read()
            data = dec.decompress(compressed)
            self.assertFalse(dec.unused_data, "found trailing data")
        self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE,
                        "incorrect zero padding")

    @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"),
                         "Missing umask implementation")
    @unittest.skipIf(
        support.is_emscripten or support.is_wasi,
        "Emscripten's/WASI's umask is a stub."
    )
    def test_file_mode(self):
        # Test for issue #8464: Create files with correct
        # permissions.
        if os.path.exists(tmpname):
            os_helper.unlink(tmpname)

        # os.umask() returns the previous mask, which is restored below.
        saved_umask = os.umask(0o022)
        try:
            tarfile.open(tmpname, self.mode).close()
            permissions = os.stat(tmpname).st_mode & 0o777
            self.assertEqual(permissions, 0o644, "wrong file permissions")
        finally:
            os.umask(saved_umask)
1564
1565
class GzipStreamWriteTest(GzipTest, StreamWriteTest):
    """Stream-mode write tests combined with the gzip settings."""

    def test_source_directory_not_leaked(self):
        """
        Ensure the source directory is not included in the tar header
        per bpo-41316.
        """
        tarfile.open(tmpname, self.mode).close()
        # latin-1 maps every byte to a character, so the raw file content
        # can be searched as text without decoding errors.
        payload = pathlib.Path(tmpname).read_text(encoding='latin-1')
        # A unittest assertion (instead of a bare ``assert``) still runs
        # under ``python -O`` and reports both operands on failure.
        self.assertNotIn(os.path.dirname(tmpname), payload)
1575
1576
class Bz2StreamWriteTest(Bz2Test, StreamWriteTest):
    """Stream-mode write tests combined with the bz2 settings."""

    # Stays None when the bz2 module could not be imported.
    decompressor = None if not bz2 else bz2.BZ2Decompressor
1579
class LzmaStreamWriteTest(LzmaTest, StreamWriteTest):
    """Stream-mode write tests combined with the xz settings."""

    # Stays None when the lzma module could not be imported.
    decompressor = None if not lzma else lzma.LZMADecompressor
1582
1583
class GNUWriteTest(unittest.TestCase):
    # This testcase checks for correct creation of GNU Longname
    # and Longlink extended headers (cp. bug #812325).

    def _length(self, s):
        # Size of *s* plus one extra byte, rounded up to a whole number of
        # 512-byte blocks.  (The extra byte is presumably the terminating
        # NUL of the stored string.)
        blocks = len(s) // 512 + 1
        return blocks * 512

    def _calc_size(self, name, link=None):
        # Expected archive offset after adding one member with the given
        # name and optional link target.
        # Initial tar header
        count = 512

        if len(name) > tarfile.LENGTH_NAME:
            # GNU longname extended header + longname
            count += 512
            count += self._length(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            # GNU longlink extended header + longlink
            count += 512
            count += self._length(link)
        return count

    def _test(self, name, link=None):
        # Write a single member in GNU format, compare the resulting
        # offset with the calculated size, then read the member back.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        # Context managers close the archive on every exit path.
        with tarfile.open(tmpname, "w") as tar:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(tarinfo)

            v1 = self._calc_size(name, link)
            v2 = tar.offset
            self.assertEqual(v1, v2, "GNU longname/longlink creation failed")

        with tarfile.open(tmpname) as tar:
            member = tar.next()
            self.assertIsNotNone(member,
                    "unable to read longname member")
            self.assertEqual(tarinfo.name, member.name,
                    "unable to read longname member")
            self.assertEqual(tarinfo.linkname, member.linkname,
                    "unable to read longname member")

    # The numeric suffix of each test is the length of the name/link used.

    def test_longname_1023(self):
        self._test(("longnam/" * 127) + "longnam")

    def test_longname_1024(self):
        self._test(("longnam/" * 127) + "longname")

    def test_longname_1025(self):
        self._test(("longnam/" * 127) + "longname_")

    def test_longlink_1023(self):
        self._test("name", ("longlnk/" * 127) + "longlnk")

    def test_longlink_1024(self):
        self._test("name", ("longlnk/" * 127) + "longlink")

    def test_longlink_1025(self):
        self._test("name", ("longlnk/" * 127) + "longlink_")

    def test_longnamelink_1023(self):
        self._test(("longnam/" * 127) + "longnam",
                   ("longlnk/" * 127) + "longlnk")

    def test_longnamelink_1024(self):
        self._test(("longnam/" * 127) + "longname",
                   ("longlnk/" * 127) + "longlink")

    def test_longnamelink_1025(self):
        self._test(("longnam/" * 127) + "longname_",
                   ("longlnk/" * 127) + "longlink_")
1664
1665
class DeviceHeaderTest(WriteTestBase, unittest.TestCase):
    """Check which members get the devmajor/devminor header fields."""

    prefix = "w:"

    def test_headers_written_only_for_device_files(self):
        # Regression test for bpo-18819.
        # NOTE(review): this directory is created and removed again, but
        # nothing below stores anything in it -- confirm it is still needed.
        tempdir = os.path.join(TEMPDIR, "device_header_test")
        os.mkdir(tempdir)
        try:
            # Context managers close the archive on every exit path.
            with tarfile.open(tmpname, self.mode) as tar:
                input_blk = tarfile.TarInfo(name="my_block_device")
                input_reg = tarfile.TarInfo(name="my_regular_file")
                input_blk.type = tarfile.BLKTYPE
                input_reg.type = tarfile.REGTYPE
                tar.addfile(input_blk)
                tar.addfile(input_reg)

            # devmajor and devminor should be *interpreted* as 0 in both...
            with tarfile.open(tmpname, "r") as tar:
                output_blk = tar.getmember("my_block_device")
                output_reg = tar.getmember("my_regular_file")
            self.assertEqual(output_blk.devmajor, 0)
            self.assertEqual(output_blk.devminor, 0)
            self.assertEqual(output_reg.devmajor, 0)
            self.assertEqual(output_reg.devminor, 0)

            # ...but the fields should not actually be set on regular files:
            with open(tmpname, "rb") as infile:
                buf = infile.read()
            # Slice out each member's raw header bytes.
            buf_blk = buf[output_blk.offset:output_blk.offset_data]
            buf_reg = buf[output_reg.offset:output_reg.offset_data]
            # See `struct posixheader` in GNU docs for byte offsets:
            # <https://www.gnu.org/software/tar/manual/html_node/Standard.html>
            device_headers = slice(329, 329 + 16)
            self.assertEqual(buf_blk[device_headers], b"0000000\0" * 2)
            self.assertEqual(buf_reg[device_headers], b"\0" * 16)
        finally:
            os_helper.rmtree(tempdir)
1710
1711
class CreateTest(WriteTestBase, unittest.TestCase):
    """Tests for the exclusive-creation mode prefix "x:"."""

    prefix = "x:"

    file_path = os.path.join(TEMPDIR, "spameggs42")

    @classmethod
    def setUpClass(cls):
        # Small payload file that the tests add to their archives.
        with open(cls.file_path, "wb") as payload:
            payload.write(b"aaa")

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(cls.file_path)

    def setUp(self):
        # Each test starts with no archive at tmpname.
        os_helper.unlink(tmpname)

    def _assert_only_spameggs(self, names):
        # *names* must list exactly one member, the payload file.
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])

    def _assert_archive_holds_spameggs(self):
        # Re-open tmpname through self.taropen and check its member names.
        with self.taropen(tmpname) as archive:
            member_names = archive.getnames()
        self._assert_only_spameggs(member_names)

    def test_create(self):
        with tarfile.open(tmpname, self.mode) as archive:
            archive.add(self.file_path)

        self._assert_archive_holds_spameggs()

    def test_create_existing(self):
        with tarfile.open(tmpname, self.mode) as archive:
            archive.add(self.file_path)

        # A second exclusive open of the same path must be refused...
        with self.assertRaises(FileExistsError):
            tarfile.open(tmpname, self.mode)

        # ...and the archive written first must still be readable.
        self._assert_archive_holds_spameggs()

    def test_create_taropen(self):
        with self.taropen(tmpname, "x") as archive:
            archive.add(self.file_path)

        self._assert_archive_holds_spameggs()

    def test_create_existing_taropen(self):
        with self.taropen(tmpname, "x") as archive:
            archive.add(self.file_path)

        with self.assertRaises(FileExistsError):
            with self.taropen(tmpname, "x"):
                pass

        self._assert_archive_holds_spameggs()

    def test_create_pathlike_name(self):
        expected_name = os.path.abspath(tmpname)
        with tarfile.open(pathlib.Path(tmpname), self.mode) as archive:
            # The path-like argument is exposed as a plain str.
            self.assertIsInstance(archive.name, str)
            self.assertEqual(archive.name, expected_name)
            archive.add(pathlib.Path(self.file_path))
            member_names = archive.getnames()
        self._assert_only_spameggs(member_names)

        self._assert_archive_holds_spameggs()

    def test_create_taropen_pathlike_name(self):
        expected_name = os.path.abspath(tmpname)
        with self.taropen(pathlib.Path(tmpname), "x") as archive:
            # The path-like argument is exposed as a plain str.
            self.assertIsInstance(archive.name, str)
            self.assertEqual(archive.name, expected_name)
            archive.add(pathlib.Path(self.file_path))
            member_names = archive.getnames()
        self._assert_only_spameggs(member_names)

        self._assert_archive_holds_spameggs()
1800
1801
class GzipCreateTest(GzipTest, CreateTest):
    """CreateTest cases combined with the gzip settings."""

    def test_create_with_compresslevel(self):
        # compresslevel must be accepted both when creating and reading.
        with tarfile.open(tmpname, self.mode, compresslevel=1) as archive:
            archive.add(self.file_path)
        tarfile.open(tmpname, 'r:gz', compresslevel=1).close()
1809
1810
class Bz2CreateTest(Bz2Test, CreateTest):
    """CreateTest cases combined with the bz2 settings."""

    def test_create_with_compresslevel(self):
        # compresslevel must be accepted both when creating and reading.
        with tarfile.open(tmpname, self.mode, compresslevel=1) as archive:
            archive.add(self.file_path)
        tarfile.open(tmpname, 'r:bz2', compresslevel=1).close()
1818
1819
class LzmaCreateTest(LzmaTest, CreateTest):
    """CreateTest cases combined with the xz settings."""

    # Unlike gz and bz2, xz uses the preset keyword instead of compresslevel.
    # It does not allow for preset to be specified when reading.
    def test_create_with_preset(self):
        with tarfile.open(tmpname, self.mode, preset=1) as archive:
            archive.add(self.file_path)
1827
1828
class CreateWithXModeTest(CreateTest):
    """CreateTest cases using the bare "x" mode prefix."""

    prefix = "x"

    # Assigning None hides the inherited taropen tests in this subclass.
    test_create_taropen = test_create_existing_taropen = None
1835
1836
@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.

    def setUp(self):
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")
        # Register the removals before creating anything: cleanups run even
        # when setUp() does not complete (e.g. the skipTest() below), whereas
        # tearDown() would not, leaving "foo" behind in TEMPDIR.
        self.addCleanup(os_helper.unlink, self.foo)
        self.addCleanup(os_helper.unlink, self.bar)

        with open(self.foo, "wb") as fobj:
            fobj.write(b"foo")

        try:
            os.link(self.foo, self.bar)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)

        self.tar = tarfile.open(tmpname, "w")
        # Cleanups run last-in first-out, so the archive is closed before
        # the two files are unlinked.
        self.addCleanup(self.tar.close)
        self.tar.add(self.foo)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        # "bar" is a link to "foo", which setUp() already added.
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference enabled the link is reported as a regular file.
        self.tar.dereference = True
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "dereferencing hardlink failed")
1878
1879
class PaxWriteTest(GNUWriteTest):
    """Run the inherited long name/link cases in PAX format and check
    pax header creation."""

    def _test(self, name, link=None):
        # See GNUWriteTest.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        # Context managers close the archive on every exit path.
        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) as tar:
            tar.addfile(tarinfo)

        with tarfile.open(tmpname) as tar:
            if link:
                stored_link = tar.getmembers()[0].linkname
                self.assertEqual(link, stored_link,
                                 "PAX longlink creation failed")
            else:
                stored_name = tar.getmembers()[0].name
                self.assertEqual(name, stored_name,
                                 "PAX longname creation failed")

    def test_pax_global_header(self):
        pax_headers = {
                "foo": "bar",
                "uid": "0",
                "mtime": "1.23",
                "test": "\xe4\xf6\xfc",
                "\xe4\xf6\xfc": "test"}

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                          pax_headers=pax_headers) as tar:
            tar.addfile(tarfile.TarInfo("test"))

        # Test if the global header was written correctly.
        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            self.assertEqual(tar.pax_headers, pax_headers)
            self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)
            # Test if all the fields are strings.
            for key, val in tar.pax_headers.items():
                self.assertIsNot(type(key), bytes)
                self.assertIsNot(type(val), bytes)
                if key in tarfile.PAX_NUMBER_FIELDS:
                    # Number fields must be convertible by their converter.
                    try:
                        tarfile.PAX_NUMBER_FIELDS[key](val)
                    except (TypeError, ValueError):
                        self.fail("unable to convert pax header field")

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {"path": "foo", "uid": "123"}

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                          encoding="iso8859-1") as tar:
            t = tarfile.TarInfo()
            t.name = "\xe4\xf6\xfc" # non-ASCII
            t.uid = 8**8 # too large
            t.pax_headers = pax_headers
            tar.addfile(t)

        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            t = tar.getmembers()[0]
            self.assertEqual(t.pax_headers, pax_headers)
            self.assertEqual(t.name, "foo")
            self.assertEqual(t.uid, 123)

    def test_create_pax_header(self):
        # The ustar header should contain values that can be
        # represented reasonably, even if a better (e.g. higher
        # precision) version is set in the pax header.
        # Issue #45863

        # values that should be kept
        t = tarfile.TarInfo()
        t.name = "foo"
        t.mtime = 1000.1
        t.size = 100
        t.uid = 123
        t.gid = 124
        info = t.get_info()
        header = t.create_pax_header(info, encoding="iso8859-1")
        self.assertEqual(info['name'], "foo")
        # mtime should be rounded to nearest second
        self.assertIsInstance(info['mtime'], int)
        self.assertEqual(info['mtime'], 1000)
        self.assertEqual(info['size'], 100)
        self.assertEqual(info['uid'], 123)
        self.assertEqual(info['gid'], 124)
        self.assertEqual(header,
            b'././@PaxHeader' + bytes(86) \
            + b'0000000\x000000000\x000000000\x0000000000020\x0000000000000\x00010205\x00 x' \
            + bytes(100) + b'ustar\x0000'+ bytes(247) \
            + b'16 mtime=1000.1\n' + bytes(496) + b'foo' + bytes(97) \
            + b'0000644\x000000173\x000000174\x0000000000144\x0000000001750\x00006516\x00 0' \
            + bytes(100) + b'ustar\x0000' + bytes(247))

        # values that should be changed
        t = tarfile.TarInfo()
        t.name = "foo\u3374" # can't be represented in ascii
        t.mtime = 10**10 # too big
        t.size = 10**10 # too big
        t.uid = 8**8 # too big
        t.gid = 8**8+1 # too big
        info = t.get_info()
        header = t.create_pax_header(info, encoding="iso8859-1")
        # name is kept as-is in info but should be added to pax header
        self.assertEqual(info['name'], "foo\u3374")
        self.assertEqual(info['mtime'], 0)
        self.assertEqual(info['size'], 0)
        self.assertEqual(info['uid'], 0)
        self.assertEqual(info['gid'], 0)
        self.assertEqual(header,
            b'././@PaxHeader' + bytes(86) \
            + b'0000000\x000000000\x000000000\x0000000000130\x0000000000000\x00010207\x00 x' \
            + bytes(100) + b'ustar\x0000' + bytes(247) \
            + b'15 path=foo\xe3\x8d\xb4\n16 uid=16777216\n' \
            + b'16 gid=16777217\n20 size=10000000000\n' \
            + b'21 mtime=10000000000\n'+ bytes(424) + b'foo?' + bytes(96) \
            + b'0000644\x000000000\x000000000\x0000000000000\x0000000000000\x00006540\x00 0' \
            + bytes(100) + b'ustar\x0000' + bytes(247))
2017
2018
class UnicodeTest:
    """Mixin of non-ASCII name tests; subclasses supply ``format``."""

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf-8")

    def _test_unicode_filename(self, encoding):
        # Write a member with a non-ASCII name using *encoding*, then read
        # it back with the same encoding and compare the name.
        name = "\xe4\xf6\xfc"
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding=encoding, errors="strict") as tar:
            tar.addfile(tarfile.TarInfo(name))

        with tarfile.open(tmpname, encoding=encoding) as tar:
            self.assertEqual(tar.getmembers()[0].name, name)

    def test_unicode_filename_error(self):
        # With a strict ascii encoding, non-ASCII text in either the name
        # or the uname must make addfile() raise UnicodeError.
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="ascii", errors="strict") as tar:
            tarinfo = tarfile.TarInfo()

            tarinfo.name = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

            tarinfo.name = "foo"
            tarinfo.uname = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

    def test_unicode_argument(self):
        # Every text attribute of every member must be a str.
        with tarfile.open(tarname, "r",
                          encoding="iso8859-1", errors="strict") as tar:
            for t in tar:
                self.assertIs(type(t.name), str)
                self.assertIs(type(t.linkname), str)
                self.assertIs(type(t.uname), str)
                self.assertIs(type(t.gname), str)

    def test_uname_unicode(self):
        t = tarfile.TarInfo("foo")
        t.uname = "\xe4\xf6\xfc"
        t.gname = "\xe4\xf6\xfc"

        with tarfile.open(tmpname, mode="w", format=self.format,
                          encoding="iso8859-1") as tar:
            tar.addfile(t)

        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            t = tar.getmember("foo")
            self.assertEqual(t.uname, "\xe4\xf6\xfc")
            self.assertEqual(t.gname, "\xe4\xf6\xfc")

        if self.format != tarfile.PAX_FORMAT:
            # Read with ascii, the undecodable bytes are expected to come
            # back as lone surrogates.
            with tarfile.open(tmpname, encoding="ascii") as tar:
                t = tar.getmember("foo")
                self.assertEqual(t.uname, "\udce4\udcf6\udcfc")
                self.assertEqual(t.gname, "\udce4\udcf6\udcfc")
2098
2099
class UstarUnicodeTest(UnicodeTest, unittest.TestCase):
    """UnicodeTest cases for tarfile.USTAR_FORMAT, plus field-size limits."""

    format = tarfile.USTAR_FORMAT

    # Test whether the utf-8 encoded version of a filename exceeds the 100
    # bytes name field limit (every occurrence of '\xff' will be expanded to 2
    # bytes).
    def test_unicode_name1(self):
        self._test_ustar_name("0123456789" * 10)
        self._test_ustar_name("0123456789" * 10 + "0", ValueError)
        self._test_ustar_name("0123456789" * 9 + "01234567\xff")
        self._test_ustar_name("0123456789" * 9 + "012345678\xff", ValueError)

    def test_unicode_name2(self):
        self._test_ustar_name("0123456789" * 9 + "012345\xff\xff")
        self._test_ustar_name("0123456789" * 9 + "0123456\xff\xff", ValueError)

    # Test whether the utf-8 encoded version of a filename exceeds the 155
    # bytes prefix + '/' + 100 bytes name limit.
    def test_unicode_longname1(self):
        self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 10)
        self._test_ustar_name("0123456789" * 15 + "0123/4" + "0123456789" * 10, ValueError)
        self._test_ustar_name("0123456789" * 15 + "012\xff/" + "0123456789" * 10)
        self._test_ustar_name("0123456789" * 15 + "0123\xff/" + "0123456789" * 10, ValueError)

    def test_unicode_longname2(self):
        self._test_ustar_name("0123456789" * 15 + "01\xff/2" + "0123456789" * 10, ValueError)
        self._test_ustar_name("0123456789" * 15 + "01\xff\xff/" + "0123456789" * 10, ValueError)

    def test_unicode_longname3(self):
        self._test_ustar_name("0123456789" * 15 + "01\xff\xff/2" + "0123456789" * 10, ValueError)
        self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "01234567\xff")
        self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345678\xff", ValueError)

    def test_unicode_longname4(self):
        self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345\xff\xff")
        self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "0123456\xff\xff", ValueError)

    def _test_ustar_name(self, name, exc=None):
        # Add a member called *name*.  With *exc* given, addfile() must
        # raise it; otherwise the name must survive a round trip.
        member = tarfile.TarInfo(name)
        with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar:
            if exc is not None:
                self.assertRaises(exc, tar.addfile, member)
            else:
                tar.addfile(member)

        if exc is not None:
            return

        with tarfile.open(tmpname, "r", encoding="utf-8") as tar:
            # Only the first member is of interest.
            for stored in tar:
                self.assertEqual(name, stored.name)
                break

    # Test the same as above for the 100 bytes link field.
    def test_unicode_link1(self):
        self._test_ustar_link("0123456789" * 10)
        self._test_ustar_link("0123456789" * 10 + "0", ValueError)
        self._test_ustar_link("0123456789" * 9 + "01234567\xff")
        self._test_ustar_link("0123456789" * 9 + "012345678\xff", ValueError)

    def test_unicode_link2(self):
        self._test_ustar_link("0123456789" * 9 + "012345\xff\xff")
        self._test_ustar_link("0123456789" * 9 + "0123456\xff\xff", ValueError)

    def _test_ustar_link(self, name, exc=None):
        # Same as _test_ustar_name(), but *name* is used as the linkname of
        # a member called "foo".
        member = tarfile.TarInfo("foo")
        member.linkname = name
        with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar:
            if exc is not None:
                self.assertRaises(exc, tar.addfile, member)
            else:
                tar.addfile(member)

        if exc is not None:
            return

        with tarfile.open(tmpname, "r", encoding="utf-8") as tar:
            # Only the first member is of interest.
            for stored in tar:
                self.assertEqual(name, stored.linkname)
                break
2177
2178
class GNUUnicodeTest(UnicodeTest, unittest.TestCase):
    """UnicodeTest cases run with tarfile.GNU_FORMAT."""

    format = tarfile.GNU_FORMAT

    def test_bad_pax_header(self):
        # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields
        # without a hdrcharset=BINARY header.
        cases = (
            ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),
        )
        for encoding, member_name in cases:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as tar:
                # getmember() raises KeyError when the name is not found.
                try:
                    tar.getmember(member_name)
                except KeyError:
                    self.fail("unable to read bad GNU tar pax header")
2195
2196
class PAXUnicodeTest(UnicodeTest, unittest.TestCase):
    """UnicodeTest cases run with tarfile.PAX_FORMAT."""

    format = tarfile.PAX_FORMAT

    # PAX_FORMAT ignores encoding in write mode.
    test_unicode_filename_error = None

    def test_binary_header(self):
        # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field.
        cases = (
            ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),
        )
        for encoding, member_name in cases:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as tar:
                # getmember() raises KeyError when the name is not found.
                try:
                    tar.getmember(member_name)
                except KeyError:
                    self.fail("unable to read POSIX.1-2008 binary header")
2215
2216
class AppendTestBase:
    # Test append mode (cp. patch #1652681).

    def setUp(self):
        # Each test starts without a leftover archive at tmpname.
        self.tarname = tmpname
        if not os.path.exists(tmpname):
            return
        os_helper.unlink(tmpname)

    def _create_testtar(self, mode="w:"):
        # Copy the "ustar/regtype" member of the reference archive into a
        # fresh archive at self.tarname, renamed to "foo".
        with tarfile.open(tarname, encoding="iso8859-1") as src:
            member = src.getmember("ustar/regtype")
            member.name = "foo"
            with src.extractfile(member) as payload, \
                    tarfile.open(self.tarname, mode) as dest:
                dest.addfile(member, payload)

    def test_append_compressed(self):
        # Opening a compressed archive in append mode must be rejected.
        self._create_testtar("w:" + self.suffix)
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(tmpname, "a")
2236
class AppendTest(AppendTestBase, unittest.TestCase):
    """Append-mode tests for uncompressed archives."""

    test_append_compressed = None

    def _add_testfile(self, fileobj=None):
        # Append an empty member called "bar" to the archive.
        with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
            tar.addfile(tarfile.TarInfo("bar"))

    def _test(self, names=None, fileobj=None):
        # Check that the archive lists exactly *names* (default: ["bar"]).
        # A None sentinel replaces the former mutable list default.
        if names is None:
            names = ["bar"]
        with tarfile.open(self.tarname, fileobj=fileobj) as tar:
            self.assertEqual(tar.getnames(), names)

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        fobj = io.BytesIO(b"\0" * 1024)
        self._add_testfile(fobj)
        # Rewind so the archive can be read back from the start.
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        self._create_testtar()
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self._add_testfile(fobj)
        # Rewind so the archive can be read back from the start.
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        with open(self.tarname, "wb") as fobj:
            fobj.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error(b"")

    def test_incomplete(self):
        self._test_error(b"\0" * 13)

    def test_premature_eof(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + b"\0" * 13)

    def test_invalid(self):
        self._test_error(b"a" * 512)
2300
class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase):
    """AppendTestBase cases combined with the gzip settings."""
2303
class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase):
    """AppendTestBase cases combined with the bz2 settings."""
2306
class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase):
    """AppendTestBase cases combined with the xz settings."""
2309
2310
class LimitsTest(unittest.TestCase):
    """Check which names, link names and uids each format accepts."""

    def test_ustar_limits(self):
        ustar = tarfile.USTAR_FORMAT

        # 100 char name
        tarfile.TarInfo("0123456789" * 10).tobuf(ustar)

        # 101 char name that cannot be stored
        member = tarfile.TarInfo("0123456789" * 10 + "0")
        with self.assertRaises(ValueError):
            member.tobuf(ustar)

        # 256 char name with a slash at pos 156
        tarfile.TarInfo("123/" * 62 + "longname").tobuf(ustar)

        # 256 char name that cannot be stored
        member = tarfile.TarInfo("1234567/" * 31 + "longname")
        with self.assertRaises(ValueError):
            member.tobuf(ustar)

        # 512 char name
        member = tarfile.TarInfo("123/" * 126 + "longname")
        with self.assertRaises(ValueError):
            member.tobuf(ustar)

        # 512 char linkname
        member = tarfile.TarInfo("longlink")
        member.linkname = "123/" * 126 + "longname"
        with self.assertRaises(ValueError):
            member.tobuf(ustar)

        # uid > 8 digits
        member = tarfile.TarInfo("name")
        member.uid = 0o10000000
        with self.assertRaises(ValueError):
            member.tobuf(ustar)

    def test_gnu_limits(self):
        gnu = tarfile.GNU_FORMAT

        # Long names and link names are accepted in GNU format.
        tarfile.TarInfo("123/" * 126 + "longname").tobuf(gnu)

        member = tarfile.TarInfo("longlink")
        member.linkname = "123/" * 126 + "longname"
        member.tobuf(gnu)

        # uid >= 256 ** 7
        member = tarfile.TarInfo("name")
        member.uid = 0o4000000000000000000
        with self.assertRaises(ValueError):
            member.tobuf(gnu)

    def test_pax_limits(self):
        pax = tarfile.PAX_FORMAT

        # None of these values is rejected in PAX format.
        tarfile.TarInfo("123/" * 126 + "longname").tobuf(pax)

        member = tarfile.TarInfo("longlink")
        member.linkname = "123/" * 126 + "longname"
        member.tobuf(pax)

        member = tarfile.TarInfo("name")
        member.uid = 0o4000000000000000000
        member.tobuf(pax)
2368
2369
class MiscTest(unittest.TestCase):
    # Tests for the module-level field helpers (stn, nts, nti, itn), for
    # tarfile.__all__ and for the error message produced by tarfile.open().

    def test_char_fields(self):
        # stn() encodes a string into a field of the given length; the
        # expected values show NUL padding and truncation.  nts() decodes
        # a field up to its first NUL byte.
        self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"),
                         b"foo\0\0\0\0\0")
        self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"),
                         b"foo")
        self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"),
                         "foo")
        self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"),
                         "foo")

    def test_read_number_fields(self):
        # Issue 13158: Test if GNU tar specific base-256 number fields
        # are decoded correctly.
        self.assertEqual(tarfile.nti(b"0000001\x00"), 1)
        self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777)
        self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"),
                         0o10000000)
        self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"),
                         0xffffffff)
        self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"),
                         -1)
        self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
                         -100)
        self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"),
                         -0x100000000000000)

        # Issue 24514: Test if empty number fields are converted to zero.
        self.assertEqual(tarfile.nti(b"\0"), 0)
        self.assertEqual(tarfile.nti(b"       \0"), 0)

    def test_write_number_fields(self):
        # Counterpart of test_read_number_fields(): itn() has to produce
        # the same byte strings that nti() decodes there.
        self.assertEqual(tarfile.itn(1), b"0000001\x00")
        self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00")
        self.assertEqual(tarfile.itn(0o10000000, format=tarfile.GNU_FORMAT),
                         b"\x80\x00\x00\x00\x00\x20\x00\x00")
        self.assertEqual(tarfile.itn(0xffffffff, format=tarfile.GNU_FORMAT),
                         b"\x80\x00\x00\x00\xff\xff\xff\xff")
        self.assertEqual(tarfile.itn(-1, format=tarfile.GNU_FORMAT),
                         b"\xff\xff\xff\xff\xff\xff\xff\xff")
        self.assertEqual(tarfile.itn(-100, format=tarfile.GNU_FORMAT),
                         b"\xff\xff\xff\xff\xff\xff\xff\x9c")
        self.assertEqual(tarfile.itn(-0x100000000000000,
                                     format=tarfile.GNU_FORMAT),
                         b"\xff\x00\x00\x00\x00\x00\x00\x00")

        # Issue 32713: Test if itn() supports float values outside the
        # non-GNU format range
        self.assertEqual(tarfile.itn(-100.0, format=tarfile.GNU_FORMAT),
                         b"\xff\xff\xff\xff\xff\xff\xff\x9c")
        self.assertEqual(tarfile.itn(8 ** 12 + 0.0, format=tarfile.GNU_FORMAT),
                         b"\x80\x00\x00\x10\x00\x00\x00\x00")
        self.assertEqual(tarfile.nti(tarfile.itn(-0.1, format=tarfile.GNU_FORMAT)), 0)

    def test_number_field_limits(self):
        # itn() has to raise ValueError for each of these value, field
        # width and format combinations.
        with self.assertRaises(ValueError):
            tarfile.itn(-1, 8, tarfile.USTAR_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT)

    def test__all__(self):
        # Module-level names that are deliberately left out of
        # tarfile.__all__.
        not_exported = {
            'version', 'grp', 'pwd', 'symlink_exception', 'NUL', 'BLOCKSIZE',
            'RECORDSIZE', 'GNU_MAGIC', 'POSIX_MAGIC', 'LENGTH_NAME',
            'LENGTH_LINK', 'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE',
            'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE', 'CONTTYPE',
            'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK', 'GNUTYPE_SPARSE',
            'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE', 'SUPPORTED_TYPES',
            'REGULAR_TYPES', 'GNU_TYPES', 'PAX_FIELDS', 'PAX_NAME_FIELDS',
            'PAX_NUMBER_FIELDS', 'stn', 'nts', 'nti', 'itn', 'calc_chksums',
            'copyfileobj', 'filemode', 'EmptyHeaderError',
            'TruncatedHeaderError', 'EOFHeaderError', 'InvalidHeaderError',
            'SubsequentHeaderError', 'ExFileObject', 'main',
            "fully_trusted_filter", "data_filter",
            "tar_filter", "FilterError", "AbsoluteLinkError",
            "OutsideDestinationError", "SpecialFileError", "AbsolutePathError",
            "LinkOutsideDestinationError",
            }
        support.check__all__(self, tarfile, not_exported=not_exported)

    def test_useful_error_message_when_modules_missing(self):
        # When opening with the xz method fails with a CompressionError,
        # the ReadError raised by tarfile.open() has to mention it.
        fname = os.path.join(os.path.dirname(__file__), 'testtar.tar.xz')
        # Hand the exception instance itself to side_effect.  (A stray
        # trailing comma used to turn it into a 1-tuple, which only worked
        # because mock treats side_effect iterables as a sequence of
        # results and would be exhausted after a single call.)
        error = tarfile.CompressionError('lzma module is not available')
        with self.assertRaises(tarfile.ReadError) as excinfo:
            with unittest.mock.patch.object(tarfile.TarFile, 'xzopen',
                                            side_effect=error):
                tarfile.open(fname)

        self.assertIn(
            "\n- method xz: CompressionError('lzma module is not available')\n",
            str(excinfo.exception),
        )
2466
2467
class CommandLineTest(unittest.TestCase):
    # Tests for the command line interface, i.e. "python -m tarfile".

    def tarfilecmd(self, *args, **kwargs):
        # Run "-m tarfile" with *args* through assert_python_ok() (keyword
        # arguments are forwarded to it) and return the captured stdout
        # with os.linesep replaced by "\n".
        rc, out, err = script_helper.assert_python_ok('-m', 'tarfile', *args,
                                                      **kwargs)
        return out.replace(os.linesep.encode(), b'\n')

    def tarfilecmd_failure(self, *args):
        # Run "-m tarfile" with *args* through assert_python_failure() and
        # return its result; callers unpack it as (rc, out, err).
        return script_helper.assert_python_failure('-m', 'tarfile', *args)

    def make_simple_tarfile(self, tar_name):
        # Create an archive *tar_name* holding two text files, stored under
        # their base names.  The archive is removed on test cleanup.
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        self.addCleanup(os_helper.unlink, tar_name)
        with tarfile.open(tar_name, 'w') as tf:
            for tardata in files:
                tf.add(tardata, arcname=os.path.basename(tardata))

    def make_evil_tarfile(self, tar_name):
        # Create an archive *tar_name* with two empty members, 'benign' and
        # '../evil'; the latter name points outside of the extraction
        # directory.  The archive is removed on test cleanup.
        self.addCleanup(os_helper.unlink, tar_name)
        with tarfile.open(tar_name, 'w') as tf:
            benign = tarfile.TarInfo('benign')
            tf.addfile(benign, fileobj=io.BytesIO(b''))
            evil = tarfile.TarInfo('../evil')
            tf.addfile(evil, fileobj=io.BytesIO(b''))

    def test_bad_use(self):
        # Without arguments: nothing on stdout, a usage error on stderr.
        rc, out, err = self.tarfilecmd_failure()
        self.assertEqual(out, b'')
        self.assertIn(b'usage', err.lower())
        self.assertIn(b'error', err.lower())
        self.assertIn(b'required', err.lower())
        # With an empty archive name: nothing on stdout, something on stderr.
        rc, out, err = self.tarfilecmd_failure('-l', '')
        self.assertEqual(out, b'')
        self.assertNotEqual(err.strip(), b'')

    def test_test_command(self):
        # Testing a valid archive prints nothing.
        for tar_name in testtarnames:
            for opt in '-t', '--test':
                out = self.tarfilecmd(opt, tar_name)
                self.assertEqual(out, b'')

    def test_test_command_verbose(self):
        for tar_name in testtarnames:
            for opt in '-v', '--verbose':
                out = self.tarfilecmd(opt, '-t', tar_name,
                                      PYTHONIOENCODING='utf-8')
                self.assertIn(b'is a tar archive.\n', out)

    def test_test_command_invalid_file(self):
        # A zip file is reported as not being a tar archive.
        zipname = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-t', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

        # The first 511 bytes of each test archive must fail the test
        # command as well.
        for tar_name in testtarnames:
            with self.subTest(tar_name=tar_name):
                with open(tar_name, 'rb') as f:
                    data = f.read()
                try:
                    with open(tmpname, 'wb') as f:
                        f.write(data[:511])
                    rc, out, err = self.tarfilecmd_failure('-t', tmpname)
                    self.assertEqual(out, b'')
                    self.assertEqual(rc, 1)
                finally:
                    os_helper.unlink(tmpname)

    def test_list_command(self):
        # The command's output has to equal what TarFile.list() prints.
        for tar_name in testtarnames:
            with support.captured_stdout() as t:
                with tarfile.open(tar_name, 'r') as tf:
                    tf.list(verbose=False)
            expected = t.getvalue().encode('ascii', 'backslashreplace')
            for opt in '-l', '--list':
                out = self.tarfilecmd(opt, tar_name,
                                      PYTHONIOENCODING='ascii')
                self.assertEqual(out, expected)

    def test_list_command_verbose(self):
        # Same as test_list_command(), compared against list(verbose=True).
        for tar_name in testtarnames:
            with support.captured_stdout() as t:
                with tarfile.open(tar_name, 'r') as tf:
                    tf.list(verbose=True)
            expected = t.getvalue().encode('ascii', 'backslashreplace')
            for opt in '-v', '--verbose':
                out = self.tarfilecmd(opt, '-l', tar_name,
                                      PYTHONIOENCODING='ascii')
                self.assertEqual(out, expected)

    def test_list_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-l', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

    def test_create_command(self):
        # Creating an archive prints nothing and the result can be opened.
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        for opt in '-c', '--create':
            try:
                out = self.tarfilecmd(opt, tmpname, *files)
                self.assertEqual(out, b'')
                with tarfile.open(tmpname) as tar:
                    tar.getmembers()
            finally:
                os_helper.unlink(tmpname)

    def test_create_command_verbose(self):
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        for opt in '-v', '--verbose':
            try:
                out = self.tarfilecmd(opt, '-c', tmpname, *files,
                                      PYTHONIOENCODING='utf-8')
                self.assertIn(b' file created.', out)
                with tarfile.open(tmpname) as tar:
                    tar.getmembers()
            finally:
                os_helper.unlink(tmpname)

    def test_create_command_dotless_filename(self):
        # The archive name has no extension at all.
        files = [support.findfile('tokenize_tests.txt')]
        try:
            out = self.tarfilecmd('-c', dotlessname, *files)
            self.assertEqual(out, b'')
            with tarfile.open(dotlessname) as tar:
                tar.getmembers()
        finally:
            os_helper.unlink(dotlessname)

    def test_create_command_dot_started_filename(self):
        # The archive name starts with a dot and has no other extension.
        tar_name = os.path.join(TEMPDIR, ".testtar")
        files = [support.findfile('tokenize_tests.txt')]
        try:
            out = self.tarfilecmd('-c', tar_name, *files)
            self.assertEqual(out, b'')
            with tarfile.open(tar_name) as tar:
                tar.getmembers()
        finally:
            os_helper.unlink(tar_name)

    def test_create_command_compressed(self):
        # An archive created under a name ending in a compression suffix
        # has to be readable with the matching open method.
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        for filetype in (GzipTest, Bz2Test, LzmaTest):
            if not filetype.open:
                # The compression module is not available.
                continue
            # Bind the name before entering "try", so that the cleanup in
            # "finally" can never refer to an unbound variable.
            tar_name = tmpname + '.' + filetype.suffix
            try:
                self.tarfilecmd('-c', tar_name, *files)
                with filetype.taropen(tar_name) as tar:
                    tar.getmembers()
            finally:
                os_helper.unlink(tar_name)

    def test_extract_command(self):
        # Extracting into the current directory prints nothing.
        self.make_simple_tarfile(tmpname)
        for opt in '-e', '--extract':
            try:
                with os_helper.temp_cwd(tarextdir):
                    out = self.tarfilecmd(opt, tmpname)
                self.assertEqual(out, b'')
            finally:
                os_helper.rmtree(tarextdir)

    def test_extract_command_verbose(self):
        self.make_simple_tarfile(tmpname)
        for opt in '-v', '--verbose':
            try:
                with os_helper.temp_cwd(tarextdir):
                    out = self.tarfilecmd(opt, '-e', tmpname,
                                          PYTHONIOENCODING='utf-8')
                self.assertIn(b' file is extracted.', out)
            finally:
                os_helper.rmtree(tarextdir)

    def test_extract_command_filter(self):
        # Extraction of the evil archive has to fail with the 'data'
        # filter and to succeed with the 'fully_trusted' filter.
        self.make_evil_tarfile(tmpname)
        # Make an inner directory, so the member named '../evil'
        # is still extracted into `tarextdir`
        destdir = os.path.join(tarextdir, 'dest')
        os.mkdir(tarextdir)
        try:
            with os_helper.temp_cwd(destdir):
                self.tarfilecmd_failure('-e', tmpname,
                                        '-v',
                                        '--filter', 'data')
                out = self.tarfilecmd('-e', tmpname,
                                      '-v',
                                      '--filter', 'fully_trusted',
                                      PYTHONIOENCODING='utf-8')
                self.assertIn(b' file is extracted.', out)
        finally:
            os_helper.rmtree(tarextdir)

    def test_extract_command_different_directory(self):
        # An output directory ('spamdir') is given after the archive name.
        self.make_simple_tarfile(tmpname)
        try:
            with os_helper.temp_cwd(tarextdir):
                out = self.tarfilecmd('-e', tmpname, 'spamdir')
            self.assertEqual(out, b'')
        finally:
            os_helper.rmtree(tarextdir)

    def test_extract_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        with os_helper.temp_cwd(tarextdir):
            rc, out, err = self.tarfilecmd_failure('-e', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)
2687
2688
class ContextManagerTest(unittest.TestCase):
    # Tests for using TarFile objects in "with" statements.

    def test_basic(self):
        # The archive is open inside the block and closed after it.
        with tarfile.open(tarname) as tar:
            self.assertFalse(tar.closed, "closed inside runtime context")
        self.assertTrue(tar.closed, "context manager failed")

    def test_closed(self):
        # The __enter__() method is supposed to raise OSError
        # if the TarFile object is already closed.
        tar = tarfile.open(tarname)
        tar.close()
        with self.assertRaises(OSError):
            with tar:
                pass

    def test_exception(self):
        # Test if the OSError exception is passed through properly.
        with self.assertRaises(Exception) as exc:
            with tarfile.open(tarname) as tar:
                raise OSError
        self.assertIsInstance(exc.exception, OSError,
                              "wrong exception raised in context manager")
        self.assertTrue(tar.closed, "context manager failed")

    def test_no_eof(self):
        # __exit__() must not write end-of-archive blocks if an
        # exception was raised.
        try:
            with tarfile.open(tmpname, "w") as tar:
                raise Exception
        except Exception:
            # Only the exception raised above is expected here; do not
            # swallow KeyboardInterrupt or SystemExit.
            pass
        self.assertEqual(os.path.getsize(tmpname), 0,
                "context manager wrote an end-of-archive block")
        self.assertTrue(tar.closed, "context manager failed")

    def test_eof(self):
        # __exit__() must write end-of-archive blocks, i.e. call
        # TarFile.close() if there was no error.
        with tarfile.open(tmpname, "w"):
            pass
        self.assertNotEqual(os.path.getsize(tmpname), 0,
                "context manager wrote no end-of-archive block")

    def test_fileobj(self):
        # Test that __exit__() did not close the external file
        # object.
        with open(tmpname, "wb") as fobj:
            try:
                with tarfile.open(fileobj=fobj, mode="w") as tar:
                    raise Exception
            except Exception:
                # Only the exception raised above is expected here; do not
                # swallow KeyboardInterrupt or SystemExit.
                pass
            self.assertFalse(fobj.closed, "external file object was closed")
            self.assertTrue(tar.closed, "context manager failed")
2745
2746
@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing")
class LinkEmulationTest(ReadTest, unittest.TestCase):

    # Regression test for issue #8741: on a platform without symbolic or
    # hard links, tarfile tries to extract such members as the regular
    # files they point to.
    def _test_link_extraction(self, name):
        self.tar.extract(name, TEMPDIR, filter='fully_trusted')
        extracted_path = os.path.join(TEMPDIR, name)
        with open(extracted_path, "rb") as extracted:
            digest = sha256sum(extracted.read())
        self.assertEqual(digest, sha256_regtype)

    # See issues #1578269, #8879, and #17689 for some history on these skips
    _skip_if_islink = unittest.skipIf(
        hasattr(os.path, "islink"),
        "Skip emulation - has os.path.islink but not os.link")
    _skip_if_symlink = unittest.skipIf(
        hasattr(os, "symlink"),
        "Skip emulation if symlink exists")

    @_skip_if_islink
    def test_hardlink_extraction1(self):
        self._test_link_extraction("ustar/lnktype")

    @_skip_if_islink
    def test_hardlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/lnktype")

    @_skip_if_symlink
    def test_symlink_extraction1(self):
        self._test_link_extraction("ustar/symtype")

    @_skip_if_symlink
    def test_symlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/symtype")
2779
2780
class Bz2PartialReadTest(Bz2Test, unittest.TestCase):
    # Issue5068: The _BZ2Proxy.read() method loops forever
    # on an empty or partial bzipped file.

    def _test_partial_input(self, mode):
        # Open every prefix of a small bz2-compressed archive with *mode*
        # and fail if the input is read again after a read() that already
        # started at the end of the data.
        class MyBytesIO(io.BytesIO):
            # Set by read() when it was called at the end of the data,
            # reset by seek().
            hit_eof = False
            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in "
                                         "tarfile.open()")
                self.hit_eof = self.tell() == len(self.getvalue())
                return super().read(n)
            def seek(self, *args):
                self.hit_eof = False
                return super().seek(*args)

        data = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for x in range(len(data) + 1):
            try:
                tar = tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors
            else:
                # Do not leave the successfully opened archive unclosed.
                tar.close()

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
2810
2811
def root_is_uid_gid_0():
    """Return True if uid 0 and gid 0 both exist and are named 'root'.

    Return False if the pwd or grp module cannot be imported, or if the
    user or group database has no entry for id 0.
    """
    try:
        import pwd, grp
    except ImportError:
        return False
    try:
        return (pwd.getpwuid(0)[0] == 'root' and
                grp.getgrgid(0)[0] == 'root')
    except KeyError:
        # getpwuid() and getgrgid() raise KeyError for a missing entry.
        # This function is called from a decorator while the module is
        # being imported, so it must not let that error escape.
        return False
2822
2823
@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown")
@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid")
class NumericOwnerTest(unittest.TestCase):
    # mock the following:
    #  os.chown: so we can test what's being called
    #  os.chmod: so the modes are not actually changed. if they are, we can't
    #             delete the files/directories
    #  os.geteuid: so we can lie and say we're root (uid = 0)

    @staticmethod
    def _make_test_archive(filename_1, dirname_1, filename_2):
        # Write an archive to `tmpname` and return that path.
        # the file contents to write
        content = b"content"

        # create a tar file with a file, a directory, and a file within that
        #  directory. Assign various .uid/.gid values to them
        items = [(filename_1, 99, 98, tarfile.REGTYPE, content),
                 (dirname_1,  77, 76, tarfile.DIRTYPE, None),
                 (filename_2, 88, 87, tarfile.REGTYPE, content),
                 ]
        with tarfile.open(tmpname, 'w') as tarfl:
            for name, uid, gid, typ, contents in items:
                t = tarfile.TarInfo(name)
                t.uid = uid
                t.gid = gid
                t.uname = 'root'
                t.gname = 'root'
                t.type = typ
                if contents is None:
                    fobj = None
                else:
                    # addfile() copies exactly t.size bytes from the file
                    # object, so the size has to be set (it defaults to 0)
                    # and every member needs its own, unread stream.
                    t.size = len(contents)
                    fobj = io.BytesIO(contents)
                tarfl.addfile(t, fobj)

        # return the full pathname to the tar file
        return tmpname

    @staticmethod
    @contextmanager
    def _setup_test(mock_geteuid):
        # Context manager: build the test archive, open it for reading and
        # yield (tarfile, filename_1, dirname_1, filename_2).
        mock_geteuid.return_value = 0  # lie and say we're root
        fname = 'numeric-owner-testfile'
        dirname = 'dir'

        # the names we want stored in the tarfile
        filename_1 = fname
        dirname_1 = dirname
        filename_2 = os.path.join(dirname, fname)

        # create the tarfile with the contents we're after
        tar_filename = NumericOwnerTest._make_test_archive(filename_1,
                                                           dirname_1,
                                                           filename_2)

        # open the tarfile for reading. yield it and the names of the items
        #  we stored into the file
        with tarfile.open(tar_filename) as tarfl:
            yield tarfl, filename_1, dirname_1, filename_2

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                        mock_chown):
        # With numeric_owner=True, chown() has to be called with the
        # uid/gid numbers stored in the archive.
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _,
                                                filename_2):
            tarfl.extract(filename_1, TEMPDIR, numeric_owner=True,
                          filter='fully_trusted')
            tarfl.extract(filename_2 , TEMPDIR, numeric_owner=True,
                          filter='fully_trusted')

        # convert to filesystem paths
        f_filename_1 = os.path.join(TEMPDIR, filename_1)
        f_filename_2 = os.path.join(TEMPDIR, filename_2)

        mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98),
                                     unittest.mock.call(f_filename_2, 88, 87),
                                     ],
                                    any_order=True)

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        # Same check as above for extractall(), including the directory.
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, dirname_1,
                                                filename_2):
            tarfl.extractall(TEMPDIR, numeric_owner=True,
                             filter='fully_trusted')

        # convert to filesystem paths
        f_filename_1 = os.path.join(TEMPDIR, filename_1)
        f_dirname_1  = os.path.join(TEMPDIR, dirname_1)
        f_filename_2 = os.path.join(TEMPDIR, filename_2)

        mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98),
                                     unittest.mock.call(f_dirname_1, 77, 76),
                                     unittest.mock.call(f_filename_2, 88, 87),
                                     ],
                                    any_order=True)

    # this test requires that uid=0 and gid=0 really be named 'root'. that's
    #  because the uname and gname in the test file are 'root', and extract()
    #  will look them up using pwd and grp to find their uid and gid, which we
    #  test here to be 0.
    @unittest.skipUnless(root_is_uid_gid_0(),
                         'uid=0,gid=0 must be named "root"')
    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _):
            tarfl.extract(filename_1, TEMPDIR, numeric_owner=False,
                          filter='fully_trusted')

        # convert to filesystem paths
        f_filename_1 = os.path.join(TEMPDIR, filename_1)

        mock_chown.assert_called_with(f_filename_1, 0, 0)

    @unittest.mock.patch('os.geteuid')
    def test_keyword_only(self, mock_geteuid):
        # Passing more than two positional arguments to extract() has to
        # raise TypeError.
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _):
            self.assertRaises(TypeError,
                              tarfl.extract, filename_1, TEMPDIR, False, True)
2946
2947
class ReplaceTests(ReadTest, unittest.TestCase):
    # Tests for TarInfo.replace().

    def test_replace_name(self):
        # The copy carries the new name; the member it was made from and
        # the archive's own entry keep the old one.
        original = self.tar.getmember('ustar/regtype')
        copy = original.replace(name='misc/other')
        self.assertEqual(copy.name, 'misc/other')
        self.assertEqual(original.name, 'ustar/regtype')
        in_archive = self.tar.getmember('ustar/regtype')
        self.assertEqual(in_archive.name, 'ustar/regtype')

    def test_replace_deep(self):
        # By default, changing the copy's pax_headers must not show up in
        # the original member.
        original = self.tar.getmember('pax/regtype1')
        copy = original.replace()
        copy.pax_headers['gname'] = 'not-bar'
        self.assertEqual(original.pax_headers['gname'], 'bar')
        in_archive = self.tar.getmember('pax/regtype1')
        self.assertEqual(in_archive.pax_headers['gname'], 'bar')

    def test_replace_shallow(self):
        # With deep=False, a change made through the copy's pax_headers is
        # visible through the original member as well.
        original = self.tar.getmember('pax/regtype1')
        copy = original.replace(deep=False)
        copy.pax_headers['gname'] = 'not-bar'
        self.assertEqual(original.pax_headers['gname'], 'not-bar')
        in_archive = self.tar.getmember('pax/regtype1')
        self.assertEqual(in_archive.pax_headers['gname'], 'not-bar')

    def test_replace_all(self):
        # Each of these attributes can be replaced by None without
        # affecting the original member.
        replaceable = ('name', 'mtime', 'mode', 'linkname',
                       'uid', 'gid', 'uname', 'gname')
        original = self.tar.getmember('ustar/regtype')
        for attr_name in replaceable:
            with self.subTest(attr_name=attr_name):
                copy = original.replace(**{attr_name: None})
                self.assertIsNone(getattr(copy, attr_name))
                self.assertIsNotNone(getattr(original, attr_name))

    def test_replace_internal(self):
        # 'offset' is not an accepted keyword of replace().
        original = self.tar.getmember('ustar/regtype')
        with self.assertRaises(TypeError):
            original.replace(offset=123456789)
2986
2987
class NoneInfoExtractTests(ReadTest):
    # These mainly check that all kinds of members are extracted successfully
    # if some metadata is None.
    # Some of the methods do additional spot checks.

    # We also test that the default filters can deal with None.

    # Filter used to extract the control tree in setUpClass().
    extraction_filter = None

    @classmethod
    def setUpClass(cls):
        # Extract the unmodified archive once and remember the relative
        # paths it produces as the reference for check_files_present().
        # The "with" block closes the archive even if extraction fails.
        with tarfile.open(tarname, mode='r', encoding="iso8859-1") as tar:
            cls.control_dir = pathlib.Path(TEMPDIR) / "extractall_ctrl"
            tar.errorlevel = 0
            tar.extractall(cls.control_dir, filter=cls.extraction_filter)
        cls.control_paths = set(
            p.relative_to(cls.control_dir)
            for p in pathlib.Path(cls.control_dir).glob('**/*'))

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(cls.control_dir)

    def check_files_present(self, directory):
        # The tree below *directory* has to contain exactly the same
        # relative paths as the control tree.
        got_paths = set(
            p.relative_to(directory)
            for p in pathlib.Path(directory).glob('**/*'))
        self.assertEqual(self.control_paths, got_paths)

    @contextmanager
    def extract_with_none(self, *attr_names):
        # Set the given attributes to None on every member of self.tar,
        # extract everything into a temporary directory, compare the
        # result with the control tree and yield the directory.
        # NOTE(review): this always extracts with 'fully_trusted', while
        # the control tree uses cls.extraction_filter -- confirm that this
        # is intended.
        DIR = pathlib.Path(TEMPDIR) / "extractall_none"
        self.tar.errorlevel = 0
        for member in self.tar.getmembers():
            for attr_name in attr_names:
                setattr(member, attr_name, None)
        with os_helper.temp_dir(DIR):
            self.tar.extractall(DIR, filter='fully_trusted')
            self.check_files_present(DIR)
            yield DIR

    def test_extractall_none_mtime(self):
        # mtimes of extracted files should not be earlier than 'now' -- the
        # mtime of a previously created directory.
        now = pathlib.Path(TEMPDIR).stat().st_mtime
        with self.extract_with_none('mtime') as DIR:
            for path in pathlib.Path(DIR).glob('**/*'):
                with self.subTest(path=path):
                    try:
                        mtime = path.stat().st_mtime
                    except OSError:
                        # Some systems can't stat symlinks, ignore those
                        if not path.is_symlink():
                            raise
                    else:
                        # Use the value read above instead of calling
                        # stat() a second time.
                        self.assertGreaterEqual(mtime, now)

    def test_extractall_none_mode(self):
        # modes of directories and regular files should match the mode
        # of a "normally" created directory or regular file
        dir_mode = pathlib.Path(TEMPDIR).stat().st_mode
        regular_file = pathlib.Path(TEMPDIR) / 'regular_file'
        regular_file.write_text('')
        regular_file_mode = regular_file.stat().st_mode
        with self.extract_with_none('mode') as DIR:
            for path in pathlib.Path(DIR).glob('**/*'):
                with self.subTest(path=path):
                    if path.is_dir():
                        self.assertEqual(path.stat().st_mode, dir_mode)
                    elif path.is_file():
                        self.assertEqual(path.stat().st_mode,
                                         regular_file_mode)

    # The remaining tests only check that extraction succeeds and yields
    # the expected set of files.

    def test_extractall_none_uid(self):
        with self.extract_with_none('uid'):
            pass

    def test_extractall_none_gid(self):
        with self.extract_with_none('gid'):
            pass

    def test_extractall_none_uname(self):
        with self.extract_with_none('uname'):
            pass

    def test_extractall_none_gname(self):
        with self.extract_with_none('gname'):
            pass

    def test_extractall_none_ownership(self):
        with self.extract_with_none('uid', 'gid', 'uname', 'gname'):
            pass
3081
class NoneInfoExtractTests_Data(NoneInfoExtractTests, unittest.TestCase):
    # Runs the tests inherited from NoneInfoExtractTests with
    # `extraction_filter` set to the 'data' filter name.
    # (How the attribute is consumed is defined in the base class.)
    extraction_filter = 'data'
3084
class NoneInfoExtractTests_FullyTrusted(NoneInfoExtractTests,
                                        unittest.TestCase):
    # Runs the tests inherited from NoneInfoExtractTests with
    # `extraction_filter` set to the 'fully_trusted' filter name.
    # (How the attribute is consumed is defined in the base class.)
    extraction_filter = 'fully_trusted'
3088
class NoneInfoExtractTests_Tar(NoneInfoExtractTests, unittest.TestCase):
    # Runs the tests inherited from NoneInfoExtractTests with
    # `extraction_filter` set to the 'tar' filter name.
    # (How the attribute is consumed is defined in the base class.)
    extraction_filter = 'tar'
3091
class NoneInfoExtractTests_Default(NoneInfoExtractTests,
                                   unittest.TestCase):
    # Runs the tests inherited from NoneInfoExtractTests with
    # `extraction_filter` set to None, i.e. no filter name is given.
    # NOTE(review): presumably this exercises tarfile's default filter --
    # confirm against the base class, which is where the attribute is used.
    extraction_filter = None
3095
class NoneInfoTests_Misc(unittest.TestCase):
    # Checks how TarFile.addfile() and TarFile.list() deal with TarInfo
    # objects whose metadata attributes have been set to None.

    def test_add(self):
        # When addfile() encounters None metadata, it raises a ValueError
        bio = io.BytesIO()
        for tarformat in (tarfile.USTAR_FORMAT, tarfile.GNU_FORMAT,
                          tarfile.PAX_FORMAT):
            # The archive is opened as a context manager so that the
            # TarFile is always closed, also when a check below fails.
            with (self.subTest(tarformat=tarformat),
                  tarfile.open(fileobj=bio, mode='w',
                               format=tarformat) as tar):
                tarinfo = tar.gettarinfo(tarname)
                try:
                    # Sanity check: the unmodified TarInfo can be added.
                    tar.addfile(tarinfo)
                except Exception:
                    if tarformat == tarfile.USTAR_FORMAT:
                        # In the old, limited format, adding might fail for
                        # reasons like the UID being too large
                        pass
                    else:
                        raise
                else:
                    for attr_name in ('mtime', 'mode', 'uid', 'gid',
                                    'uname', 'gname'):
                        with self.subTest(attr_name=attr_name):
                            replaced = tarinfo.replace(**{attr_name: None})
                            # The error message must name the attribute.
                            with self.assertRaisesRegex(ValueError,
                                                        f"{attr_name}"):
                                tar.addfile(replaced)

    def test_list(self):
        # Change some metadata to None, then compare list() output
        # word-for-word. We want list() to not raise, and to only change
        # printout for the affected piece of metadata.
        # (n.b.: some contents of the test archive are hardcoded.)
        for attr_names in ({'mtime'}, {'mode'}, {'uid'}, {'gid'},
                           {'uname'}, {'gname'},
                           {'uid', 'uname'}, {'gid', 'gname'}):
            with (self.subTest(attr_names=attr_names),
                  tarfile.open(tarname, encoding="iso8859-1") as tar):
                # Reference listing, captured before touching any metadata.
                tio_prev = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
                with support.swap_attr(sys, 'stdout', tio_prev):
                    tar.list()
                for member in tar.getmembers():
                    for attr_name in attr_names:
                        setattr(member, attr_name, None)
                # Listing of the same archive with the attributes blanked.
                tio_new = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
                with support.swap_attr(sys, 'stdout', tio_new):
                    tar.list()
                # Compare both listings one whitespace-separated word at a
                # time; the words are bytes objects.
                for expected, got in zip(tio_prev.detach().getvalue().split(),
                                         tio_new.detach().getvalue().split()):
                    if attr_names == {'mtime'} and re.match(rb'2003-01-\d\d', expected):
                        self.assertEqual(got, b'????-??-??')
                    elif attr_names == {'mtime'} and re.match(rb'\d\d:\d\d:\d\d', expected):
                        self.assertEqual(got, b'??:??:??')
                    elif attr_names == {'mode'} and re.match(
                            rb'.([r-][w-][x-]){3}', expected):
                        self.assertEqual(got, b'??????????')
                    elif attr_names == {'uname'} and expected.startswith(
                            (b'tarfile/', b'lars/', b'foo/')):
                        # Without a user name, a number is printed instead.
                        exp_user, exp_group = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_group, exp_group)
                        self.assertRegex(got_user, b'[0-9]+')
                    elif attr_names == {'gname'} and expected.endswith(
                            (b'/tarfile', b'/users', b'/bar')):
                        # Without a group name, a number is printed instead.
                        exp_user, exp_group = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_user, exp_user)
                        self.assertRegex(got_group, b'[0-9]+')
                    elif attr_names == {'uid'} and expected.startswith(b'1000/'):
                        exp_user, exp_group = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_group, exp_group)
                        self.assertEqual(got_user, b'None')
                    elif attr_names == {'gid'} and expected.endswith(b'/100'):
                        exp_user, exp_group = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_user, exp_user)
                        self.assertEqual(got_group, b'None')
                    elif attr_names == {'uid', 'uname'} and expected.startswith(
                            (b'tarfile/', b'lars/', b'foo/', b'1000/')):
                        exp_user, exp_group = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_group, exp_group)
                        self.assertEqual(got_user, b'None')
                    elif attr_names == {'gname', 'gid'} and expected.endswith(
                            (b'/tarfile', b'/users', b'/bar', b'/100')):
                        exp_user, exp_group = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_user, exp_user)
                        self.assertEqual(got_group, b'None')
                    else:
                        # In other cases the output should be the same
                        self.assertEqual(expected, got)
3189
3190def _filemode_to_int(mode):
3191    """Inverse of `stat.filemode` (for permission bits)
3192
3193    Using mode strings rather than numbers makes the later tests more readable.
3194    """
3195    str_mode = mode[1:]
3196    result = (
3197          {'r': stat.S_IRUSR, '-': 0}[str_mode[0]]
3198        | {'w': stat.S_IWUSR, '-': 0}[str_mode[1]]
3199        | {'x': stat.S_IXUSR, '-': 0,
3200           's': stat.S_IXUSR | stat.S_ISUID,
3201           'S': stat.S_ISUID}[str_mode[2]]
3202        | {'r': stat.S_IRGRP, '-': 0}[str_mode[3]]
3203        | {'w': stat.S_IWGRP, '-': 0}[str_mode[4]]
3204        | {'x': stat.S_IXGRP, '-': 0,
3205           's': stat.S_IXGRP | stat.S_ISGID,
3206           'S': stat.S_ISGID}[str_mode[5]]
3207        | {'r': stat.S_IROTH, '-': 0}[str_mode[6]]
3208        | {'w': stat.S_IWOTH, '-': 0}[str_mode[7]]
3209        | {'x': stat.S_IXOTH, '-': 0,
3210           't': stat.S_IXOTH | stat.S_ISVTX,
3211           'T': stat.S_ISVTX}[str_mode[8]]
3212        )
3213    # check we did this right
3214    assert stat.filemode(result)[1:] == mode[1:]
3215
3216    return result
3217
3218class ArchiveMaker:
3219    """Helper to create a tar file with specific contents
3220
3221    Usage:
3222
3223        with ArchiveMaker() as t:
3224            t.add('filename', ...)
3225
3226        with t.open() as tar:
3227            ... # `tar` is now a TarFile with 'filename' in it!
3228    """
3229    def __init__(self):
3230        self.bio = io.BytesIO()
3231
3232    def __enter__(self):
3233        self.tar_w = tarfile.TarFile(mode='w', fileobj=self.bio)
3234        return self
3235
3236    def __exit__(self, *exc):
3237        self.tar_w.close()
3238        self.contents = self.bio.getvalue()
3239        self.bio = None
3240
3241    def add(self, name, *, type=None, symlink_to=None, hardlink_to=None,
3242            mode=None, **kwargs):
3243        """Add a member to the test archive. Call within `with`."""
3244        name = str(name)
3245        tarinfo = tarfile.TarInfo(name).replace(**kwargs)
3246        if mode:
3247            tarinfo.mode = _filemode_to_int(mode)
3248        if symlink_to is not None:
3249            type = tarfile.SYMTYPE
3250            tarinfo.linkname = str(symlink_to)
3251        if hardlink_to is not None:
3252            type = tarfile.LNKTYPE
3253            tarinfo.linkname = str(hardlink_to)
3254        if name.endswith('/') and type is None:
3255            type = tarfile.DIRTYPE
3256        if type is not None:
3257            tarinfo.type = type
3258        if tarinfo.isreg():
3259            fileobj = io.BytesIO(bytes(tarinfo.size))
3260        else:
3261            fileobj = None
3262        self.tar_w.addfile(tarinfo, fileobj)
3263
3264    def open(self, **kwargs):
3265        """Open the resulting archive as TarFile. Call after `with`."""
3266        bio = io.BytesIO(self.contents)
3267        return tarfile.open(fileobj=bio, **kwargs)
3268
# Under WASI, `os_helper.can_symlink` is False to make
# `skip_unless_symlink` skip symlink tests.
# But in the following tests we use can_symlink to *determine* which
# behavior is expected.
# Like other symlink tests, skip these on WASI for now.
def symlink_test(f):
    """Decorator: skip the test on WASI, return it unchanged elsewhere."""
    if support.is_wasi:
        return unittest.skip("WASI: Skip symlink test for now")(f)
    return f
3280
3281
3282class TestExtractionFilters(unittest.TestCase):
3283
3284    # A temporary directory for the extraction results.
3285    # All files that "escape" the destination path should still end
3286    # up in this directory.
3287    outerdir = pathlib.Path(TEMPDIR) / 'outerdir'
3288
3289    # The destination for the extraction, within `outerdir`
3290    destdir = outerdir / 'dest'
3291
    @contextmanager
    def check_context(self, tar, filter):
        """Extracts `tar` to `self.destdir` and allows checking the result

        If an error occurs, it must be checked using `expect_exception`

        Otherwise, all resulting files must be checked using `expect_file`,
        except the destination directory itself and parent directories of
        other files.
        When checking directories, do so before their contents.
        """
        with os_helper.temp_dir(self.outerdir):
            try:
                tar.extractall(self.destdir, filter=filter)
            except Exception as exc:
                # Defer the failure: stash it so the `with` body can claim
                # it via expect_exception().  No files are expected then.
                self.raised_exception = exc
                self.expected_paths = set()
            else:
                self.raised_exception = None
                # Collect everything below outerdir (not just destdir), so
                # files that "escaped" the destination are tracked as well.
                self.expected_paths = set(self.outerdir.glob('**/*'))
                self.expected_paths.discard(self.destdir)
            try:
                # Run the caller's expect_* checks.
                yield
            finally:
                # Close the archive even if the checks raised.
                tar.close()
            # The code below only runs if the `with` body did not raise.
            if self.raised_exception:
                # The extraction error was never claimed: re-raise it.
                raise self.raised_exception
            # Every extracted path must have been accounted for.
            self.assertEqual(self.expected_paths, set())
3320
3321    def expect_file(self, name, type=None, symlink_to=None, mode=None):
3322        """Check a single file. See check_context."""
3323        if self.raised_exception:
3324            raise self.raised_exception
3325        # use normpath() rather than resolve() so we don't follow symlinks
3326        path = pathlib.Path(os.path.normpath(self.destdir / name))
3327        self.assertIn(path, self.expected_paths)
3328        self.expected_paths.remove(path)
3329        if mode is not None and os_helper.can_chmod():
3330            got = stat.filemode(stat.S_IMODE(path.stat().st_mode))
3331            self.assertEqual(got, mode)
3332        if type is None and isinstance(name, str) and name.endswith('/'):
3333            type = tarfile.DIRTYPE
3334        if symlink_to is not None:
3335            got = (self.destdir / name).readlink()
3336            expected = pathlib.Path(symlink_to)
3337            # The symlink might be the same (textually) as what we expect,
3338            # but some systems change the link to an equivalent path, so
3339            # we fall back to samefile().
3340            if expected != got:
3341                self.assertTrue(got.samefile(expected))
3342        elif type == tarfile.REGTYPE or type is None:
3343            self.assertTrue(path.is_file())
3344        elif type == tarfile.DIRTYPE:
3345            self.assertTrue(path.is_dir())
3346        elif type == tarfile.FIFOTYPE:
3347            self.assertTrue(path.is_fifo())
3348        else:
3349            raise NotImplementedError(type)
3350        for parent in path.parents:
3351            self.expected_paths.discard(parent)
3352
3353    def expect_exception(self, exc_type, message_re='.'):
3354        with self.assertRaisesRegex(exc_type, message_re):
3355            if self.raised_exception is not None:
3356                raise self.raised_exception
3357        self.raised_exception = None
3358
3359    def test_benign_file(self):
3360        with ArchiveMaker() as arc:
3361            arc.add('benign.txt')
3362        for filter in 'fully_trusted', 'tar', 'data':
3363            with self.check_context(arc.open(), filter):
3364                self.expect_file('benign.txt')
3365
3366    def test_absolute(self):
3367        # Test handling a member with an absolute path
3368        # Inspired by 'absolute1' in https://github.com/jwilk/traversal-archives
3369        with ArchiveMaker() as arc:
3370            arc.add(self.outerdir / 'escaped.evil')
3371
3372        with self.check_context(arc.open(), 'fully_trusted'):
3373            self.expect_file('../escaped.evil')
3374
3375        for filter in 'tar', 'data':
3376            with self.check_context(arc.open(), filter):
3377                if str(self.outerdir).startswith('/'):
3378                    # We strip leading slashes, as e.g. GNU tar does
3379                    # (without --absolute-filenames).
3380                    outerdir_stripped = str(self.outerdir).lstrip('/')
3381                    self.expect_file(f'{outerdir_stripped}/escaped.evil')
3382                else:
3383                    # On this system, absolute paths don't have leading
3384                    # slashes.
3385                    # So, there's nothing to strip. We refuse to unpack
3386                    # to an absolute path, nonetheless.
3387                    self.expect_exception(
3388                        tarfile.AbsolutePathError,
3389                        """['"].*escaped.evil['"] has an absolute path""")
3390
3391    @symlink_test
3392    def test_parent_symlink(self):
3393        # Test interplaying symlinks
3394        # Inspired by 'dirsymlink2a' in jwilk/traversal-archives
3395        with ArchiveMaker() as arc:
3396            arc.add('current', symlink_to='.')
3397            arc.add('parent', symlink_to='current/..')
3398            arc.add('parent/evil')
3399
3400        if os_helper.can_symlink():
3401            with self.check_context(arc.open(), 'fully_trusted'):
3402                if self.raised_exception is not None:
3403                    # Windows will refuse to create a file that's a symlink to itself
3404                    # (and tarfile doesn't swallow that exception)
3405                    self.expect_exception(FileExistsError)
3406                    # The other cases will fail with this error too.
3407                    # Skip the rest of this test.
3408                    return
3409                else:
3410                    self.expect_file('current', symlink_to='.')
3411                    self.expect_file('parent', symlink_to='current/..')
3412                    self.expect_file('../evil')
3413
3414            with self.check_context(arc.open(), 'tar'):
3415                self.expect_exception(
3416                    tarfile.OutsideDestinationError,
3417                    """'parent/evil' would be extracted to ['"].*evil['"], """
3418                    + "which is outside the destination")
3419
3420            with self.check_context(arc.open(), 'data'):
3421                self.expect_exception(
3422                    tarfile.LinkOutsideDestinationError,
3423                    """'parent' would link to ['"].*outerdir['"], """
3424                    + "which is outside the destination")
3425
3426        else:
3427            # No symlink support. The symlinks are ignored.
3428            with self.check_context(arc.open(), 'fully_trusted'):
3429                self.expect_file('parent/evil')
3430            with self.check_context(arc.open(), 'tar'):
3431                self.expect_file('parent/evil')
3432            with self.check_context(arc.open(), 'data'):
3433                self.expect_file('parent/evil')
3434
3435    @symlink_test
3436    def test_parent_symlink2(self):
3437        # Test interplaying symlinks
3438        # Inspired by 'dirsymlink2b' in jwilk/traversal-archives
3439        with ArchiveMaker() as arc:
3440            arc.add('current', symlink_to='.')
3441            arc.add('current/parent', symlink_to='..')
3442            arc.add('parent/evil')
3443
3444        with self.check_context(arc.open(), 'fully_trusted'):
3445            if os_helper.can_symlink():
3446                self.expect_file('current', symlink_to='.')
3447                self.expect_file('parent', symlink_to='..')
3448                self.expect_file('../evil')
3449            else:
3450                self.expect_file('current/')
3451                self.expect_file('parent/evil')
3452
3453        with self.check_context(arc.open(), 'tar'):
3454            if os_helper.can_symlink():
3455                self.expect_exception(
3456                        tarfile.OutsideDestinationError,
3457                        "'parent/evil' would be extracted to "
3458                        + """['"].*evil['"], which is outside """
3459                        + "the destination")
3460            else:
3461                self.expect_file('current/')
3462                self.expect_file('parent/evil')
3463
3464        with self.check_context(arc.open(), 'data'):
3465            self.expect_exception(
3466                    tarfile.LinkOutsideDestinationError,
3467                    """'current/parent' would link to ['"].*['"], """
3468                    + "which is outside the destination")
3469
3470    @symlink_test
3471    def test_absolute_symlink(self):
3472        # Test symlink to an absolute path
3473        # Inspired by 'dirsymlink' in jwilk/traversal-archives
3474        with ArchiveMaker() as arc:
3475            arc.add('parent', symlink_to=self.outerdir)
3476            arc.add('parent/evil')
3477
3478        with self.check_context(arc.open(), 'fully_trusted'):
3479            if os_helper.can_symlink():
3480                self.expect_file('parent', symlink_to=self.outerdir)
3481                self.expect_file('../evil')
3482            else:
3483                self.expect_file('parent/evil')
3484
3485        with self.check_context(arc.open(), 'tar'):
3486            if os_helper.can_symlink():
3487                self.expect_exception(
3488                        tarfile.OutsideDestinationError,
3489                        "'parent/evil' would be extracted to "
3490                        + """['"].*evil['"], which is outside """
3491                        + "the destination")
3492            else:
3493                self.expect_file('parent/evil')
3494
3495        with self.check_context(arc.open(), 'data'):
3496            self.expect_exception(
3497                tarfile.AbsoluteLinkError,
3498                "'parent' is a symlink to an absolute path")
3499
3500    @symlink_test
3501    def test_sly_relative0(self):
3502        # Inspired by 'relative0' in jwilk/traversal-archives
3503        with ArchiveMaker() as arc:
3504            arc.add('../moo', symlink_to='..//tmp/moo')
3505
3506        try:
3507            with self.check_context(arc.open(), filter='fully_trusted'):
3508                if os_helper.can_symlink():
3509                    if isinstance(self.raised_exception, FileExistsError):
3510                        # XXX TarFile happens to fail creating a parent
3511                        # directory.
3512                        # This might be a bug, but fixing it would hurt
3513                        # security.
3514                        # Note that e.g. GNU `tar` rejects '..' components,
3515                        # so you could argue this is an invalid archive and we
3516                        # just raise an bad type of exception.
3517                        self.expect_exception(FileExistsError)
3518                    else:
3519                        self.expect_file('../moo', symlink_to='..//tmp/moo')
3520                else:
3521                    # The symlink can't be extracted and is ignored
3522                    pass
3523        except FileExistsError:
3524            pass
3525
3526        for filter in 'tar', 'data':
3527            with self.check_context(arc.open(), filter):
3528                self.expect_exception(
3529                        tarfile.OutsideDestinationError,
3530                        "'../moo' would be extracted to "
3531                        + "'.*moo', which is outside "
3532                        + "the destination")
3533
3534    @symlink_test
3535    def test_sly_relative2(self):
3536        # Inspired by 'relative2' in jwilk/traversal-archives
3537        with ArchiveMaker() as arc:
3538            arc.add('tmp/')
3539            arc.add('tmp/../../moo', symlink_to='tmp/../..//tmp/moo')
3540
3541        with self.check_context(arc.open(), 'fully_trusted'):
3542            self.expect_file('tmp', type=tarfile.DIRTYPE)
3543            if os_helper.can_symlink():
3544                self.expect_file('../moo', symlink_to='tmp/../../tmp/moo')
3545
3546        for filter in 'tar', 'data':
3547            with self.check_context(arc.open(), filter):
3548                self.expect_exception(
3549                    tarfile.OutsideDestinationError,
3550                    "'tmp/../../moo' would be extracted to "
3551                    + """['"].*moo['"], which is outside the """
3552                    + "destination")
3553
3554    def test_modes(self):
3555        # Test how file modes are extracted
3556        # (Note that the modes are ignored on platforms without working chmod)
3557        with ArchiveMaker() as arc:
3558            arc.add('all_bits', mode='?rwsrwsrwt')
3559            arc.add('perm_bits', mode='?rwxrwxrwx')
3560            arc.add('exec_group_other', mode='?rw-rwxrwx')
3561            arc.add('read_group_only', mode='?---r-----')
3562            arc.add('no_bits', mode='?---------')
3563            arc.add('dir/', mode='?---rwsrwt')
3564
3565        # On some systems, setting the sticky bit is a no-op.
3566        # Check if that's the case.
3567        tmp_filename = os.path.join(TEMPDIR, "tmp.file")
3568        with open(tmp_filename, 'w'):
3569            pass
3570        os.chmod(tmp_filename, os.stat(tmp_filename).st_mode | stat.S_ISVTX)
3571        have_sticky_files = (os.stat(tmp_filename).st_mode & stat.S_ISVTX)
3572        os.unlink(tmp_filename)
3573
3574        os.mkdir(tmp_filename)
3575        os.chmod(tmp_filename, os.stat(tmp_filename).st_mode | stat.S_ISVTX)
3576        have_sticky_dirs = (os.stat(tmp_filename).st_mode & stat.S_ISVTX)
3577        os.rmdir(tmp_filename)
3578
3579        with self.check_context(arc.open(), 'fully_trusted'):
3580            if have_sticky_files:
3581                self.expect_file('all_bits', mode='?rwsrwsrwt')
3582            else:
3583                self.expect_file('all_bits', mode='?rwsrwsrwx')
3584            self.expect_file('perm_bits', mode='?rwxrwxrwx')
3585            self.expect_file('exec_group_other', mode='?rw-rwxrwx')
3586            self.expect_file('read_group_only', mode='?---r-----')
3587            self.expect_file('no_bits', mode='?---------')
3588            if have_sticky_dirs:
3589                self.expect_file('dir/', mode='?---rwsrwt')
3590            else:
3591                self.expect_file('dir/', mode='?---rwsrwx')
3592
3593        with self.check_context(arc.open(), 'tar'):
3594            self.expect_file('all_bits', mode='?rwxr-xr-x')
3595            self.expect_file('perm_bits', mode='?rwxr-xr-x')
3596            self.expect_file('exec_group_other', mode='?rw-r-xr-x')
3597            self.expect_file('read_group_only', mode='?---r-----')
3598            self.expect_file('no_bits', mode='?---------')
3599            self.expect_file('dir/', mode='?---r-xr-x')
3600
3601        with self.check_context(arc.open(), 'data'):
3602            normal_dir_mode = stat.filemode(stat.S_IMODE(
3603                self.outerdir.stat().st_mode))
3604            self.expect_file('all_bits', mode='?rwxr-xr-x')
3605            self.expect_file('perm_bits', mode='?rwxr-xr-x')
3606            self.expect_file('exec_group_other', mode='?rw-r--r--')
3607            self.expect_file('read_group_only', mode='?rw-r-----')
3608            self.expect_file('no_bits', mode='?rw-------')
3609            self.expect_file('dir/', mode=normal_dir_mode)
3610
3611    def test_pipe(self):
3612        # Test handling of a special file
3613        with ArchiveMaker() as arc:
3614            arc.add('foo', type=tarfile.FIFOTYPE)
3615
3616        for filter in 'fully_trusted', 'tar':
3617            with self.check_context(arc.open(), filter):
3618                if hasattr(os, 'mkfifo'):
3619                    self.expect_file('foo', type=tarfile.FIFOTYPE)
3620                else:
3621                    # The pipe can't be extracted and is skipped.
3622                    pass
3623
3624        with self.check_context(arc.open(), 'data'):
3625            self.expect_exception(
3626                tarfile.SpecialFileError,
3627                "'foo' is a special file")
3628
3629    def test_special_files(self):
3630        # Creating device files is tricky. Instead of attempting that let's
3631        # only check the filter result.
3632        for special_type in tarfile.FIFOTYPE, tarfile.CHRTYPE, tarfile.BLKTYPE:
3633            tarinfo = tarfile.TarInfo('foo')
3634            tarinfo.type = special_type
3635            trusted = tarfile.fully_trusted_filter(tarinfo, '')
3636            self.assertIs(trusted, tarinfo)
3637            tar = tarfile.tar_filter(tarinfo, '')
3638            self.assertEqual(tar.type, special_type)
3639            with self.assertRaises(tarfile.SpecialFileError) as cm:
3640                tarfile.data_filter(tarinfo, '')
3641            self.assertIsInstance(cm.exception.tarinfo, tarfile.TarInfo)
3642            self.assertEqual(cm.exception.tarinfo.name, 'foo')
3643
3644    def test_fully_trusted_filter(self):
3645        # The 'fully_trusted' filter returns the original TarInfo objects.
3646        with tarfile.TarFile.open(tarname) as tar:
3647            for tarinfo in tar.getmembers():
3648                filtered = tarfile.fully_trusted_filter(tarinfo, '')
3649                self.assertIs(filtered, tarinfo)
3650
3651    def test_tar_filter(self):
3652        # The 'tar' filter returns TarInfo objects with the same name/type.
3653        # (It can also fail for particularly "evil" input, but we don't have
3654        # that in the test archive.)
3655        with tarfile.TarFile.open(tarname) as tar:
3656            for tarinfo in tar.getmembers():
3657                filtered = tarfile.tar_filter(tarinfo, '')
3658                self.assertIs(filtered.name, tarinfo.name)
3659                self.assertIs(filtered.type, tarinfo.type)
3660
3661    def test_data_filter(self):
3662        # The 'data' filter either raises, or returns TarInfo with the same
3663        # name/type.
3664        with tarfile.TarFile.open(tarname) as tar:
3665            for tarinfo in tar.getmembers():
3666                try:
3667                    filtered = tarfile.data_filter(tarinfo, '')
3668                except tarfile.FilterError:
3669                    continue
3670                self.assertIs(filtered.name, tarinfo.name)
3671                self.assertIs(filtered.type, tarinfo.type)
3672
3673    def test_default_filter_warns_not(self):
3674        """Ensure the default filter does not warn (like in 3.12)"""
3675        with ArchiveMaker() as arc:
3676            arc.add('foo')
3677        with warnings_helper.check_no_warnings(self):
3678            with self.check_context(arc.open(), None):
3679                self.expect_file('foo')
3680
3681    def test_change_default_filter_on_instance(self):
3682        tar = tarfile.TarFile(tarname, 'r')
3683        def strict_filter(tarinfo, path):
3684            if tarinfo.name == 'ustar/regtype':
3685                return tarinfo
3686            else:
3687                return None
3688        tar.extraction_filter = strict_filter
3689        with self.check_context(tar, None):
3690            self.expect_file('ustar/regtype')
3691
3692    def test_change_default_filter_on_class(self):
3693        def strict_filter(tarinfo, path):
3694            if tarinfo.name == 'ustar/regtype':
3695                return tarinfo
3696            else:
3697                return None
3698        tar = tarfile.TarFile(tarname, 'r')
3699        with support.swap_attr(tarfile.TarFile, 'extraction_filter',
3700                               staticmethod(strict_filter)):
3701            with self.check_context(tar, None):
3702                self.expect_file('ustar/regtype')
3703
3704    def test_change_default_filter_on_subclass(self):
3705        class TarSubclass(tarfile.TarFile):
3706            def extraction_filter(self, tarinfo, path):
3707                if tarinfo.name == 'ustar/regtype':
3708                    return tarinfo
3709                else:
3710                    return None
3711
3712        tar = TarSubclass(tarname, 'r')
3713        with self.check_context(tar, None):
3714            self.expect_file('ustar/regtype')
3715
3716    def test_change_default_filter_to_string(self):
3717        tar = tarfile.TarFile(tarname, 'r')
3718        tar.extraction_filter = 'data'
3719        with self.check_context(tar, None):
3720            self.expect_exception(TypeError)
3721
3722    def test_custom_filter(self):
3723        def custom_filter(tarinfo, path):
3724            self.assertIs(path, self.destdir)
3725            if tarinfo.name == 'move_this':
3726                return tarinfo.replace(name='moved')
3727            if tarinfo.name == 'ignore_this':
3728                return None
3729            return tarinfo
3730
3731        with ArchiveMaker() as arc:
3732            arc.add('move_this')
3733            arc.add('ignore_this')
3734            arc.add('keep')
3735        with self.check_context(arc.open(), custom_filter):
3736            self.expect_file('moved')
3737            self.expect_file('keep')
3738
3739    def test_bad_filter_name(self):
3740        with ArchiveMaker() as arc:
3741            arc.add('foo')
3742        with self.check_context(arc.open(), 'bad filter name'):
3743            self.expect_exception(ValueError)
3744
3745    def test_stateful_filter(self):
3746        # Stateful filters should be possible.
3747        # (This doesn't really test tarfile. Rather, it demonstrates
3748        # that third parties can implement a stateful filter.)
3749        class StatefulFilter:
3750            def __enter__(self):
3751                self.num_files_processed = 0
3752                return self
3753
3754            def __call__(self, tarinfo, path):
3755                try:
3756                    tarinfo = tarfile.data_filter(tarinfo, path)
3757                except tarfile.FilterError:
3758                    return None
3759                self.num_files_processed += 1
3760                return tarinfo
3761
3762            def __exit__(self, *exc_info):
3763                self.done = True
3764
3765        with ArchiveMaker() as arc:
3766            arc.add('good')
3767            arc.add('bad', symlink_to='/')
3768            arc.add('good')
3769        with StatefulFilter() as custom_filter:
3770            with self.check_context(arc.open(), custom_filter):
3771                self.expect_file('good')
3772        self.assertEqual(custom_filter.num_files_processed, 2)
3773        self.assertEqual(custom_filter.done, True)
3774
3775    def test_errorlevel(self):
3776        def extracterror_filter(tarinfo, path):
3777            raise tarfile.ExtractError('failed with ExtractError')
3778        def filtererror_filter(tarinfo, path):
3779            raise tarfile.FilterError('failed with FilterError')
3780        def oserror_filter(tarinfo, path):
3781            raise OSError('failed with OSError')
3782        def tarerror_filter(tarinfo, path):
3783            raise tarfile.TarError('failed with base TarError')
3784        def valueerror_filter(tarinfo, path):
3785            raise ValueError('failed with ValueError')
3786
3787        with ArchiveMaker() as arc:
3788            arc.add('file')
3789
3790        # If errorlevel is 0, errors affected by errorlevel are ignored
3791
3792        with self.check_context(arc.open(errorlevel=0), extracterror_filter):
3793            self.expect_file('file')
3794
3795        with self.check_context(arc.open(errorlevel=0), filtererror_filter):
3796            self.expect_file('file')
3797
3798        with self.check_context(arc.open(errorlevel=0), oserror_filter):
3799            self.expect_file('file')
3800
3801        with self.check_context(arc.open(errorlevel=0), tarerror_filter):
3802            self.expect_exception(tarfile.TarError)
3803
3804        with self.check_context(arc.open(errorlevel=0), valueerror_filter):
3805            self.expect_exception(ValueError)
3806
3807        # If 1, all fatal errors are raised
3808
3809        with self.check_context(arc.open(errorlevel=1), extracterror_filter):
3810            self.expect_file('file')
3811
3812        with self.check_context(arc.open(errorlevel=1), filtererror_filter):
3813            self.expect_exception(tarfile.FilterError)
3814
3815        with self.check_context(arc.open(errorlevel=1), oserror_filter):
3816            self.expect_exception(OSError)
3817
3818        with self.check_context(arc.open(errorlevel=1), tarerror_filter):
3819            self.expect_exception(tarfile.TarError)
3820
3821        with self.check_context(arc.open(errorlevel=1), valueerror_filter):
3822            self.expect_exception(ValueError)
3823
3824        # If 2, all non-fatal errors are raised as well.
3825
3826        with self.check_context(arc.open(errorlevel=2), extracterror_filter):
3827            self.expect_exception(tarfile.ExtractError)
3828
3829        with self.check_context(arc.open(errorlevel=2), filtererror_filter):
3830            self.expect_exception(tarfile.FilterError)
3831
3832        with self.check_context(arc.open(errorlevel=2), oserror_filter):
3833            self.expect_exception(OSError)
3834
3835        with self.check_context(arc.open(errorlevel=2), tarerror_filter):
3836            self.expect_exception(tarfile.TarError)
3837
3838        with self.check_context(arc.open(errorlevel=2), valueerror_filter):
3839            self.expect_exception(ValueError)
3840
3841        # We only handle ExtractionError, FilterError & OSError specially.
3842
3843        with self.check_context(arc.open(errorlevel='boo!'), filtererror_filter):
3844            self.expect_exception(TypeError)  # errorlevel is not int
3845
3846
def setUpModule():
    """Prepare the scratch directory and the compressed test archives.

    Creates TEMPDIR, records the available archive paths in the global
    ``testtarnames`` and writes a compressed copy of testtar.tar for
    every compression test class whose ``open`` attribute is truthy.
    """
    global testtarnames

    os_helper.unlink(TEMPDIR)
    os.makedirs(TEMPDIR)

    testtarnames = [tarname]
    with open(tarname, "rb") as plain_tar:
        raw_bytes = plain_tar.read()

    # Create compressed tarfiles.
    for test_cls in (GzipTest, Bz2Test, LzmaTest):
        opener = test_cls.open
        if not opener:
            # No opener is available for this compression format.
            continue
        target = test_cls.tarname
        os_helper.unlink(target)
        testtarnames.append(target)
        with opener(target, "wb") as compressed:
            compressed.write(raw_bytes)
3863
def tearDownModule():
    """Remove the scratch directory TEMPDIR if it exists."""
    if not os.path.exists(TEMPDIR):
        return
    os_helper.rmtree(TEMPDIR)
3867
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
3870