1import base64
2import email.mime.text
3from email.message import EmailMessage
4from email.base64mime import body_encode as encode_base64
5import email.utils
6import hashlib
7import hmac
8import socket
9import smtplib
10import io
11import re
12import sys
13import time
14import select
15import errno
16import textwrap
17import threading
18
19import unittest
20from test import support, mock_socket
21from test.support import hashlib_helper
22from test.support import socket_helper
23from test.support import threading_helper
24from test.support import warnings_helper
25from unittest.mock import Mock
26
27
# asyncore and smtpd are loaded through warnings_helper.import_deprecated
# instead of a plain import statement.
# NOTE(review): presumably this silences the DeprecationWarning both modules
# raise on import -- confirm against test.support.warnings_helper.
asyncore = warnings_helper.import_deprecated('asyncore')
smtpd = warnings_helper.import_deprecated('smtpd')


# NOTE(review): presumably skips the whole module on platforms without working
# socket support -- confirm against test.support.
support.requires_working_socket(module=True)

# Host name the tests in this file use when talking to a (real or mock) server.
HOST = socket_helper.HOST

if sys.platform == 'darwin':
    # select.poll returns a select.POLLHUP at the end of the tests
    # on darwin, so just ignore it
    def handle_expt(self):
        pass
    # Install the no-op above as the channel's handle_expt method.
    smtpd.SMTPChannel.handle_expt = handle_expt
42
43
def server(evt, buf, serv):
    """Accept one connection on *serv* and push *buf* to it.

    evt  -- threading.Event; set once the socket is listening and set again
            when the function is about to return.
    buf  -- bytes to send to the accepted connection.
    serv -- bound server socket; always closed before returning.

    A TimeoutError from accept() is swallowed (no client showed up).  At
    most 500 select/send rounds are attempted so a stuck peer cannot keep
    the thread alive forever.  Any other exception propagates to the caller
    after the connection and the server socket have been closed.
    """
    serv.listen()
    evt.set()
    try:
        conn, addr = serv.accept()
    except TimeoutError:
        pass
    else:
        # Close the accepted connection even when select()/send() raises;
        # otherwise the socket would leak until garbage collection.
        try:
            n = 500
            while buf and n > 0:
                r, w, e = select.select([], [conn], [])
                if w:
                    sent = conn.send(buf)
                    buf = buf[sent:]

                n -= 1
        finally:
            conn.close()
    finally:
        serv.close()
        evt.set()
65
class GeneralTests:
    """Connection-level tests shared by the SMTP and LMTP client classes.

    This is a mixin: the concrete subclasses in this file also derive from
    unittest.TestCase and set the ``client`` attribute to the class under
    test.  setUp() swaps smtplib's ``socket`` module for ``mock_socket`` so
    no real network connection is made; tearDown() restores it.
    """

    def setUp(self):
        smtplib.socket = mock_socket
        self.port = 25

    def tearDown(self):
        smtplib.socket = socket

    # This method is no longer used but is retained for backward compatibility,
    # so test to make sure it still works.
    def testQuoteData(self):
        teststr  = "abc\n.jkl\rfoo\r\n..blue"
        expected = "abc\r\n..jkl\r\nfoo\r\n...blue"
        self.assertEqual(expected, smtplib.quotedata(teststr))

    def testBasic1(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects
        client = self.client(HOST, self.port)
        client.close()

    def testSourceAddress(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects
        client = self.client(HOST, self.port,
                             source_address=('127.0.0.1',19876))
        self.assertEqual(client.source_address, ('127.0.0.1', 19876))
        client.close()

    def testBasic2(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects, include port in host name
        client = self.client("%s:%s" % (HOST, self.port))
        client.close()

    def testLocalHostName(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # check that supplied local_hostname is used
        client = self.client(HOST, self.port, local_hostname="testhost")
        self.assertEqual(client.local_hostname, "testhost")
        client.close()

    def testTimeoutDefault(self):
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(mock_socket.getdefaulttimeout())
        mock_socket.setdefaulttimeout(30)
        self.assertEqual(mock_socket.getdefaulttimeout(), 30)
        try:
            client = self.client(HOST, self.port)
        finally:
            mock_socket.setdefaulttimeout(None)
        self.assertEqual(client.sock.gettimeout(), 30)
        client.close()

    def testTimeoutNone(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # smtplib talks to mock_socket during these tests (see setUp), so the
        # default timeout has to be changed there for timeout=None to have
        # something to override.  Touching the real socket module instead
        # would leave the test vacuous and would leak a process-wide default
        # into other threads while the test runs.
        self.assertIsNone(mock_socket.getdefaulttimeout())
        mock_socket.setdefaulttimeout(30)
        try:
            client = self.client(HOST, self.port, timeout=None)
        finally:
            mock_socket.setdefaulttimeout(None)
        self.assertIsNone(client.sock.gettimeout())
        client.close()

    def testTimeoutZero(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # A zero timeout is rejected up front with ValueError.
        with self.assertRaises(ValueError):
            self.client(HOST, self.port, timeout=0)

    def testTimeoutValue(self):
        mock_socket.reply_with(b"220 Hola mundo")
        client = self.client(HOST, self.port, timeout=30)
        self.assertEqual(client.sock.gettimeout(), 30)
        client.close()

    def test_debuglevel(self):
        mock_socket.reply_with(b"220 Hello world")
        client = self.client()
        client.set_debuglevel(1)
        with support.captured_stderr() as stderr:
            client.connect(HOST, self.port)
        client.close()
        # Level 1 prints plain "connect:" lines on stderr.
        expected = re.compile(r"^connect:", re.MULTILINE)
        self.assertRegex(stderr.getvalue(), expected)

    def test_debuglevel_2(self):
        mock_socket.reply_with(b"220 Hello world")
        client = self.client()
        client.set_debuglevel(2)
        with support.captured_stderr() as stderr:
            client.connect(HOST, self.port)
        client.close()
        # Level 2 prefixes each line with an HH:MM:SS.ffffff timestamp.
        expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
                              re.MULTILINE)
        self.assertRegex(stderr.getvalue(), expected)
163
164
class SMTPGeneralTests(GeneralTests, unittest.TestCase):
    """Run the shared GeneralTests against smtplib.SMTP."""

    client = smtplib.SMTP
168
169
class LMTPGeneralTests(GeneralTests, unittest.TestCase):
    """Run the shared GeneralTests against smtplib.LMTP.

    Adds checks that pass a filesystem path as the host, which is how the
    tests address a Unix-domain-socket LMTP endpoint.
    """

    client = smtplib.LMTP

    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), "test requires Unix domain socket")
    def testUnixDomainSocketTimeoutDefault(self):
        unix_path = '/some/local/lmtp/delivery/program'
        mock_socket.reply_with(b"220 Hello world")
        try:
            lmtp = self.client(unix_path, self.port)
        finally:
            mock_socket.setdefaulttimeout(None)
        # No timeout was requested, so the socket must be left blocking.
        self.assertIsNone(lmtp.sock.gettimeout())
        lmtp.close()

    def testTimeoutZero(self):
        # First the inherited host/port variant, then the path variant.
        super().testTimeoutZero()
        unix_path = '/some/local/lmtp/delivery/program'
        self.assertRaises(ValueError, self.client, unix_path, timeout=0)
190
191# Test server thread using the specified SMTP server class
def debugging_server(serv, serv_evt, client_evt):
    """Thread target that pumps the asyncore loop for the server *serv*.

    serv_evt is set once on entry (server is running) and once more on exit.
    client_evt is set by the test when the client side is finished; when
    that is seen, *serv* is closed and the loop ends.  The loop is capped
    at 1000 polling rounds of 0.01 seconds each.
    """
    serv_evt.set()

    try:
        # Prefer the poll()-based loop where the platform offers it.
        poll = asyncore.poll2 if hasattr(select, 'poll') else asyncore.poll

        for _ in range(1000):
            if not asyncore.socket_map:
                break
            poll(0.01, asyncore.socket_map)

            # when the client conversation is finished, it will
            # set client_evt, and it's then ok to kill the server
            if client_evt.is_set():
                serv.close()
                break

    except TimeoutError:
        pass
    finally:
        client_done = client_evt.is_set()
        if not client_done:
            # allow some time for the client to read the result
            time.sleep(0.5)
            serv.close()
        asyncore.close_all()
        serv_evt.set()
222
# Marker lines that the send tests below expect around each message body
# in the output captured from the debugging server.
MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
MSG_END = '------------ END MESSAGE ------------\n'
225
226# NOTE: Some SMTP objects in the tests below are created with a non-default
227# local_hostname argument to the constructor, since (on some systems) the FQDN
228# lookup caused by the default local_hostname sometimes takes so long that the
229# test server times out, causing the test to fail.
230
231# Test behavior of smtpd.DebuggingServer
232class DebuggingServerTests(unittest.TestCase):
233
234    maxDiff = None
235
    def setUp(self):
        # Start a smtpd.DebuggingServer on an ephemeral port in a helper
        # thread, with stdout and smtpd.DEBUGSTREAM redirected to in-memory
        # buffers.  Everything replaced here is restored in tearDown().
        self.thread_key = threading_helper.threading_setup()
        # Swap in the mock getfqdn for the duration of the test.
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        # temporarily replace sys.stdout to capture DebuggingServer output
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        # serv_evt: set by the server thread; client_evt: set by the test.
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Capture SMTPChannel debug output
        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
        smtpd.DEBUGSTREAM = io.StringIO()
        # Pick a random unused port by passing 0 for the port number
        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
                                          decode_data=True)
        # Keep a note of what server host and port were assigned
        self.host, self.port = self.serv.socket.getsockname()[:2]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        # Reset the event so it can signal thread termination later.
        self.serv_evt.clear()
262
    def tearDown(self):
        # Undo setUp(): stop the server thread and restore every global that
        # was replaced.
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        threading_helper.join_thread(self.thread)
        # restore sys.stdout
        sys.stdout = self.old_stdout
        # restore DEBUGSTREAM
        smtpd.DEBUGSTREAM.close()
        smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
        del self.thread
        # Run the registered cleanups (e.g. smtp.close) now, before the
        # thread cleanup check below.
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)
278
279    def get_output_without_xpeer(self):
280        test_output = self.output.getvalue()
281        return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2',
282                      test_output, flags=re.MULTILINE|re.DOTALL)
283
284    def testBasic(self):
285        # connect
286        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
287                            timeout=support.LOOPBACK_TIMEOUT)
288        smtp.quit()
289
290    def testSourceAddress(self):
291        # connect
292        src_port = socket_helper.find_unused_port()
293        try:
294            smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost',
295                                timeout=support.LOOPBACK_TIMEOUT,
296                                source_address=(self.host, src_port))
297            self.addCleanup(smtp.close)
298            self.assertEqual(smtp.source_address, (self.host, src_port))
299            self.assertEqual(smtp.local_hostname, 'localhost')
300            smtp.quit()
301        except OSError as e:
302            if e.errno == errno.EADDRINUSE:
303                self.skipTest("couldn't bind to source port %d" % src_port)
304            raise
305
306    def testNOOP(self):
307        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
308                            timeout=support.LOOPBACK_TIMEOUT)
309        self.addCleanup(smtp.close)
310        expected = (250, b'OK')
311        self.assertEqual(smtp.noop(), expected)
312        smtp.quit()
313
314    def testRSET(self):
315        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
316                            timeout=support.LOOPBACK_TIMEOUT)
317        self.addCleanup(smtp.close)
318        expected = (250, b'OK')
319        self.assertEqual(smtp.rset(), expected)
320        smtp.quit()
321
322    def testELHO(self):
323        # EHLO isn't implemented in DebuggingServer
324        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
325                            timeout=support.LOOPBACK_TIMEOUT)
326        self.addCleanup(smtp.close)
327        expected = (250, b'\nSIZE 33554432\nHELP')
328        self.assertEqual(smtp.ehlo(), expected)
329        smtp.quit()
330
331    def testEXPNNotImplemented(self):
332        # EXPN isn't implemented in DebuggingServer
333        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
334                            timeout=support.LOOPBACK_TIMEOUT)
335        self.addCleanup(smtp.close)
336        expected = (502, b'EXPN not implemented')
337        smtp.putcmd('EXPN')
338        self.assertEqual(smtp.getreply(), expected)
339        smtp.quit()
340
341    def test_issue43124_putcmd_escapes_newline(self):
342        # see: https://bugs.python.org/issue43124
343        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
344                            timeout=support.LOOPBACK_TIMEOUT)
345        self.addCleanup(smtp.close)
346        with self.assertRaises(ValueError) as exc:
347            smtp.putcmd('helo\nX-INJECTED')
348        self.assertIn("prohibited newline characters", str(exc.exception))
349        smtp.quit()
350
351    def testVRFY(self):
352        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
353                            timeout=support.LOOPBACK_TIMEOUT)
354        self.addCleanup(smtp.close)
355        expected = (252, b'Cannot VRFY user, but will accept message ' + \
356                         b'and attempt delivery')
357        self.assertEqual(smtp.vrfy('[email protected]'), expected)
358        self.assertEqual(smtp.verify('[email protected]'), expected)
359        smtp.quit()
360
361    def testSecondHELO(self):
362        # check that a second HELO returns a message that it's a duplicate
363        # (this behavior is specific to smtpd.SMTPChannel)
364        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
365                            timeout=support.LOOPBACK_TIMEOUT)
366        self.addCleanup(smtp.close)
367        smtp.helo()
368        expected = (503, b'Duplicate HELO/EHLO')
369        self.assertEqual(smtp.helo(), expected)
370        smtp.quit()
371
372    def testHELP(self):
373        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
374                            timeout=support.LOOPBACK_TIMEOUT)
375        self.addCleanup(smtp.close)
376        self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \
377                                      b'RCPT DATA RSET NOOP QUIT VRFY')
378        smtp.quit()
379
380    def testSend(self):
381        # connect and send mail
382        m = 'A test message'
383        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
384                            timeout=support.LOOPBACK_TIMEOUT)
385        self.addCleanup(smtp.close)
386        smtp.sendmail('John', 'Sally', m)
387        # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
388        # in asyncore.  This sleep might help, but should really be fixed
389        # properly by using an Event variable.
390        time.sleep(0.01)
391        smtp.quit()
392
393        self.client_evt.set()
394        self.serv_evt.wait()
395        self.output.flush()
396        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
397        self.assertEqual(self.output.getvalue(), mexpect)
398
399    def testSendBinary(self):
400        m = b'A test message'
401        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
402                            timeout=support.LOOPBACK_TIMEOUT)
403        self.addCleanup(smtp.close)
404        smtp.sendmail('John', 'Sally', m)
405        # XXX (see comment in testSend)
406        time.sleep(0.01)
407        smtp.quit()
408
409        self.client_evt.set()
410        self.serv_evt.wait()
411        self.output.flush()
412        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END)
413        self.assertEqual(self.output.getvalue(), mexpect)
414
415    def testSendNeedingDotQuote(self):
416        # Issue 12283
417        m = '.A test\n.mes.sage.'
418        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
419                            timeout=support.LOOPBACK_TIMEOUT)
420        self.addCleanup(smtp.close)
421        smtp.sendmail('John', 'Sally', m)
422        # XXX (see comment in testSend)
423        time.sleep(0.01)
424        smtp.quit()
425
426        self.client_evt.set()
427        self.serv_evt.wait()
428        self.output.flush()
429        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
430        self.assertEqual(self.output.getvalue(), mexpect)
431
432    def test_issue43124_escape_localhostname(self):
433        # see: https://bugs.python.org/issue43124
434        # connect and send mail
435        m = 'wazzuuup\nlinetwo'
436        smtp = smtplib.SMTP(HOST, self.port, local_hostname='hi\nX-INJECTED',
437                            timeout=support.LOOPBACK_TIMEOUT)
438        self.addCleanup(smtp.close)
439        with self.assertRaises(ValueError) as exc:
440            smtp.sendmail("[email protected]", "[email protected]", m)
441        self.assertIn(
442            "prohibited newline characters: ehlo hi\\nX-INJECTED",
443            str(exc.exception),
444        )
445        # XXX (see comment in testSend)
446        time.sleep(0.01)
447        smtp.quit()
448
449        debugout = smtpd.DEBUGSTREAM.getvalue()
450        self.assertNotIn("X-INJECTED", debugout)
451
452    def test_issue43124_escape_options(self):
453        # see: https://bugs.python.org/issue43124
454        # connect and send mail
455        m = 'wazzuuup\nlinetwo'
456        smtp = smtplib.SMTP(
457            HOST, self.port, local_hostname='localhost',
458            timeout=support.LOOPBACK_TIMEOUT)
459
460        self.addCleanup(smtp.close)
461        smtp.sendmail("[email protected]", "[email protected]", m)
462        with self.assertRaises(ValueError) as exc:
463            smtp.mail("[email protected]", ["X-OPTION\nX-INJECTED-1", "X-OPTION2\nX-INJECTED-2"])
464        msg = str(exc.exception)
465        self.assertIn("prohibited newline characters", msg)
466        self.assertIn("X-OPTION\\nX-INJECTED-1 X-OPTION2\\nX-INJECTED-2", msg)
467        # XXX (see comment in testSend)
468        time.sleep(0.01)
469        smtp.quit()
470
471        debugout = smtpd.DEBUGSTREAM.getvalue()
472        self.assertNotIn("X-OPTION", debugout)
473        self.assertNotIn("X-OPTION2", debugout)
474        self.assertNotIn("X-INJECTED-1", debugout)
475        self.assertNotIn("X-INJECTED-2", debugout)
476
477    def testSendNullSender(self):
478        m = 'A test message'
479        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
480                            timeout=support.LOOPBACK_TIMEOUT)
481        self.addCleanup(smtp.close)
482        smtp.sendmail('<>', 'Sally', m)
483        # XXX (see comment in testSend)
484        time.sleep(0.01)
485        smtp.quit()
486
487        self.client_evt.set()
488        self.serv_evt.wait()
489        self.output.flush()
490        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
491        self.assertEqual(self.output.getvalue(), mexpect)
492        debugout = smtpd.DEBUGSTREAM.getvalue()
493        sender = re.compile("^sender: <>$", re.MULTILINE)
494        self.assertRegex(debugout, sender)
495
496    def testSendMessage(self):
497        m = email.mime.text.MIMEText('A test message')
498        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
499                            timeout=support.LOOPBACK_TIMEOUT)
500        self.addCleanup(smtp.close)
501        smtp.send_message(m, from_addr='John', to_addrs='Sally')
502        # XXX (see comment in testSend)
503        time.sleep(0.01)
504        smtp.quit()
505
506        self.client_evt.set()
507        self.serv_evt.wait()
508        self.output.flush()
509        # Remove the X-Peer header that DebuggingServer adds as figuring out
510        # exactly what IP address format is put there is not easy (and
511        # irrelevant to our test).  Typically 127.0.0.1 or ::1, but it is
512        # not always the same as socket.gethostbyname(HOST). :(
513        test_output = self.get_output_without_xpeer()
514        del m['X-Peer']
515        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
516        self.assertEqual(test_output, mexpect)
517
518    def testSendMessageWithAddresses(self):
519        m = email.mime.text.MIMEText('A test message')
520        m['From'] = '[email protected]'
521        m['To'] = 'John'
522        m['CC'] = 'Sally, Fred'
523        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <[email protected]>'
524        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
525                            timeout=support.LOOPBACK_TIMEOUT)
526        self.addCleanup(smtp.close)
527        smtp.send_message(m)
528        # XXX (see comment in testSend)
529        time.sleep(0.01)
530        smtp.quit()
531        # make sure the Bcc header is still in the message.
532        self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
533                                    '<[email protected]>')
534
535        self.client_evt.set()
536        self.serv_evt.wait()
537        self.output.flush()
538        # Remove the X-Peer header that DebuggingServer adds.
539        test_output = self.get_output_without_xpeer()
540        del m['X-Peer']
541        # The Bcc header should not be transmitted.
542        del m['Bcc']
543        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
544        self.assertEqual(test_output, mexpect)
545        debugout = smtpd.DEBUGSTREAM.getvalue()
546        sender = re.compile("^sender: [email protected]$", re.MULTILINE)
547        self.assertRegex(debugout, sender)
548        for addr in ('John', 'Sally', 'Fred', 'root@localhost',
549                     '[email protected]'):
550            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
551                                 re.MULTILINE)
552            self.assertRegex(debugout, to_addr)
553
554    def testSendMessageWithSomeAddresses(self):
555        # Make sure nothing breaks if not all of the three 'to' headers exist
556        m = email.mime.text.MIMEText('A test message')
557        m['From'] = '[email protected]'
558        m['To'] = 'John, Dinsdale'
559        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
560                            timeout=support.LOOPBACK_TIMEOUT)
561        self.addCleanup(smtp.close)
562        smtp.send_message(m)
563        # XXX (see comment in testSend)
564        time.sleep(0.01)
565        smtp.quit()
566
567        self.client_evt.set()
568        self.serv_evt.wait()
569        self.output.flush()
570        # Remove the X-Peer header that DebuggingServer adds.
571        test_output = self.get_output_without_xpeer()
572        del m['X-Peer']
573        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
574        self.assertEqual(test_output, mexpect)
575        debugout = smtpd.DEBUGSTREAM.getvalue()
576        sender = re.compile("^sender: [email protected]$", re.MULTILINE)
577        self.assertRegex(debugout, sender)
578        for addr in ('John', 'Dinsdale'):
579            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
580                                 re.MULTILINE)
581            self.assertRegex(debugout, to_addr)
582
583    def testSendMessageWithSpecifiedAddresses(self):
584        # Make sure addresses specified in call override those in message.
585        m = email.mime.text.MIMEText('A test message')
586        m['From'] = '[email protected]'
587        m['To'] = 'John, Dinsdale'
588        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
589                            timeout=support.LOOPBACK_TIMEOUT)
590        self.addCleanup(smtp.close)
591        smtp.send_message(m, from_addr='[email protected]', to_addrs='[email protected]')
592        # XXX (see comment in testSend)
593        time.sleep(0.01)
594        smtp.quit()
595
596        self.client_evt.set()
597        self.serv_evt.wait()
598        self.output.flush()
599        # Remove the X-Peer header that DebuggingServer adds.
600        test_output = self.get_output_without_xpeer()
601        del m['X-Peer']
602        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
603        self.assertEqual(test_output, mexpect)
604        debugout = smtpd.DEBUGSTREAM.getvalue()
605        sender = re.compile("^sender: [email protected]$", re.MULTILINE)
606        self.assertRegex(debugout, sender)
607        for addr in ('John', 'Dinsdale'):
608            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
609                                 re.MULTILINE)
610            self.assertNotRegex(debugout, to_addr)
611        recip = re.compile(r"^recips: .*'[email protected]'.*$", re.MULTILINE)
612        self.assertRegex(debugout, recip)
613
614    def testSendMessageWithMultipleFrom(self):
615        # Sender overrides To
616        m = email.mime.text.MIMEText('A test message')
617        m['From'] = 'Bernard, Bianca'
618        m['Sender'] = '[email protected]'
619        m['To'] = 'John, Dinsdale'
620        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
621                            timeout=support.LOOPBACK_TIMEOUT)
622        self.addCleanup(smtp.close)
623        smtp.send_message(m)
624        # XXX (see comment in testSend)
625        time.sleep(0.01)
626        smtp.quit()
627
628        self.client_evt.set()
629        self.serv_evt.wait()
630        self.output.flush()
631        # Remove the X-Peer header that DebuggingServer adds.
632        test_output = self.get_output_without_xpeer()
633        del m['X-Peer']
634        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
635        self.assertEqual(test_output, mexpect)
636        debugout = smtpd.DEBUGSTREAM.getvalue()
637        sender = re.compile("^sender: [email protected]$", re.MULTILINE)
638        self.assertRegex(debugout, sender)
639        for addr in ('John', 'Dinsdale'):
640            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
641                                 re.MULTILINE)
642            self.assertRegex(debugout, to_addr)
643
644    def testSendMessageResent(self):
645        m = email.mime.text.MIMEText('A test message')
646        m['From'] = '[email protected]'
647        m['To'] = 'John'
648        m['CC'] = 'Sally, Fred'
649        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <[email protected]>'
650        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
651        m['Resent-From'] = '[email protected]'
652        m['Resent-To'] = 'Martha <[email protected]>, Jeff'
653        m['Resent-Bcc'] = '[email protected]'
654        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
655                            timeout=support.LOOPBACK_TIMEOUT)
656        self.addCleanup(smtp.close)
657        smtp.send_message(m)
658        # XXX (see comment in testSend)
659        time.sleep(0.01)
660        smtp.quit()
661
662        self.client_evt.set()
663        self.serv_evt.wait()
664        self.output.flush()
665        # The Resent-Bcc headers are deleted before serialization.
666        del m['Bcc']
667        del m['Resent-Bcc']
668        # Remove the X-Peer header that DebuggingServer adds.
669        test_output = self.get_output_without_xpeer()
670        del m['X-Peer']
671        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
672        self.assertEqual(test_output, mexpect)
673        debugout = smtpd.DEBUGSTREAM.getvalue()
674        sender = re.compile("^sender: [email protected]$", re.MULTILINE)
675        self.assertRegex(debugout, sender)
676        for addr in ('[email protected]', 'Jeff', '[email protected]'):
677            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
678                                 re.MULTILINE)
679            self.assertRegex(debugout, to_addr)
680
681    def testSendMessageMultipleResentRaises(self):
682        m = email.mime.text.MIMEText('A test message')
683        m['From'] = '[email protected]'
684        m['To'] = 'John'
685        m['CC'] = 'Sally, Fred'
686        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <[email protected]>'
687        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
688        m['Resent-From'] = '[email protected]'
689        m['Resent-To'] = 'Martha <[email protected]>, Jeff'
690        m['Resent-Bcc'] = '[email protected]'
691        m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
692        m['Resent-To'] = '[email protected]'
693        m['Resent-From'] = 'Martha <[email protected]>, Jeff'
694        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
695                            timeout=support.LOOPBACK_TIMEOUT)
696        self.addCleanup(smtp.close)
697        with self.assertRaises(ValueError):
698            smtp.send_message(m)
699        smtp.close()
700
class NonConnectingTests(unittest.TestCase):
    """Checks on SMTP objects that never get a connection."""

    def testNotConnected(self):
        # Operations on an unconnected SMTP object must fail with
        # SMTPServerDisconnected.
        client = smtplib.SMTP()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            client.ehlo()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            client.send('test msg')

    def testNonnumericPort(self):
        # check that non-numeric port raises OSError, whether the port is
        # given separately or as part of the host string
        for args in (("localhost", "bogus"), ("localhost:bogus",)):
            with self.assertRaises(OSError):
                smtplib.SMTP(*args)

    def testSockAttributeExists(self):
        # check that sock attribute is present outside of a connect() call
        # (regression test, the previous behavior raised an
        #  AttributeError: 'SMTP' object has no attribute 'sock')
        with smtplib.SMTP() as client:
            self.assertIsNone(client.sock)
726
727
class DefaultArgumentsTests(unittest.TestCase):
    """Check the mail options send_message() hands to sendmail().

    The SMTP object is never connected: ehlo, has_extn and sendmail are
    replaced by Mock objects and the recorded sendmail calls are inspected.
    """

    def setUp(self):
        message = EmailMessage()
        message['From'] = 'Páolo <fő[email protected]>'
        self.msg = message

        client = smtplib.SMTP()
        client.ehlo = Mock(return_value=(200, 'OK'))
        client.has_extn = Mock()
        client.sendmail = Mock()
        self.smtp = client

    def testSendMessage(self):
        # Two sends in a row must both see the same options.
        expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME')
        self.smtp.send_message(self.msg)
        self.smtp.send_message(self.msg)
        calls = self.smtp.sendmail.call_args_list
        for index in (0, 1):
            positional = calls[index][0]
            self.assertEqual(positional[3], expected_mail_options)

    def testSendMessageWithMailOptions(self):
        # Caller-supplied options come first and the caller's list must not
        # be modified.
        mail_options = ['STARTTLS']
        expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME')
        self.smtp.send_message(self.msg, None, None, mail_options)
        self.assertEqual(mail_options, ['STARTTLS'])
        positional = self.smtp.sendmail.call_args_list[0][0]
        self.assertEqual(positional[3], expected_mail_options)
753
754
755# test response of client to a non-successful HELO message
class BadHELOServerTests(unittest.TestCase):
    """Client behavior when the server greets with a non-220 reply."""

    def setUp(self):
        # Route smtplib through the mock socket and queue the bad greeting.
        smtplib.socket = mock_socket
        mock_socket.reply_with(b"199 no hello for you!")
        # Capture stdout for the duration of the test.
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output
        self.port = 25

    def tearDown(self):
        smtplib.socket = socket
        sys.stdout = self.old_stdout

    def testFailingHELO(self):
        with self.assertRaises(smtplib.SMTPConnectError):
            smtplib.SMTP(HOST, self.port, 'localhost', 3)
773
774
class TooLongLineTests(unittest.TestCase):
    """Client behavior when the server sends an over-long reply line."""

    # A single reply line twice as long as smtplib's line limit.
    respdata = b''.join((b'250 OK', b'.' * smtplib._MAXLINE * 2, b'\n'))

    def setUp(self):
        self.thread_key = threading_helper.threading_setup()
        # Capture stdout for the duration of the test.
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        self.evt = threading.Event()
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listener.settimeout(15)
        self.sock = listener
        self.port = socket_helper.bind_port(listener)
        self.thread = threading.Thread(
            target=server, args=(self.evt, self.respdata, listener))
        self.thread.start()
        # Wait until the server thread is listening, then re-arm the event
        # so tearDown() can wait for the thread to finish.
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        self.evt.wait()
        sys.stdout = self.old_stdout
        threading_helper.join_thread(self.thread)
        del self.thread
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)

    def testLineTooLong(self):
        with self.assertRaises(smtplib.SMTPResponseException):
            smtplib.SMTP(HOST, self.port, 'localhost', 3)
805
806
# Users known to the simulated server: address -> display name.
# NOTE(review): the three keys below are textually identical, so this literal
# collapses to a single entry (the last value wins) -- confirm the addresses
# have not been mangled or redacted.
sim_users = {
    '[email protected]': 'John A',
    '[email protected]': 'Sally B',
    '[email protected]': 'Ruth C',
}

# (user, password) used by the AUTH tests; the simulated mechanisms only
# compare the password part.
sim_auth = ('[email protected]', 'somepassword')

# Challenge string sent by the simulated CRAM-MD5 mechanism.
sim_cram_md5_challenge = (
    'PENCeUxFREJoU0NnbmhNWitOMjNGNn'
    'dAZWx3b29kLmlubm9zb2Z0LmNvbT4='
)

# Mailing lists known to the simulated server: list name -> member addresses.
sim_lists = {
    'list-1': ['[email protected]', '[email protected]'],
    'list-2': ['[email protected]'],
}
818
819# Simulated SMTP channel & server
class ResponseException(Exception):
    # Caught by SimSMTPChannel.found_terminator(), which reads ``smtp_code``
    # and ``smtp_error`` off the caught instance.  This class defines neither
    # attribute, so whoever raises it has to set them.
    pass
class SimSMTPChannel(smtpd.SMTPChannel):
    """smtpd channel with scriptable replies and simulated AUTH support.

    Tests adjust the attributes below on the live channel to force a
    particular reply to QUIT, MAIL, RCPT or DATA.  A ``*_response`` left at
    ``None`` means "fall through to the stock smtpd handler".
    """

    quit_response = None
    mail_response = None
    rcpt_response = None    # indexable: one canned reply per RCPT command
    data_response = None
    rcpt_count = 0          # RCPT commands answered from rcpt_response
    rset_count = 0          # RSET commands received
    disconnect = 0          # if true, close after a canned MAIL reply
    AUTH = 99    # Add protocol state to enable auth testing.
    authenticated_user = None

    def __init__(self, extra_features, *args, **kw):
        # Pre-render the extra EHLO keywords as "250-" continuation lines;
        # smtp_EHLO() splices them into its reply.
        self._extrafeatures = ''.join(
            "250-{0}\r\n".format(feature) for feature in extra_features)
        super().__init__(*args, **kw)

    # AUTH related stuff.  It would be nice if support for this were in smtpd.
    def found_terminator(self):
        # While an AUTH exchange is in progress the received line is data for
        # the active mechanism, not an SMTP command.
        if self.smtp_state == self.AUTH:
            line = self._emptystring.join(self.received_lines)
            print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
            self.received_lines = []
            try:
                self.auth_object(line)
            except ResponseException as e:
                # The mechanism aborted: leave AUTH state and relay the
                # reply carried by the exception.
                self.smtp_state = self.COMMAND
                self.push('%s %s' % (e.smtp_code, e.smtp_error))
            return
        super().found_terminator()

    def smtp_AUTH(self, arg):
        """Handle ``AUTH <mechanism> [initial-response]``.

        The mechanism name selects a ``_auth_<name>`` method (lower-cased,
        dashes turned into underscores), which then drives the exchange.
        """
        if not self.seen_greeting:
            self.push('503 Error: send EHLO first')
            return
        if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
            self.push('500 Error: command "AUTH" not recognized')
            return
        if self.authenticated_user is not None:
            self.push(
                '503 Bad sequence of commands: already authenticated')
            return
        args = arg.split()
        if len(args) not in [1, 2]:
            self.push('501 Syntax: AUTH <mechanism> [initial-response]')
            return
        mechanism = args[0]
        auth_object_name = '_auth_%s' % mechanism.lower().replace('-', '_')
        try:
            self.auth_object = getattr(self, auth_object_name)
        except AttributeError:
            # Report the mechanism as the client named it, not the internal
            # method name derived from it.
            self.push('504 Command parameter not implemented: unsupported '
                      'authentication mechanism {!r}'.format(mechanism))
            return
        self.smtp_state = self.AUTH
        self.auth_object(args[1] if len(args) == 2 else None)

    def _authenticated(self, user, valid):
        # Final step of every mechanism: send the verdict and go back to
        # accepting commands.
        if valid:
            self.authenticated_user = user
            self.push('235 Authentication Succeeded')
        else:
            self.push('535 Authentication credentials invalid')
        self.smtp_state = self.COMMAND

    def _decode_base64(self, string):
        # str (base64, ASCII) -> decoded text (UTF-8).
        return base64.decodebytes(string.encode('ascii')).decode('utf-8')

    def _auth_plain(self, arg=None):
        # PLAIN: without an initial response send an empty challenge;
        # otherwise the response is NUL-separated and ends in user, password.
        if arg is None:
            self.push('334 ')
        else:
            logpass = self._decode_base64(arg)
            try:
                *_, user, password = logpass.split('\0')
            except ValueError as e:
                self.push('535 Splitting response {!r} into user and password'
                          ' failed: {}'.format(logpass, e))
                # The exchange has failed: leave AUTH state so the next line
                # is parsed as a command again, like every other exit path.
                self.smtp_state = self.COMMAND
                return
            self._authenticated(user, password == sim_auth[1])

    def _auth_login(self, arg=None):
        # LOGIN: prompt for the user name, then the password.  The user name
        # is parked on the instance between the two steps.
        if arg is None:
            # base64 encoded 'Username:'
            self.push('334 VXNlcm5hbWU6')
        elif not hasattr(self, '_auth_login_user'):
            self._auth_login_user = self._decode_base64(arg)
            # base64 encoded 'Password:'
            self.push('334 UGFzc3dvcmQ6')
        else:
            password = self._decode_base64(arg)
            self._authenticated(self._auth_login_user, password == sim_auth[1])
            del self._auth_login_user

    def _auth_buggy(self, arg=None):
        # This AUTH mechanism will 'trap' client in a neverending 334
        # base64 encoded 'BuGgYbUgGy'
        self.push('334 QnVHZ1liVWdHeQ==')

    def _auth_cram_md5(self, arg=None):
        # CRAM-MD5: send the fixed challenge, then compare the client's
        # "<user> <hex digest>" answer with the digest computed locally.
        if arg is None:
            self.push('334 {}'.format(sim_cram_md5_challenge))
        else:
            logpass = self._decode_base64(arg)
            try:
                user, hashed_pass = logpass.split()
            except ValueError as e:
                self.push('535 Splitting response {!r} into user and password '
                          'failed: {}'.format(logpass, e))
                # Same recovery as _auth_plain(): drop back to COMMAND state.
                self.smtp_state = self.COMMAND
                return
            valid_hashed_pass = hmac.HMAC(
                sim_auth[1].encode('ascii'),
                self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
                'md5').hexdigest()
            self._authenticated(user, hashed_pass == valid_hashed_pass)
    # end AUTH related stuff.

    def smtp_EHLO(self, arg):
        # Fixed feature list plus whatever the server registered through
        # add_feature(); "250 HELP" terminates the multi-line reply.
        resp = ('250-testhost\r\n'
                '250-EXPN\r\n'
                '250-SIZE 20000000\r\n'
                '250-STARTTLS\r\n'
                '250-DELIVERBY\r\n')
        resp = resp + self._extrafeatures + '250 HELP'
        self.push(resp)
        self.seen_greeting = arg
        self.extended_smtp = True

    def smtp_VRFY(self, arg):
        # For max compatibility smtplib should be sending the raw address.
        if arg in sim_users:
            self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))
        else:
            self.push('550 No such user: %s' % arg)

    def smtp_EXPN(self, arg):
        # List names are matched case-insensitively.
        list_name = arg.lower()
        if list_name not in sim_lists:
            self.push('550 No access for you!')
            return
        user_list = sim_lists[list_name]
        last = len(user_list) - 1
        for n, user_email in enumerate(user_list):
            quoted_addr = smtplib.quoteaddr(user_email)
            # Every line but the last one is a "250-" continuation line.
            code = '250-' if n < last else '250 '
            self.push('%s%s %s' % (code, sim_users[user_email], quoted_addr))

    def smtp_QUIT(self, arg):
        if self.quit_response is None:
            super().smtp_QUIT(arg)
        else:
            self.push(self.quit_response)
            self.close_when_done()

    def smtp_MAIL(self, arg):
        if self.mail_response is None:
            super().smtp_MAIL(arg)
        else:
            self.push(self.mail_response)
            if self.disconnect:
                self.close_when_done()

    def smtp_RCPT(self, arg):
        if self.rcpt_response is None:
            super().smtp_RCPT(arg)
            return
        # The n-th RCPT command gets the n-th canned reply.
        self.rcpt_count += 1
        self.push(self.rcpt_response[self.rcpt_count-1])

    def smtp_RSET(self, arg):
        self.rset_count += 1
        super().smtp_RSET(arg)

    def smtp_DATA(self, arg):
        if self.data_response is None:
            super().smtp_DATA(arg)
        else:
            self.push(self.data_response)

    def handle_error(self):
        # Re-raise whatever exception is being handled; presumably so that
        # failures surface in the server thread instead of being logged.
        raise
1003
1004
class SimSMTPServer(smtpd.SMTPServer):
    """smtpd server that serves connections with ``channel_class``.

    Tests may register extra EHLO features, reach the most recent channel
    through ``_SMTPchannel`` and read the last envelope from ``_addresses``.
    """

    channel_class = SimSMTPChannel

    def __init__(self, *args, **kw):
        # Extra EHLO keywords to advertise, and the last envelope seen.
        self._extra_features, self._addresses = [], {}
        super().__init__(*args, **kw)

    def handle_accepted(self, conn, addr):
        # Keep a reference to the newest channel so tests can reach it.
        channel = self.channel_class(
            self._extra_features, self, conn, addr,
            decode_data=self._decode_data)
        self._SMTPchannel = channel

    def process_message(self, peer, mailfrom, rcpttos, data):
        # Only the envelope addresses are recorded.
        self._addresses.update({'from': mailfrom, 'tos': rcpttos})

    def add_feature(self, feature):
        # Takes effect for channels created after this call.
        self._extra_features.append(feature)

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise
1028
1029
1030# Test various SMTP & ESMTP commands/behaviors that require a simulated server
1031# (i.e., something with more features than DebuggingServer)
class SMTPSimTests(unittest.TestCase):
    """smtplib client tests run against a SimSMTPServer in a helper thread.

    Each test talks to a fresh server bound to an ephemeral port on HOST.
    """

    def setUp(self):
        self.thread_key = threading_helper.threading_setup()
        # Use the mock getfqdn() while the test runs; restored in tearDown().
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        threading_helper.join_thread(self.thread)
        del self.thread
        # Run addCleanup() callbacks now, before the dangling-thread check.
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)

    def testBasic(self):
        # smoke test
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        smtp.quit()

    def testEHLO(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)

        # no features should be present before the EHLO
        self.assertEqual(smtp.esmtp_features, {})

        # features expected from the test server
        expected_features = {'expn':'',
                             'size': '20000000',
                             'starttls': '',
                             'deliverby': '',
                             'help': '',
                             }

        smtp.ehlo()
        self.assertEqual(smtp.esmtp_features, expected_features)
        for k in expected_features:
            self.assertTrue(smtp.has_extn(k))
        self.assertFalse(smtp.has_extn('unsupported-feature'))
        smtp.quit()

    def testVRFY(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)

        # Every known user is echoed back as "<name> <quoted address>".
        for addr_spec, name in sim_users.items():
            expected_known = (250, bytes('%s %s' %
                                         (name, smtplib.quoteaddr(addr_spec)),
                                         "ascii"))
            self.assertEqual(smtp.vrfy(addr_spec), expected_known)

        # An address missing from sim_users gets the 550 reply.
        u = '[email protected]'
        expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
        self.assertEqual(smtp.vrfy(u), expected_unknown)
        smtp.quit()

    def testEXPN(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)

        # A known list expands to one "<name> <quoted address>" line per
        # member, joined with newlines in the client's reply.
        for listname, members in sim_lists.items():
            users = []
            for m in members:
                users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))
            expected_known = (250, bytes('\n'.join(users), "ascii"))
            self.assertEqual(smtp.expn(listname), expected_known)

        # A list name missing from sim_lists gets the 550 reply.
        u = 'PSU-Members-List'
        expected_unknown = (550, b'No access for you!')
        self.assertEqual(smtp.expn(u), expected_unknown)
        smtp.quit()

    def testAUTH_PLAIN(self):
        self.serv.add_feature("AUTH PLAIN")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_LOGIN(self):
        self.serv.add_feature("AUTH LOGIN")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def testAUTH_LOGIN_initial_response_ok(self):
        # Drive auth() directly with initial_response_ok=True.
        self.serv.add_feature("AUTH LOGIN")
        with smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                          timeout=support.LOOPBACK_TIMEOUT) as smtp:
            smtp.user, smtp.password = sim_auth
            smtp.ehlo("test_auth_login")
            resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=True)
            self.assertEqual(resp, (235, b'Authentication Succeeded'))

    def testAUTH_LOGIN_initial_response_notok(self):
        # Same exchange, but with initial_response_ok=False.
        self.serv.add_feature("AUTH LOGIN")
        with smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                          timeout=support.LOOPBACK_TIMEOUT) as smtp:
            smtp.user, smtp.password = sim_auth
            smtp.ehlo("test_auth_login")
            resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=False)
            self.assertEqual(resp, (235, b'Authentication Succeeded'))

    def testAUTH_BUGGY(self):
        # The BUGGY mechanism answers every line with another 334 challenge;
        # the client is expected to give up with an SMTPException.
        self.serv.add_feature("AUTH BUGGY")

        def auth_buggy(challenge=None):
            self.assertEqual(b"BuGgYbUgGy", challenge)
            return "\0"

        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT
        )
        try:
            smtp.user, smtp.password = sim_auth
            smtp.ehlo("test_auth_buggy")
            expect = r"^Server AUTH mechanism infinite loop.*"
            with self.assertRaisesRegex(smtplib.SMTPException, expect) as cm:
                smtp.auth("BUGGY", auth_buggy, initial_response_ok=False)
        finally:
            smtp.close()

    @hashlib_helper.requires_hashdigest('md5', openssl=True)
    def testAUTH_CRAM_MD5(self):
        self.serv.add_feature("AUTH CRAM-MD5")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    @hashlib_helper.requires_hashdigest('md5', openssl=True)
    def testAUTH_multiple(self):
        # Test that multiple authentication methods are tried.
        self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        resp = smtp.login(sim_auth[0], sim_auth[1])
        self.assertEqual(resp, (235, b'Authentication Succeeded'))
        smtp.close()

    def test_auth_function(self):
        # Call auth() with each built-in auth_* method in turn, using a new
        # connection per mechanism.
        supported = {'PLAIN', 'LOGIN'}
        # CRAM-MD5 is only exercised when hashlib.md5() is usable.
        try:
            hashlib.md5()
        except ValueError:
            pass
        else:
            supported.add('CRAM-MD5')
        for mechanism in supported:
            self.serv.add_feature("AUTH {}".format(mechanism))
        for mechanism in supported:
            with self.subTest(mechanism=mechanism):
                smtp = smtplib.SMTP(HOST, self.port,
                                    local_hostname='localhost',
                                    timeout=support.LOOPBACK_TIMEOUT)
                smtp.ehlo('foo')
                smtp.user, smtp.password = sim_auth[0], sim_auth[1]
                method = 'auth_' + mechanism.lower().replace('-', '_')
                resp = smtp.auth(mechanism, getattr(smtp, method))
                self.assertEqual(resp, (235, b'Authentication Succeeded'))
                smtp.close()

    def test_quit_resets_greeting(self):
        # Features learned from EHLO must be gone after quit() and stay gone
        # after reconnecting until a new EHLO is sent.
        smtp = smtplib.SMTP(HOST, self.port,
                            local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        code, message = smtp.ehlo()
        self.assertEqual(code, 250)
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.connect(HOST, self.port)
        self.assertNotIn('size', smtp.esmtp_features)
        smtp.ehlo_or_helo_if_needed()
        self.assertIn('size', smtp.esmtp_features)
        smtp.quit()

    def test_with_statement(self):
        # After the with block the connection must be unusable, whether or
        # not close() was already called inside the block.
        with smtplib.SMTP(HOST, self.port) as smtp:
            code, message = smtp.noop()
            self.assertEqual(code, 250)
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')
        with smtplib.SMTP(HOST, self.port) as smtp:
            smtp.close()
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo')

    def test_with_statement_QUIT_failure(self):
        # The channel is told to answer QUIT with 421; leaving the with block
        # must surface that as SMTPResponseException.
        with self.assertRaises(smtplib.SMTPResponseException) as error:
            with smtplib.SMTP(HOST, self.port) as smtp:
                smtp.noop()
                self.serv._SMTPchannel.quit_response = '421 QUIT FAILED'
        self.assertEqual(error.exception.smtp_code, 421)
        self.assertEqual(error.exception.smtp_error, b'QUIT FAILED')

    #TODO: add tests for correct AUTH method fallback now that the
    #test infrastructure can support it.

    # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception
    # NOTE(review): "_rest" in the method name looks like a typo for "_rset".
    def test__rest_from_mail_cmd(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        # noop() first, so that the channel exists before it is configured.
        smtp.noop()
        self.serv._SMTPchannel.mail_response = '451 Requested action aborted'
        self.serv._SMTPchannel.disconnect = True
        with self.assertRaises(smtplib.SMTPSenderRefused):
            smtp.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(smtp.sock)

    # Issue 5713: make sure close, not rset, is called if we get a 421 error
    def test_421_from_mail_cmd(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        smtp.noop()
        self.serv._SMTPchannel.mail_response = '421 closing connection'
        with self.assertRaises(smtplib.SMTPSenderRefused):
            smtp.sendmail('John', 'Sally', 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)

    def test_421_from_rcpt_cmd(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        smtp.noop()
        # First recipient is accepted, the second one gets the 421.
        self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing']
        with self.assertRaises(smtplib.SMTPRecipientsRefused) as r:
            smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message')
        self.assertIsNone(smtp.sock)
        self.assertEqual(self.serv._SMTPchannel.rset_count, 0)
        self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')})

    def test_421_from_data_cmd(self):
        # Channel variant that answers the message body with 421.
        class MySimSMTPChannel(SimSMTPChannel):
            def found_terminator(self):
                if self.smtp_state == self.DATA:
                    self.push('421 closing')
                else:
                    super().found_terminator()
        self.serv.channel_class = MySimSMTPChannel
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        smtp.noop()
        with self.assertRaises(smtplib.SMTPDataError):
            smtp.sendmail('[email protected]', ['[email protected]'], 'test message')
        self.assertIsNone(smtp.sock)
        # NOTE(review): rcpt_count is only incremented when rcpt_response is
        # set, which this test never does, so this holds unconditionally;
        # rset_count may have been intended -- confirm.
        self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0)

    def test_smtputf8_NotSupportedError_if_no_server_support(self):
        # This server does not advertise SMTPUTF8, so asking for it must
        # fail in both sendmail() and mail().
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.ehlo()
        self.assertTrue(smtp.does_esmtp)
        self.assertFalse(smtp.has_extn('smtputf8'))
        self.assertRaises(
            smtplib.SMTPNotSupportedError,
            smtp.sendmail,
            'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
        self.assertRaises(
            smtplib.SMTPNotSupportedError,
            smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8'])

    def test_send_unicode_without_SMTPUTF8(self):
        # Non-ASCII addresses without the SMTPUTF8 option must be rejected
        # with UnicodeEncodeError.
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '')
        self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice')

    def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self):
        # This test is located here and not in the SMTPUTF8SimTests
        # class because it needs a "regular" SMTP server to work
        msg = EmailMessage()
        msg['From'] = "Páolo <fő[email protected]>"
        msg['To'] = 'Dinsdale'
        # NOTE(review): '\u1F609' is the escape \u1F60 followed by a literal
        # '9', not the single code point U+1F609 ('\U0001F609') -- confirm
        # which one was intended.
        msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        with self.assertRaises(smtplib.SMTPNotSupportedError):
            smtp.send_message(msg)

    def test_name_field_not_included_in_envelop_addresses(self):
        # Only the address part of From/To may end up in the envelope the
        # server records; the display names must be dropped.
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)

        message = EmailMessage()
        message['From'] = email.utils.formataddr(('Michaël', '[email protected]'))
        message['To'] = email.utils.formataddr(('René', '[email protected]'))

        self.assertDictEqual(smtp.send_message(message), {})

        self.assertEqual(self.serv._addresses['from'], '[email protected]')
        self.assertEqual(self.serv._addresses['tos'], ['[email protected]'])
1353
1354
class SimSMTPUTF8Server(SimSMTPServer):
    """SimSMTPServer variant that advertises SMTPUTF8 and 8BITMIME.

    Everything handed to process_message() is kept in ``last_*`` attributes
    for the tests to inspect.
    """

    def __init__(self, *args, **kw):
        # The base SMTP server would switch these extensions on by itself,
        # but the simulated channel builds its own EHLO reply, so they have
        # to be listed here too.  SimSMTPServer.__init__ is skipped on
        # purpose: the smtpd initialiser is called directly.
        self._extra_features = ['SMTPUTF8', '8BITMIME']
        smtpd.SMTPServer.__init__(self, *args, **kw)

    def handle_accepted(self, conn, addr):
        # Same as the parent, plus forwarding the SMTPUTF8 switch.
        channel_options = {
            'decode_data': self._decode_data,
            'enable_SMTPUTF8': self.enable_SMTPUTF8,
        }
        self._SMTPchannel = self.channel_class(
            self._extra_features, self, conn, addr, **channel_options)

    def process_message(self, peer, mailfrom, rcpttos, data,
                        mail_options=None, rcpt_options=None):
        # Record every argument of the most recent delivery.
        self.last_peer, self.last_mailfrom = peer, mailfrom
        self.last_rcpttos, self.last_message = rcpttos, data
        self.last_mail_options = mail_options
        self.last_rcpt_options = rcpt_options
1379
1380
class SMTPUTF8SimTests(unittest.TestCase):
    """smtplib SMTPUTF8 tests run against a SimSMTPUTF8Server in a thread.

    The server is created with decode_data=False and enable_SMTPUTF8=True,
    and the tests inspect the ``last_*`` attributes it records.
    """

    # Always show the full diff when a long comparison fails.
    maxDiff = None

    def setUp(self):
        self.thread_key = threading_helper.threading_setup()
        # Use the mock getfqdn() while the test runs; restored in tearDown().
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1),
                                      decode_data=False,
                                      enable_SMTPUTF8=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        threading_helper.join_thread(self.thread)
        del self.thread
        # Run addCleanup() callbacks now, before the dangling-thread check.
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)

    def test_test_server_supports_extensions(self):
        # Sanity check of the fixture itself: SMTPUTF8 must be advertised.
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.ehlo()
        self.assertTrue(smtp.does_esmtp)
        self.assertTrue(smtp.has_extn('smtputf8'))

    def test_send_unicode_with_SMTPUTF8_via_sendmail(self):
        # Non-ASCII envelope addresses and body must reach the server
        # unchanged when SMTPUTF8 is requested through sendmail().
        m = '¡a test message containing unicode!'.encode('utf-8')
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.sendmail('Jőhn', 'Sálly', m,
                      mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
        self.assertEqual(self.serv.last_mailfrom, 'Jőhn')
        self.assertEqual(self.serv.last_rcpttos, ['Sálly'])
        self.assertEqual(self.serv.last_message, m)
        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
        self.assertEqual(self.serv.last_rcpt_options, [])

    def test_send_unicode_with_SMTPUTF8_via_low_level_API(self):
        # Same checks as above, issuing MAIL / RCPT / DATA one by one.
        m = '¡a test message containing unicode!'.encode('utf-8')
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.ehlo()
        self.assertEqual(
            smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']),
            (250, b'OK'))
        self.assertEqual(smtp.rcpt('János'), (250, b'OK'))
        self.assertEqual(smtp.data(m), (250, b'OK'))
        self.assertEqual(self.serv.last_mailfrom, 'Jő')
        self.assertEqual(self.serv.last_rcpttos, ['János'])
        self.assertEqual(self.serv.last_message, m)
        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
        self.assertEqual(self.serv.last_rcpt_options, [])

    def test_send_message_uses_smtputf8_if_addrs_non_ascii(self):
        # send_message() is called without explicit options; the server must
        # still see both BODY=8BITMIME and SMTPUTF8.
        msg = EmailMessage()
        msg['From'] = "Páolo <fő[email protected]>"
        msg['To'] = 'Dinsdale'
        # NOTE(review): '\u1F609' is the escape \u1F60 followed by a literal
        # '9', not the single code point U+1F609 ('\U0001F609').  The same
        # escape is used in ``expected`` below, so the comparison is
        # consistent either way -- confirm which one was intended.
        msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
        # XXX I don't know why I need two \n's here, but this is an existing
        # bug (if it is one) and not a problem with the new functionality.
        msg.set_content("oh là là, know what I mean, know what I mean?\n\n")
        # XXX smtpd converts received /r/n to /n, so we can't easily test that
        # we are successfully sending /r/n :(.
        expected = textwrap.dedent("""\
            From: Páolo <fő[email protected]>
            To: Dinsdale
            Subject: Nudge nudge, wink, wink \u1F609
            Content-Type: text/plain; charset="utf-8"
            Content-Transfer-Encoding: 8bit
            MIME-Version: 1.0

            oh là là, know what I mean, know what I mean?
            """)
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        self.assertEqual(smtp.send_message(msg), {})
        self.assertEqual(self.serv.last_mailfrom, 'fő[email protected]')
        self.assertEqual(self.serv.last_rcpttos, ['Dinsdale'])
        self.assertEqual(self.serv.last_message.decode(), expected)
        self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
        self.assertIn('SMTPUTF8', self.serv.last_mail_options)
        self.assertEqual(self.serv.last_rcpt_options, [])
1490
1491
# Base64 form of the NUL-separated PLAIN response: an empty first field,
# then 'psu' and 'doesnotexist'.
EXPECTED_RESPONSE = encode_base64(
    '\0'.join(('', 'psu', 'doesnotexist')).encode('ascii'), eol='')
1493
class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel):
    def smtp_AUTH(self, arg):
        # RFC 4954 lets the client append an optional initial-response to
        # the AUTH command.  Not every mechanism supports that (some need a
        # challenge first), but PLAIN does, so that is what gets checked
        # here.  See issue #15014.
        args = arg.split()
        # Accept only "AUTH PLAIN <initial-response>" carrying the
        # hard-coded, base64 encoded response; anything else is refused.
        accepted = (args[0].lower() == 'plain'
                    and len(args) == 2
                    and args[1] == EXPECTED_RESPONSE)
        self.push('235 Ok' if accepted else '571 Bad authentication')
1508
class SimSMTPAUTHInitialResponseServer(SimSMTPServer):
    # Simulated server variant that names the initial-response AUTH channel
    # as its channel class; presumably the base server instantiates
    # channel_class for each accepted connection (base class not shown here).
    channel_class = SimSMTPAUTHInitialResponseChannel
1511
1512
class SMTPAUTHInitialResponseSimTests(unittest.TestCase):
    """Check AUTH PLAIN when the credentials travel in the AUTH command.

    Each test talks to a SimSMTPAUTHInitialResponseServer that is driven by
    a background thread and bound to an ephemeral port on HOST.
    """

    def setUp(self):
        self.thread_key = threading_helper.threading_setup()
        # Swap in the mock getfqdn for the duration of the test; the real
        # one is put back in tearDown().
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPAUTHInitialResponseServer(
            (HOST, 0), ('nowhere', -1), decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # Block until serv_evt is signalled (presumably by the server thread
        # once it is running), then reset it so tearDown() can wait on it
        # again.
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        threading_helper.join_thread(self.thread)
        del self.thread
        # Run the registered cleanups (client connections are closed there)
        # now, so they are finished before threading_cleanup() is called.
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)

    def testAUTH_PLAIN_initial_response_login(self):
        # login() raises if the server rejects the credentials, so returning
        # normally is the success criterion.
        self.serv.add_feature('AUTH PLAIN')
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        # Register the close right away so the connection is released even
        # when login() raises.
        self.addCleanup(smtp.close)
        smtp.login('psu', 'doesnotexist')

    def testAUTH_PLAIN_initial_response_auth(self):
        # Drive auth() directly with the PLAIN authobject instead of going
        # through login().
        self.serv.add_feature('AUTH PLAIN')
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        # Register the close right away so the connection is released even
        # when auth() raises or the assertion below fails.
        self.addCleanup(smtp.close)
        smtp.user = 'psu'
        smtp.password = 'doesnotexist'
        code, _response = smtp.auth('plain', smtp.auth_plain)
        self.assertEqual(code, 235)
1560
1561
# Run this module's tests through unittest's command-line runner when the
# file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
1564