Lines Matching full:queue
23 rt_data_queue_init(struct rt_data_queue *queue, in rt_data_queue_init() argument
26 void (*evt_notify)(struct rt_data_queue *queue, rt_uint32_t event)) in rt_data_queue_init() argument
28 RT_ASSERT(queue != RT_NULL); in rt_data_queue_init()
30 queue->evt_notify = evt_notify; in rt_data_queue_init()
32 queue->size = size; in rt_data_queue_init()
33 queue->lwm = lwm; in rt_data_queue_init()
35 queue->get_index = 0; in rt_data_queue_init()
36 queue->put_index = 0; in rt_data_queue_init()
38 rt_list_init(&(queue->suspended_push_list)); in rt_data_queue_init()
39 rt_list_init(&(queue->suspended_pop_list)); in rt_data_queue_init()
41 queue->queue = (struct rt_data_item *)rt_malloc(sizeof(struct rt_data_item) * size); in rt_data_queue_init()
42 if (queue->queue == RT_NULL) in rt_data_queue_init()
51 rt_err_t rt_data_queue_push(struct rt_data_queue *queue, in rt_data_queue_push() argument
60 RT_ASSERT(queue != RT_NULL); in rt_data_queue_push()
66 while (queue->put_index - queue->get_index == queue->size) in rt_data_queue_push()
68 /* queue is full */ in rt_data_queue_push()
84 rt_list_insert_before(&(queue->suspended_push_list), &(thread->tlist)); in rt_data_queue_push()
107 queue->queue[queue->put_index % queue->size].data_ptr = data_ptr; in rt_data_queue_push()
108 queue->queue[queue->put_index % queue->size].data_size = data_size; in rt_data_queue_push()
109 queue->put_index += 1; in rt_data_queue_push()
112 if (!rt_list_isempty(&(queue->suspended_pop_list))) in rt_data_queue_push()
115 thread = rt_list_entry(queue->suspended_pop_list.next, in rt_data_queue_push()
131 if ((result == RT_EOK) && queue->evt_notify != RT_NULL) in rt_data_queue_push()
133 queue->evt_notify(queue, RT_DATAQUEUE_EVENT_PUSH); in rt_data_queue_push()
140 rt_err_t rt_data_queue_pop(struct rt_data_queue *queue, in rt_data_queue_pop() argument
149 RT_ASSERT(queue != RT_NULL); in rt_data_queue_pop()
157 while (queue->get_index == queue->put_index) in rt_data_queue_pop()
159 /* queue is empty */ in rt_data_queue_pop()
174 rt_list_insert_before(&(queue->suspended_pop_list), &(thread->tlist)); in rt_data_queue_pop()
198 *data_ptr = queue->queue[queue->get_index % queue->size].data_ptr; in rt_data_queue_pop()
199 *size = queue->queue[queue->get_index % queue->size].data_size; in rt_data_queue_pop()
201 queue->get_index += 1; in rt_data_queue_pop()
203 if ((queue->put_index - queue->get_index) <= queue->lwm) in rt_data_queue_pop()
206 if (!rt_list_isempty(&(queue->suspended_push_list))) in rt_data_queue_pop()
209 thread = rt_list_entry(queue->suspended_push_list.next, in rt_data_queue_pop()
225 if (queue->evt_notify != RT_NULL) in rt_data_queue_pop()
226 queue->evt_notify(queue, RT_DATAQUEUE_EVENT_LWM); in rt_data_queue_pop()
233 if ((result == RT_EOK) && (queue->evt_notify != RT_NULL)) in rt_data_queue_pop()
235 queue->evt_notify(queue, RT_DATAQUEUE_EVENT_POP); in rt_data_queue_pop()
242 rt_err_t rt_data_queue_peak(struct rt_data_queue *queue, in rt_data_queue_peak() argument
248 RT_ASSERT(queue != RT_NULL); in rt_data_queue_peak()
252 if (queue->get_index == queue->put_index) in rt_data_queue_peak()
259 *data_ptr = queue->queue[queue->get_index % queue->size].data_ptr; in rt_data_queue_peak()
260 *size = queue->queue[queue->get_index % queue->size].data_size; in rt_data_queue_peak()
268 void rt_data_queue_reset(struct rt_data_queue *queue) in rt_data_queue_reset() argument
277 while (!rt_list_isempty(&(queue->suspended_pop_list))) in rt_data_queue_reset()
283 thread = rt_list_entry(queue->suspended_pop_list.next, in rt_data_queue_reset()
301 while (!rt_list_isempty(&(queue->suspended_push_list))) in rt_data_queue_reset()
307 thread = rt_list_entry(queue->suspended_push_list.next, in rt_data_queue_reset()