1 /* 2 * 程序清单:信号量实现生产者消费者间的互斥 3 * 4 * 在这个程序中,会创建两个线程,一个是生成者线程worker一个是消费者线程thread 5 * 6 * 在数据信息生产、消费的过程中,worker负责把数据将写入到环形buffer中,而thread 7 * 则从环形buffer中读出。 8 */ 9 #include <rtthread.h> 10 #include "tc_comm.h" 11 12 /* 一个环形buffer的实现 */ 13 struct rb 14 { 15 rt_uint16_t read_index, write_index; 16 rt_uint8_t *buffer_ptr; 17 rt_uint16_t buffer_size; 18 }; 19 20 /* 指向信号量控制块的指针 */ 21 static rt_sem_t sem = RT_NULL; 22 /* 指向线程控制块的指针 */ 23 static rt_thread_t tid = RT_NULL, worker = RT_NULL; 24 25 /* 环形buffer的内存块(用数组体现出来) */ 26 #define BUFFER_SIZE 256 27 #define BUFFER_ITEM 32 28 static rt_uint8_t working_buffer[BUFFER_SIZE]; 29 struct rb working_rb; 30 31 /* 初始化环形buffer,size指的是buffer的大小。注:这里并没对数据地址对齐做处理 */ 32 static void rb_init(struct rb* rb, rt_uint8_t *pool, rt_uint16_t size) 33 { 34 RT_ASSERT(rb != RT_NULL); 35 36 /* 对读写指针清零*/ 37 rb->read_index = rb->write_index = 0; 38 39 /* 设置环形buffer的内存数据块 */ 40 rb->buffer_ptr = pool; 41 rb->buffer_size = size; 42 } 43 44 /* 向环形buffer中写入数据 */ 45 static rt_bool_t rb_put(struct rb* rb, const rt_uint8_t *ptr, rt_uint16_t length) 46 { 47 rt_size_t size; 48 49 /* 判断是否有足够的剩余空间 */ 50 if (rb->read_index > rb->write_index) 51 size = rb->read_index - rb->write_index; 52 else 53 size = rb->buffer_size - rb->write_index + rb->read_index; 54 55 /* 没有多余的空间 */ 56 if (size < length) return RT_FALSE; 57 58 if (rb->read_index > rb->write_index) 59 { 60 /* read_index - write_index 即为总的空余空间 */ 61 memcpy(&rb->buffer_ptr[rb->write_index], ptr, length); 62 rb->write_index += length; 63 } 64 else 65 { 66 if (rb->buffer_size - rb->write_index > length) 67 { 68 /* write_index 后面剩余的空间有足够的长度 */ 69 memcpy(&rb->buffer_ptr[rb->write_index], ptr, length); 70 rb->write_index += length; 71 } 72 else 73 { 74 /* 75 * write_index 后面剩余的空间不存在足够的长度,需要把部分数据复制到 76 * 前面的剩余空间中 77 */ 78 memcpy(&rb->buffer_ptr[rb->write_index], ptr, 79 rb->buffer_size - rb->write_index); 80 memcpy(&rb->buffer_ptr[0], &ptr[rb->buffer_size - rb->write_index], 81 length - (rb->buffer_size - rb->write_index)); 82 rb->write_index = length - (rb->buffer_size - rb->write_index); 83 } 84 } 85 86 return RT_TRUE; 87 } 88 89 /* 从环形buffer中读出数据 */ 90 static rt_bool_t rb_get(struct rb* rb, rt_uint8_t *ptr, rt_uint16_t length) 91 { 92 rt_size_t size; 93 94 /* 判断是否有足够的数据 */ 95 if (rb->read_index > rb->write_index) 96 size = rb->buffer_size - rb->read_index + rb->write_index; 97 else 98 size = rb->write_index - rb->read_index; 99 100 /* 没有足够的数据 */ 101 if (size < length) return RT_FALSE; 102 103 if (rb->read_index > rb->write_index) 104 { 105 if (rb->buffer_size - rb->read_index > length) 106 { 107 /* read_index的数据足够多,直接复制 */ 108 memcpy(ptr, &rb->buffer_ptr[rb->read_index], length); 109 rb->read_index += length; 110 } 111 else 112 { 113 /* read_index的数据不够,需要分段复制 */ 114 memcpy(ptr, &rb->buffer_ptr[rb->read_index], 115 rb->buffer_size - rb->read_index); 116 memcpy(&ptr[rb->buffer_size - rb->read_index], &rb->buffer_ptr[0], 117 length - rb->buffer_size + rb->read_index); 118 rb->read_index = length - rb->buffer_size + rb->read_index; 119 } 120 } 121 else 122 { 123 /* 124 * read_index要比write_index小,总的数据量够(前面已经有总数据量的判 125 * 断),直接复制出数据。 126 */ 127 memcpy(ptr, &rb->buffer_ptr[rb->read_index], length); 128 rb->read_index += length; 129 } 130 131 return RT_TRUE; 132 } 133 134 /* 生产者线程入口 */ 135 static void thread_entry(void* parameter) 136 { 137 rt_bool_t result; 138 rt_uint8_t data_buffer[BUFFER_ITEM + 1]; 139 140 while (1) 141 { 142 /* 持有信号量 */ 143 rt_sem_take(sem, RT_WAITING_FOREVER); 144 /* 从环buffer中获得数据 */ 145 result = rb_get(&working_rb, &data_buffer[0], BUFFER_ITEM); 146 /* 释放信号量 */ 147 rt_sem_release(sem); 148 data_buffer[BUFFER_ITEM] = '\0'; 149 150 if (result == RT_TRUE) 151 { 152 /* 获取数据成功,打印数据 */ 153 rt_kprintf("%s\n", data_buffer); 154 } 155 156 /* 做一个5 OS Tick的休眠 */ 157 rt_thread_delay(5); 158 } 159 } 160 161 /* worker线程入口 */ 162 static void worker_entry(void* parameter) 163 { 164 rt_bool_t result; 165 rt_uint32_t index, setchar; 166 rt_uint8_t data_buffer[BUFFER_ITEM]; 167 168 setchar = 0x21; 169 while (1) 170 { 171 /* 构造数据 */ 172 for(index = 0; index < BUFFER_ITEM; index++) 173 { 174 data_buffer[index] = setchar; 175 if (++setchar == 0x7f) 176 setchar = 0x21; 177 } 178 179 /* 持有信号量 */ 180 rt_sem_take(sem, RT_WAITING_FOREVER); 181 182 /* 把数据放到环形buffer中 */ 183 result = rb_put(&working_rb, &data_buffer[0], BUFFER_ITEM); 184 if (result == RT_FALSE) 185 { 186 rt_kprintf("put error\n"); 187 } 188 189 /* 释放信号量 */ 190 rt_sem_release(sem); 191 192 /* 放入成功,做一个10 OS Tick的休眠 */ 193 rt_thread_delay(10); 194 } 195 } 196 197 int semaphore_buffer_worker_init() 198 { 199 /* 初始化ring buffer */ 200 rb_init(&working_rb, working_buffer, BUFFER_SIZE); 201 202 /* 创建信号量 */ 203 sem = rt_sem_create("sem", 1, RT_IPC_FLAG_FIFO); 204 if (sem == RT_NULL) 205 { 206 tc_stat(TC_STAT_END | TC_STAT_FAILED); 207 return 0; 208 } 209 210 /* 创建线程1 */ 211 tid = rt_thread_create("thread", 212 thread_entry, RT_NULL, /* 线程入口是thread_entry, 入口参数是RT_NULL */ 213 THREAD_STACK_SIZE, THREAD_PRIORITY, THREAD_TIMESLICE); 214 if (tid != RT_NULL) 215 rt_thread_startup(tid); 216 else 217 tc_stat(TC_STAT_END | TC_STAT_FAILED); 218 219 /* 创建线程2 */ 220 worker = rt_thread_create("worker", 221 worker_entry, RT_NULL, /* 线程入口是worker_entry, 入口参数是RT_NULL */ 222 THREAD_STACK_SIZE, THREAD_PRIORITY, THREAD_TIMESLICE); 223 if (worker != RT_NULL) 224 rt_thread_startup(worker); 225 else 226 tc_stat(TC_STAT_END | TC_STAT_FAILED); 227 228 return 0; 229 } 230 231 #ifdef RT_USING_TC 232 static void _tc_cleanup() 233 { 234 /* 调度器上锁,上锁后,将不再切换到其他线程,仅响应中断 */ 235 rt_enter_critical(); 236 237 /* 删除信号量 */ 238 if (sem != RT_NULL) 239 rt_sem_delete(sem); 240 241 /* 删除线程 */ 242 if (tid != RT_NULL && tid->stat != RT_THREAD_CLOSE) 243 rt_thread_delete(tid); 244 if (worker != RT_NULL && worker->stat != RT_THREAD_CLOSE) 245 rt_thread_delete(worker); 246 247 /* 调度器解锁 */ 248 rt_exit_critical(); 249 250 /* 设置TestCase状态 */ 251 tc_done(TC_STAT_PASSED); 252 } 253 254 int _tc_semaphore_buffer_worker() 255 { 256 /* 设置TestCase清理回调函数 */ 257 tc_cleanup(_tc_cleanup); 258 semaphore_buffer_worker_init(); 259 260 /* 返回TestCase运行的最长时间 */ 261 return 100; 262 } 263 /* 输出函数命令到finsh shell中 */ 264 FINSH_FUNCTION_EXPORT(_tc_semaphore_buffer_worker, a buffer worker with semaphore example); 265 #else 266 /* 用户应用入口 */ 267 int rt_application_init() 268 { 269 semaphore_buffer_worker_init(); 270 271 return 0; 272 } 273 #endif 274