1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""PwPtPythonPane class.""" 15 16from __future__ import annotations 17 18import asyncio 19import functools 20import io 21import logging 22import os 23import sys 24import shlex 25import subprocess 26from typing import Any, Iterable, TYPE_CHECKING 27from unittest.mock import patch 28 29# inclusive-language: disable 30from prompt_toolkit.input import DummyInput as IgnoredInput 31 32# inclusive-language: enable 33 34from prompt_toolkit.output.plain_text import PlainTextOutput 35from prompt_toolkit.buffer import Buffer 36from prompt_toolkit.layout.controls import BufferControl 37from prompt_toolkit.completion import merge_completers 38from prompt_toolkit.filters import ( 39 Condition, 40 has_focus, 41 to_filter, 42) 43from prompt_toolkit.formatted_text import StyleAndTextTuples 44 45from ptpython.completer import ( # type: ignore 46 CompletePrivateAttributes, 47 PythonCompleter, 48) 49from ptpython.printer import OutputPrinter 50import ptpython.repl # type: ignore 51from ptpython.layout import ( # type: ignore 52 CompletionVisualisation, 53 Dimension, 54) 55import pygments.plugin 56 57from pw_console.pigweed_code_style import ( 58 PigweedCodeStyle, 59 PigweedCodeLightStyle, 60 Synthwave84CodeStyle, 61) 62from pw_console.text_formatting import remove_formatting 63 64if TYPE_CHECKING: 65 from pw_console.repl_pane import ReplPane 66 67_LOG = logging.getLogger(__package__) 68_SYSTEM_COMMAND_LOG = logging.getLogger('pw_console_system_command') 69 70_original_find_plugin_styles = pygments.plugin.find_plugin_styles 71 72 73def _wrapped_find_plugin_styles(): 74 """Patch pygment find_plugin_styles to also include Pigweed codes styles 75 76 This allows using these themes without requiring Python entrypoints. 77 """ 78 for style in [ 79 ('pigweed-code', PigweedCodeStyle), 80 ('pigweed-code-light', PigweedCodeLightStyle), 81 ('synthwave84', Synthwave84CodeStyle), 82 ]: 83 yield style 84 yield from _original_find_plugin_styles() 85 86 87class MissingPtpythonBufferControl(Exception): 88 """Exception for a missing ptpython BufferControl object.""" 89 90 91def _user_input_is_a_shell_command(text: str) -> bool: 92 return text.startswith('!') 93 94 95class PwPtPythonRepl( 96 ptpython.repl.PythonRepl 97): # pylint: disable=too-many-instance-attributes 98 """A ptpython repl class with changes to code execution and output related 99 methods.""" 100 101 @patch( 102 'pygments.styles.find_plugin_styles', new=_wrapped_find_plugin_styles 103 ) 104 def __init__( 105 self, 106 *args, 107 # pw_console specific kwargs 108 extra_completers: Iterable | None = None, 109 **ptpython_kwargs, 110 ): 111 completer = None 112 if extra_completers: 113 # Create the default python completer used by 114 # ptpython.repl.PythonRepl 115 python_completer = PythonCompleter( 116 # No self.get_globals yet so this must be a lambda 117 # pylint: disable=unnecessary-lambda 118 lambda: self.get_globals(), 119 lambda: self.get_locals(), 120 lambda: self.enable_dictionary_completion, # type: ignore 121 ) 122 123 all_completers = [python_completer] 124 all_completers.extend(extra_completers) 125 # Merge default Python completer with the new custom one. 126 completer = merge_completers(all_completers) 127 128 super().__init__( 129 *args, 130 create_app=False, 131 # Absolute minimum height of 1 132 _input_buffer_height=Dimension(min=1), 133 _completer=completer, 134 **ptpython_kwargs, 135 ) 136 137 self.enable_mouse_support: bool = True 138 self.enable_history_search: bool = True 139 self.enable_dictionary_completion: bool = True 140 self._set_pt_python_input_buffer_control_focusable() 141 142 # Change some ptpython.repl defaults. 143 self.show_status_bar = False 144 self.show_exit_confirmation = False 145 self.complete_private_attributes = CompletePrivateAttributes.NEVER 146 147 # Function signature that shows args, kwargs, and types under the cursor 148 # of the input window. 149 self.show_signature: bool = True 150 # Docstring of the current completed function that appears at the bottom 151 # of the input window. 152 self.show_docstring: bool = False 153 154 # Turn off the completion menu in ptpython. The CompletionsMenu in 155 # ConsoleApp.root_container will handle this. 156 self.completion_visualisation: CompletionVisualisation = ( 157 CompletionVisualisation.NONE 158 ) 159 160 # Additional state variables. 161 self.repl_pane: ReplPane | None = None 162 self._last_result = None 163 self._last_exception = None 164 165 def _set_pt_python_input_buffer_control_focusable(self) -> None: 166 """Enable focus_on_click for ptpython's input buffer.""" 167 error_message = ( 168 'Unable to find ptpythons BufferControl input object.\n' 169 ' For the last known position see:\n' 170 ' https://github.com/prompt-toolkit/ptpython/' 171 'blob/6072174eace5b645b0cfd5b21b4c237e2539f577/' 172 'ptpython/layout.py#L598\n' 173 '\n' 174 'The installed version of ptpython may not be compatible with' 175 ' pw console; please try re-running environment setup.' 176 ) 177 178 try: 179 # Fetch the Window's BufferControl object. 180 # From ptpython/layout.py: 181 # self.root_container = HSplit([ 182 # VSplit([ 183 # HSplit([ 184 # FloatContainer( 185 # content=HSplit( 186 # [create_python_input_window()] + extra_body 187 # ), ... 188 ptpython_buffer_control = ( 189 self.ptpython_layout.root_container.children[0] # type: ignore 190 .children[0] 191 .children[0] 192 .content.children[0] 193 .content 194 ) 195 # This should be a BufferControl instance 196 if not isinstance(ptpython_buffer_control, BufferControl): 197 raise MissingPtpythonBufferControl(error_message) 198 # Enable focus options 199 ptpython_buffer_control.focusable = to_filter(True) 200 ptpython_buffer_control.focus_on_click = to_filter(True) 201 except IndexError as _error: 202 raise MissingPtpythonBufferControl(error_message) 203 204 def __pt_container__(self): 205 """Return the prompt_toolkit root container for class. 206 207 This allows self to be used wherever prompt_toolkit expects a container 208 object.""" 209 return self.ptpython_layout.root_container 210 211 def set_repl_pane(self, repl_pane): 212 """Update the parent pw_console.ReplPane reference.""" 213 self.repl_pane = repl_pane 214 215 def _save_result(self, formatted_text): 216 """Save the last repl execution result.""" 217 unformatted_result = formatted_text 218 self._last_result = unformatted_result 219 220 def _save_exception(self, formatted_text): 221 """Save the last repl exception.""" 222 unformatted_result = remove_formatting(formatted_text) 223 self._last_exception = unformatted_result 224 225 def clear_last_result(self): 226 """Erase the last repl execution result.""" 227 self._last_result = None 228 self._last_exception = None 229 230 def _format_result_output(self, result: Any) -> str | None: 231 """Return a plaintext repr of any object.""" 232 try: 233 formatted_result = repr(result) 234 except BaseException as e: # pylint: disable=broad-exception-caught 235 self._handle_exception(e) 236 formatted_result = None 237 return formatted_result 238 239 def _show_result(self, result: Any): 240 """Format and save output results. 241 242 This function is called from the _run_user_code() function which is 243 always run from the user code thread, within 244 .run_and_show_expression_async(). 245 """ 246 self._save_result(self._format_result_output(result)) 247 248 def _get_output_printer(self) -> OutputPrinter: 249 return OutputPrinter( 250 output=PlainTextOutput(io.StringIO()), 251 input=IgnoredInput(), 252 style=self._current_style, 253 style_transformation=self.style_transformation, 254 title=self.title, 255 ) 256 257 def _format_exception_output(self, e: BaseException) -> StyleAndTextTuples: 258 output_printer = self._get_output_printer() 259 formatted_result = output_printer._format_exception_output( # pylint: disable=protected-access 260 e, highlight=False 261 ) 262 return list(formatted_result) 263 264 def _handle_exception(self, e: BaseException) -> None: 265 """Format and save output results. 266 267 This function is called from the _run_user_code() function which is 268 always run from the user code thread, within 269 .run_and_show_expression_async(). 270 """ 271 self._save_exception(self._format_exception_output(e)) 272 273 async def run_and_show_expression_async(self, text: str) -> Any: 274 """Run user code and handle the result. 275 276 This function is similar to ptpython version v3.0.23. 277 """ 278 loop = asyncio.get_event_loop() 279 280 try: 281 result = await self.eval_async(text) 282 except KeyboardInterrupt: 283 raise 284 except SystemExit: 285 return 286 except BaseException as e: # pylint: disable=broad-exception-caught 287 self._handle_exception(e) 288 else: 289 # Print. 290 if result is not None: 291 await loop.run_in_executor( 292 None, lambda: self._show_result(result) 293 ) 294 295 self.current_statement_index += 1 296 self.signatures = [] 297 return result 298 299 def user_code_complete_callback(self, input_text, future): 300 """Callback to run after user repl code is finished.""" 301 # If there was an exception it will be saved in self._last_result 302 result_text = self._last_result 303 result_object = None 304 exception_text = self._last_exception 305 306 # _last_results consumed, erase for the next run. 307 self.clear_last_result() 308 309 stdout_contents = None 310 stderr_contents = None 311 if future.result(): 312 future_result = future.result() 313 stdout_contents = future_result['stdout'] 314 stderr_contents = future_result['stderr'] 315 result_object = future_result['result'] 316 317 if result_object is not None: 318 # Use ptpython formatted results: 319 result_text = self._format_result_output(result_object) 320 321 # Job is finished, append the last result. 322 self.repl_pane.append_result_to_executed_code( 323 input_text, 324 future, 325 result_text, 326 stdout_contents, 327 stderr_contents, 328 exception_text=exception_text, 329 result_object=result_object, 330 ) 331 332 # Rebuild output buffer. 333 self.repl_pane.update_output_buffer( 334 'pw_ptpython_repl.user_code_complete_callback' 335 ) 336 337 # Trigger a prompt_toolkit application redraw. 338 self.repl_pane.application.application.invalidate() 339 340 async def _run_system_command( # pylint: disable=no-self-use 341 self, text, stdout_proxy, _stdin_proxy 342 ) -> int: 343 """Run a shell command and print results to the repl.""" 344 command = shlex.split(text) 345 returncode = None 346 env = os.environ.copy() 347 # Force colors in Pigweed subcommands and some terminal apps. 348 env['PW_USE_COLOR'] = '1' 349 env['CLICOLOR_FORCE'] = '1' 350 351 def _handle_output(output): 352 # Force tab characters to 8 spaces to prevent \t from showing in 353 # prompt_toolkit. 354 output = output.replace('\t', ' ') 355 # Strip some ANSI sequences that don't render. 356 output = output.replace('\x1b(B\x1b[m', '') 357 output = output.replace('\x1b[1m', '') 358 stdout_proxy.write(output) 359 _SYSTEM_COMMAND_LOG.info(output.rstrip()) 360 361 with subprocess.Popen( 362 command, 363 env=env, 364 stdout=subprocess.PIPE, 365 stderr=subprocess.STDOUT, 366 errors='replace', 367 ) as proc: 368 # Print the command 369 _SYSTEM_COMMAND_LOG.info('') 370 _SYSTEM_COMMAND_LOG.info('$ %s', text) 371 while returncode is None: 372 if not proc.stdout: 373 continue 374 375 # Check for one line and update. 376 output = proc.stdout.readline() 377 _handle_output(output) 378 379 returncode = proc.poll() 380 381 # Print any remaining lines. 382 for output in proc.stdout.readlines(): 383 _handle_output(output) 384 385 return returncode 386 387 async def _run_user_code(self, text, stdout_proxy, stdin_proxy): 388 """Run user code and capture stdout+err. 389 390 This fuction should be run in a separate thread from the main 391 prompt_toolkit application.""" 392 # NOTE: This function runs in a separate thread using the asyncio event 393 # loop defined by self.repl_pane.application.user_code_loop. Patching 394 # stdout here will not effect the stdout used by prompt_toolkit and the 395 # main user interface. 396 397 # Patch stdout and stderr to capture repl print() statements. 398 original_stdout = sys.stdout 399 original_stderr = sys.stderr 400 401 sys.stdout = stdout_proxy 402 sys.stderr = stdin_proxy 403 404 # Run user repl code 405 try: 406 if _user_input_is_a_shell_command(text): 407 result = await self._run_system_command( 408 text[1:], stdout_proxy, stdin_proxy 409 ) 410 else: 411 result = await self.run_and_show_expression_async(text) 412 finally: 413 # Always restore original stdout and stderr 414 sys.stdout = original_stdout 415 sys.stderr = original_stderr 416 417 # Save the captured output 418 stdout_contents = stdout_proxy.getvalue() 419 stderr_contents = stdin_proxy.getvalue() 420 421 return { 422 'stdout': stdout_contents, 423 'stderr': stderr_contents, 424 'result': result, 425 } 426 427 def _accept_handler(self, buff: Buffer) -> bool: 428 """Function executed when pressing enter in the ptpython.repl.PythonRepl 429 input buffer.""" 430 # Do nothing if no text is entered. 431 if len(buff.text) == 0: 432 return False 433 if self.repl_pane is None: 434 return False 435 436 repl_input_text = buff.text 437 # Exit if quit or exit 438 if repl_input_text.strip() in ['quit', 'quit()', 'exit', 'exit()']: 439 self.repl_pane.application.application.exit() # type: ignore 440 441 # Create stdout and stderr proxies 442 temp_stdout = io.StringIO() 443 temp_stderr = io.StringIO() 444 445 # The help() command with no args uses it's own interactive prompt which 446 # will not work if prompt_toolkit is running. 447 if repl_input_text.strip() in ['help()']: 448 # Run nothing 449 repl_input_text = '' 450 # Override stdout 451 temp_stdout.write( 452 'Error: Interactive help() is not compatible with this repl.' 453 ) 454 455 # Pop open the system command log pane for shell commands. 456 if _user_input_is_a_shell_command(repl_input_text): 457 self.repl_pane.application.setup_command_runner_log_pane() 458 459 # Execute the repl code in the the separate user_code thread loop. 460 future = asyncio.run_coroutine_threadsafe( 461 # This function will be executed in a separate thread. 462 self._run_user_code(repl_input_text, temp_stdout, temp_stderr), 463 # Using this asyncio event loop. 464 self.repl_pane.application.user_code_loop, 465 ) # type: ignore 466 467 # Save the input text and future object. 468 self.repl_pane.append_executed_code( 469 repl_input_text, future, temp_stdout, temp_stderr 470 ) # type: ignore 471 472 # Run user_code_complete_callback() when done. 473 done_callback = functools.partial( 474 self.user_code_complete_callback, repl_input_text 475 ) 476 future.add_done_callback(done_callback) 477 478 # Rebuild the parent ReplPane output buffer. 479 self.repl_pane.update_output_buffer('pw_ptpython_repl._accept_handler') 480 481 # TODO(tonymd): Return True if exception is found? 482 # Don't keep input for now. Return True to keep input text. 483 return False 484 485 def line_break_count(self) -> int: 486 return self.default_buffer.text.count('\n') 487 488 def input_empty_if_in_focus_condition(self) -> Condition: 489 @Condition 490 def test() -> bool: 491 if has_focus(self)() and len(self.default_buffer.text) == 0: 492 return True 493 return not has_focus(self)() 494 495 return test 496