"""Tests for the pty module (openpty, fork, spawn and the _copy loop)."""

from test.support import verbose, reap_children
from test.support.import_helper import import_module

# Skip these tests if termios or fcntl are not available
import_module('termios')
# fcntl is a proxy for not being one of the wasm32 platforms even though we
# don't use this module... a proper check for what crashes those is needed.
import_module("fcntl")

import errno
import os
import pty
import tty
import sys
import select
import signal
import socket
import io # readline
import unittest
import warnings

TEST_STRING_1 = b"I wish to buy a fish license.\n"
TEST_STRING_2 = b"For my pet fish, Eric.\n"

# True when the tty module exposes both window-size ioctl constants.
_HAVE_WINSZ = hasattr(tty, "TIOCGWINSZ") and hasattr(tty, "TIOCSWINSZ")

if verbose:
    def debug(msg):
        """Print *msg*; only active when the test run is verbose."""
        print(msg)
else:
    def debug(msg):
        """Discard *msg*; the test run is not verbose."""
        pass


# Note that os.read() is nondeterministic so we need to be very careful
# to make the test suite deterministic.  A normal call to os.read() may
# give us less than expected.
#
# Beware, on my Linux system, if I put 'foo\n' into a terminal fd, I get
# back 'foo\r\n' at the other end.  The behavior depends on the termios
# setting.  The newline translation may be OS-specific.  To make the
# test suite deterministic and OS-independent, the functions _readline
# and normalize_output can be used.

def normalize_output(data):
    """Return *data* with terminal newline translations undone.

    If *data* ends with b'\\r\\r\\n', every b'\\r\\r\\n' is replaced by
    b'\\n'; otherwise, if it ends with b'\\r\\n', every b'\\r\\n' is
    replaced by b'\\n'.  Any other input is returned unchanged.
    """
    # Some operating systems do conversions on newline.  We could possibly fix
    # that by doing the appropriate termios.tcsetattr()s.  I couldn't figure out
    # the right combo on Tru64.  So, just normalize the output and doc the
    # problem O/Ses by allowing certain combinations for some platforms, but
    # avoid allowing other differences (like extra whitespace, trailing garbage,
    # etc.)

    # This is about the best we can do without getting some feedback
    # from someone more knowledgeable.

    # OSF/1 (Tru64) apparently turns \n into \r\r\n.
    if data.endswith(b'\r\r\n'):
        return data.replace(b'\r\r\n', b'\n')

    if data.endswith(b'\r\n'):
        return data.replace(b'\r\n', b'\n')

    return data

def _readline(fd):
    """Read one line.  May block forever if no newline is read.

    The descriptor is wrapped in a raw io.FileIO with closefd=False, so
    closing the wrapper leaves *fd* itself open for the caller.
    """
    with io.FileIO(fd, mode='rb', closefd=False) as reader:
        return reader.readline()

def expectedFailureIfStdinIsTTY(fun):
    """Mark *fun* as an expected failure when stdin accepts tcgetattr().

    If tty.tcgetattr(pty.STDIN_FILENO) raises tty.error, *fun* is returned
    unchanged.
    """
    # avoid isatty()
    try:
        tty.tcgetattr(pty.STDIN_FILENO)
        return unittest.expectedFailure(fun)
    except tty.error:
        pass
    return fun

# Marginal testing of pty suite. Cannot do extensive 'do or fail' testing
# because pty code is not too portable.
class PtyTest(unittest.TestCase):
    """Tests that exercise real pseudo-terminals and child processes."""

    def setUp(self):
        """Install signal handlers, arm a watchdog alarm, save stdin's size."""
        old_alarm = signal.signal(signal.SIGALRM, self.handle_sig)
        self.addCleanup(signal.signal, signal.SIGALRM, old_alarm)

        old_sighup = signal.signal(signal.SIGHUP, self.handle_sighup)
        self.addCleanup(signal.signal, signal.SIGHUP, old_sighup)

        # isatty() and close() can hang on some platforms.  Set an alarm
        # before running the test to make sure we don't hang forever.
        self.addCleanup(signal.alarm, 0)
        signal.alarm(10)

        # Save original stdin window size.
        self.stdin_dim = None
        if _HAVE_WINSZ:
            try:
                self.stdin_dim = tty.tcgetwinsize(pty.STDIN_FILENO)
                self.addCleanup(tty.tcsetwinsize, pty.STDIN_FILENO,
                                self.stdin_dim)
            except tty.error:
                pass

    def handle_sig(self, sig, frame):
        """SIGALRM handler: fail the running test instead of hanging."""
        self.fail("isatty hung")

    @staticmethod
    def handle_sighup(signum, frame):
        """SIGHUP handler that deliberately does nothing."""
        pass

    @expectedFailureIfStdinIsTTY
    def test_openpty(self):
        """Open a pty pair and check data written to the slave reaches the master."""
        try:
            mode = tty.tcgetattr(pty.STDIN_FILENO)
        except tty.error:
            # Not a tty or bad/closed fd.
            debug("tty.tcgetattr(pty.STDIN_FILENO) failed")
            mode = None

        new_dim = None
        if self.stdin_dim:
            try:
                # Modify pty.STDIN_FILENO window size; we need to
                # check if pty.openpty() is able to set pty slave
                # window size accordingly.
                debug("Setting pty.STDIN_FILENO window size.")
                debug(f"original size: (row, col) = {self.stdin_dim}")
                target_dim = (self.stdin_dim[0] + 1, self.stdin_dim[1] + 1)
                debug(f"target size: (row, col) = {target_dim}")
                tty.tcsetwinsize(pty.STDIN_FILENO, target_dim)

                # Were we able to set the window size
                # of pty.STDIN_FILENO successfully?
                new_dim = tty.tcgetwinsize(pty.STDIN_FILENO)
                self.assertEqual(new_dim, target_dim,
                                 "pty.STDIN_FILENO window size unchanged")
            except (OSError, tty.error):
                # tty.error is caught as well, matching setUp(), so that a
                # failed resize only produces a warning.
                warnings.warn("Failed to set pty.STDIN_FILENO window size.")

        try:
            debug("Calling pty.openpty()")
            try:
                master_fd, slave_fd, slave_name = pty.openpty(mode, new_dim,
                                                              True)
            except TypeError:
                # This pty.openpty() does not accept the extra arguments.
                master_fd, slave_fd = pty.openpty()
                slave_name = None
            debug(f"Got {master_fd=}, {slave_fd=}, {slave_name=}")
        except OSError:
            # " An optional feature could not be imported " ... ?
            raise unittest.SkipTest("Pseudo-terminals (seemingly) not functional.")

        # closing master_fd can raise a SIGHUP if the process is
        # the session leader: we installed a SIGHUP signal handler
        # to ignore this signal.
        self.addCleanup(os.close, master_fd)
        self.addCleanup(os.close, slave_fd)

        self.assertTrue(os.isatty(slave_fd), "slave_fd is not a tty")

        if mode:
            self.assertEqual(tty.tcgetattr(slave_fd), mode,
                             "openpty() failed to set slave termios")
        if new_dim:
            self.assertEqual(tty.tcgetwinsize(slave_fd), new_dim,
                             "openpty() failed to set slave window size")

        # Ensure the fd is non-blocking in case there's nothing to read.
        blocking = os.get_blocking(master_fd)
        try:
            os.set_blocking(master_fd, False)
            try:
                s1 = os.read(master_fd, 1024)
                self.assertEqual(b'', s1)
            except OSError as e:
                if e.errno != errno.EAGAIN:
                    raise
        finally:
            # Restore the original flags.
            os.set_blocking(master_fd, blocking)

        debug("Writing to slave_fd")
        os.write(slave_fd, TEST_STRING_1)
        s1 = _readline(master_fd)
        self.assertEqual(b'I wish to buy a fish license.\n',
                         normalize_output(s1))

        debug("Writing chunked output")
        os.write(slave_fd, TEST_STRING_2[:5])
        os.write(slave_fd, TEST_STRING_2[5:])
        s2 = _readline(master_fd)
        self.assertEqual(b'For my pet fish, Eric.\n', normalize_output(s2))

    def test_fork(self):
        """Fork via pty.fork() and check the child's tty and session state.

        The child reports its result through its exit code: 1 unexpected
        exception, 2 setsid() succeeded, 3 stdout is not a tty, 4 success.
        """
        debug("calling pty.fork()")
        pid, master_fd = pty.fork()
        self.addCleanup(os.close, master_fd)
        if pid == pty.CHILD:
            # stdout should be connected to a tty.
            if not os.isatty(1):
                debug("Child's fd 1 is not a tty?!")
                os._exit(3)

            # After pty.fork(), the child should already be a session leader.
            # (on those systems that have that concept.)
            debug("In child, calling os.setsid()")
            try:
                os.setsid()
            except OSError:
                # Good, we already were session leader
                debug("Good: OSError was raised.")
            except AttributeError:
                # Have pty, but not setsid()?
                debug("No setsid() available?")
            except BaseException:
                # Deliberate catch-all.
                # We don't want this error to propagate, escaping the call to
                # os._exit() and causing very peculiar behavior in the calling
                # regrtest.py !
                # Note: could add traceback printing here.
                debug("An unexpected error was raised.")
                os._exit(1)
            else:
                debug("os.setsid() succeeded! (bad!)")
                os._exit(2)
            os._exit(4)
        else:
            debug("Waiting for child (%d) to finish." % pid)
            # In verbose mode, we have to consume the debug output from the
            # child or the child will block, causing this test to hang in the
            # parent's waitpid() call.  The child blocks after a
            # platform-dependent amount of data is written to its fd.  On
            # Linux 2.6, it's 4000 bytes and the child won't block, but on OS
            # X even the small writes in the child above will block it.  Also
            # on Linux, the read() will raise an OSError (input/output error)
            # when it tries to read past the end of the buffer but the child's
            # already exited, so catch and discard those exceptions.  It's not
            # worth checking for EIO.
            while True:
                try:
                    data = os.read(master_fd, 80)
                except OSError:
                    break
                if not data:
                    break
                sys.stdout.write(str(data.replace(b'\r\n', b'\n'),
                                     encoding='ascii'))

            ##line = os.read(master_fd, 80)
            ##lines = line.replace('\r\n', '\n').split('\n')
            ##if False and lines != ['In child, calling os.setsid()',
            ##             'Good: OSError was raised.', '']:
            ##    raise TestFailed("Unexpected output from child: %r" % line)

            (pid, status) = os.waitpid(pid, 0)
            res = os.waitstatus_to_exitcode(status)
            debug("Child (%d) exited with code %d (status %d)." % (pid, res, status))
            if res == 1:
                self.fail("Child raised an unexpected exception in os.setsid()")
            elif res == 2:
                self.fail("pty.fork() failed to make child a session leader.")
            elif res == 3:
                self.fail("Child spawned by pty.fork() did not have a tty as stdout")
            elif res != 4:
                self.fail("pty.fork() failed for unknown reasons.")

            ##debug("Reading from master_fd now that the child has exited")
            ##try:
            ##    s1 = os.read(master_fd, 1024)
            ##except OSError:
            ##    pass
            ##else:
            ##    raise TestFailed("Read from master_fd did not raise exception")

    def test_master_read(self):
        """Reading the master after the slave is closed yields no data."""
        # XXX(nnorwitz):  this test leaks fds when there is an error.
        debug("Calling pty.openpty()")
        master_fd, slave_fd = pty.openpty()
        debug(f"Got master_fd '{master_fd}', slave_fd '{slave_fd}'")

        self.addCleanup(os.close, master_fd)

        debug("Closing slave_fd")
        os.close(slave_fd)

        debug("Reading from master_fd")
        try:
            data = os.read(master_fd, 1)
        except OSError: # Linux
            data = b""

        self.assertEqual(data, b"")

    def test_spawn_doesnt_hang(self):
        """pty.spawn() of a short-lived child returns instead of hanging."""
        pty.spawn([sys.executable, '-c', 'print("hi there")'])

class SmallPtyTests(unittest.TestCase):
    """These tests don't spawn children or hang."""

    def setUp(self):
        """Save the pty module attributes that the tests replace with mocks."""
        self.orig_stdin_fileno = pty.STDIN_FILENO
        self.orig_stdout_fileno = pty.STDOUT_FILENO
        self.orig_pty_close = pty.close
        self.orig_pty__copy = pty._copy
        self.orig_pty_fork = pty.fork
        self.orig_pty_select = pty.select
        self.orig_pty_setraw = pty.setraw
        self.orig_pty_tcgetattr = pty.tcgetattr
        self.orig_pty_tcsetattr = pty.tcsetattr
        self.orig_pty_waitpid = pty.waitpid
        self.fds = []  # A list of file descriptors to close.
        self.files = []
        self.select_input = []
        self.select_output = []
        self.tcsetattr_mode_setting = None

    def tearDown(self):
        """Restore the saved pty attributes and close opened files and fds."""
        pty.STDIN_FILENO = self.orig_stdin_fileno
        pty.STDOUT_FILENO = self.orig_stdout_fileno
        pty.close = self.orig_pty_close
        pty._copy = self.orig_pty__copy
        pty.fork = self.orig_pty_fork
        pty.select = self.orig_pty_select
        pty.setraw = self.orig_pty_setraw
        pty.tcgetattr = self.orig_pty_tcgetattr
        pty.tcsetattr = self.orig_pty_tcsetattr
        pty.waitpid = self.orig_pty_waitpid
        # Best-effort cleanup: closing errors are ignored.
        for file in self.files:
            try:
                file.close()
            except OSError:
                pass
        for fd in self.fds:
            try:
                os.close(fd)
            except OSError:
                pass

    def _pipe(self):
        """Create a pipe whose two fds are closed in tearDown()."""
        pipe_fds = os.pipe()
        self.fds.extend(pipe_fds)
        return pipe_fds

    def _socketpair(self):
        """Create a socket pair whose sockets are closed in tearDown()."""
        socketpair = socket.socketpair()
        self.files.extend(socketpair)
        return socketpair

    def _mock_select(self, rfds, wfds, xfds):
        """Stand-in for select(): check the arguments, return a canned result."""
        # This will raise IndexError when no more expected calls exist.
        self.assertEqual((rfds, wfds, xfds), self.select_input.pop(0))
        return self.select_output.pop(0)

    def _make_mock_fork(self, pid):
        """Return a fork() replacement that reports *pid* and fd 12."""
        def mock_fork():
            return (pid, 12)
        return mock_fork

    def _mock_tcsetattr(self, fileno, opt, mode):
        """Stand-in for tcsetattr(): record the mode that was requested."""
        self.tcsetattr_mode_setting = mode

    def test__copy_to_each(self):
        """Test the normal data case on both master_fd and stdin."""
        read_from_stdout_fd, mock_stdout_fd = self._pipe()
        pty.STDOUT_FILENO = mock_stdout_fd
        mock_stdin_fd, write_to_stdin_fd = self._pipe()
        pty.STDIN_FILENO = mock_stdin_fd
        socketpair = self._socketpair()
        masters = [s.fileno() for s in socketpair]

        # Feed data.  Smaller than PIPEBUF.  These writes will not block.
        os.write(masters[1], b'from master')
        os.write(write_to_stdin_fd, b'from stdin')

        # Expect three select calls, the last one will cause IndexError
        pty.select = self._mock_select
        self.select_input.append(([mock_stdin_fd, masters[0]], [], []))
        self.select_output.append(([mock_stdin_fd, masters[0]], [], []))
        self.select_input.append(([mock_stdin_fd, masters[0]], [mock_stdout_fd, masters[0]], []))
        self.select_output.append(([], [mock_stdout_fd, masters[0]], []))
        self.select_input.append(([mock_stdin_fd, masters[0]], [], []))

        with self.assertRaises(IndexError):
            pty._copy(masters[0])

        # Test that the right data went to the right places.
        rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
        self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
        self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
        self.assertEqual(os.read(masters[1], 20), b'from stdin')

    def test__restore_tty_mode_normal_return(self):
        """Test that spawn resets the tty mode when _copy returns normally."""

        # PID 1 is returned from mocked fork to run the parent branch
        # of code
        pty.fork = self._make_mock_fork(1)

        status_sentinel = object()
        pty.waitpid = lambda _1, _2: [None, status_sentinel]
        pty.close = lambda _: None

        pty._copy = lambda _1, _2, _3: None

        mode_sentinel = object()
        pty.tcgetattr = lambda fd: mode_sentinel
        pty.tcsetattr = self._mock_tcsetattr
        pty.setraw = lambda _: None

        self.assertEqual(pty.spawn([]), status_sentinel, "pty.waitpid process status not returned by pty.spawn")
        self.assertEqual(self.tcsetattr_mode_setting, mode_sentinel, "pty.tcsetattr not called with original mode value")


def tearDownModule():
    """Reap any child processes left behind by the tests."""
    reap_children()


if __name__ == "__main__":
    unittest.main()