1 /*
2 * 程序清单:信号量实现生产者消费者间的互斥
3 *
4 * 在这个程序中,会创建两个线程,一个是生成者线程worker一个是消费者线程thread
5 *
6 * 在数据信息生产、消费的过程中,worker负责把数据将写入到环形buffer中,而thread
7 * 则从环形buffer中读出。
8 */
9 #include <rtthread.h>
10 #include "tc_comm.h"
11
12 /* 一个环形buffer的实现 */
13 struct rb
14 {
15 rt_uint16_t read_index, write_index;
16 rt_uint8_t *buffer_ptr;
17 rt_uint16_t buffer_size;
18 };
19
20 /* 指向信号量控制块的指针 */
21 static rt_sem_t sem = RT_NULL;
22 /* 指向线程控制块的指针 */
23 static rt_thread_t tid = RT_NULL, worker = RT_NULL;
24
25 /* 环形buffer的内存块(用数组体现出来) */
26 #define BUFFER_SIZE 256
27 #define BUFFER_ITEM 32
28 static rt_uint8_t working_buffer[BUFFER_SIZE];
29 struct rb working_rb;
30
31 /* 初始化环形buffer,size指的是buffer的大小。注:这里并没对数据地址对齐做处理 */
rb_init(struct rb * rb,rt_uint8_t * pool,rt_uint16_t size)32 static void rb_init(struct rb* rb, rt_uint8_t *pool, rt_uint16_t size)
33 {
34 RT_ASSERT(rb != RT_NULL);
35
36 /* 对读写指针清零*/
37 rb->read_index = rb->write_index = 0;
38
39 /* 设置环形buffer的内存数据块 */
40 rb->buffer_ptr = pool;
41 rb->buffer_size = size;
42 }
43
44 /* 向环形buffer中写入数据 */
rb_put(struct rb * rb,const rt_uint8_t * ptr,rt_uint16_t length)45 static rt_bool_t rb_put(struct rb* rb, const rt_uint8_t *ptr, rt_uint16_t length)
46 {
47 rt_size_t size;
48
49 /* 判断是否有足够的剩余空间 */
50 if (rb->read_index > rb->write_index)
51 size = rb->read_index - rb->write_index;
52 else
53 size = rb->buffer_size - rb->write_index + rb->read_index;
54
55 /* 没有多余的空间 */
56 if (size < length) return RT_FALSE;
57
58 if (rb->read_index > rb->write_index)
59 {
60 /* read_index - write_index 即为总的空余空间 */
61 memcpy(&rb->buffer_ptr[rb->write_index], ptr, length);
62 rb->write_index += length;
63 }
64 else
65 {
66 if (rb->buffer_size - rb->write_index > length)
67 {
68 /* write_index 后面剩余的空间有足够的长度 */
69 memcpy(&rb->buffer_ptr[rb->write_index], ptr, length);
70 rb->write_index += length;
71 }
72 else
73 {
74 /*
75 * write_index 后面剩余的空间不存在足够的长度,需要把部分数据复制到
76 * 前面的剩余空间中
77 */
78 memcpy(&rb->buffer_ptr[rb->write_index], ptr,
79 rb->buffer_size - rb->write_index);
80 memcpy(&rb->buffer_ptr[0], &ptr[rb->buffer_size - rb->write_index],
81 length - (rb->buffer_size - rb->write_index));
82 rb->write_index = length - (rb->buffer_size - rb->write_index);
83 }
84 }
85
86 return RT_TRUE;
87 }
88
89 /* 从环形buffer中读出数据 */
rb_get(struct rb * rb,rt_uint8_t * ptr,rt_uint16_t length)90 static rt_bool_t rb_get(struct rb* rb, rt_uint8_t *ptr, rt_uint16_t length)
91 {
92 rt_size_t size;
93
94 /* 判断是否有足够的数据 */
95 if (rb->read_index > rb->write_index)
96 size = rb->buffer_size - rb->read_index + rb->write_index;
97 else
98 size = rb->write_index - rb->read_index;
99
100 /* 没有足够的数据 */
101 if (size < length) return RT_FALSE;
102
103 if (rb->read_index > rb->write_index)
104 {
105 if (rb->buffer_size - rb->read_index > length)
106 {
107 /* read_index的数据足够多,直接复制 */
108 memcpy(ptr, &rb->buffer_ptr[rb->read_index], length);
109 rb->read_index += length;
110 }
111 else
112 {
113 /* read_index的数据不够,需要分段复制 */
114 memcpy(ptr, &rb->buffer_ptr[rb->read_index],
115 rb->buffer_size - rb->read_index);
116 memcpy(&ptr[rb->buffer_size - rb->read_index], &rb->buffer_ptr[0],
117 length - rb->buffer_size + rb->read_index);
118 rb->read_index = length - rb->buffer_size + rb->read_index;
119 }
120 }
121 else
122 {
123 /*
124 * read_index要比write_index小,总的数据量够(前面已经有总数据量的判
125 * 断),直接复制出数据。
126 */
127 memcpy(ptr, &rb->buffer_ptr[rb->read_index], length);
128 rb->read_index += length;
129 }
130
131 return RT_TRUE;
132 }
133
134 /* 生产者线程入口 */
thread_entry(void * parameter)135 static void thread_entry(void* parameter)
136 {
137 rt_bool_t result;
138 rt_uint8_t data_buffer[BUFFER_ITEM + 1];
139
140 while (1)
141 {
142 /* 持有信号量 */
143 rt_sem_take(sem, RT_WAITING_FOREVER);
144 /* 从环buffer中获得数据 */
145 result = rb_get(&working_rb, &data_buffer[0], BUFFER_ITEM);
146 /* 释放信号量 */
147 rt_sem_release(sem);
148 data_buffer[BUFFER_ITEM] = '\0';
149
150 if (result == RT_TRUE)
151 {
152 /* 获取数据成功,打印数据 */
153 rt_kprintf("%s\n", data_buffer);
154 }
155
156 /* 做一个5 OS Tick的休眠 */
157 rt_thread_delay(5);
158 }
159 }
160
161 /* worker线程入口 */
worker_entry(void * parameter)162 static void worker_entry(void* parameter)
163 {
164 rt_bool_t result;
165 rt_uint32_t index, setchar;
166 rt_uint8_t data_buffer[BUFFER_ITEM];
167
168 setchar = 0x21;
169 while (1)
170 {
171 /* 构造数据 */
172 for(index = 0; index < BUFFER_ITEM; index++)
173 {
174 data_buffer[index] = setchar;
175 if (++setchar == 0x7f)
176 setchar = 0x21;
177 }
178
179 /* 持有信号量 */
180 rt_sem_take(sem, RT_WAITING_FOREVER);
181
182 /* 把数据放到环形buffer中 */
183 result = rb_put(&working_rb, &data_buffer[0], BUFFER_ITEM);
184 if (result == RT_FALSE)
185 {
186 rt_kprintf("put error\n");
187 }
188
189 /* 释放信号量 */
190 rt_sem_release(sem);
191
192 /* 放入成功,做一个10 OS Tick的休眠 */
193 rt_thread_delay(10);
194 }
195 }
196
semaphore_buffer_worker_init()197 int semaphore_buffer_worker_init()
198 {
199 /* 初始化ring buffer */
200 rb_init(&working_rb, working_buffer, BUFFER_SIZE);
201
202 /* 创建信号量 */
203 sem = rt_sem_create("sem", 1, RT_IPC_FLAG_FIFO);
204 if (sem == RT_NULL)
205 {
206 tc_stat(TC_STAT_END | TC_STAT_FAILED);
207 return 0;
208 }
209
210 /* 创建线程1 */
211 tid = rt_thread_create("thread",
212 thread_entry, RT_NULL, /* 线程入口是thread_entry, 入口参数是RT_NULL */
213 THREAD_STACK_SIZE, THREAD_PRIORITY, THREAD_TIMESLICE);
214 if (tid != RT_NULL)
215 rt_thread_startup(tid);
216 else
217 tc_stat(TC_STAT_END | TC_STAT_FAILED);
218
219 /* 创建线程2 */
220 worker = rt_thread_create("worker",
221 worker_entry, RT_NULL, /* 线程入口是worker_entry, 入口参数是RT_NULL */
222 THREAD_STACK_SIZE, THREAD_PRIORITY, THREAD_TIMESLICE);
223 if (worker != RT_NULL)
224 rt_thread_startup(worker);
225 else
226 tc_stat(TC_STAT_END | TC_STAT_FAILED);
227
228 return 0;
229 }
230
231 #ifdef RT_USING_TC
_tc_cleanup()232 static void _tc_cleanup()
233 {
234 /* 调度器上锁,上锁后,将不再切换到其他线程,仅响应中断 */
235 rt_enter_critical();
236
237 /* 删除信号量 */
238 if (sem != RT_NULL)
239 rt_sem_delete(sem);
240
241 /* 删除线程 */
242 if (tid != RT_NULL && tid->stat != RT_THREAD_CLOSE)
243 rt_thread_delete(tid);
244 if (worker != RT_NULL && worker->stat != RT_THREAD_CLOSE)
245 rt_thread_delete(worker);
246
247 /* 调度器解锁 */
248 rt_exit_critical();
249
250 /* 设置TestCase状态 */
251 tc_done(TC_STAT_PASSED);
252 }
253
_tc_semaphore_buffer_worker()254 int _tc_semaphore_buffer_worker()
255 {
256 /* 设置TestCase清理回调函数 */
257 tc_cleanup(_tc_cleanup);
258 semaphore_buffer_worker_init();
259
260 /* 返回TestCase运行的最长时间 */
261 return 100;
262 }
263 /* 输出函数命令到finsh shell中 */
264 FINSH_FUNCTION_EXPORT(_tc_semaphore_buffer_worker, a buffer worker with semaphore example);
265 #else
266 /* 用户应用入口 */
rt_application_init()267 int rt_application_init()
268 {
269 semaphore_buffer_worker_init();
270
271 return 0;
272 }
273 #endif
274