1 /* 2 * Copyright (c) 2006-2018, RT-Thread Development Team 3 * 4 * SPDX-License-Identifier: Apache-2.0 5 * 6 * Change Logs: 7 * Date Author Notes 8 * 2012-09-30 Bernard first version. 9 */ 10 11 #include <rthw.h> 12 #include <rtthread.h> 13 #include <rtdevice.h> 14 15 #include "audio_pipe.h" 16 17 static void _rt_pipe_resume_writer(struct rt_audio_pipe *pipe) 18 { 19 if (!rt_list_isempty(&pipe->suspended_write_list)) 20 { 21 rt_thread_t thread; 22 23 RT_ASSERT(pipe->flag & RT_PIPE_FLAG_BLOCK_WR); 24 25 /* get suspended thread */ 26 thread = rt_list_entry(pipe->suspended_write_list.next, 27 struct rt_thread, 28 tlist); 29 30 /* resume the write thread */ 31 rt_thread_resume(thread); 32 33 rt_schedule(); 34 } 35 } 36 37 static rt_size_t rt_pipe_read(rt_device_t dev, 38 rt_off_t pos, 39 void *buffer, 40 rt_size_t size) 41 { 42 rt_uint32_t level; 43 rt_thread_t thread; 44 struct rt_audio_pipe *pipe; 45 rt_size_t read_nbytes; 46 47 pipe = (struct rt_audio_pipe *)dev; 48 RT_ASSERT(pipe != RT_NULL); 49 50 if (!(pipe->flag & RT_PIPE_FLAG_BLOCK_RD)) 51 { 52 level = rt_hw_interrupt_disable(); 53 read_nbytes = rt_ringbuffer_get(&(pipe->ringbuffer), buffer, size); 54 55 /* if the ringbuffer is empty, there won't be any writer waiting */ 56 if (read_nbytes) 57 _rt_pipe_resume_writer(pipe); 58 59 rt_hw_interrupt_enable(level); 60 61 return read_nbytes; 62 } 63 64 thread = rt_thread_self(); 65 66 /* current context checking */ 67 RT_DEBUG_NOT_IN_INTERRUPT; 68 69 do { 70 level = rt_hw_interrupt_disable(); 71 read_nbytes = rt_ringbuffer_get(&(pipe->ringbuffer), buffer, size); 72 if (read_nbytes == 0) 73 { 74 rt_thread_suspend(thread); 75 /* waiting on suspended read list */ 76 rt_list_insert_before(&(pipe->suspended_read_list), 77 &(thread->tlist)); 78 rt_hw_interrupt_enable(level); 79 80 rt_schedule(); 81 } 82 else 83 { 84 _rt_pipe_resume_writer(pipe); 85 rt_hw_interrupt_enable(level); 86 break; 87 } 88 } while (read_nbytes == 0); 89 90 return read_nbytes; 91 } 92 93 static void _rt_pipe_resume_reader(struct rt_audio_pipe *pipe) 94 { 95 if (pipe->parent.rx_indicate) 96 pipe->parent.rx_indicate(&pipe->parent, 97 rt_ringbuffer_data_len(&pipe->ringbuffer)); 98 99 if (!rt_list_isempty(&pipe->suspended_read_list)) 100 { 101 rt_thread_t thread; 102 103 RT_ASSERT(pipe->flag & RT_PIPE_FLAG_BLOCK_RD); 104 105 /* get suspended thread */ 106 thread = rt_list_entry(pipe->suspended_read_list.next, 107 struct rt_thread, 108 tlist); 109 110 /* resume the read thread */ 111 rt_thread_resume(thread); 112 113 rt_schedule(); 114 } 115 } 116 117 static rt_size_t rt_pipe_write(rt_device_t dev, 118 rt_off_t pos, 119 const void *buffer, 120 rt_size_t size) 121 { 122 rt_uint32_t level; 123 rt_thread_t thread; 124 struct rt_audio_pipe *pipe; 125 rt_size_t write_nbytes; 126 127 pipe = (struct rt_audio_pipe *)dev; 128 RT_ASSERT(pipe != RT_NULL); 129 130 if ((pipe->flag & RT_PIPE_FLAG_FORCE_WR) || 131 !(pipe->flag & RT_PIPE_FLAG_BLOCK_WR)) 132 { 133 level = rt_hw_interrupt_disable(); 134 135 if (pipe->flag & RT_PIPE_FLAG_FORCE_WR) 136 write_nbytes = rt_ringbuffer_put_force(&(pipe->ringbuffer), 137 buffer, size); 138 else 139 write_nbytes = rt_ringbuffer_put(&(pipe->ringbuffer), 140 buffer, size); 141 142 _rt_pipe_resume_reader(pipe); 143 144 rt_hw_interrupt_enable(level); 145 146 return write_nbytes; 147 } 148 149 thread = rt_thread_self(); 150 151 /* current context checking */ 152 RT_DEBUG_NOT_IN_INTERRUPT; 153 154 do { 155 level = rt_hw_interrupt_disable(); 156 write_nbytes = rt_ringbuffer_put(&(pipe->ringbuffer), buffer, size); 157 if (write_nbytes == 0) 158 { 159 /* pipe full, waiting on suspended write list */ 160 rt_thread_suspend(thread); 161 /* waiting on suspended read list */ 162 rt_list_insert_before(&(pipe->suspended_write_list), 163 &(thread->tlist)); 164 rt_hw_interrupt_enable(level); 165 166 rt_schedule(); 167 } 168 else 169 { 170 _rt_pipe_resume_reader(pipe); 171 rt_hw_interrupt_enable(level); 172 break; 173 } 174 } while (write_nbytes == 0); 175 176 return write_nbytes; 177 } 178 179 static rt_err_t rt_pipe_control(rt_device_t dev, int cmd, void *args) 180 { 181 struct rt_audio_pipe *pipe; 182 183 pipe = (struct rt_audio_pipe *)dev; 184 185 if (cmd == PIPE_CTRL_GET_SPACE && args) 186 *(rt_size_t*)args = rt_ringbuffer_space_len(&pipe->ringbuffer); 187 return RT_EOK; 188 } 189 190 #ifdef RT_USING_DEVICE_OPS 191 const static struct rt_device_ops audio_pipe_ops = 192 { 193 RT_NULL, 194 RT_NULL, 195 RT_NULL, 196 rt_pipe_read, 197 rt_pipe_write, 198 rt_pipe_control 199 }; 200 #endif 201 202 /** 203 * This function will initialize a pipe device and put it under control of 204 * resource management. 205 * 206 * @param pipe the pipe device 207 * @param name the name of pipe device 208 * @param flag the attribute of the pipe device 209 * @param buf the buffer of pipe device 210 * @param size the size of pipe device buffer 211 * 212 * @return the operation status, RT_EOK on successful 213 */ 214 rt_err_t rt_audio_pipe_init(struct rt_audio_pipe *pipe, 215 const char *name, 216 rt_int32_t flag, 217 rt_uint8_t *buf, 218 rt_size_t size) 219 { 220 RT_ASSERT(pipe); 221 RT_ASSERT(buf); 222 223 /* initialize suspended list */ 224 rt_list_init(&pipe->suspended_read_list); 225 rt_list_init(&pipe->suspended_write_list); 226 227 /* initialize ring buffer */ 228 rt_ringbuffer_init(&pipe->ringbuffer, buf, size); 229 230 pipe->flag = flag; 231 232 /* create pipe */ 233 pipe->parent.type = RT_Device_Class_Pipe; 234 #ifdef RT_USING_DEVICE_OPS 235 pipe->parent.ops = &audio_pipe_ops; 236 #else 237 pipe->parent.init = RT_NULL; 238 pipe->parent.open = RT_NULL; 239 pipe->parent.close = RT_NULL; 240 pipe->parent.read = rt_pipe_read; 241 pipe->parent.write = rt_pipe_write; 242 pipe->parent.control = rt_pipe_control; 243 #endif 244 245 return rt_device_register(&(pipe->parent), name, RT_DEVICE_FLAG_RDWR); 246 } 247 248 /** 249 * This function will detach a pipe device from resource management 250 * 251 * @param pipe the pipe device 252 * 253 * @return the operation status, RT_EOK on successful 254 */ 255 rt_err_t rt_audio_pipe_detach(struct rt_audio_pipe *pipe) 256 { 257 return rt_device_unregister(&pipe->parent); 258 } 259 260 #ifdef RT_USING_HEAP 261 rt_err_t rt_audio_pipe_create(const char *name, rt_int32_t flag, rt_size_t size) 262 { 263 rt_uint8_t *rb_memptr = RT_NULL; 264 struct rt_audio_pipe *pipe = RT_NULL; 265 266 /* get aligned size */ 267 size = RT_ALIGN(size, RT_ALIGN_SIZE); 268 pipe = (struct rt_audio_pipe *)rt_calloc(1, sizeof(struct rt_audio_pipe)); 269 if (pipe == RT_NULL) 270 return -RT_ENOMEM; 271 272 /* create ring buffer of pipe */ 273 rb_memptr = rt_malloc(size); 274 if (rb_memptr == RT_NULL) 275 { 276 rt_free(pipe); 277 return -RT_ENOMEM; 278 } 279 280 return rt_audio_pipe_init(pipe, name, flag, rb_memptr, size); 281 } 282 283 void rt_audio_pipe_destroy(struct rt_audio_pipe *pipe) 284 { 285 if (pipe == RT_NULL) 286 return; 287 288 /* un-register pipe device */ 289 rt_audio_pipe_detach(pipe); 290 291 /* release memory */ 292 rt_free(pipe->ringbuffer.buffer_ptr); 293 rt_free(pipe); 294 295 return; 296 } 297 298 #endif /* RT_USING_HEAP */ 299