/*
 * Copyright 2020 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef FCP_CLIENT_ENGINE_SIMPLE_PLAN_ENGINE_H_
#define FCP_CLIENT_ENGINE_SIMPLE_PLAN_ENGINE_H_

#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "google/protobuf/any.pb.h"
#include "absl/status/statusor.h"
#include "absl/time/time.h"
#include "fcp/base/monitoring.h"
#include "fcp/client/engine/common.h"
#include "fcp/client/engine/example_iterator_factory.h"
#include "fcp/client/engine/plan_engine_helpers.h"
#include "fcp/client/engine/tf_wrapper.h"
#include "fcp/client/event_publisher.h"
#include "fcp/client/flags.h"
#include "fcp/client/histogram_counters.pb.h"
#include "fcp/client/interruptible_runner.h"
#include "fcp/client/log_manager.h"
#include "fcp/client/opstats/opstats_logger.h"
#include "fcp/client/simple_task_environment.h"
#include "fcp/protos/plan.pb.h"
#include "tensorflow/core/framework/tensor.h"

namespace fcp {
namespace client {
namespace engine {

// A class used to "run" (interpret) a TensorflowSpec-based plan. Each instance
// should generally only be used once to run a plan.
48 class SimplePlanEngine { 49 public: 50 // For each example query issued by the plan at runtime, the given 51 // `example_iterator_factories` parameter will be iterated and the first 52 // iterator factory that can handle the given query will be used to create the 53 // example iterator for that query. 54 SimplePlanEngine( 55 std::vector<ExampleIteratorFactory*> example_iterator_factories, 56 std::function<bool()> should_abort, LogManager* log_manager, 57 ::fcp::client::opstats::OpStatsLogger* opstats_logger, 58 const InterruptibleRunner::TimingConfig* timing_config, 59 bool support_constant_tf_inputs); 60 61 PlanResult RunPlan( 62 const google::internal::federated::plan::TensorflowSpec& tensorflow_spec, 63 const std::string& graph, const ::google::protobuf::Any& config_proto, 64 std::unique_ptr<std::vector<std::pair<std::string, tensorflow::Tensor>>> 65 inputs, 66 const std::vector<std::string>& output_names); 67 68 private: 69 // Runs the plan. Returns one of three error codes: 70 // OK, INVALID_ARGUMENT, CANCELLED. 71 absl::StatusOr<std::vector<tensorflow::Tensor>> RunPlanInternal( 72 TensorFlowWrapper* tf_wrapper, 73 const google::internal::federated::plan::TensorflowSpec& tensorflow_spec, 74 std::unique_ptr<std::vector<std::pair<std::string, tensorflow::Tensor>>> 75 inputs, 76 const std::vector<std::string>& output_names, 77 std::atomic<int>* total_example_count, 78 std::atomic<int64_t>* total_example_size_bytes, 79 ExampleIteratorStatus* example_iterator_status); 80 81 // Invokes TensorFlowWrapper, and takes care of logging TensorFlow errors and 82 // external interruptions via event_publisher. 83 // If the TF call fails because it got aborted externally, returns CANCELLED. 84 // If the TF call fails with an INVALID argument, indicating a TF error, 85 // publishes an event, then returns INVALID_ARGUMENT 86 // If the TF call reports an OUT_OF_RANGE error ("internal" abortion) or the 87 // TF call is successful, returns OK. 
88 absl::StatusOr<std::vector<tensorflow::Tensor>> RunTensorFlowInternal( 89 TensorFlowWrapper* tf_wrapper, 90 const std::vector<std::pair<std::string, tensorflow::Tensor>>& inputs, 91 const std::vector<std::string>& output_tensor_names, 92 const std::vector<std::string>& target_node_names); 93 94 std::vector<ExampleIteratorFactory*> example_iterator_factories_; 95 std::function<bool()> should_abort_; 96 LogManager* log_manager_; 97 ::fcp::client::opstats::OpStatsLogger* opstats_logger_; 98 const InterruptibleRunner::TimingConfig* timing_config_; 99 const bool support_constant_tf_inputs_; 100 }; 101 102 } // namespace engine 103 } // namespace client 104 } // namespace fcp 105 106 #endif // FCP_CLIENT_ENGINE_SIMPLE_PLAN_ENGINE_H_ 107