xref: /aosp_15_r20/external/libchrome-gestures/tools/tplog.py (revision aed3e5085e770be5b69ce25295ecf6ddf906af95)
1#!/usr/bin/python
2#
3# Copyright 2012 The ChromiumOS Authors
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""TPLog Manipulation"""
8
9
10import getopt
11import json
12import logging
13import os
14import sys
15
16from operator import ge, lt
17
18
class TPLog:
  """TPLog Manipulation.

  Loads a JSON activity log, optionally trims its entries to a time window,
  replaces its properties, attaches a description, and writes the result to
  a new JSON file.

  Instance attributes:
    log: the dict parsed from the input file; this class never assigns to it.
    shrunk_log: a shallow copy of ``log`` whose 'entries', 'properties' and
      'description' keys may be replaced.
    entries: the list stored under the 'entries' key of ``log``.
    bgn_entry_index, end_entry_index: inclusive slice bounds chosen by
      shrink().
  """
  # Constants for entry type
  CALLBACK_REQUEST = 'callbackRequest'
  GESTURE = 'gesture'
  HARDWARE_STATE = 'hardwareState'
  PROPERTY_CHANGE = 'propertyChange'
  TIMER_CALLBACK = 'timerCallback'

  # Constants for log keys
  DESCRIPTION = 'description'
  ENTRIES = 'entries'
  GESTURES_VERSION = 'gesturesVersion'
  HARDWARE_PROPERTIES = 'hardwareProperties'
  PROPERTIES = 'properties'
  VERSION = 'version'

  # Constants for entry keys
  END_TIME = 'endTime'
  START_TIME = 'startTime'
  TIMESTAMP = 'timestamp'
  TYPE = 'type'

  def __init__(self, log_file):
    """Load log_file and prepare the per-entry-type time getters."""
    self._load_log(log_file)
    self._setup_get_time_functions()

  def _load_log(self, log_file):
    """Load the json file.

    Raises whatever open() or json.load() raise for an unreadable or
    malformed file, and KeyError if the log has no 'entries' key.
    """
    with open(log_file) as f:
      self.log = json.load(f)
    # Build a new shrunk log from the original log so that we could
    # modify the entries and properties, and add description later.
    # The copy is shallow: only top-level keys are replaced on it, so the
    # original dict keeps its own entries and properties.
    self.shrunk_log = self.log.copy()
    self.entries = self.log[self.ENTRIES]

  def _setup_get_time_functions(self):
    """Set up get time functions for hardware state, gesture,
    and timer callback.

    Entry types missing from this table are treated as having no timestamp.
    """
    self._get_time = {self.HARDWARE_STATE: self._get_hwstate_time,
                      self.GESTURE: self._get_gesture_end_time,
                      self.TIMER_CALLBACK: self._get_timercb_time}

  def _get_hwstate_time(self, entry):
    """Get the timestamp of a hardware state entry, or None for other types."""
    if entry[self.TYPE] == self.HARDWARE_STATE:
      return entry[self.TIMESTAMP]
    return None

  def _get_gesture_end_time(self, entry):
    """Get the end timestamp of a gesture entry, or None for other types."""
    if entry[self.TYPE] == self.GESTURE:
      return entry[self.END_TIME]
    return None

  def _get_timercb_time(self, entry):
    """Get the timestamp of a timer callback entry, or None for other types."""
    if entry[self.TYPE] == self.TIMER_CALLBACK:
      return entry['now']
    return None

  def _get_entry_time(self, entry):
    """Get the timestamp of the given entry.

    Returns None when the entry type has no registered time getter.
    """
    get_time = self._get_time.get(entry[self.TYPE])
    if get_time is None:
      return None
    return get_time(entry)

  def _compare_entry_time(self, entry, timestamp, op):
    """Compare entry time with a given timestamp using the operator op.

    Returns False for entries without a timestamp; otherwise returns
    op(entry_time, timestamp).
    """
    e_time = self._get_entry_time(entry)
    # Test against None explicitly: a timestamp of 0 is a valid time and
    # must still be compared rather than treated as "no timestamp".
    return e_time is not None and op(e_time, timestamp)

  def _get_begin_hwstate(self, timestamp):
    """Get the hardwareState entry after the specified timestamp.

    Returns the index of the first hardwareState entry whose timestamp is
    greater than or equal to timestamp, or None if there is none.
    """
    for index, e in enumerate(self.entries):
      if (e[self.TYPE] == self.HARDWARE_STATE and
          self._get_hwstate_time(e) >= timestamp):
        return index
    return None

  def _get_end_entry(self, timestamp):
    """Get the entry after the specified timestamp.

    Returns the index of the first timestamped entry whose time is greater
    than or equal to timestamp, or None if there is none.
    """
    for index, e in enumerate(self.entries):
      if self._compare_entry_time(e, timestamp, ge):
        return index
    return None

  def _get_end_gesture(self, timestamp):
    """Get the gesture entry after the specified timestamp.

    Scans the entries from the end of the log backwards and returns an
    index, or None when no suitable entry is found.
    """
    end_entry = None
    last_index = len(self.entries) - 1
    for offset, e in enumerate(reversed(self.entries)):
      position = last_index - offset
      # Try to find the last gesture entry resulted from the events within the
      # timestamp.
      if (e[self.TYPE] == self.GESTURE and
          self._get_gesture_end_time(e) <= timestamp):
        return position
      e_time = self._get_entry_time(e)
      if e_time is None:
        # Entries without a timestamp neither move nor end the search.
        continue
      # Keep the entry with timestamp >= the specified timestamp
      if e_time >= timestamp:
        end_entry = position
      elif e_time < timestamp:
        # Walked past the window without meeting a qualifying gesture.
        return end_entry
    return end_entry

  def shrink(self, bgn_time=None, end_time=None, end_gesture_flag=True):
    """Shrink the log according to the begin time and end time.

    bgn_time:
      When given, the shrunk log starts at the first hardwareState entry
      whose timestamp is >= bgn_time; otherwise it starts at the first entry.
    end_time:
      When given, selects the last entry to keep (see end_gesture_flag);
      otherwise the shrunk log runs to the last entry.
    end_gesture_flag:
      When set to True, the shrunk log will contain the gestures resulted from
      the activities within the time range.
      When set to False, the shrunk log will have a hard cut at the entry
      with the smallest timestamp greater than or equal to the specified
      end_time.

    Exits the program with status 1 (SystemExit) when no entry matches
    bgn_time or end_time.
    """
    if bgn_time is not None:
      self.bgn_entry_index = self._get_begin_hwstate(bgn_time)
    else:
      self.bgn_entry_index = 0

    if end_time is not None:
      if end_gesture_flag:
        self.end_entry_index = self._get_end_gesture(end_time)
      else:
        self.end_entry_index = self._get_end_entry(end_time)
    else:
      self.end_entry_index = len(self.entries) - 1

    # An index can only be None here when its time bound was supplied, so
    # formatting the bound with %f is safe.
    if self.bgn_entry_index is None:
      logging.error('Error: fail to shrink the log based on begin time: %f',
                    bgn_time)
    if self.end_entry_index is None:
      logging.error('Error: fail to shrink the log based on end time: %f',
                    end_time)
    if self.bgn_entry_index is None or self.end_entry_index is None:
      # sys.exit rather than the builtin exit(), which is only installed by
      # the site module and is meant for interactive use.
      sys.exit(1)

    self.shrunk_log[self.ENTRIES] = self.entries[self.bgn_entry_index :
                                                 self.end_entry_index + 1]
    # With an empty log there are no boundary entries to report; indexing
    # them would raise IndexError.
    if self.entries:
      logging.info('  bgn_entry_index (%d):  %s',
                   self.bgn_entry_index, self.entries[self.bgn_entry_index])
      logging.info('  end_entry_index (%d):  %s',
                   self.end_entry_index, self.entries[self.end_entry_index])

  def replace_properties(self, prop_file):
    """Replace properties with those in the given file.

    Does nothing when prop_file is empty/None or when the file has no
    non-empty 'properties' value.
    """
    if not prop_file:
      return
    with open(prop_file) as f:
      prop = json.load(f)
    properties = prop.get(self.PROPERTIES)
    if properties:
      self.shrunk_log[self.PROPERTIES] = properties

  def add_description(self, description):
    """Add description to the shrunk log (skipped when it is empty/None)."""
    if description:
      self.shrunk_log[self.DESCRIPTION] = description

  def dump_json(self, output_file):
    """Dump the new log object to a JSON file, with sorted keys."""
    with open(output_file, 'w') as f:
      json.dump(self.shrunk_log, f, indent=3, separators=(',', ': '),
                sort_keys=True)

  def run(self, options):
    """Run the operations on the log.

    The operations include shrinking the log and replacing the properties.

    options is a dict with the keys 'log', 'bgn_time', 'end_time',
    'end_gesture', 'prop', 'description' and 'output'.
    """
    logging.info('Log file: %s', options['log'])
    self.shrink(bgn_time=options['bgn_time'], end_time=options['end_time'],
                end_gesture_flag=options['end_gesture'])
    self.replace_properties(options['prop'])
    self.add_description(options['description'])
    self.dump_json(options['output'])
199
200
201def _usage():
202  """Print the usage of this program."""
203  logging.info('Usage: $ %s [options]\n' % sys.argv[0])
204  logging.info('options:')
205  logging.info('  -b, --begin=<event_begin_time>')
206  logging.info('        the begin timestamp to shrink the log.')
207  logging.info('  -d, --description=<log_description>')
208  logging.info('        Description of the log, e.g., "crosbug.com/12345"')
209  logging.info('  -e, --end=<event_end_time>')
210  logging.info('        the end timestamp to shrink the log.')
211  logging.info('  -g, --end_gesture')
212  logging.info('        When this flag is set, the shrunk log will contain\n'
213               '        the gestures resulted from the activities within the\n'
214               '        time range. Otherwise, the shrunk log will have a\n'
215               '        hard cut at the entry with the smallest timestamp\n'
216               '        greater than or equal to the specified end_time.')
217  logging.info('  -h, --help: show this help')
218  logging.info('  -l, --log=<activity_log> (required)')
219  logging.info('  -o, --output=<output_file> (required)')
220  logging.info('  -p, --prop=<new_property_file>')
221  logging.info('        If a new property file is specified, it will be used\n'
222               '        to replace the original properties in the log.')
223  logging.info('')
224
225
226def _parse_options():
227  """Parse the command line options."""
228  try:
229    short_opt = 'b:d:e:ghl:o:p:'
230    long_opt = ['begin=', 'description', 'end=', 'end_gesture', 'help',
231                'log=', 'output=', 'prop=']
232    opts, _ = getopt.getopt(sys.argv[1:], short_opt, long_opt)
233  except getopt.GetoptError, err:
234    logging.error('Error: %s' % str(err))
235    _usage()
236    sys.exit(1)
237
238  options = {}
239  options['end_gesture'] = False
240  options['bgn_time'] = None
241  options['description'] = None
242  options['end_time'] = None
243  options['prop'] = None
244  for opt, arg in opts:
245    if opt in ('-h', '--help'):
246      _usage()
247      sys.exit()
248    elif opt in ('-b', '--begin'):
249      options['bgn_time'] = float(arg)
250    elif opt in ('-d', '--description'):
251      options['description'] = arg
252    elif opt in ('-e', '--end'):
253      options['end_time'] = float(arg)
254    elif opt in ('-g', '--end_gesture'):
255      options['end_gesture'] = True
256    elif opt in ('-l', '--log'):
257      if os.path.isfile(arg):
258        options['log'] = arg
259      else:
260        logging.error('Error: the log file does not exist: %s.' % arg)
261        sys.exit(1)
262    elif opt in ('-o', '--output'):
263      options['output'] = arg
264    elif opt in ('-p', '--prop'):
265      if os.path.isfile(arg):
266        options['prop'] = arg
267      else:
268        logging.error('Error: the properties file does not exist: %s.' % arg)
269        sys.exit(1)
270    else:
271      logging.error('Error: This option %s is not handled in program.' % opt)
272      _usage()
273      sys.exit(1)
274
275  if not options.get('log') or not options.get('output'):
276    logging.error('Error: You need to specify both --log and --output.')
277    _usage()
278    sys.exit(1)
279  return options
280
281
if __name__ == '__main__':
  # Script entry point: set up INFO-level logging with an empty format
  # string, parse the command line, then process the selected log.
  logging.basicConfig(level=logging.INFO, format='')
  options = _parse_options()
  TPLog(options['log']).run(options)
287