xref: /aosp_15_r20/external/federated-compute/fcp/client/federated_select.cc (revision 14675a029014e728ec732f129a32e299b2da0601)
1*14675a02SAndroid Build Coastguard Worker /*
2*14675a02SAndroid Build Coastguard Worker  * Copyright 2022 Google LLC
3*14675a02SAndroid Build Coastguard Worker  *
4*14675a02SAndroid Build Coastguard Worker  * Licensed under the Apache License, Version 2.0 (the "License");
5*14675a02SAndroid Build Coastguard Worker  * you may not use this file except in compliance with the License.
6*14675a02SAndroid Build Coastguard Worker  * You may obtain a copy of the License at
7*14675a02SAndroid Build Coastguard Worker  *
8*14675a02SAndroid Build Coastguard Worker  *      http://www.apache.org/licenses/LICENSE-2.0
9*14675a02SAndroid Build Coastguard Worker  *
10*14675a02SAndroid Build Coastguard Worker  * Unless required by applicable law or agreed to in writing, software
11*14675a02SAndroid Build Coastguard Worker  * distributed under the License is distributed on an "AS IS" BASIS,
12*14675a02SAndroid Build Coastguard Worker  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13*14675a02SAndroid Build Coastguard Worker  * See the License for the specific language governing permissions and
14*14675a02SAndroid Build Coastguard Worker  * limitations under the License.
15*14675a02SAndroid Build Coastguard Worker  */
16*14675a02SAndroid Build Coastguard Worker #include "fcp/client/federated_select.h"
17*14675a02SAndroid Build Coastguard Worker 
18*14675a02SAndroid Build Coastguard Worker #include <deque>
19*14675a02SAndroid Build Coastguard Worker #include <filesystem>
20*14675a02SAndroid Build Coastguard Worker #include <fstream>
21*14675a02SAndroid Build Coastguard Worker #include <functional>
22*14675a02SAndroid Build Coastguard Worker #include <ios>
23*14675a02SAndroid Build Coastguard Worker #include <memory>
24*14675a02SAndroid Build Coastguard Worker #include <string>
25*14675a02SAndroid Build Coastguard Worker #include <utility>
26*14675a02SAndroid Build Coastguard Worker #include <vector>
27*14675a02SAndroid Build Coastguard Worker 
28*14675a02SAndroid Build Coastguard Worker #include "google/protobuf/any.pb.h"
29*14675a02SAndroid Build Coastguard Worker #include "absl/status/statusor.h"
30*14675a02SAndroid Build Coastguard Worker #include "absl/strings/cord.h"
31*14675a02SAndroid Build Coastguard Worker #include "absl/strings/str_replace.h"
32*14675a02SAndroid Build Coastguard Worker #include "fcp/base/monitoring.h"
33*14675a02SAndroid Build Coastguard Worker #include "fcp/base/wall_clock_stopwatch.h"
34*14675a02SAndroid Build Coastguard Worker #include "fcp/client/diag_codes.pb.h"
35*14675a02SAndroid Build Coastguard Worker #include "fcp/client/engine/example_iterator_factory.h"
36*14675a02SAndroid Build Coastguard Worker #include "fcp/client/files.h"
37*14675a02SAndroid Build Coastguard Worker #include "fcp/client/http/http_client.h"
38*14675a02SAndroid Build Coastguard Worker #include "fcp/client/http/http_client_util.h"
39*14675a02SAndroid Build Coastguard Worker #include "fcp/client/http/in_memory_request_response.h"
40*14675a02SAndroid Build Coastguard Worker #include "fcp/client/interruptible_runner.h"
41*14675a02SAndroid Build Coastguard Worker #include "fcp/client/log_manager.h"
42*14675a02SAndroid Build Coastguard Worker #include "fcp/client/stats.h"
43*14675a02SAndroid Build Coastguard Worker #include "fcp/protos/plan.pb.h"
44*14675a02SAndroid Build Coastguard Worker 
45*14675a02SAndroid Build Coastguard Worker namespace fcp {
46*14675a02SAndroid Build Coastguard Worker namespace client {
47*14675a02SAndroid Build Coastguard Worker 
48*14675a02SAndroid Build Coastguard Worker using fcp::client::http::HttpClient;
49*14675a02SAndroid Build Coastguard Worker using fcp::client::http::InMemoryHttpResponse;
50*14675a02SAndroid Build Coastguard Worker using fcp::client::http::UriOrInlineData;
51*14675a02SAndroid Build Coastguard Worker using ::google::internal::federated::plan::SlicesSelector;
52*14675a02SAndroid Build Coastguard Worker 
53*14675a02SAndroid Build Coastguard Worker namespace {
54*14675a02SAndroid Build Coastguard Worker 
55*14675a02SAndroid Build Coastguard Worker // A Federated Select `ExampleIteratorFactory` that fails all queries.
56*14675a02SAndroid Build Coastguard Worker class DisabledFederatedSelectExampleIteratorFactory
57*14675a02SAndroid Build Coastguard Worker     : public FederatedSelectExampleIteratorFactory {
58*14675a02SAndroid Build Coastguard Worker  public:
DisabledFederatedSelectExampleIteratorFactory(LogManager * log_manager)59*14675a02SAndroid Build Coastguard Worker   explicit DisabledFederatedSelectExampleIteratorFactory(
60*14675a02SAndroid Build Coastguard Worker       LogManager* log_manager)
61*14675a02SAndroid Build Coastguard Worker       : log_manager_(*log_manager) {}
62*14675a02SAndroid Build Coastguard Worker 
63*14675a02SAndroid Build Coastguard Worker   // Will fetch the slice data via HTTP and return an error if any of the
64*14675a02SAndroid Build Coastguard Worker   // slice fetch requests failed.
CreateExampleIterator(const::google::internal::federated::plan::ExampleSelector & example_selector)65*14675a02SAndroid Build Coastguard Worker   absl::StatusOr<std::unique_ptr<ExampleIterator>> CreateExampleIterator(
66*14675a02SAndroid Build Coastguard Worker       const ::google::internal::federated::plan::ExampleSelector&
67*14675a02SAndroid Build Coastguard Worker           example_selector) override {
68*14675a02SAndroid Build Coastguard Worker     log_manager_.LogDiag(
69*14675a02SAndroid Build Coastguard Worker         ProdDiagCode::FEDSELECT_SLICE_HTTP_FETCH_REQUESTED_BUT_DISABLED);
70*14675a02SAndroid Build Coastguard Worker     return absl::InvalidArgumentError("Federated Select is disabled.");
71*14675a02SAndroid Build Coastguard Worker   }
72*14675a02SAndroid Build Coastguard Worker 
73*14675a02SAndroid Build Coastguard Worker  private:
74*14675a02SAndroid Build Coastguard Worker   LogManager& log_manager_;
75*14675a02SAndroid Build Coastguard Worker };
76*14675a02SAndroid Build Coastguard Worker 
FetchSlicesViaHttp(const SlicesSelector & slices_selector,absl::string_view uri_template,HttpClient & http_client,InterruptibleRunner & interruptible_runner,int64_t * bytes_received_acc,int64_t * bytes_sent_acc)77*14675a02SAndroid Build Coastguard Worker absl::StatusOr<std::deque<absl::Cord>> FetchSlicesViaHttp(
78*14675a02SAndroid Build Coastguard Worker     const SlicesSelector& slices_selector, absl::string_view uri_template,
79*14675a02SAndroid Build Coastguard Worker     HttpClient& http_client, InterruptibleRunner& interruptible_runner,
80*14675a02SAndroid Build Coastguard Worker     int64_t* bytes_received_acc, int64_t* bytes_sent_acc) {
81*14675a02SAndroid Build Coastguard Worker   std::vector<UriOrInlineData> resources;
82*14675a02SAndroid Build Coastguard Worker   for (int32_t slice_key : slices_selector.keys()) {
83*14675a02SAndroid Build Coastguard Worker     std::string slice_uri = absl::StrReplaceAll(
84*14675a02SAndroid Build Coastguard Worker         // Note that `served_at_id` is documented to not require URL-escaping,
85*14675a02SAndroid Build Coastguard Worker         // so we don't apply any here.
86*14675a02SAndroid Build Coastguard Worker         uri_template, {{"{served_at_id}", slices_selector.served_at_id()},
87*14675a02SAndroid Build Coastguard Worker                        {"{key_base10}", absl::StrCat(slice_key)}});
88*14675a02SAndroid Build Coastguard Worker 
89*14675a02SAndroid Build Coastguard Worker     resources.push_back(
90*14675a02SAndroid Build Coastguard Worker         UriOrInlineData::CreateUri(slice_uri, "", absl::ZeroDuration()));
91*14675a02SAndroid Build Coastguard Worker   }
92*14675a02SAndroid Build Coastguard Worker 
93*14675a02SAndroid Build Coastguard Worker   // Perform the requests.
94*14675a02SAndroid Build Coastguard Worker   absl::StatusOr<std::vector<absl::StatusOr<InMemoryHttpResponse>>>
95*14675a02SAndroid Build Coastguard Worker       slice_fetch_result = http::FetchResourcesInMemory(
96*14675a02SAndroid Build Coastguard Worker           http_client, interruptible_runner, std::move(resources),
97*14675a02SAndroid Build Coastguard Worker           bytes_received_acc, bytes_sent_acc,
98*14675a02SAndroid Build Coastguard Worker           // TODO(team): Enable caching for federated select slices.
99*14675a02SAndroid Build Coastguard Worker           /*resource_cache=*/nullptr);
100*14675a02SAndroid Build Coastguard Worker 
101*14675a02SAndroid Build Coastguard Worker   // Check whether issuing the requests failed as a whole (generally indicating
102*14675a02SAndroid Build Coastguard Worker   // a programming error).
103*14675a02SAndroid Build Coastguard Worker   if (!slice_fetch_result.ok()) {
104*14675a02SAndroid Build Coastguard Worker     return absl::InternalError(absl::StrCat(
105*14675a02SAndroid Build Coastguard Worker         "Failed to perform HTTP requests (URI template: ", uri_template,
106*14675a02SAndroid Build Coastguard Worker         "): ", absl::StatusCodeToString(slice_fetch_result.status().code())));
107*14675a02SAndroid Build Coastguard Worker   }
108*14675a02SAndroid Build Coastguard Worker 
109*14675a02SAndroid Build Coastguard Worker   std::deque<absl::Cord> slices;
110*14675a02SAndroid Build Coastguard Worker   for (const absl::StatusOr<InMemoryHttpResponse>& http_response :
111*14675a02SAndroid Build Coastguard Worker        *slice_fetch_result) {
112*14675a02SAndroid Build Coastguard Worker     if (!http_response.ok()) {
113*14675a02SAndroid Build Coastguard Worker       return absl::UnavailableError(absl::StrCat(
114*14675a02SAndroid Build Coastguard Worker           "Slice fetch request failed (URI template: ", uri_template,
115*14675a02SAndroid Build Coastguard Worker           "): ", absl::StatusCodeToString(http_response.status().code())));
116*14675a02SAndroid Build Coastguard Worker     }
117*14675a02SAndroid Build Coastguard Worker     slices.push_back(http_response->body);
118*14675a02SAndroid Build Coastguard Worker   }
119*14675a02SAndroid Build Coastguard Worker   return slices;
120*14675a02SAndroid Build Coastguard Worker }
121*14675a02SAndroid Build Coastguard Worker 
122*14675a02SAndroid Build Coastguard Worker // A Federated Select `ExampleIteratorFactory` that, upon creation of an
123*14675a02SAndroid Build Coastguard Worker // iterator, fetches the slice data via HTTP, buffers it in-memory, and then
124*14675a02SAndroid Build Coastguard Worker // exposes it to the plan via an `InMemoryFederatedSelectExampleIterator`.
125*14675a02SAndroid Build Coastguard Worker class HttpFederatedSelectExampleIteratorFactory
126*14675a02SAndroid Build Coastguard Worker     : public FederatedSelectExampleIteratorFactory {
127*14675a02SAndroid Build Coastguard Worker  public:
HttpFederatedSelectExampleIteratorFactory(LogManager * log_manager,Files * files,HttpClient * http_client,InterruptibleRunner * interruptible_runner,absl::string_view uri_template,std::atomic<int64_t> & bytes_sent_acc,std::atomic<int64_t> & bytes_received_acc,WallClockStopwatch * network_stopwatch)128*14675a02SAndroid Build Coastguard Worker   HttpFederatedSelectExampleIteratorFactory(
129*14675a02SAndroid Build Coastguard Worker       LogManager* log_manager, Files* files, HttpClient* http_client,
130*14675a02SAndroid Build Coastguard Worker       InterruptibleRunner* interruptible_runner, absl::string_view uri_template,
131*14675a02SAndroid Build Coastguard Worker       std::atomic<int64_t>& bytes_sent_acc,
132*14675a02SAndroid Build Coastguard Worker       std::atomic<int64_t>& bytes_received_acc,
133*14675a02SAndroid Build Coastguard Worker       WallClockStopwatch* network_stopwatch)
134*14675a02SAndroid Build Coastguard Worker       : log_manager_(*log_manager),
135*14675a02SAndroid Build Coastguard Worker         files_(*files),
136*14675a02SAndroid Build Coastguard Worker         http_client_(*http_client),
137*14675a02SAndroid Build Coastguard Worker         interruptible_runner_(*interruptible_runner),
138*14675a02SAndroid Build Coastguard Worker         uri_template_(uri_template),
139*14675a02SAndroid Build Coastguard Worker         bytes_sent_acc_(bytes_sent_acc),
140*14675a02SAndroid Build Coastguard Worker         bytes_received_acc_(bytes_received_acc),
141*14675a02SAndroid Build Coastguard Worker         network_stopwatch_(*network_stopwatch) {}
142*14675a02SAndroid Build Coastguard Worker 
143*14675a02SAndroid Build Coastguard Worker   // Will fetch the slice data via HTTP and return an error if any of the slice
144*14675a02SAndroid Build Coastguard Worker   // fetch requests failed.
145*14675a02SAndroid Build Coastguard Worker   absl::StatusOr<std::unique_ptr<ExampleIterator>> CreateExampleIterator(
146*14675a02SAndroid Build Coastguard Worker       const ::google::internal::federated::plan::ExampleSelector&
147*14675a02SAndroid Build Coastguard Worker           example_selector) override;
148*14675a02SAndroid Build Coastguard Worker 
149*14675a02SAndroid Build Coastguard Worker  private:
150*14675a02SAndroid Build Coastguard Worker   LogManager& log_manager_;
151*14675a02SAndroid Build Coastguard Worker   Files& files_;
152*14675a02SAndroid Build Coastguard Worker   HttpClient& http_client_;
153*14675a02SAndroid Build Coastguard Worker   InterruptibleRunner& interruptible_runner_;
154*14675a02SAndroid Build Coastguard Worker   std::string uri_template_;
155*14675a02SAndroid Build Coastguard Worker   std::atomic<int64_t>& bytes_sent_acc_;
156*14675a02SAndroid Build Coastguard Worker   std::atomic<int64_t>& bytes_received_acc_;
157*14675a02SAndroid Build Coastguard Worker   WallClockStopwatch& network_stopwatch_;
158*14675a02SAndroid Build Coastguard Worker };
159*14675a02SAndroid Build Coastguard Worker 
160*14675a02SAndroid Build Coastguard Worker absl::StatusOr<std::unique_ptr<ExampleIterator>>
CreateExampleIterator(const::google::internal::federated::plan::ExampleSelector & example_selector)161*14675a02SAndroid Build Coastguard Worker HttpFederatedSelectExampleIteratorFactory::CreateExampleIterator(
162*14675a02SAndroid Build Coastguard Worker     const ::google::internal::federated::plan::ExampleSelector&
163*14675a02SAndroid Build Coastguard Worker         example_selector) {
164*14675a02SAndroid Build Coastguard Worker   SlicesSelector slices_selector;
165*14675a02SAndroid Build Coastguard Worker   if (!example_selector.criteria().UnpackTo(&slices_selector)) {
166*14675a02SAndroid Build Coastguard Worker     return absl::InvalidArgumentError(
167*14675a02SAndroid Build Coastguard Worker         absl::StrCat("Unexpected/unparseable selection criteria: ",
168*14675a02SAndroid Build Coastguard Worker                      example_selector.criteria().GetTypeName()));
169*14675a02SAndroid Build Coastguard Worker   }
170*14675a02SAndroid Build Coastguard Worker 
171*14675a02SAndroid Build Coastguard Worker   log_manager_.LogDiag(ProdDiagCode::FEDSELECT_SLICE_HTTP_FETCH_REQUESTED);
172*14675a02SAndroid Build Coastguard Worker 
173*14675a02SAndroid Build Coastguard Worker   // Create the temporary scratch file to store the checkpoint data in.
174*14675a02SAndroid Build Coastguard Worker   // Deletion of the file is done in the
175*14675a02SAndroid Build Coastguard Worker   // InMemoryFederatedSelectExampleIterator::Close() method or its destructor.
176*14675a02SAndroid Build Coastguard Worker   absl::StatusOr<std::string> scratch_filename =
177*14675a02SAndroid Build Coastguard Worker       files_.CreateTempFile("slice", ".ckp");
178*14675a02SAndroid Build Coastguard Worker 
179*14675a02SAndroid Build Coastguard Worker   if (!scratch_filename.ok()) {
180*14675a02SAndroid Build Coastguard Worker     return absl::InternalError(absl::StrCat(
181*14675a02SAndroid Build Coastguard Worker         "Failed to create scratch file for slice data (URI template: ",
182*14675a02SAndroid Build Coastguard Worker         uri_template_,
183*14675a02SAndroid Build Coastguard Worker         "): ", absl::StatusCodeToString(scratch_filename.status().code()), ": ",
184*14675a02SAndroid Build Coastguard Worker         scratch_filename.status().message()));
185*14675a02SAndroid Build Coastguard Worker   }
186*14675a02SAndroid Build Coastguard Worker 
187*14675a02SAndroid Build Coastguard Worker   // Fetch the slices.
188*14675a02SAndroid Build Coastguard Worker   int64_t bytes_received = 0;
189*14675a02SAndroid Build Coastguard Worker   int64_t bytes_sent = 0;
190*14675a02SAndroid Build Coastguard Worker   absl::StatusOr<std::deque<absl::Cord>> slices;
191*14675a02SAndroid Build Coastguard Worker   {
192*14675a02SAndroid Build Coastguard Worker     auto started_stopwatch = network_stopwatch_.Start();
193*14675a02SAndroid Build Coastguard Worker     slices = FetchSlicesViaHttp(slices_selector, uri_template_, http_client_,
194*14675a02SAndroid Build Coastguard Worker                                 interruptible_runner_,
195*14675a02SAndroid Build Coastguard Worker                                 /*bytes_received_acc=*/&bytes_received,
196*14675a02SAndroid Build Coastguard Worker                                 /*bytes_sent_acc=*/&bytes_sent);
197*14675a02SAndroid Build Coastguard Worker   }
198*14675a02SAndroid Build Coastguard Worker   bytes_sent_acc_ += bytes_sent;
199*14675a02SAndroid Build Coastguard Worker   bytes_received_acc_ += bytes_received;
200*14675a02SAndroid Build Coastguard Worker   if (!slices.ok()) {
201*14675a02SAndroid Build Coastguard Worker     log_manager_.LogDiag(ProdDiagCode::FEDSELECT_SLICE_HTTP_FETCH_FAILED);
202*14675a02SAndroid Build Coastguard Worker     return absl::Status(slices.status().code(),
203*14675a02SAndroid Build Coastguard Worker                         absl::StrCat("Failed to fetch slice data: ",
204*14675a02SAndroid Build Coastguard Worker                                      slices.status().message()));
205*14675a02SAndroid Build Coastguard Worker   }
206*14675a02SAndroid Build Coastguard Worker   log_manager_.LogDiag(ProdDiagCode::FEDSELECT_SLICE_HTTP_FETCH_SUCCEEDED);
207*14675a02SAndroid Build Coastguard Worker 
208*14675a02SAndroid Build Coastguard Worker   return std::make_unique<InMemoryFederatedSelectExampleIterator>(
209*14675a02SAndroid Build Coastguard Worker       *scratch_filename, std::move(*slices));
210*14675a02SAndroid Build Coastguard Worker }
211*14675a02SAndroid Build Coastguard Worker }  // namespace
212*14675a02SAndroid Build Coastguard Worker 
DisabledFederatedSelectManager(LogManager * log_manager)213*14675a02SAndroid Build Coastguard Worker DisabledFederatedSelectManager::DisabledFederatedSelectManager(
214*14675a02SAndroid Build Coastguard Worker     LogManager* log_manager)
215*14675a02SAndroid Build Coastguard Worker     : log_manager_(*log_manager) {}
216*14675a02SAndroid Build Coastguard Worker 
217*14675a02SAndroid Build Coastguard Worker std::unique_ptr<::fcp::client::engine::ExampleIteratorFactory>
CreateExampleIteratorFactoryForUriTemplate(absl::string_view uri_template)218*14675a02SAndroid Build Coastguard Worker DisabledFederatedSelectManager::CreateExampleIteratorFactoryForUriTemplate(
219*14675a02SAndroid Build Coastguard Worker     absl::string_view uri_template) {
220*14675a02SAndroid Build Coastguard Worker   return std::make_unique<DisabledFederatedSelectExampleIteratorFactory>(
221*14675a02SAndroid Build Coastguard Worker       &log_manager_);
222*14675a02SAndroid Build Coastguard Worker }
223*14675a02SAndroid Build Coastguard Worker 
HttpFederatedSelectManager(LogManager * log_manager,Files * files,fcp::client::http::HttpClient * http_client,std::function<bool ()> should_abort,const InterruptibleRunner::TimingConfig & timing_config)224*14675a02SAndroid Build Coastguard Worker HttpFederatedSelectManager::HttpFederatedSelectManager(
225*14675a02SAndroid Build Coastguard Worker     LogManager* log_manager, Files* files,
226*14675a02SAndroid Build Coastguard Worker     fcp::client::http::HttpClient* http_client,
227*14675a02SAndroid Build Coastguard Worker     std::function<bool()> should_abort,
228*14675a02SAndroid Build Coastguard Worker     const InterruptibleRunner::TimingConfig& timing_config)
229*14675a02SAndroid Build Coastguard Worker     : log_manager_(*log_manager),
230*14675a02SAndroid Build Coastguard Worker       files_(*files),
231*14675a02SAndroid Build Coastguard Worker       http_client_(*http_client),
232*14675a02SAndroid Build Coastguard Worker       interruptible_runner_(std::make_unique<InterruptibleRunner>(
233*14675a02SAndroid Build Coastguard Worker           log_manager, should_abort, timing_config,
234*14675a02SAndroid Build Coastguard Worker           InterruptibleRunner::DiagnosticsConfig{
235*14675a02SAndroid Build Coastguard Worker               .interrupted = ProdDiagCode::BACKGROUND_TRAINING_INTERRUPT_HTTP,
236*14675a02SAndroid Build Coastguard Worker               .interrupt_timeout =
237*14675a02SAndroid Build Coastguard Worker                   ProdDiagCode::BACKGROUND_TRAINING_INTERRUPT_HTTP_TIMED_OUT,
238*14675a02SAndroid Build Coastguard Worker               .interrupted_extended = ProdDiagCode::
239*14675a02SAndroid Build Coastguard Worker                   BACKGROUND_TRAINING_INTERRUPT_HTTP_EXTENDED_COMPLETED,
240*14675a02SAndroid Build Coastguard Worker               .interrupt_timeout_extended = ProdDiagCode::
241*14675a02SAndroid Build Coastguard Worker                   BACKGROUND_TRAINING_INTERRUPT_HTTP_EXTENDED_TIMED_OUT})) {}
242*14675a02SAndroid Build Coastguard Worker 
243*14675a02SAndroid Build Coastguard Worker std::unique_ptr<::fcp::client::engine::ExampleIteratorFactory>
CreateExampleIteratorFactoryForUriTemplate(absl::string_view uri_template)244*14675a02SAndroid Build Coastguard Worker HttpFederatedSelectManager::CreateExampleIteratorFactoryForUriTemplate(
245*14675a02SAndroid Build Coastguard Worker     absl::string_view uri_template) {
246*14675a02SAndroid Build Coastguard Worker   // If the server didn't populate the URI template then we can't support any
247*14675a02SAndroid Build Coastguard Worker   // slice fetch requests.
248*14675a02SAndroid Build Coastguard Worker   if (uri_template.empty()) {
249*14675a02SAndroid Build Coastguard Worker     return std::make_unique<DisabledFederatedSelectExampleIteratorFactory>(
250*14675a02SAndroid Build Coastguard Worker         &log_manager_);
251*14675a02SAndroid Build Coastguard Worker   }
252*14675a02SAndroid Build Coastguard Worker   return std::make_unique<HttpFederatedSelectExampleIteratorFactory>(
253*14675a02SAndroid Build Coastguard Worker       &log_manager_, &files_, &http_client_, interruptible_runner_.get(),
254*14675a02SAndroid Build Coastguard Worker       uri_template,
255*14675a02SAndroid Build Coastguard Worker       /*bytes_sent_acc=*/bytes_sent_, /*bytes_received_acc=*/bytes_received_,
256*14675a02SAndroid Build Coastguard Worker       network_stopwatch_.get());
257*14675a02SAndroid Build Coastguard Worker }
258*14675a02SAndroid Build Coastguard Worker 
Next()259*14675a02SAndroid Build Coastguard Worker absl::StatusOr<std::string> InMemoryFederatedSelectExampleIterator::Next() {
260*14675a02SAndroid Build Coastguard Worker   absl::MutexLock lock(&mutex_);
261*14675a02SAndroid Build Coastguard Worker 
262*14675a02SAndroid Build Coastguard Worker   if (slices_.empty()) {
263*14675a02SAndroid Build Coastguard Worker     // Eagerly delete the scratch file, since we won't need it anymore.
264*14675a02SAndroid Build Coastguard Worker     std::filesystem::remove(scratch_filename_);
265*14675a02SAndroid Build Coastguard Worker     return absl::OutOfRangeError("end of iterator reached");
266*14675a02SAndroid Build Coastguard Worker   }
267*14675a02SAndroid Build Coastguard Worker 
268*14675a02SAndroid Build Coastguard Worker   absl::Cord& slice_data = slices_.front();
269*14675a02SAndroid Build Coastguard Worker 
270*14675a02SAndroid Build Coastguard Worker   // Write the checkpoint data to the file (truncating any data previously
271*14675a02SAndroid Build Coastguard Worker   // written to the file).
272*14675a02SAndroid Build Coastguard Worker   std::fstream checkpoint_stream(scratch_filename_,
273*14675a02SAndroid Build Coastguard Worker                                  std::ios_base::out | std::ios_base::trunc);
274*14675a02SAndroid Build Coastguard Worker   if (checkpoint_stream.fail()) {
275*14675a02SAndroid Build Coastguard Worker     return absl::InternalError("Failed to write slice to file");
276*14675a02SAndroid Build Coastguard Worker   }
277*14675a02SAndroid Build Coastguard Worker   for (absl::string_view chunk : slice_data.Chunks()) {
278*14675a02SAndroid Build Coastguard Worker     if (!(checkpoint_stream << chunk).good()) {
279*14675a02SAndroid Build Coastguard Worker       return absl::InternalError("Failed to write slice to file");
280*14675a02SAndroid Build Coastguard Worker     }
281*14675a02SAndroid Build Coastguard Worker   }
282*14675a02SAndroid Build Coastguard Worker   checkpoint_stream.close();
283*14675a02SAndroid Build Coastguard Worker 
284*14675a02SAndroid Build Coastguard Worker   // Remove the slice from the deque, releasing its data from memory.
285*14675a02SAndroid Build Coastguard Worker   slices_.pop_front();
286*14675a02SAndroid Build Coastguard Worker 
287*14675a02SAndroid Build Coastguard Worker   return scratch_filename_;
288*14675a02SAndroid Build Coastguard Worker }
289*14675a02SAndroid Build Coastguard Worker 
Close()290*14675a02SAndroid Build Coastguard Worker void InMemoryFederatedSelectExampleIterator::Close() { CleanupInternal(); }
291*14675a02SAndroid Build Coastguard Worker 
292*14675a02SAndroid Build Coastguard Worker InMemoryFederatedSelectExampleIterator::
~InMemoryFederatedSelectExampleIterator()293*14675a02SAndroid Build Coastguard Worker     ~InMemoryFederatedSelectExampleIterator() {
294*14675a02SAndroid Build Coastguard Worker   // Remove the scratch file, even if Close() wasn't called first.
295*14675a02SAndroid Build Coastguard Worker   CleanupInternal();
296*14675a02SAndroid Build Coastguard Worker }
297*14675a02SAndroid Build Coastguard Worker 
CleanupInternal()298*14675a02SAndroid Build Coastguard Worker void InMemoryFederatedSelectExampleIterator::CleanupInternal() {
299*14675a02SAndroid Build Coastguard Worker   absl::MutexLock lock(&mutex_);
300*14675a02SAndroid Build Coastguard Worker   // Remove the scratch filename, if it hadn't been removed yet.
301*14675a02SAndroid Build Coastguard Worker   slices_.clear();
302*14675a02SAndroid Build Coastguard Worker   std::filesystem::remove(scratch_filename_);
303*14675a02SAndroid Build Coastguard Worker }
304*14675a02SAndroid Build Coastguard Worker 
305*14675a02SAndroid Build Coastguard Worker }  // namespace client
306*14675a02SAndroid Build Coastguard Worker }  // namespace fcp
307