xref: /aosp_15_r20/external/tensorflow/tensorflow/core/data/service/journal.h (revision b6fb3261f9314811a0f4371741dbb8839866f948)
1 /* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 #ifndef TENSORFLOW_CORE_DATA_SERVICE_JOURNAL_H_
16 #define TENSORFLOW_CORE_DATA_SERVICE_JOURNAL_H_
17 
18 #include <memory>
19 #include <string>
20 
21 #include "tensorflow/core/data/service/journal.pb.h"
22 #include "tensorflow/core/lib/core/status.h"
23 #include "tensorflow/core/lib/io/record_reader.h"
24 #include "tensorflow/core/lib/io/record_writer.h"
25 #include "tensorflow/core/platform/env.h"
26 
27 namespace tensorflow {
28 namespace data {
29 
30 // Returns the location of the journal file within the journal directory.
   //
   // `journal_dir` is the journal directory and `sequence_number` selects which
   // journal file inside that directory is named.
   // NOTE(review): the FileJournalWriter layout comment in this header lists the
   // files as journal_0, journal_1, ...; presumably the numeric suffix is
   // `sequence_number` -- confirm against the definition.
31 std::string DataServiceJournalFile(const std::string& journal_dir,
32                                    int64_t sequence_number);
33 
34 // Interface for writing to a journal.
   //
   // Abstract base class: both operations are pure virtual and report their
   // outcome through the returned `Status`. FileJournalWriter, declared later in
   // this header, derives from it.
35 class JournalWriter {
36  public:
     // Virtual so that a derived writer is destroyed correctly when deleted
     // through a pointer to this interface.
37   virtual ~JournalWriter() = default;
38   // Writes and syncs an update to the journal.
     // `Update` is not declared in this header; presumably it is the message
     // type generated into journal.pb.h -- confirm.
39   virtual Status Write(const Update& update) = 0;
40   // Initializes the writer if it is not yet initialized.
     // NOTE(review): presumably a no-op once initialization has succeeded --
     // confirm in the implementations.
41   virtual Status EnsureInitialized() = 0;
42 };
43 
44 // FileJournalWriter is not thread-safe, requiring external synchronization when
45 // used by multiple threads.
46 //
47 // FileJournalWriter writes journal files to a configured journal directory. The
48 // directory is laid out in the following format:
49 //
50 // journal_dir/
51 //   journal_0
52 //   journal_1
53 //   ...
54 //
55 // When the writer is created, it lists the directory to find the next available
56 // journal file name. For example, if the journal directory contains
57 // "journal_0", "journal_1", and "journal_2", the writer will write to
58 // "journal_3". The writer will flush updates as they are written, so that they
59 // can be stored durably in case of machine failure.
60 class FileJournalWriter : public JournalWriter {
61  public:
62   // Creates a journal writer to write to the given journal directory.
63   // If there is already journal data there, the journal writer will append to
64   // the existing journal.
     // NOTE(review): the class comment above says the writer targets the next
     // unused journal_<N> file, so "append" presumably means adding a new file
     // to the existing sequence rather than appending to an existing file --
     // confirm against the implementation.
65   explicit FileJournalWriter(Env* env, const std::string& journal_dir);
     // Copying is disabled: the copy constructor and copy assignment are deleted.
66   FileJournalWriter(const FileJournalWriter&) = delete;
67   FileJournalWriter& operator=(const FileJournalWriter&) = delete;
68 
     // JournalWriter overrides; see the interface for the contract.
69   Status Write(const Update& update) override;
70   Status EnsureInitialized() override;
71 
72  private:
     // Held as a raw pointer. NOTE(review): presumably the `env` constructor
     // argument, not owned, and required to outlive this writer -- confirm.
73   Env* env_;
     // Immutable after construction (declared const). Presumably a copy of the
     // `journal_dir` constructor argument -- confirm.
74   const std::string journal_dir_;
     // Owned file handle. NOTE(review): presumably the journal file currently
     // being written, unset until initialization -- confirm.
75   std::unique_ptr<WritableFile> file_;
     // Owned record writer. NOTE(review): presumably layered over `file_` --
     // confirm in the implementation.
76   std::unique_ptr<io::RecordWriter> writer_;
77 };
78 
79 // Interface for reading from a journal.
   //
   // Abstract base class with a single pure virtual operation. FileJournalReader,
   // declared later in this header, derives from it.
80 class JournalReader {
81  public:
     // Virtual so that a derived reader is destroyed correctly when deleted
     // through a pointer to this interface.
82   virtual ~JournalReader() = default;
83   // Reads the next update from the journal. Sets `end_of_journal=true` if
84   // there are no more updates left in the journal.
     // Both `update` and `end_of_journal` are output parameters passed by
     // non-const reference; the returned `Status` reports the outcome.
     // NOTE(review): the contents of `update` when `end_of_journal` is set are
     // not specified here -- presumably callers should ignore it; confirm.
85   virtual Status Read(Update& update, bool& end_of_journal) = 0;
86 };
87 
88 // FileJournalReader is not thread-safe, requiring external synchronization
89 // when used by multiple threads.
90 //
91 // The journal reader reads through all journal files in the configured journal
92 // directory, in order of their sequence numbers. See FileJournalWriter above.
93 class FileJournalReader : public JournalReader {
94  public:
     // Creates a reader over the journal files in `journal_dir`, using `env`.
     // NOTE(review): initialization is a separate private step
     // (EnsureInitialized below); presumably the constructor performs no file
     // I/O itself -- confirm in the implementation.
95   explicit FileJournalReader(Env* env, StringPiece journal_dir);
     // Copying is disabled: the copy constructor and copy assignment are deleted.
96   FileJournalReader(const FileJournalReader&) = delete;
97   FileJournalReader& operator=(const FileJournalReader&) = delete;
98 
     // JournalReader override; see the interface for the contract.
99   Status Read(Update& update, bool& end_of_journal) override;
100 
101  private:
102   // Initializes the reader if it is not yet initialized.
103   Status EnsureInitialized();
104   // Updates the `FileJournalReader` to read from a new file.
      // NOTE(review): presumably replaces `file_` and `reader_` with ones opened
      // on `filename` -- confirm in the implementation.
105   Status UpdateFile(const std::string& filename);
106 
      // Held as a raw pointer. NOTE(review): presumably the `env` constructor
      // argument, not owned, and required to outlive this reader -- confirm.
107   Env* env_;
      // Immutable after construction (declared const). Presumably a copy of the
      // `journal_dir` constructor argument -- confirm.
108   const std::string journal_dir_;
109   // Sequence number of current journal file.
      // Starts at 0; presumably this corresponds to the first file, journal_0,
      // in the directory layout described for FileJournalWriter -- confirm.
110   int64_t sequence_number_ = 0;
      // Owned handle for the file being read; presumably unset until the reader
      // is initialized -- confirm.
111   std::unique_ptr<RandomAccessFile> file_;
      // Owned sequential record reader; presumably layered over `file_` --
      // confirm in the implementation.
112   std::unique_ptr<io::SequentialRecordReader> reader_;
113 };
114 
115 }  // namespace data
116 }  // namespace tensorflow
117 
118 #endif  // TENSORFLOW_CORE_DATA_SERVICE_JOURNAL_H_
119