xref: /aosp_15_r20/external/toolchain-utils/cros_utils/logger.py (revision 760c253c1ed00ce9abd48f8546f08516e57485fe)
1# Copyright 2019 The ChromiumOS Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Logging helper module."""
6
7
8# System modules
9import os.path
10import sys
11import traceback
12from typing import Union
13
14
15# TODO([email protected]): Use GetRoot from misc
def GetRoot(scr_name):
    """Return the absolute (directory, filename) pair for a script path."""
    # os.path.split yields exactly (dirname, basename) of its argument.
    return os.path.split(os.path.abspath(scr_name))
20
21
class Logger:
    """Logging helper that records commands, output and errors to files.

    Each instance opens three files under ``<rootdir>/<subdir>`` named
    ``<basefilename><N>.cmd``, ``.out`` and ``.err``, where ``N`` is a slot
    number in ``range(MAX_LOG_FILES)``. Unsuffixed symlinks
    (``<basefilename>.cmd`` and so on) are pointed at the files just opened.
    Messages can additionally be echoed to the console.

    Attributes:
        print_console: When false, LogMsg never writes to the terminal.
        cmdfd: File object for the ".cmd" log, or None if it could not be
            opened.
        stdout: File object for the ".out" log, or None if it could not be
            opened.
        stderr: File object for the ".err" log, or None if it could not be
            opened.
    """

    # Number of suffixed log slots cycled through before the oldest is reused.
    MAX_LOG_FILES = 10

    def __init__(self, rootdir, basefilename, print_console, subdir="logs"):
        """Open the log files and record the command line.

        Args:
            rootdir: Directory under which the log directory lives.
            basefilename: File name prefix shared by all log files.
            print_console: Whether LogMsg may echo to the terminal.
            subdir: Name of the log directory inside rootdir.
        """
        logdir = os.path.join(rootdir, subdir)
        basename = os.path.join(logdir, basefilename)

        try:
            os.makedirs(logdir, exist_ok=True)
        except OSError:
            # Best effort: if the directory cannot be created, opening the
            # log files below fails and prints a warning instead.
            pass

        self.print_console = print_console

        self._CreateLogFileHandles(basename)

        # The command line is written without a trailing newline.
        self._WriteTo(self.cmdfd, " ".join(sys.argv), True)

    def _AddSuffix(self, basename, suffix):
        """Return basename with suffix appended."""
        return "%s%s" % (basename, suffix)

    def _FindSuffix(self, basename):
        """Pick the log slot to use, as a string.

        Returns the first slot whose ".cmd" file cannot be stat'ed (normally
        because it does not exist). When every slot is taken, returns the slot
        whose ".cmd" file has the oldest modification time.
        """
        timestamps = []
        for i in range(self.MAX_LOG_FILES):
            cmd_file = "%s.cmd" % self._AddSuffix(basename, str(i))
            try:
                # A single stat both tests for existence and fetches the
                # mtime, so the file cannot vanish between the two steps.
                timestamps.append(os.stat(cmd_file).st_mtime)
            except (OSError, ValueError):
                return str(i)

        # Every slot is in use: overwrite the oldest one.
        return str(timestamps.index(min(timestamps)))

    def _CreateLogFileHandle(self, name):
        """Open name for writing; print a warning and return None on failure."""
        try:
            return open(name, "w", encoding="utf-8")
        except OSError:
            print("Warning: could not open %s for writing." % name)
            return None

    def _CreateLogFileHandles(self, basename):
        """Open the .cmd/.out/.err files for a slot and refresh the symlinks."""
        suffix = self._FindSuffix(basename)
        suffixed_basename = self._AddSuffix(basename, suffix)

        self.cmdfd = self._CreateLogFileHandle("%s.cmd" % suffixed_basename)
        self.stdout = self._CreateLogFileHandle("%s.out" % suffixed_basename)
        self.stderr = self._CreateLogFileHandle("%s.err" % suffixed_basename)

        self._CreateLogFileSymlinks(basename, suffixed_basename)

    def _CreateLogFileSymlinks(self, basename, suffixed_basename):
        """Symlink the unsuffixed log names to the currently suffixed ones.

        Failures are reported with print() and otherwise ignored; a failure
        for one extension does not prevent the remaining links being made.
        """
        link_target_base = os.path.basename(suffixed_basename)
        for extension in ("cmd", "out", "err"):
            # The link target is relative, so the log directory can be moved.
            src_file = "%s.%s" % (link_target_base, extension)
            dest_file = "%s.%s" % (basename, extension)
            try:
                # lexists (not exists) so that a dangling symlink left over
                # from an earlier run is also removed before re-linking.
                if os.path.lexists(dest_file):
                    os.remove(dest_file)
                os.symlink(src_file, dest_file)
            except Exception as ex:
                print("Exception while creating symlinks: %s" % str(ex))

    def _WriteTo(self, fd, msg, flush):
        """Write msg to fd (if fd is set), flushing afterwards when asked."""
        if fd:
            fd.write(msg)
            if flush:
                fd.flush()

    def _FormatCmd(self, cmd, machine, user):
        """Return the "CMD (host): cmd" log line for a command."""
        host = ("%s@%s" % (user, machine)) if user else machine
        return "CMD (%s): %s\n" % (host, cmd)

    def LogStartDots(self, print_to_console=True):
        """Write the first progress dot to sys.stdout."""
        term_fd = self._GetStdout(print_to_console)
        if term_fd:
            # Flush pending output first so the dots start after it.
            term_fd.flush()
            term_fd.write(". ")
            term_fd.flush()

    def LogAppendDot(self, print_to_console=True):
        """Write one more progress dot to sys.stdout."""
        term_fd = self._GetStdout(print_to_console)
        if term_fd:
            term_fd.write(". ")
            term_fd.flush()

    def LogEndDots(self, print_to_console=True):
        """Terminate the progress-dot line on sys.stdout."""
        term_fd = self._GetStdout(print_to_console)
        if term_fd:
            term_fd.write("\n")
            term_fd.flush()

    def LogMsg(self, file_fd, term_fd, msg, flush=True):
        """Write msg to file_fd and, if print_console is set, to term_fd.

        Either descriptor may be None, in which case it is skipped.
        """
        if file_fd:
            self._WriteTo(file_fd, msg, flush)
        if self.print_console:
            self._WriteTo(term_fd, msg, flush)

    def _GetStdout(self, print_to_console):
        """Return sys.stdout if print_to_console is true, else None."""
        return sys.stdout if print_to_console else None

    def _GetStderr(self, print_to_console):
        """Return sys.stderr if print_to_console is true, else None."""
        return sys.stderr if print_to_console else None

    def LogCmdToFileOnly(self, cmd, machine="", user=None):
        """Record a command in the .cmd log without echoing it."""
        if not self.cmdfd:
            return
        self._WriteTo(self.cmdfd, self._FormatCmd(cmd, machine, user), True)

    def LogCmd(self, cmd, machine="", user=None, print_to_console=True):
        """Record a command in the .cmd log and optionally on stdout."""
        self.LogMsg(
            self.cmdfd,
            self._GetStdout(print_to_console),
            self._FormatCmd(cmd, machine, user),
        )

    def LogFatal(self, msg, print_to_console=True):
        """Log a FATAL message plus the current stack, then exit.

        Raises:
            SystemExit: Always, with exit status 1.
        """
        self.LogMsg(
            self.stderr, self._GetStderr(print_to_console), "FATAL: %s\n" % msg
        )
        self.LogMsg(
            self.stderr,
            self._GetStderr(print_to_console),
            "\n".join(traceback.format_stack()),
        )
        sys.exit(1)

    def LogError(self, msg, print_to_console=True):
        """Log an ERROR message to the .err log and optionally stderr."""
        self.LogMsg(
            self.stderr, self._GetStderr(print_to_console), "ERROR: %s\n" % msg
        )

    def LogWarning(self, msg, print_to_console=True):
        """Log a WARNING message to the .err log and optionally stderr."""
        self.LogMsg(
            self.stderr,
            self._GetStderr(print_to_console),
            "WARNING: %s\n" % msg,
        )

    def LogOutput(self, msg, print_to_console=True):
        """Log an OUTPUT message to the .out log and optionally stdout."""
        self.LogMsg(
            self.stdout, self._GetStdout(print_to_console), "OUTPUT: %s\n" % msg
        )

    def LogFatalIf(self, condition, msg):
        """Call LogFatal(msg) when condition is true."""
        if condition:
            self.LogFatal(msg)

    def LogErrorIf(self, condition, msg):
        """Call LogError(msg) when condition is true."""
        if condition:
            self.LogError(msg)

    def LogWarningIf(self, condition, msg):
        """Call LogWarning(msg) when condition is true."""
        if condition:
            self.LogWarning(msg)

    def LogCommandOutput(self, msg, print_to_console=True):
        """Append raw command output to the .out log without flushing."""
        self.LogMsg(
            self.stdout, self._GetStdout(print_to_console), msg, flush=False
        )

    def LogCommandError(self, msg, print_to_console=True):
        """Append raw command error output to the .err log without flushing."""
        self.LogMsg(
            self.stderr, self._GetStderr(print_to_console), msg, flush=False
        )

    def Flush(self):
        """Flush every log file that was successfully opened."""
        for fd in (self.cmdfd, self.stdout, self.stderr):
            # A handle is None when its file could not be opened.
            if fd:
                fd.flush()
216
217
class MockLogger:
    """Stand-in for Logger that prints what would be logged.

    Instead of opening log files or creating symlinks, each method prints a
    "MockLogger: ..." line describing the action.
    """

    MAX_LOG_FILES = 10

    def __init__(self, *_args, **_kwargs):
        """Accept and ignore any arguments; remember the std streams."""
        self.stdout = sys.stdout
        self.stderr = sys.stderr

    def _AddSuffix(self, basename, suffix):
        """Return basename with suffix appended."""
        combined = "%s%s" % (basename, suffix)
        return combined

    def _FindSuffix(self, basename):
        """Return the first free log slot, or the oldest one if all exist."""
        mtimes = []
        for slot in range(self.MAX_LOG_FILES):
            candidate = "%s.cmd" % self._AddSuffix(basename, str(slot))
            if not os.path.exists(candidate):
                return str(slot)
            mtimes.append(os.stat(candidate).st_mtime)

        # All slots are taken: choose the one modified longest ago.
        oldest = min(mtimes)
        return str(mtimes.index(oldest))

    def _CreateLogFileHandle(self, name):
        """Print the file that would be opened; nothing is returned."""
        print("MockLogger: creating open file handle for %s (writing)" % name)

    def _CreateLogFileHandles(self, basename):
        """Print the three log files that would be opened, then the links."""
        suffixed = self._AddSuffix(basename, self._FindSuffix(basename))

        for template in (
            "MockLogger: opening file %s.cmd",
            "MockLogger: opening file %s.out",
            "MockLogger: opening file %s.err",
        ):
            print(template % suffixed)

        self._CreateLogFileSymlinks(basename, suffixed)

    def _CreateLogFileSymlinks(self, basename, suffixed_basename):
        """Print the symlink calls Logger would make for each log file."""
        target_base = os.path.basename(suffixed_basename)
        for ext in ("cmd", "out", "err"):
            link_target = "%s.%s" % (target_base, ext)
            link_name = "%s.%s" % (basename, ext)
            print(
                "MockLogger: Calling os.symlink(%s, %s)"
                % (link_target, link_name)
            )

    def _WriteTo(self, _fd, msg, _flush):
        """Print msg; the descriptor and flush flag are ignored."""
        print("MockLogger: %s" % msg)

    def LogStartDots(self, _print_to_console=True):
        """Print a progress dot."""
        print(". ")

    def LogAppendDot(self, _print_to_console=True):
        """Print a progress dot."""
        print(". ")

    def LogEndDots(self, _print_to_console=True):
        """Print a newline to end the progress dots."""
        print("\n")

    def LogMsg(self, _file_fd, _term_fd, msg, **_kwargs):
        """Print msg; descriptors and keyword options are ignored."""
        print("MockLogger: %s" % msg)

    def _GetStdout(self, _print_to_console):
        """Always return None."""
        return None

    def _GetStderr(self, _print_to_console):
        """Always return None."""
        return None

    def LogCmdToFileOnly(self, *_args, **_kwargs):
        """Do nothing."""
        return

    def LogCmd(self, cmd, machine="", user=None, print_to_console=True):
        """Print the "CMD (host): cmd" line for a command."""
        host = "%s@%s" % (user, machine) if user else machine
        term = self._GetStdout(print_to_console)
        self.LogMsg(0, term, "CMD (%s): %s\n" % (host, cmd))

    def LogFatal(self, msg, print_to_console=True):
        """Print a FATAL message and the stack; the process is not exited."""
        self.LogMsg(0, self._GetStderr(print_to_console), "FATAL: %s\n" % msg)
        stack_text = "\n".join(traceback.format_stack())
        self.LogMsg(0, self._GetStderr(print_to_console), stack_text)
        print("MockLogger: Calling sysexit(1)")

    def LogError(self, msg, print_to_console=True):
        """Print an ERROR message."""
        term = self._GetStderr(print_to_console)
        self.LogMsg(0, term, "ERROR: %s\n" % msg)

    def LogWarning(self, msg, print_to_console=True):
        """Print a WARNING message."""
        term = self._GetStderr(print_to_console)
        self.LogMsg(0, term, "WARNING: %s\n" % msg)

    def LogOutput(self, msg, print_to_console=True):
        """Print an OUTPUT message."""
        term = self._GetStdout(print_to_console)
        self.LogMsg(0, term, "OUTPUT: %s\n" % msg)

    def LogFatalIf(self, condition, msg):
        """Call LogFatal(msg) when condition is true."""
        if not condition:
            return
        self.LogFatal(msg)

    def LogErrorIf(self, condition, msg):
        """Call LogError(msg) when condition is true."""
        if not condition:
            return
        self.LogError(msg)

    def LogWarningIf(self, condition, msg):
        """Call LogWarning(msg) when condition is true."""
        if not condition:
            return
        self.LogWarning(msg)

    def LogCommandOutput(self, msg, print_to_console=True):
        """Print raw command output."""
        term = self._GetStdout(print_to_console)
        self.LogMsg(self.stdout, term, msg, flush=False)

    def LogCommandError(self, msg, print_to_console=True):
        """Print raw command error output."""
        term = self._GetStderr(print_to_console)
        self.LogMsg(self.stderr, term, msg, flush=False)

    def Flush(self):
        """Print a note that the log files would be flushed."""
        print("MockLogger: Flushing cmdfd, stdout, stderr")
357
358
# Module-wide logger instance. Starts unset; InitLogger assigns it on its
# first call and returns the same object on later calls.
main_logger = None
360
361
def InitLogger(script_name, log_dir, print_console=True, mock=False):
    """Create the global logger on first call and return it.

    Later calls return the already-created logger and ignore their arguments.

    Args:
        script_name: Path of the running script; its file name becomes the
            log file prefix and its directory the fallback log location.
        log_dir: Directory to hold the logs; falsy means the script's
            directory.
        print_console: Passed through to the logger constructor.
        mock: If true, build a MockLogger instead of a Logger.

    Returns:
        The global logger instance.
    """
    # pylint: disable=global-statement
    global main_logger
    if not main_logger:
        script_dir, script_base = GetRoot(script_name)
        logger_cls = MockLogger if mock else Logger
        main_logger = logger_cls(
            log_dir or script_dir, script_base, print_console
        )
    return main_logger
376
377
def GetLogger(log_dir="", mock=False) -> Union[Logger, MockLogger]:
    """Return the global logger, initializing it via InitLogger if needed.

    The script name is taken from sys.argv[0]; log_dir and mock are forwarded
    to InitLogger unchanged.
    """
    script_name = sys.argv[0]
    return InitLogger(script_name, log_dir, mock=mock)
380
381
def HandleUncaughtExceptions(fun):
    """Decorator that reports any Exception escaping fun via LogFatal.

    Args:
        fun: The callable to wrap.

    Returns:
        A wrapper with fun's name and docstring that forwards all arguments
        to fun and returns its result. If fun raises an Exception, the
        formatted traceback is passed to GetLogger().LogFatal().
    """
    # Imported here so the decorator does not rely on a module-level import.
    import functools

    # wraps() keeps fun's __name__/__doc__ on the returned wrapper.
    @functools.wraps(fun)
    def _Interceptor(*args, **kwargs):
        try:
            return fun(*args, **kwargs)
        except Exception:
            GetLogger().LogFatal(
                "Uncaught exception:\n%s" % traceback.format_exc()
            )
            # Only reached when LogFatal returns instead of exiting, as
            # MockLogger.LogFatal does.
            return None

    return _Interceptor
394