xref: /aosp_15_r20/external/federated-compute/fcp/client/opstats/pds_backed_opstats_db.cc (revision 14675a029014e728ec732f129a32e299b2da0601)
1 /*
2  * Copyright 2021 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "fcp/client/opstats/pds_backed_opstats_db.h"
17 
18 #include <fcntl.h>
19 #include <sys/file.h>
20 
21 #include <filesystem>
22 #include <functional>
23 #include <string>
24 #include <utility>
25 
26 #include "google/protobuf/util/time_util.h"
27 #include "absl/strings/str_cat.h"
28 #include "absl/time/time.h"
29 #include "fcp/base/monitoring.h"
30 #include "fcp/client/diag_codes.pb.h"
31 #include "fcp/client/log_manager.h"
32 #include "protostore/file-storage.h"
33 #include "protostore/proto-data-store.h"
34 
35 namespace fcp {
36 namespace client {
37 namespace opstats {
38 namespace {
39 
40 using ::google::protobuf::util::TimeUtil;
41 
42 ABSL_CONST_INIT absl::Mutex file_lock_mutex(absl::kConstInit);
43 
GetFilesInUseSet()44 absl::flat_hash_set<std::string>* GetFilesInUseSet() {
45   // Create the heap allocated static set only once, never call d'tor.
46   // See: go/totw/110
47   static absl::flat_hash_set<std::string>* files_in_use =
48       new absl::flat_hash_set<std::string>();
49   return files_in_use;
50 }
51 
AcquireFileLock(const std::string & db_path,LogManager & log_manager)52 absl::StatusOr<int> AcquireFileLock(const std::string& db_path,
53                                     LogManager& log_manager) {
54   absl::WriterMutexLock lock(&file_lock_mutex);
55   // If the underlying file is already in the hash set, it means another
56   // instance of OpStatsDb is using it, and we'll return an error.
57   absl::flat_hash_set<std::string>* files_in_use = GetFilesInUseSet();
58   if (!files_in_use->insert(db_path).second) {
59     log_manager.LogDiag(ProdDiagCode::OPSTATS_MULTIPLE_DB_INSTANCE_DETECTED);
60     return absl::InternalError(
61         "Another instance is already using the underlying database file.");
62   }
63   // Create a new file descriptor.
64   // Create the file if it doesn't exist, set permission to 0644.
65   int fd = open(db_path.c_str(), O_CREAT | O_RDWR,
66                 S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
67   if (fd < 0) {
68     files_in_use->erase(db_path);
69     log_manager.LogDiag(ProdDiagCode::OPSTATS_FAILED_TO_OPEN_FILE);
70     return absl::InternalError(absl::StrCat("Failed to open file: ", db_path));
71   }
72   // Acquire exclusive lock on the file in a non-blocking mode.
73   // flock(2) applies lock on the file object in the open file table, so it can
74   // apply lock across different processes.  Within a process, flock doesn't
75   // necessarily guarantee synchronization across multiple threads.
76   // See:https://man7.org/linux/man-pages/man2/flock.2.html
77   if (flock(fd, LOCK_EX | LOCK_NB) < 0) {
78     files_in_use->erase(db_path);
79     close(fd);
80     log_manager.LogDiag(ProdDiagCode::OPSTATS_MULTIPLE_DB_INSTANCE_DETECTED);
81     return absl::InternalError(
82         "Failed to acquire file lock on the underlying database file.");
83   }
84   return fd;
85 }
86 
ReleaseFileLock(const std::string & db_path,int fd)87 void ReleaseFileLock(const std::string& db_path, int fd) {
88   absl::WriterMutexLock lock(&file_lock_mutex);
89   GetFilesInUseSet()->erase(db_path);
90   FCP_CHECK(fd >= 0);
91   // File lock is released when the descriptor is closed.
92   close(fd);
93 }
94 
CreateEmptyData()95 std::unique_ptr<OpStatsSequence> CreateEmptyData() {
96   auto empty_data = std::make_unique<OpStatsSequence>();
97   *(empty_data->mutable_earliest_trustworthy_time()) =
98       google::protobuf::util::TimeUtil::GetCurrentTime();
99   return empty_data;
100 }
101 
102 // Returns the data in the db, or an error from the read operation.
ReadInternal(protostore::ProtoDataStore<OpStatsSequence> & db,LogManager & log_manager)103 absl::StatusOr<OpStatsSequence> ReadInternal(
104     protostore::ProtoDataStore<OpStatsSequence>& db, LogManager& log_manager) {
105   absl::StatusOr<const OpStatsSequence*> data = db.Read();
106   if (data.ok()) {
107     return *data.value();
108   } else {
109     log_manager.LogDiag(ProdDiagCode::OPSTATS_READ_FAILED);
110     return absl::InternalError(
111         absl::StrCat("Failed to read from database, with error message: ",
112                      data.status().message()));
113   }
114 }
115 
116 // Overwrites the db to contain an empty OpStatsSequence message.
ResetInternal(protostore::ProtoDataStore<OpStatsSequence> & db,LogManager & log_manager)117 absl::Status ResetInternal(protostore::ProtoDataStore<OpStatsSequence>& db,
118                            LogManager& log_manager) {
119   absl::Status reset_status = db.Write(CreateEmptyData());
120   if (!reset_status.ok()) {
121     log_manager.LogDiag(ProdDiagCode::OPSTATS_RESET_FAILED);
122     return absl::InternalError(
123         absl::StrCat("Failed to reset the database, with error message: ",
124                      reset_status.code()));
125   }
126   return absl::OkStatus();
127 }
128 
GetLastUpdateTime(const OperationalStats & operational_stats)129 absl::Time GetLastUpdateTime(const OperationalStats& operational_stats) {
130   if (operational_stats.events().empty()) {
131     return absl::InfinitePast();
132   }
133   return absl::FromUnixSeconds(TimeUtil::TimestampToSeconds(
134       operational_stats.events().rbegin()->timestamp()));
135 }
136 
137 // If there's data, use the timestamp of the first event as the earliest
138 // trustworthy time; otherwise, the current time will be used.
GetEarliestTrustWorthyTime(const google::protobuf::RepeatedPtrField<OperationalStats> & op_stats)139 ::google::protobuf::Timestamp GetEarliestTrustWorthyTime(
140     const google::protobuf::RepeatedPtrField<OperationalStats>& op_stats) {
141   ::google::protobuf::Timestamp timestamp = TimeUtil::GetCurrentTime();
142   for (const auto& stat : op_stats) {
143     if (!stat.events().empty()) {
144       timestamp = stat.events().begin()->timestamp();
145       break;
146     }
147   }
148   return timestamp;
149 }
150 
RemoveOutdatedData(OpStatsSequence & data,absl::Duration ttl)151 void RemoveOutdatedData(OpStatsSequence& data, absl::Duration ttl) {
152   absl::Time earliest_accepted_time = absl::Now() - ttl;
153   auto* op_stats = data.mutable_opstats();
154   int64_t original_num_entries = op_stats->size();
155   op_stats->erase(
156       std::remove_if(op_stats->begin(), op_stats->end(),
157                      [earliest_accepted_time](const OperationalStats& data) {
158                        return GetLastUpdateTime(data) < earliest_accepted_time;
159                      }),
160       op_stats->end());
161   int64_t num_entries_after_purging = op_stats->size();
162   if (num_entries_after_purging < original_num_entries) {
163     *(data.mutable_earliest_trustworthy_time()) =
164         TimeUtil::MillisecondsToTimestamp(
165             absl::ToUnixMillis(earliest_accepted_time));
166   }
167 }
168 
PruneOldDataUntilBelowSizeLimit(OpStatsSequence & data,const int64_t max_size_bytes,LogManager & log_manager)169 void PruneOldDataUntilBelowSizeLimit(OpStatsSequence& data,
170                                      const int64_t max_size_bytes,
171                                      LogManager& log_manager) {
172   int64_t current_size = data.ByteSizeLong();
173   auto& op_stats = *(data.mutable_opstats());
174   if (current_size > max_size_bytes) {
175     int64_t num_pruned_entries = 0;
176     auto it = op_stats.begin();
177     absl::Time earliest_event_time = absl::InfinitePast();
178     // The OperationalStats are sorted by time from earliest to latest, so we'll
179     // remove from the start.
180     while (current_size > max_size_bytes && it != op_stats.end()) {
181       if (earliest_event_time == absl::InfinitePast()) {
182         earliest_event_time = GetLastUpdateTime(*it);
183       }
184       num_pruned_entries++;
185       // Note that the size of an OperationalStats is smaller than the size
186       // impact it has on the OpStatsSequence. We are being conservative here.
187       current_size -= it->ByteSizeLong();
188       it++;
189     }
190     op_stats.erase(op_stats.begin(), it);
191     *data.mutable_earliest_trustworthy_time() =
192         GetEarliestTrustWorthyTime(op_stats);
193     log_manager.LogToLongHistogram(
194         HistogramCounters::OPSTATS_NUM_PRUNED_ENTRIES, num_pruned_entries);
195     log_manager.LogToLongHistogram(
196         HistogramCounters::OPSTATS_OLDEST_PRUNED_ENTRY_TENURE_HOURS,
197         absl::ToInt64Hours(absl::Now() - earliest_event_time));
198   }
199   log_manager.LogToLongHistogram(HistogramCounters::OPSTATS_DB_SIZE_BYTES,
200                                  current_size);
201   log_manager.LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
202                                  op_stats.size());
203 }
204 
205 }  // anonymous namespace
206 
Create(const std::string & base_dir,absl::Duration ttl,LogManager & log_manager,int64_t max_size_bytes)207 absl::StatusOr<std::unique_ptr<OpStatsDb>> PdsBackedOpStatsDb::Create(
208     const std::string& base_dir, absl::Duration ttl, LogManager& log_manager,
209     int64_t max_size_bytes) {
210   std::filesystem::path path(base_dir);
211   if (!path.is_absolute()) {
212     log_manager.LogDiag(ProdDiagCode::OPSTATS_INVALID_FILE_PATH);
213     return absl::InvalidArgumentError(
214         absl::StrCat("The provided path: ", base_dir,
215                      " is invalid. The path must start with \"/\""));
216   }
217   path /= kParentDir;
218   std::error_code error;
219   std::filesystem::create_directories(path, error);
220   if (error.value() != 0) {
221     log_manager.LogDiag(ProdDiagCode::OPSTATS_PARENT_DIR_CREATION_FAILED);
222     return absl::InternalError(
223         absl::StrCat("Failed to create directory ", path.generic_string()));
224   }
225   path /= kDbFileName;
226   std::function<void()> lock_releaser;
227   auto file_storage = std::make_unique<protostore::FileStorage>();
228   std::unique_ptr<protostore::ProtoDataStore<OpStatsSequence>> pds;
229   std::string db_path = path.generic_string();
230   FCP_ASSIGN_OR_RETURN(int fd, AcquireFileLock(db_path, log_manager));
231   lock_releaser = [db_path, fd]() { ReleaseFileLock(db_path, fd); };
232   pds = std::make_unique<protostore::ProtoDataStore<OpStatsSequence>>(
233       *file_storage, db_path);
234   absl::StatusOr<int64_t> file_size = file_storage->GetFileSize(path);
235   if (!file_size.ok()) {
236     lock_releaser();
237     return file_size.status();
238   }
239   // If the size of the underlying file is zero, it means this is the first
240   // time we create the database.
241   bool should_initiate = file_size.value() == 0;
242 
243   // If this is the first time we create the OpStatsDb, we want to create an
244   // empty database.
245   if (should_initiate) {
246     absl::Status write_status = pds->Write(CreateEmptyData());
247     if (!write_status.ok()) {
248       lock_releaser();
249       return write_status;
250     }
251   }
252   return absl::WrapUnique(
253       new PdsBackedOpStatsDb(std::move(pds), std::move(file_storage), ttl,
254                              log_manager, max_size_bytes, lock_releaser));
255 }
256 
~PdsBackedOpStatsDb()257 PdsBackedOpStatsDb::~PdsBackedOpStatsDb() { lock_releaser_(); }
258 
Read()259 absl::StatusOr<OpStatsSequence> PdsBackedOpStatsDb::Read() {
260   absl::WriterMutexLock lock(&mutex_);
261   auto data_or = ReadInternal(*db_, log_manager_);
262   if (!data_or.ok()) {
263     // Try resetting after a failed read.
264     auto reset_status = ResetInternal(*db_, log_manager_);
265   }
266   return data_or;
267 }
268 
Transform(std::function<void (OpStatsSequence &)> func)269 absl::Status PdsBackedOpStatsDb::Transform(
270     std::function<void(OpStatsSequence&)> func) {
271   absl::WriterMutexLock lock(&mutex_);
272   OpStatsSequence data;
273   auto data_or = ReadInternal(*db_, log_manager_);
274   if (!data_or.ok()) {
275     // Try resetting after a failed read.
276     FCP_RETURN_IF_ERROR(ResetInternal(*db_, log_manager_));
277   } else {
278     data = std::move(data_or).value();
279     RemoveOutdatedData(data, ttl_);
280   }
281   func(data);
282   PruneOldDataUntilBelowSizeLimit(data, max_size_bytes_, log_manager_);
283   if (!data.has_earliest_trustworthy_time()) {
284     *data.mutable_earliest_trustworthy_time() =
285         GetEarliestTrustWorthyTime(data.opstats());
286   }
287   absl::Status status =
288       db_->Write(std::make_unique<OpStatsSequence>(std::move(data)));
289   if (!status.ok()) {
290     log_manager_.LogDiag(ProdDiagCode::OPSTATS_WRITE_FAILED);
291   }
292   return status;
293 }
294 
295 }  // namespace opstats
296 }  // namespace client
297 }  // namespace fcp
298