1import os
2import sys
3import time
4import stat
5import socket
6import email
7import email.message
8import re
9import io
10import tempfile
11from test import support
12from test.support import os_helper
13from test.support import socket_helper
14import unittest
15import textwrap
16import mailbox
17import glob
18
19
20if not socket_helper.has_gethostname:
21    raise unittest.SkipTest("test requires gethostname()")
22
23
24class TestBase:
25
26    all_mailbox_types = (mailbox.Message, mailbox.MaildirMessage,
27                         mailbox.mboxMessage, mailbox.MHMessage,
28                         mailbox.BabylMessage, mailbox.MMDFMessage)
29
30    def _check_sample(self, msg):
31        # Inspect a mailbox.Message representation of the sample message
32        self.assertIsInstance(msg, email.message.Message)
33        self.assertIsInstance(msg, mailbox.Message)
34        for key, value in _sample_headers.items():
35            self.assertIn(value, msg.get_all(key))
36        self.assertTrue(msg.is_multipart())
37        self.assertEqual(len(msg.get_payload()), len(_sample_payloads))
38        for i, payload in enumerate(_sample_payloads):
39            part = msg.get_payload(i)
40            self.assertIsInstance(part, email.message.Message)
41            self.assertNotIsInstance(part, mailbox.Message)
42            self.assertEqual(part.get_payload(), payload)
43
44    def _delete_recursively(self, target):
45        # Delete a file or delete a directory recursively
46        if os.path.isdir(target):
47            os_helper.rmtree(target)
48        elif os.path.exists(target):
49            os_helper.unlink(target)
50
51
52class TestMailbox(TestBase):
53
54    maxDiff = None
55
56    _factory = None     # Overridden by subclasses to reuse tests
57    _template = 'From: foo\n\n%s\n'
58
59    def setUp(self):
60        self._path = os_helper.TESTFN
61        self._delete_recursively(self._path)
62        self._box = self._factory(self._path)
63
64    def tearDown(self):
65        self._box.close()
66        self._delete_recursively(self._path)
67
68    def test_add(self):
69        # Add copies of a sample message
70        keys = []
71        keys.append(self._box.add(self._template % 0))
72        self.assertEqual(len(self._box), 1)
73        keys.append(self._box.add(mailbox.Message(_sample_message)))
74        self.assertEqual(len(self._box), 2)
75        keys.append(self._box.add(email.message_from_string(_sample_message)))
76        self.assertEqual(len(self._box), 3)
77        keys.append(self._box.add(io.BytesIO(_bytes_sample_message)))
78        self.assertEqual(len(self._box), 4)
79        keys.append(self._box.add(_sample_message))
80        self.assertEqual(len(self._box), 5)
81        keys.append(self._box.add(_bytes_sample_message))
82        self.assertEqual(len(self._box), 6)
83        with self.assertWarns(DeprecationWarning):
84            keys.append(self._box.add(
85                io.TextIOWrapper(io.BytesIO(_bytes_sample_message), encoding="utf-8")))
86        self.assertEqual(len(self._box), 7)
87        self.assertEqual(self._box.get_string(keys[0]), self._template % 0)
88        for i in (1, 2, 3, 4, 5, 6):
89            self._check_sample(self._box[keys[i]])
90
91    _nonascii_msg = textwrap.dedent("""\
92            From: foo
93            Subject: Falinaptár házhozszállítással. Már rendeltél?
94
95            0
96            """)
97
98    def test_add_invalid_8bit_bytes_header(self):
99        key = self._box.add(self._nonascii_msg.encode('latin-1'))
100        self.assertEqual(len(self._box), 1)
101        self.assertEqual(self._box.get_bytes(key),
102            self._nonascii_msg.encode('latin-1'))
103
104    def test_invalid_nonascii_header_as_string(self):
105        subj = self._nonascii_msg.splitlines()[1]
106        key = self._box.add(subj.encode('latin-1'))
107        self.assertEqual(self._box.get_string(key),
108            'Subject: =?unknown-8bit?b?RmFsaW5hcHThciBo4Xpob3pzeuFsbO104XNz'
109            'YWwuIE3hciByZW5kZWx06Ww/?=\n\n')
110
111    def test_add_nonascii_string_header_raises(self):
112        with self.assertRaisesRegex(ValueError, "ASCII-only"):
113            self._box.add(self._nonascii_msg)
114        self._box.flush()
115        self.assertEqual(len(self._box), 0)
116        self.assertMailboxEmpty()
117
118    def test_add_that_raises_leaves_mailbox_empty(self):
119        def raiser(*args, **kw):
120            raise Exception("a fake error")
121        support.patch(self, email.generator.BytesGenerator, 'flatten', raiser)
122        with self.assertRaises(Exception):
123            self._box.add(email.message_from_string("From: Alphöso"))
124        self.assertEqual(len(self._box), 0)
125        self._box.close()
126        self.assertMailboxEmpty()
127
128    _non_latin_bin_msg = textwrap.dedent("""\
129        From: [email protected]
130        To: báz
131        Subject: Maintenant je vous présente mon collègue, le pouf célèbre
132        \tJean de Baddie
133        Mime-Version: 1.0
134        Content-Type: text/plain; charset="utf-8"
135        Content-Transfer-Encoding: 8bit
136
137        Да, они летят.
138        """).encode('utf-8')
139
140    def test_add_8bit_body(self):
141        key = self._box.add(self._non_latin_bin_msg)
142        self.assertEqual(self._box.get_bytes(key),
143                         self._non_latin_bin_msg)
144        with self._box.get_file(key) as f:
145            self.assertEqual(f.read(),
146                             self._non_latin_bin_msg.replace(b'\n',
147                                os.linesep.encode()))
148        self.assertEqual(self._box[key].get_payload(),
149                        "Да, они летят.\n")
150
151    def test_add_binary_file(self):
152        with tempfile.TemporaryFile('wb+') as f:
153            f.write(_bytes_sample_message)
154            f.seek(0)
155            key = self._box.add(f)
156        self.assertEqual(self._box.get_bytes(key).split(b'\n'),
157            _bytes_sample_message.split(b'\n'))
158
159    def test_add_binary_nonascii_file(self):
160        with tempfile.TemporaryFile('wb+') as f:
161            f.write(self._non_latin_bin_msg)
162            f.seek(0)
163            key = self._box.add(f)
164        self.assertEqual(self._box.get_bytes(key).split(b'\n'),
165            self._non_latin_bin_msg.split(b'\n'))
166
167    def test_add_text_file_warns(self):
168        with tempfile.TemporaryFile('w+', encoding='utf-8') as f:
169            f.write(_sample_message)
170            f.seek(0)
171            with self.assertWarns(DeprecationWarning):
172                key = self._box.add(f)
173        self.assertEqual(self._box.get_bytes(key).split(b'\n'),
174            _bytes_sample_message.split(b'\n'))
175
176    def test_add_StringIO_warns(self):
177        with self.assertWarns(DeprecationWarning):
178            key = self._box.add(io.StringIO(self._template % "0"))
179        self.assertEqual(self._box.get_string(key), self._template % "0")
180
181    def test_add_nonascii_StringIO_raises(self):
182        with self.assertWarns(DeprecationWarning):
183            with self.assertRaisesRegex(ValueError, "ASCII-only"):
184                self._box.add(io.StringIO(self._nonascii_msg))
185        self.assertEqual(len(self._box), 0)
186        self._box.close()
187        self.assertMailboxEmpty()
188
189    def test_remove(self):
190        # Remove messages using remove()
191        self._test_remove_or_delitem(self._box.remove)
192
193    def test_delitem(self):
194        # Remove messages using __delitem__()
195        self._test_remove_or_delitem(self._box.__delitem__)
196
197    def _test_remove_or_delitem(self, method):
198        # (Used by test_remove() and test_delitem().)
199        key0 = self._box.add(self._template % 0)
200        key1 = self._box.add(self._template % 1)
201        self.assertEqual(len(self._box), 2)
202        method(key0)
203        self.assertEqual(len(self._box), 1)
204        self.assertRaises(KeyError, lambda: self._box[key0])
205        self.assertRaises(KeyError, lambda: method(key0))
206        self.assertEqual(self._box.get_string(key1), self._template % 1)
207        key2 = self._box.add(self._template % 2)
208        self.assertEqual(len(self._box), 2)
209        method(key2)
210        self.assertEqual(len(self._box), 1)
211        self.assertRaises(KeyError, lambda: self._box[key2])
212        self.assertRaises(KeyError, lambda: method(key2))
213        self.assertEqual(self._box.get_string(key1), self._template % 1)
214        method(key1)
215        self.assertEqual(len(self._box), 0)
216        self.assertRaises(KeyError, lambda: self._box[key1])
217        self.assertRaises(KeyError, lambda: method(key1))
218
219    def test_discard(self, repetitions=10):
220        # Discard messages
221        key0 = self._box.add(self._template % 0)
222        key1 = self._box.add(self._template % 1)
223        self.assertEqual(len(self._box), 2)
224        self._box.discard(key0)
225        self.assertEqual(len(self._box), 1)
226        self.assertRaises(KeyError, lambda: self._box[key0])
227        self._box.discard(key0)
228        self.assertEqual(len(self._box), 1)
229        self.assertRaises(KeyError, lambda: self._box[key0])
230
231    def test_get(self):
232        # Retrieve messages using get()
233        key0 = self._box.add(self._template % 0)
234        msg = self._box.get(key0)
235        self.assertEqual(msg['from'], 'foo')
236        self.assertEqual(msg.get_payload(), '0\n')
237        self.assertIsNone(self._box.get('foo'))
238        self.assertIs(self._box.get('foo', False), False)
239        self._box.close()
240        self._box = self._factory(self._path)
241        key1 = self._box.add(self._template % 1)
242        msg = self._box.get(key1)
243        self.assertEqual(msg['from'], 'foo')
244        self.assertEqual(msg.get_payload(), '1\n')
245
246    def test_getitem(self):
247        # Retrieve message using __getitem__()
248        key0 = self._box.add(self._template % 0)
249        msg = self._box[key0]
250        self.assertEqual(msg['from'], 'foo')
251        self.assertEqual(msg.get_payload(), '0\n')
252        self.assertRaises(KeyError, lambda: self._box['foo'])
253        self._box.discard(key0)
254        self.assertRaises(KeyError, lambda: self._box[key0])
255
256    def test_get_message(self):
257        # Get Message representations of messages
258        key0 = self._box.add(self._template % 0)
259        key1 = self._box.add(_sample_message)
260        msg0 = self._box.get_message(key0)
261        self.assertIsInstance(msg0, mailbox.Message)
262        self.assertEqual(msg0['from'], 'foo')
263        self.assertEqual(msg0.get_payload(), '0\n')
264        self._check_sample(self._box.get_message(key1))
265
266    def test_get_bytes(self):
267        # Get bytes representations of messages
268        key0 = self._box.add(self._template % 0)
269        key1 = self._box.add(_sample_message)
270        self.assertEqual(self._box.get_bytes(key0),
271            (self._template % 0).encode('ascii'))
272        self.assertEqual(self._box.get_bytes(key1), _bytes_sample_message)
273
274    def test_get_string(self):
275        # Get string representations of messages
276        key0 = self._box.add(self._template % 0)
277        key1 = self._box.add(_sample_message)
278        self.assertEqual(self._box.get_string(key0), self._template % 0)
279        self.assertEqual(self._box.get_string(key1).split('\n'),
280                         _sample_message.split('\n'))
281
282    def test_get_file(self):
283        # Get file representations of messages
284        key0 = self._box.add(self._template % 0)
285        key1 = self._box.add(_sample_message)
286        with self._box.get_file(key0) as file:
287            data0 = file.read()
288        with self._box.get_file(key1) as file:
289            data1 = file.read()
290        self.assertEqual(data0.decode('ascii').replace(os.linesep, '\n'),
291                         self._template % 0)
292        self.assertEqual(data1.decode('ascii').replace(os.linesep, '\n'),
293                         _sample_message)
294
295    def test_get_file_can_be_closed_twice(self):
296        # Issue 11700
297        key = self._box.add(_sample_message)
298        f = self._box.get_file(key)
299        f.close()
300        f.close()
301
302    def test_iterkeys(self):
303        # Get keys using iterkeys()
304        self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False)
305
306    def test_keys(self):
307        # Get keys using keys()
308        self._check_iteration(self._box.keys, do_keys=True, do_values=False)
309
310    def test_itervalues(self):
311        # Get values using itervalues()
312        self._check_iteration(self._box.itervalues, do_keys=False,
313                              do_values=True)
314
315    def test_iter(self):
316        # Get values using __iter__()
317        self._check_iteration(self._box.__iter__, do_keys=False,
318                              do_values=True)
319
320    def test_values(self):
321        # Get values using values()
322        self._check_iteration(self._box.values, do_keys=False, do_values=True)
323
324    def test_iteritems(self):
325        # Get keys and values using iteritems()
326        self._check_iteration(self._box.iteritems, do_keys=True,
327                              do_values=True)
328
329    def test_items(self):
330        # Get keys and values using items()
331        self._check_iteration(self._box.items, do_keys=True, do_values=True)
332
333    def _check_iteration(self, method, do_keys, do_values, repetitions=10):
334        for value in method():
335            self.fail("Not empty")
336        keys, values = [], []
337        for i in range(repetitions):
338            keys.append(self._box.add(self._template % i))
339            values.append(self._template % i)
340        if do_keys and not do_values:
341            returned_keys = list(method())
342        elif do_values and not do_keys:
343            returned_values = list(method())
344        else:
345            returned_keys, returned_values = [], []
346            for key, value in method():
347                returned_keys.append(key)
348                returned_values.append(value)
349        if do_keys:
350            self.assertEqual(len(keys), len(returned_keys))
351            self.assertEqual(set(keys), set(returned_keys))
352        if do_values:
353            count = 0
354            for value in returned_values:
355                self.assertEqual(value['from'], 'foo')
356                self.assertLess(int(value.get_payload()), repetitions)
357                count += 1
358            self.assertEqual(len(values), count)
359
360    def test_contains(self):
361        # Check existence of keys using __contains__()
362        self.assertNotIn('foo', self._box)
363        key0 = self._box.add(self._template % 0)
364        self.assertIn(key0, self._box)
365        self.assertNotIn('foo', self._box)
366        key1 = self._box.add(self._template % 1)
367        self.assertIn(key1, self._box)
368        self.assertIn(key0, self._box)
369        self.assertNotIn('foo', self._box)
370        self._box.remove(key0)
371        self.assertNotIn(key0, self._box)
372        self.assertIn(key1, self._box)
373        self.assertNotIn('foo', self._box)
374        self._box.remove(key1)
375        self.assertNotIn(key1, self._box)
376        self.assertNotIn(key0, self._box)
377        self.assertNotIn('foo', self._box)
378
379    def test_len(self, repetitions=10):
380        # Get message count
381        keys = []
382        for i in range(repetitions):
383            self.assertEqual(len(self._box), i)
384            keys.append(self._box.add(self._template % i))
385            self.assertEqual(len(self._box), i + 1)
386        for i in range(repetitions):
387            self.assertEqual(len(self._box), repetitions - i)
388            self._box.remove(keys[i])
389            self.assertEqual(len(self._box), repetitions - i - 1)
390
391    def test_set_item(self):
392        # Modify messages using __setitem__()
393        key0 = self._box.add(self._template % 'original 0')
394        self.assertEqual(self._box.get_string(key0),
395                         self._template % 'original 0')
396        key1 = self._box.add(self._template % 'original 1')
397        self.assertEqual(self._box.get_string(key1),
398                         self._template % 'original 1')
399        self._box[key0] = self._template % 'changed 0'
400        self.assertEqual(self._box.get_string(key0),
401                         self._template % 'changed 0')
402        self._box[key1] = self._template % 'changed 1'
403        self.assertEqual(self._box.get_string(key1),
404                         self._template % 'changed 1')
405        self._box[key0] = _sample_message
406        self._check_sample(self._box[key0])
407        self._box[key1] = self._box[key0]
408        self._check_sample(self._box[key1])
409        self._box[key0] = self._template % 'original 0'
410        self.assertEqual(self._box.get_string(key0),
411                     self._template % 'original 0')
412        self._check_sample(self._box[key1])
413        self.assertRaises(KeyError,
414                          lambda: self._box.__setitem__('foo', 'bar'))
415        self.assertRaises(KeyError, lambda: self._box['foo'])
416        self.assertEqual(len(self._box), 2)
417
418    def test_clear(self, iterations=10):
419        # Remove all messages using clear()
420        keys = []
421        for i in range(iterations):
422            self._box.add(self._template % i)
423        for i, key in enumerate(keys):
424            self.assertEqual(self._box.get_string(key), self._template % i)
425        self._box.clear()
426        self.assertEqual(len(self._box), 0)
427        for i, key in enumerate(keys):
428            self.assertRaises(KeyError, lambda: self._box.get_string(key))
429
430    def test_pop(self):
431        # Get and remove a message using pop()
432        key0 = self._box.add(self._template % 0)
433        self.assertIn(key0, self._box)
434        key1 = self._box.add(self._template % 1)
435        self.assertIn(key1, self._box)
436        self.assertEqual(self._box.pop(key0).get_payload(), '0\n')
437        self.assertNotIn(key0, self._box)
438        self.assertIn(key1, self._box)
439        key2 = self._box.add(self._template % 2)
440        self.assertIn(key2, self._box)
441        self.assertEqual(self._box.pop(key2).get_payload(), '2\n')
442        self.assertNotIn(key2, self._box)
443        self.assertIn(key1, self._box)
444        self.assertEqual(self._box.pop(key1).get_payload(), '1\n')
445        self.assertNotIn(key1, self._box)
446        self.assertEqual(len(self._box), 0)
447
448    def test_popitem(self, iterations=10):
449        # Get and remove an arbitrary (key, message) using popitem()
450        keys = []
451        for i in range(10):
452            keys.append(self._box.add(self._template % i))
453        seen = []
454        for i in range(10):
455            key, msg = self._box.popitem()
456            self.assertIn(key, keys)
457            self.assertNotIn(key, seen)
458            seen.append(key)
459            self.assertEqual(int(msg.get_payload()), keys.index(key))
460        self.assertEqual(len(self._box), 0)
461        for key in keys:
462            self.assertRaises(KeyError, lambda: self._box[key])
463
464    def test_update(self):
465        # Modify multiple messages using update()
466        key0 = self._box.add(self._template % 'original 0')
467        key1 = self._box.add(self._template % 'original 1')
468        key2 = self._box.add(self._template % 'original 2')
469        self._box.update({key0: self._template % 'changed 0',
470                          key2: _sample_message})
471        self.assertEqual(len(self._box), 3)
472        self.assertEqual(self._box.get_string(key0),
473                     self._template % 'changed 0')
474        self.assertEqual(self._box.get_string(key1),
475                     self._template % 'original 1')
476        self._check_sample(self._box[key2])
477        self._box.update([(key2, self._template % 'changed 2'),
478                    (key1, self._template % 'changed 1'),
479                    (key0, self._template % 'original 0')])
480        self.assertEqual(len(self._box), 3)
481        self.assertEqual(self._box.get_string(key0),
482                     self._template % 'original 0')
483        self.assertEqual(self._box.get_string(key1),
484                     self._template % 'changed 1')
485        self.assertEqual(self._box.get_string(key2),
486                     self._template % 'changed 2')
487        self.assertRaises(KeyError,
488                          lambda: self._box.update({'foo': 'bar',
489                                          key0: self._template % "changed 0"}))
490        self.assertEqual(len(self._box), 3)
491        self.assertEqual(self._box.get_string(key0),
492                     self._template % "changed 0")
493        self.assertEqual(self._box.get_string(key1),
494                     self._template % "changed 1")
495        self.assertEqual(self._box.get_string(key2),
496                     self._template % "changed 2")
497
498    def test_flush(self):
499        # Write changes to disk
500        self._test_flush_or_close(self._box.flush, True)
501
502    def test_popitem_and_flush_twice(self):
503        # See #15036.
504        self._box.add(self._template % 0)
505        self._box.add(self._template % 1)
506        self._box.flush()
507
508        self._box.popitem()
509        self._box.flush()
510        self._box.popitem()
511        self._box.flush()
512
513    def test_lock_unlock(self):
514        # Lock and unlock the mailbox
515        self.assertFalse(os.path.exists(self._get_lock_path()))
516        self._box.lock()
517        self.assertTrue(os.path.exists(self._get_lock_path()))
518        self._box.unlock()
519        self.assertFalse(os.path.exists(self._get_lock_path()))
520
521    def test_close(self):
522        # Close mailbox and flush changes to disk
523        self._test_flush_or_close(self._box.close, False)
524
525    def _test_flush_or_close(self, method, should_call_close):
526        contents = [self._template % i for i in range(3)]
527        self._box.add(contents[0])
528        self._box.add(contents[1])
529        self._box.add(contents[2])
530        oldbox = self._box
531        method()
532        if should_call_close:
533            self._box.close()
534        self._box = self._factory(self._path)
535        keys = self._box.keys()
536        self.assertEqual(len(keys), 3)
537        for key in keys:
538            self.assertIn(self._box.get_string(key), contents)
539        oldbox.close()
540
541    def test_dump_message(self):
542        # Write message representations to disk
543        for input in (email.message_from_string(_sample_message),
544                      _sample_message, io.BytesIO(_bytes_sample_message)):
545            output = io.BytesIO()
546            self._box._dump_message(input, output)
547            self.assertEqual(output.getvalue(),
548                _bytes_sample_message.replace(b'\n', os.linesep.encode()))
549        output = io.BytesIO()
550        self.assertRaises(TypeError,
551                          lambda: self._box._dump_message(None, output))
552
553    def _get_lock_path(self):
554        # Return the path of the dot lock file. May be overridden.
555        return self._path + '.lock'
556
557
558class TestMailboxSuperclass(TestBase, unittest.TestCase):
559
560    def test_notimplemented(self):
561        # Test that all Mailbox methods raise NotImplementedException.
562        box = mailbox.Mailbox('path')
563        self.assertRaises(NotImplementedError, lambda: box.add(''))
564        self.assertRaises(NotImplementedError, lambda: box.remove(''))
565        self.assertRaises(NotImplementedError, lambda: box.__delitem__(''))
566        self.assertRaises(NotImplementedError, lambda: box.discard(''))
567        self.assertRaises(NotImplementedError, lambda: box.__setitem__('', ''))
568        self.assertRaises(NotImplementedError, lambda: box.iterkeys())
569        self.assertRaises(NotImplementedError, lambda: box.keys())
570        self.assertRaises(NotImplementedError, lambda: box.itervalues().__next__())
571        self.assertRaises(NotImplementedError, lambda: box.__iter__().__next__())
572        self.assertRaises(NotImplementedError, lambda: box.values())
573        self.assertRaises(NotImplementedError, lambda: box.iteritems().__next__())
574        self.assertRaises(NotImplementedError, lambda: box.items())
575        self.assertRaises(NotImplementedError, lambda: box.get(''))
576        self.assertRaises(NotImplementedError, lambda: box.__getitem__(''))
577        self.assertRaises(NotImplementedError, lambda: box.get_message(''))
578        self.assertRaises(NotImplementedError, lambda: box.get_string(''))
579        self.assertRaises(NotImplementedError, lambda: box.get_bytes(''))
580        self.assertRaises(NotImplementedError, lambda: box.get_file(''))
581        self.assertRaises(NotImplementedError, lambda: '' in box)
582        self.assertRaises(NotImplementedError, lambda: box.__contains__(''))
583        self.assertRaises(NotImplementedError, lambda: box.__len__())
584        self.assertRaises(NotImplementedError, lambda: box.clear())
585        self.assertRaises(NotImplementedError, lambda: box.pop(''))
586        self.assertRaises(NotImplementedError, lambda: box.popitem())
587        self.assertRaises(NotImplementedError, lambda: box.update((('', ''),)))
588        self.assertRaises(NotImplementedError, lambda: box.flush())
589        self.assertRaises(NotImplementedError, lambda: box.lock())
590        self.assertRaises(NotImplementedError, lambda: box.unlock())
591        self.assertRaises(NotImplementedError, lambda: box.close())
592
593
594class TestMaildir(TestMailbox, unittest.TestCase):
595
596    _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory)
597
598    def setUp(self):
599        TestMailbox.setUp(self)
600        if (os.name == 'nt') or (sys.platform == 'cygwin'):
601            self._box.colon = '!'
602
603    def assertMailboxEmpty(self):
604        self.assertEqual(os.listdir(os.path.join(self._path, 'tmp')), [])
605
606    def test_add_MM(self):
607        # Add a MaildirMessage instance
608        msg = mailbox.MaildirMessage(self._template % 0)
609        msg.set_subdir('cur')
610        msg.set_info('foo')
611        key = self._box.add(msg)
612        self.assertTrue(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' %
613                                                 (key, self._box.colon))))
614
615    def test_get_MM(self):
616        # Get a MaildirMessage instance
617        msg = mailbox.MaildirMessage(self._template % 0)
618        msg.set_subdir('cur')
619        msg.set_flags('RF')
620        key = self._box.add(msg)
621        msg_returned = self._box.get_message(key)
622        self.assertIsInstance(msg_returned, mailbox.MaildirMessage)
623        self.assertEqual(msg_returned.get_subdir(), 'cur')
624        self.assertEqual(msg_returned.get_flags(), 'FR')
625
626    def test_set_MM(self):
627        # Set with a MaildirMessage instance
628        msg0 = mailbox.MaildirMessage(self._template % 0)
629        msg0.set_flags('TP')
630        key = self._box.add(msg0)
631        msg_returned = self._box.get_message(key)
632        self.assertEqual(msg_returned.get_subdir(), 'new')
633        self.assertEqual(msg_returned.get_flags(), 'PT')
634        msg1 = mailbox.MaildirMessage(self._template % 1)
635        self._box[key] = msg1
636        msg_returned = self._box.get_message(key)
637        self.assertEqual(msg_returned.get_subdir(), 'new')
638        self.assertEqual(msg_returned.get_flags(), '')
639        self.assertEqual(msg_returned.get_payload(), '1\n')
640        msg2 = mailbox.MaildirMessage(self._template % 2)
641        msg2.set_info('2,S')
642        self._box[key] = msg2
643        self._box[key] = self._template % 3
644        msg_returned = self._box.get_message(key)
645        self.assertEqual(msg_returned.get_subdir(), 'new')
646        self.assertEqual(msg_returned.get_flags(), 'S')
647        self.assertEqual(msg_returned.get_payload(), '3\n')
648
649    def test_consistent_factory(self):
650        # Add a message.
651        msg = mailbox.MaildirMessage(self._template % 0)
652        msg.set_subdir('cur')
653        msg.set_flags('RF')
654        key = self._box.add(msg)
655
656        # Create new mailbox with
657        class FakeMessage(mailbox.MaildirMessage):
658            pass
659        box = mailbox.Maildir(self._path, factory=FakeMessage)
660        box.colon = self._box.colon
661        msg2 = box.get_message(key)
662        self.assertIsInstance(msg2, FakeMessage)
663
664    def test_initialize_new(self):
665        # Initialize a non-existent mailbox
666        self.tearDown()
667        self._box = mailbox.Maildir(self._path)
668        self._check_basics()
669        self._delete_recursively(self._path)
670        self._box = self._factory(self._path, factory=None)
671        self._check_basics()
672
673    def test_initialize_existing(self):
674        # Initialize an existing mailbox
675        self.tearDown()
676        for subdir in '', 'tmp', 'new', 'cur':
677            os.mkdir(os.path.normpath(os.path.join(self._path, subdir)))
678        self._box = mailbox.Maildir(self._path)
679        self._check_basics()
680
681    def _check_basics(self, factory=None):
682        # (Used by test_open_new() and test_open_existing().)
683        self.assertEqual(self._box._path, os.path.abspath(self._path))
684        self.assertEqual(self._box._factory, factory)
685        for subdir in '', 'tmp', 'new', 'cur':
686            path = os.path.join(self._path, subdir)
687            mode = os.stat(path)[stat.ST_MODE]
688            self.assertTrue(stat.S_ISDIR(mode), "Not a directory: '%s'" % path)
689
690    def test_list_folders(self):
691        # List folders
692        self._box.add_folder('one')
693        self._box.add_folder('two')
694        self._box.add_folder('three')
695        self.assertEqual(len(self._box.list_folders()), 3)
696        self.assertEqual(set(self._box.list_folders()),
697                     set(('one', 'two', 'three')))
698
699    def test_get_folder(self):
700        # Open folders
701        self._box.add_folder('foo.bar')
702        folder0 = self._box.get_folder('foo.bar')
703        folder0.add(self._template % 'bar')
704        self.assertTrue(os.path.isdir(os.path.join(self._path, '.foo.bar')))
705        folder1 = self._box.get_folder('foo.bar')
706        self.assertEqual(folder1.get_string(folder1.keys()[0]),
707                         self._template % 'bar')
708
709    def test_add_and_remove_folders(self):
710        # Delete folders
711        self._box.add_folder('one')
712        self._box.add_folder('two')
713        self.assertEqual(len(self._box.list_folders()), 2)
714        self.assertEqual(set(self._box.list_folders()), set(('one', 'two')))
715        self._box.remove_folder('one')
716        self.assertEqual(len(self._box.list_folders()), 1)
717        self.assertEqual(set(self._box.list_folders()), set(('two',)))
718        self._box.add_folder('three')
719        self.assertEqual(len(self._box.list_folders()), 2)
720        self.assertEqual(set(self._box.list_folders()), set(('two', 'three')))
721        self._box.remove_folder('three')
722        self.assertEqual(len(self._box.list_folders()), 1)
723        self.assertEqual(set(self._box.list_folders()), set(('two',)))
724        self._box.remove_folder('two')
725        self.assertEqual(len(self._box.list_folders()), 0)
726        self.assertEqual(self._box.list_folders(), [])
727
728    def test_clean(self):
729        # Remove old files from 'tmp'
730        foo_path = os.path.join(self._path, 'tmp', 'foo')
731        bar_path = os.path.join(self._path, 'tmp', 'bar')
732        with open(foo_path, 'w', encoding='utf-8') as f:
733            f.write("@")
734        with open(bar_path, 'w', encoding='utf-8') as f:
735            f.write("@")
736        self._box.clean()
737        self.assertTrue(os.path.exists(foo_path))
738        self.assertTrue(os.path.exists(bar_path))
739        foo_stat = os.stat(foo_path)
740        os.utime(foo_path, (time.time() - 129600 - 2,
741                            foo_stat.st_mtime))
742        self._box.clean()
743        self.assertFalse(os.path.exists(foo_path))
744        self.assertTrue(os.path.exists(bar_path))
745
746    def test_create_tmp(self, repetitions=10):
747        # Create files in tmp directory
748        hostname = socket.gethostname()
749        if '/' in hostname:
750            hostname = hostname.replace('/', r'\057')
751        if ':' in hostname:
752            hostname = hostname.replace(':', r'\072')
753        pid = os.getpid()
754        pattern = re.compile(r"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)"
755                             r"Q(?P<Q>\d+)\.(?P<host>[^:/]*)")
756        previous_groups = None
757        for x in range(repetitions):
758            tmp_file = self._box._create_tmp()
759            head, tail = os.path.split(tmp_file.name)
760            self.assertEqual(head, os.path.abspath(os.path.join(self._path,
761                                                                "tmp")),
762                             "File in wrong location: '%s'" % head)
763            match = pattern.match(tail)
764            self.assertIsNotNone(match, "Invalid file name: '%s'" % tail)
765            groups = match.groups()
766            if previous_groups is not None:
767                self.assertGreaterEqual(int(groups[0]), int(previous_groups[0]),
768                             "Non-monotonic seconds: '%s' before '%s'" %
769                             (previous_groups[0], groups[0]))
770                if int(groups[0]) == int(previous_groups[0]):
771                    self.assertGreaterEqual(int(groups[1]), int(previous_groups[1]),
772                                "Non-monotonic milliseconds: '%s' before '%s'" %
773                                (previous_groups[1], groups[1]))
774                self.assertEqual(int(groups[2]), pid,
775                             "Process ID mismatch: '%s' should be '%s'" %
776                             (groups[2], pid))
777                self.assertEqual(int(groups[3]), int(previous_groups[3]) + 1,
778                             "Non-sequential counter: '%s' before '%s'" %
779                             (previous_groups[3], groups[3]))
780                self.assertEqual(groups[4], hostname,
781                             "Host name mismatch: '%s' should be '%s'" %
782                             (groups[4], hostname))
783            previous_groups = groups
784            tmp_file.write(_bytes_sample_message)
785            tmp_file.seek(0)
786            self.assertEqual(tmp_file.read(), _bytes_sample_message)
787            tmp_file.close()
788        file_count = len(os.listdir(os.path.join(self._path, "tmp")))
789        self.assertEqual(file_count, repetitions,
790                     "Wrong file count: '%s' should be '%s'" %
791                     (file_count, repetitions))
792
793    def test_refresh(self):
794        # Update the table of contents
795        self.assertEqual(self._box._toc, {})
796        key0 = self._box.add(self._template % 0)
797        key1 = self._box.add(self._template % 1)
798        self.assertEqual(self._box._toc, {})
799        self._box._refresh()
800        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
801                                          key1: os.path.join('new', key1)})
802        key2 = self._box.add(self._template % 2)
803        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
804                                          key1: os.path.join('new', key1)})
805        self._box._refresh()
806        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
807                                          key1: os.path.join('new', key1),
808                                          key2: os.path.join('new', key2)})
809
810    def test_refresh_after_safety_period(self):
811        # Issue #13254: Call _refresh after the "file system safety
812        # period" of 2 seconds has passed; _toc should still be
813        # updated because this is the first call to _refresh.
814        key0 = self._box.add(self._template % 0)
815        key1 = self._box.add(self._template % 1)
816
817        self._box = self._factory(self._path)
818        self.assertEqual(self._box._toc, {})
819
820        # Emulate sleeping. Instead of sleeping for 2 seconds, use the
821        # skew factor to make _refresh think that the filesystem
822        # safety period has passed and re-reading the _toc is only
823        # required if mtimes differ.
824        self._box._skewfactor = -3
825
826        self._box._refresh()
827        self.assertEqual(sorted(self._box._toc.keys()), sorted([key0, key1]))
828
829    def test_lookup(self):
830        # Look up message subpaths in the TOC
831        self.assertRaises(KeyError, lambda: self._box._lookup('foo'))
832        key0 = self._box.add(self._template % 0)
833        self.assertEqual(self._box._lookup(key0), os.path.join('new', key0))
834        os.remove(os.path.join(self._path, 'new', key0))
835        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0)})
836        # Be sure that the TOC is read back from disk (see issue #6896
837        # about bad mtime behaviour on some systems).
838        self._box.flush()
839        self.assertRaises(KeyError, lambda: self._box._lookup(key0))
840        self.assertEqual(self._box._toc, {})
841
842    def test_lock_unlock(self):
843        # Lock and unlock the mailbox. For Maildir, this does nothing.
844        self._box.lock()
845        self._box.unlock()
846
847    def test_folder (self):
848        # Test for bug #1569790: verify that folders returned by .get_folder()
849        # use the same factory function.
850        def dummy_factory (s):
851            return None
852        box = self._factory(self._path, factory=dummy_factory)
853        folder = box.add_folder('folder1')
854        self.assertIs(folder._factory, dummy_factory)
855
856        folder1_alias = box.get_folder('folder1')
857        self.assertIs(folder1_alias._factory, dummy_factory)
858
859    def test_directory_in_folder (self):
860        # Test that mailboxes still work if there's a stray extra directory
861        # in a folder.
862        for i in range(10):
863            self._box.add(mailbox.Message(_sample_message))
864
865        # Create a stray directory
866        os.mkdir(os.path.join(self._path, 'cur', 'stray-dir'))
867
868        # Check that looping still works with the directory present.
869        for msg in self._box:
870            pass
871
872    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
873    def test_file_permissions(self):
874        # Verify that message files are created without execute permissions
875        msg = mailbox.MaildirMessage(self._template % 0)
876        orig_umask = os.umask(0)
877        try:
878            key = self._box.add(msg)
879        finally:
880            os.umask(orig_umask)
881        path = os.path.join(self._path, self._box._lookup(key))
882        mode = os.stat(path).st_mode
883        self.assertFalse(mode & 0o111)
884
885    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
886    def test_folder_file_perms(self):
887        # From bug #3228, we want to verify that the file created inside a Maildir
888        # subfolder isn't marked as executable.
889        orig_umask = os.umask(0)
890        try:
891            subfolder = self._box.add_folder('subfolder')
892        finally:
893            os.umask(orig_umask)
894
895        path = os.path.join(subfolder._path, 'maildirfolder')
896        st = os.stat(path)
897        perms = st.st_mode
898        self.assertFalse((perms & 0o111)) # Execute bits should all be off.
899
900    def test_reread(self):
901        # Do an initial unconditional refresh
902        self._box._refresh()
903
904        # Put the last modified times more than two seconds into the past
905        # (because mtime may have a two second granularity)
906        for subdir in ('cur', 'new'):
907            os.utime(os.path.join(self._box._path, subdir),
908                     (time.time()-5,)*2)
909
910        # Because mtime has a two second granularity in worst case (FAT), a
911        # refresh is done unconditionally if called for within
912        # two-second-plus-a-bit of the last one, just in case the mbox has
913        # changed; so now we have to wait for that interval to expire.
914        #
915        # Because this is a test, emulate sleeping. Instead of
916        # sleeping for 2 seconds, use the skew factor to make _refresh
917        # think that 2 seconds have passed and re-reading the _toc is
918        # only required if mtimes differ.
919        self._box._skewfactor = -3
920
921        # Re-reading causes the ._toc attribute to be assigned a new dictionary
922        # object, so we'll check that the ._toc attribute isn't a different
923        # object.
924        orig_toc = self._box._toc
925        def refreshed():
926            return self._box._toc is not orig_toc
927
928        self._box._refresh()
929        self.assertFalse(refreshed())
930
931        # Now, write something into cur and remove it.  This changes
932        # the mtime and should cause a re-read. Note that "sleep
933        # emulation" is still in effect, as skewfactor is -3.
934        filename = os.path.join(self._path, 'cur', 'stray-file')
935        os_helper.create_empty_file(filename)
936        os.unlink(filename)
937        self._box._refresh()
938        self.assertTrue(refreshed())
939
940
941class _TestSingleFile(TestMailbox):
942    '''Common tests for single-file mailboxes'''
943
944    def test_add_doesnt_rewrite(self):
945        # When only adding messages, flush() should not rewrite the
946        # mailbox file. See issue #9559.
947
948        # Inode number changes if the contents are written to another
949        # file which is then renamed over the original file. So we
950        # must check that the inode number doesn't change.
951        inode_before = os.stat(self._path).st_ino
952
953        self._box.add(self._template % 0)
954        self._box.flush()
955
956        inode_after = os.stat(self._path).st_ino
957        self.assertEqual(inode_before, inode_after)
958
959        # Make sure the message was really added
960        self._box.close()
961        self._box = self._factory(self._path)
962        self.assertEqual(len(self._box), 1)
963
964    def test_permissions_after_flush(self):
965        # See issue #5346
966
967        # Make the mailbox world writable. It's unlikely that the new
968        # mailbox file would have these permissions after flush(),
969        # because umask usually prevents it.
970        mode = os.stat(self._path).st_mode | 0o666
971        os.chmod(self._path, mode)
972
973        self._box.add(self._template % 0)
974        i = self._box.add(self._template % 1)
975        # Need to remove one message to make flush() create a new file
976        self._box.remove(i)
977        self._box.flush()
978
979        self.assertEqual(os.stat(self._path).st_mode, mode)
980
981
982class _TestMboxMMDF(_TestSingleFile):
983
984    def tearDown(self):
985        super().tearDown()
986        self._box.close()
987        self._delete_recursively(self._path)
988        for lock_remnant in glob.glob(glob.escape(self._path) + '.*'):
989            os_helper.unlink(lock_remnant)
990
991    def assertMailboxEmpty(self):
992        with open(self._path, 'rb') as f:
993            self.assertEqual(f.readlines(), [])
994
995    def test_get_bytes_from(self):
996        # Get bytes representations of messages with _unixfrom.
997        unixfrom = 'From foo@bar blah\n'
998        key0 = self._box.add(unixfrom + self._template % 0)
999        key1 = self._box.add(unixfrom + _sample_message)
1000        self.assertEqual(self._box.get_bytes(key0, from_=False),
1001            (self._template % 0).encode('ascii'))
1002        self.assertEqual(self._box.get_bytes(key1, from_=False),
1003            _bytes_sample_message)
1004        self.assertEqual(self._box.get_bytes(key0, from_=True),
1005            (unixfrom + self._template % 0).encode('ascii'))
1006        self.assertEqual(self._box.get_bytes(key1, from_=True),
1007            unixfrom.encode('ascii') + _bytes_sample_message)
1008
1009    def test_get_string_from(self):
1010        # Get string representations of messages with _unixfrom.
1011        unixfrom = 'From foo@bar blah\n'
1012        key0 = self._box.add(unixfrom + self._template % 0)
1013        key1 = self._box.add(unixfrom + _sample_message)
1014        self.assertEqual(self._box.get_string(key0, from_=False),
1015                         self._template % 0)
1016        self.assertEqual(self._box.get_string(key1, from_=False).split('\n'),
1017                         _sample_message.split('\n'))
1018        self.assertEqual(self._box.get_string(key0, from_=True),
1019                         unixfrom + self._template % 0)
1020        self.assertEqual(self._box.get_string(key1, from_=True).split('\n'),
1021                         (unixfrom + _sample_message).split('\n'))
1022
1023    def test_add_from_string(self):
1024        # Add a string starting with 'From ' to the mailbox
1025        key = self._box.add('From foo@bar blah\nFrom: foo\n\n0\n')
1026        self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
1027        self.assertEqual(self._box[key].get_payload(), '0\n')
1028
1029    def test_add_from_bytes(self):
1030        # Add a byte string starting with 'From ' to the mailbox
1031        key = self._box.add(b'From foo@bar blah\nFrom: foo\n\n0\n')
1032        self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
1033        self.assertEqual(self._box[key].get_payload(), '0\n')
1034
1035    def test_add_mbox_or_mmdf_message(self):
1036        # Add an mboxMessage or MMDFMessage
1037        for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
1038            msg = class_('From foo@bar blah\nFrom: foo\n\n0\n')
1039            key = self._box.add(msg)
1040
1041    def test_open_close_open(self):
1042        # Open and inspect previously-created mailbox
1043        values = [self._template % i for i in range(3)]
1044        for value in values:
1045            self._box.add(value)
1046        self._box.close()
1047        mtime = os.path.getmtime(self._path)
1048        self._box = self._factory(self._path)
1049        self.assertEqual(len(self._box), 3)
1050        for key in self._box.iterkeys():
1051            self.assertIn(self._box.get_string(key), values)
1052        self._box.close()
1053        self.assertEqual(mtime, os.path.getmtime(self._path))
1054
1055    def test_add_and_close(self):
1056        # Verifying that closing a mailbox doesn't change added items
1057        self._box.add(_sample_message)
1058        for i in range(3):
1059            self._box.add(self._template % i)
1060        self._box.add(_sample_message)
1061        self._box._file.flush()
1062        self._box._file.seek(0)
1063        contents = self._box._file.read()
1064        self._box.close()
1065        with open(self._path, 'rb') as f:
1066            self.assertEqual(contents, f.read())
1067        self._box = self._factory(self._path)
1068
1069    @support.requires_fork()
1070    @unittest.skipUnless(hasattr(socket, 'socketpair'), "Test needs socketpair().")
1071    def test_lock_conflict(self):
1072        # Fork off a child process that will lock the mailbox temporarily,
1073        # unlock it and exit.
1074        c, p = socket.socketpair()
1075        self.addCleanup(c.close)
1076        self.addCleanup(p.close)
1077
1078        pid = os.fork()
1079        if pid == 0:
1080            # child
1081            try:
1082                # lock the mailbox, and signal the parent it can proceed
1083                self._box.lock()
1084                c.send(b'c')
1085
1086                # wait until the parent is done, and unlock the mailbox
1087                c.recv(1)
1088                self._box.unlock()
1089            finally:
1090                os._exit(0)
1091
1092        # In the parent, wait until the child signals it locked the mailbox.
1093        p.recv(1)
1094        try:
1095            self.assertRaises(mailbox.ExternalClashError,
1096                              self._box.lock)
1097        finally:
1098            # Signal the child it can now release the lock and exit.
1099            p.send(b'p')
1100            # Wait for child to exit.  Locking should now succeed.
1101            support.wait_process(pid, exitcode=0)
1102
1103        self._box.lock()
1104        self._box.unlock()
1105
1106    def test_relock(self):
1107        # Test case for bug #1575506: the mailbox class was locking the
1108        # wrong file object in its flush() method.
1109        msg = "Subject: sub\n\nbody\n"
1110        key1 = self._box.add(msg)
1111        self._box.flush()
1112        self._box.close()
1113
1114        self._box = self._factory(self._path)
1115        self._box.lock()
1116        key2 = self._box.add(msg)
1117        self._box.flush()
1118        self.assertTrue(self._box._locked)
1119        self._box.close()
1120
1121
1122class TestMbox(_TestMboxMMDF, unittest.TestCase):
1123
1124    _factory = lambda self, path, factory=None: mailbox.mbox(path, factory)
1125
1126    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
1127    def test_file_perms(self):
1128        # From bug #3228, we want to verify that the mailbox file isn't executable,
1129        # even if the umask is set to something that would leave executable bits set.
1130        # We only run this test on platforms that support umask.
1131        try:
1132            old_umask = os.umask(0o077)
1133            self._box.close()
1134            os.unlink(self._path)
1135            self._box = mailbox.mbox(self._path, create=True)
1136            self._box.add('')
1137            self._box.close()
1138        finally:
1139            os.umask(old_umask)
1140
1141        st = os.stat(self._path)
1142        perms = st.st_mode
1143        self.assertFalse((perms & 0o111)) # Execute bits should all be off.
1144
1145    def test_terminating_newline(self):
1146        message = email.message.Message()
1147        message['From'] = '[email protected]'
1148        message.set_payload('No newline at the end')
1149        i = self._box.add(message)
1150
1151        # A newline should have been appended to the payload
1152        message = self._box.get(i)
1153        self.assertEqual(message.get_payload(), 'No newline at the end\n')
1154
1155    def test_message_separator(self):
1156        # Check there's always a single blank line after each message
1157        self._box.add('From: foo\n\n0')  # No newline at the end
1158        with open(self._path, encoding='utf-8') as f:
1159            data = f.read()
1160            self.assertEqual(data[-3:], '0\n\n')
1161
1162        self._box.add('From: foo\n\n0\n')  # Newline at the end
1163        with open(self._path, encoding='utf-8') as f:
1164            data = f.read()
1165            self.assertEqual(data[-3:], '0\n\n')
1166
1167
1168class TestMMDF(_TestMboxMMDF, unittest.TestCase):
1169
1170    _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory)
1171
1172
1173class TestMH(TestMailbox, unittest.TestCase):
1174
1175    _factory = lambda self, path, factory=None: mailbox.MH(path, factory)
1176
1177    def assertMailboxEmpty(self):
1178        self.assertEqual(os.listdir(self._path), ['.mh_sequences'])
1179
1180    def test_list_folders(self):
1181        # List folders
1182        self._box.add_folder('one')
1183        self._box.add_folder('two')
1184        self._box.add_folder('three')
1185        self.assertEqual(len(self._box.list_folders()), 3)
1186        self.assertEqual(set(self._box.list_folders()),
1187                     set(('one', 'two', 'three')))
1188
1189    def test_get_folder(self):
1190        # Open folders
1191        def dummy_factory (s):
1192            return None
1193        self._box = self._factory(self._path, dummy_factory)
1194
1195        new_folder = self._box.add_folder('foo.bar')
1196        folder0 = self._box.get_folder('foo.bar')
1197        folder0.add(self._template % 'bar')
1198        self.assertTrue(os.path.isdir(os.path.join(self._path, 'foo.bar')))
1199        folder1 = self._box.get_folder('foo.bar')
1200        self.assertEqual(folder1.get_string(folder1.keys()[0]),
1201                         self._template % 'bar')
1202
1203        # Test for bug #1569790: verify that folders returned by .get_folder()
1204        # use the same factory function.
1205        self.assertIs(new_folder._factory, self._box._factory)
1206        self.assertIs(folder0._factory, self._box._factory)
1207
1208    def test_add_and_remove_folders(self):
1209        # Delete folders
1210        self._box.add_folder('one')
1211        self._box.add_folder('two')
1212        self.assertEqual(len(self._box.list_folders()), 2)
1213        self.assertEqual(set(self._box.list_folders()), set(('one', 'two')))
1214        self._box.remove_folder('one')
1215        self.assertEqual(len(self._box.list_folders()), 1)
1216        self.assertEqual(set(self._box.list_folders()), set(('two',)))
1217        self._box.add_folder('three')
1218        self.assertEqual(len(self._box.list_folders()), 2)
1219        self.assertEqual(set(self._box.list_folders()), set(('two', 'three')))
1220        self._box.remove_folder('three')
1221        self.assertEqual(len(self._box.list_folders()), 1)
1222        self.assertEqual(set(self._box.list_folders()), set(('two',)))
1223        self._box.remove_folder('two')
1224        self.assertEqual(len(self._box.list_folders()), 0)
1225        self.assertEqual(self._box.list_folders(), [])
1226
1227    def test_sequences(self):
1228        # Get and set sequences
1229        self.assertEqual(self._box.get_sequences(), {})
1230        msg0 = mailbox.MHMessage(self._template % 0)
1231        msg0.add_sequence('foo')
1232        key0 = self._box.add(msg0)
1233        self.assertEqual(self._box.get_sequences(), {'foo':[key0]})
1234        msg1 = mailbox.MHMessage(self._template % 1)
1235        msg1.set_sequences(['bar', 'replied', 'foo'])
1236        key1 = self._box.add(msg1)
1237        self.assertEqual(self._box.get_sequences(),
1238                     {'foo':[key0, key1], 'bar':[key1], 'replied':[key1]})
1239        msg0.set_sequences(['flagged'])
1240        self._box[key0] = msg0
1241        self.assertEqual(self._box.get_sequences(),
1242                     {'foo':[key1], 'bar':[key1], 'replied':[key1],
1243                      'flagged':[key0]})
1244        self._box.remove(key1)
1245        self.assertEqual(self._box.get_sequences(), {'flagged':[key0]})
1246
1247    def test_issue2625(self):
1248        msg0 = mailbox.MHMessage(self._template % 0)
1249        msg0.add_sequence('foo')
1250        key0 = self._box.add(msg0)
1251        refmsg0 = self._box.get_message(key0)
1252
1253    def test_issue7627(self):
1254        msg0 = mailbox.MHMessage(self._template % 0)
1255        key0 = self._box.add(msg0)
1256        self._box.lock()
1257        self._box.remove(key0)
1258        self._box.unlock()
1259
1260    def test_pack(self):
1261        # Pack the contents of the mailbox
1262        msg0 = mailbox.MHMessage(self._template % 0)
1263        msg1 = mailbox.MHMessage(self._template % 1)
1264        msg2 = mailbox.MHMessage(self._template % 2)
1265        msg3 = mailbox.MHMessage(self._template % 3)
1266        msg0.set_sequences(['foo', 'unseen'])
1267        msg1.set_sequences(['foo'])
1268        msg2.set_sequences(['foo', 'flagged'])
1269        msg3.set_sequences(['foo', 'bar', 'replied'])
1270        key0 = self._box.add(msg0)
1271        key1 = self._box.add(msg1)
1272        key2 = self._box.add(msg2)
1273        key3 = self._box.add(msg3)
1274        self.assertEqual(self._box.get_sequences(),
1275                     {'foo':[key0,key1,key2,key3], 'unseen':[key0],
1276                      'flagged':[key2], 'bar':[key3], 'replied':[key3]})
1277        self._box.remove(key2)
1278        self.assertEqual(self._box.get_sequences(),
1279                     {'foo':[key0,key1,key3], 'unseen':[key0], 'bar':[key3],
1280                      'replied':[key3]})
1281        self._box.pack()
1282        self.assertEqual(self._box.keys(), [1, 2, 3])
1283        key0 = key0
1284        key1 = key0 + 1
1285        key2 = key1 + 1
1286        self.assertEqual(self._box.get_sequences(),
1287                     {'foo':[1, 2, 3], 'unseen':[1], 'bar':[3], 'replied':[3]})
1288
1289        # Test case for packing while holding the mailbox locked.
1290        key0 = self._box.add(msg1)
1291        key1 = self._box.add(msg1)
1292        key2 = self._box.add(msg1)
1293        key3 = self._box.add(msg1)
1294
1295        self._box.remove(key0)
1296        self._box.remove(key2)
1297        self._box.lock()
1298        self._box.pack()
1299        self._box.unlock()
1300        self.assertEqual(self._box.get_sequences(),
1301                     {'foo':[1, 2, 3, 4, 5],
1302                      'unseen':[1], 'bar':[3], 'replied':[3]})
1303
1304    def _get_lock_path(self):
1305        return os.path.join(self._path, '.mh_sequences.lock')
1306
1307
1308class TestBabyl(_TestSingleFile, unittest.TestCase):
1309
1310    _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory)
1311
1312    def assertMailboxEmpty(self):
1313        with open(self._path, 'rb') as f:
1314            self.assertEqual(f.readlines(), [])
1315
1316    def tearDown(self):
1317        super().tearDown()
1318        self._box.close()
1319        self._delete_recursively(self._path)
1320        for lock_remnant in glob.glob(glob.escape(self._path) + '.*'):
1321            os_helper.unlink(lock_remnant)
1322
1323    def test_labels(self):
1324        # Get labels from the mailbox
1325        self.assertEqual(self._box.get_labels(), [])
1326        msg0 = mailbox.BabylMessage(self._template % 0)
1327        msg0.add_label('foo')
1328        key0 = self._box.add(msg0)
1329        self.assertEqual(self._box.get_labels(), ['foo'])
1330        msg1 = mailbox.BabylMessage(self._template % 1)
1331        msg1.set_labels(['bar', 'answered', 'foo'])
1332        key1 = self._box.add(msg1)
1333        self.assertEqual(set(self._box.get_labels()), set(['foo', 'bar']))
1334        msg0.set_labels(['blah', 'filed'])
1335        self._box[key0] = msg0
1336        self.assertEqual(set(self._box.get_labels()),
1337                     set(['foo', 'bar', 'blah']))
1338        self._box.remove(key1)
1339        self.assertEqual(set(self._box.get_labels()), set(['blah']))
1340
1341
1342class FakeFileLikeObject:
1343
1344    def __init__(self):
1345        self.closed = False
1346
1347    def close(self):
1348        self.closed = True
1349
1350
1351class FakeMailBox(mailbox.Mailbox):
1352
1353    def __init__(self):
1354        mailbox.Mailbox.__init__(self, '', lambda file: None)
1355        self.files = [FakeFileLikeObject() for i in range(10)]
1356
1357    def get_file(self, key):
1358        return self.files[key]
1359
1360
1361class TestFakeMailBox(unittest.TestCase):
1362
1363    def test_closing_fd(self):
1364        box = FakeMailBox()
1365        for i in range(10):
1366            self.assertFalse(box.files[i].closed)
1367        for i in range(10):
1368            box[i]
1369        for i in range(10):
1370            self.assertTrue(box.files[i].closed)
1371
1372
1373class TestMessage(TestBase, unittest.TestCase):
1374
1375    _factory = mailbox.Message      # Overridden by subclasses to reuse tests
1376
1377    def setUp(self):
1378        self._path = os_helper.TESTFN
1379
1380    def tearDown(self):
1381        self._delete_recursively(self._path)
1382
1383    def test_initialize_with_eMM(self):
1384        # Initialize based on email.message.Message instance
1385        eMM = email.message_from_string(_sample_message)
1386        msg = self._factory(eMM)
1387        self._post_initialize_hook(msg)
1388        self._check_sample(msg)
1389
1390    def test_initialize_with_string(self):
1391        # Initialize based on string
1392        msg = self._factory(_sample_message)
1393        self._post_initialize_hook(msg)
1394        self._check_sample(msg)
1395
1396    def test_initialize_with_file(self):
1397        # Initialize based on contents of file
1398        with open(self._path, 'w+', encoding='utf-8') as f:
1399            f.write(_sample_message)
1400            f.seek(0)
1401            msg = self._factory(f)
1402            self._post_initialize_hook(msg)
1403            self._check_sample(msg)
1404
1405    def test_initialize_with_binary_file(self):
1406        # Initialize based on contents of binary file
1407        with open(self._path, 'wb+') as f:
1408            f.write(_bytes_sample_message)
1409            f.seek(0)
1410            msg = self._factory(f)
1411            self._post_initialize_hook(msg)
1412            self._check_sample(msg)
1413
1414    def test_initialize_with_nothing(self):
1415        # Initialize without arguments
1416        msg = self._factory()
1417        self._post_initialize_hook(msg)
1418        self.assertIsInstance(msg, email.message.Message)
1419        self.assertIsInstance(msg, mailbox.Message)
1420        self.assertIsInstance(msg, self._factory)
1421        self.assertEqual(msg.keys(), [])
1422        self.assertFalse(msg.is_multipart())
1423        self.assertIsNone(msg.get_payload())
1424
1425    def test_initialize_incorrectly(self):
1426        # Initialize with invalid argument
1427        self.assertRaises(TypeError, lambda: self._factory(object()))
1428
1429    def test_all_eMM_attributes_exist(self):
1430        # Issue 12537
1431        eMM = email.message_from_string(_sample_message)
1432        msg = self._factory(_sample_message)
1433        for attr in eMM.__dict__:
1434            self.assertIn(attr, msg.__dict__,
1435                '{} attribute does not exist'.format(attr))
1436
1437    def test_become_message(self):
1438        # Take on the state of another message
1439        eMM = email.message_from_string(_sample_message)
1440        msg = self._factory()
1441        msg._become_message(eMM)
1442        self._check_sample(msg)
1443
1444    def test_explain_to(self):
1445        # Copy self's format-specific data to other message formats.
1446        # This test is superficial; better ones are in TestMessageConversion.
1447        msg = self._factory()
1448        for class_ in self.all_mailbox_types:
1449            other_msg = class_()
1450            msg._explain_to(other_msg)
1451        other_msg = email.message.Message()
1452        self.assertRaises(TypeError, lambda: msg._explain_to(other_msg))
1453
1454    def _post_initialize_hook(self, msg):
1455        # Overridden by subclasses to check extra things after initialization
1456        pass
1457
1458
1459class TestMaildirMessage(TestMessage, unittest.TestCase):
1460
1461    _factory = mailbox.MaildirMessage
1462
1463    def _post_initialize_hook(self, msg):
1464        self.assertEqual(msg._subdir, 'new')
1465        self.assertEqual(msg._info, '')
1466
1467    def test_subdir(self):
1468        # Use get_subdir() and set_subdir()
1469        msg = mailbox.MaildirMessage(_sample_message)
1470        self.assertEqual(msg.get_subdir(), 'new')
1471        msg.set_subdir('cur')
1472        self.assertEqual(msg.get_subdir(), 'cur')
1473        msg.set_subdir('new')
1474        self.assertEqual(msg.get_subdir(), 'new')
1475        self.assertRaises(ValueError, lambda: msg.set_subdir('tmp'))
1476        self.assertEqual(msg.get_subdir(), 'new')
1477        msg.set_subdir('new')
1478        self.assertEqual(msg.get_subdir(), 'new')
1479        self._check_sample(msg)
1480
1481    def test_flags(self):
1482        # Use get_flags(), set_flags(), add_flag(), remove_flag()
1483        msg = mailbox.MaildirMessage(_sample_message)
1484        self.assertEqual(msg.get_flags(), '')
1485        self.assertEqual(msg.get_subdir(), 'new')
1486        msg.set_flags('F')
1487        self.assertEqual(msg.get_subdir(), 'new')
1488        self.assertEqual(msg.get_flags(), 'F')
1489        msg.set_flags('SDTP')
1490        self.assertEqual(msg.get_flags(), 'DPST')
1491        msg.add_flag('FT')
1492        self.assertEqual(msg.get_flags(), 'DFPST')
1493        msg.remove_flag('TDRP')
1494        self.assertEqual(msg.get_flags(), 'FS')
1495        self.assertEqual(msg.get_subdir(), 'new')
1496        self._check_sample(msg)
1497
1498    def test_date(self):
1499        # Use get_date() and set_date()
1500        msg = mailbox.MaildirMessage(_sample_message)
1501        self.assertLess(abs(msg.get_date() - time.time()), 60)
1502        msg.set_date(0.0)
1503        self.assertEqual(msg.get_date(), 0.0)
1504
1505    def test_info(self):
1506        # Use get_info() and set_info()
1507        msg = mailbox.MaildirMessage(_sample_message)
1508        self.assertEqual(msg.get_info(), '')
1509        msg.set_info('1,foo=bar')
1510        self.assertEqual(msg.get_info(), '1,foo=bar')
1511        self.assertRaises(TypeError, lambda: msg.set_info(None))
1512        self._check_sample(msg)
1513
1514    def test_info_and_flags(self):
1515        # Test interaction of info and flag methods
1516        msg = mailbox.MaildirMessage(_sample_message)
1517        self.assertEqual(msg.get_info(), '')
1518        msg.set_flags('SF')
1519        self.assertEqual(msg.get_flags(), 'FS')
1520        self.assertEqual(msg.get_info(), '2,FS')
1521        msg.set_info('1,')
1522        self.assertEqual(msg.get_flags(), '')
1523        self.assertEqual(msg.get_info(), '1,')
1524        msg.remove_flag('RPT')
1525        self.assertEqual(msg.get_flags(), '')
1526        self.assertEqual(msg.get_info(), '1,')
1527        msg.add_flag('D')
1528        self.assertEqual(msg.get_flags(), 'D')
1529        self.assertEqual(msg.get_info(), '2,D')
1530        self._check_sample(msg)
1531
1532
1533class _TestMboxMMDFMessage:
1534
1535    _factory = mailbox._mboxMMDFMessage
1536
1537    def _post_initialize_hook(self, msg):
1538        self._check_from(msg)
1539
1540    def test_initialize_with_unixfrom(self):
1541        # Initialize with a message that already has a _unixfrom attribute
1542        msg = mailbox.Message(_sample_message)
1543        msg.set_unixfrom('From foo@bar blah')
1544        msg = mailbox.mboxMessage(msg)
1545        self.assertEqual(msg.get_from(), 'foo@bar blah', msg.get_from())
1546
1547    def test_from(self):
1548        # Get and set "From " line
1549        msg = mailbox.mboxMessage(_sample_message)
1550        self._check_from(msg)
1551        msg.set_from('foo bar')
1552        self.assertEqual(msg.get_from(), 'foo bar')
1553        msg.set_from('foo@bar', True)
1554        self._check_from(msg, 'foo@bar')
1555        msg.set_from('blah@temp', time.localtime())
1556        self._check_from(msg, 'blah@temp')
1557
1558    def test_flags(self):
1559        # Use get_flags(), set_flags(), add_flag(), remove_flag()
1560        msg = mailbox.mboxMessage(_sample_message)
1561        self.assertEqual(msg.get_flags(), '')
1562        msg.set_flags('F')
1563        self.assertEqual(msg.get_flags(), 'F')
1564        msg.set_flags('XODR')
1565        self.assertEqual(msg.get_flags(), 'RODX')
1566        msg.add_flag('FA')
1567        self.assertEqual(msg.get_flags(), 'RODFAX')
1568        msg.remove_flag('FDXA')
1569        self.assertEqual(msg.get_flags(), 'RO')
1570        self._check_sample(msg)
1571
1572    def _check_from(self, msg, sender=None):
1573        # Check contents of "From " line
1574        if sender is None:
1575            sender = "MAILER-DAEMON"
1576        self.assertIsNotNone(re.match(
1577                sender + r" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:\d{2} \d{4}",
1578                msg.get_from()))
1579
1580
1581class TestMboxMessage(_TestMboxMMDFMessage, TestMessage):
1582
1583    _factory = mailbox.mboxMessage
1584
1585
1586class TestMHMessage(TestMessage, unittest.TestCase):
1587
1588    _factory = mailbox.MHMessage
1589
1590    def _post_initialize_hook(self, msg):
1591        self.assertEqual(msg._sequences, [])
1592
1593    def test_sequences(self):
1594        # Get, set, join, and leave sequences
1595        msg = mailbox.MHMessage(_sample_message)
1596        self.assertEqual(msg.get_sequences(), [])
1597        msg.set_sequences(['foobar'])
1598        self.assertEqual(msg.get_sequences(), ['foobar'])
1599        msg.set_sequences([])
1600        self.assertEqual(msg.get_sequences(), [])
1601        msg.add_sequence('unseen')
1602        self.assertEqual(msg.get_sequences(), ['unseen'])
1603        msg.add_sequence('flagged')
1604        self.assertEqual(msg.get_sequences(), ['unseen', 'flagged'])
1605        msg.add_sequence('flagged')
1606        self.assertEqual(msg.get_sequences(), ['unseen', 'flagged'])
1607        msg.remove_sequence('unseen')
1608        self.assertEqual(msg.get_sequences(), ['flagged'])
1609        msg.add_sequence('foobar')
1610        self.assertEqual(msg.get_sequences(), ['flagged', 'foobar'])
1611        msg.remove_sequence('replied')
1612        self.assertEqual(msg.get_sequences(), ['flagged', 'foobar'])
1613        msg.set_sequences(['foobar', 'replied'])
1614        self.assertEqual(msg.get_sequences(), ['foobar', 'replied'])
1615
1616
1617class TestBabylMessage(TestMessage, unittest.TestCase):
1618
1619    _factory = mailbox.BabylMessage
1620
1621    def _post_initialize_hook(self, msg):
1622        self.assertEqual(msg._labels, [])
1623
1624    def test_labels(self):
1625        # Get, set, join, and leave labels
1626        msg = mailbox.BabylMessage(_sample_message)
1627        self.assertEqual(msg.get_labels(), [])
1628        msg.set_labels(['foobar'])
1629        self.assertEqual(msg.get_labels(), ['foobar'])
1630        msg.set_labels([])
1631        self.assertEqual(msg.get_labels(), [])
1632        msg.add_label('filed')
1633        self.assertEqual(msg.get_labels(), ['filed'])
1634        msg.add_label('resent')
1635        self.assertEqual(msg.get_labels(), ['filed', 'resent'])
1636        msg.add_label('resent')
1637        self.assertEqual(msg.get_labels(), ['filed', 'resent'])
1638        msg.remove_label('filed')
1639        self.assertEqual(msg.get_labels(), ['resent'])
1640        msg.add_label('foobar')
1641        self.assertEqual(msg.get_labels(), ['resent', 'foobar'])
1642        msg.remove_label('unseen')
1643        self.assertEqual(msg.get_labels(), ['resent', 'foobar'])
1644        msg.set_labels(['foobar', 'answered'])
1645        self.assertEqual(msg.get_labels(), ['foobar', 'answered'])
1646
1647    def test_visible(self):
1648        # Get, set, and update visible headers
1649        msg = mailbox.BabylMessage(_sample_message)
1650        visible = msg.get_visible()
1651        self.assertEqual(visible.keys(), [])
1652        self.assertIsNone(visible.get_payload())
1653        visible['User-Agent'] = 'FooBar 1.0'
1654        visible['X-Whatever'] = 'Blah'
1655        self.assertEqual(msg.get_visible().keys(), [])
1656        msg.set_visible(visible)
1657        visible = msg.get_visible()
1658        self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])
1659        self.assertEqual(visible['User-Agent'], 'FooBar 1.0')
1660        self.assertEqual(visible['X-Whatever'], 'Blah')
1661        self.assertIsNone(visible.get_payload())
1662        msg.update_visible()
1663        self.assertEqual(visible.keys(), ['User-Agent', 'X-Whatever'])
1664        self.assertIsNone(visible.get_payload())
1665        visible = msg.get_visible()
1666        self.assertEqual(visible.keys(), ['User-Agent', 'Date', 'From', 'To',
1667                                          'Subject'])
1668        for header in ('User-Agent', 'Date', 'From', 'To', 'Subject'):
1669            self.assertEqual(visible[header], msg[header])
1670
1671
1672class TestMMDFMessage(_TestMboxMMDFMessage, TestMessage):
1673
1674    _factory = mailbox.MMDFMessage
1675
1676
1677class TestMessageConversion(TestBase, unittest.TestCase):
1678
1679    def test_plain_to_x(self):
1680        # Convert Message to all formats
1681        for class_ in self.all_mailbox_types:
1682            msg_plain = mailbox.Message(_sample_message)
1683            msg = class_(msg_plain)
1684            self._check_sample(msg)
1685
1686    def test_x_to_plain(self):
1687        # Convert all formats to Message
1688        for class_ in self.all_mailbox_types:
1689            msg = class_(_sample_message)
1690            msg_plain = mailbox.Message(msg)
1691            self._check_sample(msg_plain)
1692
1693    def test_x_from_bytes(self):
1694        # Convert all formats to Message
1695        for class_ in self.all_mailbox_types:
1696            msg = class_(_bytes_sample_message)
1697            self._check_sample(msg)
1698
1699    def test_x_to_invalid(self):
1700        # Convert all formats to an invalid format
1701        for class_ in self.all_mailbox_types:
1702            self.assertRaises(TypeError, lambda: class_(False))
1703
1704    def test_type_specific_attributes_removed_on_conversion(self):
1705        reference = {class_: class_(_sample_message).__dict__
1706                        for class_ in self.all_mailbox_types}
1707        for class1 in self.all_mailbox_types:
1708            for class2 in self.all_mailbox_types:
1709                if class1 is class2:
1710                    continue
1711                source = class1(_sample_message)
1712                target = class2(source)
1713                type_specific = [a for a in reference[class1]
1714                                   if a not in reference[class2]]
1715                for attr in type_specific:
1716                    self.assertNotIn(attr, target.__dict__,
1717                        "while converting {} to {}".format(class1, class2))
1718
1719    def test_maildir_to_maildir(self):
1720        # Convert MaildirMessage to MaildirMessage
1721        msg_maildir = mailbox.MaildirMessage(_sample_message)
1722        msg_maildir.set_flags('DFPRST')
1723        msg_maildir.set_subdir('cur')
1724        date = msg_maildir.get_date()
1725        msg = mailbox.MaildirMessage(msg_maildir)
1726        self._check_sample(msg)
1727        self.assertEqual(msg.get_flags(), 'DFPRST')
1728        self.assertEqual(msg.get_subdir(), 'cur')
1729        self.assertEqual(msg.get_date(), date)
1730
1731    def test_maildir_to_mboxmmdf(self):
1732        # Convert MaildirMessage to mboxmessage and MMDFMessage
1733        pairs = (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'), ('S', 'R'),
1734                 ('T', 'D'), ('DFPRST', 'RDFA'))
1735        for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
1736            msg_maildir = mailbox.MaildirMessage(_sample_message)
1737            msg_maildir.set_date(0.0)
1738            for setting, result in pairs:
1739                msg_maildir.set_flags(setting)
1740                msg = class_(msg_maildir)
1741                self.assertEqual(msg.get_flags(), result)
1742                self.assertEqual(msg.get_from(), 'MAILER-DAEMON %s' %
1743                             time.asctime(time.gmtime(0.0)))
1744            msg_maildir.set_subdir('cur')
1745            self.assertEqual(class_(msg_maildir).get_flags(), 'RODFA')
1746
1747    def test_maildir_to_mh(self):
1748        # Convert MaildirMessage to MHMessage
1749        msg_maildir = mailbox.MaildirMessage(_sample_message)
1750        pairs = (('D', ['unseen']), ('F', ['unseen', 'flagged']),
1751                 ('P', ['unseen']), ('R', ['unseen', 'replied']), ('S', []),
1752                 ('T', ['unseen']), ('DFPRST', ['replied', 'flagged']))
1753        for setting, result in pairs:
1754            msg_maildir.set_flags(setting)
1755            self.assertEqual(mailbox.MHMessage(msg_maildir).get_sequences(),
1756                             result)
1757
1758    def test_maildir_to_babyl(self):
1759        # Convert MaildirMessage to Babyl
1760        msg_maildir = mailbox.MaildirMessage(_sample_message)
1761        pairs = (('D', ['unseen']), ('F', ['unseen']),
1762                 ('P', ['unseen', 'forwarded']), ('R', ['unseen', 'answered']),
1763                 ('S', []), ('T', ['unseen', 'deleted']),
1764                 ('DFPRST', ['deleted', 'answered', 'forwarded']))
1765        for setting, result in pairs:
1766            msg_maildir.set_flags(setting)
1767            self.assertEqual(mailbox.BabylMessage(msg_maildir).get_labels(),
1768                             result)
1769
1770    def test_mboxmmdf_to_maildir(self):
1771        # Convert mboxMessage and MMDFMessage to MaildirMessage
1772        for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
1773            msg_mboxMMDF = class_(_sample_message)
1774            msg_mboxMMDF.set_from('foo@bar', time.gmtime(0.0))
1775            pairs = (('R', 'S'), ('O', ''), ('D', 'T'), ('F', 'F'), ('A', 'R'),
1776                     ('RODFA', 'FRST'))
1777            for setting, result in pairs:
1778                msg_mboxMMDF.set_flags(setting)
1779                msg = mailbox.MaildirMessage(msg_mboxMMDF)
1780                self.assertEqual(msg.get_flags(), result)
1781                self.assertEqual(msg.get_date(), 0.0)
1782            msg_mboxMMDF.set_flags('O')
1783            self.assertEqual(mailbox.MaildirMessage(msg_mboxMMDF).get_subdir(),
1784                             'cur')
1785
1786    def test_mboxmmdf_to_mboxmmdf(self):
1787        # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage
1788        for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
1789            msg_mboxMMDF = class_(_sample_message)
1790            msg_mboxMMDF.set_flags('RODFA')
1791            msg_mboxMMDF.set_from('foo@bar')
1792            for class2_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
1793                msg2 = class2_(msg_mboxMMDF)
1794                self.assertEqual(msg2.get_flags(), 'RODFA')
1795                self.assertEqual(msg2.get_from(), 'foo@bar')
1796
1797    def test_mboxmmdf_to_mh(self):
1798        # Convert mboxMessage and MMDFMessage to MHMessage
1799        for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
1800            msg_mboxMMDF = class_(_sample_message)
1801            pairs = (('R', []), ('O', ['unseen']), ('D', ['unseen']),
1802                     ('F', ['unseen', 'flagged']),
1803                     ('A', ['unseen', 'replied']),
1804                     ('RODFA', ['replied', 'flagged']))
1805            for setting, result in pairs:
1806                msg_mboxMMDF.set_flags(setting)
1807                self.assertEqual(mailbox.MHMessage(msg_mboxMMDF).get_sequences(),
1808                                 result)
1809
1810    def test_mboxmmdf_to_babyl(self):
1811        # Convert mboxMessage and MMDFMessage to BabylMessage
1812        for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
1813            msg = class_(_sample_message)
1814            pairs = (('R', []), ('O', ['unseen']),
1815                     ('D', ['unseen', 'deleted']), ('F', ['unseen']),
1816                     ('A', ['unseen', 'answered']),
1817                     ('RODFA', ['deleted', 'answered']))
1818            for setting, result in pairs:
1819                msg.set_flags(setting)
1820                self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result)
1821
1822    def test_mh_to_maildir(self):
1823        # Convert MHMessage to MaildirMessage
1824        pairs = (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS'))
1825        for setting, result in pairs:
1826            msg = mailbox.MHMessage(_sample_message)
1827            msg.add_sequence(setting)
1828            self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result)
1829            self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
1830        msg = mailbox.MHMessage(_sample_message)
1831        msg.add_sequence('unseen')
1832        msg.add_sequence('replied')
1833        msg.add_sequence('flagged')
1834        self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'FR')
1835        self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
1836
1837    def test_mh_to_mboxmmdf(self):
1838        # Convert MHMessage to mboxMessage and MMDFMessage
1839        pairs = (('unseen', 'O'), ('replied', 'ROA'), ('flagged', 'ROF'))
1840        for setting, result in pairs:
1841            msg = mailbox.MHMessage(_sample_message)
1842            msg.add_sequence(setting)
1843            for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
1844                self.assertEqual(class_(msg).get_flags(), result)
1845        msg = mailbox.MHMessage(_sample_message)
1846        msg.add_sequence('unseen')
1847        msg.add_sequence('replied')
1848        msg.add_sequence('flagged')
1849        for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
1850            self.assertEqual(class_(msg).get_flags(), 'OFA')
1851
1852    def test_mh_to_mh(self):
1853        # Convert MHMessage to MHMessage
1854        msg = mailbox.MHMessage(_sample_message)
1855        msg.add_sequence('unseen')
1856        msg.add_sequence('replied')
1857        msg.add_sequence('flagged')
1858        self.assertEqual(mailbox.MHMessage(msg).get_sequences(),
1859                         ['unseen', 'replied', 'flagged'])
1860
1861    def test_mh_to_babyl(self):
1862        # Convert MHMessage to BabylMessage
1863        pairs = (('unseen', ['unseen']), ('replied', ['answered']),
1864                 ('flagged', []))
1865        for setting, result in pairs:
1866            msg = mailbox.MHMessage(_sample_message)
1867            msg.add_sequence(setting)
1868            self.assertEqual(mailbox.BabylMessage(msg).get_labels(), result)
1869        msg = mailbox.MHMessage(_sample_message)
1870        msg.add_sequence('unseen')
1871        msg.add_sequence('replied')
1872        msg.add_sequence('flagged')
1873        self.assertEqual(mailbox.BabylMessage(msg).get_labels(),
1874                         ['unseen', 'answered'])
1875
1876    def test_babyl_to_maildir(self):
1877        # Convert BabylMessage to MaildirMessage
1878        pairs = (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'),
1879                 ('answered', 'RS'), ('forwarded', 'PS'), ('edited', 'S'),
1880                 ('resent', 'PS'))
1881        for setting, result in pairs:
1882            msg = mailbox.BabylMessage(_sample_message)
1883            msg.add_label(setting)
1884            self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), result)
1885            self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
1886        msg = mailbox.BabylMessage(_sample_message)
1887        for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
1888                      'edited', 'resent'):
1889            msg.add_label(label)
1890        self.assertEqual(mailbox.MaildirMessage(msg).get_flags(), 'PRT')
1891        self.assertEqual(mailbox.MaildirMessage(msg).get_subdir(), 'cur')
1892
1893    def test_babyl_to_mboxmmdf(self):
1894        # Convert BabylMessage to mboxMessage and MMDFMessage
1895        pairs = (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'),
1896                 ('answered', 'ROA'), ('forwarded', 'RO'), ('edited', 'RO'),
1897                 ('resent', 'RO'))
1898        for setting, result in pairs:
1899            for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
1900                msg = mailbox.BabylMessage(_sample_message)
1901                msg.add_label(setting)
1902                self.assertEqual(class_(msg).get_flags(), result)
1903        msg = mailbox.BabylMessage(_sample_message)
1904        for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
1905                      'edited', 'resent'):
1906            msg.add_label(label)
1907        for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
1908            self.assertEqual(class_(msg).get_flags(), 'ODA')
1909
1910    def test_babyl_to_mh(self):
1911        # Convert BabylMessage to MHMessage
1912        pairs = (('unseen', ['unseen']), ('deleted', []), ('filed', []),
1913                 ('answered', ['replied']), ('forwarded', []), ('edited', []),
1914                 ('resent', []))
1915        for setting, result in pairs:
1916            msg = mailbox.BabylMessage(_sample_message)
1917            msg.add_label(setting)
1918            self.assertEqual(mailbox.MHMessage(msg).get_sequences(), result)
1919        msg = mailbox.BabylMessage(_sample_message)
1920        for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
1921                      'edited', 'resent'):
1922            msg.add_label(label)
1923        self.assertEqual(mailbox.MHMessage(msg).get_sequences(),
1924                         ['unseen', 'replied'])
1925
1926    def test_babyl_to_babyl(self):
1927        # Convert BabylMessage to BabylMessage
1928        msg = mailbox.BabylMessage(_sample_message)
1929        msg.update_visible()
1930        for label in ('unseen', 'deleted', 'filed', 'answered', 'forwarded',
1931                      'edited', 'resent'):
1932            msg.add_label(label)
1933        msg2 = mailbox.BabylMessage(msg)
1934        self.assertEqual(msg2.get_labels(), ['unseen', 'deleted', 'filed',
1935                                             'answered', 'forwarded', 'edited',
1936                                             'resent'])
1937        self.assertEqual(msg.get_visible().keys(), msg2.get_visible().keys())
1938        for key in msg.get_visible().keys():
1939            self.assertEqual(msg.get_visible()[key], msg2.get_visible()[key])
1940
1941
1942class TestProxyFileBase(TestBase):
1943
1944    def _test_read(self, proxy):
1945        # Read by byte
1946        proxy.seek(0)
1947        self.assertEqual(proxy.read(), b'bar')
1948        proxy.seek(1)
1949        self.assertEqual(proxy.read(), b'ar')
1950        proxy.seek(0)
1951        self.assertEqual(proxy.read(2), b'ba')
1952        proxy.seek(1)
1953        self.assertEqual(proxy.read(-1), b'ar')
1954        proxy.seek(2)
1955        self.assertEqual(proxy.read(1000), b'r')
1956
1957    def _test_readline(self, proxy):
1958        # Read by line
1959        linesep = os.linesep.encode()
1960        proxy.seek(0)
1961        self.assertEqual(proxy.readline(), b'foo' + linesep)
1962        self.assertEqual(proxy.readline(), b'bar' + linesep)
1963        self.assertEqual(proxy.readline(), b'fred' + linesep)
1964        self.assertEqual(proxy.readline(), b'bob')
1965        proxy.seek(2)
1966        self.assertEqual(proxy.readline(), b'o' + linesep)
1967        proxy.seek(6 + 2 * len(os.linesep))
1968        self.assertEqual(proxy.readline(), b'fred' + linesep)
1969        proxy.seek(6 + 2 * len(os.linesep))
1970        self.assertEqual(proxy.readline(2), b'fr')
1971        self.assertEqual(proxy.readline(-10), b'ed' + linesep)
1972
1973    def _test_readlines(self, proxy):
1974        # Read multiple lines
1975        linesep = os.linesep.encode()
1976        proxy.seek(0)
1977        self.assertEqual(proxy.readlines(), [b'foo' + linesep,
1978                                           b'bar' + linesep,
1979                                           b'fred' + linesep, b'bob'])
1980        proxy.seek(0)
1981        self.assertEqual(proxy.readlines(2), [b'foo' + linesep])
1982        proxy.seek(3 + len(linesep))
1983        self.assertEqual(proxy.readlines(4 + len(linesep)),
1984                     [b'bar' + linesep, b'fred' + linesep])
1985        proxy.seek(3)
1986        self.assertEqual(proxy.readlines(1000), [linesep, b'bar' + linesep,
1987                                               b'fred' + linesep, b'bob'])
1988
1989    def _test_iteration(self, proxy):
1990        # Iterate by line
1991        linesep = os.linesep.encode()
1992        proxy.seek(0)
1993        iterator = iter(proxy)
1994        self.assertEqual(next(iterator), b'foo' + linesep)
1995        self.assertEqual(next(iterator), b'bar' + linesep)
1996        self.assertEqual(next(iterator), b'fred' + linesep)
1997        self.assertEqual(next(iterator), b'bob')
1998        self.assertRaises(StopIteration, next, iterator)
1999
2000    def _test_seek_and_tell(self, proxy):
2001        # Seek and use tell to check position
2002        linesep = os.linesep.encode()
2003        proxy.seek(3)
2004        self.assertEqual(proxy.tell(), 3)
2005        self.assertEqual(proxy.read(len(linesep)), linesep)
2006        proxy.seek(2, 1)
2007        self.assertEqual(proxy.read(1 + len(linesep)), b'r' + linesep)
2008        proxy.seek(-3 - len(linesep), 2)
2009        self.assertEqual(proxy.read(3), b'bar')
2010        proxy.seek(2, 0)
2011        self.assertEqual(proxy.read(), b'o' + linesep + b'bar' + linesep)
2012        proxy.seek(100)
2013        self.assertFalse(proxy.read())
2014
2015    def _test_close(self, proxy):
2016        # Close a file
2017        self.assertFalse(proxy.closed)
2018        proxy.close()
2019        self.assertTrue(proxy.closed)
2020        # Issue 11700 subsequent closes should be a no-op.
2021        proxy.close()
2022        self.assertTrue(proxy.closed)
2023
2024
2025class TestProxyFile(TestProxyFileBase, unittest.TestCase):
2026
2027    def setUp(self):
2028        self._path = os_helper.TESTFN
2029        self._file = open(self._path, 'wb+')
2030
2031    def tearDown(self):
2032        self._file.close()
2033        self._delete_recursively(self._path)
2034
2035    def test_initialize(self):
2036        # Initialize and check position
2037        self._file.write(b'foo')
2038        pos = self._file.tell()
2039        proxy0 = mailbox._ProxyFile(self._file)
2040        self.assertEqual(proxy0.tell(), pos)
2041        self.assertEqual(self._file.tell(), pos)
2042        proxy1 = mailbox._ProxyFile(self._file, 0)
2043        self.assertEqual(proxy1.tell(), 0)
2044        self.assertEqual(self._file.tell(), pos)
2045
2046    def test_read(self):
2047        self._file.write(b'bar')
2048        self._test_read(mailbox._ProxyFile(self._file))
2049
2050    def test_readline(self):
2051        self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
2052                                                  os.linesep), 'ascii'))
2053        self._test_readline(mailbox._ProxyFile(self._file))
2054
2055    def test_readlines(self):
2056        self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
2057                                                  os.linesep), 'ascii'))
2058        self._test_readlines(mailbox._ProxyFile(self._file))
2059
2060    def test_iteration(self):
2061        self._file.write(bytes('foo%sbar%sfred%sbob' % (os.linesep, os.linesep,
2062                                                  os.linesep), 'ascii'))
2063        self._test_iteration(mailbox._ProxyFile(self._file))
2064
2065    def test_seek_and_tell(self):
2066        self._file.write(bytes('foo%sbar%s' % (os.linesep, os.linesep), 'ascii'))
2067        self._test_seek_and_tell(mailbox._ProxyFile(self._file))
2068
2069    def test_close(self):
2070        self._file.write(bytes('foo%sbar%s' % (os.linesep, os.linesep), 'ascii'))
2071        self._test_close(mailbox._ProxyFile(self._file))
2072
2073
2074class TestPartialFile(TestProxyFileBase, unittest.TestCase):
2075
2076    def setUp(self):
2077        self._path = os_helper.TESTFN
2078        self._file = open(self._path, 'wb+')
2079
2080    def tearDown(self):
2081        self._file.close()
2082        self._delete_recursively(self._path)
2083
2084    def test_initialize(self):
2085        # Initialize and check position
2086        self._file.write(bytes('foo' + os.linesep + 'bar', 'ascii'))
2087        pos = self._file.tell()
2088        proxy = mailbox._PartialFile(self._file, 2, 5)
2089        self.assertEqual(proxy.tell(), 0)
2090        self.assertEqual(self._file.tell(), pos)
2091
2092    def test_read(self):
2093        self._file.write(bytes('***bar***', 'ascii'))
2094        self._test_read(mailbox._PartialFile(self._file, 3, 6))
2095
2096    def test_readline(self):
2097        self._file.write(bytes('!!!!!foo%sbar%sfred%sbob!!!!!' %
2098                         (os.linesep, os.linesep, os.linesep), 'ascii'))
2099        self._test_readline(mailbox._PartialFile(self._file, 5,
2100                                                 18 + 3 * len(os.linesep)))
2101
2102    def test_readlines(self):
2103        self._file.write(bytes('foo%sbar%sfred%sbob?????' %
2104                         (os.linesep, os.linesep, os.linesep), 'ascii'))
2105        self._test_readlines(mailbox._PartialFile(self._file, 0,
2106                                                  13 + 3 * len(os.linesep)))
2107
2108    def test_iteration(self):
2109        self._file.write(bytes('____foo%sbar%sfred%sbob####' %
2110                         (os.linesep, os.linesep, os.linesep), 'ascii'))
2111        self._test_iteration(mailbox._PartialFile(self._file, 4,
2112                                                  17 + 3 * len(os.linesep)))
2113
2114    def test_seek_and_tell(self):
2115        self._file.write(bytes('(((foo%sbar%s$$$' % (os.linesep, os.linesep), 'ascii'))
2116        self._test_seek_and_tell(mailbox._PartialFile(self._file, 3,
2117                                                      9 + 2 * len(os.linesep)))
2118
2119    def test_close(self):
2120        self._file.write(bytes('&foo%sbar%s^' % (os.linesep, os.linesep), 'ascii'))
2121        self._test_close(mailbox._PartialFile(self._file, 1,
2122                                              6 + 3 * len(os.linesep)))
2123
2124
2125## Start: tests from the original module (for backward compatibility).
2126
2127FROM_ = "From [email protected]  Sat Jul 24 13:43:35 2004\n"
2128DUMMY_MESSAGE = """\
2129From: [email protected]
2130To: [email protected]
2131Subject: Simple Test
2132
2133This is a dummy message.
2134"""
2135
2136class MaildirTestCase(unittest.TestCase):
2137
2138    def setUp(self):
2139        # create a new maildir mailbox to work with:
2140        self._dir = os_helper.TESTFN
2141        if os.path.isdir(self._dir):
2142            os_helper.rmtree(self._dir)
2143        elif os.path.isfile(self._dir):
2144            os_helper.unlink(self._dir)
2145        os.mkdir(self._dir)
2146        os.mkdir(os.path.join(self._dir, "cur"))
2147        os.mkdir(os.path.join(self._dir, "tmp"))
2148        os.mkdir(os.path.join(self._dir, "new"))
2149        self._counter = 1
2150        self._msgfiles = []
2151
2152    def tearDown(self):
2153        list(map(os.unlink, self._msgfiles))
2154        os_helper.rmdir(os.path.join(self._dir, "cur"))
2155        os_helper.rmdir(os.path.join(self._dir, "tmp"))
2156        os_helper.rmdir(os.path.join(self._dir, "new"))
2157        os_helper.rmdir(self._dir)
2158
2159    def createMessage(self, dir, mbox=False):
2160        t = int(time.time() % 1000000)
2161        pid = self._counter
2162        self._counter += 1
2163        filename = ".".join((str(t), str(pid), "myhostname", "mydomain"))
2164        tmpname = os.path.join(self._dir, "tmp", filename)
2165        newname = os.path.join(self._dir, dir, filename)
2166        with open(tmpname, "w", encoding="utf-8") as fp:
2167            self._msgfiles.append(tmpname)
2168            if mbox:
2169                fp.write(FROM_)
2170            fp.write(DUMMY_MESSAGE)
2171        try:
2172            os.link(tmpname, newname)
2173        except (AttributeError, PermissionError):
2174            with open(newname, "w") as fp:
2175                fp.write(DUMMY_MESSAGE)
2176        self._msgfiles.append(newname)
2177        return tmpname
2178
2179    def test_empty_maildir(self):
2180        """Test an empty maildir mailbox"""
2181        # Test for regression on bug #117490:
2182        # Make sure the boxes attribute actually gets set.
2183        self.mbox = mailbox.Maildir(os_helper.TESTFN)
2184        #self.assertTrue(hasattr(self.mbox, "boxes"))
2185        #self.assertEqual(len(self.mbox.boxes), 0)
2186        self.assertIsNone(self.mbox.next())
2187        self.assertIsNone(self.mbox.next())
2188
2189    def test_nonempty_maildir_cur(self):
2190        self.createMessage("cur")
2191        self.mbox = mailbox.Maildir(os_helper.TESTFN)
2192        #self.assertEqual(len(self.mbox.boxes), 1)
2193        self.assertIsNotNone(self.mbox.next())
2194        self.assertIsNone(self.mbox.next())
2195        self.assertIsNone(self.mbox.next())
2196
2197    def test_nonempty_maildir_new(self):
2198        self.createMessage("new")
2199        self.mbox = mailbox.Maildir(os_helper.TESTFN)
2200        #self.assertEqual(len(self.mbox.boxes), 1)
2201        self.assertIsNotNone(self.mbox.next())
2202        self.assertIsNone(self.mbox.next())
2203        self.assertIsNone(self.mbox.next())
2204
2205    def test_nonempty_maildir_both(self):
2206        self.createMessage("cur")
2207        self.createMessage("new")
2208        self.mbox = mailbox.Maildir(os_helper.TESTFN)
2209        #self.assertEqual(len(self.mbox.boxes), 2)
2210        self.assertIsNotNone(self.mbox.next())
2211        self.assertIsNotNone(self.mbox.next())
2212        self.assertIsNone(self.mbox.next())
2213        self.assertIsNone(self.mbox.next())
2214
2215## End: tests from the original module (for backward compatibility).
2216
2217
2218_sample_message = """\
2219Return-Path: <[email protected]>
2220X-Original-To: gkj+person@localhost
2221Delivered-To: gkj+person@localhost
2222Received: from localhost (localhost [127.0.0.1])
2223        by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
2224        for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
2225Delivered-To: [email protected]
2226Received: from localhost [127.0.0.1]
2227        by localhost with POP3 (fetchmail-6.2.5)
2228        for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
2229Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
2230        by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
2231        for <[email protected]>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
2232Received: by andy.gregorykjohnson.com (Postfix, from userid 1000)
2233        id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
2234Date: Wed, 13 Jul 2005 17:23:11 -0400
2235From: "Gregory K. Johnson" <[email protected]>
2236To: [email protected]
2237Subject: Sample message
2238Message-ID: <[email protected]>
2239Mime-Version: 1.0
2240Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+"
2241Content-Disposition: inline
2242User-Agent: Mutt/1.5.9i
2243
2244
2245--NMuMz9nt05w80d4+
2246Content-Type: text/plain; charset=us-ascii
2247Content-Disposition: inline
2248
2249This is a sample message.
2250
2251--
2252Gregory K. Johnson
2253
2254--NMuMz9nt05w80d4+
2255Content-Type: application/octet-stream
2256Content-Disposition: attachment; filename="text.gz"
2257Content-Transfer-Encoding: base64
2258
2259H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
22603FYlAAAA
2261
2262--NMuMz9nt05w80d4+--
2263"""
2264
2265_bytes_sample_message = _sample_message.encode('ascii')
2266
2267_sample_headers = {
2268    "Return-Path":"<[email protected]>",
2269    "X-Original-To":"gkj+person@localhost",
2270    "Delivered-To":"gkj+person@localhost",
2271    "Received":"""from localhost (localhost [127.0.0.1])
2272        by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
2273        for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
2274    "Delivered-To":"[email protected]",
2275    "Received":"""from localhost [127.0.0.1]
2276        by localhost with POP3 (fetchmail-6.2.5)
2277        for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)""",
2278    "Received":"""from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
2279        by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
2280        for <[email protected]>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
2281    "Received":"""by andy.gregorykjohnson.com (Postfix, from userid 1000)
2282        id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)""",
2283    "Date":"Wed, 13 Jul 2005 17:23:11 -0400",
2284    "From":""""Gregory K. Johnson" <[email protected]>""",
2285    "To":"[email protected]",
2286    "Subject":"Sample message",
2287    "Mime-Version":"1.0",
2288    "Content-Type":"""multipart/mixed; boundary="NMuMz9nt05w80d4+\"""",
2289    "Content-Disposition":"inline",
2290    "User-Agent": "Mutt/1.5.9i" }
2291
2292_sample_payloads = ("""This is a sample message.
2293
2294--
2295Gregory K. Johnson
2296""",
2297"""H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
22983FYlAAAA
2299""")
2300
2301
2302class MiscTestCase(unittest.TestCase):
2303    def test__all__(self):
2304        support.check__all__(self, mailbox,
2305                             not_exported={"linesep", "fcntl"})
2306
2307
2308def tearDownModule():
2309    support.reap_children()
2310
2311
2312if __name__ == '__main__':
2313    unittest.main()
2314