xref: /aosp_15_r20/external/federated-compute/fcp/client/federated_select.cc (revision 14675a029014e728ec732f129a32e299b2da0601)
1 /*
2  * Copyright 2022 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "fcp/client/federated_select.h"
17 
#include <deque>
#include <filesystem>
#include <fstream>
#include <functional>
#include <ios>
#include <memory>
#include <string>
#include <system_error>
#include <utility>
#include <vector>
27 
28 #include "google/protobuf/any.pb.h"
29 #include "absl/status/statusor.h"
30 #include "absl/strings/cord.h"
31 #include "absl/strings/str_replace.h"
32 #include "fcp/base/monitoring.h"
33 #include "fcp/base/wall_clock_stopwatch.h"
34 #include "fcp/client/diag_codes.pb.h"
35 #include "fcp/client/engine/example_iterator_factory.h"
36 #include "fcp/client/files.h"
37 #include "fcp/client/http/http_client.h"
38 #include "fcp/client/http/http_client_util.h"
39 #include "fcp/client/http/in_memory_request_response.h"
40 #include "fcp/client/interruptible_runner.h"
41 #include "fcp/client/log_manager.h"
42 #include "fcp/client/stats.h"
43 #include "fcp/protos/plan.pb.h"
44 
45 namespace fcp {
46 namespace client {
47 
48 using fcp::client::http::HttpClient;
49 using fcp::client::http::InMemoryHttpResponse;
50 using fcp::client::http::UriOrInlineData;
51 using ::google::internal::federated::plan::SlicesSelector;
52 
53 namespace {
54 
55 // A Federated Select `ExampleIteratorFactory` that fails all queries.
56 class DisabledFederatedSelectExampleIteratorFactory
57     : public FederatedSelectExampleIteratorFactory {
58  public:
DisabledFederatedSelectExampleIteratorFactory(LogManager * log_manager)59   explicit DisabledFederatedSelectExampleIteratorFactory(
60       LogManager* log_manager)
61       : log_manager_(*log_manager) {}
62 
63   // Will fetch the slice data via HTTP and return an error if any of the
64   // slice fetch requests failed.
CreateExampleIterator(const::google::internal::federated::plan::ExampleSelector & example_selector)65   absl::StatusOr<std::unique_ptr<ExampleIterator>> CreateExampleIterator(
66       const ::google::internal::federated::plan::ExampleSelector&
67           example_selector) override {
68     log_manager_.LogDiag(
69         ProdDiagCode::FEDSELECT_SLICE_HTTP_FETCH_REQUESTED_BUT_DISABLED);
70     return absl::InvalidArgumentError("Federated Select is disabled.");
71   }
72 
73  private:
74   LogManager& log_manager_;
75 };
76 
FetchSlicesViaHttp(const SlicesSelector & slices_selector,absl::string_view uri_template,HttpClient & http_client,InterruptibleRunner & interruptible_runner,int64_t * bytes_received_acc,int64_t * bytes_sent_acc)77 absl::StatusOr<std::deque<absl::Cord>> FetchSlicesViaHttp(
78     const SlicesSelector& slices_selector, absl::string_view uri_template,
79     HttpClient& http_client, InterruptibleRunner& interruptible_runner,
80     int64_t* bytes_received_acc, int64_t* bytes_sent_acc) {
81   std::vector<UriOrInlineData> resources;
82   for (int32_t slice_key : slices_selector.keys()) {
83     std::string slice_uri = absl::StrReplaceAll(
84         // Note that `served_at_id` is documented to not require URL-escaping,
85         // so we don't apply any here.
86         uri_template, {{"{served_at_id}", slices_selector.served_at_id()},
87                        {"{key_base10}", absl::StrCat(slice_key)}});
88 
89     resources.push_back(
90         UriOrInlineData::CreateUri(slice_uri, "", absl::ZeroDuration()));
91   }
92 
93   // Perform the requests.
94   absl::StatusOr<std::vector<absl::StatusOr<InMemoryHttpResponse>>>
95       slice_fetch_result = http::FetchResourcesInMemory(
96           http_client, interruptible_runner, std::move(resources),
97           bytes_received_acc, bytes_sent_acc,
98           // TODO(team): Enable caching for federated select slices.
99           /*resource_cache=*/nullptr);
100 
101   // Check whether issuing the requests failed as a whole (generally indicating
102   // a programming error).
103   if (!slice_fetch_result.ok()) {
104     return absl::InternalError(absl::StrCat(
105         "Failed to perform HTTP requests (URI template: ", uri_template,
106         "): ", absl::StatusCodeToString(slice_fetch_result.status().code())));
107   }
108 
109   std::deque<absl::Cord> slices;
110   for (const absl::StatusOr<InMemoryHttpResponse>& http_response :
111        *slice_fetch_result) {
112     if (!http_response.ok()) {
113       return absl::UnavailableError(absl::StrCat(
114           "Slice fetch request failed (URI template: ", uri_template,
115           "): ", absl::StatusCodeToString(http_response.status().code())));
116     }
117     slices.push_back(http_response->body);
118   }
119   return slices;
120 }
121 
122 // A Federated Select `ExampleIteratorFactory` that, upon creation of an
123 // iterator, fetches the slice data via HTTP, buffers it in-memory, and then
124 // exposes it to the plan via an `InMemoryFederatedSelectExampleIterator`.
125 class HttpFederatedSelectExampleIteratorFactory
126     : public FederatedSelectExampleIteratorFactory {
127  public:
HttpFederatedSelectExampleIteratorFactory(LogManager * log_manager,Files * files,HttpClient * http_client,InterruptibleRunner * interruptible_runner,absl::string_view uri_template,std::atomic<int64_t> & bytes_sent_acc,std::atomic<int64_t> & bytes_received_acc,WallClockStopwatch * network_stopwatch)128   HttpFederatedSelectExampleIteratorFactory(
129       LogManager* log_manager, Files* files, HttpClient* http_client,
130       InterruptibleRunner* interruptible_runner, absl::string_view uri_template,
131       std::atomic<int64_t>& bytes_sent_acc,
132       std::atomic<int64_t>& bytes_received_acc,
133       WallClockStopwatch* network_stopwatch)
134       : log_manager_(*log_manager),
135         files_(*files),
136         http_client_(*http_client),
137         interruptible_runner_(*interruptible_runner),
138         uri_template_(uri_template),
139         bytes_sent_acc_(bytes_sent_acc),
140         bytes_received_acc_(bytes_received_acc),
141         network_stopwatch_(*network_stopwatch) {}
142 
143   // Will fetch the slice data via HTTP and return an error if any of the slice
144   // fetch requests failed.
145   absl::StatusOr<std::unique_ptr<ExampleIterator>> CreateExampleIterator(
146       const ::google::internal::federated::plan::ExampleSelector&
147           example_selector) override;
148 
149  private:
150   LogManager& log_manager_;
151   Files& files_;
152   HttpClient& http_client_;
153   InterruptibleRunner& interruptible_runner_;
154   std::string uri_template_;
155   std::atomic<int64_t>& bytes_sent_acc_;
156   std::atomic<int64_t>& bytes_received_acc_;
157   WallClockStopwatch& network_stopwatch_;
158 };
159 
160 absl::StatusOr<std::unique_ptr<ExampleIterator>>
CreateExampleIterator(const::google::internal::federated::plan::ExampleSelector & example_selector)161 HttpFederatedSelectExampleIteratorFactory::CreateExampleIterator(
162     const ::google::internal::federated::plan::ExampleSelector&
163         example_selector) {
164   SlicesSelector slices_selector;
165   if (!example_selector.criteria().UnpackTo(&slices_selector)) {
166     return absl::InvalidArgumentError(
167         absl::StrCat("Unexpected/unparseable selection criteria: ",
168                      example_selector.criteria().GetTypeName()));
169   }
170 
171   log_manager_.LogDiag(ProdDiagCode::FEDSELECT_SLICE_HTTP_FETCH_REQUESTED);
172 
173   // Create the temporary scratch file to store the checkpoint data in.
174   // Deletion of the file is done in the
175   // InMemoryFederatedSelectExampleIterator::Close() method or its destructor.
176   absl::StatusOr<std::string> scratch_filename =
177       files_.CreateTempFile("slice", ".ckp");
178 
179   if (!scratch_filename.ok()) {
180     return absl::InternalError(absl::StrCat(
181         "Failed to create scratch file for slice data (URI template: ",
182         uri_template_,
183         "): ", absl::StatusCodeToString(scratch_filename.status().code()), ": ",
184         scratch_filename.status().message()));
185   }
186 
187   // Fetch the slices.
188   int64_t bytes_received = 0;
189   int64_t bytes_sent = 0;
190   absl::StatusOr<std::deque<absl::Cord>> slices;
191   {
192     auto started_stopwatch = network_stopwatch_.Start();
193     slices = FetchSlicesViaHttp(slices_selector, uri_template_, http_client_,
194                                 interruptible_runner_,
195                                 /*bytes_received_acc=*/&bytes_received,
196                                 /*bytes_sent_acc=*/&bytes_sent);
197   }
198   bytes_sent_acc_ += bytes_sent;
199   bytes_received_acc_ += bytes_received;
200   if (!slices.ok()) {
201     log_manager_.LogDiag(ProdDiagCode::FEDSELECT_SLICE_HTTP_FETCH_FAILED);
202     return absl::Status(slices.status().code(),
203                         absl::StrCat("Failed to fetch slice data: ",
204                                      slices.status().message()));
205   }
206   log_manager_.LogDiag(ProdDiagCode::FEDSELECT_SLICE_HTTP_FETCH_SUCCEEDED);
207 
208   return std::make_unique<InMemoryFederatedSelectExampleIterator>(
209       *scratch_filename, std::move(*slices));
210 }
211 }  // namespace
212 
DisabledFederatedSelectManager(LogManager * log_manager)213 DisabledFederatedSelectManager::DisabledFederatedSelectManager(
214     LogManager* log_manager)
215     : log_manager_(*log_manager) {}
216 
217 std::unique_ptr<::fcp::client::engine::ExampleIteratorFactory>
CreateExampleIteratorFactoryForUriTemplate(absl::string_view uri_template)218 DisabledFederatedSelectManager::CreateExampleIteratorFactoryForUriTemplate(
219     absl::string_view uri_template) {
220   return std::make_unique<DisabledFederatedSelectExampleIteratorFactory>(
221       &log_manager_);
222 }
223 
HttpFederatedSelectManager(LogManager * log_manager,Files * files,fcp::client::http::HttpClient * http_client,std::function<bool ()> should_abort,const InterruptibleRunner::TimingConfig & timing_config)224 HttpFederatedSelectManager::HttpFederatedSelectManager(
225     LogManager* log_manager, Files* files,
226     fcp::client::http::HttpClient* http_client,
227     std::function<bool()> should_abort,
228     const InterruptibleRunner::TimingConfig& timing_config)
229     : log_manager_(*log_manager),
230       files_(*files),
231       http_client_(*http_client),
232       interruptible_runner_(std::make_unique<InterruptibleRunner>(
233           log_manager, should_abort, timing_config,
234           InterruptibleRunner::DiagnosticsConfig{
235               .interrupted = ProdDiagCode::BACKGROUND_TRAINING_INTERRUPT_HTTP,
236               .interrupt_timeout =
237                   ProdDiagCode::BACKGROUND_TRAINING_INTERRUPT_HTTP_TIMED_OUT,
238               .interrupted_extended = ProdDiagCode::
239                   BACKGROUND_TRAINING_INTERRUPT_HTTP_EXTENDED_COMPLETED,
240               .interrupt_timeout_extended = ProdDiagCode::
241                   BACKGROUND_TRAINING_INTERRUPT_HTTP_EXTENDED_TIMED_OUT})) {}
242 
243 std::unique_ptr<::fcp::client::engine::ExampleIteratorFactory>
CreateExampleIteratorFactoryForUriTemplate(absl::string_view uri_template)244 HttpFederatedSelectManager::CreateExampleIteratorFactoryForUriTemplate(
245     absl::string_view uri_template) {
246   // If the server didn't populate the URI template then we can't support any
247   // slice fetch requests.
248   if (uri_template.empty()) {
249     return std::make_unique<DisabledFederatedSelectExampleIteratorFactory>(
250         &log_manager_);
251   }
252   return std::make_unique<HttpFederatedSelectExampleIteratorFactory>(
253       &log_manager_, &files_, &http_client_, interruptible_runner_.get(),
254       uri_template,
255       /*bytes_sent_acc=*/bytes_sent_, /*bytes_received_acc=*/bytes_received_,
256       network_stopwatch_.get());
257 }
258 
Next()259 absl::StatusOr<std::string> InMemoryFederatedSelectExampleIterator::Next() {
260   absl::MutexLock lock(&mutex_);
261 
262   if (slices_.empty()) {
263     // Eagerly delete the scratch file, since we won't need it anymore.
264     std::filesystem::remove(scratch_filename_);
265     return absl::OutOfRangeError("end of iterator reached");
266   }
267 
268   absl::Cord& slice_data = slices_.front();
269 
270   // Write the checkpoint data to the file (truncating any data previously
271   // written to the file).
272   std::fstream checkpoint_stream(scratch_filename_,
273                                  std::ios_base::out | std::ios_base::trunc);
274   if (checkpoint_stream.fail()) {
275     return absl::InternalError("Failed to write slice to file");
276   }
277   for (absl::string_view chunk : slice_data.Chunks()) {
278     if (!(checkpoint_stream << chunk).good()) {
279       return absl::InternalError("Failed to write slice to file");
280     }
281   }
282   checkpoint_stream.close();
283 
284   // Remove the slice from the deque, releasing its data from memory.
285   slices_.pop_front();
286 
287   return scratch_filename_;
288 }
289 
Close()290 void InMemoryFederatedSelectExampleIterator::Close() { CleanupInternal(); }
291 
292 InMemoryFederatedSelectExampleIterator::
~InMemoryFederatedSelectExampleIterator()293     ~InMemoryFederatedSelectExampleIterator() {
294   // Remove the scratch file, even if Close() wasn't called first.
295   CleanupInternal();
296 }
297 
CleanupInternal()298 void InMemoryFederatedSelectExampleIterator::CleanupInternal() {
299   absl::MutexLock lock(&mutex_);
300   // Remove the scratch filename, if it hadn't been removed yet.
301   slices_.clear();
302   std::filesystem::remove(scratch_filename_);
303 }
304 
305 }  // namespace client
306 }  // namespace fcp
307