1"""Test script for the dumbdbm module 2 Original by Roger E. Masse 3""" 4 5import contextlib 6import io 7import operator 8import os 9import stat 10import unittest 11import dbm.dumb as dumbdbm 12from test import support 13from test.support import os_helper 14from functools import partial 15 16_fname = os_helper.TESTFN 17 18 19def _delete_files(): 20 for ext in [".dir", ".dat", ".bak"]: 21 try: 22 os.unlink(_fname + ext) 23 except OSError: 24 pass 25 26class DumbDBMTestCase(unittest.TestCase): 27 _dict = {b'0': b'', 28 b'a': b'Python:', 29 b'b': b'Programming', 30 b'c': b'the', 31 b'd': b'way', 32 b'f': b'Guido', 33 b'g': b'intended', 34 '\u00fc'.encode('utf-8') : b'!', 35 } 36 37 def test_dumbdbm_creation(self): 38 with contextlib.closing(dumbdbm.open(_fname, 'c')) as f: 39 self.assertEqual(list(f.keys()), []) 40 for key in self._dict: 41 f[key] = self._dict[key] 42 self.read_helper(f) 43 44 @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') 45 @os_helper.skip_unless_working_chmod 46 def test_dumbdbm_creation_mode(self): 47 try: 48 old_umask = os.umask(0o002) 49 f = dumbdbm.open(_fname, 'c', 0o637) 50 f.close() 51 finally: 52 os.umask(old_umask) 53 54 expected_mode = 0o635 55 if os.name != 'posix': 56 # Windows only supports setting the read-only attribute. 57 # This shouldn't fail, but doesn't work like Unix either. 58 expected_mode = 0o666 59 60 import stat 61 st = os.stat(_fname + '.dat') 62 self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode) 63 st = os.stat(_fname + '.dir') 64 self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode) 65 66 def test_close_twice(self): 67 f = dumbdbm.open(_fname) 68 f[b'a'] = b'b' 69 self.assertEqual(f[b'a'], b'b') 70 f.close() 71 f.close() 72 73 def test_dumbdbm_modification(self): 74 self.init_db() 75 with contextlib.closing(dumbdbm.open(_fname, 'w')) as f: 76 self._dict[b'g'] = f[b'g'] = b"indented" 77 self.read_helper(f) 78 # setdefault() works as in the dict interface 79 self.assertEqual(f.setdefault(b'xxx', b'foo'), b'foo') 80 self.assertEqual(f[b'xxx'], b'foo') 81 82 def test_dumbdbm_read(self): 83 self.init_db() 84 with contextlib.closing(dumbdbm.open(_fname, 'r')) as f: 85 self.read_helper(f) 86 with self.assertRaisesRegex(dumbdbm.error, 87 'The database is opened for reading only'): 88 f[b'g'] = b'x' 89 with self.assertRaisesRegex(dumbdbm.error, 90 'The database is opened for reading only'): 91 del f[b'a'] 92 # get() works as in the dict interface 93 self.assertEqual(f.get(b'a'), self._dict[b'a']) 94 self.assertEqual(f.get(b'xxx', b'foo'), b'foo') 95 self.assertIsNone(f.get(b'xxx')) 96 with self.assertRaises(KeyError): 97 f[b'xxx'] 98 99 def test_dumbdbm_keys(self): 100 self.init_db() 101 with contextlib.closing(dumbdbm.open(_fname)) as f: 102 keys = self.keys_helper(f) 103 104 def test_write_contains(self): 105 with contextlib.closing(dumbdbm.open(_fname)) as f: 106 f[b'1'] = b'hello' 107 self.assertIn(b'1', f) 108 109 def test_write_write_read(self): 110 # test for bug #482460 111 with contextlib.closing(dumbdbm.open(_fname)) as f: 112 f[b'1'] = b'hello' 113 f[b'1'] = b'hello2' 114 with contextlib.closing(dumbdbm.open(_fname)) as f: 115 self.assertEqual(f[b'1'], b'hello2') 116 117 def test_str_read(self): 118 self.init_db() 119 with contextlib.closing(dumbdbm.open(_fname, 'r')) as f: 120 self.assertEqual(f['\u00fc'], self._dict['\u00fc'.encode('utf-8')]) 121 122 def test_str_write_contains(self): 123 self.init_db() 124 with contextlib.closing(dumbdbm.open(_fname)) as f: 125 f['\u00fc'] = b'!' 126 f['1'] = 'a' 127 with contextlib.closing(dumbdbm.open(_fname, 'r')) as f: 128 self.assertIn('\u00fc', f) 129 self.assertEqual(f['\u00fc'.encode('utf-8')], 130 self._dict['\u00fc'.encode('utf-8')]) 131 self.assertEqual(f[b'1'], b'a') 132 133 def test_line_endings(self): 134 # test for bug #1172763: dumbdbm would die if the line endings 135 # weren't what was expected. 136 with contextlib.closing(dumbdbm.open(_fname)) as f: 137 f[b'1'] = b'hello' 138 f[b'2'] = b'hello2' 139 140 # Mangle the file by changing the line separator to Windows or Unix 141 with io.open(_fname + '.dir', 'rb') as file: 142 data = file.read() 143 if os.linesep == '\n': 144 data = data.replace(b'\n', b'\r\n') 145 else: 146 data = data.replace(b'\r\n', b'\n') 147 with io.open(_fname + '.dir', 'wb') as file: 148 file.write(data) 149 150 f = dumbdbm.open(_fname) 151 self.assertEqual(f[b'1'], b'hello') 152 self.assertEqual(f[b'2'], b'hello2') 153 154 155 def read_helper(self, f): 156 keys = self.keys_helper(f) 157 for key in self._dict: 158 self.assertEqual(self._dict[key], f[key]) 159 160 def init_db(self): 161 with contextlib.closing(dumbdbm.open(_fname, 'n')) as f: 162 for k in self._dict: 163 f[k] = self._dict[k] 164 165 def keys_helper(self, f): 166 keys = sorted(f.keys()) 167 dkeys = sorted(self._dict.keys()) 168 self.assertEqual(keys, dkeys) 169 return keys 170 171 # Perform randomized operations. This doesn't make assumptions about 172 # what *might* fail. 173 def test_random(self): 174 import random 175 d = {} # mirror the database 176 for dummy in range(5): 177 with contextlib.closing(dumbdbm.open(_fname)) as f: 178 for dummy in range(100): 179 k = random.choice('abcdefghijklm') 180 if random.random() < 0.2: 181 if k in d: 182 del d[k] 183 del f[k] 184 else: 185 v = random.choice((b'a', b'b', b'c')) * random.randrange(10000) 186 d[k] = v 187 f[k] = v 188 self.assertEqual(f[k], v) 189 190 with contextlib.closing(dumbdbm.open(_fname)) as f: 191 expected = sorted((k.encode("latin-1"), v) for k, v in d.items()) 192 got = sorted(f.items()) 193 self.assertEqual(expected, got) 194 195 def test_context_manager(self): 196 with dumbdbm.open(_fname, 'c') as db: 197 db["dumbdbm context manager"] = "context manager" 198 199 with dumbdbm.open(_fname, 'r') as db: 200 self.assertEqual(list(db.keys()), [b"dumbdbm context manager"]) 201 202 with self.assertRaises(dumbdbm.error): 203 db.keys() 204 205 def test_check_closed(self): 206 f = dumbdbm.open(_fname, 'c') 207 f.close() 208 209 for meth in (partial(operator.delitem, f), 210 partial(operator.setitem, f, 'b'), 211 partial(operator.getitem, f), 212 partial(operator.contains, f)): 213 with self.assertRaises(dumbdbm.error) as cm: 214 meth('test') 215 self.assertEqual(str(cm.exception), 216 "DBM object has already been closed") 217 218 for meth in (operator.methodcaller('keys'), 219 operator.methodcaller('iterkeys'), 220 operator.methodcaller('items'), 221 len): 222 with self.assertRaises(dumbdbm.error) as cm: 223 meth(f) 224 self.assertEqual(str(cm.exception), 225 "DBM object has already been closed") 226 227 def test_create_new(self): 228 with dumbdbm.open(_fname, 'n') as f: 229 for k in self._dict: 230 f[k] = self._dict[k] 231 232 with dumbdbm.open(_fname, 'n') as f: 233 self.assertEqual(f.keys(), []) 234 235 def test_eval(self): 236 with open(_fname + '.dir', 'w', encoding="utf-8") as stream: 237 stream.write("str(print('Hacked!')), 0\n") 238 with support.captured_stdout() as stdout: 239 with self.assertRaises(ValueError): 240 with dumbdbm.open(_fname) as f: 241 pass 242 self.assertEqual(stdout.getvalue(), '') 243 244 def test_missing_data(self): 245 for value in ('r', 'w'): 246 _delete_files() 247 with self.assertRaises(FileNotFoundError): 248 dumbdbm.open(_fname, value) 249 self.assertFalse(os.path.exists(_fname + '.dir')) 250 self.assertFalse(os.path.exists(_fname + '.bak')) 251 252 def test_missing_index(self): 253 with dumbdbm.open(_fname, 'n') as f: 254 pass 255 os.unlink(_fname + '.dir') 256 for value in ('r', 'w'): 257 with self.assertRaises(FileNotFoundError): 258 dumbdbm.open(_fname, value) 259 self.assertFalse(os.path.exists(_fname + '.dir')) 260 self.assertFalse(os.path.exists(_fname + '.bak')) 261 262 def test_invalid_flag(self): 263 for flag in ('x', 'rf', None): 264 with self.assertRaisesRegex(ValueError, 265 "Flag must be one of " 266 "'r', 'w', 'c', or 'n'"): 267 dumbdbm.open(_fname, flag) 268 269 @os_helper.skip_unless_working_chmod 270 def test_readonly_files(self): 271 with os_helper.temp_dir() as dir: 272 fname = os.path.join(dir, 'db') 273 with dumbdbm.open(fname, 'n') as f: 274 self.assertEqual(list(f.keys()), []) 275 for key in self._dict: 276 f[key] = self._dict[key] 277 os.chmod(fname + ".dir", stat.S_IRUSR) 278 os.chmod(fname + ".dat", stat.S_IRUSR) 279 os.chmod(dir, stat.S_IRUSR|stat.S_IXUSR) 280 with dumbdbm.open(fname, 'r') as f: 281 self.assertEqual(sorted(f.keys()), sorted(self._dict)) 282 f.close() # don't write 283 284 @unittest.skipUnless(os_helper.TESTFN_NONASCII, 285 'requires OS support of non-ASCII encodings') 286 def test_nonascii_filename(self): 287 filename = os_helper.TESTFN_NONASCII 288 for suffix in ['.dir', '.dat', '.bak']: 289 self.addCleanup(os_helper.unlink, filename + suffix) 290 with dumbdbm.open(filename, 'c') as db: 291 db[b'key'] = b'value' 292 self.assertTrue(os.path.exists(filename + '.dat')) 293 self.assertTrue(os.path.exists(filename + '.dir')) 294 with dumbdbm.open(filename, 'r') as db: 295 self.assertEqual(list(db.keys()), [b'key']) 296 self.assertTrue(b'key' in db) 297 self.assertEqual(db[b'key'], b'value') 298 299 def test_open_with_pathlib_path(self): 300 dumbdbm.open(os_helper.FakePath(_fname), "c").close() 301 302 def test_open_with_bytes_path(self): 303 dumbdbm.open(os.fsencode(_fname), "c").close() 304 305 def test_open_with_pathlib_bytes_path(self): 306 dumbdbm.open(os_helper.FakePath(os.fsencode(_fname)), "c").close() 307 308 def tearDown(self): 309 _delete_files() 310 311 def setUp(self): 312 _delete_files() 313 314 315if __name__ == "__main__": 316 unittest.main() 317