# Test case for the select.poll() function
2
3import os
4import subprocess
5import random
6import select
7import threading
8import time
9import unittest
10from test.support import (
11    cpython_only, requires_subprocess, requires_working_socket
12)
13from test.support import threading_helper
14from test.support.os_helper import TESTFN
15
16
17try:
18    select.poll
19except AttributeError:
20    raise unittest.SkipTest("select.poll not defined")
21
22requires_working_socket(module=True)
23
def find_ready_matching(ready, flag):
    # Given the (fd, eventmask) pairs in `ready`, return the fds whose event
    # mask has at least one bit in common with `flag`, in their original order.
    return [fd for fd, mode in ready if mode & flag]
30
class PollTests(unittest.TestCase):
    # Functional and error-path tests for select.poll() objects.

    def test_poll1(self):
        # Basic functional test of poll object
        # Create a bunch of pipes and test that poll works with them.

        p = select.poll()

        NUM_PIPES = 12
        MSG = b" This is a test."
        MSG_LEN = len(MSG)
        writers = []
        r2w = {}

        # Every pipe end that is still open.  Whatever is left here when the
        # test ends (normally nothing, but all of them if an assertion fails
        # midway) is closed by the cleanup so no descriptor leaks.
        open_fds = set()

        def close_remaining():
            while open_fds:
                os.close(open_fds.pop())
        self.addCleanup(close_remaining)

        for _ in range(NUM_PIPES):
            rd, wr = os.pipe()
            open_fds.update((rd, wr))
            # Exercise both ways of setting the event mask: register() with
            # the default mask followed by modify(), and register() with an
            # explicit mask.
            p.register(rd)
            p.modify(rd, select.POLLIN)
            p.register(wr, select.POLLOUT)
            writers.append(wr)
            r2w[rd] = wr

        bufs = []

        while writers:
            # Write one message to a randomly chosen writable pipe ...
            ready = p.poll()
            ready_writers = find_ready_matching(ready, select.POLLOUT)
            if not ready_writers:
                raise RuntimeError("no pipes ready for writing")
            wr = random.choice(ready_writers)
            os.write(wr, MSG)

            # ... then read it back from a pipe that poll() reports readable.
            ready = p.poll()
            ready_readers = find_ready_matching(ready, select.POLLIN)
            if not ready_readers:
                raise RuntimeError("no pipes ready for reading")
            rd = random.choice(ready_readers)
            buf = os.read(rd, MSG_LEN)
            self.assertEqual(len(buf), MSG_LEN)
            bufs.append(buf)

            # Retire the pipe: drop both ends from the poll object first,
            # then close them (write end first, as before).
            paired_wr = r2w[rd]
            p.unregister(paired_wr)
            p.unregister(rd)
            for fd in (paired_wr, rd):
                open_fds.discard(fd)
                os.close(fd)
            writers.remove(paired_wr)

        self.assertEqual(bufs, [MSG] * NUM_PIPES)

    def test_poll_unit_tests(self):
        # returns NVAL for invalid file descriptor
        closed_fd, w = os.pipe()
        os.close(closed_fd)
        os.close(w)
        p = select.poll()
        p.register(closed_fd)
        r = p.poll()
        self.assertEqual(r[0], (closed_fd, select.POLLNVAL))

        with open(TESTFN, 'w') as f:
            # Remove the scratch file even if one of the assertions fails.
            self.addCleanup(os.unlink, TESTFN)
            fd = f.fileno()
            p = select.poll()
            # register() is given the file object itself, not its descriptor.
            p.register(f)
            r = p.poll()
            self.assertEqual(r[0][0], fd)
        # The file is closed now, so its descriptor must be reported invalid.
        r = p.poll()
        self.assertEqual(r[0], (fd, select.POLLNVAL))

        # type error for invalid arguments
        p = select.poll()
        self.assertRaises(TypeError, p.register, p)
        self.assertRaises(TypeError, p.unregister, p)

        # can't unregister non-existent object
        p = select.poll()
        self.assertRaises(KeyError, p.unregister, 3)

        # Test error cases
        pollster = select.poll()

        class Nope:
            # No fileno() method at all.
            pass

        class Almost:
            # Has fileno(), but it does not return an integer.
            def fileno(self):
                return 'fileno'

        self.assertRaises(TypeError, pollster.register, Nope(), 0)
        self.assertRaises(TypeError, pollster.register, Almost(), 0)

    # Another test case for poll().  This is copied from the test case for
    # select(), modified to use poll() instead.

    @requires_subprocess()
    def test_poll2(self):
        # The shell command is a fixed literal (no external input), printing
        # one line per second for ten seconds.
        cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
        proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                                bufsize=0)
        self.enterContext(proc)
        p = proc.stdout
        pollster = select.poll()
        pollster.register(p, select.POLLIN)
        # Try a range of finite timeouts (milliseconds), then block (-1).
        for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
            fdlist = pollster.poll(tout)
            if not fdlist:
                # Timed out with nothing to read; try the next timeout.
                continue
            _fd, flags = fdlist[0]
            if flags & select.POLLHUP:
                # Writer hung up: the pipe must be drained already.
                line = p.readline()
                if line != b"":
                    self.fail('error: pipe seems to be closed, but still returns data')
            elif flags & select.POLLIN:
                line = p.readline()
                if not line:
                    # EOF: the child is done.
                    break
                self.assertEqual(line, b'testing...\n')
            else:
                self.fail('Unexpected return value from select.poll: %s' % fdlist)

    def test_poll3(self):
        # test int overflow
        pollster = select.poll()
        pollster.register(1)

        self.assertRaises(OverflowError, pollster.poll, 1 << 64)

        # Legacy sanity check kept as-is.
        # NOTE(review): presumably guards against an overflow error left
        # pending by poll() and surfacing on the next operation -- confirm.
        x = 2 + 3
        if x != 5:
            self.fail('Overflow must have occurred')

        # Issues #15989, #17919
        self.assertRaises(ValueError, pollster.register, 0, -1)
        self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
        self.assertRaises(ValueError, pollster.modify, 1, -1)
        self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)

    @cpython_only
    def test_poll_c_limits(self):
        # Event masks and timeouts just past the C type limits must raise
        # OverflowError.  The limits come from the optional _testcapi module,
        # so skip (rather than error out) when it is not available.
        try:
            from _testcapi import USHRT_MAX, INT_MAX, UINT_MAX
        except ImportError:
            self.skipTest("requires _testcapi")
        pollster = select.poll()
        pollster.register(1)

        # Issues #15989, #17919
        self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
        self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
        self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1)
        self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1)

    @threading_helper.reap_threads
    def test_threaded_poll(self):
        # A second poll() on the same object while another thread is still
        # inside poll() is expected to raise RuntimeError.
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)
        rfds = []
        for _ in range(10):
            fd = os.dup(r)
            self.addCleanup(os.close, fd)
            rfds.append(fd)
        pollster = select.poll()
        for fd in rfds:
            pollster.register(fd, select.POLLIN)

        t = threading.Thread(target=pollster.poll)
        t.start()
        try:
            # Give the thread time to enter poll().
            time.sleep(0.5)
            # trigger ufds array reallocation
            for fd in rfds:
                pollster.unregister(fd)
            pollster.register(w, select.POLLOUT)
            self.assertRaises(RuntimeError, pollster.poll)
        finally:
            # and make the call to poll() from the thread return
            os.write(w, b'spam')
            t.join()

    @threading_helper.reap_threads
    def test_poll_blocks_with_negative_ms(self):
        # poll() must block for None and for every negative timeout,
        # including negative floats that are very close to zero.
        for timeout_ms in [None, -1000, -1, -1.0, -0.1, -1e-100]:
            with self.subTest(timeout_ms=timeout_ms):
                # Create two file descriptors. This will be used to unlock
                # the blocking call to poll.poll inside the thread
                r, w = os.pipe()
                self.addCleanup(os.close, r)
                self.addCleanup(os.close, w)
                pollster = select.poll()
                pollster.register(r, select.POLLIN)

                poll_thread = threading.Thread(target=pollster.poll,
                                               args=(timeout_ms,))
                poll_thread.start()
                try:
                    # Still running after 0.1s means poll() really blocked.
                    poll_thread.join(timeout=0.1)
                    self.assertTrue(poll_thread.is_alive())
                finally:
                    # Write to the pipe so pollster.poll unblocks and the
                    # thread ends, whatever happened above; otherwise a
                    # failure could leave the thread blocked forever.
                    os.write(w, b'spam')
                    poll_thread.join()
                self.assertFalse(poll_thread.is_alive())
233
234
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
237