1 /*
2  * Copyright 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "os/queue.h"
18 
19 #include <sys/eventfd.h>
20 
21 #include <atomic>
22 #include <chrono>
23 #include <future>
24 #include <unordered_map>
25 
26 #include "common/bind.h"
27 #include "gtest/gtest.h"
28 #include "os/reactor.h"
29 
30 using namespace std::chrono_literals;
31 
32 namespace bluetooth {
33 namespace os {
34 namespace {
35 
36 constexpr int kQueueSize = 10;
37 constexpr int kHalfOfQueueSize = kQueueSize / 2;
38 constexpr int kDoubleOfQueueSize = kQueueSize * 2;
39 constexpr int kQueueSizeOne = 1;
40 
41 class QueueTest : public ::testing::Test {
42 protected:
SetUp()43   void SetUp() override {
44     enqueue_thread_ = new Thread("enqueue_thread", Thread::Priority::NORMAL);
45     enqueue_handler_ = new Handler(enqueue_thread_);
46     dequeue_thread_ = new Thread("dequeue_thread", Thread::Priority::NORMAL);
47     dequeue_handler_ = new Handler(dequeue_thread_);
48   }
TearDown()49   void TearDown() override {
50     enqueue_handler_->Clear();
51     delete enqueue_handler_;
52     delete enqueue_thread_;
53     dequeue_handler_->Clear();
54     delete dequeue_handler_;
55     delete dequeue_thread_;
56     enqueue_handler_ = nullptr;
57     enqueue_thread_ = nullptr;
58     dequeue_handler_ = nullptr;
59     dequeue_thread_ = nullptr;
60   }
61 
62   Thread* enqueue_thread_;
63   Handler* enqueue_handler_;
64   Thread* dequeue_thread_;
65   Handler* dequeue_handler_;
66 
sync_enqueue_handler()67   void sync_enqueue_handler() {
68     log::assert_that(enqueue_thread_ != nullptr, "assert failed: enqueue_thread_ != nullptr");
69     log::assert_that(enqueue_thread_->GetReactor()->WaitForIdle(2s),
70                      "assert failed: enqueue_thread_->GetReactor()->WaitForIdle(2s)");
71   }
72 };
73 
74 class TestEnqueueEnd {
75 public:
TestEnqueueEnd(Queue<std::string> * queue,Handler * handler)76   explicit TestEnqueueEnd(Queue<std::string>* queue, Handler* handler)
77       : count(0), handler_(handler), queue_(queue), delay_(0) {}
78 
~TestEnqueueEnd()79   ~TestEnqueueEnd() {}
80 
RegisterEnqueue(std::unordered_map<int,std::promise<int>> * promise_map)81   void RegisterEnqueue(std::unordered_map<int, std::promise<int>>* promise_map) {
82     promise_map_ = promise_map;
83     handler_->Post(
84             common::BindOnce(&TestEnqueueEnd::handle_register_enqueue, common::Unretained(this)));
85   }
86 
UnregisterEnqueue()87   void UnregisterEnqueue() {
88     std::promise<void> promise;
89     auto future = promise.get_future();
90 
91     handler_->Post(common::BindOnce(&TestEnqueueEnd::handle_unregister_enqueue,
92                                     common::Unretained(this), std::move(promise)));
93     future.wait();
94   }
95 
EnqueueCallbackForTest()96   std::unique_ptr<std::string> EnqueueCallbackForTest() {
97     if (delay_ != 0) {
98       std::this_thread::sleep_for(std::chrono::milliseconds(delay_));
99     }
100 
101     count++;
102     std::unique_ptr<std::string> data = std::move(buffer_.front());
103     buffer_.pop();
104     std::string copy = *data;
105     if (buffer_.empty()) {
106       queue_->UnregisterEnqueue();
107     }
108 
109     auto key = buffer_.size();
110     auto node = promise_map_->extract(key);
111     if (node) {
112       node.mapped().set_value(key);
113     }
114 
115     return data;
116   }
117 
setDelay(int value)118   void setDelay(int value) { delay_ = value; }
119 
120   std::queue<std::unique_ptr<std::string>> buffer_;
121   int count;
122 
123 private:
124   Handler* handler_;
125   Queue<std::string>* queue_;
126   std::unordered_map<int, std::promise<int>>* promise_map_;
127   int delay_;
128 
handle_register_enqueue()129   void handle_register_enqueue() {
130     queue_->RegisterEnqueue(handler_, common::Bind(&TestEnqueueEnd::EnqueueCallbackForTest,
131                                                    common::Unretained(this)));
132   }
133 
handle_unregister_enqueue(std::promise<void> promise)134   void handle_unregister_enqueue(std::promise<void> promise) {
135     queue_->UnregisterEnqueue();
136     promise.set_value();
137   }
138 };
139 
140 class TestDequeueEnd {
141 public:
TestDequeueEnd(Queue<std::string> * queue,Handler * handler,int capacity)142   explicit TestDequeueEnd(Queue<std::string>* queue, Handler* handler, int capacity)
143       : count(0), handler_(handler), queue_(queue), capacity_(capacity), delay_(0) {}
144 
~TestDequeueEnd()145   ~TestDequeueEnd() {}
146 
RegisterDequeue(std::unordered_map<int,std::promise<int>> * promise_map)147   void RegisterDequeue(std::unordered_map<int, std::promise<int>>* promise_map) {
148     promise_map_ = promise_map;
149     handler_->Post(
150             common::BindOnce(&TestDequeueEnd::handle_register_dequeue, common::Unretained(this)));
151   }
152 
UnregisterDequeue()153   void UnregisterDequeue() {
154     std::promise<void> promise;
155     auto future = promise.get_future();
156 
157     handler_->Post(common::BindOnce(&TestDequeueEnd::handle_unregister_dequeue,
158                                     common::Unretained(this), std::move(promise)));
159     future.wait();
160   }
161 
DequeueCallbackForTest()162   void DequeueCallbackForTest() {
163     if (delay_ != 0) {
164       std::this_thread::sleep_for(std::chrono::milliseconds(delay_));
165     }
166 
167     count++;
168     std::unique_ptr<std::string> data = queue_->TryDequeue();
169     buffer_.push(std::move(data));
170 
171     if (buffer_.size() == (size_t)capacity_) {
172       queue_->UnregisterDequeue();
173     }
174 
175     auto key = buffer_.size();
176     auto node = promise_map_->extract(key);
177     if (node) {
178       node.mapped().set_value(key);
179     }
180   }
181 
setDelay(int value)182   void setDelay(int value) { delay_ = value; }
183 
184   std::queue<std::unique_ptr<std::string>> buffer_;
185   int count;
186 
187 private:
188   Handler* handler_;
189   Queue<std::string>* queue_;
190   std::unordered_map<int, std::promise<int>>* promise_map_;
191   int capacity_;
192   int delay_;
193 
handle_register_dequeue()194   void handle_register_dequeue() {
195     queue_->RegisterDequeue(handler_, common::Bind(&TestDequeueEnd::DequeueCallbackForTest,
196                                                    common::Unretained(this)));
197   }
198 
handle_unregister_dequeue(std::promise<void> promise)199   void handle_unregister_dequeue(std::promise<void> promise) {
200     queue_->UnregisterDequeue();
201     promise.set_value();
202   }
203 };
204 
205 // Enqueue end level : 0 -> queue is full, 1 - >  queue isn't full
206 // Dequeue end level : 0 -> queue is empty, 1 - >  queue isn't empty
207 
208 // Test 1 : Queue is empty
209 
210 // Enqueue end level : 1
211 // Dequeue end level : 0
212 // Test 1-1 EnqueueCallback should continually be invoked when queue isn't full
TEST_F(QueueTest,register_enqueue_with_empty_queue)213 TEST_F(QueueTest, register_enqueue_with_empty_queue) {
214   Queue<std::string> queue(kQueueSize);
215   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
216 
217   // Push kQueueSize data to enqueue_end buffer
218   for (int i = 0; i < kQueueSize; i++) {
219     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
220     test_enqueue_end.buffer_.push(std::move(data));
221   }
222   EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kQueueSize);
223 
224   // Register enqueue and expect data move to Queue
225   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
226   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0),
227                               std::forward_as_tuple());
228   auto enqueue_future = enqueue_promise_map[0].get_future();
229   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
230   enqueue_future.wait();
231   EXPECT_EQ(enqueue_future.get(), 0);
232   std::this_thread::sleep_for(std::chrono::milliseconds(20));
233 }
234 
235 // Enqueue end level : 1
236 // Dequeue end level : 0
237 // Test 1-2 DequeueCallback shouldn't be invoked when queue is empty
TEST_F(QueueTest,register_dequeue_with_empty_queue)238 TEST_F(QueueTest, register_dequeue_with_empty_queue) {
239   Queue<std::string> queue(kQueueSize);
240   TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);
241 
242   // Register dequeue, DequeueCallback shouldn't be invoked
243   std::unordered_map<int, std::promise<int>> dequeue_promise_map;
244   test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
245   std::this_thread::sleep_for(std::chrono::milliseconds(20));
246   EXPECT_EQ(test_dequeue_end.count, 0);
247 
248   test_dequeue_end.UnregisterDequeue();
249 }
250 
251 // Test 2 : Queue is full
252 
253 // Enqueue end level : 0
254 // Dequeue end level : 1
255 // Test 2-1 EnqueueCallback shouldn't be invoked when queue is full
TEST_F(QueueTest,register_enqueue_with_full_queue)256 TEST_F(QueueTest, register_enqueue_with_full_queue) {
257   Queue<std::string> queue(kQueueSize);
258   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
259 
260   // make Queue full
261   for (int i = 0; i < kQueueSize; i++) {
262     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
263     test_enqueue_end.buffer_.push(std::move(data));
264   }
265   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
266   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0),
267                               std::forward_as_tuple());
268   auto enqueue_future = enqueue_promise_map[0].get_future();
269   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
270   enqueue_future.wait();
271   EXPECT_EQ(enqueue_future.get(), 0);
272 
273   // push some data to enqueue_end buffer and register enqueue;
274   for (int i = 0; i < kHalfOfQueueSize; i++) {
275     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
276     test_enqueue_end.buffer_.push(std::move(data));
277   }
278   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
279 
280   // EnqueueCallback shouldn't be invoked
281   std::this_thread::sleep_for(std::chrono::milliseconds(20));
282   EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kHalfOfQueueSize);
283   EXPECT_EQ(test_enqueue_end.count, kQueueSize);
284 
285   test_enqueue_end.UnregisterEnqueue();
286 }
287 
288 // Enqueue end level : 0
289 // Dequeue end level : 1
290 // Test 2-2 DequeueCallback should continually be invoked when queue isn't empty
TEST_F(QueueTest,register_dequeue_with_full_queue)291 TEST_F(QueueTest, register_dequeue_with_full_queue) {
292   Queue<std::string> queue(kQueueSize);
293   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
294   TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);
295 
296   // make Queue full
297   for (int i = 0; i < kQueueSize; i++) {
298     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
299     test_enqueue_end.buffer_.push(std::move(data));
300   }
301   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
302   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0),
303                               std::forward_as_tuple());
304   auto enqueue_future = enqueue_promise_map[0].get_future();
305   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
306   enqueue_future.wait();
307   EXPECT_EQ(enqueue_future.get(), 0);
308 
309   // Register dequeue and expect data move to dequeue end buffer
310   std::unordered_map<int, std::promise<int>> dequeue_promise_map;
311   dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize),
312                               std::forward_as_tuple());
313   auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
314   test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
315   dequeue_future.wait();
316   EXPECT_EQ(dequeue_future.get(), kQueueSize);
317 
318   test_dequeue_end.UnregisterDequeue();
319 }
320 
321 // Test 3 : Queue is non-empty and non-full
322 
323 // Enqueue end level : 1
324 // Dequeue end level : 1
325 // Test 3-1 Register enqueue with half empty queue, EnqueueCallback should continually be invoked
TEST_F(QueueTest,register_enqueue_with_half_empty_queue)326 TEST_F(QueueTest, register_enqueue_with_half_empty_queue) {
327   Queue<std::string> queue(kQueueSize);
328   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
329 
330   // make Queue half empty
331   for (int i = 0; i < kHalfOfQueueSize; i++) {
332     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
333     test_enqueue_end.buffer_.push(std::move(data));
334   }
335   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
336   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0),
337                               std::forward_as_tuple());
338   auto enqueue_future = enqueue_promise_map[0].get_future();
339   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
340   enqueue_future.wait();
341   EXPECT_EQ(enqueue_future.get(), 0);
342 
343   // push some data to enqueue_end buffer and register enqueue;
344   for (int i = 0; i < kHalfOfQueueSize; i++) {
345     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
346     test_enqueue_end.buffer_.push(std::move(data));
347   }
348 
349   // Register enqueue and expect data move to Queue
350   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0),
351                               std::forward_as_tuple());
352   enqueue_future = enqueue_promise_map[0].get_future();
353   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
354   enqueue_future.wait();
355   EXPECT_EQ(enqueue_future.get(), 0);
356   sync_enqueue_handler();
357 }
358 
359 // Enqueue end level : 1
360 // Dequeue end level : 1
361 // Test 3-2 Register dequeue with half empty queue, DequeueCallback should continually be invoked
TEST_F(QueueTest,register_dequeue_with_half_empty_queue)362 TEST_F(QueueTest, register_dequeue_with_half_empty_queue) {
363   Queue<std::string> queue(kQueueSize);
364   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
365   TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);
366 
367   // make Queue half empty
368   for (int i = 0; i < kHalfOfQueueSize; i++) {
369     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
370     test_enqueue_end.buffer_.push(std::move(data));
371   }
372   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
373   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0),
374                               std::forward_as_tuple());
375   auto enqueue_future = enqueue_promise_map[0].get_future();
376   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
377   enqueue_future.wait();
378   EXPECT_EQ(enqueue_future.get(), 0);
379 
380   // Register dequeue and expect data move to dequeue end buffer
381   std::unordered_map<int, std::promise<int>> dequeue_promise_map;
382   dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize),
383                               std::forward_as_tuple());
384   auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future();
385   test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
386   dequeue_future.wait();
387   EXPECT_EQ(dequeue_future.get(), kHalfOfQueueSize);
388 
389   test_dequeue_end.UnregisterDequeue();
390 }
391 
392 // Dynamic level test
393 
394 // Test 4 : Queue becomes full during test, EnqueueCallback should stop to be invoked
395 
396 // Enqueue end level : 1 -> 0
397 // Dequeue end level : 1
398 // Test 4-1 Queue becomes full due to only register EnqueueCallback
TEST_F(QueueTest,queue_becomes_full_enqueue_callback_only)399 TEST_F(QueueTest, queue_becomes_full_enqueue_callback_only) {
400   Queue<std::string> queue(kQueueSize);
401   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
402 
403   // push double of kQueueSize to enqueue end buffer
404   for (int i = 0; i < kDoubleOfQueueSize; i++) {
405     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
406     test_enqueue_end.buffer_.push(std::move(data));
407   }
408 
409   // Register enqueue and expect kQueueSize data move to Queue
410   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
411   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize),
412                               std::forward_as_tuple());
413   auto enqueue_future = enqueue_promise_map[kQueueSize].get_future();
414   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
415   enqueue_future.wait();
416   EXPECT_EQ(enqueue_future.get(), kQueueSize);
417 
418   // EnqueueCallback shouldn't be invoked and buffer size stay in kQueueSize
419   std::this_thread::sleep_for(std::chrono::milliseconds(20));
420   EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kQueueSize);
421   EXPECT_EQ(test_enqueue_end.count, kQueueSize);
422 
423   test_enqueue_end.UnregisterEnqueue();
424 }
425 
426 // Enqueue end level : 1 -> 0
427 // Dequeue end level : 1
428 // Test 4-2 Queue becomes full due to DequeueCallback unregister during test
TEST_F(QueueTest,queue_becomes_full_dequeue_callback_unregister)429 TEST_F(QueueTest, queue_becomes_full_dequeue_callback_unregister) {
430   Queue<std::string> queue(kQueueSize);
431   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
432   TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kHalfOfQueueSize);
433 
434   // push double of kQueueSize to enqueue end buffer
435   for (int i = 0; i < kDoubleOfQueueSize; i++) {
436     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
437     test_enqueue_end.buffer_.push(std::move(data));
438   }
439 
440   // Register dequeue
441   std::unordered_map<int, std::promise<int>> dequeue_promise_map;
442   dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize),
443                               std::forward_as_tuple());
444   auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future();
445   test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
446 
447   // Register enqueue
448   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
449   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize),
450                               std::forward_as_tuple());
451   auto enqueue_future = enqueue_promise_map[kHalfOfQueueSize].get_future();
452   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
453 
454   // Dequeue end will unregister when buffer size is kHalfOfQueueSize
455   dequeue_future.wait();
456   EXPECT_EQ(dequeue_future.get(), kHalfOfQueueSize);
457 
458   // EnqueueCallback shouldn't be invoked and buffer size stay in kHalfOfQueueSize
459   enqueue_future.wait();
460   EXPECT_EQ(enqueue_future.get(), kHalfOfQueueSize);
461   std::this_thread::sleep_for(std::chrono::milliseconds(20));
462   EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kHalfOfQueueSize);
463   EXPECT_EQ(test_enqueue_end.count, kQueueSize + kHalfOfQueueSize);
464 
465   test_enqueue_end.UnregisterEnqueue();
466 }
467 
468 // Enqueue end level : 1 -> 0
469 // Dequeue end level : 1
470 // Test 4-3 Queue becomes full due to DequeueCallback is slower
TEST_F(QueueTest,queue_becomes_full_dequeue_callback_slower)471 TEST_F(QueueTest, queue_becomes_full_dequeue_callback_slower) {
472   Queue<std::string> queue(kQueueSize);
473   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
474   TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);
475 
476   // push double of kDoubleOfQueueSize to enqueue end buffer
477   for (int i = 0; i < kDoubleOfQueueSize; i++) {
478     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
479     test_enqueue_end.buffer_.push(std::move(data));
480   }
481 
482   // Set 20 ms delay for callback and register dequeue
483   std::unordered_map<int, std::promise<int>> dequeue_promise_map;
484   test_dequeue_end.setDelay(20);
485   auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future();
486   test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
487 
488   // Register enqueue
489   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
490   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0),
491                               std::forward_as_tuple());
492   auto enqueue_future = enqueue_promise_map[0].get_future();
493   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
494 
495   // Wait for enqueue buffer empty and expect queue is full
496   enqueue_future.wait();
497   EXPECT_EQ(enqueue_future.get(), 0);
498   EXPECT_GE(test_dequeue_end.buffer_.size(), (size_t)(kQueueSize - 1));
499 
500   test_dequeue_end.UnregisterDequeue();
501 }
502 
503 // Enqueue end level : 0 -> 1
504 // Dequeue end level : 1 -> 0
505 // Test 5 Queue becomes full and non empty at same time.
TEST_F(QueueTest,queue_becomes_full_and_non_empty_at_same_time)506 TEST_F(QueueTest, queue_becomes_full_and_non_empty_at_same_time) {
507   Queue<std::string> queue(kQueueSizeOne);
508   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
509   TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);
510 
511   // push double of kQueueSize to enqueue end buffer
512   for (int i = 0; i < kQueueSize; i++) {
513     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
514     test_enqueue_end.buffer_.push(std::move(data));
515   }
516 
517   // Register dequeue
518   std::unordered_map<int, std::promise<int>> dequeue_promise_map;
519   dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize),
520                               std::forward_as_tuple());
521   auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
522   test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
523 
524   // Register enqueue
525   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
526   auto enqueue_future = enqueue_promise_map[0].get_future();
527   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
528 
529   // Wait for all data move from enqueue end buffer to dequeue end buffer
530   dequeue_future.wait();
531   EXPECT_EQ(dequeue_future.get(), kQueueSize);
532 
533   test_dequeue_end.UnregisterDequeue();
534 }
535 
536 // Enqueue end level : 1 -> 0
537 // Dequeue end level : 1
538 // Test 6 Queue becomes not full during test, EnqueueCallback should start to be invoked
TEST_F(QueueTest,queue_becomes_non_full_during_test)539 TEST_F(QueueTest, queue_becomes_non_full_during_test) {
540   Queue<std::string> queue(kQueueSize);
541   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
542   TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize * 3);
543 
544   // make Queue full
545   for (int i = 0; i < kDoubleOfQueueSize; i++) {
546     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
547     test_enqueue_end.buffer_.push(std::move(data));
548   }
549   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
550   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize),
551                               std::forward_as_tuple());
552   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0),
553                               std::forward_as_tuple());
554   auto enqueue_future = enqueue_promise_map[kQueueSize].get_future();
555   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
556   enqueue_future.wait();
557   EXPECT_EQ(enqueue_future.get(), kQueueSize);
558 
559   // Expect kQueueSize data block in enqueue end buffer
560   std::this_thread::sleep_for(std::chrono::milliseconds(20));
561   EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kQueueSize);
562 
563   // Register dequeue
564   std::unordered_map<int, std::promise<int>> dequeue_promise_map;
565   test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
566 
567   // Expect enqueue end will empty
568   enqueue_future = enqueue_promise_map[0].get_future();
569   enqueue_future.wait();
570   EXPECT_EQ(enqueue_future.get(), 0);
571 
572   test_dequeue_end.UnregisterDequeue();
573 }
574 
575 // Enqueue end level : 0 -> 1
576 // Dequeue end level : 1 -> 0
577 // Test 7 Queue becomes non full and empty at same time. (Exactly same as Test 5)
TEST_F(QueueTest,queue_becomes_non_full_and_empty_at_same_time)578 TEST_F(QueueTest, queue_becomes_non_full_and_empty_at_same_time) {
579   Queue<std::string> queue(kQueueSizeOne);
580   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
581   TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);
582 
583   // push double of kQueueSize to enqueue end buffer
584   for (int i = 0; i < kQueueSize; i++) {
585     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
586     test_enqueue_end.buffer_.push(std::move(data));
587   }
588 
589   // Register dequeue
590   std::unordered_map<int, std::promise<int>> dequeue_promise_map;
591   dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize),
592                               std::forward_as_tuple());
593   auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
594   test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
595 
596   // Register enqueue
597   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
598   auto enqueue_future = enqueue_promise_map[0].get_future();
599   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
600 
601   // Wait for all data move from enqueue end buffer to dequeue end buffer
602   dequeue_future.wait();
603   EXPECT_EQ(dequeue_future.get(), kQueueSize);
604 
605   test_dequeue_end.UnregisterDequeue();
606 }
607 
608 // Test 8 : Queue becomes empty during test, DequeueCallback should stop to be invoked
609 
610 // Enqueue end level : 1
611 // Dequeue end level : 1 -> 0
612 // Test 8-1 Queue becomes empty due to only register DequeueCallback
TEST_F(QueueTest,queue_becomes_empty_dequeue_callback_only)613 TEST_F(QueueTest, queue_becomes_empty_dequeue_callback_only) {
614   Queue<std::string> queue(kQueueSize);
615   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
616   TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kHalfOfQueueSize);
617 
618   // make Queue half empty
619   for (int i = 0; i < kHalfOfQueueSize; i++) {
620     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
621     test_enqueue_end.buffer_.push(std::move(data));
622   }
623   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
624   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0),
625                               std::forward_as_tuple());
626   auto enqueue_future = enqueue_promise_map[0].get_future();
627   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
628   enqueue_future.wait();
629   EXPECT_EQ(enqueue_future.get(), 0);
630 
631   // Register dequeue, expect kHalfOfQueueSize data move to dequeue end buffer
632   std::unordered_map<int, std::promise<int>> dequeue_promise_map;
633   dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize),
634                               std::forward_as_tuple());
635   auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future();
636   test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
637   dequeue_future.wait();
638   EXPECT_EQ(dequeue_future.get(), kHalfOfQueueSize);
639 
640   // Expect DequeueCallback should stop to be invoked
641   std::this_thread::sleep_for(std::chrono::milliseconds(20));
642   EXPECT_EQ(test_dequeue_end.count, kHalfOfQueueSize);
643 }
644 
645 // Enqueue end level : 1
646 // Dequeue end level : 1 -> 0
647 // Test 8-2 Queue becomes empty due to EnqueueCallback unregister during test
TEST_F(QueueTest,queue_becomes_empty_enqueue_callback_unregister)648 TEST_F(QueueTest, queue_becomes_empty_enqueue_callback_unregister) {
649   Queue<std::string> queue(kQueueSize);
650   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
651   TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);
652 
653   // make Queue half empty
654   for (int i = 0; i < kHalfOfQueueSize; i++) {
655     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
656     test_enqueue_end.buffer_.push(std::move(data));
657   }
658   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
659   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0),
660                               std::forward_as_tuple());
661   auto enqueue_future = enqueue_promise_map[0].get_future();
662   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
663   enqueue_future.wait();
664   EXPECT_EQ(enqueue_future.get(), 0);
665 
666   // push kHalfOfQueueSize to enqueue end buffer and register enqueue.
667   for (int i = 0; i < kHalfOfQueueSize; i++) {
668     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
669     test_enqueue_end.buffer_.push(std::move(data));
670   }
671   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
672 
673   // Register dequeue, expect kQueueSize move to dequeue end buffer
674   std::unordered_map<int, std::promise<int>> dequeue_promise_map;
675   dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize),
676                               std::forward_as_tuple());
677   auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
678   test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
679   dequeue_future.wait();
680   EXPECT_EQ(dequeue_future.get(), kQueueSize);
681 
682   // Expect DequeueCallback should stop to be invoked
683   std::this_thread::sleep_for(std::chrono::milliseconds(20));
684   EXPECT_EQ(test_dequeue_end.count, kQueueSize);
685 }
686 
687 // Enqueue end level : 1
688 // Dequeue end level : 0 -> 1
689 // Test 9 Queue becomes not empty during test, DequeueCallback should start to be invoked
TEST_F(QueueTest,queue_becomes_non_empty_during_test)690 TEST_F(QueueTest, queue_becomes_non_empty_during_test) {
691   Queue<std::string> queue(kQueueSize);
692   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
693   TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);
694 
695   // Register dequeue
696   std::unordered_map<int, std::promise<int>> dequeue_promise_map;
697   dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize),
698                               std::forward_as_tuple());
699   test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
700 
701   // push kQueueSize data to enqueue end buffer and register enqueue
702   for (int i = 0; i < kQueueSize; i++) {
703     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
704     test_enqueue_end.buffer_.push(std::move(data));
705   }
706   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
707   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
708 
709   // Expect kQueueSize data move to dequeue end buffer
710   auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
711   dequeue_future.wait();
712   EXPECT_EQ(dequeue_future.get(), kQueueSize);
713 }
714 
TEST_F(QueueTest,pass_smart_pointer_and_unregister)715 TEST_F(QueueTest, pass_smart_pointer_and_unregister) {
716   Queue<std::string>* queue = new Queue<std::string>(kQueueSize);
717 
718   // Enqueue a string
719   std::string valid = "Valid String";
720   std::shared_ptr<std::string> shared = std::make_shared<std::string>(valid);
721   queue->RegisterEnqueue(enqueue_handler_, common::Bind(
722                                                    [](Queue<std::string>* queue,
723                                                       std::shared_ptr<std::string> shared) {
724                                                      queue->UnregisterEnqueue();
725                                                      return std::make_unique<std::string>(*shared);
726                                                    },
727                                                    common::Unretained(queue), shared));
728 
729   // Dequeue the string
730   queue->RegisterDequeue(dequeue_handler_,
731                          common::Bind(
732                                  [](Queue<std::string>* queue, std::string valid) {
733                                    queue->UnregisterDequeue();
734                                    auto answer = *queue->TryDequeue();
735                                    ASSERT_EQ(answer, valid);
736                                  },
737                                  common::Unretained(queue), valid));
738 
739   // Wait for both handlers to finish and delete the Queue
740   std::promise<void> promise;
741   auto future = promise.get_future();
742 
743   enqueue_handler_->Post(common::BindOnce(
744           [](os::Handler* dequeue_handler, Queue<std::string>* queue, std::promise<void>* promise) {
745             dequeue_handler->Post(common::BindOnce(
746                     [](Queue<std::string>* queue, std::promise<void>* promise) {
747                       delete queue;
748                       promise->set_value();
749                     },
750                     common::Unretained(queue), common::Unretained(promise)));
751           },
752           common::Unretained(dequeue_handler_), common::Unretained(queue),
753           common::Unretained(&promise)));
754   future.wait();
755 }
756 
sleep_and_enqueue_callback(int * to_increase)757 std::unique_ptr<std::string> sleep_and_enqueue_callback(int* to_increase) {
758   std::this_thread::sleep_for(std::chrono::milliseconds(100));
759   (*to_increase)++;
760   return std::make_unique<std::string>("Hello");
761 }
762 
TEST_F(QueueTest,unregister_enqueue_and_wait)763 TEST_F(QueueTest, unregister_enqueue_and_wait) {
764   Queue<std::string> queue(10);
765   int* indicator = new int(100);
766   queue.RegisterEnqueue(enqueue_handler_,
767                         common::Bind(&sleep_and_enqueue_callback, common::Unretained(indicator)));
768   std::this_thread::sleep_for(std::chrono::milliseconds(50));
769   queue.UnregisterEnqueue();
770   EXPECT_EQ(*indicator, 101);
771   delete indicator;
772 }
773 
sleep_and_enqueue_callback_and_unregister(int * to_increase,Queue<std::string> * queue,std::atomic_bool * is_registered)774 std::unique_ptr<std::string> sleep_and_enqueue_callback_and_unregister(
775         int* to_increase, Queue<std::string>* queue, std::atomic_bool* is_registered) {
776   std::this_thread::sleep_for(std::chrono::milliseconds(100));
777   (*to_increase)++;
778   if (is_registered->exchange(false)) {
779     queue->UnregisterEnqueue();
780   }
781   return std::make_unique<std::string>("Hello");
782 }
783 
TEST_F(QueueTest,unregister_enqueue_and_wait_maybe_unregistered)784 TEST_F(QueueTest, unregister_enqueue_and_wait_maybe_unregistered) {
785   Queue<std::string> queue(10);
786   int* indicator = new int(100);
787   std::atomic_bool is_registered = true;
788   queue.RegisterEnqueue(
789           enqueue_handler_,
790           common::Bind(&sleep_and_enqueue_callback_and_unregister, common::Unretained(indicator),
791                        common::Unretained(&queue), common::Unretained(&is_registered)));
792   std::this_thread::sleep_for(std::chrono::milliseconds(50));
793   if (is_registered.exchange(false)) {
794     queue.UnregisterEnqueue();
795   }
796   EXPECT_EQ(*indicator, 101);
797   delete indicator;
798 }
799 
sleep_and_dequeue_callback(int * to_increase)800 void sleep_and_dequeue_callback(int* to_increase) {
801   std::this_thread::sleep_for(std::chrono::milliseconds(100));
802   (*to_increase)++;
803 }
804 
TEST_F(QueueTest,unregister_dequeue_and_wait)805 TEST_F(QueueTest, unregister_dequeue_and_wait) {
806   int* indicator = new int(100);
807   Queue<std::string> queue(10);
808   queue.RegisterEnqueue(enqueue_handler_, common::Bind(
809                                                   [](Queue<std::string>* queue) {
810                                                     queue->UnregisterEnqueue();
811                                                     return std::make_unique<std::string>("Hello");
812                                                   },
813                                                   common::Unretained(&queue)));
814   queue.RegisterDequeue(enqueue_handler_,
815                         common::Bind(&sleep_and_dequeue_callback, common::Unretained(indicator)));
816   std::this_thread::sleep_for(std::chrono::milliseconds(50));
817   queue.UnregisterDequeue();
818   EXPECT_EQ(*indicator, 101);
819   delete indicator;
820 }
821 
822 // Create all threads for death tests in the function that dies
823 class QueueDeathTest : public ::testing::Test {
824 public:
RegisterEnqueueAndDelete()825   void RegisterEnqueueAndDelete() {
826     Thread* enqueue_thread = new Thread("enqueue_thread", Thread::Priority::NORMAL);
827     Handler* enqueue_handler = new Handler(enqueue_thread);
828     Queue<std::string>* queue = new Queue<std::string>(kQueueSizeOne);
829     queue->RegisterEnqueue(enqueue_handler, common::Bind([]() {
830                              return std::make_unique<std::string>("A string to fill the queue");
831                            }));
832     delete queue;
833   }
834 
RegisterDequeueAndDelete()835   void RegisterDequeueAndDelete() {
836     Thread* dequeue_thread = new Thread("dequeue_thread", Thread::Priority::NORMAL);
837     Handler* dequeue_handler = new Handler(dequeue_thread);
838     Queue<std::string>* queue = new Queue<std::string>(kQueueSizeOne);
839     queue->RegisterDequeue(dequeue_handler,
840                            common::Bind([](Queue<std::string>* queue) { queue->TryDequeue(); },
841                                         common::Unretained(queue)));
842     delete queue;
843   }
844 };
845 
TEST_F(QueueDeathTest,die_if_enqueue_not_unregistered)846 TEST_F(QueueDeathTest, die_if_enqueue_not_unregistered) {
847   EXPECT_DEATH(RegisterEnqueueAndDelete(), "nqueue");
848 }
849 
TEST_F(QueueDeathTest,die_if_dequeue_not_unregistered)850 TEST_F(QueueDeathTest, die_if_dequeue_not_unregistered) {
851   EXPECT_DEATH(RegisterDequeueAndDelete(), "equeue");
852 }
853 
854 class MockIQueueEnqueue : public IQueueEnqueue<int> {
855 public:
RegisterEnqueue(Handler * handler,EnqueueCallback callback)856   void RegisterEnqueue(Handler* handler, EnqueueCallback callback) override {
857     EXPECT_FALSE(registered_);
858     registered_ = true;
859     handler->Post(common::BindOnce(&MockIQueueEnqueue::handle_register_enqueue,
860                                    common::Unretained(this), callback));
861   }
862 
handle_register_enqueue(EnqueueCallback callback)863   void handle_register_enqueue(EnqueueCallback callback) {
864     if (dont_handle_register_enqueue_) {
865       return;
866     }
867     while (registered_) {
868       std::unique_ptr<int> front = callback.Run();
869       queue_.push(*front);
870     }
871   }
872 
UnregisterEnqueue()873   void UnregisterEnqueue() override {
874     EXPECT_TRUE(registered_);
875     registered_ = false;
876   }
877 
878   bool dont_handle_register_enqueue_ = false;
879   bool registered_ = false;
880   std::queue<int> queue_;
881 };
882 
883 class EnqueueBufferTest : public ::testing::Test {
884 protected:
SetUp()885   void SetUp() override {
886     thread_ = new Thread("test_thread", Thread::Priority::NORMAL);
887     handler_ = new Handler(thread_);
888   }
889 
TearDown()890   void TearDown() override {
891     handler_->Clear();
892     delete handler_;
893     delete thread_;
894   }
895 
SynchronizeHandler()896   void SynchronizeHandler() {
897     std::promise<void> promise;
898     auto future = promise.get_future();
899     handler_->Post(common::BindOnce([](std::promise<void> promise) { promise.set_value(); },
900                                     std::move(promise)));
901     future.wait();
902   }
903 
904   MockIQueueEnqueue enqueue_;
905   EnqueueBuffer<int> enqueue_buffer_{&enqueue_};
906   Thread* thread_;
907   Handler* handler_;
908 };
909 
TEST_F(EnqueueBufferTest,enqueue)910 TEST_F(EnqueueBufferTest, enqueue) {
911   int num_items = 10;
912   for (int i = 0; i < num_items; i++) {
913     enqueue_buffer_.Enqueue(std::make_unique<int>(i), handler_);
914   }
915   SynchronizeHandler();
916   for (int i = 0; i < num_items; i++) {
917     ASSERT_EQ(enqueue_.queue_.front(), i);
918     enqueue_.queue_.pop();
919   }
920   ASSERT_FALSE(enqueue_.registered_);
921 }
922 
TEST_F(EnqueueBufferTest,clear)923 TEST_F(EnqueueBufferTest, clear) {
924   enqueue_.dont_handle_register_enqueue_ = true;
925   int num_items = 10;
926   for (int i = 0; i < num_items; i++) {
927     enqueue_buffer_.Enqueue(std::make_unique<int>(i), handler_);
928   }
929   ASSERT_TRUE(enqueue_.registered_);
930   enqueue_buffer_.Clear();
931   ASSERT_FALSE(enqueue_.registered_);
932 }
933 
TEST_F(EnqueueBufferTest,delete_when_in_callback)934 TEST_F(EnqueueBufferTest, delete_when_in_callback) {
935   Queue<int>* queue = new Queue<int>(kQueueSize);
936   EnqueueBuffer<int>* enqueue_buffer = new EnqueueBuffer<int>(queue);
937   int num_items = 10;
938   for (int i = 0; i < num_items; i++) {
939     enqueue_buffer->Enqueue(std::make_unique<int>(i), handler_);
940   }
941 
942   delete enqueue_buffer;
943   delete queue;
944 }
945 
946 }  // namespace
947 }  // namespace os
948 }  // namespace bluetooth
949