#!/usr/bin/env vpython3
# Copyright 2024 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tests for fast_local_dev_server.

TasksTest launches the script at server_utils.SERVER_SCRIPT as a subprocess
and talks to it over the unix socket at server_utils.SOCKET_ADDRESS.
"""

import contextlib
import datetime
import json
import pathlib
import unittest
import os
import signal
import socket
import subprocess
import sys
import time
import uuid

import fast_local_dev_server as server

sys.path.append(os.path.join(os.path.dirname(__file__), 'gyp'))
from util import server_utils


class RegexTest(unittest.TestCase):
  """Checks that the server's log-line format and regex agree."""

  def testBuildIdRegex(self):
    self.assertRegex(server.FIRST_LOG_LINE.format(build_id='abc'),
                     server.BUILD_ID_RE)


def _removeIfExists(path):
  """Deletes |path|, doing nothing if it is already gone."""
  with contextlib.suppress(FileNotFoundError):
    os.unlink(path)


def sendMessage(message_dict):
  """Sends |message_dict| to the server as UTF-8 encoded JSON.

  Opens a fresh AF_UNIX socket (1 second timeout) for every message and closes
  it afterwards.

  Raises:
    ConnectionRefusedError: Propagated from connect(); pollServer() relies on
        this to detect that no server is listening.
  """
  with contextlib.closing(socket.socket(socket.AF_UNIX)) as sock:
    sock.settimeout(1)
    sock.connect(server_utils.SOCKET_ADDRESS)
    server_utils.SendMessage(sock, json.dumps(message_dict).encode('utf-8'))


def pollServer():
  """Returns True if a heartbeat message could be sent to the server."""
  try:
    sendMessage({'message_type': server_utils.POLL_HEARTBEAT})
    return True
  except ConnectionRefusedError:
    return False


def callServer(args, stdout=subprocess.DEVNULL):
  """Runs the server script with |args| from this file's directory.

  Returns:
    0 on success (subprocess.check_call raises CalledProcessError otherwise).
  """
  return subprocess.check_call([server_utils.SERVER_SCRIPT.absolute()] + args,
                               cwd=pathlib.Path(__file__).parent,
                               stdout=stdout)


class TasksTest(unittest.TestCase):
  """Integration tests that run against a freshly started server process."""

  def setUp(self):
    self._TTY_FILE = '/tmp/fast_local_dev_server_test_tty'
    if pollServer():
      # TODO(mheikal): Support overriding the standard named pipe for
      # communicating with the server so that we can run an instance just for
      # this test even if a real one is running.
      self.skipTest("Cannot run test when server already running.")
    self._process = subprocess.Popen(
        [server_utils.SERVER_SCRIPT.absolute(), '--exit-on-idle', '--quiet'],
        start_new_session=True,
        cwd=pathlib.Path(__file__).parent,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True)
    # Give the server a short moment to start accepting connections.
    for _ in range(5):
      if pollServer():
        break
      time.sleep(0.05)

  def tearDown(self):
    _removeIfExists(self._TTY_FILE)
    self._process.terminate()
    stdout, _ = self._process.communicate()
    if stdout != '':
      self.fail(f'build server should be silent but it output:\n{stdout}')

  def sendTask(self, cmd, stamp_path=None):
    """Creates a stamp file and asks the server to run |cmd|.

    Args:
      cmd: The command to run, as an argv list.
      stamp_path: Optional path of the stamp file to create. Defaults to
          /tmp/.test.stamp. Only the file name is sent to the server, so the
          file should live in the task's cwd ('/tmp/').
    """
    stamp_file = pathlib.Path(stamp_path or '/tmp/.test.stamp')
    stamp_file.touch()
    # Do not leave stamp files behind in /tmp once the test has finished. The
    # file may already be gone by then, e.g. if the server deleted it.
    self.addCleanup(_removeIfExists, stamp_file)

    sendMessage({
        'name': f'test task {uuid.uuid4()}',
        'message_type': server_utils.ADD_TASK,
        'cmd': cmd,
        # So that logfiles do not clutter cwd.
        'cwd': '/tmp/',
        'tty': self._TTY_FILE,
        'build_id': self.id(),
        'experimental': True,
        'stamp_file': stamp_file.name,
    })

  def getTtyContents(self):
    """Returns the text of the fake tty file, or '' if it does not exist."""
    try:
      with open(self._TTY_FILE, 'rt') as tty:
        return tty.read()
    except FileNotFoundError:
      return ''

  def getBuildInfo(self):
    """Returns (pending_tasks, completed_tasks) for this test's build id."""
    build_info = server.query_build_info(self.id())
    pending_tasks = build_info['pending_tasks']
    completed_tasks = build_info['completed_tasks']
    return pending_tasks, completed_tasks

  def waitForTasksDone(self, timeout_seconds=3):
    """Blocks until at least one task completed and none are pending.

    Args:
      timeout_seconds: How long to keep polling before giving up.

    Raises:
      TimeoutError: If the tasks are not done once the timeout has elapsed.
    """
    timeout_duration = datetime.timedelta(seconds=timeout_seconds)
    # Use a monotonic clock so that wall-clock adjustments can neither cut the
    # wait short nor extend it.
    deadline = time.monotonic() + timeout_duration.total_seconds()
    while True:
      pending_tasks, completed_tasks = self.getBuildInfo()

      if completed_tasks > 0 and pending_tasks == 0:
        return

      if time.monotonic() > deadline:
        raise TimeoutError(
            f'Tasks for build {self.id()} not done after {timeout_duration} '
            f'(pending={pending_tasks}, completed={completed_tasks})')
      time.sleep(0.1)

  def testRunsQuietTask(self):
    self.sendTask(['true'])
    self.waitForTasksDone()
    self.assertEqual(self.getTtyContents(), '')

  def testRunsNoisyTask(self):
    self.sendTask(['echo', 'some_output'])
    self.waitForTasksDone()
    tty_contents = self.getTtyContents()
    self.assertIn('some_output', tty_contents)

  def testStampFileDeletedOnFailedTask(self):
    stamp_file = pathlib.Path('/tmp/.failed_task.stamp')
    # NOTE(review): presumably the server treats a task that prints output as
    # failed, which is why `echo` is used here - confirm against the server.
    self.sendTask(['echo', 'some_output'], stamp_path=stamp_file)
    self.waitForTasksDone()
    self.assertFalse(stamp_file.exists())

  def testStampFileNotDeletedOnSuccess(self):
    stamp_file = pathlib.Path('/tmp/.successful_task.stamp')
    self.sendTask(['true'], stamp_path=stamp_file)
    self.waitForTasksDone()
    self.assertTrue(stamp_file.exists())

  def testRegisterBuilderMessage(self):
    sendMessage({
        'message_type': server_utils.REGISTER_BUILDER,
        'build_id': self.id(),
        'builder_pid': os.getpid(),
    })
    # The result is intentionally ignored. NOTE(review): presumably this
    # round-trip gives the server a chance to handle the message above -
    # confirm.
    pollServer()
    self.assertEqual(self.getTtyContents(), '')

  def testRegisterBuilderServerCall(self):
    self.assertEqual(
        callServer(
            ['--register-build',
             self.id(), '--builder-pid',
             str(os.getpid())]), 0)
    self.assertEqual(self.getTtyContents(), '')

  def testWaitForBuildServerCall(self):
    self.assertEqual(callServer(['--wait-for-build', self.id()]), 0)
    self.assertEqual(self.getTtyContents(), '')

  def testCancelBuildServerCall(self):
    self.assertEqual(callServer(['--cancel-build', self.id()]), 0)
    self.assertEqual(self.getTtyContents(), '')

  def testKeyboardInterrupt(self):
    # The server is expected to exit within a second of receiving SIGINT;
    # wait() raises subprocess.TimeoutExpired otherwise.
    os.kill(self._process.pid, signal.SIGINT)
    self._process.wait(timeout=1)


if __name__ == '__main__':
  # Suppress logging messages.
  unittest.main(buffer=True)