xref: /nrf52832-nimble/rt-thread/components/drivers/src/dataqueue.c (revision 042d53a763ad75cb1465103098bb88c245d95138)
1 /*
2  * Copyright (c) 2006-2018, RT-Thread Development Team
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  *
6  * Change Logs:
7  * Date           Author       Notes
8  * 2012-09-30     Bernard      first version.
9  * 2016-10-31     armink       fix some resume push and pop thread bugs
10  */
11 
12 #include <rtthread.h>
13 #include <rtdevice.h>
14 #include <rthw.h>
15 
/* One slot of the data queue's ring buffer. Only the producer's pointer
 * and its reported size are stored — the pointed-to data is NOT copied,
 * so it must remain valid until the consumer pops the slot. */
struct rt_data_item
{
    const void *data_ptr;   /* pointer handed over by rt_data_queue_push() */
    rt_size_t data_size;    /* size value supplied alongside data_ptr */
};
21 
22 rt_err_t
23 rt_data_queue_init(struct rt_data_queue *queue,
24                    rt_uint16_t size,
25                    rt_uint16_t lwm,
26                    void (*evt_notify)(struct rt_data_queue *queue, rt_uint32_t event))
27 {
28     RT_ASSERT(queue != RT_NULL);
29 
30     queue->evt_notify = evt_notify;
31 
32     queue->size = size;
33     queue->lwm = lwm;
34 
35     queue->get_index = 0;
36     queue->put_index = 0;
37 
38     rt_list_init(&(queue->suspended_push_list));
39     rt_list_init(&(queue->suspended_pop_list));
40 
41     queue->queue = (struct rt_data_item *)rt_malloc(sizeof(struct rt_data_item) * size);
42     if (queue->queue == RT_NULL)
43     {
44         return -RT_ENOMEM;
45     }
46 
47     return RT_EOK;
48 }
49 RTM_EXPORT(rt_data_queue_init);
50 
51 rt_err_t rt_data_queue_push(struct rt_data_queue *queue,
52                             const void *data_ptr,
53                             rt_size_t data_size,
54                             rt_int32_t timeout)
55 {
56     rt_ubase_t  level;
57     rt_thread_t thread;
58     rt_err_t    result;
59 
60     RT_ASSERT(queue != RT_NULL);
61 
62     result = RT_EOK;
63     thread = rt_thread_self();
64 
65     level = rt_hw_interrupt_disable();
66     while (queue->put_index - queue->get_index == queue->size)
67     {
68         /* queue is full */
69         if (timeout == 0)
70         {
71             result = -RT_ETIMEOUT;
72 
73             goto __exit;
74         }
75 
76         /* current context checking */
77         RT_DEBUG_NOT_IN_INTERRUPT;
78 
79         /* reset thread error number */
80         thread->error = RT_EOK;
81 
82         /* suspend thread on the push list */
83         rt_thread_suspend(thread);
84         rt_list_insert_before(&(queue->suspended_push_list), &(thread->tlist));
85         /* start timer */
86         if (timeout > 0)
87         {
88             /* reset the timeout of thread timer and start it */
89             rt_timer_control(&(thread->thread_timer),
90                              RT_TIMER_CTRL_SET_TIME,
91                              &timeout);
92             rt_timer_start(&(thread->thread_timer));
93         }
94 
95         /* enable interrupt */
96         rt_hw_interrupt_enable(level);
97 
98         /* do schedule */
99         rt_schedule();
100 
101         /* thread is waked up */
102         result = thread->error;
103         level = rt_hw_interrupt_disable();
104         if (result != RT_EOK) goto __exit;
105     }
106 
107     queue->queue[queue->put_index % queue->size].data_ptr  = data_ptr;
108     queue->queue[queue->put_index % queue->size].data_size = data_size;
109     queue->put_index += 1;
110 
111     /* there is at least one thread in suspended list */
112     if (!rt_list_isempty(&(queue->suspended_pop_list)))
113     {
114         /* get thread entry */
115         thread = rt_list_entry(queue->suspended_pop_list.next,
116                                struct rt_thread,
117                                tlist);
118 
119         /* resume it */
120         rt_thread_resume(thread);
121         rt_hw_interrupt_enable(level);
122 
123         /* perform a schedule */
124         rt_schedule();
125 
126         return result;
127     }
128 
129 __exit:
130     rt_hw_interrupt_enable(level);
131     if ((result == RT_EOK) && queue->evt_notify != RT_NULL)
132     {
133         queue->evt_notify(queue, RT_DATAQUEUE_EVENT_PUSH);
134     }
135 
136     return result;
137 }
138 RTM_EXPORT(rt_data_queue_push);
139 
140 rt_err_t rt_data_queue_pop(struct rt_data_queue *queue,
141                            const void** data_ptr,
142                            rt_size_t *size,
143                            rt_int32_t timeout)
144 {
145     rt_ubase_t  level;
146     rt_thread_t thread;
147     rt_err_t    result;
148 
149     RT_ASSERT(queue != RT_NULL);
150     RT_ASSERT(data_ptr != RT_NULL);
151     RT_ASSERT(size != RT_NULL);
152 
153     result = RT_EOK;
154     thread = rt_thread_self();
155 
156     level = rt_hw_interrupt_disable();
157     while (queue->get_index == queue->put_index)
158     {
159         /* queue is empty */
160         if (timeout == 0)
161         {
162             result = -RT_ETIMEOUT;
163             goto __exit;
164         }
165 
166         /* current context checking */
167         RT_DEBUG_NOT_IN_INTERRUPT;
168 
169         /* reset thread error number */
170         thread->error = RT_EOK;
171 
172         /* suspend thread on the pop list */
173         rt_thread_suspend(thread);
174         rt_list_insert_before(&(queue->suspended_pop_list), &(thread->tlist));
175         /* start timer */
176         if (timeout > 0)
177         {
178             /* reset the timeout of thread timer and start it */
179             rt_timer_control(&(thread->thread_timer),
180                              RT_TIMER_CTRL_SET_TIME,
181                              &timeout);
182             rt_timer_start(&(thread->thread_timer));
183         }
184 
185         /* enable interrupt */
186         rt_hw_interrupt_enable(level);
187 
188         /* do schedule */
189         rt_schedule();
190 
191         /* thread is waked up */
192         result = thread->error;
193         level  = rt_hw_interrupt_disable();
194         if (result != RT_EOK)
195             goto __exit;
196     }
197 
198     *data_ptr = queue->queue[queue->get_index % queue->size].data_ptr;
199     *size     = queue->queue[queue->get_index % queue->size].data_size;
200 
201     queue->get_index += 1;
202 
203     if ((queue->put_index - queue->get_index) <= queue->lwm)
204     {
205         /* there is at least one thread in suspended list */
206         if (!rt_list_isempty(&(queue->suspended_push_list)))
207         {
208             /* get thread entry */
209             thread = rt_list_entry(queue->suspended_push_list.next,
210                                    struct rt_thread,
211                                    tlist);
212 
213             /* resume it */
214             rt_thread_resume(thread);
215             rt_hw_interrupt_enable(level);
216 
217             /* perform a schedule */
218             rt_schedule();
219         }
220         else
221         {
222             rt_hw_interrupt_enable(level);
223         }
224 
225         if (queue->evt_notify != RT_NULL)
226             queue->evt_notify(queue, RT_DATAQUEUE_EVENT_LWM);
227 
228         return result;
229     }
230 
231 __exit:
232     rt_hw_interrupt_enable(level);
233     if ((result == RT_EOK) && (queue->evt_notify != RT_NULL))
234     {
235         queue->evt_notify(queue, RT_DATAQUEUE_EVENT_POP);
236     }
237 
238     return result;
239 }
240 RTM_EXPORT(rt_data_queue_pop);
241 
242 rt_err_t rt_data_queue_peak(struct rt_data_queue *queue,
243                             const void** data_ptr,
244                             rt_size_t *size)
245 {
246     rt_ubase_t  level;
247 
248     RT_ASSERT(queue != RT_NULL);
249 
250     level = rt_hw_interrupt_disable();
251 
252     if (queue->get_index == queue->put_index)
253     {
254         rt_hw_interrupt_enable(level);
255 
256         return -RT_EEMPTY;
257     }
258 
259     *data_ptr = queue->queue[queue->get_index % queue->size].data_ptr;
260     *size     = queue->queue[queue->get_index % queue->size].data_size;
261 
262     rt_hw_interrupt_enable(level);
263 
264     return RT_EOK;
265 }
266 RTM_EXPORT(rt_data_queue_peak);
267 
268 void rt_data_queue_reset(struct rt_data_queue *queue)
269 {
270     struct rt_thread *thread;
271     register rt_ubase_t temp;
272 
273     rt_enter_critical();
274     /* wakeup all suspend threads */
275 
276     /* resume on pop list */
277     while (!rt_list_isempty(&(queue->suspended_pop_list)))
278     {
279         /* disable interrupt */
280         temp = rt_hw_interrupt_disable();
281 
282         /* get next suspend thread */
283         thread = rt_list_entry(queue->suspended_pop_list.next,
284                                struct rt_thread,
285                                tlist);
286         /* set error code to RT_ERROR */
287         thread->error = -RT_ERROR;
288 
289         /*
290          * resume thread
291          * In rt_thread_resume function, it will remove current thread from
292          * suspend list
293          */
294         rt_thread_resume(thread);
295 
296         /* enable interrupt */
297         rt_hw_interrupt_enable(temp);
298     }
299 
300     /* resume on push list */
301     while (!rt_list_isempty(&(queue->suspended_push_list)))
302     {
303         /* disable interrupt */
304         temp = rt_hw_interrupt_disable();
305 
306         /* get next suspend thread */
307         thread = rt_list_entry(queue->suspended_push_list.next,
308                                struct rt_thread,
309                                tlist);
310         /* set error code to RT_ERROR */
311         thread->error = -RT_ERROR;
312 
313         /*
314          * resume thread
315          * In rt_thread_resume function, it will remove current thread from
316          * suspend list
317          */
318         rt_thread_resume(thread);
319 
320         /* enable interrupt */
321         rt_hw_interrupt_enable(temp);
322     }
323     rt_exit_critical();
324 
325     rt_schedule();
326 }
327 RTM_EXPORT(rt_data_queue_reset);
328