1from test.test_support import verbose, run_unittest, import_module
2
3#Skip these tests if either fcntl or termios is not available
4fcntl = import_module('fcntl')
5import_module('termios')
6
7import errno
8import pty
9import os
10import sys
11import select
12import signal
13import socket
14import io # readline
15import unittest
16
17TEST_STRING_1 = "I wish to buy a fish license.\n"
18TEST_STRING_2 = "For my pet fish, Eric.\n"
19
20if verbose:
21    def debug(msg):
22        print msg
23else:
24    def debug(msg):
25        pass
26
27
28# Note that os.read() is nondeterministic so we need to be very careful
29# to make the test suite deterministic.  A normal call to os.read() may
30# give us less than expected.
31#
32# Beware, on my Linux system, if I put 'foo\n' into a terminal fd, I get
33# back 'foo\r\n' at the other end.  The behavior depends on the termios
34# setting.  The newline translation may be OS-specific.  To make the
35# test suite deterministic and OS-independent, the functions _readline
36# and normalize_output can be used.
37
38def normalize_output(data):
39    # Some operating systems do conversions on newline.  We could possibly
40    # fix that by doing the appropriate termios.tcsetattr()s.  I couldn't
41    # figure out the right combo on Tru64 and I don't have an IRIX box.
42    # So just normalize the output and doc the problem O/Ses by allowing
43    # certain combinations for some platforms, but avoid allowing other
44    # differences (like extra whitespace, trailing garbage, etc.)
45
46    # This is about the best we can do without getting some feedback
47    # from someone more knowledgable.
48
49    # OSF/1 (Tru64) apparently turns \n into \r\r\n.
50    if data.endswith('\r\r\n'):
51        return data.replace('\r\r\n', '\n')
52
53    # IRIX apparently turns \n into \r\n.
54    if data.endswith('\r\n'):
55        return data.replace('\r\n', '\n')
56
57    return data
58
59def _readline(fd):
60    """Read one line.  May block forever if no newline is read."""
61    reader = io.FileIO(fd, mode='rb', closefd=False)
62    return reader.readline()
63
64
65
66# Marginal testing of pty suite. Cannot do extensive 'do or fail' testing
67# because pty code is not too portable.
68# XXX(nnorwitz):  these tests leak fds when there is an error.
69class PtyTest(unittest.TestCase):
70    def setUp(self):
71        # isatty() and close() can hang on some platforms.  Set an alarm
72        # before running the test to make sure we don't hang forever.
73        old_alarm = signal.signal(signal.SIGALRM, self.handle_sig)
74        self.addCleanup(signal.signal, signal.SIGALRM, old_alarm)
75        self.addCleanup(signal.alarm, 0)
76        signal.alarm(10)
77
78    def handle_sig(self, sig, frame):
79        self.fail("isatty hung")
80
81    def test_basic(self):
82        try:
83            debug("Calling master_open()")
84            master_fd, slave_name = pty.master_open()
85            debug("Got master_fd '%d', slave_name '%s'" %
86                  (master_fd, slave_name))
87            debug("Calling slave_open(%r)" % (slave_name,))
88            slave_fd = pty.slave_open(slave_name)
89            debug("Got slave_fd '%d'" % slave_fd)
90        except OSError:
91            # " An optional feature could not be imported " ... ?
92            raise unittest.SkipTest, "Pseudo-terminals (seemingly) not functional."
93
94        self.assertTrue(os.isatty(slave_fd), 'slave_fd is not a tty')
95
96        # Solaris requires reading the fd before anything is returned.
97        # My guess is that since we open and close the slave fd
98        # in master_open(), we need to read the EOF.
99
100        # Ensure the fd is non-blocking in case there's nothing to read.
101        orig_flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
102        fcntl.fcntl(master_fd, fcntl.F_SETFL, orig_flags | os.O_NONBLOCK)
103        try:
104            s1 = os.read(master_fd, 1024)
105            self.assertEqual('', s1)
106        except OSError, e:
107            if e.errno != errno.EAGAIN:
108                raise
109        # Restore the original flags.
110        fcntl.fcntl(master_fd, fcntl.F_SETFL, orig_flags)
111
112        debug("Writing to slave_fd")
113        os.write(slave_fd, TEST_STRING_1)
114        s1 = _readline(master_fd)
115        self.assertEqual('I wish to buy a fish license.\n',
116                         normalize_output(s1))
117
118        debug("Writing chunked output")
119        os.write(slave_fd, TEST_STRING_2[:5])
120        os.write(slave_fd, TEST_STRING_2[5:])
121        s2 = _readline(master_fd)
122        self.assertEqual('For my pet fish, Eric.\n', normalize_output(s2))
123
124        os.close(slave_fd)
125        os.close(master_fd)
126
127
128    def test_fork(self):
129        debug("calling pty.fork()")
130        pid, master_fd = pty.fork()
131        if pid == pty.CHILD:
132            # stdout should be connected to a tty.
133            if not os.isatty(1):
134                debug("Child's fd 1 is not a tty?!")
135                os._exit(3)
136
137            # After pty.fork(), the child should already be a session leader.
138            # (on those systems that have that concept.)
139            debug("In child, calling os.setsid()")
140            try:
141                os.setsid()
142            except OSError:
143                # Good, we already were session leader
144                debug("Good: OSError was raised.")
145                pass
146            except AttributeError:
147                # Have pty, but not setsid()?
148                debug("No setsid() available?")
149                pass
150            except:
151                # We don't want this error to propagate, escaping the call to
152                # os._exit() and causing very peculiar behavior in the calling
153                # regrtest.py !
154                # Note: could add traceback printing here.
155                debug("An unexpected error was raised.")
156                os._exit(1)
157            else:
158                debug("os.setsid() succeeded! (bad!)")
159                os._exit(2)
160            os._exit(4)
161        else:
162            debug("Waiting for child (%d) to finish." % pid)
163            # In verbose mode, we have to consume the debug output from the
164            # child or the child will block, causing this test to hang in the
165            # parent's waitpid() call.  The child blocks after a
166            # platform-dependent amount of data is written to its fd.  On
167            # Linux 2.6, it's 4000 bytes and the child won't block, but on OS
168            # X even the small writes in the child above will block it.  Also
169            # on Linux, the read() will raise an OSError (input/output error)
170            # when it tries to read past the end of the buffer but the child's
171            # already exited, so catch and discard those exceptions.  It's not
172            # worth checking for EIO.
173            while True:
174                try:
175                    data = os.read(master_fd, 80)
176                except OSError:
177                    break
178                if not data:
179                    break
180                sys.stdout.write(data.replace('\r\n', '\n'))
181
182            ##line = os.read(master_fd, 80)
183            ##lines = line.replace('\r\n', '\n').split('\n')
184            ##if False and lines != ['In child, calling os.setsid()',
185            ##             'Good: OSError was raised.', '']:
186            ##    raise TestFailed("Unexpected output from child: %r" % line)
187
188            (pid, status) = os.waitpid(pid, 0)
189            res = status >> 8
190            debug("Child (%d) exited with status %d (%d)." % (pid, res, status))
191            if res == 1:
192                self.fail("Child raised an unexpected exception in os.setsid()")
193            elif res == 2:
194                self.fail("pty.fork() failed to make child a session leader.")
195            elif res == 3:
196                self.fail("Child spawned by pty.fork() did not have a tty as stdout")
197            elif res != 4:
198                self.fail("pty.fork() failed for unknown reasons.")
199
200            ##debug("Reading from master_fd now that the child has exited")
201            ##try:
202            ##    s1 = os.read(master_fd, 1024)
203            ##except os.error:
204            ##    pass
205            ##else:
206            ##    raise TestFailed("Read from master_fd did not raise exception")
207
208        os.close(master_fd)
209
210        # pty.fork() passed.
211
212
213class SmallPtyTests(unittest.TestCase):
214    """These tests don't spawn children or hang."""
215
216    def setUp(self):
217        self.orig_stdin_fileno = pty.STDIN_FILENO
218        self.orig_stdout_fileno = pty.STDOUT_FILENO
219        self.orig_pty_select = pty.select
220        self.fds = []  # A list of file descriptors to close.
221        self.select_rfds_lengths = []
222        self.select_rfds_results = []
223
224    def tearDown(self):
225        pty.STDIN_FILENO = self.orig_stdin_fileno
226        pty.STDOUT_FILENO = self.orig_stdout_fileno
227        pty.select = self.orig_pty_select
228        for fd in self.fds:
229            try:
230                os.close(fd)
231            except:
232                pass
233
234    def _pipe(self):
235        pipe_fds = os.pipe()
236        self.fds.extend(pipe_fds)
237        return pipe_fds
238
239    def _mock_select(self, rfds, wfds, xfds):
240        # This will raise IndexError when no more expected calls exist.
241        self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds))
242        return self.select_rfds_results.pop(0), [], []
243
244    def test__copy_to_each(self):
245        """Test the normal data case on both master_fd and stdin."""
246        read_from_stdout_fd, mock_stdout_fd = self._pipe()
247        pty.STDOUT_FILENO = mock_stdout_fd
248        mock_stdin_fd, write_to_stdin_fd = self._pipe()
249        pty.STDIN_FILENO = mock_stdin_fd
250        socketpair = socket.socketpair()
251        masters = [s.fileno() for s in socketpair]
252        self.fds.extend(masters)
253
254        # Feed data.  Smaller than PIPEBUF.  These writes will not block.
255        os.write(masters[1], b'from master')
256        os.write(write_to_stdin_fd, b'from stdin')
257
258        # Expect two select calls, the last one will cause IndexError
259        pty.select = self._mock_select
260        self.select_rfds_lengths.append(2)
261        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
262        self.select_rfds_lengths.append(2)
263
264        with self.assertRaises(IndexError):
265            pty._copy(masters[0])
266
267        # Test that the right data went to the right places.
268        rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
269        self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
270        self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
271        self.assertEqual(os.read(masters[1], 20), b'from stdin')
272
273    def test__copy_eof_on_all(self):
274        """Test the empty read EOF case on both master_fd and stdin."""
275        read_from_stdout_fd, mock_stdout_fd = self._pipe()
276        pty.STDOUT_FILENO = mock_stdout_fd
277        mock_stdin_fd, write_to_stdin_fd = self._pipe()
278        pty.STDIN_FILENO = mock_stdin_fd
279        socketpair = socket.socketpair()
280        masters = [s.fileno() for s in socketpair]
281        self.fds.extend(masters)
282
283        os.close(masters[1])
284        socketpair[1].close()
285        os.close(write_to_stdin_fd)
286
287        # Expect two select calls, the last one will cause IndexError
288        pty.select = self._mock_select
289        self.select_rfds_lengths.append(2)
290        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
291        # We expect that both fds were removed from the fds list as they
292        # both encountered an EOF before the second select call.
293        self.select_rfds_lengths.append(0)
294
295        with self.assertRaises(IndexError):
296            pty._copy(masters[0])
297
298
299def test_main(verbose=None):
300    run_unittest(SmallPtyTests, PtyTest)
301
302if __name__ == "__main__":
303    test_main()
304