1# test asynchat
2
3from test import support
4from test.support import socket_helper
5from test.support import threading_helper
6from test.support import warnings_helper
7
8import errno
9import socket
10import sys
11import threading
12import time
13import unittest
14import unittest.mock
15
16
17asynchat = warnings_helper.import_deprecated('asynchat')
18asyncore = warnings_helper.import_deprecated('asyncore')
19
20support.requires_working_socket(module=True)
21
22HOST = socket_helper.HOST
23SERVER_QUIT = b'QUIT\n'
24
25
26class echo_server(threading.Thread):
27    # parameter to determine the number of bytes passed back to the
28    # client each send
29    chunk_size = 1
30
31    def __init__(self, event):
32        threading.Thread.__init__(self)
33        self.event = event
34        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
35        self.port = socket_helper.bind_port(self.sock)
36        # This will be set if the client wants us to wait before echoing
37        # data back.
38        self.start_resend_event = None
39
40    def run(self):
41        self.sock.listen()
42        self.event.set()
43        conn, client = self.sock.accept()
44        self.buffer = b""
45        # collect data until quit message is seen
46        while SERVER_QUIT not in self.buffer:
47            data = conn.recv(1)
48            if not data:
49                break
50            self.buffer = self.buffer + data
51
52        # remove the SERVER_QUIT message
53        self.buffer = self.buffer.replace(SERVER_QUIT, b'')
54
55        if self.start_resend_event:
56            self.start_resend_event.wait()
57
58        # re-send entire set of collected data
59        try:
60            # this may fail on some tests, such as test_close_when_done,
61            # since the client closes the channel when it's done sending
62            while self.buffer:
63                n = conn.send(self.buffer[:self.chunk_size])
64                time.sleep(0.001)
65                self.buffer = self.buffer[n:]
66        except:
67            pass
68
69        conn.close()
70        self.sock.close()
71
72class echo_client(asynchat.async_chat):
73
74    def __init__(self, terminator, server_port):
75        asynchat.async_chat.__init__(self)
76        self.contents = []
77        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
78        self.connect((HOST, server_port))
79        self.set_terminator(terminator)
80        self.buffer = b""
81
82    def handle_connect(self):
83        pass
84
85    if sys.platform == 'darwin':
86        # select.poll returns a select.POLLHUP at the end of the tests
87        # on darwin, so just ignore it
88        def handle_expt(self):
89            pass
90
91    def collect_incoming_data(self, data):
92        self.buffer += data
93
94    def found_terminator(self):
95        self.contents.append(self.buffer)
96        self.buffer = b""
97
98def start_echo_server():
99    event = threading.Event()
100    s = echo_server(event)
101    s.start()
102    event.wait()
103    event.clear()
104    time.sleep(0.01)   # Give server time to start accepting.
105    return s, event
106
107
108class TestAsynchat(unittest.TestCase):
109    usepoll = False
110
111    def setUp(self):
112        self._threads = threading_helper.threading_setup()
113
114    def tearDown(self):
115        threading_helper.threading_cleanup(*self._threads)
116
117    def line_terminator_check(self, term, server_chunk):
118        event = threading.Event()
119        s = echo_server(event)
120        s.chunk_size = server_chunk
121        s.start()
122        event.wait()
123        event.clear()
124        time.sleep(0.01)   # Give server time to start accepting.
125        c = echo_client(term, s.port)
126        c.push(b"hello ")
127        c.push(b"world" + term)
128        c.push(b"I'm not dead yet!" + term)
129        c.push(SERVER_QUIT)
130        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
131        threading_helper.join_thread(s)
132
133        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
134
135    # the line terminator tests below check receiving variously-sized
136    # chunks back from the server in order to exercise all branches of
137    # async_chat.handle_read
138
139    def test_line_terminator1(self):
140        # test one-character terminator
141        for l in (1, 2, 3):
142            self.line_terminator_check(b'\n', l)
143
144    def test_line_terminator2(self):
145        # test two-character terminator
146        for l in (1, 2, 3):
147            self.line_terminator_check(b'\r\n', l)
148
149    def test_line_terminator3(self):
150        # test three-character terminator
151        for l in (1, 2, 3):
152            self.line_terminator_check(b'qqq', l)
153
154    def numeric_terminator_check(self, termlen):
155        # Try reading a fixed number of bytes
156        s, event = start_echo_server()
157        c = echo_client(termlen, s.port)
158        data = b"hello world, I'm not dead yet!\n"
159        c.push(data)
160        c.push(SERVER_QUIT)
161        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
162        threading_helper.join_thread(s)
163
164        self.assertEqual(c.contents, [data[:termlen]])
165
166    def test_numeric_terminator1(self):
167        # check that ints & longs both work (since type is
168        # explicitly checked in async_chat.handle_read)
169        self.numeric_terminator_check(1)
170
171    def test_numeric_terminator2(self):
172        self.numeric_terminator_check(6)
173
174    def test_none_terminator(self):
175        # Try reading a fixed number of bytes
176        s, event = start_echo_server()
177        c = echo_client(None, s.port)
178        data = b"hello world, I'm not dead yet!\n"
179        c.push(data)
180        c.push(SERVER_QUIT)
181        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
182        threading_helper.join_thread(s)
183
184        self.assertEqual(c.contents, [])
185        self.assertEqual(c.buffer, data)
186
187    def test_simple_producer(self):
188        s, event = start_echo_server()
189        c = echo_client(b'\n', s.port)
190        data = b"hello world\nI'm not dead yet!\n"
191        p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
192        c.push_with_producer(p)
193        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
194        threading_helper.join_thread(s)
195
196        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
197
198    def test_string_producer(self):
199        s, event = start_echo_server()
200        c = echo_client(b'\n', s.port)
201        data = b"hello world\nI'm not dead yet!\n"
202        c.push_with_producer(data+SERVER_QUIT)
203        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
204        threading_helper.join_thread(s)
205
206        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
207
208    def test_empty_line(self):
209        # checks that empty lines are handled correctly
210        s, event = start_echo_server()
211        c = echo_client(b'\n', s.port)
212        c.push(b"hello world\n\nI'm not dead yet!\n")
213        c.push(SERVER_QUIT)
214        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
215        threading_helper.join_thread(s)
216
217        self.assertEqual(c.contents,
218                         [b"hello world", b"", b"I'm not dead yet!"])
219
220    def test_close_when_done(self):
221        s, event = start_echo_server()
222        s.start_resend_event = threading.Event()
223        c = echo_client(b'\n', s.port)
224        c.push(b"hello world\nI'm not dead yet!\n")
225        c.push(SERVER_QUIT)
226        c.close_when_done()
227        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
228
229        # Only allow the server to start echoing data back to the client after
230        # the client has closed its connection.  This prevents a race condition
231        # where the server echoes all of its data before we can check that it
232        # got any down below.
233        s.start_resend_event.set()
234        threading_helper.join_thread(s)
235
236        self.assertEqual(c.contents, [])
237        # the server might have been able to send a byte or two back, but this
238        # at least checks that it received something and didn't just fail
239        # (which could still result in the client not having received anything)
240        self.assertGreater(len(s.buffer), 0)
241
242    def test_push(self):
243        # Issue #12523: push() should raise a TypeError if it doesn't get
244        # a bytes string
245        s, event = start_echo_server()
246        c = echo_client(b'\n', s.port)
247        data = b'bytes\n'
248        c.push(data)
249        c.push(bytearray(data))
250        c.push(memoryview(data))
251        self.assertRaises(TypeError, c.push, 10)
252        self.assertRaises(TypeError, c.push, 'unicode')
253        c.push(SERVER_QUIT)
254        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
255        threading_helper.join_thread(s)
256        self.assertEqual(c.contents, [b'bytes', b'bytes', b'bytes'])
257
258
259class TestAsynchat_WithPoll(TestAsynchat):
260    usepoll = True
261
262
263class TestAsynchatMocked(unittest.TestCase):
264    def test_blockingioerror(self):
265        # Issue #16133: handle_read() must ignore BlockingIOError
266        sock = unittest.mock.Mock()
267        sock.recv.side_effect = BlockingIOError(errno.EAGAIN)
268
269        dispatcher = asynchat.async_chat()
270        dispatcher.set_socket(sock)
271        self.addCleanup(dispatcher.del_channel)
272
273        with unittest.mock.patch.object(dispatcher, 'handle_error') as error:
274            dispatcher.handle_read()
275        self.assertFalse(error.called)
276
277
278class TestHelperFunctions(unittest.TestCase):
279    def test_find_prefix_at_end(self):
280        self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)
281        self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0)
282
283
284class TestNotConnected(unittest.TestCase):
285    def test_disallow_negative_terminator(self):
286        # Issue #11259
287        client = asynchat.async_chat()
288        self.assertRaises(ValueError, client.set_terminator, -1)
289
290
291
292if __name__ == "__main__":
293    unittest.main()
294