# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""ReplPane class."""

from __future__ import annotations

import asyncio
import concurrent
import functools
import logging
import pprint
from dataclasses import dataclass
from typing import (
    Any,
    Awaitable,
    Callable,
    TYPE_CHECKING,
)

from prompt_toolkit.filters import (
    Condition,
    has_focus,
)
from prompt_toolkit.document import Document
from prompt_toolkit.key_binding import KeyBindings, KeyPressEvent
from prompt_toolkit.layout.dimension import AnyDimension
from prompt_toolkit.widgets import TextArea
from prompt_toolkit.layout import (
    ConditionalContainer,
    DynamicContainer,
    Dimension,
    FloatContainer,
    HSplit,
    Window,
)
from prompt_toolkit.lexers import PygmentsLexer  # type: ignore
from pygments.lexers.python import PythonConsoleLexer  # type: ignore

# Alternative Formatting
# from IPython.lib.lexers import IPythonConsoleLexer  # type: ignore

from pw_console.progress_bar.progress_bar_state import TASKS_CONTEXTVAR
from pw_console.pw_ptpython_repl import PwPtPythonRepl
from pw_console.style import get_pane_style
from pw_console.widgets import (
    ToolbarButton,
    WindowPane,
    WindowPaneHSplit,
    WindowPaneToolbar,
)

if TYPE_CHECKING:
    from pw_console.console_app import ConsoleApp

_LOG = logging.getLogger(__package__)

_Namespace = dict[str, Any]
_GetNamespace = Callable[[], _Namespace]

# Number of lines moved per mouse wheel event in the output window.
_REPL_OUTPUT_SCROLL_AMOUNT = 5


@dataclass
class UserCodeExecution:
    """Class to hold a single user repl execution event.

    ``output``, ``stdout`` and ``stderr`` are None when the record is first
    created by ``ReplPane.append_executed_code`` and are filled in later.
    """

    input: str
    future: concurrent.futures.Future
    output: str | None
    stdout: str | None
    stderr: str | None
    stdout_check_task: Awaitable | None = None
    result_object: Any | None = None
    result_str: str | None = None
    exception_text: str | None = None

    @property
    def is_running(self) -> bool:
        """Return True while the execution future has not completed."""
        return not self.future.done()

    def update_stdout(self, text: str | None) -> None:
        """Replace the stored stdout text; empty or None input is ignored."""
        if text:
            self.stdout = text

    def update_stderr(self, text: str | None) -> None:
        """Replace the stored stderr text; empty or None input is ignored."""
        if text:
            self.stderr = text


class ReplPane(WindowPane):
    """Pane for reading Python input."""

    # pylint: disable=too-many-instance-attributes,too-many-public-methods
    def __init__(
        self,
        application: ConsoleApp,
        python_repl: PwPtPythonRepl,
        pane_title: str = 'Python Repl',
        startup_message: str | None = None,
    ) -> None:
        """Build the output area, input area, toolbars and root container.

        Args:
          application: The ConsoleApp that owns this pane.
          python_repl: Repl widget used as the input area. This pane
            registers itself on it via ``set_repl_pane``.
          pane_title: Title forwarded to ``WindowPane.__init__``.
          startup_message: Initial text of the output field. None or an empty
            string results in an empty output field.
        """
        super().__init__(application, pane_title)

        self.executed_code: list[UserCodeExecution] = []
        self.application = application

        self.pw_ptpython_repl = python_repl
        self.pw_ptpython_repl.set_repl_pane(self)

        self.wrap_output_lines = True

        self.startup_message = startup_message or ''

        self.output_field = TextArea(
            text=self.startup_message,
            focusable=True,
            focus_on_click=True,
            scrollbar=True,
            wrap_lines=Condition(lambda: self.wrap_output_lines),
            lexer=PygmentsLexer(PythonConsoleLexer),
        )

        # Additional keybindings for the text area.
        key_bindings = KeyBindings()
        register = self.application.prefs.register_keybinding

        @register('python-repl.copy-output-selection', key_bindings)
        def _copy_selection(_event: KeyPressEvent) -> None:
            """Copy selected text."""
            self.copy_output_selection()

        self.output_field.control.key_bindings = key_bindings

        # Override output buffer mouse wheel scroll
        self.output_field.window._scroll_up = (  # type: ignore
            self.scroll_output_up
        )
        self.output_field.window._scroll_down = (  # type: ignore
            self.scroll_output_down
        )

        self.bottom_toolbar = self._create_input_toolbar()
        self.results_toolbar = self._create_output_toolbar()

        self.progress_state = TASKS_CONTEXTVAR.get()

        # ReplPane root container
        self.container = ConditionalContainer(
            FloatContainer(
                # Horizontal split of all Repl pane sections.
                WindowPaneHSplit(
                    self,
                    [
                        HSplit(
                            [
                                # 1. Repl Output
                                self.output_field,
                                # 2. Progress bars if any
                                ConditionalContainer(
                                    DynamicContainer(
                                        self.get_progress_bar_task_container
                                    ),
                                    filter=Condition(
                                        # pylint: disable=line-too-long
                                        lambda: not self.progress_state.all_tasks_complete
                                        # pylint: enable=line-too-long
                                    ),
                                ),
                                # 3. Static separator toolbar.
                                self.results_toolbar,
                            ],
                            # Output area only dimensions
                            height=self.get_output_height,
                        ),
                        HSplit(
                            [
                                # 4. Repl Input
                                self.pw_ptpython_repl,
                                # 5. Bottom toolbar
                                self.bottom_toolbar,
                            ],
                            # Input area only dimensions
                            height=self.get_input_height,
                        ),
                    ],
                    # Repl pane dimensions
                    height=lambda: self.height,
                    width=lambda: self.width,
                    style=functools.partial(get_pane_style, self),
                ),
                floats=[],
            ),
            filter=Condition(lambda: self.show_pane),
        )

    def toggle_wrap_output_lines(self) -> None:
        """Enable or disable output line wrapping/truncation."""
        self.wrap_output_lines = not self.wrap_output_lines

    def scroll_output_down(self) -> None:
        """Scroll the output buffer down on mouse wheel down events."""
        for _ in range(_REPL_OUTPUT_SCROLL_AMOUNT):
            # There is no move cursor more than one line at a time function.
            self.output_field.control.move_cursor_down()
        self.output_field.window.vertical_scroll += _REPL_OUTPUT_SCROLL_AMOUNT

    def scroll_output_up(self) -> None:
        """Scroll the output buffer up on mouse wheel up events."""
        for _ in range(_REPL_OUTPUT_SCROLL_AMOUNT):
            # There is no move cursor more than one line at a time function.
            self.output_field.control.move_cursor_up()
        self.output_field.window.vertical_scroll -= _REPL_OUTPUT_SCROLL_AMOUNT

    def focus_output(self) -> None:
        """Move keyboard focus to the output text area."""
        self.application.focus_on_container(self.output_field)

    def focus_input(self) -> None:
        """Move keyboard focus to the repl input area."""
        self.application.focus_on_container(self.pw_ptpython_repl)

    def get_progress_bar_task_container(self):
        """Return the progress bar container, or an empty Window if none."""
        bar_container = self.progress_state.get_container()
        if bar_container:
            return bar_container
        return Window()

    def get_output_height(self) -> AnyDimension:
        """Return the output area height: at least one line, no maximum."""
        # pylint: disable=no-self-use
        return Dimension(min=1)

    def get_input_height(self) -> AnyDimension:
        """Return the height dimension for the input area.

        The maximum is 10 lines, or the input buffer's line break count when
        that is larger. If that exceeds the current pane height, the maximum
        becomes the pane height minus three reserved lines.
        """
        default_max_height = 10
        # Grow past the default when the input buffer has more line breaks.
        desired_max_height = max(
            default_max_height, self.pw_ptpython_repl.line_break_count()
        )
        # Check if it's taller than the available space
        if desired_max_height > self.current_pane_height:
            # Leave space for minimum of
            #   1 line of content in the output
            #   + 1 for output toolbar
            #   + 1 for input toolbar
            desired_max_height = self.current_pane_height - 3

        if desired_max_height > 1:
            return Dimension(min=1, max=desired_max_height)
        # Fall back to at least a height of 1
        return Dimension(min=1)

    def _create_input_toolbar(self):
        """Create the toolbar shown below the repl input area."""
        bottom_toolbar = WindowPaneToolbar(
            self,
            focus_action_callable=self.focus_input,
            focus_check_container=self.pw_ptpython_repl,
        )
        bottom_toolbar.add_button(
            ToolbarButton(
                'Ctrl-v', 'Paste', self.paste_system_clipboard_to_input_buffer
            )
        )
        bottom_toolbar.add_button(
            ToolbarButton(
                'Ctrl-c', 'Copy / Clear', self.copy_or_clear_input_buffer
            )
        )
        bottom_toolbar.add_button(ToolbarButton('Enter', 'Run', self.run_code))
        bottom_toolbar.add_button(ToolbarButton('F2', 'Settings'))
        bottom_toolbar.add_button(ToolbarButton('F3', 'History'))
        return bottom_toolbar

    def _create_output_toolbar(self):
        """Create the 'Python Results' toolbar shown below the output area."""
        results_toolbar = WindowPaneToolbar(
            self,
            title='Python Results',
            focus_action_callable=self.focus_output,
            focus_check_container=self.output_field,
            include_resize_handle=False,
        )
        results_toolbar.add_button(
            ToolbarButton(
                description='Wrap lines',
                mouse_handler=self.toggle_wrap_output_lines,
                is_checkbox=True,
                checked=lambda: self.wrap_output_lines,
            )
        )
        results_toolbar.add_button(
            ToolbarButton(
                'Ctrl-Alt-c', 'Copy All Output', self.copy_all_output_text
            )
        )
        results_toolbar.add_button(
            ToolbarButton(
                'Ctrl-c', 'Copy Selected Text', self.copy_output_selection
            )
        )

        results_toolbar.add_button(
            ToolbarButton(
                description='Clear', mouse_handler=self.clear_output_buffer
            )
        )
        results_toolbar.add_button(
            ToolbarButton('Shift+Arrows / Mouse Drag', 'Select Text')
        )

        return results_toolbar

    def copy_output_selection(self) -> None:
        """Copy highlighted output text to the system clipboard."""
        clipboard_data = self.output_field.buffer.copy_selection()
        self.application.set_system_clipboard_data(clipboard_data)

    def copy_input_selection(self) -> None:
        """Copy highlighted input text to the system clipboard."""
        clipboard_data = self.pw_ptpython_repl.default_buffer.copy_selection()
        self.application.set_system_clipboard_data(clipboard_data)

    def copy_all_output_text(self) -> None:
        """Copy all text in the Python output to the system clipboard."""
        self.application.set_system_clipboard(self.output_field.buffer.text)

    def copy_all_input_text(self) -> None:
        """Copy all text in the Python input to the system clipboard."""
        self.application.set_system_clipboard(
            self.pw_ptpython_repl.default_buffer.text
        )

    # pylint: disable=no-self-use
    def get_all_key_bindings(self) -> list:
        """Return all keybinds for this plugin."""
        # ptpython native bindings:
        # return [load_python_bindings(self.pw_ptpython_repl)]

        # Hand-crafted bindings for display in the HelpWindow:
        return [
            {
                'Execute code': ['Enter', 'Option-Enter', 'Alt-Enter'],
                'Reverse search history': ['Ctrl-r'],
                'Erase input buffer.': ['Ctrl-c'],
                'Show settings.': ['F2'],
                'Show history.': ['F3'],
            }
        ]

    def get_window_menu_options(
        self,
    ) -> list[tuple[str, Callable | None]]:
        """Return (label, handler) menu entries for this pane.

        An entry labelled '-' with a None handler is a menu separator.
        """
        return [
            (
                'Python Input > Paste',
                self.paste_system_clipboard_to_input_buffer,
            ),
            ('Python Input > Copy or Clear', self.copy_or_clear_input_buffer),
            ('Python Input > Run', self.run_code),
            # Menu separator
            ('-', None),
            (
                'Python Output > Toggle Wrap lines',
                self.toggle_wrap_output_lines,
            ),
            ('Python Output > Copy All', self.copy_all_output_text),
            ('Python Output > Copy Selection', self.copy_output_selection),
            ('Python Output > Clear', self.clear_output_buffer),
        ]

    def run_code(self) -> None:
        """Trigger a repl code execution on mouse click."""
        self.pw_ptpython_repl.default_buffer.validate_and_handle()

    def ctrl_c(self) -> None:
        """Ctrl-C keybinding behavior.

        With text in the input buffer this copies the selection or clears the
        buffer; with an empty buffer it interrupts the running code.
        """
        if self.pw_ptpython_repl.default_buffer.text:
            self.copy_or_clear_input_buffer()
        else:
            self.interrupt_last_code_execution()

    def insert_text_into_input_buffer(self, text: str) -> None:
        """Insert ``text`` into the repl input buffer."""
        self.pw_ptpython_repl.default_buffer.insert_text(text)

    def paste_system_clipboard_to_input_buffer(self, erase_buffer=False):
        """Paste the application clipboard contents into the input buffer.

        Args:
          erase_buffer: When true, clear the input buffer before pasting.
        """
        if erase_buffer:
            self.clear_input_buffer()

        clip_data = self.application.application.clipboard.get_data()
        self.pw_ptpython_repl.default_buffer.paste_clipboard_data(clip_data)

    def clear_input_buffer(self) -> None:
        """Erase the input buffer and reset the repl."""
        # Erase input buffer.
        self.pw_ptpython_repl.default_buffer.reset()
        # Clear any displayed function signatures.
        self.pw_ptpython_repl.on_reset()

    def clear_output_buffer(self) -> None:
        """Discard all execution records and redraw the empty output."""
        self.executed_code.clear()
        self.update_output_buffer()

    def copy_or_clear_input_buffer(self) -> None:
        """Copy the input selection if one is active, else clear the input."""
        # Copy selected text if a selection is active.
        if self.pw_ptpython_repl.default_buffer.selection_state:
            self.copy_input_selection()
            return
        # Otherwise, clear the input buffer
        self.clear_input_buffer()

    def interrupt_last_code_execution(self) -> None:
        """Cancel the running execution, if any, and refresh the output."""
        code = self._get_currently_running_code()
        if code:
            code.future.cancel()
            code.output = 'Canceled'
            self.progress_state.cancel_all_tasks()
        self.pw_ptpython_repl.clear_last_result()
        self.update_output_buffer('repl_pane.interrupt_last_code_execution')

    def _get_currently_running_code(self) -> UserCodeExecution | None:
        """Return the first execution record still running, or None."""
        return next(
            (code for code in self.executed_code if code.is_running), None
        )

    def _get_executed_code(self, future) -> UserCodeExecution | None:
        """Return the execution record that owns ``future``, or None."""
        return next(
            (code for code in self.executed_code if code.future == future),
            None,
        )

    def _log_executed_code(self, code, prefix=''):
        """Log repl command input text along with a prefix string."""
        text = self.get_output_buffer_text([code], show_index=False)
        text = text.strip()
        for line in text.splitlines():
            _LOG.debug('[PYTHON %s] %s', prefix, line.strip())

    async def periodically_check_stdout(
        self, user_code: UserCodeExecution, stdout_proxy, stderr_proxy
    ):
        """Poll the output proxies until the execution future is done.

        Every 0.3 seconds the text captured so far is copied onto
        ``user_code`` and the output buffer is redrawn.

        Args:
          user_code: Execution record to update.
          stdout_proxy: Object whose ``getvalue()`` returns captured stdout.
          stderr_proxy: Object whose ``getvalue()`` returns captured stderr.
        """
        while not user_code.future.done():
            await asyncio.sleep(0.3)
            # update_stdout/update_stderr ignore empty text themselves.
            user_code.update_stdout(stdout_proxy.getvalue())
            user_code.update_stderr(stderr_proxy.getvalue())

            # Redraw on every poll, whether or not new text was captured.
            self.update_output_buffer('repl_pane.periodic_check')

    def append_executed_code(self, text, future, temp_stdout, temp_stderr):
        """Record a newly started execution and start polling its output.

        Args:
          text: The repl input text being executed.
          future: Future tracking the execution.
          temp_stdout: Proxy polled for stdout text via ``getvalue()``.
          temp_stderr: Proxy polled for stderr text via ``getvalue()``.
        """
        user_code = UserCodeExecution(
            input=text, future=future, output=None, stdout=None, stderr=None
        )

        background_stdout_check = asyncio.create_task(
            self.periodically_check_stdout(user_code, temp_stdout, temp_stderr)
        )
        user_code.stdout_check_task = background_stdout_check
        self.executed_code.append(user_code)
        self._log_executed_code(user_code, prefix='START')

    def append_result_to_executed_code(
        self,
        _input_text,
        future,
        result_text,
        stdout_text='',
        stderr_text='',
        exception_text='',
        result_object=None,
    ):
        """Store the results of a finished execution and refresh the output.

        The record is looked up by ``future``. If no record matches, nothing
        is stored or logged, but the output buffer is still refreshed.

        Args:
          _input_text: Unused.
          future: Future identifying the execution record to update.
          result_text: Text stored as the record's output.
          stdout_text: Final captured stdout text.
          stderr_text: Final captured stderr text.
          exception_text: Exception text, if any.
          result_object: Result object; when not None it is also stored
            pretty-printed in ``result_str``.
        """
        code = self._get_executed_code(future)
        if code is not None:
            code.output = result_text
            code.stdout = stdout_text
            code.stderr = stderr_text
            code.exception_text = exception_text
            code.result_object = result_object
            if result_object is not None:
                code.result_str = self._format_result_object(result_object)
            # Only log when a record exists; there is nothing to render for
            # an unknown future.
            self._log_executed_code(code, prefix='FINISH')

        self.update_output_buffer('repl_pane.append_result_to_executed_code')

    def _format_result_object(self, result_object: Any) -> str:
        """Pretty print format a Python object respecting the window width."""
        # Fall back to 80 columns when the pane width is unset or zero.
        content_width = self.current_pane_width or 80
        printer = pprint.PrettyPrinter(indent=2, width=content_width)
        return printer.pformat(result_object)

    def get_output_buffer_text(
        self,
        code_items: list[UserCodeExecution] | None = None,
        show_index: bool = True,
    ):
        """Render execution records with the 'repl_output.jinja' template.

        Args:
          code_items: Records to render. None renders all of
            ``self.executed_code``; an explicit empty list renders no records.
          show_index: Passed through to the template.

        Returns:
          The rendered template text.
        """
        executed_code = (
            self.executed_code if code_items is None else code_items
        )

        template = self.application.get_template('repl_output.jinja')

        return template.render(
            code_items=executed_code,
            show_index=show_index,
        )

    def update_output_buffer(self, *unused_args):
        """Re-render all executed code into the output field and redraw.

        Positional arguments are accepted and ignored; callers pass a string
        naming the call site.
        """
        text = self.get_output_buffer_text()
        # Add an extra line break so the last cursor position is in column 0
        # instead of the end of the last line.
        text += '\n'
        self.output_field.buffer.set_document(
            Document(text=text, cursor_position=len(text))
        )

        self.application.redraw_ui()

    def input_or_output_has_focus(self) -> Condition:
        """Return a Condition true when the input or output area has focus."""

        @Condition
        def test() -> bool:
            return (
                has_focus(self.output_field)()
                or has_focus(self.pw_ptpython_repl)()
            )

        return test

    def history_completions(self) -> list[tuple[str, str]]:
        """Return (display, text) pairs for every repl history entry.

        The display string is the entry with each line left-stripped and the
        lines joined by single spaces; the second item is the original text.
        """
        return [
            (
                ' '.join(line.lstrip() for line in text.splitlines()),
                # Pass original text as the completion result.
                text,
            )
            for text in self.pw_ptpython_repl.history.load_history_strings()
        ]