1 /* Copyright 2020 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 #ifndef TENSORFLOW_CORE_DATA_SERVICE_JOURNAL_H_ 16 #define TENSORFLOW_CORE_DATA_SERVICE_JOURNAL_H_ 17 18 #include <memory> 19 #include <string> 20 21 #include "tensorflow/core/data/service/journal.pb.h" 22 #include "tensorflow/core/lib/core/status.h" 23 #include "tensorflow/core/lib/io/record_reader.h" 24 #include "tensorflow/core/lib/io/record_writer.h" 25 #include "tensorflow/core/platform/env.h" 26 27 namespace tensorflow { 28 namespace data { 29 30 // Returns the location of the journal file within the journal directory. 31 std::string DataServiceJournalFile(const std::string& journal_dir, 32 int64_t sequence_number); 33 34 // Interface for writing to a journal. 35 class JournalWriter { 36 public: 37 virtual ~JournalWriter() = default; 38 // Writes and syncs an update to the journal. 39 virtual Status Write(const Update& update) = 0; 40 // Initializes the writer if it is not yet initialized. 41 virtual Status EnsureInitialized() = 0; 42 }; 43 44 // FileJournalWriter is not thread-safe, requiring external synchronization when 45 // used by multiple threads. 46 // 47 // FileJournalWriter writes journal files to a configured journal directory. The 48 // directory is laid out in the following format: 49 // 50 // journal_dir/ 51 // journal_0 52 // journal_1 53 // ... 54 // 55 // When the writer is created, it lists the directory to find the next available 56 // journal file name. For example, if the journal directory contains 57 // "journal_0", "journal_1", and "journal_2", the writer will write to 58 // "journal_3". The writer will flush updates as they are written, so that they 59 // can be stored durably in case of machine failure. 60 class FileJournalWriter : public JournalWriter { 61 public: 62 // Creates a journal writer to write to the given journal directory. 63 // If there is already journal data there, the journal writer will append to 64 // the existing journal. 65 explicit FileJournalWriter(Env* env, const std::string& journal_dir); 66 FileJournalWriter(const FileJournalWriter&) = delete; 67 FileJournalWriter& operator=(const FileJournalWriter&) = delete; 68 69 Status Write(const Update& update) override; 70 Status EnsureInitialized() override; 71 72 private: 73 Env* env_; 74 const std::string journal_dir_; 75 std::unique_ptr<WritableFile> file_; 76 std::unique_ptr<io::RecordWriter> writer_; 77 }; 78 79 // Interface for reading from a journal. 80 class JournalReader { 81 public: 82 virtual ~JournalReader() = default; 83 // Reads the next update from the journal. Sets `end_of_journal=true` if 84 // there are no more updates left in the journal. 85 virtual Status Read(Update& update, bool& end_of_journal) = 0; 86 }; 87 88 // JournalReader is not thread-safe, requiring external synchronization when 89 // used by multiple threads. 90 // 91 // The journal reader reads through all journal files in the configured journal 92 // directory, in order of their sequence numbers. See FileJournalWriter above. 93 class FileJournalReader : public JournalReader { 94 public: 95 explicit FileJournalReader(Env* env, StringPiece journal_dir); 96 FileJournalReader(const FileJournalReader&) = delete; 97 FileJournalReader& operator=(const FileJournalReader&) = delete; 98 99 Status Read(Update& update, bool& end_of_journal) override; 100 101 private: 102 // Initializes the reader if it is not yet initialized. 103 Status EnsureInitialized(); 104 // Updates the `FileJournalReader` to read from a new file. 105 Status UpdateFile(const std::string& filename); 106 107 Env* env_; 108 const std::string journal_dir_; 109 // Sequence number of current journal file. 110 int64_t sequence_number_ = 0; 111 std::unique_ptr<RandomAccessFile> file_; 112 std::unique_ptr<io::SequentialRecordReader> reader_; 113 }; 114 115 } // namespace data 116 } // namespace tensorflow 117 118 #endif // TENSORFLOW_CORE_DATA_SERVICE_JOURNAL_H_ 119