1"""Test script for poplib module."""
2
3# Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL
4# a real test suite
5
6import poplib
7import socket
8import os
9import errno
10import threading
11
12import unittest
13from unittest import TestCase, skipUnless
14from test import support as test_support
15from test.support import hashlib_helper
16from test.support import socket_helper
17from test.support import threading_helper
18from test.support import warnings_helper
19
20
21asynchat = warnings_helper.import_deprecated('asynchat')
22asyncore = warnings_helper.import_deprecated('asyncore')
23
24
# NOTE(review): presumably skips the whole module on platforms without
# working sockets -- confirm against test.support.
test_support.requires_working_socket(module=True)

# Address the dummy servers bind to.  PORT is 0; DummyPOP3Server reads the
# port actually bound back with getsockname().
HOST = socket_helper.HOST
PORT = 0

# SSL-dependent pieces are only set up when poplib exposes POP3_SSL.
SUPPORTS_SSL = False
if hasattr(poplib, 'POP3_SSL'):
    import ssl

    SUPPORTS_SSL = True
    # Certificate files are looked up next to this test file.
    CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert3.pem")
    CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "pycacert.pem")

# Decorator that skips a test (or test class) when SSL support is missing.
requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported')

# the dummy data returned by server when LIST and RETR commands are issued
# NOTE(review): test_retr/test_top hard-code the size of RETR_RESP; keep them
# in sync when editing it.  The address below looks like an e-mail
# obfuscation placeholder -- confirm against the upstream file.
LIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n'
RETR_RESP = b"""From: [email protected]\
\r\nContent-Type: text/plain\r\n\
MIME-Version: 1.0\r\n\
Subject: Dummy\r\n\
\r\n\
line1\r\n\
line2\r\n\
line3\r\n\
.\r\n"""
51
52
class DummyPOP3Handler(asynchat.async_chat):
    """Per-connection handler implementing a minimal fake POP3 server.

    Incoming data is buffered until a CRLF terminator is seen; the line is
    then split into a command word and an argument and dispatched to the
    matching ``cmd_<name>`` method.  Unknown commands get an ``-ERR`` reply.
    """

    # Capabilities always reported by CAPA; STLS is added by _get_capas().
    CAPAS = {'UIDL': [], 'IMPLEMENTATION': ['python-testlib-pop-server']}
    # When true, the UTF8 command is answered with +OK instead of -ERR.
    enable_UTF8 = False

    def __init__(self, conn):
        asynchat.async_chat.__init__(self, conn)
        self.set_terminator(b"\r\n")
        self.in_buffer = []
        # Greeting sent as soon as the connection is accepted.
        self.push('+OK dummy pop3 server ready. <timestamp>')
        self.tls_active = False
        self.tls_starting = False

    def collect_incoming_data(self, data):
        """Buffer a chunk of a not yet terminated command line."""
        self.in_buffer.append(data)

    def found_terminator(self):
        """Decode the buffered line and dispatch it to ``cmd_<name>``."""
        line = str(b''.join(self.in_buffer), 'ISO-8859-1')
        self.in_buffer = []
        # Everything after the first space is the argument ("" if none).
        cmd, _, arg = line.partition(' ')
        cmd = cmd.lower()
        method = getattr(self, 'cmd_' + cmd, None)
        if method is None:
            self.push('-ERR unrecognized POP3 command "%s".' % cmd)
        else:
            method(arg)

    def handle_error(self):
        # Re-raise the exception being handled instead of swallowing it.
        raise

    def push(self, data):
        """Send *data* (a str) as one CRLF-terminated ISO-8859-1 line."""
        asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n')

    def cmd_echo(self, arg):
        # sends back the received string (used by the test suite)
        self.push(arg)

    def cmd_user(self, arg):
        # Exactly one reply per command: a rejected user must not also
        # receive the +OK line, which would be read as the answer to the
        # client's next command.
        if arg != "guido":
            self.push("-ERR no such user")
        else:
            self.push('+OK password required')

    def cmd_pass(self, arg):
        # Same single-reply rule as cmd_user.
        if arg != "python":
            self.push("-ERR wrong password")
        else:
            self.push('+OK 10 messages')

    def cmd_stat(self, arg):
        self.push('+OK 10 100')

    def cmd_list(self, arg):
        # With an argument: single-line reply echoing it twice.
        # Without: multi-line reply made of the canned LIST_RESP.
        if arg:
            self.push('+OK %s %s' % (arg, arg))
        else:
            self.push('+OK')
            asynchat.async_chat.push(self, LIST_RESP)

    cmd_uidl = cmd_list

    def cmd_retr(self, arg):
        # The argument is ignored; the canned message is always returned.
        self.push('+OK %s bytes' % len(RETR_RESP))
        asynchat.async_chat.push(self, RETR_RESP)

    cmd_top = cmd_retr

    def cmd_dele(self, arg):
        self.push('+OK message marked for deletion.')

    def cmd_noop(self, arg):
        self.push('+OK done nothing.')

    def cmd_rpop(self, arg):
        self.push('+OK done nothing.')

    def cmd_apop(self, arg):
        self.push('+OK done nothing.')

    def cmd_quit(self, arg):
        self.push('+OK closing.')
        self.close_when_done()

    def _get_capas(self):
        """Return a copy of CAPAS, plus STLS while TLS can still be started."""
        _capas = dict(self.CAPAS)
        if not self.tls_active and SUPPORTS_SSL:
            _capas['STLS'] = []
        return _capas

    def cmd_capa(self, arg):
        """Send the capability list, one ``NAME [params...]`` line each."""
        self.push('+OK Capability list follows')
        for cap, params in self._get_capas().items():
            self.push(' '.join([cap, *params]))
        self.push('.')

    def cmd_utf8(self, arg):
        self.push('+OK I know RFC6856'
                  if self.enable_UTF8
                  else '-ERR What is UTF8?!')

    if SUPPORTS_SSL:

        def cmd_stls(self, arg):
            """Switch the connection to TLS, unless it is already active."""
            if self.tls_active is False:
                self.push('+OK Begin TLS negotiation')
                context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
                context.load_cert_chain(CERTFILE)
                # The handshake is driven by _do_tls_handshake(), not by
                # wrap_socket() itself.
                tls_sock = context.wrap_socket(self.socket,
                                               server_side=True,
                                               do_handshake_on_connect=False,
                                               suppress_ragged_eofs=False)
                # Replace the plain socket registered for this channel
                # with the TLS-wrapped one.
                self.del_channel()
                self.set_socket(tls_sock)
                self.tls_active = True
                self.tls_starting = True
                self.in_buffer = []
                self._do_tls_handshake()
            else:
                self.push('-ERR Command not permitted when TLS active')

        def _do_tls_handshake(self):
            """Advance the TLS handshake by one step.

            While the handshake needs more I/O, ``tls_starting`` stays set
            so that handle_read() calls this method again.
            """
            try:
                self.socket.do_handshake()
            except ssl.SSLError as err:
                if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
                                   ssl.SSL_ERROR_WANT_WRITE):
                    # Not finished yet; retry on the next read event.
                    return
                elif err.args[0] == ssl.SSL_ERROR_EOF:
                    return self.handle_close()
                # TODO: SSLError does not expose alert information
                elif ("SSLV3_ALERT_BAD_CERTIFICATE" in err.args[1] or
                      "SSLV3_ALERT_CERTIFICATE_UNKNOWN" in err.args[1]):
                    return self.handle_close()
                raise
            except OSError as err:
                # NOTE(review): any other OSError is ignored here --
                # presumably deliberate best-effort; confirm before changing.
                if err.args[0] == errno.ECONNABORTED:
                    return self.handle_close()
            else:
                self.tls_active = True
                self.tls_starting = False

        def handle_read(self):
            # Read events feed the handshake until it has completed.
            if self.tls_starting:
                self._do_tls_handshake()
            else:
                try:
                    asynchat.async_chat.handle_read(self)
                except ssl.SSLEOFError:
                    self.handle_close()

class DummyPOP3Server(asyncore.dispatcher, threading.Thread):
    """Listening socket and the thread that runs the asyncore loop for it.

    Every accepted connection is handed to an instance of ``handler``; the
    most recent instance is kept in ``handler_instance``.
    """

    handler = DummyPOP3Handler

    def __init__(self, address, af=socket.AF_INET):
        threading.Thread.__init__(self)
        asyncore.dispatcher.__init__(self)
        self.daemon = True

        # Bookkeeping used by start()/run()/stop() and handle_accepted().
        self.active = False
        self.active_lock = threading.Lock()
        self.handler_instance = None

        self.create_socket(af, socket.SOCK_STREAM)
        self.bind(address)
        self.listen(5)
        # Remember the address actually bound.
        self.host, self.port = self.socket.getsockname()[:2]

    def start(self):
        """Start the thread and return once run() has begun."""
        assert not self.active
        self.__flag = threading.Event()
        threading.Thread.start(self)
        self.__flag.wait()

    def run(self):
        """Poll asyncore until stop() is called or no sockets are left."""
        self.active = True
        self.__flag.set()
        try:
            while self.active and asyncore.socket_map:
                with self.active_lock:
                    asyncore.loop(timeout=0.1, count=1)
        finally:
            asyncore.close_all(ignore_all=True)

    def stop(self):
        """Ask run() to exit and wait for the thread to finish."""
        assert self.active
        self.active = False
        self.join()

    def handle_accepted(self, conn, addr):
        self.handler_instance = self.handler(conn)

    def handle_connect(self):
        self.close()

    # Read events on the listening socket are handled like connect events.
    handle_read = handle_connect

    def writable(self):
        return 0

    def handle_error(self):
        # Re-raise the exception being handled instead of swallowing it.
        raise

259
260
class TestPOP3Class(TestCase):
    """Exercise poplib.POP3 against the in-process DummyPOP3Server."""

    def assertOK(self, resp):
        """Assert that *resp* is a positive (``+OK``) POP3 response."""
        self.assertTrue(resp.startswith(b"+OK"))

    def _expected_retr(self):
        """Return the (response, lines, octets) tuple expected for RETR/TOP.

        The value is derived from RETR_RESP, the payload the dummy server
        sends, so the expectation cannot drift from the served data.  The
        octet count covers every line including its CRLF but excludes the
        terminating ".CRLF" line.
        """
        # split() leaves the terminating b'.' and a trailing b'' at the end.
        lines = RETR_RESP.split(b'\r\n')[:-2]
        return (b'+OK %d bytes' % len(RETR_RESP),
                lines,
                len(RETR_RESP) - len(b'.\r\n'))

    def setUp(self):
        self.server = DummyPOP3Server((HOST, PORT))
        self.server.start()
        self.client = poplib.POP3(self.server.host, self.server.port,
                                  timeout=test_support.LOOPBACK_TIMEOUT)

    def tearDown(self):
        self.client.close()
        self.server.stop()
        # Explicitly clear the attribute to prevent dangling thread
        self.server = None

    def test_getwelcome(self):
        self.assertEqual(self.client.getwelcome(),
                         b'+OK dummy pop3 server ready. <timestamp>')

    def test_exceptions(self):
        self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err')

    def test_user(self):
        self.assertOK(self.client.user('guido'))
        self.assertRaises(poplib.error_proto, self.client.user, 'invalid')

    def test_pass_(self):
        self.assertOK(self.client.pass_('python'))
        # The rejection path must go through pass_(), not user().
        self.assertRaises(poplib.error_proto, self.client.pass_, 'invalid')

    def test_stat(self):
        self.assertEqual(self.client.stat(), (10, 100))

    def test_list(self):
        self.assertEqual(self.client.list()[1:],
                         ([b'1 1', b'2 2', b'3 3', b'4 4', b'5 5'],
                          25))
        self.assertTrue(self.client.list('1').endswith(b"OK 1 1"))

    def test_retr(self):
        self.assertEqual(self.client.retr('foo'), self._expected_retr())

    def test_too_long_lines(self):
        self.assertRaises(poplib.error_proto, self.client._shortcmd,
                          'echo +%s' % ((poplib._MAXLINE + 10) * 'a'))

    def test_dele(self):
        self.assertOK(self.client.dele('foo'))

    def test_noop(self):
        self.assertOK(self.client.noop())

    def test_rpop(self):
        self.assertOK(self.client.rpop('foo'))

    @hashlib_helper.requires_hashdigest('md5', openssl=True)
    def test_apop_normal(self):
        self.assertOK(self.client.apop('foo', 'dummypassword'))

    @hashlib_helper.requires_hashdigest('md5', openssl=True)
    def test_apop_REDOS(self):
        # Replace welcome with very long evil welcome.
        # NB The upper bound on welcome length is currently 2048.
        # At this length, evil input makes each apop call take
        # on the order of milliseconds instead of microseconds.
        evil_welcome = b'+OK' + (b'<' * 1000000)
        with test_support.swap_attr(self.client, 'welcome', evil_welcome):
            # The evil welcome is invalid, so apop should throw.
            self.assertRaises(poplib.error_proto, self.client.apop, 'a', 'kb')

    def test_top(self):
        # The dummy server answers TOP exactly like RETR.
        self.assertEqual(self.client.top(1, 1), self._expected_retr())

    def test_uidl(self):
        self.client.uidl()
        self.client.uidl('foo')

    def test_utf8_raises_if_unsupported(self):
        # swap_attr restores the handler class attribute afterwards, so the
        # setting does not leak into other tests.
        with test_support.swap_attr(self.server.handler, 'enable_UTF8', False):
            self.assertRaises(poplib.error_proto, self.client.utf8)

    def test_utf8(self):
        expected = b'+OK I know RFC6856'
        with test_support.swap_attr(self.server.handler, 'enable_UTF8', True):
            result = self.client.utf8()
        self.assertEqual(result, expected)

    def test_capa(self):
        capa = self.client.capa()
        self.assertIn('IMPLEMENTATION', capa)

    def test_quit(self):
        resp = self.client.quit()
        self.assertTrue(resp)
        self.assertIsNone(self.client.sock)
        self.assertIsNone(self.client.file)

    @requires_ssl
    def test_stls_capa(self):
        capa = self.client.capa()
        self.assertIn('STLS', capa)

    @requires_ssl
    def test_stls(self):
        expected = b'+OK Begin TLS negotiation'
        resp = self.client.stls()
        self.assertEqual(resp, expected)

    @requires_ssl
    def test_stls_context(self):
        expected = b'+OK Begin TLS negotiation'
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.load_verify_locations(CAFILE)
        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
        self.assertEqual(ctx.check_hostname, True)
        # The client created in setUp() connected via self.server.host; with
        # hostname checking enabled this is expected to fail.
        with self.assertRaises(ssl.CertificateError):
            self.client.stls(context=ctx)
        # Connecting by the name "localhost" is expected to succeed.
        self.client = poplib.POP3("localhost", self.server.port,
                                  timeout=test_support.LOOPBACK_TIMEOUT)
        resp = self.client.stls(context=ctx)
        self.assertEqual(resp, expected)

394
395
if SUPPORTS_SSL:
    from test.test_ftplib import SSLConnection

    class DummyPOP3_SSLHandler(SSLConnection, DummyPOP3Handler):
        """DummyPOP3Handler variant that is secured from the first byte."""

        def __init__(self, conn):
            # DummyPOP3Handler.__init__ is not called: secure_connection()
            # has to run before the greeting is pushed.
            asynchat.async_chat.__init__(self, conn)
            self.secure_connection()
            # TLS counts as active from the start, so STLS is rejected and
            # not advertised by CAPA.
            self.tls_active = True
            self.tls_starting = False
            self.in_buffer = []
            self.set_terminator(b"\r\n")
            self.push('+OK dummy pop3 server ready. <timestamp>')

409
410
@requires_ssl
class TestPOP3_SSLClass(TestPOP3Class):
    # repeat previous tests by using poplib.POP3_SSL

    def setUp(self):
        self.server = DummyPOP3Server((HOST, PORT))
        self.server.handler = DummyPOP3_SSLHandler
        self.server.start()
        # Use the same timeout as the other setUp() methods so that a stuck
        # server cannot block the test run indefinitely.
        self.client = poplib.POP3_SSL(self.server.host, self.server.port,
                                      timeout=test_support.LOOPBACK_TIMEOUT)

    def test__all__(self):
        self.assertIn('POP3_SSL', poplib.__all__)

    def test_context(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        # keyfile/certfile cannot be combined with an explicit context.
        self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
                            self.server.port, keyfile=CERTFILE, context=ctx)
        self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
                            self.server.port, certfile=CERTFILE, context=ctx)
        self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
                            self.server.port, keyfile=CERTFILE,
                            certfile=CERTFILE, context=ctx)

        # Replace the client from setUp() with one using the custom context.
        self.client.quit()
        self.client = poplib.POP3_SSL(self.server.host, self.server.port,
                                      timeout=test_support.LOOPBACK_TIMEOUT,
                                      context=ctx)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
        self.assertIs(self.client.sock.context, ctx)
        self.assertTrue(self.client.noop().startswith(b'+OK'))

    def test_stls(self):
        # The connection is already encrypted, so STLS must be refused.
        self.assertRaises(poplib.error_proto, self.client.stls)

    test_stls_context = test_stls

    def test_stls_capa(self):
        capa = self.client.capa()
        self.assertNotIn('STLS', capa)

451
452
@requires_ssl
class TestPOP3_TLSClass(TestPOP3Class):
    # repeat previous tests by using poplib.POP3.stls()

    def setUp(self):
        self.server = DummyPOP3Server((HOST, PORT))
        self.server.start()
        self.client = poplib.POP3(self.server.host, self.server.port,
                                  timeout=test_support.LOOPBACK_TIMEOUT)
        # Upgrade the plain connection before every test.
        self.client.stls()

    def tearDown(self):
        if self.client.file is not None and self.client.sock is not None:
            try:
                self.client.quit()
            except poplib.error_proto:
                # happens in the test_too_long_lines case; the overlong
                # response will be treated as response to QUIT and raise
                # this exception
                self.client.close()
        self.server.stop()
        # Explicitly clear the attribute to prevent dangling thread
        self.server = None

    def test_stls(self):
        # TLS was already negotiated in setUp(); a second STLS must fail.
        self.assertRaises(poplib.error_proto, self.client.stls)

    test_stls_context = test_stls

    def test_stls_capa(self):
        capa = self.client.capa()
        # Capability names are str keys; checking for b'STLS' could never
        # fail, so the assertion was vacuous.
        self.assertNotIn('STLS', capa)

485
486
class TestTimeouts(TestCase):
    """Check which socket timeout poplib.POP3 ends up using."""

    def setUp(self):
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(60)  # Safety net. Look issue 11812
        self.port = socket_helper.bind_port(self.sock)
        self.thread = threading.Thread(target=self.server,
                                       args=(self.evt, self.sock),
                                       daemon=True)
        self.thread.start()
        # Wait until the server thread has called listen().
        self.evt.wait()

    def tearDown(self):
        self.thread.join()
        # Explicitly clear the attribute to prevent dangling thread
        self.thread = None

    def server(self, evt, serv):
        """Accept one connection on *serv*, send a greeting and close."""
        serv.listen()
        evt.set()
        try:
            conn, _ = serv.accept()
            conn.send(b"+ Hola mundo\n")
            conn.close()
        except TimeoutError:
            # Nobody connected before the listening socket timed out.
            pass
        finally:
            serv.close()

    def testTimeoutDefault(self):
        expected = test_support.LOOPBACK_TIMEOUT
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(expected)
        try:
            pop = poplib.POP3(HOST, self.port)
        finally:
            # Always restore the global default.
            socket.setdefaulttimeout(None)
        self.assertEqual(pop.sock.gettimeout(), expected)
        pop.close()

    def testTimeoutNone(self):
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            # An explicit None must win over the global default.
            pop = poplib.POP3(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(pop.sock.gettimeout())
        pop.close()

    def testTimeoutValue(self):
        expected = test_support.LOOPBACK_TIMEOUT
        pop = poplib.POP3(HOST, self.port, timeout=expected)
        self.assertEqual(pop.sock.gettimeout(), expected)
        pop.close()
        # A timeout of zero is rejected.
        self.assertRaises(ValueError, poplib.POP3, HOST, self.port, timeout=0)

542
543
def setUpModule():
    """Record the threading state and register its cleanup for the module."""
    unittest.addModuleCleanup(threading_helper.threading_cleanup,
                              *threading_helper.threading_setup())
547
548
# Run the test suite when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
551