1*14675a02SAndroid Build Coastguard Worker /* 2*14675a02SAndroid Build Coastguard Worker * Copyright 2022 Google LLC 3*14675a02SAndroid Build Coastguard Worker * 4*14675a02SAndroid Build Coastguard Worker * Licensed under the Apache License, Version 2.0 (the "License"); 5*14675a02SAndroid Build Coastguard Worker * you may not use this file except in compliance with the License. 6*14675a02SAndroid Build Coastguard Worker * You may obtain a copy of the License at 7*14675a02SAndroid Build Coastguard Worker * 8*14675a02SAndroid Build Coastguard Worker * http://www.apache.org/licenses/LICENSE-2.0 9*14675a02SAndroid Build Coastguard Worker * 10*14675a02SAndroid Build Coastguard Worker * Unless required by applicable law or agreed to in writing, software 11*14675a02SAndroid Build Coastguard Worker * distributed under the License is distributed on an "AS IS" BASIS, 12*14675a02SAndroid Build Coastguard Worker * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13*14675a02SAndroid Build Coastguard Worker * See the License for the specific language governing permissions and 14*14675a02SAndroid Build Coastguard Worker * limitations under the License. 15*14675a02SAndroid Build Coastguard Worker */ 16*14675a02SAndroid Build Coastguard Worker #ifndef FCP_CLIENT_FEDERATED_SELECT_H_ 17*14675a02SAndroid Build Coastguard Worker #define FCP_CLIENT_FEDERATED_SELECT_H_ 18*14675a02SAndroid Build Coastguard Worker 19*14675a02SAndroid Build Coastguard Worker #include <deque> 20*14675a02SAndroid Build Coastguard Worker #include <functional> 21*14675a02SAndroid Build Coastguard Worker #include <memory> 22*14675a02SAndroid Build Coastguard Worker #include <string> 23*14675a02SAndroid Build Coastguard Worker #include <utility> 24*14675a02SAndroid Build Coastguard Worker 25*14675a02SAndroid Build Coastguard Worker #include "absl/status/statusor.h" 26*14675a02SAndroid Build Coastguard Worker #include "absl/strings/cord.h" 27*14675a02SAndroid Build Coastguard Worker #include "absl/time/time.h" 28*14675a02SAndroid Build Coastguard Worker #include "fcp/base/wall_clock_stopwatch.h" 29*14675a02SAndroid Build Coastguard Worker #include "fcp/client/engine/example_iterator_factory.h" 30*14675a02SAndroid Build Coastguard Worker #include "fcp/client/files.h" 31*14675a02SAndroid Build Coastguard Worker #include "fcp/client/http/http_client.h" 32*14675a02SAndroid Build Coastguard Worker #include "fcp/client/interruptible_runner.h" 33*14675a02SAndroid Build Coastguard Worker #include "fcp/client/log_manager.h" 34*14675a02SAndroid Build Coastguard Worker #include "fcp/client/simple_task_environment.h" 35*14675a02SAndroid Build Coastguard Worker #include "fcp/client/stats.h" 36*14675a02SAndroid Build Coastguard Worker #include "fcp/protos/plan.pb.h" 37*14675a02SAndroid Build Coastguard Worker 38*14675a02SAndroid Build Coastguard Worker namespace fcp { 39*14675a02SAndroid Build Coastguard Worker namespace client { 40*14675a02SAndroid Build Coastguard Worker 41*14675a02SAndroid Build Coastguard Worker // The example query collection URI via which slice fetch requests will arrive. 42*14675a02SAndroid Build Coastguard Worker inline static constexpr char kFederatedSelectCollectionUri[] = 43*14675a02SAndroid Build Coastguard Worker "internal:/federated_select"; 44*14675a02SAndroid Build Coastguard Worker 45*14675a02SAndroid Build Coastguard Worker // An interface via which a Federated Select `ExampleIteratorFactory` can be 46*14675a02SAndroid Build Coastguard Worker // created. Each factory is expected to fetch slice data using the given 47*14675a02SAndroid Build Coastguard Worker // `uri_template`, and to then serve the slice data by writing it to a file and 48*14675a02SAndroid Build Coastguard Worker // by then returning that filename as a tf.Example to the plan. 49*14675a02SAndroid Build Coastguard Worker class FederatedSelectManager { 50*14675a02SAndroid Build Coastguard Worker public: 51*14675a02SAndroid Build Coastguard Worker virtual std::unique_ptr<::fcp::client::engine::ExampleIteratorFactory> 52*14675a02SAndroid Build Coastguard Worker CreateExampleIteratorFactoryForUriTemplate( 53*14675a02SAndroid Build Coastguard Worker absl::string_view uri_template) = 0; 54*14675a02SAndroid Build Coastguard Worker 55*14675a02SAndroid Build Coastguard Worker // The best estimate of the over-the-wire bytes downloaded and uploadeded over 56*14675a02SAndroid Build Coastguard Worker // the network, and the total duration of wall clock time spent waiting on 57*14675a02SAndroid Build Coastguard Worker // network requests. 58*14675a02SAndroid Build Coastguard Worker 59*14675a02SAndroid Build Coastguard Worker // Note that if two different slice fetches are in flight from different 60*14675a02SAndroid Build Coastguard Worker // threads, this should measure just the wall clock time spent completing both 61*14675a02SAndroid Build Coastguard Worker // sets of fetches (i.e. it should not double-count the wall clock time by 62*14675a02SAndroid Build Coastguard Worker // summing each per-thread duration individually). 63*14675a02SAndroid Build Coastguard Worker // 64*14675a02SAndroid Build Coastguard Worker // If possible, this estimate should also include time spent decompressing 65*14675a02SAndroid Build Coastguard Worker // payloads after reading them from the network. 66*14675a02SAndroid Build Coastguard Worker virtual NetworkStats GetNetworkStats() = 0; 67*14675a02SAndroid Build Coastguard Worker ~FederatedSelectManager()68*14675a02SAndroid Build Coastguard Worker virtual ~FederatedSelectManager() {} 69*14675a02SAndroid Build Coastguard Worker }; 70*14675a02SAndroid Build Coastguard Worker 71*14675a02SAndroid Build Coastguard Worker // An base class for `ExampleIteratorFactory` implementations that can handle 72*14675a02SAndroid Build Coastguard Worker // Federated Select example queries. 73*14675a02SAndroid Build Coastguard Worker class FederatedSelectExampleIteratorFactory 74*14675a02SAndroid Build Coastguard Worker : public ::fcp::client::engine::ExampleIteratorFactory { 75*14675a02SAndroid Build Coastguard Worker public: CanHandle(const::google::internal::federated::plan::ExampleSelector & example_selector)76*14675a02SAndroid Build Coastguard Worker bool CanHandle(const ::google::internal::federated::plan::ExampleSelector& 77*14675a02SAndroid Build Coastguard Worker example_selector) override { 78*14675a02SAndroid Build Coastguard Worker return example_selector.collection_uri() == kFederatedSelectCollectionUri; 79*14675a02SAndroid Build Coastguard Worker } 80*14675a02SAndroid Build Coastguard Worker ShouldCollectStats()81*14675a02SAndroid Build Coastguard Worker bool ShouldCollectStats() override { 82*14675a02SAndroid Build Coastguard Worker // Federated Select example queries should not be recorded in the OpStats 83*14675a02SAndroid Build Coastguard Worker // DB, since the fact that Federated Select uses the example iterator 84*14675a02SAndroid Build Coastguard Worker // interface is an internal implementation detail. 85*14675a02SAndroid Build Coastguard Worker return false; 86*14675a02SAndroid Build Coastguard Worker } 87*14675a02SAndroid Build Coastguard Worker }; 88*14675a02SAndroid Build Coastguard Worker 89*14675a02SAndroid Build Coastguard Worker class DisabledFederatedSelectManager : public FederatedSelectManager { 90*14675a02SAndroid Build Coastguard Worker public: 91*14675a02SAndroid Build Coastguard Worker explicit DisabledFederatedSelectManager(LogManager* log_manager); 92*14675a02SAndroid Build Coastguard Worker 93*14675a02SAndroid Build Coastguard Worker std::unique_ptr<::fcp::client::engine::ExampleIteratorFactory> 94*14675a02SAndroid Build Coastguard Worker CreateExampleIteratorFactoryForUriTemplate( 95*14675a02SAndroid Build Coastguard Worker absl::string_view uri_template) override; 96*14675a02SAndroid Build Coastguard Worker GetNetworkStats()97*14675a02SAndroid Build Coastguard Worker NetworkStats GetNetworkStats() override { return NetworkStats(); } 98*14675a02SAndroid Build Coastguard Worker 99*14675a02SAndroid Build Coastguard Worker private: 100*14675a02SAndroid Build Coastguard Worker LogManager& log_manager_; 101*14675a02SAndroid Build Coastguard Worker }; 102*14675a02SAndroid Build Coastguard Worker 103*14675a02SAndroid Build Coastguard Worker // A FederatedSelectManager implementation that actually issues HTTP requests to 104*14675a02SAndroid Build Coastguard Worker // fetch slice data (i.e. the "real" implementation). 105*14675a02SAndroid Build Coastguard Worker class HttpFederatedSelectManager : public FederatedSelectManager { 106*14675a02SAndroid Build Coastguard Worker public: 107*14675a02SAndroid Build Coastguard Worker HttpFederatedSelectManager( 108*14675a02SAndroid Build Coastguard Worker LogManager* log_manager, Files* files, 109*14675a02SAndroid Build Coastguard Worker fcp::client::http::HttpClient* http_client, 110*14675a02SAndroid Build Coastguard Worker std::function<bool()> should_abort, 111*14675a02SAndroid Build Coastguard Worker const InterruptibleRunner::TimingConfig& timing_config); 112*14675a02SAndroid Build Coastguard Worker 113*14675a02SAndroid Build Coastguard Worker std::unique_ptr<::fcp::client::engine::ExampleIteratorFactory> 114*14675a02SAndroid Build Coastguard Worker CreateExampleIteratorFactoryForUriTemplate( 115*14675a02SAndroid Build Coastguard Worker absl::string_view uri_template) override; 116*14675a02SAndroid Build Coastguard Worker GetNetworkStats()117*14675a02SAndroid Build Coastguard Worker NetworkStats GetNetworkStats() override { 118*14675a02SAndroid Build Coastguard Worker return {.bytes_downloaded = bytes_received_.load(), 119*14675a02SAndroid Build Coastguard Worker .bytes_uploaded = bytes_sent_.load(), 120*14675a02SAndroid Build Coastguard Worker .network_duration = network_stopwatch_->GetTotalDuration()}; 121*14675a02SAndroid Build Coastguard Worker } 122*14675a02SAndroid Build Coastguard Worker 123*14675a02SAndroid Build Coastguard Worker private: 124*14675a02SAndroid Build Coastguard Worker LogManager& log_manager_; 125*14675a02SAndroid Build Coastguard Worker Files& files_; 126*14675a02SAndroid Build Coastguard Worker std::atomic<int64_t> bytes_sent_ = 0; 127*14675a02SAndroid Build Coastguard Worker std::atomic<int64_t> bytes_received_ = 0; 128*14675a02SAndroid Build Coastguard Worker std::unique_ptr<WallClockStopwatch> network_stopwatch_ = 129*14675a02SAndroid Build Coastguard Worker WallClockStopwatch::Create(); 130*14675a02SAndroid Build Coastguard Worker fcp::client::http::HttpClient& http_client_; 131*14675a02SAndroid Build Coastguard Worker std::unique_ptr<InterruptibleRunner> interruptible_runner_; 132*14675a02SAndroid Build Coastguard Worker }; 133*14675a02SAndroid Build Coastguard Worker 134*14675a02SAndroid Build Coastguard Worker // A Federated Select ExampleIterator that simply returns slice data that is 135*14675a02SAndroid Build Coastguard Worker // already in-memory. 136*14675a02SAndroid Build Coastguard Worker class InMemoryFederatedSelectExampleIterator : public ExampleIterator { 137*14675a02SAndroid Build Coastguard Worker public: 138*14675a02SAndroid Build Coastguard Worker // Each time another slice is requested by a call to Next(), the slice data at 139*14675a02SAndroid Build Coastguard Worker // the front of the `slices` deque will be written to the `scratch_filename` 140*14675a02SAndroid Build Coastguard Worker // and the filename will be returned as the example data. The scratch file 141*14675a02SAndroid Build Coastguard Worker // will be deleted at the end of the iterator, or when the iterator is closed. InMemoryFederatedSelectExampleIterator(std::string scratch_filename,std::deque<absl::Cord> slices)142*14675a02SAndroid Build Coastguard Worker InMemoryFederatedSelectExampleIterator(std::string scratch_filename, 143*14675a02SAndroid Build Coastguard Worker std::deque<absl::Cord> slices) 144*14675a02SAndroid Build Coastguard Worker : scratch_filename_(scratch_filename), slices_(std::move(slices)) {} 145*14675a02SAndroid Build Coastguard Worker absl::StatusOr<std::string> Next() override; 146*14675a02SAndroid Build Coastguard Worker void Close() override; 147*14675a02SAndroid Build Coastguard Worker 148*14675a02SAndroid Build Coastguard Worker ~InMemoryFederatedSelectExampleIterator() override; 149*14675a02SAndroid Build Coastguard Worker 150*14675a02SAndroid Build Coastguard Worker private: 151*14675a02SAndroid Build Coastguard Worker void CleanupInternal() ABSL_LOCKS_EXCLUDED(mutex_); 152*14675a02SAndroid Build Coastguard Worker 153*14675a02SAndroid Build Coastguard Worker std::string scratch_filename_; 154*14675a02SAndroid Build Coastguard Worker 155*14675a02SAndroid Build Coastguard Worker absl::Mutex mutex_; 156*14675a02SAndroid Build Coastguard Worker std::deque<absl::Cord> slices_ ABSL_GUARDED_BY(mutex_); 157*14675a02SAndroid Build Coastguard Worker }; 158*14675a02SAndroid Build Coastguard Worker 159*14675a02SAndroid Build Coastguard Worker } // namespace client 160*14675a02SAndroid Build Coastguard Worker } // namespace fcp 161*14675a02SAndroid Build Coastguard Worker 162*14675a02SAndroid Build Coastguard Worker #endif // FCP_CLIENT_FEDERATED_SELECT_H_ 163