# Copyright (C) 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit tests for the log_uploader module."""

import os
import pathlib
import unittest
from unittest.mock import patch
from atest import constants
from atest.logstorage import log_uploader
from pyfakefs import fake_filesystem_unittest


class LogUploaderModuleTest(unittest.TestCase):
  """Tests for the module-level is_uploading_logs() function.

  Every test patches the credential constants and the uploading env flag,
  then calls is_uploading_logs() with a stub gcert checker.
  """

  def setUp(self):
    super().setUp()

  @patch('atest.constants.CREDENTIAL_FILE_NAME', 'creds.txt')
  @patch('atest.constants.TOKEN_FILE_PATH', 'token.txt')
  @patch.dict(
      os.environ, {log_uploader._ENABLE_ATEST_LOG_UPLOADING_ENV_KEY: '0'}
  )
  def test_is_uploading_logs_flag_value_unrecognized_returns_false(self):
    """Flag value '0' with credentials and gcert present expects False."""
    is_uploading = log_uploader.is_uploading_logs(lambda: True)

    self.assertFalse(is_uploading)

  @patch('atest.constants.CREDENTIAL_FILE_NAME', 'creds.txt')
  @patch('atest.constants.TOKEN_FILE_PATH', 'token.txt')
  @patch.dict(
      os.environ, {log_uploader._ENABLE_ATEST_LOG_UPLOADING_ENV_KEY: '1'}
  )
  def test_is_uploading_logs_flag_value_1_returns_true(self):
    """Flag value '1' with credentials and gcert present expects True."""
    is_uploading = log_uploader.is_uploading_logs(lambda: True)

    self.assertTrue(is_uploading)

  @patch('atest.constants.CREDENTIAL_FILE_NAME', 'creds.txt')
  @patch('atest.constants.TOKEN_FILE_PATH', 'token.txt')
  @patch.dict(
      os.environ, {log_uploader._ENABLE_ATEST_LOG_UPLOADING_ENV_KEY: 'TRUE'}
  )
  def test_is_uploading_logs_flag_value_capitalized_true_returns_true(self):
    """Flag value 'TRUE' (upper case) expects True."""
    is_uploading = log_uploader.is_uploading_logs(lambda: True)

    self.assertTrue(is_uploading)

  @patch('atest.constants.CREDENTIAL_FILE_NAME', 'creds.txt')
  @patch('atest.constants.TOKEN_FILE_PATH', 'token.txt')
  @patch.dict(
      os.environ, {log_uploader._ENABLE_ATEST_LOG_UPLOADING_ENV_KEY: 'true'}
  )
  def test_is_uploading_logs_flag_value_true_process_returns_true(self):
    """Flag value 'true' (lower case) expects True."""
    is_uploading = log_uploader.is_uploading_logs(lambda: True)

    self.assertTrue(is_uploading)

  @patch('atest.constants.CREDENTIAL_FILE_NAME', None)
  @patch('atest.constants.TOKEN_FILE_PATH', None)
  @patch.dict(
      os.environ, {log_uploader._ENABLE_ATEST_LOG_UPLOADING_ENV_KEY: 'true'}
  )
  def test_is_uploading_logs_no_creds_process_returns_false(self):
    """Both credential constants patched to None expects False."""
    is_uploading = log_uploader.is_uploading_logs(lambda: True)

    self.assertFalse(is_uploading)

  @patch('atest.constants.CREDENTIAL_FILE_NAME', 'creds.txt')
  @patch('atest.constants.TOKEN_FILE_PATH', 'token.txt')
  @patch.dict(
      os.environ, {log_uploader._ENABLE_ATEST_LOG_UPLOADING_ENV_KEY: 'false'}
  )
  def test_is_uploading_logs_not_requested_process_returns_false(self):
    """Flag value 'false' expects False even with credentials and gcert."""
    is_uploading = log_uploader.is_uploading_logs(lambda: True)

    self.assertFalse(is_uploading)

  @patch('atest.constants.CREDENTIAL_FILE_NAME', 'creds.txt')
  @patch('atest.constants.TOKEN_FILE_PATH', 'token.txt')
  @patch.dict(
      os.environ, {log_uploader._ENABLE_ATEST_LOG_UPLOADING_ENV_KEY: ''}
  )
  def test_is_uploading_logs_returns_true_by_default(self):
    """An empty flag value with a passing gcert check expects True."""
    is_uploading = log_uploader.is_uploading_logs(lambda: True)

    self.assertTrue(is_uploading)

  @patch('atest.constants.CREDENTIAL_FILE_NAME', 'creds.txt')
  @patch('atest.constants.TOKEN_FILE_PATH', 'token.txt')
  @patch.dict(
      os.environ, {log_uploader._ENABLE_ATEST_LOG_UPLOADING_ENV_KEY: ''}
  )
  def test_is_uploading_logs_gcert_not_available_returns_false(self):
    """An empty flag value with a failing gcert check expects False."""
    is_uploading = log_uploader.is_uploading_logs(lambda: False)

    self.assertFalse(is_uploading)


class LogUploaderTest(fake_filesystem_unittest.TestCase):
  """Tests for _LogUploadSession, run against a pyfakefs fake filesystem.

  Uploads are captured by _FakeUploadingClient so each test can inspect the
  arguments the session handed to upload_artifact().
  """

  def setUp(self):
    super().setUp()
    self.setUpPyfakefs()

  def _make_session(self):
    """Returns a fresh (fake client, upload session) pair."""
    client = self._FakeUploadingClient()
    return client, log_uploader._LogUploadSession('any_run_id', client)

  def test_upload_single_file_contains_name(self):
    """The resource id of a single upload equals the file name."""
    log_path = pathlib.Path('/dir/some_name.log')
    self.fs.create_file(log_path)
    client, session = self._make_session()

    session.upload_single_file(log_path)

    uploads = client.get_received_upload_artifact_arguments()
    self.assertEqual(uploads[0]['resource_id'], log_path.name)

  def test_upload_single_file_metadata_content_type(self):
    """The metadata of a single upload carries artifactType HOST_LOG."""
    log_path = pathlib.Path('/dir/some_name.log')
    self.fs.create_file(log_path)
    client, session = self._make_session()

    session.upload_single_file(log_path)

    uploads = client.get_received_upload_artifact_arguments()
    self.assertEqual(uploads[0]['metadata']['artifactType'], 'HOST_LOG')

  def test_create_artifact_metadata_use_file_name(self):
    """The metadata 'name' field of a single upload equals the file name."""
    log_path = pathlib.Path('/dir/some_name.log')
    self.fs.create_file(log_path)
    client, session = self._make_session()

    session.upload_single_file(log_path)

    uploads = client.get_received_upload_artifact_arguments()
    self.assertEqual(uploads[0]['metadata']['name'], log_path.name)

  def test_upload_single_file_duplicate_names_unique_resource_id(self):
    """Two same-named files in different dirs get distinct resource ids."""
    root = pathlib.Path('/test_dir')
    shared_name = 'some_name.log'
    first_path = root / shared_name
    second_path = root / 'some_sub_dir' / shared_name
    self.fs.create_file(first_path)
    self.fs.create_file(second_path)
    client, session = self._make_session()

    session.upload_single_file(first_path)
    session.upload_single_file(second_path)

    uploads = client.get_received_upload_artifact_arguments()
    self.assertNotEqual(uploads[0]['resource_id'], uploads[1]['resource_id'])

  def test_upload_single_file_duplicate_names_preserve_stem_and_suffix(self):
    """The second same-named upload keeps the stem first and suffix last."""
    root = pathlib.Path('/test_dir')
    shared_name = 'filename.extension'
    first_path = root / shared_name
    second_path = root / 'some_sub_dir' / shared_name
    self.fs.create_file(first_path)
    self.fs.create_file(second_path)
    client, session = self._make_session()
    session.upload_single_file(first_path)

    session.upload_single_file(second_path)

    uploads = client.get_received_upload_artifact_arguments()
    self.assertRegex(uploads[1]['resource_id'], r'^filename.+\.extension')

  def test_upload_directory_does_not_upload_empty_directory(self):
    """Uploading an empty directory produces no upload_artifact calls."""
    empty_dir = pathlib.Path('/test_dir')
    self.fs.create_dir(empty_dir)
    client, session = self._make_session()

    session.upload_directory(empty_dir)

    uploads = client.get_received_upload_artifact_arguments()
    self.assertEqual(len(uploads), 0)

  def test_get_file_paths_single_file(self):
    """A directory holding one file produces exactly one upload."""
    root = pathlib.Path('/test_dir')
    self.fs.create_file(root / 'file.txt')
    client, session = self._make_session()

    session.upload_directory(root)

    uploads = client.get_received_upload_artifact_arguments()
    self.assertEqual(len(uploads), 1)

  def test_get_file_paths_multiple_files(self):
    """A directory holding two files produces exactly two uploads."""
    root = pathlib.Path('/test_dir')
    self.fs.create_file(root / 'file1.txt')
    self.fs.create_file(root / 'file2.txt')
    client, session = self._make_session()

    session.upload_directory(root)

    uploads = client.get_received_upload_artifact_arguments()
    self.assertEqual(len(uploads), 2)

  def test_get_file_paths_nested_directories(self):
    """A file inside a sub-directory is uploaded with its full path."""
    root = pathlib.Path('/test_dir')
    nested_file = root / 'test_dir2/file2.txt'
    self.fs.create_file(nested_file)
    client, session = self._make_session()

    session.upload_directory(root)

    uploads = client.get_received_upload_artifact_arguments()
    self.assertEqual(uploads[0]['artifact_path'], nested_file)

  def test_get_file_paths_symbolic_link_file(self):
    """A symlink to a file is uploaded using the link's own path."""
    root = pathlib.Path('/test_dir')
    link_path = root / 'link.txt'
    target_path = pathlib.Path('/test_dir2/file.txt')
    file_content = 'some content'
    self.fs.create_file(target_path, contents=file_content)
    self.fs.create_symlink(link_path, target_path)
    client, session = self._make_session()

    session.upload_directory(root)

    uploads = client.get_received_upload_artifact_arguments()
    self.assertEqual(uploads[0]['artifact_path'], link_path)

  def test_get_file_paths_symbolic_link_directory(self):
    """A file reached through a directory symlink keeps the link path."""
    # The real file lives under /test_dir2; /test_dir only holds a symlink
    # that points at that directory.
    root = pathlib.Path('/test_dir')
    link_path = root / 'link_dir'
    target_path = pathlib.Path('/test_dir2')
    file_name = 'file.txt'
    file_content = 'some content'
    self.fs.create_file(target_path / file_name, contents=file_content)
    self.fs.create_symlink(link_path, target_path)
    client, session = self._make_session()

    session.upload_directory(root)

    uploads = client.get_received_upload_artifact_arguments()
    self.assertEqual(uploads[0]['artifact_path'], link_path / file_name)

  class _FakeUploadingClient(log_uploader._SimpleUploadingClient):
    """Uploading client stand-in that records upload_artifact() arguments."""

    def __init__(self):
      # The parent initializer is intentionally not invoked here; only the
      # recording list is set up.
      self._received_upload_artifact_arguments = []

    # TODO(yuexima): This is basically a mock, switch to use mock directly.
    def get_received_upload_artifact_arguments(self):
      """Returns the list of recorded calls, oldest first."""
      return self._received_upload_artifact_arguments

    def upload_artifact(
        self,
        resource_id: str,
        metadata: dict[str, str],
        artifact_path: pathlib.Path,
        num_of_retries,
    ) -> None:
      """Records one call; num_of_retries is accepted but not stored."""
      call_record = {
          'resource_id': resource_id,
          'metadata': metadata,
          'artifact_path': artifact_path,
      }
      self._received_upload_artifact_arguments.append(call_record)


if __name__ == '__main__':
  unittest.main()