1 /*
2 * Copyright 2021 Google LLC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "fcp/client/opstats/opstats_example_store.h"
17
18 #include <memory>
19 #include <optional>
20 #include <string>
21 #include <utility>
22
23 #include "google/protobuf/any.pb.h"
24 #include "google/protobuf/util/time_util.h"
25 #include "absl/status/status.h"
26 #include "absl/strings/match.h"
27 #include "absl/strings/str_cat.h"
28 #include "fcp/client/diag_codes.pb.h"
29 #include "fcp/client/engine/example_iterator_factory.h"
30 #include "fcp/client/opstats/opstats_utils.h"
31 #include "fcp/client/simple_task_environment.h"
32 #include "fcp/protos/federated_api.pb.h"
33 #include "fcp/protos/opstats.pb.h"
34 #include "tensorflow/core/example/example.pb.h"
35 #include "tensorflow/core/example/feature.pb.h"
36
37 namespace fcp {
38 namespace client {
39 namespace opstats {
40
41 using ::google::internal::federated::plan::ExampleSelector;
42 using ::google::protobuf::util::TimeUtil;
43
44 namespace {
45
GetLastUpdatedTime(const OperationalStats & op_stats)46 absl::Time GetLastUpdatedTime(const OperationalStats& op_stats) {
47 if (op_stats.events().empty()) {
48 return absl::InfinitePast();
49 } else {
50 return absl::FromUnixMillis(TimeUtil::TimestampToMilliseconds(
51 op_stats.events().rbegin()->timestamp()));
52 }
53 }
54
CreateFeatureFromString(const std::string & str)55 tensorflow::Feature CreateFeatureFromString(const std::string& str) {
56 tensorflow::Feature feature;
57 feature.mutable_bytes_list()->add_value(str);
58 return feature;
59 }
60
CreateFeatureFromInt(int64_t value)61 tensorflow::Feature CreateFeatureFromInt(int64_t value) {
62 tensorflow::Feature feature;
63 feature.mutable_int64_list()->add_value(value);
64 return feature;
65 }
66
CreateFeatureFromStringVector(const std::vector<std::string> & values)67 tensorflow::Feature CreateFeatureFromStringVector(
68 const std::vector<std::string>& values) {
69 tensorflow::Feature feature;
70 auto* bytes_list = feature.mutable_bytes_list();
71 for (const auto& value : values) {
72 bytes_list->add_value(value);
73 }
74 return feature;
75 }
76
CreateFeatureFromIntVector(const std::vector<int64_t> & values)77 tensorflow::Feature CreateFeatureFromIntVector(
78 const std::vector<int64_t>& values) {
79 tensorflow::Feature feature;
80 auto* int64_list = feature.mutable_int64_list();
81 for (const auto& value : values) {
82 int64_list->add_value(value);
83 }
84 return feature;
85 }
86
CreateExample(const OperationalStats & op_stats,int64_t earliest_trustworthy_time)87 std::string CreateExample(const OperationalStats& op_stats,
88 int64_t earliest_trustworthy_time) {
89 tensorflow::Example example;
90 auto* feature_map = example.mutable_features()->mutable_feature();
91 (*feature_map)[kPopulationName] =
92 CreateFeatureFromString(op_stats.population_name());
93 (*feature_map)[kSessionName] =
94 CreateFeatureFromString(op_stats.session_name());
95 (*feature_map)[kTaskName] = CreateFeatureFromString(op_stats.task_name());
96
97 // Create events related features.
98 std::vector<int64_t> event_types;
99 std::vector<int64_t> event_time_millis;
100 for (const auto& event : op_stats.events()) {
101 event_types.push_back(event.event_type());
102 event_time_millis.push_back(
103 TimeUtil::TimestampToMilliseconds(event.timestamp()));
104 }
105 (*feature_map)[kEventsEventType] = CreateFeatureFromIntVector(event_types);
106 (*feature_map)[kEventsTimestampMillis] =
107 CreateFeatureFromIntVector(event_time_millis);
108
109 // Create external dataset stats related features.
110 std::vector<std::string> uris;
111 std::vector<int64_t> num_examples_read;
112 std::vector<int64_t> num_bytes_read;
113 for (const auto& stats : op_stats.dataset_stats()) {
114 uris.push_back(stats.first);
115 num_examples_read.push_back(stats.second.num_examples_read());
116 num_bytes_read.push_back(stats.second.num_bytes_read());
117 }
118 (*feature_map)[kDatasetStatsUri] = CreateFeatureFromStringVector(uris);
119 (*feature_map)[kDatasetStatsNumExamplesRead] =
120 CreateFeatureFromIntVector(num_examples_read);
121 (*feature_map)[kDatasetStatsNumBytesRead] =
122 CreateFeatureFromIntVector(num_bytes_read);
123
124 (*feature_map)[kErrorMessage] =
125 CreateFeatureFromString(op_stats.error_message());
126
127 // Create RetryWindow related features.
128 (*feature_map)[kRetryWindowDelayMinMillis] = CreateFeatureFromInt(
129 TimeUtil::DurationToMilliseconds(op_stats.retry_window().delay_min()));
130 (*feature_map)[kRetryWindowDelayMaxMillis] = CreateFeatureFromInt(
131 TimeUtil::DurationToMilliseconds(op_stats.retry_window().delay_max()));
132
133 (*feature_map)[kChunkingLayerBytesDownloaded] =
134 CreateFeatureFromInt(op_stats.chunking_layer_bytes_downloaded());
135 (*feature_map)[kChunkingLayerBytesUploaded] =
136 CreateFeatureFromInt(op_stats.chunking_layer_bytes_uploaded());
137 (*feature_map)[kNetworkDuration] = CreateFeatureFromInt(
138 TimeUtil::DurationToMilliseconds(op_stats.network_duration()));
139
140 (*feature_map)[kEarliestTrustWorthyTimeMillis] =
141 CreateFeatureFromInt(earliest_trustworthy_time);
142
143 return example.SerializeAsString();
144 }
145
146 class OpStatsExampleIterator : public fcp::client::ExampleIterator {
147 public:
OpStatsExampleIterator(std::vector<OperationalStats> op_stats,int64_t earliest_trustworthy_time)148 explicit OpStatsExampleIterator(std::vector<OperationalStats> op_stats,
149 int64_t earliest_trustworthy_time)
150 : next_(0),
151 data_(std::move(op_stats)),
152 earliest_trustworthy_time_millis_(earliest_trustworthy_time) {}
Next()153 absl::StatusOr<std::string> Next() override {
154 if (next_ < 0 || next_ >= data_.size()) {
155 return absl::OutOfRangeError("The iterator is out of range.");
156 }
157 return CreateExample(data_[next_++], earliest_trustworthy_time_millis_);
158 }
159
Close()160 void Close() override {
161 next_ = 0;
162 data_.clear();
163 }
164
165 private:
166 // The index for the next OperationalStats to be used.
167 int next_;
168 std::vector<OperationalStats> data_;
169 const int64_t earliest_trustworthy_time_millis_;
170 };
171
172 } // anonymous namespace
173
CanHandle(const ExampleSelector & example_selector)174 bool OpStatsExampleIteratorFactory::CanHandle(
175 const ExampleSelector& example_selector) {
176 return example_selector.collection_uri() == opstats::kOpStatsCollectionUri;
177 }
178
179 absl::StatusOr<std::unique_ptr<fcp::client::ExampleIterator>>
CreateExampleIterator(const ExampleSelector & example_selector)180 OpStatsExampleIteratorFactory::CreateExampleIterator(
181 const ExampleSelector& example_selector) {
182 if (example_selector.collection_uri() != kOpStatsCollectionUri) {
183 log_manager_->LogDiag(ProdDiagCode::OPSTATS_INCORRECT_COLLECTION_URI);
184 return absl::InvalidArgumentError(absl::StrCat(
185 "The collection uri is ", example_selector.collection_uri(),
186 ", which is not the expected uri: ", kOpStatsCollectionUri));
187 }
188 if (!op_stats_logger_->IsOpStatsEnabled()) {
189 log_manager_->LogDiag(
190 ProdDiagCode::OPSTATS_EXAMPLE_STORE_REQUESTED_NOT_ENABLED);
191 return absl::InvalidArgumentError("OpStats example store is not enabled.");
192 }
193
194 absl::Time lower_bound_time = absl::InfinitePast();
195 absl::Time upper_bound_time = absl::InfiniteFuture();
196 bool last_successful_contribution = false;
197 if (example_selector.has_criteria()) {
198 OpStatsSelectionCriteria criteria;
199 if (!example_selector.criteria().UnpackTo(&criteria)) {
200 log_manager_->LogDiag(ProdDiagCode::OPSTATS_INVALID_SELECTION_CRITERIA);
201 return absl::InvalidArgumentError("Unable to parse selection criteria.");
202 }
203
204 if (criteria.has_start_time()) {
205 lower_bound_time = absl::FromUnixMillis(
206 TimeUtil::TimestampToMilliseconds(criteria.start_time()));
207 }
208 if (criteria.has_end_time()) {
209 upper_bound_time = absl::FromUnixMillis(
210 TimeUtil::TimestampToMilliseconds(criteria.end_time()));
211 }
212 if (lower_bound_time > upper_bound_time) {
213 log_manager_->LogDiag(ProdDiagCode::OPSTATS_INVALID_SELECTION_CRITERIA);
214 return absl::InvalidArgumentError(
215 "Invalid selection criteria: start_time is after end_time.");
216 }
217 last_successful_contribution = criteria.last_successful_contribution();
218 }
219
220 FCP_ASSIGN_OR_RETURN(OpStatsSequence data,
221 op_stats_logger_->GetOpStatsDb()->Read());
222 std::vector<OperationalStats> selected_data;
223 if (last_successful_contribution) {
224 if (opstats_last_successful_contribution_criteria_) {
225 // Selector specified last_successful_contribution, and the feature is
226 // enabled. Create a last_successful_contribution iterator.
227 std::optional<OperationalStats> last_successful_contribution_entry =
228 GetLastSuccessfulContribution(data,
229 op_stats_logger_->GetCurrentTaskName());
230 if (last_successful_contribution_entry.has_value()) {
231 selected_data.push_back(*last_successful_contribution_entry);
232 }
233 } else {
234 return absl::InvalidArgumentError(
235 "OpStats selection criteria has last_successful_contribution enabled "
236 "but feature not enabled in the runtime!");
237 }
238 } else {
239 for (auto it = data.opstats().rbegin(); it != data.opstats().rend(); ++it) {
240 absl::Time last_update_time = GetLastUpdatedTime(*it);
241 if (last_update_time >= lower_bound_time &&
242 last_update_time <= upper_bound_time) {
243 selected_data.push_back(*it);
244 }
245 }
246 }
247 return std::make_unique<OpStatsExampleIterator>(
248 std::move(selected_data),
249 TimeUtil::TimestampToMilliseconds(data.earliest_trustworthy_time()));
250 }
251
252 } // namespace opstats
253 } // namespace client
254 } // namespace fcp
255