xref: /aosp_15_r20/external/pigweed/pw_cli/py/pw_cli/process.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2019 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Module for running subprocesses from pw and capturing their output."""
15
16from __future__ import annotations
17
18import asyncio
19import logging
20import os
21import shlex
22import tempfile
23from typing import IO
24
25import pw_cli.color
26import pw_cli.log
27
28import psutil  # type: ignore
29
30
31_COLOR = pw_cli.color.colors()
32_LOG = logging.getLogger(__name__)
33
34# Environment variable passed down to subprocesses to indicate that they are
35# running as a subprocess. Can be imported by other code.
36PW_SUBPROCESS_ENV = 'PW_SUBPROCESS'
37
38
39class CompletedProcess:
40    """Information about a process executed in run_async.
41
42    Attributes:
43        pid: The process identifier.
44        returncode: The return code of the process.
45    """
46
47    def __init__(
48        self,
49        process: asyncio.subprocess.Process,
50        output: bytes | IO[bytes],
51    ):
52        assert process.returncode is not None
53        self.returncode: int = process.returncode
54        self.pid = process.pid
55        self._output = output
56
57    @property
58    def output(self) -> bytes:
59        # If the output is a file, read it, then close it.
60        if not isinstance(self._output, bytes):
61            with self._output as file:
62                file.flush()
63                file.seek(0)
64                self._output = file.read()
65
66        return self._output
67
68
69async def _run_and_log(program: str, args: tuple[str, ...], env: dict):
70    process = await asyncio.create_subprocess_exec(
71        program,
72        *args,
73        stdout=asyncio.subprocess.PIPE,
74        stderr=asyncio.subprocess.STDOUT,
75        env=env,
76    )
77
78    output = bytearray()
79
80    if process.stdout:
81        while True:
82            line = await process.stdout.readline()
83            if not line:
84                break
85
86            output += line
87            _LOG.log(
88                pw_cli.log.LOGLEVEL_STDOUT,
89                '[%s] %s',
90                _COLOR.bold_white(process.pid),
91                line.decode(errors='replace').rstrip(),
92            )
93
94    return process, bytes(output)
95
96
97async def _kill_process_and_children(
98    process: asyncio.subprocess.Process,
99) -> None:
100    """Kills child processes of a process with PID `pid`."""
101    # Look up child processes before sending the kill signal to the parent,
102    # as otherwise the child lookup would fail.
103    pid = process.pid
104    try:
105        process_util = psutil.Process(pid=pid)
106        kill_list = list(process_util.children(recursive=True))
107    except psutil.NoSuchProcess:
108        # Creating the kill list raced with parent process completion.
109        #
110        # Don't bother cleaning up child processes of parent processes that
111        # exited on their own.
112        kill_list = []
113
114    for proc in kill_list:
115        try:
116            proc.kill()
117        except psutil.NoSuchProcess:
118            pass
119
120    def wait_for_all() -> None:
121        for proc in kill_list:
122            try:
123                proc.wait()
124            except psutil.NoSuchProcess:
125                pass
126
127    # Wait for process completion on the executor to avoid blocking the
128    # event loop.
129    loop = asyncio.get_event_loop()
130    wait_for_children = loop.run_in_executor(None, wait_for_all)
131
132    # Send a kill signal to the main process before waiting for the children
133    # to complete.
134    try:
135        process.kill()
136        await process.wait()
137    except ProcessLookupError:
138        _LOG.debug(
139            'Process completed before it could be killed. '
140            'This may have been caused by the killing one of its '
141            'child subprocesses.',
142        )
143
144    await wait_for_children
145
146
147async def run_async(
148    program: str,
149    *args: str,
150    env: dict[str, str] | None = None,
151    log_output: bool = False,
152    timeout: float | None = None,
153) -> CompletedProcess:
154    """Runs a command, capturing and optionally logging its output.
155
156    Args:
157      program: The program to run in a new process.
158      args: The arguments to pass to the program.
159      env: An optional mapping of environment variables within which to run
160        the process.
161      log_output: Whether to log stdout and stderr of the process to this
162        process's stdout (prefixed with the PID of the subprocess from which
163        the output originated). If unspecified, the child process's stdout
164        and stderr will be captured, and both will be stored in the returned
165        `CompletedProcess`'s  output`.
166      timeout: An optional floating point number of seconds to allow the
167        subprocess to run before killing it and its children. If unspecified,
168        the subprocess will be allowed to continue exiting until it completes.
169
170    Returns a CompletedProcess with details from the process.
171    """
172
173    _LOG.debug(
174        'Running `%s`', ' '.join(shlex.quote(arg) for arg in [program, *args])
175    )
176
177    hydrated_env = os.environ.copy()
178    if env is not None:
179        for key, value in env.items():
180            hydrated_env[key] = value
181    hydrated_env[PW_SUBPROCESS_ENV] = '1'
182    output: bytes | IO[bytes]
183
184    if log_output:
185        process, output = await _run_and_log(program, args, hydrated_env)
186    else:
187        output = tempfile.TemporaryFile()
188        process = await asyncio.create_subprocess_exec(
189            program,
190            *args,
191            stdout=output,
192            stderr=asyncio.subprocess.STDOUT,
193            env=hydrated_env,
194        )
195
196    try:
197        await asyncio.wait_for(process.wait(), timeout)
198    except asyncio.TimeoutError:
199        _LOG.error('%s timed out after %d seconds', program, timeout)
200        await _kill_process_and_children(process)
201
202    if process.returncode:
203        _LOG.error('%s exited with status %d', program, process.returncode)
204    else:
205        # process.returncode is 0
206        _LOG.debug('%s exited successfully', program)
207
208    return CompletedProcess(process, output)
209
210
211def run(program: str, *args: str, **kwargs) -> CompletedProcess:
212    """Synchronous wrapper for run_async."""
213    return asyncio.run(run_async(program, *args, **kwargs))
214