xref: /aosp_15_r20/external/pigweed/pw_console/py/pw_console/test_mode.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2022 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""pw_console test mode functions."""
15
16import asyncio
17import time
18import re
19import random
20import logging
21from threading import Thread
22
23FAKE_DEVICE_LOGGER_NAME = 'pw_console_fake_device'
24
25_ROOT_LOG = logging.getLogger('')
26_FAKE_DEVICE_LOG = logging.getLogger(FAKE_DEVICE_LOGGER_NAME)
27
28
def start_fake_logger(lines, log_thread_entry, log_thread_loop):
    """Launch the fake device log generator in the background.

    Args:
        lines: Text lines handed to prepare_fake_logs() to build the pool of
            fake log messages.
        log_thread_entry: Callable used as the target of a new daemon thread;
            it is invoked with no arguments.
        log_thread_loop: asyncio event loop that the log_forever() coroutine
            is scheduled on.

    Returns:
        The future returned by asyncio.run_coroutine_threadsafe() for the
        scheduled log_forever() coroutine.
    """
    messages = prepare_fake_logs(lines)

    # The thread is a daemon so it does not keep the interpreter alive.
    # NOTE(review): presumably log_thread_entry is what runs log_thread_loop;
    # confirm against the caller.
    Thread(target=log_thread_entry, args=(), daemon=True).start()

    # Create the coroutine here, then hand it to the given event loop from
    # the calling thread.
    log_coroutine = log_forever(messages)
    return asyncio.run_coroutine_threadsafe(
        log_coroutine,
        log_thread_loop,
    )  # type: ignore
42
43
def prepare_fake_logs(lines) -> list[tuple[str, dict]]:
    """Turn raw text lines into (message, metadata) tuples for fake logging.

    Empty lines are dropped. For every remaining line the message is the line
    with leading whitespace removed, and the metadata dict has a single
    'keys' entry holding the text of the first :kbd:`...` role found in the
    line, or an empty string when the line has none.

    Args:
        lines: Iterable of text lines.

    Returns:
        A list of (message, {'keys': key_text}) tuples in input order.
    """
    kbd_role = re.compile(r':kbd:`(?P<key>[^`]+)`')

    def _as_entry(text) -> tuple[str, dict]:
        # Only the first :kbd: role on the line is recorded.
        found = kbd_role.search(text)
        key_text = found.group(1) if found else ''
        return (text.lstrip(), {'keys': key_text})

    # Falsy (empty) lines are skipped; whitespace-only lines are kept.
    return [_as_entry(text) for text in lines if text]
58
59
async def log_forever(fake_log_messages: list[tuple[str, dict]]):
    """Test mode async log generator coroutine that runs forever.

    Each iteration picks a random entry from fake_log_messages and emits it
    on the fake device logger at a randomly weighted log level. Module, file,
    timestamp and the entry's own metadata are attached through the
    'extra_metadata_fields' logging extra. Every tenth message a running
    count is also logged on the root logger.

    Args:
        fake_log_messages: (message, metadata) tuples; the metadata dict is
            merged into the extra metadata fields of each log record.

    Returns:
        None, and only when fake_log_messages is empty; otherwise the
        coroutine never returns.
    """
    _ROOT_LOG.info('Fake log device connected.')

    if not fake_log_messages:
        # random.choice() raises IndexError on an empty sequence. Report the
        # problem explicitly and stop instead of failing inside the loop.
        _ROOT_LOG.warning(
            'No fake log messages available; fake log device stopped.'
        )
        return

    start_time = time.time()
    message_count = 0

    log_methods = [
        _FAKE_DEVICE_LOG.info,
        _FAKE_DEVICE_LOG.debug,
        _FAKE_DEVICE_LOG.warning,
        _FAKE_DEVICE_LOG.error,
        _FAKE_DEVICE_LOG.critical,
    ]
    # Relative weights, index-aligned with log_methods.
    log_method_rand_weights = [50, 20, 10, 10, 10]

    # Fake module column names.
    module_names = ['APP', 'RADIO', 'BAT', 'USB', 'CPU']
    while True:
        # The first 33 messages are emitted back to back; after that the
        # generator pauses one second before each message.
        if message_count > 32:
            await asyncio.sleep(1)
        fake_log = random.choice(fake_log_messages)
        # random.choices() returns a list; take its single element.
        log_func = random.choices(
            log_methods, weights=log_method_rand_weights
        )[0]

        # Cycle through the module names in order.
        module_name = module_names[message_count % len(module_names)]
        log_func(
            fake_log[0],
            extra=dict(
                extra_metadata_fields=dict(
                    module=module_name,
                    file='fake_app.cc',
                    # Seconds elapsed since this coroutine started.
                    timestamp=time.time() - start_time,
                    **fake_log[1],
                )
            ),
        )
        message_count += 1
        if message_count % 10 == 0:
            _ROOT_LOG.info('Device message count: %d', message_count)
98