# Copyright 2022 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""pw_console test mode functions."""

import asyncio
import time
import re
import random
import logging
from threading import Thread

FAKE_DEVICE_LOGGER_NAME = 'pw_console_fake_device'

_ROOT_LOG = logging.getLogger('')
_FAKE_DEVICE_LOG = logging.getLogger(FAKE_DEVICE_LOGGER_NAME)

# Matches reStructuredText keyboard roles such as :kbd:`Ctrl-C`.
_KBD_KEY_REGEX = re.compile(r':kbd:`(?P<key>[^`]+)`')


def start_fake_logger(lines, log_thread_entry, log_thread_loop):
    """Start a background thread and schedule fake log generation.

    Args:
      lines: Iterable of text lines converted into fake log messages by
        prepare_fake_logs().
      log_thread_entry: Callable used as the target of a new daemon thread;
        it is invoked without arguments.
      log_thread_loop: asyncio event loop that the log_forever() coroutine is
        submitted to. Presumably this is the loop that log_thread_entry runs;
        confirm against the caller.

    Returns:
      The future returned by asyncio.run_coroutine_threadsafe() for the
      scheduled log_forever() coroutine.
    """
    fake_log_messages = prepare_fake_logs(lines)

    test_log_thread = Thread(target=log_thread_entry, daemon=True)
    test_log_thread.start()

    background_log_task = asyncio.run_coroutine_threadsafe(
        # This function will be executed in a separate thread.
        log_forever(fake_log_messages),
        # Using this asyncio event loop.
        log_thread_loop,
    )  # type: ignore
    return background_log_task


def prepare_fake_logs(lines) -> list[tuple[str, dict]]:
    """Convert text lines into (message, metadata) fake log entries.

    Blank lines (empty or whitespace-only) are skipped so that no empty log
    messages are produced.

    Args:
      lines: Iterable of strings.

    Returns:
      A list of (message, metadata) tuples. The message is the line with
      leading whitespace removed. The metadata dict has a single 'keys' entry
      holding the text of the first :kbd:`...` role found in the line, or ''
      if there is none.
    """
    fake_logs: list[tuple[str, dict]] = []
    for line in lines:
        # strip() also catches lines that contain only spaces or a newline.
        if not line or not line.strip():
            continue

        match = _KBD_KEY_REGEX.search(line)
        keyboard_key = match.group('key') if match else ''

        fake_logs.append((line.lstrip(), {'keys': keyboard_key}))
    return fake_logs


async def log_forever(fake_log_messages: list[tuple[str, dict]]):
    """Test mode async log generator coroutine that runs forever.

    Randomly picks entries from fake_log_messages and logs them to the fake
    device logger at a weighted-random log level. The first 33 messages are
    logged back to back; after that the coroutine sleeps one second before
    each message.

    Args:
      fake_log_messages: List of (message, metadata) tuples, as produced by
        prepare_fake_logs(). The metadata dict is merged into the
        'extra_metadata_fields' dict attached to each log record.

    Returns:
      None, and only when fake_log_messages is empty; otherwise this
      coroutine never returns.
    """
    if not fake_log_messages:
        # random.choice() raises IndexError on an empty sequence, and that
        # error would sit unnoticed in the scheduling future.
        _ROOT_LOG.warning(
            'No fake log messages available; fake log device not started.'
        )
        return

    _ROOT_LOG.info('Fake log device connected.')
    start_time = time.time()
    message_count = 0

    log_methods = [
        _FAKE_DEVICE_LOG.info,
        _FAKE_DEVICE_LOG.debug,
        _FAKE_DEVICE_LOG.warning,
        _FAKE_DEVICE_LOG.error,
        _FAKE_DEVICE_LOG.critical,
    ]
    log_method_rand_weights = [50, 20, 10, 10, 10]

    # Fake module column names.
    module_names = ['APP', 'RADIO', 'BAT', 'USB', 'CPU']
    while True:
        # Throttle to one message per second after the initial burst.
        if message_count > 32:
            await asyncio.sleep(1)

        message, metadata = random.choice(fake_log_messages)
        # random.choices() returns a list; k defaults to 1.
        (log_func,) = random.choices(
            log_methods, weights=log_method_rand_weights
        )

        # Cycle through the module names in order.
        module_name = module_names[message_count % len(module_names)]
        log_func(
            message,
            extra={
                'extra_metadata_fields': {
                    'module': module_name,
                    'file': 'fake_app.cc',
                    # Seconds elapsed since this coroutine started.
                    'timestamp': time.time() - start_time,
                    **metadata,
                }
            },
        )
        message_count += 1
        if message_count % 10 == 0:
            _ROOT_LOG.info('Device message count: %d', message_count)