/* Copyright 2021 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/tfrt/run_handler_thread_pool/run_handler_concurrent_work_queue.h"

#include <memory>
#include <utility>

#include "absl/strings/str_cat.h"
#include "tensorflow/core/platform/errors.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/tfrt/run_handler_thread_pool/run_handler.h"
#include "tfrt/host_context/async_dispatch.h"  // from @tf_runtime
#include "tfrt/host_context/async_value.h"  // from @tf_runtime
#include "tfrt/host_context/execution_context.h"  // from @tf_runtime
#include "tfrt/support/error_util.h"  // from @tf_runtime

namespace tfrt {
namespace tf {

RunHandlerThreadWorkQueue::RunHandlerThreadWorkQueue(const Options& options)
    : options_(options),
      quiescing_state_(std::make_unique<::tfrt::internal::QuiescingState>()),
      non_blocking_work_queue_(quiescing_state_.get(),
                               /*num_threads=*/1),
      blocking_work_queue_(quiescing_state_.get(),
                           /*num_threads=*/1) {
  CHECK(options.num_threads_in_sub_thread_pool.size() ==  // Crash OK.
        options.num_sub_thread_pool);
  CHECK(options.sub_thread_request_percentage.size() ==  // Crash OK.
        options.num_sub_thread_pool);

  // Forward the thread pool configuration to the underlying RunHandlerPool.
  RunHandlerPool::Options pool_options;
  pool_options.num_inter_op_threads = options.num_main_threads;
  pool_options.num_intra_op_threads = options.num_complementary_threads;
  pool_options.max_concurrent_handler = options.max_concurrent_handler;
  pool_options.blocking_threads_max_sleep_time_micro_sec =
      options.blocking_threads_max_sleep_time_micro_sec;
  pool_options.non_blocking_threads_sleep_time_micro_sec =
      options.non_blocking_threads_sleep_time_micro_sec;
  pool_options.num_sub_thread_pool = options.num_sub_thread_pool;
  pool_options.num_threads_in_sub_thread_pool =
      options.num_threads_in_sub_thread_pool;
  pool_options.sub_thread_request_percentage =
      options.sub_thread_request_percentage;
  pool_options.enable_wake_up = options.enable_wake_up;
  pool_options.wait_if_no_active_request = options.wait_if_no_active_request;
  pool_options.use_adaptive_waiting_time = options.use_adaptive_waiting_time;
  handler_pool_ = std::make_unique<RunHandlerPool>(pool_options);
}

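// A minimal construction sketch (illustrative only; the values below are
// assumptions, not recommended settings, and the remaining Options fields are
// assumed to have usable defaults). Both vectors must contain exactly
// `num_sub_thread_pool` entries, otherwise the CHECKs above fire:
//
//   RunHandlerThreadWorkQueue::Options queue_options;
//   queue_options.num_sub_thread_pool = 2;
//   queue_options.num_threads_in_sub_thread_pool = {1, 1};
//   queue_options.sub_thread_request_percentage = {0.5, 1.0};
//   RunHandlerThreadWorkQueue work_queue(queue_options);
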
tensorflow::StatusOr<std::unique_ptr<tensorflow::tfrt_stub::WorkQueueInterface>>
RunHandlerThreadWorkQueue::InitializeRequest(
    tfrt::RequestContextBuilder* request_context_builder,
    tensorflow::thread::ThreadPoolInterface** intra_op_threadpool) const {
  DCHECK(intra_op_threadpool);
  RunHandlerOptions options;
  options.priority = request_context_builder->request_options().priority;
  // Borrow a RunHandler from the pool, waiting up to init_timeout_ms for one
  // to become available.
  std::unique_ptr<RunHandler> handler = handler_pool_->Get(
      request_context_builder->id(), options_.init_timeout_ms, options);
  if (!handler) {
    return tensorflow::errors::Internal(absl::StrCat(
        "Could not obtain RunHandler for request after waiting for ",
        options_.init_timeout_ms, " ms."));
  }

  // Hand the handler's intra-op thread pool to the caller and wrap the
  // handler in a per-request work queue.
  *intra_op_threadpool = handler->AsIntraThreadPoolInterface();

  return {std::make_unique<RunHandlerWorkQueue>(std::move(handler))};
}

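// Typical per-request use (sketch only; `builder` stands for a
// tfrt::RequestContextBuilder prepared by the caller, and error handling is
// elided):
//
//   tensorflow::thread::ThreadPoolInterface* intra_op = nullptr;
//   auto request_queue = work_queue.InitializeRequest(&builder, &intra_op);
//   if (!request_queue.ok()) {
//     // No RunHandler became available within init_timeout_ms.
//   }
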
void RunHandlerThreadWorkQueue::AddTask(TaskFunction work) {
  non_blocking_work_queue_.AddTask(std::move(work));
}

Optional<TaskFunction> RunHandlerThreadWorkQueue::AddBlockingTask(
    TaskFunction work, bool allow_queuing) {
  if (allow_queuing) {
    return blocking_work_queue_.EnqueueBlockingTask(std::move(work));
  } else {
    return blocking_work_queue_.RunBlockingTask(std::move(work));
  }
  return llvm::None;
}

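// Sketch of how a caller might interpret the return value (hypothetical
// caller-side code); a non-empty Optional means the task was not accepted and
// is handed back to the caller:
//
//   if (auto rejected = work_queue.AddBlockingTask(std::move(task),
//                                                  /*allow_queuing=*/true)) {
//     // The caller still owns the task via `rejected`.
//   }
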
void RunHandlerThreadWorkQueue::Quiesce() {
  // Wait until pending work in the handler pool and in the non-blocking and
  // blocking queues has completed.
  handler_pool_->Quiesce();
  non_blocking_work_queue_.Quiesce();
  blocking_work_queue_.Quiesce();
}

void RunHandlerThreadWorkQueue::Await(
    ArrayRef<RCReference<AsyncValue>> values) {
  tfrt::Await(values);
}

bool RunHandlerThreadWorkQueue::IsInWorkerThread() const {
  // TODO(b/192247530): Check whether there are cases where this is not true.
  return true;
}

}  // namespace tf
}  // namespace tfrt