1*14675a02SAndroid Build Coastguard Worker /*
2*14675a02SAndroid Build Coastguard Worker * Copyright 2022 Google LLC
3*14675a02SAndroid Build Coastguard Worker *
4*14675a02SAndroid Build Coastguard Worker * Licensed under the Apache License, Version 2.0 (the "License");
5*14675a02SAndroid Build Coastguard Worker * you may not use this file except in compliance with the License.
6*14675a02SAndroid Build Coastguard Worker * You may obtain a copy of the License at
7*14675a02SAndroid Build Coastguard Worker *
8*14675a02SAndroid Build Coastguard Worker * http://www.apache.org/licenses/LICENSE-2.0
9*14675a02SAndroid Build Coastguard Worker *
10*14675a02SAndroid Build Coastguard Worker * Unless required by applicable law or agreed to in writing, software
11*14675a02SAndroid Build Coastguard Worker * distributed under the License is distributed on an "AS IS" BASIS,
12*14675a02SAndroid Build Coastguard Worker * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13*14675a02SAndroid Build Coastguard Worker * See the License for the specific language governing permissions and
14*14675a02SAndroid Build Coastguard Worker * limitations under the License.
15*14675a02SAndroid Build Coastguard Worker */
16*14675a02SAndroid Build Coastguard Worker #include "fcp/client/federated_select.h"
17*14675a02SAndroid Build Coastguard Worker
18*14675a02SAndroid Build Coastguard Worker #include <deque>
19*14675a02SAndroid Build Coastguard Worker #include <filesystem>
20*14675a02SAndroid Build Coastguard Worker #include <fstream>
21*14675a02SAndroid Build Coastguard Worker #include <functional>
22*14675a02SAndroid Build Coastguard Worker #include <ios>
23*14675a02SAndroid Build Coastguard Worker #include <memory>
24*14675a02SAndroid Build Coastguard Worker #include <string>
25*14675a02SAndroid Build Coastguard Worker #include <utility>
26*14675a02SAndroid Build Coastguard Worker #include <vector>
27*14675a02SAndroid Build Coastguard Worker
28*14675a02SAndroid Build Coastguard Worker #include "google/protobuf/any.pb.h"
29*14675a02SAndroid Build Coastguard Worker #include "absl/status/statusor.h"
30*14675a02SAndroid Build Coastguard Worker #include "absl/strings/cord.h"
31*14675a02SAndroid Build Coastguard Worker #include "absl/strings/str_replace.h"
32*14675a02SAndroid Build Coastguard Worker #include "fcp/base/monitoring.h"
33*14675a02SAndroid Build Coastguard Worker #include "fcp/base/wall_clock_stopwatch.h"
34*14675a02SAndroid Build Coastguard Worker #include "fcp/client/diag_codes.pb.h"
35*14675a02SAndroid Build Coastguard Worker #include "fcp/client/engine/example_iterator_factory.h"
36*14675a02SAndroid Build Coastguard Worker #include "fcp/client/files.h"
37*14675a02SAndroid Build Coastguard Worker #include "fcp/client/http/http_client.h"
38*14675a02SAndroid Build Coastguard Worker #include "fcp/client/http/http_client_util.h"
39*14675a02SAndroid Build Coastguard Worker #include "fcp/client/http/in_memory_request_response.h"
40*14675a02SAndroid Build Coastguard Worker #include "fcp/client/interruptible_runner.h"
41*14675a02SAndroid Build Coastguard Worker #include "fcp/client/log_manager.h"
42*14675a02SAndroid Build Coastguard Worker #include "fcp/client/stats.h"
43*14675a02SAndroid Build Coastguard Worker #include "fcp/protos/plan.pb.h"
44*14675a02SAndroid Build Coastguard Worker
45*14675a02SAndroid Build Coastguard Worker namespace fcp {
46*14675a02SAndroid Build Coastguard Worker namespace client {
47*14675a02SAndroid Build Coastguard Worker
48*14675a02SAndroid Build Coastguard Worker using fcp::client::http::HttpClient;
49*14675a02SAndroid Build Coastguard Worker using fcp::client::http::InMemoryHttpResponse;
50*14675a02SAndroid Build Coastguard Worker using fcp::client::http::UriOrInlineData;
51*14675a02SAndroid Build Coastguard Worker using ::google::internal::federated::plan::SlicesSelector;
52*14675a02SAndroid Build Coastguard Worker
53*14675a02SAndroid Build Coastguard Worker namespace {
54*14675a02SAndroid Build Coastguard Worker
55*14675a02SAndroid Build Coastguard Worker // A Federated Select `ExampleIteratorFactory` that fails all queries.
56*14675a02SAndroid Build Coastguard Worker class DisabledFederatedSelectExampleIteratorFactory
57*14675a02SAndroid Build Coastguard Worker : public FederatedSelectExampleIteratorFactory {
58*14675a02SAndroid Build Coastguard Worker public:
DisabledFederatedSelectExampleIteratorFactory(LogManager * log_manager)59*14675a02SAndroid Build Coastguard Worker explicit DisabledFederatedSelectExampleIteratorFactory(
60*14675a02SAndroid Build Coastguard Worker LogManager* log_manager)
61*14675a02SAndroid Build Coastguard Worker : log_manager_(*log_manager) {}
62*14675a02SAndroid Build Coastguard Worker
63*14675a02SAndroid Build Coastguard Worker // Will fetch the slice data via HTTP and return an error if any of the
64*14675a02SAndroid Build Coastguard Worker // slice fetch requests failed.
CreateExampleIterator(const::google::internal::federated::plan::ExampleSelector & example_selector)65*14675a02SAndroid Build Coastguard Worker absl::StatusOr<std::unique_ptr<ExampleIterator>> CreateExampleIterator(
66*14675a02SAndroid Build Coastguard Worker const ::google::internal::federated::plan::ExampleSelector&
67*14675a02SAndroid Build Coastguard Worker example_selector) override {
68*14675a02SAndroid Build Coastguard Worker log_manager_.LogDiag(
69*14675a02SAndroid Build Coastguard Worker ProdDiagCode::FEDSELECT_SLICE_HTTP_FETCH_REQUESTED_BUT_DISABLED);
70*14675a02SAndroid Build Coastguard Worker return absl::InvalidArgumentError("Federated Select is disabled.");
71*14675a02SAndroid Build Coastguard Worker }
72*14675a02SAndroid Build Coastguard Worker
73*14675a02SAndroid Build Coastguard Worker private:
74*14675a02SAndroid Build Coastguard Worker LogManager& log_manager_;
75*14675a02SAndroid Build Coastguard Worker };
76*14675a02SAndroid Build Coastguard Worker
FetchSlicesViaHttp(const SlicesSelector & slices_selector,absl::string_view uri_template,HttpClient & http_client,InterruptibleRunner & interruptible_runner,int64_t * bytes_received_acc,int64_t * bytes_sent_acc)77*14675a02SAndroid Build Coastguard Worker absl::StatusOr<std::deque<absl::Cord>> FetchSlicesViaHttp(
78*14675a02SAndroid Build Coastguard Worker const SlicesSelector& slices_selector, absl::string_view uri_template,
79*14675a02SAndroid Build Coastguard Worker HttpClient& http_client, InterruptibleRunner& interruptible_runner,
80*14675a02SAndroid Build Coastguard Worker int64_t* bytes_received_acc, int64_t* bytes_sent_acc) {
81*14675a02SAndroid Build Coastguard Worker std::vector<UriOrInlineData> resources;
82*14675a02SAndroid Build Coastguard Worker for (int32_t slice_key : slices_selector.keys()) {
83*14675a02SAndroid Build Coastguard Worker std::string slice_uri = absl::StrReplaceAll(
84*14675a02SAndroid Build Coastguard Worker // Note that `served_at_id` is documented to not require URL-escaping,
85*14675a02SAndroid Build Coastguard Worker // so we don't apply any here.
86*14675a02SAndroid Build Coastguard Worker uri_template, {{"{served_at_id}", slices_selector.served_at_id()},
87*14675a02SAndroid Build Coastguard Worker {"{key_base10}", absl::StrCat(slice_key)}});
88*14675a02SAndroid Build Coastguard Worker
89*14675a02SAndroid Build Coastguard Worker resources.push_back(
90*14675a02SAndroid Build Coastguard Worker UriOrInlineData::CreateUri(slice_uri, "", absl::ZeroDuration()));
91*14675a02SAndroid Build Coastguard Worker }
92*14675a02SAndroid Build Coastguard Worker
93*14675a02SAndroid Build Coastguard Worker // Perform the requests.
94*14675a02SAndroid Build Coastguard Worker absl::StatusOr<std::vector<absl::StatusOr<InMemoryHttpResponse>>>
95*14675a02SAndroid Build Coastguard Worker slice_fetch_result = http::FetchResourcesInMemory(
96*14675a02SAndroid Build Coastguard Worker http_client, interruptible_runner, std::move(resources),
97*14675a02SAndroid Build Coastguard Worker bytes_received_acc, bytes_sent_acc,
98*14675a02SAndroid Build Coastguard Worker // TODO(team): Enable caching for federated select slices.
99*14675a02SAndroid Build Coastguard Worker /*resource_cache=*/nullptr);
100*14675a02SAndroid Build Coastguard Worker
101*14675a02SAndroid Build Coastguard Worker // Check whether issuing the requests failed as a whole (generally indicating
102*14675a02SAndroid Build Coastguard Worker // a programming error).
103*14675a02SAndroid Build Coastguard Worker if (!slice_fetch_result.ok()) {
104*14675a02SAndroid Build Coastguard Worker return absl::InternalError(absl::StrCat(
105*14675a02SAndroid Build Coastguard Worker "Failed to perform HTTP requests (URI template: ", uri_template,
106*14675a02SAndroid Build Coastguard Worker "): ", absl::StatusCodeToString(slice_fetch_result.status().code())));
107*14675a02SAndroid Build Coastguard Worker }
108*14675a02SAndroid Build Coastguard Worker
109*14675a02SAndroid Build Coastguard Worker std::deque<absl::Cord> slices;
110*14675a02SAndroid Build Coastguard Worker for (const absl::StatusOr<InMemoryHttpResponse>& http_response :
111*14675a02SAndroid Build Coastguard Worker *slice_fetch_result) {
112*14675a02SAndroid Build Coastguard Worker if (!http_response.ok()) {
113*14675a02SAndroid Build Coastguard Worker return absl::UnavailableError(absl::StrCat(
114*14675a02SAndroid Build Coastguard Worker "Slice fetch request failed (URI template: ", uri_template,
115*14675a02SAndroid Build Coastguard Worker "): ", absl::StatusCodeToString(http_response.status().code())));
116*14675a02SAndroid Build Coastguard Worker }
117*14675a02SAndroid Build Coastguard Worker slices.push_back(http_response->body);
118*14675a02SAndroid Build Coastguard Worker }
119*14675a02SAndroid Build Coastguard Worker return slices;
120*14675a02SAndroid Build Coastguard Worker }
121*14675a02SAndroid Build Coastguard Worker
122*14675a02SAndroid Build Coastguard Worker // A Federated Select `ExampleIteratorFactory` that, upon creation of an
123*14675a02SAndroid Build Coastguard Worker // iterator, fetches the slice data via HTTP, buffers it in-memory, and then
124*14675a02SAndroid Build Coastguard Worker // exposes it to the plan via an `InMemoryFederatedSelectExampleIterator`.
125*14675a02SAndroid Build Coastguard Worker class HttpFederatedSelectExampleIteratorFactory
126*14675a02SAndroid Build Coastguard Worker : public FederatedSelectExampleIteratorFactory {
127*14675a02SAndroid Build Coastguard Worker public:
HttpFederatedSelectExampleIteratorFactory(LogManager * log_manager,Files * files,HttpClient * http_client,InterruptibleRunner * interruptible_runner,absl::string_view uri_template,std::atomic<int64_t> & bytes_sent_acc,std::atomic<int64_t> & bytes_received_acc,WallClockStopwatch * network_stopwatch)128*14675a02SAndroid Build Coastguard Worker HttpFederatedSelectExampleIteratorFactory(
129*14675a02SAndroid Build Coastguard Worker LogManager* log_manager, Files* files, HttpClient* http_client,
130*14675a02SAndroid Build Coastguard Worker InterruptibleRunner* interruptible_runner, absl::string_view uri_template,
131*14675a02SAndroid Build Coastguard Worker std::atomic<int64_t>& bytes_sent_acc,
132*14675a02SAndroid Build Coastguard Worker std::atomic<int64_t>& bytes_received_acc,
133*14675a02SAndroid Build Coastguard Worker WallClockStopwatch* network_stopwatch)
134*14675a02SAndroid Build Coastguard Worker : log_manager_(*log_manager),
135*14675a02SAndroid Build Coastguard Worker files_(*files),
136*14675a02SAndroid Build Coastguard Worker http_client_(*http_client),
137*14675a02SAndroid Build Coastguard Worker interruptible_runner_(*interruptible_runner),
138*14675a02SAndroid Build Coastguard Worker uri_template_(uri_template),
139*14675a02SAndroid Build Coastguard Worker bytes_sent_acc_(bytes_sent_acc),
140*14675a02SAndroid Build Coastguard Worker bytes_received_acc_(bytes_received_acc),
141*14675a02SAndroid Build Coastguard Worker network_stopwatch_(*network_stopwatch) {}
142*14675a02SAndroid Build Coastguard Worker
143*14675a02SAndroid Build Coastguard Worker // Will fetch the slice data via HTTP and return an error if any of the slice
144*14675a02SAndroid Build Coastguard Worker // fetch requests failed.
145*14675a02SAndroid Build Coastguard Worker absl::StatusOr<std::unique_ptr<ExampleIterator>> CreateExampleIterator(
146*14675a02SAndroid Build Coastguard Worker const ::google::internal::federated::plan::ExampleSelector&
147*14675a02SAndroid Build Coastguard Worker example_selector) override;
148*14675a02SAndroid Build Coastguard Worker
149*14675a02SAndroid Build Coastguard Worker private:
150*14675a02SAndroid Build Coastguard Worker LogManager& log_manager_;
151*14675a02SAndroid Build Coastguard Worker Files& files_;
152*14675a02SAndroid Build Coastguard Worker HttpClient& http_client_;
153*14675a02SAndroid Build Coastguard Worker InterruptibleRunner& interruptible_runner_;
154*14675a02SAndroid Build Coastguard Worker std::string uri_template_;
155*14675a02SAndroid Build Coastguard Worker std::atomic<int64_t>& bytes_sent_acc_;
156*14675a02SAndroid Build Coastguard Worker std::atomic<int64_t>& bytes_received_acc_;
157*14675a02SAndroid Build Coastguard Worker WallClockStopwatch& network_stopwatch_;
158*14675a02SAndroid Build Coastguard Worker };
159*14675a02SAndroid Build Coastguard Worker
160*14675a02SAndroid Build Coastguard Worker absl::StatusOr<std::unique_ptr<ExampleIterator>>
CreateExampleIterator(const::google::internal::federated::plan::ExampleSelector & example_selector)161*14675a02SAndroid Build Coastguard Worker HttpFederatedSelectExampleIteratorFactory::CreateExampleIterator(
162*14675a02SAndroid Build Coastguard Worker const ::google::internal::federated::plan::ExampleSelector&
163*14675a02SAndroid Build Coastguard Worker example_selector) {
164*14675a02SAndroid Build Coastguard Worker SlicesSelector slices_selector;
165*14675a02SAndroid Build Coastguard Worker if (!example_selector.criteria().UnpackTo(&slices_selector)) {
166*14675a02SAndroid Build Coastguard Worker return absl::InvalidArgumentError(
167*14675a02SAndroid Build Coastguard Worker absl::StrCat("Unexpected/unparseable selection criteria: ",
168*14675a02SAndroid Build Coastguard Worker example_selector.criteria().GetTypeName()));
169*14675a02SAndroid Build Coastguard Worker }
170*14675a02SAndroid Build Coastguard Worker
171*14675a02SAndroid Build Coastguard Worker log_manager_.LogDiag(ProdDiagCode::FEDSELECT_SLICE_HTTP_FETCH_REQUESTED);
172*14675a02SAndroid Build Coastguard Worker
173*14675a02SAndroid Build Coastguard Worker // Create the temporary scratch file to store the checkpoint data in.
174*14675a02SAndroid Build Coastguard Worker // Deletion of the file is done in the
175*14675a02SAndroid Build Coastguard Worker // InMemoryFederatedSelectExampleIterator::Close() method or its destructor.
176*14675a02SAndroid Build Coastguard Worker absl::StatusOr<std::string> scratch_filename =
177*14675a02SAndroid Build Coastguard Worker files_.CreateTempFile("slice", ".ckp");
178*14675a02SAndroid Build Coastguard Worker
179*14675a02SAndroid Build Coastguard Worker if (!scratch_filename.ok()) {
180*14675a02SAndroid Build Coastguard Worker return absl::InternalError(absl::StrCat(
181*14675a02SAndroid Build Coastguard Worker "Failed to create scratch file for slice data (URI template: ",
182*14675a02SAndroid Build Coastguard Worker uri_template_,
183*14675a02SAndroid Build Coastguard Worker "): ", absl::StatusCodeToString(scratch_filename.status().code()), ": ",
184*14675a02SAndroid Build Coastguard Worker scratch_filename.status().message()));
185*14675a02SAndroid Build Coastguard Worker }
186*14675a02SAndroid Build Coastguard Worker
187*14675a02SAndroid Build Coastguard Worker // Fetch the slices.
188*14675a02SAndroid Build Coastguard Worker int64_t bytes_received = 0;
189*14675a02SAndroid Build Coastguard Worker int64_t bytes_sent = 0;
190*14675a02SAndroid Build Coastguard Worker absl::StatusOr<std::deque<absl::Cord>> slices;
191*14675a02SAndroid Build Coastguard Worker {
192*14675a02SAndroid Build Coastguard Worker auto started_stopwatch = network_stopwatch_.Start();
193*14675a02SAndroid Build Coastguard Worker slices = FetchSlicesViaHttp(slices_selector, uri_template_, http_client_,
194*14675a02SAndroid Build Coastguard Worker interruptible_runner_,
195*14675a02SAndroid Build Coastguard Worker /*bytes_received_acc=*/&bytes_received,
196*14675a02SAndroid Build Coastguard Worker /*bytes_sent_acc=*/&bytes_sent);
197*14675a02SAndroid Build Coastguard Worker }
198*14675a02SAndroid Build Coastguard Worker bytes_sent_acc_ += bytes_sent;
199*14675a02SAndroid Build Coastguard Worker bytes_received_acc_ += bytes_received;
200*14675a02SAndroid Build Coastguard Worker if (!slices.ok()) {
201*14675a02SAndroid Build Coastguard Worker log_manager_.LogDiag(ProdDiagCode::FEDSELECT_SLICE_HTTP_FETCH_FAILED);
202*14675a02SAndroid Build Coastguard Worker return absl::Status(slices.status().code(),
203*14675a02SAndroid Build Coastguard Worker absl::StrCat("Failed to fetch slice data: ",
204*14675a02SAndroid Build Coastguard Worker slices.status().message()));
205*14675a02SAndroid Build Coastguard Worker }
206*14675a02SAndroid Build Coastguard Worker log_manager_.LogDiag(ProdDiagCode::FEDSELECT_SLICE_HTTP_FETCH_SUCCEEDED);
207*14675a02SAndroid Build Coastguard Worker
208*14675a02SAndroid Build Coastguard Worker return std::make_unique<InMemoryFederatedSelectExampleIterator>(
209*14675a02SAndroid Build Coastguard Worker *scratch_filename, std::move(*slices));
210*14675a02SAndroid Build Coastguard Worker }
211*14675a02SAndroid Build Coastguard Worker } // namespace
212*14675a02SAndroid Build Coastguard Worker
DisabledFederatedSelectManager(LogManager * log_manager)213*14675a02SAndroid Build Coastguard Worker DisabledFederatedSelectManager::DisabledFederatedSelectManager(
214*14675a02SAndroid Build Coastguard Worker LogManager* log_manager)
215*14675a02SAndroid Build Coastguard Worker : log_manager_(*log_manager) {}
216*14675a02SAndroid Build Coastguard Worker
217*14675a02SAndroid Build Coastguard Worker std::unique_ptr<::fcp::client::engine::ExampleIteratorFactory>
CreateExampleIteratorFactoryForUriTemplate(absl::string_view uri_template)218*14675a02SAndroid Build Coastguard Worker DisabledFederatedSelectManager::CreateExampleIteratorFactoryForUriTemplate(
219*14675a02SAndroid Build Coastguard Worker absl::string_view uri_template) {
220*14675a02SAndroid Build Coastguard Worker return std::make_unique<DisabledFederatedSelectExampleIteratorFactory>(
221*14675a02SAndroid Build Coastguard Worker &log_manager_);
222*14675a02SAndroid Build Coastguard Worker }
223*14675a02SAndroid Build Coastguard Worker
HttpFederatedSelectManager(LogManager * log_manager,Files * files,fcp::client::http::HttpClient * http_client,std::function<bool ()> should_abort,const InterruptibleRunner::TimingConfig & timing_config)224*14675a02SAndroid Build Coastguard Worker HttpFederatedSelectManager::HttpFederatedSelectManager(
225*14675a02SAndroid Build Coastguard Worker LogManager* log_manager, Files* files,
226*14675a02SAndroid Build Coastguard Worker fcp::client::http::HttpClient* http_client,
227*14675a02SAndroid Build Coastguard Worker std::function<bool()> should_abort,
228*14675a02SAndroid Build Coastguard Worker const InterruptibleRunner::TimingConfig& timing_config)
229*14675a02SAndroid Build Coastguard Worker : log_manager_(*log_manager),
230*14675a02SAndroid Build Coastguard Worker files_(*files),
231*14675a02SAndroid Build Coastguard Worker http_client_(*http_client),
232*14675a02SAndroid Build Coastguard Worker interruptible_runner_(std::make_unique<InterruptibleRunner>(
233*14675a02SAndroid Build Coastguard Worker log_manager, should_abort, timing_config,
234*14675a02SAndroid Build Coastguard Worker InterruptibleRunner::DiagnosticsConfig{
235*14675a02SAndroid Build Coastguard Worker .interrupted = ProdDiagCode::BACKGROUND_TRAINING_INTERRUPT_HTTP,
236*14675a02SAndroid Build Coastguard Worker .interrupt_timeout =
237*14675a02SAndroid Build Coastguard Worker ProdDiagCode::BACKGROUND_TRAINING_INTERRUPT_HTTP_TIMED_OUT,
238*14675a02SAndroid Build Coastguard Worker .interrupted_extended = ProdDiagCode::
239*14675a02SAndroid Build Coastguard Worker BACKGROUND_TRAINING_INTERRUPT_HTTP_EXTENDED_COMPLETED,
240*14675a02SAndroid Build Coastguard Worker .interrupt_timeout_extended = ProdDiagCode::
241*14675a02SAndroid Build Coastguard Worker BACKGROUND_TRAINING_INTERRUPT_HTTP_EXTENDED_TIMED_OUT})) {}
242*14675a02SAndroid Build Coastguard Worker
243*14675a02SAndroid Build Coastguard Worker std::unique_ptr<::fcp::client::engine::ExampleIteratorFactory>
CreateExampleIteratorFactoryForUriTemplate(absl::string_view uri_template)244*14675a02SAndroid Build Coastguard Worker HttpFederatedSelectManager::CreateExampleIteratorFactoryForUriTemplate(
245*14675a02SAndroid Build Coastguard Worker absl::string_view uri_template) {
246*14675a02SAndroid Build Coastguard Worker // If the server didn't populate the URI template then we can't support any
247*14675a02SAndroid Build Coastguard Worker // slice fetch requests.
248*14675a02SAndroid Build Coastguard Worker if (uri_template.empty()) {
249*14675a02SAndroid Build Coastguard Worker return std::make_unique<DisabledFederatedSelectExampleIteratorFactory>(
250*14675a02SAndroid Build Coastguard Worker &log_manager_);
251*14675a02SAndroid Build Coastguard Worker }
252*14675a02SAndroid Build Coastguard Worker return std::make_unique<HttpFederatedSelectExampleIteratorFactory>(
253*14675a02SAndroid Build Coastguard Worker &log_manager_, &files_, &http_client_, interruptible_runner_.get(),
254*14675a02SAndroid Build Coastguard Worker uri_template,
255*14675a02SAndroid Build Coastguard Worker /*bytes_sent_acc=*/bytes_sent_, /*bytes_received_acc=*/bytes_received_,
256*14675a02SAndroid Build Coastguard Worker network_stopwatch_.get());
257*14675a02SAndroid Build Coastguard Worker }
258*14675a02SAndroid Build Coastguard Worker
Next()259*14675a02SAndroid Build Coastguard Worker absl::StatusOr<std::string> InMemoryFederatedSelectExampleIterator::Next() {
260*14675a02SAndroid Build Coastguard Worker absl::MutexLock lock(&mutex_);
261*14675a02SAndroid Build Coastguard Worker
262*14675a02SAndroid Build Coastguard Worker if (slices_.empty()) {
263*14675a02SAndroid Build Coastguard Worker // Eagerly delete the scratch file, since we won't need it anymore.
264*14675a02SAndroid Build Coastguard Worker std::filesystem::remove(scratch_filename_);
265*14675a02SAndroid Build Coastguard Worker return absl::OutOfRangeError("end of iterator reached");
266*14675a02SAndroid Build Coastguard Worker }
267*14675a02SAndroid Build Coastguard Worker
268*14675a02SAndroid Build Coastguard Worker absl::Cord& slice_data = slices_.front();
269*14675a02SAndroid Build Coastguard Worker
270*14675a02SAndroid Build Coastguard Worker // Write the checkpoint data to the file (truncating any data previously
271*14675a02SAndroid Build Coastguard Worker // written to the file).
272*14675a02SAndroid Build Coastguard Worker std::fstream checkpoint_stream(scratch_filename_,
273*14675a02SAndroid Build Coastguard Worker std::ios_base::out | std::ios_base::trunc);
274*14675a02SAndroid Build Coastguard Worker if (checkpoint_stream.fail()) {
275*14675a02SAndroid Build Coastguard Worker return absl::InternalError("Failed to write slice to file");
276*14675a02SAndroid Build Coastguard Worker }
277*14675a02SAndroid Build Coastguard Worker for (absl::string_view chunk : slice_data.Chunks()) {
278*14675a02SAndroid Build Coastguard Worker if (!(checkpoint_stream << chunk).good()) {
279*14675a02SAndroid Build Coastguard Worker return absl::InternalError("Failed to write slice to file");
280*14675a02SAndroid Build Coastguard Worker }
281*14675a02SAndroid Build Coastguard Worker }
282*14675a02SAndroid Build Coastguard Worker checkpoint_stream.close();
283*14675a02SAndroid Build Coastguard Worker
284*14675a02SAndroid Build Coastguard Worker // Remove the slice from the deque, releasing its data from memory.
285*14675a02SAndroid Build Coastguard Worker slices_.pop_front();
286*14675a02SAndroid Build Coastguard Worker
287*14675a02SAndroid Build Coastguard Worker return scratch_filename_;
288*14675a02SAndroid Build Coastguard Worker }
289*14675a02SAndroid Build Coastguard Worker
Close()290*14675a02SAndroid Build Coastguard Worker void InMemoryFederatedSelectExampleIterator::Close() { CleanupInternal(); }
291*14675a02SAndroid Build Coastguard Worker
292*14675a02SAndroid Build Coastguard Worker InMemoryFederatedSelectExampleIterator::
~InMemoryFederatedSelectExampleIterator()293*14675a02SAndroid Build Coastguard Worker ~InMemoryFederatedSelectExampleIterator() {
294*14675a02SAndroid Build Coastguard Worker // Remove the scratch file, even if Close() wasn't called first.
295*14675a02SAndroid Build Coastguard Worker CleanupInternal();
296*14675a02SAndroid Build Coastguard Worker }
297*14675a02SAndroid Build Coastguard Worker
CleanupInternal()298*14675a02SAndroid Build Coastguard Worker void InMemoryFederatedSelectExampleIterator::CleanupInternal() {
299*14675a02SAndroid Build Coastguard Worker absl::MutexLock lock(&mutex_);
300*14675a02SAndroid Build Coastguard Worker // Remove the scratch filename, if it hadn't been removed yet.
301*14675a02SAndroid Build Coastguard Worker slices_.clear();
302*14675a02SAndroid Build Coastguard Worker std::filesystem::remove(scratch_filename_);
303*14675a02SAndroid Build Coastguard Worker }
304*14675a02SAndroid Build Coastguard Worker
305*14675a02SAndroid Build Coastguard Worker } // namespace client
306*14675a02SAndroid Build Coastguard Worker } // namespace fcp
307