xref: /aosp_15_r20/external/pigweed/pw_emu/py/mock_emu.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
#!/usr/bin/env python
# Copyright 2023 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Mock emulator used for testing process and channel management."""
16
17import argparse
18import os
19import socket
20import sys
21import time
22
23from threading import Thread
24
25
26def _tcp_thread(sock: socket.socket) -> None:
27    conn, _ = sock.accept()
28    while True:
29        data = conn.recv(1)
30        conn.send(data)
31
32
33def _pty_thread(fd: int) -> None:
34    while True:
35        data = os.read(fd, 1)
36        os.write(fd, data)
37
38
39def _get_parser() -> argparse.ArgumentParser:
40    """Command line parser."""
41
42    parser = argparse.ArgumentParser()
43    parser.add_argument(
44        '-C', '--working-dir', metavar='PATH', help='working directory'
45    )
46    parser.add_argument(
47        'echo', metavar='STRING', nargs='*', help='write STRING to stdout'
48    )
49    parser.add_argument(
50        '--tcp-channel',
51        action='append',
52        default=[],
53        metavar='NAME',
54        help='listen for TCP connections, write port WDIR/NAME',
55    )
56    if sys.platform != 'win32':
57        parser.add_argument(
58            '--pty-channel',
59            action='append',
60            default=[],
61            metavar='NAME',
62            help='create pty channel and link in WDIR/NAME',
63        )
64    parser.add_argument(
65        '--exit', action='store_true', default=False, help='exit when done'
66    )
67
68    return parser
69
70
def main() -> None:
    """Mock emulator.

    Parses the command line, prints the positional `echo` arguments (if any)
    to stdout, then starts one echo thread per requested channel:

    * For each --tcp-channel NAME, a TCP socket is bound to an ephemeral port
      on localhost and the port number is written to WDIR/NAME.
    * For each --pty-channel NAME (non-Windows only), a pty pair is opened and
      the tty device path is written to WDIR/NAME.

    The function then waits for all channel threads to finish. Afterwards it
    returns if --exit was given, and otherwise sleeps forever.

    Exits with a usage error if a channel is requested without --working-dir,
    since the channel files are created relative to that directory.
    """

    parser = _get_parser()
    args = parser.parse_args()

    if args.echo:
        print(' '.join(args.echo))
    sys.stdout.flush()

    tcp_channels = args.tcp_channel
    # The --pty-channel option is not registered by the parser on Windows.
    pty_channels = args.pty_channel if sys.platform != 'win32' else []

    if (tcp_channels or pty_channels) and args.working_dir is None:
        # os.path.join(None, ...) would otherwise fail with a bare TypeError.
        parser.error('--working-dir is required when channels are requested')

    threads = []

    for chan in tcp_channels:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # Port 0 lets the OS pick a free port; it is published via the file.
        sock.bind(('localhost', 0))
        port = sock.getsockname()[1]
        sock.listen()
        with open(os.path.join(args.working_dir, chan), 'w') as file:
            file.write(str(port))
        thread = Thread(target=_tcp_thread, args=(sock,))
        thread.start()
        threads.append(thread)

    for chan in pty_channels:
        # NOTE(review): `tty` is never closed here; presumably it is kept open
        # so the pty stays usable for the lifetime of the process - confirm.
        controller, tty = os.openpty()
        with open(os.path.join(args.working_dir, chan), 'w') as file:
            file.write(os.ttyname(tty))
        thread = Thread(target=_pty_thread, args=(controller,))
        thread.start()
        threads.append(thread)

    for thread in threads:
        thread.join()

    if not args.exit:
        # Stay alive until killed externally.
        while True:
            time.sleep(1)
108
109
# Run the mock emulator only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
112