# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for the debug events writer Python class."""

import glob
import json as json_lib
import os
import re
import threading
import time

from absl.testing import parameterized

from tensorflow.core.protobuf import debug_event_pb2
from tensorflow.python.debug.lib import debug_events_reader
from tensorflow.python.debug.lib import debug_events_writer
from tensorflow.python.debug.lib import dumping_callback_test_lib
from tensorflow.python.framework import ops
from tensorflow.python.framework import test_util
from tensorflow.python.framework import versions
from tensorflow.python.platform import googletest


class DebugEventsWriterTest(dumping_callback_test_lib.DumpingCallbackTestBase,
                            parameterized.TestCase):
  """Round-trip tests: write debug events, then read them back."""

  def testMultiThreadedConstructorCallWorks(self):
    """Concurrent constructor calls produce exactly one file of each type."""
    def init_writer():
      debug_events_writer.DebugEventsWriter(self.dump_root, self.tfdbg_run_id)

    num_threads = 4
    threads = []
    for _ in range(num_threads):
      thread = threading.Thread(target=init_writer)
      thread.start()
      threads.append(thread)
    for thread in threads:
      thread.join()

    # Verify that there is only one debug event file of each type.
    metadata_paths = glob.glob(os.path.join(self.dump_root, "*.metadata"))
    self.assertLen(metadata_paths, 1)
    source_files_paths = glob.glob(
        os.path.join(self.dump_root, "*.source_files"))
    self.assertLen(source_files_paths, 1)
    stack_frames_paths = glob.glob(
        os.path.join(self.dump_root, "*.stack_frames"))
    self.assertLen(stack_frames_paths, 1)
    graphs_paths = glob.glob(os.path.join(self.dump_root, "*.graphs"))
    self.assertLen(graphs_paths, 1)
    self._readAndCheckMetadataFile()

  def testWriteSourceFilesAndStackFrames(self):
    """SourceFile and StackFrameWithId protos round-trip in write order."""
    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
                                                   self.tfdbg_run_id)
    num_protos = 10
    for i in range(num_protos):
      source_file = debug_event_pb2.SourceFile()
      source_file.file_path = "/home/tf2user/main.py"
      source_file.host_name = "machine.cluster"
      source_file.lines.append("print(%d)" % i)
      writer.WriteSourceFile(source_file)

      stack_frame = debug_event_pb2.StackFrameWithId()
      stack_frame.id = "stack_%d" % i
      stack_frame.file_line_col.file_index = i * 10
      writer.WriteStackFrameWithId(stack_frame)

    writer.FlushNonExecutionFiles()

    with debug_events_reader.DebugEventsReader(self.dump_root) as reader:
      actuals = [item.debug_event.source_file
                 for item in reader.source_files_iterator()]
      self.assertLen(actuals, num_protos)
      for i in range(num_protos):
        self.assertEqual(actuals[i].file_path, "/home/tf2user/main.py")
        self.assertEqual(actuals[i].host_name, "machine.cluster")
        self.assertEqual(actuals[i].lines, ["print(%d)" % i])

      actuals = [item.debug_event.stack_frame_with_id
                 for item in reader.stack_frames_iterator()]
      self.assertLen(actuals, num_protos)
      for i in range(num_protos):
        self.assertEqual(actuals[i].id, "stack_%d" % i)
        self.assertEqual(actuals[i].file_line_col.file_index, i * 10)

  def testWriteGraphOpCreationAndDebuggedGraphs(self):
    """GraphOpCreation and DebuggedGraph protos share the .graphs file."""
    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
                                                   self.tfdbg_run_id)
    num_op_creations = 10
    for i in range(num_op_creations):
      graph_op_creation = debug_event_pb2.GraphOpCreation()
      graph_op_creation.op_type = "Conv2D"
      graph_op_creation.op_name = "Conv2D_%d" % i
      writer.WriteGraphOpCreation(graph_op_creation)
    debugged_graph = debug_event_pb2.DebuggedGraph()
    debugged_graph.graph_id = "deadbeaf"
    debugged_graph.graph_name = "MyGraph1"
    writer.WriteDebuggedGraph(debugged_graph)
    writer.FlushNonExecutionFiles()

    # Use the reader as a context manager so its file handles are released.
    with debug_events_reader.DebugEventsReader(self.dump_root) as reader:
      actuals = [item.debug_event for item in reader.graphs_iterator()]
      self.assertLen(actuals, num_op_creations + 1)
      for i in range(num_op_creations):
        self.assertEqual(actuals[i].graph_op_creation.op_type, "Conv2D")
        self.assertEqual(actuals[i].graph_op_creation.op_name,
                         "Conv2D_%d" % i)
      self.assertEqual(actuals[num_op_creations].debugged_graph.graph_id,
                       "deadbeaf")
      self.assertEqual(actuals[num_op_creations].debugged_graph.graph_name,
                       "MyGraph1")

  def testConcurrentWritesToNonExecutionFilesWorks(self):
    """Concurrent writes and flushes to non-execution files lose no data."""
    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
                                                   self.tfdbg_run_id)

    source_file_state = {"counter": 0, "lock": threading.Lock()}

    def write_source_file():
      source_file = debug_event_pb2.SourceFile()
      with source_file_state["lock"]:
        source_file.file_path = "/home/tf2user/file_%d.py" % source_file_state[
            "counter"]
        source_file_state["counter"] += 1
      writer.WriteSourceFile(source_file)
      # More-frequent-than-necessary concurrent flushing is not recommended,
      # but tolerated.
      writer.FlushNonExecutionFiles()

    stack_frame_state = {"counter": 0, "lock": threading.Lock()}

    def write_stack_frame():
      stack_frame = debug_event_pb2.StackFrameWithId()
      with stack_frame_state["lock"]:
        stack_frame.id = "stack_frame_%d" % stack_frame_state["counter"]
        stack_frame_state["counter"] += 1
      writer.WriteStackFrameWithId(stack_frame)
      # More-frequent-than-necessary concurrent flushing is not recommended,
      # but tolerated.
      writer.FlushNonExecutionFiles()

    graph_op_state = {"counter": 0, "lock": threading.Lock()}

    def write_graph_op_creation():
      graph_op_creation = debug_event_pb2.GraphOpCreation()
      with graph_op_state["lock"]:
        graph_op_creation.op_name = "Op%d" % graph_op_state["counter"]
        graph_op_state["counter"] += 1
      writer.WriteGraphOpCreation(graph_op_creation)
      # More-frequent-than-necessary concurrent flushing is not recommended,
      # but tolerated.
      writer.FlushNonExecutionFiles()

    num_threads = 9
    threads = []
    for i in range(num_threads):
      if i % 3 == 0:
        target = write_source_file
      elif i % 3 == 1:
        target = write_stack_frame
      else:
        target = write_graph_op_creation
      thread = threading.Thread(target=target)
      thread.start()
      threads.append(thread)
    for thread in threads:
      thread.join()

    # Verify the content of the .source_files file.
    with debug_events_reader.DebugEventsReader(self.dump_root) as reader:
      source_files_iter = reader.source_files_iterator()
      actuals = [item.debug_event.source_file for item in source_files_iter]
      file_paths = sorted(actual.file_path for actual in actuals)
      self.assertEqual(file_paths, [
          "/home/tf2user/file_0.py", "/home/tf2user/file_1.py",
          "/home/tf2user/file_2.py"
      ])

      # Verify the content of the .stack_frames file.
      actuals = [item.debug_event.stack_frame_with_id
                 for item in reader.stack_frames_iterator()]
      stack_frame_ids = sorted(actual.id for actual in actuals)
      self.assertEqual(stack_frame_ids,
                       ["stack_frame_0", "stack_frame_1", "stack_frame_2"])

      # Verify the content of the .graphs file.
      actuals = [item.debug_event.graph_op_creation
                 for item in reader.graphs_iterator()]
      graph_op_names = sorted(actual.op_name for actual in actuals)
      self.assertEqual(graph_op_names, ["Op0", "Op1", "Op2"])

  def testWriteAndReadMetadata(self):
    """Metadata records the start wall time, TF version and a run ID."""
    t0 = time.time()
    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
                                                   self.tfdbg_run_id)
    writer.Close()
    with debug_events_reader.DebugDataReader(self.dump_root) as reader:
      self.assertIsInstance(reader.starting_wall_time(), float)
      self.assertGreaterEqual(reader.starting_wall_time(), t0)
      self.assertEqual(reader.tensorflow_version(), versions.__version__)
      self.assertTrue(reader.tfdbg_run_id())

  def testWriteExecutionEventsWithCircularBuffer(self):
    """Only the newest DEFAULT_CIRCULAR_BUFFER_SIZE executions are flushed."""
    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
                                                   self.tfdbg_run_id)
    num_execution_events = debug_events_writer.DEFAULT_CIRCULAR_BUFFER_SIZE * 2
    for i in range(num_execution_events):
      execution = debug_event_pb2.Execution()
      execution.op_type = "OpType%d" % i
      writer.WriteExecution(execution)

    with debug_events_reader.DebugDataReader(self.dump_root) as reader:
      # Before FlushExecutionFiles() is called. No data should have been written
      # to the file.
      reader.update()
      self.assertFalse(reader.executions())

      writer.FlushExecutionFiles()
      reader.update()
      executions = reader.executions()
      # Without this length check the loop below would pass vacuously on an
      # empty result.
      self.assertLen(executions,
                     debug_events_writer.DEFAULT_CIRCULAR_BUFFER_SIZE)
      for i, execution in enumerate(executions):
        self.assertEqual(
            execution.op_type,
            "OpType%d" % (i + debug_events_writer.DEFAULT_CIRCULAR_BUFFER_SIZE))

  def testWriteExecutionEventsWithoutCircularBufferBehavior(self):
    """A circular buffer size of 0 keeps every Execution event."""
    # A circular buffer size of 0 abolishes the circular buffer behavior.
    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
                                                   self.tfdbg_run_id, 0)
    num_execution_events = debug_events_writer.DEFAULT_CIRCULAR_BUFFER_SIZE * 2
    for i in range(num_execution_events):
      execution = debug_event_pb2.Execution()
      execution.op_type = "OpType%d" % i
      writer.WriteExecution(execution)
    writer.FlushExecutionFiles()

    with debug_events_reader.DebugDataReader(self.dump_root) as reader:
      reader.update()
      executions = reader.executions()
      self.assertLen(executions, num_execution_events)
      for i, execution in enumerate(executions):
        self.assertEqual(execution.op_type, "OpType%d" % i)

  def testWriteGraphExecutionTraceEventsWithCircularBuffer(self):
    """Only the newest DEFAULT_CIRCULAR_BUFFER_SIZE traces are flushed."""
    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
                                                   self.tfdbg_run_id)
    num_execution_events = debug_events_writer.DEFAULT_CIRCULAR_BUFFER_SIZE * 2
    for i in range(num_execution_events):
      trace = debug_event_pb2.GraphExecutionTrace()
      trace.op_name = "Op%d" % i
      writer.WriteGraphExecutionTrace(trace)

    with debug_events_reader.DebugEventsReader(self.dump_root) as reader:
      actuals = list(reader.graph_execution_traces_iterators()[0])
      # Before FlushExecutionFiles() is called. No data should have been written
      # to the file.
      self.assertEmpty(actuals)

      writer.FlushExecutionFiles()
      actuals = [item.debug_event.graph_execution_trace
                 for item in reader.graph_execution_traces_iterators()[0]]
      self.assertLen(actuals, debug_events_writer.DEFAULT_CIRCULAR_BUFFER_SIZE)
      for i in range(debug_events_writer.DEFAULT_CIRCULAR_BUFFER_SIZE):
        self.assertEqual(
            actuals[i].op_name,
            "Op%d" % (i + debug_events_writer.DEFAULT_CIRCULAR_BUFFER_SIZE))

  def testWriteGraphExecutionTraceEventsWithoutCircularBufferBehavior(self):
    """A circular buffer size of 0 keeps every GraphExecutionTrace event."""
    # A circular buffer size of 0 abolishes the circular buffer behavior.
    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
                                                   self.tfdbg_run_id, 0)
    num_execution_events = debug_events_writer.DEFAULT_CIRCULAR_BUFFER_SIZE * 2
    for i in range(num_execution_events):
      trace = debug_event_pb2.GraphExecutionTrace()
      trace.op_name = "Op%d" % i
      writer.WriteGraphExecutionTrace(trace)
    writer.FlushExecutionFiles()

    with debug_events_reader.DebugEventsReader(self.dump_root) as reader:
      actuals = [item.debug_event.graph_execution_trace
                 for item in reader.graph_execution_traces_iterators()[0]]
    self.assertLen(actuals, num_execution_events)
    for i in range(num_execution_events):
      self.assertEqual(actuals[i].op_name, "Op%d" % i)

  def testConcurrentWritesToExecutionFiles(self):
    """Concurrent execution writes respect the circular buffer size."""
    circular_buffer_size = 5
    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
                                                   self.tfdbg_run_id,
                                                   circular_buffer_size)
    debugged_graph = debug_event_pb2.DebuggedGraph(graph_id="graph1",
                                                   graph_name="graph1")
    writer.WriteDebuggedGraph(debugged_graph)

    execution_state = {"counter": 0, "lock": threading.Lock()}

    def write_execution():
      execution = debug_event_pb2.Execution()
      with execution_state["lock"]:
        execution.op_type = "OpType%d" % execution_state["counter"]
        execution_state["counter"] += 1
      writer.WriteExecution(execution)

    graph_execution_trace_state = {"counter": 0, "lock": threading.Lock()}

    def write_graph_execution_trace():
      with graph_execution_trace_state["lock"]:
        op_name = "Op%d" % graph_execution_trace_state["counter"]
        graph_op_creation = debug_event_pb2.GraphOpCreation(
            op_type="FooOp", op_name=op_name, graph_id="graph1")
        trace = debug_event_pb2.GraphExecutionTrace(
            op_name=op_name, tfdbg_context_id="graph1")
        graph_execution_trace_state["counter"] += 1
      writer.WriteGraphOpCreation(graph_op_creation)
      writer.WriteGraphExecutionTrace(trace)

    threads = []
    for i in range(circular_buffer_size * 4):
      if i % 2 == 0:
        target = write_execution
      else:
        target = write_graph_execution_trace
      thread = threading.Thread(target=target)
      thread.start()
      threads.append(thread)
    for thread in threads:
      thread.join()
    writer.FlushNonExecutionFiles()
    writer.FlushExecutionFiles()

    with debug_events_reader.DebugDataReader(self.dump_root) as reader:
      reader.update()
      # Verify the content of the .execution file.
      executions = reader.executions()
      executed_op_types = [execution.op_type for execution in executions]
      self.assertLen(executed_op_types, circular_buffer_size)
      self.assertLen(executed_op_types, len(set(executed_op_types)))

      # Verify the content of the .graph_execution_traces file.
      op_names = [trace.op_name for trace in reader.graph_execution_traces()]
      self.assertLen(op_names, circular_buffer_size)
      self.assertLen(op_names, len(set(op_names)))

  def testConcurrentSourceFileRandomReads(self):
    """Two threads can read source lines from the same reader concurrently."""
    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
                                                   self.tfdbg_run_id)

    for i in range(100):
      source_file = debug_event_pb2.SourceFile(
          host_name="localhost", file_path="/tmp/file_%d.py" % i)
      source_file.lines.append("# File %d" % i)
      writer.WriteSourceFile(source_file)
    writer.FlushNonExecutionFiles()

    # Use the reader as a context manager so its file handles are released.
    with debug_events_reader.DebugDataReader(self.dump_root) as reader:
      reader.update()
      lines = [None] * 100

      def read_job_1():
        # Read in the reverse order to enhance randomness of the read access.
        for i in range(49, -1, -1):
          lines[i] = reader.source_lines("localhost", "/tmp/file_%d.py" % i)

      def read_job_2():
        for i in range(99, 49, -1):
          lines[i] = reader.source_lines("localhost", "/tmp/file_%d.py" % i)

      thread_1 = threading.Thread(target=read_job_1)
      thread_2 = threading.Thread(target=read_job_2)
      thread_1.start()
      thread_2.start()
      thread_1.join()
      thread_2.join()
    for i in range(100):
      self.assertEqual(lines[i], ["# File %d" % i])

  def testConcurrentExecutionUpdateAndRandomRead(self):
    """Execution reads are possible while another thread updates the reader."""
    circular_buffer_size = -1
    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
                                                   self.tfdbg_run_id,
                                                   circular_buffer_size)

    writer_state = {"counter": 0, "done": False}

    with debug_events_reader.DebugDataReader(self.dump_root) as reader:

      def write_and_update_job():
        while not writer_state["done"]:
          execution = debug_event_pb2.Execution()
          execution.op_type = "OpType%d" % writer_state["counter"]
          writer_state["counter"] += 1
          writer.WriteExecution(execution)
          writer.FlushExecutionFiles()
          reader.update()

      # On the sub-thread, keep writing and reading new Execution protos.
      write_and_update_thread = threading.Thread(target=write_and_update_job)
      write_and_update_thread.start()
      try:
        # On the main thread, do concurrent random read.
        while True:
          exec_digests = reader.executions(digest=True)
          if exec_digests:
            exec_0 = reader.read_execution(exec_digests[0])
            self.assertEqual(exec_0.op_type, "OpType0")
            break
          time.sleep(0.1)
      finally:
        # Stop the writer thread even if the assertion above fails. Otherwise
        # the non-daemon thread would loop forever and hang the test process.
        writer_state["done"] = True
        write_and_update_thread.join()

  def testConcurrentExecutionRandomReads(self):
    """Two threads can read Execution protos from one reader concurrently."""
    circular_buffer_size = -1
    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
                                                   self.tfdbg_run_id,
                                                   circular_buffer_size)

    for i in range(100):
      execution = debug_event_pb2.Execution()
      execution.op_type = "OpType%d" % i
      writer.WriteExecution(execution)
    writer.FlushNonExecutionFiles()
    writer.FlushExecutionFiles()

    # Use the reader as a context manager so its file handles are released.
    with debug_events_reader.DebugDataReader(self.dump_root) as reader:
      reader.update()
      executions = [None] * 100

      def read_job_1():
        execution_digests = reader.executions(digest=True)
        # Read in the reverse order to enhance randomness of the read access.
        for i in range(49, -1, -1):
          execution = reader.read_execution(execution_digests[i])
          executions[i] = execution

      def read_job_2():
        execution_digests = reader.executions(digest=True)
        for i in range(99, 49, -1):
          execution = reader.read_execution(execution_digests[i])
          executions[i] = execution

      thread_1 = threading.Thread(target=read_job_1)
      thread_2 = threading.Thread(target=read_job_2)
      thread_1.start()
      thread_2.start()
      thread_1.join()
      thread_2.join()
    for i in range(100):
      self.assertEqual(executions[i].op_type, "OpType%d" % i)

  def testConcurrentGraphExecutionTraceUpdateAndRandomRead(self):
    """Trace reads are possible while another thread updates the reader."""
    circular_buffer_size = -1
    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
                                                   self.tfdbg_run_id,
                                                   circular_buffer_size)
    debugged_graph = debug_event_pb2.DebuggedGraph(graph_id="graph1",
                                                   graph_name="graph1")
    writer.WriteDebuggedGraph(debugged_graph)

    writer_state = {"counter": 0, "done": False}

    with debug_events_reader.DebugDataReader(self.dump_root) as reader:

      def write_and_update_job():
        while not writer_state["done"]:
          op_name = "Op%d" % writer_state["counter"]
          graph_op_creation = debug_event_pb2.GraphOpCreation(
              op_type="FooOp", op_name=op_name, graph_id="graph1")
          writer.WriteGraphOpCreation(graph_op_creation)
          trace = debug_event_pb2.GraphExecutionTrace(
              op_name=op_name, tfdbg_context_id="graph1")
          writer.WriteGraphExecutionTrace(trace)
          writer_state["counter"] += 1
          writer.FlushNonExecutionFiles()
          writer.FlushExecutionFiles()
          reader.update()

      # On the sub-thread, keep writing and reading new GraphExecutionTraces.
      write_and_update_thread = threading.Thread(target=write_and_update_job)
      write_and_update_thread.start()
      try:
        # On the main thread, do concurrent random read.
        while True:
          digests = reader.graph_execution_traces(digest=True)
          if digests:
            trace_0 = reader.read_graph_execution_trace(digests[0])
            self.assertEqual(trace_0.op_name, "Op0")
            break
          time.sleep(0.1)
      finally:
        # Stop the writer thread even if the assertion above fails. Otherwise
        # the non-daemon thread would loop forever and hang the test process.
        writer_state["done"] = True
        write_and_update_thread.join()

  def testConcurrentGraphExecutionTraceRandomReads(self):
    """Two threads can read GraphExecutionTraces from one reader."""
    circular_buffer_size = -1
    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
                                                   self.tfdbg_run_id,
                                                   circular_buffer_size)
    debugged_graph = debug_event_pb2.DebuggedGraph(graph_id="graph1",
                                                   graph_name="graph1")
    writer.WriteDebuggedGraph(debugged_graph)

    for i in range(100):
      op_name = "Op%d" % i
      graph_op_creation = debug_event_pb2.GraphOpCreation(
          op_type="FooOp", op_name=op_name, graph_id="graph1")
      writer.WriteGraphOpCreation(graph_op_creation)
      trace = debug_event_pb2.GraphExecutionTrace(
          op_name=op_name, tfdbg_context_id="graph1")
      writer.WriteGraphExecutionTrace(trace)
    writer.FlushNonExecutionFiles()
    writer.FlushExecutionFiles()

    # Use the reader as a context manager so its file handles are released.
    with debug_events_reader.DebugDataReader(self.dump_root) as reader:
      reader.update()
      traces = [None] * 100

      def read_job_1():
        digests = reader.graph_execution_traces(digest=True)
        for i in range(49, -1, -1):
          traces[i] = reader.read_graph_execution_trace(digests[i])

      def read_job_2():
        digests = reader.graph_execution_traces(digest=True)
        for i in range(99, 49, -1):
          traces[i] = reader.read_graph_execution_trace(digests[i])

      thread_1 = threading.Thread(target=read_job_1)
      thread_2 = threading.Thread(target=read_job_2)
      thread_1.start()
      thread_2.start()
      thread_1.join()
      thread_2.join()
    for i in range(100):
      self.assertEqual(traces[i].op_name, "Op%d" % i)

  @parameterized.named_parameters(
      ("Begin1End3", 1, 3, 1, 3),
      ("Begin0End3", 0, 3, 0, 3),
      ("Begin0EndNeg1", 0, -1, 0, 4),
      ("BeginNoneEnd3", None, 3, 0, 3),
      ("Begin2EndNone", 2, None, 2, 5),
      ("BeginNoneEndNone", None, None, 0, 5),
  )
  def testRangeReadingExecutions(self, begin, end, expected_begin,
                                 expected_end):
    """executions(begin, end) returns the slice [expected_begin, expected_end)."""
    writer = debug_events_writer.DebugEventsWriter(
        self.dump_root, self.tfdbg_run_id, circular_buffer_size=-1)
    for i in range(5):
      execution = debug_event_pb2.Execution(op_type="OpType%d" % i)
      writer.WriteExecution(execution)
    writer.FlushExecutionFiles()
    writer.Close()

    with debug_events_reader.DebugDataReader(self.dump_root) as reader:
      reader.update()
      executions = reader.executions(begin=begin, end=end)
    self.assertLen(executions, expected_end - expected_begin)
    self.assertEqual(executions[0].op_type, "OpType%d" % expected_begin)
    self.assertEqual(executions[-1].op_type, "OpType%d" % (expected_end - 1))

  @parameterized.named_parameters(
      ("Begin1End3", 1, 3, 1, 3),
      ("Begin0End3", 0, 3, 0, 3),
      ("Begin0EndNeg1", 0, -1, 0, 4),
      ("BeginNoneEnd3", None, 3, 0, 3),
      ("Begin2EndNone", 2, None, 2, 5),
      ("BeginNoneEndNone", None, None, 0, 5),
  )
  def testRangeReadingGraphExecutionTraces(self, begin, end, expected_begin,
                                           expected_end):
    """graph_execution_traces(begin, end) returns the expected slice."""
    writer = debug_events_writer.DebugEventsWriter(
        self.dump_root, self.tfdbg_run_id, circular_buffer_size=-1)
    debugged_graph = debug_event_pb2.DebuggedGraph(
        graph_id="graph1", graph_name="graph1")
    writer.WriteDebuggedGraph(debugged_graph)
    for i in range(5):
      op_name = "Op_%d" % i
      graph_op_creation = debug_event_pb2.GraphOpCreation(
          op_name=op_name, graph_id="graph1")
      writer.WriteGraphOpCreation(graph_op_creation)
      trace = debug_event_pb2.GraphExecutionTrace(
          op_name=op_name, tfdbg_context_id="graph1")
      writer.WriteGraphExecutionTrace(trace)
    writer.FlushNonExecutionFiles()
    writer.FlushExecutionFiles()
    writer.Close()

    with debug_events_reader.DebugDataReader(self.dump_root) as reader:
      reader.update()
      traces = reader.graph_execution_traces(begin=begin, end=end)
    self.assertLen(traces, expected_end - expected_begin)
    self.assertEqual(traces[0].op_name, "Op_%d" % expected_begin)
    self.assertEqual(traces[-1].op_name, "Op_%d" % (expected_end - 1))


class MultiSetReaderTest(dumping_callback_test_lib.DumpingCallbackTestBase):
  """Test for DebugDataReader for multiple file sets under a dump root."""

  def testReadingTwoFileSetsWithTheSameDumpRootSucceeds(self):
    """Two file sets sharing one tfdbg_run_id are read as a single run."""
    # To simulate a multi-host data dump, we first generate file sets in two
    # different directories, with the same tfdbg_run_id, and then combine them.
    tfdbg_run_id = "foo"
    for i in range(2):
      writer = debug_events_writer.DebugEventsWriter(
          os.path.join(self.dump_root, str(i)),
          tfdbg_run_id,
          circular_buffer_size=-1)
      if i == 0:
        debugged_graph = debug_event_pb2.DebuggedGraph(
            graph_id="graph1", graph_name="graph1")
        writer.WriteDebuggedGraph(debugged_graph)
        op_name = "Op_0"
        graph_op_creation = debug_event_pb2.GraphOpCreation(
            op_type="FooOp", op_name=op_name, graph_id="graph1")
        writer.WriteGraphOpCreation(graph_op_creation)
        op_name = "Op_1"
        graph_op_creation = debug_event_pb2.GraphOpCreation(
            op_type="FooOp", op_name=op_name, graph_id="graph1")
        writer.WriteGraphOpCreation(graph_op_creation)
      for _ in range(10):
        trace = debug_event_pb2.GraphExecutionTrace(
            op_name="Op_%d" % i, tfdbg_context_id="graph1")
        writer.WriteGraphExecutionTrace(trace)
        writer.FlushNonExecutionFiles()
        writer.FlushExecutionFiles()

    # Move all files from the subdirectory /1 to subdirectory /0.
    dump_root_0 = os.path.join(self.dump_root, "0")
    src_paths = glob.glob(os.path.join(self.dump_root, "1", "*"))
    for src_path in src_paths:
      dst_path = os.path.join(
          dump_root_0,
          # Rename the file set to avoid file name collision.
          re.sub(r"(tfdbg_events\.\d+)", r"\g<1>1", os.path.basename(src_path)))
      os.rename(src_path, dst_path)

    with debug_events_reader.DebugDataReader(dump_root_0) as reader:
      reader.update()
      # Verify the content of the .graph_execution_traces file.
      trace_digests = reader.graph_execution_traces(digest=True)
      self.assertLen(trace_digests, 20)
      # Bind the loop index explicitly. A `for _ in ...` loop here would
      # silently reuse the stale `i` (== 1) left over from the loop above.
      for i in range(10):
        trace = reader.read_graph_execution_trace(trace_digests[i])
        self.assertEqual(trace.op_name, "Op_0")
      for i in range(10):
        trace = reader.read_graph_execution_trace(trace_digests[i + 10])
        self.assertEqual(trace.op_name, "Op_1")

  def testReadingTwoFileSetsWithTheDifferentRootsLeadsToError(self):
    """File sets with different tfdbg_run_ids in one dump root are rejected."""
    # To simulate a multi-host data dump, we first generate file sets in two
    # different directories, with different tfdbg_run_ids, and then combine
    # them.
    for i in range(2):
      writer = debug_events_writer.DebugEventsWriter(
          os.path.join(self.dump_root, str(i)),
          "run_id_%d" % i,
          circular_buffer_size=-1)
      writer.FlushNonExecutionFiles()
      writer.FlushExecutionFiles()

    # Move all files from the subdirectory /1 to subdirectory /0.
    dump_root_0 = os.path.join(self.dump_root, "0")
    src_paths = glob.glob(os.path.join(self.dump_root, "1", "*"))
    for src_path in src_paths:
      dst_path = os.path.join(
          dump_root_0,
          # Rename the file set to avoid file name collision.
          re.sub(r"(tfdbg_events\.\d+)", r"\g<1>1", os.path.basename(src_path)))
      os.rename(src_path, dst_path)

    with self.assertRaisesRegex(ValueError,
                                r"Found multiple \(2\) tfdbg2 runs"):
      debug_events_reader.DebugDataReader(dump_root_0)


class DataObjectsTest(test_util.TensorFlowTestCase, parameterized.TestCase):
  """Tests for the JSON serialization of debug_events_reader data objects."""

  def jsonRoundTripCheck(self, obj):
    """Asserts that `obj` survives a JSON dump/load round trip unchanged."""
    self.assertEqual(
        json_lib.dumps(json_lib.loads(json_lib.dumps(obj)), sort_keys=True),
        json_lib.dumps(obj, sort_keys=True))

  def testExecutionDigestWithNoOutputToJson(self):
    execution_digest = debug_events_reader.ExecutionDigest(
        1234, 5678, "FooOp", output_tensor_device_ids=None)
    json = execution_digest.to_json()
    self.jsonRoundTripCheck(json)
    self.assertEqual(json["wall_time"], 1234)
    self.assertEqual(json["op_type"], "FooOp")
    self.assertEqual(json["output_tensor_device_ids"], None)

  def testExecutionDigestWithTwoOutputsToJson(self):
    execution_digest = debug_events_reader.ExecutionDigest(
        1234, 5678, "FooOp", output_tensor_device_ids=[1357, 2468])
    json = execution_digest.to_json()
    self.jsonRoundTripCheck(json)
    self.assertEqual(json["wall_time"], 1234)
    self.assertEqual(json["op_type"], "FooOp")
    self.assertEqual(json["output_tensor_device_ids"], (1357, 2468))

  def testExecutionNoGraphNoInputToJson(self):
    execution_digest = debug_events_reader.ExecutionDigest(
        1234, 5678, "FooOp", output_tensor_device_ids=[1357])
    execution = debug_events_reader.Execution(
        execution_digest,
        "localhost",
        ("a1", "b2"),
        debug_event_pb2.TensorDebugMode.CURT_HEALTH,
        graph_id=None,
        input_tensor_ids=None,
        output_tensor_ids=[2468],
        debug_tensor_values=([1, 0],))
    json = execution.to_json()
    self.jsonRoundTripCheck(json)
    self.assertEqual(json["wall_time"], 1234)
    self.assertEqual(json["op_type"], "FooOp")
    self.assertEqual(json["output_tensor_device_ids"], (1357,))
    self.assertEqual(json["host_name"], "localhost")
    self.assertEqual(json["stack_frame_ids"], ("a1", "b2"))
    self.assertEqual(json["tensor_debug_mode"],
                     debug_event_pb2.TensorDebugMode.CURT_HEALTH)
    self.assertIsNone(json["graph_id"])
    self.assertIsNone(json["input_tensor_ids"])
    self.assertEqual(json["output_tensor_ids"], (2468,))
    self.assertEqual(json["debug_tensor_values"], ([1, 0],))

  def testExecutionNoGraphNoInputButWithOutputToJson(self):
    # NOTE(review): despite the name, this case sets a graph_id and inputs
    # and leaves outputs as None.
    execution_digest = debug_events_reader.ExecutionDigest(
        1234, 5678, "FooOp", output_tensor_device_ids=[1357])
    execution = debug_events_reader.Execution(
        execution_digest,
        "localhost",
        ("a1", "b2"),
        debug_event_pb2.TensorDebugMode.FULL_HEALTH,
        graph_id="abcd",
        input_tensor_ids=[13, 37],
        output_tensor_ids=None,
        debug_tensor_values=None)
    json = execution.to_json()
    self.jsonRoundTripCheck(json)
    self.assertEqual(json["wall_time"], 1234)
    self.assertEqual(json["op_type"], "FooOp")
    self.assertEqual(json["output_tensor_device_ids"], (1357,))
    self.assertEqual(json["host_name"], "localhost")
    self.assertEqual(json["stack_frame_ids"], ("a1", "b2"))
    self.assertEqual(json["tensor_debug_mode"],
                     debug_event_pb2.TensorDebugMode.FULL_HEALTH)
    self.assertEqual(json["graph_id"], "abcd")
    self.assertEqual(json["input_tensor_ids"], (13, 37))
    self.assertIsNone(json["output_tensor_ids"])
    self.assertIsNone(json["debug_tensor_values"])

  @parameterized.named_parameters(
      ("EmptyList", []),
      ("None", None),
  )
  def testExecutionWithNoOutputTensorsReturnsZeroForNumOutputs(
      self, output_tensor_ids):
    execution = debug_events_reader.Execution(
        debug_events_reader.ExecutionDigest(1234, 5678, "FooOp"),
        "localhost", ("a1", "b2"),
        debug_event_pb2.TensorDebugMode.FULL_HEALTH,
        graph_id="abcd",
        input_tensor_ids=[13, 37],
        output_tensor_ids=output_tensor_ids,
        debug_tensor_values=None)
    self.assertEqual(execution.num_outputs, 0)

  def testDebuggedDeviceToJons(self):
    debugged_device = debug_events_reader.DebuggedDevice("/TPU:3", 4)
    self.assertEqual(debugged_device.to_json(), {
        "device_name": "/TPU:3",
        "device_id": 4,
    })

  def testDebuggedGraphToJonsWitouthNameInnerOuterGraphIds(self):
    debugged_graph = debug_events_reader.DebuggedGraph(
        None,
        "b1c2",
        outer_graph_id=None,
    )
    self.assertEqual(
        debugged_graph.to_json(), {
            "name": None,
            "graph_id": "b1c2",
            "outer_graph_id": None,
            "inner_graph_ids": [],
        })

  def testDebuggedGraphToJonsWithNameAndInnerOuterGraphIds(self):
    debugged_graph = debug_events_reader.DebuggedGraph(
        "loss_function",
        "b1c2",
        outer_graph_id="a0b1",
    )
    debugged_graph.add_inner_graph_id("c2d3")
    debugged_graph.add_inner_graph_id("c2d3e4")
    self.assertEqual(
        debugged_graph.to_json(), {
            "name": "loss_function",
            "graph_id": "b1c2",
            "outer_graph_id": "a0b1",
            "inner_graph_ids": ["c2d3", "c2d3e4"],
        })

  @parameterized.named_parameters(
      ("EmptyList", []),
      ("None", None),
  )
  def testGraphOpDigestWithNoOutpusReturnsNumOutputsZero(
      self, output_tensor_ids):
    op_creation_digest = debug_events_reader.GraphOpCreationDigest(
        1234,
        5678,
        "deadbeef",
        "FooOp",
        "Model_1/Foo_2",
        output_tensor_ids,
        "machine.cluster", ("a1", "a2"),
        input_names=None,
        device_name=None)
    self.assertEqual(op_creation_digest.num_outputs, 0)

  def testGraphOpCreationDigestNoInputNoDeviceNameToJson(self):
    op_creation_digest = debug_events_reader.GraphOpCreationDigest(
        1234,
        5678,
        "deadbeef",
        "FooOp",
        "Model_1/Foo_2", [135],
        "machine.cluster", ("a1", "a2"),
        input_names=None,
        device_name=None)
    json = op_creation_digest.to_json()
    self.jsonRoundTripCheck(json)
    self.assertEqual(json["wall_time"], 1234)
    self.assertEqual(json["graph_id"], "deadbeef")
    self.assertEqual(json["op_type"], "FooOp")
    self.assertEqual(json["op_name"], "Model_1/Foo_2")
    self.assertEqual(json["output_tensor_ids"], (135,))
    self.assertEqual(json["host_name"], "machine.cluster")
    self.assertEqual(json["stack_frame_ids"], ("a1", "a2"))
    self.assertIsNone(json["input_names"])
    self.assertIsNone(json["device_name"])

  def testGraphOpCreationDigestWithInputsAndDeviceNameToJson(self):
    op_creation_digest = debug_events_reader.GraphOpCreationDigest(
        1234,
        5678,
        "deadbeef",
        "FooOp",
        "Model_1/Foo_2", [135],
        "machine.cluster", ("a1", "a2"),
        input_names=["Bar_1", "Qux_2"],
        device_name="/device:GPU:0")
    json = op_creation_digest.to_json()
    self.jsonRoundTripCheck(json)
    self.assertEqual(json["wall_time"], 1234)
    self.assertEqual(json["graph_id"], "deadbeef")
    self.assertEqual(json["op_type"], "FooOp")
    self.assertEqual(json["op_name"], "Model_1/Foo_2")
    self.assertEqual(json["output_tensor_ids"], (135,))
    self.assertEqual(json["host_name"], "machine.cluster")
    self.assertEqual(json["stack_frame_ids"], ("a1", "a2"))
    self.assertEqual(json["input_names"], ("Bar_1", "Qux_2"))
    self.assertEqual(json["device_name"], "/device:GPU:0")

  def testGraphExecutionTraceDigestToJson(self):
    trace_digest = debug_events_reader.GraphExecutionTraceDigest(
        1234, 5678, "FooOp", "Model_1/Foo_2", 1, "deadbeef")
    json = trace_digest.to_json()
    self.assertEqual(json["wall_time"], 1234)
    self.assertEqual(json["op_type"], "FooOp")
    self.assertEqual(json["op_name"], "Model_1/Foo_2")
    self.assertEqual(json["output_slot"], 1)
    self.assertEqual(json["graph_id"], "deadbeef")

  def testGraphExecutionTraceWithTensorDebugValueAndDeviceNameToJson(self):
    trace_digest = debug_events_reader.GraphExecutionTraceDigest(
        1234, 5678, "FooOp", "Model_1/Foo_2", 1, "deadbeef")
    trace = debug_events_reader.GraphExecutionTrace(
        trace_digest, ["g1", "g2", "deadbeef"],
        debug_event_pb2.TensorDebugMode.CURT_HEALTH,
        debug_tensor_value=[3, 1], device_name="/device:GPU:0")
    json = trace.to_json()
    self.assertEqual(json["wall_time"], 1234)
    self.assertEqual(json["op_type"], "FooOp")
    self.assertEqual(json["op_name"], "Model_1/Foo_2")
    self.assertEqual(json["output_slot"], 1)
    self.assertEqual(json["graph_id"], "deadbeef")
    self.assertEqual(json["graph_ids"], ("g1", "g2", "deadbeef"))
    self.assertEqual(json["tensor_debug_mode"],
                     debug_event_pb2.TensorDebugMode.CURT_HEALTH)
    self.assertEqual(json["debug_tensor_value"], (3, 1))
    self.assertEqual(json["device_name"], "/device:GPU:0")

  def testGraphExecutionTraceNoTensorDebugValueNoDeviceNameToJson(self):
    trace_digest = debug_events_reader.GraphExecutionTraceDigest(
        1234, 5678, "FooOp", "Model_1/Foo_2", 1, "deadbeef")
    trace = debug_events_reader.GraphExecutionTrace(
        trace_digest, ["g1", "g2", "deadbeef"],
        debug_event_pb2.TensorDebugMode.NO_TENSOR,
        debug_tensor_value=None, device_name=None)
    json = trace.to_json()
    self.assertEqual(json["wall_time"], 1234)
    self.assertEqual(json["op_type"], "FooOp")
    self.assertEqual(json["op_name"], "Model_1/Foo_2")
    self.assertEqual(json["output_slot"], 1)
    self.assertEqual(json["graph_id"], "deadbeef")
    self.assertEqual(json["graph_ids"], ("g1", "g2", "deadbeef"))
    self.assertEqual(json["tensor_debug_mode"],
                     debug_event_pb2.TensorDebugMode.NO_TENSOR)
    self.assertIsNone(json["debug_tensor_value"])
    self.assertIsNone(json["device_name"])


if __name__ == "__main__":
  ops.enable_eager_execution()
  googletest.main()