xref: /aosp_15_r20/external/pigweed/pw_console/py/pw_console/repl_pane.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""ReplPane class."""
15
16from __future__ import annotations
17
18import asyncio
19import concurrent
20import functools
21import logging
22import pprint
23from dataclasses import dataclass
24from typing import (
25    Any,
26    Awaitable,
27    Callable,
28    TYPE_CHECKING,
29)
30
31from prompt_toolkit.filters import (
32    Condition,
33    has_focus,
34)
35from prompt_toolkit.document import Document
36from prompt_toolkit.key_binding import KeyBindings, KeyPressEvent
37from prompt_toolkit.layout.dimension import AnyDimension
38from prompt_toolkit.widgets import TextArea
39from prompt_toolkit.layout import (
40    ConditionalContainer,
41    DynamicContainer,
42    Dimension,
43    FloatContainer,
44    HSplit,
45    Window,
46)
47from prompt_toolkit.lexers import PygmentsLexer  # type: ignore
48from pygments.lexers.python import PythonConsoleLexer  # type: ignore
49
50# Alternative Formatting
51# from IPython.lib.lexers import IPythonConsoleLexer  # type: ignore
52
53from pw_console.progress_bar.progress_bar_state import TASKS_CONTEXTVAR
54from pw_console.pw_ptpython_repl import PwPtPythonRepl
55from pw_console.style import get_pane_style
56from pw_console.widgets import (
57    ToolbarButton,
58    WindowPane,
59    WindowPaneHSplit,
60    WindowPaneToolbar,
61)
62
63if TYPE_CHECKING:
64    from pw_console.console_app import ConsoleApp
65
66_LOG = logging.getLogger(__package__)
67
68_Namespace = dict[str, Any]
69_GetNamespace = Callable[[], _Namespace]
70
71_REPL_OUTPUT_SCROLL_AMOUNT = 5
72
73
74@dataclass
75class UserCodeExecution:
76    """Class to hold a single user repl execution event."""
77
78    input: str
79    future: concurrent.futures.Future
80    output: str
81    stdout: str
82    stderr: str
83    stdout_check_task: Awaitable | None = None
84    result_object: Any | None = None
85    result_str: str | None = None
86    exception_text: str | None = None
87
88    @property
89    def is_running(self):
90        return not self.future.done()
91
92    def update_stdout(self, text: str | None):
93        if text:
94            self.stdout = text
95
96    def update_stderr(self, text: str | None):
97        if text:
98            self.stderr = text
99
100
101class ReplPane(WindowPane):
102    """Pane for reading Python input."""
103
104    # pylint: disable=too-many-instance-attributes,too-many-public-methods
105    def __init__(
106        self,
107        application: ConsoleApp,
108        python_repl: PwPtPythonRepl,
109        pane_title: str = 'Python Repl',
110        startup_message: str | None = None,
111    ) -> None:
112        super().__init__(application, pane_title)
113
114        self.executed_code: list[UserCodeExecution] = []
115        self.application = application
116
117        self.pw_ptpython_repl = python_repl
118        self.pw_ptpython_repl.set_repl_pane(self)
119
120        self.wrap_output_lines = True
121
122        self.startup_message = startup_message if startup_message else ''
123
124        self.output_field = TextArea(
125            text=self.startup_message,
126            focusable=True,
127            focus_on_click=True,
128            scrollbar=True,
129            wrap_lines=Condition(lambda: self.wrap_output_lines),
130            lexer=PygmentsLexer(PythonConsoleLexer),
131        )
132
133        # Additional keybindings for the text area.
134        key_bindings = KeyBindings()
135        register = self.application.prefs.register_keybinding
136
137        @register('python-repl.copy-output-selection', key_bindings)
138        def _copy_selection(_event: KeyPressEvent) -> None:
139            """Copy selected text."""
140            self.copy_output_selection()
141
142        self.output_field.control.key_bindings = key_bindings
143
144        # Override output buffer mouse wheel scroll
145        self.output_field.window._scroll_up = (  # type: ignore
146            self.scroll_output_up
147        )
148        self.output_field.window._scroll_down = (  # type: ignore
149            self.scroll_output_down
150        )
151
152        self.bottom_toolbar = self._create_input_toolbar()
153        self.results_toolbar = self._create_output_toolbar()
154
155        self.progress_state = TASKS_CONTEXTVAR.get()
156
157        # ReplPane root container
158        self.container = ConditionalContainer(
159            FloatContainer(
160                # Horizontal split of all Repl pane sections.
161                WindowPaneHSplit(
162                    self,
163                    [
164                        HSplit(
165                            [
166                                # 1. Repl Output
167                                self.output_field,
168                                # 2. Progress bars if any
169                                ConditionalContainer(
170                                    DynamicContainer(
171                                        self.get_progress_bar_task_container
172                                    ),
173                                    filter=Condition(
174                                        # pylint: disable=line-too-long
175                                        lambda: not self.progress_state.all_tasks_complete
176                                        # pylint: enable=line-too-long
177                                    ),
178                                ),
179                                # 3. Static separator toolbar.
180                                self.results_toolbar,
181                            ],
182                            # Output area only dimensions
183                            height=self.get_output_height,
184                        ),
185                        HSplit(
186                            [
187                                # 3. Repl Input
188                                self.pw_ptpython_repl,
189                                # 4. Bottom toolbar
190                                self.bottom_toolbar,
191                            ],
192                            # Input area only dimensions
193                            height=self.get_input_height,
194                        ),
195                    ],
196                    # Repl pane dimensions
197                    height=lambda: self.height,
198                    width=lambda: self.width,
199                    style=functools.partial(get_pane_style, self),
200                ),
201                floats=[],
202            ),
203            filter=Condition(lambda: self.show_pane),
204        )
205
206    def toggle_wrap_output_lines(self):
207        """Enable or disable output line wraping/truncation."""
208        self.wrap_output_lines = not self.wrap_output_lines
209
210    def scroll_output_down(self) -> None:
211        """Scroll the output buffer down on mouse wheel down events."""
212        for _i in range(_REPL_OUTPUT_SCROLL_AMOUNT):
213            # There is no move cursor more than one line at a time function.
214            self.output_field.control.move_cursor_down()
215        self.output_field.window.vertical_scroll += _REPL_OUTPUT_SCROLL_AMOUNT
216
217    def scroll_output_up(self) -> None:
218        """Scroll the output buffer up on mouse wheel up events."""
219        for _i in range(_REPL_OUTPUT_SCROLL_AMOUNT):
220            # There is no move cursor more than one line at a time function.
221            self.output_field.control.move_cursor_up()
222        self.output_field.window.vertical_scroll -= _REPL_OUTPUT_SCROLL_AMOUNT
223
224    def focus_output(self):
225        self.application.focus_on_container(self.output_field)
226
227    def focus_input(self):
228        self.application.focus_on_container(self.pw_ptpython_repl)
229
230    def get_progress_bar_task_container(self):
231        bar_container = self.progress_state.get_container()
232        if bar_container:
233            return bar_container
234        return Window()
235
236    def get_output_height(self) -> AnyDimension:
237        # pylint: disable=no-self-use
238        return Dimension(min=1)
239
240    def get_input_height(self) -> AnyDimension:
241        desired_max_height = 10
242        # Check number of line breaks in the input buffer.
243        input_line_count = self.pw_ptpython_repl.line_break_count()
244        if input_line_count > desired_max_height:
245            desired_max_height = input_line_count
246        # Check if it's taller than the available space
247        if desired_max_height > self.current_pane_height:
248            # Leave space for minimum of
249            #   1 line of content in the output
250            #   + 1 for output toolbar
251            #   + 1 for input toolbar
252            desired_max_height = self.current_pane_height - 3
253
254        if desired_max_height > 1:
255            return Dimension(min=1, max=desired_max_height)
256        # Fall back to at least a height of 1
257        return Dimension(min=1)
258
259    def _create_input_toolbar(self):
260        bottom_toolbar = WindowPaneToolbar(
261            self,
262            focus_action_callable=self.focus_input,
263            focus_check_container=self.pw_ptpython_repl,
264        )
265        bottom_toolbar.add_button(
266            ToolbarButton(
267                'Ctrl-v', 'Paste', self.paste_system_clipboard_to_input_buffer
268            )
269        )
270        bottom_toolbar.add_button(
271            ToolbarButton(
272                'Ctrl-c', 'Copy / Clear', self.copy_or_clear_input_buffer
273            )
274        )
275        bottom_toolbar.add_button(ToolbarButton('Enter', 'Run', self.run_code))
276        bottom_toolbar.add_button(ToolbarButton('F2', 'Settings'))
277        bottom_toolbar.add_button(ToolbarButton('F3', 'History'))
278        return bottom_toolbar
279
280    def _create_output_toolbar(self):
281        results_toolbar = WindowPaneToolbar(
282            self,
283            title='Python Results',
284            focus_action_callable=self.focus_output,
285            focus_check_container=self.output_field,
286            include_resize_handle=False,
287        )
288        results_toolbar.add_button(
289            ToolbarButton(
290                description='Wrap lines',
291                mouse_handler=self.toggle_wrap_output_lines,
292                is_checkbox=True,
293                checked=lambda: self.wrap_output_lines,
294            )
295        )
296        results_toolbar.add_button(
297            ToolbarButton(
298                'Ctrl-Alt-c', 'Copy All Output', self.copy_all_output_text
299            )
300        )
301        results_toolbar.add_button(
302            ToolbarButton(
303                'Ctrl-c', 'Copy Selected Text', self.copy_output_selection
304            )
305        )
306
307        results_toolbar.add_button(
308            ToolbarButton(
309                description='Clear', mouse_handler=self.clear_output_buffer
310            )
311        )
312        results_toolbar.add_button(
313            ToolbarButton('Shift+Arrows / Mouse Drag', 'Select Text')
314        )
315
316        return results_toolbar
317
318    def copy_output_selection(self):
319        """Copy highlighted output text to the system clipboard."""
320        clipboard_data = self.output_field.buffer.copy_selection()
321        self.application.set_system_clipboard_data(clipboard_data)
322
323    def copy_input_selection(self):
324        """Copy highlighted input text to the system clipboard."""
325        clipboard_data = self.pw_ptpython_repl.default_buffer.copy_selection()
326        self.application.set_system_clipboard_data(clipboard_data)
327
328    def copy_all_output_text(self):
329        """Copy all text in the Python output to the system clipboard."""
330        self.application.set_system_clipboard(self.output_field.buffer.text)
331
332    def copy_all_input_text(self):
333        """Copy all text in the Python input to the system clipboard."""
334        self.application.set_system_clipboard(
335            self.pw_ptpython_repl.default_buffer.text
336        )
337
338    # pylint: disable=no-self-use
339    def get_all_key_bindings(self) -> list:
340        """Return all keybinds for this plugin."""
341        # ptpython native bindings:
342        # return [load_python_bindings(self.pw_ptpython_repl)]
343
344        # Hand-crafted bindings for display in the HelpWindow:
345        return [
346            {
347                'Execute code': ['Enter', 'Option-Enter', 'Alt-Enter'],
348                'Reverse search history': ['Ctrl-r'],
349                'Erase input buffer.': ['Ctrl-c'],
350                'Show settings.': ['F2'],
351                'Show history.': ['F3'],
352            }
353        ]
354
355    def get_window_menu_options(
356        self,
357    ) -> list[tuple[str, Callable | None]]:
358        return [
359            (
360                'Python Input > Paste',
361                self.paste_system_clipboard_to_input_buffer,
362            ),
363            ('Python Input > Copy or Clear', self.copy_or_clear_input_buffer),
364            ('Python Input > Run', self.run_code),
365            # Menu separator
366            ('-', None),
367            (
368                'Python Output > Toggle Wrap lines',
369                self.toggle_wrap_output_lines,
370            ),
371            ('Python Output > Copy All', self.copy_all_output_text),
372            ('Python Output > Copy Selection', self.copy_output_selection),
373            ('Python Output > Clear', self.clear_output_buffer),
374        ]
375
376    def run_code(self):
377        """Trigger a repl code execution on mouse click."""
378        self.pw_ptpython_repl.default_buffer.validate_and_handle()
379
380    def ctrl_c(self):
381        """Ctrl-C keybinding behavior."""
382        # If there is text in the input buffer
383        if self.pw_ptpython_repl.default_buffer.text:
384            self.copy_or_clear_input_buffer()
385        else:
386            self.interrupt_last_code_execution()
387
388    def insert_text_into_input_buffer(self, text: str) -> None:
389        self.pw_ptpython_repl.default_buffer.insert_text(text)
390
391    def paste_system_clipboard_to_input_buffer(self, erase_buffer=False):
392        if erase_buffer:
393            self.clear_input_buffer()
394
395        clip_data = self.application.application.clipboard.get_data()
396        self.pw_ptpython_repl.default_buffer.paste_clipboard_data(clip_data)
397
398    def clear_input_buffer(self):
399        # Erase input buffer.
400        self.pw_ptpython_repl.default_buffer.reset()
401        # Clear any displayed function signatures.
402        self.pw_ptpython_repl.on_reset()
403
404    def clear_output_buffer(self):
405        self.executed_code.clear()
406        self.update_output_buffer()
407
408    def copy_or_clear_input_buffer(self):
409        # Copy selected text if a selection is active.
410        if self.pw_ptpython_repl.default_buffer.selection_state:
411            self.copy_input_selection()
412            return
413        # Otherwise, clear the input buffer
414        self.clear_input_buffer()
415
416    def interrupt_last_code_execution(self):
417        code = self._get_currently_running_code()
418        if code:
419            code.future.cancel()
420            code.output = 'Canceled'
421            self.progress_state.cancel_all_tasks()
422        self.pw_ptpython_repl.clear_last_result()
423        self.update_output_buffer('repl_pane.interrupt_last_code_execution')
424
425    def _get_currently_running_code(self):
426        for code in self.executed_code:
427            if not code.future.done():
428                return code
429        return None
430
431    def _get_executed_code(self, future):
432        for code in self.executed_code:
433            if code.future == future:
434                return code
435        return None
436
437    def _log_executed_code(self, code, prefix=''):
438        """Log repl command input text along with a prefix string."""
439        text = self.get_output_buffer_text([code], show_index=False)
440        text = text.strip()
441        for line in text.splitlines():
442            _LOG.debug('[PYTHON %s]  %s', prefix, line.strip())
443
444    async def periodically_check_stdout(
445        self, user_code: UserCodeExecution, stdout_proxy, stderr_proxy
446    ):
447        while not user_code.future.done():
448            await asyncio.sleep(0.3)
449            stdout_text_so_far = stdout_proxy.getvalue()
450            stderr_text_so_far = stderr_proxy.getvalue()
451            if stdout_text_so_far:
452                user_code.update_stdout(stdout_text_so_far)
453            if stderr_text_so_far:
454                user_code.update_stderr(stderr_text_so_far)
455
456            # if stdout_text_so_far or stderr_text_so_far:
457            self.update_output_buffer('repl_pane.periodic_check')
458
459    def append_executed_code(self, text, future, temp_stdout, temp_stderr):
460        user_code = UserCodeExecution(
461            input=text, future=future, output=None, stdout=None, stderr=None
462        )
463
464        background_stdout_check = asyncio.create_task(
465            self.periodically_check_stdout(user_code, temp_stdout, temp_stderr)
466        )
467        user_code.stdout_check_task = background_stdout_check
468        self.executed_code.append(user_code)
469        self._log_executed_code(user_code, prefix='START')
470
471    def append_result_to_executed_code(
472        self,
473        _input_text,
474        future,
475        result_text,
476        stdout_text='',
477        stderr_text='',
478        exception_text='',
479        result_object=None,
480    ):
481        code = self._get_executed_code(future)
482        if code:
483            code.output = result_text
484            code.stdout = stdout_text
485            code.stderr = stderr_text
486            code.exception_text = exception_text
487            code.result_object = result_object
488            if result_object is not None:
489                code.result_str = self._format_result_object(result_object)
490
491        self._log_executed_code(code, prefix='FINISH')
492        self.update_output_buffer('repl_pane.append_result_to_executed_code')
493
494    def _format_result_object(self, result_object: Any) -> str:
495        """Pretty print format a Python object respecting the window width."""
496        content_width = (
497            self.current_pane_width if self.current_pane_width else 80
498        )
499        pprint_respecting_width = pprint.PrettyPrinter(
500            indent=2, width=content_width
501        ).pformat
502
503        return pprint_respecting_width(result_object)
504
505    def get_output_buffer_text(
506        self,
507        code_items: list[UserCodeExecution] | None = None,
508        show_index: bool = True,
509    ):
510        executed_code = code_items or self.executed_code
511
512        template = self.application.get_template('repl_output.jinja')
513
514        return template.render(
515            code_items=executed_code,
516            show_index=show_index,
517        )
518
519    def update_output_buffer(self, *unused_args):
520        text = self.get_output_buffer_text()
521        # Add an extra line break so the last cursor position is in column 0
522        # instead of the end of the last line.
523        text += '\n'
524        self.output_field.buffer.set_document(
525            Document(text=text, cursor_position=len(text))
526        )
527
528        self.application.redraw_ui()
529
530    def input_or_output_has_focus(self) -> Condition:
531        @Condition
532        def test() -> bool:
533            if (
534                has_focus(self.output_field)()
535                or has_focus(self.pw_ptpython_repl)()
536            ):
537                return True
538            return False
539
540        return test
541
542    def history_completions(self) -> list[tuple[str, str]]:
543        return [
544            (
545                ' '.join([line.lstrip() for line in text.splitlines()]),
546                # Pass original text as the completion result.
547                text,
548            )
549            for text in list(
550                self.pw_ptpython_repl.history.load_history_strings()
551            )
552        ]
553