1# Copyright 2019 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Module for running subprocesses from pw and capturing their output.""" 15 16from __future__ import annotations 17 18import asyncio 19import logging 20import os 21import shlex 22import tempfile 23from typing import IO 24 25import pw_cli.color 26import pw_cli.log 27 28import psutil # type: ignore 29 30 31_COLOR = pw_cli.color.colors() 32_LOG = logging.getLogger(__name__) 33 34# Environment variable passed down to subprocesses to indicate that they are 35# running as a subprocess. Can be imported by other code. 36PW_SUBPROCESS_ENV = 'PW_SUBPROCESS' 37 38 39class CompletedProcess: 40 """Information about a process executed in run_async. 41 42 Attributes: 43 pid: The process identifier. 44 returncode: The return code of the process. 45 """ 46 47 def __init__( 48 self, 49 process: asyncio.subprocess.Process, 50 output: bytes | IO[bytes], 51 ): 52 assert process.returncode is not None 53 self.returncode: int = process.returncode 54 self.pid = process.pid 55 self._output = output 56 57 @property 58 def output(self) -> bytes: 59 # If the output is a file, read it, then close it. 60 if not isinstance(self._output, bytes): 61 with self._output as file: 62 file.flush() 63 file.seek(0) 64 self._output = file.read() 65 66 return self._output 67 68 69async def _run_and_log(program: str, args: tuple[str, ...], env: dict): 70 process = await asyncio.create_subprocess_exec( 71 program, 72 *args, 73 stdout=asyncio.subprocess.PIPE, 74 stderr=asyncio.subprocess.STDOUT, 75 env=env, 76 ) 77 78 output = bytearray() 79 80 if process.stdout: 81 while True: 82 line = await process.stdout.readline() 83 if not line: 84 break 85 86 output += line 87 _LOG.log( 88 pw_cli.log.LOGLEVEL_STDOUT, 89 '[%s] %s', 90 _COLOR.bold_white(process.pid), 91 line.decode(errors='replace').rstrip(), 92 ) 93 94 return process, bytes(output) 95 96 97async def _kill_process_and_children( 98 process: asyncio.subprocess.Process, 99) -> None: 100 """Kills child processes of a process with PID `pid`.""" 101 # Look up child processes before sending the kill signal to the parent, 102 # as otherwise the child lookup would fail. 103 pid = process.pid 104 try: 105 process_util = psutil.Process(pid=pid) 106 kill_list = list(process_util.children(recursive=True)) 107 except psutil.NoSuchProcess: 108 # Creating the kill list raced with parent process completion. 109 # 110 # Don't bother cleaning up child processes of parent processes that 111 # exited on their own. 112 kill_list = [] 113 114 for proc in kill_list: 115 try: 116 proc.kill() 117 except psutil.NoSuchProcess: 118 pass 119 120 def wait_for_all() -> None: 121 for proc in kill_list: 122 try: 123 proc.wait() 124 except psutil.NoSuchProcess: 125 pass 126 127 # Wait for process completion on the executor to avoid blocking the 128 # event loop. 129 loop = asyncio.get_event_loop() 130 wait_for_children = loop.run_in_executor(None, wait_for_all) 131 132 # Send a kill signal to the main process before waiting for the children 133 # to complete. 134 try: 135 process.kill() 136 await process.wait() 137 except ProcessLookupError: 138 _LOG.debug( 139 'Process completed before it could be killed. ' 140 'This may have been caused by the killing one of its ' 141 'child subprocesses.', 142 ) 143 144 await wait_for_children 145 146 147async def run_async( 148 program: str, 149 *args: str, 150 env: dict[str, str] | None = None, 151 log_output: bool = False, 152 timeout: float | None = None, 153) -> CompletedProcess: 154 """Runs a command, capturing and optionally logging its output. 155 156 Args: 157 program: The program to run in a new process. 158 args: The arguments to pass to the program. 159 env: An optional mapping of environment variables within which to run 160 the process. 161 log_output: Whether to log stdout and stderr of the process to this 162 process's stdout (prefixed with the PID of the subprocess from which 163 the output originated). If unspecified, the child process's stdout 164 and stderr will be captured, and both will be stored in the returned 165 `CompletedProcess`'s output`. 166 timeout: An optional floating point number of seconds to allow the 167 subprocess to run before killing it and its children. If unspecified, 168 the subprocess will be allowed to continue exiting until it completes. 169 170 Returns a CompletedProcess with details from the process. 171 """ 172 173 _LOG.debug( 174 'Running `%s`', ' '.join(shlex.quote(arg) for arg in [program, *args]) 175 ) 176 177 hydrated_env = os.environ.copy() 178 if env is not None: 179 for key, value in env.items(): 180 hydrated_env[key] = value 181 hydrated_env[PW_SUBPROCESS_ENV] = '1' 182 output: bytes | IO[bytes] 183 184 if log_output: 185 process, output = await _run_and_log(program, args, hydrated_env) 186 else: 187 output = tempfile.TemporaryFile() 188 process = await asyncio.create_subprocess_exec( 189 program, 190 *args, 191 stdout=output, 192 stderr=asyncio.subprocess.STDOUT, 193 env=hydrated_env, 194 ) 195 196 try: 197 await asyncio.wait_for(process.wait(), timeout) 198 except asyncio.TimeoutError: 199 _LOG.error('%s timed out after %d seconds', program, timeout) 200 await _kill_process_and_children(process) 201 202 if process.returncode: 203 _LOG.error('%s exited with status %d', program, process.returncode) 204 else: 205 # process.returncode is 0 206 _LOG.debug('%s exited successfully', program) 207 208 return CompletedProcess(process, output) 209 210 211def run(program: str, *args: str, **kwargs) -> CompletedProcess: 212 """Synchronous wrapper for run_async.""" 213 return asyncio.run(run_async(program, *args, **kwargs)) 214