1#!/usr/bin/env python 2# Copyright 2023 The Pigweed Authors 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); you may not 5# use this file except in compliance with the License. You may obtain a copy of 6# the License at 7# 8# https://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations under 14# the License. 15"""Mock emulator used for testing process and channel management.""" 16 17import argparse 18import os 19import socket 20import sys 21import time 22 23from threading import Thread 24 25 26def _tcp_thread(sock: socket.socket) -> None: 27 conn, _ = sock.accept() 28 while True: 29 data = conn.recv(1) 30 conn.send(data) 31 32 33def _pty_thread(fd: int) -> None: 34 while True: 35 data = os.read(fd, 1) 36 os.write(fd, data) 37 38 39def _get_parser() -> argparse.ArgumentParser: 40 """Command line parser.""" 41 42 parser = argparse.ArgumentParser() 43 parser.add_argument( 44 '-C', '--working-dir', metavar='PATH', help='working directory' 45 ) 46 parser.add_argument( 47 'echo', metavar='STRING', nargs='*', help='write STRING to stdout' 48 ) 49 parser.add_argument( 50 '--tcp-channel', 51 action='append', 52 default=[], 53 metavar='NAME', 54 help='listen for TCP connections, write port WDIR/NAME', 55 ) 56 if sys.platform != 'win32': 57 parser.add_argument( 58 '--pty-channel', 59 action='append', 60 default=[], 61 metavar='NAME', 62 help='create pty channel and link in WDIR/NAME', 63 ) 64 parser.add_argument( 65 '--exit', action='store_true', default=False, help='exit when done' 66 ) 67 68 return parser 69 70 71def main() -> None: 72 """Mock emulator.""" 73 74 args = _get_parser().parse_args() 75 76 if len(args.echo) > 0: 77 print(' '.join(args.echo)) 78 sys.stdout.flush() 79 80 threads = [] 81 82 for chan in args.tcp_channel: 83 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 84 sock.bind(('localhost', 0)) 85 port = sock.getsockname()[1] 86 sock.listen() 87 with open(os.path.join(args.working_dir, chan), 'w') as file: 88 file.write(str(port)) 89 thread = Thread(target=_tcp_thread, args=(sock,)) 90 thread.start() 91 threads.append(thread) 92 93 if sys.platform != 'win32': 94 for chan in args.pty_channel: 95 controller, tty = os.openpty() 96 with open(os.path.join(args.working_dir, chan), 'w') as file: 97 file.write(os.ttyname(tty)) 98 thread = Thread(target=_pty_thread, args=(controller,)) 99 thread.start() 100 threads.append(thread) 101 102 for thread in threads: 103 thread.join() 104 105 if not args.exit: 106 while True: 107 time.sleep(1) 108 109 110if __name__ == '__main__': 111 main() 112