1 /*
2  * Copyright 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <bluetooth/log.h>
20 #include <unistd.h>
21 
22 #include <functional>
23 #include <mutex>
24 #include <queue>
25 
26 #include "common/bind.h"
27 #include "common/callback.h"
28 #include "os/handler.h"
29 #include "os/linux_generic/reactive_semaphore.h"
30 
31 namespace bluetooth {
32 namespace os {
33 
34 // See documentation for |Queue|
35 template <typename T>
36 class IQueueEnqueue {
37 public:
38   using EnqueueCallback = common::Callback<std::unique_ptr<T>()>;
39   virtual ~IQueueEnqueue() = default;
40   virtual void RegisterEnqueue(Handler* handler, EnqueueCallback callback) = 0;
41   virtual void UnregisterEnqueue() = 0;
42 };
43 
44 // See documentation for |Queue|
45 template <typename T>
46 class IQueueDequeue {
47 public:
48   using DequeueCallback = common::Callback<void()>;
49   virtual ~IQueueDequeue() = default;
50   virtual void RegisterDequeue(Handler* handler, DequeueCallback callback) = 0;
51   virtual void UnregisterDequeue() = 0;
52   virtual std::unique_ptr<T> TryDequeue() = 0;
53 };
54 
55 //
56 // An interface facilitating flow-controlled and non-blocking queue operations.
57 //
58 // This Queue uses separate semaphores and callbacks for enqueue end (producer)
59 // and dequeue end (consumer) to manage data flow efficiently:
60 //
61 // Enqueue end (producer):
62 // - Registers an EnqueueCallback when producer has data to send.
63 // - Unregisters the EnqueueCallback when no data is available.
64 //
65 // Dequeue end (consumer):
66 // - Registers a DequeueCallback when consumer is ready to process data.
67 // - Unregisters the DequeueCallback when no longer ready.
68 //
69 template <typename T>
70 class Queue : public IQueueEnqueue<T>, public IQueueDequeue<T> {
71 public:
72   // A function moving data from enqueue end buffer to queue, it will be continually be invoked
73   // until queue is full. Enqueue end should make sure buffer isn't empty and UnregisterEnqueue when
74   // buffer become empty.
75   using EnqueueCallback = common::Callback<std::unique_ptr<T>()>;
76   // A function moving data form queue to dequeue end buffer, it will be continually be invoked
77   // until queue is empty. TryDequeue should be use in this function to get data from queue.
78   using DequeueCallback = common::Callback<void()>;
79   // Create a queue with |capacity| is the maximum number of messages a queue can contain
80   explicit Queue(size_t capacity);
81   ~Queue();
82   // Register |callback| that will be called on |handler| when the queue is able to enqueue one
83   // piece of data. This will cause a crash if handler or callback has already been registered
84   // before.
85   void RegisterEnqueue(Handler* handler, EnqueueCallback callback) override;
86   // Unregister current EnqueueCallback from this queue, this will cause a crash if not registered
87   // yet.
88   void UnregisterEnqueue() override;
89   // Register |callback| that will be called on |handler| when the queue has at least one piece of
90   // data ready for dequeue. This will cause a crash if handler or callback has already been
91   // registered before.
92   void RegisterDequeue(Handler* handler, DequeueCallback callback) override;
93   // Unregister current DequeueCallback from this queue, this will cause a crash if not registered
94   // yet.
95   void UnregisterDequeue() override;
96 
97   // Try to dequeue an item from this queue. Return nullptr when there is nothing in the queue.
98   std::unique_ptr<T> TryDequeue() override;
99 
100 private:
101   void EnqueueCallbackInternal(EnqueueCallback callback);
102   // An internal queue that holds at most |capacity| pieces of data
103   std::queue<std::unique_ptr<T>> queue_;
104   // A mutex that guards data in this queue
105   std::mutex mutex_;
106 
107   class QueueEndpoint {
108   public:
QueueEndpoint(unsigned int initial_value)109     explicit QueueEndpoint(unsigned int initial_value)
110         : reactive_semaphore_(initial_value), handler_(nullptr), reactable_(nullptr) {}
111     ReactiveSemaphore reactive_semaphore_;
112     Handler* handler_;
113     Reactor::Reactable* reactable_;
114   };
115 
116   QueueEndpoint enqueue_;
117   QueueEndpoint dequeue_;
118 };
119 
120 template <typename T>
121 class EnqueueBuffer {
122 public:
EnqueueBuffer(IQueueEnqueue<T> * queue)123   EnqueueBuffer(IQueueEnqueue<T>* queue) : queue_(queue) {}
124 
~EnqueueBuffer()125   ~EnqueueBuffer() {
126     if (enqueue_registered_.exchange(false)) {
127       queue_->UnregisterEnqueue();
128     }
129   }
130 
Enqueue(std::unique_ptr<T> t,os::Handler * handler)131   void Enqueue(std::unique_ptr<T> t, os::Handler* handler) {
132     std::lock_guard<std::mutex> lock(mutex_);
133     buffer_.push(std::move(t));
134     if (!enqueue_registered_.exchange(true)) {
135       queue_->RegisterEnqueue(
136               handler, common::Bind(&EnqueueBuffer<T>::enqueue_callback, common::Unretained(this)));
137     }
138   }
139 
Clear()140   void Clear() {
141     std::lock_guard<std::mutex> lock(mutex_);
142     if (enqueue_registered_.exchange(false)) {
143       queue_->UnregisterEnqueue();
144       std::queue<std::unique_ptr<T>> empty;
145       std::swap(buffer_, empty);
146     }
147   }
148 
Size()149   auto Size() const { return buffer_.size(); }
150 
NotifyOnEmpty(common::OnceClosure callback)151   void NotifyOnEmpty(common::OnceClosure callback) {
152     std::lock_guard<std::mutex> lock(mutex_);
153     log::assert_that(callback_on_empty_.is_null(), "assert failed: callback_on_empty_.is_null()");
154     callback_on_empty_ = std::move(callback);
155   }
156 
157 private:
enqueue_callback()158   std::unique_ptr<T> enqueue_callback() {
159     std::lock_guard<std::mutex> lock(mutex_);
160     std::unique_ptr<T> enqueued_t = std::move(buffer_.front());
161     buffer_.pop();
162     if (buffer_.empty() && enqueue_registered_.exchange(false)) {
163       queue_->UnregisterEnqueue();
164       if (!callback_on_empty_.is_null()) {
165         std::move(callback_on_empty_).Run();
166       }
167     }
168     return enqueued_t;
169   }
170 
171   mutable std::mutex mutex_;
172   IQueueEnqueue<T>* queue_;
173   std::atomic_bool enqueue_registered_ = false;
174   std::queue<std::unique_ptr<T>> buffer_;
175   common::OnceClosure callback_on_empty_;
176 };
177 
178 template <typename T>
Queue(size_t capacity)179 Queue<T>::Queue(size_t capacity) : enqueue_(capacity), dequeue_(0){};
180 
181 template <typename T>
~Queue()182 Queue<T>::~Queue() {
183   log::assert_that(enqueue_.handler_ == nullptr, "Enqueue is not unregistered");
184   log::assert_that(dequeue_.handler_ == nullptr, "Dequeue is not unregistered");
185 }
186 
187 template <typename T>
RegisterEnqueue(Handler * handler,EnqueueCallback callback)188 void Queue<T>::RegisterEnqueue(Handler* handler, EnqueueCallback callback) {
189   std::lock_guard<std::mutex> lock(mutex_);
190   log::assert_that(enqueue_.handler_ == nullptr, "assert failed: enqueue_.handler_ == nullptr");
191   log::assert_that(enqueue_.reactable_ == nullptr, "assert failed: enqueue_.reactable_ == nullptr");
192   enqueue_.handler_ = handler;
193   enqueue_.reactable_ = enqueue_.handler_->thread_->GetReactor()->Register(
194           enqueue_.reactive_semaphore_.GetFd(),
195           base::Bind(&Queue<T>::EnqueueCallbackInternal, base::Unretained(this),
196                      std::move(callback)),
197           base::Closure());
198 }
199 
200 template <typename T>
UnregisterEnqueue()201 void Queue<T>::UnregisterEnqueue() {
202   Reactor* reactor = nullptr;
203   Reactor::Reactable* to_unregister = nullptr;
204   bool wait_for_unregister = false;
205   {
206     std::lock_guard<std::mutex> lock(mutex_);
207     log::assert_that(enqueue_.reactable_ != nullptr,
208                      "assert failed: enqueue_.reactable_ != nullptr");
209     reactor = enqueue_.handler_->thread_->GetReactor();
210     wait_for_unregister = (!enqueue_.handler_->thread_->IsSameThread());
211     to_unregister = enqueue_.reactable_;
212     enqueue_.reactable_ = nullptr;
213     enqueue_.handler_ = nullptr;
214   }
215   reactor->Unregister(to_unregister);
216   if (wait_for_unregister) {
217     reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000));
218   }
219 }
220 
221 template <typename T>
RegisterDequeue(Handler * handler,DequeueCallback callback)222 void Queue<T>::RegisterDequeue(Handler* handler, DequeueCallback callback) {
223   std::lock_guard<std::mutex> lock(mutex_);
224   log::assert_that(dequeue_.handler_ == nullptr, "assert failed: dequeue_.handler_ == nullptr");
225   log::assert_that(dequeue_.reactable_ == nullptr, "assert failed: dequeue_.reactable_ == nullptr");
226   dequeue_.handler_ = handler;
227   dequeue_.reactable_ = dequeue_.handler_->thread_->GetReactor()->Register(
228           dequeue_.reactive_semaphore_.GetFd(), callback, base::Closure());
229 }
230 
231 template <typename T>
UnregisterDequeue()232 void Queue<T>::UnregisterDequeue() {
233   Reactor* reactor = nullptr;
234   Reactor::Reactable* to_unregister = nullptr;
235   bool wait_for_unregister = false;
236   {
237     std::lock_guard<std::mutex> lock(mutex_);
238     log::assert_that(dequeue_.reactable_ != nullptr,
239                      "assert failed: dequeue_.reactable_ != nullptr");
240     reactor = dequeue_.handler_->thread_->GetReactor();
241     wait_for_unregister = (!dequeue_.handler_->thread_->IsSameThread());
242     to_unregister = dequeue_.reactable_;
243     dequeue_.reactable_ = nullptr;
244     dequeue_.handler_ = nullptr;
245   }
246   reactor->Unregister(to_unregister);
247   if (wait_for_unregister) {
248     reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000));
249   }
250 }
251 
252 template <typename T>
TryDequeue()253 std::unique_ptr<T> Queue<T>::TryDequeue() {
254   std::lock_guard<std::mutex> lock(mutex_);
255 
256   if (queue_.empty()) {
257     return nullptr;
258   }
259 
260   dequeue_.reactive_semaphore_.Decrease();
261 
262   std::unique_ptr<T> data = std::move(queue_.front());
263   queue_.pop();
264 
265   enqueue_.reactive_semaphore_.Increase();
266 
267   return data;
268 }
269 
270 template <typename T>
EnqueueCallbackInternal(EnqueueCallback callback)271 void Queue<T>::EnqueueCallbackInternal(EnqueueCallback callback) {
272   std::unique_ptr<T> data = callback.Run();
273   log::assert_that(data != nullptr, "assert failed: data != nullptr");
274   std::lock_guard<std::mutex> lock(mutex_);
275   enqueue_.reactive_semaphore_.Decrease();
276   queue_.push(std::move(data));
277   dequeue_.reactive_semaphore_.Increase();
278 }
279 
280 }  // namespace os
281 }  // namespace bluetooth
282