1 /*
2 * Copyright 2021 Google LLC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "fcp/client/opstats/opstats_logger_impl.h"
17
18 #include <string>
19 #include <utility>
20
21 #include "google/protobuf/util/time_util.h"
22 #include "fcp/base/time_util.h"
23 #include "fcp/client/flags.h"
24 #include "fcp/client/log_manager.h"
25 #include "fcp/client/opstats/opstats_db.h"
26 #include "fcp/client/opstats/opstats_logger.h"
27 #include "fcp/protos/federated_api.pb.h"
28 #include "fcp/protos/opstats.pb.h"
29
30 namespace fcp {
31 namespace client {
32 namespace opstats {
33
34 using ::google::internal::federatedml::v2::RetryWindow;
35
OpStatsLoggerImpl(std::unique_ptr<OpStatsDb> db,LogManager * log_manager,const Flags * flags,const std::string & session_name,const std::string & population_name)36 OpStatsLoggerImpl::OpStatsLoggerImpl(std::unique_ptr<OpStatsDb> db,
37 LogManager* log_manager,
38 const Flags* flags,
39 const std::string& session_name,
40 const std::string& population_name)
41 : db_(std::move(db)), log_manager_(log_manager) {
42 log_manager_->LogDiag(DebugDiagCode::TRAINING_OPSTATS_ENABLED);
43 log_manager_->LogDiag(ProdDiagCode::OPSTATS_DB_COMMIT_EXPECTED);
44
45 // Setup the OperationalStats message for the new run.
46 stats_.set_session_name(session_name);
47 stats_.set_population_name(population_name);
48 }
49
~OpStatsLoggerImpl()50 OpStatsLoggerImpl::~OpStatsLoggerImpl() {
51 // We're in the dtor, we don't care about what CommitToStorage returns.
52 auto status = CommitToStorage();
53 }
54
AddEventAndSetTaskName(const std::string & task_name,OperationalStats::Event::EventKind event)55 void OpStatsLoggerImpl::AddEventAndSetTaskName(
56 const std::string& task_name, OperationalStats::Event::EventKind event) {
57 absl::MutexLock lock(&mutex_);
58 AddNewEventToStats(event);
59 stats_.set_task_name(task_name);
60 }
61
AddEvent(OperationalStats::Event::EventKind event)62 void OpStatsLoggerImpl::AddEvent(OperationalStats::Event::EventKind event) {
63 absl::MutexLock lock(&mutex_);
64 AddNewEventToStats(event);
65 }
66
AddEventWithErrorMessage(OperationalStats::Event::EventKind event,const std::string & error_message)67 void OpStatsLoggerImpl::AddEventWithErrorMessage(
68 OperationalStats::Event::EventKind event,
69 const std::string& error_message) {
70 absl::MutexLock lock(&mutex_);
71 AddNewEventToStats(event);
72 // Don't replace an existing error message.
73 if (stats_.error_message().empty()) {
74 stats_.set_error_message(error_message);
75 }
76 }
77
UpdateDatasetStats(const std::string & collection_uri,int additional_example_count,int64_t additional_example_size_bytes)78 void OpStatsLoggerImpl::UpdateDatasetStats(
79 const std::string& collection_uri, int additional_example_count,
80 int64_t additional_example_size_bytes) {
81 absl::MutexLock lock(&mutex_);
82 auto& dataset_stats = (*stats_.mutable_dataset_stats())[collection_uri];
83 dataset_stats.set_num_examples_read(dataset_stats.num_examples_read() +
84 additional_example_count);
85 dataset_stats.set_num_bytes_read(dataset_stats.num_bytes_read() +
86 additional_example_size_bytes);
87 }
88
SetNetworkStats(const NetworkStats & network_stats)89 void OpStatsLoggerImpl::SetNetworkStats(const NetworkStats& network_stats) {
90 absl::MutexLock lock(&mutex_);
91 stats_.set_chunking_layer_bytes_downloaded(network_stats.bytes_downloaded);
92 stats_.set_chunking_layer_bytes_uploaded(network_stats.bytes_uploaded);
93 *stats_.mutable_network_duration() =
94 TimeUtil::ConvertAbslToProtoDuration(network_stats.network_duration);
95 }
96
SetRetryWindow(RetryWindow retry_window)97 void OpStatsLoggerImpl::SetRetryWindow(RetryWindow retry_window) {
98 absl::MutexLock lock(&mutex_);
99 retry_window.clear_retry_token();
100 *stats_.mutable_retry_window() = std::move(retry_window);
101 }
102
AddNewEventToStats(OperationalStats::Event::EventKind kind)103 void OpStatsLoggerImpl::AddNewEventToStats(
104 OperationalStats::Event::EventKind kind) {
105 auto new_event = stats_.add_events();
106 new_event->set_event_type(kind);
107 *new_event->mutable_timestamp() = google::protobuf::util::TimeUtil::GetCurrentTime();
108 }
109
CommitToStorage()110 absl::Status OpStatsLoggerImpl::CommitToStorage() {
111 absl::MutexLock lock(&mutex_);
112 log_manager_->LogDiag(ProdDiagCode::OPSTATS_DB_COMMIT_ATTEMPTED);
113 const absl::Time before_commit_time = absl::Now();
114 auto status = already_committed_
115 ? db_->Transform([stats = &stats_](OpStatsSequence& data) {
116 // Check if opstats on disk somehow got cleared between
117 // the first commit and now, and handle appropriately.
118 // This can happen e.g. if the ttl for the opstats db
119 // is incorrectly configured to have a very low ttl,
120 // causing the entire history to be lost as part of the
121 // update.
122 if (data.opstats_size() == 0) {
123 *data.add_opstats() = *stats;
124 } else {
125 *data.mutable_opstats(data.opstats_size() - 1) =
126 *stats;
127 }
128 })
129 : db_->Transform([stats = &stats_](OpStatsSequence& data) {
130 *data.add_opstats() = *stats;
131 });
132 const absl::Time after_commit_time = absl::Now();
133 log_manager_->LogToLongHistogram(
134 HistogramCounters::TRAINING_OPSTATS_COMMIT_LATENCY,
135 absl::ToInt64Milliseconds(after_commit_time - before_commit_time));
136 already_committed_ = true;
137 return status;
138 }
139
GetCurrentTaskName()140 std::string OpStatsLoggerImpl::GetCurrentTaskName() {
141 absl::MutexLock lock(&mutex_);
142 return stats_.task_name();
143 }
144
145 } // namespace opstats
146 } // namespace client
147 } // namespace fcp
148