1 /*
2 * Copyright 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "message_loop_thread.h"
18
19 #include <base/functional/callback.h>
20 #include <base/location.h>
21 #include <base/strings/stringprintf.h>
22 #include <base/time/time.h>
23 #include <bluetooth/log.h>
24 #include <sys/syscall.h>
25 #include <unistd.h>
26
27 #include <future>
28 #include <mutex>
29 #include <string>
30 #include <thread>
31
32 #include "common/postable_context.h"
33
34 namespace bluetooth {
35 namespace common {
36
// Priority value passed as sched_priority together with SCHED_FIFO when
// EnableRealTimeScheduling() calls sched_setscheduler().
static constexpr int kRealTimeFifoSchedulingPriority{1};
38
timeDeltaFromMicroseconds(std::chrono::microseconds t)39 static base::TimeDelta timeDeltaFromMicroseconds(std::chrono::microseconds t) {
40 #if BASE_VER < 931007
41 return base::TimeDelta::FromMicroseconds(t.count());
42 #else
43 return base::Microseconds(t.count());
44 #endif
45 }
46
MessageLoopThread(const std::string & thread_name)47 MessageLoopThread::MessageLoopThread(const std::string& thread_name)
48 : thread_name_(thread_name),
49 message_loop_(nullptr),
50 run_loop_(nullptr),
51 thread_(nullptr),
52 thread_id_(-1),
53 linux_tid_(-1),
54 weak_ptr_factory_(this),
55 shutting_down_(false) {}
56
~MessageLoopThread()57 MessageLoopThread::~MessageLoopThread() { ShutDown(); }
58
StartUp()59 void MessageLoopThread::StartUp() {
60 std::promise<void> start_up_promise;
61 std::future<void> start_up_future = start_up_promise.get_future();
62 {
63 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
64 if (thread_ != nullptr) {
65 log::warn("thread {} is already started", *this);
66
67 return;
68 }
69 thread_ = new std::thread(&MessageLoopThread::RunThread, this, std::move(start_up_promise));
70 }
71 start_up_future.wait();
72 }
73
DoInThread(const base::Location & from_here,base::OnceClosure task)74 bool MessageLoopThread::DoInThread(const base::Location& from_here, base::OnceClosure task) {
75 return DoInThreadDelayed(from_here, std::move(task), std::chrono::microseconds(0));
76 }
77
DoInThreadDelayed(const base::Location & from_here,base::OnceClosure task,std::chrono::microseconds delay)78 bool MessageLoopThread::DoInThreadDelayed(const base::Location& from_here, base::OnceClosure task,
79 std::chrono::microseconds delay) {
80 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
81
82 if (message_loop_ == nullptr) {
83 log::error("message loop is null for thread {}, from {}", *this, from_here.ToString());
84 return false;
85 }
86 if (!message_loop_->task_runner()->PostDelayedTask(from_here, std::move(task),
87 timeDeltaFromMicroseconds(delay))) {
88 log::error("failed to post task to message loop for thread {}, from {}", *this,
89 from_here.ToString());
90 return false;
91 }
92 return true;
93 }
94
ShutDown()95 void MessageLoopThread::ShutDown() {
96 {
97 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
98 if (thread_ == nullptr) {
99 log::info("thread {} is already stopped", *this);
100 return;
101 }
102 if (message_loop_ == nullptr) {
103 log::info("message_loop_ is null. Already stopping");
104 return;
105 }
106 if (shutting_down_) {
107 log::info("waiting for thread to join");
108 return;
109 }
110 shutting_down_ = true;
111 log::assert_that(thread_id_ != base::PlatformThread::CurrentId(),
112 "should not be called on the thread itself. Otherwise, deadlock may happen.");
113 run_loop_->QuitWhenIdle();
114 }
115 thread_->join();
116 {
117 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
118 delete thread_;
119 thread_ = nullptr;
120 shutting_down_ = false;
121 }
122 }
123
GetThreadId() const124 base::PlatformThreadId MessageLoopThread::GetThreadId() const {
125 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
126 return thread_id_;
127 }
128
GetName() const129 std::string MessageLoopThread::GetName() const { return thread_name_; }
130
ToString() const131 std::string MessageLoopThread::ToString() const {
132 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
133 return base::StringPrintf("%s(%d)", thread_name_.c_str(), thread_id_);
134 }
135
IsRunning() const136 bool MessageLoopThread::IsRunning() const {
137 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
138 return thread_id_ != -1;
139 }
140
141 // Non API method, should not be protected by API mutex
RunThread(MessageLoopThread * thread,std::promise<void> start_up_promise)142 void MessageLoopThread::RunThread(MessageLoopThread* thread, std::promise<void> start_up_promise) {
143 thread->Run(std::move(start_up_promise));
144 }
145
146 // This is only for use in tests.
message_loop() const147 btbase::AbstractMessageLoop* MessageLoopThread::message_loop() const {
148 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
149 return message_loop_;
150 }
151
EnableRealTimeScheduling()152 bool MessageLoopThread::EnableRealTimeScheduling() {
153 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
154
155 if (!IsRunning()) {
156 log::error("thread {} is not running", *this);
157 return false;
158 }
159
160 struct sched_param rt_params = {.sched_priority = kRealTimeFifoSchedulingPriority};
161 int rc = sched_setscheduler(linux_tid_, SCHED_FIFO, &rt_params);
162 if (rc != 0) {
163 log::error("unable to set SCHED_FIFO priority {} for linux_tid {}, thread {}, error: {}",
164 kRealTimeFifoSchedulingPriority, linux_tid_, *this, strerror(errno));
165 return false;
166 }
167 return true;
168 }
169
GetWeakPtr()170 base::WeakPtr<MessageLoopThread> MessageLoopThread::GetWeakPtr() {
171 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
172 return weak_ptr_factory_.GetWeakPtr();
173 }
174
Run(std::promise<void> start_up_promise)175 void MessageLoopThread::Run(std::promise<void> start_up_promise) {
176 {
177 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
178
179 log::info("message loop starting for thread {}", thread_name_);
180 base::PlatformThread::SetName(thread_name_);
181 message_loop_ = new btbase::AbstractMessageLoop();
182 run_loop_ = new base::RunLoop();
183 thread_id_ = base::PlatformThread::CurrentId();
184 linux_tid_ = static_cast<pid_t>(syscall(SYS_gettid));
185 start_up_promise.set_value();
186 }
187
188 // Blocking until ShutDown() is called
189 run_loop_->Run();
190
191 {
192 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
193 thread_id_ = -1;
194 linux_tid_ = -1;
195 delete message_loop_;
196 message_loop_ = nullptr;
197 delete run_loop_;
198 run_loop_ = nullptr;
199 log::info("message loop finished for thread {}", thread_name_);
200 }
201 }
202
Post(base::OnceClosure closure)203 void MessageLoopThread::Post(base::OnceClosure closure) {
204 DoInThread(FROM_HERE, std::move(closure));
205 }
206
Postable()207 PostableContext* MessageLoopThread::Postable() { return this; }
208
209 } // namespace common
210 } // namespace bluetooth
211