1*cda5da8dSAndroid Build Coastguard Worker__all__ = 'create_subprocess_exec', 'create_subprocess_shell' 2*cda5da8dSAndroid Build Coastguard Worker 3*cda5da8dSAndroid Build Coastguard Workerimport subprocess 4*cda5da8dSAndroid Build Coastguard Worker 5*cda5da8dSAndroid Build Coastguard Workerfrom . import events 6*cda5da8dSAndroid Build Coastguard Workerfrom . import protocols 7*cda5da8dSAndroid Build Coastguard Workerfrom . import streams 8*cda5da8dSAndroid Build Coastguard Workerfrom . import tasks 9*cda5da8dSAndroid Build Coastguard Workerfrom .log import logger 10*cda5da8dSAndroid Build Coastguard Worker 11*cda5da8dSAndroid Build Coastguard Worker 12*cda5da8dSAndroid Build Coastguard WorkerPIPE = subprocess.PIPE 13*cda5da8dSAndroid Build Coastguard WorkerSTDOUT = subprocess.STDOUT 14*cda5da8dSAndroid Build Coastguard WorkerDEVNULL = subprocess.DEVNULL 15*cda5da8dSAndroid Build Coastguard Worker 16*cda5da8dSAndroid Build Coastguard Worker 17*cda5da8dSAndroid Build Coastguard Workerclass SubprocessStreamProtocol(streams.FlowControlMixin, 18*cda5da8dSAndroid Build Coastguard Worker protocols.SubprocessProtocol): 19*cda5da8dSAndroid Build Coastguard Worker """Like StreamReaderProtocol, but for a subprocess.""" 20*cda5da8dSAndroid Build Coastguard Worker 21*cda5da8dSAndroid Build Coastguard Worker def __init__(self, limit, loop): 22*cda5da8dSAndroid Build Coastguard Worker super().__init__(loop=loop) 23*cda5da8dSAndroid Build Coastguard Worker self._limit = limit 24*cda5da8dSAndroid Build Coastguard Worker self.stdin = self.stdout = self.stderr = None 25*cda5da8dSAndroid Build Coastguard Worker self._transport = None 26*cda5da8dSAndroid Build Coastguard Worker self._process_exited = False 27*cda5da8dSAndroid Build Coastguard Worker self._pipe_fds = [] 28*cda5da8dSAndroid Build Coastguard Worker self._stdin_closed = self._loop.create_future() 29*cda5da8dSAndroid Build Coastguard Worker 30*cda5da8dSAndroid Build Coastguard Worker def __repr__(self): 31*cda5da8dSAndroid Build Coastguard Worker info = [self.__class__.__name__] 32*cda5da8dSAndroid Build Coastguard Worker if self.stdin is not None: 33*cda5da8dSAndroid Build Coastguard Worker info.append(f'stdin={self.stdin!r}') 34*cda5da8dSAndroid Build Coastguard Worker if self.stdout is not None: 35*cda5da8dSAndroid Build Coastguard Worker info.append(f'stdout={self.stdout!r}') 36*cda5da8dSAndroid Build Coastguard Worker if self.stderr is not None: 37*cda5da8dSAndroid Build Coastguard Worker info.append(f'stderr={self.stderr!r}') 38*cda5da8dSAndroid Build Coastguard Worker return '<{}>'.format(' '.join(info)) 39*cda5da8dSAndroid Build Coastguard Worker 40*cda5da8dSAndroid Build Coastguard Worker def connection_made(self, transport): 41*cda5da8dSAndroid Build Coastguard Worker self._transport = transport 42*cda5da8dSAndroid Build Coastguard Worker 43*cda5da8dSAndroid Build Coastguard Worker stdout_transport = transport.get_pipe_transport(1) 44*cda5da8dSAndroid Build Coastguard Worker if stdout_transport is not None: 45*cda5da8dSAndroid Build Coastguard Worker self.stdout = streams.StreamReader(limit=self._limit, 46*cda5da8dSAndroid Build Coastguard Worker loop=self._loop) 47*cda5da8dSAndroid Build Coastguard Worker self.stdout.set_transport(stdout_transport) 48*cda5da8dSAndroid Build Coastguard Worker self._pipe_fds.append(1) 49*cda5da8dSAndroid Build Coastguard Worker 50*cda5da8dSAndroid Build Coastguard Worker stderr_transport = transport.get_pipe_transport(2) 51*cda5da8dSAndroid Build Coastguard Worker if stderr_transport is not None: 52*cda5da8dSAndroid Build Coastguard Worker self.stderr = streams.StreamReader(limit=self._limit, 53*cda5da8dSAndroid Build Coastguard Worker loop=self._loop) 54*cda5da8dSAndroid Build Coastguard Worker self.stderr.set_transport(stderr_transport) 55*cda5da8dSAndroid Build Coastguard Worker self._pipe_fds.append(2) 56*cda5da8dSAndroid Build Coastguard Worker 57*cda5da8dSAndroid Build Coastguard Worker stdin_transport = transport.get_pipe_transport(0) 58*cda5da8dSAndroid Build Coastguard Worker if stdin_transport is not None: 59*cda5da8dSAndroid Build Coastguard Worker self.stdin = streams.StreamWriter(stdin_transport, 60*cda5da8dSAndroid Build Coastguard Worker protocol=self, 61*cda5da8dSAndroid Build Coastguard Worker reader=None, 62*cda5da8dSAndroid Build Coastguard Worker loop=self._loop) 63*cda5da8dSAndroid Build Coastguard Worker 64*cda5da8dSAndroid Build Coastguard Worker def pipe_data_received(self, fd, data): 65*cda5da8dSAndroid Build Coastguard Worker if fd == 1: 66*cda5da8dSAndroid Build Coastguard Worker reader = self.stdout 67*cda5da8dSAndroid Build Coastguard Worker elif fd == 2: 68*cda5da8dSAndroid Build Coastguard Worker reader = self.stderr 69*cda5da8dSAndroid Build Coastguard Worker else: 70*cda5da8dSAndroid Build Coastguard Worker reader = None 71*cda5da8dSAndroid Build Coastguard Worker if reader is not None: 72*cda5da8dSAndroid Build Coastguard Worker reader.feed_data(data) 73*cda5da8dSAndroid Build Coastguard Worker 74*cda5da8dSAndroid Build Coastguard Worker def pipe_connection_lost(self, fd, exc): 75*cda5da8dSAndroid Build Coastguard Worker if fd == 0: 76*cda5da8dSAndroid Build Coastguard Worker pipe = self.stdin 77*cda5da8dSAndroid Build Coastguard Worker if pipe is not None: 78*cda5da8dSAndroid Build Coastguard Worker pipe.close() 79*cda5da8dSAndroid Build Coastguard Worker self.connection_lost(exc) 80*cda5da8dSAndroid Build Coastguard Worker if exc is None: 81*cda5da8dSAndroid Build Coastguard Worker self._stdin_closed.set_result(None) 82*cda5da8dSAndroid Build Coastguard Worker else: 83*cda5da8dSAndroid Build Coastguard Worker self._stdin_closed.set_exception(exc) 84*cda5da8dSAndroid Build Coastguard Worker # Since calling `wait_closed()` is not mandatory, 85*cda5da8dSAndroid Build Coastguard Worker # we shouldn't log the traceback if this is not awaited. 86*cda5da8dSAndroid Build Coastguard Worker self._stdin_closed._log_traceback = False 87*cda5da8dSAndroid Build Coastguard Worker return 88*cda5da8dSAndroid Build Coastguard Worker if fd == 1: 89*cda5da8dSAndroid Build Coastguard Worker reader = self.stdout 90*cda5da8dSAndroid Build Coastguard Worker elif fd == 2: 91*cda5da8dSAndroid Build Coastguard Worker reader = self.stderr 92*cda5da8dSAndroid Build Coastguard Worker else: 93*cda5da8dSAndroid Build Coastguard Worker reader = None 94*cda5da8dSAndroid Build Coastguard Worker if reader is not None: 95*cda5da8dSAndroid Build Coastguard Worker if exc is None: 96*cda5da8dSAndroid Build Coastguard Worker reader.feed_eof() 97*cda5da8dSAndroid Build Coastguard Worker else: 98*cda5da8dSAndroid Build Coastguard Worker reader.set_exception(exc) 99*cda5da8dSAndroid Build Coastguard Worker 100*cda5da8dSAndroid Build Coastguard Worker if fd in self._pipe_fds: 101*cda5da8dSAndroid Build Coastguard Worker self._pipe_fds.remove(fd) 102*cda5da8dSAndroid Build Coastguard Worker self._maybe_close_transport() 103*cda5da8dSAndroid Build Coastguard Worker 104*cda5da8dSAndroid Build Coastguard Worker def process_exited(self): 105*cda5da8dSAndroid Build Coastguard Worker self._process_exited = True 106*cda5da8dSAndroid Build Coastguard Worker self._maybe_close_transport() 107*cda5da8dSAndroid Build Coastguard Worker 108*cda5da8dSAndroid Build Coastguard Worker def _maybe_close_transport(self): 109*cda5da8dSAndroid Build Coastguard Worker if len(self._pipe_fds) == 0 and self._process_exited: 110*cda5da8dSAndroid Build Coastguard Worker self._transport.close() 111*cda5da8dSAndroid Build Coastguard Worker self._transport = None 112*cda5da8dSAndroid Build Coastguard Worker 113*cda5da8dSAndroid Build Coastguard Worker def _get_close_waiter(self, stream): 114*cda5da8dSAndroid Build Coastguard Worker if stream is self.stdin: 115*cda5da8dSAndroid Build Coastguard Worker return self._stdin_closed 116*cda5da8dSAndroid Build Coastguard Worker 117*cda5da8dSAndroid Build Coastguard Worker 118*cda5da8dSAndroid Build Coastguard Workerclass Process: 119*cda5da8dSAndroid Build Coastguard Worker def __init__(self, transport, protocol, loop): 120*cda5da8dSAndroid Build Coastguard Worker self._transport = transport 121*cda5da8dSAndroid Build Coastguard Worker self._protocol = protocol 122*cda5da8dSAndroid Build Coastguard Worker self._loop = loop 123*cda5da8dSAndroid Build Coastguard Worker self.stdin = protocol.stdin 124*cda5da8dSAndroid Build Coastguard Worker self.stdout = protocol.stdout 125*cda5da8dSAndroid Build Coastguard Worker self.stderr = protocol.stderr 126*cda5da8dSAndroid Build Coastguard Worker self.pid = transport.get_pid() 127*cda5da8dSAndroid Build Coastguard Worker 128*cda5da8dSAndroid Build Coastguard Worker def __repr__(self): 129*cda5da8dSAndroid Build Coastguard Worker return f'<{self.__class__.__name__} {self.pid}>' 130*cda5da8dSAndroid Build Coastguard Worker 131*cda5da8dSAndroid Build Coastguard Worker @property 132*cda5da8dSAndroid Build Coastguard Worker def returncode(self): 133*cda5da8dSAndroid Build Coastguard Worker return self._transport.get_returncode() 134*cda5da8dSAndroid Build Coastguard Worker 135*cda5da8dSAndroid Build Coastguard Worker async def wait(self): 136*cda5da8dSAndroid Build Coastguard Worker """Wait until the process exit and return the process return code.""" 137*cda5da8dSAndroid Build Coastguard Worker return await self._transport._wait() 138*cda5da8dSAndroid Build Coastguard Worker 139*cda5da8dSAndroid Build Coastguard Worker def send_signal(self, signal): 140*cda5da8dSAndroid Build Coastguard Worker self._transport.send_signal(signal) 141*cda5da8dSAndroid Build Coastguard Worker 142*cda5da8dSAndroid Build Coastguard Worker def terminate(self): 143*cda5da8dSAndroid Build Coastguard Worker self._transport.terminate() 144*cda5da8dSAndroid Build Coastguard Worker 145*cda5da8dSAndroid Build Coastguard Worker def kill(self): 146*cda5da8dSAndroid Build Coastguard Worker self._transport.kill() 147*cda5da8dSAndroid Build Coastguard Worker 148*cda5da8dSAndroid Build Coastguard Worker async def _feed_stdin(self, input): 149*cda5da8dSAndroid Build Coastguard Worker debug = self._loop.get_debug() 150*cda5da8dSAndroid Build Coastguard Worker self.stdin.write(input) 151*cda5da8dSAndroid Build Coastguard Worker if debug: 152*cda5da8dSAndroid Build Coastguard Worker logger.debug( 153*cda5da8dSAndroid Build Coastguard Worker '%r communicate: feed stdin (%s bytes)', self, len(input)) 154*cda5da8dSAndroid Build Coastguard Worker try: 155*cda5da8dSAndroid Build Coastguard Worker await self.stdin.drain() 156*cda5da8dSAndroid Build Coastguard Worker except (BrokenPipeError, ConnectionResetError) as exc: 157*cda5da8dSAndroid Build Coastguard Worker # communicate() ignores BrokenPipeError and ConnectionResetError 158*cda5da8dSAndroid Build Coastguard Worker if debug: 159*cda5da8dSAndroid Build Coastguard Worker logger.debug('%r communicate: stdin got %r', self, exc) 160*cda5da8dSAndroid Build Coastguard Worker 161*cda5da8dSAndroid Build Coastguard Worker if debug: 162*cda5da8dSAndroid Build Coastguard Worker logger.debug('%r communicate: close stdin', self) 163*cda5da8dSAndroid Build Coastguard Worker self.stdin.close() 164*cda5da8dSAndroid Build Coastguard Worker 165*cda5da8dSAndroid Build Coastguard Worker async def _noop(self): 166*cda5da8dSAndroid Build Coastguard Worker return None 167*cda5da8dSAndroid Build Coastguard Worker 168*cda5da8dSAndroid Build Coastguard Worker async def _read_stream(self, fd): 169*cda5da8dSAndroid Build Coastguard Worker transport = self._transport.get_pipe_transport(fd) 170*cda5da8dSAndroid Build Coastguard Worker if fd == 2: 171*cda5da8dSAndroid Build Coastguard Worker stream = self.stderr 172*cda5da8dSAndroid Build Coastguard Worker else: 173*cda5da8dSAndroid Build Coastguard Worker assert fd == 1 174*cda5da8dSAndroid Build Coastguard Worker stream = self.stdout 175*cda5da8dSAndroid Build Coastguard Worker if self._loop.get_debug(): 176*cda5da8dSAndroid Build Coastguard Worker name = 'stdout' if fd == 1 else 'stderr' 177*cda5da8dSAndroid Build Coastguard Worker logger.debug('%r communicate: read %s', self, name) 178*cda5da8dSAndroid Build Coastguard Worker output = await stream.read() 179*cda5da8dSAndroid Build Coastguard Worker if self._loop.get_debug(): 180*cda5da8dSAndroid Build Coastguard Worker name = 'stdout' if fd == 1 else 'stderr' 181*cda5da8dSAndroid Build Coastguard Worker logger.debug('%r communicate: close %s', self, name) 182*cda5da8dSAndroid Build Coastguard Worker transport.close() 183*cda5da8dSAndroid Build Coastguard Worker return output 184*cda5da8dSAndroid Build Coastguard Worker 185*cda5da8dSAndroid Build Coastguard Worker async def communicate(self, input=None): 186*cda5da8dSAndroid Build Coastguard Worker if input is not None: 187*cda5da8dSAndroid Build Coastguard Worker stdin = self._feed_stdin(input) 188*cda5da8dSAndroid Build Coastguard Worker else: 189*cda5da8dSAndroid Build Coastguard Worker stdin = self._noop() 190*cda5da8dSAndroid Build Coastguard Worker if self.stdout is not None: 191*cda5da8dSAndroid Build Coastguard Worker stdout = self._read_stream(1) 192*cda5da8dSAndroid Build Coastguard Worker else: 193*cda5da8dSAndroid Build Coastguard Worker stdout = self._noop() 194*cda5da8dSAndroid Build Coastguard Worker if self.stderr is not None: 195*cda5da8dSAndroid Build Coastguard Worker stderr = self._read_stream(2) 196*cda5da8dSAndroid Build Coastguard Worker else: 197*cda5da8dSAndroid Build Coastguard Worker stderr = self._noop() 198*cda5da8dSAndroid Build Coastguard Worker stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr) 199*cda5da8dSAndroid Build Coastguard Worker await self.wait() 200*cda5da8dSAndroid Build Coastguard Worker return (stdout, stderr) 201*cda5da8dSAndroid Build Coastguard Worker 202*cda5da8dSAndroid Build Coastguard Worker 203*cda5da8dSAndroid Build Coastguard Workerasync def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, 204*cda5da8dSAndroid Build Coastguard Worker limit=streams._DEFAULT_LIMIT, **kwds): 205*cda5da8dSAndroid Build Coastguard Worker loop = events.get_running_loop() 206*cda5da8dSAndroid Build Coastguard Worker protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, 207*cda5da8dSAndroid Build Coastguard Worker loop=loop) 208*cda5da8dSAndroid Build Coastguard Worker transport, protocol = await loop.subprocess_shell( 209*cda5da8dSAndroid Build Coastguard Worker protocol_factory, 210*cda5da8dSAndroid Build Coastguard Worker cmd, stdin=stdin, stdout=stdout, 211*cda5da8dSAndroid Build Coastguard Worker stderr=stderr, **kwds) 212*cda5da8dSAndroid Build Coastguard Worker return Process(transport, protocol, loop) 213*cda5da8dSAndroid Build Coastguard Worker 214*cda5da8dSAndroid Build Coastguard Worker 215*cda5da8dSAndroid Build Coastguard Workerasync def create_subprocess_exec(program, *args, stdin=None, stdout=None, 216*cda5da8dSAndroid Build Coastguard Worker stderr=None, limit=streams._DEFAULT_LIMIT, 217*cda5da8dSAndroid Build Coastguard Worker **kwds): 218*cda5da8dSAndroid Build Coastguard Worker loop = events.get_running_loop() 219*cda5da8dSAndroid Build Coastguard Worker protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, 220*cda5da8dSAndroid Build Coastguard Worker loop=loop) 221*cda5da8dSAndroid Build Coastguard Worker transport, protocol = await loop.subprocess_exec( 222*cda5da8dSAndroid Build Coastguard Worker protocol_factory, 223*cda5da8dSAndroid Build Coastguard Worker program, *args, 224*cda5da8dSAndroid Build Coastguard Worker stdin=stdin, stdout=stdout, 225*cda5da8dSAndroid Build Coastguard Worker stderr=stderr, **kwds) 226*cda5da8dSAndroid Build Coastguard Worker return Process(transport, protocol, loop) 227