1 /*
2 * Copyright 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #pragma once
18
19 #include <bluetooth/log.h>
20 #include <unistd.h>
21
22 #include <functional>
23 #include <mutex>
24 #include <queue>
25
26 #include "common/bind.h"
27 #include "common/callback.h"
28 #include "os/handler.h"
29 #include "os/linux_generic/reactive_semaphore.h"
30
31 namespace bluetooth {
32 namespace os {
33
34 // See documentation for |Queue|
35 template <typename T>
36 class IQueueEnqueue {
37 public:
38 using EnqueueCallback = common::Callback<std::unique_ptr<T>()>;
39 virtual ~IQueueEnqueue() = default;
40 virtual void RegisterEnqueue(Handler* handler, EnqueueCallback callback) = 0;
41 virtual void UnregisterEnqueue() = 0;
42 };
43
44 // See documentation for |Queue|
45 template <typename T>
46 class IQueueDequeue {
47 public:
48 using DequeueCallback = common::Callback<void()>;
49 virtual ~IQueueDequeue() = default;
50 virtual void RegisterDequeue(Handler* handler, DequeueCallback callback) = 0;
51 virtual void UnregisterDequeue() = 0;
52 virtual std::unique_ptr<T> TryDequeue() = 0;
53 };
54
55 //
56 // An interface facilitating flow-controlled and non-blocking queue operations.
57 //
58 // This Queue uses separate semaphores and callbacks for enqueue end (producer)
59 // and dequeue end (consumer) to manage data flow efficiently:
60 //
61 // Enqueue end (producer):
62 // - Registers an EnqueueCallback when producer has data to send.
63 // - Unregisters the EnqueueCallback when no data is available.
64 //
65 // Dequeue end (consumer):
66 // - Registers a DequeueCallback when consumer is ready to process data.
67 // - Unregisters the DequeueCallback when no longer ready.
68 //
69 template <typename T>
70 class Queue : public IQueueEnqueue<T>, public IQueueDequeue<T> {
71 public:
72 // A function moving data from enqueue end buffer to queue, it will be continually be invoked
73 // until queue is full. Enqueue end should make sure buffer isn't empty and UnregisterEnqueue when
74 // buffer become empty.
75 using EnqueueCallback = common::Callback<std::unique_ptr<T>()>;
76 // A function moving data form queue to dequeue end buffer, it will be continually be invoked
77 // until queue is empty. TryDequeue should be use in this function to get data from queue.
78 using DequeueCallback = common::Callback<void()>;
79 // Create a queue with |capacity| is the maximum number of messages a queue can contain
80 explicit Queue(size_t capacity);
81 ~Queue();
82 // Register |callback| that will be called on |handler| when the queue is able to enqueue one
83 // piece of data. This will cause a crash if handler or callback has already been registered
84 // before.
85 void RegisterEnqueue(Handler* handler, EnqueueCallback callback) override;
86 // Unregister current EnqueueCallback from this queue, this will cause a crash if not registered
87 // yet.
88 void UnregisterEnqueue() override;
89 // Register |callback| that will be called on |handler| when the queue has at least one piece of
90 // data ready for dequeue. This will cause a crash if handler or callback has already been
91 // registered before.
92 void RegisterDequeue(Handler* handler, DequeueCallback callback) override;
93 // Unregister current DequeueCallback from this queue, this will cause a crash if not registered
94 // yet.
95 void UnregisterDequeue() override;
96
97 // Try to dequeue an item from this queue. Return nullptr when there is nothing in the queue.
98 std::unique_ptr<T> TryDequeue() override;
99
100 private:
101 void EnqueueCallbackInternal(EnqueueCallback callback);
102 // An internal queue that holds at most |capacity| pieces of data
103 std::queue<std::unique_ptr<T>> queue_;
104 // A mutex that guards data in this queue
105 std::mutex mutex_;
106
107 class QueueEndpoint {
108 public:
QueueEndpoint(unsigned int initial_value)109 explicit QueueEndpoint(unsigned int initial_value)
110 : reactive_semaphore_(initial_value), handler_(nullptr), reactable_(nullptr) {}
111 ReactiveSemaphore reactive_semaphore_;
112 Handler* handler_;
113 Reactor::Reactable* reactable_;
114 };
115
116 QueueEndpoint enqueue_;
117 QueueEndpoint dequeue_;
118 };
119
120 template <typename T>
121 class EnqueueBuffer {
122 public:
EnqueueBuffer(IQueueEnqueue<T> * queue)123 EnqueueBuffer(IQueueEnqueue<T>* queue) : queue_(queue) {}
124
~EnqueueBuffer()125 ~EnqueueBuffer() {
126 if (enqueue_registered_.exchange(false)) {
127 queue_->UnregisterEnqueue();
128 }
129 }
130
Enqueue(std::unique_ptr<T> t,os::Handler * handler)131 void Enqueue(std::unique_ptr<T> t, os::Handler* handler) {
132 std::lock_guard<std::mutex> lock(mutex_);
133 buffer_.push(std::move(t));
134 if (!enqueue_registered_.exchange(true)) {
135 queue_->RegisterEnqueue(
136 handler, common::Bind(&EnqueueBuffer<T>::enqueue_callback, common::Unretained(this)));
137 }
138 }
139
Clear()140 void Clear() {
141 std::lock_guard<std::mutex> lock(mutex_);
142 if (enqueue_registered_.exchange(false)) {
143 queue_->UnregisterEnqueue();
144 std::queue<std::unique_ptr<T>> empty;
145 std::swap(buffer_, empty);
146 }
147 }
148
Size()149 auto Size() const { return buffer_.size(); }
150
NotifyOnEmpty(common::OnceClosure callback)151 void NotifyOnEmpty(common::OnceClosure callback) {
152 std::lock_guard<std::mutex> lock(mutex_);
153 log::assert_that(callback_on_empty_.is_null(), "assert failed: callback_on_empty_.is_null()");
154 callback_on_empty_ = std::move(callback);
155 }
156
157 private:
enqueue_callback()158 std::unique_ptr<T> enqueue_callback() {
159 std::lock_guard<std::mutex> lock(mutex_);
160 std::unique_ptr<T> enqueued_t = std::move(buffer_.front());
161 buffer_.pop();
162 if (buffer_.empty() && enqueue_registered_.exchange(false)) {
163 queue_->UnregisterEnqueue();
164 if (!callback_on_empty_.is_null()) {
165 std::move(callback_on_empty_).Run();
166 }
167 }
168 return enqueued_t;
169 }
170
171 mutable std::mutex mutex_;
172 IQueueEnqueue<T>* queue_;
173 std::atomic_bool enqueue_registered_ = false;
174 std::queue<std::unique_ptr<T>> buffer_;
175 common::OnceClosure callback_on_empty_;
176 };
177
178 template <typename T>
Queue(size_t capacity)179 Queue<T>::Queue(size_t capacity) : enqueue_(capacity), dequeue_(0){};
180
181 template <typename T>
~Queue()182 Queue<T>::~Queue() {
183 log::assert_that(enqueue_.handler_ == nullptr, "Enqueue is not unregistered");
184 log::assert_that(dequeue_.handler_ == nullptr, "Dequeue is not unregistered");
185 }
186
187 template <typename T>
RegisterEnqueue(Handler * handler,EnqueueCallback callback)188 void Queue<T>::RegisterEnqueue(Handler* handler, EnqueueCallback callback) {
189 std::lock_guard<std::mutex> lock(mutex_);
190 log::assert_that(enqueue_.handler_ == nullptr, "assert failed: enqueue_.handler_ == nullptr");
191 log::assert_that(enqueue_.reactable_ == nullptr, "assert failed: enqueue_.reactable_ == nullptr");
192 enqueue_.handler_ = handler;
193 enqueue_.reactable_ = enqueue_.handler_->thread_->GetReactor()->Register(
194 enqueue_.reactive_semaphore_.GetFd(),
195 base::Bind(&Queue<T>::EnqueueCallbackInternal, base::Unretained(this),
196 std::move(callback)),
197 base::Closure());
198 }
199
200 template <typename T>
UnregisterEnqueue()201 void Queue<T>::UnregisterEnqueue() {
202 Reactor* reactor = nullptr;
203 Reactor::Reactable* to_unregister = nullptr;
204 bool wait_for_unregister = false;
205 {
206 std::lock_guard<std::mutex> lock(mutex_);
207 log::assert_that(enqueue_.reactable_ != nullptr,
208 "assert failed: enqueue_.reactable_ != nullptr");
209 reactor = enqueue_.handler_->thread_->GetReactor();
210 wait_for_unregister = (!enqueue_.handler_->thread_->IsSameThread());
211 to_unregister = enqueue_.reactable_;
212 enqueue_.reactable_ = nullptr;
213 enqueue_.handler_ = nullptr;
214 }
215 reactor->Unregister(to_unregister);
216 if (wait_for_unregister) {
217 reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000));
218 }
219 }
220
221 template <typename T>
RegisterDequeue(Handler * handler,DequeueCallback callback)222 void Queue<T>::RegisterDequeue(Handler* handler, DequeueCallback callback) {
223 std::lock_guard<std::mutex> lock(mutex_);
224 log::assert_that(dequeue_.handler_ == nullptr, "assert failed: dequeue_.handler_ == nullptr");
225 log::assert_that(dequeue_.reactable_ == nullptr, "assert failed: dequeue_.reactable_ == nullptr");
226 dequeue_.handler_ = handler;
227 dequeue_.reactable_ = dequeue_.handler_->thread_->GetReactor()->Register(
228 dequeue_.reactive_semaphore_.GetFd(), callback, base::Closure());
229 }
230
231 template <typename T>
UnregisterDequeue()232 void Queue<T>::UnregisterDequeue() {
233 Reactor* reactor = nullptr;
234 Reactor::Reactable* to_unregister = nullptr;
235 bool wait_for_unregister = false;
236 {
237 std::lock_guard<std::mutex> lock(mutex_);
238 log::assert_that(dequeue_.reactable_ != nullptr,
239 "assert failed: dequeue_.reactable_ != nullptr");
240 reactor = dequeue_.handler_->thread_->GetReactor();
241 wait_for_unregister = (!dequeue_.handler_->thread_->IsSameThread());
242 to_unregister = dequeue_.reactable_;
243 dequeue_.reactable_ = nullptr;
244 dequeue_.handler_ = nullptr;
245 }
246 reactor->Unregister(to_unregister);
247 if (wait_for_unregister) {
248 reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000));
249 }
250 }
251
252 template <typename T>
TryDequeue()253 std::unique_ptr<T> Queue<T>::TryDequeue() {
254 std::lock_guard<std::mutex> lock(mutex_);
255
256 if (queue_.empty()) {
257 return nullptr;
258 }
259
260 dequeue_.reactive_semaphore_.Decrease();
261
262 std::unique_ptr<T> data = std::move(queue_.front());
263 queue_.pop();
264
265 enqueue_.reactive_semaphore_.Increase();
266
267 return data;
268 }
269
270 template <typename T>
EnqueueCallbackInternal(EnqueueCallback callback)271 void Queue<T>::EnqueueCallbackInternal(EnqueueCallback callback) {
272 std::unique_ptr<T> data = callback.Run();
273 log::assert_that(data != nullptr, "assert failed: data != nullptr");
274 std::lock_guard<std::mutex> lock(mutex_);
275 enqueue_.reactive_semaphore_.Decrease();
276 queue_.push(std::move(data));
277 dequeue_.reactive_semaphore_.Increase();
278 }
279
280 } // namespace os
281 } // namespace bluetooth
282