xref: /aosp_15_r20/external/tensorflow/tensorflow/lite/tools/optimize/debugging/python/debugger.py (revision b6fb3261f9314811a0f4371741dbb8839866f948)
1# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Python TF-Lite QuantizationDebugger."""
16import collections
17import csv
18import re
19from typing import (Any, Callable, Dict, IO, Iterable, List, Mapping, Optional,
20                    Sequence, Tuple)
21
22import numpy as np
23
24from tensorflow.lite.python import convert
25from tensorflow.lite.python import interpreter as _interpreter
26from tensorflow.lite.python.metrics import metrics as metrics_stub  # type: ignore
27from tensorflow.python.util import tf_export
28
29
# TODO(b/198099651): move converter implementation out of lite.py
# Alias used only in the converter-related annotations of this module; it is
# plain `Any`, so it gives no static checking of converter arguments.
TFLiteConverter = Any  # importing tf.lite creates circular dependency

# Returns metrics based on difference of values for quantized/float ops.
# Maps metric name -> function taking the array read from a NumericVerify
# output tensor (the float/dequantized difference) and reducing it to a single
# value. Copied into each debugger and merged with user-supplied layer metrics.
_DEFAULT_LAYER_DEBUG_METRICS = {
    'num_elements': lambda diffs: diffs.size,
    'stddev': np.std,
    'mean_error': np.average,
    'max_abs_error': lambda diffs: np.max(np.abs(diffs)),
    'mean_squared_error': lambda diffs: np.average(diffs**2),
}

# Op name matched against `op_info['op_name']` to find verification ops; also
# the prefix of those ops' output tensor names.
_NUMERIC_VERIFY_OP_NAME = 'NumericVerify'
43
44
45def _get_quant_params(
46    tensor_detail: Mapping[str, Any]) -> Optional[Tuple[float, int]]:
47  """Returns first scale and zero point from tensor detail, if present."""
48  quant_params = tensor_detail['quantization_parameters']
49  if not quant_params:
50    return None
51  if quant_params['scales'] and quant_params['zero_points']:
52    return (quant_params['scales'][0], quant_params['zero_points'][0])
53  return None
54
55
@tf_export.tf_export('lite.experimental.QuantizationDebugOptions')
class QuantizationDebugOptions:
  """Bundle of settings consumed by a QuantizationDebugger."""

  def __init__(self,
               layer_debug_metrics: Optional[Mapping[str,
                                                     Callable[[np.ndarray],
                                                              float]]] = None,
               model_debug_metrics: Optional[Mapping[
                   str, Callable[[Sequence[np.ndarray], Sequence[np.ndarray]],
                                 float]]] = None,
               layer_direct_compare_metrics: Optional[Mapping[str, Callable[
                   [Sequence[np.ndarray], Sequence[np.ndarray], float, int],
                   float]]] = None,
               denylisted_ops: Optional[List[str]] = None,
               denylisted_nodes: Optional[List[str]] = None,
               fully_quantize: bool = False) -> None:
    """Stores the debug options after validating the metric names.

    Args:
      layer_debug_metrics: optional {metric_name: function} dict of per-layer
        metrics. Each function receives the result of a NumericVerify op (the
        difference between float and dequantized op results) and returns a
        single scalar.
      model_debug_metrics: optional {metric_name: function} dict of whole-model
        metrics. Each function receives the outputs of the two models and
        returns a single scalar (e.g. accuracy, IoU).
      layer_direct_compare_metrics: optional {metric_name: function} dict of
        per-layer metrics with a different signature from
        `layer_debug_metrics`: each function is called with (original float
        value, original quantized value, scale, zero point) and must
        dequantize the quantized value itself. Use it when the plain
        difference is not enough. (Note) the quantized value is passed as
        int8, so a cast to int32 is needed.
      denylisted_ops: list of op names expected to be removed from
        quantization.
      denylisted_nodes: list of op output tensor names to be removed from
        quantization.
      fully_quantize: whether to fully quantize the model, i.e. the
        input/output as well as the model body. Corresponds to
        mlir_quantize's fully_quantize parameter.

    Raises:
      ValueError: when a metric name occurs more than once across the three
        metric dicts.
    """
    self.layer_debug_metrics = layer_debug_metrics
    self.model_debug_metrics = model_debug_metrics
    self.layer_direct_compare_metrics = layer_direct_compare_metrics

    # A metric name may be used only once across all three metric dicts.
    metric_groups = (layer_debug_metrics, model_debug_metrics,
                     layer_direct_compare_metrics)
    all_names = [
        name for group in metric_groups if group is not None
        for name in group.keys()
    ]
    if len(set(all_names)) != len(all_names):
      raise ValueError('Provided metrics have duplicate keys.')

    self.denylisted_ops = denylisted_ops
    self.denylisted_nodes = denylisted_nodes
    self.fully_quantize = fully_quantize
118
119
120@tf_export.tf_export('lite.experimental.QuantizationDebugger')
121class QuantizationDebugger:
122  """Debugger for Quantized TensorFlow Lite debug mode models.
123
124  This can run the TensorFlow Lite converted models equipped with debug ops and
125  collect debug information. This debugger calculates statistics from
126  user-defined post-processing functions as well as default ones.
127  """
128
129  def __init__(self,
130               quant_debug_model_path: Optional[str] = None,
131               quant_debug_model_content: Optional[bytes] = None,
132               float_model_path: Optional[str] = None,
133               float_model_content: Optional[bytes] = None,
134               debug_dataset: Optional[Callable[
135                   [], Iterable[Sequence[np.ndarray]]]] = None,
136               debug_options: Optional[QuantizationDebugOptions] = None,
137               converter: Optional[TFLiteConverter] = None) -> None:
138    """Runs the TFLite debugging model with given debug options.
139
140    Args:
141      quant_debug_model_path: Path to the quantized debug TFLite model file.
142      quant_debug_model_content: Content of the quantized debug TFLite model.
143      float_model_path: Path to float TFLite model file.
144      float_model_content: Content of the float TFLite model.
145      debug_dataset: a factory function that returns dataset generator which is
146        used to generate input samples (list of np.ndarray) for the model. The
147        generated elements must have same types and shape as inputs to the
148        model.
149      debug_options: Debug options to debug the given model.
150      converter: Optional, use converter instead of quantized model.
151
152    Raises:
153      ValueError: If the debugger was unable to be created.
154
155    Attributes:
156      layer_statistics: results of error metrics for each NumericVerify op
157        results. in {layer_name: {metric_name: metric}} format.
158      model_statistics: results of error metrics for difference between float
159        and quantized models. in {metric_name: metric} format.
160    """
161    self._data_gen = debug_dataset
162    self._debug_options = debug_options or QuantizationDebugOptions()
163    self.converter = None
164    self.calibrated_model = None
165    self.float_model = None
166    self._float_interpreter = None
167    if converter is not None:
168      if self._debug_options.model_debug_metrics:
169        old_optimizations = converter.optimizations
170        self.converter = self._set_converter_options_for_float(converter)
171        self.float_model = self.converter.convert()
172        converter.optimizations = old_optimizations
173
174      self.converter = self._set_converter_options_for_calibration(converter)
175      self.calibrated_model = self.converter.convert()
176      # Converter should be already set up with all options
177      self._init_from_converter(
178          self._debug_options,
179          self.converter,
180          self.calibrated_model,
181          float_model=self.float_model)
182    else:
183      self._quant_interpreter = _interpreter.Interpreter(
184          quant_debug_model_path,
185          quant_debug_model_content,
186          experimental_preserve_all_tensors=(
187              self._debug_options.layer_direct_compare_metrics is not None))
188      if self._debug_options.model_debug_metrics:
189        self._float_interpreter = _interpreter.Interpreter(
190            float_model_path, float_model_content)
191    self._initialize_stats()
192
193  @property
194  def options(self) -> QuantizationDebugOptions:
195    return self._debug_options
196
197  @options.setter
198  def options(self, options: QuantizationDebugOptions) -> None:
199    self._debug_options = options
200    if not self.converter or not self.calibrated_model:
201      return
202    self._init_from_converter(
203        self._debug_options,
204        self.converter,
205        self.calibrated_model,
206        float_model=self.float_model)
207    self._initialize_stats()
208
209  def _initialize_stats(self):
210    """Helper function initializes stats."""
211    # TODO(b/177749613) : Fix the dependency on tf.lite._get_ops_details()
212    # Following code is needed to get op's name from the output tensor index,
213    # since NumericVerify op only provides its quantized input tensor index.
214    self._defining_op = dict()
215    for op_info in self._quant_interpreter._get_ops_details():  # pylint: disable=protected-access
216      self._defining_op.update(
217          {tensor_idx: op_info['index'] for tensor_idx in op_info['outputs']})
218
219    self._numeric_verify_tensor_details = None
220    self._numeric_verify_op_details = None
221    if not self._get_numeric_verify_tensor_details():
222      raise ValueError('Please check if the quantized model is in debug mode')
223
224    self._layer_debug_metrics = _DEFAULT_LAYER_DEBUG_METRICS.copy()
225    if self._debug_options.layer_debug_metrics:
226      self._layer_debug_metrics.update(self._debug_options.layer_debug_metrics)
227
228    self.layer_statistics = None
229    self.model_statistics = None
230
231    self._metrics = metrics_stub.TFLiteMetrics()
232    self._metrics.increase_counter_debugger_creation()
233
234  def _get_quantized_model(self, is_debug: bool) -> bytes:
235    if not self.converter:
236      raise ValueError('No converter found, use this function with the '
237                       'converter option in the constructor.')
238
239    return convert.mlir_quantize(
240        self.calibrated_model,
241        disable_per_channel=self.converter._experimental_disable_per_channel,  # pylint: disable=protected-access
242        fully_quantize=self._debug_options.fully_quantize,
243        enable_numeric_verify=is_debug,
244        denylisted_ops=self._debug_options.denylisted_ops,
245        denylisted_nodes=self._debug_options.denylisted_nodes)
246
247  def get_nondebug_quantized_model(self) -> bytes:
248    """Returns a non-instrumented quantized model.
249
250    Convert the quantized model with the initialized converter and
251    return bytes for nondebug model. The model will not be instrumented with
252    numeric verification operations.
253
254    Returns:
255      Model bytes corresponding to the model.
256    Raises:
257      ValueError: if converter is not passed to the debugger.
258    """
259    return self._get_quantized_model(is_debug=False)
260
261  def get_debug_quantized_model(self) -> bytes:
262    """Returns an instrumented quantized model.
263
264    Convert the quantized model with the initialized converter and
265    return bytes for model. The model will be instrumented with numeric
266    verification operations and should only be used for debugging.
267
268    Returns:
269      Model bytes corresponding to the model.
270    Raises:
271      ValueError: if converter is not passed to the debugger.
272    """
273    return self._get_quantized_model(is_debug=True)
274
275  def _init_from_converter(self,
276                           options: QuantizationDebugOptions,
277                           converter: TFLiteConverter,
278                           calibrated_model: Optional[bytes] = None,
279                           float_model: Optional[bytes] = None) -> None:
280    """Convert the model and apply options.
281
282    Converts the quantized model and initializes a quantized model interpreter
283    with the quantized model. Returns a float model interpreter if float model
284    is provided.
285
286    Args:
287      options: a QuantizationDebugOptions object.
288      converter: an initialized tf.lite.TFLiteConverter.
289      calibrated_model: Calibrated model bytes.
290      float_model: Float model bytes.
291    """
292    self.quant_model = convert.mlir_quantize(
293        calibrated_model,
294        disable_per_channel=converter._experimental_disable_per_channel,  # pylint: disable=protected-access
295        fully_quantize=options.fully_quantize,
296        enable_numeric_verify=True,
297        denylisted_ops=options.denylisted_ops,
298        denylisted_nodes=options.denylisted_nodes)
299    self._quant_interpreter = _interpreter.Interpreter(
300        model_content=self.quant_model)
301    self._float_interpreter = None
302    if float_model is not None:
303      self._float_interpreter = _interpreter.Interpreter(
304          model_content=float_model)
305
306  def _set_converter_options_for_float(
307      self, converter: TFLiteConverter) -> TFLiteConverter:
308    """Verify converter options and set required experimental options."""
309    if converter.optimizations:
310      converter.optimizations = []
311    return converter
312
313  def _set_converter_options_for_calibration(
314      self, converter: TFLiteConverter) -> TFLiteConverter:
315    """Verify converter options and set required experimental options."""
316    if not converter.optimizations:
317      raise ValueError(
318          'converter object must set optimizations to lite.Optimize.DEFAULT')
319    if not converter.representative_dataset:
320      raise ValueError('converter object must set representative_dataset')
321
322    converter.experimental_mlir_quantizer = True
323    converter._experimental_calibrate_only = True  # pylint: disable=protected-access
324    return converter
325
326  def run(self) -> None:
327    """Runs models and gets metrics."""
328    self.layer_statistics = self._collect_layer_statistics()
329    if self._debug_options.model_debug_metrics:
330      self.model_statistics = self._collect_model_statistics()
331
332  def _collect_layer_statistics(self) -> Dict[str, Dict[str, float]]:
333    """Collects layer statistics by applying layer debug metrics.
334
335    For all data from the given RepresentativeDataset, collect statistics per
336    example by getting the NumericVerify op results in _quant_interpreter
337    and calculating layer debug metrics on the results.
338
339    Returns:
340      aggregated per-layer statistics of NumericVerify results.
341      {layer_name: {metric_name: metric}}
342    """
343    layer_statistics = collections.defaultdict(
344        lambda: collections.defaultdict(list))
345
346    initialize = True
347    for tensor_data in self._data_gen():
348      self._set_input_tensors(self._quant_interpreter, tensor_data, initialize)
349      initialize = False
350
351      # Run the model.
352      self._quant_interpreter.invoke()
353
354      # Collect the statistics of this invoke result.
355      for tensor_detail in self._get_numeric_verify_tensor_details():
356        tensor_name = tensor_detail['name']  # pytype: disable=unsupported-operands  # dynamic-method-lookup
357        diffs = self._quant_interpreter.get_tensor(tensor_detail['index'])  # pytype: disable=unsupported-operands  # dynamic-method-lookup
358        for metric_name, metric_fn in self._layer_debug_metrics.items():
359          layer_statistics[tensor_name][metric_name].append(metric_fn(diffs))
360
361      if self._debug_options.layer_direct_compare_metrics is not None:
362        for tensor_detail in self._get_numeric_verify_tensor_details():
363          tensor_name = tensor_detail['name']  # pytype: disable=unsupported-operands  # dynamic-method-lookup
364          op_idx = self._defining_op[tensor_detail['index']]  # pytype: disable=unsupported-operands  # dynamic-method-lookup
365          op_detail = self._quant_interpreter._get_op_details(op_idx)  # pylint: disable=protected-access
366          q_idx, f_idx = op_detail['inputs']
367          quant_input_detail = self._quant_interpreter._get_tensor_details(  # pylint: disable=protected-access
368              q_idx)
369          for (metric_name, metric_fn
370              ) in self._debug_options.layer_direct_compare_metrics.items():
371            layer_statistics[tensor_name][metric_name].append(
372                metric_fn(
373                    self._quant_interpreter.get_tensor(f_idx),
374                    self._quant_interpreter.get_tensor(q_idx),
375                    quant_input_detail['quantization_parameters']['scales'][0],
376                    quant_input_detail['quantization_parameters']['zero_points']
377                    [0]))
378
379    # Calculate final aggregated metrics for each layer.
380    for metrics in layer_statistics.values():
381      for metric_name in metrics:
382        metrics[metric_name] = np.nanmean(metrics[metric_name])
383
384    return layer_statistics
385
386  def _collect_model_statistics(self) -> Dict[str, float]:
387    """Collects model output metrics.
388
389    For all data from the given RepresentativeDataset, collect all model output
390    results from float model & quantized debug model, and calculate metrics
391    by using model output functions. As a result, self.model_results is filled,
392
393    where self.model_results[model_output_function_name] = `aggregated model
394    output function value` (a scalar).
395
396    Returns:
397      aggregated per-model output discrepancy metrics.
398      {metric_name: aggregated_metric}
399    """
400
401    model_statistics = collections.defaultdict(list)
402
403    initialize = True
404    for tensor_data in self._data_gen():
405      self._set_input_tensors(self._quant_interpreter, tensor_data, initialize)
406      self._set_input_tensors(self._float_interpreter, tensor_data, initialize)
407      initialize = False
408
409      # Run the models.
410      self._quant_interpreter.invoke()
411      self._float_interpreter.invoke()
412
413      # Collect the output results from both models.
414      float_tensor_data = self._get_output_tensors(self._float_interpreter)
415      quant_tensor_data = self._get_output_tensors(self._quant_interpreter)
416
417      # Calculate the metrics.
418      for (metric_name,
419           metric_fn) in self._debug_options.model_debug_metrics.items():
420        model_statistics[metric_name].append(
421            metric_fn(float_tensor_data, quant_tensor_data))
422
423    # Calculate final aggregated metrics for each outputs.
424    return {
425        metric_name: np.mean(metric)
426        for metric_name, metric in model_statistics.items()
427    }
428
429  def _set_input_tensors(self, interpreter: _interpreter.Interpreter,
430                         tensor_data: Sequence[np.ndarray],
431                         initialize: bool) -> None:
432    """Sets input tensors into TFLite model Interpreter.
433
434    Args:
435      interpreter: a tf.lite.Interpreter object with allocated tensors.
436      tensor_data: a list of Numpy array data.
437      initialize: set to true when input is first set for the interpreter, to
438        set input shapes and allocate tensors.
439
440    Raises:
441      ValueError: when inputs can't be set, or size of provided inputs does not
442      match size of model inputs.
443    """
444    input_details = interpreter.get_input_details()
445    if len(input_details) != len(tensor_data):
446      raise ValueError(
447          'Number of inputs provided ({}) does not match number of inputs to '
448          'the model ({})'.format(len(tensor_data), len(input_details)))
449
450    if initialize:
451      for input_detail, tensor in zip(input_details, tensor_data):
452        interpreter.resize_tensor_input(input_detail['index'], tensor.shape)
453      interpreter.allocate_tensors()
454
455    for input_detail, tensor in zip(input_details, tensor_data):
456      if tensor.dtype == np.float32 and input_detail['dtype'] == np.int8:
457        quant_params = _get_quant_params(input_detail)
458        if quant_params:
459          scale, zero_point = quant_params
460          tensor = np.round((tensor / scale) + zero_point).astype(np.int8)
461      interpreter.set_tensor(input_detail['index'], tensor)
462
463  def _get_output_tensors(
464      self, interpreter: _interpreter.Interpreter) -> List[np.ndarray]:
465    """Returns output tensors of given TFLite model Interpreter.
466
467    Args:
468      interpreter: a tf.lite.Interpreter object with allocated tensors.
469
470    Returns:
471      a list of numpy arrays representing output tensor results.
472    """
473
474    outputs = []
475    for output_detail in interpreter.get_output_details():
476      tensor = interpreter.get_tensor(output_detail['index'])
477      if output_detail['dtype'] == np.int8:
478        quant_params = _get_quant_params(output_detail)
479        if quant_params:
480          scale, zero_point = quant_params
481          tensor = ((tensor.astype(np.float32) - zero_point) * scale).astype(
482              np.float32)
483      outputs.append(tensor)
484
485    return outputs
486
487  def _get_numeric_verify_tensor_details(self) -> List[str]:
488    """Returns all names of all tensors from NumericVerify op."""
489    # pylint: disable=protected-access
490    if not self._numeric_verify_tensor_details:
491      self._numeric_verify_tensor_details = []
492      self._numeric_verify_op_details = {}
493      for op_info in self._quant_interpreter._get_ops_details():
494        if op_info['op_name'] == _NUMERIC_VERIFY_OP_NAME:
495          self._numeric_verify_tensor_details.append(
496              self._quant_interpreter._get_tensor_details(
497                  op_info['outputs'][0]))
498          tensor_name = self._numeric_verify_tensor_details[-1]['name']
499          self._numeric_verify_op_details[tensor_name] = op_info
500    # pylint: enable=protected-access
501    return self._numeric_verify_tensor_details
502
503  def _get_operand_name_and_index(self,
504                                  numeric_verify_name: str) -> Tuple[str, int]:
505    """Gets the index and name of NumericVerify Op's quantized input tensor.
506
507    Args:
508      numeric_verify_name: name of the NumericVerify op's output tensor. It has
509        format of `NumericVerify/{quantized_tensor_name}:{quantized_tensor_idx}`
510
511    Returns:
512      Tuple of (tensor_name, tensor_idx) for quantized op's output tensor.
513    """
514    tensor_name, tensor_idx = numeric_verify_name.rsplit(':', 1)
515    float_tensor_name = tensor_name[len(_NUMERIC_VERIFY_OP_NAME) + 1:]
516    if re.match(r'\d', float_tensor_name[-1]):
517      float_tensor_name = float_tensor_name[:-1]
518
519    return (float_tensor_name, int(tensor_idx))
520
521  def layer_statistics_dump(self, file: IO[str]) -> None:
522    """Dumps layer statistics into file, in csv format.
523
524    Args:
525      file: file, or file-like object to write.
526    """
527    # order of `fields` is the order of fields in csv.
528    fields = ['op_name', 'tensor_idx'] + list(self._layer_debug_metrics.keys())
529    if self._debug_options.layer_direct_compare_metrics is not None:
530      fields += list(self._debug_options.layer_direct_compare_metrics.keys())
531    fields += ['scale', 'zero_point', 'tensor_name']
532    writer = csv.DictWriter(file, fields)
533    writer.writeheader()
534    for name, metrics in self.layer_statistics.items():
535      data = metrics.copy()
536      (data['tensor_name'], _) = self._get_operand_name_and_index(name)
537      data['tensor_idx'] = self._numeric_verify_op_details[name]['inputs'][0]
538      data['op_name'] = self._quant_interpreter._get_op_details(  # pylint: disable=protected-access
539          self._defining_op[data['tensor_idx']])['op_name']
540      details = self._quant_interpreter._get_tensor_details(data['tensor_idx'])  # pylint: disable=protected-access
541      data['scale'], data['zero_point'] = (
542          details['quantization_parameters']['scales'][0],
543          details['quantization_parameters']['zero_points'][0])
544      writer.writerow(data)
545