xref: /aosp_15_r20/external/pigweed/pw_console/py/pw_console/pw_ptpython_repl.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
"""PwPtPythonRepl class."""
15
16from __future__ import annotations
17
18import asyncio
19import functools
20import io
21import logging
22import os
23import sys
24import shlex
25import subprocess
26from typing import Any, Iterable, TYPE_CHECKING
27from unittest.mock import patch
28
29# inclusive-language: disable
30from prompt_toolkit.input import DummyInput as IgnoredInput
31
32# inclusive-language: enable
33
34from prompt_toolkit.output.plain_text import PlainTextOutput
35from prompt_toolkit.buffer import Buffer
36from prompt_toolkit.layout.controls import BufferControl
37from prompt_toolkit.completion import merge_completers
38from prompt_toolkit.filters import (
39    Condition,
40    has_focus,
41    to_filter,
42)
43from prompt_toolkit.formatted_text import StyleAndTextTuples
44
45from ptpython.completer import (  # type: ignore
46    CompletePrivateAttributes,
47    PythonCompleter,
48)
49from ptpython.printer import OutputPrinter
50import ptpython.repl  # type: ignore
51from ptpython.layout import (  # type: ignore
52    CompletionVisualisation,
53    Dimension,
54)
55import pygments.plugin
56
57from pw_console.pigweed_code_style import (
58    PigweedCodeStyle,
59    PigweedCodeLightStyle,
60    Synthwave84CodeStyle,
61)
62from pw_console.text_formatting import remove_formatting
63
64if TYPE_CHECKING:
65    from pw_console.repl_pane import ReplPane
66
67_LOG = logging.getLogger(__package__)
68_SYSTEM_COMMAND_LOG = logging.getLogger('pw_console_system_command')
69
70_original_find_plugin_styles = pygments.plugin.find_plugin_styles
71
72
def _wrapped_find_plugin_styles():
    """Yield the Pigweed code styles, then pygments' own plugin styles.

    Stand-in for pygments' find_plugin_styles so the Pigweed themes can be
    used without registering Python entrypoints.
    """
    pigweed_styles = [
        ('pigweed-code', PigweedCodeStyle),
        ('pigweed-code-light', PigweedCodeLightStyle),
        ('synthwave84', Synthwave84CodeStyle),
    ]
    yield from pigweed_styles
    yield from _original_find_plugin_styles()
85
86
class MissingPtpythonBufferControl(Exception):
    """Raised when ptpython's input BufferControl object can not be found."""
89
90
def _user_input_is_a_shell_command(text: str) -> bool:
    """Return True if the repl input begins with the '!' shell prefix."""
    return text[:1] == '!'
93
94
95class PwPtPythonRepl(
96    ptpython.repl.PythonRepl
97):  # pylint: disable=too-many-instance-attributes
98    """A ptpython repl class with changes to code execution and output related
99    methods."""
100
101    @patch(
102        'pygments.styles.find_plugin_styles', new=_wrapped_find_plugin_styles
103    )
104    def __init__(
105        self,
106        *args,
107        # pw_console specific kwargs
108        extra_completers: Iterable | None = None,
109        **ptpython_kwargs,
110    ):
111        completer = None
112        if extra_completers:
113            # Create the default python completer used by
114            # ptpython.repl.PythonRepl
115            python_completer = PythonCompleter(
116                # No self.get_globals yet so this must be a lambda
117                # pylint: disable=unnecessary-lambda
118                lambda: self.get_globals(),
119                lambda: self.get_locals(),
120                lambda: self.enable_dictionary_completion,  # type: ignore
121            )
122
123            all_completers = [python_completer]
124            all_completers.extend(extra_completers)
125            # Merge default Python completer with the new custom one.
126            completer = merge_completers(all_completers)
127
128        super().__init__(
129            *args,
130            create_app=False,
131            # Absolute minimum height of 1
132            _input_buffer_height=Dimension(min=1),
133            _completer=completer,
134            **ptpython_kwargs,
135        )
136
137        self.enable_mouse_support: bool = True
138        self.enable_history_search: bool = True
139        self.enable_dictionary_completion: bool = True
140        self._set_pt_python_input_buffer_control_focusable()
141
142        # Change some ptpython.repl defaults.
143        self.show_status_bar = False
144        self.show_exit_confirmation = False
145        self.complete_private_attributes = CompletePrivateAttributes.NEVER
146
147        # Function signature that shows args, kwargs, and types under the cursor
148        # of the input window.
149        self.show_signature: bool = True
150        # Docstring of the current completed function that appears at the bottom
151        # of the input window.
152        self.show_docstring: bool = False
153
154        # Turn off the completion menu in ptpython. The CompletionsMenu in
155        # ConsoleApp.root_container will handle this.
156        self.completion_visualisation: CompletionVisualisation = (
157            CompletionVisualisation.NONE
158        )
159
160        # Additional state variables.
161        self.repl_pane: ReplPane | None = None
162        self._last_result = None
163        self._last_exception = None
164
165    def _set_pt_python_input_buffer_control_focusable(self) -> None:
166        """Enable focus_on_click for ptpython's input buffer."""
167        error_message = (
168            'Unable to find ptpythons BufferControl input object.\n'
169            '  For the last known position see:\n'
170            '  https://github.com/prompt-toolkit/ptpython/'
171            'blob/6072174eace5b645b0cfd5b21b4c237e2539f577/'
172            'ptpython/layout.py#L598\n'
173            '\n'
174            'The installed version of ptpython may not be compatible with'
175            ' pw console; please try re-running environment setup.'
176        )
177
178        try:
179            # Fetch the Window's BufferControl object.
180            # From ptpython/layout.py:
181            #   self.root_container = HSplit([
182            #     VSplit([
183            #       HSplit([
184            #         FloatContainer(
185            #           content=HSplit(
186            #             [create_python_input_window()] + extra_body
187            #           ), ...
188            ptpython_buffer_control = (
189                self.ptpython_layout.root_container.children[0]  # type: ignore
190                .children[0]
191                .children[0]
192                .content.children[0]
193                .content
194            )
195            # This should be a BufferControl instance
196            if not isinstance(ptpython_buffer_control, BufferControl):
197                raise MissingPtpythonBufferControl(error_message)
198            # Enable focus options
199            ptpython_buffer_control.focusable = to_filter(True)
200            ptpython_buffer_control.focus_on_click = to_filter(True)
201        except IndexError as _error:
202            raise MissingPtpythonBufferControl(error_message)
203
204    def __pt_container__(self):
205        """Return the prompt_toolkit root container for class.
206
207        This allows self to be used wherever prompt_toolkit expects a container
208        object."""
209        return self.ptpython_layout.root_container
210
211    def set_repl_pane(self, repl_pane):
212        """Update the parent pw_console.ReplPane reference."""
213        self.repl_pane = repl_pane
214
215    def _save_result(self, formatted_text):
216        """Save the last repl execution result."""
217        unformatted_result = formatted_text
218        self._last_result = unformatted_result
219
220    def _save_exception(self, formatted_text):
221        """Save the last repl exception."""
222        unformatted_result = remove_formatting(formatted_text)
223        self._last_exception = unformatted_result
224
225    def clear_last_result(self):
226        """Erase the last repl execution result."""
227        self._last_result = None
228        self._last_exception = None
229
230    def _format_result_output(self, result: Any) -> str | None:
231        """Return a plaintext repr of any object."""
232        try:
233            formatted_result = repr(result)
234        except BaseException as e:  # pylint: disable=broad-exception-caught
235            self._handle_exception(e)
236            formatted_result = None
237        return formatted_result
238
239    def _show_result(self, result: Any):
240        """Format and save output results.
241
242        This function is called from the _run_user_code() function which is
243        always run from the user code thread, within
244        .run_and_show_expression_async().
245        """
246        self._save_result(self._format_result_output(result))
247
248    def _get_output_printer(self) -> OutputPrinter:
249        return OutputPrinter(
250            output=PlainTextOutput(io.StringIO()),
251            input=IgnoredInput(),
252            style=self._current_style,
253            style_transformation=self.style_transformation,
254            title=self.title,
255        )
256
257    def _format_exception_output(self, e: BaseException) -> StyleAndTextTuples:
258        output_printer = self._get_output_printer()
259        formatted_result = output_printer._format_exception_output(  # pylint: disable=protected-access
260            e, highlight=False
261        )
262        return list(formatted_result)
263
264    def _handle_exception(self, e: BaseException) -> None:
265        """Format and save output results.
266
267        This function is called from the _run_user_code() function which is
268        always run from the user code thread, within
269        .run_and_show_expression_async().
270        """
271        self._save_exception(self._format_exception_output(e))
272
273    async def run_and_show_expression_async(self, text: str) -> Any:
274        """Run user code and handle the result.
275
276        This function is similar to ptpython version v3.0.23.
277        """
278        loop = asyncio.get_event_loop()
279
280        try:
281            result = await self.eval_async(text)
282        except KeyboardInterrupt:
283            raise
284        except SystemExit:
285            return
286        except BaseException as e:  # pylint: disable=broad-exception-caught
287            self._handle_exception(e)
288        else:
289            # Print.
290            if result is not None:
291                await loop.run_in_executor(
292                    None, lambda: self._show_result(result)
293                )
294
295            self.current_statement_index += 1
296            self.signatures = []
297            return result
298
299    def user_code_complete_callback(self, input_text, future):
300        """Callback to run after user repl code is finished."""
301        # If there was an exception it will be saved in self._last_result
302        result_text = self._last_result
303        result_object = None
304        exception_text = self._last_exception
305
306        # _last_results consumed, erase for the next run.
307        self.clear_last_result()
308
309        stdout_contents = None
310        stderr_contents = None
311        if future.result():
312            future_result = future.result()
313            stdout_contents = future_result['stdout']
314            stderr_contents = future_result['stderr']
315            result_object = future_result['result']
316
317            if result_object is not None:
318                # Use ptpython formatted results:
319                result_text = self._format_result_output(result_object)
320
321        # Job is finished, append the last result.
322        self.repl_pane.append_result_to_executed_code(
323            input_text,
324            future,
325            result_text,
326            stdout_contents,
327            stderr_contents,
328            exception_text=exception_text,
329            result_object=result_object,
330        )
331
332        # Rebuild output buffer.
333        self.repl_pane.update_output_buffer(
334            'pw_ptpython_repl.user_code_complete_callback'
335        )
336
337        # Trigger a prompt_toolkit application redraw.
338        self.repl_pane.application.application.invalidate()
339
340    async def _run_system_command(  # pylint: disable=no-self-use
341        self, text, stdout_proxy, _stdin_proxy
342    ) -> int:
343        """Run a shell command and print results to the repl."""
344        command = shlex.split(text)
345        returncode = None
346        env = os.environ.copy()
347        # Force colors in Pigweed subcommands and some terminal apps.
348        env['PW_USE_COLOR'] = '1'
349        env['CLICOLOR_FORCE'] = '1'
350
351        def _handle_output(output):
352            # Force tab characters to 8 spaces to prevent \t from showing in
353            # prompt_toolkit.
354            output = output.replace('\t', '        ')
355            # Strip some ANSI sequences that don't render.
356            output = output.replace('\x1b(B\x1b[m', '')
357            output = output.replace('\x1b[1m', '')
358            stdout_proxy.write(output)
359            _SYSTEM_COMMAND_LOG.info(output.rstrip())
360
361        with subprocess.Popen(
362            command,
363            env=env,
364            stdout=subprocess.PIPE,
365            stderr=subprocess.STDOUT,
366            errors='replace',
367        ) as proc:
368            # Print the command
369            _SYSTEM_COMMAND_LOG.info('')
370            _SYSTEM_COMMAND_LOG.info('$ %s', text)
371            while returncode is None:
372                if not proc.stdout:
373                    continue
374
375                # Check for one line and update.
376                output = proc.stdout.readline()
377                _handle_output(output)
378
379                returncode = proc.poll()
380
381            # Print any remaining lines.
382            for output in proc.stdout.readlines():
383                _handle_output(output)
384
385        return returncode
386
387    async def _run_user_code(self, text, stdout_proxy, stdin_proxy):
388        """Run user code and capture stdout+err.
389
390        This fuction should be run in a separate thread from the main
391        prompt_toolkit application."""
392        # NOTE: This function runs in a separate thread using the asyncio event
393        # loop defined by self.repl_pane.application.user_code_loop. Patching
394        # stdout here will not effect the stdout used by prompt_toolkit and the
395        # main user interface.
396
397        # Patch stdout and stderr to capture repl print() statements.
398        original_stdout = sys.stdout
399        original_stderr = sys.stderr
400
401        sys.stdout = stdout_proxy
402        sys.stderr = stdin_proxy
403
404        # Run user repl code
405        try:
406            if _user_input_is_a_shell_command(text):
407                result = await self._run_system_command(
408                    text[1:], stdout_proxy, stdin_proxy
409                )
410            else:
411                result = await self.run_and_show_expression_async(text)
412        finally:
413            # Always restore original stdout and stderr
414            sys.stdout = original_stdout
415            sys.stderr = original_stderr
416
417        # Save the captured output
418        stdout_contents = stdout_proxy.getvalue()
419        stderr_contents = stdin_proxy.getvalue()
420
421        return {
422            'stdout': stdout_contents,
423            'stderr': stderr_contents,
424            'result': result,
425        }
426
427    def _accept_handler(self, buff: Buffer) -> bool:
428        """Function executed when pressing enter in the ptpython.repl.PythonRepl
429        input buffer."""
430        # Do nothing if no text is entered.
431        if len(buff.text) == 0:
432            return False
433        if self.repl_pane is None:
434            return False
435
436        repl_input_text = buff.text
437        # Exit if quit or exit
438        if repl_input_text.strip() in ['quit', 'quit()', 'exit', 'exit()']:
439            self.repl_pane.application.application.exit()  # type: ignore
440
441        # Create stdout and stderr proxies
442        temp_stdout = io.StringIO()
443        temp_stderr = io.StringIO()
444
445        # The help() command with no args uses it's own interactive prompt which
446        # will not work if prompt_toolkit is running.
447        if repl_input_text.strip() in ['help()']:
448            # Run nothing
449            repl_input_text = ''
450            # Override stdout
451            temp_stdout.write(
452                'Error: Interactive help() is not compatible with this repl.'
453            )
454
455        # Pop open the system command log pane for shell commands.
456        if _user_input_is_a_shell_command(repl_input_text):
457            self.repl_pane.application.setup_command_runner_log_pane()
458
459        # Execute the repl code in the the separate user_code thread loop.
460        future = asyncio.run_coroutine_threadsafe(
461            # This function will be executed in a separate thread.
462            self._run_user_code(repl_input_text, temp_stdout, temp_stderr),
463            # Using this asyncio event loop.
464            self.repl_pane.application.user_code_loop,
465        )  # type: ignore
466
467        # Save the input text and future object.
468        self.repl_pane.append_executed_code(
469            repl_input_text, future, temp_stdout, temp_stderr
470        )  # type: ignore
471
472        # Run user_code_complete_callback() when done.
473        done_callback = functools.partial(
474            self.user_code_complete_callback, repl_input_text
475        )
476        future.add_done_callback(done_callback)
477
478        # Rebuild the parent ReplPane output buffer.
479        self.repl_pane.update_output_buffer('pw_ptpython_repl._accept_handler')
480
481        # TODO(tonymd): Return True if exception is found?
482        # Don't keep input for now. Return True to keep input text.
483        return False
484
485    def line_break_count(self) -> int:
486        return self.default_buffer.text.count('\n')
487
488    def input_empty_if_in_focus_condition(self) -> Condition:
489        @Condition
490        def test() -> bool:
491            if has_focus(self)() and len(self.default_buffer.text) == 0:
492                return True
493            return not has_focus(self)()
494
495        return test
496