1import errno
2import os
3import select
4import subprocess
5import sys
6import textwrap
7import unittest
8from test import support
9
10support.requires_working_socket(module=True)
11
12@unittest.skipIf((sys.platform[:3]=='win'),
13                 "can't easily test on this system")
14class SelectTestCase(unittest.TestCase):
15
16    class Nope:
17        pass
18
19    class Almost:
20        def fileno(self):
21            return 'fileno'
22
23    def test_error_conditions(self):
24        self.assertRaises(TypeError, select.select, 1, 2, 3)
25        self.assertRaises(TypeError, select.select, [self.Nope()], [], [])
26        self.assertRaises(TypeError, select.select, [self.Almost()], [], [])
27        self.assertRaises(TypeError, select.select, [], [], [], "not a number")
28        self.assertRaises(ValueError, select.select, [], [], [], -1)
29
30    # Issue #12367: http://www.freebsd.org/cgi/query-pr.cgi?pr=kern/155606
31    @unittest.skipIf(sys.platform.startswith('freebsd'),
32                     'skip because of a FreeBSD bug: kern/155606')
33    def test_errno(self):
34        with open(__file__, 'rb') as fp:
35            fd = fp.fileno()
36            fp.close()
37            try:
38                select.select([fd], [], [], 0)
39            except OSError as err:
40                self.assertEqual(err.errno, errno.EBADF)
41            else:
42                self.fail("exception not raised")
43
44    def test_returned_list_identity(self):
45        # See issue #8329
46        r, w, x = select.select([], [], [], 1)
47        self.assertIsNot(r, w)
48        self.assertIsNot(r, x)
49        self.assertIsNot(w, x)
50
51    @support.requires_fork()
52    def test_select(self):
53        code = textwrap.dedent('''
54            import time
55            for i in range(10):
56                print("testing...", flush=True)
57                time.sleep(0.050)
58        ''')
59        cmd = [sys.executable, '-I', '-c', code]
60        with subprocess.Popen(cmd, stdout=subprocess.PIPE) as proc:
61            pipe = proc.stdout
62            for timeout in (0, 1, 2, 4, 8, 16) + (None,)*10:
63                if support.verbose:
64                    print(f'timeout = {timeout}')
65                rfd, wfd, xfd = select.select([pipe], [], [], timeout)
66                self.assertEqual(wfd, [])
67                self.assertEqual(xfd, [])
68                if not rfd:
69                    continue
70                if rfd == [pipe]:
71                    line = pipe.readline()
72                    if support.verbose:
73                        print(repr(line))
74                    if not line:
75                        if support.verbose:
76                            print('EOF')
77                        break
78                    continue
79                self.fail('Unexpected return values from select():',
80                          rfd, wfd, xfd)
81
82    # Issue 16230: Crash on select resized list
83    @unittest.skipIf(
84        support.is_emscripten, "Emscripten cannot select a fd multiple times."
85    )
86    def test_select_mutated(self):
87        a = []
88        class F:
89            def fileno(self):
90                del a[-1]
91                return sys.__stdout__.fileno()
92        a[:] = [F()] * 10
93        self.assertEqual(select.select([], a, []), ([], a[:5], []))
94
95    def test_disallow_instantiation(self):
96        support.check_disallow_instantiation(self, type(select.poll()))
97
98        if hasattr(select, 'devpoll'):
99            support.check_disallow_instantiation(self, type(select.devpoll()))
100
101def tearDownModule():
102    support.reap_children()
103
104if __name__ == "__main__":
105    unittest.main()
106