1 /* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
#include "tensorflow/core/data/service/journal.h"

#include <algorithm>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "absl/memory/memory.h"
#include "tensorflow/core/data/service/journal.pb.h"
#include "tensorflow/core/lib/io/record_reader.h"
#include "tensorflow/core/lib/io/record_writer.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/errors.h"
#include "tensorflow/core/platform/path.h"
#include "tensorflow/core/platform/regexp.h"
31
32 namespace tensorflow {
33 namespace data {
34
35 namespace {
36 constexpr StringPiece kJournal = "journal";
37
ParseSequenceNumber(const std::string & journal_file,int64_t * sequence_number)38 Status ParseSequenceNumber(const std::string& journal_file,
39 int64_t* sequence_number) {
40 if (!RE2::FullMatch(journal_file, ".*_(\\d+)", sequence_number)) {
41 return errors::InvalidArgument("Failed to parse journal file name: ",
42 journal_file);
43 }
44 return OkStatus();
45 }
46 } // namespace
47
DataServiceJournalFile(const std::string & journal_dir,int64_t sequence_number)48 std::string DataServiceJournalFile(const std::string& journal_dir,
49 int64_t sequence_number) {
50 return io::JoinPath(journal_dir,
51 absl::StrCat(kJournal, "_", sequence_number));
52 }
53
FileJournalWriter(Env * env,const std::string & journal_dir)54 FileJournalWriter::FileJournalWriter(Env* env, const std::string& journal_dir)
55 : env_(env), journal_dir_(journal_dir) {}
56
EnsureInitialized()57 Status FileJournalWriter::EnsureInitialized() {
58 if (writer_) {
59 return OkStatus();
60 }
61 std::vector<std::string> journal_files;
62 TF_RETURN_IF_ERROR(env_->RecursivelyCreateDir(journal_dir_));
63 TF_RETURN_IF_ERROR(env_->GetChildren(journal_dir_, &journal_files));
64 int64_t latest_sequence_number = -1;
65 for (const auto& file : journal_files) {
66 int64_t sequence_number;
67 TF_RETURN_IF_ERROR(ParseSequenceNumber(file, &sequence_number));
68 latest_sequence_number = std::max(latest_sequence_number, sequence_number);
69 }
70 std::string journal_file =
71 DataServiceJournalFile(journal_dir_, latest_sequence_number + 1);
72 TF_RETURN_IF_ERROR(env_->NewAppendableFile(journal_file, &file_));
73 writer_ = std::make_unique<io::RecordWriter>(file_.get());
74 VLOG(1) << "Created journal writer to write to " << journal_file;
75 return OkStatus();
76 }
77
Write(const Update & update)78 Status FileJournalWriter::Write(const Update& update) {
79 TF_RETURN_IF_ERROR(EnsureInitialized());
80 std::string s = update.SerializeAsString();
81 if (s.empty()) {
82 return errors::Internal("Failed to serialize update ", update.DebugString(),
83 " to string");
84 }
85 TF_RETURN_IF_ERROR(writer_->WriteRecord(s));
86 TF_RETURN_IF_ERROR(writer_->Flush());
87 TF_RETURN_IF_ERROR(file_->Sync());
88 if (VLOG_IS_ON(4)) {
89 VLOG(4) << "Wrote journal entry: " << update.DebugString();
90 }
91 return OkStatus();
92 }
93
FileJournalReader(Env * env,StringPiece journal_dir)94 FileJournalReader::FileJournalReader(Env* env, StringPiece journal_dir)
95 : env_(env), journal_dir_(journal_dir) {}
96
EnsureInitialized()97 Status FileJournalReader::EnsureInitialized() {
98 if (reader_) {
99 return OkStatus();
100 }
101 return UpdateFile(DataServiceJournalFile(journal_dir_, 0));
102 }
103
Read(Update & update,bool & end_of_journal)104 Status FileJournalReader::Read(Update& update, bool& end_of_journal) {
105 TF_RETURN_IF_ERROR(EnsureInitialized());
106 while (true) {
107 tstring record;
108 Status s = reader_->ReadRecord(&record);
109 if (errors::IsOutOfRange(s)) {
110 sequence_number_++;
111 std::string next_journal_file =
112 DataServiceJournalFile(journal_dir_, sequence_number_);
113 if (errors::IsNotFound(env_->FileExists(next_journal_file))) {
114 VLOG(3) << "Next journal file " << next_journal_file
115 << " does not exist. End of journal reached.";
116 end_of_journal = true;
117 return OkStatus();
118 }
119 TF_RETURN_IF_ERROR(UpdateFile(next_journal_file));
120 continue;
121 }
122 TF_RETURN_IF_ERROR(s);
123 if (!update.ParseFromString(record)) {
124 return errors::DataLoss("Failed to parse journal record.");
125 }
126 if (VLOG_IS_ON(4)) {
127 VLOG(4) << "Read journal entry: " << update.DebugString();
128 }
129 end_of_journal = false;
130 return OkStatus();
131 }
132 }
133
UpdateFile(const std::string & filename)134 Status FileJournalReader::UpdateFile(const std::string& filename) {
135 VLOG(1) << "Reading from journal file " << filename;
136 TF_RETURN_IF_ERROR(env_->NewRandomAccessFile(filename, &file_));
137 io::RecordReaderOptions opts;
138 opts.buffer_size = 2 << 20; // 2MB
139 reader_ = std::make_unique<io::SequentialRecordReader>(file_.get(), opts);
140 return OkStatus();
141 }
142
143 } // namespace data
144 } // namespace tensorflow
145