1"""Test script for poplib module.""" 2 3# Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL 4# a real test suite 5 6import poplib 7import socket 8import os 9import errno 10import threading 11 12import unittest 13from unittest import TestCase, skipUnless 14from test import support as test_support 15from test.support import hashlib_helper 16from test.support import socket_helper 17from test.support import threading_helper 18from test.support import warnings_helper 19 20 21asynchat = warnings_helper.import_deprecated('asynchat') 22asyncore = warnings_helper.import_deprecated('asyncore') 23 24 25test_support.requires_working_socket(module=True) 26 27HOST = socket_helper.HOST 28PORT = 0 29 30SUPPORTS_SSL = False 31if hasattr(poplib, 'POP3_SSL'): 32 import ssl 33 34 SUPPORTS_SSL = True 35 CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem") 36 CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem") 37 38requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported') 39 40# the dummy data returned by server when LIST and RETR commands are issued 41LIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n' 42RETR_RESP = b"""From: [email protected]\ 43\r\nContent-Type: text/plain\r\n\ 44MIME-Version: 1.0\r\n\ 45Subject: Dummy\r\n\ 46\r\n\ 47line1\r\n\ 48line2\r\n\ 49line3\r\n\ 50.\r\n""" 51 52 53class DummyPOP3Handler(asynchat.async_chat): 54 55 CAPAS = {'UIDL': [], 'IMPLEMENTATION': ['python-testlib-pop-server']} 56 enable_UTF8 = False 57 58 def __init__(self, conn): 59 asynchat.async_chat.__init__(self, conn) 60 self.set_terminator(b"\r\n") 61 self.in_buffer = [] 62 self.push('+OK dummy pop3 server ready. <timestamp>') 63 self.tls_active = False 64 self.tls_starting = False 65 66 def collect_incoming_data(self, data): 67 self.in_buffer.append(data) 68 69 def found_terminator(self): 70 line = b''.join(self.in_buffer) 71 line = str(line, 'ISO-8859-1') 72 self.in_buffer = [] 73 cmd = line.split(' ')[0].lower() 74 space = line.find(' ') 75 if space != -1: 76 arg = line[space + 1:] 77 else: 78 arg = "" 79 if hasattr(self, 'cmd_' + cmd): 80 method = getattr(self, 'cmd_' + cmd) 81 method(arg) 82 else: 83 self.push('-ERR unrecognized POP3 command "%s".' %cmd) 84 85 def handle_error(self): 86 raise 87 88 def push(self, data): 89 asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n') 90 91 def cmd_echo(self, arg): 92 # sends back the received string (used by the test suite) 93 self.push(arg) 94 95 def cmd_user(self, arg): 96 if arg != "guido": 97 self.push("-ERR no such user") 98 self.push('+OK password required') 99 100 def cmd_pass(self, arg): 101 if arg != "python": 102 self.push("-ERR wrong password") 103 self.push('+OK 10 messages') 104 105 def cmd_stat(self, arg): 106 self.push('+OK 10 100') 107 108 def cmd_list(self, arg): 109 if arg: 110 self.push('+OK %s %s' % (arg, arg)) 111 else: 112 self.push('+OK') 113 asynchat.async_chat.push(self, LIST_RESP) 114 115 cmd_uidl = cmd_list 116 117 def cmd_retr(self, arg): 118 self.push('+OK %s bytes' %len(RETR_RESP)) 119 asynchat.async_chat.push(self, RETR_RESP) 120 121 cmd_top = cmd_retr 122 123 def cmd_dele(self, arg): 124 self.push('+OK message marked for deletion.') 125 126 def cmd_noop(self, arg): 127 self.push('+OK done nothing.') 128 129 def cmd_rpop(self, arg): 130 self.push('+OK done nothing.') 131 132 def cmd_apop(self, arg): 133 self.push('+OK done nothing.') 134 135 def cmd_quit(self, arg): 136 self.push('+OK closing.') 137 self.close_when_done() 138 139 def _get_capas(self): 140 _capas = dict(self.CAPAS) 141 if not self.tls_active and SUPPORTS_SSL: 142 _capas['STLS'] = [] 143 return _capas 144 145 def cmd_capa(self, arg): 146 self.push('+OK Capability list follows') 147 if self._get_capas(): 148 for cap, params in self._get_capas().items(): 149 _ln = [cap] 150 if params: 151 _ln.extend(params) 152 self.push(' '.join(_ln)) 153 self.push('.') 154 155 def cmd_utf8(self, arg): 156 self.push('+OK I know RFC6856' 157 if self.enable_UTF8 158 else '-ERR What is UTF8?!') 159 160 if SUPPORTS_SSL: 161 162 def cmd_stls(self, arg): 163 if self.tls_active is False: 164 self.push('+OK Begin TLS negotiation') 165 context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 166 context.load_cert_chain(CERTFILE) 167 tls_sock = context.wrap_socket(self.socket, 168 server_side=True, 169 do_handshake_on_connect=False, 170 suppress_ragged_eofs=False) 171 self.del_channel() 172 self.set_socket(tls_sock) 173 self.tls_active = True 174 self.tls_starting = True 175 self.in_buffer = [] 176 self._do_tls_handshake() 177 else: 178 self.push('-ERR Command not permitted when TLS active') 179 180 def _do_tls_handshake(self): 181 try: 182 self.socket.do_handshake() 183 except ssl.SSLError as err: 184 if err.args[0] in (ssl.SSL_ERROR_WANT_READ, 185 ssl.SSL_ERROR_WANT_WRITE): 186 return 187 elif err.args[0] == ssl.SSL_ERROR_EOF: 188 return self.handle_close() 189 # TODO: SSLError does not expose alert information 190 elif ("SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1] or 191 "SSLV3_ALERT_CERTIFICATE_UNKNOWN" in err.args[1]): 192 return self.handle_close() 193 raise 194 except OSError as err: 195 if err.args[0] == errno.ECONNABORTED: 196 return self.handle_close() 197 else: 198 self.tls_active = True 199 self.tls_starting = False 200 201 def handle_read(self): 202 if self.tls_starting: 203 self._do_tls_handshake() 204 else: 205 try: 206 asynchat.async_chat.handle_read(self) 207 except ssl.SSLEOFError: 208 self.handle_close() 209 210class DummyPOP3Server(asyncore.dispatcher, threading.Thread): 211 212 handler = DummyPOP3Handler 213 214 def __init__(self, address, af=socket.AF_INET): 215 threading.Thread.__init__(self) 216 asyncore.dispatcher.__init__(self) 217 self.daemon = True 218 self.create_socket(af, socket.SOCK_STREAM) 219 self.bind(address) 220 self.listen(5) 221 self.active = False 222 self.active_lock = threading.Lock() 223 self.host, self.port = self.socket.getsockname()[:2] 224 self.handler_instance = None 225 226 def start(self): 227 assert not self.active 228 self.__flag = threading.Event() 229 threading.Thread.start(self) 230 self.__flag.wait() 231 232 def run(self): 233 self.active = True 234 self.__flag.set() 235 try: 236 while self.active and asyncore.socket_map: 237 with self.active_lock: 238 asyncore.loop(timeout=0.1, count=1) 239 finally: 240 asyncore.close_all(ignore_all=True) 241 242 def stop(self): 243 assert self.active 244 self.active = False 245 self.join() 246 247 def handle_accepted(self, conn, addr): 248 self.handler_instance = self.handler(conn) 249 250 def handle_connect(self): 251 self.close() 252 handle_read = handle_connect 253 254 def writable(self): 255 return 0 256 257 def handle_error(self): 258 raise 259 260 261class TestPOP3Class(TestCase): 262 def assertOK(self, resp): 263 self.assertTrue(resp.startswith(b"+OK")) 264 265 def setUp(self): 266 self.server = DummyPOP3Server((HOST, PORT)) 267 self.server.start() 268 self.client = poplib.POP3(self.server.host, self.server.port, 269 timeout=test_support.LOOPBACK_TIMEOUT) 270 271 def tearDown(self): 272 self.client.close() 273 self.server.stop() 274 # Explicitly clear the attribute to prevent dangling thread 275 self.server = None 276 277 def test_getwelcome(self): 278 self.assertEqual(self.client.getwelcome(), 279 b'+OK dummy pop3 server ready. <timestamp>') 280 281 def test_exceptions(self): 282 self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err') 283 284 def test_user(self): 285 self.assertOK(self.client.user('guido')) 286 self.assertRaises(poplib.error_proto, self.client.user, 'invalid') 287 288 def test_pass_(self): 289 self.assertOK(self.client.pass_('python')) 290 self.assertRaises(poplib.error_proto, self.client.user, 'invalid') 291 292 def test_stat(self): 293 self.assertEqual(self.client.stat(), (10, 100)) 294 295 def test_list(self): 296 self.assertEqual(self.client.list()[1:], 297 ([b'1 1', b'2 2', b'3 3', b'4 4', b'5 5'], 298 25)) 299 self.assertTrue(self.client.list('1').endswith(b"OK 1 1")) 300 301 def test_retr(self): 302 expected = (b'+OK 116 bytes', 303 [b'From: [email protected]', b'Content-Type: text/plain', 304 b'MIME-Version: 1.0', b'Subject: Dummy', 305 b'', b'line1', b'line2', b'line3'], 306 113) 307 foo = self.client.retr('foo') 308 self.assertEqual(foo, expected) 309 310 def test_too_long_lines(self): 311 self.assertRaises(poplib.error_proto, self.client._shortcmd, 312 'echo +%s' % ((poplib._MAXLINE + 10) * 'a')) 313 314 def test_dele(self): 315 self.assertOK(self.client.dele('foo')) 316 317 def test_noop(self): 318 self.assertOK(self.client.noop()) 319 320 def test_rpop(self): 321 self.assertOK(self.client.rpop('foo')) 322 323 @hashlib_helper.requires_hashdigest('md5', openssl=True) 324 def test_apop_normal(self): 325 self.assertOK(self.client.apop('foo', 'dummypassword')) 326 327 @hashlib_helper.requires_hashdigest('md5', openssl=True) 328 def test_apop_REDOS(self): 329 # Replace welcome with very long evil welcome. 330 # NB The upper bound on welcome length is currently 2048. 331 # At this length, evil input makes each apop call take 332 # on the order of milliseconds instead of microseconds. 333 evil_welcome = b'+OK' + (b'<' * 1000000) 334 with test_support.swap_attr(self.client, 'welcome', evil_welcome): 335 # The evil welcome is invalid, so apop should throw. 336 self.assertRaises(poplib.error_proto, self.client.apop, 'a', 'kb') 337 338 def test_top(self): 339 expected = (b'+OK 116 bytes', 340 [b'From: [email protected]', b'Content-Type: text/plain', 341 b'MIME-Version: 1.0', b'Subject: Dummy', b'', 342 b'line1', b'line2', b'line3'], 343 113) 344 self.assertEqual(self.client.top(1, 1), expected) 345 346 def test_uidl(self): 347 self.client.uidl() 348 self.client.uidl('foo') 349 350 def test_utf8_raises_if_unsupported(self): 351 self.server.handler.enable_UTF8 = False 352 self.assertRaises(poplib.error_proto, self.client.utf8) 353 354 def test_utf8(self): 355 self.server.handler.enable_UTF8 = True 356 expected = b'+OK I know RFC6856' 357 result = self.client.utf8() 358 self.assertEqual(result, expected) 359 360 def test_capa(self): 361 capa = self.client.capa() 362 self.assertTrue('IMPLEMENTATION' in capa.keys()) 363 364 def test_quit(self): 365 resp = self.client.quit() 366 self.assertTrue(resp) 367 self.assertIsNone(self.client.sock) 368 self.assertIsNone(self.client.file) 369 370 @requires_ssl 371 def test_stls_capa(self): 372 capa = self.client.capa() 373 self.assertTrue('STLS' in capa.keys()) 374 375 @requires_ssl 376 def test_stls(self): 377 expected = b'+OK Begin TLS negotiation' 378 resp = self.client.stls() 379 self.assertEqual(resp, expected) 380 381 @requires_ssl 382 def test_stls_context(self): 383 expected = b'+OK Begin TLS negotiation' 384 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 385 ctx.load_verify_locations(CAFILE) 386 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) 387 self.assertEqual(ctx.check_hostname, True) 388 with self.assertRaises(ssl.CertificateError): 389 resp = self.client.stls(context=ctx) 390 self.client = poplib.POP3("localhost", self.server.port, 391 timeout=test_support.LOOPBACK_TIMEOUT) 392 resp = self.client.stls(context=ctx) 393 self.assertEqual(resp, expected) 394 395 396if SUPPORTS_SSL: 397 from test.test_ftplib import SSLConnection 398 399 class DummyPOP3_SSLHandler(SSLConnection, DummyPOP3Handler): 400 401 def __init__(self, conn): 402 asynchat.async_chat.__init__(self, conn) 403 self.secure_connection() 404 self.set_terminator(b"\r\n") 405 self.in_buffer = [] 406 self.push('+OK dummy pop3 server ready. <timestamp>') 407 self.tls_active = True 408 self.tls_starting = False 409 410 411@requires_ssl 412class TestPOP3_SSLClass(TestPOP3Class): 413 # repeat previous tests by using poplib.POP3_SSL 414 415 def setUp(self): 416 self.server = DummyPOP3Server((HOST, PORT)) 417 self.server.handler = DummyPOP3_SSLHandler 418 self.server.start() 419 self.client = poplib.POP3_SSL(self.server.host, self.server.port) 420 421 def test__all__(self): 422 self.assertIn('POP3_SSL', poplib.__all__) 423 424 def test_context(self): 425 ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 426 ctx.check_hostname = False 427 ctx.verify_mode = ssl.CERT_NONE 428 self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, 429 self.server.port, keyfile=CERTFILE, context=ctx) 430 self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, 431 self.server.port, certfile=CERTFILE, context=ctx) 432 self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, 433 self.server.port, keyfile=CERTFILE, 434 certfile=CERTFILE, context=ctx) 435 436 self.client.quit() 437 self.client = poplib.POP3_SSL(self.server.host, self.server.port, 438 context=ctx) 439 self.assertIsInstance(self.client.sock, ssl.SSLSocket) 440 self.assertIs(self.client.sock.context, ctx) 441 self.assertTrue(self.client.noop().startswith(b'+OK')) 442 443 def test_stls(self): 444 self.assertRaises(poplib.error_proto, self.client.stls) 445 446 test_stls_context = test_stls 447 448 def test_stls_capa(self): 449 capa = self.client.capa() 450 self.assertFalse('STLS' in capa.keys()) 451 452 453@requires_ssl 454class TestPOP3_TLSClass(TestPOP3Class): 455 # repeat previous tests by using poplib.POP3.stls() 456 457 def setUp(self): 458 self.server = DummyPOP3Server((HOST, PORT)) 459 self.server.start() 460 self.client = poplib.POP3(self.server.host, self.server.port, 461 timeout=test_support.LOOPBACK_TIMEOUT) 462 self.client.stls() 463 464 def tearDown(self): 465 if self.client.file is not None and self.client.sock is not None: 466 try: 467 self.client.quit() 468 except poplib.error_proto: 469 # happens in the test_too_long_lines case; the overlong 470 # response will be treated as response to QUIT and raise 471 # this exception 472 self.client.close() 473 self.server.stop() 474 # Explicitly clear the attribute to prevent dangling thread 475 self.server = None 476 477 def test_stls(self): 478 self.assertRaises(poplib.error_proto, self.client.stls) 479 480 test_stls_context = test_stls 481 482 def test_stls_capa(self): 483 capa = self.client.capa() 484 self.assertFalse(b'STLS' in capa.keys()) 485 486 487class TestTimeouts(TestCase): 488 489 def setUp(self): 490 self.evt = threading.Event() 491 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 492 self.sock.settimeout(60) # Safety net. Look issue 11812 493 self.port = socket_helper.bind_port(self.sock) 494 self.thread = threading.Thread(target=self.server, args=(self.evt, self.sock)) 495 self.thread.daemon = True 496 self.thread.start() 497 self.evt.wait() 498 499 def tearDown(self): 500 self.thread.join() 501 # Explicitly clear the attribute to prevent dangling thread 502 self.thread = None 503 504 def server(self, evt, serv): 505 serv.listen() 506 evt.set() 507 try: 508 conn, addr = serv.accept() 509 conn.send(b"+ Hola mundo\n") 510 conn.close() 511 except TimeoutError: 512 pass 513 finally: 514 serv.close() 515 516 def testTimeoutDefault(self): 517 self.assertIsNone(socket.getdefaulttimeout()) 518 socket.setdefaulttimeout(test_support.LOOPBACK_TIMEOUT) 519 try: 520 pop = poplib.POP3(HOST, self.port) 521 finally: 522 socket.setdefaulttimeout(None) 523 self.assertEqual(pop.sock.gettimeout(), test_support.LOOPBACK_TIMEOUT) 524 pop.close() 525 526 def testTimeoutNone(self): 527 self.assertIsNone(socket.getdefaulttimeout()) 528 socket.setdefaulttimeout(30) 529 try: 530 pop = poplib.POP3(HOST, self.port, timeout=None) 531 finally: 532 socket.setdefaulttimeout(None) 533 self.assertIsNone(pop.sock.gettimeout()) 534 pop.close() 535 536 def testTimeoutValue(self): 537 pop = poplib.POP3(HOST, self.port, timeout=test_support.LOOPBACK_TIMEOUT) 538 self.assertEqual(pop.sock.gettimeout(), test_support.LOOPBACK_TIMEOUT) 539 pop.close() 540 with self.assertRaises(ValueError): 541 poplib.POP3(HOST, self.port, timeout=0) 542 543 544def setUpModule(): 545 thread_info = threading_helper.threading_setup() 546 unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info) 547 548 549if __name__ == '__main__': 550 unittest.main() 551