# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Python TF-Lite QuantizationDebugger."""
import collections
import csv
import re
from typing import (Any, Callable, Dict, IO, Iterable, List, Mapping, Optional,
                    Sequence, Tuple)

import numpy as np

from tensorflow.lite.python import convert
from tensorflow.lite.python import interpreter as _interpreter
from tensorflow.lite.python.metrics import metrics as metrics_stub  # type: ignore
from tensorflow.python.util import tf_export


# TODO(b/198099651): move converter implementation out of lite.py
TFLiteConverter = Any  # importing tf.lite creates circular dependency

# Returns metrics based on difference of values for quantized/float ops.
_DEFAULT_LAYER_DEBUG_METRICS = {
    'num_elements': lambda diffs: diffs.size,
    'stddev': np.std,
    'mean_error': np.average,
    'max_abs_error': lambda diffs: np.max(np.abs(diffs)),
    'mean_squared_error': lambda diffs: np.average(diffs**2),
}

_NUMERIC_VERIFY_OP_NAME = 'NumericVerify'


def _get_quant_params(
    tensor_detail: Mapping[str, Any]) -> Optional[Tuple[float, int]]:
  """Returns the first scale and zero point from tensor detail, if present."""
  quant_params = tensor_detail['quantization_parameters']
  if not quant_params:
    return None
  if quant_params['scales'] and quant_params['zero_points']:
    return (quant_params['scales'][0], quant_params['zero_points'][0])
  return None


@tf_export.tf_export('lite.experimental.QuantizationDebugOptions')
class QuantizationDebugOptions:
  """Debug options to set up a given QuantizationDebugger."""

  def __init__(self,
               layer_debug_metrics: Optional[Mapping[str,
                                                     Callable[[np.ndarray],
                                                              float]]] = None,
               model_debug_metrics: Optional[Mapping[
                   str, Callable[[Sequence[np.ndarray], Sequence[np.ndarray]],
                                 float]]] = None,
               layer_direct_compare_metrics: Optional[Mapping[str, Callable[
                   [Sequence[np.ndarray], Sequence[np.ndarray], float, int],
                   float]]] = None,
               denylisted_ops: Optional[List[str]] = None,
               denylisted_nodes: Optional[List[str]] = None,
               fully_quantize: bool = False) -> None:
    """Initializes debugger options.

    Args:
      layer_debug_metrics: a dict to specify layer debug functions
        {function_name_str: function} where the function accepts the result of
        the NumericVerify op, which is the value difference between float and
        dequantized op results. The function returns a single scalar value.
      model_debug_metrics: a dict to specify model debug functions
        {function_name_str: function} where the function accepts outputs from
        two models and returns a single scalar value for a metric (e.g.
        accuracy, IoU).
      layer_direct_compare_metrics: a dict to specify layer debug functions
        {function_name_str: function}.
        The signature is different from that of `layer_debug_metrics`, and
        this one gets passed (original float value, original quantized value,
        scale, zero point). The function's implementation is responsible for
        correctly dequantizing the quantized value for comparison. Use this
        one when comparing the diff alone is not enough. (Note) the quantized
        value is passed as int8, so a cast to int32 is needed.
      denylisted_ops: a list of op names which are expected to be removed from
        quantization.
      denylisted_nodes: a list of ops' output tensor names to be removed from
        quantization.
      fully_quantize: Bool indicating whether to fully quantize the model.
        Besides the model body, the input/output will be quantized as well,
        corresponding to mlir_quantize's fully_quantize parameter.

    Raises:
      ValueError: when there are duplicate keys
    """
    self.layer_debug_metrics = layer_debug_metrics
    self.model_debug_metrics = model_debug_metrics
    self.layer_direct_compare_metrics = layer_direct_compare_metrics

    keys = []
    for metrics in [
        layer_debug_metrics, model_debug_metrics, layer_direct_compare_metrics
    ]:
      if metrics is not None:
        keys.extend(metrics.keys())
    if len(keys) != len(set(keys)):
      raise ValueError('Provided metrics have duplicate keys.')

    self.denylisted_ops = denylisted_ops
    self.denylisted_nodes = denylisted_nodes
    self.fully_quantize = fully_quantize


@tf_export.tf_export('lite.experimental.QuantizationDebugger')
class QuantizationDebugger:
  """Debugger for quantized TensorFlow Lite debug-mode models.

  This can run the TensorFlow Lite converted models equipped with debug ops and
  collect debug information. This debugger calculates statistics from
  user-defined post-processing functions as well as default ones.
  """

  def __init__(self,
               quant_debug_model_path: Optional[str] = None,
               quant_debug_model_content: Optional[bytes] = None,
               float_model_path: Optional[str] = None,
               float_model_content: Optional[bytes] = None,
               debug_dataset: Optional[Callable[
                   [], Iterable[Sequence[np.ndarray]]]] = None,
               debug_options: Optional[QuantizationDebugOptions] = None,
               converter: Optional[TFLiteConverter] = None) -> None:
    """Runs the TFLite debugging model with the given debug options.

    Args:
      quant_debug_model_path: Path to the quantized debug TFLite model file.
      quant_debug_model_content: Content of the quantized debug TFLite model.
      float_model_path: Path to the float TFLite model file.
      float_model_content: Content of the float TFLite model.
      debug_dataset: a factory function that returns a dataset generator used
        to generate input samples (list of np.ndarray) for the model. The
        generated elements must have the same types and shapes as the model
        inputs.
      debug_options: Debug options to debug the given model.
      converter: Optional, use the converter instead of a quantized model.

    Raises:
      ValueError: If the debugger was unable to be created.

    Attributes:
      layer_statistics: results of error metrics for each NumericVerify op,
        in {layer_name: {metric_name: metric}} format.
      model_statistics: results of error metrics for the difference between
        float and quantized models, in {metric_name: metric} format.
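
    Example usage (a minimal sketch; assumes `converter` is a
    tf.lite.TFLiteConverter already configured with `optimizations` and a
    `representative_dataset`, and `data_gen` is the same dataset factory
    passed as `debug_dataset`):

      debugger = QuantizationDebugger(
          converter=converter, debug_dataset=data_gen)
      debugger.run()
      with open('layer_stats.csv', 'w') as f:
        debugger.layer_statistics_dump(f)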
    """
    self._data_gen = debug_dataset
    self._debug_options = debug_options or QuantizationDebugOptions()
    self.converter = None
    self.calibrated_model = None
    self.float_model = None
    self._float_interpreter = None
    if converter is not None:
      if self._debug_options.model_debug_metrics:
        old_optimizations = converter.optimizations
        self.converter = self._set_converter_options_for_float(converter)
        self.float_model = self.converter.convert()
        converter.optimizations = old_optimizations

      self.converter = self._set_converter_options_for_calibration(converter)
      self.calibrated_model = self.converter.convert()
      # Converter should be already set up with all options
      self._init_from_converter(
          self._debug_options,
          self.converter,
          self.calibrated_model,
          float_model=self.float_model)
    else:
      self._quant_interpreter = _interpreter.Interpreter(
          quant_debug_model_path,
          quant_debug_model_content,
          experimental_preserve_all_tensors=(
              self._debug_options.layer_direct_compare_metrics is not None))
      if self._debug_options.model_debug_metrics:
        self._float_interpreter = _interpreter.Interpreter(
            float_model_path, float_model_content)
    self._initialize_stats()

  @property
  def options(self) -> QuantizationDebugOptions:
    return self._debug_options

  @options.setter
  def options(self, options: QuantizationDebugOptions) -> None:
    self._debug_options = options
    if not self.converter or not self.calibrated_model:
      return
    self._init_from_converter(
        self._debug_options,
        self.converter,
        self.calibrated_model,
        float_model=self.float_model)
    self._initialize_stats()

  def _initialize_stats(self):
    """Helper function initializes stats."""
    # TODO(b/177749613): Fix the dependency on tf.lite._get_ops_details()
    # Following code is needed to get op's name from the output tensor index,
    # since NumericVerify op only provides its quantized input tensor index.
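    # For example, if (hypothetically) the op at index 3 produces output
    # tensors 7 and 8, the loop below records
    # self._defining_op[7] == self._defining_op[8] == 3.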
    self._defining_op = dict()
    for op_info in self._quant_interpreter._get_ops_details():  # pylint: disable=protected-access
      self._defining_op.update(
          {tensor_idx: op_info['index'] for tensor_idx in op_info['outputs']})

    self._numeric_verify_tensor_details = None
    self._numeric_verify_op_details = None
    if not self._get_numeric_verify_tensor_details():
      raise ValueError('Please check if the quantized model is in debug mode')

    self._layer_debug_metrics = _DEFAULT_LAYER_DEBUG_METRICS.copy()
    if self._debug_options.layer_debug_metrics:
      self._layer_debug_metrics.update(self._debug_options.layer_debug_metrics)

    self.layer_statistics = None
    self.model_statistics = None

    self._metrics = metrics_stub.TFLiteMetrics()
    self._metrics.increase_counter_debugger_creation()

  def _get_quantized_model(self, is_debug: bool) -> bytes:
    if not self.converter:
      raise ValueError('No converter found, use this function with the '
                       'converter option in the constructor.')

    return convert.mlir_quantize(
        self.calibrated_model,
        disable_per_channel=self.converter._experimental_disable_per_channel,  # pylint: disable=protected-access
        fully_quantize=self._debug_options.fully_quantize,
        enable_numeric_verify=is_debug,
        denylisted_ops=self._debug_options.denylisted_ops,
        denylisted_nodes=self._debug_options.denylisted_nodes)

  def get_nondebug_quantized_model(self) -> bytes:
    """Returns a non-instrumented quantized model.

    Converts the quantized model with the initialized converter and returns
    bytes for the non-debug model. The model will not be instrumented with
    numeric verification operations.

    Returns:
      Model bytes corresponding to the model.
    Raises:
      ValueError: if converter is not passed to the debugger.
    """
    return self._get_quantized_model(is_debug=False)

  def get_debug_quantized_model(self) -> bytes:
    """Returns an instrumented quantized model.

    Converts the quantized model with the initialized converter and returns
    bytes for the model. The model will be instrumented with numeric
    verification operations and should only be used for debugging.

    Returns:
      Model bytes corresponding to the model.
    Raises:
      ValueError: if converter is not passed to the debugger.
    """
    return self._get_quantized_model(is_debug=True)

  def _init_from_converter(self,
                           options: QuantizationDebugOptions,
                           converter: TFLiteConverter,
                           calibrated_model: Optional[bytes] = None,
                           float_model: Optional[bytes] = None) -> None:
    """Converts the model and applies options.

    Converts the quantized model and initializes a quantized model interpreter
    with the quantized model. Returns a float model interpreter if a float
    model is provided.

    Args:
      options: a QuantizationDebugOptions object.
      converter: an initialized tf.lite.TFLiteConverter.
      calibrated_model: Calibrated model bytes.
      float_model: Float model bytes.
    """
    self.quant_model = convert.mlir_quantize(
        calibrated_model,
        disable_per_channel=converter._experimental_disable_per_channel,  # pylint: disable=protected-access
        fully_quantize=options.fully_quantize,
        enable_numeric_verify=True,
        denylisted_ops=options.denylisted_ops,
        denylisted_nodes=options.denylisted_nodes)
    self._quant_interpreter = _interpreter.Interpreter(
        model_content=self.quant_model)
    self._float_interpreter = None
    if float_model is not None:
      self._float_interpreter = _interpreter.Interpreter(
          model_content=float_model)

  def _set_converter_options_for_float(
      self, converter: TFLiteConverter) -> TFLiteConverter:
    """Verify converter options and set required experimental options."""
    if converter.optimizations:
      converter.optimizations = []
    return converter

  def _set_converter_options_for_calibration(
      self, converter: TFLiteConverter) -> TFLiteConverter:
    """Verify converter options and set required experimental options."""
    if not converter.optimizations:
      raise ValueError(
          'converter object must set optimizations to lite.Optimize.DEFAULT')
    if not converter.representative_dataset:
      raise ValueError('converter object must set representative_dataset')

    converter.experimental_mlir_quantizer = True
    converter._experimental_calibrate_only = True  # pylint: disable=protected-access
    return converter

  def run(self) -> None:
    """Runs models and gets metrics."""
    self.layer_statistics = self._collect_layer_statistics()
    if self._debug_options.model_debug_metrics:
      self.model_statistics = self._collect_model_statistics()

  def _collect_layer_statistics(self) -> Dict[str, Dict[str, float]]:
    """Collects layer statistics by applying layer debug metrics.

    For all data from the given RepresentativeDataset, collect statistics per
    example by getting the NumericVerify op results in _quant_interpreter
    and calculating layer debug metrics on the results.

    Returns:
      aggregated per-layer statistics of NumericVerify results.
        {layer_name: {metric_name: metric}}
    """
    layer_statistics = collections.defaultdict(
        lambda: collections.defaultdict(list))

    initialize = True
    for tensor_data in self._data_gen():
      self._set_input_tensors(self._quant_interpreter, tensor_data, initialize)
      initialize = False

      # Run the model.
      self._quant_interpreter.invoke()

      # Collect the statistics of this invoke result.
      for tensor_detail in self._get_numeric_verify_tensor_details():
        tensor_name = tensor_detail['name']  # pytype: disable=unsupported-operands  # dynamic-method-lookup
        diffs = self._quant_interpreter.get_tensor(tensor_detail['index'])  # pytype: disable=unsupported-operands  # dynamic-method-lookup
        for metric_name, metric_fn in self._layer_debug_metrics.items():
          layer_statistics[tensor_name][metric_name].append(metric_fn(diffs))

      if self._debug_options.layer_direct_compare_metrics is not None:
        for tensor_detail in self._get_numeric_verify_tensor_details():
          tensor_name = tensor_detail['name']  # pytype: disable=unsupported-operands  # dynamic-method-lookup
          op_idx = self._defining_op[tensor_detail['index']]  # pytype: disable=unsupported-operands  # dynamic-method-lookup
          op_detail = self._quant_interpreter._get_op_details(op_idx)  # pylint: disable=protected-access
          q_idx, f_idx = op_detail['inputs']
          quant_input_detail = self._quant_interpreter._get_tensor_details(  # pylint: disable=protected-access
              q_idx)
          for (metric_name, metric_fn
              ) in self._debug_options.layer_direct_compare_metrics.items():
            layer_statistics[tensor_name][metric_name].append(
                metric_fn(
                    self._quant_interpreter.get_tensor(f_idx),
                    self._quant_interpreter.get_tensor(q_idx),
                    quant_input_detail['quantization_parameters']['scales'][0],
                    quant_input_detail['quantization_parameters']['zero_points']
                    [0]))

    # Calculate final aggregated metrics for each layer.
    for metrics in layer_statistics.values():
      for metric_name in metrics:
        metrics[metric_name] = np.nanmean(metrics[metric_name])

    return layer_statistics

  def _collect_model_statistics(self) -> Dict[str, float]:
    """Collects model output metrics.

    For all data from the given RepresentativeDataset, collect all model output
    results from the float model and the quantized debug model, and calculate
    metrics by using the model output functions. As a result,
    self.model_results is filled, where
    self.model_results[model_output_function_name] is the aggregated model
    output function value (a scalar).

    Returns:
      aggregated per-model output discrepancy metrics.
        {metric_name: aggregated_metric}
    """

    model_statistics = collections.defaultdict(list)

    initialize = True
    for tensor_data in self._data_gen():
      self._set_input_tensors(self._quant_interpreter, tensor_data, initialize)
      self._set_input_tensors(self._float_interpreter, tensor_data, initialize)
      initialize = False

      # Run the models.
      self._quant_interpreter.invoke()
      self._float_interpreter.invoke()

      # Collect the output results from both models.
      float_tensor_data = self._get_output_tensors(self._float_interpreter)
      quant_tensor_data = self._get_output_tensors(self._quant_interpreter)

      # Calculate the metrics.
      for (metric_name,
           metric_fn) in self._debug_options.model_debug_metrics.items():
        model_statistics[metric_name].append(
            metric_fn(float_tensor_data, quant_tensor_data))

    # Calculate final aggregated metrics for each output.
    return {
        metric_name: np.mean(metric)
        for metric_name, metric in model_statistics.items()
    }

  def _set_input_tensors(self, interpreter: _interpreter.Interpreter,
                         tensor_data: Sequence[np.ndarray],
                         initialize: bool) -> None:
    """Sets input tensors into TFLite model Interpreter.

    Args:
      interpreter: a tf.lite.Interpreter object with allocated tensors.
      tensor_data: a list of Numpy array data.
      initialize: set to true when input is first set for the interpreter, to
        set input shapes and allocate tensors.

    Raises:
      ValueError: when inputs can't be set, or size of provided inputs does not
        match size of model inputs.
    """
    input_details = interpreter.get_input_details()
    if len(input_details) != len(tensor_data):
      raise ValueError(
          'Number of inputs provided ({}) does not match number of inputs to '
          'the model ({})'.format(len(tensor_data), len(input_details)))

    if initialize:
      for input_detail, tensor in zip(input_details, tensor_data):
        interpreter.resize_tensor_input(input_detail['index'], tensor.shape)
      interpreter.allocate_tensors()

    for input_detail, tensor in zip(input_details, tensor_data):
      if tensor.dtype == np.float32 and input_detail['dtype'] == np.int8:
        quant_params = _get_quant_params(input_detail)
        if quant_params:
          scale, zero_point = quant_params
          tensor = np.round((tensor / scale) + zero_point).astype(np.int8)
      interpreter.set_tensor(input_detail['index'], tensor)

  def _get_output_tensors(
      self, interpreter: _interpreter.Interpreter) -> List[np.ndarray]:
    """Returns output tensors of given TFLite model Interpreter.

    Args:
      interpreter: a tf.lite.Interpreter object with allocated tensors.

    Returns:
      a list of numpy arrays representing output tensor results.
    """

    outputs = []
    for output_detail in interpreter.get_output_details():
      tensor = interpreter.get_tensor(output_detail['index'])
      if output_detail['dtype'] == np.int8:
        quant_params = _get_quant_params(output_detail)
        if quant_params:
          scale, zero_point = quant_params
          tensor = ((tensor.astype(np.float32) - zero_point) * scale).astype(
              np.float32)
      outputs.append(tensor)

    return outputs

  def _get_numeric_verify_tensor_details(self) -> List[Any]:
    """Returns tensor details for the output tensors of NumericVerify ops."""
    # pylint: disable=protected-access
    if not self._numeric_verify_tensor_details:
      self._numeric_verify_tensor_details = []
      self._numeric_verify_op_details = {}
      for op_info in self._quant_interpreter._get_ops_details():
        if op_info['op_name'] == _NUMERIC_VERIFY_OP_NAME:
          self._numeric_verify_tensor_details.append(
              self._quant_interpreter._get_tensor_details(
                  op_info['outputs'][0]))
          tensor_name = self._numeric_verify_tensor_details[-1]['name']
          self._numeric_verify_op_details[tensor_name] = op_info
    # pylint: enable=protected-access
    return self._numeric_verify_tensor_details

  def _get_operand_name_and_index(self,
                                  numeric_verify_name: str) -> Tuple[str, int]:
    """Gets the index and name of the NumericVerify op's quantized input tensor.

    Args:
      numeric_verify_name: name of the NumericVerify op's output tensor. It has
        the format of
        `NumericVerify/{quantized_tensor_name}:{quantized_tensor_idx}`.

    Returns:
      Tuple of (tensor_name, tensor_idx) for the quantized op's output tensor.
    """
    tensor_name, tensor_idx = numeric_verify_name.rsplit(':', 1)
    float_tensor_name = tensor_name[len(_NUMERIC_VERIFY_OP_NAME) + 1:]
    if re.match(r'\d', float_tensor_name[-1]):
      float_tensor_name = float_tensor_name[:-1]

    return (float_tensor_name, int(tensor_idx))

  def layer_statistics_dump(self, file: IO[str]) -> None:
    """Dumps layer statistics into file, in csv format.
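
    Each NumericVerify tensor produces one CSV row; columns follow the order
    of `fields` below. A purely hypothetical row with the default metrics
    could look like:

      op_name=CONV_2D, tensor_idx=7, num_elements=1024, stddev=0.01,
      mean_error=0.001, max_abs_error=0.05, mean_squared_error=0.0002,
      scale=0.023, zero_point=-128, tensor_name=sequential/conv2d/Relu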

    Args:
      file: file, or file-like object to write.
    """
    # order of `fields` is the order of fields in csv.
    fields = ['op_name', 'tensor_idx'] + list(self._layer_debug_metrics.keys())
    if self._debug_options.layer_direct_compare_metrics is not None:
      fields += list(self._debug_options.layer_direct_compare_metrics.keys())
    fields += ['scale', 'zero_point', 'tensor_name']
    writer = csv.DictWriter(file, fields)
    writer.writeheader()
    for name, metrics in self.layer_statistics.items():
      data = metrics.copy()
      (data['tensor_name'], _) = self._get_operand_name_and_index(name)
      data['tensor_idx'] = self._numeric_verify_op_details[name]['inputs'][0]
      data['op_name'] = self._quant_interpreter._get_op_details(  # pylint: disable=protected-access
          self._defining_op[data['tensor_idx']])['op_name']
      details = self._quant_interpreter._get_tensor_details(data['tensor_idx'])  # pylint: disable=protected-access
      data['scale'], data['zero_point'] = (
          details['quantization_parameters']['scales'][0],
          details['quantization_parameters']['zero_points'][0])
      writer.writerow(data)