# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Writes events to disk in a logdir."""

import collections
import os.path
import sys
import threading
import time

import six

from tensorflow.python.client import _pywrap_events_writer
from tensorflow.python.platform import gfile
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.util import compat


class EventFileWriter(object):
  """Writes `Event` protocol buffers to an event file.

  The `EventFileWriter` class creates an event file in the specified directory,
  and asynchronously writes Event protocol buffers to the file. The Event file
  is encoded using the tfrecord format, which is similar to RecordIO.

  This class is not thread-safe.
  """

  def __init__(self, logdir, max_queue=10, flush_secs=120,
               filename_suffix=None):
    """Creates an `EventFileWriter` and an event file to write to.

    On construction the summary writer creates a new event file in `logdir`.
    This event file will contain `Event` protocol buffers, which are written to
    disk via the add_event method.

    The other arguments to the constructor control the asynchronous writes to
    the event file:

    *  `flush_secs`: How often, in seconds, to flush the added summaries
       and events to disk.
    *  `max_queue`: Maximum number of summaries or events pending to be
       written to disk before one of the 'add' calls blocks.

    Args:
      logdir: A string. Directory where event file will be written.
      max_queue: Integer. Size of the queue for pending events and summaries.
      flush_secs: Number. How often, in seconds, to flush the
        pending events and summaries to disk.
      filename_suffix: A string. Every event file's name is suffixed with
        `filename_suffix`.
    """
    self._logdir = str(logdir)
    gfile.MakeDirs(self._logdir)
    self._max_queue = max_queue
    self._flush_secs = flush_secs
    # Shared across re-initializations; set by the writer thread when a
    # requested flush completes (or when the thread exits).
    self._flush_complete = threading.Event()
    # Unique marker objects compared by identity in the writer thread.
    self._flush_sentinel = object()
    self._close_sentinel = object()
    self._ev_writer = _pywrap_events_writer.EventsWriter(
        compat.as_bytes(os.path.join(self._logdir, "events")))
    if filename_suffix:
      self._ev_writer.InitWithSuffix(compat.as_bytes(filename_suffix))
    self._initialize()
    self._closed = False

  def _initialize(self):
    """Initializes or re-initializes the queue and writer thread.

    The EventsWriter itself does not need to be re-initialized explicitly,
    because it will auto-initialize itself if used after being closed.
    """
    self._event_queue = CloseableQueue(self._max_queue)
    self._worker = _EventLoggerThread(self._event_queue, self._ev_writer,
                                      self._flush_secs, self._flush_complete,
                                      self._flush_sentinel,
                                      self._close_sentinel)

    self._worker.start()

  def get_logdir(self):
    """Returns the directory where event file will be written."""
    return self._logdir

  def reopen(self):
    """Reopens the EventFileWriter.

    Can be called after `close()` to add more events in the same directory.
    The events will go into a new events file.

    Does nothing if the EventFileWriter was not closed.
    """
    if self._closed:
      self._initialize()
      self._closed = False

  def add_event(self, event):
    """Adds an event to the event file.

    Does nothing if this writer is closed.

    Args:
      event: An `Event` protocol buffer.
    """
    if not self._closed:
      self._try_put(event)

  def _try_put(self, item):
    """Attempts to enqueue an item to the event queue.

    If the queue is closed, this will close the EventFileWriter and reraise the
    exception that caused the queue closure, if one exists.

    Args:
      item: the item to enqueue
    """
    try:
      self._event_queue.put(item)
    except QueueClosedError:
      self._internal_close()
      if self._worker.failure_exc_info:
        six.reraise(*self._worker.failure_exc_info)  # pylint: disable=no-value-for-parameter

  def flush(self):
    """Flushes the event file to disk.

    Call this method to make sure that all pending events have been written to
    disk.

    Reraises the exception recorded by the writer thread, if that thread died
    while writing.
    """
    if not self._closed:
      # Request a flush operation by enqueuing a sentinel and then waiting for
      # the writer thread to mark the flush as complete.
      self._flush_complete.clear()
      self._try_put(self._flush_sentinel)
      if self._closed:
        # The queue was already closed, so the writer thread has exited. It
        # set `_flush_complete` on exit, but the clear() above undid that, and
        # no thread remains to set it again. Waiting would block forever.
        # `_try_put` has already closed this writer and would have reraised
        # any recorded failure.
        return
      self._flush_complete.wait()
      if self._worker.failure_exc_info:
        self._internal_close()
        six.reraise(*self._worker.failure_exc_info)  # pylint: disable=no-value-for-parameter

  def close(self):
    """Flushes the event file to disk and closes the file.

    Call this method when you do not need the summary writer anymore.
    """
    if not self._closed:
      self.flush()
      self._try_put(self._close_sentinel)
      self._internal_close()

  def _internal_close(self):
    """Marks this writer closed, joins the writer thread and closes the file.

    Idempotent: calls after the first are no-ops. `close()` can reach this
    method several times when `flush()` has already closed the writer because
    the writer thread had exited. Without this guard, the underlying
    EventsWriter would be closed more than once.
    """
    if self._closed:
      return
    self._closed = True
    self._worker.join()
    self._ev_writer.Close()


class _EventLoggerThread(threading.Thread):
  """Thread that logs events."""

  def __init__(self, queue, ev_writer, flush_secs, flush_complete,
               flush_sentinel, close_sentinel):
    """Creates an _EventLoggerThread.

    Args:
      queue: A CloseableQueue from which to dequeue events. The queue will be
        closed just before the thread exits, whether due to `close_sentinel` or
        any exception raised in the writing loop.
      ev_writer: An event writer. Used to log brain events for
        the visualizer.
      flush_secs: How often, in seconds, to flush the
        pending file to disk.
      flush_complete: A threading.Event that will be set whenever a flush
        operation requested via `flush_sentinel` has been completed.
      flush_sentinel: A sentinel element in queue that tells this thread to
        flush the writer and mark the current flush operation complete.
      close_sentinel: A sentinel element in queue that tells this thread to
        terminate and close the queue.
    """
    threading.Thread.__init__(self, name="EventLoggerThread")
    self.daemon = True
    self._queue = queue
    self._ev_writer = ev_writer
    self._flush_secs = flush_secs
    # The first event will be flushed immediately.
    self._next_event_flush_time = 0
    self._flush_complete = flush_complete
    self._flush_sentinel = flush_sentinel
    self._close_sentinel = close_sentinel
    # Populated when writing logic raises an exception and kills the thread.
    self.failure_exc_info = ()

  def run(self):
    """Dequeues items and writes them until the close sentinel arrives."""
    try:
      while True:
        event = self._queue.get()
        if event is self._close_sentinel:
          return
        elif event is self._flush_sentinel:
          self._ev_writer.Flush()
          self._flush_complete.set()
        else:
          self._ev_writer.WriteEvent(event)
          # Flush the event writer every so often.
          now = time.time()
          if now > self._next_event_flush_time:
            self._ev_writer.Flush()
            self._next_event_flush_time = now + self._flush_secs
    except Exception as e:
      logging.error("EventFileWriter writer thread error: %s", e)
      self.failure_exc_info = sys.exc_info()
      raise
    finally:
      # When exiting the thread, always complete any pending flush operation
      # (to unblock flush() calls) and close the queue (to unblock add_event()
      # calls, including those used by flush() and close()), which ensures that
      # code using EventFileWriter doesn't deadlock if this thread dies.
      self._flush_complete.set()
      self._queue.close()


class CloseableQueue(object):
  """Stripped-down fork of the standard library Queue that is closeable."""

  def __init__(self, maxsize=0):
    """Create a queue object with a given maximum size.

    Args:
      maxsize: int size of queue. If <= 0, the queue size is infinite.
    """
    self._maxsize = maxsize
    self._queue = collections.deque()
    self._closed = False
    # Mutex must be held whenever queue is mutating; shared by conditions.
    self._mutex = threading.Lock()
    # Notify not_empty whenever an item is added to the queue; a
    # thread waiting to get is notified then.
    self._not_empty = threading.Condition(self._mutex)
    # Notify not_full whenever an item is removed from the queue;
    # a thread waiting to put is notified then.
    self._not_full = threading.Condition(self._mutex)

  def get(self):
    """Remove and return an item from the queue.

    If the queue is empty, blocks until an item is available. Closing the
    queue does not affect get().

    Returns:
      an item from the queue
    """
    with self._not_empty:
      while not self._queue:
        self._not_empty.wait()
      item = self._queue.popleft()
      self._not_full.notify()
      return item

  def put(self, item):
    """Put an item into the queue.

    If the queue is closed, fails immediately.

    If the queue is full, blocks until space is available or until the queue
    is closed by a call to close(), at which point this call fails.

    Args:
      item: an item to add to the queue

    Raises:
      QueueClosedError: if insertion failed because the queue is closed
    """
    with self._not_full:
      if self._closed:
        raise QueueClosedError()
      if self._maxsize > 0:
        while len(self._queue) == self._maxsize:
          self._not_full.wait()
          # close() wakes all waiting putters; re-check after every wakeup.
          if self._closed:
            raise QueueClosedError()
      self._queue.append(item)
      self._not_empty.notify()

  def close(self):
    """Closes the queue, causing any pending or future `put()` calls to fail."""
    with self._not_full:
      self._closed = True
      self._not_full.notify_all()


class QueueClosedError(Exception):
  """Raised when CloseableQueue.put() fails because the queue is closed."""