xref: /aosp_15_r20/external/tensorflow/tensorflow/python/summary/writer/event_file_writer.py (revision b6fb3261f9314811a0f4371741dbb8839866f948)
1# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Writes events to disk in a logdir."""
16
17import collections
18import os.path
19import sys
20import threading
21import time
22
23import six
24
25from tensorflow.python.client import _pywrap_events_writer
26from tensorflow.python.platform import gfile
27from tensorflow.python.platform import tf_logging as logging
28from tensorflow.python.util import compat
29
30
class EventFileWriter(object):
  """Writes `Event` protocol buffers to an event file.

  The `EventFileWriter` class creates an event file in the specified directory,
  and asynchronously writes Event protocol buffers to the file. The Event file
  is encoded using the tfrecord format, which is similar to RecordIO.

  Events are handed to a background `_EventLoggerThread` through a bounded
  `CloseableQueue`; `flush()` and `close()` coordinate with that thread by
  placing sentinel objects on the same queue.

  This class is not thread-safe.
  """

  def __init__(self, logdir, max_queue=10, flush_secs=120,
               filename_suffix=None):
    """Creates a `EventFileWriter` and an event file to write to.

    On construction the summary writer creates a new event file in `logdir`.
    This event file will contain `Event` protocol buffers, which are written to
    disk via the add_event method.

    The other arguments to the constructor control the asynchronous writes to
    the event file:

    *  `flush_secs`: How often, in seconds, to flush the added summaries
       and events to disk.
    *  `max_queue`: Maximum number of summaries or events pending to be
       written to disk before one of the 'add' calls block.

    Args:
      logdir: A string. Directory where event file will be written.
      max_queue: Integer. Size of the queue for pending events and summaries.
      flush_secs: Number. How often, in seconds, to flush the
        pending events and summaries to disk.
      filename_suffix: A string. Every event file's name is suffixed with
        `filename_suffix`.
    """
    self._logdir = str(logdir)
    gfile.MakeDirs(self._logdir)
    self._max_queue = max_queue
    self._flush_secs = flush_secs
    # Set by the writer thread when a requested flush has completed, and also
    # whenever that thread exits, so that flush() never waits on a dead thread.
    self._flush_complete = threading.Event()
    # Unique marker objects; the writer thread recognizes them by identity.
    self._flush_sentinel = object()
    self._close_sentinel = object()
    self._ev_writer = _pywrap_events_writer.EventsWriter(
        compat.as_bytes(os.path.join(self._logdir, "events")))
    if filename_suffix:
      self._ev_writer.InitWithSuffix(compat.as_bytes(filename_suffix))
    self._initialize()
    self._closed = False

  def _initialize(self):
    """Initializes or re-initializes the queue and writer thread.

    The EventsWriter itself does not need to be re-initialized explicitly,
    because it will auto-initialize itself if used after being closed.
    """
    self._event_queue = CloseableQueue(self._max_queue)
    self._worker = _EventLoggerThread(self._event_queue, self._ev_writer,
                                      self._flush_secs, self._flush_complete,
                                      self._flush_sentinel,
                                      self._close_sentinel)

    self._worker.start()

  def get_logdir(self):
    """Returns the directory where event file will be written."""
    return self._logdir

  def reopen(self):
    """Reopens the EventFileWriter.

    Can be called after `close()` to add more events in the same directory.
    The events will go into a new events file.

    Does nothing if the EventFileWriter was not closed.
    """
    if self._closed:
      self._initialize()
      self._closed = False

  def add_event(self, event):
    """Adds an event to the event file.

    If the EventFileWriter is closed, the event is silently dropped.

    Args:
      event: An `Event` protocol buffer.
    """
    if not self._closed:
      self._try_put(event)

  def _try_put(self, item):
    """Attempts to enqueue an item to the event queue.

    If the queue is closed, this will close the EventFileWriter and reraise the
    exception that caused the queue closure, if one exists. If no such
    exception was recorded, the item is dropped and the writer is left closed.

    Args:
      item: the item to enqueue
    """
    try:
      self._event_queue.put(item)
    except QueueClosedError:
      self._internal_close()
      if self._worker.failure_exc_info:
        six.reraise(*self._worker.failure_exc_info)  # pylint: disable=no-value-for-parameter

  def flush(self):
    """Flushes the event file to disk.

    Call this method to make sure that all pending events have been written to
    disk. Blocks until the writer thread acknowledges the flush request. Does
    nothing if the EventFileWriter is closed.

    Raises:
      The exception that killed the writer thread, if any; the writer is
      closed before it is re-raised.
    """
    if self._closed:
      return
    # Request a flush operation by enqueueing a sentinel and then waiting for
    # the writer thread to mark the flush as complete.
    self._flush_complete.clear()
    self._try_put(self._flush_sentinel)
    if self._closed:
      # The queue was already closed and the writer thread recorded no failure,
      # so the sentinel was never enqueued. The thread set `_flush_complete`
      # on exit *before* the clear() above, so waiting now would block forever.
      return
    self._flush_complete.wait()
    if self._worker.failure_exc_info:
      self._internal_close()
      six.reraise(*self._worker.failure_exc_info)  # pylint: disable=no-value-for-parameter

  def close(self):
    """Flushes the event file to disk and close the file.

    Call this method when you do not need the summary writer anymore.
    Calling it on an already-closed EventFileWriter is a no-op.
    """
    if self._closed:
      return
    self.flush()
    # flush() closes the writer itself if it finds the writer thread gone;
    # in that case there is nothing left to shut down.
    if self._closed:
      return
    self._try_put(self._close_sentinel)
    # _try_put() also closes the writer itself if the queue was closed.
    if not self._closed:
      self._internal_close()

  def _internal_close(self):
    """Marks the writer closed, waits for the writer thread, closes the file."""
    self._closed = True
    self._worker.join()
    self._ev_writer.Close()
164
165
166class _EventLoggerThread(threading.Thread):
167  """Thread that logs events."""
168
169  def __init__(self, queue, ev_writer, flush_secs, flush_complete,
170               flush_sentinel, close_sentinel):
171    """Creates an _EventLoggerThread.
172
173    Args:
174      queue: A CloseableQueue from which to dequeue events. The queue will be
175        closed just before the thread exits, whether due to `close_sentinel` or
176        any exception raised in the writing loop.
177      ev_writer: An event writer. Used to log brain events for
178        the visualizer.
179      flush_secs: How often, in seconds, to flush the
180        pending file to disk.
181      flush_complete: A threading.Event that will be set whenever a flush
182        operation requested via `flush_sentinel` has been completed.
183      flush_sentinel: A sentinel element in queue that tells this thread to
184        flush the writer and mark the current flush operation complete.
185      close_sentinel: A sentinel element in queue that tells this thread to
186        terminate and close the queue.
187    """
188    threading.Thread.__init__(self, name="EventLoggerThread")
189    self.daemon = True
190    self._queue = queue
191    self._ev_writer = ev_writer
192    self._flush_secs = flush_secs
193    # The first event will be flushed immediately.
194    self._next_event_flush_time = 0
195    self._flush_complete = flush_complete
196    self._flush_sentinel = flush_sentinel
197    self._close_sentinel = close_sentinel
198    # Populated when writing logic raises an exception and kills the thread.
199    self.failure_exc_info = ()
200
201  def run(self):
202    try:
203      while True:
204        event = self._queue.get()
205        if event is self._close_sentinel:
206          return
207        elif event is self._flush_sentinel:
208          self._ev_writer.Flush()
209          self._flush_complete.set()
210        else:
211          self._ev_writer.WriteEvent(event)
212          # Flush the event writer every so often.
213          now = time.time()
214          if now > self._next_event_flush_time:
215            self._ev_writer.Flush()
216            self._next_event_flush_time = now + self._flush_secs
217    except Exception as e:
218      logging.error("EventFileWriter writer thread error: %s", e)
219      self.failure_exc_info = sys.exc_info()
220      raise
221    finally:
222      # When exiting the thread, always complete any pending flush operation
223      # (to unblock flush() calls) and close the queue (to unblock add_event()
224      # calls, including those used by flush() and close()), which ensures that
225      # code using EventFileWriter doesn't deadlock if this thread dies.
226      self._flush_complete.set()
227      self._queue.close()
228
229
class CloseableQueue(object):
  """A minimal FIFO queue, forked from the standard library Queue, that can be closed.

  Closing affects producers only: pending and future `put()` calls fail with
  `QueueClosedError`, while `get()` keeps working on whatever is queued.
  """

  def __init__(self, maxsize=0):
    """Builds an empty queue.

    Args:
      maxsize: int capacity of the queue. Any value <= 0 means unbounded.
    """
    self._maxsize = maxsize
    self._queue = collections.deque()
    self._closed = False
    # A single lock protects every piece of queue state; both conditions
    # below are built on it.
    self._mutex = threading.Lock()
    # Signalled after each insertion so a consumer blocked in get() can run.
    self._not_empty = threading.Condition(self._mutex)
    # Signalled after each removal (and on close) so a producer blocked in
    # put() can re-check its state.
    self._not_full = threading.Condition(self._mutex)

  def get(self):
    """Pops the oldest item, blocking for as long as the queue is empty.

    Returns:
      the item at the front of the queue
    """
    with self._not_empty:
      self._not_empty.wait_for(lambda: self._queue)
      head = self._queue.popleft()
      self._not_full.notify()
    return head

  def put(self, item):
    """Appends `item` to the back of the queue.

    Fails right away on a closed queue. On a full bounded queue, waits until a
    slot frees up, or fails if close() is called while waiting.

    Args:
      item: the value to enqueue

    Raises:
      QueueClosedError: the queue was closed before the item could be inserted
    """
    with self._not_full:
      while True:
        if self._closed:
          raise QueueClosedError()
        if self._maxsize <= 0 or len(self._queue) != self._maxsize:
          break
        self._not_full.wait()
      self._queue.append(item)
      self._not_empty.notify()

  def close(self):
    """Closes the queue, causing any pending or future `put()` calls to fail."""
    with self._not_full:
      self._closed = True
      # Wake every blocked producer so each one observes the closed flag.
      self._not_full.notify_all()
296
297
class QueueClosedError(Exception):
  """Signals that `CloseableQueue.put()` could not insert because the queue is closed."""
300