/*
 * Copyright 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once
#include <condition_variable>
#include <deque>
#include <mutex>
#include <queue>
#include <utility>

namespace netsim {
namespace util {

/**
 * @brief A queue with blocking behavior.
 *
 * @tparam T Type of the element
 *
 * The `BlockingQueue` is thread safe and blocks on `WaitAndPop()` while no
 * elements are available and the queue has not been stopped.
 *
 * Elements are moved out of the queue on pop, so move-only types such as
 * `std::unique_ptr` can be used for T (via the rvalue `Push` overload).
 * Avoid copying large payloads by using a smart pointer for T.
 */
template <class T>
class BlockingQueue {
 private:
  // Guards `queue` and `stopped`. Mutable so that const observers can lock.
  mutable std::mutex mutex;
  // Signalled when an element is pushed or the queue is stopped.
  std::condition_variable condition;
  std::queue<T> queue;
  bool stopped{false};

 public:
  /**
   * @brief Returns true if the queue is active, i.e. `Stop()` has not been
   * called yet.
   */
  bool Active() const {
    std::lock_guard<std::mutex> lock(this->mutex);
    return !this->stopped;
  }

  /**
   * @brief Stops the queue and unblocks readers.
   *
   * Every thread currently blocked in `WaitAndPop()` is woken up and returns
   * false. Calling `Stop()` more than once is harmless.
   */
  void Stop() {
    {
      std::lock_guard<std::mutex> lock(this->mutex);
      this->stopped = true;
    }
    // Wake all waiters, not just one: each of them must observe `stopped`.
    this->condition.notify_all();
  }

  /**
   * @brief Add a copy of `value` to the end of the queue.
   *
   * The value is silently dropped if the queue has been stopped.
   */
  void Push(const T &value) {
    {
      std::lock_guard<std::mutex> lock(this->mutex);
      if (this->stopped) return;
      this->queue.push(value);
    }
    // Notify after releasing the lock so the woken reader can acquire it.
    this->condition.notify_one();
  }

  /**
   * @brief Move `value` to the end of the queue.
   *
   * The value is silently dropped if the queue has been stopped.
   */
  void Push(T &&value) {
    {
      std::lock_guard<std::mutex> lock(this->mutex);
      if (this->stopped) return;
      this->queue.push(std::move(value));
    }
    // Notify after releasing the lock so the woken reader can acquire it.
    this->condition.notify_one();
  }

  /**
   * @brief Retrieves and removes the front element, blocking until one is
   * available or the queue is stopped.
   *
   * @param value Receives the front element (move-assigned) on success;
   *              left untouched when the queue is stopped.
   *
   * @return false if the queue is stopped (even if elements are still
   *         queued), true otherwise.
   */
  bool WaitAndPop(T &value) {
    std::unique_lock<std::mutex> lock(this->mutex);
    this->condition.wait(
        lock, [this] { return this->stopped || !this->queue.empty(); });
    if (this->stopped) return false;
    // Move instead of copy: cheaper, and required for move-only T.
    value = std::move(this->queue.front());
    this->queue.pop();
    return true;
  }
};

}  // namespace util
}  // namespace netsim