xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/pty.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1*cda5da8dSAndroid Build Coastguard Worker"""Pseudo terminal utilities."""
2*cda5da8dSAndroid Build Coastguard Worker
3*cda5da8dSAndroid Build Coastguard Worker# Bugs: No signal handling.  Doesn't set slave termios and window size.
4*cda5da8dSAndroid Build Coastguard Worker#       Only tested on Linux, FreeBSD, and macOS.
5*cda5da8dSAndroid Build Coastguard Worker# See:  W. Richard Stevens. 1992.  Advanced Programming in the
6*cda5da8dSAndroid Build Coastguard Worker#       UNIX Environment.  Chapter 19.
7*cda5da8dSAndroid Build Coastguard Worker# Author: Steen Lumholt -- with additions by Guido.
8*cda5da8dSAndroid Build Coastguard Worker
9*cda5da8dSAndroid Build Coastguard Workerfrom select import select
10*cda5da8dSAndroid Build Coastguard Workerimport os
11*cda5da8dSAndroid Build Coastguard Workerimport sys
12*cda5da8dSAndroid Build Coastguard Workerimport tty
13*cda5da8dSAndroid Build Coastguard Worker
14*cda5da8dSAndroid Build Coastguard Worker# names imported directly for test mocking purposes
15*cda5da8dSAndroid Build Coastguard Workerfrom os import close, waitpid
16*cda5da8dSAndroid Build Coastguard Workerfrom tty import setraw, tcgetattr, tcsetattr
17*cda5da8dSAndroid Build Coastguard Worker
# Names exported by "from pty import *".
__all__ = ["openpty", "fork", "spawn"]

# File descriptor numbers of the three standard streams.
STDIN_FILENO, STDOUT_FILENO, STDERR_FILENO = 0, 1, 2

# pid value that fork() reports inside the child process.
CHILD = 0
25*cda5da8dSAndroid Build Coastguard Worker
def openpty():
    """openpty() -> (master_fd, slave_fd)

    Open a pty master/slave pair, using os.openpty() if possible.

    When os.openpty() is missing or raises OSError, fall back to
    _open_terminal() for the master and slave_open() for the slave.
    Errors raised by the fallback propagate to the caller; the master
    descriptor is closed first if the slave cannot be opened.
    """
    try:
        return os.openpty()
    except (AttributeError, OSError):
        pass

    master_fd, slave_name = _open_terminal()
    try:
        slave_fd = slave_open(slave_name)
    except BaseException:
        # Do not leak the master end when the slave end cannot be opened.
        os.close(master_fd)
        raise
    return master_fd, slave_fd
37*cda5da8dSAndroid Build Coastguard Worker
def master_open():
    """master_open() -> (master_fd, slave_name)

    Open a pty master and return its descriptor together with the
    filename of the slave end.  The slave descriptor that os.openpty()
    hands back is closed again once its name has been looked up.  If
    os.openpty() is unavailable or fails, _open_terminal() is used.

    Deprecated, use openpty() instead."""
    have_pair = True
    try:
        master_fd, slave_fd = os.openpty()
    except (AttributeError, OSError):
        have_pair = False

    if not have_pair:
        return _open_terminal()

    # Only the name of the slave is wanted here, not the open descriptor.
    slave_name = os.ttyname(slave_fd)
    os.close(slave_fd)
    return master_fd, slave_name
53*cda5da8dSAndroid Build Coastguard Worker
54*cda5da8dSAndroid Build Coastguard Workerdef _open_terminal():
55*cda5da8dSAndroid Build Coastguard Worker    """Open pty master and return (master_fd, tty_name)."""
56*cda5da8dSAndroid Build Coastguard Worker    for x in 'pqrstuvwxyzPQRST':
57*cda5da8dSAndroid Build Coastguard Worker        for y in '0123456789abcdef':
58*cda5da8dSAndroid Build Coastguard Worker            pty_name = '/dev/pty' + x + y
59*cda5da8dSAndroid Build Coastguard Worker            try:
60*cda5da8dSAndroid Build Coastguard Worker                fd = os.open(pty_name, os.O_RDWR)
61*cda5da8dSAndroid Build Coastguard Worker            except OSError:
62*cda5da8dSAndroid Build Coastguard Worker                continue
63*cda5da8dSAndroid Build Coastguard Worker            return (fd, '/dev/tty' + x + y)
64*cda5da8dSAndroid Build Coastguard Worker    raise OSError('out of pty devices')
65*cda5da8dSAndroid Build Coastguard Worker
def slave_open(tty_name):
    """slave_open(tty_name) -> slave_fd

    Open the pty slave named tty_name read/write and return the opened
    file descriptor.  Where fcntl provides I_PUSH, the "ptem" and
    "ldterm" modules are pushed onto the descriptor; an OSError while
    pushing is ignored.

    Deprecated, use openpty() instead."""
    slave_fd = os.open(tty_name, os.O_RDWR)
    try:
        from fcntl import ioctl, I_PUSH
    except ImportError:
        # Nothing to push on this platform.
        pass
    else:
        try:
            for module_name in ("ptem", "ldterm"):
                ioctl(slave_fd, I_PUSH, module_name)
        except OSError:
            pass
    return slave_fd
83*cda5da8dSAndroid Build Coastguard Worker
def fork():
    """fork() -> (pid, master_fd)

    Fork and make the child a session leader with a controlling terminal.

    os.forkpty() is tried first.  If it is missing or raises OSError, a
    pty pair from openpty() and a plain os.fork() are used instead.  In
    the child, pid equals CHILD; on the fallback path the child has
    already closed the master descriptor it gets back.
    """
    try:
        pid, fd = os.forkpty()
    except (AttributeError, OSError):
        pass
    else:
        if pid == CHILD:
            try:
                os.setsid()
            except OSError:
                # os.forkpty() already set us session leader
                pass
        return pid, fd

    master_fd, slave_fd = openpty()
    pid = os.fork()
    if pid != CHILD:
        # Parent: only the master end is needed on this side.
        os.close(slave_fd)
        return pid, master_fd

    # Child: establish a new session and drop the master end.
    os.setsid()
    os.close(master_fd)

    # Slave becomes stdin/stdout/stderr of child.
    for std_fd in (STDIN_FILENO, STDOUT_FILENO, STDERR_FILENO):
        os.dup2(slave_fd, std_fd)
    if slave_fd > STDERR_FILENO:
        os.close(slave_fd)

    # Explicitly open the tty to make it become a controlling tty.
    os.close(os.open(os.ttyname(STDOUT_FILENO), os.O_RDWR))

    return pid, master_fd
123*cda5da8dSAndroid Build Coastguard Worker
124*cda5da8dSAndroid Build Coastguard Workerdef _read(fd):
125*cda5da8dSAndroid Build Coastguard Worker    """Default read function."""
126*cda5da8dSAndroid Build Coastguard Worker    return os.read(fd, 1024)
127*cda5da8dSAndroid Build Coastguard Worker
def _copy(master_fd, master_read=_read, stdin_read=_read):
    """Parent copy loop.

    Shuttles data in both directions until the master side reports EOF:
            pty master -> standard output   (read with master_read)
            standard input -> pty master    (read with stdin_read)

    Returns None once master_read() yields no data or raises OSError.
    """
    if os.get_blocking(master_fd):
        # If we write more than tty/ndisc is willing to buffer, we may block
        # indefinitely.  Run the copy with master_fd in non-blocking mode
        # and switch it back afterwards for backwards compatibility.
        os.set_blocking(master_fd, False)
        try:
            _copy(master_fd, master_read=master_read, stdin_read=stdin_read)
        finally:
            os.set_blocking(master_fd, True)
        return

    buffer_limit = 4096
    stdin_open = master_fd != STDIN_FILENO
    stdout_open = master_fd != STDOUT_FILENO
    to_child = b''      # bytes read from stdin, pending for the master
    from_child = b''    # bytes read from the master, pending for stdout
    while True:
        want_read = []
        want_write = []
        # Stop polling a source while its buffer is at the limit.
        if stdin_open and len(to_child) < buffer_limit:
            want_read.append(STDIN_FILENO)
        if stdout_open:
            if len(from_child) < buffer_limit:
                want_read.append(master_fd)
            if from_child:
                want_write.append(STDOUT_FILENO)
        if to_child:
            want_write.append(master_fd)

        readable, writable, _unused = select(want_read, want_write, [])

        if STDOUT_FILENO in writable:
            try:
                written = os.write(STDOUT_FILENO, from_child)
            except OSError:
                stdout_open = False
            else:
                from_child = from_child[written:]

        if master_fd in readable:
            # Some OSes signal EOF by returning an empty byte string,
            # some throw OSErrors.
            try:
                chunk = master_read(master_fd)
            except OSError:
                chunk = b""
            if not chunk:
                # Reached EOF.  Assume the child process has exited and
                # is unreachable, so we clean up.
                return
            from_child += chunk

        if master_fd in writable:
            written = os.write(master_fd, to_child)
            to_child = to_child[written:]

        if stdin_open and STDIN_FILENO in readable:
            chunk = stdin_read(STDIN_FILENO)
            if chunk:
                to_child += chunk
            else:
                stdin_open = False
192*cda5da8dSAndroid Build Coastguard Worker
def spawn(argv, master_read=_read, stdin_read=_read):
    """Create a spawned process.

    argv is the command to run; a single string is treated as a
    one-element argument tuple.  The child is created with fork() and
    replaced via os.execlp(argv[0], *argv).  The parent switches stdin to
    raw mode when it is a terminal, runs _copy() with the given
    master_read and stdin_read callables, then restores the terminal
    mode and closes the master descriptor even if copying fails.

    Returns the status value from waitpid() for the child.
    """
    if isinstance(argv, str):
        argv = (argv,)
    sys.audit('pty.spawn', argv)

    pid, master_fd = fork()
    if pid == CHILD:
        os.execlp(argv[0], *argv)

    try:
        mode = tcgetattr(STDIN_FILENO)
        setraw(STDIN_FILENO)
        restore = True
    except tty.error:    # This is the same as termios.error
        restore = False

    try:
        _copy(master_fd, master_read, stdin_read)
    finally:
        try:
            if restore:
                tcsetattr(STDIN_FILENO, tty.TCSAFLUSH, mode)
        finally:
            # Release the master end even when copying or the termios
            # restore raised, so the descriptor is not leaked.
            close(master_fd)

    return waitpid(pid, 0)[1]
218