# xref: external/tensorflow/tensorflow/python/debug/lib/debug_events_writer_test.py
# (revision b6fb3261f9314811a0f4371741dbb8839866f948)
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for the debug events writer Python class."""

import glob
import json as json_lib
import os
import re
import threading
import time

from absl.testing import parameterized

from tensorflow.core.protobuf import debug_event_pb2
from tensorflow.python.debug.lib import debug_events_reader
from tensorflow.python.debug.lib import debug_events_writer
from tensorflow.python.debug.lib import dumping_callback_test_lib
from tensorflow.python.framework import ops
from tensorflow.python.framework import test_util
from tensorflow.python.framework import versions
from tensorflow.python.platform import googletest


class DebugEventsWriterTest(dumping_callback_test_lib.DumpingCallbackTestBase,
                            parameterized.TestCase):

39  def testMultiThreadedConstructorCallWorks(self):
40    def init_writer():
41      debug_events_writer.DebugEventsWriter(self.dump_root, self.tfdbg_run_id)
42
43    num_threads = 4
44    threads = []
45    for _ in range(num_threads):
46      thread = threading.Thread(target=init_writer)
47      thread.start()
48      threads.append(thread)
49    for thread in threads:
50      thread.join()
51
52    # Verify that there is only one debug event file of each type.
53    metadata_paths = glob.glob(os.path.join(self.dump_root, "*.metadata"))
54    self.assertLen(metadata_paths, 1)
55    source_files_paths = glob.glob(
56        os.path.join(self.dump_root, "*.source_files"))
57    self.assertLen(source_files_paths, 1)
58    stack_frames_paths = glob.glob(
59        os.path.join(self.dump_root, "*.stack_frames"))
60    self.assertLen(stack_frames_paths, 1)
61    graphs_paths = glob.glob(os.path.join(self.dump_root, "*.graphs"))
62    self.assertLen(graphs_paths, 1)
63    self._readAndCheckMetadataFile()
64
65  def testWriteSourceFilesAndStackFrames(self):
66    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
67                                                   self.tfdbg_run_id)
68    num_protos = 10
69    for i in range(num_protos):
70      source_file = debug_event_pb2.SourceFile()
71      source_file.file_path = "/home/tf2user/main.py"
72      source_file.host_name = "machine.cluster"
73      source_file.lines.append("print(%d)" % i)
74      writer.WriteSourceFile(source_file)
75
76      stack_frame = debug_event_pb2.StackFrameWithId()
77      stack_frame.id = "stack_%d" % i
78      stack_frame.file_line_col.file_index = i * 10
79      writer.WriteStackFrameWithId(stack_frame)
80
81    writer.FlushNonExecutionFiles()
82
83    with debug_events_reader.DebugEventsReader(self.dump_root) as reader:
84      actuals = list(item.debug_event.source_file
85                     for item in reader.source_files_iterator())
86      self.assertLen(actuals, num_protos)
87      for i in range(num_protos):
88        self.assertEqual(actuals[i].file_path, "/home/tf2user/main.py")
89        self.assertEqual(actuals[i].host_name, "machine.cluster")
90        self.assertEqual(actuals[i].lines, ["print(%d)" % i])
91
92      actuals = list(item.debug_event.stack_frame_with_id
93                     for item in reader.stack_frames_iterator())
94      self.assertLen(actuals, num_protos)
95      for i in range(num_protos):
96        self.assertEqual(actuals[i].id, "stack_%d" % i)
97        self.assertEqual(actuals[i].file_line_col.file_index, i * 10)
98
99  def testWriteGraphOpCreationAndDebuggedGraphs(self):
100    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
101                                                   self.tfdbg_run_id)
102    num_op_creations = 10
103    for i in range(num_op_creations):
104      graph_op_creation = debug_event_pb2.GraphOpCreation()
105      graph_op_creation.op_type = "Conv2D"
106      graph_op_creation.op_name = "Conv2D_%d" % i
107      writer.WriteGraphOpCreation(graph_op_creation)
108    debugged_graph = debug_event_pb2.DebuggedGraph()
109    debugged_graph.graph_id = "deadbeaf"
110    debugged_graph.graph_name = "MyGraph1"
111    writer.WriteDebuggedGraph(debugged_graph)
112    writer.FlushNonExecutionFiles()
113
114    reader = debug_events_reader.DebugEventsReader(self.dump_root)
115    actuals = list(item.debug_event for item in reader.graphs_iterator())
116    self.assertLen(actuals, num_op_creations + 1)
117    for i in range(num_op_creations):
118      self.assertEqual(actuals[i].graph_op_creation.op_type, "Conv2D")
119      self.assertEqual(actuals[i].graph_op_creation.op_name, "Conv2D_%d" % i)
120    self.assertEqual(actuals[num_op_creations].debugged_graph.graph_id,
121                     "deadbeaf")
122
123  def testConcurrentWritesToNonExecutionFilesWorks(self):
124    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
125                                                   self.tfdbg_run_id)
126
127    source_file_state = {"counter": 0, "lock": threading.Lock()}
128
129    def writer_source_file():
130      source_file = debug_event_pb2.SourceFile()
131      with source_file_state["lock"]:
132        source_file.file_path = "/home/tf2user/file_%d.py" % source_file_state[
133            "counter"]
134        source_file_state["counter"] += 1
135      writer.WriteSourceFile(source_file)
136      # More-frequent-than-necessary concurrent flushing is not recommended,
137      # but tolerated.
138      writer.FlushNonExecutionFiles()
139
140    stack_frame_state = {"counter": 0, "lock": threading.Lock()}
141
142    def write_stack_frame():
143      stack_frame = debug_event_pb2.StackFrameWithId()
144      with stack_frame_state["lock"]:
145        stack_frame.id = "stack_frame_%d" % stack_frame_state["counter"]
146        stack_frame_state["counter"] += 1
147      writer.WriteStackFrameWithId(stack_frame)
148      # More-frequent-than-necessary concurrent flushing is not recommended,
149      # but tolerated.
150      writer.FlushNonExecutionFiles()
151
152    graph_op_state = {"counter": 0, "lock": threading.Lock()}
153
154    def write_graph_op_creation():
155      graph_op_creation = debug_event_pb2.GraphOpCreation()
156      with graph_op_state["lock"]:
157        graph_op_creation.op_name = "Op%d" % graph_op_state["counter"]
158        graph_op_state["counter"] += 1
159      writer.WriteGraphOpCreation(graph_op_creation)
160      # More-frequent-than-necessary concurrent flushing is not recommended,
161      # but tolerated.
162      writer.FlushNonExecutionFiles()
163
164    num_threads = 9
165    threads = []
166    for i in range(num_threads):
167      if i % 3 == 0:
168        target = writer_source_file
169      elif i % 3 == 1:
170        target = write_stack_frame
171      else:
172        target = write_graph_op_creation
173      thread = threading.Thread(target=target)
174      thread.start()
175      threads.append(thread)
176    for thread in threads:
177      thread.join()
178
179    # Verify the content of the .source_files file.
180    with debug_events_reader.DebugEventsReader(self.dump_root) as reader:
181      source_files_iter = reader.source_files_iterator()
182      actuals = list(item.debug_event.source_file for item in source_files_iter)
183      file_paths = sorted([actual.file_path for actual in actuals])
184      self.assertEqual(file_paths, [
185          "/home/tf2user/file_0.py", "/home/tf2user/file_1.py",
186          "/home/tf2user/file_2.py"
187      ])
188
189    # Verify the content of the .stack_frames file.
190    actuals = list(item.debug_event.stack_frame_with_id
191                   for item in reader.stack_frames_iterator())
192    stack_frame_ids = sorted([actual.id for actual in actuals])
193    self.assertEqual(stack_frame_ids,
194                     ["stack_frame_0", "stack_frame_1", "stack_frame_2"])
195
196    # Verify the content of the .graphs file.
197    actuals = list(item.debug_event.graph_op_creation
198                   for item in reader.graphs_iterator())
199    graph_op_names = sorted([actual.op_name for actual in actuals])
200    self.assertEqual(graph_op_names, ["Op0", "Op1", "Op2"])
201
202  def testWriteAndReadMetadata(self):
203    t0 = time.time()
204    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
205                                                   self.tfdbg_run_id)
206    writer.Close()
207    with debug_events_reader.DebugDataReader(self.dump_root) as reader:
208      self.assertIsInstance(reader.starting_wall_time(), float)
209      self.assertGreaterEqual(reader.starting_wall_time(), t0)
210      self.assertEqual(reader.tensorflow_version(), versions.__version__)
211      self.assertTrue(reader.tfdbg_run_id())
212
213  def testWriteExecutionEventsWithCircularBuffer(self):
214    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
215                                                   self.tfdbg_run_id)
216    num_execution_events = debug_events_writer.DEFAULT_CIRCULAR_BUFFER_SIZE * 2
217    for i in range(num_execution_events):
218      execution = debug_event_pb2.Execution()
219      execution.op_type = "OpType%d" % i
220      writer.WriteExecution(execution)
221
222    with debug_events_reader.DebugDataReader(self.dump_root) as reader:
223      # Before FlushExecutionFiles() is called. No data should have been written
224      # to the file.
225      reader.update()
226      self.assertFalse(reader.executions())
227
228      writer.FlushExecutionFiles()
229      reader.update()
230      executions = reader.executions()
231      for i, execution in enumerate(executions):
232        self.assertEqual(
233            execution.op_type,
234            "OpType%d" % (i + debug_events_writer.DEFAULT_CIRCULAR_BUFFER_SIZE))
235
236  def testWriteExecutionEventsWithoutCircularBufferBehavior(self):
237    # A circular buffer size of 0 abolishes the circular buffer behavior.
238    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
239                                                   self.tfdbg_run_id, 0)
240    num_execution_events = debug_events_writer.DEFAULT_CIRCULAR_BUFFER_SIZE * 2
241    for i in range(num_execution_events):
242      execution = debug_event_pb2.Execution()
243      execution.op_type = "OpType%d" % i
244      writer.WriteExecution(execution)
245    writer.FlushExecutionFiles()
246
247    with debug_events_reader.DebugDataReader(self.dump_root) as reader:
248      reader.update()
249      executions = reader.executions()
250      self.assertLen(executions, num_execution_events)
251      for i, execution in enumerate(executions):
252        self.assertEqual(execution.op_type, "OpType%d" % i)
253
254  def testWriteGraphExecutionTraceEventsWithCircularBuffer(self):
255    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
256                                                   self.tfdbg_run_id)
257    num_execution_events = debug_events_writer.DEFAULT_CIRCULAR_BUFFER_SIZE * 2
258    for i in range(num_execution_events):
259      trace = debug_event_pb2.GraphExecutionTrace()
260      trace.op_name = "Op%d" % i
261      writer.WriteGraphExecutionTrace(trace)
262
263    with debug_events_reader.DebugEventsReader(self.dump_root) as reader:
264      actuals = list(reader.graph_execution_traces_iterators()[0])
265      # Before FlushExecutionFiles() is called. No data should have been written
266      # to the file.
267      self.assertEmpty(actuals)
268
269      writer.FlushExecutionFiles()
270      actuals = list(item.debug_event.graph_execution_trace
271                     for item in reader.graph_execution_traces_iterators()[0])
272      self.assertLen(actuals, debug_events_writer.DEFAULT_CIRCULAR_BUFFER_SIZE)
273      for i in range(debug_events_writer.DEFAULT_CIRCULAR_BUFFER_SIZE):
274        self.assertEqual(
275            actuals[i].op_name,
276            "Op%d" % (i + debug_events_writer.DEFAULT_CIRCULAR_BUFFER_SIZE))
277
278  def testWriteGraphExecutionTraceEventsWithoutCircularBufferBehavior(self):
279    # A circular buffer size of 0 abolishes the circular buffer behavior.
280    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
281                                                   self.tfdbg_run_id, 0)
282    num_execution_events = debug_events_writer.DEFAULT_CIRCULAR_BUFFER_SIZE * 2
283    for i in range(num_execution_events):
284      trace = debug_event_pb2.GraphExecutionTrace()
285      trace.op_name = "Op%d" % i
286      writer.WriteGraphExecutionTrace(trace)
287    writer.FlushExecutionFiles()
288
289    with debug_events_reader.DebugEventsReader(self.dump_root) as reader:
290      actuals = list(item.debug_event.graph_execution_trace
291                     for item in reader.graph_execution_traces_iterators()[0])
292    self.assertLen(actuals, num_execution_events)
293    for i in range(num_execution_events):
294      self.assertEqual(actuals[i].op_name, "Op%d" % i)
295
  def testConcurrentWritesToExecutionFiles(self):
    """Concurrent writes to the execution files respect the circular buffer.

    Twenty threads (ten Execution writers, ten GraphExecutionTrace writers)
    race against a writer configured with a circular buffer of size 5.  After
    flushing, each file must contain exactly 5 events, all distinct.
    """
    circular_buffer_size = 5
    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
                                                   self.tfdbg_run_id,
                                                   circular_buffer_size)
    # The traces written below reference this graph via tfdbg_context_id.
    debugged_graph = debug_event_pb2.DebuggedGraph(graph_id="graph1",
                                                   graph_name="graph1")
    writer.WriteDebuggedGraph(debugged_graph)

    # Shared counter guarded by a lock so each thread gets a unique op_type.
    execution_state = {"counter": 0, "lock": threading.Lock()}

    def write_execution():
      execution = debug_event_pb2.Execution()
      with execution_state["lock"]:
        execution.op_type = "OpType%d" % execution_state["counter"]
        execution_state["counter"] += 1
      writer.WriteExecution(execution)

    graph_execution_trace_state = {"counter": 0, "lock": threading.Lock()}

    def write_graph_execution_trace():
      # Both protos are built under the lock so the op creation and its trace
      # share the same unique op_name.
      with graph_execution_trace_state["lock"]:
        op_name = "Op%d" % graph_execution_trace_state["counter"]
        graph_op_creation = debug_event_pb2.GraphOpCreation(
            op_type="FooOp", op_name=op_name, graph_id="graph1")
        trace = debug_event_pb2.GraphExecutionTrace(
            op_name=op_name, tfdbg_context_id="graph1")
        graph_execution_trace_state["counter"] += 1
      writer.WriteGraphOpCreation(graph_op_creation)
      writer.WriteGraphExecutionTrace(trace)

    # Alternate the two writer kinds across 20 threads.
    threads = []
    for i in range(circular_buffer_size * 4):
      if i % 2 == 0:
        target = write_execution
      else:
        target = write_graph_execution_trace
      thread = threading.Thread(target=target)
      thread.start()
      threads.append(thread)
    for thread in threads:
      thread.join()
    writer.FlushNonExecutionFiles()
    writer.FlushExecutionFiles()

    with debug_events_reader.DebugDataReader(self.dump_root) as reader:
      reader.update()
      # Verify the content of the .execution file: exactly buffer-size many
      # events, with no duplicates (which thread's events survive is
      # nondeterministic, so only count and uniqueness are checked).
      executions = reader.executions()
      executed_op_types = [execution.op_type for execution in executions]
      self.assertLen(executed_op_types, circular_buffer_size)
      self.assertLen(executed_op_types, len(set(executed_op_types)))

      # Verify the content of the .graph_execution_traces file.
      op_names = [trace.op_name for trace in reader.graph_execution_traces()]
      self.assertLen(op_names, circular_buffer_size)
      self.assertLen(op_names, len(set(op_names)))

354  def testConcurrentSourceFileRandomReads(self):
355    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
356                                                   self.tfdbg_run_id)
357
358    for i in range(100):
359      source_file = debug_event_pb2.SourceFile(
360          host_name="localhost", file_path="/tmp/file_%d.py" % i)
361      source_file.lines.append("# File %d" % i)
362      writer.WriteSourceFile(source_file)
363    writer.FlushNonExecutionFiles()
364
365    reader = debug_events_reader.DebugDataReader(self.dump_root)
366    reader.update()
367    lines = [None] * 100
368    def read_job_1():
369      # Read in the reverse order to enhance randomness of the read access.
370      for i in range(49, -1, -1):
371        lines[i] = reader.source_lines("localhost", "/tmp/file_%d.py" % i)
372    def read_job_2():
373      for i in range(99, 49, -1):
374        lines[i] = reader.source_lines("localhost", "/tmp/file_%d.py" % i)
375    thread_1 = threading.Thread(target=read_job_1)
376    thread_2 = threading.Thread(target=read_job_2)
377    thread_1.start()
378    thread_2.start()
379    thread_1.join()
380    thread_2.join()
381    for i in range(100):
382      self.assertEqual(lines[i], ["# File %d" % i])
383
  def testConcurrentExecutionUpdateAndRandomRead(self):
    """A reader can randomly read while a writer keeps appending Executions.

    A background thread writes, flushes and reader.update()s in a loop; the
    main thread polls until at least one execution digest is visible, then
    random-reads it and signals the background thread to stop.
    """
    circular_buffer_size = -1  # -1 disables the circular buffer.
    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
                                                   self.tfdbg_run_id,
                                                   circular_buffer_size)

    # "done" is the cross-thread stop flag; plain dict access is used without
    # a lock here (presumably relying on CPython's atomic dict ops — the
    # single writer/single flag-setter pattern keeps this benign).
    writer_state = {"counter": 0, "done": False}

    with debug_events_reader.DebugDataReader(self.dump_root) as reader:
      def write_and_update_job():
        while True:
          if writer_state["done"]:
            break
          execution = debug_event_pb2.Execution()
          execution.op_type = "OpType%d" % writer_state["counter"]
          writer_state["counter"] += 1
          writer.WriteExecution(execution)
          writer.FlushExecutionFiles()
          reader.update()
      # On the sub-thread, keep writing and reading new Execution protos.
      write_and_update_thread = threading.Thread(target=write_and_update_job)
      write_and_update_thread.start()
      # On the main thread, do concurrent random read.
      while True:
        exec_digests = reader.executions(digest=True)
        if exec_digests:
          exec_0 = reader.read_execution(exec_digests[0])
          self.assertEqual(exec_0.op_type, "OpType0")
          writer_state["done"] = True
          break
        else:
          # Nothing visible yet; back off briefly and poll again.
          time.sleep(0.1)
          continue
      write_and_update_thread.join()

419  def testConcurrentExecutionRandomReads(self):
420    circular_buffer_size = -1
421    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
422                                                   self.tfdbg_run_id,
423                                                   circular_buffer_size)
424
425    for i in range(100):
426      execution = debug_event_pb2.Execution()
427      execution.op_type = "OpType%d" % i
428      writer.WriteExecution(execution)
429    writer.FlushNonExecutionFiles()
430    writer.FlushExecutionFiles()
431
432    reader = debug_events_reader.DebugDataReader(self.dump_root)
433    reader.update()
434    executions = [None] * 100
435    def read_job_1():
436      execution_digests = reader.executions(digest=True)
437      # Read in the reverse order to enhance randomness of the read access.
438      for i in range(49, -1, -1):
439        execution = reader.read_execution(execution_digests[i])
440        executions[i] = execution
441    def read_job_2():
442      execution_digests = reader.executions(digest=True)
443      for i in range(99, 49, -1):
444        execution = reader.read_execution(execution_digests[i])
445        executions[i] = execution
446    thread_1 = threading.Thread(target=read_job_1)
447    thread_2 = threading.Thread(target=read_job_2)
448    thread_1.start()
449    thread_2.start()
450    thread_1.join()
451    thread_2.join()
452    for i in range(100):
453      self.assertEqual(executions[i].op_type, "OpType%d" % i)
454
  def testConcurrentGraphExecutionTraceUpdateAndRandomRead(self):
    """A reader can randomly read while a writer keeps appending traces.

    A background thread writes op creations and traces, flushes and
    reader.update()s in a loop; the main thread polls until a trace digest is
    visible, random-reads it, then signals the background thread to stop.
    """
    circular_buffer_size = -1  # -1 disables the circular buffer.
    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
                                                   self.tfdbg_run_id,
                                                   circular_buffer_size)
    # The traces written below reference this graph via tfdbg_context_id.
    debugged_graph = debug_event_pb2.DebuggedGraph(graph_id="graph1",
                                                   graph_name="graph1")
    writer.WriteDebuggedGraph(debugged_graph)

    # "done" is the cross-thread stop flag; only the main thread sets it and
    # only the sub-thread reads it.
    writer_state = {"counter": 0, "done": False}

    with debug_events_reader.DebugDataReader(self.dump_root) as reader:
      def write_and_update_job():
        while True:
          if writer_state["done"]:
            break
          op_name = "Op%d" % writer_state["counter"]
          graph_op_creation = debug_event_pb2.GraphOpCreation(
              op_type="FooOp", op_name=op_name, graph_id="graph1")
          writer.WriteGraphOpCreation(graph_op_creation)
          trace = debug_event_pb2.GraphExecutionTrace(
              op_name=op_name, tfdbg_context_id="graph1")
          writer.WriteGraphExecutionTrace(trace)
          writer_state["counter"] += 1
          writer.FlushNonExecutionFiles()
          writer.FlushExecutionFiles()
          reader.update()
      # On the sub-thread, keep writing and reading new GraphExecutionTraces.
      write_and_update_thread = threading.Thread(target=write_and_update_job)
      write_and_update_thread.start()
      # On the main thread, do concurrent random read.
      while True:
        digests = reader.graph_execution_traces(digest=True)
        if digests:
          trace_0 = reader.read_graph_execution_trace(digests[0])
          self.assertEqual(trace_0.op_name, "Op0")
          writer_state["done"] = True
          break
        else:
          # Nothing visible yet; back off briefly and poll again.
          time.sleep(0.1)
          continue
      write_and_update_thread.join()

498  def testConcurrentGraphExecutionTraceRandomReads(self):
499    circular_buffer_size = -1
500    writer = debug_events_writer.DebugEventsWriter(self.dump_root,
501                                                   self.tfdbg_run_id,
502                                                   circular_buffer_size)
503    debugged_graph = debug_event_pb2.DebuggedGraph(graph_id="graph1",
504                                                   graph_name="graph1")
505    writer.WriteDebuggedGraph(debugged_graph)
506
507    for i in range(100):
508      op_name = "Op%d" % i
509      graph_op_creation = debug_event_pb2.GraphOpCreation(
510          op_type="FooOp", op_name=op_name, graph_id="graph1")
511      writer.WriteGraphOpCreation(graph_op_creation)
512      trace = debug_event_pb2.GraphExecutionTrace(
513          op_name=op_name, tfdbg_context_id="graph1")
514      writer.WriteGraphExecutionTrace(trace)
515    writer.FlushNonExecutionFiles()
516    writer.FlushExecutionFiles()
517
518    reader = debug_events_reader.DebugDataReader(self.dump_root)
519    reader.update()
520    traces = [None] * 100
521    def read_job_1():
522      digests = reader.graph_execution_traces(digest=True)
523      for i in range(49, -1, -1):
524        traces[i] = reader.read_graph_execution_trace(digests[i])
525    def read_job_2():
526      digests = reader.graph_execution_traces(digest=True)
527      for i in range(99, 49, -1):
528        traces[i] = reader.read_graph_execution_trace(digests[i])
529    thread_1 = threading.Thread(target=read_job_1)
530    thread_2 = threading.Thread(target=read_job_2)
531    thread_1.start()
532    thread_2.start()
533    thread_1.join()
534    thread_2.join()
535    for i in range(100):
536      self.assertEqual(traces[i].op_name, "Op%d" % i)
537
538  @parameterized.named_parameters(
539      ("Begin1End3", 1, 3, 1, 3),
540      ("Begin0End3", 0, 3, 0, 3),
541      ("Begin0EndNeg1", 0, -1, 0, 4),
542      ("BeginNoneEnd3", None, 3, 0, 3),
543      ("Begin2EndNone", 2, None, 2, 5),
544      ("BeginNoneEndNone", None, None, 0, 5),
545  )
546  def testRangeReadingExecutions(self, begin, end, expected_begin,
547                                 expected_end):
548    writer = debug_events_writer.DebugEventsWriter(
549        self.dump_root, self.tfdbg_run_id, circular_buffer_size=-1)
550    for i in range(5):
551      execution = debug_event_pb2.Execution(op_type="OpType%d" % i)
552      writer.WriteExecution(execution)
553    writer.FlushExecutionFiles()
554    writer.Close()
555
556    with debug_events_reader.DebugDataReader(self.dump_root) as reader:
557      reader.update()
558      executions = reader.executions(begin=begin, end=end)
559    self.assertLen(executions, expected_end - expected_begin)
560    self.assertEqual(executions[0].op_type, "OpType%d" % expected_begin)
561    self.assertEqual(executions[-1].op_type, "OpType%d" % (expected_end - 1))
562
563  @parameterized.named_parameters(
564      ("Begin1End3", 1, 3, 1, 3),
565      ("Begin0End3", 0, 3, 0, 3),
566      ("Begin0EndNeg1", 0, -1, 0, 4),
567      ("BeginNoneEnd3", None, 3, 0, 3),
568      ("Begin2EndNone", 2, None, 2, 5),
569      ("BeginNoneEndNone", None, None, 0, 5),
570  )
571  def testRangeReadingGraphExecutionTraces(self, begin, end, expected_begin,
572                                           expected_end):
573    writer = debug_events_writer.DebugEventsWriter(
574        self.dump_root, self.tfdbg_run_id, circular_buffer_size=-1)
575    debugged_graph = debug_event_pb2.DebuggedGraph(
576        graph_id="graph1", graph_name="graph1")
577    writer.WriteDebuggedGraph(debugged_graph)
578    for i in range(5):
579      op_name = "Op_%d" % i
580      graph_op_creation = debug_event_pb2.GraphOpCreation(
581          op_name=op_name, graph_id="graph1")
582      writer.WriteGraphOpCreation(graph_op_creation)
583      trace = debug_event_pb2.GraphExecutionTrace(
584          op_name=op_name, tfdbg_context_id="graph1")
585      writer.WriteGraphExecutionTrace(trace)
586    writer.FlushNonExecutionFiles()
587    writer.FlushExecutionFiles()
588    writer.Close()
589
590    with debug_events_reader.DebugDataReader(self.dump_root) as reader:
591      reader.update()
592      traces = reader.graph_execution_traces(begin=begin, end=end)
593    self.assertLen(traces, expected_end - expected_begin)
594    self.assertEqual(traces[0].op_name, "Op_%d" % expected_begin)
595    self.assertEqual(traces[-1].op_name, "Op_%d" % (expected_end - 1))
596
597
class MultiSetReaderTest(dumping_callback_test_lib.DumpingCallbackTestBase):
  """Test for DebugDataReader for multiple file sets under a dump root."""

  def testReadingTwoFileSetsWithTheSameDumpRootSucceeds(self):
    """Two file sets sharing one tfdbg_run_id can be read as one run.

    Bug fix: the two verification loops previously iterated
    `for _ in range(10)` while indexing `trace_digests[i]` with the stale
    outer-loop variable `i` (== 1 after the write loop), so only digests 1
    and 11 were ever read. They now index with the loop variable and cover
    all 20 digests.
    """
    # To simulate a multi-host data dump, we first generate file sets in two
    # different directories, with the same tfdbg_run_id, and then combine them.
    tfdbg_run_id = "foo"
    for i in range(2):
      writer = debug_events_writer.DebugEventsWriter(
          os.path.join(self.dump_root, str(i)),
          tfdbg_run_id,
          circular_buffer_size=-1)
      if i == 0:
        # Only the first file set carries the graph and op-creation metadata.
        debugged_graph = debug_event_pb2.DebuggedGraph(
            graph_id="graph1", graph_name="graph1")
        writer.WriteDebuggedGraph(debugged_graph)
        op_name = "Op_0"
        graph_op_creation = debug_event_pb2.GraphOpCreation(
            op_type="FooOp", op_name=op_name, graph_id="graph1")
        writer.WriteGraphOpCreation(graph_op_creation)
        op_name = "Op_1"
        graph_op_creation = debug_event_pb2.GraphOpCreation(
            op_type="FooOp", op_name=op_name, graph_id="graph1")
        writer.WriteGraphOpCreation(graph_op_creation)
      # Each file set holds 10 traces: "Op_0" in set 0, "Op_1" in set 1.
      for _ in range(10):
        trace = debug_event_pb2.GraphExecutionTrace(
            op_name="Op_%d" % i, tfdbg_context_id="graph1")
        writer.WriteGraphExecutionTrace(trace)
        writer.FlushNonExecutionFiles()
        writer.FlushExecutionFiles()

    # Move all files from the subdirectory /1 to subdirectory /0.
    dump_root_0 = os.path.join(self.dump_root, "0")
    src_paths = glob.glob(os.path.join(self.dump_root, "1", "*"))
    for src_path in src_paths:
      dst_path = os.path.join(
          dump_root_0,
          # Rename the file set to avoid file name collision.
          re.sub(r"(tfdbg_events\.\d+)", r"\g<1>1", os.path.basename(src_path)))
      os.rename(src_path, dst_path)

    with debug_events_reader.DebugDataReader(dump_root_0) as reader:
      reader.update()
      # Verify the content of the .graph_execution_traces file.
      trace_digests = reader.graph_execution_traces(digest=True)
      self.assertLen(trace_digests, 20)
      for i in range(10):
        trace = reader.read_graph_execution_trace(trace_digests[i])
        self.assertEqual(trace.op_name, "Op_0")
      for i in range(10):
        trace = reader.read_graph_execution_trace(trace_digests[i + 10])
        self.assertEqual(trace.op_name, "Op_1")

  def testReadingTwoFileSetsWithTheDifferentRootsLeadsToError(self):
    """Mixing file sets with different tfdbg_run_ids raises ValueError."""
    # To simulate a multi-host data dump, we first generate file sets in two
    # different directories, with different tfdbg_run_ids, and then combine
    # them.
    for i in range(2):
      writer = debug_events_writer.DebugEventsWriter(
          os.path.join(self.dump_root, str(i)),
          "run_id_%d" % i,
          circular_buffer_size=-1)
      writer.FlushNonExecutionFiles()
      writer.FlushExecutionFiles()

    # Move all files from the subdirectory /1 to subdirectory /0.
    dump_root_0 = os.path.join(self.dump_root, "0")
    src_paths = glob.glob(os.path.join(self.dump_root, "1", "*"))
    for src_path in src_paths:
      dst_path = os.path.join(
          dump_root_0,
          # Rename the file set to avoid file name collision.
          re.sub(r"(tfdbg_events\.\d+)", r"\g<1>1", os.path.basename(src_path)))
      os.rename(src_path, dst_path)

    with self.assertRaisesRegex(ValueError,
                                r"Found multiple \(2\) tfdbg2 runs"):
      debug_events_reader.DebugDataReader(dump_root_0)


class DataObjectsTest(test_util.TensorFlowTestCase, parameterized.TestCase):

680  def jsonRoundTripCheck(self, obj):
681    self.assertEqual(
682        json_lib.dumps(json_lib.loads(json_lib.dumps(obj)), sort_keys=True),
683        json_lib.dumps(obj, sort_keys=True))
684
685  def testExecutionDigestWithNoOutputToJson(self):
686    execution_digest = debug_events_reader.ExecutionDigest(
687        1234, 5678, "FooOp", output_tensor_device_ids=None)
688    json = execution_digest.to_json()
689    self.jsonRoundTripCheck(json)
690    self.assertEqual(json["wall_time"], 1234)
691    self.assertEqual(json["op_type"], "FooOp")
692    self.assertEqual(json["output_tensor_device_ids"], None)
693
694  def testExecutionDigestWithTwoOutputsToJson(self):
695    execution_digest = debug_events_reader.ExecutionDigest(
696        1234, 5678, "FooOp", output_tensor_device_ids=[1357, 2468])
697    json = execution_digest.to_json()
698    self.jsonRoundTripCheck(json)
699    self.assertEqual(json["wall_time"], 1234)
700    self.assertEqual(json["op_type"], "FooOp")
701    self.assertEqual(json["output_tensor_device_ids"], (1357, 2468))
702
703  def testExecutionNoGraphNoInputToJson(self):
704    execution_digest = debug_events_reader.ExecutionDigest(
705        1234, 5678, "FooOp", output_tensor_device_ids=[1357])
706    execution = debug_events_reader.Execution(
707        execution_digest,
708        "localhost",
709        ("a1", "b2"),
710        debug_event_pb2.TensorDebugMode.CURT_HEALTH,
711        graph_id=None,
712        input_tensor_ids=None,
713        output_tensor_ids=[2468],
714        debug_tensor_values=([1, 0],))
715    json = execution.to_json()
716    self.jsonRoundTripCheck(json)
717    self.assertEqual(json["wall_time"], 1234)
718    self.assertEqual(json["op_type"], "FooOp")
719    self.assertEqual(json["output_tensor_device_ids"], (1357,))
720    self.assertEqual(json["host_name"], "localhost")
721    self.assertEqual(json["stack_frame_ids"], ("a1", "b2"))
722    self.assertEqual(json["tensor_debug_mode"],
723                     debug_event_pb2.TensorDebugMode.CURT_HEALTH)
724    self.assertIsNone(json["graph_id"])
725    self.assertIsNone(json["input_tensor_ids"])
726    self.assertEqual(json["output_tensor_ids"], (2468,))
727    self.assertEqual(json["debug_tensor_values"], ([1, 0],))
728
729  def testExecutionNoGraphNoInputButWithOutputToJson(self):
730    execution_digest = debug_events_reader.ExecutionDigest(
731        1234, 5678, "FooOp", output_tensor_device_ids=[1357])
732    execution = debug_events_reader.Execution(
733        execution_digest,
734        "localhost",
735        ("a1", "b2"),
736        debug_event_pb2.TensorDebugMode.FULL_HEALTH,
737        graph_id="abcd",
738        input_tensor_ids=[13, 37],
739        output_tensor_ids=None,
740        debug_tensor_values=None)
741    json = execution.to_json()
742    self.jsonRoundTripCheck(json)
743    self.assertEqual(json["wall_time"], 1234)
744    self.assertEqual(json["op_type"], "FooOp")
745    self.assertEqual(json["output_tensor_device_ids"], (1357,))
746    self.assertEqual(json["host_name"], "localhost")
747    self.assertEqual(json["stack_frame_ids"], ("a1", "b2"))
748    self.assertEqual(json["tensor_debug_mode"],
749                     debug_event_pb2.TensorDebugMode.FULL_HEALTH)
750    self.assertEqual(json["graph_id"], "abcd")
751    self.assertEqual(json["input_tensor_ids"], (13, 37))
752    self.assertIsNone(json["output_tensor_ids"])
753    self.assertIsNone(json["debug_tensor_values"])
754
755  @parameterized.named_parameters(
756      ("EmptyList", []),
757      ("None", None),
758  )
759  def testExecutionWithNoOutputTensorsReturnsZeroForNumOutputs(
760      self, output_tensor_ids):
761    execution = debug_events_reader.Execution(
762        debug_events_reader.ExecutionDigest(1234, 5678, "FooOp"),
763        "localhost", ("a1", "b2"),
764        debug_event_pb2.TensorDebugMode.FULL_HEALTH,
765        graph_id="abcd",
766        input_tensor_ids=[13, 37],
767        output_tensor_ids=output_tensor_ids,
768        debug_tensor_values=None)
769    self.assertEqual(execution.num_outputs, 0)
770
771  def testDebuggedDeviceToJons(self):
772    debugged_device = debug_events_reader.DebuggedDevice("/TPU:3", 4)
773    self.assertEqual(debugged_device.to_json(), {
774        "device_name": "/TPU:3",
775        "device_id": 4,
776    })
777
778  def testDebuggedGraphToJonsWitouthNameInnerOuterGraphIds(self):
779    debugged_graph = debug_events_reader.DebuggedGraph(
780        None,
781        "b1c2",
782        outer_graph_id=None,
783    )
784    self.assertEqual(
785        debugged_graph.to_json(), {
786            "name": None,
787            "graph_id": "b1c2",
788            "outer_graph_id": None,
789            "inner_graph_ids": [],
790        })
791
792  def testDebuggedGraphToJonsWithNameAndInnerOuterGraphIds(self):
793    debugged_graph = debug_events_reader.DebuggedGraph(
794        "loss_function",
795        "b1c2",
796        outer_graph_id="a0b1",
797    )
798    debugged_graph.add_inner_graph_id("c2d3")
799    debugged_graph.add_inner_graph_id("c2d3e4")
800    self.assertEqual(
801        debugged_graph.to_json(), {
802            "name": "loss_function",
803            "graph_id": "b1c2",
804            "outer_graph_id": "a0b1",
805            "inner_graph_ids": ["c2d3", "c2d3e4"],
806        })
807
808  @parameterized.named_parameters(
809      ("EmptyList", []),
810      ("None", None),
811  )
812  def testGraphOpDigestWithNoOutpusReturnsNumOutputsZero(
813      self, output_tensor_ids):
814    op_creation_digest = debug_events_reader.GraphOpCreationDigest(
815        1234,
816        5678,
817        "deadbeef",
818        "FooOp",
819        "Model_1/Foo_2",
820        output_tensor_ids,
821        "machine.cluster", ("a1", "a2"),
822        input_names=None,
823        device_name=None)
824    self.assertEqual(op_creation_digest.num_outputs, 0)
825
826  def testGraphOpCreationDigestNoInputNoDeviceNameToJson(self):
827    op_creation_digest = debug_events_reader.GraphOpCreationDigest(
828        1234,
829        5678,
830        "deadbeef",
831        "FooOp",
832        "Model_1/Foo_2", [135],
833        "machine.cluster", ("a1", "a2"),
834        input_names=None,
835        device_name=None)
836    json = op_creation_digest.to_json()
837    self.jsonRoundTripCheck(json)
838    self.assertEqual(json["wall_time"], 1234)
839    self.assertEqual(json["graph_id"], "deadbeef")
840    self.assertEqual(json["op_type"], "FooOp")
841    self.assertEqual(json["op_name"], "Model_1/Foo_2")
842    self.assertEqual(json["output_tensor_ids"], (135,))
843    self.assertEqual(json["host_name"], "machine.cluster")
844    self.assertEqual(json["stack_frame_ids"], ("a1", "a2"))
845    self.assertIsNone(json["input_names"])
846    self.assertIsNone(json["device_name"])
847
848  def testGraphOpCreationDigestWithInputsAndDeviceNameToJson(self):
849    op_creation_digest = debug_events_reader.GraphOpCreationDigest(
850        1234,
851        5678,
852        "deadbeef",
853        "FooOp",
854        "Model_1/Foo_2", [135],
855        "machine.cluster", ("a1", "a2"),
856        input_names=["Bar_1", "Qux_2"],
857        device_name="/device:GPU:0")
858    json = op_creation_digest.to_json()
859    self.jsonRoundTripCheck(json)
860    self.assertEqual(json["wall_time"], 1234)
861    self.assertEqual(json["graph_id"], "deadbeef")
862    self.assertEqual(json["op_type"], "FooOp")
863    self.assertEqual(json["op_name"], "Model_1/Foo_2")
864    self.assertEqual(json["output_tensor_ids"], (135,))
865    self.assertEqual(json["host_name"], "machine.cluster")
866    self.assertEqual(json["stack_frame_ids"], ("a1", "a2"))
867    self.assertEqual(json["input_names"], ("Bar_1", "Qux_2"))
868    self.assertEqual(json["device_name"], "/device:GPU:0")
869
870  def testGraphExecutionTraceDigestToJson(self):
871    trace_digest = debug_events_reader.GraphExecutionTraceDigest(
872        1234, 5678, "FooOp", "Model_1/Foo_2", 1, "deadbeef")
873    json = trace_digest.to_json()
874    self.assertEqual(json["wall_time"], 1234)
875    self.assertEqual(json["op_type"], "FooOp")
876    self.assertEqual(json["op_name"], "Model_1/Foo_2")
877    self.assertEqual(json["output_slot"], 1)
878    self.assertEqual(json["graph_id"], "deadbeef")
879
880  def testGraphExecutionTraceWithTensorDebugValueAndDeviceNameToJson(self):
881    trace_digest = debug_events_reader.GraphExecutionTraceDigest(
882        1234, 5678, "FooOp", "Model_1/Foo_2", 1, "deadbeef")
883    trace = debug_events_reader.GraphExecutionTrace(
884        trace_digest, ["g1", "g2", "deadbeef"],
885        debug_event_pb2.TensorDebugMode.CURT_HEALTH,
886        debug_tensor_value=[3, 1], device_name="/device:GPU:0")
887    json = trace.to_json()
888    self.assertEqual(json["wall_time"], 1234)
889    self.assertEqual(json["op_type"], "FooOp")
890    self.assertEqual(json["op_name"], "Model_1/Foo_2")
891    self.assertEqual(json["output_slot"], 1)
892    self.assertEqual(json["graph_id"], "deadbeef")
893    self.assertEqual(json["graph_ids"], ("g1", "g2", "deadbeef"))
894    self.assertEqual(json["tensor_debug_mode"],
895                     debug_event_pb2.TensorDebugMode.CURT_HEALTH)
896    self.assertEqual(json["debug_tensor_value"], (3, 1))
897    self.assertEqual(json["device_name"], "/device:GPU:0")
898
899  def testGraphExecutionTraceNoTensorDebugValueNoDeviceNameToJson(self):
900    trace_digest = debug_events_reader.GraphExecutionTraceDigest(
901        1234, 5678, "FooOp", "Model_1/Foo_2", 1, "deadbeef")
902    trace = debug_events_reader.GraphExecutionTrace(
903        trace_digest, ["g1", "g2", "deadbeef"],
904        debug_event_pb2.TensorDebugMode.NO_TENSOR,
905        debug_tensor_value=None, device_name=None)
906    json = trace.to_json()
907    self.assertEqual(json["wall_time"], 1234)
908    self.assertEqual(json["op_type"], "FooOp")
909    self.assertEqual(json["op_name"], "Model_1/Foo_2")
910    self.assertEqual(json["output_slot"], 1)
911    self.assertEqual(json["graph_id"], "deadbeef")
912    self.assertEqual(json["graph_ids"], ("g1", "g2", "deadbeef"))
913    self.assertEqual(json["tensor_debug_mode"],
914                     debug_event_pb2.TensorDebugMode.NO_TENSOR)
915    self.assertIsNone(json["debug_tensor_value"])
916    self.assertIsNone(json["device_name"])
917
918
if __name__ == "__main__":
  # Enable eager execution before handing control to the test runner, so
  # every test method in this file executes in eager mode.
  ops.enable_eager_execution()
  googletest.main()
922