1 // Copyright (C) 2013 Vicente J. Botet Escriba
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 // <boost/thread/sync_queue.hpp>
7 
8 // class sync_queue<T>
9 
10 //    push || pull;
11 
12 #include <boost/config.hpp>
13 #if ! defined  BOOST_NO_CXX11_DECLTYPE
14 #define BOOST_RESULT_OF_USE_DECLTYPE
15 #endif
16 
17 #define BOOST_THREAD_VERSION 4
18 
19 #include <boost/thread/sync_queue.hpp>
20 #include <boost/thread/future.hpp>
21 #include <boost/thread/barrier.hpp>
22 
23 #include <boost/detail/lightweight_test.hpp>
24 
25 template <typename ValueType>
26 struct call_push
27 {
28   boost::sync_queue<ValueType> *q_;
29   boost::barrier *go_;
30 
call_pushcall_push31   call_push(boost::sync_queue<ValueType> *q, boost::barrier *go) :
32     q_(q), go_(go)
33   {
34   }
35   typedef void result_type;
operator ()call_push36   void operator()()
37   {
38     go_->count_down_and_wait();
39     q_->push(42);
40   }
41 };
42 
43 template <typename ValueType>
44 struct call_pull
45 {
46   boost::sync_queue<ValueType> *q_;
47   boost::barrier *go_;
48 
call_pullcall_pull49   call_pull(boost::sync_queue<ValueType> *q, boost::barrier *go) :
50     q_(q), go_(go)
51   {
52   }
53   typedef ValueType result_type;
operator ()call_pull54   ValueType operator()()
55   {
56     go_->count_down_and_wait();
57     return q_->pull();
58   }
59 };
60 
61 template <typename ValueType>
62 struct call_wait_pull
63 {
64   boost::sync_queue<ValueType> *q_;
65   boost::barrier *go_;
66 
call_wait_pullcall_wait_pull67   call_wait_pull(boost::sync_queue<ValueType> *q, boost::barrier *go) :
68     q_(q), go_(go)
69   {
70   }
71   typedef boost::queue_op_status result_type;
operator ()call_wait_pull72   boost::queue_op_status operator()(ValueType& v)
73   {
74     go_->wait();
75     return q_->wait_pull(v);
76   }
77 };
78 
test_concurrent_push_and_pull_on_empty_queue()79 void test_concurrent_push_and_pull_on_empty_queue()
80 {
81   boost::sync_queue<int> q;
82 
83   boost::barrier go(2);
84 
85   boost::future<void> push_done;
86   boost::future<int> pull_done;
87 
88   try
89   {
90     push_done=boost::async(boost::launch::async,
91                            call_push<int>(&q,&go));
92     pull_done=boost::async(boost::launch::async,
93                            call_pull<int>(&q,&go));
94 
95     push_done.get();
96     BOOST_TEST_EQ(pull_done.get(), 42);
97     BOOST_TEST(q.empty());
98   }
99   catch (...)
100   {
101     BOOST_TEST(false);
102   }
103 }
104 
105 #if defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
test_concurrent_push_and_wait_pull_on_empty_queue()106 void test_concurrent_push_and_wait_pull_on_empty_queue()
107 {
108   boost::sync_queue<int> q;
109   const unsigned int n = 3;
110   boost::barrier go(n);
111 
112   boost::future<boost::queue_op_status> pull_done[n];
113   int results[n];
114 
115   try
116   {
117     for (unsigned int i =0; i< n; ++i)
118       pull_done[i]=boost::async(boost::launch::async,
119                                 call_wait_pull<int>(&q,&go),
120                                 boost::ref(results[i]));
121 
122     for (unsigned int i =0; i< n; ++i)
123       q.push(42);
124 
125     for (unsigned int i = 0; i < n; ++i) {
126       BOOST_TEST(pull_done[i].get() == boost::queue_op_status::success);
127       BOOST_TEST_EQ(results[i], 42);
128     }
129     BOOST_TEST(q.empty());
130   }
131   catch (...)
132   {
133     BOOST_TEST(false);
134   }
135 }
136 
test_concurrent_wait_pull_and_close_on_empty_queue()137 void test_concurrent_wait_pull_and_close_on_empty_queue()
138 {
139   boost::sync_queue<int> q;
140   const unsigned int n = 3;
141   boost::barrier go(n);
142 
143   boost::future<boost::queue_op_status> pull_done[n];
144   int results[n];
145 
146   try
147   {
148     for (unsigned int i =0; i< n; ++i)
149       pull_done[i]=boost::async(boost::launch::async,
150                                 call_wait_pull<int>(&q,&go),
151                                 boost::ref(results[i]));
152 
153     q.close();
154 
155     for (unsigned int i = 0; i < n; ++i) {
156       BOOST_TEST(pull_done[i].get() == boost::queue_op_status::closed);
157     }
158     BOOST_TEST(q.empty());
159   }
160   catch (...)
161   {
162     BOOST_TEST(false);
163   }
164 }
165 #endif
166 
test_concurrent_push_on_empty_queue()167 void test_concurrent_push_on_empty_queue()
168 {
169   boost::sync_queue<int> q;
170   const unsigned int n = 3;
171   boost::barrier go(n);
172   boost::future<void> push_done[n];
173 
174   try
175   {
176     for (unsigned int i =0; i< n; ++i)
177       push_done[i]=boost::async(boost::launch::async,
178                                 call_push<int>(&q,&go));
179 
180   }
181   catch (...)
182   {
183     BOOST_TEST(false);
184   }
185   try
186   {
187     for (unsigned int i = 0; i < n; ++i)
188       push_done[i].get();
189 
190   }
191   catch (...)
192   {
193     BOOST_TEST(false);
194   }
195   try
196   {
197     BOOST_TEST(!q.empty());
198     for (unsigned int i =0; i< n; ++i)
199       BOOST_TEST_EQ(q.pull(), 42);
200     BOOST_TEST(q.empty());
201 
202   }
203   catch (...)
204   {
205     BOOST_TEST(false);
206   }
207 }
208 
test_concurrent_pull_on_queue()209 void test_concurrent_pull_on_queue()
210 {
211   boost::sync_queue<int> q;
212   const unsigned int n = 3;
213   boost::barrier go(n);
214 
215   boost::future<int> pull_done[n];
216 
217   try
218   {
219     for (unsigned int i =0; i< n; ++i)
220       q.push(42);
221 
222     for (unsigned int i =0; i< n; ++i)
223       pull_done[i]=boost::async(boost::launch::async,
224 #if ! defined BOOST_NO_CXX11_LAMBDAS
225         [&q,&go]() -> int
226         {
227           go.wait();
228           return q.pull();
229         }
230 #else
231         call_pull<int>(&q,&go)
232 #endif
233       );
234 
235     for (unsigned int i = 0; i < n; ++i)
236       BOOST_TEST_EQ(pull_done[i].get(), 42);
237     BOOST_TEST(q.empty());
238   }
239   catch (...)
240   {
241     BOOST_TEST(false);
242   }
243 }
244 
main()245 int main()
246 {
247   test_concurrent_push_and_pull_on_empty_queue();
248 #if defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
249   test_concurrent_push_and_wait_pull_on_empty_queue();
250   test_concurrent_wait_pull_and_close_on_empty_queue();
251 #endif
252   test_concurrent_push_on_empty_queue();
253   test_concurrent_pull_on_queue();
254   return boost::report_errors();
255 }
256 
257