1 /*
2 * Copyright 2022 Google LLC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
#include "fcp/client/federated_select.h"

#include <deque>
#include <filesystem>
#include <fstream>
#include <functional>
#include <ios>
#include <memory>
#include <string>
#include <system_error>
#include <utility>
#include <vector>

#include "google/protobuf/any.pb.h"
#include "absl/status/statusor.h"
#include "absl/strings/cord.h"
#include "absl/strings/str_replace.h"
#include "fcp/base/monitoring.h"
#include "fcp/base/wall_clock_stopwatch.h"
#include "fcp/client/diag_codes.pb.h"
#include "fcp/client/engine/example_iterator_factory.h"
#include "fcp/client/files.h"
#include "fcp/client/http/http_client.h"
#include "fcp/client/http/http_client_util.h"
#include "fcp/client/http/in_memory_request_response.h"
#include "fcp/client/interruptible_runner.h"
#include "fcp/client/log_manager.h"
#include "fcp/client/stats.h"
#include "fcp/protos/plan.pb.h"
44
45 namespace fcp {
46 namespace client {
47
48 using fcp::client::http::HttpClient;
49 using fcp::client::http::InMemoryHttpResponse;
50 using fcp::client::http::UriOrInlineData;
51 using ::google::internal::federated::plan::SlicesSelector;
52
53 namespace {
54
55 // A Federated Select `ExampleIteratorFactory` that fails all queries.
56 class DisabledFederatedSelectExampleIteratorFactory
57 : public FederatedSelectExampleIteratorFactory {
58 public:
DisabledFederatedSelectExampleIteratorFactory(LogManager * log_manager)59 explicit DisabledFederatedSelectExampleIteratorFactory(
60 LogManager* log_manager)
61 : log_manager_(*log_manager) {}
62
63 // Will fetch the slice data via HTTP and return an error if any of the
64 // slice fetch requests failed.
CreateExampleIterator(const::google::internal::federated::plan::ExampleSelector & example_selector)65 absl::StatusOr<std::unique_ptr<ExampleIterator>> CreateExampleIterator(
66 const ::google::internal::federated::plan::ExampleSelector&
67 example_selector) override {
68 log_manager_.LogDiag(
69 ProdDiagCode::FEDSELECT_SLICE_HTTP_FETCH_REQUESTED_BUT_DISABLED);
70 return absl::InvalidArgumentError("Federated Select is disabled.");
71 }
72
73 private:
74 LogManager& log_manager_;
75 };
76
FetchSlicesViaHttp(const SlicesSelector & slices_selector,absl::string_view uri_template,HttpClient & http_client,InterruptibleRunner & interruptible_runner,int64_t * bytes_received_acc,int64_t * bytes_sent_acc)77 absl::StatusOr<std::deque<absl::Cord>> FetchSlicesViaHttp(
78 const SlicesSelector& slices_selector, absl::string_view uri_template,
79 HttpClient& http_client, InterruptibleRunner& interruptible_runner,
80 int64_t* bytes_received_acc, int64_t* bytes_sent_acc) {
81 std::vector<UriOrInlineData> resources;
82 for (int32_t slice_key : slices_selector.keys()) {
83 std::string slice_uri = absl::StrReplaceAll(
84 // Note that `served_at_id` is documented to not require URL-escaping,
85 // so we don't apply any here.
86 uri_template, {{"{served_at_id}", slices_selector.served_at_id()},
87 {"{key_base10}", absl::StrCat(slice_key)}});
88
89 resources.push_back(
90 UriOrInlineData::CreateUri(slice_uri, "", absl::ZeroDuration()));
91 }
92
93 // Perform the requests.
94 absl::StatusOr<std::vector<absl::StatusOr<InMemoryHttpResponse>>>
95 slice_fetch_result = http::FetchResourcesInMemory(
96 http_client, interruptible_runner, std::move(resources),
97 bytes_received_acc, bytes_sent_acc,
98 // TODO(team): Enable caching for federated select slices.
99 /*resource_cache=*/nullptr);
100
101 // Check whether issuing the requests failed as a whole (generally indicating
102 // a programming error).
103 if (!slice_fetch_result.ok()) {
104 return absl::InternalError(absl::StrCat(
105 "Failed to perform HTTP requests (URI template: ", uri_template,
106 "): ", absl::StatusCodeToString(slice_fetch_result.status().code())));
107 }
108
109 std::deque<absl::Cord> slices;
110 for (const absl::StatusOr<InMemoryHttpResponse>& http_response :
111 *slice_fetch_result) {
112 if (!http_response.ok()) {
113 return absl::UnavailableError(absl::StrCat(
114 "Slice fetch request failed (URI template: ", uri_template,
115 "): ", absl::StatusCodeToString(http_response.status().code())));
116 }
117 slices.push_back(http_response->body);
118 }
119 return slices;
120 }
121
122 // A Federated Select `ExampleIteratorFactory` that, upon creation of an
123 // iterator, fetches the slice data via HTTP, buffers it in-memory, and then
124 // exposes it to the plan via an `InMemoryFederatedSelectExampleIterator`.
125 class HttpFederatedSelectExampleIteratorFactory
126 : public FederatedSelectExampleIteratorFactory {
127 public:
HttpFederatedSelectExampleIteratorFactory(LogManager * log_manager,Files * files,HttpClient * http_client,InterruptibleRunner * interruptible_runner,absl::string_view uri_template,std::atomic<int64_t> & bytes_sent_acc,std::atomic<int64_t> & bytes_received_acc,WallClockStopwatch * network_stopwatch)128 HttpFederatedSelectExampleIteratorFactory(
129 LogManager* log_manager, Files* files, HttpClient* http_client,
130 InterruptibleRunner* interruptible_runner, absl::string_view uri_template,
131 std::atomic<int64_t>& bytes_sent_acc,
132 std::atomic<int64_t>& bytes_received_acc,
133 WallClockStopwatch* network_stopwatch)
134 : log_manager_(*log_manager),
135 files_(*files),
136 http_client_(*http_client),
137 interruptible_runner_(*interruptible_runner),
138 uri_template_(uri_template),
139 bytes_sent_acc_(bytes_sent_acc),
140 bytes_received_acc_(bytes_received_acc),
141 network_stopwatch_(*network_stopwatch) {}
142
143 // Will fetch the slice data via HTTP and return an error if any of the slice
144 // fetch requests failed.
145 absl::StatusOr<std::unique_ptr<ExampleIterator>> CreateExampleIterator(
146 const ::google::internal::federated::plan::ExampleSelector&
147 example_selector) override;
148
149 private:
150 LogManager& log_manager_;
151 Files& files_;
152 HttpClient& http_client_;
153 InterruptibleRunner& interruptible_runner_;
154 std::string uri_template_;
155 std::atomic<int64_t>& bytes_sent_acc_;
156 std::atomic<int64_t>& bytes_received_acc_;
157 WallClockStopwatch& network_stopwatch_;
158 };
159
160 absl::StatusOr<std::unique_ptr<ExampleIterator>>
CreateExampleIterator(const::google::internal::federated::plan::ExampleSelector & example_selector)161 HttpFederatedSelectExampleIteratorFactory::CreateExampleIterator(
162 const ::google::internal::federated::plan::ExampleSelector&
163 example_selector) {
164 SlicesSelector slices_selector;
165 if (!example_selector.criteria().UnpackTo(&slices_selector)) {
166 return absl::InvalidArgumentError(
167 absl::StrCat("Unexpected/unparseable selection criteria: ",
168 example_selector.criteria().GetTypeName()));
169 }
170
171 log_manager_.LogDiag(ProdDiagCode::FEDSELECT_SLICE_HTTP_FETCH_REQUESTED);
172
173 // Create the temporary scratch file to store the checkpoint data in.
174 // Deletion of the file is done in the
175 // InMemoryFederatedSelectExampleIterator::Close() method or its destructor.
176 absl::StatusOr<std::string> scratch_filename =
177 files_.CreateTempFile("slice", ".ckp");
178
179 if (!scratch_filename.ok()) {
180 return absl::InternalError(absl::StrCat(
181 "Failed to create scratch file for slice data (URI template: ",
182 uri_template_,
183 "): ", absl::StatusCodeToString(scratch_filename.status().code()), ": ",
184 scratch_filename.status().message()));
185 }
186
187 // Fetch the slices.
188 int64_t bytes_received = 0;
189 int64_t bytes_sent = 0;
190 absl::StatusOr<std::deque<absl::Cord>> slices;
191 {
192 auto started_stopwatch = network_stopwatch_.Start();
193 slices = FetchSlicesViaHttp(slices_selector, uri_template_, http_client_,
194 interruptible_runner_,
195 /*bytes_received_acc=*/&bytes_received,
196 /*bytes_sent_acc=*/&bytes_sent);
197 }
198 bytes_sent_acc_ += bytes_sent;
199 bytes_received_acc_ += bytes_received;
200 if (!slices.ok()) {
201 log_manager_.LogDiag(ProdDiagCode::FEDSELECT_SLICE_HTTP_FETCH_FAILED);
202 return absl::Status(slices.status().code(),
203 absl::StrCat("Failed to fetch slice data: ",
204 slices.status().message()));
205 }
206 log_manager_.LogDiag(ProdDiagCode::FEDSELECT_SLICE_HTTP_FETCH_SUCCEEDED);
207
208 return std::make_unique<InMemoryFederatedSelectExampleIterator>(
209 *scratch_filename, std::move(*slices));
210 }
211 } // namespace
212
DisabledFederatedSelectManager(LogManager * log_manager)213 DisabledFederatedSelectManager::DisabledFederatedSelectManager(
214 LogManager* log_manager)
215 : log_manager_(*log_manager) {}
216
217 std::unique_ptr<::fcp::client::engine::ExampleIteratorFactory>
CreateExampleIteratorFactoryForUriTemplate(absl::string_view uri_template)218 DisabledFederatedSelectManager::CreateExampleIteratorFactoryForUriTemplate(
219 absl::string_view uri_template) {
220 return std::make_unique<DisabledFederatedSelectExampleIteratorFactory>(
221 &log_manager_);
222 }
223
HttpFederatedSelectManager(LogManager * log_manager,Files * files,fcp::client::http::HttpClient * http_client,std::function<bool ()> should_abort,const InterruptibleRunner::TimingConfig & timing_config)224 HttpFederatedSelectManager::HttpFederatedSelectManager(
225 LogManager* log_manager, Files* files,
226 fcp::client::http::HttpClient* http_client,
227 std::function<bool()> should_abort,
228 const InterruptibleRunner::TimingConfig& timing_config)
229 : log_manager_(*log_manager),
230 files_(*files),
231 http_client_(*http_client),
232 interruptible_runner_(std::make_unique<InterruptibleRunner>(
233 log_manager, should_abort, timing_config,
234 InterruptibleRunner::DiagnosticsConfig{
235 .interrupted = ProdDiagCode::BACKGROUND_TRAINING_INTERRUPT_HTTP,
236 .interrupt_timeout =
237 ProdDiagCode::BACKGROUND_TRAINING_INTERRUPT_HTTP_TIMED_OUT,
238 .interrupted_extended = ProdDiagCode::
239 BACKGROUND_TRAINING_INTERRUPT_HTTP_EXTENDED_COMPLETED,
240 .interrupt_timeout_extended = ProdDiagCode::
241 BACKGROUND_TRAINING_INTERRUPT_HTTP_EXTENDED_TIMED_OUT})) {}
242
243 std::unique_ptr<::fcp::client::engine::ExampleIteratorFactory>
CreateExampleIteratorFactoryForUriTemplate(absl::string_view uri_template)244 HttpFederatedSelectManager::CreateExampleIteratorFactoryForUriTemplate(
245 absl::string_view uri_template) {
246 // If the server didn't populate the URI template then we can't support any
247 // slice fetch requests.
248 if (uri_template.empty()) {
249 return std::make_unique<DisabledFederatedSelectExampleIteratorFactory>(
250 &log_manager_);
251 }
252 return std::make_unique<HttpFederatedSelectExampleIteratorFactory>(
253 &log_manager_, &files_, &http_client_, interruptible_runner_.get(),
254 uri_template,
255 /*bytes_sent_acc=*/bytes_sent_, /*bytes_received_acc=*/bytes_received_,
256 network_stopwatch_.get());
257 }
258
Next()259 absl::StatusOr<std::string> InMemoryFederatedSelectExampleIterator::Next() {
260 absl::MutexLock lock(&mutex_);
261
262 if (slices_.empty()) {
263 // Eagerly delete the scratch file, since we won't need it anymore.
264 std::filesystem::remove(scratch_filename_);
265 return absl::OutOfRangeError("end of iterator reached");
266 }
267
268 absl::Cord& slice_data = slices_.front();
269
270 // Write the checkpoint data to the file (truncating any data previously
271 // written to the file).
272 std::fstream checkpoint_stream(scratch_filename_,
273 std::ios_base::out | std::ios_base::trunc);
274 if (checkpoint_stream.fail()) {
275 return absl::InternalError("Failed to write slice to file");
276 }
277 for (absl::string_view chunk : slice_data.Chunks()) {
278 if (!(checkpoint_stream << chunk).good()) {
279 return absl::InternalError("Failed to write slice to file");
280 }
281 }
282 checkpoint_stream.close();
283
284 // Remove the slice from the deque, releasing its data from memory.
285 slices_.pop_front();
286
287 return scratch_filename_;
288 }
289
Close()290 void InMemoryFederatedSelectExampleIterator::Close() { CleanupInternal(); }
291
292 InMemoryFederatedSelectExampleIterator::
~InMemoryFederatedSelectExampleIterator()293 ~InMemoryFederatedSelectExampleIterator() {
294 // Remove the scratch file, even if Close() wasn't called first.
295 CleanupInternal();
296 }
297
CleanupInternal()298 void InMemoryFederatedSelectExampleIterator::CleanupInternal() {
299 absl::MutexLock lock(&mutex_);
300 // Remove the scratch filename, if it hadn't been removed yet.
301 slices_.clear();
302 std::filesystem::remove(scratch_filename_);
303 }
304
305 } // namespace client
306 } // namespace fcp
307