1 /*-------------------------------------------------------------------------
2 * drawElements C++ Base Library
3 * -----------------------------
4 *
5 * Copyright 2014 The Android Open Source Project
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 *//*!
20 * \file
21 * \brief Block-based thread-safe queue.
22 *//*--------------------------------------------------------------------*/
23
24 #include "deBlockBuffer.hpp"
25 #include "deRandom.hpp"
26 #include "deThread.hpp"
27 #include "deInt32.h"
28 #include "deMemory.h"
29
30 #include <vector>
31
32 namespace de
33 {
34
35 using std::vector;
36
37 namespace BlockBufferBasicTest
38 {
39
40 struct Message
41 {
42 uint32_t data;
43
Messagede::BlockBufferBasicTest::Message44 Message(uint16_t threadId, uint16_t payload) : data((threadId << 16) | payload)
45 {
46 }
47
Messagede::BlockBufferBasicTest::Message48 Message(void) : data(0)
49 {
50 }
51
getThreadIdde::BlockBufferBasicTest::Message52 uint16_t getThreadId(void) const
53 {
54 return (uint16_t)(data >> 16);
55 }
getPayloadde::BlockBufferBasicTest::Message56 uint16_t getPayload(void) const
57 {
58 return (uint16_t)(data & 0xffff);
59 }
60 };
61
62 typedef BlockBuffer<Message> MessageBuffer;
63
64 class Consumer : public Thread
65 {
66 public:
Consumer(MessageBuffer & buffer,int numProducers)67 Consumer(MessageBuffer &buffer, int numProducers) : m_buffer(buffer)
68 {
69 m_lastPayload.resize(numProducers, 0);
70 m_payloadSum.resize(numProducers, 0);
71 }
72
run(void)73 void run(void)
74 {
75 Random rnd((uint32_t)m_lastPayload.size());
76 Message tmpBuf[64];
77 bool consume = true;
78
79 while (consume)
80 {
81 int numToRead = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmpBuf));
82 int numRead = m_buffer.tryRead(numToRead, &tmpBuf[0]);
83
84 for (int ndx = 0; ndx < numRead; ndx++)
85 {
86 const Message &msg = tmpBuf[ndx];
87
88 uint16_t threadId = msg.getThreadId();
89
90 if (threadId == 0xffff)
91 {
92 /* Feed back rest of messages to buffer (they are end messages) so other consumers wake up. */
93 if (ndx + 1 < numRead)
94 {
95 m_buffer.write(numRead - ndx - 1, &tmpBuf[ndx + 1]);
96 m_buffer.flush();
97 }
98
99 consume = false;
100 break;
101 }
102 else
103 {
104 /* Verify message. */
105 DE_TEST_ASSERT(de::inBounds<int>(threadId, 0, (int)m_lastPayload.size()));
106 DE_TEST_ASSERT((m_lastPayload[threadId] == 0 && msg.getPayload() == 0) ||
107 m_lastPayload[threadId] < msg.getPayload());
108
109 m_lastPayload[threadId] = msg.getPayload();
110 m_payloadSum[threadId] += (uint32_t)msg.getPayload();
111 }
112 }
113 }
114 }
115
getPayloadSum(uint16_t threadId) const116 uint32_t getPayloadSum(uint16_t threadId) const
117 {
118 return m_payloadSum[threadId];
119 }
120
121 private:
122 MessageBuffer &m_buffer;
123 vector<uint16_t> m_lastPayload;
124 vector<uint32_t> m_payloadSum;
125 };
126
127 class Producer : public Thread
128 {
129 public:
Producer(MessageBuffer & buffer,uint16_t threadId,int numMessages)130 Producer(MessageBuffer &buffer, uint16_t threadId, int numMessages)
131 : m_buffer(buffer)
132 , m_threadId(threadId)
133 , m_numMessages(numMessages)
134 {
135 }
136
run(void)137 void run(void)
138 {
139 // Yield to give main thread chance to start other producers.
140 deSleep(1);
141
142 Random rnd(m_threadId);
143 int msgNdx = 0;
144 Message tmpBuf[64];
145
146 while (msgNdx < m_numMessages)
147 {
148 int writeSize = rnd.getInt(1, de::min(m_numMessages - msgNdx, DE_LENGTH_OF_ARRAY(tmpBuf)));
149 for (int ndx = 0; ndx < writeSize; ndx++)
150 tmpBuf[ndx] = Message(m_threadId, (uint16_t)msgNdx++);
151
152 m_buffer.write(writeSize, &tmpBuf[0]);
153 if (rnd.getBool())
154 m_buffer.flush();
155 }
156 }
157
158 private:
159 MessageBuffer &m_buffer;
160 uint16_t m_threadId;
161 int m_numMessages;
162 };
163
runTest(void)164 void runTest(void)
165 {
166 const int numIterations = 8;
167 for (int iterNdx = 0; iterNdx < numIterations; iterNdx++)
168 {
169 Random rnd(iterNdx);
170 int numBlocks = rnd.getInt(2, 128);
171 int blockSize = rnd.getInt(1, 16);
172 int numProducers = rnd.getInt(1, 16);
173 int numConsumers = rnd.getInt(1, 16);
174 int dataSize = rnd.getInt(50, 200);
175 MessageBuffer buffer(blockSize, numBlocks);
176 vector<Producer *> producers;
177 vector<Consumer *> consumers;
178
179 for (int i = 0; i < numProducers; i++)
180 producers.push_back(new Producer(buffer, (uint16_t)i, dataSize));
181
182 for (int i = 0; i < numConsumers; i++)
183 consumers.push_back(new Consumer(buffer, numProducers));
184
185 // Start consumers.
186 for (vector<Consumer *>::iterator i = consumers.begin(); i != consumers.end(); i++)
187 (*i)->start();
188
189 // Start producers.
190 for (vector<Producer *>::iterator i = producers.begin(); i != producers.end(); i++)
191 (*i)->start();
192
193 // Wait for producers.
194 for (vector<Producer *>::iterator i = producers.begin(); i != producers.end(); i++)
195 (*i)->join();
196
197 // Write end messages for consumers.
198 const Message endMsg(0xffff, 0);
199 for (int i = 0; i < numConsumers; i++)
200 buffer.write(1, &endMsg);
201 buffer.flush();
202
203 // Wait for consumers.
204 for (vector<Consumer *>::iterator i = consumers.begin(); i != consumers.end(); i++)
205 (*i)->join();
206
207 // Verify payload sums.
208 uint32_t refSum = 0;
209 for (int i = 0; i < dataSize; i++)
210 refSum += (uint32_t)(uint16_t)i;
211
212 for (int i = 0; i < numProducers; i++)
213 {
214 uint32_t cmpSum = 0;
215 for (int j = 0; j < numConsumers; j++)
216 cmpSum += consumers[j]->getPayloadSum((uint16_t)i);
217 DE_TEST_ASSERT(refSum == cmpSum);
218 }
219
220 // Free resources.
221 for (vector<Producer *>::iterator i = producers.begin(); i != producers.end(); i++)
222 delete *i;
223 for (vector<Consumer *>::iterator i = consumers.begin(); i != consumers.end(); i++)
224 delete *i;
225 }
226 }
227
228 } // namespace BlockBufferBasicTest
229
230 namespace BlockBufferCancelTest
231 {
232
233 class Producer : public Thread
234 {
235 public:
Producer(BlockBuffer<uint8_t> * buffer,uint32_t seed)236 Producer(BlockBuffer<uint8_t> *buffer, uint32_t seed) : m_buffer(buffer), m_seed(seed)
237 {
238 }
239
run(void)240 void run(void)
241 {
242 uint8_t tmp[1024];
243 Random rnd(m_seed);
244
245 deMemset(tmp, 0, DE_LENGTH_OF_ARRAY(tmp));
246
247 for (;;)
248 {
249 int blockSize = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmp));
250
251 try
252 {
253 m_buffer->write(blockSize, &tmp[0]);
254
255 if (rnd.getBool())
256 m_buffer->flush();
257 }
258 catch (const BlockBuffer<uint8_t>::CanceledException &)
259 {
260 break;
261 }
262 }
263 }
264
265 private:
266 BlockBuffer<uint8_t> *m_buffer;
267 uint32_t m_seed;
268 };
269
270 class Consumer : public Thread
271 {
272 public:
Consumer(BlockBuffer<uint8_t> * buffer,uint32_t seed)273 Consumer(BlockBuffer<uint8_t> *buffer, uint32_t seed) : m_buffer(buffer), m_seed(seed)
274 {
275 }
276
run(void)277 void run(void)
278 {
279 uint8_t tmp[1024];
280 Random rnd(m_seed);
281
282 for (;;)
283 {
284 int blockSize = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmp));
285
286 try
287 {
288 m_buffer->read(blockSize, &tmp[0]);
289 }
290 catch (const BlockBuffer<uint8_t>::CanceledException &)
291 {
292 break;
293 }
294 }
295 }
296
297 private:
298 BlockBuffer<uint8_t> *m_buffer;
299 uint32_t m_seed;
300 };
301
runTest(void)302 void runTest(void)
303 {
304 BlockBuffer<uint8_t> buffer(64, 16);
305 const int numIterations = 8;
306
307 for (int iterNdx = 0; iterNdx < numIterations; iterNdx++)
308 {
309 Random rnd(deInt32Hash(iterNdx));
310 int numThreads = rnd.getInt(1, 16);
311 int sleepMs = rnd.getInt(1, 200);
312 vector<Thread *> threads;
313
314 for (int i = 0; i < numThreads; i++)
315 {
316 if (rnd.getBool())
317 threads.push_back(new Consumer(&buffer, rnd.getUint32()));
318 else
319 threads.push_back(new Producer(&buffer, rnd.getUint32()));
320 }
321
322 // Start threads.
323 for (vector<Thread *>::iterator i = threads.begin(); i != threads.end(); i++)
324 (*i)->start();
325
326 // Sleep for a while.
327 deSleep(sleepMs);
328
329 // Cancel buffer.
330 buffer.cancel();
331
332 // Wait for threads to finish.
333 for (vector<Thread *>::iterator i = threads.begin(); i != threads.end(); i++)
334 (*i)->join();
335
336 // Reset buffer.
337 buffer.clear();
338
339 // Delete threads
340 for (vector<Thread *>::iterator thread = threads.begin(); thread != threads.end(); ++thread)
341 delete *thread;
342 }
343 }
344
345 } // namespace BlockBufferCancelTest
346
BlockBuffer_selfTest(void)347 void BlockBuffer_selfTest(void)
348 {
349 BlockBufferBasicTest::runTest();
350 BlockBufferCancelTest::runTest();
351 }
352
353 } // namespace de
354