1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Tests for pw_console.console_app""" 15 16import asyncio 17import builtins 18import inspect 19import io 20import sys 21import threading 22import unittest 23from unittest.mock import MagicMock, call 24 25from prompt_toolkit.application import create_app_session 26from prompt_toolkit.output import ( 27 ColorDepth, 28 # inclusive-language: ignore 29 DummyOutput as FakeOutput, 30) 31 32from pw_console.console_app import ConsoleApp 33from pw_console.console_prefs import ConsolePrefs 34from pw_console.repl_pane import ReplPane 35from pw_console.pw_ptpython_repl import PwPtPythonRepl 36 37_PYTHON_3_8 = sys.version_info >= ( 38 3, 39 8, 40) 41 42if _PYTHON_3_8: 43 from unittest import IsolatedAsyncioTestCase # type: ignore # pylint: disable=no-name-in-module 44 45 class TestReplPane(IsolatedAsyncioTestCase): 46 """Tests for ReplPane.""" 47 48 maxDiff = None 49 50 def test_repl_code_return_values(self) -> None: 51 """Test stdout, return values, and exceptions can be returned from 52 running user repl code.""" 53 app = MagicMock() 54 55 global_vars = { 56 '__name__': '__main__', 57 '__package__': None, 58 '__doc__': None, 59 '__builtins__': builtins, 60 } 61 62 pw_ptpython_repl = PwPtPythonRepl( 63 get_globals=lambda: global_vars, 64 get_locals=lambda: global_vars, 65 color_depth=ColorDepth.DEPTH_8_BIT, 66 ) 67 repl_pane = ReplPane( 68 application=app, 69 python_repl=pw_ptpython_repl, 70 ) 71 # Check pw_ptpython_repl has a reference to the parent repl_pane. 72 self.assertEqual(repl_pane, pw_ptpython_repl.repl_pane) 73 74 # Define a function, should return nothing. 75 code = inspect.cleandoc( 76 """ 77 def run(): 78 print('The answer is ', end='') 79 return 1+1+4+16+20 80 """ 81 ) 82 temp_stdout = io.StringIO() 83 temp_stderr = io.StringIO() 84 # pylint: disable=protected-access 85 result = asyncio.run( 86 pw_ptpython_repl._run_user_code(code, temp_stdout, temp_stderr) 87 ) 88 self.assertEqual( 89 result, {'stdout': '', 'stderr': '', 'result': None} 90 ) 91 92 temp_stdout = io.StringIO() 93 temp_stderr = io.StringIO() 94 # Check stdout and return value 95 result = asyncio.run( 96 pw_ptpython_repl._run_user_code( 97 'run()', temp_stdout, temp_stderr 98 ) 99 ) 100 self.assertEqual( 101 result, {'stdout': 'The answer is ', 'stderr': '', 'result': 42} 102 ) 103 104 temp_stdout = io.StringIO() 105 temp_stderr = io.StringIO() 106 # Check for repl exception 107 result = asyncio.run( 108 pw_ptpython_repl._run_user_code( 109 'return "blah"', temp_stdout, temp_stderr 110 ) 111 ) 112 self.assertIn( 113 "SyntaxError: 'return' outside function", 114 pw_ptpython_repl._last_exception, # type: ignore 115 ) 116 117 async def test_user_thread(self) -> None: 118 """Test user code thread.""" 119 120 with create_app_session(output=FakeOutput()): 121 # Setup Mocks 122 prefs = ConsolePrefs( 123 project_file=False, project_user_file=False, user_file=False 124 ) 125 prefs.set_code_theme('default') 126 app = ConsoleApp( 127 color_depth=ColorDepth.DEPTH_8_BIT, prefs=prefs 128 ) 129 130 app.start_user_code_thread() 131 132 pw_ptpython_repl = app.pw_ptpython_repl 133 repl_pane = app.repl_pane 134 135 # Mock update_output_buffer to track number of update calls 136 repl_pane.update_output_buffer = MagicMock( # type: ignore 137 wraps=repl_pane.update_output_buffer 138 ) 139 140 # Mock complete callback 141 pw_ptpython_repl.user_code_complete_callback = ( # type: ignore 142 MagicMock( 143 wraps=pw_ptpython_repl.user_code_complete_callback 144 ) 145 ) 146 147 # Repl done flag for tests 148 user_code_done = threading.Event() 149 150 # Run some code 151 code = inspect.cleandoc( 152 """ 153 import time 154 def run(): 155 for i in range(2): 156 time.sleep(0.5) 157 print(i) 158 print('The answer is ', end='') 159 return 1+1+4+16+20 160 """ 161 ) 162 input_buffer = MagicMock(text=code) 163 # pylint: disable=protected-access 164 pw_ptpython_repl._accept_handler(input_buffer) 165 # pylint: enable=protected-access 166 167 # Get last executed code object. 168 user_code1 = repl_pane.executed_code[-1] 169 # Wait for repl code to finish. 170 user_code1.future.add_done_callback( 171 lambda future: user_code_done.set() 172 ) 173 # Wait for stdout monitoring to complete. 174 if user_code1.stdout_check_task: 175 await user_code1.stdout_check_task 176 # Wait for test done callback. 177 user_code_done.wait() 178 179 # Check user_code1 results 180 # NOTE: Avoid using assert_has_calls. Thread timing can make the 181 # test flaky. 182 expected_calls = [ 183 # Initial exec start 184 call('pw_ptpython_repl._accept_handler'), 185 # Code finishes 186 call('repl_pane.append_result_to_executed_code'), 187 # Complete callback 188 call('pw_ptpython_repl.user_code_complete_callback'), 189 ] 190 for expected_call in expected_calls: 191 self.assertIn( 192 expected_call, repl_pane.update_output_buffer.mock_calls 193 ) 194 195 user_code_complete_callback = ( 196 pw_ptpython_repl.user_code_complete_callback 197 ) 198 user_code_complete_callback.assert_called_once() 199 200 self.assertIsNotNone(user_code1) 201 self.assertTrue(user_code1.future.done()) 202 self.assertEqual(user_code1.input, code) 203 self.assertEqual(user_code1.output, None) 204 # stdout / stderr may be '' or None 205 self.assertFalse(user_code1.stdout) 206 self.assertFalse(user_code1.stderr) 207 208 # Reset mocks 209 user_code_done.clear() 210 pw_ptpython_repl.user_code_complete_callback.reset_mock() 211 repl_pane.update_output_buffer.reset_mock() 212 213 # Run some code 214 input_buffer = MagicMock(text='run()') 215 # pylint: disable=protected-access 216 pw_ptpython_repl._accept_handler(input_buffer) 217 # pylint: enable=protected-access 218 219 # Get last executed code object. 220 user_code2 = repl_pane.executed_code[-1] 221 # Wait for repl code to finish. 222 user_code2.future.add_done_callback( 223 lambda future: user_code_done.set() 224 ) 225 # Wait for stdout monitoring to complete. 226 if user_code2.stdout_check_task: 227 await user_code2.stdout_check_task 228 # Wait for test done callback. 229 user_code_done.wait() 230 231 # Check user_code2 results 232 # NOTE: Avoid using assert_has_calls. Thread timing can make the 233 # test flaky. 234 expected_calls = [ 235 # Initial exec start 236 call('pw_ptpython_repl._accept_handler'), 237 # Periodic checks, should be a total of 4: 238 # Code should take 1.0 second to run. 239 # Periodic checks every 0.3 seconds 240 # 1.0 / 0.3 = 3.33 (4) checks 241 call('repl_pane.periodic_check'), 242 call('repl_pane.periodic_check'), 243 call('repl_pane.periodic_check'), 244 # Code finishes 245 call('repl_pane.append_result_to_executed_code'), 246 # Complete callback 247 call('pw_ptpython_repl.user_code_complete_callback'), 248 # Final periodic check 249 call('repl_pane.periodic_check'), 250 ] 251 for expected_call in expected_calls: 252 self.assertIn( 253 expected_call, repl_pane.update_output_buffer.mock_calls 254 ) 255 256 # pylint: disable=line-too-long 257 pw_ptpython_repl.user_code_complete_callback.assert_called_once() 258 # pylint: enable=line-too-long 259 self.assertIsNotNone(user_code2) 260 self.assertTrue(user_code2.future.done()) 261 self.assertEqual(user_code2.input, 'run()') 262 self.assertEqual(user_code2.output, '42') 263 self.assertEqual(user_code2.stdout, '0\n1\nThe answer is ') 264 self.assertFalse(user_code2.stderr) 265 266 # Reset mocks 267 user_code_done.clear() 268 pw_ptpython_repl.user_code_complete_callback.reset_mock() 269 repl_pane.update_output_buffer.reset_mock() 270 271 272if __name__ == '__main__': 273 unittest.main() 274