1 /*
2 * Copyright (c) 2006-2018, RT-Thread Development Team
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 *
6 * Change Logs:
7 * Date Author Notes
8 * 2012-09-30 Bernard first version.
9 * 2016-10-31 armink fix some resume push and pop thread bugs
10 */
11
12 #include <rtthread.h>
13 #include <rtdevice.h>
14 #include <rthw.h>
15
16 struct rt_data_item
17 {
18 const void *data_ptr;
19 rt_size_t data_size;
20 };
21
22 rt_err_t
rt_data_queue_init(struct rt_data_queue * queue,rt_uint16_t size,rt_uint16_t lwm,void (* evt_notify)(struct rt_data_queue * queue,rt_uint32_t event))23 rt_data_queue_init(struct rt_data_queue *queue,
24 rt_uint16_t size,
25 rt_uint16_t lwm,
26 void (*evt_notify)(struct rt_data_queue *queue, rt_uint32_t event))
27 {
28 RT_ASSERT(queue != RT_NULL);
29
30 queue->evt_notify = evt_notify;
31
32 queue->size = size;
33 queue->lwm = lwm;
34
35 queue->get_index = 0;
36 queue->put_index = 0;
37
38 rt_list_init(&(queue->suspended_push_list));
39 rt_list_init(&(queue->suspended_pop_list));
40
41 queue->queue = (struct rt_data_item *)rt_malloc(sizeof(struct rt_data_item) * size);
42 if (queue->queue == RT_NULL)
43 {
44 return -RT_ENOMEM;
45 }
46
47 return RT_EOK;
48 }
49 RTM_EXPORT(rt_data_queue_init);
50
rt_data_queue_push(struct rt_data_queue * queue,const void * data_ptr,rt_size_t data_size,rt_int32_t timeout)51 rt_err_t rt_data_queue_push(struct rt_data_queue *queue,
52 const void *data_ptr,
53 rt_size_t data_size,
54 rt_int32_t timeout)
55 {
56 rt_ubase_t level;
57 rt_thread_t thread;
58 rt_err_t result;
59
60 RT_ASSERT(queue != RT_NULL);
61
62 result = RT_EOK;
63 thread = rt_thread_self();
64
65 level = rt_hw_interrupt_disable();
66 while (queue->put_index - queue->get_index == queue->size)
67 {
68 /* queue is full */
69 if (timeout == 0)
70 {
71 result = -RT_ETIMEOUT;
72
73 goto __exit;
74 }
75
76 /* current context checking */
77 RT_DEBUG_NOT_IN_INTERRUPT;
78
79 /* reset thread error number */
80 thread->error = RT_EOK;
81
82 /* suspend thread on the push list */
83 rt_thread_suspend(thread);
84 rt_list_insert_before(&(queue->suspended_push_list), &(thread->tlist));
85 /* start timer */
86 if (timeout > 0)
87 {
88 /* reset the timeout of thread timer and start it */
89 rt_timer_control(&(thread->thread_timer),
90 RT_TIMER_CTRL_SET_TIME,
91 &timeout);
92 rt_timer_start(&(thread->thread_timer));
93 }
94
95 /* enable interrupt */
96 rt_hw_interrupt_enable(level);
97
98 /* do schedule */
99 rt_schedule();
100
101 /* thread is waked up */
102 result = thread->error;
103 level = rt_hw_interrupt_disable();
104 if (result != RT_EOK) goto __exit;
105 }
106
107 queue->queue[queue->put_index % queue->size].data_ptr = data_ptr;
108 queue->queue[queue->put_index % queue->size].data_size = data_size;
109 queue->put_index += 1;
110
111 /* there is at least one thread in suspended list */
112 if (!rt_list_isempty(&(queue->suspended_pop_list)))
113 {
114 /* get thread entry */
115 thread = rt_list_entry(queue->suspended_pop_list.next,
116 struct rt_thread,
117 tlist);
118
119 /* resume it */
120 rt_thread_resume(thread);
121 rt_hw_interrupt_enable(level);
122
123 /* perform a schedule */
124 rt_schedule();
125
126 return result;
127 }
128
129 __exit:
130 rt_hw_interrupt_enable(level);
131 if ((result == RT_EOK) && queue->evt_notify != RT_NULL)
132 {
133 queue->evt_notify(queue, RT_DATAQUEUE_EVENT_PUSH);
134 }
135
136 return result;
137 }
138 RTM_EXPORT(rt_data_queue_push);
139
rt_data_queue_pop(struct rt_data_queue * queue,const void ** data_ptr,rt_size_t * size,rt_int32_t timeout)140 rt_err_t rt_data_queue_pop(struct rt_data_queue *queue,
141 const void** data_ptr,
142 rt_size_t *size,
143 rt_int32_t timeout)
144 {
145 rt_ubase_t level;
146 rt_thread_t thread;
147 rt_err_t result;
148
149 RT_ASSERT(queue != RT_NULL);
150 RT_ASSERT(data_ptr != RT_NULL);
151 RT_ASSERT(size != RT_NULL);
152
153 result = RT_EOK;
154 thread = rt_thread_self();
155
156 level = rt_hw_interrupt_disable();
157 while (queue->get_index == queue->put_index)
158 {
159 /* queue is empty */
160 if (timeout == 0)
161 {
162 result = -RT_ETIMEOUT;
163 goto __exit;
164 }
165
166 /* current context checking */
167 RT_DEBUG_NOT_IN_INTERRUPT;
168
169 /* reset thread error number */
170 thread->error = RT_EOK;
171
172 /* suspend thread on the pop list */
173 rt_thread_suspend(thread);
174 rt_list_insert_before(&(queue->suspended_pop_list), &(thread->tlist));
175 /* start timer */
176 if (timeout > 0)
177 {
178 /* reset the timeout of thread timer and start it */
179 rt_timer_control(&(thread->thread_timer),
180 RT_TIMER_CTRL_SET_TIME,
181 &timeout);
182 rt_timer_start(&(thread->thread_timer));
183 }
184
185 /* enable interrupt */
186 rt_hw_interrupt_enable(level);
187
188 /* do schedule */
189 rt_schedule();
190
191 /* thread is waked up */
192 result = thread->error;
193 level = rt_hw_interrupt_disable();
194 if (result != RT_EOK)
195 goto __exit;
196 }
197
198 *data_ptr = queue->queue[queue->get_index % queue->size].data_ptr;
199 *size = queue->queue[queue->get_index % queue->size].data_size;
200
201 queue->get_index += 1;
202
203 if ((queue->put_index - queue->get_index) <= queue->lwm)
204 {
205 /* there is at least one thread in suspended list */
206 if (!rt_list_isempty(&(queue->suspended_push_list)))
207 {
208 /* get thread entry */
209 thread = rt_list_entry(queue->suspended_push_list.next,
210 struct rt_thread,
211 tlist);
212
213 /* resume it */
214 rt_thread_resume(thread);
215 rt_hw_interrupt_enable(level);
216
217 /* perform a schedule */
218 rt_schedule();
219 }
220 else
221 {
222 rt_hw_interrupt_enable(level);
223 }
224
225 if (queue->evt_notify != RT_NULL)
226 queue->evt_notify(queue, RT_DATAQUEUE_EVENT_LWM);
227
228 return result;
229 }
230
231 __exit:
232 rt_hw_interrupt_enable(level);
233 if ((result == RT_EOK) && (queue->evt_notify != RT_NULL))
234 {
235 queue->evt_notify(queue, RT_DATAQUEUE_EVENT_POP);
236 }
237
238 return result;
239 }
240 RTM_EXPORT(rt_data_queue_pop);
241
rt_data_queue_peak(struct rt_data_queue * queue,const void ** data_ptr,rt_size_t * size)242 rt_err_t rt_data_queue_peak(struct rt_data_queue *queue,
243 const void** data_ptr,
244 rt_size_t *size)
245 {
246 rt_ubase_t level;
247
248 RT_ASSERT(queue != RT_NULL);
249
250 level = rt_hw_interrupt_disable();
251
252 if (queue->get_index == queue->put_index)
253 {
254 rt_hw_interrupt_enable(level);
255
256 return -RT_EEMPTY;
257 }
258
259 *data_ptr = queue->queue[queue->get_index % queue->size].data_ptr;
260 *size = queue->queue[queue->get_index % queue->size].data_size;
261
262 rt_hw_interrupt_enable(level);
263
264 return RT_EOK;
265 }
266 RTM_EXPORT(rt_data_queue_peak);
267
rt_data_queue_reset(struct rt_data_queue * queue)268 void rt_data_queue_reset(struct rt_data_queue *queue)
269 {
270 struct rt_thread *thread;
271 register rt_ubase_t temp;
272
273 rt_enter_critical();
274 /* wakeup all suspend threads */
275
276 /* resume on pop list */
277 while (!rt_list_isempty(&(queue->suspended_pop_list)))
278 {
279 /* disable interrupt */
280 temp = rt_hw_interrupt_disable();
281
282 /* get next suspend thread */
283 thread = rt_list_entry(queue->suspended_pop_list.next,
284 struct rt_thread,
285 tlist);
286 /* set error code to RT_ERROR */
287 thread->error = -RT_ERROR;
288
289 /*
290 * resume thread
291 * In rt_thread_resume function, it will remove current thread from
292 * suspend list
293 */
294 rt_thread_resume(thread);
295
296 /* enable interrupt */
297 rt_hw_interrupt_enable(temp);
298 }
299
300 /* resume on push list */
301 while (!rt_list_isempty(&(queue->suspended_push_list)))
302 {
303 /* disable interrupt */
304 temp = rt_hw_interrupt_disable();
305
306 /* get next suspend thread */
307 thread = rt_list_entry(queue->suspended_push_list.next,
308 struct rt_thread,
309 tlist);
310 /* set error code to RT_ERROR */
311 thread->error = -RT_ERROR;
312
313 /*
314 * resume thread
315 * In rt_thread_resume function, it will remove current thread from
316 * suspend list
317 */
318 rt_thread_resume(thread);
319
320 /* enable interrupt */
321 rt_hw_interrupt_enable(temp);
322 }
323 rt_exit_critical();
324
325 rt_schedule();
326 }
327 RTM_EXPORT(rt_data_queue_reset);
328