/*
 * Copyright (c) 2006-2018, RT-Thread Development Team
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Change Logs:
 * Date           Author       Notes
 * 2017-02-27     bernard      fix the re-work issue.
 */

#include <rthw.h>
#include <rtthread.h>
#include <rtdevice.h>

#ifdef RT_USING_HEAP
rt_inline rt_err_t _workqueue_work_completion(struct rt_workqueue *queue)
{
    rt_err_t result;

    rt_enter_critical();
    while (1)
    {
        /* try to take the completion semaphore */
        result = rt_sem_trytake(&(queue->sem));
        if (result == -RT_ETIMEOUT)
        {
            /* the semaphore is unavailable: release it to wake any thread
             * blocked in rt_workqueue_cancel_work_sync(), then try again */
            rt_sem_release(&(queue->sem));
        }
        else if (result == RT_EOK)
        {
            /* taken successfully, which keeps the semaphore value at 0 */
            break;
        }
        else
        {
            result = -RT_ERROR;
            break;
        }
    }
    rt_exit_critical();

    return result;
}

static void _workqueue_thread_entry(void *parameter)
{
    rt_base_t level;
    struct rt_work *work;
    struct rt_workqueue *queue;

    queue = (struct rt_workqueue *)parameter;
    RT_ASSERT(queue != RT_NULL);

    while (1)
    {
        level = rt_hw_interrupt_disable();
        if (rt_list_isempty(&(queue->work_list)))
        {
            /* no work to do: suspend self until new work is queued.
             * the check and the suspend happen with interrupts disabled,
             * so a wakeup from rt_workqueue_dowork() cannot be lost */
            rt_thread_suspend(rt_thread_self());
            rt_hw_interrupt_enable(level);
            rt_schedule();
            continue;
        }

        /* dequeue the first pending work item */
        work = rt_list_entry(queue->work_list.next, struct rt_work, list);
        rt_list_remove(&(work->list));
        queue->work_current = work;
        rt_hw_interrupt_enable(level);

        /* execute the work function outside the critical section */
        work->work_func(work, work->work_data);

        /* clear the current work */
        level = rt_hw_interrupt_disable();
        queue->work_current = RT_NULL;
        rt_hw_interrupt_enable(level);

        /* acknowledge work completion */
        _workqueue_work_completion(queue);
    }
}

struct rt_workqueue *rt_workqueue_create(const char *name, rt_uint16_t stack_size, rt_uint8_t priority)
{
    struct rt_workqueue *queue = RT_NULL;

    queue = (struct rt_workqueue *)RT_KERNEL_MALLOC(sizeof(struct rt_workqueue));
    if (queue != RT_NULL)
    {
        /* initialize the work list and the completion semaphore */
        rt_list_init(&(queue->work_list));
        queue->work_current = RT_NULL;
        rt_sem_init(&(queue->sem), "wqueue", 0, RT_IPC_FLAG_FIFO);

        /* create the work thread */
        queue->work_thread = rt_thread_create(name, _workqueue_thread_entry, queue, stack_size, priority, 10);
        if (queue->work_thread == RT_NULL)
        {
            rt_sem_detach(&(queue->sem));
            RT_KERNEL_FREE(queue);
            return RT_NULL;
        }

        rt_thread_startup(queue->work_thread);
    }

    return queue;
}

rt_err_t rt_workqueue_destroy(struct rt_workqueue *queue)
{
    RT_ASSERT(queue != RT_NULL);

    rt_thread_delete(queue->work_thread);
    /* detach the statically initialized semaphore before freeing the queue */
    rt_sem_detach(&(queue->sem));
    RT_KERNEL_FREE(queue);

    return RT_EOK;
}

rt_err_t rt_workqueue_dowork(struct rt_workqueue *queue, struct rt_work *work)
{
    rt_base_t level;

    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);

    level = rt_hw_interrupt_disable();
    if (queue->work_current == work)
    {
        rt_hw_interrupt_enable(level);
        return -RT_EBUSY;
    }

    /* NOTE: the work must have been initialized first */
    rt_list_remove(&(work->list));

    /* append the work at the tail of the work list */
    rt_list_insert_after(queue->work_list.prev, &(work->list));
    /* if the workqueue is idle, wake up the work thread */
    if (queue->work_current == RT_NULL)
    {
        rt_hw_interrupt_enable(level);
        /* resume the work thread */
        rt_thread_resume(queue->work_thread);
        rt_schedule();
    }
    else
    {
        rt_hw_interrupt_enable(level);
    }

    return RT_EOK;
}
rt_err_t rt_workqueue_critical_work(struct rt_workqueue *queue, struct rt_work *work)
{
    rt_base_t level;

    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);

    level = rt_hw_interrupt_disable();
    if (queue->work_current == work)
    {
        rt_hw_interrupt_enable(level);
        return -RT_EBUSY;
    }

    /* NOTE: the work must have been initialized first */
    rt_list_remove(&(work->list));

    /* critical work jumps the queue: insert at the head of the work list */
    rt_list_insert_after(&(queue->work_list), &(work->list));
    /* if the workqueue is idle, wake up the work thread */
    if (queue->work_current == RT_NULL)
    {
        rt_hw_interrupt_enable(level);
        /* resume the work thread */
        rt_thread_resume(queue->work_thread);
        rt_schedule();
    }
    else
    {
        rt_hw_interrupt_enable(level);
    }

    return RT_EOK;
}

rt_err_t rt_workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work)
{
    rt_base_t level;

    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);

    level = rt_hw_interrupt_disable();
    if (queue->work_current == work)
    {
        /* the work is executing right now and cannot be cancelled */
        rt_hw_interrupt_enable(level);
        return -RT_EBUSY;
    }
    rt_list_remove(&(work->list));
    rt_hw_interrupt_enable(level);

    return RT_EOK;
}

rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue *queue, struct rt_work *work)
{
    rt_base_t level;

    RT_ASSERT(queue != RT_NULL);
    RT_ASSERT(work != RT_NULL);

    level = rt_hw_interrupt_disable();
    if (queue->work_current == work) /* it's the work currently being executed */
    {
        /* block on the completion semaphore until the work thread signals
         * completion via _workqueue_work_completion() */
        rt_sem_take(&(queue->sem), RT_WAITING_FOREVER);
    }
    else
    {
        rt_list_remove(&(work->list));
    }
    rt_hw_interrupt_enable(level);

    return RT_EOK;
}

rt_err_t rt_workqueue_cancel_all_work(struct rt_workqueue *queue)
{
    struct rt_list_node *node, *next;

    RT_ASSERT(queue != RT_NULL);

    /* remove every pending work item; the current work, if any, keeps running */
    rt_enter_critical();
    for (node = queue->work_list.next; node != &(queue->work_list); node = next)
    {
        next = node->next;
        rt_list_remove(node);
    }
    rt_exit_critical();

    return RT_EOK;
}

#endif /* RT_USING_HEAP */
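/*
 * Usage sketch (illustrative only): how a caller might create a workqueue
 * and submit a work item to it, assuming the rt_work_init() helper from the
 * workqueue header. The names demo_work, demo_func, the "demo" queue name,
 * and the stack/priority values are hypothetical, not part of this file.
 *
 *     static struct rt_work demo_work;
 *
 *     static void demo_func(struct rt_work *work, void *work_data)
 *     {
 *         rt_kprintf("work executed: %s\n", (char *)work_data);
 *     }
 *
 *     static int demo_init(void)
 *     {
 *         struct rt_workqueue *queue;
 *
 *         // work thread: 2048-byte stack, mid-range priority
 *         queue = rt_workqueue_create("demo", 2048, RT_THREAD_PRIORITY_MAX / 2);
 *         if (queue == RT_NULL)
 *             return -RT_ENOMEM;
 *
 *         // a work item must be initialized before it is queued
 *         rt_work_init(&demo_work, demo_func, "hello");
 *         return rt_workqueue_dowork(queue, &demo_work);
 *     }
 */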