xref: /aosp_15_r20/external/federated-compute/fcp/client/federated_select.h (revision 14675a029014e728ec732f129a32e299b2da0601)
1*14675a02SAndroid Build Coastguard Worker /*
2*14675a02SAndroid Build Coastguard Worker  * Copyright 2022 Google LLC
3*14675a02SAndroid Build Coastguard Worker  *
4*14675a02SAndroid Build Coastguard Worker  * Licensed under the Apache License, Version 2.0 (the "License");
5*14675a02SAndroid Build Coastguard Worker  * you may not use this file except in compliance with the License.
6*14675a02SAndroid Build Coastguard Worker  * You may obtain a copy of the License at
7*14675a02SAndroid Build Coastguard Worker  *
8*14675a02SAndroid Build Coastguard Worker  *      http://www.apache.org/licenses/LICENSE-2.0
9*14675a02SAndroid Build Coastguard Worker  *
10*14675a02SAndroid Build Coastguard Worker  * Unless required by applicable law or agreed to in writing, software
11*14675a02SAndroid Build Coastguard Worker  * distributed under the License is distributed on an "AS IS" BASIS,
12*14675a02SAndroid Build Coastguard Worker  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13*14675a02SAndroid Build Coastguard Worker  * See the License for the specific language governing permissions and
14*14675a02SAndroid Build Coastguard Worker  * limitations under the License.
15*14675a02SAndroid Build Coastguard Worker  */
16*14675a02SAndroid Build Coastguard Worker #ifndef FCP_CLIENT_FEDERATED_SELECT_H_
17*14675a02SAndroid Build Coastguard Worker #define FCP_CLIENT_FEDERATED_SELECT_H_
18*14675a02SAndroid Build Coastguard Worker 
19*14675a02SAndroid Build Coastguard Worker #include <deque>
20*14675a02SAndroid Build Coastguard Worker #include <functional>
21*14675a02SAndroid Build Coastguard Worker #include <memory>
22*14675a02SAndroid Build Coastguard Worker #include <string>
23*14675a02SAndroid Build Coastguard Worker #include <utility>
24*14675a02SAndroid Build Coastguard Worker 
25*14675a02SAndroid Build Coastguard Worker #include "absl/status/statusor.h"
26*14675a02SAndroid Build Coastguard Worker #include "absl/strings/cord.h"
27*14675a02SAndroid Build Coastguard Worker #include "absl/time/time.h"
28*14675a02SAndroid Build Coastguard Worker #include "fcp/base/wall_clock_stopwatch.h"
29*14675a02SAndroid Build Coastguard Worker #include "fcp/client/engine/example_iterator_factory.h"
30*14675a02SAndroid Build Coastguard Worker #include "fcp/client/files.h"
31*14675a02SAndroid Build Coastguard Worker #include "fcp/client/http/http_client.h"
32*14675a02SAndroid Build Coastguard Worker #include "fcp/client/interruptible_runner.h"
33*14675a02SAndroid Build Coastguard Worker #include "fcp/client/log_manager.h"
34*14675a02SAndroid Build Coastguard Worker #include "fcp/client/simple_task_environment.h"
35*14675a02SAndroid Build Coastguard Worker #include "fcp/client/stats.h"
36*14675a02SAndroid Build Coastguard Worker #include "fcp/protos/plan.pb.h"
37*14675a02SAndroid Build Coastguard Worker 
38*14675a02SAndroid Build Coastguard Worker namespace fcp {
39*14675a02SAndroid Build Coastguard Worker namespace client {
40*14675a02SAndroid Build Coastguard Worker 
41*14675a02SAndroid Build Coastguard Worker // The example query collection URI via which slice fetch requests will arrive.
42*14675a02SAndroid Build Coastguard Worker inline static constexpr char kFederatedSelectCollectionUri[] =
43*14675a02SAndroid Build Coastguard Worker     "internal:/federated_select";
44*14675a02SAndroid Build Coastguard Worker 
45*14675a02SAndroid Build Coastguard Worker // An interface via which a Federated Select `ExampleIteratorFactory` can be
46*14675a02SAndroid Build Coastguard Worker // created. Each factory is expected to fetch slice data using the given
47*14675a02SAndroid Build Coastguard Worker // `uri_template`, and to then serve the slice data by writing it to a file and
48*14675a02SAndroid Build Coastguard Worker // by then returning that filename as a tf.Example to the plan.
49*14675a02SAndroid Build Coastguard Worker class FederatedSelectManager {
50*14675a02SAndroid Build Coastguard Worker  public:
51*14675a02SAndroid Build Coastguard Worker   virtual std::unique_ptr<::fcp::client::engine::ExampleIteratorFactory>
52*14675a02SAndroid Build Coastguard Worker   CreateExampleIteratorFactoryForUriTemplate(
53*14675a02SAndroid Build Coastguard Worker       absl::string_view uri_template) = 0;
54*14675a02SAndroid Build Coastguard Worker 
55*14675a02SAndroid Build Coastguard Worker   // The best estimate of the over-the-wire bytes downloaded and uploadeded over
56*14675a02SAndroid Build Coastguard Worker   // the network, and the total duration of wall clock time spent waiting on
57*14675a02SAndroid Build Coastguard Worker   // network requests.
58*14675a02SAndroid Build Coastguard Worker 
59*14675a02SAndroid Build Coastguard Worker   // Note that if two different slice fetches are in flight from different
60*14675a02SAndroid Build Coastguard Worker   // threads, this should measure just the wall clock time spent completing both
61*14675a02SAndroid Build Coastguard Worker   // sets of fetches (i.e. it should not double-count the wall clock time by
62*14675a02SAndroid Build Coastguard Worker   // summing each per-thread duration individually).
63*14675a02SAndroid Build Coastguard Worker   //
64*14675a02SAndroid Build Coastguard Worker   // If possible, this estimate should also include time spent decompressing
65*14675a02SAndroid Build Coastguard Worker   // payloads after reading them from the network.
66*14675a02SAndroid Build Coastguard Worker   virtual NetworkStats GetNetworkStats() = 0;
67*14675a02SAndroid Build Coastguard Worker 
~FederatedSelectManager()68*14675a02SAndroid Build Coastguard Worker   virtual ~FederatedSelectManager() {}
69*14675a02SAndroid Build Coastguard Worker };
70*14675a02SAndroid Build Coastguard Worker 
71*14675a02SAndroid Build Coastguard Worker // An base class for `ExampleIteratorFactory` implementations that can handle
72*14675a02SAndroid Build Coastguard Worker // Federated Select example queries.
73*14675a02SAndroid Build Coastguard Worker class FederatedSelectExampleIteratorFactory
74*14675a02SAndroid Build Coastguard Worker     : public ::fcp::client::engine::ExampleIteratorFactory {
75*14675a02SAndroid Build Coastguard Worker  public:
CanHandle(const::google::internal::federated::plan::ExampleSelector & example_selector)76*14675a02SAndroid Build Coastguard Worker   bool CanHandle(const ::google::internal::federated::plan::ExampleSelector&
77*14675a02SAndroid Build Coastguard Worker                      example_selector) override {
78*14675a02SAndroid Build Coastguard Worker     return example_selector.collection_uri() == kFederatedSelectCollectionUri;
79*14675a02SAndroid Build Coastguard Worker   }
80*14675a02SAndroid Build Coastguard Worker 
ShouldCollectStats()81*14675a02SAndroid Build Coastguard Worker   bool ShouldCollectStats() override {
82*14675a02SAndroid Build Coastguard Worker     // Federated Select example queries should not be recorded in the OpStats
83*14675a02SAndroid Build Coastguard Worker     // DB, since the fact that Federated Select uses the example iterator
84*14675a02SAndroid Build Coastguard Worker     // interface is an internal implementation detail.
85*14675a02SAndroid Build Coastguard Worker     return false;
86*14675a02SAndroid Build Coastguard Worker   }
87*14675a02SAndroid Build Coastguard Worker };
88*14675a02SAndroid Build Coastguard Worker 
89*14675a02SAndroid Build Coastguard Worker class DisabledFederatedSelectManager : public FederatedSelectManager {
90*14675a02SAndroid Build Coastguard Worker  public:
91*14675a02SAndroid Build Coastguard Worker   explicit DisabledFederatedSelectManager(LogManager* log_manager);
92*14675a02SAndroid Build Coastguard Worker 
93*14675a02SAndroid Build Coastguard Worker   std::unique_ptr<::fcp::client::engine::ExampleIteratorFactory>
94*14675a02SAndroid Build Coastguard Worker   CreateExampleIteratorFactoryForUriTemplate(
95*14675a02SAndroid Build Coastguard Worker       absl::string_view uri_template) override;
96*14675a02SAndroid Build Coastguard Worker 
GetNetworkStats()97*14675a02SAndroid Build Coastguard Worker   NetworkStats GetNetworkStats() override { return NetworkStats(); }
98*14675a02SAndroid Build Coastguard Worker 
99*14675a02SAndroid Build Coastguard Worker  private:
100*14675a02SAndroid Build Coastguard Worker   LogManager& log_manager_;
101*14675a02SAndroid Build Coastguard Worker };
102*14675a02SAndroid Build Coastguard Worker 
103*14675a02SAndroid Build Coastguard Worker // A FederatedSelectManager implementation that actually issues HTTP requests to
104*14675a02SAndroid Build Coastguard Worker // fetch slice data (i.e. the "real" implementation).
105*14675a02SAndroid Build Coastguard Worker class HttpFederatedSelectManager : public FederatedSelectManager {
106*14675a02SAndroid Build Coastguard Worker  public:
107*14675a02SAndroid Build Coastguard Worker   HttpFederatedSelectManager(
108*14675a02SAndroid Build Coastguard Worker       LogManager* log_manager, Files* files,
109*14675a02SAndroid Build Coastguard Worker       fcp::client::http::HttpClient* http_client,
110*14675a02SAndroid Build Coastguard Worker       std::function<bool()> should_abort,
111*14675a02SAndroid Build Coastguard Worker       const InterruptibleRunner::TimingConfig& timing_config);
112*14675a02SAndroid Build Coastguard Worker 
113*14675a02SAndroid Build Coastguard Worker   std::unique_ptr<::fcp::client::engine::ExampleIteratorFactory>
114*14675a02SAndroid Build Coastguard Worker   CreateExampleIteratorFactoryForUriTemplate(
115*14675a02SAndroid Build Coastguard Worker       absl::string_view uri_template) override;
116*14675a02SAndroid Build Coastguard Worker 
GetNetworkStats()117*14675a02SAndroid Build Coastguard Worker   NetworkStats GetNetworkStats() override {
118*14675a02SAndroid Build Coastguard Worker     return {.bytes_downloaded = bytes_received_.load(),
119*14675a02SAndroid Build Coastguard Worker             .bytes_uploaded = bytes_sent_.load(),
120*14675a02SAndroid Build Coastguard Worker             .network_duration = network_stopwatch_->GetTotalDuration()};
121*14675a02SAndroid Build Coastguard Worker   }
122*14675a02SAndroid Build Coastguard Worker 
123*14675a02SAndroid Build Coastguard Worker  private:
124*14675a02SAndroid Build Coastguard Worker   LogManager& log_manager_;
125*14675a02SAndroid Build Coastguard Worker   Files& files_;
126*14675a02SAndroid Build Coastguard Worker   std::atomic<int64_t> bytes_sent_ = 0;
127*14675a02SAndroid Build Coastguard Worker   std::atomic<int64_t> bytes_received_ = 0;
128*14675a02SAndroid Build Coastguard Worker   std::unique_ptr<WallClockStopwatch> network_stopwatch_ =
129*14675a02SAndroid Build Coastguard Worker       WallClockStopwatch::Create();
130*14675a02SAndroid Build Coastguard Worker   fcp::client::http::HttpClient& http_client_;
131*14675a02SAndroid Build Coastguard Worker   std::unique_ptr<InterruptibleRunner> interruptible_runner_;
132*14675a02SAndroid Build Coastguard Worker };
133*14675a02SAndroid Build Coastguard Worker 
134*14675a02SAndroid Build Coastguard Worker // A Federated Select ExampleIterator that simply returns slice data that is
135*14675a02SAndroid Build Coastguard Worker // already in-memory.
136*14675a02SAndroid Build Coastguard Worker class InMemoryFederatedSelectExampleIterator : public ExampleIterator {
137*14675a02SAndroid Build Coastguard Worker  public:
138*14675a02SAndroid Build Coastguard Worker   // Each time another slice is requested by a call to Next(), the slice data at
139*14675a02SAndroid Build Coastguard Worker   // the front of the `slices` deque will be written to the `scratch_filename`
140*14675a02SAndroid Build Coastguard Worker   // and the filename will be returned as the example data. The scratch file
141*14675a02SAndroid Build Coastguard Worker   // will be deleted at the end of the iterator, or when the iterator is closed.
InMemoryFederatedSelectExampleIterator(std::string scratch_filename,std::deque<absl::Cord> slices)142*14675a02SAndroid Build Coastguard Worker   InMemoryFederatedSelectExampleIterator(std::string scratch_filename,
143*14675a02SAndroid Build Coastguard Worker                                          std::deque<absl::Cord> slices)
144*14675a02SAndroid Build Coastguard Worker       : scratch_filename_(scratch_filename), slices_(std::move(slices)) {}
145*14675a02SAndroid Build Coastguard Worker   absl::StatusOr<std::string> Next() override;
146*14675a02SAndroid Build Coastguard Worker   void Close() override;
147*14675a02SAndroid Build Coastguard Worker 
148*14675a02SAndroid Build Coastguard Worker   ~InMemoryFederatedSelectExampleIterator() override;
149*14675a02SAndroid Build Coastguard Worker 
150*14675a02SAndroid Build Coastguard Worker  private:
151*14675a02SAndroid Build Coastguard Worker   void CleanupInternal() ABSL_LOCKS_EXCLUDED(mutex_);
152*14675a02SAndroid Build Coastguard Worker 
153*14675a02SAndroid Build Coastguard Worker   std::string scratch_filename_;
154*14675a02SAndroid Build Coastguard Worker 
155*14675a02SAndroid Build Coastguard Worker   absl::Mutex mutex_;
156*14675a02SAndroid Build Coastguard Worker   std::deque<absl::Cord> slices_ ABSL_GUARDED_BY(mutex_);
157*14675a02SAndroid Build Coastguard Worker };
158*14675a02SAndroid Build Coastguard Worker 
159*14675a02SAndroid Build Coastguard Worker }  // namespace client
160*14675a02SAndroid Build Coastguard Worker }  // namespace fcp
161*14675a02SAndroid Build Coastguard Worker 
162*14675a02SAndroid Build Coastguard Worker #endif  // FCP_CLIENT_FEDERATED_SELECT_H_
163