1 /*
2 * Copyright 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "os/queue.h"
18
19 #include <sys/eventfd.h>
20
21 #include <atomic>
22 #include <chrono>
23 #include <future>
24 #include <unordered_map>
25
26 #include "common/bind.h"
27 #include "gtest/gtest.h"
28 #include "os/reactor.h"
29
30 using namespace std::chrono_literals;
31
32 namespace bluetooth {
33 namespace os {
34 namespace {
35
// Capacity given to the queue under test in most test cases.
constexpr int kQueueSize{10};
// Half and double of that capacity, used to partially fill or to over-fill the queue.
constexpr int kHalfOfQueueSize{kQueueSize / 2};
constexpr int kDoubleOfQueueSize{kQueueSize * 2};
// Capacity of one: a single element makes such a queue both full and non-empty.
constexpr int kQueueSizeOne{1};
40
41 class QueueTest : public ::testing::Test {
42 protected:
SetUp()43 void SetUp() override {
44 enqueue_thread_ = new Thread("enqueue_thread", Thread::Priority::NORMAL);
45 enqueue_handler_ = new Handler(enqueue_thread_);
46 dequeue_thread_ = new Thread("dequeue_thread", Thread::Priority::NORMAL);
47 dequeue_handler_ = new Handler(dequeue_thread_);
48 }
TearDown()49 void TearDown() override {
50 enqueue_handler_->Clear();
51 delete enqueue_handler_;
52 delete enqueue_thread_;
53 dequeue_handler_->Clear();
54 delete dequeue_handler_;
55 delete dequeue_thread_;
56 enqueue_handler_ = nullptr;
57 enqueue_thread_ = nullptr;
58 dequeue_handler_ = nullptr;
59 dequeue_thread_ = nullptr;
60 }
61
62 Thread* enqueue_thread_;
63 Handler* enqueue_handler_;
64 Thread* dequeue_thread_;
65 Handler* dequeue_handler_;
66
sync_enqueue_handler()67 void sync_enqueue_handler() {
68 log::assert_that(enqueue_thread_ != nullptr, "assert failed: enqueue_thread_ != nullptr");
69 log::assert_that(enqueue_thread_->GetReactor()->WaitForIdle(2s),
70 "assert failed: enqueue_thread_->GetReactor()->WaitForIdle(2s)");
71 }
72 };
73
74 class TestEnqueueEnd {
75 public:
TestEnqueueEnd(Queue<std::string> * queue,Handler * handler)76 explicit TestEnqueueEnd(Queue<std::string>* queue, Handler* handler)
77 : count(0), handler_(handler), queue_(queue), delay_(0) {}
78
~TestEnqueueEnd()79 ~TestEnqueueEnd() {}
80
RegisterEnqueue(std::unordered_map<int,std::promise<int>> * promise_map)81 void RegisterEnqueue(std::unordered_map<int, std::promise<int>>* promise_map) {
82 promise_map_ = promise_map;
83 handler_->Post(
84 common::BindOnce(&TestEnqueueEnd::handle_register_enqueue, common::Unretained(this)));
85 }
86
UnregisterEnqueue()87 void UnregisterEnqueue() {
88 std::promise<void> promise;
89 auto future = promise.get_future();
90
91 handler_->Post(common::BindOnce(&TestEnqueueEnd::handle_unregister_enqueue,
92 common::Unretained(this), std::move(promise)));
93 future.wait();
94 }
95
EnqueueCallbackForTest()96 std::unique_ptr<std::string> EnqueueCallbackForTest() {
97 if (delay_ != 0) {
98 std::this_thread::sleep_for(std::chrono::milliseconds(delay_));
99 }
100
101 count++;
102 std::unique_ptr<std::string> data = std::move(buffer_.front());
103 buffer_.pop();
104 std::string copy = *data;
105 if (buffer_.empty()) {
106 queue_->UnregisterEnqueue();
107 }
108
109 auto key = buffer_.size();
110 auto node = promise_map_->extract(key);
111 if (node) {
112 node.mapped().set_value(key);
113 }
114
115 return data;
116 }
117
setDelay(int value)118 void setDelay(int value) { delay_ = value; }
119
120 std::queue<std::unique_ptr<std::string>> buffer_;
121 int count;
122
123 private:
124 Handler* handler_;
125 Queue<std::string>* queue_;
126 std::unordered_map<int, std::promise<int>>* promise_map_;
127 int delay_;
128
handle_register_enqueue()129 void handle_register_enqueue() {
130 queue_->RegisterEnqueue(handler_, common::Bind(&TestEnqueueEnd::EnqueueCallbackForTest,
131 common::Unretained(this)));
132 }
133
handle_unregister_enqueue(std::promise<void> promise)134 void handle_unregister_enqueue(std::promise<void> promise) {
135 queue_->UnregisterEnqueue();
136 promise.set_value();
137 }
138 };
139
140 class TestDequeueEnd {
141 public:
TestDequeueEnd(Queue<std::string> * queue,Handler * handler,int capacity)142 explicit TestDequeueEnd(Queue<std::string>* queue, Handler* handler, int capacity)
143 : count(0), handler_(handler), queue_(queue), capacity_(capacity), delay_(0) {}
144
~TestDequeueEnd()145 ~TestDequeueEnd() {}
146
RegisterDequeue(std::unordered_map<int,std::promise<int>> * promise_map)147 void RegisterDequeue(std::unordered_map<int, std::promise<int>>* promise_map) {
148 promise_map_ = promise_map;
149 handler_->Post(
150 common::BindOnce(&TestDequeueEnd::handle_register_dequeue, common::Unretained(this)));
151 }
152
UnregisterDequeue()153 void UnregisterDequeue() {
154 std::promise<void> promise;
155 auto future = promise.get_future();
156
157 handler_->Post(common::BindOnce(&TestDequeueEnd::handle_unregister_dequeue,
158 common::Unretained(this), std::move(promise)));
159 future.wait();
160 }
161
DequeueCallbackForTest()162 void DequeueCallbackForTest() {
163 if (delay_ != 0) {
164 std::this_thread::sleep_for(std::chrono::milliseconds(delay_));
165 }
166
167 count++;
168 std::unique_ptr<std::string> data = queue_->TryDequeue();
169 buffer_.push(std::move(data));
170
171 if (buffer_.size() == (size_t)capacity_) {
172 queue_->UnregisterDequeue();
173 }
174
175 auto key = buffer_.size();
176 auto node = promise_map_->extract(key);
177 if (node) {
178 node.mapped().set_value(key);
179 }
180 }
181
setDelay(int value)182 void setDelay(int value) { delay_ = value; }
183
184 std::queue<std::unique_ptr<std::string>> buffer_;
185 int count;
186
187 private:
188 Handler* handler_;
189 Queue<std::string>* queue_;
190 std::unordered_map<int, std::promise<int>>* promise_map_;
191 int capacity_;
192 int delay_;
193
handle_register_dequeue()194 void handle_register_dequeue() {
195 queue_->RegisterDequeue(handler_, common::Bind(&TestDequeueEnd::DequeueCallbackForTest,
196 common::Unretained(this)));
197 }
198
handle_unregister_dequeue(std::promise<void> promise)199 void handle_unregister_dequeue(std::promise<void> promise) {
200 queue_->UnregisterDequeue();
201 promise.set_value();
202 }
203 };
204
// Enqueue end level : 0 -> queue is full, 1 -> queue isn't full
// Dequeue end level : 0 -> queue is empty, 1 -> queue isn't empty
207
208 // Test 1 : Queue is empty
209
210 // Enqueue end level : 1
211 // Dequeue end level : 0
212 // Test 1-1 EnqueueCallback should continually be invoked when queue isn't full
TEST_F(QueueTest, register_enqueue_with_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);

  // Stage exactly kQueueSize strings in the enqueue end buffer.
  for (int i = 0; i < kQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  EXPECT_EQ(test_enqueue_end.buffer_.size(), static_cast<size_t>(kQueueSize));

  // The promise keyed by 0 is fulfilled by the enqueue callback once the staging buffer is
  // empty, i.e. after every staged string has been handed to the queue.
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promises);
  // get() blocks until the callback fulfils the promise.
  EXPECT_EQ(buffer_drained.get(), 0);

  // NOTE(review): presumably lets the enqueue thread finish its last callback before the queue
  // and the enqueue end go out of scope - confirm.
  const auto settle_time = std::chrono::milliseconds(20);
  std::this_thread::sleep_for(settle_time);
}
234
235 // Enqueue end level : 1
236 // Dequeue end level : 0
237 // Test 1-2 DequeueCallback shouldn't be invoked when queue is empty
TEST_F(QueueTest, register_dequeue_with_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);

  // Nothing was ever enqueued, so registering must not trigger the dequeue callback.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  test_dequeue_end.RegisterDequeue(&dequeue_promises);

  // Leave some time for a (wrong) callback invocation to show up in the counter.
  const auto settle_time = std::chrono::milliseconds(20);
  std::this_thread::sleep_for(settle_time);
  EXPECT_EQ(test_dequeue_end.count, 0);

  test_dequeue_end.UnregisterDequeue();
}
250
251 // Test 2 : Queue is full
252
253 // Enqueue end level : 0
254 // Dequeue end level : 1
255 // Test 2-1 EnqueueCallback shouldn't be invoked when queue is full
TEST_F(QueueTest, register_enqueue_with_full_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);

  // Fill the queue to capacity: stage kQueueSize strings and wait until all were enqueued.
  for (int i = 0; i < kQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promises);
  // get() blocks until the callback fulfils the promise.
  EXPECT_EQ(buffer_drained.get(), 0);

  // Stage some more strings and register the enqueue callback again.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  test_enqueue_end.RegisterEnqueue(&enqueue_promises);

  // The queue is full, so the callback must not run: the staged strings stay where they are
  // and the invocation counter does not move past kQueueSize.
  const auto settle_time = std::chrono::milliseconds(20);
  std::this_thread::sleep_for(settle_time);
  EXPECT_EQ(test_enqueue_end.buffer_.size(), static_cast<size_t>(kHalfOfQueueSize));
  EXPECT_EQ(test_enqueue_end.count, kQueueSize);

  test_enqueue_end.UnregisterEnqueue();
}
287
288 // Enqueue end level : 0
289 // Dequeue end level : 1
290 // Test 2-2 DequeueCallback should continually be invoked when queue isn't empty
TEST_F(QueueTest, register_dequeue_with_full_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // Fill the queue to capacity: stage kQueueSize strings and wait until all were enqueued.
  for (int i = 0; i < kQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promises);
  EXPECT_EQ(buffer_drained.get(), 0);

  // Register the dequeue callback; the promise keyed by kQueueSize is fulfilled once that many
  // entries have been stored in the dequeue end buffer.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto all_dequeued = dequeue_promises.try_emplace(kQueueSize).first->second.get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promises);
  EXPECT_EQ(all_dequeued.get(), kQueueSize);

  test_dequeue_end.UnregisterDequeue();
}
320
321 // Test 3 : Queue is non-empty and non-full
322
323 // Enqueue end level : 1
324 // Dequeue end level : 1
325 // Test 3-1 Register enqueue with half empty queue, EnqueueCallback should continually be invoked
TEST_F(QueueTest, register_enqueue_with_half_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);

  // First round: put kHalfOfQueueSize strings into the queue so it is half full.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promises);
  EXPECT_EQ(buffer_drained.get(), 0);

  // Second round: stage another kHalfOfQueueSize strings.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // The first promise was consumed by the callback, so insert a fresh one for key 0, register
  // again and expect the staged strings to move into the queue.
  buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promises);
  EXPECT_EQ(buffer_drained.get(), 0);

  sync_enqueue_handler();
}
358
359 // Enqueue end level : 1
360 // Dequeue end level : 1
361 // Test 3-2 Register dequeue with half empty queue, DequeueCallback should continually be invoked
TEST_F(QueueTest, register_dequeue_with_half_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);

  // Put kHalfOfQueueSize strings into the queue so it is half full.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promises);
  EXPECT_EQ(buffer_drained.get(), 0);

  // Register the dequeue callback and wait until kHalfOfQueueSize entries have reached the
  // dequeue end buffer.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto half_dequeued =
          dequeue_promises.try_emplace(kHalfOfQueueSize).first->second.get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promises);
  EXPECT_EQ(half_dequeued.get(), kHalfOfQueueSize);

  test_dequeue_end.UnregisterDequeue();
}
391
392 // Dynamic level test
393
// Test 4 : Queue becomes full during test, EnqueueCallback should stop being invoked
395
396 // Enqueue end level : 1 -> 0
397 // Dequeue end level : 1
398 // Test 4-1 Queue becomes full due to only register EnqueueCallback
TEST_F(QueueTest, queue_becomes_full_enqueue_callback_only) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);

  // Stage twice as many strings as the queue can hold.
  for (int i = 0; i < kDoubleOfQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // The promise keyed by kQueueSize is fulfilled when kQueueSize strings are left in the staging
  // buffer, i.e. when the other kQueueSize strings have been handed to the queue.
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto queue_filled = enqueue_promises.try_emplace(kQueueSize).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promises);
  EXPECT_EQ(queue_filled.get(), kQueueSize);

  // With the queue full the callback must not run again: the staging buffer keeps kQueueSize
  // strings and the invocation counter stays at kQueueSize.
  const auto settle_time = std::chrono::milliseconds(20);
  std::this_thread::sleep_for(settle_time);
  EXPECT_EQ(test_enqueue_end.buffer_.size(), static_cast<size_t>(kQueueSize));
  EXPECT_EQ(test_enqueue_end.count, kQueueSize);

  test_enqueue_end.UnregisterEnqueue();
}
425
426 // Enqueue end level : 1 -> 0
427 // Dequeue end level : 1
428 // Test 4-2 Queue becomes full due to DequeueCallback unregister during test
TEST_F(QueueTest, queue_becomes_full_dequeue_callback_unregister) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  // The dequeue end unregisters itself after storing kHalfOfQueueSize entries.
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kHalfOfQueueSize);

  // Stage twice as many strings as the queue can hold.
  for (int i = 0; i < kDoubleOfQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Register the dequeue side first.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto half_dequeued =
          dequeue_promises.try_emplace(kHalfOfQueueSize).first->second.get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promises);

  // Then register the enqueue side.
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto half_left_staged =
          enqueue_promises.try_emplace(kHalfOfQueueSize).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promises);

  // The dequeue end stops after kHalfOfQueueSize entries.
  EXPECT_EQ(half_dequeued.get(), kHalfOfQueueSize);

  // The enqueue end goes on until the queue is full again. After that the callback must not
  // run any more, leaving kHalfOfQueueSize strings staged after kQueueSize + kHalfOfQueueSize
  // invocations.
  EXPECT_EQ(half_left_staged.get(), kHalfOfQueueSize);
  const auto settle_time = std::chrono::milliseconds(20);
  std::this_thread::sleep_for(settle_time);
  EXPECT_EQ(test_enqueue_end.buffer_.size(), static_cast<size_t>(kHalfOfQueueSize));
  EXPECT_EQ(test_enqueue_end.count, kQueueSize + kHalfOfQueueSize);

  test_enqueue_end.UnregisterEnqueue();
}
467
468 // Enqueue end level : 1 -> 0
469 // Dequeue end level : 1
470 // Test 4-3 Queue becomes full due to DequeueCallback is slower
TEST_F(QueueTest, queue_becomes_full_dequeue_callback_slower) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // Stage kDoubleOfQueueSize strings, twice what the queue can hold.
  for (int i = 0; i < kDoubleOfQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Slow the dequeue callback down by 20 ms per invocation, then register it. A promise keyed
  // by kHalfOfQueueSize is placed in the map, but its future is never waited on.
  test_dequeue_end.setDelay(20);
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto half_dequeued =
          dequeue_promises.try_emplace(kHalfOfQueueSize).first->second.get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promises);

  // Register the enqueue side.
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promises);

  // Wait until every staged string has been handed to the queue. The queue holds at most
  // kQueueSize of them, so the slow dequeue end is expected to have stored at least
  // kQueueSize - 1 by then.
  // NOTE(review): buffer_ is read here while the dequeue end is still registered, so the
  // dequeue thread may be pushing into it concurrently - confirm this is acceptable.
  EXPECT_EQ(buffer_drained.get(), 0);
  EXPECT_GE(test_dequeue_end.buffer_.size(), static_cast<size_t>(kQueueSize - 1));

  test_dequeue_end.UnregisterDequeue();
}
502
// Enqueue end level : 1 -> 0
// Dequeue end level : 0 -> 1
505 // Test 5 Queue becomes full and non empty at same time.
TEST_F(QueueTest, queue_becomes_full_and_non_empty_at_same_time) {
  // With a capacity of one, a single enqueue makes the queue full and non-empty at once.
  Queue<std::string> queue(kQueueSizeOne);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // Stage kQueueSize strings in the enqueue end buffer.
  for (int i = 0; i < kQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Register the dequeue side; its promise is fulfilled after kQueueSize stored entries.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto all_dequeued = dequeue_promises.try_emplace(kQueueSize).first->second.get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promises);

  // Register the enqueue side. A promise keyed by 0 is placed in the map, but its future is
  // never waited on.
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promises);

  // Every staged string has to travel through the queue into the dequeue end buffer.
  EXPECT_EQ(all_dequeued.get(), kQueueSize);

  test_dequeue_end.UnregisterDequeue();
}
535
// Enqueue end level : 0 -> 1
537 // Dequeue end level : 1
538 // Test 6 Queue becomes not full during test, EnqueueCallback should start to be invoked
TEST_F(QueueTest, queue_becomes_non_full_during_test) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize * 3);

  // Stage twice the queue capacity, so the queue fills up while kQueueSize strings are still
  // waiting in the enqueue end buffer.
  for (int i = 0; i < kDoubleOfQueueSize; i++) {
    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
    test_enqueue_end.buffer_.push(std::move(data));
  }

  // Retrieve both futures before the enqueue callback is registered. Once registered, the
  // callback extracts entries from this map on the enqueue thread, so the test thread must not
  // touch the map again. Looking up key 0 later could also re-create a promise the callback
  // already consumed, and waiting on that one would never return.
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  auto queue_full_future = enqueue_promise_map[kQueueSize].get_future();
  auto buffer_empty_future = enqueue_promise_map[0].get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  queue_full_future.wait();
  EXPECT_EQ(queue_full_future.get(), kQueueSize);

  // Expect kQueueSize strings to stay blocked in the enqueue end buffer while the queue is full
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(test_enqueue_end.buffer_.size(), static_cast<size_t>(kQueueSize));

  // Register dequeue, which makes room in the queue again
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // Expect the enqueue end buffer to drain completely
  buffer_empty_future.wait();
  EXPECT_EQ(buffer_empty_future.get(), 0);

  test_dequeue_end.UnregisterDequeue();
}
574
575 // Enqueue end level : 0 -> 1
576 // Dequeue end level : 1 -> 0
577 // Test 7 Queue becomes non full and empty at same time. (Exactly same as Test 5)
TEST_F(QueueTest, queue_becomes_non_full_and_empty_at_same_time) {
  // With a capacity of one, a single dequeue makes the queue non-full and empty at once.
  Queue<std::string> queue(kQueueSizeOne);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // Stage kQueueSize strings in the enqueue end buffer.
  for (int i = 0; i < kQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Register the dequeue side; its promise is fulfilled after kQueueSize stored entries.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto all_dequeued = dequeue_promises.try_emplace(kQueueSize).first->second.get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promises);

  // Register the enqueue side. A promise keyed by 0 is placed in the map, but its future is
  // never waited on.
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promises);

  // Every staged string has to travel through the queue into the dequeue end buffer.
  EXPECT_EQ(all_dequeued.get(), kQueueSize);

  test_dequeue_end.UnregisterDequeue();
}
607
// Test 8 : Queue becomes empty during test, DequeueCallback should stop being invoked
609
610 // Enqueue end level : 1
611 // Dequeue end level : 1 -> 0
612 // Test 8-1 Queue becomes empty due to only register DequeueCallback
TEST_F(QueueTest, queue_becomes_empty_dequeue_callback_only) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kHalfOfQueueSize);

  // Put kHalfOfQueueSize strings into the queue so it is half full.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promises);
  EXPECT_EQ(buffer_drained.get(), 0);

  // Register the dequeue callback and wait until all kHalfOfQueueSize entries have reached the
  // dequeue end buffer.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto half_dequeued =
          dequeue_promises.try_emplace(kHalfOfQueueSize).first->second.get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promises);
  EXPECT_EQ(half_dequeued.get(), kHalfOfQueueSize);

  // From here on the dequeue callback must not be invoked again.
  const auto settle_time = std::chrono::milliseconds(20);
  std::this_thread::sleep_for(settle_time);
  EXPECT_EQ(test_dequeue_end.count, kHalfOfQueueSize);
}
644
645 // Enqueue end level : 1
646 // Dequeue end level : 1 -> 0
647 // Test 8-2 Queue becomes empty due to EnqueueCallback unregister during test
TEST_F(QueueTest, queue_becomes_empty_enqueue_callback_unregister) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);

  // First round: put kHalfOfQueueSize strings into the queue so it is half full.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises.try_emplace(0).first->second.get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promises);
  EXPECT_EQ(buffer_drained.get(), 0);

  // Second round: stage another kHalfOfQueueSize strings and register the enqueue callback
  // again. The enqueue end unregisters itself once its buffer is empty.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  test_enqueue_end.RegisterEnqueue(&enqueue_promises);

  // Register the dequeue callback and wait until kQueueSize entries have reached the dequeue
  // end buffer.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto all_dequeued = dequeue_promises.try_emplace(kQueueSize).first->second.get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promises);
  EXPECT_EQ(all_dequeued.get(), kQueueSize);

  // From here on the dequeue callback must not be invoked again.
  const auto settle_time = std::chrono::milliseconds(20);
  std::this_thread::sleep_for(settle_time);
  EXPECT_EQ(test_dequeue_end.count, kQueueSize);
}
686
687 // Enqueue end level : 1
688 // Dequeue end level : 0 -> 1
689 // Test 9 Queue becomes not empty during test, DequeueCallback should start to be invoked
TEST_F(QueueTest, queue_becomes_non_empty_during_test) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);

  // Retrieve the future before the dequeue callback is registered. Once registered, the callback
  // extracts entries from this map on the dequeue thread, so the test thread must not touch the
  // map again. Looking up kQueueSize later could also re-create a promise the callback already
  // consumed, and waiting on that one would never return.
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // push kQueueSize data to enqueue end buffer and register enqueue
  for (int i = 0; i < kQueueSize; i++) {
    std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
    test_enqueue_end.buffer_.push(std::move(data));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);

  // Expect kQueueSize data move to dequeue end buffer
  dequeue_future.wait();
  EXPECT_EQ(dequeue_future.get(), kQueueSize);
}
714
TEST_F(QueueTest, pass_smart_pointer_and_unregister) {
  // Heap-allocated because the queue is deleted from a task on the dequeue handler below.
  auto* queue = new Queue<std::string>(kQueueSize);

  // Enqueue side: the callback unregisters itself and enqueues a copy of the shared string.
  std::string expected = "Valid String";
  auto payload = std::make_shared<std::string>(expected);
  queue->RegisterEnqueue(
          enqueue_handler_,
          common::Bind(
                  [](Queue<std::string>* target, std::shared_ptr<std::string> source) {
                    target->UnregisterEnqueue();
                    return std::make_unique<std::string>(*source);
                  },
                  common::Unretained(queue), payload));

  // Dequeue side: the callback unregisters itself and checks the dequeued string.
  queue->RegisterDequeue(
          dequeue_handler_,
          common::Bind(
                  [](Queue<std::string>* target, std::string want) {
                    target->UnregisterDequeue();
                    auto got = *target->TryDequeue();
                    ASSERT_EQ(got, want);
                  },
                  common::Unretained(queue), expected));

  // Post a task to the enqueue handler that in turn posts a task to the dequeue handler; that
  // second task deletes the queue and signals the test thread.
  std::promise<void> queue_deleted;
  auto done = queue_deleted.get_future();

  enqueue_handler_->Post(common::BindOnce(
          [](os::Handler* dequeue_handler, Queue<std::string>* target,
             std::promise<void>* notify) {
            dequeue_handler->Post(common::BindOnce(
                    [](Queue<std::string>* doomed, std::promise<void>* signal) {
                      delete doomed;
                      signal->set_value();
                    },
                    common::Unretained(target), common::Unretained(notify)));
          },
          common::Unretained(dequeue_handler_), common::Unretained(queue),
          common::Unretained(&queue_deleted)));
  done.wait();
}
756
// Enqueue callback used by the unregister tests: sleeps for 100 ms, increments |*to_increase|
// and returns a newly allocated "Hello" string.
std::unique_ptr<std::string> sleep_and_enqueue_callback(int* to_increase) {
  const auto callback_duration = std::chrono::milliseconds(100);
  std::this_thread::sleep_for(callback_duration);
  ++*to_increase;
  return std::make_unique<std::string>("Hello");
}
762
// Registers an enqueue callback that sleeps for 100 ms per invocation, then
// calls UnregisterEnqueue() from the test thread after 50 ms. The counter is
// expected to read exactly 101 by the time UnregisterEnqueue() has returned,
// i.e. one callback invocation has completed and no further one has started.
TEST_F(QueueTest, unregister_enqueue_and_wait) {
  Queue<std::string> queue(10);
  // The counter lives on the stack: it stays alive for the whole test body, so
  // there is no need for a heap allocation and a matching manual delete.
  int indicator = 100;
  queue.RegisterEnqueue(
      enqueue_handler_,
      common::Bind(&sleep_and_enqueue_callback, common::Unretained(&indicator)));
  // Presumably long enough for the first callback invocation to have started
  // but shorter than the 100 ms it sleeps for.
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  queue.UnregisterEnqueue();
  EXPECT_EQ(indicator, 101);
}
773
sleep_and_enqueue_callback_and_unregister(int * to_increase,Queue<std::string> * queue,std::atomic_bool * is_registered)774 std::unique_ptr<std::string> sleep_and_enqueue_callback_and_unregister(
775 int* to_increase, Queue<std::string>* queue, std::atomic_bool* is_registered) {
776 std::this_thread::sleep_for(std::chrono::milliseconds(100));
777 (*to_increase)++;
778 if (is_registered->exchange(false)) {
779 queue->UnregisterEnqueue();
780 }
781 return std::make_unique<std::string>("Hello");
782 }
783
// Like unregister_enqueue_and_wait, but both the callback and the test thread
// try to unregister. The shared atomic flag decides who does it: whichever
// side flips it from true to false calls UnregisterEnqueue(). The counter is
// expected to read exactly 101 afterwards.
TEST_F(QueueTest, unregister_enqueue_and_wait_maybe_unregistered) {
  Queue<std::string> queue(10);
  // The counter lives on the stack: it stays alive for the whole test body, so
  // there is no need for a heap allocation and a matching manual delete.
  int indicator = 100;
  std::atomic_bool is_registered = true;
  queue.RegisterEnqueue(
      enqueue_handler_,
      common::Bind(&sleep_and_enqueue_callback_and_unregister, common::Unretained(&indicator),
                   common::Unretained(&queue), common::Unretained(&is_registered)));
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  // Only unregister here if the callback has not already claimed the flag.
  if (is_registered.exchange(false)) {
    queue.UnregisterEnqueue();
  }
  EXPECT_EQ(indicator, 101);
}
799
// Dequeue callback for the unregister tests: sleeps for 100 ms and then adds
// one to the integer behind |to_increase|. It does not take anything out of a
// queue.
void sleep_and_dequeue_callback(int* to_increase) {
  constexpr std::chrono::milliseconds kCallbackDelay(100);
  std::this_thread::sleep_for(kCallbackDelay);
  ++(*to_increase);
}
804
// Registers an enqueue callback that unregisters itself and supplies one
// string, plus a dequeue callback that sleeps for 100 ms per invocation. The
// test thread calls UnregisterDequeue() after 50 ms and expects the counter to
// read exactly 101 once that call has returned.
TEST_F(QueueTest, unregister_dequeue_and_wait) {
  // The counter lives on the stack: it stays alive for the whole test body, so
  // there is no need for a heap allocation and a matching manual delete.
  int indicator = 100;
  Queue<std::string> queue(10);
  queue.RegisterEnqueue(
      enqueue_handler_,
      common::Bind(
          [](Queue<std::string>* queue) {
            queue->UnregisterEnqueue();
            return std::make_unique<std::string>("Hello");
          },
          common::Unretained(&queue)));
  // NOTE(review): the dequeue callback is registered on enqueue_handler_, not
  // dequeue_handler_. Kept exactly as found; confirm that this is intentional.
  queue.RegisterDequeue(
      enqueue_handler_,
      common::Bind(&sleep_and_dequeue_callback, common::Unretained(&indicator)));
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  queue.UnregisterDequeue();
  EXPECT_EQ(indicator, 101);
}
821
822 // Create all threads for death tests in the function that dies
823 class QueueDeathTest : public ::testing::Test {
824 public:
RegisterEnqueueAndDelete()825 void RegisterEnqueueAndDelete() {
826 Thread* enqueue_thread = new Thread("enqueue_thread", Thread::Priority::NORMAL);
827 Handler* enqueue_handler = new Handler(enqueue_thread);
828 Queue<std::string>* queue = new Queue<std::string>(kQueueSizeOne);
829 queue->RegisterEnqueue(enqueue_handler, common::Bind([]() {
830 return std::make_unique<std::string>("A string to fill the queue");
831 }));
832 delete queue;
833 }
834
RegisterDequeueAndDelete()835 void RegisterDequeueAndDelete() {
836 Thread* dequeue_thread = new Thread("dequeue_thread", Thread::Priority::NORMAL);
837 Handler* dequeue_handler = new Handler(dequeue_thread);
838 Queue<std::string>* queue = new Queue<std::string>(kQueueSizeOne);
839 queue->RegisterDequeue(dequeue_handler,
840 common::Bind([](Queue<std::string>* queue) { queue->TryDequeue(); },
841 common::Unretained(queue)));
842 delete queue;
843 }
844 };
845
// Deleting a queue whose enqueue callback is still registered is expected to
// kill the process with an error message matching the regex "nqueue".
TEST_F(QueueDeathTest, die_if_enqueue_not_unregistered) {
  EXPECT_DEATH(this->RegisterEnqueueAndDelete(), "nqueue");
}
849
// Deleting a queue whose dequeue callback is still registered is expected to
// kill the process with an error message matching the regex "equeue".
TEST_F(QueueDeathTest, die_if_dequeue_not_unregistered) {
  EXPECT_DEATH(this->RegisterDequeueAndDelete(), "equeue");
}
853
854 class MockIQueueEnqueue : public IQueueEnqueue<int> {
855 public:
RegisterEnqueue(Handler * handler,EnqueueCallback callback)856 void RegisterEnqueue(Handler* handler, EnqueueCallback callback) override {
857 EXPECT_FALSE(registered_);
858 registered_ = true;
859 handler->Post(common::BindOnce(&MockIQueueEnqueue::handle_register_enqueue,
860 common::Unretained(this), callback));
861 }
862
handle_register_enqueue(EnqueueCallback callback)863 void handle_register_enqueue(EnqueueCallback callback) {
864 if (dont_handle_register_enqueue_) {
865 return;
866 }
867 while (registered_) {
868 std::unique_ptr<int> front = callback.Run();
869 queue_.push(*front);
870 }
871 }
872
UnregisterEnqueue()873 void UnregisterEnqueue() override {
874 EXPECT_TRUE(registered_);
875 registered_ = false;
876 }
877
878 bool dont_handle_register_enqueue_ = false;
879 bool registered_ = false;
880 std::queue<int> queue_;
881 };
882
883 class EnqueueBufferTest : public ::testing::Test {
884 protected:
SetUp()885 void SetUp() override {
886 thread_ = new Thread("test_thread", Thread::Priority::NORMAL);
887 handler_ = new Handler(thread_);
888 }
889
TearDown()890 void TearDown() override {
891 handler_->Clear();
892 delete handler_;
893 delete thread_;
894 }
895
SynchronizeHandler()896 void SynchronizeHandler() {
897 std::promise<void> promise;
898 auto future = promise.get_future();
899 handler_->Post(common::BindOnce([](std::promise<void> promise) { promise.set_value(); },
900 std::move(promise)));
901 future.wait();
902 }
903
904 MockIQueueEnqueue enqueue_;
905 EnqueueBuffer<int> enqueue_buffer_{&enqueue_};
906 Thread* thread_;
907 Handler* handler_;
908 };
909
// Pushes the values 0..9 through the EnqueueBuffer, waits for the handler, and
// checks that the mock received them in the same order and ended up
// unregistered.
TEST_F(EnqueueBufferTest, enqueue) {
  constexpr int kItemCount = 10;
  for (int value = 0; value < kItemCount; ++value) {
    enqueue_buffer_.Enqueue(std::make_unique<int>(value), handler_);
  }
  SynchronizeHandler();
  for (int expected = 0; expected < kItemCount; ++expected) {
    ASSERT_EQ(enqueue_.queue_.front(), expected);
    enqueue_.queue_.pop();
  }
  ASSERT_FALSE(enqueue_.registered_);
}
922
// Tells the mock not to run the enqueue callback, queues ten items so the
// buffer stays registered, and checks that Clear() leaves the mock
// unregistered.
TEST_F(EnqueueBufferTest, clear) {
  enqueue_.dont_handle_register_enqueue_ = true;
  constexpr int kItemCount = 10;
  for (int value = 0; value < kItemCount; ++value) {
    enqueue_buffer_.Enqueue(std::make_unique<int>(value), handler_);
  }
  ASSERT_TRUE(enqueue_.registered_);
  enqueue_buffer_.Clear();
  ASSERT_FALSE(enqueue_.registered_);
}
933
// Queues ten items through a heap-allocated EnqueueBuffer backed by a real
// Queue, then deletes the buffer and the queue straight away without waiting
// for the handler. There are no assertions; the test passes if nothing
// crashes.
TEST_F(EnqueueBufferTest, delete_when_in_callback) {
  auto* queue = new Queue<int>(kQueueSize);
  auto* enqueue_buffer = new EnqueueBuffer<int>(queue);
  constexpr int kItemCount = 10;
  for (int value = 0; value < kItemCount; ++value) {
    enqueue_buffer->Enqueue(std::make_unique<int>(value), handler_);
  }

  // The buffer goes first, then the queue it was feeding.
  delete enqueue_buffer;
  delete queue;
}
945
946 } // namespace
947 } // namespace os
948 } // namespace bluetooth
949