import base64
import email.mime.text
from email.message import EmailMessage
from email.base64mime import body_encode as encode_base64
import email.utils
import hashlib
import hmac
import socket
import smtplib
import io
import re
import sys
import time
import select
import errno
import textwrap
import threading

import unittest
from test import support, mock_socket
from test.support import hashlib_helper
from test.support import socket_helper
from test.support import threading_helper
from test.support import warnings_helper
from unittest.mock import Mock


# Both modules are loaded through warnings_helper.import_deprecated().
asyncore = warnings_helper.import_deprecated('asyncore')
smtpd = warnings_helper.import_deprecated('smtpd')


support.requires_working_socket(module=True)

HOST = socket_helper.HOST

if sys.platform == 'darwin':
    # select.poll returns a select.POLLHUP at the end of the tests
    # on darwin, so just ignore it
    def handle_expt(self):
        pass
    smtpd.SMTPChannel.handle_expt = handle_expt


def server(evt, buf, serv):
    """Accept one connection on *serv* and send *buf* to it.

    *evt* is set once the socket is listening and set again after the
    listening socket has been closed.  A TimeoutError raised by accept()
    is ignored.  The send loop makes at most 500 passes.
    """
    serv.listen()
    evt.set()
    try:
        conn, addr = serv.accept()
    except TimeoutError:
        pass
    else:
        n = 500
        while buf and n > 0:
            # Block until the connection is reported writable.
            r, w, e = select.select([], [conn], [])
            if w:
                sent = conn.send(buf)
                # Keep only the part of the buffer not yet sent.
                buf = buf[sent:]

            n -= 1

        conn.close()
    finally:
        serv.close()
        evt.set()

class GeneralTests:
    """Client tests shared by SMTPGeneralTests and LMTPGeneralTests.

    Subclasses provide the ``client`` class attribute.  setUp() replaces
    ``smtplib.socket`` with ``mock_socket`` and tearDown() restores the
    real ``socket`` module.
    """

    def setUp(self):
        smtplib.socket = mock_socket
        self.port = 25

    def tearDown(self):
        smtplib.socket = socket

    # This method is no longer used but is retained for backward compatibility,
    # so test to make sure it still works.
    def testQuoteData(self):
        teststr  = "abc\n.jkl\rfoo\r\n..blue"
        expected = "abc\r\n..jkl\r\nfoo\r\n...blue"
        self.assertEqual(expected, smtplib.quotedata(teststr))

    def testBasic1(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects
        client = self.client(HOST, self.port)
        client.close()

    def testSourceAddress(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects
        client = self.client(HOST, self.port,
                             source_address=('127.0.0.1',19876))
        self.assertEqual(client.source_address, ('127.0.0.1', 19876))
        client.close()

    def testBasic2(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects, include port in host name
        client = self.client("%s:%s" % (HOST, self.port))
        client.close()

    def testLocalHostName(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # check that supplied local_hostname is used
        client = self.client(HOST, self.port, local_hostname="testhost")
        self.assertEqual(client.local_hostname, "testhost")
        client.close()

    def testTimeoutDefault(self):
        # With no explicit timeout the client socket is expected to report
        # the default timeout configured on the (mock) socket module.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(mock_socket.getdefaulttimeout())
        mock_socket.setdefaulttimeout(30)
        self.assertEqual(mock_socket.getdefaulttimeout(), 30)
        try:
            client = self.client(HOST, self.port)
        finally:
            # Always reset the default so later tests are not affected.
            mock_socket.setdefaulttimeout(None)
        self.assertEqual(client.sock.gettimeout(), 30)
        client.close()

    def testTimeoutNone(self):
        # An explicit timeout=None is expected to win over the default
        # timeout set on the real socket module.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            client = self.client(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(client.sock.gettimeout())
        client.close()

    def testTimeoutZero(self):
        # timeout=0 must be rejected with ValueError.
        mock_socket.reply_with(b"220 Hola mundo")
        with self.assertRaises(ValueError):
            self.client(HOST, self.port, timeout=0)

    def testTimeoutValue(self):
        mock_socket.reply_with(b"220 Hola mundo")
        client = self.client(HOST, self.port, timeout=30)
        self.assertEqual(client.sock.gettimeout(), 30)
        client.close()

    def test_debuglevel(self):
        # Debug level 1: stderr must contain a line starting with "connect:".
        mock_socket.reply_with(b"220 Hello world")
        client = self.client()
        client.set_debuglevel(1)
        with support.captured_stderr() as stderr:
            client.connect(HOST, self.port)
        client.close()
        expected = re.compile(r"^connect:", re.MULTILINE)
        self.assertRegex(stderr.getvalue(), expected)

    def test_debuglevel_2(self):
        # Debug level 2: the "connect: " line must be prefixed with an
        # HH:MM:SS.ffffff style timestamp.
        mock_socket.reply_with(b"220 Hello world")
        client = self.client()
        client.set_debuglevel(2)
        with support.captured_stderr() as stderr:
            client.connect(HOST, self.port)
        client.close()
        expected = re.compile(r"^\d{2}:\d{2}:\d{2}\.\d{6} connect: ",
                              re.MULTILINE)
        self.assertRegex(stderr.getvalue(), expected)


class SMTPGeneralTests(GeneralTests, unittest.TestCase):
    """Run the GeneralTests suite with smtplib.SMTP as the client."""

    client = smtplib.SMTP


class LMTPGeneralTests(GeneralTests, unittest.TestCase):
    """Run the GeneralTests suite with smtplib.LMTP as the client."""

    client = smtplib.LMTP

    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), "test requires Unix domain socket")
    def testUnixDomainSocketTimeoutDefault(self):
        # The host argument is a filesystem-style path; the resulting
        # socket is expected to have no timeout.
        local_host = '/some/local/lmtp/delivery/program'
        mock_socket.reply_with(b"220 Hello world")
        try:
            client = self.client(local_host, self.port)
        finally:
            mock_socket.setdefaulttimeout(None)
        self.assertIsNone(client.sock.gettimeout())
        client.close()

    def testTimeoutZero(self):
        # Repeat the inherited check, then also with a path-style host.
        super().testTimeoutZero()
        local_host = '/some/local/lmtp/delivery/program'
        with self.assertRaises(ValueError):
            self.client(local_host, timeout=0)

# Test server thread using the specified SMTP server class
def debugging_server(serv, serv_evt, client_evt):
    """Drive the asyncore loop for *serv* until the client is finished.

    *serv_evt* is set when the thread starts and again when it exits.
    The loop polls asyncore.socket_map with a 0.01 timeout for at most
    1000 passes and stops early once *client_evt* is set.
    """
    serv_evt.set()

    try:
        if hasattr(select, 'poll'):
            poll_fun = asyncore.poll2
        else:
            poll_fun = asyncore.poll

        n = 1000
        while asyncore.socket_map and n > 0:
            poll_fun(0.01, asyncore.socket_map)

            # when the client conversation is finished, it will
            # set client_evt, and it's then ok to kill the server
            if client_evt.is_set():
                serv.close()
                break

            n -= 1

    except TimeoutError:
        pass
    finally:
        if not client_evt.is_set():
            # allow some time for the client to read the result
            time.sleep(0.5)
            serv.close()
        asyncore.close_all()
        serv_evt.set()

MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
MSG_END = '------------ END MESSAGE ------------\n'

# NOTE: Some SMTP objects in the tests below are created with a non-default
# local_hostname argument to the constructor, since (on some systems) the FQDN
# lookup caused by the default local_hostname sometimes takes so long that the
# test server times out, causing the test to fail.

# NOTE(review): many address literals in this file read '[email protected]'.
# That looks like an e-mail-obfuscation placeholder left by text extraction
# rather than the intended test data -- confirm against the upstream file.

# Test behavior of smtpd.DebuggingServer
class DebuggingServerTests(unittest.TestCase):
    """Exercise smtplib.SMTP against an smtpd.DebuggingServer in a thread.

    setUp() captures sys.stdout and smtpd.DEBUGSTREAM in StringIO objects
    so tests can compare what the server printed.
    """

    maxDiff = None

    def setUp(self):
        self.thread_key = threading_helper.threading_setup()
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        # temporarily replace sys.stdout to capture DebuggingServer output
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Capture SMTPChannel debug output
        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
        smtpd.DEBUGSTREAM = io.StringIO()
        # Pick a random unused port by passing 0 for the port number
        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1),
                                          decode_data=True)
        # Keep a note of what server host and port were assigned
        self.host, self.port = self.serv.socket.getsockname()[:2]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        threading_helper.join_thread(self.thread)
        # restore sys.stdout
        sys.stdout = self.old_stdout
        # restore DEBUGSTREAM
        smtpd.DEBUGSTREAM.close()
        smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
        del self.thread
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)

    def get_output_without_xpeer(self):
        # Return the captured stdout text with the "X-Peer: ..." line
        # removed.
        test_output = self.output.getvalue()
        return re.sub(r'(.*?)^X-Peer:\s*\S+\n(.*)', r'\1\2',
                      test_output, flags=re.MULTILINE|re.DOTALL)

    def testBasic(self):
        # connect
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        smtp.quit()

    def testSourceAddress(self):
        # connect
        src_port = socket_helper.find_unused_port()
        try:
            smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost',
                                timeout=support.LOOPBACK_TIMEOUT,
                                source_address=(self.host, src_port))
            self.addCleanup(smtp.close)
            self.assertEqual(smtp.source_address, (self.host, src_port))
            self.assertEqual(smtp.local_hostname, 'localhost')
            smtp.quit()
        except OSError as e:
            # The "unused" port may have been taken in the meantime; skip
            # in that case and re-raise any other OSError.
            if e.errno == errno.EADDRINUSE:
                self.skipTest("couldn't bind to source port %d" % src_port)
            raise

    def testNOOP(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        expected = (250, b'OK')
        self.assertEqual(smtp.noop(), expected)
        smtp.quit()

    def testRSET(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        expected = (250, b'OK')
        self.assertEqual(smtp.rset(), expected)
        smtp.quit()

    def testELHO(self):
        # NOTE(review): the comment here used to say "EHLO isn't implemented
        # in DebuggingServer", but the assertion below expects a 250 reply
        # whose text lists SIZE and HELP.
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        expected = (250, b'\nSIZE 33554432\nHELP')
        self.assertEqual(smtp.ehlo(), expected)
        smtp.quit()

    def testEXPNNotImplemented(self):
        # EXPN isn't implemented in DebuggingServer
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        expected = (502, b'EXPN not implemented')
        smtp.putcmd('EXPN')
        self.assertEqual(smtp.getreply(), expected)
        smtp.quit()

    def test_issue43124_putcmd_escapes_newline(self):
        # see: https://bugs.python.org/issue43124
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        with self.assertRaises(ValueError) as exc:
            smtp.putcmd('helo\nX-INJECTED')
        self.assertIn("prohibited newline characters", str(exc.exception))
        smtp.quit()

    def testVRFY(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        expected = (252, b'Cannot VRFY user, but will accept message ' + \
                         b'and attempt delivery')
        # vrfy() and verify() are both expected to return the same reply.
        self.assertEqual(smtp.vrfy('[email protected]'), expected)
        self.assertEqual(smtp.verify('[email protected]'), expected)
        smtp.quit()

    def testSecondHELO(self):
        # check that a second HELO returns a message that it's a duplicate
        # (this behavior is specific to smtpd.SMTPChannel)
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.helo()
        expected = (503, b'Duplicate HELO/EHLO')
        self.assertEqual(smtp.helo(), expected)
        smtp.quit()

    def testHELP(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        self.assertEqual(smtp.help(), b'Supported commands: EHLO HELO MAIL ' + \
                                      b'RCPT DATA RSET NOOP QUIT VRFY')
        smtp.quit()

    def testSend(self):
        # connect and send mail
        m = 'A test message'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.sendmail('John', 'Sally', m)
        # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
        # in asyncore.  This sleep might help, but should really be fixed
        # properly by using an Event variable.
        time.sleep(0.01)
        smtp.quit()

        # Tell the server thread we are done and wait for it to exit before
        # reading what it printed.
        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

    def testSendBinary(self):
        # Same as testSend, but the message is passed as bytes.
        m = b'A test message'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.sendmail('John', 'Sally', m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

    def testSendNeedingDotQuote(self):
        # Issue 12283
        # Lines starting with '.' must arrive at the server unchanged.
        m = '.A test\n.mes.sage.'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.sendmail('John', 'Sally', m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

    def test_issue43124_escape_localhostname(self):
        # see: https://bugs.python.org/issue43124
        # connect and send mail
        m = 'wazzuuup\nlinetwo'
        # The local hostname carries an embedded newline; sendmail() must
        # refuse it and nothing injected may reach the server's debug log.
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='hi\nX-INJECTED',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        with self.assertRaises(ValueError) as exc:
            smtp.sendmail("[email protected]", "[email protected]", m)
        self.assertIn(
            "prohibited newline characters: ehlo hi\\nX-INJECTED",
            str(exc.exception),
        )
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        debugout = smtpd.DEBUGSTREAM.getvalue()
        self.assertNotIn("X-INJECTED", debugout)

    def test_issue43124_escape_options(self):
        # see: https://bugs.python.org/issue43124
        # connect and send mail
        m = 'wazzuuup\nlinetwo'
        smtp = smtplib.SMTP(
            HOST, self.port, local_hostname='localhost',
            timeout=support.LOOPBACK_TIMEOUT)

        self.addCleanup(smtp.close)
        smtp.sendmail("[email protected]", "[email protected]", m)
        # MAIL options containing newlines must be rejected, and the error
        # text is expected to show them with the newline escaped.
        with self.assertRaises(ValueError) as exc:
            smtp.mail("[email protected]", ["X-OPTION\nX-INJECTED-1", "X-OPTION2\nX-INJECTED-2"])
        msg = str(exc.exception)
        self.assertIn("prohibited newline characters", msg)
        self.assertIn("X-OPTION\\nX-INJECTED-1 X-OPTION2\\nX-INJECTED-2", msg)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        debugout = smtpd.DEBUGSTREAM.getvalue()
        self.assertNotIn("X-OPTION", debugout)
        self.assertNotIn("X-OPTION2", debugout)
        self.assertNotIn("X-INJECTED-1", debugout)
        self.assertNotIn("X-INJECTED-2", debugout)

    def testSendNullSender(self):
        # The null sender '<>' must show up as "sender: <>" in the
        # channel's debug output.
        m = 'A test message'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.sendmail('<>', 'Sally', m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: <>$", re.MULTILINE)
        self.assertRegex(debugout, sender)

    def testSendMessage(self):
        m = email.mime.text.MIMEText('A test message')
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.send_message(m, from_addr='John', to_addrs='Sally')
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Remove the X-Peer header that DebuggingServer adds as figuring out
        # exactly what IP address format is put there is not easy (and
        # irrelevant to our test).  Typically 127.0.0.1 or ::1, but it is
        # not always the same as socket.gethostbyname(HOST). :(
        test_output = self.get_output_without_xpeer()
        del m['X-Peer']
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(test_output, mexpect)

    def testSendMessageWithAddresses(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = '[email protected]'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <[email protected]>'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.send_message(m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()
        # make sure the Bcc header is still in the message.
        self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
                                    '<[email protected]>')

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Remove the X-Peer header that DebuggingServer adds.
        test_output = self.get_output_without_xpeer()
        del m['X-Peer']
        # The Bcc header should not be transmitted.
        del m['Bcc']
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(test_output, mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        # NOTE(review): inside a regex "[email protected]" is a character
        # class, not a literal address; the literal looks like an extraction
        # placeholder -- confirm the intended pattern.
        sender = re.compile("^sender: [email protected]$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        # Every To/CC/Bcc address must appear in the logged recipient list.
        for addr in ('John', 'Sally', 'Fred', 'root@localhost',
                     '[email protected]'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)

    def testSendMessageWithSomeAddresses(self):
        # Make sure nothing breaks if not all of the three 'to' headers exist
        m = email.mime.text.MIMEText('A test message')
        m['From'] = '[email protected]'
        m['To'] = 'John, Dinsdale'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.send_message(m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Remove the X-Peer header that DebuggingServer adds.
        test_output = self.get_output_without_xpeer()
        del m['X-Peer']
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(test_output, mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: [email protected]$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('John', 'Dinsdale'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)

    def testSendMessageWithSpecifiedAddresses(self):
        # Make sure addresses specified in call override those in message.
        m = email.mime.text.MIMEText('A test message')
        m['From'] = '[email protected]'
        m['To'] = 'John, Dinsdale'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.send_message(m, from_addr='[email protected]', to_addrs='[email protected]')
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Remove the X-Peer header that DebuggingServer adds.
        test_output = self.get_output_without_xpeer()
        del m['X-Peer']
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(test_output, mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: [email protected]$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        # The header recipients must NOT have been used ...
        for addr in ('John', 'Dinsdale'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertNotRegex(debugout, to_addr)
        # ... only the explicitly passed one.
        recip = re.compile(r"^recips: .*'[email protected]'.*$", re.MULTILINE)
        self.assertRegex(debugout, recip)

    def testSendMessageWithMultipleFrom(self):
        # Sender overrides To
        # NOTE(review): From holds two addresses and a Sender header is set;
        # the check below looks for the envelope sender, so this presumably
        # means "Sender overrides From" -- confirm.
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'Bernard, Bianca'
        m['Sender'] = '[email protected]'
        m['To'] = 'John, Dinsdale'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.send_message(m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # Remove the X-Peer header that DebuggingServer adds.
        test_output = self.get_output_without_xpeer()
        del m['X-Peer']
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(test_output, mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: [email protected]$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('John', 'Dinsdale'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)

    def testSendMessageResent(self):
        # A message carrying one set of Resent-* headers: the recipients
        # checked below are the Resent-To / Resent-Bcc addresses.
        m = email.mime.text.MIMEText('A test message')
        m['From'] = '[email protected]'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <[email protected]>'
        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
        m['Resent-From'] = '[email protected]'
        m['Resent-To'] = 'Martha <[email protected]>, Jeff'
        m['Resent-Bcc'] = '[email protected]'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        smtp.send_message(m)
        # XXX (see comment in testSend)
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()
        # The Resent-Bcc headers are deleted before serialization.
        del m['Bcc']
        del m['Resent-Bcc']
        # Remove the X-Peer header that DebuggingServer adds.
        test_output = self.get_output_without_xpeer()
        del m['X-Peer']
        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
        self.assertEqual(test_output, mexpect)
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender = re.compile("^sender: [email protected]$", re.MULTILINE)
        self.assertRegex(debugout, sender)
        for addr in ('[email protected]', 'Jeff', '[email protected]'):
            to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
                                 re.MULTILINE)
            self.assertRegex(debugout, to_addr)

    def testSendMessageMultipleResentRaises(self):
        # Two sets of Resent-* headers on one message: send_message() is
        # expected to raise ValueError.
        m = email.mime.text.MIMEText('A test message')
        m['From'] = '[email protected]'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <[email protected]>'
        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
        m['Resent-From'] = '[email protected]'
        m['Resent-To'] = 'Martha <[email protected]>, Jeff'
        m['Resent-Bcc'] = '[email protected]'
        m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
        m['Resent-To'] = '[email protected]'
        m['Resent-From'] = 'Martha <[email protected]>, Jeff'
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(smtp.close)
        with self.assertRaises(ValueError):
            smtp.send_message(m)
        smtp.close()

class NonConnectingTests(unittest.TestCase):
    """Tests that never establish a connection to a server."""

    def testNotConnected(self):
        # Test various operations on an unconnected SMTP object that
        # should raise exceptions (at present the attempt in SMTP.send
        # to reference the nonexistent 'sock' attribute of the SMTP object
        # causes an AttributeError)
        # NOTE(review): the assertions below expect SMTPServerDisconnected,
        # so the AttributeError remark above appears to be outdated.
        smtp = smtplib.SMTP()
        self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
        self.assertRaises(smtplib.SMTPServerDisconnected,
                          smtp.send, 'test msg')

    def testNonnumericPort(self):
        # check that non-numeric port raises OSError
        self.assertRaises(OSError, smtplib.SMTP,
                          "localhost", "bogus")
        self.assertRaises(OSError, smtplib.SMTP,
                          "localhost:bogus")

    def testSockAttributeExists(self):
        # check that sock attribute is present outside of a connect() call
        # (regression test, the previous behavior raised an
        # AttributeError: 'SMTP' object has no attribute 'sock')
        with smtplib.SMTP() as smtp:
            self.assertIsNone(smtp.sock)


class DefaultArgumentsTests(unittest.TestCase):
    """Check the mail options send_message() passes on to sendmail().

    ehlo, has_extn and sendmail are replaced with Mock objects, so no
    network traffic takes place.
    """

    def setUp(self):
        self.msg = EmailMessage()
        self.msg['From'] = 'Páolo <fő[email protected]>'
        self.smtp = smtplib.SMTP()
        self.smtp.ehlo = Mock(return_value=(200, 'OK'))
        self.smtp.has_extn, self.smtp.sendmail = Mock(), Mock()

    def testSendMessage(self):
        expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME')
        # Send twice: the options of the second call must equal those of
        # the first.
        self.smtp.send_message(self.msg)
        self.smtp.send_message(self.msg)
        # call_args_list[i][0][3] is the 4th positional argument of the
        # i-th sendmail() call.
        self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3],
                         expected_mail_options)
        self.assertEqual(self.smtp.sendmail.call_args_list[1][0][3],
                         expected_mail_options)

    def testSendMessageWithMailOptions(self):
        mail_options = ['STARTTLS']
        expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME')
        self.smtp.send_message(self.msg, None, None, mail_options)
        # The caller's list must not have been modified.
        self.assertEqual(mail_options, ['STARTTLS'])
        self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3],
                         expected_mail_options)


# test response of client to a non-successful HELO message
class BadHELOServerTests(unittest.TestCase):
    """Connect to a mock server whose greeting is '199 no hello for you!'."""

    def setUp(self):
        smtplib.socket = mock_socket
        mock_socket.reply_with(b"199 no hello for you!")
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output
        self.port = 25

    def tearDown(self):
        smtplib.socket = socket
        sys.stdout = self.old_stdout

    def testFailingHELO(self):
        self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
                            HOST, self.port, 'localhost', 3)


class TooLongLineTests(unittest.TestCase):
    """Connect to a server that answers with an over-long reply line."""

    # One reply line padded with 2 * smtplib._MAXLINE dots.
    respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n'

    def setUp(self):
        self.thread_key = threading_helper.threading_setup()
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(15)
        self.port = socket_helper.bind_port(self.sock)
        servargs = (self.evt, self.respdata, self.sock)
        self.thread = threading.Thread(target=server, args=servargs)
        self.thread.start()
        # Wait until server() has signalled that it is listening.
        self.evt.wait()
        self.evt.clear()

    def tearDown(self):
        # Wait for server() to signal that it has finished.
        self.evt.wait()
        sys.stdout = self.old_stdout
        threading_helper.join_thread(self.thread)
        del self.thread
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)

    def testLineTooLong(self):
        self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP,
                          HOST, self.port, 'localhost', 3)


# NOTE(review): all three keys below are the same literal, so this dict ends
# up holding a single entry ('Ruth C').  The literal looks like an
# e-mail-obfuscation placeholder left by extraction; the same applies to
# sim_auth and sim_lists -- restore the intended addresses from upstream.
sim_users = {'[email protected]':'John A',
             '[email protected]':'Sally B',
             '[email protected]':'Ruth C',
            }

# (user, password) pair; the password is what the AUTH handlers below accept.
sim_auth = ('[email protected]', 'somepassword')
# Challenge string sent by SimSMTPChannel._auth_cram_md5.
sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
                          'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
# List name -> member addresses, served by SimSMTPChannel.smtp_EXPN.
sim_lists = {'list-1':['[email protected]','[email protected]'],
             'list-2':['[email protected]',],
            }

# Simulated SMTP channel & server
class ResponseException(Exception): pass
class SimSMTPChannel(smtpd.SMTPChannel):
    """smtpd.SMTPChannel with AUTH support and overridable responses.

    Tests can set the ``*_response`` attributes to make the matching
    command push that text instead of running the base class handler.
    """

    # When not None, pushed by smtp_QUIT / smtp_MAIL / smtp_DATA instead of
    # calling the base class handler.
    quit_response = None
    mail_response = None
    # When not None, indexed by the running RCPT count (see smtp_RCPT).
    rcpt_response = None
    data_response = None
    # Counters incremented by smtp_RCPT (only with rcpt_response set) and
    # smtp_RSET.
    rcpt_count = 0
    rset_count = 0
    # If true, smtp_MAIL closes the channel after pushing mail_response.
    disconnect = 0
    AUTH = 99    # Add protocol state to enable auth testing.
    authenticated_user = None

    def __init__(self, extra_features, *args, **kw):
        # Pre-render the extra features as "250-<feature>\r\n" lines for
        # use in the EHLO reply.
        self._extrafeatures = ''.join(
            [ "250-{0}\r\n".format(x) for x in extra_features ])
        super(SimSMTPChannel, self).__init__(*args, **kw)

    # AUTH related stuff.  It would be nice if support for this were in smtpd.
    def found_terminator(self):
        # While in the AUTH state, feed the received line to the active
        # mechanism handler instead of the normal command dispatch.
        if self.smtp_state == self.AUTH:
            line = self._emptystring.join(self.received_lines)
            print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
            self.received_lines = []
            try:
                self.auth_object(line)
            except ResponseException as e:
                self.smtp_state = self.COMMAND
                self.push('%s %s' % (e.smtp_code, e.smtp_error))
            return
        super().found_terminator()


    def smtp_AUTH(self, arg):
        # Validate the command, look up the handler named
        # "_auth_<mechanism>" and hand it the optional initial response.
        if not self.seen_greeting:
            self.push('503 Error: send EHLO first')
            return
        if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
            self.push('500 Error: command "AUTH" not recognized')
            return
        if self.authenticated_user is not None:
            self.push(
                '503 Bad sequence of commands: already authenticated')
            return
        args = arg.split()
        if len(args) not in [1, 2]:
            self.push('501 Syntax: AUTH <mechanism> [initial-response]')
            return
        # e.g. "CRAM-MD5" -> "_auth_cram_md5"
        auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
        try:
            self.auth_object = getattr(self, auth_object_name)
        except AttributeError:
            self.push('504 Command parameter not implemented: unsupported '
                      ' authentication mechanism {!r}'.format(auth_object_name))
            return
        self.smtp_state = self.AUTH
        self.auth_object(args[1] if len(args) == 2 else None)

    def _authenticated(self, user, valid):
        # Push the final AUTH result and go back to the COMMAND state; the
        # user is recorded only when the credentials were valid.
        if valid:
            self.authenticated_user = user
            self.push('235 Authentication Succeeded')
        else:
            self.push('535 Authentication credentials invalid')
        self.smtp_state = self.COMMAND

    def _decode_base64(self, string):
        return base64.decodebytes(string.encode('ascii')).decode('utf-8')

    def _auth_plain(self, arg=None):
        # Without an argument, send an empty 334 challenge; otherwise the
        # decoded response is split on NUL and its last two fields are
        # taken as user and password.
        if arg is None:
            self.push('334 ')
        else:
            logpass = self._decode_base64(arg)
            try:
                *_, user, password = logpass.split('\0')
            except ValueError as e:
                self.push('535 Splitting response {!r} into user and password'
                          ' failed: {}'.format(logpass, e))
                return
            self._authenticated(user, password == sim_auth[1])

    def _auth_login(self, arg=None):
        # Three-step exchange: prompt for the user name, then for the
        # password, then check the password.  The user name is kept in
        # self._auth_login_user between steps.
        if arg is None:
            # base64 encoded 'Username:'
            self.push('334 VXNlcm5hbWU6')
        elif not hasattr(self, '_auth_login_user'):
            self._auth_login_user = self._decode_base64(arg)
            # base64 encoded 'Password:'
            self.push('334 UGFzc3dvcmQ6')
        else:
            password = self._decode_base64(arg)
            self._authenticated(self._auth_login_user, password == sim_auth[1])
            del self._auth_login_user

    def _auth_buggy(self, arg=None):
        # This AUTH mechanism will 'trap' client in a neverending 334
        # base64 encoded 'BuGgYbUgGy'
        self.push('334 QnVHZ1liVWdHeQ==')

    def _auth_cram_md5(self, arg=None):
        # Without an argument, send the challenge; otherwise compare the
        # client's digest with the HMAC-MD5 of the decoded challenge keyed
        # with the expected password.
        if arg is None:
            self.push('334 {}'.format(sim_cram_md5_challenge))
        else:
            logpass = self._decode_base64(arg)
            try:
                user, hashed_pass = logpass.split()
            except ValueError as e:
                self.push('535 Splitting response {!r} into user and password '
                          'failed: {}'.format(logpass, e))
                return False
            valid_hashed_pass = hmac.HMAC(
                sim_auth[1].encode('ascii'),
                self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
                'md5').hexdigest()
            self._authenticated(user, hashed_pass == valid_hashed_pass)
    # end AUTH related stuff.

    def smtp_EHLO(self, arg):
        # Advertise a fixed feature list, then any extra features, and end
        # with the final "250 HELP" line.
        resp = ('250-testhost\r\n'
                '250-EXPN\r\n'
                '250-SIZE 20000000\r\n'
                '250-STARTTLS\r\n'
                '250-DELIVERBY\r\n')
        resp = resp + self._extrafeatures + '250 HELP'
        self.push(resp)
        self.seen_greeting = arg
        self.extended_smtp = True

    def smtp_VRFY(self, arg):
        # For max compatibility smtplib should be sending the raw address.
        if arg in sim_users:
            self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg)))
        else:
            self.push('550 No such user: %s' % arg)

    def smtp_EXPN(self, arg):
        # Reply with one line per list member: "250-" for every member but
        # the last, "250 " for the last.
        list_name = arg.lower()
        if list_name in sim_lists:
            user_list = sim_lists[list_name]
            for n, user_email in enumerate(user_list):
                quoted_addr = smtplib.quoteaddr(user_email)
                if n < len(user_list) - 1:
                    self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
                else:
                    self.push('250 %s %s' % (sim_users[user_email], quoted_addr))
        else:
            self.push('550 No access for you!')

    def smtp_QUIT(self, arg):
        if self.quit_response is None:
            super(SimSMTPChannel, self).smtp_QUIT(arg)
        else:
            self.push(self.quit_response)
            self.close_when_done()

    def smtp_MAIL(self, arg):
        if self.mail_response is None:
            super().smtp_MAIL(arg)
        else:
            self.push(self.mail_response)
            if self.disconnect:
                self.close_when_done()

    def smtp_RCPT(self, arg):
        if self.rcpt_response is None:
            super().smtp_RCPT(arg)
            return
        # The n-th RCPT command gets the n-th configured response.
        self.rcpt_count += 1
        self.push(self.rcpt_response[self.rcpt_count-1])

    def smtp_RSET(self, arg):
        self.rset_count += 1
        super().smtp_RSET(arg)

    def smtp_DATA(self, arg):
        if self.data_response is None:
            super().smtp_DATA(arg)
        else:
            self.push(self.data_response)

    def handle_error(self):
        # Bare raise: re-raise the exception currently being handled.
        raise


class SimSMTPServer(smtpd.SMTPServer):
    """smtpd.SMTPServer that creates SimSMTPChannel objects.

    The envelope of the last processed message is kept in
    ``self._addresses``.
    """

    channel_class = SimSMTPChannel

    def __init__(self, *args, **kw):
        self._extra_features = []
        self._addresses = {}
        smtpd.SMTPServer.__init__(self, *args, **kw)

    def handle_accepted(self, conn, addr):
        # Keep a reference to the channel in self._SMTPchannel.
        self._SMTPchannel = self.channel_class(
            self._extra_features, self, conn, addr,
            decode_data=self._decode_data)

    def process_message(self, peer, mailfrom, rcpttos, data):
        self._addresses['from'] = mailfrom
        self._addresses['tos'] = rcpttos

    def add_feature(self, feature):
        # Features added here are passed to channels created afterwards.
        self._extra_features.append(feature)

    def handle_error(self):
        # Bare raise: re-raise the exception currently being handled.
        raise


# Test various SMTP & ESMTP commands/behaviors that require a simulated server
# (i.e., something with more features than DebuggingServer)
class SMTPSimTests(unittest.TestCase):

    def setUp(self):
        self.thread_key = threading_helper.threading_setup()
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1), decode_data=True)
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        threading_helper.join_thread(self.thread)
        del self.thread
        self.doCleanups()
        threading_helper.threading_cleanup(*self.thread_key)

    def testBasic(self):
        # smoke test
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
        smtp.quit()

    def testEHLO(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)

        # no features should be present before the EHLO
        self.assertEqual(smtp.esmtp_features, {})

        # features expected from the test server
        expected_features = {'expn':'',
                             'size': '20000000',
                             'starttls': '',
                             'deliverby': '',
                             'help': '',
                             }

        smtp.ehlo()
        self.assertEqual(smtp.esmtp_features, expected_features)
        for k in expected_features:
            self.assertTrue(smtp.has_extn(k))
        self.assertFalse(smtp.has_extn('unsupported-feature'))
        smtp.quit()

    def testVRFY(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)

        # Every known user must be confirmed with "<name> <quoted address>".
        for addr_spec, name in sim_users.items():
            expected_known = (250, bytes('%s %s' %
                                         (name, smtplib.quoteaddr(addr_spec)),
                                         "ascii"))
            self.assertEqual(smtp.vrfy(addr_spec), expected_known)

        u = '[email protected]'
        expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
        self.assertEqual(smtp.vrfy(u), expected_unknown)
        smtp.quit()

    def testEXPN(self):
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)

        # Every known list must expand to its members, one per line.
        for listname, members in sim_lists.items():
            users = []
            for m in members:
                users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))
            expected_known = (250, bytes('\n'.join(users), "ascii"))
            self.assertEqual(smtp.expn(listname), expected_known)

        u = 'PSU-Members-List'
        expected_unknown = (550, b'No access for you!')
        self.assertEqual(smtp.expn(u), expected_unknown)
        smtp.quit()

    def testAUTH_PLAIN(self):
        self.serv.add_feature("AUTH PLAIN")
        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=support.LOOPBACK_TIMEOUT)
1126 resp = smtp.login(sim_auth[0], sim_auth[1]) 1127 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1128 smtp.close() 1129 1130 def testAUTH_LOGIN(self): 1131 self.serv.add_feature("AUTH LOGIN") 1132 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1133 timeout=support.LOOPBACK_TIMEOUT) 1134 resp = smtp.login(sim_auth[0], sim_auth[1]) 1135 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1136 smtp.close() 1137 1138 def testAUTH_LOGIN_initial_response_ok(self): 1139 self.serv.add_feature("AUTH LOGIN") 1140 with smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1141 timeout=support.LOOPBACK_TIMEOUT) as smtp: 1142 smtp.user, smtp.password = sim_auth 1143 smtp.ehlo("test_auth_login") 1144 resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=True) 1145 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1146 1147 def testAUTH_LOGIN_initial_response_notok(self): 1148 self.serv.add_feature("AUTH LOGIN") 1149 with smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1150 timeout=support.LOOPBACK_TIMEOUT) as smtp: 1151 smtp.user, smtp.password = sim_auth 1152 smtp.ehlo("test_auth_login") 1153 resp = smtp.auth("LOGIN", smtp.auth_login, initial_response_ok=False) 1154 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1155 1156 def testAUTH_BUGGY(self): 1157 self.serv.add_feature("AUTH BUGGY") 1158 1159 def auth_buggy(challenge=None): 1160 self.assertEqual(b"BuGgYbUgGy", challenge) 1161 return "\0" 1162 1163 smtp = smtplib.SMTP( 1164 HOST, self.port, local_hostname='localhost', 1165 timeout=support.LOOPBACK_TIMEOUT 1166 ) 1167 try: 1168 smtp.user, smtp.password = sim_auth 1169 smtp.ehlo("test_auth_buggy") 1170 expect = r"^Server AUTH mechanism infinite loop.*" 1171 with self.assertRaisesRegex(smtplib.SMTPException, expect) as cm: 1172 smtp.auth("BUGGY", auth_buggy, initial_response_ok=False) 1173 finally: 1174 smtp.close() 1175 1176 @hashlib_helper.requires_hashdigest('md5', openssl=True) 1177 def 
testAUTH_CRAM_MD5(self): 1178 self.serv.add_feature("AUTH CRAM-MD5") 1179 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1180 timeout=support.LOOPBACK_TIMEOUT) 1181 resp = smtp.login(sim_auth[0], sim_auth[1]) 1182 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1183 smtp.close() 1184 1185 @hashlib_helper.requires_hashdigest('md5', openssl=True) 1186 def testAUTH_multiple(self): 1187 # Test that multiple authentication methods are tried. 1188 self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5") 1189 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1190 timeout=support.LOOPBACK_TIMEOUT) 1191 resp = smtp.login(sim_auth[0], sim_auth[1]) 1192 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1193 smtp.close() 1194 1195 def test_auth_function(self): 1196 supported = {'PLAIN', 'LOGIN'} 1197 try: 1198 hashlib.md5() 1199 except ValueError: 1200 pass 1201 else: 1202 supported.add('CRAM-MD5') 1203 for mechanism in supported: 1204 self.serv.add_feature("AUTH {}".format(mechanism)) 1205 for mechanism in supported: 1206 with self.subTest(mechanism=mechanism): 1207 smtp = smtplib.SMTP(HOST, self.port, 1208 local_hostname='localhost', 1209 timeout=support.LOOPBACK_TIMEOUT) 1210 smtp.ehlo('foo') 1211 smtp.user, smtp.password = sim_auth[0], sim_auth[1] 1212 method = 'auth_' + mechanism.lower().replace('-', '_') 1213 resp = smtp.auth(mechanism, getattr(smtp, method)) 1214 self.assertEqual(resp, (235, b'Authentication Succeeded')) 1215 smtp.close() 1216 1217 def test_quit_resets_greeting(self): 1218 smtp = smtplib.SMTP(HOST, self.port, 1219 local_hostname='localhost', 1220 timeout=support.LOOPBACK_TIMEOUT) 1221 code, message = smtp.ehlo() 1222 self.assertEqual(code, 250) 1223 self.assertIn('size', smtp.esmtp_features) 1224 smtp.quit() 1225 self.assertNotIn('size', smtp.esmtp_features) 1226 smtp.connect(HOST, self.port) 1227 self.assertNotIn('size', smtp.esmtp_features) 1228 smtp.ehlo_or_helo_if_needed() 1229 
self.assertIn('size', smtp.esmtp_features) 1230 smtp.quit() 1231 1232 def test_with_statement(self): 1233 with smtplib.SMTP(HOST, self.port) as smtp: 1234 code, message = smtp.noop() 1235 self.assertEqual(code, 250) 1236 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') 1237 with smtplib.SMTP(HOST, self.port) as smtp: 1238 smtp.close() 1239 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.send, b'foo') 1240 1241 def test_with_statement_QUIT_failure(self): 1242 with self.assertRaises(smtplib.SMTPResponseException) as error: 1243 with smtplib.SMTP(HOST, self.port) as smtp: 1244 smtp.noop() 1245 self.serv._SMTPchannel.quit_response = '421 QUIT FAILED' 1246 self.assertEqual(error.exception.smtp_code, 421) 1247 self.assertEqual(error.exception.smtp_error, b'QUIT FAILED') 1248 1249 #TODO: add tests for correct AUTH method fallback now that the 1250 #test infrastructure can support it. 1251 1252 # Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception 1253 def test__rest_from_mail_cmd(self): 1254 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1255 timeout=support.LOOPBACK_TIMEOUT) 1256 smtp.noop() 1257 self.serv._SMTPchannel.mail_response = '451 Requested action aborted' 1258 self.serv._SMTPchannel.disconnect = True 1259 with self.assertRaises(smtplib.SMTPSenderRefused): 1260 smtp.sendmail('John', 'Sally', 'test message') 1261 self.assertIsNone(smtp.sock) 1262 1263 # Issue 5713: make sure close, not rset, is called if we get a 421 error 1264 def test_421_from_mail_cmd(self): 1265 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1266 timeout=support.LOOPBACK_TIMEOUT) 1267 smtp.noop() 1268 self.serv._SMTPchannel.mail_response = '421 closing connection' 1269 with self.assertRaises(smtplib.SMTPSenderRefused): 1270 smtp.sendmail('John', 'Sally', 'test message') 1271 self.assertIsNone(smtp.sock) 1272 self.assertEqual(self.serv._SMTPchannel.rset_count, 0) 1273 1274 def test_421_from_rcpt_cmd(self): 
1275 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1276 timeout=support.LOOPBACK_TIMEOUT) 1277 smtp.noop() 1278 self.serv._SMTPchannel.rcpt_response = ['250 accepted', '421 closing'] 1279 with self.assertRaises(smtplib.SMTPRecipientsRefused) as r: 1280 smtp.sendmail('John', ['Sally', 'Frank', 'George'], 'test message') 1281 self.assertIsNone(smtp.sock) 1282 self.assertEqual(self.serv._SMTPchannel.rset_count, 0) 1283 self.assertDictEqual(r.exception.args[0], {'Frank': (421, b'closing')}) 1284 1285 def test_421_from_data_cmd(self): 1286 class MySimSMTPChannel(SimSMTPChannel): 1287 def found_terminator(self): 1288 if self.smtp_state == self.DATA: 1289 self.push('421 closing') 1290 else: 1291 super().found_terminator() 1292 self.serv.channel_class = MySimSMTPChannel 1293 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1294 timeout=support.LOOPBACK_TIMEOUT) 1295 smtp.noop() 1296 with self.assertRaises(smtplib.SMTPDataError): 1297 smtp.sendmail('[email protected]', ['[email protected]'], 'test message') 1298 self.assertIsNone(smtp.sock) 1299 self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0) 1300 1301 def test_smtputf8_NotSupportedError_if_no_server_support(self): 1302 smtp = smtplib.SMTP( 1303 HOST, self.port, local_hostname='localhost', 1304 timeout=support.LOOPBACK_TIMEOUT) 1305 self.addCleanup(smtp.close) 1306 smtp.ehlo() 1307 self.assertTrue(smtp.does_esmtp) 1308 self.assertFalse(smtp.has_extn('smtputf8')) 1309 self.assertRaises( 1310 smtplib.SMTPNotSupportedError, 1311 smtp.sendmail, 1312 'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 1313 self.assertRaises( 1314 smtplib.SMTPNotSupportedError, 1315 smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8']) 1316 1317 def test_send_unicode_without_SMTPUTF8(self): 1318 smtp = smtplib.SMTP( 1319 HOST, self.port, local_hostname='localhost', 1320 timeout=support.LOOPBACK_TIMEOUT) 1321 self.addCleanup(smtp.close) 1322 self.assertRaises(UnicodeEncodeError, 
smtp.sendmail, 'Alice', 'Böb', '') 1323 self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice') 1324 1325 def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self): 1326 # This test is located here and not in the SMTPUTF8SimTests 1327 # class because it needs a "regular" SMTP server to work 1328 msg = EmailMessage() 1329 msg['From'] = "Páolo <fő[email protected]>" 1330 msg['To'] = 'Dinsdale' 1331 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 1332 smtp = smtplib.SMTP( 1333 HOST, self.port, local_hostname='localhost', 1334 timeout=support.LOOPBACK_TIMEOUT) 1335 self.addCleanup(smtp.close) 1336 with self.assertRaises(smtplib.SMTPNotSupportedError): 1337 smtp.send_message(msg) 1338 1339 def test_name_field_not_included_in_envelop_addresses(self): 1340 smtp = smtplib.SMTP( 1341 HOST, self.port, local_hostname='localhost', 1342 timeout=support.LOOPBACK_TIMEOUT) 1343 self.addCleanup(smtp.close) 1344 1345 message = EmailMessage() 1346 message['From'] = email.utils.formataddr(('Michaël', '[email protected]')) 1347 message['To'] = email.utils.formataddr(('René', '[email protected]')) 1348 1349 self.assertDictEqual(smtp.send_message(message), {}) 1350 1351 self.assertEqual(self.serv._addresses['from'], '[email protected]') 1352 self.assertEqual(self.serv._addresses['tos'], ['[email protected]']) 1353 1354 1355class SimSMTPUTF8Server(SimSMTPServer): 1356 1357 def __init__(self, *args, **kw): 1358 # The base SMTP server turns these on automatically, but our test 1359 # server is set up to munge the EHLO response, so we need to provide 1360 # them as well. And yes, the call is to SMTPServer not SimSMTPServer. 
1361 self._extra_features = ['SMTPUTF8', '8BITMIME'] 1362 smtpd.SMTPServer.__init__(self, *args, **kw) 1363 1364 def handle_accepted(self, conn, addr): 1365 self._SMTPchannel = self.channel_class( 1366 self._extra_features, self, conn, addr, 1367 decode_data=self._decode_data, 1368 enable_SMTPUTF8=self.enable_SMTPUTF8, 1369 ) 1370 1371 def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None, 1372 rcpt_options=None): 1373 self.last_peer = peer 1374 self.last_mailfrom = mailfrom 1375 self.last_rcpttos = rcpttos 1376 self.last_message = data 1377 self.last_mail_options = mail_options 1378 self.last_rcpt_options = rcpt_options 1379 1380 1381class SMTPUTF8SimTests(unittest.TestCase): 1382 1383 maxDiff = None 1384 1385 def setUp(self): 1386 self.thread_key = threading_helper.threading_setup() 1387 self.real_getfqdn = socket.getfqdn 1388 socket.getfqdn = mock_socket.getfqdn 1389 self.serv_evt = threading.Event() 1390 self.client_evt = threading.Event() 1391 # Pick a random unused port by passing 0 for the port number 1392 self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1), 1393 decode_data=False, 1394 enable_SMTPUTF8=True) 1395 # Keep a note of what port was assigned 1396 self.port = self.serv.socket.getsockname()[1] 1397 serv_args = (self.serv, self.serv_evt, self.client_evt) 1398 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1399 self.thread.start() 1400 1401 # wait until server thread has assigned a port number 1402 self.serv_evt.wait() 1403 self.serv_evt.clear() 1404 1405 def tearDown(self): 1406 socket.getfqdn = self.real_getfqdn 1407 # indicate that the client is finished 1408 self.client_evt.set() 1409 # wait for the server thread to terminate 1410 self.serv_evt.wait() 1411 threading_helper.join_thread(self.thread) 1412 del self.thread 1413 self.doCleanups() 1414 threading_helper.threading_cleanup(*self.thread_key) 1415 1416 def test_test_server_supports_extensions(self): 1417 smtp = smtplib.SMTP( 1418 HOST, 
self.port, local_hostname='localhost', 1419 timeout=support.LOOPBACK_TIMEOUT) 1420 self.addCleanup(smtp.close) 1421 smtp.ehlo() 1422 self.assertTrue(smtp.does_esmtp) 1423 self.assertTrue(smtp.has_extn('smtputf8')) 1424 1425 def test_send_unicode_with_SMTPUTF8_via_sendmail(self): 1426 m = '¡a test message containing unicode!'.encode('utf-8') 1427 smtp = smtplib.SMTP( 1428 HOST, self.port, local_hostname='localhost', 1429 timeout=support.LOOPBACK_TIMEOUT) 1430 self.addCleanup(smtp.close) 1431 smtp.sendmail('Jőhn', 'Sálly', m, 1432 mail_options=['BODY=8BITMIME', 'SMTPUTF8']) 1433 self.assertEqual(self.serv.last_mailfrom, 'Jőhn') 1434 self.assertEqual(self.serv.last_rcpttos, ['Sálly']) 1435 self.assertEqual(self.serv.last_message, m) 1436 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1437 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1438 self.assertEqual(self.serv.last_rcpt_options, []) 1439 1440 def test_send_unicode_with_SMTPUTF8_via_low_level_API(self): 1441 m = '¡a test message containing unicode!'.encode('utf-8') 1442 smtp = smtplib.SMTP( 1443 HOST, self.port, local_hostname='localhost', 1444 timeout=support.LOOPBACK_TIMEOUT) 1445 self.addCleanup(smtp.close) 1446 smtp.ehlo() 1447 self.assertEqual( 1448 smtp.mail('Jő', options=['BODY=8BITMIME', 'SMTPUTF8']), 1449 (250, b'OK')) 1450 self.assertEqual(smtp.rcpt('János'), (250, b'OK')) 1451 self.assertEqual(smtp.data(m), (250, b'OK')) 1452 self.assertEqual(self.serv.last_mailfrom, 'Jő') 1453 self.assertEqual(self.serv.last_rcpttos, ['János']) 1454 self.assertEqual(self.serv.last_message, m) 1455 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1456 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1457 self.assertEqual(self.serv.last_rcpt_options, []) 1458 1459 def test_send_message_uses_smtputf8_if_addrs_non_ascii(self): 1460 msg = EmailMessage() 1461 msg['From'] = "Páolo <fő[email protected]>" 1462 msg['To'] = 'Dinsdale' 1463 msg['Subject'] = 'Nudge nudge, wink, wink \u1F609' 
1464 # XXX I don't know why I need two \n's here, but this is an existing 1465 # bug (if it is one) and not a problem with the new functionality. 1466 msg.set_content("oh là là, know what I mean, know what I mean?\n\n") 1467 # XXX smtpd converts received /r/n to /n, so we can't easily test that 1468 # we are successfully sending /r/n :(. 1469 expected = textwrap.dedent("""\ 1470 From: Páolo <fő[email protected]> 1471 To: Dinsdale 1472 Subject: Nudge nudge, wink, wink \u1F609 1473 Content-Type: text/plain; charset="utf-8" 1474 Content-Transfer-Encoding: 8bit 1475 MIME-Version: 1.0 1476 1477 oh là là, know what I mean, know what I mean? 1478 """) 1479 smtp = smtplib.SMTP( 1480 HOST, self.port, local_hostname='localhost', 1481 timeout=support.LOOPBACK_TIMEOUT) 1482 self.addCleanup(smtp.close) 1483 self.assertEqual(smtp.send_message(msg), {}) 1484 self.assertEqual(self.serv.last_mailfrom, 'fő[email protected]') 1485 self.assertEqual(self.serv.last_rcpttos, ['Dinsdale']) 1486 self.assertEqual(self.serv.last_message.decode(), expected) 1487 self.assertIn('BODY=8BITMIME', self.serv.last_mail_options) 1488 self.assertIn('SMTPUTF8', self.serv.last_mail_options) 1489 self.assertEqual(self.serv.last_rcpt_options, []) 1490 1491 1492EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='') 1493 1494class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel): 1495 def smtp_AUTH(self, arg): 1496 # RFC 4954's AUTH command allows for an optional initial-response. 1497 # Not all AUTH methods support this; some require a challenge. AUTH 1498 # PLAIN does those, so test that here. See issue #15014. 1499 args = arg.split() 1500 if args[0].lower() == 'plain': 1501 if len(args) == 2: 1502 # AUTH PLAIN <initial-response> with the response base 64 1503 # encoded. Hard code the expected response for the test. 
1504 if args[1] == EXPECTED_RESPONSE: 1505 self.push('235 Ok') 1506 return 1507 self.push('571 Bad authentication') 1508 1509class SimSMTPAUTHInitialResponseServer(SimSMTPServer): 1510 channel_class = SimSMTPAUTHInitialResponseChannel 1511 1512 1513class SMTPAUTHInitialResponseSimTests(unittest.TestCase): 1514 def setUp(self): 1515 self.thread_key = threading_helper.threading_setup() 1516 self.real_getfqdn = socket.getfqdn 1517 socket.getfqdn = mock_socket.getfqdn 1518 self.serv_evt = threading.Event() 1519 self.client_evt = threading.Event() 1520 # Pick a random unused port by passing 0 for the port number 1521 self.serv = SimSMTPAUTHInitialResponseServer( 1522 (HOST, 0), ('nowhere', -1), decode_data=True) 1523 # Keep a note of what port was assigned 1524 self.port = self.serv.socket.getsockname()[1] 1525 serv_args = (self.serv, self.serv_evt, self.client_evt) 1526 self.thread = threading.Thread(target=debugging_server, args=serv_args) 1527 self.thread.start() 1528 1529 # wait until server thread has assigned a port number 1530 self.serv_evt.wait() 1531 self.serv_evt.clear() 1532 1533 def tearDown(self): 1534 socket.getfqdn = self.real_getfqdn 1535 # indicate that the client is finished 1536 self.client_evt.set() 1537 # wait for the server thread to terminate 1538 self.serv_evt.wait() 1539 threading_helper.join_thread(self.thread) 1540 del self.thread 1541 self.doCleanups() 1542 threading_helper.threading_cleanup(*self.thread_key) 1543 1544 def testAUTH_PLAIN_initial_response_login(self): 1545 self.serv.add_feature('AUTH PLAIN') 1546 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1547 timeout=support.LOOPBACK_TIMEOUT) 1548 smtp.login('psu', 'doesnotexist') 1549 smtp.close() 1550 1551 def testAUTH_PLAIN_initial_response_auth(self): 1552 self.serv.add_feature('AUTH PLAIN') 1553 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', 1554 timeout=support.LOOPBACK_TIMEOUT) 1555 smtp.user = 'psu' 1556 smtp.password = 'doesnotexist' 1557 code, 
response = smtp.auth('plain', smtp.auth_plain) 1558 smtp.close() 1559 self.assertEqual(code, 235) 1560 1561 1562if __name__ == '__main__': 1563 unittest.main() 1564