1 /* 2 * COPYRIGHT (C) 2018, Real-Thread Information Technology Ltd 3 * 4 * SPDX-License-Identifier: Apache-2.0 5 * 6 * Change Logs: 7 * Date Author Notes 8 * 2013-11-04 Grissiom add comment 9 */ 10 11 #include <rthw.h> 12 #include <rtthread.h> 13 14 #include "prio_queue.h" 15 16 struct rt_prio_queue_item { 17 struct rt_prio_queue_item *next; 18 /* data follows */ 19 }; 20 21 static void _do_push(struct rt_prio_queue *que, 22 rt_uint8_t prio, 23 struct rt_prio_queue_item *item) 24 { 25 if (que->head[prio] == RT_NULL) 26 { 27 que->head[prio] = item; 28 que->bitmap |= 1 << prio; 29 } 30 else 31 { 32 RT_ASSERT(que->tail[prio]); 33 que->tail[prio]->next = item; 34 } 35 que->tail[prio] = item; 36 } 37 38 static struct rt_prio_queue_item* _do_pop(struct rt_prio_queue *que) 39 { 40 int ffs; 41 struct rt_prio_queue_item *item; 42 43 ffs = __rt_ffs(que->bitmap); 44 if (ffs == 0) 45 return RT_NULL; 46 ffs--; 47 48 item = que->head[ffs]; 49 RT_ASSERT(item); 50 51 que->head[ffs] = item->next; 52 if (que->head[ffs] == RT_NULL) 53 { 54 que->bitmap &= ~(1 << ffs); 55 } 56 57 return item; 58 } 59 60 rt_err_t rt_prio_queue_init(struct rt_prio_queue *que, 61 const char *name, 62 void *buf, 63 rt_size_t bufsz, 64 rt_size_t itemsz) 65 { 66 RT_ASSERT(que); 67 68 rt_memset(que, 0, sizeof(*que)); 69 70 rt_list_init(&(que->suspended_pop_list)); 71 72 rt_mp_init(&que->pool, name, buf, bufsz, 73 sizeof(struct rt_prio_queue_item) + itemsz); 74 75 que->item_sz = itemsz; 76 77 return RT_EOK; 78 } 79 80 void rt_prio_queue_detach(struct rt_prio_queue *que) 81 { 82 /* wake up all suspended pop threads, push thread is suspended on mempool. 83 */ 84 while (!rt_list_isempty(&(que->suspended_pop_list))) 85 { 86 rt_thread_t thread; 87 88 /* disable interrupt */ 89 rt_ubase_t temp = rt_hw_interrupt_disable(); 90 91 /* get next suspend thread */ 92 thread = rt_list_entry(que->suspended_pop_list.next, struct rt_thread, tlist); 93 /* set error code to RT_ERROR */ 94 thread->error = -RT_ERROR; 95 96 rt_thread_resume(thread); 97 98 /* enable interrupt */ 99 rt_hw_interrupt_enable(temp); 100 } 101 rt_mp_detach(&que->pool); 102 } 103 104 #ifdef RT_USING_HEAP 105 struct rt_prio_queue* rt_prio_queue_create(const char *name, 106 rt_size_t item_nr, 107 rt_size_t item_sz) 108 { 109 struct rt_prio_queue *que; 110 rt_size_t bufsz; 111 112 bufsz = item_nr * (sizeof(struct rt_prio_queue_item) 113 + item_sz 114 + sizeof(void*)); 115 116 RT_ASSERT(item_nr); 117 118 que = rt_malloc(sizeof(*que) + bufsz); 119 if (!que) 120 return RT_NULL; 121 122 rt_prio_queue_init(que, name, que+1, bufsz, item_sz); 123 124 return que; 125 } 126 127 void rt_prio_queue_delete(struct rt_prio_queue *que) 128 { 129 rt_prio_queue_detach(que); 130 rt_free(que); 131 } 132 #endif 133 134 rt_err_t rt_prio_queue_push(struct rt_prio_queue *que, 135 rt_uint8_t prio, 136 void *data, 137 rt_int32_t timeout) 138 { 139 rt_ubase_t level; 140 struct rt_prio_queue_item *item; 141 142 RT_ASSERT(que); 143 144 if (prio >= RT_PRIO_QUEUE_PRIO_MAX) 145 return -RT_ERROR; 146 147 item = rt_mp_alloc(&que->pool, timeout); 148 if (item == RT_NULL) 149 return -RT_ENOMEM; 150 151 rt_memcpy(item+1, data, que->item_sz); 152 item->next = RT_NULL; 153 154 level = rt_hw_interrupt_disable(); 155 156 _do_push(que, prio, item); 157 158 if (!rt_list_isempty(&(que->suspended_pop_list))) 159 { 160 rt_thread_t thread; 161 162 /* get thread entry */ 163 thread = rt_list_entry(que->suspended_pop_list.next, 164 struct rt_thread, 165 tlist); 166 /* resume it */ 167 rt_thread_resume(thread); 168 rt_hw_interrupt_enable(level); 169 170 /* perform a schedule */ 171 rt_schedule(); 172 173 return RT_EOK; 174 } 175 176 rt_hw_interrupt_enable(level); 177 178 return RT_EOK; 179 } 180 181 rt_err_t rt_prio_queue_pop(struct rt_prio_queue *que, 182 void *data, 183 rt_int32_t timeout) 184 { 185 rt_ubase_t level; 186 struct rt_prio_queue_item *item; 187 188 RT_ASSERT(que); 189 RT_ASSERT(data); 190 191 level = rt_hw_interrupt_disable(); 192 for (item = _do_pop(que); 193 item == RT_NULL; 194 item = _do_pop(que)) 195 { 196 rt_thread_t thread; 197 198 if (timeout == 0) 199 { 200 rt_hw_interrupt_enable(level); 201 return -RT_ETIMEOUT; 202 } 203 204 RT_DEBUG_NOT_IN_INTERRUPT; 205 206 thread = rt_thread_self(); 207 thread->error = RT_EOK; 208 rt_thread_suspend(thread); 209 210 rt_list_insert_before(&(que->suspended_pop_list), &(thread->tlist)); 211 212 if (timeout > 0) 213 { 214 rt_timer_control(&(thread->thread_timer), 215 RT_TIMER_CTRL_SET_TIME, 216 &timeout); 217 rt_timer_start(&(thread->thread_timer)); 218 } 219 220 rt_hw_interrupt_enable(level); 221 222 rt_schedule(); 223 224 /* thread is waked up */ 225 if (thread->error != RT_EOK) 226 return thread->error; 227 level = rt_hw_interrupt_disable(); 228 } 229 230 rt_hw_interrupt_enable(level); 231 232 rt_memcpy(data, item+1, que->item_sz); 233 rt_mp_free(item); 234 235 return RT_EOK; 236 } 237 238 void rt_prio_queue_dump(struct rt_prio_queue *que) 239 { 240 int level = 0; 241 242 rt_kprintf("bitmap: %08x\n", que->bitmap); 243 for (level = 0; level < RT_PRIO_QUEUE_PRIO_MAX; level++) 244 { 245 struct rt_prio_queue_item *item; 246 247 rt_kprintf("%2d: ", level); 248 for (item = que->head[level]; 249 item; 250 item = item->next) 251 { 252 rt_kprintf("%p, ", item); 253 } 254 rt_kprintf("\n"); 255 } 256 } 257 258