1"""Test script for the dumbdbm module
2   Original by Roger E. Masse
3"""
4
5import contextlib
6import io
7import operator
8import os
9import stat
10import unittest
11import dbm.dumb as dumbdbm
12from test import support
13from test.support import os_helper
14from functools import partial
15
16_fname = os_helper.TESTFN
17
18
19def _delete_files():
20    for ext in [".dir", ".dat", ".bak"]:
21        try:
22            os.unlink(_fname + ext)
23        except OSError:
24            pass
25
26class DumbDBMTestCase(unittest.TestCase):
27    _dict = {b'0': b'',
28             b'a': b'Python:',
29             b'b': b'Programming',
30             b'c': b'the',
31             b'd': b'way',
32             b'f': b'Guido',
33             b'g': b'intended',
34             '\u00fc'.encode('utf-8') : b'!',
35             }
36
37    def test_dumbdbm_creation(self):
38        with contextlib.closing(dumbdbm.open(_fname, 'c')) as f:
39            self.assertEqual(list(f.keys()), [])
40            for key in self._dict:
41                f[key] = self._dict[key]
42            self.read_helper(f)
43
44    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
45    @os_helper.skip_unless_working_chmod
46    def test_dumbdbm_creation_mode(self):
47        try:
48            old_umask = os.umask(0o002)
49            f = dumbdbm.open(_fname, 'c', 0o637)
50            f.close()
51        finally:
52            os.umask(old_umask)
53
54        expected_mode = 0o635
55        if os.name != 'posix':
56            # Windows only supports setting the read-only attribute.
57            # This shouldn't fail, but doesn't work like Unix either.
58            expected_mode = 0o666
59
60        import stat
61        st = os.stat(_fname + '.dat')
62        self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)
63        st = os.stat(_fname + '.dir')
64        self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)
65
66    def test_close_twice(self):
67        f = dumbdbm.open(_fname)
68        f[b'a'] = b'b'
69        self.assertEqual(f[b'a'], b'b')
70        f.close()
71        f.close()
72
73    def test_dumbdbm_modification(self):
74        self.init_db()
75        with contextlib.closing(dumbdbm.open(_fname, 'w')) as f:
76            self._dict[b'g'] = f[b'g'] = b"indented"
77            self.read_helper(f)
78            # setdefault() works as in the dict interface
79            self.assertEqual(f.setdefault(b'xxx', b'foo'), b'foo')
80            self.assertEqual(f[b'xxx'], b'foo')
81
82    def test_dumbdbm_read(self):
83        self.init_db()
84        with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
85            self.read_helper(f)
86            with self.assertRaisesRegex(dumbdbm.error,
87                                    'The database is opened for reading only'):
88                f[b'g'] = b'x'
89            with self.assertRaisesRegex(dumbdbm.error,
90                                    'The database is opened for reading only'):
91                del f[b'a']
92            # get() works as in the dict interface
93            self.assertEqual(f.get(b'a'), self._dict[b'a'])
94            self.assertEqual(f.get(b'xxx', b'foo'), b'foo')
95            self.assertIsNone(f.get(b'xxx'))
96            with self.assertRaises(KeyError):
97                f[b'xxx']
98
99    def test_dumbdbm_keys(self):
100        self.init_db()
101        with contextlib.closing(dumbdbm.open(_fname)) as f:
102            keys = self.keys_helper(f)
103
104    def test_write_contains(self):
105        with contextlib.closing(dumbdbm.open(_fname)) as f:
106            f[b'1'] = b'hello'
107            self.assertIn(b'1', f)
108
109    def test_write_write_read(self):
110        # test for bug #482460
111        with contextlib.closing(dumbdbm.open(_fname)) as f:
112            f[b'1'] = b'hello'
113            f[b'1'] = b'hello2'
114        with contextlib.closing(dumbdbm.open(_fname)) as f:
115            self.assertEqual(f[b'1'], b'hello2')
116
117    def test_str_read(self):
118        self.init_db()
119        with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
120            self.assertEqual(f['\u00fc'], self._dict['\u00fc'.encode('utf-8')])
121
122    def test_str_write_contains(self):
123        self.init_db()
124        with contextlib.closing(dumbdbm.open(_fname)) as f:
125            f['\u00fc'] = b'!'
126            f['1'] = 'a'
127        with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
128            self.assertIn('\u00fc', f)
129            self.assertEqual(f['\u00fc'.encode('utf-8')],
130                             self._dict['\u00fc'.encode('utf-8')])
131            self.assertEqual(f[b'1'], b'a')
132
133    def test_line_endings(self):
134        # test for bug #1172763: dumbdbm would die if the line endings
135        # weren't what was expected.
136        with contextlib.closing(dumbdbm.open(_fname)) as f:
137            f[b'1'] = b'hello'
138            f[b'2'] = b'hello2'
139
140        # Mangle the file by changing the line separator to Windows or Unix
141        with io.open(_fname + '.dir', 'rb') as file:
142            data = file.read()
143        if os.linesep == '\n':
144            data = data.replace(b'\n', b'\r\n')
145        else:
146            data = data.replace(b'\r\n', b'\n')
147        with io.open(_fname + '.dir', 'wb') as file:
148            file.write(data)
149
150        f = dumbdbm.open(_fname)
151        self.assertEqual(f[b'1'], b'hello')
152        self.assertEqual(f[b'2'], b'hello2')
153
154
155    def read_helper(self, f):
156        keys = self.keys_helper(f)
157        for key in self._dict:
158            self.assertEqual(self._dict[key], f[key])
159
160    def init_db(self):
161        with contextlib.closing(dumbdbm.open(_fname, 'n')) as f:
162            for k in self._dict:
163                f[k] = self._dict[k]
164
165    def keys_helper(self, f):
166        keys = sorted(f.keys())
167        dkeys = sorted(self._dict.keys())
168        self.assertEqual(keys, dkeys)
169        return keys
170
171    # Perform randomized operations.  This doesn't make assumptions about
172    # what *might* fail.
173    def test_random(self):
174        import random
175        d = {}  # mirror the database
176        for dummy in range(5):
177            with contextlib.closing(dumbdbm.open(_fname)) as f:
178                for dummy in range(100):
179                    k = random.choice('abcdefghijklm')
180                    if random.random() < 0.2:
181                        if k in d:
182                            del d[k]
183                            del f[k]
184                    else:
185                        v = random.choice((b'a', b'b', b'c')) * random.randrange(10000)
186                        d[k] = v
187                        f[k] = v
188                        self.assertEqual(f[k], v)
189
190            with contextlib.closing(dumbdbm.open(_fname)) as f:
191                expected = sorted((k.encode("latin-1"), v) for k, v in d.items())
192                got = sorted(f.items())
193                self.assertEqual(expected, got)
194
195    def test_context_manager(self):
196        with dumbdbm.open(_fname, 'c') as db:
197            db["dumbdbm context manager"] = "context manager"
198
199        with dumbdbm.open(_fname, 'r') as db:
200            self.assertEqual(list(db.keys()), [b"dumbdbm context manager"])
201
202        with self.assertRaises(dumbdbm.error):
203            db.keys()
204
205    def test_check_closed(self):
206        f = dumbdbm.open(_fname, 'c')
207        f.close()
208
209        for meth in (partial(operator.delitem, f),
210                     partial(operator.setitem, f, 'b'),
211                     partial(operator.getitem, f),
212                     partial(operator.contains, f)):
213            with self.assertRaises(dumbdbm.error) as cm:
214                meth('test')
215            self.assertEqual(str(cm.exception),
216                             "DBM object has already been closed")
217
218        for meth in (operator.methodcaller('keys'),
219                     operator.methodcaller('iterkeys'),
220                     operator.methodcaller('items'),
221                     len):
222            with self.assertRaises(dumbdbm.error) as cm:
223                meth(f)
224            self.assertEqual(str(cm.exception),
225                             "DBM object has already been closed")
226
227    def test_create_new(self):
228        with dumbdbm.open(_fname, 'n') as f:
229            for k in self._dict:
230                f[k] = self._dict[k]
231
232        with dumbdbm.open(_fname, 'n') as f:
233            self.assertEqual(f.keys(), [])
234
235    def test_eval(self):
236        with open(_fname + '.dir', 'w', encoding="utf-8") as stream:
237            stream.write("str(print('Hacked!')), 0\n")
238        with support.captured_stdout() as stdout:
239            with self.assertRaises(ValueError):
240                with dumbdbm.open(_fname) as f:
241                    pass
242            self.assertEqual(stdout.getvalue(), '')
243
244    def test_missing_data(self):
245        for value in ('r', 'w'):
246            _delete_files()
247            with self.assertRaises(FileNotFoundError):
248                dumbdbm.open(_fname, value)
249            self.assertFalse(os.path.exists(_fname + '.dir'))
250            self.assertFalse(os.path.exists(_fname + '.bak'))
251
252    def test_missing_index(self):
253        with dumbdbm.open(_fname, 'n') as f:
254            pass
255        os.unlink(_fname + '.dir')
256        for value in ('r', 'w'):
257            with self.assertRaises(FileNotFoundError):
258                dumbdbm.open(_fname, value)
259            self.assertFalse(os.path.exists(_fname + '.dir'))
260            self.assertFalse(os.path.exists(_fname + '.bak'))
261
262    def test_invalid_flag(self):
263        for flag in ('x', 'rf', None):
264            with self.assertRaisesRegex(ValueError,
265                                        "Flag must be one of "
266                                        "'r', 'w', 'c', or 'n'"):
267                dumbdbm.open(_fname, flag)
268
269    @os_helper.skip_unless_working_chmod
270    def test_readonly_files(self):
271        with os_helper.temp_dir() as dir:
272            fname = os.path.join(dir, 'db')
273            with dumbdbm.open(fname, 'n') as f:
274                self.assertEqual(list(f.keys()), [])
275                for key in self._dict:
276                    f[key] = self._dict[key]
277            os.chmod(fname + ".dir", stat.S_IRUSR)
278            os.chmod(fname + ".dat", stat.S_IRUSR)
279            os.chmod(dir, stat.S_IRUSR|stat.S_IXUSR)
280            with dumbdbm.open(fname, 'r') as f:
281                self.assertEqual(sorted(f.keys()), sorted(self._dict))
282                f.close()  # don't write
283
284    @unittest.skipUnless(os_helper.TESTFN_NONASCII,
285                         'requires OS support of non-ASCII encodings')
286    def test_nonascii_filename(self):
287        filename = os_helper.TESTFN_NONASCII
288        for suffix in ['.dir', '.dat', '.bak']:
289            self.addCleanup(os_helper.unlink, filename + suffix)
290        with dumbdbm.open(filename, 'c') as db:
291            db[b'key'] = b'value'
292        self.assertTrue(os.path.exists(filename + '.dat'))
293        self.assertTrue(os.path.exists(filename + '.dir'))
294        with dumbdbm.open(filename, 'r') as db:
295            self.assertEqual(list(db.keys()), [b'key'])
296            self.assertTrue(b'key' in db)
297            self.assertEqual(db[b'key'], b'value')
298
299    def test_open_with_pathlib_path(self):
300        dumbdbm.open(os_helper.FakePath(_fname), "c").close()
301
302    def test_open_with_bytes_path(self):
303        dumbdbm.open(os.fsencode(_fname), "c").close()
304
305    def test_open_with_pathlib_bytes_path(self):
306        dumbdbm.open(os_helper.FakePath(os.fsencode(_fname)), "c").close()
307
308    def tearDown(self):
309        _delete_files()
310
311    def setUp(self):
312        _delete_files()
313
314
315if __name__ == "__main__":
316    unittest.main()
317