1 /*
2 * COPYRIGHT (C) 2018, Real-Thread Information Technology Ltd
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 *
6 * Change Logs:
7 * Date Author Notes
8 * 2013-11-04 Grissiom add comment
9 */
10
11 #include <rthw.h>
12 #include <rtthread.h>
13
14 #include "prio_queue.h"
15
16 struct rt_prio_queue_item {
17 struct rt_prio_queue_item *next;
18 /* data follows */
19 };
20
_do_push(struct rt_prio_queue * que,rt_uint8_t prio,struct rt_prio_queue_item * item)21 static void _do_push(struct rt_prio_queue *que,
22 rt_uint8_t prio,
23 struct rt_prio_queue_item *item)
24 {
25 if (que->head[prio] == RT_NULL)
26 {
27 que->head[prio] = item;
28 que->bitmap |= 1 << prio;
29 }
30 else
31 {
32 RT_ASSERT(que->tail[prio]);
33 que->tail[prio]->next = item;
34 }
35 que->tail[prio] = item;
36 }
37
_do_pop(struct rt_prio_queue * que)38 static struct rt_prio_queue_item* _do_pop(struct rt_prio_queue *que)
39 {
40 int ffs;
41 struct rt_prio_queue_item *item;
42
43 ffs = __rt_ffs(que->bitmap);
44 if (ffs == 0)
45 return RT_NULL;
46 ffs--;
47
48 item = que->head[ffs];
49 RT_ASSERT(item);
50
51 que->head[ffs] = item->next;
52 if (que->head[ffs] == RT_NULL)
53 {
54 que->bitmap &= ~(1 << ffs);
55 }
56
57 return item;
58 }
59
rt_prio_queue_init(struct rt_prio_queue * que,const char * name,void * buf,rt_size_t bufsz,rt_size_t itemsz)60 rt_err_t rt_prio_queue_init(struct rt_prio_queue *que,
61 const char *name,
62 void *buf,
63 rt_size_t bufsz,
64 rt_size_t itemsz)
65 {
66 RT_ASSERT(que);
67
68 rt_memset(que, 0, sizeof(*que));
69
70 rt_list_init(&(que->suspended_pop_list));
71
72 rt_mp_init(&que->pool, name, buf, bufsz,
73 sizeof(struct rt_prio_queue_item) + itemsz);
74
75 que->item_sz = itemsz;
76
77 return RT_EOK;
78 }
79
rt_prio_queue_detach(struct rt_prio_queue * que)80 void rt_prio_queue_detach(struct rt_prio_queue *que)
81 {
82 /* wake up all suspended pop threads, push thread is suspended on mempool.
83 */
84 while (!rt_list_isempty(&(que->suspended_pop_list)))
85 {
86 rt_thread_t thread;
87
88 /* disable interrupt */
89 rt_ubase_t temp = rt_hw_interrupt_disable();
90
91 /* get next suspend thread */
92 thread = rt_list_entry(que->suspended_pop_list.next, struct rt_thread, tlist);
93 /* set error code to RT_ERROR */
94 thread->error = -RT_ERROR;
95
96 rt_thread_resume(thread);
97
98 /* enable interrupt */
99 rt_hw_interrupt_enable(temp);
100 }
101 rt_mp_detach(&que->pool);
102 }
103
104 #ifdef RT_USING_HEAP
rt_prio_queue_create(const char * name,rt_size_t item_nr,rt_size_t item_sz)105 struct rt_prio_queue* rt_prio_queue_create(const char *name,
106 rt_size_t item_nr,
107 rt_size_t item_sz)
108 {
109 struct rt_prio_queue *que;
110 rt_size_t bufsz;
111
112 bufsz = item_nr * (sizeof(struct rt_prio_queue_item)
113 + item_sz
114 + sizeof(void*));
115
116 RT_ASSERT(item_nr);
117
118 que = rt_malloc(sizeof(*que) + bufsz);
119 if (!que)
120 return RT_NULL;
121
122 rt_prio_queue_init(que, name, que+1, bufsz, item_sz);
123
124 return que;
125 }
126
rt_prio_queue_delete(struct rt_prio_queue * que)127 void rt_prio_queue_delete(struct rt_prio_queue *que)
128 {
129 rt_prio_queue_detach(que);
130 rt_free(que);
131 }
132 #endif
133
rt_prio_queue_push(struct rt_prio_queue * que,rt_uint8_t prio,void * data,rt_int32_t timeout)134 rt_err_t rt_prio_queue_push(struct rt_prio_queue *que,
135 rt_uint8_t prio,
136 void *data,
137 rt_int32_t timeout)
138 {
139 rt_ubase_t level;
140 struct rt_prio_queue_item *item;
141
142 RT_ASSERT(que);
143
144 if (prio >= RT_PRIO_QUEUE_PRIO_MAX)
145 return -RT_ERROR;
146
147 item = rt_mp_alloc(&que->pool, timeout);
148 if (item == RT_NULL)
149 return -RT_ENOMEM;
150
151 rt_memcpy(item+1, data, que->item_sz);
152 item->next = RT_NULL;
153
154 level = rt_hw_interrupt_disable();
155
156 _do_push(que, prio, item);
157
158 if (!rt_list_isempty(&(que->suspended_pop_list)))
159 {
160 rt_thread_t thread;
161
162 /* get thread entry */
163 thread = rt_list_entry(que->suspended_pop_list.next,
164 struct rt_thread,
165 tlist);
166 /* resume it */
167 rt_thread_resume(thread);
168 rt_hw_interrupt_enable(level);
169
170 /* perform a schedule */
171 rt_schedule();
172
173 return RT_EOK;
174 }
175
176 rt_hw_interrupt_enable(level);
177
178 return RT_EOK;
179 }
180
rt_prio_queue_pop(struct rt_prio_queue * que,void * data,rt_int32_t timeout)181 rt_err_t rt_prio_queue_pop(struct rt_prio_queue *que,
182 void *data,
183 rt_int32_t timeout)
184 {
185 rt_ubase_t level;
186 struct rt_prio_queue_item *item;
187
188 RT_ASSERT(que);
189 RT_ASSERT(data);
190
191 level = rt_hw_interrupt_disable();
192 for (item = _do_pop(que);
193 item == RT_NULL;
194 item = _do_pop(que))
195 {
196 rt_thread_t thread;
197
198 if (timeout == 0)
199 {
200 rt_hw_interrupt_enable(level);
201 return -RT_ETIMEOUT;
202 }
203
204 RT_DEBUG_NOT_IN_INTERRUPT;
205
206 thread = rt_thread_self();
207 thread->error = RT_EOK;
208 rt_thread_suspend(thread);
209
210 rt_list_insert_before(&(que->suspended_pop_list), &(thread->tlist));
211
212 if (timeout > 0)
213 {
214 rt_timer_control(&(thread->thread_timer),
215 RT_TIMER_CTRL_SET_TIME,
216 &timeout);
217 rt_timer_start(&(thread->thread_timer));
218 }
219
220 rt_hw_interrupt_enable(level);
221
222 rt_schedule();
223
224 /* thread is waked up */
225 if (thread->error != RT_EOK)
226 return thread->error;
227 level = rt_hw_interrupt_disable();
228 }
229
230 rt_hw_interrupt_enable(level);
231
232 rt_memcpy(data, item+1, que->item_sz);
233 rt_mp_free(item);
234
235 return RT_EOK;
236 }
237
rt_prio_queue_dump(struct rt_prio_queue * que)238 void rt_prio_queue_dump(struct rt_prio_queue *que)
239 {
240 int level = 0;
241
242 rt_kprintf("bitmap: %08x\n", que->bitmap);
243 for (level = 0; level < RT_PRIO_QUEUE_PRIO_MAX; level++)
244 {
245 struct rt_prio_queue_item *item;
246
247 rt_kprintf("%2d: ", level);
248 for (item = que->head[level];
249 item;
250 item = item->next)
251 {
252 rt_kprintf("%p, ", item);
253 }
254 rt_kprintf("\n");
255 }
256 }
257
258