# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Reader class for tfdbg v2 debug events."""

import collections
import os
import threading

from tensorflow.core.protobuf import debug_event_pb2
from tensorflow.python.framework import errors
from tensorflow.python.framework import tensor_util
from tensorflow.python.lib.io import file_io
from tensorflow.python.lib.io import tf_record
from tensorflow.python.util import compat


DebugEventWithOffset = collections.namedtuple(
    "DebugEventWithOffset", "debug_event offset")


class DebugEventsReader:
  """Reader class for a tfdbg v2 DebugEvents directory."""

  # Number of digests after which a read lock is released and re-acquired
  # during serial reading of digests for SourceFiles, Execution, and
  # GraphExecutionTrace. This allows us to avoid releasing and re-acquiring
  # the lock too often (i.e., after each digest) and to minimize the
  # performance penalty.
  _READER_RELEASE_PER = 100

  _METADATA_SUFFIX = ".metadata"
  _SOURCE_FILE_SUFFIX = ".source_files"
  _STACK_FRAMES_SUFFIX = ".stack_frames"
  _GRAPHS_SUFFIX = ".graphs"
  _EXECUTION_SUFFIX = ".execution"
  _GRAPH_EXECUTION_TRACES_SUFFIX = ".graph_execution_traces"

  def __init__(self, dump_root):
    if not file_io.is_directory(dump_root):
      raise ValueError("Specified dump_root is not a directory: %s" % dump_root)
    self._dump_root = dump_root
    self._metadata_paths = self._load_metadata_files()

    prefixes = [
        metadata_path[:-len(self._METADATA_SUFFIX)]
        for metadata_path in self._metadata_paths
    ]
    prefix = prefixes[0]  # This is the prefix of the main file set.
    self._source_files_path = compat.as_bytes(prefix + self._SOURCE_FILE_SUFFIX)
    self._stack_frames_path = compat.as_bytes(prefix +
                                              self._STACK_FRAMES_SUFFIX)
    self._graphs_path = compat.as_bytes(prefix + self._GRAPHS_SUFFIX)
    self._execution_path = compat.as_bytes(prefix + self._EXECUTION_SUFFIX)
    # In the case of a distributed TensorFlow job, there can be multiple
    # .graph_execution_traces files, each belonging to a file set generated
    # on an individual host. This is different from the other debug event
    # files in the file set.
    self._graph_execution_traces_paths = [
        compat.as_bytes(prefix + self._GRAPH_EXECUTION_TRACES_SUFFIX)
        for prefix in prefixes
    ]
    self._readers = dict()  # A map from file path to reader.
    # A map from file path to current reading offset.
    self._reader_offsets = dict()
    # Lock for reader creation.
    self._readers_lock = threading.Lock()
    # Locks for read operations on individual readers.
    self._reader_read_locks = dict()

    self._offsets = dict()
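
  # For illustration only: a dump root written by a single-host program
  # typically contains one file set sharing a common prefix (the prefix
  # below is hypothetical; only the suffixes are fixed by this module):
  #
  #   <prefix>.metadata
  #   <prefix>.source_files
  #   <prefix>.stack_frames
  #   <prefix>.graphs
  #   <prefix>.execution
  #   <prefix>.graph_execution_traces
  #
  # A multi-host run contributes one such file set per host.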

  def _load_metadata_files(self):
    """Load and parse metadata files in the dump root.

    Check that all metadata files have a common tfdbg_run_id, and raise
    a ValueError if their tfdbg_run_ids differ.

    Returns:
      A list of metadata file paths in ascending order of their starting
        wall_time timestamp.
    """
    metadata_paths = file_io.get_matching_files(
        os.path.join(self._dump_root, "*%s" % self._METADATA_SUFFIX))
    if not metadata_paths:
      raise ValueError("Cannot find any tfdbg metadata file in directory: %s" %
                       self._dump_root)
    wall_times = []
    run_ids = []
    tensorflow_versions = []
    file_versions = []
    for metadata_path in metadata_paths:
      reader = tf_record.tf_record_random_reader(metadata_path)
      try:
        record = reader.read(0)[0]
        debug_event = debug_event_pb2.DebugEvent.FromString(record)
        wall_times.append(debug_event.wall_time)
        run_ids.append(debug_event.debug_metadata.tfdbg_run_id)
        tensorflow_versions.append(
            debug_event.debug_metadata.tensorflow_version)
        file_versions.append(debug_event.debug_metadata.file_version)
      finally:
        reader.close()
    self._starting_wall_time = wall_times[0]
    self._tfdbg_run_id = run_ids[0]
    self._tensorflow_version = tensorflow_versions[0]
    self._file_version = file_versions[0]
    if len(metadata_paths) == 1:
      # Fast path for the common case of a single DebugEvent file set.
      return metadata_paths

    num_no_id = len([run_id for run_id in run_ids if not run_id])
    if num_no_id:
      paths_without_run_id = [
          metadata_path
          for metadata_path, run_id in zip(metadata_paths, run_ids)
          if not run_id
      ]
      raise ValueError(
          "Found %d tfdbg metadata files and %d of them do not "
          "have tfdbg run ids. The metadata files without run ids are: %s" %
          (len(run_ids), num_no_id, paths_without_run_id))
    elif len(set(run_ids)) != 1:
      raise ValueError(
          "Unexpected: Found multiple (%d) tfdbg2 runs in directory %s" %
          (len(set(run_ids)), self._dump_root))
    # Return the metadata files in ascending order of their timestamps.
    paths_and_timestamps = sorted(
        zip(metadata_paths, wall_times), key=lambda t: t[1])
    self._starting_wall_time = paths_and_timestamps[0][1]
    return [path[0] for path in paths_and_timestamps]
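
  # A minimal usage sketch of the metadata accessors defined below (the
  # dump-root path is hypothetical):
  #
  #   reader = DebugEventsReader("/tmp/tfdbg2_logdir")
  #   print(reader.tfdbg_run_id())
  #   print(reader.starting_wall_time())
  #   print(reader.tensorflow_version())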

  def starting_wall_time(self):
    """Get the starting timestamp of the instrumented TensorFlow program.

    When there are multiple hosts (i.e., multiple tfdbg file sets), the
    earliest timestamp among the file sets is returned. It is assumed to be
    from the job that starts first (e.g., the coordinator).

    Returns:
      Starting timestamp in seconds since the epoch, as a float.
    """
    return self._starting_wall_time

  def tfdbg_run_id(self):
    """Get the run ID of the instrumented TensorFlow program."""
    return self._tfdbg_run_id

  def tensorflow_version(self):
    """Get the version string of TensorFlow that the debugged program ran on."""
    return self._tensorflow_version

  def tfdbg_file_version(self):
    """Get the tfdbg file format version."""
    return self._file_version

  def __enter__(self):
    return self

  def __exit__(self, exception_type, exception_value, traceback):
    del exception_type, exception_value, traceback  # Unused
    self.close()

  def _generic_iterator(self, file_path):
    """A helper method that makes an iterator given a debug-events file path.

    Repeated calls to this method create iterators that remember the last
    successful reading position (offset) for each given `file_path`. So the
    iterators are meant for incremental reading of the file.

    Args:
      file_path: Path to the file to create the iterator for.

    Yields:
      A `DebugEventWithOffset` namedtuple of (debug_event, offset) on each
      `next()` call.
    """
    yield_count = 0
    reader = self._get_reader(file_path)
    read_lock = self._reader_read_locks[file_path]
    read_lock.acquire()
    try:
      while True:
        current_offset = self._reader_offsets[file_path]
        try:
          record, self._reader_offsets[file_path] = reader.read(current_offset)
        except (errors.DataLossError, IndexError):
          # We ignore partial read exceptions, because a record may be
          # truncated. The PyRandomRecordReader throws an `IndexError` when
          # the offset goes out of bounds.
          break
        yield DebugEventWithOffset(
            debug_event=debug_event_pb2.DebugEvent.FromString(record),
            offset=current_offset)
        yield_count += 1
        # The read lock must be periodically released to allow for concurrent
        # random reads. But we do so only after a number of reads, instead of
        # after every single read, in order to minimize the performance
        # penalty.
        if yield_count % self._READER_RELEASE_PER == 0:
          read_lock.release()
          read_lock.acquire()
    finally:
      read_lock.release()

  def _get_reader(self, file_path):
    """Get a random-access reader for the TFRecords file at file_path."""
    file_path = compat.as_bytes(file_path)
    # The following code uses the double-checked locking pattern to optimize
    # the common case (where the reader is already initialized).
    if file_path not in self._readers:  # 1st check, without lock.
      with self._readers_lock:
        if file_path not in self._readers:  # 2nd check, with lock.
          self._readers[file_path] = tf_record.tf_record_random_reader(
              file_path)
          self._reader_read_locks[file_path] = threading.Lock()
          self._reader_offsets[file_path] = 0
    return self._readers[file_path]
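
  # For illustration: because each iterator resumes from the last successful
  # offset, a second call to one of the iterator methods below yields only
  # the records appended since the previous pass, e.g. (hypothetical usage):
  #
  #   for debug_event, offset in reader.source_files_iterator():
  #     ...  # First pass: reads from the beginning of the file.
  #   for debug_event, offset in reader.source_files_iterator():
  #     ...  # Second pass: reads only newly appended records, if any.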

  def source_files_iterator(self):
    return self._generic_iterator(self._source_files_path)

  def stack_frames_iterator(self):
    return self._generic_iterator(self._stack_frames_path)

  def graphs_iterator(self):
    return self._generic_iterator(self._graphs_path)

  def read_source_files_event(self, offset):
    """Read a DebugEvent proto at a given offset from the .source_files file."""
    with self._reader_read_locks[self._source_files_path]:
      proto_string = self._get_reader(self._source_files_path).read(offset)[0]
    return debug_event_pb2.DebugEvent.FromString(proto_string)

  def read_graphs_event(self, offset):
    """Read a DebugEvent proto at a given offset from the .graphs file.

    Args:
      offset: Offset to read the DebugEvent proto from.

    Returns:
      A DebugEvent proto.

    Raises:
      `errors.DataLossError` if offset is at a wrong location.
      `IndexError` if offset is out of range of the file.
    """
    return debug_event_pb2.DebugEvent.FromString(
        self._get_reader(self._graphs_path).read(offset)[0])

  def execution_iterator(self):
    return self._generic_iterator(self._execution_path)

  def read_execution_event(self, offset):
    """Read a DebugEvent proto at a given offset from the .execution file.

    Args:
      offset: Offset to read the DebugEvent proto from.

    Returns:
      A DebugEvent proto.

    Raises:
      `errors.DataLossError` if offset is at a wrong location.
      `IndexError` if offset is out of range of the file.
    """
    with self._reader_read_locks[self._execution_path]:
      proto_string = self._get_reader(self._execution_path).read(offset)[0]
    return debug_event_pb2.DebugEvent.FromString(proto_string)

  def graph_execution_traces_iterators(self):
    return [
        self._generic_iterator(path)
        for path in self._graph_execution_traces_paths
    ]

  def read_graph_execution_traces_event(self, locator):
    """Read a DebugEvent at a given locator from a .graph_execution_traces file.

    Args:
      locator: A (file_index, offset) tuple that locates the DebugEvent
        containing the graph execution trace.

    Returns:
      A DebugEvent proto.

    Raises:
      `errors.DataLossError` if offset is at a wrong location.
      `IndexError` if offset is out of range of the file.
    """
    file_index, offset = locator
    graph_execution_traces_path = self._graph_execution_traces_paths[file_index]
    with self._reader_read_locks[graph_execution_traces_path]:
      proto_string = self._get_reader(graph_execution_traces_path).read(
          offset)[0]
    return debug_event_pb2.DebugEvent.FromString(proto_string)

  def close(self):
    with self._readers_lock:
      file_paths = list(self._readers.keys())
      for file_path in file_paths:
        self._readers[file_path].close()
        del self._readers[file_path]
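

# A minimal usage sketch of the raw reader above (the higher-level
# DebugDataReader defined later in this module is the recommended entry
# point); the dump-root path is hypothetical:
#
#   with DebugEventsReader("/tmp/tfdbg2_logdir") as reader:
#     for debug_event, offset in reader.execution_iterator():
#       print(offset, debug_event.execution.op_type)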


class BaseDigest:
  """Base class for digest.

  Properties:
    wall_time: A timestamp for the digest as a `float` (unit: s).
    locator: A datum that allows tracing the digest to its original
      location. It can be either of the following:
      1. Bytes offset from the beginning of the file as a single integer,
         for the case of all digests of the same kind coming from the same
         file.
      2. A tuple of a file index and a byte offset. This applies to the case
         in which the same type of debugger data may come from multiple
         files, e.g., graph execution traces.
  """

  def __init__(self, wall_time, locator):
    self._wall_time = wall_time
    self._locator = locator

  @property
  def wall_time(self):
    return self._wall_time

  @property
  def locator(self):
    return self._locator

  def to_json(self):
    return {"wall_time": self.wall_time}
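

# For illustration only (values hypothetical): the two locator forms
# described above look like
#
#   BaseDigest(wall_time=1.0, locator=1024)       # Byte offset in one file.
#   BaseDigest(wall_time=1.0, locator=(0, 1024))  # (file_index, byte offset).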


class ExecutionDigest(BaseDigest):
  """Light-weight digest summarizing a top-level execution event.

  Use `DebugDataReader.read_execution(execution_digest)` to load the more
  detailed data object concerning the execution event (`Execution`).

  Properties:
    op_type: Type name of the executed op. In the case of the eager execution
      of an individual op, it is the name of the op (e.g., "MatMul").
      In the case of the execution of a tf.function (FuncGraph), this is the
      internally-generated name of the function (e.g.,
      "__inference_my_func_123").
    output_tensor_device_ids: IDs of the devices on which the output tensors
      of the execution reside. For no-output execution, this is `None`.
  """

  def __init__(self,
               wall_time,
               locator,
               op_type,
               output_tensor_device_ids=None):
    super().__init__(wall_time, locator)
    self._op_type = op_type
    self._output_tensor_device_ids = _tuple_or_none(output_tensor_device_ids)

  @property
  def op_type(self):
    return self._op_type

  @property
  def output_tensor_device_ids(self):
    return self._output_tensor_device_ids

  def to_json(self):
    output = super().to_json()
    output.update({
        "op_type": self.op_type,
        "output_tensor_device_ids": self.output_tensor_device_ids,
    })
    return output


def _tuple_or_none(data):
  return tuple(data) if data else None


class Execution(ExecutionDigest):
  """Detailed data relating to a top-level execution event.

  The execution is of an individual op or a tf.function, which may have any
  number of output tensors.

  Properties (beyond the base class `ExecutionDigest`):
    host_name: Name of the host on which the execution happened.
    stack_frame_ids: Reference IDs for stack frames, ordered from bottommost
      to topmost. Use `DebugDataReader.read_execution_stack_trace()` to load
      the detailed stack frames (filepath, lineno and function name).
    tensor_debug_mode: TensorDebugMode enum value, as an `int`.
    graph_id: ID of the executed FuncGraph (applicable only to the execution
      of a tf.function). `None` for the eager execution of an individual op.
    input_tensor_ids: IDs of the input (eager) tensor(s) for this execution,
      if any. If the eager execution has no input tensor, this is `None`.
      Else, this is a `tuple` of `int`s.
    output_tensor_ids: IDs of the output (eager) tensor(s) from this
      execution, if any. If the eager execution produces no output tensor,
      this is `None`. Else, this is a `tuple` of `int`s.
    debug_tensor_values: Values of the debug tensor(s), applicable only to
      non-FULL_TENSOR tensor debug modes. Each element of the tuple
      corresponds to an output tensor of the execution. See the documentation
      of the various TensorDebugModes for the semantics of the numbers. If
      the eager execution produces no output tensor, this is `None`. Else,
      this is a `tuple` of `list`s of numbers.
  """

  def __init__(self,
               execution_digest,
               host_name,
               stack_frame_ids,
               tensor_debug_mode,
               graph_id=None,
               input_tensor_ids=None,
               output_tensor_ids=None,
               debug_tensor_values=None):
    super().__init__(
        execution_digest.wall_time,
        execution_digest.locator,
        execution_digest.op_type,
        output_tensor_device_ids=execution_digest.output_tensor_device_ids)
    self._host_name = host_name
    self._stack_frame_ids = tuple(stack_frame_ids)
    self._tensor_debug_mode = tensor_debug_mode
    self._graph_id = graph_id
    self._input_tensor_ids = _tuple_or_none(input_tensor_ids)
    self._output_tensor_ids = _tuple_or_none(output_tensor_ids)
    self._debug_tensor_values = _tuple_or_none(debug_tensor_values)

  @property
  def host_name(self):
    return self._host_name

  @property
  def stack_frame_ids(self):
    return self._stack_frame_ids

  @property
  def tensor_debug_mode(self):
    return self._tensor_debug_mode

  @property
  def graph_id(self):
    return self._graph_id

  @property
  def input_tensor_ids(self):
    return self._input_tensor_ids

  @property
  def num_outputs(self):
    return len(self._output_tensor_ids) if self._output_tensor_ids else 0

  @property
  def output_tensor_ids(self):
    return self._output_tensor_ids

  @property
  def debug_tensor_values(self):
    return self._debug_tensor_values

  def to_json(self):
    output = super().to_json()
    output.update({
        "host_name": self.host_name,
        "stack_frame_ids": self.stack_frame_ids,
        "tensor_debug_mode": self.tensor_debug_mode,
        "graph_id": self.graph_id,
        "input_tensor_ids": self.input_tensor_ids,
        "output_tensor_ids": self.output_tensor_ids,
        "debug_tensor_values": self.debug_tensor_values,
    })
    return output
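

# For illustration (hypothetical usage): a detailed `Execution` is obtained
# from its digest via the `DebugDataReader` class defined later in this
# module:
#
#   execution = debug_data_reader.read_execution(execution_digest)
#   print(execution.op_type, execution.num_outputs)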


class DebuggedGraph:
  """Data object representing debugging information about a tf.Graph.

  Includes `FuncGraph`s.

  Properties:
    name: Name of the graph (if any). May be `None` for non-function graphs.
    graph_id: Debugger-generated ID for the graph.
    inner_graph_ids: A list of the debugger-generated IDs for the graphs
      enclosed by this graph.
    outer_graph_id: If this graph is nested within an outer graph, ID of the
      outer graph. If this is an outermost graph, `None`.
  """

  def __init__(self,
               name,
               graph_id,
               outer_graph_id=None):
    self._name = name
    self._graph_id = graph_id
    self._outer_graph_id = outer_graph_id
    self._inner_graph_ids = []
    # A dictionary from op name to GraphOpCreationDigest.
    self._op_by_name = dict()
    # A dictionary mapping op names to immediate downstream consumers.
    self._op_consumers = collections.defaultdict(list)

  def add_inner_graph_id(self, inner_graph_id):
    """Add the debugger-generated ID of a graph nested within this graph.

    Args:
      inner_graph_id: The debugger-generated ID of the nested inner graph.
    """
    assert isinstance(inner_graph_id, str)
    self._inner_graph_ids.append(inner_graph_id)

  def add_op(self, graph_op_creation_digest):
    """Add an op creation data object.

    Args:
      graph_op_creation_digest: A GraphOpCreationDigest data object describing
        the creation of an op inside this graph.
    """
    if graph_op_creation_digest.op_name in self._op_by_name:
      raise ValueError(
          "Duplicate op name: %s (op type: %s)" %
          (graph_op_creation_digest.op_name, graph_op_creation_digest.op_type))
    self._op_by_name[
        graph_op_creation_digest.op_name] = graph_op_creation_digest

  def add_op_consumer(self, src_op_name, src_slot, dst_op_name, dst_slot):
    """Add a consuming op for this op.

    Args:
      src_op_name: Name of the op whose output tensor is being consumed.
      src_slot: 0-based output slot of the op being consumed.
      dst_op_name: Name of the consuming op (e.g., "Conv2D_3/BiasAdd").
      dst_slot: 0-based input slot of the consuming op that receives the
        tensor from this op.
    """
    self._op_consumers[src_op_name].append((src_slot, dst_op_name, dst_slot))

  @property
  def name(self):
    return self._name

  @property
  def graph_id(self):
    return self._graph_id

  @property
  def outer_graph_id(self):
    return self._outer_graph_id

  @property
  def inner_graph_ids(self):
    return self._inner_graph_ids

  def get_tensor_id(self, op_name, output_slot):
    """Get the ID of a symbolic tensor in this graph."""
    return self._op_by_name[op_name].output_tensor_ids[output_slot]

  def get_op_creation_digest(self, op_name):
    """Get the GraphOpCreationDigest for an op in the graph."""
    return self._op_by_name[op_name]

  def get_op_consumers(self, src_op_name):
    """Get all the downstream consumers of this op.

    Only data (non-control) edges are tracked.

    Args:
      src_op_name: Name of the op providing the tensor being consumed.

    Returns:
      A list of (src_slot, dst_op_name, dst_slot) tuples. In each item of
      the list:
        src_slot: 0-based output slot of the op whose output tensor is
          being consumed.
        dst_op_name: Name of the consuming op (e.g., "Conv2D_3/BiasAdd").
        dst_slot: 0-based input slot of the consuming op that receives
          the tensor from this op.
    """
    return self._op_consumers[src_op_name]

  def to_json(self):
    return {
        "name": self.name,
        "graph_id": self.graph_id,
        "outer_graph_id": self._outer_graph_id,
        "inner_graph_ids": self._inner_graph_ids,
    }
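

# A hypothetical illustration of the consumer-tracking contract above: after
#
#   graph.add_op_consumer("Conv2D_3", 0, "Conv2D_3/BiasAdd", 0)
#
# a call to graph.get_op_consumers("Conv2D_3") returns
# [(0, "Conv2D_3/BiasAdd", 0)].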


class DebuggedDevice:
  """Debugger data regarding a device involved in the debugged program.

  Properties:
    device_name: Name of the device, as a str.
    device_id: An integer ID for the device, unique for each device within
      the scope of the debugged TensorFlow program.
  """

  def __init__(self,
               device_name,
               device_id):
    self._device_name = device_name
    self._device_id = device_id

  @property
  def device_name(self):
    return self._device_name

  @property
  def device_id(self):
    return self._device_id

  def to_json(self):
    return {
        "device_name": self._device_name,
        "device_id": self._device_id,
    }


class GraphOpCreationDigest(BaseDigest):
  """Data object describing the creation of an op inside a graph.

  For size efficiency, this digest object does not contain any stack frames or
  any references to them. To obtain the stack frames, use
  `DebugDataReader.read_graph_op_creation_stack_trace()`.

  Properties (beyond the base class):
    graph_id: Debugger-generated ID of the immediately-enclosing graph.
    op_type: Type name of the op (e.g., "MatMul").
    op_name: Name of the op (e.g., "dense_1/MatMul").
    output_tensor_ids: Debugger-generated IDs for the output(s) of the op.
      If the op produces no output tensor, this is `None`. Else, this is a
      `tuple` of `int`s.
    input_names: Names of the input tensors to the op.
    device_name: The name of the device that the op is placed on (if
      available).
    host_name: Name of the host on which the op is created.
    stack_frame_ids: IDs of the frames of the stack trace at which the op
      is created.
  """

  def __init__(self,
               wall_time,
               locator,
               graph_id,
               op_type,
               op_name,
               output_tensor_ids,
               host_name,
               stack_frame_ids,
               input_names=None,
               device_name=None):
    super().__init__(wall_time, locator)
    self._graph_id = graph_id
    self._op_type = op_type
    self._op_name = op_name
    self._output_tensor_ids = _tuple_or_none(output_tensor_ids)
    self._host_name = host_name
    self._stack_frame_ids = stack_frame_ids
    self._input_names = _tuple_or_none(input_names)
    self._device_name = device_name

  @property
  def graph_id(self):
    return self._graph_id

  @property
  def op_type(self):
    return self._op_type

  @property
  def op_name(self):
    return self._op_name

  @property
  def output_tensor_ids(self):
    return self._output_tensor_ids

  @property
  def num_outputs(self):
    return len(self._output_tensor_ids) if self.output_tensor_ids else 0

  @property
  def input_names(self):
    return self._input_names

  @property
  def device_name(self):
    return self._device_name

  @property
  def host_name(self):
    return self._host_name

  @property
  def stack_frame_ids(self):
    return self._stack_frame_ids

  def to_json(self):
    output = super().to_json()
    output.update({
        "graph_id": self.graph_id,
        "op_type": self.op_type,
        "op_name": self.op_name,
        "output_tensor_ids": self.output_tensor_ids,
        "host_name": self.host_name,
        "stack_frame_ids": self.stack_frame_ids,
        "input_names": self.input_names,
        "device_name": self.device_name,
    })
    return output


class GraphExecutionTraceDigest(BaseDigest):
  """Light-weight summary of an intra-graph tensor execution event.

  Use `DebugDataReader.read_graph_execution_trace()` on this object to read
  the more detailed data (`GraphExecutionTrace`).

  Properties (beyond the base class):
    op_type: Type name of the executed op (e.g., "Conv2D").
    op_name: Name of the op (e.g., "conv_2d_3/Conv2D").
    output_slot: Output slot index of the tensor.
    graph_id: The debugger-generated ID of the innermost
      (immediately-enclosing) graph.
  """

  def __init__(self, wall_time, locator, op_type, op_name, output_slot,
               graph_id):
    super().__init__(wall_time, locator)
    self._op_type = op_type
    self._op_name = op_name
    self._output_slot = output_slot
    self._graph_id = graph_id

  @property
  def op_type(self):
    return self._op_type

  @property
  def op_name(self):
    return self._op_name

  @property
  def output_slot(self):
    return self._output_slot

  @property
  def graph_id(self):
    return self._graph_id

  def to_json(self):
    output = super().to_json()
    output.update({
        "op_type": self.op_type,
        "op_name": self.op_name,
        "output_slot": self.output_slot,
        "graph_id": self.graph_id,
    })
    return output


class GraphExecutionTrace(GraphExecutionTraceDigest):
  """Detailed data object describing an intra-graph tensor execution.

  Attributes (in addition to GraphExecutionTraceDigest):
    graph_ids: The debugger-generated IDs of the graphs that enclose the
      executed op (tensor), ordered from the outermost to the innermost.
    graph_id: The debugger-generated ID of the innermost
      (immediately-enclosing) graph.
    tensor_debug_mode: TensorDebugMode enum value.
    debug_tensor_value: Debug tensor values (only for non-FULL_TENSOR
      tensor_debug_mode). A list of numbers. See the documentation of the
      TensorDebugModes for the semantics of the numbers.
    device_name: Device on which the tensor resides (if available).
  """

  def __init__(self,
               graph_execution_trace_digest,
               graph_ids,
               tensor_debug_mode,
               debug_tensor_value=None,
               device_name=None):
    super().__init__(graph_execution_trace_digest.wall_time,
                     graph_execution_trace_digest.locator,
                     graph_execution_trace_digest.op_type,
                     graph_execution_trace_digest.op_name,
                     graph_execution_trace_digest.output_slot,
                     graph_execution_trace_digest.graph_id)
    self._graph_ids = tuple(graph_ids)
    self._tensor_debug_mode = tensor_debug_mode
    self._debug_tensor_value = debug_tensor_value
    self._device_name = device_name

  @property
  def graph_ids(self):
    return self._graph_ids

  @property
  def graph_id(self):
    return self._graph_ids[-1]

  @property
  def tensor_debug_mode(self):
    return self._tensor_debug_mode

  @property
  def debug_tensor_value(self):
    return _tuple_or_none(self._debug_tensor_value)

  @property
  def device_name(self):
    return self._device_name

  def to_json(self):
    output = super().to_json()
    output.update({
        "graph_ids": self.graph_ids,
        "tensor_debug_mode": self.tensor_debug_mode,
        "debug_tensor_value": self.debug_tensor_value,
        "device_name": self.device_name,
    })
    return output
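

# For illustration (IDs hypothetical): for an op inside a tf.function that
# is nested within another tf.function, graph_ids might be
# ("outer_graph_id", "inner_graph_id"), in which case graph_id resolves to
# "inner_graph_id", the innermost entry.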


def _parse_tensor_value(tensor_proto, return_list=False):
  """Helper method for reading a tensor value from a tensor proto.

  The rationale for the distinction between the `True` and `False` values of
  `return_list` is as follows:
  - `return_list=True` is used for TensorDebugMode values other than
    FULL_TENSOR, e.g., CONCISE_HEALTH, SHAPE and FULL_HEALTH. Under
    those modes, the value is guaranteed (by contract) to be a 1D float64
    tensor.
  - `return_list=False` is used for the FULL_TENSOR TensorDebugMode
    specifically. Instead, we use `numpy.ndarray` to maximally preserve
    the shape, dtype and value information regarding the underlying tensor
    value. Under that mode, we don't use a Python list to represent the
    tensor value because that can lead to loss of information (e.g., both
    float16 and float32 dtypes get mapped to Python floats).

  Args:
    tensor_proto: The TensorProto instance from which the tensor value will be
      loaded.
    return_list: Whether the return value will be a nested Python list that
      comes out from `numpy.ndarray.tolist()`.

  Returns:
    If parsing is successful, the tensor value as a `numpy.ndarray` or the
      nested Python list converted from it.
    If parsing fails, `None`.
  """
  try:
    ndarray = tensor_util.MakeNdarray(tensor_proto)
    return ndarray.tolist() if return_list else ndarray
  except TypeError:
    # Depending on tensor_debug_mode, certain dtypes of tensors don't
    # have logged debug tensor values.
    return None


def _execution_digest_from_debug_event_proto(debug_event, locator):
  """Convert a DebugEvent proto into an ExecutionDigest data object."""
  return ExecutionDigest(
      debug_event.wall_time,
      locator,
      debug_event.execution.op_type,
      output_tensor_device_ids=(debug_event.execution.output_tensor_device_ids
                                or None))


def _execution_from_debug_event_proto(debug_event, locator):
  """Convert a DebugEvent proto into an Execution data object."""
  execution_proto = debug_event.execution

  debug_tensor_values = None
  if (execution_proto.tensor_debug_mode ==
      debug_event_pb2.TensorDebugMode.FULL_TENSOR):
    pass  # TODO(cais): Build tensor store.
  elif (execution_proto.tensor_debug_mode !=
        debug_event_pb2.TensorDebugMode.NO_TENSOR):
    debug_tensor_values = []
    for tensor_proto in execution_proto.tensor_protos:
      # TODO(cais): Refactor into a helper method.
      debug_tensor_values.append(
          _parse_tensor_value(tensor_proto, return_list=True))
  return Execution(
      _execution_digest_from_debug_event_proto(debug_event, locator),
      execution_proto.code_location.host_name,
      tuple(execution_proto.code_location.stack_frame_ids),
      execution_proto.tensor_debug_mode,
      graph_id=execution_proto.graph_id,
      input_tensor_ids=tuple(execution_proto.input_tensor_ids),
      output_tensor_ids=tuple(execution_proto.output_tensor_ids),
      debug_tensor_values=_tuple_or_none(debug_tensor_values))
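

# A minimal sketch of the two return modes of _parse_tensor_value (the
# tensor contents are hypothetical):
#
#   proto = tensor_util.make_tensor_proto([1.0, 2.0])
#   _parse_tensor_value(proto, return_list=True)   # -> [1.0, 2.0]
#   _parse_tensor_value(proto, return_list=False)  # -> numpy.ndarray([1., 2.])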


class DebugDataReader:
  """A reader that reads structured debugging data in the tfdbg v2 format.

  The set of data read by an object of this class concerns the execution
  history of a tfdbg2-instrumented TensorFlow program.

  Note:
  - An object of this class incrementally reads data from files that belong
    to the tfdbg v2 DebugEvent file set. Calling `update()` triggers the
    reading from the last-successful reading positions in the files.
  - This object can be used as a context manager. Its `__exit__()` call
    closes the file readers cleanly.
  """

  def __init__(self, dump_root):
    self._reader = DebugEventsReader(dump_root)

    # TODO(cais): Implement pagination for memory constraints.
    self._execution_digests = []

    # Mapping from (host_name, file_path) tuple to offset in the
    # .source_files file.
    self._host_name_file_path_to_offset = collections.OrderedDict()
    # A dict mapping id to (host_name, file_path, lineno, func) tuple.
    self._stack_frame_by_id = dict()
    # Stores unprocessed stack frame IDs. This is necessary to handle the
    # case in which reading of the .stack_frames file gets ahead of the
    # reading of the .source_files file.
    self._unprocessed_stack_frames = dict()
    # A dict mapping id to DebuggedDevice objects.
    self._device_by_id = dict()
    # A dict mapping id to DebuggedGraph objects.
    self._graph_by_id = dict()
    self._graph_op_digests = []
    # TODO(cais): Implement pagination for memory constraints.
    self._graph_execution_trace_digests = []

    self._monitors = []

  def _add_monitor(self, monitor):
    self._monitors.append(monitor)

  def _load_source_files(self):
    """Incrementally read the .source_files DebugEvent file."""
    source_files_iter = self._reader.source_files_iterator()
    for debug_event, offset in source_files_iter:
      source_file = debug_event.source_file
      self._host_name_file_path_to_offset[
          (source_file.host_name, source_file.file_path)] = offset

  def _load_stack_frames(self):
    """Incrementally read the .stack_frames file.

    This must be called after _load_source_files().
    It assumes that the following contract is honored by the writer of the
    tfdbg v2 data file set:
    - Before a stack frame is written to the .stack_frames file, the
      corresponding source file information must have been written to the
      .source_files file first.
    """
    stack_frames_iter = self._reader.stack_frames_iterator()
    for debug_event, _ in stack_frames_iter:
      stack_frame_with_id = debug_event.stack_frame_with_id
      file_line_col = stack_frame_with_id.file_line_col
      self._unprocessed_stack_frames[stack_frame_with_id.id] = file_line_col
    # We do the processing in a separate stage, because the reading of the
    # .stack_frames file may sometimes get ahead of the .source_files file.
    unprocessed_stack_frame_ids = tuple(self._unprocessed_stack_frames.keys())
    for stack_frame_id in unprocessed_stack_frame_ids:
      file_line_col = self._unprocessed_stack_frames[stack_frame_id]
      if len(self._host_name_file_path_to_offset) > file_line_col.file_index:
        host_name, file_path = list(self._host_name_file_path_to_offset.keys())[
            file_line_col.file_index]
        self._stack_frame_by_id[stack_frame_id] = (
            host_name, file_path, file_line_col.line, file_line_col.func)
        del self._unprocessed_stack_frames[stack_frame_id]

  def _load_graphs(self):
    """Incrementally read the .graphs file.

    Compiles the DebuggedGraph and GraphOpCreation data.
    """
    graphs_iter = self._reader.graphs_iterator()
    for debug_event, offset in graphs_iter:
      if debug_event.graph_op_creation.ByteSize():
        op_creation_proto = debug_event.graph_op_creation
        op_digest = GraphOpCreationDigest(
            debug_event.wall_time,
            offset,
            op_creation_proto.graph_id,
            op_creation_proto.op_type,
            op_creation_proto.op_name,
            tuple(op_creation_proto.output_tensor_ids),
            op_creation_proto.code_location.host_name,
            tuple(op_creation_proto.code_location.stack_frame_ids),
            input_names=tuple(op_creation_proto.input_names))
        self._graph_op_digests.append(op_digest)
        debugged_graph = self._graph_by_id[op_creation_proto.graph_id]
        debugged_graph.add_op(op_digest)
        for dst_slot, input_name in enumerate(op_creation_proto.input_names):
          src_op_name, src_slot = input_name.split(":")
          debugged_graph.add_op_consumer(src_op_name, int(src_slot),
                                         op_creation_proto.op_name, dst_slot)
      elif debug_event.debugged_graph.ByteSize():
        graph_proto = debug_event.debugged_graph
        graph = DebuggedGraph(
            graph_proto.graph_name or None,
            graph_proto.graph_id,
            outer_graph_id=graph_proto.outer_context_id or None)
        self._graph_by_id[graph_proto.graph_id] = graph
        if graph_proto.outer_context_id:
          self._graph_by_id[
              graph_proto.outer_context_id].add_inner_graph_id(graph.graph_id)
      elif debug_event.debugged_device.ByteSize():
        device_proto = debug_event.debugged_device
        self._device_by_id[device_proto.device_id] = DebuggedDevice(
            device_proto.device_name, device_proto.device_id)

  def _load_graph_execution_traces(self):
    """Incrementally load the .graph_execution_traces files."""
    for i, traces_iter in enumerate(
        self._reader.graph_execution_traces_iterators()):
      for debug_event, offset in traces_iter:
        self._graph_execution_trace_digests.append(
            self._graph_execution_trace_digest_from_debug_event_proto(
                debug_event, (i, offset)))
        if self._monitors:
          graph_execution_trace = (
              self._graph_execution_trace_from_debug_event_proto(
                  debug_event, (i, offset)))
          for monitor in self._monitors:
            monitor.on_graph_execution_trace(
                len(self._graph_execution_trace_digests) - 1,
                graph_execution_trace)

  def _graph_execution_trace_digest_from_debug_event_proto(
      self, debug_event, locator):
    trace_proto = debug_event.graph_execution_trace
    op_name = trace_proto.op_name
    op_type = self._lookup_op_type(trace_proto.tfdbg_context_id, op_name)
    return GraphExecutionTraceDigest(
        debug_event.wall_time, locator, op_type, op_name,
        trace_proto.output_slot,
        debug_event.graph_execution_trace.tfdbg_context_id)

  def _graph_execution_trace_from_debug_event_proto(self, debug_event,
                                                    locator):
    """Convert a DebugEvent proto into a GraphExecutionTrace data object."""
    trace_proto = debug_event.graph_execution_trace
    graph_ids = [trace_proto.tfdbg_context_id]
    # Walk up the chain of outer contexts (graphs), so as to include all of
    # their IDs.
    while True:
      graph = self.graph_by_id(graph_ids[0])
      if graph.outer_graph_id:
        graph_ids.insert(0, graph.outer_graph_id)
      else:
        break

    if (trace_proto.tensor_debug_mode ==
        debug_event_pb2.TensorDebugMode.FULL_TENSOR):
      debug_tensor_value = None
    else:
      debug_tensor_value = _parse_tensor_value(
          trace_proto.tensor_proto, return_list=True)
    return GraphExecutionTrace(
        self._graph_execution_trace_digest_from_debug_event_proto(
            debug_event, locator),
        graph_ids=graph_ids,
        tensor_debug_mode=trace_proto.tensor_debug_mode,
        debug_tensor_value=debug_tensor_value,
        device_name=trace_proto.device_name or None)

  def _lookup_op_type(self, graph_id, op_name):
    """Look up the type of an op by its name and immediately enclosing graph.

    Args:
      graph_id: Debugger-generated ID of the immediately-enclosing graph.
      op_name: Name of the op.

    Returns:
      Op type as a str.
    """
    return self._graph_by_id[graph_id].get_op_creation_digest(op_name).op_type

  def _load_execution(self):
    """Incrementally read the .execution file."""
    execution_iter = self._reader.execution_iterator()
    for debug_event, offset in execution_iter:
      self._execution_digests.append(
          _execution_digest_from_debug_event_proto(debug_event, offset))
      if self._monitors:
        execution = _execution_from_debug_event_proto(debug_event, offset)
        for monitor in self._monitors:
          monitor.on_execution(len(self._execution_digests) - 1, execution)

  def update(self):
    """Perform an incremental read of the file set."""
    self._load_source_files()
    self._load_stack_frames()
    self._load_graphs()
    self._load_graph_execution_traces()
    self._load_execution()
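
  # A minimal end-to-end usage sketch of this class (the dump-root path is
  # hypothetical):
  #
  #   with DebugDataReader("/tmp/tfdbg2_logdir") as reader:
  #     reader.update()
  #     for digest in reader.executions(digest=True):
  #       print(digest.op_type, digest.wall_time)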

  def source_file_list(self):
    """Get a list of source files known to the debugger data reader.

    Returns:
      A tuple of `(host_name, file_path)` tuples.
    """
    return tuple(self._host_name_file_path_to_offset.keys())

  def source_lines(self, host_name, file_path):
    """Read the line-by-line content of a source file.

    Args:
      host_name: Host name on which the source file is located.
      file_path: File path at which the source file is located.

    Returns:
      Lines of the source file as a `list` of `str`s.
    """
    offset = self._host_name_file_path_to_offset[(host_name, file_path)]
    return list(self._reader.read_source_files_event(offset).source_file.lines)

  def starting_wall_time(self):
    """Wall timestamp for when the debugged TensorFlow program started.

    Returns:
      Starting wall time as seconds since the epoch, as a `float`.
    """
    return self._reader.starting_wall_time()

  def tensorflow_version(self):
    """TensorFlow version used in the debugged TensorFlow program.

    Note: this is not necessarily the same as the version of TensorFlow used
    to load the DebugEvent file set.

    Returns:
      TensorFlow version used by the debugged program, as a `str`.
    """
    return self._reader.tensorflow_version()

  def tfdbg_run_id(self):
    """Get the debugger run ID of the debugged TensorFlow program."""
    return self._reader.tfdbg_run_id()

  def outermost_graphs(self):
    """Get a list of the outermost graphs read so far."""
    return [graph for graph in self._graph_by_id.values()
            if not graph.outer_graph_id]

  def graph_by_id(self, graph_id):
    """Get a DebuggedGraph object by its ID."""
    return self._graph_by_id[graph_id]

  def device_name_by_id(self, device_id):
    """Get the name of a device by the debugger-generated ID of the device."""
    return self._device_by_id[device_id].device_name

  def device_name_map(self):
    """Get a map from device IDs to device names."""
    return {device_id: self._device_by_id[device_id].device_name
            for device_id in self._device_by_id}

  def graph_op_digests(self, op_type=None):
    """Get the list of the digests for graph-op creation so far.

    Args:
      op_type: Optional op type to filter the creation events by.

    Returns:
      A list of `GraphOpCreationDigest` objects.
    """
    if op_type is not None:
      return [digest for digest in self._graph_op_digests
              if digest.op_type == op_type]
    else:
      return self._graph_op_digests

  def graph_execution_traces(self, digest=False, begin=None, end=None):
    """Get all the intra-graph execution tensor traces read so far.

    Args:
      digest: Whether the results will be returned in the more light-weight
        digest form.
      begin: Optional beginning index for the requested traces or their
        digests. Python-style negative indices are supported.
      end: Optional ending index for the requested traces or their digests.
        Python-style negative indices are supported.

    Returns:
      If `digest`: a `list` of `GraphExecutionTraceDigest` objects.
      Else: a `list` of `GraphExecutionTrace` objects.
    """
    digests = self._graph_execution_trace_digests
    if begin is not None or end is not None:
      begin = begin or 0
      end = end or len(digests)
      digests = digests[begin:end]
    if digest:
      return digests
    else:
      return [self.read_graph_execution_trace(digest) for digest in digests]

  def num_graph_execution_traces(self):
    """Get the number of graph execution traces read so far."""
    return len(self._graph_execution_trace_digests)

  def executions(self, digest=False, begin=None, end=None):
    """Get `Execution`s or `ExecutionDigest`s this reader has read so far.

    Args:
      digest: Whether the results are returned in a digest form, i.e.,
        `ExecutionDigest` format, instead of the more detailed `Execution`
        format.
      begin: Optional beginning index for the requested execution data objects
        or their digests. Python-style negative indices are supported.
      end: Optional ending index for the requested execution data objects or
        their digests. Python-style negative indices are supported.

    Returns:
      If `digest`: a `list` of `ExecutionDigest` objects.
      Else: a `list` of `Execution` objects.
    """
    digests = self._execution_digests
    if begin is not None or end is not None:
      begin = begin or 0
      end = end or len(digests)
      digests = digests[begin:end]
    if digest:
      return digests
    else:
      # TODO(cais): Optimize performance by removing repeated file open/close.
      return [self.read_execution(digest) for digest in digests]
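
  # For illustration: executions(digest=True, begin=-10) returns the digests
  # of the 10 most recent execution events read so far (assuming at least 10
  # have been read), mirroring Python's negative-index slicing.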

  def num_executions(self):
    """Get the number of execution events read so far."""
    return len(self._execution_digests)

  def read_execution(self, execution_digest):
    """Read a detailed Execution object."""
    debug_event = self._reader.read_execution_event(execution_digest.locator)
    return _execution_from_debug_event_proto(debug_event,
                                             execution_digest.locator)

  def read_graph_execution_trace(self, graph_execution_trace_digest):
    """Read the detailed graph execution trace.

    Args:
      graph_execution_trace_digest: A `GraphExecutionTraceDigest` object.

    Returns:
      The corresponding `GraphExecutionTrace` object.
    """
    debug_event = self._reader.read_graph_execution_traces_event(
        graph_execution_trace_digest.locator)
    return self._graph_execution_trace_from_debug_event_proto(
        debug_event, graph_execution_trace_digest.locator)

  def read_execution_stack_trace(self, execution):
    """Read the stack trace of a given Execution object.

    Args:
      execution: The Execution object of interest.

    Returns:
      A tuple consisting of:
      1. The host name.
      2. The stack trace, as a list of (file_path, lineno, func) tuples.
    """
    host_name = self._stack_frame_by_id[execution.stack_frame_ids[0]][0]
    return (host_name, [
        self._stack_frame_by_id[frame_id][1:]
        for frame_id in execution.stack_frame_ids])

  def read_graph_op_creation_stack_trace(self, graph_op_creation_digest):
    """Read the stack trace of a given graph op creation object.

    Args:
      graph_op_creation_digest: The GraphOpCreationDigest object of interest.

    Returns:
      A tuple consisting of:
      1. The host name.
      2. The stack trace, as a list of (file_path, lineno, func) tuples.
    """
    return graph_op_creation_digest.host_name, [
        self._stack_frame_by_id[frame_id][1:]
        for frame_id in graph_op_creation_digest.stack_frame_ids
    ]

  # TODO(cais): Add graph_execution_digests() with an ExecutionDigest
  # as a kwarg, to establish the association between top-level and intra-graph
  # execution events.

  def execution_to_tensor_values(self, execution):
    """Read the full tensor values from an Execution or ExecutionDigest.

    Args:
      execution: An `Execution` or `ExecutionDigest` object.

    Returns:
      A list of numpy arrays representing the output tensor values of the
      execution event.
    """
    debug_event = self._reader.read_execution_event(execution.locator)
    return [_parse_tensor_value(tensor_proto)
            for tensor_proto in debug_event.execution.tensor_protos]

  def graph_execution_trace_to_tensor_value(self, trace):
    """Read the full tensor value from a graph execution trace.

    Args:
      trace: A `GraphExecutionTrace` or `GraphExecutionTraceDigest` object.

    Returns:
      A numpy array representing the output tensor value of the intra-graph
      tensor execution event.
    """
    debug_event = self._reader.read_graph_execution_traces_event(trace.locator)
    return _parse_tensor_value(debug_event.graph_execution_trace.tensor_proto)
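
  # For illustration, read_execution_stack_trace() returns values of the
  # form (all values hypothetical):
  #
  #   ("worker0", [("/path/to/model.py", 42, "train_step"), ...])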

  def symbolic_tensor_id(self, graph_id, op_name, output_slot):
    """Get the ID of a symbolic tensor.

    Args:
      graph_id: The ID of the immediately-enclosing graph.
      op_name: Name of the op.
      output_slot: Output slot as an int.

    Returns:
      The ID of the symbolic tensor as an int.
    """
    return self._graph_by_id[graph_id].get_tensor_id(op_name, output_slot)

  def graph_execution_trace_to_tensor_id(self, trace):
    """Get the symbolic tensor ID from a GraphExecutionTraceDigest object."""
    return self.symbolic_tensor_id(
        trace.graph_id, trace.op_name, trace.output_slot)

  def __enter__(self):
    return self

  def __exit__(self, exception_type, exception_value, traceback):
    del exception_type, exception_value, traceback  # Unused
    self._reader.close()