xref: /aosp_15_r20/external/tensorflow/tensorflow/core/data/service/journal.cc (revision b6fb3261f9314811a0f4371741dbb8839866f948)
1 /* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/data/service/journal.h"
17 
18 #include <algorithm>
19 #include <memory>
20 #include <string>
21 #include <vector>
22 
23 #include "absl/memory/memory.h"
24 #include "tensorflow/core/data/service/journal.pb.h"
25 #include "tensorflow/core/lib/io/record_reader.h"
26 #include "tensorflow/core/lib/io/record_writer.h"
27 #include "tensorflow/core/platform/env.h"
28 #include "tensorflow/core/platform/errors.h"
29 #include "tensorflow/core/platform/path.h"
30 #include "tensorflow/core/platform/regexp.h"
31 
32 namespace tensorflow {
33 namespace data {
34 
35 namespace {
// Base name of every journal file. DataServiceJournalFile() appends "_" and a
// sequence number to it to form the actual file name.
constexpr StringPiece kJournal = "journal";
37 
ParseSequenceNumber(const std::string & journal_file,int64_t * sequence_number)38 Status ParseSequenceNumber(const std::string& journal_file,
39                            int64_t* sequence_number) {
40   if (!RE2::FullMatch(journal_file, ".*_(\\d+)", sequence_number)) {
41     return errors::InvalidArgument("Failed to parse journal file name: ",
42                                    journal_file);
43   }
44   return OkStatus();
45 }
46 }  // namespace
47 
DataServiceJournalFile(const std::string & journal_dir,int64_t sequence_number)48 std::string DataServiceJournalFile(const std::string& journal_dir,
49                                    int64_t sequence_number) {
50   return io::JoinPath(journal_dir,
51                       absl::StrCat(kJournal, "_", sequence_number));
52 }
53 
// Stores the environment and journal directory. No file system work happens
// here; the journal file is opened on first use by EnsureInitialized().
FileJournalWriter::FileJournalWriter(Env* env, const std::string& journal_dir)
    : env_(env), journal_dir_(journal_dir) {}
56 
EnsureInitialized()57 Status FileJournalWriter::EnsureInitialized() {
58   if (writer_) {
59     return OkStatus();
60   }
61   std::vector<std::string> journal_files;
62   TF_RETURN_IF_ERROR(env_->RecursivelyCreateDir(journal_dir_));
63   TF_RETURN_IF_ERROR(env_->GetChildren(journal_dir_, &journal_files));
64   int64_t latest_sequence_number = -1;
65   for (const auto& file : journal_files) {
66     int64_t sequence_number;
67     TF_RETURN_IF_ERROR(ParseSequenceNumber(file, &sequence_number));
68     latest_sequence_number = std::max(latest_sequence_number, sequence_number);
69   }
70   std::string journal_file =
71       DataServiceJournalFile(journal_dir_, latest_sequence_number + 1);
72   TF_RETURN_IF_ERROR(env_->NewAppendableFile(journal_file, &file_));
73   writer_ = std::make_unique<io::RecordWriter>(file_.get());
74   VLOG(1) << "Created journal writer to write to " << journal_file;
75   return OkStatus();
76 }
77 
Write(const Update & update)78 Status FileJournalWriter::Write(const Update& update) {
79   TF_RETURN_IF_ERROR(EnsureInitialized());
80   std::string s = update.SerializeAsString();
81   if (s.empty()) {
82     return errors::Internal("Failed to serialize update ", update.DebugString(),
83                             " to string");
84   }
85   TF_RETURN_IF_ERROR(writer_->WriteRecord(s));
86   TF_RETURN_IF_ERROR(writer_->Flush());
87   TF_RETURN_IF_ERROR(file_->Sync());
88   if (VLOG_IS_ON(4)) {
89     VLOG(4) << "Wrote journal entry: " << update.DebugString();
90   }
91   return OkStatus();
92 }
93 
// Stores the environment and journal directory. No file is opened here; the
// first journal file is opened on first use by EnsureInitialized().
FileJournalReader::FileJournalReader(Env* env, StringPiece journal_dir)
    : env_(env), journal_dir_(journal_dir) {}
96 
EnsureInitialized()97 Status FileJournalReader::EnsureInitialized() {
98   if (reader_) {
99     return OkStatus();
100   }
101   return UpdateFile(DataServiceJournalFile(journal_dir_, 0));
102 }
103 
Read(Update & update,bool & end_of_journal)104 Status FileJournalReader::Read(Update& update, bool& end_of_journal) {
105   TF_RETURN_IF_ERROR(EnsureInitialized());
106   while (true) {
107     tstring record;
108     Status s = reader_->ReadRecord(&record);
109     if (errors::IsOutOfRange(s)) {
110       sequence_number_++;
111       std::string next_journal_file =
112           DataServiceJournalFile(journal_dir_, sequence_number_);
113       if (errors::IsNotFound(env_->FileExists(next_journal_file))) {
114         VLOG(3) << "Next journal file " << next_journal_file
115                 << " does not exist. End of journal reached.";
116         end_of_journal = true;
117         return OkStatus();
118       }
119       TF_RETURN_IF_ERROR(UpdateFile(next_journal_file));
120       continue;
121     }
122     TF_RETURN_IF_ERROR(s);
123     if (!update.ParseFromString(record)) {
124       return errors::DataLoss("Failed to parse journal record.");
125     }
126     if (VLOG_IS_ON(4)) {
127       VLOG(4) << "Read journal entry: " << update.DebugString();
128     }
129     end_of_journal = false;
130     return OkStatus();
131   }
132 }
133 
UpdateFile(const std::string & filename)134 Status FileJournalReader::UpdateFile(const std::string& filename) {
135   VLOG(1) << "Reading from journal file " << filename;
136   TF_RETURN_IF_ERROR(env_->NewRandomAccessFile(filename, &file_));
137   io::RecordReaderOptions opts;
138   opts.buffer_size = 2 << 20;  // 2MB
139   reader_ = std::make_unique<io::SequentialRecordReader>(file_.get(), opts);
140   return OkStatus();
141 }
142 
143 }  // namespace data
144 }  // namespace tensorflow
145