xref: /aosp_15_r20/external/pigweed/pw_console/py/pw_console/web_server.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2022 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Console HTTP Log Server functions."""
15
16import asyncio
17import datetime
18import email.utils
19import logging
20import mimetypes
21from pathlib import Path
22import socket
23from threading import Thread
24import webbrowser
25from typing import Any, Callable
26
27from aiohttp import web, WSMsgType
28
29from pw_console.web_kernel import WebKernel
30
31_LOG = logging.getLogger(__package__)
32
33
34def aiohttp_server(handler: Callable) -> web.AppRunner:
35    app = web.Application()
36    app.add_routes([web.get('/', handler), web.get('/{name}', handler)])
37    runner = web.AppRunner(app)
38    return runner
39
40
41def find_available_port(start_port=8080, max_retries=100) -> int:
42    """Finds the next available port starting from `start_port`."""
43    for port in range(start_port, start_port + max_retries):
44        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
45            try:
46                s.bind(('localhost', port))
47                return port
48            except OSError:
49                pass  # Port is already in use, try the next one
50
51    raise RuntimeError(
52        (
53            'No available port found within the range '
54            f'{start_port}-{start_port + max_retries}'
55        )
56    )
57
58
59def pw_console_http_server(
60    start_port: int,
61    html_files: dict[str, str],
62    kernel_params: dict[str, Any] | None = None,
63) -> None:
64    try:
65        handler = WebHandler(html_files=html_files, kernel_params=kernel_params)
66        handler.start_web_socket_streaming_responder_thread()
67        runner = aiohttp_server(handler.handle_request)
68        port = find_available_port(start_port)
69        loop = asyncio.new_event_loop()
70        asyncio.set_event_loop(loop)
71        loop.run_until_complete(runner.setup())
72        site = web.TCPSite(runner, 'localhost', port)
73        loop.run_until_complete(site.start())
74        url = f'http://localhost:{port}'
75        print(url)
76        webbrowser.open(url)
77        loop.run_forever()
78    except KeyboardInterrupt:
79        _LOG.info('Shutting down...')
80        handler.stop_web_socket_streaming_responder_thread()
81        loop.stop()
82
83
84class WebHandler:
85    """Request handler that serves files from pw_console.html package data."""
86
87    def __init__(
88        self,
89        html_files: dict[str, str],
90        kernel_params: dict[str, Any] | None = None,
91    ) -> None:
92        self.html_files = html_files
93        self.date_modified = email.utils.formatdate(
94            datetime.datetime.now().timestamp(), usegmt=True
95        )
96        self.kernel_params: dict[str, Any] = {}
97        if kernel_params:
98            self.kernel_params = kernel_params
99
100        self.web_socket_streaming_responder_loop = asyncio.new_event_loop()
101
102    def _web_socket_streaming_responder_thread_entry(self):
103        """Entry point for the web socket logging handlers thread."""
104        asyncio.set_event_loop(self.web_socket_streaming_responder_loop)
105        self.web_socket_streaming_responder_loop.run_forever()
106
107    def start_web_socket_streaming_responder_thread(self):
108        """Start thread for handling log messages to web socket responses."""
109        thread = Thread(
110            target=self._web_socket_streaming_responder_thread_entry,
111            args=(),
112            daemon=True,
113        )
114        thread.start()
115
116    def stop_web_socket_streaming_responder_thread(self):
117        self.web_socket_streaming_responder_loop.call_soon_threadsafe(
118            self.web_socket_streaming_responder_loop.stop
119        )
120
121    async def handle_request(
122        self, request: web.Request
123    ) -> web.Response | web.WebSocketResponse:
124        _LOG.debug(
125            '%s: %s',
126            request.remote,
127            request.raw_path,
128        )
129
130        path = request.path
131        if path == '/':
132            path = '/console.html'
133
134        if path == '/ws':
135            return await self.handle_websocket(request)
136
137        if path not in self.html_files:
138            return web.Response(status=404, text='File not found')
139
140        content: bytes = self.html_files[path].encode('utf-8')
141        content_type = 'application/octet-stream'
142        mime_guess, _ = mimetypes.guess_type(Path(path).name)
143        if mime_guess:
144            content_type = mime_guess
145
146        return web.Response(
147            body=content,
148            content_type=content_type,
149            headers={
150                'Content-Length': str(len(content)),
151                'Last-Modified': self.date_modified,
152            },
153        )
154
155    async def handle_websocket(
156        self, request: web.Request
157    ) -> web.WebSocketResponse:
158        """Handle a websocket connection request by creating a new kernel."""
159        ws = web.WebSocketResponse()
160        await ws.prepare(request)
161        kernel = WebKernel(
162            ws, self.kernel_params, self.web_socket_streaming_responder_loop
163        )
164        try:
165            async for msg in ws:
166                if msg.type == WSMsgType.TEXT:
167                    if msg.data == 'close':
168                        return ws
169                    response = await kernel.handle_request(msg.data)
170                    await ws.send_str(response)
171                elif msg.type == WSMsgType.ERROR:
172                    _LOG.warning(
173                        'ws connection closed with exception: %s',
174                        ws.exception(),
175                    )
176        finally:
177            kernel.handle_disconnect()
178            await ws.close()
179
180        return ws
181