1 /******************************************************************************
2 *
3 * Copyright 2014 Google, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 ******************************************************************************/
18
19 #include "osi/include/fixed_queue.h"
20
21 #include <bluetooth/log.h>
22 #include <string.h>
23
24 #include <mutex>
25
26 #include "osi/include/allocator.h"
27 #include "osi/include/list.h"
28 #include "osi/include/osi.h"
29 #include "osi/include/reactor.h"
30 #include "osi/semaphore.h"
31
32 using namespace bluetooth;
33
34 typedef struct fixed_queue_t {
35 list_t* list;
36 semaphore_t* enqueue_sem;
37 semaphore_t* dequeue_sem;
38 std::mutex* mutex;
39 size_t capacity;
40
41 reactor_object_t* dequeue_object;
42 fixed_queue_cb dequeue_ready;
43 void* dequeue_context;
44 } fixed_queue_t;
45
46 static void internal_dequeue_ready(void* context);
47
fixed_queue_new(size_t capacity)48 fixed_queue_t* fixed_queue_new(size_t capacity) {
49 fixed_queue_t* ret = static_cast<fixed_queue_t*>(osi_calloc(sizeof(fixed_queue_t)));
50
51 ret->mutex = new std::mutex;
52 ret->capacity = capacity;
53
54 ret->list = list_new(NULL);
55 if (!ret->list) {
56 goto error;
57 }
58
59 ret->enqueue_sem = semaphore_new(capacity);
60 if (!ret->enqueue_sem) {
61 goto error;
62 }
63
64 ret->dequeue_sem = semaphore_new(0);
65 if (!ret->dequeue_sem) {
66 goto error;
67 }
68
69 return ret;
70
71 error:
72 fixed_queue_free(ret, NULL);
73 return NULL;
74 }
75
fixed_queue_free(fixed_queue_t * queue,fixed_queue_free_cb free_cb)76 void fixed_queue_free(fixed_queue_t* queue, fixed_queue_free_cb free_cb) {
77 if (!queue) {
78 return;
79 }
80
81 fixed_queue_unregister_dequeue(queue);
82
83 if (free_cb) {
84 for (const list_node_t* node = list_begin(queue->list); node != list_end(queue->list);
85 node = list_next(node)) {
86 free_cb(list_node(node));
87 }
88 }
89
90 list_free(queue->list);
91 semaphore_free(queue->enqueue_sem);
92 semaphore_free(queue->dequeue_sem);
93 delete queue->mutex;
94 osi_free(queue);
95 }
96
fixed_queue_flush(fixed_queue_t * queue,fixed_queue_free_cb free_cb)97 void fixed_queue_flush(fixed_queue_t* queue, fixed_queue_free_cb free_cb) {
98 if (!queue) {
99 return;
100 }
101
102 while (!fixed_queue_is_empty(queue)) {
103 void* data = fixed_queue_try_dequeue(queue);
104 if (free_cb != NULL) {
105 free_cb(data);
106 }
107 }
108 }
109
fixed_queue_is_empty(fixed_queue_t * queue)110 bool fixed_queue_is_empty(fixed_queue_t* queue) {
111 if (queue == NULL) {
112 return true;
113 }
114
115 std::lock_guard<std::mutex> lock(*queue->mutex);
116 return list_is_empty(queue->list);
117 }
118
fixed_queue_length(fixed_queue_t * queue)119 size_t fixed_queue_length(fixed_queue_t* queue) {
120 if (queue == NULL) {
121 return 0;
122 }
123
124 std::lock_guard<std::mutex> lock(*queue->mutex);
125 return list_length(queue->list);
126 }
127
fixed_queue_capacity(fixed_queue_t * queue)128 size_t fixed_queue_capacity(fixed_queue_t* queue) {
129 log::assert_that(queue != NULL, "assert failed: queue != NULL");
130
131 return queue->capacity;
132 }
133
fixed_queue_enqueue(fixed_queue_t * queue,void * data)134 void fixed_queue_enqueue(fixed_queue_t* queue, void* data) {
135 log::assert_that(queue != NULL, "assert failed: queue != NULL");
136 log::assert_that(data != NULL, "assert failed: data != NULL");
137
138 semaphore_wait(queue->enqueue_sem);
139
140 {
141 std::lock_guard<std::mutex> lock(*queue->mutex);
142 list_append(queue->list, data);
143 }
144
145 semaphore_post(queue->dequeue_sem);
146 }
147
fixed_queue_dequeue(fixed_queue_t * queue)148 void* fixed_queue_dequeue(fixed_queue_t* queue) {
149 log::assert_that(queue != NULL, "assert failed: queue != NULL");
150
151 semaphore_wait(queue->dequeue_sem);
152
153 void* ret = NULL;
154 {
155 std::lock_guard<std::mutex> lock(*queue->mutex);
156 ret = list_front(queue->list);
157 list_remove(queue->list, ret);
158 }
159
160 semaphore_post(queue->enqueue_sem);
161
162 return ret;
163 }
164
fixed_queue_try_enqueue(fixed_queue_t * queue,void * data)165 bool fixed_queue_try_enqueue(fixed_queue_t* queue, void* data) {
166 log::assert_that(queue != NULL, "assert failed: queue != NULL");
167 log::assert_that(data != NULL, "assert failed: data != NULL");
168
169 if (!semaphore_try_wait(queue->enqueue_sem)) {
170 return false;
171 }
172
173 {
174 std::lock_guard<std::mutex> lock(*queue->mutex);
175 list_append(queue->list, data);
176 }
177
178 semaphore_post(queue->dequeue_sem);
179 return true;
180 }
181
fixed_queue_try_dequeue(fixed_queue_t * queue)182 void* fixed_queue_try_dequeue(fixed_queue_t* queue) {
183 if (queue == NULL) {
184 return NULL;
185 }
186
187 if (!semaphore_try_wait(queue->dequeue_sem)) {
188 return NULL;
189 }
190
191 void* ret = NULL;
192 {
193 std::lock_guard<std::mutex> lock(*queue->mutex);
194 ret = list_front(queue->list);
195 list_remove(queue->list, ret);
196 }
197
198 semaphore_post(queue->enqueue_sem);
199
200 return ret;
201 }
202
fixed_queue_try_peek_first(fixed_queue_t * queue)203 void* fixed_queue_try_peek_first(fixed_queue_t* queue) {
204 if (queue == NULL) {
205 return NULL;
206 }
207
208 std::lock_guard<std::mutex> lock(*queue->mutex);
209 return list_is_empty(queue->list) ? NULL : list_front(queue->list);
210 }
211
fixed_queue_try_peek_last(fixed_queue_t * queue)212 void* fixed_queue_try_peek_last(fixed_queue_t* queue) {
213 if (queue == NULL) {
214 return NULL;
215 }
216
217 std::lock_guard<std::mutex> lock(*queue->mutex);
218 return list_is_empty(queue->list) ? NULL : list_back(queue->list);
219 }
220
fixed_queue_try_remove_from_queue(fixed_queue_t * queue,void * data)221 void* fixed_queue_try_remove_from_queue(fixed_queue_t* queue, void* data) {
222 if (queue == NULL) {
223 return NULL;
224 }
225
226 bool removed = false;
227 {
228 std::lock_guard<std::mutex> lock(*queue->mutex);
229 if (list_contains(queue->list, data) && semaphore_try_wait(queue->dequeue_sem)) {
230 removed = list_remove(queue->list, data);
231 log::assert_that(removed, "assert failed: removed");
232 }
233 }
234
235 if (removed) {
236 semaphore_post(queue->enqueue_sem);
237 return data;
238 }
239 return NULL;
240 }
241
fixed_queue_get_list(fixed_queue_t * queue)242 list_t* fixed_queue_get_list(fixed_queue_t* queue) {
243 log::assert_that(queue != NULL, "assert failed: queue != NULL");
244
245 // NOTE: Using the list in this way is not thread-safe.
246 // Using this list in any context where threads can call other functions
247 // to the queue can break our assumptions and the queue in general.
248 return queue->list;
249 }
250
fixed_queue_get_dequeue_fd(const fixed_queue_t * queue)251 int fixed_queue_get_dequeue_fd(const fixed_queue_t* queue) {
252 log::assert_that(queue != NULL, "assert failed: queue != NULL");
253 return semaphore_get_fd(queue->dequeue_sem);
254 }
255
fixed_queue_get_enqueue_fd(const fixed_queue_t * queue)256 int fixed_queue_get_enqueue_fd(const fixed_queue_t* queue) {
257 log::assert_that(queue != NULL, "assert failed: queue != NULL");
258 return semaphore_get_fd(queue->enqueue_sem);
259 }
260
fixed_queue_register_dequeue(fixed_queue_t * queue,reactor_t * reactor,fixed_queue_cb ready_cb,void * context)261 void fixed_queue_register_dequeue(fixed_queue_t* queue, reactor_t* reactor, fixed_queue_cb ready_cb,
262 void* context) {
263 log::assert_that(queue != NULL, "assert failed: queue != NULL");
264 log::assert_that(reactor != NULL, "assert failed: reactor != NULL");
265 log::assert_that(ready_cb != NULL, "assert failed: ready_cb != NULL");
266
267 // Make sure we're not already registered
268 fixed_queue_unregister_dequeue(queue);
269
270 queue->dequeue_ready = ready_cb;
271 queue->dequeue_context = context;
272 queue->dequeue_object = reactor_register(reactor, fixed_queue_get_dequeue_fd(queue), queue,
273 internal_dequeue_ready, NULL);
274 }
275
fixed_queue_unregister_dequeue(fixed_queue_t * queue)276 void fixed_queue_unregister_dequeue(fixed_queue_t* queue) {
277 log::assert_that(queue != NULL, "assert failed: queue != NULL");
278
279 if (queue->dequeue_object) {
280 reactor_unregister(queue->dequeue_object);
281 queue->dequeue_object = NULL;
282 }
283 }
284
internal_dequeue_ready(void * context)285 static void internal_dequeue_ready(void* context) {
286 log::assert_that(context != NULL, "assert failed: context != NULL");
287
288 fixed_queue_t* queue = static_cast<fixed_queue_t*>(context);
289 queue->dequeue_ready(queue, queue->dequeue_context);
290 }
291