1import os 2import sys 3import time 4import stat 5import socket 6import email 7import email.message 8import re 9import io 10import tempfile 11from test import support 12from test.support import os_helper 13from test.support import socket_helper 14import unittest 15import textwrap 16import mailbox 17import glob 18 19 20if not socket_helper.has_gethostname: 21 raise unittest.SkipTest("test requires gethostname()") 22 23 24class TestBase: 25 26 all_mailbox_types = (mailbox.Message, mailbox.MaildirMessage, 27 mailbox.mboxMessage, mailbox.MHMessage, 28 mailbox.BabylMessage, mailbox.MMDFMessage) 29 30 def _check_sample(self, msg): 31 # Inspect a mailbox.Message representation of the sample message 32 self.assertIsInstance(msg, email.message.Message) 33 self.assertIsInstance(msg, mailbox.Message) 34 for key, value in _sample_headers.items(): 35 self.assertIn(value, msg.get_all(key)) 36 self.assertTrue(msg.is_multipart()) 37 self.assertEqual(len(msg.get_payload()), len(_sample_payloads)) 38 for i, payload in enumerate(_sample_payloads): 39 part = msg.get_payload(i) 40 self.assertIsInstance(part, email.message.Message) 41 self.assertNotIsInstance(part, mailbox.Message) 42 self.assertEqual(part.get_payload(), payload) 43 44 def _delete_recursively(self, target): 45 # Delete a file or delete a directory recursively 46 if os.path.isdir(target): 47 os_helper.rmtree(target) 48 elif os.path.exists(target): 49 os_helper.unlink(target) 50 51 52class TestMailbox(TestBase): 53 54 maxDiff = None 55 56 _factory = None # Overridden by subclasses to reuse tests 57 _template = 'From: foo\n\n%s\n' 58 59 def setUp(self): 60 self._path = os_helper.TESTFN 61 self._delete_recursively(self._path) 62 self._box = self._factory(self._path) 63 64 def tearDown(self): 65 self._box.close() 66 self._delete_recursively(self._path) 67 68 def test_add(self): 69 # Add copies of a sample message 70 keys = [] 71 keys.append(self._box.add(self._template % 0)) 72 self.assertEqual(len(self._box), 1) 73 
keys.append(self._box.add(mailbox.Message(_sample_message))) 74 self.assertEqual(len(self._box), 2) 75 keys.append(self._box.add(email.message_from_string(_sample_message))) 76 self.assertEqual(len(self._box), 3) 77 keys.append(self._box.add(io.BytesIO(_bytes_sample_message))) 78 self.assertEqual(len(self._box), 4) 79 keys.append(self._box.add(_sample_message)) 80 self.assertEqual(len(self._box), 5) 81 keys.append(self._box.add(_bytes_sample_message)) 82 self.assertEqual(len(self._box), 6) 83 with self.assertWarns(DeprecationWarning): 84 keys.append(self._box.add( 85 io.TextIOWrapper(io.BytesIO(_bytes_sample_message), encoding="utf-8"))) 86 self.assertEqual(len(self._box), 7) 87 self.assertEqual(self._box.get_string(keys[0]), self._template % 0) 88 for i in (1, 2, 3, 4, 5, 6): 89 self._check_sample(self._box[keys[i]]) 90 91 _nonascii_msg = textwrap.dedent("""\ 92 From: foo 93 Subject: Falinaptár házhozszállítással. Már rendeltél? 94 95 0 96 """) 97 98 def test_add_invalid_8bit_bytes_header(self): 99 key = self._box.add(self._nonascii_msg.encode('latin-1')) 100 self.assertEqual(len(self._box), 1) 101 self.assertEqual(self._box.get_bytes(key), 102 self._nonascii_msg.encode('latin-1')) 103 104 def test_invalid_nonascii_header_as_string(self): 105 subj = self._nonascii_msg.splitlines()[1] 106 key = self._box.add(subj.encode('latin-1')) 107 self.assertEqual(self._box.get_string(key), 108 'Subject: =?unknown-8bit?b?RmFsaW5hcHThciBo4Xpob3pzeuFsbO104XNz' 109 'YWwuIE3hciByZW5kZWx06Ww/?=\n\n') 110 111 def test_add_nonascii_string_header_raises(self): 112 with self.assertRaisesRegex(ValueError, "ASCII-only"): 113 self._box.add(self._nonascii_msg) 114 self._box.flush() 115 self.assertEqual(len(self._box), 0) 116 self.assertMailboxEmpty() 117 118 def test_add_that_raises_leaves_mailbox_empty(self): 119 def raiser(*args, **kw): 120 raise Exception("a fake error") 121 support.patch(self, email.generator.BytesGenerator, 'flatten', raiser) 122 with self.assertRaises(Exception): 123 
self._box.add(email.message_from_string("From: Alphöso")) 124 self.assertEqual(len(self._box), 0) 125 self._box.close() 126 self.assertMailboxEmpty() 127 128 _non_latin_bin_msg = textwrap.dedent("""\ 129 From: [email protected] 130 To: báz 131 Subject: Maintenant je vous présente mon collègue, le pouf célèbre 132 \tJean de Baddie 133 Mime-Version: 1.0 134 Content-Type: text/plain; charset="utf-8" 135 Content-Transfer-Encoding: 8bit 136 137 Да, они летят. 138 """).encode('utf-8') 139 140 def test_add_8bit_body(self): 141 key = self._box.add(self._non_latin_bin_msg) 142 self.assertEqual(self._box.get_bytes(key), 143 self._non_latin_bin_msg) 144 with self._box.get_file(key) as f: 145 self.assertEqual(f.read(), 146 self._non_latin_bin_msg.replace(b'\n', 147 os.linesep.encode())) 148 self.assertEqual(self._box[key].get_payload(), 149 "Да, они летят.\n") 150 151 def test_add_binary_file(self): 152 with tempfile.TemporaryFile('wb+') as f: 153 f.write(_bytes_sample_message) 154 f.seek(0) 155 key = self._box.add(f) 156 self.assertEqual(self._box.get_bytes(key).split(b'\n'), 157 _bytes_sample_message.split(b'\n')) 158 159 def test_add_binary_nonascii_file(self): 160 with tempfile.TemporaryFile('wb+') as f: 161 f.write(self._non_latin_bin_msg) 162 f.seek(0) 163 key = self._box.add(f) 164 self.assertEqual(self._box.get_bytes(key).split(b'\n'), 165 self._non_latin_bin_msg.split(b'\n')) 166 167 def test_add_text_file_warns(self): 168 with tempfile.TemporaryFile('w+', encoding='utf-8') as f: 169 f.write(_sample_message) 170 f.seek(0) 171 with self.assertWarns(DeprecationWarning): 172 key = self._box.add(f) 173 self.assertEqual(self._box.get_bytes(key).split(b'\n'), 174 _bytes_sample_message.split(b'\n')) 175 176 def test_add_StringIO_warns(self): 177 with self.assertWarns(DeprecationWarning): 178 key = self._box.add(io.StringIO(self._template % "0")) 179 self.assertEqual(self._box.get_string(key), self._template % "0") 180 181 def test_add_nonascii_StringIO_raises(self): 182 with 
self.assertWarns(DeprecationWarning): 183 with self.assertRaisesRegex(ValueError, "ASCII-only"): 184 self._box.add(io.StringIO(self._nonascii_msg)) 185 self.assertEqual(len(self._box), 0) 186 self._box.close() 187 self.assertMailboxEmpty() 188 189 def test_remove(self): 190 # Remove messages using remove() 191 self._test_remove_or_delitem(self._box.remove) 192 193 def test_delitem(self): 194 # Remove messages using __delitem__() 195 self._test_remove_or_delitem(self._box.__delitem__) 196 197 def _test_remove_or_delitem(self, method): 198 # (Used by test_remove() and test_delitem().) 199 key0 = self._box.add(self._template % 0) 200 key1 = self._box.add(self._template % 1) 201 self.assertEqual(len(self._box), 2) 202 method(key0) 203 self.assertEqual(len(self._box), 1) 204 self.assertRaises(KeyError, lambda: self._box[key0]) 205 self.assertRaises(KeyError, lambda: method(key0)) 206 self.assertEqual(self._box.get_string(key1), self._template % 1) 207 key2 = self._box.add(self._template % 2) 208 self.assertEqual(len(self._box), 2) 209 method(key2) 210 self.assertEqual(len(self._box), 1) 211 self.assertRaises(KeyError, lambda: self._box[key2]) 212 self.assertRaises(KeyError, lambda: method(key2)) 213 self.assertEqual(self._box.get_string(key1), self._template % 1) 214 method(key1) 215 self.assertEqual(len(self._box), 0) 216 self.assertRaises(KeyError, lambda: self._box[key1]) 217 self.assertRaises(KeyError, lambda: method(key1)) 218 219 def test_discard(self, repetitions=10): 220 # Discard messages 221 key0 = self._box.add(self._template % 0) 222 key1 = self._box.add(self._template % 1) 223 self.assertEqual(len(self._box), 2) 224 self._box.discard(key0) 225 self.assertEqual(len(self._box), 1) 226 self.assertRaises(KeyError, lambda: self._box[key0]) 227 self._box.discard(key0) 228 self.assertEqual(len(self._box), 1) 229 self.assertRaises(KeyError, lambda: self._box[key0]) 230 231 def test_get(self): 232 # Retrieve messages using get() 233 key0 = 
self._box.add(self._template % 0) 234 msg = self._box.get(key0) 235 self.assertEqual(msg['from'], 'foo') 236 self.assertEqual(msg.get_payload(), '0\n') 237 self.assertIsNone(self._box.get('foo')) 238 self.assertIs(self._box.get('foo', False), False) 239 self._box.close() 240 self._box = self._factory(self._path) 241 key1 = self._box.add(self._template % 1) 242 msg = self._box.get(key1) 243 self.assertEqual(msg['from'], 'foo') 244 self.assertEqual(msg.get_payload(), '1\n') 245 246 def test_getitem(self): 247 # Retrieve message using __getitem__() 248 key0 = self._box.add(self._template % 0) 249 msg = self._box[key0] 250 self.assertEqual(msg['from'], 'foo') 251 self.assertEqual(msg.get_payload(), '0\n') 252 self.assertRaises(KeyError, lambda: self._box['foo']) 253 self._box.discard(key0) 254 self.assertRaises(KeyError, lambda: self._box[key0]) 255 256 def test_get_message(self): 257 # Get Message representations of messages 258 key0 = self._box.add(self._template % 0) 259 key1 = self._box.add(_sample_message) 260 msg0 = self._box.get_message(key0) 261 self.assertIsInstance(msg0, mailbox.Message) 262 self.assertEqual(msg0['from'], 'foo') 263 self.assertEqual(msg0.get_payload(), '0\n') 264 self._check_sample(self._box.get_message(key1)) 265 266 def test_get_bytes(self): 267 # Get bytes representations of messages 268 key0 = self._box.add(self._template % 0) 269 key1 = self._box.add(_sample_message) 270 self.assertEqual(self._box.get_bytes(key0), 271 (self._template % 0).encode('ascii')) 272 self.assertEqual(self._box.get_bytes(key1), _bytes_sample_message) 273 274 def test_get_string(self): 275 # Get string representations of messages 276 key0 = self._box.add(self._template % 0) 277 key1 = self._box.add(_sample_message) 278 self.assertEqual(self._box.get_string(key0), self._template % 0) 279 self.assertEqual(self._box.get_string(key1).split('\n'), 280 _sample_message.split('\n')) 281 282 def test_get_file(self): 283 # Get file representations of messages 284 key0 = 
self._box.add(self._template % 0) 285 key1 = self._box.add(_sample_message) 286 with self._box.get_file(key0) as file: 287 data0 = file.read() 288 with self._box.get_file(key1) as file: 289 data1 = file.read() 290 self.assertEqual(data0.decode('ascii').replace(os.linesep, '\n'), 291 self._template % 0) 292 self.assertEqual(data1.decode('ascii').replace(os.linesep, '\n'), 293 _sample_message) 294 295 def test_get_file_can_be_closed_twice(self): 296 # Issue 11700 297 key = self._box.add(_sample_message) 298 f = self._box.get_file(key) 299 f.close() 300 f.close() 301 302 def test_iterkeys(self): 303 # Get keys using iterkeys() 304 self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False) 305 306 def test_keys(self): 307 # Get keys using keys() 308 self._check_iteration(self._box.keys, do_keys=True, do_values=False) 309 310 def test_itervalues(self): 311 # Get values using itervalues() 312 self._check_iteration(self._box.itervalues, do_keys=False, 313 do_values=True) 314 315 def test_iter(self): 316 # Get values using __iter__() 317 self._check_iteration(self._box.__iter__, do_keys=False, 318 do_values=True) 319 320 def test_values(self): 321 # Get values using values() 322 self._check_iteration(self._box.values, do_keys=False, do_values=True) 323 324 def test_iteritems(self): 325 # Get keys and values using iteritems() 326 self._check_iteration(self._box.iteritems, do_keys=True, 327 do_values=True) 328 329 def test_items(self): 330 # Get keys and values using items() 331 self._check_iteration(self._box.items, do_keys=True, do_values=True) 332 333 def _check_iteration(self, method, do_keys, do_values, repetitions=10): 334 for value in method(): 335 self.fail("Not empty") 336 keys, values = [], [] 337 for i in range(repetitions): 338 keys.append(self._box.add(self._template % i)) 339 values.append(self._template % i) 340 if do_keys and not do_values: 341 returned_keys = list(method()) 342 elif do_values and not do_keys: 343 returned_values = 
list(method()) 344 else: 345 returned_keys, returned_values = [], [] 346 for key, value in method(): 347 returned_keys.append(key) 348 returned_values.append(value) 349 if do_keys: 350 self.assertEqual(len(keys), len(returned_keys)) 351 self.assertEqual(set(keys), set(returned_keys)) 352 if do_values: 353 count = 0 354 for value in returned_values: 355 self.assertEqual(value['from'], 'foo') 356 self.assertLess(int(value.get_payload()), repetitions) 357 count += 1 358 self.assertEqual(len(values), count) 359 360 def test_contains(self): 361 # Check existence of keys using __contains__() 362 self.assertNotIn('foo', self._box) 363 key0 = self._box.add(self._template % 0) 364 self.assertIn(key0, self._box) 365 self.assertNotIn('foo', self._box) 366 key1 = self._box.add(self._template % 1) 367 self.assertIn(key1, self._box) 368 self.assertIn(key0, self._box) 369 self.assertNotIn('foo', self._box) 370 self._box.remove(key0) 371 self.assertNotIn(key0, self._box) 372 self.assertIn(key1, self._box) 373 self.assertNotIn('foo', self._box) 374 self._box.remove(key1) 375 self.assertNotIn(key1, self._box) 376 self.assertNotIn(key0, self._box) 377 self.assertNotIn('foo', self._box) 378 379 def test_len(self, repetitions=10): 380 # Get message count 381 keys = [] 382 for i in range(repetitions): 383 self.assertEqual(len(self._box), i) 384 keys.append(self._box.add(self._template % i)) 385 self.assertEqual(len(self._box), i + 1) 386 for i in range(repetitions): 387 self.assertEqual(len(self._box), repetitions - i) 388 self._box.remove(keys[i]) 389 self.assertEqual(len(self._box), repetitions - i - 1) 390 391 def test_set_item(self): 392 # Modify messages using __setitem__() 393 key0 = self._box.add(self._template % 'original 0') 394 self.assertEqual(self._box.get_string(key0), 395 self._template % 'original 0') 396 key1 = self._box.add(self._template % 'original 1') 397 self.assertEqual(self._box.get_string(key1), 398 self._template % 'original 1') 399 self._box[key0] = 
self._template % 'changed 0' 400 self.assertEqual(self._box.get_string(key0), 401 self._template % 'changed 0') 402 self._box[key1] = self._template % 'changed 1' 403 self.assertEqual(self._box.get_string(key1), 404 self._template % 'changed 1') 405 self._box[key0] = _sample_message 406 self._check_sample(self._box[key0]) 407 self._box[key1] = self._box[key0] 408 self._check_sample(self._box[key1]) 409 self._box[key0] = self._template % 'original 0' 410 self.assertEqual(self._box.get_string(key0), 411 self._template % 'original 0') 412 self._check_sample(self._box[key1]) 413 self.assertRaises(KeyError, 414 lambda: self._box.__setitem__('foo', 'bar')) 415 self.assertRaises(KeyError, lambda: self._box['foo']) 416 self.assertEqual(len(self._box), 2) 417 418 def test_clear(self, iterations=10): 419 # Remove all messages using clear() 420 keys = [] 421 for i in range(iterations): 422 self._box.add(self._template % i) 423 for i, key in enumerate(keys): 424 self.assertEqual(self._box.get_string(key), self._template % i) 425 self._box.clear() 426 self.assertEqual(len(self._box), 0) 427 for i, key in enumerate(keys): 428 self.assertRaises(KeyError, lambda: self._box.get_string(key)) 429 430 def test_pop(self): 431 # Get and remove a message using pop() 432 key0 = self._box.add(self._template % 0) 433 self.assertIn(key0, self._box) 434 key1 = self._box.add(self._template % 1) 435 self.assertIn(key1, self._box) 436 self.assertEqual(self._box.pop(key0).get_payload(), '0\n') 437 self.assertNotIn(key0, self._box) 438 self.assertIn(key1, self._box) 439 key2 = self._box.add(self._template % 2) 440 self.assertIn(key2, self._box) 441 self.assertEqual(self._box.pop(key2).get_payload(), '2\n') 442 self.assertNotIn(key2, self._box) 443 self.assertIn(key1, self._box) 444 self.assertEqual(self._box.pop(key1).get_payload(), '1\n') 445 self.assertNotIn(key1, self._box) 446 self.assertEqual(len(self._box), 0) 447 448 def test_popitem(self, iterations=10): 449 # Get and remove an arbitrary 
(key, message) using popitem() 450 keys = [] 451 for i in range(10): 452 keys.append(self._box.add(self._template % i)) 453 seen = [] 454 for i in range(10): 455 key, msg = self._box.popitem() 456 self.assertIn(key, keys) 457 self.assertNotIn(key, seen) 458 seen.append(key) 459 self.assertEqual(int(msg.get_payload()), keys.index(key)) 460 self.assertEqual(len(self._box), 0) 461 for key in keys: 462 self.assertRaises(KeyError, lambda: self._box[key]) 463 464 def test_update(self): 465 # Modify multiple messages using update() 466 key0 = self._box.add(self._template % 'original 0') 467 key1 = self._box.add(self._template % 'original 1') 468 key2 = self._box.add(self._template % 'original 2') 469 self._box.update({key0: self._template % 'changed 0', 470 key2: _sample_message}) 471 self.assertEqual(len(self._box), 3) 472 self.assertEqual(self._box.get_string(key0), 473 self._template % 'changed 0') 474 self.assertEqual(self._box.get_string(key1), 475 self._template % 'original 1') 476 self._check_sample(self._box[key2]) 477 self._box.update([(key2, self._template % 'changed 2'), 478 (key1, self._template % 'changed 1'), 479 (key0, self._template % 'original 0')]) 480 self.assertEqual(len(self._box), 3) 481 self.assertEqual(self._box.get_string(key0), 482 self._template % 'original 0') 483 self.assertEqual(self._box.get_string(key1), 484 self._template % 'changed 1') 485 self.assertEqual(self._box.get_string(key2), 486 self._template % 'changed 2') 487 self.assertRaises(KeyError, 488 lambda: self._box.update({'foo': 'bar', 489 key0: self._template % "changed 0"})) 490 self.assertEqual(len(self._box), 3) 491 self.assertEqual(self._box.get_string(key0), 492 self._template % "changed 0") 493 self.assertEqual(self._box.get_string(key1), 494 self._template % "changed 1") 495 self.assertEqual(self._box.get_string(key2), 496 self._template % "changed 2") 497 498 def test_flush(self): 499 # Write changes to disk 500 self._test_flush_or_close(self._box.flush, True) 501 502 def 
test_popitem_and_flush_twice(self): 503 # See #15036. 504 self._box.add(self._template % 0) 505 self._box.add(self._template % 1) 506 self._box.flush() 507 508 self._box.popitem() 509 self._box.flush() 510 self._box.popitem() 511 self._box.flush() 512 513 def test_lock_unlock(self): 514 # Lock and unlock the mailbox 515 self.assertFalse(os.path.exists(self._get_lock_path())) 516 self._box.lock() 517 self.assertTrue(os.path.exists(self._get_lock_path())) 518 self._box.unlock() 519 self.assertFalse(os.path.exists(self._get_lock_path())) 520 521 def test_close(self): 522 # Close mailbox and flush changes to disk 523 self._test_flush_or_close(self._box.close, False) 524 525 def _test_flush_or_close(self, method, should_call_close): 526 contents = [self._template % i for i in range(3)] 527 self._box.add(contents[0]) 528 self._box.add(contents[1]) 529 self._box.add(contents[2]) 530 oldbox = self._box 531 method() 532 if should_call_close: 533 self._box.close() 534 self._box = self._factory(self._path) 535 keys = self._box.keys() 536 self.assertEqual(len(keys), 3) 537 for key in keys: 538 self.assertIn(self._box.get_string(key), contents) 539 oldbox.close() 540 541 def test_dump_message(self): 542 # Write message representations to disk 543 for input in (email.message_from_string(_sample_message), 544 _sample_message, io.BytesIO(_bytes_sample_message)): 545 output = io.BytesIO() 546 self._box._dump_message(input, output) 547 self.assertEqual(output.getvalue(), 548 _bytes_sample_message.replace(b'\n', os.linesep.encode())) 549 output = io.BytesIO() 550 self.assertRaises(TypeError, 551 lambda: self._box._dump_message(None, output)) 552 553 def _get_lock_path(self): 554 # Return the path of the dot lock file. May be overridden. 555 return self._path + '.lock' 556 557 558class TestMailboxSuperclass(TestBase, unittest.TestCase): 559 560 def test_notimplemented(self): 561 # Test that all Mailbox methods raise NotImplementedException. 
562 box = mailbox.Mailbox('path') 563 self.assertRaises(NotImplementedError, lambda: box.add('')) 564 self.assertRaises(NotImplementedError, lambda: box.remove('')) 565 self.assertRaises(NotImplementedError, lambda: box.__delitem__('')) 566 self.assertRaises(NotImplementedError, lambda: box.discard('')) 567 self.assertRaises(NotImplementedError, lambda: box.__setitem__('', '')) 568 self.assertRaises(NotImplementedError, lambda: box.iterkeys()) 569 self.assertRaises(NotImplementedError, lambda: box.keys()) 570 self.assertRaises(NotImplementedError, lambda: box.itervalues().__next__()) 571 self.assertRaises(NotImplementedError, lambda: box.__iter__().__next__()) 572 self.assertRaises(NotImplementedError, lambda: box.values()) 573 self.assertRaises(NotImplementedError, lambda: box.iteritems().__next__()) 574 self.assertRaises(NotImplementedError, lambda: box.items()) 575 self.assertRaises(NotImplementedError, lambda: box.get('')) 576 self.assertRaises(NotImplementedError, lambda: box.__getitem__('')) 577 self.assertRaises(NotImplementedError, lambda: box.get_message('')) 578 self.assertRaises(NotImplementedError, lambda: box.get_string('')) 579 self.assertRaises(NotImplementedError, lambda: box.get_bytes('')) 580 self.assertRaises(NotImplementedError, lambda: box.get_file('')) 581 self.assertRaises(NotImplementedError, lambda: '' in box) 582 self.assertRaises(NotImplementedError, lambda: box.__contains__('')) 583 self.assertRaises(NotImplementedError, lambda: box.__len__()) 584 self.assertRaises(NotImplementedError, lambda: box.clear()) 585 self.assertRaises(NotImplementedError, lambda: box.pop('')) 586 self.assertRaises(NotImplementedError, lambda: box.popitem()) 587 self.assertRaises(NotImplementedError, lambda: box.update((('', ''),))) 588 self.assertRaises(NotImplementedError, lambda: box.flush()) 589 self.assertRaises(NotImplementedError, lambda: box.lock()) 590 self.assertRaises(NotImplementedError, lambda: box.unlock()) 591 
self.assertRaises(NotImplementedError, lambda: box.close()) 592 593 594class TestMaildir(TestMailbox, unittest.TestCase): 595 596 _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory) 597 598 def setUp(self): 599 TestMailbox.setUp(self) 600 if (os.name == 'nt') or (sys.platform == 'cygwin'): 601 self._box.colon = '!' 602 603 def assertMailboxEmpty(self): 604 self.assertEqual(os.listdir(os.path.join(self._path, 'tmp')), []) 605 606 def test_add_MM(self): 607 # Add a MaildirMessage instance 608 msg = mailbox.MaildirMessage(self._template % 0) 609 msg.set_subdir('cur') 610 msg.set_info('foo') 611 key = self._box.add(msg) 612 self.assertTrue(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' % 613 (key, self._box.colon)))) 614 615 def test_get_MM(self): 616 # Get a MaildirMessage instance 617 msg = mailbox.MaildirMessage(self._template % 0) 618 msg.set_subdir('cur') 619 msg.set_flags('RF') 620 key = self._box.add(msg) 621 msg_returned = self._box.get_message(key) 622 self.assertIsInstance(msg_returned, mailbox.MaildirMessage) 623 self.assertEqual(msg_returned.get_subdir(), 'cur') 624 self.assertEqual(msg_returned.get_flags(), 'FR') 625 626 def test_set_MM(self): 627 # Set with a MaildirMessage instance 628 msg0 = mailbox.MaildirMessage(self._template % 0) 629 msg0.set_flags('TP') 630 key = self._box.add(msg0) 631 msg_returned = self._box.get_message(key) 632 self.assertEqual(msg_returned.get_subdir(), 'new') 633 self.assertEqual(msg_returned.get_flags(), 'PT') 634 msg1 = mailbox.MaildirMessage(self._template % 1) 635 self._box[key] = msg1 636 msg_returned = self._box.get_message(key) 637 self.assertEqual(msg_returned.get_subdir(), 'new') 638 self.assertEqual(msg_returned.get_flags(), '') 639 self.assertEqual(msg_returned.get_payload(), '1\n') 640 msg2 = mailbox.MaildirMessage(self._template % 2) 641 msg2.set_info('2,S') 642 self._box[key] = msg2 643 self._box[key] = self._template % 3 644 msg_returned = self._box.get_message(key) 645 
self.assertEqual(msg_returned.get_subdir(), 'new') 646 self.assertEqual(msg_returned.get_flags(), 'S') 647 self.assertEqual(msg_returned.get_payload(), '3\n') 648 649 def test_consistent_factory(self): 650 # Add a message. 651 msg = mailbox.MaildirMessage(self._template % 0) 652 msg.set_subdir('cur') 653 msg.set_flags('RF') 654 key = self._box.add(msg) 655 656 # Create new mailbox with 657 class FakeMessage(mailbox.MaildirMessage): 658 pass 659 box = mailbox.Maildir(self._path, factory=FakeMessage) 660 box.colon = self._box.colon 661 msg2 = box.get_message(key) 662 self.assertIsInstance(msg2, FakeMessage) 663 664 def test_initialize_new(self): 665 # Initialize a non-existent mailbox 666 self.tearDown() 667 self._box = mailbox.Maildir(self._path) 668 self._check_basics() 669 self._delete_recursively(self._path) 670 self._box = self._factory(self._path, factory=None) 671 self._check_basics() 672 673 def test_initialize_existing(self): 674 # Initialize an existing mailbox 675 self.tearDown() 676 for subdir in '', 'tmp', 'new', 'cur': 677 os.mkdir(os.path.normpath(os.path.join(self._path, subdir))) 678 self._box = mailbox.Maildir(self._path) 679 self._check_basics() 680 681 def _check_basics(self, factory=None): 682 # (Used by test_open_new() and test_open_existing().) 
683 self.assertEqual(self._box._path, os.path.abspath(self._path)) 684 self.assertEqual(self._box._factory, factory) 685 for subdir in '', 'tmp', 'new', 'cur': 686 path = os.path.join(self._path, subdir) 687 mode = os.stat(path)[stat.ST_MODE] 688 self.assertTrue(stat.S_ISDIR(mode), "Not a directory: '%s'" % path) 689 690 def test_list_folders(self): 691 # List folders 692 self._box.add_folder('one') 693 self._box.add_folder('two') 694 self._box.add_folder('three') 695 self.assertEqual(len(self._box.list_folders()), 3) 696 self.assertEqual(set(self._box.list_folders()), 697 set(('one', 'two', 'three'))) 698 699 def test_get_folder(self): 700 # Open folders 701 self._box.add_folder('foo.bar') 702 folder0 = self._box.get_folder('foo.bar') 703 folder0.add(self._template % 'bar') 704 self.assertTrue(os.path.isdir(os.path.join(self._path, '.foo.bar'))) 705 folder1 = self._box.get_folder('foo.bar') 706 self.assertEqual(folder1.get_string(folder1.keys()[0]), 707 self._template % 'bar') 708 709 def test_add_and_remove_folders(self): 710 # Delete folders 711 self._box.add_folder('one') 712 self._box.add_folder('two') 713 self.assertEqual(len(self._box.list_folders()), 2) 714 self.assertEqual(set(self._box.list_folders()), set(('one', 'two'))) 715 self._box.remove_folder('one') 716 self.assertEqual(len(self._box.list_folders()), 1) 717 self.assertEqual(set(self._box.list_folders()), set(('two',))) 718 self._box.add_folder('three') 719 self.assertEqual(len(self._box.list_folders()), 2) 720 self.assertEqual(set(self._box.list_folders()), set(('two', 'three'))) 721 self._box.remove_folder('three') 722 self.assertEqual(len(self._box.list_folders()), 1) 723 self.assertEqual(set(self._box.list_folders()), set(('two',))) 724 self._box.remove_folder('two') 725 self.assertEqual(len(self._box.list_folders()), 0) 726 self.assertEqual(self._box.list_folders(), []) 727 728 def test_clean(self): 729 # Remove old files from 'tmp' 730 foo_path = os.path.join(self._path, 'tmp', 'foo') 731 
bar_path = os.path.join(self._path, 'tmp', 'bar') 732 with open(foo_path, 'w', encoding='utf-8') as f: 733 f.write("@") 734 with open(bar_path, 'w', encoding='utf-8') as f: 735 f.write("@") 736 self._box.clean() 737 self.assertTrue(os.path.exists(foo_path)) 738 self.assertTrue(os.path.exists(bar_path)) 739 foo_stat = os.stat(foo_path) 740 os.utime(foo_path, (time.time() - 129600 - 2, 741 foo_stat.st_mtime)) 742 self._box.clean() 743 self.assertFalse(os.path.exists(foo_path)) 744 self.assertTrue(os.path.exists(bar_path)) 745 746 def test_create_tmp(self, repetitions=10): 747 # Create files in tmp directory 748 hostname = socket.gethostname() 749 if '/' in hostname: 750 hostname = hostname.replace('/', r'\057') 751 if ':' in hostname: 752 hostname = hostname.replace(':', r'\072') 753 pid = os.getpid() 754 pattern = re.compile(r"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)" 755 r"Q(?P<Q>\d+)\.(?P<host>[^:/]*)") 756 previous_groups = None 757 for x in range(repetitions): 758 tmp_file = self._box._create_tmp() 759 head, tail = os.path.split(tmp_file.name) 760 self.assertEqual(head, os.path.abspath(os.path.join(self._path, 761 "tmp")), 762 "File in wrong location: '%s'" % head) 763 match = pattern.match(tail) 764 self.assertIsNotNone(match, "Invalid file name: '%s'" % tail) 765 groups = match.groups() 766 if previous_groups is not None: 767 self.assertGreaterEqual(int(groups[0]), int(previous_groups[0]), 768 "Non-monotonic seconds: '%s' before '%s'" % 769 (previous_groups[0], groups[0])) 770 if int(groups[0]) == int(previous_groups[0]): 771 self.assertGreaterEqual(int(groups[1]), int(previous_groups[1]), 772 "Non-monotonic milliseconds: '%s' before '%s'" % 773 (previous_groups[1], groups[1])) 774 self.assertEqual(int(groups[2]), pid, 775 "Process ID mismatch: '%s' should be '%s'" % 776 (groups[2], pid)) 777 self.assertEqual(int(groups[3]), int(previous_groups[3]) + 1, 778 "Non-sequential counter: '%s' before '%s'" % 779 (previous_groups[3], groups[3])) 780 
self.assertEqual(groups[4], hostname, 781 "Host name mismatch: '%s' should be '%s'" % 782 (groups[4], hostname)) 783 previous_groups = groups 784 tmp_file.write(_bytes_sample_message) 785 tmp_file.seek(0) 786 self.assertEqual(tmp_file.read(), _bytes_sample_message) 787 tmp_file.close() 788 file_count = len(os.listdir(os.path.join(self._path, "tmp"))) 789 self.assertEqual(file_count, repetitions, 790 "Wrong file count: '%s' should be '%s'" % 791 (file_count, repetitions)) 792 793 def test_refresh(self): 794 # Update the table of contents 795 self.assertEqual(self._box._toc, {}) 796 key0 = self._box.add(self._template % 0) 797 key1 = self._box.add(self._template % 1) 798 self.assertEqual(self._box._toc, {}) 799 self._box._refresh() 800 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0), 801 key1: os.path.join('new', key1)}) 802 key2 = self._box.add(self._template % 2) 803 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0), 804 key1: os.path.join('new', key1)}) 805 self._box._refresh() 806 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0), 807 key1: os.path.join('new', key1), 808 key2: os.path.join('new', key2)}) 809 810 def test_refresh_after_safety_period(self): 811 # Issue #13254: Call _refresh after the "file system safety 812 # period" of 2 seconds has passed; _toc should still be 813 # updated because this is the first call to _refresh. 814 key0 = self._box.add(self._template % 0) 815 key1 = self._box.add(self._template % 1) 816 817 self._box = self._factory(self._path) 818 self.assertEqual(self._box._toc, {}) 819 820 # Emulate sleeping. Instead of sleeping for 2 seconds, use the 821 # skew factor to make _refresh think that the filesystem 822 # safety period has passed and re-reading the _toc is only 823 # required if mtimes differ. 
824 self._box._skewfactor = -3 825 826 self._box._refresh() 827 self.assertEqual(sorted(self._box._toc.keys()), sorted([key0, key1])) 828 829 def test_lookup(self): 830 # Look up message subpaths in the TOC 831 self.assertRaises(KeyError, lambda: self._box._lookup('foo')) 832 key0 = self._box.add(self._template % 0) 833 self.assertEqual(self._box._lookup(key0), os.path.join('new', key0)) 834 os.remove(os.path.join(self._path, 'new', key0)) 835 self.assertEqual(self._box._toc, {key0: os.path.join('new', key0)}) 836 # Be sure that the TOC is read back from disk (see issue #6896 837 # about bad mtime behaviour on some systems). 838 self._box.flush() 839 self.assertRaises(KeyError, lambda: self._box._lookup(key0)) 840 self.assertEqual(self._box._toc, {}) 841 842 def test_lock_unlock(self): 843 # Lock and unlock the mailbox. For Maildir, this does nothing. 844 self._box.lock() 845 self._box.unlock() 846 847 def test_folder (self): 848 # Test for bug #1569790: verify that folders returned by .get_folder() 849 # use the same factory function. 850 def dummy_factory (s): 851 return None 852 box = self._factory(self._path, factory=dummy_factory) 853 folder = box.add_folder('folder1') 854 self.assertIs(folder._factory, dummy_factory) 855 856 folder1_alias = box.get_folder('folder1') 857 self.assertIs(folder1_alias._factory, dummy_factory) 858 859 def test_directory_in_folder (self): 860 # Test that mailboxes still work if there's a stray extra directory 861 # in a folder. 862 for i in range(10): 863 self._box.add(mailbox.Message(_sample_message)) 864 865 # Create a stray directory 866 os.mkdir(os.path.join(self._path, 'cur', 'stray-dir')) 867 868 # Check that looping still works with the directory present. 
@unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
def test_folder_file_perms(self):
    # From bug #3228: the marker file created inside a Maildir subfolder
    # must not be marked as executable.
    #
    # Create the folder with the umask cleared, and always restore the
    # previous umask afterwards.
    saved_umask = os.umask(0)
    try:
        subfolder = self._box.add_folder('subfolder')
    finally:
        os.umask(saved_umask)

    marker = os.path.join(subfolder._path, 'maildirfolder')
    mode = os.stat(marker).st_mode
    # Execute bits should all be off.
    self.assertFalse(mode & 0o111)
class _TestSingleFile(TestMailbox):
    '''Tests shared by the mailbox formats stored in a single file'''

    def test_add_doesnt_rewrite(self):
        # When only adding messages, flush() should not rewrite the
        # mailbox file. See issue #9559.
        #
        # A rewrite puts the contents into another file that is then
        # renamed over the original, which changes the inode number. An
        # unchanged inode number therefore shows that no rewrite happened.
        def current_inode():
            return os.stat(self._path).st_ino

        before = current_inode()
        self._box.add(self._template % 0)
        self._box.flush()
        self.assertEqual(before, current_inode())

        # Reopen the mailbox to make sure the message was really added
        self._box.close()
        self._box = self._factory(self._path)
        self.assertEqual(len(self._box), 1)

    def test_permissions_after_flush(self):
        # See issue #5346
        #
        # Make the mailbox world writable. It's unlikely that the new
        # mailbox file would have these permissions after flush(),
        # because umask usually prevents it.
        world_writable = os.stat(self._path).st_mode | 0o666
        os.chmod(self._path, world_writable)

        self._box.add(self._template % 0)
        doomed = self._box.add(self._template % 1)
        # Need to remove one message to make flush() create a new file
        self._box.remove(doomed)
        self._box.flush()

        self.assertEqual(os.stat(self._path).st_mode, world_writable)
def test_add_mbox_or_mmdf_message(self):
    # Add an mboxMessage or MMDFMessage and check that it really ends up
    # in the mailbox (the test used to pass as long as add() did not
    # raise, and never looked at the returned key).
    message_types = (mailbox.mboxMessage, mailbox.MMDFMessage)
    for added, class_ in enumerate(message_types, start=1):
        with self.subTest(class_=class_):
            msg = class_('From foo@bar blah\nFrom: foo\n\n0\n')
            key = self._box.add(msg)
            # One more message per iteration, retrievable under the
            # key that add() returned.
            self.assertEqual(len(self._box), added)
            self.assertEqual(self._box[key].get_payload(), '0\n')
@support.requires_fork()
@unittest.skipUnless(hasattr(socket, 'socketpair'), "Test needs socketpair().")
def test_lock_conflict(self):
    # Fork off a child process that will lock the mailbox temporarily,
    # unlock it and exit.
    #
    # The two ends of the socket pair serve as a handshake channel:
    # `c` is only used by the child and `p` only by the parent.
    c, p = socket.socketpair()
    self.addCleanup(c.close)
    self.addCleanup(p.close)

    pid = os.fork()
    if pid == 0:
        # child
        try:
            # lock the mailbox, and signal the parent it can proceed
            self._box.lock()
            c.send(b'c')

            # wait until the parent is done, and unlock the mailbox
            c.recv(1)
            self._box.unlock()
        finally:
            # Always leave via os._exit() so that the child never falls
            # through into the parent-only code below, even on error.
            os._exit(0)

    # In the parent, wait until the child signals it locked the mailbox.
    p.recv(1)
    try:
        # The child is holding the lock at this point, so a second
        # lock attempt from the parent is expected to clash.
        self.assertRaises(mailbox.ExternalClashError,
                          self._box.lock)
    finally:
        # Signal the child it can now release the lock and exit.
        p.send(b'p')
        # Wait for child to exit. Locking should now succeed.
        support.wait_process(pid, exitcode=0)

    self._box.lock()
    self._box.unlock()
class TestMbox(_TestMboxMMDF, unittest.TestCase):

    _factory = lambda self, path, factory=None: mailbox.mbox(path, factory)

    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
    def test_file_perms(self):
        # From bug #3228, we want to verify that the mailbox file isn't executable,
        # even if the umask is set to something that would leave executable bits set.
        # We only run this test on platforms that support umask.
        #
        # The umask is changed *before* entering the try block: if the
        # os.umask() call itself failed inside it, the finally clause
        # would raise UnboundLocalError for old_umask and hide the real
        # error.
        old_umask = os.umask(0o077)
        try:
            self._box.close()
            os.unlink(self._path)
            self._box = mailbox.mbox(self._path, create=True)
            self._box.add('')
            self._box.close()
        finally:
            os.umask(old_umask)

        perms = os.stat(self._path).st_mode
        self.assertFalse(perms & 0o111)  # Execute bits should all be off.

    def test_terminating_newline(self):
        message = email.message.Message()
        message['From'] = '[email protected]'
        message.set_payload('No newline at the end')
        i = self._box.add(message)

        # A newline should have been appended to the payload
        message = self._box.get(i)
        self.assertEqual(message.get_payload(), 'No newline at the end\n')

    def test_message_separator(self):
        # Check there's always a single blank line after each message,
        # whether or not the added text already ends with a newline.
        for text in ('From: foo\n\n0',      # No newline at the end
                     'From: foo\n\n0\n'):   # Newline at the end
            self._box.add(text)
            with open(self._path, encoding='utf-8') as f:
                data = f.read()
            self.assertEqual(data[-3:], '0\n\n')
def test_add_and_remove_folders(self):
    # Add and delete folders, checking the folder listing after each step
    box = self._box

    def check_folders(*names):
        listing = box.list_folders()
        self.assertEqual(len(listing), len(names))
        self.assertEqual(set(listing), set(names))

    box.add_folder('one')
    box.add_folder('two')
    check_folders('one', 'two')
    box.remove_folder('one')
    check_folders('two')
    box.add_folder('three')
    check_folders('two', 'three')
    box.remove_folder('three')
    check_folders('two')

    # With the last folder gone the listing must be an empty list.
    box.remove_folder('two')
    remaining = box.list_folders()
    self.assertEqual(len(remaining), 0)
    self.assertEqual(remaining, [])
def test_pack(self):
    # Pack the contents of the mailbox
    msg0 = mailbox.MHMessage(self._template % 0)
    msg1 = mailbox.MHMessage(self._template % 1)
    msg2 = mailbox.MHMessage(self._template % 2)
    msg3 = mailbox.MHMessage(self._template % 3)
    msg0.set_sequences(['foo', 'unseen'])
    msg1.set_sequences(['foo'])
    msg2.set_sequences(['foo', 'flagged'])
    msg3.set_sequences(['foo', 'bar', 'replied'])
    key0 = self._box.add(msg0)
    key1 = self._box.add(msg1)
    key2 = self._box.add(msg2)
    key3 = self._box.add(msg3)
    self.assertEqual(self._box.get_sequences(),
                     {'foo': [key0, key1, key2, key3], 'unseen': [key0],
                      'flagged': [key2], 'bar': [key3], 'replied': [key3]})
    self._box.remove(key2)
    self.assertEqual(self._box.get_sequences(),
                     {'foo': [key0, key1, key3], 'unseen': [key0],
                      'bar': [key3], 'replied': [key3]})
    self._box.pack()
    # After packing, the expected keys are spelled out literally: the
    # remaining messages are numbered 1..3 without the gap left by the
    # removed one.
    self.assertEqual(self._box.keys(), [1, 2, 3])
    self.assertEqual(self._box.get_sequences(),
                     {'foo': [1, 2, 3], 'unseen': [1], 'bar': [3],
                      'replied': [3]})

    # Test case for packing while holding the mailbox locked.
    added = [self._box.add(msg1) for _ in range(4)]
    # Remove two of the new messages so that pack() has gaps to close.
    self._box.remove(added[0])
    self._box.remove(added[2])
    self._box.lock()
    try:
        self._box.pack()
    finally:
        # Release the lock even when pack() fails, so that a failure
        # does not leave the mailbox locked.
        self._box.unlock()
    self.assertEqual(self._box.get_sequences(),
                     {'foo': [1, 2, 3, 4, 5],
                      'unseen': [1], 'bar': [3], 'replied': [3]})
class TestFakeMailBox(unittest.TestCase):

    def test_closing_fd(self):
        # Fetching a message with box[key] is expected to close the file
        # object that get_file() returned for that key.
        box = FakeMailBox()
        indices = range(10)
        for index in indices:
            self.assertFalse(box.files[index].closed)
        for index in indices:
            box[index]
        for index in indices:
            self.assertTrue(box.files[index].closed)
def test_explain_to(self):
    # Copy self's format-specific data to other message formats.
    # This test is superficial; better ones are in TestMessageConversion.
    source = self._factory()
    for target_type in self.all_mailbox_types:
        source._explain_to(target_type())

    # A plain email.message.Message is not accepted as a target.
    plain = email.message.Message()
    with self.assertRaises(TypeError):
        source._explain_to(plain)
def test_flags(self):
    # Use get_flags(), set_flags(), add_flag(), remove_flag()
    msg = mailbox.MaildirMessage(_sample_message)
    self.assertEqual(msg.get_flags(), '')
    self.assertEqual(msg.get_subdir(), 'new')
    msg.set_flags('F')
    self.assertEqual(msg.get_subdir(), 'new')
    self.assertEqual(msg.get_flags(), 'F')

    # (operation, argument, flags expected afterwards)
    steps = (
        (msg.set_flags, 'SDTP', 'DPST'),
        (msg.add_flag, 'FT', 'DFPST'),
        (msg.remove_flag, 'TDRP', 'FS'),
    )
    for mutate, flags, expected in steps:
        mutate(flags)
        self.assertEqual(msg.get_flags(), expected)

    # Changing flags must leave the subdirectory and the content alone.
    self.assertEqual(msg.get_subdir(), 'new')
    self._check_sample(msg)
class _TestMboxMMDFMessage:
    '''Tests shared by the mbox and MMDF message classes.

    Concrete test classes set _factory to the message class under test.
    '''

    _factory = mailbox._mboxMMDFMessage

    def _post_initialize_hook(self, msg):
        self._check_from(msg)

    def test_initialize_with_unixfrom(self):
        # Initialize with a message that already has a _unixfrom attribute
        msg = mailbox.Message(_sample_message)
        msg.set_unixfrom('From foo@bar blah')
        # Go through _factory so that every class reusing these tests
        # exercises its own message type instead of always mboxMessage.
        msg = self._factory(msg)
        self.assertEqual(msg.get_from(), 'foo@bar blah', msg.get_from())

    def test_from(self):
        # Get and set "From " line
        msg = self._factory(_sample_message)
        self._check_from(msg)
        msg.set_from('foo bar')
        self.assertEqual(msg.get_from(), 'foo bar')
        msg.set_from('foo@bar', True)
        self._check_from(msg, 'foo@bar')
        msg.set_from('blah@temp', time.localtime())
        self._check_from(msg, 'blah@temp')

    def test_flags(self):
        # Use get_flags(), set_flags(), add_flag(), remove_flag()
        msg = self._factory(_sample_message)
        self.assertEqual(msg.get_flags(), '')
        msg.set_flags('F')
        self.assertEqual(msg.get_flags(), 'F')
        msg.set_flags('XODR')
        self.assertEqual(msg.get_flags(), 'RODX')
        msg.add_flag('FA')
        self.assertEqual(msg.get_flags(), 'RODFAX')
        msg.remove_flag('FDXA')
        self.assertEqual(msg.get_flags(), 'RO')
        self._check_sample(msg)

    def _check_from(self, msg, sender=None):
        # Check contents of "From " line: the sender (MAILER-DAEMON when
        # none is given) followed by a timestamp such as
        # 'Thu Jan  1 00:00:00 1970'.
        if sender is None:
            sender = "MAILER-DAEMON"
        # Escape the sender so that characters like '.' or '+' are
        # matched literally instead of being read as regex syntax.
        pattern = (re.escape(sender) +
                   r" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:\d{2} \d{4}")
        self.assertIsNotNone(re.match(pattern, msg.get_from()))
class TestBabylMessage(TestMessage, unittest.TestCase):

    _factory = mailbox.BabylMessage

    def _post_initialize_hook(self, msg):
        self.assertEqual(msg._labels, [])

    def test_labels(self):
        # Get, set, join, and leave labels
        msg = mailbox.BabylMessage(_sample_message)
        self.assertEqual(msg.get_labels(), [])

        # (operation, argument, labels expected afterwards)
        steps = (
            (msg.set_labels, ['foobar'], ['foobar']),
            (msg.set_labels, [], []),
            (msg.add_label, 'filed', ['filed']),
            (msg.add_label, 'resent', ['filed', 'resent']),
            # Adding a label a second time must not duplicate it.
            (msg.add_label, 'resent', ['filed', 'resent']),
            (msg.remove_label, 'filed', ['resent']),
            (msg.add_label, 'foobar', ['resent', 'foobar']),
            # Removing a label that is not set changes nothing.
            (msg.remove_label, 'unseen', ['resent', 'foobar']),
            (msg.set_labels, ['foobar', 'answered'], ['foobar', 'answered']),
        )
        for mutate, argument, expected in steps:
            mutate(argument)
            self.assertEqual(msg.get_labels(), expected)

    def test_visible(self):
        # Get, set, and update visible headers
        msg = mailbox.BabylMessage(_sample_message)
        draft = msg.get_visible()
        self.assertEqual(draft.keys(), [])
        self.assertIsNone(draft.get_payload())

        # Editing the object returned by get_visible() must not show up
        # in the message until set_visible() is called.
        draft['User-Agent'] = 'FooBar 1.0'
        draft['X-Whatever'] = 'Blah'
        self.assertEqual(msg.get_visible().keys(), [])
        msg.set_visible(draft)

        stored = msg.get_visible()
        self.assertEqual(stored.keys(), ['User-Agent', 'X-Whatever'])
        self.assertEqual(stored['User-Agent'], 'FooBar 1.0')
        self.assertEqual(stored['X-Whatever'], 'Blah')
        self.assertIsNone(stored.get_payload())

        # update_visible() must leave a previously fetched object as is...
        msg.update_visible()
        self.assertEqual(stored.keys(), ['User-Agent', 'X-Whatever'])
        self.assertIsNone(stored.get_payload())

        # ...while a fresh get_visible() reflects the updated headers.
        refreshed = msg.get_visible()
        self.assertEqual(refreshed.keys(), ['User-Agent', 'Date', 'From', 'To',
                                            'Subject'])
        for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'):
            self.assertEqual(refreshed[header], msg[header])
def test_type_specific_attributes_removed_on_conversion(self):
    # Attributes that only the source message type defines must not
    # survive a conversion to a different message type.
    types = self.all_mailbox_types

    # Instance attributes of a freshly built message, per message type.
    baseline = {}
    for type_ in types:
        baseline[type_] = type_(_sample_message).__dict__

    for source_type in types:
        for target_type in types:
            if source_type is not target_type:
                converted = target_type(source_type(_sample_message))
                failure_note = "while converting {} to {}".format(
                    source_type, target_type)
                for attr in baseline[source_type]:
                    if attr not in baseline[target_type]:
                        self.assertNotIn(attr, converted.__dict__,
                                         failure_note)
def test_maildir_to_babyl(self):
    # Convert MaildirMessage to Babyl
    #
    # Maildir flags -> Babyl labels expected after the conversion. The
    # same source message is reused, with its flags replaced each time.
    expected_labels = {
        'D': ['unseen'],
        'F': ['unseen'],
        'P': ['unseen', 'forwarded'],
        'R': ['unseen', 'answered'],
        'S': [],
        'T': ['unseen', 'deleted'],
        'DFPRST': ['deleted', 'answered', 'forwarded'],
    }
    source = mailbox.MaildirMessage(_sample_message)
    for flags, labels in expected_labels.items():
        source.set_flags(flags)
        converted = mailbox.BabylMessage(source)
        self.assertEqual(converted.get_labels(), labels)
msg_mboxMMDF.set_flags('RODFA') 1791 msg_mboxMMDF.set_from('foo@bar') 1792 for class2_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1793 msg2 = class2_(msg_mboxMMDF) 1794 self.assertEqual(msg2.get_flags(), 'RODFA') 1795 self.assertEqual(msg2.get_from(), 'foo@bar') 1796 1797 def test_mboxmmdf_to_mh(self): 1798 # Convert mboxMessage and MMDFMessage to MHMessage 1799 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1800 msg_mboxMMDF = class_(_sample_message) 1801 pairs = (('R', []), ('O', ['unseen']), ('D', ['unseen']), 1802 ('F', ['unseen', 'flagged']), 1803 ('A', ['unseen', 'replied']), 1804 ('RODFA', ['replied', 'flagged'])) 1805 for setting, result in pairs: 1806 msg_mboxMMDF.set_flags(setting) 1807 self.assertEqual(mailbox.MHMessage(msg_mboxMMDF).get_sequences(), 1808 result) 1809 1810 def test_mboxmmdf_to_babyl(self): 1811 # Convert mboxMessage and MMDFMessage to BabylMessage 1812 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1813 msg = class_(_sample_message) 1814 pairs = (('R', []), ('O', ['unseen']), 1815 ('D', ['unseen', 'deleted']), ('F', ['unseen']), 1816 ('A', ['unseen', 'answered']), 1817 ('RODFA', ['deleted', 'answered'])) 1818 for setting, result in pairs: 1819 msg.set_flags(setting) 1820 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result) 1821 1822 def test_mh_to_maildir(self): 1823 # Convert MHMessage to MaildirMessage 1824 pairs = (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS')) 1825 for setting, result in pairs: 1826 msg = mailbox.MHMessage(_sample_message) 1827 msg.add_sequence(setting) 1828 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result) 1829 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1830 msg = mailbox.MHMessage(_sample_message) 1831 msg.add_sequence('unseen') 1832 msg.add_sequence('replied') 1833 msg.add_sequence('flagged') 1834 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'FR') 1835 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 
1836 1837 def test_mh_to_mboxmmdf(self): 1838 # Convert MHMessage to mboxMessage and MMDFMessage 1839 pairs = (('unseen', 'O'), ('replied', 'ROA'), ('flagged', 'ROF')) 1840 for setting, result in pairs: 1841 msg = mailbox.MHMessage(_sample_message) 1842 msg.add_sequence(setting) 1843 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1844 self.assertEqual(class_(msg).get_flags(), result) 1845 msg = mailbox.MHMessage(_sample_message) 1846 msg.add_sequence('unseen') 1847 msg.add_sequence('replied') 1848 msg.add_sequence('flagged') 1849 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1850 self.assertEqual(class_(msg).get_flags(), 'OFA') 1851 1852 def test_mh_to_mh(self): 1853 # Convert MHMessage to MHMessage 1854 msg = mailbox.MHMessage(_sample_message) 1855 msg.add_sequence('unseen') 1856 msg.add_sequence('replied') 1857 msg.add_sequence('flagged') 1858 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), 1859 ['unseen', 'replied', 'flagged']) 1860 1861 def test_mh_to_babyl(self): 1862 # Convert MHMessage to BabylMessage 1863 pairs = (('unseen', ['unseen']), ('replied', ['answered']), 1864 ('flagged', [])) 1865 for setting, result in pairs: 1866 msg = mailbox.MHMessage(_sample_message) 1867 msg.add_sequence(setting) 1868 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result) 1869 msg = mailbox.MHMessage(_sample_message) 1870 msg.add_sequence('unseen') 1871 msg.add_sequence('replied') 1872 msg.add_sequence('flagged') 1873 self.assertEqual(mailbox.BabylMessage(msg).get_labels(), 1874 ['unseen', 'answered']) 1875 1876 def test_babyl_to_maildir(self): 1877 # Convert BabylMessage to MaildirMessage 1878 pairs = (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'), 1879 ('answered', 'RS'), ('forwarded', 'PS'), ('edited', 'S'), 1880 ('resent', 'PS')) 1881 for setting, result in pairs: 1882 msg = mailbox.BabylMessage(_sample_message) 1883 msg.add_label(setting) 1884 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result) 1885 
self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1886 msg = mailbox.BabylMessage(_sample_message) 1887 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1888 'edited', 'resent'): 1889 msg.add_label(label) 1890 self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'PRT') 1891 self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur') 1892 1893 def test_babyl_to_mboxmmdf(self): 1894 # Convert BabylMessage to mboxMessage and MMDFMessage 1895 pairs = (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'), 1896 ('answered', 'ROA'), ('forwarded', 'RO'), ('edited', 'RO'), 1897 ('resent', 'RO')) 1898 for setting, result in pairs: 1899 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1900 msg = mailbox.BabylMessage(_sample_message) 1901 msg.add_label(setting) 1902 self.assertEqual(class_(msg).get_flags(), result) 1903 msg = mailbox.BabylMessage(_sample_message) 1904 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1905 'edited', 'resent'): 1906 msg.add_label(label) 1907 for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage): 1908 self.assertEqual(class_(msg).get_flags(), 'ODA') 1909 1910 def test_babyl_to_mh(self): 1911 # Convert BabylMessage to MHMessage 1912 pairs = (('unseen', ['unseen']), ('deleted', []), ('filed', []), 1913 ('answered', ['replied']), ('forwarded', []), ('edited', []), 1914 ('resent', [])) 1915 for setting, result in pairs: 1916 msg = mailbox.BabylMessage(_sample_message) 1917 msg.add_label(setting) 1918 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), result) 1919 msg = mailbox.BabylMessage(_sample_message) 1920 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1921 'edited', 'resent'): 1922 msg.add_label(label) 1923 self.assertEqual(mailbox.MHMessage(msg).get_sequences(), 1924 ['unseen', 'replied']) 1925 1926 def test_babyl_to_babyl(self): 1927 # Convert BabylMessage to BabylMessage 1928 msg = mailbox.BabylMessage(_sample_message) 1929 
msg.update_visible() 1930 for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded', 1931 'edited', 'resent'): 1932 msg.add_label(label) 1933 msg2 = mailbox.BabylMessage(msg) 1934 self.assertEqual(msg2.get_labels(), ['unseen', 'deleted', 'filed', 1935 'answered', 'forwarded', 'edited', 1936 'resent']) 1937 self.assertEqual(msg.get_visible().keys(), msg2.get_visible().keys()) 1938 for key in msg.get_visible().keys(): 1939 self.assertEqual(msg.get_visible()[key], msg2.get_visible()[key]) 1940 1941 1942class TestProxyFileBase(TestBase): 1943 1944 def _test_read(self, proxy): 1945 # Read by byte 1946 proxy.seek(0) 1947 self.assertEqual(proxy.read(), b'bar') 1948 proxy.seek(1) 1949 self.assertEqual(proxy.read(), b'ar') 1950 proxy.seek(0) 1951 self.assertEqual(proxy.read(2), b'ba') 1952 proxy.seek(1) 1953 self.assertEqual(proxy.read(-1), b'ar') 1954 proxy.seek(2) 1955 self.assertEqual(proxy.read(1000), b'r') 1956 1957 def _test_readline(self, proxy): 1958 # Read by line 1959 linesep = os.linesep.encode() 1960 proxy.seek(0) 1961 self.assertEqual(proxy.readline(), b'foo' + linesep) 1962 self.assertEqual(proxy.readline(), b'bar' + linesep) 1963 self.assertEqual(proxy.readline(), b'fred' + linesep) 1964 self.assertEqual(proxy.readline(), b'bob') 1965 proxy.seek(2) 1966 self.assertEqual(proxy.readline(), b'o' + linesep) 1967 proxy.seek(6 + 2 * len(os.linesep)) 1968 self.assertEqual(proxy.readline(), b'fred' + linesep) 1969 proxy.seek(6 + 2 * len(os.linesep)) 1970 self.assertEqual(proxy.readline(2), b'fr') 1971 self.assertEqual(proxy.readline(-10), b'ed' + linesep) 1972 1973 def _test_readlines(self, proxy): 1974 # Read multiple lines 1975 linesep = os.linesep.encode() 1976 proxy.seek(0) 1977 self.assertEqual(proxy.readlines(), [b'foo' + linesep, 1978 b'bar' + linesep, 1979 b'fred' + linesep, b'bob']) 1980 proxy.seek(0) 1981 self.assertEqual(proxy.readlines(2), [b'foo' + linesep]) 1982 proxy.seek(3 + len(linesep)) 1983 self.assertEqual(proxy.readlines(4 + 
len(linesep)), 1984 [b'bar' + linesep, b'fred' + linesep]) 1985 proxy.seek(3) 1986 self.assertEqual(proxy.readlines(1000), [linesep, b'bar' + linesep, 1987 b'fred' + linesep, b'bob']) 1988 1989 def _test_iteration(self, proxy): 1990 # Iterate by line 1991 linesep = os.linesep.encode() 1992 proxy.seek(0) 1993 iterator = iter(proxy) 1994 self.assertEqual(next(iterator), b'foo' + linesep) 1995 self.assertEqual(next(iterator), b'bar' + linesep) 1996 self.assertEqual(next(iterator), b'fred' + linesep) 1997 self.assertEqual(next(iterator), b'bob') 1998 self.assertRaises(StopIteration, next, iterator) 1999 2000 def _test_seek_and_tell(self, proxy): 2001 # Seek and use tell to check position 2002 linesep = os.linesep.encode() 2003 proxy.seek(3) 2004 self.assertEqual(proxy.tell(), 3) 2005 self.assertEqual(proxy.read(len(linesep)), linesep) 2006 proxy.seek(2, 1) 2007 self.assertEqual(proxy.read(1 + len(linesep)), b'r' + linesep) 2008 proxy.seek(-3 - len(linesep), 2) 2009 self.assertEqual(proxy.read(3), b'bar') 2010 proxy.seek(2, 0) 2011 self.assertEqual(proxy.read(), b'o' + linesep + b'bar' + linesep) 2012 proxy.seek(100) 2013 self.assertFalse(proxy.read()) 2014 2015 def _test_close(self, proxy): 2016 # Close a file 2017 self.assertFalse(proxy.closed) 2018 proxy.close() 2019 self.assertTrue(proxy.closed) 2020 # Issue 11700 subsequent closes should be a no-op. 
2021 proxy.close() 2022 self.assertTrue(proxy.closed) 2023 2024 2025class TestProxyFile(TestProxyFileBase, unittest.TestCase): 2026 2027 def setUp(self): 2028 self._path = os_helper.TESTFN 2029 self._file = open(self._path, 'wb+') 2030 2031 def tearDown(self): 2032 self._file.close() 2033 self._delete_recursively(self._path) 2034 2035 def test_initialize(self): 2036 # Initialize and check position 2037 self._file.write(b'foo') 2038 pos = self._file.tell() 2039 proxy0 = mailbox._ProxyFile(self._file) 2040 self.assertEqual(proxy0.tell(), pos) 2041 self.assertEqual(self._file.tell(), pos) 2042 proxy1 = mailbox._ProxyFile(self._file, 0) 2043 self.assertEqual(proxy1.tell(), 0) 2044 self.assertEqual(self._file.tell(), pos) 2045 2046 def test_read(self): 2047 self._file.write(b'bar') 2048 self._test_read(mailbox._ProxyFile(self._file)) 2049 2050 def test_readline(self): 2051 self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 2052 os.linesep), 'ascii')) 2053 self._test_readline(mailbox._ProxyFile(self._file)) 2054 2055 def test_readlines(self): 2056 self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 2057 os.linesep), 'ascii')) 2058 self._test_readlines(mailbox._ProxyFile(self._file)) 2059 2060 def test_iteration(self): 2061 self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep, 2062 os.linesep), 'ascii')) 2063 self._test_iteration(mailbox._ProxyFile(self._file)) 2064 2065 def test_seek_and_tell(self): 2066 self._file.write(bytes('foo%sbar%s' % (os.linesep, os.linesep), 'ascii')) 2067 self._test_seek_and_tell(mailbox._ProxyFile(self._file)) 2068 2069 def test_close(self): 2070 self._file.write(bytes('foo%sbar%s' % (os.linesep, os.linesep), 'ascii')) 2071 self._test_close(mailbox._ProxyFile(self._file)) 2072 2073 2074class TestPartialFile(TestProxyFileBase, unittest.TestCase): 2075 2076 def setUp(self): 2077 self._path = os_helper.TESTFN 2078 self._file = open(self._path, 'wb+') 2079 2080 def tearDown(self): 2081 
self._file.close() 2082 self._delete_recursively(self._path) 2083 2084 def test_initialize(self): 2085 # Initialize and check position 2086 self._file.write(bytes('foo' + os.linesep + 'bar', 'ascii')) 2087 pos = self._file.tell() 2088 proxy = mailbox._PartialFile(self._file, 2, 5) 2089 self.assertEqual(proxy.tell(), 0) 2090 self.assertEqual(self._file.tell(), pos) 2091 2092 def test_read(self): 2093 self._file.write(bytes('***bar***', 'ascii')) 2094 self._test_read(mailbox._PartialFile(self._file, 3, 6)) 2095 2096 def test_readline(self): 2097 self._file.write(bytes('!!!!!foo%sbar%sfred%sbob!!!!!' % 2098 (os.linesep, os.linesep, os.linesep), 'ascii')) 2099 self._test_readline(mailbox._PartialFile(self._file, 5, 2100 18 + 3 * len(os.linesep))) 2101 2102 def test_readlines(self): 2103 self._file.write(bytes('foo%sbar%sfred%sbob?????' % 2104 (os.linesep, os.linesep, os.linesep), 'ascii')) 2105 self._test_readlines(mailbox._PartialFile(self._file, 0, 2106 13 + 3 * len(os.linesep))) 2107 2108 def test_iteration(self): 2109 self._file.write(bytes('____foo%sbar%sfred%sbob####' % 2110 (os.linesep, os.linesep, os.linesep), 'ascii')) 2111 self._test_iteration(mailbox._PartialFile(self._file, 4, 2112 17 + 3 * len(os.linesep))) 2113 2114 def test_seek_and_tell(self): 2115 self._file.write(bytes('(((foo%sbar%s$$$' % (os.linesep, os.linesep), 'ascii')) 2116 self._test_seek_and_tell(mailbox._PartialFile(self._file, 3, 2117 9 + 2 * len(os.linesep))) 2118 2119 def test_close(self): 2120 self._file.write(bytes('&foo%sbar%s^' % (os.linesep, os.linesep), 'ascii')) 2121 self._test_close(mailbox._PartialFile(self._file, 1, 2122 6 + 3 * len(os.linesep))) 2123 2124 2125## Start: tests from the original module (for backward compatibility). 2126 2127FROM_ = "From [email protected] Sat Jul 24 13:43:35 2004\n" 2128DUMMY_MESSAGE = """\ 2129From: [email protected] 2130To: [email protected] 2131Subject: Simple Test 2132 2133This is a dummy message. 
2134""" 2135 2136class MaildirTestCase(unittest.TestCase): 2137 2138 def setUp(self): 2139 # create a new maildir mailbox to work with: 2140 self._dir = os_helper.TESTFN 2141 if os.path.isdir(self._dir): 2142 os_helper.rmtree(self._dir) 2143 elif os.path.isfile(self._dir): 2144 os_helper.unlink(self._dir) 2145 os.mkdir(self._dir) 2146 os.mkdir(os.path.join(self._dir, "cur")) 2147 os.mkdir(os.path.join(self._dir, "tmp")) 2148 os.mkdir(os.path.join(self._dir, "new")) 2149 self._counter = 1 2150 self._msgfiles = [] 2151 2152 def tearDown(self): 2153 list(map(os.unlink, self._msgfiles)) 2154 os_helper.rmdir(os.path.join(self._dir, "cur")) 2155 os_helper.rmdir(os.path.join(self._dir, "tmp")) 2156 os_helper.rmdir(os.path.join(self._dir, "new")) 2157 os_helper.rmdir(self._dir) 2158 2159 def createMessage(self, dir, mbox=False): 2160 t = int(time.time() % 1000000) 2161 pid = self._counter 2162 self._counter += 1 2163 filename = ".".join((str(t), str(pid), "myhostname", "mydomain")) 2164 tmpname = os.path.join(self._dir, "tmp", filename) 2165 newname = os.path.join(self._dir, dir, filename) 2166 with open(tmpname, "w", encoding="utf-8") as fp: 2167 self._msgfiles.append(tmpname) 2168 if mbox: 2169 fp.write(FROM_) 2170 fp.write(DUMMY_MESSAGE) 2171 try: 2172 os.link(tmpname, newname) 2173 except (AttributeError, PermissionError): 2174 with open(newname, "w") as fp: 2175 fp.write(DUMMY_MESSAGE) 2176 self._msgfiles.append(newname) 2177 return tmpname 2178 2179 def test_empty_maildir(self): 2180 """Test an empty maildir mailbox""" 2181 # Test for regression on bug #117490: 2182 # Make sure the boxes attribute actually gets set. 
2183 self.mbox = mailbox.Maildir(os_helper.TESTFN) 2184 #self.assertTrue(hasattr(self.mbox, "boxes")) 2185 #self.assertEqual(len(self.mbox.boxes), 0) 2186 self.assertIsNone(self.mbox.next()) 2187 self.assertIsNone(self.mbox.next()) 2188 2189 def test_nonempty_maildir_cur(self): 2190 self.createMessage("cur") 2191 self.mbox = mailbox.Maildir(os_helper.TESTFN) 2192 #self.assertEqual(len(self.mbox.boxes), 1) 2193 self.assertIsNotNone(self.mbox.next()) 2194 self.assertIsNone(self.mbox.next()) 2195 self.assertIsNone(self.mbox.next()) 2196 2197 def test_nonempty_maildir_new(self): 2198 self.createMessage("new") 2199 self.mbox = mailbox.Maildir(os_helper.TESTFN) 2200 #self.assertEqual(len(self.mbox.boxes), 1) 2201 self.assertIsNotNone(self.mbox.next()) 2202 self.assertIsNone(self.mbox.next()) 2203 self.assertIsNone(self.mbox.next()) 2204 2205 def test_nonempty_maildir_both(self): 2206 self.createMessage("cur") 2207 self.createMessage("new") 2208 self.mbox = mailbox.Maildir(os_helper.TESTFN) 2209 #self.assertEqual(len(self.mbox.boxes), 2) 2210 self.assertIsNotNone(self.mbox.next()) 2211 self.assertIsNotNone(self.mbox.next()) 2212 self.assertIsNone(self.mbox.next()) 2213 self.assertIsNone(self.mbox.next()) 2214 2215## End: tests from the original module (for backward compatibility). 
2216 2217 2218_sample_message = """\ 2219Return-Path: <[email protected]> 2220X-Original-To: gkj+person@localhost 2221Delivered-To: gkj+person@localhost 2222Received: from localhost (localhost [127.0.0.1]) 2223 by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17 2224 for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT) 2225Delivered-To: [email protected] 2226Received: from localhost [127.0.0.1] 2227 by localhost with POP3 (fetchmail-6.2.5) 2228 for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT) 2229Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228]) 2230 by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746 2231 for <[email protected]>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT) 2232Received: by andy.gregorykjohnson.com (Postfix, from userid 1000) 2233 id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT) 2234Date: Wed, 13 Jul 2005 17:23:11 -0400 2235From: "Gregory K. Johnson" <[email protected]> 2236To: [email protected] 2237Subject: Sample message 2238Message-ID: <[email protected]> 2239Mime-Version: 1.0 2240Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+" 2241Content-Disposition: inline 2242User-Agent: Mutt/1.5.9i 2243 2244 2245--NMuMz9nt05w80d4+ 2246Content-Type: text/plain; charset=us-ascii 2247Content-Disposition: inline 2248 2249This is a sample message. 2250 2251-- 2252Gregory K. 
Johnson 2253 2254--NMuMz9nt05w80d4+ 2255Content-Type: application/octet-stream 2256Content-Disposition: attachment; filename="text.gz" 2257Content-Transfer-Encoding: base64 2258 2259H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs 22603FYlAAAA 2261 2262--NMuMz9nt05w80d4+-- 2263""" 2264 2265_bytes_sample_message = _sample_message.encode('ascii') 2266 2267_sample_headers = { 2268 "Return-Path":"<[email protected]>", 2269 "X-Original-To":"gkj+person@localhost", 2270 "Delivered-To":"gkj+person@localhost", 2271 "Received":"""from localhost (localhost [127.0.0.1]) 2272 by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17 2273 for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""", 2274 "Delivered-To":"[email protected]", 2275 "Received":"""from localhost [127.0.0.1] 2276 by localhost with POP3 (fetchmail-6.2.5) 2277 for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""", 2278 "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228]) 2279 by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746 2280 for <[email protected]>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""", 2281 "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000) 2282 id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""", 2283 "Date":"Wed, 13 Jul 2005 17:23:11 -0400", 2284 "From":""""Gregory K. Johnson" <[email protected]>""", 2285 "To":"[email protected]", 2286 "Subject":"Sample message", 2287 "Mime-Version":"1.0", 2288 "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""", 2289 "Content-Disposition":"inline", 2290 "User-Agent": "Mutt/1.5.9i" } 2291 2292_sample_payloads = ("""This is a sample message. 2293 2294-- 2295Gregory K. 
Johnson 2296""", 2297"""H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs 22983FYlAAAA 2299""") 2300 2301 2302class MiscTestCase(unittest.TestCase): 2303 def test__all__(self): 2304 support.check__all__(self, mailbox, 2305 not_exported={"linesep", "fcntl"}) 2306 2307 2308def tearDownModule(): 2309 support.reap_children() 2310 2311 2312if __name__ == '__main__': 2313 unittest.main() 2314