1 /*
2  * Copyright 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "message_loop_thread.h"
18 
19 #include <base/functional/callback.h>
20 #include <base/location.h>
21 #include <base/strings/stringprintf.h>
22 #include <base/time/time.h>
23 #include <bluetooth/log.h>
24 #include <sys/syscall.h>
25 #include <unistd.h>
26 
27 #include <future>
28 #include <mutex>
29 #include <string>
30 #include <thread>
31 
32 #include "common/postable_context.h"
33 
34 namespace bluetooth {
35 namespace common {
36 
// Value used as sched_priority when EnableRealTimeScheduling() requests the
// SCHED_FIFO policy for the message loop thread.
static constexpr int kRealTimeFifoSchedulingPriority = 1;
38 
timeDeltaFromMicroseconds(std::chrono::microseconds t)39 static base::TimeDelta timeDeltaFromMicroseconds(std::chrono::microseconds t) {
40 #if BASE_VER < 931007
41   return base::TimeDelta::FromMicroseconds(t.count());
42 #else
43   return base::Microseconds(t.count());
44 #endif
45 }
46 
// Constructs a MessageLoopThread named |thread_name| without starting anything:
// the loop, run loop and thread pointers start out null and both thread ids
// start at -1 (the value IsRunning() treats as "not running"). StartUp() is
// what actually creates the thread.
MessageLoopThread::MessageLoopThread(const std::string& thread_name)
    : thread_name_(thread_name),
      message_loop_(nullptr),
      run_loop_(nullptr),
      thread_(nullptr),
      thread_id_(-1),
      linux_tid_(-1),
      weak_ptr_factory_(this),
      shutting_down_(false) {}
56 
~MessageLoopThread()57 MessageLoopThread::~MessageLoopThread() { ShutDown(); }
58 
StartUp()59 void MessageLoopThread::StartUp() {
60   std::promise<void> start_up_promise;
61   std::future<void> start_up_future = start_up_promise.get_future();
62   {
63     std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
64     if (thread_ != nullptr) {
65       log::warn("thread {} is already started", *this);
66 
67       return;
68     }
69     thread_ = new std::thread(&MessageLoopThread::RunThread, this, std::move(start_up_promise));
70   }
71   start_up_future.wait();
72 }
73 
DoInThread(const base::Location & from_here,base::OnceClosure task)74 bool MessageLoopThread::DoInThread(const base::Location& from_here, base::OnceClosure task) {
75   return DoInThreadDelayed(from_here, std::move(task), std::chrono::microseconds(0));
76 }
77 
DoInThreadDelayed(const base::Location & from_here,base::OnceClosure task,std::chrono::microseconds delay)78 bool MessageLoopThread::DoInThreadDelayed(const base::Location& from_here, base::OnceClosure task,
79                                           std::chrono::microseconds delay) {
80   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
81 
82   if (message_loop_ == nullptr) {
83     log::error("message loop is null for thread {}, from {}", *this, from_here.ToString());
84     return false;
85   }
86   if (!message_loop_->task_runner()->PostDelayedTask(from_here, std::move(task),
87                                                      timeDeltaFromMicroseconds(delay))) {
88     log::error("failed to post task to message loop for thread {}, from {}", *this,
89                from_here.ToString());
90     return false;
91   }
92   return true;
93 }
94 
// Asks the run loop to quit, joins the worker thread and frees the thread
// object. Returns early (after logging) when there is no thread, when the
// message loop is null, or when another ShutDown() call is already in
// progress. Asserts that it is not invoked from the worker thread itself.
void MessageLoopThread::ShutDown() {
  {
    std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
    if (thread_ == nullptr) {
      log::info("thread {} is already stopped", *this);
      return;
    }
    // The thread object exists but the loop does not: Run() either has not
    // created it yet or has already deleted it after the run loop exited.
    if (message_loop_ == nullptr) {
      log::info("message_loop_ is null. Already stopping");
      return;
    }
    if (shutting_down_) {
      log::info("waiting for thread to join");
      return;
    }
    // Mark the shutdown as in progress so that other callers take the early
    // return above while the mutex is released for the join below.
    shutting_down_ = true;
    log::assert_that(thread_id_ != base::PlatformThread::CurrentId(),
                     "should not be called on the thread itself. Otherwise, deadlock may happen.");
    run_loop_->QuitWhenIdle();
  }
  // Join without holding api_mutex_: Run() re-acquires the mutex to clean up
  // once its run loop returns.
  thread_->join();
  {
    std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
    delete thread_;
    thread_ = nullptr;
    shutting_down_ = false;
  }
}
123 
GetThreadId() const124 base::PlatformThreadId MessageLoopThread::GetThreadId() const {
125   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
126   return thread_id_;
127 }
128 
GetName() const129 std::string MessageLoopThread::GetName() const { return thread_name_; }
130 
ToString() const131 std::string MessageLoopThread::ToString() const {
132   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
133   return base::StringPrintf("%s(%d)", thread_name_.c_str(), thread_id_);
134 }
135 
IsRunning() const136 bool MessageLoopThread::IsRunning() const {
137   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
138   return thread_id_ != -1;
139 }
140 
141 // Non API method, should not be protected by API mutex
RunThread(MessageLoopThread * thread,std::promise<void> start_up_promise)142 void MessageLoopThread::RunThread(MessageLoopThread* thread, std::promise<void> start_up_promise) {
143   thread->Run(std::move(start_up_promise));
144 }
145 
146 // This is only for use in tests.
message_loop() const147 btbase::AbstractMessageLoop* MessageLoopThread::message_loop() const {
148   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
149   return message_loop_;
150 }
151 
EnableRealTimeScheduling()152 bool MessageLoopThread::EnableRealTimeScheduling() {
153   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
154 
155   if (!IsRunning()) {
156     log::error("thread {} is not running", *this);
157     return false;
158   }
159 
160   struct sched_param rt_params = {.sched_priority = kRealTimeFifoSchedulingPriority};
161   int rc = sched_setscheduler(linux_tid_, SCHED_FIFO, &rt_params);
162   if (rc != 0) {
163     log::error("unable to set SCHED_FIFO priority {} for linux_tid {}, thread {}, error: {}",
164                kRealTimeFifoSchedulingPriority, linux_tid_, *this, strerror(errno));
165     return false;
166   }
167   return true;
168 }
169 
GetWeakPtr()170 base::WeakPtr<MessageLoopThread> MessageLoopThread::GetWeakPtr() {
171   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
172   return weak_ptr_factory_.GetWeakPtr();
173 }
174 
// Body of the worker thread. Creates the message loop and run loop, records
// the thread ids, fulfils |start_up_promise|, then runs the loop. After the
// loop returns, resets the ids and deletes the loop objects.
void MessageLoopThread::Run(std::promise<void> start_up_promise) {
  {
    std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);

    log::info("message loop starting for thread {}", thread_name_);
    base::PlatformThread::SetName(thread_name_);
    message_loop_ = new btbase::AbstractMessageLoop();
    run_loop_ = new base::RunLoop();
    thread_id_ = base::PlatformThread::CurrentId();
    linux_tid_ = static_cast<pid_t>(syscall(SYS_gettid));
    // All state above is set; release the caller waiting in StartUp().
    start_up_promise.set_value();
  }

  // Blocking until ShutDown() is called
  run_loop_->Run();

  {
    std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
    // Return to the not-running state: -1 ids make IsRunning() false and a
    // null message_loop_ makes DoInThreadDelayed() reject new tasks.
    thread_id_ = -1;
    linux_tid_ = -1;
    delete message_loop_;
    message_loop_ = nullptr;
    delete run_loop_;
    run_loop_ = nullptr;
    log::info("message loop finished for thread {}", thread_name_);
  }
}
202 
Post(base::OnceClosure closure)203 void MessageLoopThread::Post(base::OnceClosure closure) {
204   DoInThread(FROM_HERE, std::move(closure));
205 }
206 
Postable()207 PostableContext* MessageLoopThread::Postable() { return this; }
208 
209 }  // namespace common
210 }  // namespace bluetooth
211