xref: /aosp_15_r20/external/pigweed/pw_console/py/repl_pane_test.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
"""Tests for pw_console.repl_pane."""
15
16import asyncio
17import builtins
18import inspect
19import io
20import sys
21import threading
22import unittest
23from unittest.mock import MagicMock, call
24
25from prompt_toolkit.application import create_app_session
26from prompt_toolkit.output import (
27    ColorDepth,
28    # inclusive-language: ignore
29    DummyOutput as FakeOutput,
30)
31
32from pw_console.console_app import ConsoleApp
33from pw_console.console_prefs import ConsolePrefs
34from pw_console.repl_pane import ReplPane
35from pw_console.pw_ptpython_repl import PwPtPythonRepl
36
# True when the running interpreter is Python 3.8 or newer.
_PYTHON_3_8 = sys.version_info >= (3, 8)
41
if _PYTHON_3_8:
    from unittest import IsolatedAsyncioTestCase  # type: ignore # pylint: disable=no-name-in-module

    class TestReplPane(IsolatedAsyncioTestCase):
        """Tests for ReplPane."""

        maxDiff = None

        # Longest time, in seconds, a test waits for submitted repl code to
        # finish. The slowest snippet below sleeps for about one second, so
        # reaching this limit means the code never completed; the test then
        # fails instead of hanging the whole test run.
        _USER_CODE_TIMEOUT_SECONDS = 30.0

        async def _submit_and_wait(
            self,
            pw_ptpython_repl: PwPtPythonRepl,
            repl_pane: ReplPane,
            code: str,
        ):
            """Submit code through the repl accept handler and wait for it.

            Args:
              pw_ptpython_repl: Repl whose _accept_handler receives the code.
              repl_pane: Pane whose executed_code list records the submission.
              code: Python source text to run.

            Returns:
              The last entry of repl_pane.executed_code after submission.
            """
            user_code_done = threading.Event()

            input_buffer = MagicMock(text=code)
            # pylint: disable=protected-access
            pw_ptpython_repl._accept_handler(input_buffer)
            # pylint: enable=protected-access

            # Get last executed code object and validate it before use.
            user_code = repl_pane.executed_code[-1]
            self.assertIsNotNone(user_code)

            # Signal this test once the repl code has finished.
            # NOTE(review): presumably this runs after the repl's own
            # completion callback since it is registered later - confirm.
            user_code.future.add_done_callback(
                lambda future: user_code_done.set()
            )
            # Wait for stdout monitoring to complete.
            if user_code.stdout_check_task:
                await asyncio.wait_for(
                    user_code.stdout_check_task,
                    timeout=self._USER_CODE_TIMEOUT_SECONDS,
                )
            # Wait for test done callback. Event.wait() returns False on
            # timeout, which is turned into a test failure here.
            self.assertTrue(
                user_code_done.wait(timeout=self._USER_CODE_TIMEOUT_SECONDS),
                'Timed out waiting for repl user code to finish',
            )
            return user_code

        def test_repl_code_return_values(self) -> None:
            """Test stdout, return values, and exceptions can be returned from
            running user repl code."""
            # pylint: disable=protected-access
            app = MagicMock()

            global_vars = {
                '__name__': '__main__',
                '__package__': None,
                '__doc__': None,
                '__builtins__': builtins,
            }

            pw_ptpython_repl = PwPtPythonRepl(
                get_globals=lambda: global_vars,
                get_locals=lambda: global_vars,
                color_depth=ColorDepth.DEPTH_8_BIT,
            )
            repl_pane = ReplPane(
                application=app,
                python_repl=pw_ptpython_repl,
            )
            # Check pw_ptpython_repl has a reference to the parent repl_pane.
            self.assertEqual(repl_pane, pw_ptpython_repl.repl_pane)

            def run_code(source: str):
                """Run source via _run_user_code with fresh output buffers."""
                return asyncio.run(
                    pw_ptpython_repl._run_user_code(
                        source, io.StringIO(), io.StringIO()
                    )
                )

            # Define a function, should return nothing.
            code = inspect.cleandoc(
                """
                def run():
                    print('The answer is ', end='')
                    return 1+1+4+16+20
            """
            )
            self.assertEqual(
                run_code(code), {'stdout': '', 'stderr': '', 'result': None}
            )

            # Check stdout and return value
            self.assertEqual(
                run_code('run()'),
                {'stdout': 'The answer is ', 'stderr': '', 'result': 42},
            )

            # Check for repl exception. The returned dict is not inspected;
            # the error text is read from _last_exception instead.
            run_code('return "blah"')
            self.assertIn(
                "SyntaxError: 'return' outside function",
                pw_ptpython_repl._last_exception,  # type: ignore
            )

        async def test_user_thread(self) -> None:
            """Test user code thread."""

            with create_app_session(output=FakeOutput()):
                # Setup Mocks
                prefs = ConsolePrefs(
                    project_file=False, project_user_file=False, user_file=False
                )
                prefs.set_code_theme('default')
                app = ConsoleApp(
                    color_depth=ColorDepth.DEPTH_8_BIT, prefs=prefs
                )

                app.start_user_code_thread()

                pw_ptpython_repl = app.pw_ptpython_repl
                repl_pane = app.repl_pane

                # Mock update_output_buffer to track number of update calls
                repl_pane.update_output_buffer = MagicMock(  # type: ignore
                    wraps=repl_pane.update_output_buffer
                )

                # Mock complete callback
                pw_ptpython_repl.user_code_complete_callback = (  # type: ignore
                    MagicMock(
                        wraps=pw_ptpython_repl.user_code_complete_callback
                    )
                )
                user_code_complete_callback = (
                    pw_ptpython_repl.user_code_complete_callback
                )

                # Run some code: define a slow function, nothing is printed.
                code = inspect.cleandoc(
                    """
                    import time
                    def run():
                        for i in range(2):
                            time.sleep(0.5)
                            print(i)
                        print('The answer is ', end='')
                        return 1+1+4+16+20
                """
                )
                user_code1 = await self._submit_and_wait(
                    pw_ptpython_repl, repl_pane, code
                )

                # Check user_code1 results
                # NOTE: Avoid using assert_has_calls. Thread timing can make the
                # test flaky.
                expected_calls = [
                    # Initial exec start
                    call('pw_ptpython_repl._accept_handler'),
                    # Code finishes
                    call('repl_pane.append_result_to_executed_code'),
                    # Complete callback
                    call('pw_ptpython_repl.user_code_complete_callback'),
                ]
                for expected_call in expected_calls:
                    self.assertIn(
                        expected_call, repl_pane.update_output_buffer.mock_calls
                    )

                user_code_complete_callback.assert_called_once()

                self.assertTrue(user_code1.future.done())
                self.assertEqual(user_code1.input, code)
                self.assertIsNone(user_code1.output)
                # stdout / stderr may be '' or None
                self.assertFalse(user_code1.stdout)
                self.assertFalse(user_code1.stderr)

                # Reset mocks
                user_code_complete_callback.reset_mock()
                repl_pane.update_output_buffer.reset_mock()

                # Run some code: call the function defined above.
                user_code2 = await self._submit_and_wait(
                    pw_ptpython_repl, repl_pane, 'run()'
                )

                # Check user_code2 results
                # NOTE: Avoid using assert_has_calls. Thread timing can make the
                # test flaky. assertIn only checks that each call is present,
                # so the repeated entries below document the expected sequence
                # rather than enforce how many times a call happened.
                expected_calls = [
                    # Initial exec start
                    call('pw_ptpython_repl._accept_handler'),
                    # Periodic checks, should be a total of 4:
                    #   Code should take 1.0 second to run.
                    #   Periodic checks every 0.3 seconds
                    #   1.0 / 0.3 = 3.33 (4) checks
                    call('repl_pane.periodic_check'),
                    call('repl_pane.periodic_check'),
                    call('repl_pane.periodic_check'),
                    # Code finishes
                    call('repl_pane.append_result_to_executed_code'),
                    # Complete callback
                    call('pw_ptpython_repl.user_code_complete_callback'),
                    # Final periodic check
                    call('repl_pane.periodic_check'),
                ]
                for expected_call in expected_calls:
                    self.assertIn(
                        expected_call, repl_pane.update_output_buffer.mock_calls
                    )

                user_code_complete_callback.assert_called_once()

                self.assertTrue(user_code2.future.done())
                self.assertEqual(user_code2.input, 'run()')
                self.assertEqual(user_code2.output, '42')
                self.assertEqual(user_code2.stdout, '0\n1\nThe answer is ')
                self.assertFalse(user_code2.stderr)
270
271
# Run this module's tests with the unittest runner when executed as a script.
if __name__ == '__main__':
    unittest.main()
274