1 /* 2 * Copyright (c) 2006-2018, RT-Thread Development Team 3 * 4 * SPDX-License-Identifier: Apache-2.0 5 * 6 * Change Logs: 7 * Date Author Notes 8 * 2012-09-30 Bernard first version. 9 * 2016-10-31 armink fix some resume push and pop thread bugs 10 */ 11 12 #include <rtthread.h> 13 #include <rtdevice.h> 14 #include <rthw.h> 15 16 struct rt_data_item 17 { 18 const void *data_ptr; 19 rt_size_t data_size; 20 }; 21 22 rt_err_t 23 rt_data_queue_init(struct rt_data_queue *queue, 24 rt_uint16_t size, 25 rt_uint16_t lwm, 26 void (*evt_notify)(struct rt_data_queue *queue, rt_uint32_t event)) 27 { 28 RT_ASSERT(queue != RT_NULL); 29 30 queue->evt_notify = evt_notify; 31 32 queue->size = size; 33 queue->lwm = lwm; 34 35 queue->get_index = 0; 36 queue->put_index = 0; 37 38 rt_list_init(&(queue->suspended_push_list)); 39 rt_list_init(&(queue->suspended_pop_list)); 40 41 queue->queue = (struct rt_data_item *)rt_malloc(sizeof(struct rt_data_item) * size); 42 if (queue->queue == RT_NULL) 43 { 44 return -RT_ENOMEM; 45 } 46 47 return RT_EOK; 48 } 49 RTM_EXPORT(rt_data_queue_init); 50 51 rt_err_t rt_data_queue_push(struct rt_data_queue *queue, 52 const void *data_ptr, 53 rt_size_t data_size, 54 rt_int32_t timeout) 55 { 56 rt_ubase_t level; 57 rt_thread_t thread; 58 rt_err_t result; 59 60 RT_ASSERT(queue != RT_NULL); 61 62 result = RT_EOK; 63 thread = rt_thread_self(); 64 65 level = rt_hw_interrupt_disable(); 66 while (queue->put_index - queue->get_index == queue->size) 67 { 68 /* queue is full */ 69 if (timeout == 0) 70 { 71 result = -RT_ETIMEOUT; 72 73 goto __exit; 74 } 75 76 /* current context checking */ 77 RT_DEBUG_NOT_IN_INTERRUPT; 78 79 /* reset thread error number */ 80 thread->error = RT_EOK; 81 82 /* suspend thread on the push list */ 83 rt_thread_suspend(thread); 84 rt_list_insert_before(&(queue->suspended_push_list), &(thread->tlist)); 85 /* start timer */ 86 if (timeout > 0) 87 { 88 /* reset the timeout of thread timer and start it */ 89 rt_timer_control(&(thread->thread_timer), 90 RT_TIMER_CTRL_SET_TIME, 91 &timeout); 92 rt_timer_start(&(thread->thread_timer)); 93 } 94 95 /* enable interrupt */ 96 rt_hw_interrupt_enable(level); 97 98 /* do schedule */ 99 rt_schedule(); 100 101 /* thread is waked up */ 102 result = thread->error; 103 level = rt_hw_interrupt_disable(); 104 if (result != RT_EOK) goto __exit; 105 } 106 107 queue->queue[queue->put_index % queue->size].data_ptr = data_ptr; 108 queue->queue[queue->put_index % queue->size].data_size = data_size; 109 queue->put_index += 1; 110 111 /* there is at least one thread in suspended list */ 112 if (!rt_list_isempty(&(queue->suspended_pop_list))) 113 { 114 /* get thread entry */ 115 thread = rt_list_entry(queue->suspended_pop_list.next, 116 struct rt_thread, 117 tlist); 118 119 /* resume it */ 120 rt_thread_resume(thread); 121 rt_hw_interrupt_enable(level); 122 123 /* perform a schedule */ 124 rt_schedule(); 125 126 return result; 127 } 128 129 __exit: 130 rt_hw_interrupt_enable(level); 131 if ((result == RT_EOK) && queue->evt_notify != RT_NULL) 132 { 133 queue->evt_notify(queue, RT_DATAQUEUE_EVENT_PUSH); 134 } 135 136 return result; 137 } 138 RTM_EXPORT(rt_data_queue_push); 139 140 rt_err_t rt_data_queue_pop(struct rt_data_queue *queue, 141 const void** data_ptr, 142 rt_size_t *size, 143 rt_int32_t timeout) 144 { 145 rt_ubase_t level; 146 rt_thread_t thread; 147 rt_err_t result; 148 149 RT_ASSERT(queue != RT_NULL); 150 RT_ASSERT(data_ptr != RT_NULL); 151 RT_ASSERT(size != RT_NULL); 152 153 result = RT_EOK; 154 thread = rt_thread_self(); 155 156 level = rt_hw_interrupt_disable(); 157 while (queue->get_index == queue->put_index) 158 { 159 /* queue is empty */ 160 if (timeout == 0) 161 { 162 result = -RT_ETIMEOUT; 163 goto __exit; 164 } 165 166 /* current context checking */ 167 RT_DEBUG_NOT_IN_INTERRUPT; 168 169 /* reset thread error number */ 170 thread->error = RT_EOK; 171 172 /* suspend thread on the pop list */ 173 rt_thread_suspend(thread); 174 rt_list_insert_before(&(queue->suspended_pop_list), &(thread->tlist)); 175 /* start timer */ 176 if (timeout > 0) 177 { 178 /* reset the timeout of thread timer and start it */ 179 rt_timer_control(&(thread->thread_timer), 180 RT_TIMER_CTRL_SET_TIME, 181 &timeout); 182 rt_timer_start(&(thread->thread_timer)); 183 } 184 185 /* enable interrupt */ 186 rt_hw_interrupt_enable(level); 187 188 /* do schedule */ 189 rt_schedule(); 190 191 /* thread is waked up */ 192 result = thread->error; 193 level = rt_hw_interrupt_disable(); 194 if (result != RT_EOK) 195 goto __exit; 196 } 197 198 *data_ptr = queue->queue[queue->get_index % queue->size].data_ptr; 199 *size = queue->queue[queue->get_index % queue->size].data_size; 200 201 queue->get_index += 1; 202 203 if ((queue->put_index - queue->get_index) <= queue->lwm) 204 { 205 /* there is at least one thread in suspended list */ 206 if (!rt_list_isempty(&(queue->suspended_push_list))) 207 { 208 /* get thread entry */ 209 thread = rt_list_entry(queue->suspended_push_list.next, 210 struct rt_thread, 211 tlist); 212 213 /* resume it */ 214 rt_thread_resume(thread); 215 rt_hw_interrupt_enable(level); 216 217 /* perform a schedule */ 218 rt_schedule(); 219 } 220 else 221 { 222 rt_hw_interrupt_enable(level); 223 } 224 225 if (queue->evt_notify != RT_NULL) 226 queue->evt_notify(queue, RT_DATAQUEUE_EVENT_LWM); 227 228 return result; 229 } 230 231 __exit: 232 rt_hw_interrupt_enable(level); 233 if ((result == RT_EOK) && (queue->evt_notify != RT_NULL)) 234 { 235 queue->evt_notify(queue, RT_DATAQUEUE_EVENT_POP); 236 } 237 238 return result; 239 } 240 RTM_EXPORT(rt_data_queue_pop); 241 242 rt_err_t rt_data_queue_peak(struct rt_data_queue *queue, 243 const void** data_ptr, 244 rt_size_t *size) 245 { 246 rt_ubase_t level; 247 248 RT_ASSERT(queue != RT_NULL); 249 250 level = rt_hw_interrupt_disable(); 251 252 if (queue->get_index == queue->put_index) 253 { 254 rt_hw_interrupt_enable(level); 255 256 return -RT_EEMPTY; 257 } 258 259 *data_ptr = queue->queue[queue->get_index % queue->size].data_ptr; 260 *size = queue->queue[queue->get_index % queue->size].data_size; 261 262 rt_hw_interrupt_enable(level); 263 264 return RT_EOK; 265 } 266 RTM_EXPORT(rt_data_queue_peak); 267 268 void rt_data_queue_reset(struct rt_data_queue *queue) 269 { 270 struct rt_thread *thread; 271 register rt_ubase_t temp; 272 273 rt_enter_critical(); 274 /* wakeup all suspend threads */ 275 276 /* resume on pop list */ 277 while (!rt_list_isempty(&(queue->suspended_pop_list))) 278 { 279 /* disable interrupt */ 280 temp = rt_hw_interrupt_disable(); 281 282 /* get next suspend thread */ 283 thread = rt_list_entry(queue->suspended_pop_list.next, 284 struct rt_thread, 285 tlist); 286 /* set error code to RT_ERROR */ 287 thread->error = -RT_ERROR; 288 289 /* 290 * resume thread 291 * In rt_thread_resume function, it will remove current thread from 292 * suspend list 293 */ 294 rt_thread_resume(thread); 295 296 /* enable interrupt */ 297 rt_hw_interrupt_enable(temp); 298 } 299 300 /* resume on push list */ 301 while (!rt_list_isempty(&(queue->suspended_push_list))) 302 { 303 /* disable interrupt */ 304 temp = rt_hw_interrupt_disable(); 305 306 /* get next suspend thread */ 307 thread = rt_list_entry(queue->suspended_push_list.next, 308 struct rt_thread, 309 tlist); 310 /* set error code to RT_ERROR */ 311 thread->error = -RT_ERROR; 312 313 /* 314 * resume thread 315 * In rt_thread_resume function, it will remove current thread from 316 * suspend list 317 */ 318 rt_thread_resume(thread); 319 320 /* enable interrupt */ 321 rt_hw_interrupt_enable(temp); 322 } 323 rt_exit_critical(); 324 325 rt_schedule(); 326 } 327 RTM_EXPORT(rt_data_queue_reset); 328