1import unittest
2import textwrap
3from test import support, mock_socket
4from test.support import socket_helper
5from test.support import warnings_helper
6import socket
7import io
8
9
# smtpd and asyncore are loaded through warnings_helper.import_deprecated()
# instead of a plain ``import`` (presumably so that the DeprecationWarning
# raised on import is suppressed -- confirm in test.support.warnings_helper).
smtpd = warnings_helper.import_deprecated('smtpd')
asyncore = warnings_helper.import_deprecated('asyncore')

# Skip the whole module when socket_helper reports gethostname() as missing.
if not socket_helper.has_gethostname:
    raise unittest.SkipTest("test requires gethostname()")
15
16
class DummyServer(smtpd.SMTPServer):
    """SMTPServer subclass that records every message it is handed.

    Each process_message() call is appended to ``self.messages`` as a
    ``(peer, mailfrom, rcpttos, data)`` tuple.  A message whose data equals
    ``self.return_status`` is answered with ``'250 Okish'``; a message whose
    ``mail_options`` keyword contains ``'SMTPUTF8'`` is answered with
    ``'250 SMTPUTF8 message okish'``; anything else returns None.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.messages = []
        # The magic body is a str when self._decode_data is true and bytes
        # otherwise, so the equality test in process_message() can match.
        self.return_status = (
            'return status' if self._decode_data else b'return status')

    def process_message(self, peer, mailfrom, rcpttos, data, **kw):
        record = (peer, mailfrom, rcpttos, data)
        self.messages.append(record)
        if data == self.return_status:
            return '250 Okish'
        if 'SMTPUTF8' in kw.get('mail_options', ()):
            return '250 SMTPUTF8 message okish'
        return None
32
33
class DummyDispatcherBroken(Exception):
    """Raised by BrokenDummyServer.listen() to simulate a broken server."""
36
37
class BrokenDummyServer(DummyServer):
    """DummyServer whose listen() always raises DummyDispatcherBroken."""

    def listen(self, num):
        # Fail unconditionally; *num* is ignored.
        raise DummyDispatcherBroken
41
42
class SMTPDServerTest(unittest.TestCase):
    """Tests of smtpd.SMTPServer itself, run against mock_socket."""

    def setUp(self):
        # Make both smtpd and asyncore use the mock socket module.
        asyncore.socket = mock_socket
        smtpd.socket = mock_socket

    def tearDown(self):
        asyncore.close_all()
        # Put the real socket module back.
        smtpd.socket = socket
        asyncore.socket = socket

    def test_process_message_unimplemented(self):
        local_address = (socket_helper.HOST, 0)
        server = smtpd.SMTPServer(local_address, ('b', 0), decode_data=True)
        conn, addr = server.accept()
        channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)

        def feed(payload):
            channel.socket.queue_recv(payload)
            channel.handle_read()

        for command in (b'HELO example',
                        b'MAIL From:eggs@example',
                        b'RCPT To:spam@example',
                        b'DATA'):
            feed(command)
        # Completing the message on a plain SMTPServer is expected to raise.
        with self.assertRaises(NotImplementedError):
            feed(b'spam\r\n.\r\n')

    def test_decode_data_and_enable_SMTPUTF8_raises(self):
        # The two options may not be enabled together.
        with self.assertRaises(ValueError):
            smtpd.SMTPServer((socket_helper.HOST, 0), ('b', 0),
                             enable_SMTPUTF8=True, decode_data=True)
75
76
class DebuggingServerTest(unittest.TestCase):
    """Checks what smtpd.DebuggingServer writes to stdout for a message."""

    def setUp(self):
        asyncore.socket = mock_socket
        smtpd.socket = mock_socket

    def tearDown(self):
        asyncore.close_all()
        smtpd.socket = socket
        asyncore.socket = socket

    def _open_channel(self, **options):
        # Create a DebuggingServer plus an SMTPChannel on its accepted
        # connection; both receive the same keyword options.
        server = smtpd.DebuggingServer((socket_helper.HOST, 0), ('b', 0),
                                       **options)
        conn, addr = server.accept()
        return smtpd.SMTPChannel(server, conn, addr, **options)

    def send_data(self, channel, data, enable_SMTPUTF8=False):
        # Run one EHLO / MAIL / RCPT / DATA exchange that delivers *data*.
        # With enable_SMTPUTF8 the MAIL command carries the BODY=8BITMIME
        # and SMTPUTF8 parameters.
        if enable_SMTPUTF8:
            mail_command = b'MAIL From:eggs@example BODY=8BITMIME SMTPUTF8'
        else:
            mail_command = b'MAIL From:eggs@example'
        script = (
            b'EHLO example',
            mail_command,
            b'RCPT To:spam@example',
            b'DATA',
            data,
            b'.',
        )
        for line in script:
            channel.socket.queue_recv(line)
            channel.handle_read()

    def test_process_message_with_decode_data_true(self):
        channel = self._open_channel(decode_data=True)
        with support.captured_stdout() as captured:
            self.send_data(channel, b'From: test\n\nhello\n')
        expected = textwrap.dedent("""\
             ---------- MESSAGE FOLLOWS ----------
             From: test
             X-Peer: peer-address

             hello
             ------------ END MESSAGE ------------
             """)
        self.assertEqual(captured.getvalue(), expected)

    def test_process_message_with_decode_data_false(self):
        channel = self._open_channel()
        with support.captured_stdout() as captured:
            self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n')
        # Without decoding, the lines are printed as bytes reprs.
        expected = textwrap.dedent("""\
             ---------- MESSAGE FOLLOWS ----------
             b'From: test'
             b'X-Peer: peer-address'
             b''
             b'h\\xc3\\xa9llo\\xff'
             ------------ END MESSAGE ------------
             """)
        self.assertEqual(captured.getvalue(), expected)

    def test_process_message_with_enable_SMTPUTF8_true(self):
        channel = self._open_channel(enable_SMTPUTF8=True)
        with support.captured_stdout() as captured:
            self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n')
        expected = textwrap.dedent("""\
             ---------- MESSAGE FOLLOWS ----------
             b'From: test'
             b'X-Peer: peer-address'
             b''
             b'h\\xc3\\xa9llo\\xff'
             ------------ END MESSAGE ------------
             """)
        self.assertEqual(captured.getvalue(), expected)

    def test_process_SMTPUTF8_message_with_enable_SMTPUTF8_true(self):
        channel = self._open_channel(enable_SMTPUTF8=True)
        with support.captured_stdout() as captured:
            self.send_data(channel, b'From: test\n\nh\xc3\xa9llo\xff\n',
                           enable_SMTPUTF8=True)
        # The mail options given on the MAIL command are printed as well.
        expected = textwrap.dedent("""\
             ---------- MESSAGE FOLLOWS ----------
             mail options: ['BODY=8BITMIME', 'SMTPUTF8']
             b'From: test'
             b'X-Peer: peer-address'
             b''
             b'h\\xc3\\xa9llo\\xff'
             ------------ END MESSAGE ------------
             """)
        self.assertEqual(captured.getvalue(), expected)
168
169
class TestFamilyDetection(unittest.TestCase):
    """The server socket's family follows the first address it is given."""

    def setUp(self):
        asyncore.socket = mock_socket
        smtpd.socket = mock_socket

    def tearDown(self):
        asyncore.close_all()
        smtpd.socket = socket
        asyncore.socket = socket

    def test_socket_uses_IPv4(self):
        first = (socket_helper.HOSTv4, 0)
        second = (socket_helper.HOSTv6, 0)
        server = smtpd.SMTPServer(first, second)
        self.assertEqual(server.socket.family, socket.AF_INET)

    @unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
    def test_socket_uses_IPv6(self):
        first = (socket_helper.HOSTv6, 0)
        second = (socket_helper.HOSTv4, 0)
        server = smtpd.SMTPServer(first, second)
        self.assertEqual(server.socket.family, socket.AF_INET6)
186
187
class TestRcptOptionParsing(unittest.TestCase):
    """RCPT TO parameter handling after an EHLO greeting."""

    # Reply expected when RCPT TO carries a parameter.
    error_response = (b'555 RCPT TO parameters not recognized or not '
                      b'implemented\r\n')

    def setUp(self):
        smtpd.socket = asyncore.socket = mock_socket
        # Send smtpd's debug output to a buffer while the test runs.
        self.old_debugstream = smtpd.DEBUGSTREAM
        self.debug = smtpd.DEBUGSTREAM = io.StringIO()

    def tearDown(self):
        asyncore.close_all()
        asyncore.socket = smtpd.socket = socket
        smtpd.DEBUGSTREAM = self.old_debugstream

    def write_line(self, channel, line):
        # Queue *line* on the mock socket and let the channel read it.
        channel.socket.queue_recv(line)
        channel.handle_read()

    def test_params_rejected(self):
        server = DummyServer((socket_helper.HOST, 0), ('b', 0))
        conn, addr = server.accept()
        channel = smtpd.SMTPChannel(server, conn, addr)
        self.write_line(channel, b'EHLO example')
        self.write_line(channel, b'MAIL from: <foo@example.com> size=20')
        self.write_line(channel, b'RCPT to: <foo@example.com> foo=bar')
        self.assertEqual(channel.socket.last, self.error_response)

    def test_nothing_accepted(self):
        server = DummyServer((socket_helper.HOST, 0), ('b', 0))
        conn, addr = server.accept()
        channel = smtpd.SMTPChannel(server, conn, addr)
        self.write_line(channel, b'EHLO example')
        self.write_line(channel, b'MAIL from: <foo@example.com> size=20')
        self.write_line(channel, b'RCPT to: <foo@example.com>')
        self.assertEqual(channel.socket.last, b'250 OK\r\n')
223
224
class TestMailOptionParsing(unittest.TestCase):
    """MAIL FROM parameter handling under the different server options."""

    # Reply expected when MAIL FROM carries an unsupported parameter.
    error_response = (b'555 MAIL FROM parameters not recognized or not '
                      b'implemented\r\n')

    def setUp(self):
        smtpd.socket = asyncore.socket = mock_socket
        # Send smtpd's debug output to a buffer while the test runs.
        self.old_debugstream = smtpd.DEBUGSTREAM
        self.debug = smtpd.DEBUGSTREAM = io.StringIO()

    def tearDown(self):
        asyncore.close_all()
        asyncore.socket = smtpd.socket = socket
        smtpd.DEBUGSTREAM = self.old_debugstream

    def write_line(self, channel, line):
        # Queue *line* on the mock socket and let the channel read it.
        channel.socket.queue_recv(line)
        channel.handle_read()

    def test_with_decode_data_true(self):
        server = DummyServer((socket_helper.HOST, 0), ('b', 0),
                             decode_data=True)
        conn, addr = server.accept()
        channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
        self.write_line(channel, b'EHLO example')
        # With decode_data=True every SMTPUTF8/BODY variant is refused.
        for line in [
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8',
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME',
            b'MAIL from: <foo@example.com> size=20 BODY=UNKNOWN',
            b'MAIL from: <foo@example.com> size=20 body=8bitmime',
        ]:
            self.write_line(channel, line)
            self.assertEqual(channel.socket.last, self.error_response)
        self.write_line(channel, b'MAIL from: <foo@example.com> size=20')
        self.assertEqual(channel.socket.last, b'250 OK\r\n')

    def test_with_decode_data_false(self):
        server = DummyServer((socket_helper.HOST, 0), ('b', 0))
        conn, addr = server.accept()
        channel = smtpd.SMTPChannel(server, conn, addr)
        self.write_line(channel, b'EHLO example')
        for line in [
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8',
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=8BITMIME',
        ]:
            self.write_line(channel, line)
            self.assertEqual(channel.socket.last, self.error_response)
        self.write_line(
            channel,
            b'MAIL from: <foo@example.com> size=20 SMTPUTF8 BODY=UNKNOWN')
        self.assertEqual(
            channel.socket.last,
            b'501 Error: BODY can only be one of 7BIT, 8BITMIME\r\n')
        self.write_line(
            channel, b'MAIL from: <foo@example.com> size=20 body=8bitmime')
        self.assertEqual(channel.socket.last, b'250 OK\r\n')

    def test_with_enable_smtputf8_true(self):
        server = DummyServer((socket_helper.HOST, 0), ('b', 0),
                             enable_SMTPUTF8=True)
        conn, addr = server.accept()
        channel = smtpd.SMTPChannel(server, conn, addr, enable_SMTPUTF8=True)
        self.write_line(channel, b'EHLO example')
        self.write_line(
            channel,
            b'MAIL from: <foo@example.com> size=20 body=8bitmime smtputf8')
        self.assertEqual(channel.socket.last, b'250 OK\r\n')
289
290
class SMTPDChannelTest(unittest.TestCase):
    """Command-level tests for smtpd.SMTPChannel.

    setUp() builds a DummyServer with decode_data=True and an SMTPChannel on
    the connection it accepts from mock_socket.  Each test feeds raw command
    lines to the channel through write_line() and then checks either the
    last reply written to the mock socket or the messages the server
    recorded.
    """

    def setUp(self):
        smtpd.socket = asyncore.socket = mock_socket
        # Send smtpd's debug output to a buffer while the test runs.
        self.old_debugstream = smtpd.DEBUGSTREAM
        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
        self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
                                  decode_data=True)
        conn, addr = self.server.accept()
        self.channel = smtpd.SMTPChannel(self.server, conn, addr,
                                         decode_data=True)

    # tearDown stays a regular method (rather than cleanups registered in
    # setUp) because subclasses may replace setUp while inheriting it.
    def tearDown(self):
        asyncore.close_all()
        asyncore.socket = smtpd.socket = socket
        smtpd.DEBUGSTREAM = self.old_debugstream

    def write_line(self, line):
        # Queue *line* on the mock socket and let the channel read it.
        self.channel.socket.queue_recv(line)
        self.channel.handle_read()

    def test_broken_connect(self):
        self.assertRaises(
            DummyDispatcherBroken, BrokenDummyServer,
            (socket_helper.HOST, 0), ('b', 0), decode_data=True)

    def test_decode_data_and_enable_SMTPUTF8_raises(self):
        self.assertRaises(
            ValueError, smtpd.SMTPChannel,
            self.server, self.channel.conn, self.channel.addr,
            enable_SMTPUTF8=True, decode_data=True)

    def test_server_accept(self):
        self.server.handle_accept()

    def test_missing_data(self):
        self.write_line(b'')
        self.assertEqual(self.channel.socket.last,
                         b'500 Error: bad syntax\r\n')

    def test_EHLO(self):
        self.write_line(b'EHLO example')
        self.assertEqual(self.channel.socket.last, b'250 HELP\r\n')

    def test_EHLO_bad_syntax(self):
        self.write_line(b'EHLO')
        self.assertEqual(self.channel.socket.last,
                         b'501 Syntax: EHLO hostname\r\n')

    def test_EHLO_duplicate(self):
        self.write_line(b'EHLO example')
        self.write_line(b'EHLO example')
        self.assertEqual(self.channel.socket.last,
                         b'503 Duplicate HELO/EHLO\r\n')

    def test_EHLO_HELO_duplicate(self):
        self.write_line(b'EHLO example')
        self.write_line(b'HELO example')
        self.assertEqual(self.channel.socket.last,
                         b'503 Duplicate HELO/EHLO\r\n')

    def test_HELO(self):
        name = smtpd.socket.getfqdn()
        self.write_line(b'HELO example')
        self.assertEqual(self.channel.socket.last,
                         '250 {}\r\n'.format(name).encode('ascii'))

    def test_HELO_EHLO_duplicate(self):
        self.write_line(b'HELO example')
        self.write_line(b'EHLO example')
        self.assertEqual(self.channel.socket.last,
                         b'503 Duplicate HELO/EHLO\r\n')

    def test_HELP(self):
        self.write_line(b'HELP')
        self.assertEqual(self.channel.socket.last,
                         b'250 Supported commands: EHLO HELO MAIL RCPT '
                         b'DATA RSET NOOP QUIT VRFY\r\n')

    def test_HELP_command(self):
        self.write_line(b'HELP MAIL')
        self.assertEqual(self.channel.socket.last,
                         b'250 Syntax: MAIL FROM: <address>\r\n')

    def test_HELP_command_unknown(self):
        self.write_line(b'HELP SPAM')
        self.assertEqual(self.channel.socket.last,
                         b'501 Supported commands: EHLO HELO MAIL RCPT '
                         b'DATA RSET NOOP QUIT VRFY\r\n')

    def test_HELO_bad_syntax(self):
        self.write_line(b'HELO')
        self.assertEqual(self.channel.socket.last,
                         b'501 Syntax: HELO hostname\r\n')

    def test_HELO_duplicate(self):
        self.write_line(b'HELO example')
        self.write_line(b'HELO example')
        self.assertEqual(self.channel.socket.last,
                         b'503 Duplicate HELO/EHLO\r\n')

    def test_HELO_parameter_rejected_when_extensions_not_enabled(self):
        # The flag lives on the channel; setting it on the TestCase itself
        # would have no effect on the session under test.
        self.channel.extended_smtp = False
        self.write_line(b'HELO example')
        self.write_line(b'MAIL from:<foo@example.com> SIZE=1234')
        self.assertEqual(self.channel.socket.last,
                         b'501 Syntax: MAIL FROM: <address>\r\n')

    def test_MAIL_allows_space_after_colon(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL from:   <foo@example.com>')
        self.assertEqual(self.channel.socket.last,
                         b'250 OK\r\n')

    def test_extended_MAIL_allows_space_after_colon(self):
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL from:   <foo@example.com> size=20')
        self.assertEqual(self.channel.socket.last,
                         b'250 OK\r\n')

    def test_NOOP(self):
        self.write_line(b'NOOP')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

    def test_HELO_NOOP(self):
        self.write_line(b'HELO example')
        self.write_line(b'NOOP')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

    def test_NOOP_bad_syntax(self):
        self.write_line(b'NOOP hi')
        self.assertEqual(self.channel.socket.last,
                         b'501 Syntax: NOOP\r\n')

    def test_QUIT(self):
        self.write_line(b'QUIT')
        self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')

    def test_HELO_QUIT(self):
        self.write_line(b'HELO example')
        self.write_line(b'QUIT')
        self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')

    def test_QUIT_arg_ignored(self):
        self.write_line(b'QUIT bye bye')
        self.assertEqual(self.channel.socket.last, b'221 Bye\r\n')

    def test_bad_state(self):
        self.channel.smtp_state = 'BAD STATE'
        self.write_line(b'HELO example')
        self.assertEqual(self.channel.socket.last,
                         b'451 Internal confusion\r\n')

    def test_command_too_long(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL from: ' +
                        b'a' * self.channel.command_size_limit +
                        b'@example')
        self.assertEqual(self.channel.socket.last,
                         b'500 Error: line too long\r\n')

    def test_MAIL_command_limit_extended_with_SIZE(self):
        self.write_line(b'EHLO example')
        # Pad the address so the command without its SIZE parameter is
        # exactly command_size_limit long.
        fill_len = (self.channel.command_size_limit
                    - len('MAIL from:<@example>'))
        self.write_line(b'MAIL from:<' +
                        b'a' * fill_len +
                        b'@example> SIZE=1234')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

        self.write_line(b'MAIL from:<' +
                        b'a' * (fill_len + 26) +
                        b'@example> SIZE=1234')
        self.assertEqual(self.channel.socket.last,
                         b'500 Error: line too long\r\n')

    def test_MAIL_command_rejects_SMTPUTF8_by_default(self):
        self.write_line(b'EHLO example')
        self.write_line(
            b'MAIL from: <naive@example.com> BODY=8BITMIME SMTPUTF8')
        # Only the reply class is checked: any 5xx reply is a rejection.
        self.assertEqual(self.channel.socket.last[0:1], b'5')

    def test_data_longer_than_default_data_size_limit(self):
        # Hack the default so we don't have to generate so much data.
        self.channel.data_size_limit = 1048
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From:eggs@example')
        self.write_line(b'RCPT To:spam@example')
        self.write_line(b'DATA')
        self.write_line(b'A' * self.channel.data_size_limit +
                        b'A\r\n.')
        self.assertEqual(self.channel.socket.last,
                         b'552 Error: Too much mail data\r\n')

    def test_MAIL_size_parameter(self):
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL FROM:<eggs@example> SIZE=512')
        self.assertEqual(self.channel.socket.last,
                         b'250 OK\r\n')

    def test_MAIL_invalid_size_parameter(self):
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL FROM:<eggs@example> SIZE=invalid')
        self.assertEqual(self.channel.socket.last,
            b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n')

    def test_MAIL_RCPT_unknown_parameters(self):
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL FROM:<eggs@example> ham=green')
        self.assertEqual(self.channel.socket.last,
            b'555 MAIL FROM parameters not recognized or not implemented\r\n')

        self.write_line(b'MAIL FROM:<eggs@example>')
        self.write_line(b'RCPT TO:<eggs@example> ham=green')
        self.assertEqual(self.channel.socket.last,
            b'555 RCPT TO parameters not recognized or not implemented\r\n')

    def test_MAIL_size_parameter_larger_than_default_data_size_limit(self):
        self.channel.data_size_limit = 1048
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL FROM:<eggs@example> SIZE=2096')
        self.assertEqual(self.channel.socket.last,
            b'552 Error: message size exceeds fixed maximum message size\r\n')

    def test_need_MAIL(self):
        self.write_line(b'HELO example')
        self.write_line(b'RCPT to:spam@example')
        self.assertEqual(self.channel.socket.last,
            b'503 Error: need MAIL command\r\n')

    def test_MAIL_syntax_HELO(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL from eggs@example')
        self.assertEqual(self.channel.socket.last,
            b'501 Syntax: MAIL FROM: <address>\r\n')

    def test_MAIL_syntax_EHLO(self):
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL from eggs@example')
        self.assertEqual(self.channel.socket.last,
            b'501 Syntax: MAIL FROM: <address> [SP <mail-parameters>]\r\n')

    def test_MAIL_missing_address(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL from:')
        self.assertEqual(self.channel.socket.last,
            b'501 Syntax: MAIL FROM: <address>\r\n')

    def test_MAIL_chevrons(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL from:<eggs@example>')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

    def test_MAIL_empty_chevrons(self):
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL from:<>')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

    def test_MAIL_quoted_localpart(self):
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL from: <"Fred Blogs"@example.com>')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
        self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')

    def test_MAIL_quoted_localpart_no_angles(self):
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL from: "Fred Blogs"@example.com')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
        self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')

    def test_MAIL_quoted_localpart_with_size(self):
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL from: <"Fred Blogs"@example.com> SIZE=1000')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
        self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')

    def test_MAIL_quoted_localpart_with_size_no_angles(self):
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL from: "Fred Blogs"@example.com SIZE=1000')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
        self.assertEqual(self.channel.mailfrom, '"Fred Blogs"@example.com')

    def test_nested_MAIL(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL from:eggs@example')
        self.write_line(b'MAIL from:spam@example')
        self.assertEqual(self.channel.socket.last,
            b'503 Error: nested MAIL command\r\n')

    def test_VRFY(self):
        self.write_line(b'VRFY eggs@example')
        self.assertEqual(self.channel.socket.last,
            b'252 Cannot VRFY user, but will accept message and attempt '
            b'delivery\r\n')

    def test_VRFY_syntax(self):
        self.write_line(b'VRFY')
        self.assertEqual(self.channel.socket.last,
            b'501 Syntax: VRFY <address>\r\n')

    def test_EXPN_not_implemented(self):
        self.write_line(b'EXPN')
        self.assertEqual(self.channel.socket.last,
            b'502 EXPN not implemented\r\n')

    def test_no_HELO_MAIL(self):
        self.write_line(b'MAIL from:<foo@example.com>')
        self.assertEqual(self.channel.socket.last,
                         b'503 Error: send HELO first\r\n')

    def test_need_RCPT(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From:eggs@example')
        self.write_line(b'DATA')
        self.assertEqual(self.channel.socket.last,
            b'503 Error: need RCPT command\r\n')

    def test_RCPT_syntax_HELO(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From: eggs@example')
        self.write_line(b'RCPT to eggs@example')
        self.assertEqual(self.channel.socket.last,
            b'501 Syntax: RCPT TO: <address>\r\n')

    def test_RCPT_syntax_EHLO(self):
        self.write_line(b'EHLO example')
        self.write_line(b'MAIL From: eggs@example')
        self.write_line(b'RCPT to eggs@example')
        self.assertEqual(self.channel.socket.last,
            b'501 Syntax: RCPT TO: <address> [SP <mail-parameters>]\r\n')

    def test_RCPT_lowercase_to_OK(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From: eggs@example')
        self.write_line(b'RCPT to: <eggs@example>')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

    def test_no_HELO_RCPT(self):
        self.write_line(b'RCPT to eggs@example')
        self.assertEqual(self.channel.socket.last,
                         b'503 Error: send HELO first\r\n')

    def test_data_dialog(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From:eggs@example')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
        self.write_line(b'RCPT To:spam@example')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

        self.write_line(b'DATA')
        self.assertEqual(self.channel.socket.last,
            b'354 End data with <CR><LF>.<CR><LF>\r\n')
        self.write_line(b'data\r\nmore\r\n.')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
        self.assertEqual(self.server.messages,
            [(('peer-address', 'peer-port'),
              'eggs@example',
              ['spam@example'],
              'data\nmore')])

    def test_DATA_syntax(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From:eggs@example')
        self.write_line(b'RCPT To:spam@example')
        self.write_line(b'DATA spam')
        self.assertEqual(self.channel.socket.last, b'501 Syntax: DATA\r\n')

    def test_no_HELO_DATA(self):
        self.write_line(b'DATA spam')
        self.assertEqual(self.channel.socket.last,
                         b'503 Error: send HELO first\r\n')

    def test_data_transparency_section_4_5_2(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From:eggs@example')
        self.write_line(b'RCPT To:spam@example')
        self.write_line(b'DATA')
        # A doubled leading dot in the data comes back as a single dot.
        self.write_line(b'..\r\n.\r\n')
        self.assertEqual(self.channel.received_data, '.')

    def test_multiple_RCPT(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From:eggs@example')
        self.write_line(b'RCPT To:spam@example')
        self.write_line(b'RCPT To:ham@example')
        self.write_line(b'DATA')
        self.write_line(b'data\r\n.')
        self.assertEqual(self.server.messages,
            [(('peer-address', 'peer-port'),
              'eggs@example',
              ['spam@example', 'ham@example'],
              'data')])

    def test_manual_status(self):
        # checks that the Channel is able to return a custom status message
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From:eggs@example')
        self.write_line(b'RCPT To:spam@example')
        self.write_line(b'DATA')
        self.write_line(b'return status\r\n.')
        self.assertEqual(self.channel.socket.last, b'250 Okish\r\n')

    def test_RSET(self):
        self.write_line(b'HELO example')
        self.write_line(b'MAIL From:eggs@example')
        self.write_line(b'RCPT To:spam@example')
        self.write_line(b'RSET')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
        # Only the transaction started after RSET should be delivered.
        self.write_line(b'MAIL From:foo@example')
        self.write_line(b'RCPT To:eggs@example')
        self.write_line(b'DATA')
        self.write_line(b'data\r\n.')
        self.assertEqual(self.server.messages,
            [(('peer-address', 'peer-port'),
              'foo@example',
              ['eggs@example'],
              'data')])

    def test_HELO_RSET(self):
        self.write_line(b'HELO example')
        self.write_line(b'RSET')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

    def test_RSET_syntax(self):
        self.write_line(b'RSET hi')
        self.assertEqual(self.channel.socket.last, b'501 Syntax: RSET\r\n')

    def test_unknown_command(self):
        self.write_line(b'UNKNOWN_CMD')
        self.assertEqual(self.channel.socket.last,
                         b'500 Error: command "UNKNOWN_CMD" not '
                         b'recognized\r\n')

    def test_attribute_deprecations(self):
        # Reading and writing each old private ``_SMTPChannel__*`` name must
        # emit a DeprecationWarning.  The order is significant only in that
        # it matches the order the attributes have always been checked in.
        for suffix in ('server', 'line', 'state', 'greeting', 'mailfrom',
                       'rcpttos', 'data', 'fqdn', 'peer', 'conn', 'addr'):
            name = '_SMTPChannel__' + suffix
            with self.subTest(attribute=name):
                with warnings_helper.check_warnings(('', DeprecationWarning)):
                    getattr(self.channel, name)
                with warnings_helper.check_warnings(('', DeprecationWarning)):
                    setattr(self.channel, name, 'spam')
767
@unittest.skipUnless(socket_helper.IPV6_ENABLED, "IPv6 not enabled")
class SMTPDChannelIPv6Test(SMTPDChannelTest):
    """Variant of SMTPDChannelTest whose server is bound to HOSTv6."""

    def setUp(self):
        # Substitute the mock socket module for the real one in both modules.
        smtpd.socket = mock_socket
        asyncore.socket = mock_socket

        # Remember the current debug stream and capture output in memory.
        self.old_debugstream = smtpd.DEBUGSTREAM
        debug_buffer = io.StringIO()
        self.debug = debug_buffer
        smtpd.DEBUGSTREAM = debug_buffer

        bind_address = (socket_helper.HOSTv6, 0)
        self.server = DummyServer(bind_address, ('b', 0), decode_data=True)
        client_sock, client_addr = self.server.accept()
        self.channel = smtpd.SMTPChannel(
            self.server, client_sock, client_addr, decode_data=True)
779
class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase):
    """Tests for an SMTPChannel created with a DATA size limit of 32."""

    def setUp(self):
        # Substitute the mock socket module for the real one in both modules.
        smtpd.socket = mock_socket
        asyncore.socket = mock_socket

        # Remember the current debug stream and capture output in memory.
        self.old_debugstream = smtpd.DEBUGSTREAM
        debug_buffer = io.StringIO()
        self.debug = debug_buffer
        smtpd.DEBUGSTREAM = debug_buffer

        self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
                                  decode_data=True)
        client_sock, client_addr = self.server.accept()
        # Set DATA size limit to 32 bytes for easy testing
        self.channel = smtpd.SMTPChannel(
            self.server, client_sock, client_addr, 32, decode_data=True)

    def tearDown(self):
        asyncore.close_all()
        # Put the real socket module and the saved debug stream back.
        asyncore.socket = socket
        smtpd.socket = socket
        smtpd.DEBUGSTREAM = self.old_debugstream

    def write_line(self, line):
        # Queue the bytes on the channel's socket, then have the channel
        # process them.
        channel = self.channel
        channel.socket.queue_recv(line)
        channel.handle_read()

    def _enter_data_phase(self):
        # Prologue shared by both tests: HELO, then MAIL and RCPT (each must
        # be answered with 250 OK), then DATA (must be answered with 354).
        self.write_line(b'HELO example')
        for command in (b'MAIL From:eggs@example', b'RCPT To:spam@example'):
            self.write_line(command)
            self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

        self.write_line(b'DATA')
        self.assertEqual(self.channel.socket.last,
                         b'354 End data with <CR><LF>.<CR><LF>\r\n')

    def test_data_limit_dialog(self):
        self._enter_data_phase()
        self.write_line(b'data\r\nmore\r\n.')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
        delivered = (
            ('peer-address', 'peer-port'),
            'eggs@example',
            ['spam@example'],
            'data\nmore',
        )
        self.assertEqual(self.server.messages, [delivered])

    def test_data_limit_dialog_too_much_data(self):
        self._enter_data_phase()
        self.write_line(b'This message is longer than 32 bytes\r\n.')
        self.assertEqual(self.channel.socket.last,
                         b'552 Error: Too much mail data\r\n')
833
834
class SMTPDChannelWithDecodeDataFalse(unittest.TestCase):
    """Tests for a channel built without decode_data: data stays bytes."""

    def setUp(self):
        # Substitute the mock socket module for the real one in both modules.
        smtpd.socket = mock_socket
        asyncore.socket = mock_socket

        # Remember the current debug stream and capture output in memory.
        self.old_debugstream = smtpd.DEBUGSTREAM
        debug_buffer = io.StringIO()
        self.debug = debug_buffer
        smtpd.DEBUGSTREAM = debug_buffer

        # Neither the server nor the channel is given decode_data.
        self.server = DummyServer((socket_helper.HOST, 0), ('b', 0))
        client_sock, client_addr = self.server.accept()
        self.channel = smtpd.SMTPChannel(self.server, client_sock,
                                         client_addr)

    def tearDown(self):
        asyncore.close_all()
        # Put the real socket module and the saved debug stream back.
        asyncore.socket = socket
        smtpd.socket = socket
        smtpd.DEBUGSTREAM = self.old_debugstream

    def write_line(self, line):
        # Queue the bytes on the channel's socket, then have the channel
        # process them.
        channel = self.channel
        channel.socket.queue_recv(line)
        channel.handle_read()

    def _feed(self, *lines):
        # Send several lines one after another through write_line().
        for line in lines:
            self.write_line(line)

    def test_ascii_data(self):
        self._feed(
            b'HELO example',
            b'MAIL From:eggs@example',
            b'RCPT To:spam@example',
            b'DATA',
            b'plain ascii text',
            b'.',
        )
        self.assertEqual(self.channel.received_data, b'plain ascii text')

    def test_utf8_data(self):
        self._feed(
            b'HELO example',
            b'MAIL From:eggs@example',
            b'RCPT To:spam@example',
            b'DATA',
            b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87',
            b'and some plain ascii',
            b'.',
        )
        expected = (b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87\n'
                    b'and some plain ascii')
        self.assertEqual(self.channel.received_data, expected)
875
876
class SMTPDChannelWithDecodeDataTrue(unittest.TestCase):
    """Tests for a channel built with decode_data=True: data becomes str."""

    def setUp(self):
        # Substitute the mock socket module for the real one in both modules.
        smtpd.socket = mock_socket
        asyncore.socket = mock_socket

        # Remember the current debug stream and capture output in memory.
        self.old_debugstream = smtpd.DEBUGSTREAM
        debug_buffer = io.StringIO()
        self.debug = debug_buffer
        smtpd.DEBUGSTREAM = debug_buffer

        self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
                                  decode_data=True)
        client_sock, client_addr = self.server.accept()
        # Set decode_data to True
        self.channel = smtpd.SMTPChannel(
            self.server, client_sock, client_addr, decode_data=True)

    def tearDown(self):
        asyncore.close_all()
        # Put the real socket module and the saved debug stream back.
        asyncore.socket = socket
        smtpd.socket = socket
        smtpd.DEBUGSTREAM = self.old_debugstream

    def write_line(self, line):
        # Queue the bytes on the channel's socket, then have the channel
        # process them.
        channel = self.channel
        channel.socket.queue_recv(line)
        channel.handle_read()

    def _feed(self, *lines):
        # Send several lines one after another through write_line().
        for line in lines:
            self.write_line(line)

    def test_ascii_data(self):
        self._feed(
            b'HELO example',
            b'MAIL From:eggs@example',
            b'RCPT To:spam@example',
            b'DATA',
            b'plain ascii text',
            b'.',
        )
        self.assertEqual(self.channel.received_data, 'plain ascii text')

    def test_utf8_data(self):
        self._feed(
            b'HELO example',
            b'MAIL From:eggs@example',
            b'RCPT To:spam@example',
            b'DATA',
            b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87',
            b'and some plain ascii',
            b'.',
        )
        expected = 'utf8 enriched text: żźć\nand some plain ascii'
        self.assertEqual(self.channel.received_data, expected)
919
920
class SMTPDChannelTestWithEnableSMTPUTF8True(unittest.TestCase):
    """Tests for an SMTPChannel created with enable_SMTPUTF8=True.

    Both the DummyServer and the channel are constructed with
    enable_SMTPUTF8=True, and every test opens its session with EHLO.
    """

    def setUp(self):
        # Route smtpd and asyncore through the mock socket module and capture
        # smtpd's debug output in memory; tearDown() restores both.
        smtpd.socket = asyncore.socket = mock_socket
        self.old_debugstream = smtpd.DEBUGSTREAM
        self.debug = smtpd.DEBUGSTREAM = io.StringIO()
        self.server = DummyServer((socket_helper.HOST, 0), ('b', 0),
                                  enable_SMTPUTF8=True)
        conn, addr = self.server.accept()
        self.channel = smtpd.SMTPChannel(self.server, conn, addr,
                                         enable_SMTPUTF8=True)

    def tearDown(self):
        asyncore.close_all()
        asyncore.socket = smtpd.socket = socket
        smtpd.DEBUGSTREAM = self.old_debugstream

    def write_line(self, line):
        # Queue *line* (bytes) on the channel's socket, then have the channel
        # process it.
        self.channel.socket.queue_recv(line)
        self.channel.handle_read()

    def test_MAIL_command_accepts_SMTPUTF8_when_announced(self):
        self.write_line(b'EHLO example')
        # The sender address carries a non-ASCII local part and is sent
        # UTF-8 encoded together with the SMTPUTF8 parameter.
        self.write_line(
            'MAIL from: <naïve@example.com> BODY=8BITMIME SMTPUTF8'.encode(
                'utf-8')
        )
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

    def test_process_smtputf8_message(self):
        self.write_line(b'EHLO example')
        # Run one full transaction without MAIL parameters and one with
        # SMTPUTF8.  DummyServer.process_message() answers with a dedicated
        # status line when 'SMTPUTF8' is among the mail options.
        for mail_parameters in [b'', b'BODY=8BITMIME SMTPUTF8']:
            self.write_line(b'MAIL from: <a@example> ' + mail_parameters)
            self.assertEqual(self.channel.socket.last[0:3], b'250')
            self.write_line(b'rcpt to:<b@example.com>')
            self.assertEqual(self.channel.socket.last[0:3], b'250')
            self.write_line(b'data')
            self.assertEqual(self.channel.socket.last[0:3], b'354')
            self.write_line(b'c\r\n.')
            if mail_parameters == b'':
                self.assertEqual(self.channel.socket.last, b'250 OK\r\n')
            else:
                self.assertEqual(self.channel.socket.last,
                                 b'250 SMTPUTF8 message okish\r\n')

    def test_utf8_data(self):
        self.write_line(b'EHLO example')
        self.write_line(
            'MAIL From: naïve@examplé BODY=8BITMIME SMTPUTF8'.encode('utf-8'))
        self.assertEqual(self.channel.socket.last[0:3], b'250')
        self.write_line('RCPT To:späm@examplé'.encode('utf-8'))
        self.assertEqual(self.channel.socket.last[0:3], b'250')
        self.write_line(b'DATA')
        self.assertEqual(self.channel.socket.last[0:3], b'354')
        self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
        self.write_line(b'.')
        # The channel was not given decode_data, so the payload is compared
        # as the raw UTF-8 bytes that were sent.
        self.assertEqual(
            self.channel.received_data,
            b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')

    def test_MAIL_command_limit_extended_with_SIZE_and_SMTPUTF8(self):
        self.write_line(b'ehlo example')
        # fill_len pads the MAIL command to exactly 512 + 26 + 10 characters.
        # NOTE(review): presumably 512 is the base command limit and 26 / 10
        # are the SIZE / SMTPUTF8 extensions named in the test -- confirm
        # against smtpd.
        fill_len = (512 + 26 + 10) - len('mail from:<@example>')
        # One character over the limit is rejected ...
        self.write_line(b'MAIL from:<' +
                        b'a' * (fill_len + 1) +
                        b'@example>')
        self.assertEqual(self.channel.socket.last,
                         b'500 Error: line too long\r\n')
        # ... while a command of exactly the limit is accepted.
        self.write_line(b'MAIL from:<' +
                        b'a' * fill_len +
                        b'@example>')
        self.assertEqual(self.channel.socket.last, b'250 OK\r\n')

    def test_multiple_emails_with_extended_command_length(self):
        self.write_line(b'ehlo example')
        fill_len = (512 + 26 + 10) - len('mail from:<@example>')
        # Three consecutive transactions on the same channel: each time the
        # over-long MAIL is refused and the maximum-length MAIL is accepted.
        for char in [b'a', b'b', b'c']:
            self.write_line(b'MAIL from:<' + char * fill_len + b'a@example>')
            self.assertEqual(self.channel.socket.last[0:3], b'500')
            self.write_line(b'MAIL from:<' + char * fill_len + b'@example>')
            self.assertEqual(self.channel.socket.last[0:3], b'250')
            self.write_line(b'rcpt to:<hans@example.com>')
            self.assertEqual(self.channel.socket.last[0:3], b'250')
            self.write_line(b'data')
            self.assertEqual(self.channel.socket.last[0:3], b'354')
            self.write_line(b'test\r\n.')
            self.assertEqual(self.channel.socket.last[0:3], b'250')
1007
1008
class MiscTestCase(unittest.TestCase):
    """Miscellaneous checks on the smtpd module itself."""

    def test__all__(self):
        # These module-level names are handed to check__all__ as the ones
        # that are not exported.
        unexported_names = {
            "program",
            "Devnull",
            "DEBUGSTREAM",
            "NEWLINE",
            "COMMASPACE",
            "DATA_SIZE_DEFAULT",
            "usage",
            "Options",
            "parseargs",
        }
        support.check__all__(self, smtpd, not_exported=unexported_names)
1016
1017
# Run the tests in this module when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
1020