xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/pty.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1"""Pseudo terminal utilities."""
2
3# Bugs: No signal handling.  Doesn't set slave termios and window size.
4#       Only tested on Linux, FreeBSD, and macOS.
5# See:  W. Richard Stevens. 1992.  Advanced Programming in the
6#       UNIX Environment.  Chapter 19.
7# Author: Steen Lumholt -- with additions by Guido.
8
9from select import select
10import os
11import sys
12import tty
13
14# names imported directly for test mocking purposes
15from os import close, waitpid
16from tty import setraw, tcgetattr, tcsetattr
17
# Names exported by "from pty import *".  master_open() and slave_open()
# are documented as deprecated and are not listed here.
__all__ = ["openpty", "fork", "spawn"]

# File descriptor numbers used for the process's standard streams.
STDIN_FILENO = 0
STDOUT_FILENO = 1
STDERR_FILENO = 2

# pid value that fork() and spawn() compare against to recognise the child.
CHILD = 0
25
def openpty():
    """openpty() -> (master_fd, slave_fd)
    Open a pty master/slave pair, using os.openpty() if possible.

    If os.openpty() is unavailable (AttributeError) or fails (OSError),
    fall back to _open_terminal() to find a master device and
    slave_open() to open the matching slave.  Errors from the fallback
    path propagate to the caller."""

    try:
        return os.openpty()
    except (AttributeError, OSError):
        pass
    master_fd, slave_name = _open_terminal()
    try:
        slave_fd = slave_open(slave_name)
    except BaseException:
        # The caller never sees master_fd on failure, so release it here
        # instead of leaking the descriptor.
        os.close(master_fd)
        raise
    return master_fd, slave_fd
37
def master_open():
    """master_open() -> (master_fd, slave_name)
    Open a pty master and return the fd, and the filename of the slave end.
    Deprecated, use openpty() instead.

    os.openpty() is tried first; the slave descriptor it returns is only
    used to look up the slave's filename and is closed before returning.
    If os.openpty() is unavailable or fails, _open_terminal() is used."""

    try:
        master_fd, slave_fd = os.openpty()
    except (AttributeError, OSError):
        pass
    else:
        try:
            slave_name = os.ttyname(slave_fd)
        except BaseException:
            # Name lookup failed: the caller gets no descriptors back, so
            # release the master too rather than leaking it.
            os.close(master_fd)
            raise
        finally:
            # The slave fd was only needed for the name lookup.
            os.close(slave_fd)
        return master_fd, slave_name

    return _open_terminal()
53
54def _open_terminal():
55    """Open pty master and return (master_fd, tty_name)."""
56    for x in 'pqrstuvwxyzPQRST':
57        for y in '0123456789abcdef':
58            pty_name = '/dev/pty' + x + y
59            try:
60                fd = os.open(pty_name, os.O_RDWR)
61            except OSError:
62                continue
63            return (fd, '/dev/tty' + x + y)
64    raise OSError('out of pty devices')
65
def slave_open(tty_name):
    """slave_open(tty_name) -> slave_fd
    Open the pty slave named tty_name read/write and return its file
    descriptor.  Where fcntl provides I_PUSH, the "ptem" and "ldterm"
    modules are pushed onto the descriptor; a failure to push them is
    ignored.
    Deprecated, use openpty() instead."""

    slave_fd = os.open(tty_name, os.O_RDWR)
    try:
        from fcntl import ioctl, I_PUSH
    except ImportError:
        # No I_PUSH on this platform: nothing further to set up.
        pass
    else:
        try:
            for module_name in ("ptem", "ldterm"):
                ioctl(slave_fd, I_PUSH, module_name)
        except OSError:
            # Best effort only; the descriptor is still returned.
            pass
    return slave_fd
83
def fork():
    """fork() -> (pid, master_fd)
    Fork, making the child a session leader whose controlling terminal
    is the pty slave.

    os.forkpty() is tried first.  If it is unavailable or fails, the same
    setup is done by hand with openpty(), os.fork() and os.setsid().
    In the child the returned pid equals CHILD."""

    try:
        child_pid, pty_fd = os.forkpty()
    except (AttributeError, OSError):
        pass
    else:
        if child_pid == CHILD:
            try:
                os.setsid()
            except OSError:
                # os.forkpty() already made us the session leader.
                pass
        return child_pid, pty_fd

    # Manual fallback: allocate the pair first, then fork.
    controller_fd, terminal_fd = openpty()
    child_pid = os.fork()
    if child_pid != CHILD:
        # Parent keeps only the master end.
        os.close(terminal_fd)
        return child_pid, controller_fd

    # Child: start a new session and drop the master end.
    os.setsid()
    os.close(controller_fd)

    # Wire the slave up as stdin, stdout and stderr.
    for std_fd in (STDIN_FILENO, STDOUT_FILENO, STDERR_FILENO):
        os.dup2(terminal_fd, std_fd)
    if terminal_fd > STDERR_FILENO:
        os.close(terminal_fd)

    # Opening the tty explicitly makes it the controlling tty.
    probe_fd = os.open(os.ttyname(STDOUT_FILENO), os.O_RDWR)
    os.close(probe_fd)

    return child_pid, controller_fd
123
124def _read(fd):
125    """Default read function."""
126    return os.read(fd, 1024)
127
def _copy(master_fd, master_read=_read, stdin_read=_read):
    """Parent copy loop.
    Copies
            pty master -> standard output   (master_read)
            standard input -> pty master    (stdin_read)

    Data is staged in two byte buffers and moved with select(), so that
    neither direction waits on the other.  Returns when reading the
    master yields no data or raises OSError."""
    if os.get_blocking(master_fd):
        # A blocking write of more than the tty/line discipline will
        # buffer could stall forever, so run the loop with master_fd in
        # non-blocking mode and put blocking mode back afterwards for
        # backwards compatibility.
        os.set_blocking(master_fd, False)
        try:
            _copy(master_fd, master_read=master_read, stdin_read=stdin_read)
        finally:
            os.set_blocking(master_fd, True)
        return

    # Stop reading a source once its buffer holds this many bytes.
    high_waterlevel = 4096
    stdin_open = master_fd != STDIN_FILENO
    stdout_open = master_fd != STDOUT_FILENO
    to_master = b''     # read from stdin, waiting to go to the pty
    to_stdout = b''     # read from the pty, waiting to go to stdout
    while True:
        want_read = []
        want_write = []
        if stdin_open and len(to_master) < high_waterlevel:
            want_read.append(STDIN_FILENO)
        if stdout_open:
            if len(to_stdout) < high_waterlevel:
                want_read.append(master_fd)
            if to_stdout:
                want_write.append(STDOUT_FILENO)
        if to_master:
            want_write.append(master_fd)

        readable, writable, _unused = select(want_read, want_write, [])

        # 1. Drain pending pty output to stdout.
        if STDOUT_FILENO in writable:
            try:
                written = os.write(STDOUT_FILENO, to_stdout)
            except OSError:
                stdout_open = False
            else:
                to_stdout = to_stdout[written:]

        # 2. Pull fresh output from the pty.
        if master_fd in readable:
            # EOF shows up as an empty byte string on some OSes and as
            # an OSError on others; treat both the same way.
            try:
                chunk = master_read(master_fd)
            except OSError:
                chunk = b""
            if not chunk:
                # EOF: assume the child has exited and is unreachable.
                return
            to_stdout += chunk

        # 3. Push pending stdin data to the pty.
        if master_fd in writable:
            written = os.write(master_fd, to_master)
            to_master = to_master[written:]

        # 4. Pull fresh input from stdin.
        if stdin_open and STDIN_FILENO in readable:
            chunk = stdin_read(STDIN_FILENO)
            if chunk:
                to_master += chunk
            else:
                stdin_open = False
192
def spawn(argv, master_read=_read, stdin_read=_read):
    """Create a spawned process.

    argv is the command to run: either a sequence of strings or a single
    string, which is treated as a one-element argument list.  The child
    created by fork() executes it with os.execlp(); the parent relays
    data between its own standard streams and the pty via _copy(),
    using master_read and stdin_read as the read callbacks.

    Returns the status value reported by waitpid() for the child."""
    # isinstance() also covers str subclasses, which would otherwise be
    # unpacked character by character below.
    if isinstance(argv, str):
        argv = (argv,)
    sys.audit('pty.spawn', argv)

    pid, master_fd = fork()
    if pid == CHILD:
        os.execlp(argv[0], *argv)

    # Put our own stdin into raw mode when it is a terminal, remembering
    # the previous settings so they can be restored afterwards.
    try:
        mode = tcgetattr(STDIN_FILENO)
        setraw(STDIN_FILENO)
        restore = True
    except tty.error:    # This is the same as termios.error
        restore = False

    try:
        _copy(master_fd, master_read, stdin_read)
    finally:
        try:
            if restore:
                tcsetattr(STDIN_FILENO, tty.TCSAFLUSH, mode)
        finally:
            # Release the master even when the copy loop or the termios
            # restore raised, so the descriptor is never leaked.
            close(master_fd)

    return waitpid(pid, 0)[1]
218