xref: /aosp_15_r20/external/federated-compute/fcp/client/opstats/opstats_logger_impl.cc (revision 14675a029014e728ec732f129a32e299b2da0601)
1 /*
2  * Copyright 2021 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "fcp/client/opstats/opstats_logger_impl.h"
17 
18 #include <string>
19 #include <utility>
20 
21 #include "google/protobuf/util/time_util.h"
22 #include "fcp/base/time_util.h"
23 #include "fcp/client/flags.h"
24 #include "fcp/client/log_manager.h"
25 #include "fcp/client/opstats/opstats_db.h"
26 #include "fcp/client/opstats/opstats_logger.h"
27 #include "fcp/protos/federated_api.pb.h"
28 #include "fcp/protos/opstats.pb.h"
29 
30 namespace fcp {
31 namespace client {
32 namespace opstats {
33 
34 using ::google::internal::federatedml::v2::RetryWindow;
35 
OpStatsLoggerImpl(std::unique_ptr<OpStatsDb> db,LogManager * log_manager,const Flags * flags,const std::string & session_name,const std::string & population_name)36 OpStatsLoggerImpl::OpStatsLoggerImpl(std::unique_ptr<OpStatsDb> db,
37                                      LogManager* log_manager,
38                                      const Flags* flags,
39                                      const std::string& session_name,
40                                      const std::string& population_name)
41     : db_(std::move(db)), log_manager_(log_manager) {
42   log_manager_->LogDiag(DebugDiagCode::TRAINING_OPSTATS_ENABLED);
43   log_manager_->LogDiag(ProdDiagCode::OPSTATS_DB_COMMIT_EXPECTED);
44 
45   // Setup the OperationalStats message for the new run.
46   stats_.set_session_name(session_name);
47   stats_.set_population_name(population_name);
48 }
49 
~OpStatsLoggerImpl()50 OpStatsLoggerImpl::~OpStatsLoggerImpl() {
51   // We're in the dtor, we don't care about what CommitToStorage returns.
52   auto status = CommitToStorage();
53 }
54 
AddEventAndSetTaskName(const std::string & task_name,OperationalStats::Event::EventKind event)55 void OpStatsLoggerImpl::AddEventAndSetTaskName(
56     const std::string& task_name, OperationalStats::Event::EventKind event) {
57   absl::MutexLock lock(&mutex_);
58   AddNewEventToStats(event);
59   stats_.set_task_name(task_name);
60 }
61 
AddEvent(OperationalStats::Event::EventKind event)62 void OpStatsLoggerImpl::AddEvent(OperationalStats::Event::EventKind event) {
63   absl::MutexLock lock(&mutex_);
64   AddNewEventToStats(event);
65 }
66 
AddEventWithErrorMessage(OperationalStats::Event::EventKind event,const std::string & error_message)67 void OpStatsLoggerImpl::AddEventWithErrorMessage(
68     OperationalStats::Event::EventKind event,
69     const std::string& error_message) {
70   absl::MutexLock lock(&mutex_);
71   AddNewEventToStats(event);
72   // Don't replace an existing error message.
73   if (stats_.error_message().empty()) {
74     stats_.set_error_message(error_message);
75   }
76 }
77 
UpdateDatasetStats(const std::string & collection_uri,int additional_example_count,int64_t additional_example_size_bytes)78 void OpStatsLoggerImpl::UpdateDatasetStats(
79     const std::string& collection_uri, int additional_example_count,
80     int64_t additional_example_size_bytes) {
81   absl::MutexLock lock(&mutex_);
82   auto& dataset_stats = (*stats_.mutable_dataset_stats())[collection_uri];
83   dataset_stats.set_num_examples_read(dataset_stats.num_examples_read() +
84                                       additional_example_count);
85   dataset_stats.set_num_bytes_read(dataset_stats.num_bytes_read() +
86                                    additional_example_size_bytes);
87 }
88 
SetNetworkStats(const NetworkStats & network_stats)89 void OpStatsLoggerImpl::SetNetworkStats(const NetworkStats& network_stats) {
90   absl::MutexLock lock(&mutex_);
91   stats_.set_chunking_layer_bytes_downloaded(network_stats.bytes_downloaded);
92   stats_.set_chunking_layer_bytes_uploaded(network_stats.bytes_uploaded);
93   *stats_.mutable_network_duration() =
94       TimeUtil::ConvertAbslToProtoDuration(network_stats.network_duration);
95 }
96 
SetRetryWindow(RetryWindow retry_window)97 void OpStatsLoggerImpl::SetRetryWindow(RetryWindow retry_window) {
98   absl::MutexLock lock(&mutex_);
99   retry_window.clear_retry_token();
100   *stats_.mutable_retry_window() = std::move(retry_window);
101 }
102 
AddNewEventToStats(OperationalStats::Event::EventKind kind)103 void OpStatsLoggerImpl::AddNewEventToStats(
104     OperationalStats::Event::EventKind kind) {
105   auto new_event = stats_.add_events();
106   new_event->set_event_type(kind);
107   *new_event->mutable_timestamp() = google::protobuf::util::TimeUtil::GetCurrentTime();
108 }
109 
CommitToStorage()110 absl::Status OpStatsLoggerImpl::CommitToStorage() {
111   absl::MutexLock lock(&mutex_);
112   log_manager_->LogDiag(ProdDiagCode::OPSTATS_DB_COMMIT_ATTEMPTED);
113   const absl::Time before_commit_time = absl::Now();
114   auto status = already_committed_
115                     ? db_->Transform([stats = &stats_](OpStatsSequence& data) {
116                         // Check if opstats on disk somehow got cleared between
117                         // the first commit and now, and handle appropriately.
118                         // This can happen e.g. if the ttl for the opstats db
119                         // is incorrectly configured to have a very low ttl,
120                         // causing the entire history to be lost as part of the
121                         // update.
122                         if (data.opstats_size() == 0) {
123                           *data.add_opstats() = *stats;
124                         } else {
125                           *data.mutable_opstats(data.opstats_size() - 1) =
126                               *stats;
127                         }
128                       })
129                     : db_->Transform([stats = &stats_](OpStatsSequence& data) {
130                         *data.add_opstats() = *stats;
131                       });
132   const absl::Time after_commit_time = absl::Now();
133   log_manager_->LogToLongHistogram(
134       HistogramCounters::TRAINING_OPSTATS_COMMIT_LATENCY,
135       absl::ToInt64Milliseconds(after_commit_time - before_commit_time));
136   already_committed_ = true;
137   return status;
138 }
139 
GetCurrentTaskName()140 std::string OpStatsLoggerImpl::GetCurrentTaskName() {
141   absl::MutexLock lock(&mutex_);
142   return stats_.task_name();
143 }
144 
145 }  // namespace opstats
146 }  // namespace client
147 }  // namespace fcp
148