# xref: /aosp_15_r20/tools/asuite/atest/logstorage/log_uploader_unittest.py (revision c2e18aaa1096c836b086f94603d04f4eb9cf37f5)
# Copyright (C) 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15import os
16import pathlib
17import unittest
18from unittest.mock import patch
19from atest import constants
20from atest.logstorage import log_uploader
21from pyfakefs import fake_filesystem_unittest
22
23
class LogUploaderModuleTest(unittest.TestCase):
  """Checks log_uploader.is_uploading_logs under varied settings.

  Every test patches the credential file name and token file path constants,
  overrides the upload flag environment variable, and supplies a stub gcert
  checker before asserting on the returned value.
  """

  # Short alias for the environment variable name overridden by each test.
  _FLAG_ENV_KEY = log_uploader._ENABLE_ATEST_LOG_UPLOADING_ENV_KEY

  def setUp(self):
    super().setUp()

  def _is_uploading(self, gcert_available):
    """Calls is_uploading_logs with a checker returning `gcert_available`."""
    return log_uploader.is_uploading_logs(lambda: gcert_available)

  @patch('atest.constants.CREDENTIAL_FILE_NAME', 'creds.txt')
  @patch('atest.constants.TOKEN_FILE_PATH', 'token.txt')
  @patch.dict(os.environ, {_FLAG_ENV_KEY: '0'})
  def test_is_uploading_logs_flag_value_unrecognized_returns_false(self):
    """Flag value '0' is expected to yield False."""
    self.assertFalse(self._is_uploading(True))

  @patch('atest.constants.CREDENTIAL_FILE_NAME', 'creds.txt')
  @patch('atest.constants.TOKEN_FILE_PATH', 'token.txt')
  @patch.dict(os.environ, {_FLAG_ENV_KEY: '1'})
  def test_is_uploading_logs_flag_value_1_returns_true(self):
    """Flag value '1' is expected to yield True."""
    self.assertTrue(self._is_uploading(True))

  @patch('atest.constants.CREDENTIAL_FILE_NAME', 'creds.txt')
  @patch('atest.constants.TOKEN_FILE_PATH', 'token.txt')
  @patch.dict(os.environ, {_FLAG_ENV_KEY: 'TRUE'})
  def test_is_uploading_logs_flag_value_capitalized_true_returns_true(self):
    """Upper-case flag value 'TRUE' is expected to yield True."""
    self.assertTrue(self._is_uploading(True))

  @patch('atest.constants.CREDENTIAL_FILE_NAME', 'creds.txt')
  @patch('atest.constants.TOKEN_FILE_PATH', 'token.txt')
  @patch.dict(os.environ, {_FLAG_ENV_KEY: 'true'})
  def test_is_uploading_logs_flag_value_true_process_returns_true(self):
    """Lower-case flag value 'true' is expected to yield True."""
    self.assertTrue(self._is_uploading(True))

  @patch('atest.constants.CREDENTIAL_FILE_NAME', None)
  @patch('atest.constants.TOKEN_FILE_PATH', None)
  @patch.dict(os.environ, {_FLAG_ENV_KEY: 'true'})
  def test_is_uploading_logs_no_creds_process_returns_false(self):
    """With both credential constants set to None the result is False."""
    self.assertFalse(self._is_uploading(True))

  @patch('atest.constants.CREDENTIAL_FILE_NAME', 'creds.txt')
  @patch('atest.constants.TOKEN_FILE_PATH', 'token.txt')
  @patch.dict(os.environ, {_FLAG_ENV_KEY: 'false'})
  def test_is_uploading_logs_not_requested_process_returns_false(self):
    """Flag value 'false' is expected to yield False."""
    self.assertFalse(self._is_uploading(True))

  @patch('atest.constants.CREDENTIAL_FILE_NAME', 'creds.txt')
  @patch('atest.constants.TOKEN_FILE_PATH', 'token.txt')
  @patch.dict(os.environ, {_FLAG_ENV_KEY: ''})
  def test_is_uploading_logs_returns_true_by_default(self):
    """An empty flag value is expected to yield True."""
    self.assertTrue(self._is_uploading(True))

  @patch('atest.constants.CREDENTIAL_FILE_NAME', 'creds.txt')
  @patch('atest.constants.TOKEN_FILE_PATH', 'token.txt')
  @patch.dict(os.environ, {_FLAG_ENV_KEY: ''})
  def test_is_uploading_logs_gcert_not_available_returns_false(self):
    """A gcert checker that returns False is expected to yield False."""
    self.assertFalse(self._is_uploading(False))
124
125
class LogUploaderTest(fake_filesystem_unittest.TestCase):
  """Drives log_uploader._LogUploadSession against a fake filesystem.

  Uploads are captured by `_FakeUploadingClient` so that the tests can inspect
  the arguments each `upload_artifact` call received.
  """

  def setUp(self):
    super().setUp()
    self.setUpPyfakefs()

  def _new_session_with_fake_client(self):
    """Returns a `(session, client)` pair wired to a fresh fake client."""
    client = self._FakeUploadingClient()
    session = log_uploader._LogUploadSession('any_run_id', client)
    return session, client

  def test_upload_single_file_contains_name(self):
    log_file = pathlib.Path('/dir/some_name.log')
    self.fs.create_file(log_file)
    session, client = self._new_session_with_fake_client()

    session.upload_single_file(log_file)

    first_upload = client.get_received_upload_artifact_arguments()[0]
    self.assertEqual(first_upload['resource_id'], log_file.name)

  def test_upload_single_file_metadata_content_type(self):
    log_file = pathlib.Path('/dir/some_name.log')
    self.fs.create_file(log_file)
    session, client = self._new_session_with_fake_client()

    session.upload_single_file(log_file)

    first_upload = client.get_received_upload_artifact_arguments()[0]
    self.assertEqual(first_upload['metadata']['artifactType'], 'HOST_LOG')

  def test_create_artifact_metadata_use_file_name(self):
    log_file = pathlib.Path('/dir/some_name.log')
    self.fs.create_file(log_file)
    session, client = self._new_session_with_fake_client()

    session.upload_single_file(log_file)

    first_upload = client.get_received_upload_artifact_arguments()[0]
    self.assertEqual(first_upload['metadata']['name'], log_file.name)

  def test_upload_single_file_duplicate_names_unique_resource_id(self):
    root = pathlib.Path('/test_dir')
    shared_name = 'some_name.log'
    top_level_file = root / shared_name
    nested_file = root / 'some_sub_dir' / shared_name
    for path in (top_level_file, nested_file):
      self.fs.create_file(path)
    session, client = self._new_session_with_fake_client()

    session.upload_single_file(top_level_file)
    session.upload_single_file(nested_file)

    uploads = client.get_received_upload_artifact_arguments()
    self.assertNotEqual(uploads[0]['resource_id'], uploads[1]['resource_id'])

  def test_upload_single_file_duplicate_names_preserve_stem_and_suffix(self):
    root = pathlib.Path('/test_dir')
    shared_name = 'filename.extension'
    top_level_file = root / shared_name
    nested_file = root / 'some_sub_dir' / shared_name
    for path in (top_level_file, nested_file):
      self.fs.create_file(path)
    session, client = self._new_session_with_fake_client()
    session.upload_single_file(top_level_file)

    session.upload_single_file(nested_file)

    second_upload = client.get_received_upload_artifact_arguments()[1]
    self.assertRegex(second_upload['resource_id'], r'^filename.+\.extension')

  def test_upload_directory_does_not_upload_empty_directory(self):
    empty_root = pathlib.Path('/test_dir')
    self.fs.create_dir(empty_root)
    session, client = self._new_session_with_fake_client()

    session.upload_directory(empty_root)

    upload_count = len(client.get_received_upload_artifact_arguments())
    self.assertEqual(upload_count, 0)

  def test_get_file_paths_single_file(self):
    root = pathlib.Path('/test_dir')
    self.fs.create_file(root / 'file.txt')
    session, client = self._new_session_with_fake_client()

    session.upload_directory(root)

    upload_count = len(client.get_received_upload_artifact_arguments())
    self.assertEqual(upload_count, 1)

  def test_get_file_paths_multiple_files(self):
    root = pathlib.Path('/test_dir')
    for name in ('file1.txt', 'file2.txt'):
      self.fs.create_file(root / name)
    session, client = self._new_session_with_fake_client()

    session.upload_directory(root)

    upload_count = len(client.get_received_upload_artifact_arguments())
    self.assertEqual(upload_count, 2)

  def test_get_file_paths_nested_directories(self):
    root = pathlib.Path('/test_dir')
    nested_file = root / 'test_dir2/file2.txt'
    self.fs.create_file(nested_file)
    session, client = self._new_session_with_fake_client()

    session.upload_directory(root)

    first_upload = client.get_received_upload_artifact_arguments()[0]
    self.assertEqual(first_upload['artifact_path'], nested_file)

  def test_get_file_paths_symbolic_link_file(self):
    # The real file lives outside the uploaded directory; only a symlink to it
    # is placed inside.
    root = pathlib.Path('/test_dir')
    link = root / 'link.txt'
    real_file = pathlib.Path('/test_dir2/file.txt')
    self.fs.create_file(real_file, contents='some content')
    self.fs.create_symlink(link, real_file)
    session, client = self._new_session_with_fake_client()

    session.upload_directory(root)

    first_upload = client.get_received_upload_artifact_arguments()[0]
    self.assertEqual(first_upload['artifact_path'], link)

  def test_get_file_paths_symbolic_link_directory(self):
    # The uploaded directory holds only a symlink pointing at another
    # directory, and that other directory contains a single file.
    root = pathlib.Path('/test_dir')
    link_dir = root / 'link_dir'
    real_dir = pathlib.Path('/test_dir2')
    leaf_name = 'file.txt'
    self.fs.create_file(real_dir / leaf_name, contents='some content')
    self.fs.create_symlink(link_dir, real_dir)
    session, client = self._new_session_with_fake_client()

    session.upload_directory(root)

    first_upload = client.get_received_upload_artifact_arguments()[0]
    self.assertEqual(first_upload['artifact_path'], link_dir / leaf_name)

  class _FakeUploadingClient(log_uploader._SimpleUploadingClient):
    """Uploading client that records upload requests instead of sending them."""

    def __init__(self):
      # The base class initializer is not invoked here; only the record list
      # is set up.
      self._received_upload_artifact_arguments = []

    # TODO(yuexima): This is basically a mock, switch to use mock directly.
    def get_received_upload_artifact_arguments(self):
      """Returns the recorded upload calls in the order they were received."""
      return self._received_upload_artifact_arguments

    def upload_artifact(
        self,
        resource_id: str,
        metadata: dict[str, str],
        artifact_path: pathlib.Path,
        num_of_retries,
    ) -> None:
      """Records the call arguments; `num_of_retries` is not stored."""
      recorded_call = {
          'resource_id': resource_id,
          'metadata': metadata,
          'artifact_path': artifact_path,
      }
      self._received_upload_artifact_arguments.append(recorded_call)
337
338
if __name__ == '__main__':
  # Run as a script rather than imported: hand control to unittest's
  # command-line runner.
  unittest.main()
341