xref: /aosp_15_r20/external/deqp/framework/delibs/decpp/deBlockBuffer.cpp (revision 35238bce31c2a825756842865a792f8cf7f89930)
1 /*-------------------------------------------------------------------------
2  * drawElements C++ Base Library
3  * -----------------------------
4  *
5  * Copyright 2014 The Android Open Source Project
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  *//*!
20  * \file
21  * \brief Block-based thread-safe queue.
22  *//*--------------------------------------------------------------------*/
23 
24 #include "deBlockBuffer.hpp"
25 #include "deRandom.hpp"
26 #include "deThread.hpp"
27 #include "deInt32.h"
28 #include "deMemory.h"
29 
30 #include <vector>
31 
32 namespace de
33 {
34 
35 using std::vector;
36 
37 namespace BlockBufferBasicTest
38 {
39 
40 struct Message
41 {
42     uint32_t data;
43 
Messagede::BlockBufferBasicTest::Message44     Message(uint16_t threadId, uint16_t payload) : data((threadId << 16) | payload)
45     {
46     }
47 
Messagede::BlockBufferBasicTest::Message48     Message(void) : data(0)
49     {
50     }
51 
getThreadIdde::BlockBufferBasicTest::Message52     uint16_t getThreadId(void) const
53     {
54         return (uint16_t)(data >> 16);
55     }
getPayloadde::BlockBufferBasicTest::Message56     uint16_t getPayload(void) const
57     {
58         return (uint16_t)(data & 0xffff);
59     }
60 };
61 
62 typedef BlockBuffer<Message> MessageBuffer;
63 
64 class Consumer : public Thread
65 {
66 public:
Consumer(MessageBuffer & buffer,int numProducers)67     Consumer(MessageBuffer &buffer, int numProducers) : m_buffer(buffer)
68     {
69         m_lastPayload.resize(numProducers, 0);
70         m_payloadSum.resize(numProducers, 0);
71     }
72 
run(void)73     void run(void)
74     {
75         Random rnd((uint32_t)m_lastPayload.size());
76         Message tmpBuf[64];
77         bool consume = true;
78 
79         while (consume)
80         {
81             int numToRead = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmpBuf));
82             int numRead   = m_buffer.tryRead(numToRead, &tmpBuf[0]);
83 
84             for (int ndx = 0; ndx < numRead; ndx++)
85             {
86                 const Message &msg = tmpBuf[ndx];
87 
88                 uint16_t threadId = msg.getThreadId();
89 
90                 if (threadId == 0xffff)
91                 {
92                     /* Feed back rest of messages to buffer (they are end messages) so other consumers wake up. */
93                     if (ndx + 1 < numRead)
94                     {
95                         m_buffer.write(numRead - ndx - 1, &tmpBuf[ndx + 1]);
96                         m_buffer.flush();
97                     }
98 
99                     consume = false;
100                     break;
101                 }
102                 else
103                 {
104                     /* Verify message. */
105                     DE_TEST_ASSERT(de::inBounds<int>(threadId, 0, (int)m_lastPayload.size()));
106                     DE_TEST_ASSERT((m_lastPayload[threadId] == 0 && msg.getPayload() == 0) ||
107                                    m_lastPayload[threadId] < msg.getPayload());
108 
109                     m_lastPayload[threadId] = msg.getPayload();
110                     m_payloadSum[threadId] += (uint32_t)msg.getPayload();
111                 }
112             }
113         }
114     }
115 
getPayloadSum(uint16_t threadId) const116     uint32_t getPayloadSum(uint16_t threadId) const
117     {
118         return m_payloadSum[threadId];
119     }
120 
121 private:
122     MessageBuffer &m_buffer;
123     vector<uint16_t> m_lastPayload;
124     vector<uint32_t> m_payloadSum;
125 };
126 
127 class Producer : public Thread
128 {
129 public:
Producer(MessageBuffer & buffer,uint16_t threadId,int numMessages)130     Producer(MessageBuffer &buffer, uint16_t threadId, int numMessages)
131         : m_buffer(buffer)
132         , m_threadId(threadId)
133         , m_numMessages(numMessages)
134     {
135     }
136 
run(void)137     void run(void)
138     {
139         // Yield to give main thread chance to start other producers.
140         deSleep(1);
141 
142         Random rnd(m_threadId);
143         int msgNdx = 0;
144         Message tmpBuf[64];
145 
146         while (msgNdx < m_numMessages)
147         {
148             int writeSize = rnd.getInt(1, de::min(m_numMessages - msgNdx, DE_LENGTH_OF_ARRAY(tmpBuf)));
149             for (int ndx = 0; ndx < writeSize; ndx++)
150                 tmpBuf[ndx] = Message(m_threadId, (uint16_t)msgNdx++);
151 
152             m_buffer.write(writeSize, &tmpBuf[0]);
153             if (rnd.getBool())
154                 m_buffer.flush();
155         }
156     }
157 
158 private:
159     MessageBuffer &m_buffer;
160     uint16_t m_threadId;
161     int m_numMessages;
162 };
163 
runTest(void)164 void runTest(void)
165 {
166     const int numIterations = 8;
167     for (int iterNdx = 0; iterNdx < numIterations; iterNdx++)
168     {
169         Random rnd(iterNdx);
170         int numBlocks    = rnd.getInt(2, 128);
171         int blockSize    = rnd.getInt(1, 16);
172         int numProducers = rnd.getInt(1, 16);
173         int numConsumers = rnd.getInt(1, 16);
174         int dataSize     = rnd.getInt(50, 200);
175         MessageBuffer buffer(blockSize, numBlocks);
176         vector<Producer *> producers;
177         vector<Consumer *> consumers;
178 
179         for (int i = 0; i < numProducers; i++)
180             producers.push_back(new Producer(buffer, (uint16_t)i, dataSize));
181 
182         for (int i = 0; i < numConsumers; i++)
183             consumers.push_back(new Consumer(buffer, numProducers));
184 
185         // Start consumers.
186         for (vector<Consumer *>::iterator i = consumers.begin(); i != consumers.end(); i++)
187             (*i)->start();
188 
189         // Start producers.
190         for (vector<Producer *>::iterator i = producers.begin(); i != producers.end(); i++)
191             (*i)->start();
192 
193         // Wait for producers.
194         for (vector<Producer *>::iterator i = producers.begin(); i != producers.end(); i++)
195             (*i)->join();
196 
197         // Write end messages for consumers.
198         const Message endMsg(0xffff, 0);
199         for (int i = 0; i < numConsumers; i++)
200             buffer.write(1, &endMsg);
201         buffer.flush();
202 
203         // Wait for consumers.
204         for (vector<Consumer *>::iterator i = consumers.begin(); i != consumers.end(); i++)
205             (*i)->join();
206 
207         // Verify payload sums.
208         uint32_t refSum = 0;
209         for (int i = 0; i < dataSize; i++)
210             refSum += (uint32_t)(uint16_t)i;
211 
212         for (int i = 0; i < numProducers; i++)
213         {
214             uint32_t cmpSum = 0;
215             for (int j = 0; j < numConsumers; j++)
216                 cmpSum += consumers[j]->getPayloadSum((uint16_t)i);
217             DE_TEST_ASSERT(refSum == cmpSum);
218         }
219 
220         // Free resources.
221         for (vector<Producer *>::iterator i = producers.begin(); i != producers.end(); i++)
222             delete *i;
223         for (vector<Consumer *>::iterator i = consumers.begin(); i != consumers.end(); i++)
224             delete *i;
225     }
226 }
227 
228 } // namespace BlockBufferBasicTest
229 
230 namespace BlockBufferCancelTest
231 {
232 
233 class Producer : public Thread
234 {
235 public:
Producer(BlockBuffer<uint8_t> * buffer,uint32_t seed)236     Producer(BlockBuffer<uint8_t> *buffer, uint32_t seed) : m_buffer(buffer), m_seed(seed)
237     {
238     }
239 
run(void)240     void run(void)
241     {
242         uint8_t tmp[1024];
243         Random rnd(m_seed);
244 
245         deMemset(tmp, 0, DE_LENGTH_OF_ARRAY(tmp));
246 
247         for (;;)
248         {
249             int blockSize = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmp));
250 
251             try
252             {
253                 m_buffer->write(blockSize, &tmp[0]);
254 
255                 if (rnd.getBool())
256                     m_buffer->flush();
257             }
258             catch (const BlockBuffer<uint8_t>::CanceledException &)
259             {
260                 break;
261             }
262         }
263     }
264 
265 private:
266     BlockBuffer<uint8_t> *m_buffer;
267     uint32_t m_seed;
268 };
269 
270 class Consumer : public Thread
271 {
272 public:
Consumer(BlockBuffer<uint8_t> * buffer,uint32_t seed)273     Consumer(BlockBuffer<uint8_t> *buffer, uint32_t seed) : m_buffer(buffer), m_seed(seed)
274     {
275     }
276 
run(void)277     void run(void)
278     {
279         uint8_t tmp[1024];
280         Random rnd(m_seed);
281 
282         for (;;)
283         {
284             int blockSize = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmp));
285 
286             try
287             {
288                 m_buffer->read(blockSize, &tmp[0]);
289             }
290             catch (const BlockBuffer<uint8_t>::CanceledException &)
291             {
292                 break;
293             }
294         }
295     }
296 
297 private:
298     BlockBuffer<uint8_t> *m_buffer;
299     uint32_t m_seed;
300 };
301 
runTest(void)302 void runTest(void)
303 {
304     BlockBuffer<uint8_t> buffer(64, 16);
305     const int numIterations = 8;
306 
307     for (int iterNdx = 0; iterNdx < numIterations; iterNdx++)
308     {
309         Random rnd(deInt32Hash(iterNdx));
310         int numThreads = rnd.getInt(1, 16);
311         int sleepMs    = rnd.getInt(1, 200);
312         vector<Thread *> threads;
313 
314         for (int i = 0; i < numThreads; i++)
315         {
316             if (rnd.getBool())
317                 threads.push_back(new Consumer(&buffer, rnd.getUint32()));
318             else
319                 threads.push_back(new Producer(&buffer, rnd.getUint32()));
320         }
321 
322         // Start threads.
323         for (vector<Thread *>::iterator i = threads.begin(); i != threads.end(); i++)
324             (*i)->start();
325 
326         // Sleep for a while.
327         deSleep(sleepMs);
328 
329         // Cancel buffer.
330         buffer.cancel();
331 
332         // Wait for threads to finish.
333         for (vector<Thread *>::iterator i = threads.begin(); i != threads.end(); i++)
334             (*i)->join();
335 
336         // Reset buffer.
337         buffer.clear();
338 
339         // Delete threads
340         for (vector<Thread *>::iterator thread = threads.begin(); thread != threads.end(); ++thread)
341             delete *thread;
342     }
343 }
344 
345 } // namespace BlockBufferCancelTest
346 
BlockBuffer_selfTest(void)347 void BlockBuffer_selfTest(void)
348 {
349     BlockBufferBasicTest::runTest();
350     BlockBufferCancelTest::runTest();
351 }
352 
353 } // namespace de
354