1 /******************************************************************************
2  *
3  *  Copyright 2014 Google, Inc.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at:
8  *
9  *  http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  ******************************************************************************/
18 
19 #include "osi/include/fixed_queue.h"
20 
21 #include <bluetooth/log.h>
22 #include <string.h>
23 
24 #include <mutex>
25 
26 #include "osi/include/allocator.h"
27 #include "osi/include/list.h"
28 #include "osi/include/osi.h"
29 #include "osi/include/reactor.h"
30 #include "osi/semaphore.h"
31 
32 using namespace bluetooth;
33 
34 typedef struct fixed_queue_t {
35   list_t* list;
36   semaphore_t* enqueue_sem;
37   semaphore_t* dequeue_sem;
38   std::mutex* mutex;
39   size_t capacity;
40 
41   reactor_object_t* dequeue_object;
42   fixed_queue_cb dequeue_ready;
43   void* dequeue_context;
44 } fixed_queue_t;
45 
// Forward declaration: fixed_queue_register_dequeue() hands this function to
// reactor_register() before its definition appears in the file.
static void internal_dequeue_ready(void* context);
47 
fixed_queue_new(size_t capacity)48 fixed_queue_t* fixed_queue_new(size_t capacity) {
49   fixed_queue_t* ret = static_cast<fixed_queue_t*>(osi_calloc(sizeof(fixed_queue_t)));
50 
51   ret->mutex = new std::mutex;
52   ret->capacity = capacity;
53 
54   ret->list = list_new(NULL);
55   if (!ret->list) {
56     goto error;
57   }
58 
59   ret->enqueue_sem = semaphore_new(capacity);
60   if (!ret->enqueue_sem) {
61     goto error;
62   }
63 
64   ret->dequeue_sem = semaphore_new(0);
65   if (!ret->dequeue_sem) {
66     goto error;
67   }
68 
69   return ret;
70 
71 error:
72   fixed_queue_free(ret, NULL);
73   return NULL;
74 }
75 
fixed_queue_free(fixed_queue_t * queue,fixed_queue_free_cb free_cb)76 void fixed_queue_free(fixed_queue_t* queue, fixed_queue_free_cb free_cb) {
77   if (!queue) {
78     return;
79   }
80 
81   fixed_queue_unregister_dequeue(queue);
82 
83   if (free_cb) {
84     for (const list_node_t* node = list_begin(queue->list); node != list_end(queue->list);
85          node = list_next(node)) {
86       free_cb(list_node(node));
87     }
88   }
89 
90   list_free(queue->list);
91   semaphore_free(queue->enqueue_sem);
92   semaphore_free(queue->dequeue_sem);
93   delete queue->mutex;
94   osi_free(queue);
95 }
96 
fixed_queue_flush(fixed_queue_t * queue,fixed_queue_free_cb free_cb)97 void fixed_queue_flush(fixed_queue_t* queue, fixed_queue_free_cb free_cb) {
98   if (!queue) {
99     return;
100   }
101 
102   while (!fixed_queue_is_empty(queue)) {
103     void* data = fixed_queue_try_dequeue(queue);
104     if (free_cb != NULL) {
105       free_cb(data);
106     }
107   }
108 }
109 
fixed_queue_is_empty(fixed_queue_t * queue)110 bool fixed_queue_is_empty(fixed_queue_t* queue) {
111   if (queue == NULL) {
112     return true;
113   }
114 
115   std::lock_guard<std::mutex> lock(*queue->mutex);
116   return list_is_empty(queue->list);
117 }
118 
fixed_queue_length(fixed_queue_t * queue)119 size_t fixed_queue_length(fixed_queue_t* queue) {
120   if (queue == NULL) {
121     return 0;
122   }
123 
124   std::lock_guard<std::mutex> lock(*queue->mutex);
125   return list_length(queue->list);
126 }
127 
fixed_queue_capacity(fixed_queue_t * queue)128 size_t fixed_queue_capacity(fixed_queue_t* queue) {
129   log::assert_that(queue != NULL, "assert failed: queue != NULL");
130 
131   return queue->capacity;
132 }
133 
fixed_queue_enqueue(fixed_queue_t * queue,void * data)134 void fixed_queue_enqueue(fixed_queue_t* queue, void* data) {
135   log::assert_that(queue != NULL, "assert failed: queue != NULL");
136   log::assert_that(data != NULL, "assert failed: data != NULL");
137 
138   semaphore_wait(queue->enqueue_sem);
139 
140   {
141     std::lock_guard<std::mutex> lock(*queue->mutex);
142     list_append(queue->list, data);
143   }
144 
145   semaphore_post(queue->dequeue_sem);
146 }
147 
fixed_queue_dequeue(fixed_queue_t * queue)148 void* fixed_queue_dequeue(fixed_queue_t* queue) {
149   log::assert_that(queue != NULL, "assert failed: queue != NULL");
150 
151   semaphore_wait(queue->dequeue_sem);
152 
153   void* ret = NULL;
154   {
155     std::lock_guard<std::mutex> lock(*queue->mutex);
156     ret = list_front(queue->list);
157     list_remove(queue->list, ret);
158   }
159 
160   semaphore_post(queue->enqueue_sem);
161 
162   return ret;
163 }
164 
fixed_queue_try_enqueue(fixed_queue_t * queue,void * data)165 bool fixed_queue_try_enqueue(fixed_queue_t* queue, void* data) {
166   log::assert_that(queue != NULL, "assert failed: queue != NULL");
167   log::assert_that(data != NULL, "assert failed: data != NULL");
168 
169   if (!semaphore_try_wait(queue->enqueue_sem)) {
170     return false;
171   }
172 
173   {
174     std::lock_guard<std::mutex> lock(*queue->mutex);
175     list_append(queue->list, data);
176   }
177 
178   semaphore_post(queue->dequeue_sem);
179   return true;
180 }
181 
fixed_queue_try_dequeue(fixed_queue_t * queue)182 void* fixed_queue_try_dequeue(fixed_queue_t* queue) {
183   if (queue == NULL) {
184     return NULL;
185   }
186 
187   if (!semaphore_try_wait(queue->dequeue_sem)) {
188     return NULL;
189   }
190 
191   void* ret = NULL;
192   {
193     std::lock_guard<std::mutex> lock(*queue->mutex);
194     ret = list_front(queue->list);
195     list_remove(queue->list, ret);
196   }
197 
198   semaphore_post(queue->enqueue_sem);
199 
200   return ret;
201 }
202 
fixed_queue_try_peek_first(fixed_queue_t * queue)203 void* fixed_queue_try_peek_first(fixed_queue_t* queue) {
204   if (queue == NULL) {
205     return NULL;
206   }
207 
208   std::lock_guard<std::mutex> lock(*queue->mutex);
209   return list_is_empty(queue->list) ? NULL : list_front(queue->list);
210 }
211 
fixed_queue_try_peek_last(fixed_queue_t * queue)212 void* fixed_queue_try_peek_last(fixed_queue_t* queue) {
213   if (queue == NULL) {
214     return NULL;
215   }
216 
217   std::lock_guard<std::mutex> lock(*queue->mutex);
218   return list_is_empty(queue->list) ? NULL : list_back(queue->list);
219 }
220 
fixed_queue_try_remove_from_queue(fixed_queue_t * queue,void * data)221 void* fixed_queue_try_remove_from_queue(fixed_queue_t* queue, void* data) {
222   if (queue == NULL) {
223     return NULL;
224   }
225 
226   bool removed = false;
227   {
228     std::lock_guard<std::mutex> lock(*queue->mutex);
229     if (list_contains(queue->list, data) && semaphore_try_wait(queue->dequeue_sem)) {
230       removed = list_remove(queue->list, data);
231       log::assert_that(removed, "assert failed: removed");
232     }
233   }
234 
235   if (removed) {
236     semaphore_post(queue->enqueue_sem);
237     return data;
238   }
239   return NULL;
240 }
241 
fixed_queue_get_list(fixed_queue_t * queue)242 list_t* fixed_queue_get_list(fixed_queue_t* queue) {
243   log::assert_that(queue != NULL, "assert failed: queue != NULL");
244 
245   // NOTE: Using the list in this way is not thread-safe.
246   // Using this list in any context where threads can call other functions
247   // to the queue can break our assumptions and the queue in general.
248   return queue->list;
249 }
250 
fixed_queue_get_dequeue_fd(const fixed_queue_t * queue)251 int fixed_queue_get_dequeue_fd(const fixed_queue_t* queue) {
252   log::assert_that(queue != NULL, "assert failed: queue != NULL");
253   return semaphore_get_fd(queue->dequeue_sem);
254 }
255 
fixed_queue_get_enqueue_fd(const fixed_queue_t * queue)256 int fixed_queue_get_enqueue_fd(const fixed_queue_t* queue) {
257   log::assert_that(queue != NULL, "assert failed: queue != NULL");
258   return semaphore_get_fd(queue->enqueue_sem);
259 }
260 
fixed_queue_register_dequeue(fixed_queue_t * queue,reactor_t * reactor,fixed_queue_cb ready_cb,void * context)261 void fixed_queue_register_dequeue(fixed_queue_t* queue, reactor_t* reactor, fixed_queue_cb ready_cb,
262                                   void* context) {
263   log::assert_that(queue != NULL, "assert failed: queue != NULL");
264   log::assert_that(reactor != NULL, "assert failed: reactor != NULL");
265   log::assert_that(ready_cb != NULL, "assert failed: ready_cb != NULL");
266 
267   // Make sure we're not already registered
268   fixed_queue_unregister_dequeue(queue);
269 
270   queue->dequeue_ready = ready_cb;
271   queue->dequeue_context = context;
272   queue->dequeue_object = reactor_register(reactor, fixed_queue_get_dequeue_fd(queue), queue,
273                                            internal_dequeue_ready, NULL);
274 }
275 
fixed_queue_unregister_dequeue(fixed_queue_t * queue)276 void fixed_queue_unregister_dequeue(fixed_queue_t* queue) {
277   log::assert_that(queue != NULL, "assert failed: queue != NULL");
278 
279   if (queue->dequeue_object) {
280     reactor_unregister(queue->dequeue_object);
281     queue->dequeue_object = NULL;
282   }
283 }
284 
internal_dequeue_ready(void * context)285 static void internal_dequeue_ready(void* context) {
286   log::assert_that(context != NULL, "assert failed: context != NULL");
287 
288   fixed_queue_t* queue = static_cast<fixed_queue_t*>(context);
289   queue->dequeue_ready(queue, queue->dequeue_context);
290 }
291