1 /*-------------------------------------------------------------------------
2 * drawElements C++ Base Library
3 * -----------------------------
4 *
5 * Copyright 2014 The Android Open Source Project
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 *//*!
20 * \file
21 * \brief Thread-safe ring buffer template.
22 *//*--------------------------------------------------------------------*/
23
24 #include "deThreadSafeRingBuffer.hpp"
25 #include "deRandom.hpp"
26 #include "deThread.hpp"
27
28 #include <vector>
29
30 using std::vector;
31
32 namespace de
33 {
34
35 namespace
36 {
37
/*--------------------------------------------------------------------*//*!
 * \brief Test message: a 16-bit thread id and a 16-bit payload packed
 *        into a single 32-bit word.
 *
 * The thread id occupies the upper 16 bits of \c data and the payload
 * occupies the lower 16 bits.
 *//*--------------------------------------------------------------------*/
struct Message
{
    uint32_t data;

    /*!
     * \brief Pack \p threadId and \p payload into \c data.
     * \param threadId Value stored in bits 31..16.
     * \param payload  Value stored in bits 15..0.
     */
    Message(uint16_t threadId, uint16_t payload)
        // Widen before shifting: a uint16_t operand is promoted to signed int,
        // and ids >= 0x8000 (such as 0xffff) shifted left by 16 do not fit in
        // a 32-bit int. Doing the arithmetic in uint32_t keeps it well defined.
        : data((static_cast<uint32_t>(threadId) << 16) | static_cast<uint32_t>(payload))
    {
    }

    //! Construct a message whose thread id and payload are both zero.
    Message(void) : data(0)
    {
    }

    //! \return The upper 16 bits of \c data.
    uint16_t getThreadId(void) const
    {
        return static_cast<uint16_t>(data >> 16);
    }

    //! \return The lower 16 bits of \c data.
    uint16_t getPayload(void) const
    {
        return static_cast<uint16_t>(data & 0xffffu);
    }
};
59
60 class Consumer : public Thread
61 {
62 public:
Consumer(ThreadSafeRingBuffer<Message> & buffer,int numProducers)63 Consumer(ThreadSafeRingBuffer<Message> &buffer, int numProducers) : m_buffer(buffer)
64 {
65 m_lastPayload.resize(numProducers, 0);
66 m_payloadSum.resize(numProducers, 0);
67 }
68
run(void)69 void run(void)
70 {
71 for (;;)
72 {
73 Message msg = m_buffer.popBack();
74
75 uint16_t threadId = msg.getThreadId();
76
77 if (threadId == 0xffff)
78 break;
79
80 DE_TEST_ASSERT(de::inBounds<int>(threadId, 0, (int)m_lastPayload.size()));
81 DE_TEST_ASSERT((m_lastPayload[threadId] == 0 && msg.getPayload() == 0) ||
82 m_lastPayload[threadId] < msg.getPayload());
83
84 m_lastPayload[threadId] = msg.getPayload();
85 m_payloadSum[threadId] += (uint32_t)msg.getPayload();
86 }
87 }
88
getPayloadSum(uint16_t threadId) const89 uint32_t getPayloadSum(uint16_t threadId) const
90 {
91 return m_payloadSum[threadId];
92 }
93
94 private:
95 ThreadSafeRingBuffer<Message> &m_buffer;
96 vector<uint16_t> m_lastPayload;
97 vector<uint32_t> m_payloadSum;
98 };
99
100 class Producer : public Thread
101 {
102 public:
Producer(ThreadSafeRingBuffer<Message> & buffer,uint16_t threadId,int dataSize)103 Producer(ThreadSafeRingBuffer<Message> &buffer, uint16_t threadId, int dataSize)
104 : m_buffer(buffer)
105 , m_threadId(threadId)
106 , m_dataSize(dataSize)
107 {
108 }
109
run(void)110 void run(void)
111 {
112 // Yield to give main thread chance to start other producers.
113 deSleep(1);
114
115 for (int ndx = 0; ndx < m_dataSize; ndx++)
116 m_buffer.pushFront(Message(m_threadId, (uint16_t)ndx));
117 }
118
119 private:
120 ThreadSafeRingBuffer<Message> &m_buffer;
121 uint16_t m_threadId;
122 int m_dataSize;
123 };
124
125 } // namespace
126
ThreadSafeRingBuffer_selfTest(void)127 void ThreadSafeRingBuffer_selfTest(void)
128 {
129 const int numIterations = 16;
130 for (int iterNdx = 0; iterNdx < numIterations; iterNdx++)
131 {
132 Random rnd(iterNdx);
133 int bufSize = rnd.getInt(1, 2048);
134 int numProducers = rnd.getInt(1, 16);
135 int numConsumers = rnd.getInt(1, 16);
136 int dataSize = rnd.getInt(1000, 10000);
137 ThreadSafeRingBuffer<Message> buffer(bufSize);
138 vector<Producer *> producers;
139 vector<Consumer *> consumers;
140
141 for (int i = 0; i < numProducers; i++)
142 producers.push_back(new Producer(buffer, (uint16_t)i, dataSize));
143
144 for (int i = 0; i < numConsumers; i++)
145 consumers.push_back(new Consumer(buffer, numProducers));
146
147 // Start consumers.
148 for (vector<Consumer *>::iterator i = consumers.begin(); i != consumers.end(); i++)
149 (*i)->start();
150
151 // Start producers.
152 for (vector<Producer *>::iterator i = producers.begin(); i != producers.end(); i++)
153 (*i)->start();
154
155 // Wait for producers.
156 for (vector<Producer *>::iterator i = producers.begin(); i != producers.end(); i++)
157 (*i)->join();
158
159 // Write end messages for consumers.
160 for (int i = 0; i < numConsumers; i++)
161 buffer.pushFront(Message(0xffff, 0));
162
163 // Wait for consumers.
164 for (vector<Consumer *>::iterator i = consumers.begin(); i != consumers.end(); i++)
165 (*i)->join();
166
167 // Verify payload sums.
168 uint32_t refSum = 0;
169 for (int i = 0; i < dataSize; i++)
170 refSum += (uint32_t)(uint16_t)i;
171
172 for (int i = 0; i < numProducers; i++)
173 {
174 uint32_t cmpSum = 0;
175 for (int j = 0; j < numConsumers; j++)
176 cmpSum += consumers[j]->getPayloadSum((uint16_t)i);
177 DE_TEST_ASSERT(refSum == cmpSum);
178 }
179
180 // Free resources.
181 for (vector<Producer *>::iterator i = producers.begin(); i != producers.end(); i++)
182 delete *i;
183 for (vector<Consumer *>::iterator i = consumers.begin(); i != consumers.end(); i++)
184 delete *i;
185 }
186 }
187
188 } // namespace de
189