"""Tests for the deprecated ``smtpd`` module.

Every test case swaps the ``socket`` module used by ``smtpd`` and
``asyncore`` for ``test.mock_socket`` in ``setUp`` and restores the real
module in ``tearDown``, so SMTP dialogs are driven by queueing bytes on the
mock socket and reading back the last reply (``channel.socket.last``).
"""

import unittest
import textwrap
from test import support, mock_socket
from test.support import socket_helper
from test.support import warnings_helper
import socket
import io


smtpd = warnings_helper.import_deprecated('smtpd')
asyncore = warnings_helper.import_deprecated('asyncore')

if not socket_helper.has_gethostname:
    raise unittest.SkipTest("test requires gethostname()")


class DummyServer(smtpd.SMTPServer):
    """SMTPServer that records delivered messages and returns canned replies.

    Each call to process_message() appends ``(peer, mailfrom, rcpttos, data)``
    to ``self.messages``.  ``self.return_status`` is the message body that
    triggers the custom '250 Okish' reply; it is str or bytes depending on
    whether the server decodes data.
    """

    def __init__(self, *args, **kwargs):
        smtpd.SMTPServer.__init__(self, *args, **kwargs)
        self.messages = []
        if self._decode_data:
            self.return_status = 'return status'
        else:
            self.return_status = b'return status'

    def process_message(self, peer, mailfrom, rcpttos, data, **kw):
        self.messages.append((peer, mailfrom, rcpttos, data))
        if data == self.return_status:
            return '250 Okish'
        if 'mail_options' in kw and 'SMTPUTF8' in kw['mail_options']:
            return '250 SMTPUTF8 message okish'
        # Falling through returns None, i.e. no custom status.


class DummyDispatcherBroken(Exception):
    """Raised by BrokenDummyServer.listen() to simulate a failing server."""


class BrokenDummyServer(DummyServer):
    """DummyServer whose listen() always raises DummyDispatcherBroken."""

    def listen(self, num):
        raise DummyDispatcherBroken()


class SMTPDServerTest(unittest.TestCase):
    """Tests of smtpd.SMTPServer itself (not a subclass)."""

    def setUp(self):
        smtpd.socket = asyncore.socket = mock_socket

    def test_process_message_unimplemented(self):
        server = smtpd.SMTPServer((socket_helper.HOST, 0), ('b', 0),
                                  decode_data=True)
        conn, addr = server.accept()
        channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)

        def write_line(line):
            channel.socket.queue_recv(line)
            channel.handle_read()

        write_line(b'HELO example')
        write_line(b'MAIL From:eggs@example')
        write_line(b'RCPT To:spam@example')
        write_line(b'DATA')
        self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n')

    def test_decode_data_and_enable_SMTPUTF8_raises(self):
        self.assertRaises(
            ValueError,
            smtpd.SMTPServer,
            (socket_helper.HOST, 0),
            ('b', 0),
            enable_SMTPUTF8=True,
            decode_data=True)

    def tearDown(self):
        asyncore.close_all()
        asyncore.socket = smtpd.socket = socket


class DebuggingServerTest(unittest.TestCase):
    """Tests of what smtpd.DebuggingServer prints to stdout for a message."""

    def setUp(self):
        smtpd.socket = asyncore.socket = mock_socket

    def send_data(self, channel, data, enable_SMTPUTF8=False):
        """Run a complete EHLO/MAIL/RCPT/DATA dialog delivering *data*.

        When *enable_SMTPUTF8* is true the MAIL command carries the
        ``BODY=8BITMIME SMTPUTF8`` parameters.
        """
        def write_line(line):
            channel.socket.queue_recv(line)
            channel.handle_read()

        write_line(b'EHLO example')
        if enable_SMTPUTF8:
            write_line(b'MAIL From:eggs@example BODY=8BITMIME SMTPUTF8')
        else:
            write_line(b'MAIL From:eggs@example')
        write_line(b'RCPT To:spam@example')
        write_line(b'DATA')
        write_line(data)
        write_line(b'.')

    def test_process_message_with_decode_data_true(self):
        server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0),
                                       decode_data=True)
        conn, addr = server.accept()
        channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
        with support.captured_stdout() as s:
            self.send_data(channel, b'From: test\n\nhello\n')
        stdout = s.getvalue()
        self.assertEqual(stdout, textwrap.dedent("""\
             ---------- MESSAGE FOLLOWS ----------
             From: test
             X-Peer: peer-address

             hello
             ------------ END MESSAGE ------------
             """))

    def test_process_message_with_decode_data_false(self):
        server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0))
        conn, addr = server.accept()
        channel = smtpd.SMTPChannel(server, conn, addr)
        with support.captured_stdout() as s:
            self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n')
        stdout = s.getvalue()
        self.assertEqual(stdout, textwrap.dedent("""\
             ---------- MESSAGE FOLLOWS ----------
             b'From: test'
             b'X-Peer: peer-address'
             b''
             b'h\\xc3\\xa9llo\\xff'
             ------------ END MESSAGE ------------
             """))

    def test_process_message_with_enable_SMTPUTF8_true(self):
        server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0),
                                       enable_SMTPUTF8=True)
        conn, addr = server.accept()
        channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
        with support.captured_stdout() as s:
            self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n')
        stdout = s.getvalue()
        self.assertEqual(stdout, textwrap.dedent("""\
             ---------- MESSAGE FOLLOWS ----------
             b'From: test'
             b'X-Peer: peer-address'
             b''
             b'h\\xc3\\xa9llo\\xff'
             ------------ END MESSAGE ------------
             """))

    def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self):
        server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0),
                                       enable_SMTPUTF8=True)
        conn, addr = server.accept()
        channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
        with support.captured_stdout() as s:
            self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n',
                           enable_SMTPUTF8=True)
        stdout = s.getvalue()
        self.assertEqual(stdout, textwrap.dedent("""\
             ---------- MESSAGE FOLLOWS ----------
             mail options: ['BODY=8BITMIME', 'SMTPUTF8']
             b'From: test'
             b'X-Peer: peer-address'
             b''
             b'h\\xc3\\xa9llo\\xff'
             ------------ END MESSAGE ------------
             """))

    def tearDown(self):
        asyncore.close_all()
        asyncore.socket = smtpd.socket = socket


class TestFamilyDetection(unittest.TestCase):
    """The server socket family must follow the local address given."""

    def setUp(self):
        smtpd.socket = asyncore.socket = mock_socket

    def tearDown(self):
        asyncore.close_all()
        asyncore.socket = smtpd.socket = socket

    @unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
    def test_socket_uses_IPv6(self):
        server = smtpd.SMTPServer((socket_helper.HOSTv6, 0),
                                  (socket_helper.HOSTv4, 0))
        self.assertEqual(server.socket.family, socket.AF_INET6)

    def test_socket_uses_IPv4(self):
        server = smtpd.SMTPServer((socket_helper.HOSTv4, 0),
                                  (socket_helper.HOSTv6, 0))
        self.assertEqual(server.socket.family, socket.AF_INET)


class TestRcptOptionParsing(unittest.TestCase):
    """Handling of parameters appended to the RCPT command."""

    error_response = (b'555 RCPT TO parameters not recognized or not '
                      b'implemented\r\n')

    def setUp(self):
        smtpd.socket = asyncore.socket = mock_socket
        self.old_debugstream = smtpd.DEBUGSTREAM
        self.debug = smtpd.DEBUGSTREAM = io.StringIO()

    def tearDown(self):
        asyncore.close_all()
        asyncore.socket = smtpd.socket = socket
        smtpd.DEBUGSTREAM = self.old_debugstream

    def write_line(self, channel, line):
        channel.socket.queue_recv(line)
        channel.handle_read()

    def test_params_rejected(self):
        server = DummyServer((socket_helper.HOST, 0), ('b', 0))
        conn, addr = server.accept()
        channel = smtpd.SMTPChannel(server, conn, addr)
        self.write_line(channel, b'EHLO example')
        self.write_line(channel, b'MAIL from: <foo@example.com> size=20')
        self.write_line(channel, b'RCPT to: <foo@example.com> foo=bar')
        self.assertEqual(channel.socket.last, self.error_response)

    def test_nothing_accepted(self):
        server = DummyServer((socket_helper.HOST, 0), ('b', 0))
        conn, addr = server.accept()
        channel = smtpd.SMTPChannel(server, conn, addr)
        self.write_line(channel, b'EHLO example')
        self.write_line(channel, b'MAIL from: <foo@example.com> size=20')
        self.write_line(channel, b'RCPT to: <foo@example.com>')
        self.assertEqual(channel.socket.last, b'250 OK\r\n')


class TestMailOptionParsing(unittest.TestCase):
    """Handling of parameters appended to the MAIL command."""

    error_response = (b'555 MAIL FROM parameters not recognized or not '
                      b'implemented\r\n')

    def setUp(self):
        smtpd.socket = asyncore.socket = mock_socket
        self.old_debugstream = smtpd.DEBUGSTREAM
        self.debug = smtpd.DEBUGSTREAM = io.StringIO()

    def tearDown(self):
        asyncore.close_all()
        asyncore.socket = smtpd.socket = socket
        smtpd.DEBUGSTREAM = self.old_debugstream

    def write_line(self, channel, line):
        channel.socket.queue_recv(line)
        channel.handle_read()

    def test_with_decode_data_true(self):
        server = DummyServer((socket_helper.HOST, 0), ('b', 0),
                             decode_data=True)
        conn, addr = server.accept()
        channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
        self.write_line(channel, b'EHLO example')
        for line in [
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8',
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME',
            b'MAIL from: <foo@example.com> size=20 BODY=UNKNOWN',
            b'MAIL from: <foo@example.com> size=20 body=8bitmime',
        ]:
            self.write_line(channel, line)
            self.assertEqual(channel.socket.last, self.error_response)
        self.write_line(channel, b'MAIL from: <foo@example.com> size=20')
        self.assertEqual(channel.socket.last, b'250 OK\r\n')

    def test_with_decode_data_false(self):
        server = DummyServer((socket_helper.HOST, 0), ('b', 0))
        conn, addr = server.accept()
        channel = smtpd.SMTPChannel(server, conn, addr)
        self.write_line(channel, b'EHLO example')
        for line in [
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8',
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME',
        ]:
            self.write_line(channel, line)
            self.assertEqual(channel.socket.last, self.error_response)
        self.write_line(
            channel,
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=UNKNOWN')
        self.assertEqual(
            channel.socket.last,
            b'501 Error: BODY can only be one of 7BIT, 8BITMIME\r\n')
        self.write_line(
            channel, b'MAIL from: <foo@example.com> size=20 body=8bitmime')
        self.assertEqual(channel.socket.last, b'250 OK\r\n')

    def test_with_enable_smtputf8_true(self):
        server = DummyServer((socket_helper.HOST, 0), ('b', 0),
                             enable_SMTPUTF8=True)
        conn, addr = server.accept()
        channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
        self.write_line(channel, b'EHLO example')
        self.write_line(
            channel,
            b'MAIL from: <foo@example.com> size=20 body=8bitmime smtputf8')
        self.assertEqual(channel.socket.last, b'250 OK\r\n')


class SMTPDChannelTest(unittest.TestCase):
    """Command-by-command tests of SMTPChannel created with decode_data=True.

    ``self.channel`` is attached to a DummyServer, so completed messages can
    be inspected through ``self.server.messages``.
    """

    def setUp(self):
        smtpd.socket = asyncore.socket = mock_socket
        self.old_debugstream = smtpd.DEBUGSTREAM
        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
        self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
                                  decode_data=True)
        conn, addr = self.server.accept()
        self.channel = smtpd.SMTPChannel(self.server, conn, addr,
                                         decode_data=True)

    def tearDown(self):
        asyncore.close_all()
        asyncore.socket = smtpd.socket = socket
        smtpd.DEBUGSTREAM = self.old_debugstream

    def write_line(self, line):
        self.channel.socket.queue_recv(line)
        self.channel.handle_read()

    def test_broken_connect(self):
        self.assertRaises(
            DummyDispatcherBroken, BrokenDummyServer,
            (socket_helper.HOST, 0), ('b', 0), decode_data=True)

    def test_decode_data_and_enable_SMTPUTF8_raises(self):
        self.assertRaises(
            ValueError, smtpd.SMTPChannel,
            self.server, self.channel.conn, self.channel.addr,
            enable_SMTPUTF8=True, decode_data=True)

    def test_server_accept(self):
        self.server.handle_accept()

    def test_missing_data(self):
        self.write_line(b'')
        self.assertEqual(self.channel.socket.last,
                         b'500 Error: bad syntax\r\n')

    def test_EHLO(self):
        self.write_line(b'EHLO example')
        self.assertEqual(self.channel.socket.last, b'250 HELP\r\n')

    def test_EHLO_bad_syntax(self):
        self.write_line(b'EHLO')
        self.assertEqual(self.channel.socket.last,
                         b'501 Syntax: EHLO hostname\r\n')

    def test_EHLO_duplicate(self):
        self.write_line(b'EHLO example')
        self.write_line(b'EHLO example')
        self.assertEqual(self.channel.socket.last,
                         b'503 Duplicate HELO/EHLO\r\n')

    def test_EHLO_HELO_duplicate(self):
        self.write_line(b'EHLO example')
        self.write_line(b'HELO example')
        self.assertEqual(self.channel.socket.last,
                         b'503 Duplicate HELO/EHLO\r\n')

    def test_HELO(self):
        name = smtpd.socket.getfqdn()
        self.write_line(b'HELO example')
        self.assertEqual(self.channel.socket.last,
                         '250 {}\r\n'.format(name).encode('ascii'))

    def test_HELO_EHLO_duplicate(self):
        self.write_line(b'HELO example')
        self.write_line(b'EHLO example')
        self.assertEqual(self.channel.socket.last,
                         b'503 Duplicate HELO/EHLO\r\n')

    def test_HELP(self):
        self.write_line(b'HELP')
        self.assertEqual(self.channel.socket.last,
                         b'250 Supported commands: EHLO HELO MAIL RCPT '
                         b'DATA RSET NOOP QUIT VRFY\r\n')

    def test_HELP_command(self):
        self.write_line(b'HELP MAIL')
        self.assertEqual(self.channel.socket.last,
                         b'250 Syntax: MAIL FROM: <address>\r\n')

    def test_HELP_command_unknown(self):
        self.write_line(b'HELP SPAM')
        self.assertEqual(self.channel.socket.last,
                         b'501 Supported commands: EHLO HELO MAIL RCPT '
                         b'DATA RSET NOOP QUIT VRFY\r\n')

    def test_HELO_bad_syntax(self):
        self.write_line(b'HELO')
        self.assertEqual(self.channel.socket.last,
                         b'501 Syntax: HELO hostname\r\n')

    def test_HELO_duplicate(self):
        self.write_line(b'HELO example')
        self.write_line(b'HELO example')
        self.assertEqual(self.channel.socket.last,
                         b'503 Duplicate HELO/EHLO\r\n')

    def test_HELO_parameter_rejected_when_extensions_not_enabled(self):
        # Set the flag on the channel under test; assigning it to the
        # TestCase instance would have no effect on the dialog.
        self.channel.extended_smtp = False
        self.write_line(b'HELO example')
        self.write_line(b'MAIL from:<foo@example.com> SIZE=1234')
        self.assertEqual(self.channel.socket.last,
                         b'501 Syntax: MAIL FROM: <address>\r\n')

    def test_MAIL_allows_space_after_colon(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL from: <foo@example.com>')
        self.assertEqual(self.channel.socket.last,
                         b'250 OK\r\n')

    def test_extended_MAIL_allows_space_after_colon(self):
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL from: <foo@example.com> size=20')
        self.assertEqual(self.channel.socket.last,
                         b'250 OK\r\n')

    def test_NOOP(self):
        self.write_line(b'NOOP')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

    def test_HELO_NOOP(self):
        self.write_line(b'HELO example')
        self.write_line(b'NOOP')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

    def test_NOOP_bad_syntax(self):
        self.write_line(b'NOOP hi')
        self.assertEqual(self.channel.socket.last,
                         b'501 Syntax: NOOP\r\n')

    def test_QUIT(self):
        self.write_line(b'QUIT')
        self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')

    def test_HELO_QUIT(self):
        self.write_line(b'HELO example')
        self.write_line(b'QUIT')
        self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')

    def test_QUIT_arg_ignored(self):
        self.write_line(b'QUIT bye bye')
        self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')

    def test_bad_state(self):
        self.channel.smtp_state = 'BAD STATE'
        self.write_line(b'HELO example')
        self.assertEqual(self.channel.socket.last,
                         b'451 Internal confusion\r\n')

    def test_command_too_long(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL from: ' +
                        b'a' * self.channel.command_size_limit +
                        b'@example')
        self.assertEqual(self.channel.socket.last,
                         b'500 Error: line too long\r\n')

    def test_MAIL_command_limit_extended_with_SIZE(self):
        self.write_line(b'EHLO example')
        fill_len = (self.channel.command_size_limit -
                    len('MAIL from:<@example>'))
        self.write_line(b'MAIL from:<' +
                        b'a' * fill_len +
                        b'@example> SIZE=1234')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

        self.write_line(b'MAIL from:<' +
                        b'a' * (fill_len + 26) +
                        b'@example> SIZE=1234')
        self.assertEqual(self.channel.socket.last,
                         b'500 Error: line too long\r\n')

    def test_MAIL_command_rejects_SMTPUTF8_by_default(self):
        self.write_line(b'EHLO example')
        self.write_line(
            b'MAIL from: <naive@example.com> BODY=8BITMIME SMTPUTF8')
        # Any 5xx reply counts as a rejection.
        self.assertEqual(self.channel.socket.last[0:1], b'5')

    def test_data_longer_than_default_data_size_limit(self):
        # Hack the default so we don't have to generate so much data.
        self.channel.data_size_limit = 1048
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From:eggs@example')
        self.write_line(b'RCPT To:spam@example')
        self.write_line(b'DATA')
        self.write_line(b'A' * self.channel.data_size_limit +
                        b'A\r\n.')
        self.assertEqual(self.channel.socket.last,
                         b'552 Error: Too much mail data\r\n')

    def test_MAIL_size_parameter(self):
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL FROM:<eggs@example> SIZE=512')
        self.assertEqual(self.channel.socket.last,
                         b'250 OK\r\n')

    def test_MAIL_invalid_size_parameter(self):
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL FROM:<eggs@example> SIZE=invalid')
        self.assertEqual(
            self.channel.socket.last,
            b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n')

    def test_MAIL_RCPT_unknown_parameters(self):
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL FROM:<eggs@example> ham=green')
        self.assertEqual(
            self.channel.socket.last,
            b'555 MAIL FROM parameters not recognized or not implemented\r\n')

        self.write_line(b'MAIL FROM:<eggs@example>')
        self.write_line(b'RCPT TO:<eggs@example> ham=green')
        self.assertEqual(
            self.channel.socket.last,
            b'555 RCPT TO parameters not recognized or not implemented\r\n')

    def test_MAIL_size_parameter_larger_than_default_data_size_limit(self):
        self.channel.data_size_limit = 1048
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL FROM:<eggs@example> SIZE=2096')
        self.assertEqual(
            self.channel.socket.last,
            b'552 Error: message size exceeds fixed maximum message size\r\n')

    def test_need_MAIL(self):
        self.write_line(b'HELO example')
        self.write_line(b'RCPT to:spam@example')
        self.assertEqual(self.channel.socket.last,
                         b'503 Error: need MAIL command\r\n')

    def test_MAIL_syntax_HELO(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL from eggs@example')
        self.assertEqual(self.channel.socket.last,
                         b'501 Syntax: MAIL FROM: <address>\r\n')

    def test_MAIL_syntax_EHLO(self):
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL from eggs@example')
        self.assertEqual(
            self.channel.socket.last,
            b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n')

    def test_MAIL_missing_address(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL from:')
        self.assertEqual(self.channel.socket.last,
                         b'501 Syntax: MAIL FROM: <address>\r\n')

    def test_MAIL_chevrons(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL from:<eggs@example>')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

    def test_MAIL_empty_chevrons(self):
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL from:<>')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

    def test_MAIL_quoted_localpart(self):
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL from: <"Fred Blogs"@example.com>')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
        self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')

    def test_MAIL_quoted_localpart_no_angles(self):
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL from: "Fred Blogs"@example.com')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
        self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')

    def test_MAIL_quoted_localpart_with_size(self):
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL from: <"Fred Blogs"@example.com> SIZE=1000')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
        self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')

    def test_MAIL_quoted_localpart_with_size_no_angles(self):
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL from: "Fred Blogs"@example.com SIZE=1000')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
        self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')

    def test_nested_MAIL(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL from:eggs@example')
        self.write_line(b'MAIL from:spam@example')
        self.assertEqual(self.channel.socket.last,
                         b'503 Error: nested MAIL command\r\n')

    def test_VRFY(self):
        self.write_line(b'VRFY eggs@example')
        self.assertEqual(
            self.channel.socket.last,
            b'252 Cannot VRFY user, but will accept message and attempt '
            b'delivery\r\n')

    def test_VRFY_syntax(self):
        self.write_line(b'VRFY')
        self.assertEqual(self.channel.socket.last,
                         b'501 Syntax: VRFY <address>\r\n')

    def test_EXPN_not_implemented(self):
        self.write_line(b'EXPN')
        self.assertEqual(self.channel.socket.last,
                         b'502 EXPN not implemented\r\n')

    def test_no_HELO_MAIL(self):
        self.write_line(b'MAIL from:<foo@example.com>')
        self.assertEqual(self.channel.socket.last,
                         b'503 Error: send HELO first\r\n')

    def test_need_RCPT(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From:eggs@example')
        self.write_line(b'DATA')
        self.assertEqual(self.channel.socket.last,
                         b'503 Error: need RCPT command\r\n')

    def test_RCPT_syntax_HELO(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From: eggs@example')
        self.write_line(b'RCPT to eggs@example')
        self.assertEqual(self.channel.socket.last,
                         b'501 Syntax: RCPT TO: <address>\r\n')

    def test_RCPT_syntax_EHLO(self):
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL From: eggs@example')
        self.write_line(b'RCPT to eggs@example')
        self.assertEqual(
            self.channel.socket.last,
            b'501 Syntax: RCPT TO: <address> [SP <mail-parameters>]\r\n')

    def test_RCPT_lowercase_to_OK(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From: eggs@example')
        self.write_line(b'RCPT to: <eggs@example>')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

    def test_no_HELO_RCPT(self):
        self.write_line(b'RCPT to eggs@example')
        self.assertEqual(self.channel.socket.last,
                         b'503 Error: send HELO first\r\n')

    def test_data_dialog(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From:eggs@example')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
        self.write_line(b'RCPT To:spam@example')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

        self.write_line(b'DATA')
        self.assertEqual(self.channel.socket.last,
                         b'354 End data with <CR><LF>.<CR><LF>\r\n')
        self.write_line(b'data\r\nmore\r\n.')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
        self.assertEqual(self.server.messages,
                         [(('peer-address', 'peer-port'),
                           'eggs@example',
                           ['spam@example'],
                           'data\nmore')])

    def test_DATA_syntax(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From:eggs@example')
        self.write_line(b'RCPT To:spam@example')
        self.write_line(b'DATA spam')
        self.assertEqual(self.channel.socket.last, b'501 Syntax: DATA\r\n')

    def test_no_HELO_DATA(self):
        self.write_line(b'DATA spam')
        self.assertEqual(self.channel.socket.last,
                         b'503 Error: send HELO first\r\n')

    def test_data_transparency_section_4_5_2(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From:eggs@example')
        self.write_line(b'RCPT To:spam@example')
        self.write_line(b'DATA')
        # A doubled leading dot must come back as a single dot.
        self.write_line(b'..\r\n.\r\n')
        self.assertEqual(self.channel.received_data, '.')

    def test_multiple_RCPT(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From:eggs@example')
        self.write_line(b'RCPT To:spam@example')
        self.write_line(b'RCPT To:ham@example')
        self.write_line(b'DATA')
        self.write_line(b'data\r\n.')
        self.assertEqual(self.server.messages,
                         [(('peer-address', 'peer-port'),
                           'eggs@example',
                           ['spam@example', 'ham@example'],
                           'data')])

    def test_manual_status(self):
        # checks that the Channel is able to return a custom status message
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From:eggs@example')
        self.write_line(b'RCPT To:spam@example')
        self.write_line(b'DATA')
        self.write_line(b'return status\r\n.')
        self.assertEqual(self.channel.socket.last, b'250 Okish\r\n')

    def test_RSET(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From:eggs@example')
        self.write_line(b'RCPT To:spam@example')
        self.write_line(b'RSET')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
        self.write_line(b'MAIL From:foo@example')
        self.write_line(b'RCPT To:eggs@example')
        self.write_line(b'DATA')
        self.write_line(b'data\r\n.')
        self.assertEqual(self.server.messages,
                         [(('peer-address', 'peer-port'),
                           'foo@example',
                           ['eggs@example'],
                           'data')])

    def test_HELO_RSET(self):
        self.write_line(b'HELO example')
        self.write_line(b'RSET')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

    def test_RSET_syntax(self):
        self.write_line(b'RSET hi')
        self.assertEqual(self.channel.socket.last, b'501 Syntax: RSET\r\n')

    def test_unknown_command(self):
        self.write_line(b'UNKNOWN_CMD')
        self.assertEqual(self.channel.socket.last,
                         b'500 Error: command "UNKNOWN_CMD" not '
                         b'recognized\r\n')

    def test_attribute_deprecations(self):
        # Reading and then writing each of these name-mangled attributes
        # must emit a DeprecationWarning.  Order matches the original
        # hand-unrolled sequence: get, then set, per attribute.
        deprecated = ('server', 'line', 'state', 'greeting', 'mailfrom',
                      'rcpttos', 'data', 'fqdn', 'peer', 'conn', 'addr')
        for suffix in deprecated:
            attr = '_SMTPChannel__' + suffix
            with self.subTest(attr=attr):
                with warnings_helper.check_warnings(('', DeprecationWarning)):
                    getattr(self.channel, attr)
                with warnings_helper.check_warnings(('', DeprecationWarning)):
                    setattr(self.channel, attr, 'spam')


@unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
class SMTPDChannelIPv6Test(SMTPDChannelTest):
    """Re-run every SMTPDChannelTest test with the server on HOSTv6."""

    def setUp(self):
        smtpd.socket = asyncore.socket = mock_socket
        self.old_debugstream = smtpd.DEBUGSTREAM
        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
        self.server = DummyServer((socket_helper.HOSTv6, 0), ('b', 0),
                                  decode_data=True)
        conn, addr = self.server.accept()
        self.channel = smtpd.SMTPChannel(self.server, conn, addr,
                                         decode_data=True)


class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase):
    """DATA dialogs on a channel created with a 32-byte data size limit."""

    def setUp(self):
        smtpd.socket = asyncore.socket = mock_socket
        self.old_debugstream = smtpd.DEBUGSTREAM
        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
        self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
                                  decode_data=True)
        conn, addr = self.server.accept()
        # Set DATA size limit to 32 bytes for easy testing
        self.channel = smtpd.SMTPChannel(self.server, conn, addr, 32,
                                         decode_data=True)

    def tearDown(self):
        asyncore.close_all()
        asyncore.socket = smtpd.socket = socket
        smtpd.DEBUGSTREAM = self.old_debugstream

    def write_line(self, line):
        self.channel.socket.queue_recv(line)
        self.channel.handle_read()

    def test_data_limit_dialog(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From:eggs@example')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
        self.write_line(b'RCPT To:spam@example')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

        self.write_line(b'DATA')
        self.assertEqual(self.channel.socket.last,
                         b'354 End data with <CR><LF>.<CR><LF>\r\n')
        self.write_line(b'data\r\nmore\r\n.')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
        self.assertEqual(self.server.messages,
                         [(('peer-address', 'peer-port'),
                           'eggs@example',
                           ['spam@example'],
                           'data\nmore')])

    def test_data_limit_dialog_too_much_data(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From:eggs@example')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
        self.write_line(b'RCPT To:spam@example')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

        self.write_line(b'DATA')
        self.assertEqual(self.channel.socket.last,
                         b'354 End data with <CR><LF>.<CR><LF>\r\n')
        self.write_line(b'This message is longer than 32 bytes\r\n.')
        self.assertEqual(self.channel.socket.last,
                         b'552 Error: Too much mail data\r\n')


class SMTPDChannelWithDecodeDataFalse(unittest.TestCase):
    """With default arguments, received_data is kept as bytes."""

    def setUp(self):
        smtpd.socket = asyncore.socket = mock_socket
        self.old_debugstream = smtpd.DEBUGSTREAM
        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
        self.server = DummyServer((socket_helper.HOST, 0), ('b', 0))
        conn, addr = self.server.accept()
        self.channel = smtpd.SMTPChannel(self.server, conn, addr)

    def tearDown(self):
        asyncore.close_all()
        asyncore.socket = smtpd.socket = socket
        smtpd.DEBUGSTREAM = self.old_debugstream

    def write_line(self, line):
        self.channel.socket.queue_recv(line)
        self.channel.handle_read()

    def test_ascii_data(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From:eggs@example')
        self.write_line(b'RCPT To:spam@example')
        self.write_line(b'DATA')
        self.write_line(b'plain ascii text')
        self.write_line(b'.')
        self.assertEqual(self.channel.received_data, b'plain ascii text')

    def test_utf8_data(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From:eggs@example')
        self.write_line(b'RCPT To:spam@example')
        self.write_line(b'DATA')
        self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
        self.write_line(b'and some plain ascii')
        self.write_line(b'.')
        self.assertEqual(
            self.channel.received_data,
            b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87\n'
            b'and some plain ascii')


class SMTPDChannelWithDecodeDataTrue(unittest.TestCase):
    """With decode_data=True, received_data is decoded to str."""

    def setUp(self):
        smtpd.socket = asyncore.socket = mock_socket
        self.old_debugstream = smtpd.DEBUGSTREAM
        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
        self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
                                  decode_data=True)
        conn, addr = self.server.accept()
        # Set decode_data to True
        self.channel = smtpd.SMTPChannel(self.server, conn, addr,
                                         decode_data=True)

    def tearDown(self):
        asyncore.close_all()
        asyncore.socket = smtpd.socket = socket
        smtpd.DEBUGSTREAM = self.old_debugstream

    def write_line(self, line):
        self.channel.socket.queue_recv(line)
        self.channel.handle_read()

    def test_ascii_data(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From:eggs@example')
        self.write_line(b'RCPT To:spam@example')
        self.write_line(b'DATA')
        self.write_line(b'plain ascii text')
        self.write_line(b'.')
        self.assertEqual(self.channel.received_data, 'plain ascii text')

    def test_utf8_data(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From:eggs@example')
        self.write_line(b'RCPT To:spam@example')
        self.write_line(b'DATA')
        self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
        self.write_line(b'and some plain ascii')
        self.write_line(b'.')
        self.assertEqual(
            self.channel.received_data,
            'utf8 enriched text: żźć\nand some plain ascii')


class SMTPDChannelTestWithEnableSMTPUTF8True(unittest.TestCase):
    """Dialogs on a server and channel created with enable_SMTPUTF8=True."""

    def setUp(self):
        smtpd.socket = asyncore.socket = mock_socket
        self.old_debugstream = smtpd.DEBUGSTREAM
        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
        self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
                                  enable_SMTPUTF8=True)
        conn, addr = self.server.accept()
        self.channel = smtpd.SMTPChannel(self.server, conn, addr,
                                         enable_SMTPUTF8=True)

    def tearDown(self):
        asyncore.close_all()
        asyncore.socket = smtpd.socket = socket
        smtpd.DEBUGSTREAM = self.old_debugstream

    def write_line(self, line):
        self.channel.socket.queue_recv(line)
        self.channel.handle_read()

    def test_MAIL_command_accepts_SMTPUTF8_when_announced(self):
        self.write_line(b'EHLO example')
        self.write_line(
            'MAIL from: <naïve@example.com> BODY=8BITMIME SMTPUTF8'.encode(
                'utf-8')
        )
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

    def test_process_smtputf8_message(self):
        self.write_line(b'EHLO example')
        for mail_parameters in [b'', b'BODY=8BITMIME SMTPUTF8']:
            self.write_line(b'MAIL from: <a@example> ' + mail_parameters)
            self.assertEqual(self.channel.socket.last[0:3], b'250')
            self.write_line(b'rcpt to:<b@example.com>')
            self.assertEqual(self.channel.socket.last[0:3], b'250')
            self.write_line(b'data')
            self.assertEqual(self.channel.socket.last[0:3], b'354')
            self.write_line(b'c\r\n.')
            if mail_parameters == b'':
                self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
            else:
                # DummyServer returns this status when SMTPUTF8 is among
                # the mail options it receives.
                self.assertEqual(self.channel.socket.last,
                                 b'250 SMTPUTF8 message okish\r\n')

    def test_utf8_data(self):
        self.write_line(b'EHLO example')
        self.write_line(
            'MAIL From: naïve@examplé BODY=8BITMIME SMTPUTF8'.encode('utf-8'))
        self.assertEqual(self.channel.socket.last[0:3], b'250')
        self.write_line('RCPT To:späm@examplé'.encode('utf-8'))
        self.assertEqual(self.channel.socket.last[0:3], b'250')
        self.write_line(b'DATA')
        self.assertEqual(self.channel.socket.last[0:3], b'354')
        self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
        self.write_line(b'.')
        self.assertEqual(
            self.channel.received_data,
            b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')

    def test_MAIL_command_limit_extended_with_SIZE_and_SMTPUTF8(self):
        self.write_line(b'ehlo example')
        fill_len = (512 + 26 + 10) - len('mail from:<@example>')
        self.write_line(b'MAIL from:<' +
                        b'a' * (fill_len + 1) +
                        b'@example>')
        self.assertEqual(self.channel.socket.last,
                         b'500 Error: line too long\r\n')
        self.write_line(b'MAIL from:<' +
                        b'a' * fill_len +
                        b'@example>')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

    def test_multiple_emails_with_extended_command_length(self):
        self.write_line(b'ehlo example')
        fill_len = (512 + 26 + 10) - len('mail from:<@example>')
        for char in [b'a', b'b', b'c']:
            # One byte over the limit is rejected, exactly at it is accepted.
            self.write_line(b'MAIL from:<' + char * fill_len + b'a@example>')
            self.assertEqual(self.channel.socket.last[0:3], b'500')
            self.write_line(b'MAIL from:<' + char * fill_len + b'@example>')
            self.assertEqual(self.channel.socket.last[0:3], b'250')
            self.write_line(b'rcpt to:<hans@example.com>')
            self.assertEqual(self.channel.socket.last[0:3], b'250')
            self.write_line(b'data')
            self.assertEqual(self.channel.socket.last[0:3], b'354')
            self.write_line(b'test\r\n.')
            self.assertEqual(self.channel.socket.last[0:3], b'250')


class MiscTestCase(unittest.TestCase):
    """Checks that smtpd.__all__ matches the module's public names."""

    def test__all__(self):
        not_exported = {
            "program", "Devnull", "DEBUGSTREAM", "NEWLINE", "COMMASPACE",
            "DATA_SIZE_DEFAULT", "usage", "Options", "parseargs",
        }
        support.check__all__(self, smtpd, not_exported=not_exported)


if __name__ == "__main__":
    unittest.main()