xref: /aosp_15_r20/external/federated-compute/fcp/client/opstats/opstats_example_store.cc (revision 14675a029014e728ec732f129a32e299b2da0601)
1 /*
2  * Copyright 2021 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "fcp/client/opstats/opstats_example_store.h"
17 
18 #include <memory>
19 #include <optional>
20 #include <string>
21 #include <utility>
22 
23 #include "google/protobuf/any.pb.h"
24 #include "google/protobuf/util/time_util.h"
25 #include "absl/status/status.h"
26 #include "absl/strings/match.h"
27 #include "absl/strings/str_cat.h"
28 #include "fcp/client/diag_codes.pb.h"
29 #include "fcp/client/engine/example_iterator_factory.h"
30 #include "fcp/client/opstats/opstats_utils.h"
31 #include "fcp/client/simple_task_environment.h"
32 #include "fcp/protos/federated_api.pb.h"
33 #include "fcp/protos/opstats.pb.h"
34 #include "tensorflow/core/example/example.pb.h"
35 #include "tensorflow/core/example/feature.pb.h"
36 
37 namespace fcp {
38 namespace client {
39 namespace opstats {
40 
41 using ::google::internal::federated::plan::ExampleSelector;
42 using ::google::protobuf::util::TimeUtil;
43 
44 namespace {
45 
GetLastUpdatedTime(const OperationalStats & op_stats)46 absl::Time GetLastUpdatedTime(const OperationalStats& op_stats) {
47   if (op_stats.events().empty()) {
48     return absl::InfinitePast();
49   } else {
50     return absl::FromUnixMillis(TimeUtil::TimestampToMilliseconds(
51         op_stats.events().rbegin()->timestamp()));
52   }
53 }
54 
CreateFeatureFromString(const std::string & str)55 tensorflow::Feature CreateFeatureFromString(const std::string& str) {
56   tensorflow::Feature feature;
57   feature.mutable_bytes_list()->add_value(str);
58   return feature;
59 }
60 
CreateFeatureFromInt(int64_t value)61 tensorflow::Feature CreateFeatureFromInt(int64_t value) {
62   tensorflow::Feature feature;
63   feature.mutable_int64_list()->add_value(value);
64   return feature;
65 }
66 
CreateFeatureFromStringVector(const std::vector<std::string> & values)67 tensorflow::Feature CreateFeatureFromStringVector(
68     const std::vector<std::string>& values) {
69   tensorflow::Feature feature;
70   auto* bytes_list = feature.mutable_bytes_list();
71   for (const auto& value : values) {
72     bytes_list->add_value(value);
73   }
74   return feature;
75 }
76 
CreateFeatureFromIntVector(const std::vector<int64_t> & values)77 tensorflow::Feature CreateFeatureFromIntVector(
78     const std::vector<int64_t>& values) {
79   tensorflow::Feature feature;
80   auto* int64_list = feature.mutable_int64_list();
81   for (const auto& value : values) {
82     int64_list->add_value(value);
83   }
84   return feature;
85 }
86 
CreateExample(const OperationalStats & op_stats,int64_t earliest_trustworthy_time)87 std::string CreateExample(const OperationalStats& op_stats,
88                           int64_t earliest_trustworthy_time) {
89   tensorflow::Example example;
90   auto* feature_map = example.mutable_features()->mutable_feature();
91   (*feature_map)[kPopulationName] =
92       CreateFeatureFromString(op_stats.population_name());
93   (*feature_map)[kSessionName] =
94       CreateFeatureFromString(op_stats.session_name());
95   (*feature_map)[kTaskName] = CreateFeatureFromString(op_stats.task_name());
96 
97   // Create events related features.
98   std::vector<int64_t> event_types;
99   std::vector<int64_t> event_time_millis;
100   for (const auto& event : op_stats.events()) {
101     event_types.push_back(event.event_type());
102     event_time_millis.push_back(
103         TimeUtil::TimestampToMilliseconds(event.timestamp()));
104   }
105   (*feature_map)[kEventsEventType] = CreateFeatureFromIntVector(event_types);
106   (*feature_map)[kEventsTimestampMillis] =
107       CreateFeatureFromIntVector(event_time_millis);
108 
109   // Create external dataset stats related features.
110   std::vector<std::string> uris;
111   std::vector<int64_t> num_examples_read;
112   std::vector<int64_t> num_bytes_read;
113   for (const auto& stats : op_stats.dataset_stats()) {
114     uris.push_back(stats.first);
115     num_examples_read.push_back(stats.second.num_examples_read());
116     num_bytes_read.push_back(stats.second.num_bytes_read());
117   }
118   (*feature_map)[kDatasetStatsUri] = CreateFeatureFromStringVector(uris);
119   (*feature_map)[kDatasetStatsNumExamplesRead] =
120       CreateFeatureFromIntVector(num_examples_read);
121   (*feature_map)[kDatasetStatsNumBytesRead] =
122       CreateFeatureFromIntVector(num_bytes_read);
123 
124   (*feature_map)[kErrorMessage] =
125       CreateFeatureFromString(op_stats.error_message());
126 
127   // Create RetryWindow related features.
128   (*feature_map)[kRetryWindowDelayMinMillis] = CreateFeatureFromInt(
129       TimeUtil::DurationToMilliseconds(op_stats.retry_window().delay_min()));
130   (*feature_map)[kRetryWindowDelayMaxMillis] = CreateFeatureFromInt(
131       TimeUtil::DurationToMilliseconds(op_stats.retry_window().delay_max()));
132 
133   (*feature_map)[kChunkingLayerBytesDownloaded] =
134       CreateFeatureFromInt(op_stats.chunking_layer_bytes_downloaded());
135   (*feature_map)[kChunkingLayerBytesUploaded] =
136       CreateFeatureFromInt(op_stats.chunking_layer_bytes_uploaded());
137     (*feature_map)[kNetworkDuration] = CreateFeatureFromInt(
138         TimeUtil::DurationToMilliseconds(op_stats.network_duration()));
139 
140   (*feature_map)[kEarliestTrustWorthyTimeMillis] =
141       CreateFeatureFromInt(earliest_trustworthy_time);
142 
143   return example.SerializeAsString();
144 }
145 
146 class OpStatsExampleIterator : public fcp::client::ExampleIterator {
147  public:
OpStatsExampleIterator(std::vector<OperationalStats> op_stats,int64_t earliest_trustworthy_time)148   explicit OpStatsExampleIterator(std::vector<OperationalStats> op_stats,
149                                   int64_t earliest_trustworthy_time)
150       : next_(0),
151         data_(std::move(op_stats)),
152         earliest_trustworthy_time_millis_(earliest_trustworthy_time) {}
Next()153   absl::StatusOr<std::string> Next() override {
154     if (next_ < 0 || next_ >= data_.size()) {
155       return absl::OutOfRangeError("The iterator is out of range.");
156     }
157     return CreateExample(data_[next_++], earliest_trustworthy_time_millis_);
158   }
159 
Close()160   void Close() override {
161     next_ = 0;
162     data_.clear();
163   }
164 
165  private:
166   // The index for the next OperationalStats to be used.
167   int next_;
168   std::vector<OperationalStats> data_;
169   const int64_t earliest_trustworthy_time_millis_;
170 };
171 
172 }  // anonymous namespace
173 
CanHandle(const ExampleSelector & example_selector)174 bool OpStatsExampleIteratorFactory::CanHandle(
175     const ExampleSelector& example_selector) {
176   return example_selector.collection_uri() == opstats::kOpStatsCollectionUri;
177 }
178 
179 absl::StatusOr<std::unique_ptr<fcp::client::ExampleIterator>>
CreateExampleIterator(const ExampleSelector & example_selector)180 OpStatsExampleIteratorFactory::CreateExampleIterator(
181     const ExampleSelector& example_selector) {
182   if (example_selector.collection_uri() != kOpStatsCollectionUri) {
183     log_manager_->LogDiag(ProdDiagCode::OPSTATS_INCORRECT_COLLECTION_URI);
184     return absl::InvalidArgumentError(absl::StrCat(
185         "The collection uri is ", example_selector.collection_uri(),
186         ", which is not the expected uri: ", kOpStatsCollectionUri));
187   }
188   if (!op_stats_logger_->IsOpStatsEnabled()) {
189     log_manager_->LogDiag(
190         ProdDiagCode::OPSTATS_EXAMPLE_STORE_REQUESTED_NOT_ENABLED);
191     return absl::InvalidArgumentError("OpStats example store is not enabled.");
192   }
193 
194   absl::Time lower_bound_time = absl::InfinitePast();
195   absl::Time upper_bound_time = absl::InfiniteFuture();
196   bool last_successful_contribution = false;
197   if (example_selector.has_criteria()) {
198     OpStatsSelectionCriteria criteria;
199     if (!example_selector.criteria().UnpackTo(&criteria)) {
200       log_manager_->LogDiag(ProdDiagCode::OPSTATS_INVALID_SELECTION_CRITERIA);
201       return absl::InvalidArgumentError("Unable to parse selection criteria.");
202     }
203 
204     if (criteria.has_start_time()) {
205       lower_bound_time = absl::FromUnixMillis(
206           TimeUtil::TimestampToMilliseconds(criteria.start_time()));
207     }
208     if (criteria.has_end_time()) {
209       upper_bound_time = absl::FromUnixMillis(
210           TimeUtil::TimestampToMilliseconds(criteria.end_time()));
211     }
212     if (lower_bound_time > upper_bound_time) {
213       log_manager_->LogDiag(ProdDiagCode::OPSTATS_INVALID_SELECTION_CRITERIA);
214       return absl::InvalidArgumentError(
215           "Invalid selection criteria: start_time is after end_time.");
216     }
217     last_successful_contribution = criteria.last_successful_contribution();
218   }
219 
220   FCP_ASSIGN_OR_RETURN(OpStatsSequence data,
221                        op_stats_logger_->GetOpStatsDb()->Read());
222   std::vector<OperationalStats> selected_data;
223   if (last_successful_contribution) {
224     if (opstats_last_successful_contribution_criteria_) {
225       // Selector specified last_successful_contribution, and the feature is
226       // enabled. Create a last_successful_contribution iterator.
227       std::optional<OperationalStats> last_successful_contribution_entry =
228           GetLastSuccessfulContribution(data,
229                                         op_stats_logger_->GetCurrentTaskName());
230       if (last_successful_contribution_entry.has_value()) {
231         selected_data.push_back(*last_successful_contribution_entry);
232       }
233     } else {
234       return absl::InvalidArgumentError(
235           "OpStats selection criteria has last_successful_contribution enabled "
236           "but feature not enabled in the runtime!");
237     }
238   } else {
239     for (auto it = data.opstats().rbegin(); it != data.opstats().rend(); ++it) {
240       absl::Time last_update_time = GetLastUpdatedTime(*it);
241       if (last_update_time >= lower_bound_time &&
242           last_update_time <= upper_bound_time) {
243         selected_data.push_back(*it);
244       }
245     }
246   }
247   return std::make_unique<OpStatsExampleIterator>(
248       std::move(selected_data),
249       TimeUtil::TimestampToMilliseconds(data.earliest_trustworthy_time()));
250 }
251 
252 }  // namespace opstats
253 }  // namespace client
254 }  // namespace fcp
255