1from test import support 2from test.support import socket_helper 3 4from contextlib import contextmanager 5import imaplib 6import os.path 7import socketserver 8import time 9import calendar 10import threading 11import socket 12 13from test.support import (verbose, 14 run_with_tz, run_with_locale, cpython_only, 15 requires_working_socket) 16from test.support import hashlib_helper 17from test.support import threading_helper 18from test.support import warnings_helper 19import unittest 20from unittest import mock 21from datetime import datetime, timezone, timedelta 22try: 23 import ssl 24except ImportError: 25 ssl = None 26 27support.requires_working_socket(module=True) 28 29CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem") 30CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem") 31 32 33class TestImaplib(unittest.TestCase): 34 35 def test_Internaldate2tuple(self): 36 t0 = calendar.timegm((2000, 1, 1, 0, 0, 0, -1, -1, -1)) 37 tt = imaplib.Internaldate2tuple( 38 b'25 (INTERNALDATE "01-Jan-2000 00:00:00 +0000")') 39 self.assertEqual(time.mktime(tt), t0) 40 tt = imaplib.Internaldate2tuple( 41 b'25 (INTERNALDATE "01-Jan-2000 11:30:00 +1130")') 42 self.assertEqual(time.mktime(tt), t0) 43 tt = imaplib.Internaldate2tuple( 44 b'25 (INTERNALDATE "31-Dec-1999 12:30:00 -1130")') 45 self.assertEqual(time.mktime(tt), t0) 46 47 @run_with_tz('MST+07MDT,M4.1.0,M10.5.0') 48 def test_Internaldate2tuple_issue10941(self): 49 self.assertNotEqual(imaplib.Internaldate2tuple( 50 b'25 (INTERNALDATE "02-Apr-2000 02:30:00 +0000")'), 51 imaplib.Internaldate2tuple( 52 b'25 (INTERNALDATE "02-Apr-2000 03:30:00 +0000")')) 53 54 def timevalues(self): 55 return [2000000000, 2000000000.0, time.localtime(2000000000), 56 (2033, 5, 18, 5, 33, 20, -1, -1, -1), 57 (2033, 5, 18, 5, 33, 20, -1, -1, 1), 58 datetime.fromtimestamp(2000000000, 59 timezone(timedelta(0, 2 * 60 * 60))), 60 '"18-May-2033 05:33:20 +0200"'] 61 62 @run_with_locale('LC_ALL', 'de_DE', 'fr_FR') 63 # DST rules included to work around quirk where the Gnu C library may not 64 # otherwise restore the previous time zone 65 @run_with_tz('STD-1DST,M3.2.0,M11.1.0') 66 def test_Time2Internaldate(self): 67 expected = '"18-May-2033 05:33:20 +0200"' 68 69 for t in self.timevalues(): 70 internal = imaplib.Time2Internaldate(t) 71 self.assertEqual(internal, expected) 72 73 def test_that_Time2Internaldate_returns_a_result(self): 74 # Without tzset, we can check only that it successfully 75 # produces a result, not the correctness of the result itself, 76 # since the result depends on the timezone the machine is in. 77 for t in self.timevalues(): 78 imaplib.Time2Internaldate(t) 79 80 def test_imap4_host_default_value(self): 81 # Check whether the IMAP4_PORT is truly unavailable. 82 with socket.socket() as s: 83 try: 84 s.connect(('', imaplib.IMAP4_PORT)) 85 self.skipTest( 86 "Cannot run the test with local IMAP server running.") 87 except socket.error: 88 pass 89 90 # This is the exception that should be raised. 91 expected_errnos = socket_helper.get_socket_conn_refused_errs() 92 with self.assertRaises(OSError) as cm: 93 imaplib.IMAP4() 94 self.assertIn(cm.exception.errno, expected_errnos) 95 96 97if ssl: 98 class SecureTCPServer(socketserver.TCPServer): 99 100 def get_request(self): 101 newsocket, fromaddr = self.socket.accept() 102 context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 103 context.load_cert_chain(CERTFILE) 104 connstream = context.wrap_socket(newsocket, server_side=True) 105 return connstream, fromaddr 106 107 IMAP4_SSL = imaplib.IMAP4_SSL 108 109else: 110 111 class SecureTCPServer: 112 pass 113 114 IMAP4_SSL = None 115 116 117class SimpleIMAPHandler(socketserver.StreamRequestHandler): 118 timeout = support.LOOPBACK_TIMEOUT 119 continuation = None 120 capabilities = '' 121 122 def setup(self): 123 super().setup() 124 self.server.is_selected = False 125 self.server.logged = None 126 127 def _send(self, message): 128 if verbose: 129 print("SENT: %r" % message.strip()) 130 self.wfile.write(message) 131 132 def _send_line(self, message): 133 self._send(message + b'\r\n') 134 135 def _send_textline(self, message): 136 self._send_line(message.encode('ASCII')) 137 138 def _send_tagged(self, tag, code, message): 139 self._send_textline(' '.join((tag, code, message))) 140 141 def handle(self): 142 # Send a welcome message. 143 self._send_textline('* OK IMAP4rev1') 144 while 1: 145 # Gather up input until we receive a line terminator or we timeout. 146 # Accumulate read(1) because it's simpler to handle the differences 147 # between naked sockets and SSL sockets. 148 line = b'' 149 while 1: 150 try: 151 part = self.rfile.read(1) 152 if part == b'': 153 # Naked sockets return empty strings.. 154 return 155 line += part 156 except OSError: 157 # ..but SSLSockets raise exceptions. 158 return 159 if line.endswith(b'\r\n'): 160 break 161 162 if verbose: 163 print('GOT: %r' % line.strip()) 164 if self.continuation: 165 try: 166 self.continuation.send(line) 167 except StopIteration: 168 self.continuation = None 169 continue 170 splitline = line.decode('ASCII').split() 171 tag = splitline[0] 172 cmd = splitline[1] 173 args = splitline[2:] 174 175 if hasattr(self, 'cmd_' + cmd): 176 continuation = getattr(self, 'cmd_' + cmd)(tag, args) 177 if continuation: 178 self.continuation = continuation 179 next(continuation) 180 else: 181 self._send_tagged(tag, 'BAD', cmd + ' unknown') 182 183 def cmd_CAPABILITY(self, tag, args): 184 caps = ('IMAP4rev1 ' + self.capabilities 185 if self.capabilities 186 else 'IMAP4rev1') 187 self._send_textline('* CAPABILITY ' + caps) 188 self._send_tagged(tag, 'OK', 'CAPABILITY completed') 189 190 def cmd_LOGOUT(self, tag, args): 191 self.server.logged = None 192 self._send_textline('* BYE IMAP4ref1 Server logging out') 193 self._send_tagged(tag, 'OK', 'LOGOUT completed') 194 195 def cmd_LOGIN(self, tag, args): 196 self.server.logged = args[0] 197 self._send_tagged(tag, 'OK', 'LOGIN completed') 198 199 def cmd_SELECT(self, tag, args): 200 self.server.is_selected = True 201 self._send_line(b'* 2 EXISTS') 202 self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.') 203 204 def cmd_UNSELECT(self, tag, args): 205 if self.server.is_selected: 206 self.server.is_selected = False 207 self._send_tagged(tag, 'OK', 'Returned to authenticated state. (Success)') 208 else: 209 self._send_tagged(tag, 'BAD', 'No mailbox selected') 210 211 212class NewIMAPTestsMixin(): 213 client = None 214 215 def _setup(self, imap_handler, connect=True): 216 """ 217 Sets up imap_handler for tests. imap_handler should inherit from either: 218 - SimpleIMAPHandler - for testing IMAP commands, 219 - socketserver.StreamRequestHandler - if raw access to stream is needed. 220 Returns (client, server). 221 """ 222 class TestTCPServer(self.server_class): 223 def handle_error(self, request, client_address): 224 """ 225 End request and raise the error if one occurs. 226 """ 227 self.close_request(request) 228 self.server_close() 229 raise 230 231 self.addCleanup(self._cleanup) 232 self.server = self.server_class((socket_helper.HOST, 0), imap_handler) 233 self.thread = threading.Thread( 234 name=self._testMethodName+'-server', 235 target=self.server.serve_forever, 236 # Short poll interval to make the test finish quickly. 237 # Time between requests is short enough that we won't wake 238 # up spuriously too many times. 239 kwargs={'poll_interval': 0.01}) 240 self.thread.daemon = True # In case this function raises. 241 self.thread.start() 242 243 if connect: 244 self.client = self.imap_class(*self.server.server_address) 245 246 return self.client, self.server 247 248 def _cleanup(self): 249 """ 250 Cleans up the test server. This method should not be called manually, 251 it is added to the cleanup queue in the _setup method already. 252 """ 253 # if logout was called already we'd raise an exception trying to 254 # shutdown the client once again 255 if self.client is not None and self.client.state != 'LOGOUT': 256 self.client.shutdown() 257 # cleanup the server 258 self.server.shutdown() 259 self.server.server_close() 260 threading_helper.join_thread(self.thread) 261 # Explicitly clear the attribute to prevent dangling thread 262 self.thread = None 263 264 def test_EOF_without_complete_welcome_message(self): 265 # http://bugs.python.org/issue5949 266 class EOFHandler(socketserver.StreamRequestHandler): 267 def handle(self): 268 self.wfile.write(b'* OK') 269 _, server = self._setup(EOFHandler, connect=False) 270 self.assertRaises(imaplib.IMAP4.abort, self.imap_class, 271 *server.server_address) 272 273 def test_line_termination(self): 274 class BadNewlineHandler(SimpleIMAPHandler): 275 def cmd_CAPABILITY(self, tag, args): 276 self._send(b'* CAPABILITY IMAP4rev1 AUTH\n') 277 self._send_tagged(tag, 'OK', 'CAPABILITY completed') 278 _, server = self._setup(BadNewlineHandler, connect=False) 279 self.assertRaises(imaplib.IMAP4.abort, self.imap_class, 280 *server.server_address) 281 282 def test_enable_raises_error_if_not_AUTH(self): 283 class EnableHandler(SimpleIMAPHandler): 284 capabilities = 'AUTH ENABLE UTF8=ACCEPT' 285 client, _ = self._setup(EnableHandler) 286 self.assertFalse(client.utf8_enabled) 287 with self.assertRaisesRegex(imaplib.IMAP4.error, 'ENABLE.*NONAUTH'): 288 client.enable('foo') 289 self.assertFalse(client.utf8_enabled) 290 291 def test_enable_raises_error_if_no_capability(self): 292 client, _ = self._setup(SimpleIMAPHandler) 293 with self.assertRaisesRegex(imaplib.IMAP4.error, 294 'does not support ENABLE'): 295 client.enable('foo') 296 297 def test_enable_UTF8_raises_error_if_not_supported(self): 298 client, _ = self._setup(SimpleIMAPHandler) 299 typ, data = client.login('user', 'pass') 300 self.assertEqual(typ, 'OK') 301 with self.assertRaisesRegex(imaplib.IMAP4.error, 302 'does not support ENABLE'): 303 client.enable('UTF8=ACCEPT') 304 305 def test_enable_UTF8_True_append(self): 306 class UTF8AppendServer(SimpleIMAPHandler): 307 capabilities = 'ENABLE UTF8=ACCEPT' 308 def cmd_ENABLE(self, tag, args): 309 self._send_tagged(tag, 'OK', 'ENABLE successful') 310 def cmd_AUTHENTICATE(self, tag, args): 311 self._send_textline('+') 312 self.server.response = yield 313 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 314 def cmd_APPEND(self, tag, args): 315 self._send_textline('+') 316 self.server.response = yield 317 self._send_tagged(tag, 'OK', 'okay') 318 client, server = self._setup(UTF8AppendServer) 319 self.assertEqual(client._encoding, 'ascii') 320 code, _ = client.authenticate('MYAUTH', lambda x: b'fake') 321 self.assertEqual(code, 'OK') 322 self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake' 323 code, _ = client.enable('UTF8=ACCEPT') 324 self.assertEqual(code, 'OK') 325 self.assertEqual(client._encoding, 'utf-8') 326 msg_string = 'Subject: üñí©öðé' 327 typ, data = client.append(None, None, None, msg_string.encode('utf-8')) 328 self.assertEqual(typ, 'OK') 329 self.assertEqual(server.response, 330 ('UTF8 (%s)\r\n' % msg_string).encode('utf-8')) 331 332 def test_search_disallows_charset_in_utf8_mode(self): 333 class UTF8Server(SimpleIMAPHandler): 334 capabilities = 'AUTH ENABLE UTF8=ACCEPT' 335 def cmd_ENABLE(self, tag, args): 336 self._send_tagged(tag, 'OK', 'ENABLE successful') 337 def cmd_AUTHENTICATE(self, tag, args): 338 self._send_textline('+') 339 self.server.response = yield 340 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 341 client, _ = self._setup(UTF8Server) 342 typ, _ = client.authenticate('MYAUTH', lambda x: b'fake') 343 self.assertEqual(typ, 'OK') 344 typ, _ = client.enable('UTF8=ACCEPT') 345 self.assertEqual(typ, 'OK') 346 self.assertTrue(client.utf8_enabled) 347 with self.assertRaisesRegex(imaplib.IMAP4.error, 'charset.*UTF8'): 348 client.search('foo', 'bar') 349 350 def test_bad_auth_name(self): 351 class MyServer(SimpleIMAPHandler): 352 def cmd_AUTHENTICATE(self, tag, args): 353 self._send_tagged(tag, 'NO', 354 'unrecognized authentication type {}'.format(args[0])) 355 client, _ = self._setup(MyServer) 356 with self.assertRaisesRegex(imaplib.IMAP4.error, 357 'unrecognized authentication type METHOD'): 358 client.authenticate('METHOD', lambda: 1) 359 360 def test_invalid_authentication(self): 361 class MyServer(SimpleIMAPHandler): 362 def cmd_AUTHENTICATE(self, tag, args): 363 self._send_textline('+') 364 self.response = yield 365 self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid') 366 client, _ = self._setup(MyServer) 367 with self.assertRaisesRegex(imaplib.IMAP4.error, 368 r'\[AUTHENTICATIONFAILED\] invalid'): 369 client.authenticate('MYAUTH', lambda x: b'fake') 370 371 def test_valid_authentication_bytes(self): 372 class MyServer(SimpleIMAPHandler): 373 def cmd_AUTHENTICATE(self, tag, args): 374 self._send_textline('+') 375 self.server.response = yield 376 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 377 client, server = self._setup(MyServer) 378 code, _ = client.authenticate('MYAUTH', lambda x: b'fake') 379 self.assertEqual(code, 'OK') 380 self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake' 381 382 def test_valid_authentication_plain_text(self): 383 class MyServer(SimpleIMAPHandler): 384 def cmd_AUTHENTICATE(self, tag, args): 385 self._send_textline('+') 386 self.server.response = yield 387 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 388 client, server = self._setup(MyServer) 389 code, _ = client.authenticate('MYAUTH', lambda x: 'fake') 390 self.assertEqual(code, 'OK') 391 self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake' 392 393 @hashlib_helper.requires_hashdigest('md5', openssl=True) 394 def test_login_cram_md5_bytes(self): 395 class AuthHandler(SimpleIMAPHandler): 396 capabilities = 'LOGINDISABLED AUTH=CRAM-MD5' 397 def cmd_AUTHENTICATE(self, tag, args): 398 self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm' 399 'VzdG9uLm1jaS5uZXQ=') 400 r = yield 401 if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT' 402 b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'): 403 self._send_tagged(tag, 'OK', 'CRAM-MD5 successful') 404 else: 405 self._send_tagged(tag, 'NO', 'No access') 406 client, _ = self._setup(AuthHandler) 407 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) 408 ret, _ = client.login_cram_md5("tim", b"tanstaaftanstaaf") 409 self.assertEqual(ret, "OK") 410 411 @hashlib_helper.requires_hashdigest('md5', openssl=True) 412 def test_login_cram_md5_plain_text(self): 413 class AuthHandler(SimpleIMAPHandler): 414 capabilities = 'LOGINDISABLED AUTH=CRAM-MD5' 415 def cmd_AUTHENTICATE(self, tag, args): 416 self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm' 417 'VzdG9uLm1jaS5uZXQ=') 418 r = yield 419 if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT' 420 b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'): 421 self._send_tagged(tag, 'OK', 'CRAM-MD5 successful') 422 else: 423 self._send_tagged(tag, 'NO', 'No access') 424 client, _ = self._setup(AuthHandler) 425 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) 426 ret, _ = client.login_cram_md5("tim", "tanstaaftanstaaf") 427 self.assertEqual(ret, "OK") 428 429 def test_aborted_authentication(self): 430 class MyServer(SimpleIMAPHandler): 431 def cmd_AUTHENTICATE(self, tag, args): 432 self._send_textline('+') 433 self.response = yield 434 if self.response == b'*\r\n': 435 self._send_tagged( 436 tag, 437 'NO', 438 '[AUTHENTICATIONFAILED] aborted') 439 else: 440 self._send_tagged(tag, 'OK', 'MYAUTH successful') 441 client, _ = self._setup(MyServer) 442 with self.assertRaisesRegex(imaplib.IMAP4.error, 443 r'\[AUTHENTICATIONFAILED\] aborted'): 444 client.authenticate('MYAUTH', lambda x: None) 445 446 @mock.patch('imaplib._MAXLINE', 10) 447 def test_linetoolong(self): 448 class TooLongHandler(SimpleIMAPHandler): 449 def handle(self): 450 # send response line longer than the limit set in the next line 451 self.wfile.write(b'* OK ' + 11 * b'x' + b'\r\n') 452 _, server = self._setup(TooLongHandler, connect=False) 453 with self.assertRaisesRegex(imaplib.IMAP4.error, 454 'got more than 10 bytes'): 455 self.imap_class(*server.server_address) 456 457 def test_simple_with_statement(self): 458 _, server = self._setup(SimpleIMAPHandler, connect=False) 459 with self.imap_class(*server.server_address): 460 pass 461 462 def test_imaplib_timeout_test(self): 463 _, server = self._setup(SimpleIMAPHandler) 464 addr = server.server_address[1] 465 client = self.imap_class("localhost", addr, timeout=None) 466 self.assertEqual(client.sock.timeout, None) 467 client.shutdown() 468 client = self.imap_class("localhost", addr, timeout=support.LOOPBACK_TIMEOUT) 469 self.assertEqual(client.sock.timeout, support.LOOPBACK_TIMEOUT) 470 client.shutdown() 471 with self.assertRaises(ValueError): 472 client = self.imap_class("localhost", addr, timeout=0) 473 474 def test_imaplib_timeout_functionality_test(self): 475 class TimeoutHandler(SimpleIMAPHandler): 476 def handle(self): 477 time.sleep(1) 478 SimpleIMAPHandler.handle(self) 479 480 _, server = self._setup(TimeoutHandler) 481 addr = server.server_address[1] 482 with self.assertRaises(TimeoutError): 483 client = self.imap_class("localhost", addr, timeout=0.001) 484 485 def test_with_statement(self): 486 _, server = self._setup(SimpleIMAPHandler, connect=False) 487 with self.imap_class(*server.server_address) as imap: 488 imap.login('user', 'pass') 489 self.assertEqual(server.logged, 'user') 490 self.assertIsNone(server.logged) 491 492 def test_with_statement_logout(self): 493 # It is legal to log out explicitly inside the with block 494 _, server = self._setup(SimpleIMAPHandler, connect=False) 495 with self.imap_class(*server.server_address) as imap: 496 imap.login('user', 'pass') 497 self.assertEqual(server.logged, 'user') 498 imap.logout() 499 self.assertIsNone(server.logged) 500 self.assertIsNone(server.logged) 501 502 # command tests 503 504 def test_login(self): 505 client, _ = self._setup(SimpleIMAPHandler) 506 typ, data = client.login('user', 'pass') 507 self.assertEqual(typ, 'OK') 508 self.assertEqual(data[0], b'LOGIN completed') 509 self.assertEqual(client.state, 'AUTH') 510 511 def test_logout(self): 512 client, _ = self._setup(SimpleIMAPHandler) 513 typ, data = client.login('user', 'pass') 514 self.assertEqual(typ, 'OK') 515 self.assertEqual(data[0], b'LOGIN completed') 516 typ, data = client.logout() 517 self.assertEqual(typ, 'BYE', (typ, data)) 518 self.assertEqual(data[0], b'IMAP4ref1 Server logging out', (typ, data)) 519 self.assertEqual(client.state, 'LOGOUT') 520 521 def test_lsub(self): 522 class LsubCmd(SimpleIMAPHandler): 523 def cmd_LSUB(self, tag, args): 524 self._send_textline('* LSUB () "." directoryA') 525 return self._send_tagged(tag, 'OK', 'LSUB completed') 526 client, _ = self._setup(LsubCmd) 527 client.login('user', 'pass') 528 typ, data = client.lsub() 529 self.assertEqual(typ, 'OK') 530 self.assertEqual(data[0], b'() "." directoryA') 531 532 def test_unselect(self): 533 client, _ = self._setup(SimpleIMAPHandler) 534 client.login('user', 'pass') 535 typ, data = client.select() 536 self.assertEqual(typ, 'OK') 537 self.assertEqual(data[0], b'2') 538 539 typ, data = client.unselect() 540 self.assertEqual(typ, 'OK') 541 self.assertEqual(data[0], b'Returned to authenticated state. (Success)') 542 self.assertEqual(client.state, 'AUTH') 543 544 545class NewIMAPTests(NewIMAPTestsMixin, unittest.TestCase): 546 imap_class = imaplib.IMAP4 547 server_class = socketserver.TCPServer 548 549 550@unittest.skipUnless(ssl, "SSL not available") 551class NewIMAPSSLTests(NewIMAPTestsMixin, unittest.TestCase): 552 imap_class = IMAP4_SSL 553 server_class = SecureTCPServer 554 555 def test_ssl_raises(self): 556 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 557 self.assertEqual(ssl_context.verify_mode, ssl.CERT_REQUIRED) 558 self.assertEqual(ssl_context.check_hostname, True) 559 ssl_context.load_verify_locations(CAFILE) 560 561 with self.assertRaisesRegex(ssl.CertificateError, 562 "IP address mismatch, certificate is not valid for " 563 "'127.0.0.1'"): 564 _, server = self._setup(SimpleIMAPHandler) 565 client = self.imap_class(*server.server_address, 566 ssl_context=ssl_context) 567 client.shutdown() 568 569 def test_ssl_verified(self): 570 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 571 ssl_context.load_verify_locations(CAFILE) 572 573 _, server = self._setup(SimpleIMAPHandler) 574 client = self.imap_class("localhost", server.server_address[1], 575 ssl_context=ssl_context) 576 client.shutdown() 577 578 # Mock the private method _connect(), so mark the test as specific 579 # to CPython stdlib 580 @cpython_only 581 def test_certfile_arg_warn(self): 582 with warnings_helper.check_warnings(('', DeprecationWarning)): 583 with mock.patch.object(self.imap_class, 'open'): 584 with mock.patch.object(self.imap_class, '_connect'): 585 self.imap_class('localhost', 143, certfile=CERTFILE) 586 587class ThreadedNetworkedTests(unittest.TestCase): 588 server_class = socketserver.TCPServer 589 imap_class = imaplib.IMAP4 590 591 def make_server(self, addr, hdlr): 592 593 class MyServer(self.server_class): 594 def handle_error(self, request, client_address): 595 self.close_request(request) 596 self.server_close() 597 raise 598 599 if verbose: 600 print("creating server") 601 server = MyServer(addr, hdlr) 602 self.assertEqual(server.server_address, server.socket.getsockname()) 603 604 if verbose: 605 print("server created") 606 print("ADDR =", addr) 607 print("CLASS =", self.server_class) 608 print("HDLR =", server.RequestHandlerClass) 609 610 t = threading.Thread( 611 name='%s serving' % self.server_class, 612 target=server.serve_forever, 613 # Short poll interval to make the test finish quickly. 614 # Time between requests is short enough that we won't wake 615 # up spuriously too many times. 616 kwargs={'poll_interval': 0.01}) 617 t.daemon = True # In case this function raises. 618 t.start() 619 if verbose: 620 print("server running") 621 return server, t 622 623 def reap_server(self, server, thread): 624 if verbose: 625 print("waiting for server") 626 server.shutdown() 627 server.server_close() 628 thread.join() 629 if verbose: 630 print("done") 631 632 @contextmanager 633 def reaped_server(self, hdlr): 634 server, thread = self.make_server((socket_helper.HOST, 0), hdlr) 635 try: 636 yield server 637 finally: 638 self.reap_server(server, thread) 639 640 @contextmanager 641 def reaped_pair(self, hdlr): 642 with self.reaped_server(hdlr) as server: 643 client = self.imap_class(*server.server_address) 644 try: 645 yield server, client 646 finally: 647 client.logout() 648 649 @threading_helper.reap_threads 650 def test_connect(self): 651 with self.reaped_server(SimpleIMAPHandler) as server: 652 client = self.imap_class(*server.server_address) 653 client.shutdown() 654 655 @threading_helper.reap_threads 656 def test_bracket_flags(self): 657 658 # This violates RFC 3501, which disallows ']' characters in tag names, 659 # but imaplib has allowed producing such tags forever, other programs 660 # also produce them (eg: OtherInbox's Organizer app as of 20140716), 661 # and Gmail, for example, accepts them and produces them. So we 662 # support them. See issue #21815. 663 664 class BracketFlagHandler(SimpleIMAPHandler): 665 666 def handle(self): 667 self.flags = ['Answered', 'Flagged', 'Deleted', 'Seen', 'Draft'] 668 super().handle() 669 670 def cmd_AUTHENTICATE(self, tag, args): 671 self._send_textline('+') 672 self.server.response = yield 673 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 674 675 def cmd_SELECT(self, tag, args): 676 flag_msg = ' \\'.join(self.flags) 677 self._send_line(('* FLAGS (%s)' % flag_msg).encode('ascii')) 678 self._send_line(b'* 2 EXISTS') 679 self._send_line(b'* 0 RECENT') 680 msg = ('* OK [PERMANENTFLAGS %s \\*)] Flags permitted.' 681 % flag_msg) 682 self._send_line(msg.encode('ascii')) 683 self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.') 684 685 def cmd_STORE(self, tag, args): 686 new_flags = args[2].strip('(').strip(')').split() 687 self.flags.extend(new_flags) 688 flags_msg = '(FLAGS (%s))' % ' \\'.join(self.flags) 689 msg = '* %s FETCH %s' % (args[0], flags_msg) 690 self._send_line(msg.encode('ascii')) 691 self._send_tagged(tag, 'OK', 'STORE completed.') 692 693 with self.reaped_pair(BracketFlagHandler) as (server, client): 694 code, data = client.authenticate('MYAUTH', lambda x: b'fake') 695 self.assertEqual(code, 'OK') 696 self.assertEqual(server.response, b'ZmFrZQ==\r\n') 697 client.select('test') 698 typ, [data] = client.store(b'1', "+FLAGS", "[test]") 699 self.assertIn(b'[test]', data) 700 client.select('test') 701 typ, [data] = client.response('PERMANENTFLAGS') 702 self.assertIn(b'[test]', data) 703 704 @threading_helper.reap_threads 705 def test_issue5949(self): 706 707 class EOFHandler(socketserver.StreamRequestHandler): 708 def handle(self): 709 # EOF without sending a complete welcome message. 710 self.wfile.write(b'* OK') 711 712 with self.reaped_server(EOFHandler) as server: 713 self.assertRaises(imaplib.IMAP4.abort, 714 self.imap_class, *server.server_address) 715 716 @threading_helper.reap_threads 717 def test_line_termination(self): 718 719 class BadNewlineHandler(SimpleIMAPHandler): 720 721 def cmd_CAPABILITY(self, tag, args): 722 self._send(b'* CAPABILITY IMAP4rev1 AUTH\n') 723 self._send_tagged(tag, 'OK', 'CAPABILITY completed') 724 725 with self.reaped_server(BadNewlineHandler) as server: 726 self.assertRaises(imaplib.IMAP4.abort, 727 self.imap_class, *server.server_address) 728 729 class UTF8Server(SimpleIMAPHandler): 730 capabilities = 'AUTH ENABLE UTF8=ACCEPT' 731 732 def cmd_ENABLE(self, tag, args): 733 self._send_tagged(tag, 'OK', 'ENABLE successful') 734 735 def cmd_AUTHENTICATE(self, tag, args): 736 self._send_textline('+') 737 self.server.response = yield 738 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 739 740 @threading_helper.reap_threads 741 def test_enable_raises_error_if_not_AUTH(self): 742 with self.reaped_pair(self.UTF8Server) as (server, client): 743 self.assertFalse(client.utf8_enabled) 744 self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo') 745 self.assertFalse(client.utf8_enabled) 746 747 # XXX Also need a test that enable after SELECT raises an error. 748 749 @threading_helper.reap_threads 750 def test_enable_raises_error_if_no_capability(self): 751 class NoEnableServer(self.UTF8Server): 752 capabilities = 'AUTH' 753 with self.reaped_pair(NoEnableServer) as (server, client): 754 self.assertRaises(imaplib.IMAP4.error, client.enable, 'foo') 755 756 @threading_helper.reap_threads 757 def test_enable_UTF8_raises_error_if_not_supported(self): 758 class NonUTF8Server(SimpleIMAPHandler): 759 pass 760 with self.assertRaises(imaplib.IMAP4.error): 761 with self.reaped_pair(NonUTF8Server) as (server, client): 762 typ, data = client.login('user', 'pass') 763 self.assertEqual(typ, 'OK') 764 client.enable('UTF8=ACCEPT') 765 pass 766 767 @threading_helper.reap_threads 768 def test_enable_UTF8_True_append(self): 769 770 class UTF8AppendServer(self.UTF8Server): 771 def cmd_APPEND(self, tag, args): 772 self._send_textline('+') 773 self.server.response = yield 774 self._send_tagged(tag, 'OK', 'okay') 775 776 with self.reaped_pair(UTF8AppendServer) as (server, client): 777 self.assertEqual(client._encoding, 'ascii') 778 code, _ = client.authenticate('MYAUTH', lambda x: b'fake') 779 self.assertEqual(code, 'OK') 780 self.assertEqual(server.response, 781 b'ZmFrZQ==\r\n') # b64 encoded 'fake' 782 code, _ = client.enable('UTF8=ACCEPT') 783 self.assertEqual(code, 'OK') 784 self.assertEqual(client._encoding, 'utf-8') 785 msg_string = 'Subject: üñí©öðé' 786 typ, data = client.append( 787 None, None, None, msg_string.encode('utf-8')) 788 self.assertEqual(typ, 'OK') 789 self.assertEqual( 790 server.response, 791 ('UTF8 (%s)\r\n' % msg_string).encode('utf-8') 792 ) 793 794 # XXX also need a test that makes sure that the Literal and Untagged_status 795 # regexes uses unicode in UTF8 mode instead of the default ASCII. 796 797 @threading_helper.reap_threads 798 def test_search_disallows_charset_in_utf8_mode(self): 799 with self.reaped_pair(self.UTF8Server) as (server, client): 800 typ, _ = client.authenticate('MYAUTH', lambda x: b'fake') 801 self.assertEqual(typ, 'OK') 802 typ, _ = client.enable('UTF8=ACCEPT') 803 self.assertEqual(typ, 'OK') 804 self.assertTrue(client.utf8_enabled) 805 self.assertRaises(imaplib.IMAP4.error, client.search, 'foo', 'bar') 806 807 @threading_helper.reap_threads 808 def test_bad_auth_name(self): 809 810 class MyServer(SimpleIMAPHandler): 811 812 def cmd_AUTHENTICATE(self, tag, args): 813 self._send_tagged(tag, 'NO', 'unrecognized authentication ' 814 'type {}'.format(args[0])) 815 816 with self.reaped_pair(MyServer) as (server, client): 817 with self.assertRaises(imaplib.IMAP4.error): 818 client.authenticate('METHOD', lambda: 1) 819 820 @threading_helper.reap_threads 821 def test_invalid_authentication(self): 822 823 class MyServer(SimpleIMAPHandler): 824 825 def cmd_AUTHENTICATE(self, tag, args): 826 self._send_textline('+') 827 self.response = yield 828 self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid') 829 830 with self.reaped_pair(MyServer) as (server, client): 831 with self.assertRaises(imaplib.IMAP4.error): 832 code, data = client.authenticate('MYAUTH', lambda x: b'fake') 833 834 @threading_helper.reap_threads 835 def test_valid_authentication(self): 836 837 class MyServer(SimpleIMAPHandler): 838 839 def cmd_AUTHENTICATE(self, tag, args): 840 self._send_textline('+') 841 self.server.response = yield 842 self._send_tagged(tag, 'OK', 'FAKEAUTH successful') 843 844 with self.reaped_pair(MyServer) as (server, client): 845 code, data = client.authenticate('MYAUTH', lambda x: b'fake') 846 self.assertEqual(code, 'OK') 847 self.assertEqual(server.response, 848 b'ZmFrZQ==\r\n') # b64 encoded 'fake' 849 850 with self.reaped_pair(MyServer) as (server, client): 851 code, data = client.authenticate('MYAUTH', lambda x: 'fake') 852 self.assertEqual(code, 'OK') 853 self.assertEqual(server.response, 854 b'ZmFrZQ==\r\n') # b64 encoded 'fake' 855 856 @threading_helper.reap_threads 857 @hashlib_helper.requires_hashdigest('md5', openssl=True) 858 def test_login_cram_md5(self): 859 860 class AuthHandler(SimpleIMAPHandler): 861 862 capabilities = 'LOGINDISABLED AUTH=CRAM-MD5' 863 864 def cmd_AUTHENTICATE(self, tag, args): 865 self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm' 866 'VzdG9uLm1jaS5uZXQ=') 867 r = yield 868 if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT' 869 b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'): 870 self._send_tagged(tag, 'OK', 'CRAM-MD5 successful') 871 else: 872 self._send_tagged(tag, 'NO', 'No access') 873 874 with self.reaped_pair(AuthHandler) as (server, client): 875 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) 876 ret, data = client.login_cram_md5("tim", "tanstaaftanstaaf") 877 self.assertEqual(ret, "OK") 878 879 with self.reaped_pair(AuthHandler) as (server, client): 880 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) 881 ret, data = client.login_cram_md5("tim", b"tanstaaftanstaaf") 882 self.assertEqual(ret, "OK") 883 884 885 @threading_helper.reap_threads 886 def test_aborted_authentication(self): 887 888 class MyServer(SimpleIMAPHandler): 889 890 def cmd_AUTHENTICATE(self, tag, args): 891 self._send_textline('+') 892 self.response = yield 893 894 if self.response == b'*\r\n': 895 self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] aborted') 896 else: 897 self._send_tagged(tag, 'OK', 'MYAUTH successful') 898 899 with self.reaped_pair(MyServer) as (server, client): 900 with self.assertRaises(imaplib.IMAP4.error): 901 code, data = client.authenticate('MYAUTH', lambda x: None) 902 903 904 def test_linetoolong(self): 905 class TooLongHandler(SimpleIMAPHandler): 906 def handle(self): 907 # Send a very long response line 908 self.wfile.write(b'* OK ' + imaplib._MAXLINE * b'x' + b'\r\n') 909 910 with self.reaped_server(TooLongHandler) as server: 911 self.assertRaises(imaplib.IMAP4.error, 912 self.imap_class, *server.server_address) 913 914 @threading_helper.reap_threads 915 def test_simple_with_statement(self): 916 # simplest call 917 with self.reaped_server(SimpleIMAPHandler) as server: 918 with self.imap_class(*server.server_address): 919 pass 920 921 @threading_helper.reap_threads 922 def test_with_statement(self): 923 with self.reaped_server(SimpleIMAPHandler) as server: 924 with self.imap_class(*server.server_address) as imap: 925 imap.login('user', 'pass') 926 self.assertEqual(server.logged, 'user') 927 self.assertIsNone(server.logged) 928 929 @threading_helper.reap_threads 930 def test_with_statement_logout(self): 931 # what happens if already logout in the block? 932 with self.reaped_server(SimpleIMAPHandler) as server: 933 with self.imap_class(*server.server_address) as imap: 934 imap.login('user', 'pass') 935 self.assertEqual(server.logged, 'user') 936 imap.logout() 937 self.assertIsNone(server.logged) 938 self.assertIsNone(server.logged) 939 940 @threading_helper.reap_threads 941 @cpython_only 942 @unittest.skipUnless(__debug__, "Won't work if __debug__ is False") 943 def test_dump_ur(self): 944 # See: http://bugs.python.org/issue26543 945 untagged_resp_dict = {'READ-WRITE': [b'']} 946 947 with self.reaped_server(SimpleIMAPHandler) as server: 948 with self.imap_class(*server.server_address) as imap: 949 with mock.patch.object(imap, '_mesg') as mock_mesg: 950 imap._dump_ur(untagged_resp_dict) 951 mock_mesg.assert_called_with( 952 "untagged responses dump:READ-WRITE: [b'']" 953 ) 954 955 956@unittest.skipUnless(ssl, "SSL not available") 957class ThreadedNetworkedTestsSSL(ThreadedNetworkedTests): 958 server_class = SecureTCPServer 959 imap_class = IMAP4_SSL 960 961 @threading_helper.reap_threads 962 def test_ssl_verified(self): 963 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 964 ssl_context.load_verify_locations(CAFILE) 965 966 with self.assertRaisesRegex( 967 ssl.CertificateError, 968 "IP address mismatch, certificate is not valid for " 969 "'127.0.0.1'"): 970 with self.reaped_server(SimpleIMAPHandler) as server: 971 client = self.imap_class(*server.server_address, 972 ssl_context=ssl_context) 973 client.shutdown() 974 975 with self.reaped_server(SimpleIMAPHandler) as server: 976 client = self.imap_class("localhost", server.server_address[1], 977 ssl_context=ssl_context) 978 client.shutdown() 979 980 981@unittest.skipUnless( 982 support.is_resource_enabled('network'), 'network resource disabled') 983@unittest.skip('cyrus.andrew.cmu.edu blocks connections') 984class RemoteIMAPTest(unittest.TestCase): 985 host = 'cyrus.andrew.cmu.edu' 986 port = 143 987 username = 'anonymous' 988 password = 'pass' 989 imap_class = imaplib.IMAP4 990 991 def setUp(self): 992 with socket_helper.transient_internet(self.host): 993 self.server = self.imap_class(self.host, self.port) 994 995 def tearDown(self): 996 if self.server is not None: 997 with socket_helper.transient_internet(self.host): 998 self.server.logout() 999 1000 def test_logincapa(self): 1001 with socket_helper.transient_internet(self.host): 1002 for cap in self.server.capabilities: 1003 self.assertIsInstance(cap, str) 1004 self.assertIn('LOGINDISABLED', self.server.capabilities) 1005 self.assertIn('AUTH=ANONYMOUS', self.server.capabilities) 1006 rs = self.server.login(self.username, self.password) 1007 self.assertEqual(rs[0], 'OK') 1008 1009 def test_logout(self): 1010 with socket_helper.transient_internet(self.host): 1011 rs = self.server.logout() 1012 self.server = None 1013 self.assertEqual(rs[0], 'BYE', rs) 1014 1015 1016@unittest.skipUnless(ssl, "SSL not available") 1017@unittest.skipUnless( 1018 support.is_resource_enabled('network'), 'network resource disabled') 1019@unittest.skip('cyrus.andrew.cmu.edu blocks connections') 1020class RemoteIMAP_STARTTLSTest(RemoteIMAPTest): 1021 1022 def setUp(self): 1023 super().setUp() 1024 with socket_helper.transient_internet(self.host): 1025 rs = self.server.starttls() 1026 self.assertEqual(rs[0], 'OK') 1027 1028 def test_logincapa(self): 1029 for cap in self.server.capabilities: 1030 self.assertIsInstance(cap, str) 1031 self.assertNotIn('LOGINDISABLED', self.server.capabilities) 1032 1033 1034@unittest.skipUnless(ssl, "SSL not available") 1035@unittest.skip('cyrus.andrew.cmu.edu blocks connections') 1036class RemoteIMAP_SSLTest(RemoteIMAPTest): 1037 port = 993 1038 imap_class = IMAP4_SSL 1039 1040 def setUp(self): 1041 pass 1042 1043 def tearDown(self): 1044 pass 1045 1046 def create_ssl_context(self): 1047 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 1048 ssl_context.check_hostname = False 1049 ssl_context.verify_mode = ssl.CERT_NONE 1050 ssl_context.load_cert_chain(CERTFILE) 1051 return ssl_context 1052 1053 def check_logincapa(self, server): 1054 try: 1055 for cap in server.capabilities: 1056 self.assertIsInstance(cap, str) 1057 self.assertNotIn('LOGINDISABLED', server.capabilities) 1058 self.assertIn('AUTH=PLAIN', server.capabilities) 1059 rs = server.login(self.username, self.password) 1060 self.assertEqual(rs[0], 'OK') 1061 finally: 1062 server.logout() 1063 1064 def test_logincapa(self): 1065 with socket_helper.transient_internet(self.host): 1066 _server = self.imap_class(self.host, self.port) 1067 self.check_logincapa(_server) 1068 1069 def test_logout(self): 1070 with socket_helper.transient_internet(self.host): 1071 _server = self.imap_class(self.host, self.port) 1072 rs = _server.logout() 1073 self.assertEqual(rs[0], 'BYE', rs) 1074 1075 def test_ssl_context_certfile_exclusive(self): 1076 with socket_helper.transient_internet(self.host): 1077 self.assertRaises( 1078 ValueError, self.imap_class, self.host, self.port, 1079 certfile=CERTFILE, ssl_context=self.create_ssl_context()) 1080 1081 def test_ssl_context_keyfile_exclusive(self): 1082 with socket_helper.transient_internet(self.host): 1083 self.assertRaises( 1084 ValueError, self.imap_class, self.host, self.port, 1085 keyfile=CERTFILE, ssl_context=self.create_ssl_context()) 1086 1087 1088if __name__ == "__main__": 1089 unittest.main() 1090