xref: /nrf52832-nimble/rt-thread/components/drivers/src/workqueue.c (revision 104654410c56c573564690304ae786df310c91fc)
1 /*
2  * Copyright (c) 2006-2018, RT-Thread Development Team
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  *
6  * Change Logs:
7  * Date           Author       Notes
8  * 2017-02-27     bernard      fix the re-work issue.
9  */
10 
11 #include <rthw.h>
12 #include <rtthread.h>
13 #include <rtdevice.h>
14 
15 #ifdef RT_USING_HEAP
_workqueue_work_completion(struct rt_workqueue * queue)16 rt_inline rt_err_t _workqueue_work_completion(struct rt_workqueue *queue)
17 {
18     rt_err_t result;
19 
20     rt_enter_critical();
21     while (1)
22     {
23         /* try to take condition semaphore */
24         result = rt_sem_trytake(&(queue->sem));
25         if (result == -RT_ETIMEOUT)
26         {
27             /* it's timeout, release this semaphore */
28             rt_sem_release(&(queue->sem));
29         }
30         else if (result == RT_EOK)
31         {
32             /* keep the sem value = 0 */
33             result = RT_EOK;
34             break;
35         }
36         else
37         {
38             result = -RT_ERROR;
39             break;
40         }
41     }
42     rt_exit_critical();
43 
44     return result;
45 }
46 
_workqueue_thread_entry(void * parameter)47 static void _workqueue_thread_entry(void* parameter)
48 {
49     rt_base_t level;
50     struct rt_work* work;
51     struct rt_workqueue* queue;
52 
53     queue = (struct rt_workqueue*) parameter;
54     RT_ASSERT(queue != RT_NULL);
55 
56     while (1)
57     {
58         if (rt_list_isempty(&(queue->work_list)))
59         {
60             /* no software timer exist, suspend self. */
61             rt_thread_suspend(rt_thread_self());
62             rt_schedule();
63         }
64 
65         /* we have work to do with. */
66         level = rt_hw_interrupt_disable();
67         work = rt_list_entry(queue->work_list.next, struct rt_work, list);
68         rt_list_remove(&(work->list));
69         queue->work_current = work;
70         rt_hw_interrupt_enable(level);
71 
72         /* do work */
73         work->work_func(work, work->work_data);
74         level = rt_hw_interrupt_disable();
75         /* clean current work */
76         queue->work_current = RT_NULL;
77         rt_hw_interrupt_enable(level);
78 
79         /* ack work completion */
80         _workqueue_work_completion(queue);
81     }
82 }
83 
rt_workqueue_create(const char * name,rt_uint16_t stack_size,rt_uint8_t priority)84 struct rt_workqueue *rt_workqueue_create(const char* name, rt_uint16_t stack_size, rt_uint8_t priority)
85 {
86     struct rt_workqueue *queue = RT_NULL;
87 
88     queue = (struct rt_workqueue*)RT_KERNEL_MALLOC(sizeof(struct rt_workqueue));
89     if (queue != RT_NULL)
90     {
91         /* initialize work list */
92         rt_list_init(&(queue->work_list));
93         queue->work_current = RT_NULL;
94         rt_sem_init(&(queue->sem), "wqueue", 0, RT_IPC_FLAG_FIFO);
95 
96         /* create the work thread */
97         queue->work_thread = rt_thread_create(name, _workqueue_thread_entry, queue, stack_size, priority, 10);
98         if (queue->work_thread == RT_NULL)
99         {
100             RT_KERNEL_FREE(queue);
101             return RT_NULL;
102         }
103 
104         rt_thread_startup(queue->work_thread);
105     }
106 
107     return queue;
108 }
109 
rt_workqueue_destroy(struct rt_workqueue * queue)110 rt_err_t rt_workqueue_destroy(struct rt_workqueue* queue)
111 {
112     RT_ASSERT(queue != RT_NULL);
113 
114     rt_thread_delete(queue->work_thread);
115     RT_KERNEL_FREE(queue);
116 
117     return RT_EOK;
118 }
119 
rt_workqueue_dowork(struct rt_workqueue * queue,struct rt_work * work)120 rt_err_t rt_workqueue_dowork(struct rt_workqueue* queue, struct rt_work* work)
121 {
122     rt_base_t level;
123     RT_ASSERT(queue != RT_NULL);
124     RT_ASSERT(work != RT_NULL);
125 
126     level = rt_hw_interrupt_disable();
127     if (queue->work_current == work)
128     {
129         rt_hw_interrupt_enable(level);
130         return -RT_EBUSY;
131     }
132 
133     /* NOTE: the work MUST be initialized firstly */
134     rt_list_remove(&(work->list));
135 
136     rt_list_insert_after(queue->work_list.prev, &(work->list));
137     /* whether the workqueue is doing work */
138     if (queue->work_current == RT_NULL)
139     {
140         rt_hw_interrupt_enable(level);
141         /* resume work thread */
142         rt_thread_resume(queue->work_thread);
143         rt_schedule();
144     }
145     else rt_hw_interrupt_enable(level);
146 
147     return RT_EOK;
148 }
149 
rt_workqueue_critical_work(struct rt_workqueue * queue,struct rt_work * work)150 rt_err_t rt_workqueue_critical_work(struct rt_workqueue* queue, struct rt_work* work)
151 {
152     rt_base_t level;
153     RT_ASSERT(queue != RT_NULL);
154     RT_ASSERT(work != RT_NULL);
155 
156     level = rt_hw_interrupt_disable();
157     if (queue->work_current == work)
158     {
159         rt_hw_interrupt_enable(level);
160         return -RT_EBUSY;
161     }
162 
163     /* NOTE: the work MUST be initialized firstly */
164     rt_list_remove(&(work->list));
165 
166     rt_list_insert_after(queue->work_list.prev, &(work->list));
167     if (queue->work_current == RT_NULL)
168     {
169         rt_hw_interrupt_enable(level);
170         /* resume work thread */
171         rt_thread_resume(queue->work_thread);
172         rt_schedule();
173     }
174     else rt_hw_interrupt_enable(level);
175 
176     return RT_EOK;
177 }
178 
rt_workqueue_cancel_work(struct rt_workqueue * queue,struct rt_work * work)179 rt_err_t rt_workqueue_cancel_work(struct rt_workqueue* queue, struct rt_work* work)
180 {
181     rt_base_t level;
182 
183     RT_ASSERT(queue != RT_NULL);
184     RT_ASSERT(work != RT_NULL);
185 
186     level = rt_hw_interrupt_disable();
187     if (queue->work_current == work)
188     {
189         rt_hw_interrupt_enable(level);
190         return -RT_EBUSY;
191     }
192     rt_list_remove(&(work->list));
193     rt_hw_interrupt_enable(level);
194 
195     return RT_EOK;
196 }
197 
rt_workqueue_cancel_work_sync(struct rt_workqueue * queue,struct rt_work * work)198 rt_err_t rt_workqueue_cancel_work_sync(struct rt_workqueue* queue, struct rt_work* work)
199 {
200     rt_base_t level;
201 
202     RT_ASSERT(queue != RT_NULL);
203     RT_ASSERT(work != RT_NULL);
204 
205     level = rt_hw_interrupt_disable();
206     if (queue->work_current == work) /* it's current work in the queue */
207     {
208         /* wait for work completion */
209         rt_sem_take(&(queue->sem), RT_WAITING_FOREVER);
210     }
211     else
212     {
213         rt_list_remove(&(work->list));
214     }
215     rt_hw_interrupt_enable(level);
216 
217     return RT_EOK;
218 }
219 
rt_workqueue_cancel_all_work(struct rt_workqueue * queue)220 rt_err_t rt_workqueue_cancel_all_work(struct rt_workqueue* queue)
221 {
222     struct rt_list_node *node, *next;
223     RT_ASSERT(queue != RT_NULL);
224 
225     rt_enter_critical();
226     for (node = queue->work_list.next; node != &(queue->work_list); node = next)
227     {
228         next = node->next;
229         rt_list_remove(node);
230     }
231     rt_exit_critical();
232 
233     return RT_EOK;
234 }
235 
236 #endif
237 
238