1 /*
2 * Copyright 2021 Google LLC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
#include "fcp/client/opstats/pds_backed_opstats_db.h"

#include <fcntl.h>
#include <sys/file.h>
#include <unistd.h>

#include <algorithm>
#include <cstdint>
#include <filesystem>
#include <functional>
#include <memory>
#include <string>
#include <utility>

#include "google/protobuf/util/time_util.h"
#include "absl/strings/str_cat.h"
#include "absl/time/time.h"
#include "fcp/base/monitoring.h"
#include "fcp/client/diag_codes.pb.h"
#include "fcp/client/log_manager.h"
#include "protostore/file-storage.h"
#include "protostore/proto-data-store.h"
34
35 namespace fcp {
36 namespace client {
37 namespace opstats {
38 namespace {
39
40 using ::google::protobuf::util::TimeUtil;
41
42 ABSL_CONST_INIT absl::Mutex file_lock_mutex(absl::kConstInit);
43
GetFilesInUseSet()44 absl::flat_hash_set<std::string>* GetFilesInUseSet() {
45 // Create the heap allocated static set only once, never call d'tor.
46 // See: go/totw/110
47 static absl::flat_hash_set<std::string>* files_in_use =
48 new absl::flat_hash_set<std::string>();
49 return files_in_use;
50 }
51
AcquireFileLock(const std::string & db_path,LogManager & log_manager)52 absl::StatusOr<int> AcquireFileLock(const std::string& db_path,
53 LogManager& log_manager) {
54 absl::WriterMutexLock lock(&file_lock_mutex);
55 // If the underlying file is already in the hash set, it means another
56 // instance of OpStatsDb is using it, and we'll return an error.
57 absl::flat_hash_set<std::string>* files_in_use = GetFilesInUseSet();
58 if (!files_in_use->insert(db_path).second) {
59 log_manager.LogDiag(ProdDiagCode::OPSTATS_MULTIPLE_DB_INSTANCE_DETECTED);
60 return absl::InternalError(
61 "Another instance is already using the underlying database file.");
62 }
63 // Create a new file descriptor.
64 // Create the file if it doesn't exist, set permission to 0644.
65 int fd = open(db_path.c_str(), O_CREAT | O_RDWR,
66 S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
67 if (fd < 0) {
68 files_in_use->erase(db_path);
69 log_manager.LogDiag(ProdDiagCode::OPSTATS_FAILED_TO_OPEN_FILE);
70 return absl::InternalError(absl::StrCat("Failed to open file: ", db_path));
71 }
72 // Acquire exclusive lock on the file in a non-blocking mode.
73 // flock(2) applies lock on the file object in the open file table, so it can
74 // apply lock across different processes. Within a process, flock doesn't
75 // necessarily guarantee synchronization across multiple threads.
76 // See:https://man7.org/linux/man-pages/man2/flock.2.html
77 if (flock(fd, LOCK_EX | LOCK_NB) < 0) {
78 files_in_use->erase(db_path);
79 close(fd);
80 log_manager.LogDiag(ProdDiagCode::OPSTATS_MULTIPLE_DB_INSTANCE_DETECTED);
81 return absl::InternalError(
82 "Failed to acquire file lock on the underlying database file.");
83 }
84 return fd;
85 }
86
ReleaseFileLock(const std::string & db_path,int fd)87 void ReleaseFileLock(const std::string& db_path, int fd) {
88 absl::WriterMutexLock lock(&file_lock_mutex);
89 GetFilesInUseSet()->erase(db_path);
90 FCP_CHECK(fd >= 0);
91 // File lock is released when the descriptor is closed.
92 close(fd);
93 }
94
CreateEmptyData()95 std::unique_ptr<OpStatsSequence> CreateEmptyData() {
96 auto empty_data = std::make_unique<OpStatsSequence>();
97 *(empty_data->mutable_earliest_trustworthy_time()) =
98 google::protobuf::util::TimeUtil::GetCurrentTime();
99 return empty_data;
100 }
101
102 // Returns the data in the db, or an error from the read operation.
ReadInternal(protostore::ProtoDataStore<OpStatsSequence> & db,LogManager & log_manager)103 absl::StatusOr<OpStatsSequence> ReadInternal(
104 protostore::ProtoDataStore<OpStatsSequence>& db, LogManager& log_manager) {
105 absl::StatusOr<const OpStatsSequence*> data = db.Read();
106 if (data.ok()) {
107 return *data.value();
108 } else {
109 log_manager.LogDiag(ProdDiagCode::OPSTATS_READ_FAILED);
110 return absl::InternalError(
111 absl::StrCat("Failed to read from database, with error message: ",
112 data.status().message()));
113 }
114 }
115
116 // Overwrites the db to contain an empty OpStatsSequence message.
ResetInternal(protostore::ProtoDataStore<OpStatsSequence> & db,LogManager & log_manager)117 absl::Status ResetInternal(protostore::ProtoDataStore<OpStatsSequence>& db,
118 LogManager& log_manager) {
119 absl::Status reset_status = db.Write(CreateEmptyData());
120 if (!reset_status.ok()) {
121 log_manager.LogDiag(ProdDiagCode::OPSTATS_RESET_FAILED);
122 return absl::InternalError(
123 absl::StrCat("Failed to reset the database, with error message: ",
124 reset_status.code()));
125 }
126 return absl::OkStatus();
127 }
128
GetLastUpdateTime(const OperationalStats & operational_stats)129 absl::Time GetLastUpdateTime(const OperationalStats& operational_stats) {
130 if (operational_stats.events().empty()) {
131 return absl::InfinitePast();
132 }
133 return absl::FromUnixSeconds(TimeUtil::TimestampToSeconds(
134 operational_stats.events().rbegin()->timestamp()));
135 }
136
137 // If there's data, use the timestamp of the first event as the earliest
138 // trustworthy time; otherwise, the current time will be used.
GetEarliestTrustWorthyTime(const google::protobuf::RepeatedPtrField<OperationalStats> & op_stats)139 ::google::protobuf::Timestamp GetEarliestTrustWorthyTime(
140 const google::protobuf::RepeatedPtrField<OperationalStats>& op_stats) {
141 ::google::protobuf::Timestamp timestamp = TimeUtil::GetCurrentTime();
142 for (const auto& stat : op_stats) {
143 if (!stat.events().empty()) {
144 timestamp = stat.events().begin()->timestamp();
145 break;
146 }
147 }
148 return timestamp;
149 }
150
RemoveOutdatedData(OpStatsSequence & data,absl::Duration ttl)151 void RemoveOutdatedData(OpStatsSequence& data, absl::Duration ttl) {
152 absl::Time earliest_accepted_time = absl::Now() - ttl;
153 auto* op_stats = data.mutable_opstats();
154 int64_t original_num_entries = op_stats->size();
155 op_stats->erase(
156 std::remove_if(op_stats->begin(), op_stats->end(),
157 [earliest_accepted_time](const OperationalStats& data) {
158 return GetLastUpdateTime(data) < earliest_accepted_time;
159 }),
160 op_stats->end());
161 int64_t num_entries_after_purging = op_stats->size();
162 if (num_entries_after_purging < original_num_entries) {
163 *(data.mutable_earliest_trustworthy_time()) =
164 TimeUtil::MillisecondsToTimestamp(
165 absl::ToUnixMillis(earliest_accepted_time));
166 }
167 }
168
PruneOldDataUntilBelowSizeLimit(OpStatsSequence & data,const int64_t max_size_bytes,LogManager & log_manager)169 void PruneOldDataUntilBelowSizeLimit(OpStatsSequence& data,
170 const int64_t max_size_bytes,
171 LogManager& log_manager) {
172 int64_t current_size = data.ByteSizeLong();
173 auto& op_stats = *(data.mutable_opstats());
174 if (current_size > max_size_bytes) {
175 int64_t num_pruned_entries = 0;
176 auto it = op_stats.begin();
177 absl::Time earliest_event_time = absl::InfinitePast();
178 // The OperationalStats are sorted by time from earliest to latest, so we'll
179 // remove from the start.
180 while (current_size > max_size_bytes && it != op_stats.end()) {
181 if (earliest_event_time == absl::InfinitePast()) {
182 earliest_event_time = GetLastUpdateTime(*it);
183 }
184 num_pruned_entries++;
185 // Note that the size of an OperationalStats is smaller than the size
186 // impact it has on the OpStatsSequence. We are being conservative here.
187 current_size -= it->ByteSizeLong();
188 it++;
189 }
190 op_stats.erase(op_stats.begin(), it);
191 *data.mutable_earliest_trustworthy_time() =
192 GetEarliestTrustWorthyTime(op_stats);
193 log_manager.LogToLongHistogram(
194 HistogramCounters::OPSTATS_NUM_PRUNED_ENTRIES, num_pruned_entries);
195 log_manager.LogToLongHistogram(
196 HistogramCounters::OPSTATS_OLDEST_PRUNED_ENTRY_TENURE_HOURS,
197 absl::ToInt64Hours(absl::Now() - earliest_event_time));
198 }
199 log_manager.LogToLongHistogram(HistogramCounters::OPSTATS_DB_SIZE_BYTES,
200 current_size);
201 log_manager.LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
202 op_stats.size());
203 }
204
205 } // anonymous namespace
206
Create(const std::string & base_dir,absl::Duration ttl,LogManager & log_manager,int64_t max_size_bytes)207 absl::StatusOr<std::unique_ptr<OpStatsDb>> PdsBackedOpStatsDb::Create(
208 const std::string& base_dir, absl::Duration ttl, LogManager& log_manager,
209 int64_t max_size_bytes) {
210 std::filesystem::path path(base_dir);
211 if (!path.is_absolute()) {
212 log_manager.LogDiag(ProdDiagCode::OPSTATS_INVALID_FILE_PATH);
213 return absl::InvalidArgumentError(
214 absl::StrCat("The provided path: ", base_dir,
215 " is invalid. The path must start with \"/\""));
216 }
217 path /= kParentDir;
218 std::error_code error;
219 std::filesystem::create_directories(path, error);
220 if (error.value() != 0) {
221 log_manager.LogDiag(ProdDiagCode::OPSTATS_PARENT_DIR_CREATION_FAILED);
222 return absl::InternalError(
223 absl::StrCat("Failed to create directory ", path.generic_string()));
224 }
225 path /= kDbFileName;
226 std::function<void()> lock_releaser;
227 auto file_storage = std::make_unique<protostore::FileStorage>();
228 std::unique_ptr<protostore::ProtoDataStore<OpStatsSequence>> pds;
229 std::string db_path = path.generic_string();
230 FCP_ASSIGN_OR_RETURN(int fd, AcquireFileLock(db_path, log_manager));
231 lock_releaser = [db_path, fd]() { ReleaseFileLock(db_path, fd); };
232 pds = std::make_unique<protostore::ProtoDataStore<OpStatsSequence>>(
233 *file_storage, db_path);
234 absl::StatusOr<int64_t> file_size = file_storage->GetFileSize(path);
235 if (!file_size.ok()) {
236 lock_releaser();
237 return file_size.status();
238 }
239 // If the size of the underlying file is zero, it means this is the first
240 // time we create the database.
241 bool should_initiate = file_size.value() == 0;
242
243 // If this is the first time we create the OpStatsDb, we want to create an
244 // empty database.
245 if (should_initiate) {
246 absl::Status write_status = pds->Write(CreateEmptyData());
247 if (!write_status.ok()) {
248 lock_releaser();
249 return write_status;
250 }
251 }
252 return absl::WrapUnique(
253 new PdsBackedOpStatsDb(std::move(pds), std::move(file_storage), ttl,
254 log_manager, max_size_bytes, lock_releaser));
255 }
256
~PdsBackedOpStatsDb()257 PdsBackedOpStatsDb::~PdsBackedOpStatsDb() { lock_releaser_(); }
258
Read()259 absl::StatusOr<OpStatsSequence> PdsBackedOpStatsDb::Read() {
260 absl::WriterMutexLock lock(&mutex_);
261 auto data_or = ReadInternal(*db_, log_manager_);
262 if (!data_or.ok()) {
263 // Try resetting after a failed read.
264 auto reset_status = ResetInternal(*db_, log_manager_);
265 }
266 return data_or;
267 }
268
Transform(std::function<void (OpStatsSequence &)> func)269 absl::Status PdsBackedOpStatsDb::Transform(
270 std::function<void(OpStatsSequence&)> func) {
271 absl::WriterMutexLock lock(&mutex_);
272 OpStatsSequence data;
273 auto data_or = ReadInternal(*db_, log_manager_);
274 if (!data_or.ok()) {
275 // Try resetting after a failed read.
276 FCP_RETURN_IF_ERROR(ResetInternal(*db_, log_manager_));
277 } else {
278 data = std::move(data_or).value();
279 RemoveOutdatedData(data, ttl_);
280 }
281 func(data);
282 PruneOldDataUntilBelowSizeLimit(data, max_size_bytes_, log_manager_);
283 if (!data.has_earliest_trustworthy_time()) {
284 *data.mutable_earliest_trustworthy_time() =
285 GetEarliestTrustWorthyTime(data.opstats());
286 }
287 absl::Status status =
288 db_->Write(std::make_unique<OpStatsSequence>(std::move(data)));
289 if (!status.ok()) {
290 log_manager_.LogDiag(ProdDiagCode::OPSTATS_WRITE_FAILED);
291 }
292 return status;
293 }
294
295 } // namespace opstats
296 } // namespace client
297 } // namespace fcp
298