xref: /nrf52832-nimble/rt-thread/components/vbus/prio_queue.c (revision 104654410c56c573564690304ae786df310c91fc)
1 /*
2  * COPYRIGHT (C) 2018, Real-Thread Information Technology Ltd
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  *
6  * Change Logs:
7  * Date           Author       Notes
8  * 2013-11-04     Grissiom     add comment
9  */
10 
11 #include <rthw.h>
12 #include <rtthread.h>
13 
14 #include "prio_queue.h"
15 
16 struct rt_prio_queue_item {
17     struct rt_prio_queue_item *next;
18     /* data follows */
19 };
20 
_do_push(struct rt_prio_queue * que,rt_uint8_t prio,struct rt_prio_queue_item * item)21 static void _do_push(struct rt_prio_queue *que,
22                      rt_uint8_t prio,
23                      struct rt_prio_queue_item *item)
24 {
25     if (que->head[prio] == RT_NULL)
26     {
27         que->head[prio] = item;
28         que->bitmap |= 1 << prio;
29     }
30     else
31     {
32         RT_ASSERT(que->tail[prio]);
33         que->tail[prio]->next = item;
34     }
35     que->tail[prio] = item;
36 }
37 
_do_pop(struct rt_prio_queue * que)38 static struct rt_prio_queue_item* _do_pop(struct rt_prio_queue *que)
39 {
40     int ffs;
41     struct rt_prio_queue_item *item;
42 
43     ffs = __rt_ffs(que->bitmap);
44     if (ffs == 0)
45         return RT_NULL;
46     ffs--;
47 
48     item = que->head[ffs];
49     RT_ASSERT(item);
50 
51     que->head[ffs] = item->next;
52     if (que->head[ffs] == RT_NULL)
53     {
54         que->bitmap &= ~(1 << ffs);
55     }
56 
57     return item;
58 }
59 
rt_prio_queue_init(struct rt_prio_queue * que,const char * name,void * buf,rt_size_t bufsz,rt_size_t itemsz)60 rt_err_t rt_prio_queue_init(struct rt_prio_queue *que,
61                             const char *name,
62                             void *buf,
63                             rt_size_t bufsz,
64                             rt_size_t itemsz)
65 {
66     RT_ASSERT(que);
67 
68     rt_memset(que, 0, sizeof(*que));
69 
70     rt_list_init(&(que->suspended_pop_list));
71 
72     rt_mp_init(&que->pool, name, buf, bufsz,
73                sizeof(struct rt_prio_queue_item) + itemsz);
74 
75     que->item_sz = itemsz;
76 
77     return RT_EOK;
78 }
79 
rt_prio_queue_detach(struct rt_prio_queue * que)80 void rt_prio_queue_detach(struct rt_prio_queue *que)
81 {
82     /* wake up all suspended pop threads, push thread is suspended on mempool.
83      */
84     while (!rt_list_isempty(&(que->suspended_pop_list)))
85     {
86         rt_thread_t thread;
87 
88         /* disable interrupt */
89         rt_ubase_t temp = rt_hw_interrupt_disable();
90 
91         /* get next suspend thread */
92         thread = rt_list_entry(que->suspended_pop_list.next, struct rt_thread, tlist);
93         /* set error code to RT_ERROR */
94         thread->error = -RT_ERROR;
95 
96         rt_thread_resume(thread);
97 
98         /* enable interrupt */
99         rt_hw_interrupt_enable(temp);
100     }
101     rt_mp_detach(&que->pool);
102 }
103 
104 #ifdef RT_USING_HEAP
rt_prio_queue_create(const char * name,rt_size_t item_nr,rt_size_t item_sz)105 struct rt_prio_queue* rt_prio_queue_create(const char *name,
106                                            rt_size_t item_nr,
107                                            rt_size_t item_sz)
108 {
109     struct rt_prio_queue *que;
110     rt_size_t bufsz;
111 
112     bufsz = item_nr * (sizeof(struct rt_prio_queue_item)
113                        + item_sz
114                        + sizeof(void*));
115 
116     RT_ASSERT(item_nr);
117 
118     que = rt_malloc(sizeof(*que) + bufsz);
119     if (!que)
120         return RT_NULL;
121 
122     rt_prio_queue_init(que, name, que+1, bufsz, item_sz);
123 
124     return que;
125 }
126 
rt_prio_queue_delete(struct rt_prio_queue * que)127 void rt_prio_queue_delete(struct rt_prio_queue *que)
128 {
129     rt_prio_queue_detach(que);
130     rt_free(que);
131 }
132 #endif
133 
rt_prio_queue_push(struct rt_prio_queue * que,rt_uint8_t prio,void * data,rt_int32_t timeout)134 rt_err_t rt_prio_queue_push(struct rt_prio_queue *que,
135                             rt_uint8_t prio,
136                             void *data,
137                             rt_int32_t timeout)
138 {
139     rt_ubase_t level;
140     struct rt_prio_queue_item *item;
141 
142     RT_ASSERT(que);
143 
144     if (prio >= RT_PRIO_QUEUE_PRIO_MAX)
145         return -RT_ERROR;
146 
147     item = rt_mp_alloc(&que->pool, timeout);
148     if (item == RT_NULL)
149         return -RT_ENOMEM;
150 
151     rt_memcpy(item+1, data, que->item_sz);
152     item->next = RT_NULL;
153 
154     level = rt_hw_interrupt_disable();
155 
156     _do_push(que, prio, item);
157 
158     if (!rt_list_isempty(&(que->suspended_pop_list)))
159     {
160         rt_thread_t thread;
161 
162         /* get thread entry */
163         thread = rt_list_entry(que->suspended_pop_list.next,
164                                struct rt_thread,
165                                tlist);
166         /* resume it */
167         rt_thread_resume(thread);
168         rt_hw_interrupt_enable(level);
169 
170         /* perform a schedule */
171         rt_schedule();
172 
173         return RT_EOK;
174     }
175 
176     rt_hw_interrupt_enable(level);
177 
178     return RT_EOK;
179 }
180 
rt_prio_queue_pop(struct rt_prio_queue * que,void * data,rt_int32_t timeout)181 rt_err_t rt_prio_queue_pop(struct rt_prio_queue *que,
182                            void *data,
183                            rt_int32_t timeout)
184 {
185     rt_ubase_t level;
186     struct rt_prio_queue_item *item;
187 
188     RT_ASSERT(que);
189     RT_ASSERT(data);
190 
191     level = rt_hw_interrupt_disable();
192     for (item = _do_pop(que);
193          item == RT_NULL;
194          item = _do_pop(que))
195     {
196         rt_thread_t thread;
197 
198         if (timeout == 0)
199         {
200             rt_hw_interrupt_enable(level);
201             return -RT_ETIMEOUT;
202         }
203 
204         RT_DEBUG_NOT_IN_INTERRUPT;
205 
206         thread = rt_thread_self();
207         thread->error = RT_EOK;
208         rt_thread_suspend(thread);
209 
210         rt_list_insert_before(&(que->suspended_pop_list), &(thread->tlist));
211 
212         if (timeout > 0)
213         {
214             rt_timer_control(&(thread->thread_timer),
215                              RT_TIMER_CTRL_SET_TIME,
216                              &timeout);
217             rt_timer_start(&(thread->thread_timer));
218         }
219 
220         rt_hw_interrupt_enable(level);
221 
222         rt_schedule();
223 
224         /* thread is waked up */
225         if (thread->error != RT_EOK)
226             return thread->error;
227         level = rt_hw_interrupt_disable();
228     }
229 
230     rt_hw_interrupt_enable(level);
231 
232     rt_memcpy(data, item+1, que->item_sz);
233     rt_mp_free(item);
234 
235     return RT_EOK;
236 }
237 
rt_prio_queue_dump(struct rt_prio_queue * que)238 void rt_prio_queue_dump(struct rt_prio_queue *que)
239 {
240     int level = 0;
241 
242     rt_kprintf("bitmap: %08x\n", que->bitmap);
243     for (level = 0; level < RT_PRIO_QUEUE_PRIO_MAX; level++)
244     {
245         struct rt_prio_queue_item *item;
246 
247         rt_kprintf("%2d: ", level);
248         for (item = que->head[level];
249              item;
250              item = item->next)
251         {
252             rt_kprintf("%p, ", item);
253         }
254         rt_kprintf("\n");
255     }
256 }
257 
258