# Copyright 2022 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Console HTTP Log Server functions."""

import asyncio
import datetime
import email.utils
import logging
import mimetypes
from pathlib import Path
import socket
from threading import Thread
import webbrowser
from typing import Any, Callable

from aiohttp import web, WSMsgType

from pw_console.web_kernel import WebKernel

_LOG = logging.getLogger(__package__)


def aiohttp_server(handler: Callable) -> web.AppRunner:
    """Build an aiohttp runner that sends every GET request to `handler`.

    Two routes are registered: the root path ``/`` and any single-segment
    path ``/{name}``. The returned runner has not been set up or started.

    Args:
        handler: Request handler invoked for both routes.

    Returns:
        A `web.AppRunner` wrapping the newly created application.
    """
    app = web.Application()
    app.add_routes([web.get('/', handler), web.get('/{name}', handler)])
    runner = web.AppRunner(app)
    return runner


def find_available_port(start_port=8080, max_retries=100) -> int:
    """Finds the next available port starting from `start_port`.

    Ports `start_port` through `start_port + max_retries - 1` are probed in
    order by binding a TCP socket on localhost. The probe socket is closed
    before returning, so the port is only known to have been free at the time
    of the check.

    Args:
        start_port: First port number to try.
        max_retries: How many consecutive ports to try.

    Returns:
        The first port that could be bound.

    Raises:
        RuntimeError: If none of the probed ports could be bound.
    """
    for port in range(start_port, start_port + max_retries):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            try:
                s.bind(('localhost', port))
                return port
            except OSError:
                pass  # Port is already in use, try the next one

    # range() above excludes `start_port + max_retries`, so the last port that
    # was actually probed is one less than that.
    raise RuntimeError(
        (
            'No available port found within the range '
            f'{start_port}-{start_port + max_retries - 1}'
        )
    )


def pw_console_http_server(
    start_port: int,
    html_files: dict[str, str],
    kernel_params: dict[str, Any] | None = None,
) -> None:
    """Serve the console web files on localhost until interrupted.

    Starts the web socket responder thread, binds an aiohttp site to the first
    free port at or after `start_port`, prints the URL, opens it in a web
    browser and then runs the event loop forever. A KeyboardInterrupt stops
    the responder thread and the event loop.

    Args:
        start_port: First port number to try to serve on.
        html_files: Maps request paths (such as ``/console.html``) to the
            text content served for that path.
        kernel_params: Optional parameters handed to each `WebKernel`.
    """
    # Create these before entering the try block so the KeyboardInterrupt
    # handler can always reference them, even if the interrupt arrives early.
    handler = WebHandler(html_files=html_files, kernel_params=kernel_params)
    loop = asyncio.new_event_loop()
    try:
        handler.start_web_socket_streaming_responder_thread()
        runner = aiohttp_server(handler.handle_request)
        port = find_available_port(start_port)
        asyncio.set_event_loop(loop)
        loop.run_until_complete(runner.setup())
        site = web.TCPSite(runner, 'localhost', port)
        loop.run_until_complete(site.start())
        url = f'http://localhost:{port}'
        print(url)
        webbrowser.open(url)
        loop.run_forever()
    except KeyboardInterrupt:
        _LOG.info('Shutting down...')
        handler.stop_web_socket_streaming_responder_thread()
        loop.stop()


class WebHandler:
    """Request handler that serves files from pw_console.html package data."""

    def __init__(
        self,
        html_files: dict[str, str],
        kernel_params: dict[str, Any] | None = None,
    ) -> None:
        """Store the served files and create the responder event loop.

        Args:
            html_files: Maps request paths to the text content to serve.
            kernel_params: Optional parameters passed to each `WebKernel`;
                an empty dict is used when omitted or empty.
        """
        self.html_files = html_files
        # Construction time, formatted as an HTTP date; sent as the
        # Last-Modified header of every file response.
        self.date_modified = email.utils.formatdate(
            datetime.datetime.now().timestamp(), usegmt=True
        )
        self.kernel_params: dict[str, Any] = {}
        if kernel_params:
            self.kernel_params = kernel_params

        # Separate event loop that is run on its own thread (see
        # start_web_socket_streaming_responder_thread) and handed to each
        # WebKernel.
        self.web_socket_streaming_responder_loop = asyncio.new_event_loop()

    def _web_socket_streaming_responder_thread_entry(self):
        """Entry point for the web socket logging handlers thread."""
        asyncio.set_event_loop(self.web_socket_streaming_responder_loop)
        self.web_socket_streaming_responder_loop.run_forever()

    def start_web_socket_streaming_responder_thread(self):
        """Start thread for handling log messages to web socket responses."""
        thread = Thread(
            target=self._web_socket_streaming_responder_thread_entry,
            args=(),
            daemon=True,
        )
        thread.start()

    def stop_web_socket_streaming_responder_thread(self):
        """Ask the responder event loop to stop.

        The stop is scheduled with `call_soon_threadsafe` because the loop
        runs on a different thread than the caller.
        """
        self.web_socket_streaming_responder_loop.call_soon_threadsafe(
            self.web_socket_streaming_responder_loop.stop
        )

    async def handle_request(
        self, request: web.Request
    ) -> web.Response | web.WebSocketResponse:
        """Serve a file from `html_files` or hand off to the websocket handler.

        ``/`` is treated as ``/console.html`` and ``/ws`` is delegated to
        `handle_websocket`. Any other path that is not a key of `html_files`
        gets a 404 response.

        Args:
            request: The incoming aiohttp request.

        Returns:
            The file response, a 404 response, or the websocket response.
        """
        _LOG.debug(
            '%s: %s',
            request.remote,
            request.raw_path,
        )

        path = request.path
        if path == '/':
            path = '/console.html'

        if path == '/ws':
            return await self.handle_websocket(request)

        if path not in self.html_files:
            return web.Response(status=404, text='File not found')

        content: bytes = self.html_files[path].encode('utf-8')
        # Fall back to a generic binary type when the file name gives no hint.
        content_type = 'application/octet-stream'
        mime_guess, _ = mimetypes.guess_type(Path(path).name)
        if mime_guess:
            content_type = mime_guess

        return web.Response(
            body=content,
            content_type=content_type,
            headers={
                'Content-Length': str(len(content)),
                'Last-Modified': self.date_modified,
            },
        )

    async def handle_websocket(
        self, request: web.Request
    ) -> web.WebSocketResponse:
        """Handle a websocket connection request by creating a new kernel.

        Each text message is passed to the kernel and its response is sent
        back, except for the literal message ``close`` which ends the session.
        The kernel is always notified of the disconnect and the socket is
        always closed, however the message loop ends.

        Args:
            request: The incoming aiohttp request to upgrade.

        Returns:
            The websocket response used for the connection.
        """
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        kernel = WebKernel(
            ws, self.kernel_params, self.web_socket_streaming_responder_loop
        )
        try:
            async for msg in ws:
                if msg.type == WSMsgType.TEXT:
                    if msg.data == 'close':
                        # The finally clause below still runs on this return.
                        return ws
                    response = await kernel.handle_request(msg.data)
                    await ws.send_str(response)
                elif msg.type == WSMsgType.ERROR:
                    _LOG.warning(
                        'ws connection closed with exception: %s',
                        ws.exception(),
                    )
        finally:
            kernel.handle_disconnect()
            await ws.close()

        return ws