1 /*
2 * Copyright (c) 2006-2018, RT-Thread Development Team
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 *
6 * Change Logs:
7 * Date Author Notes
8 * 2012-09-30 Bernard first version.
9 * 2017-11-08 JasonJiaJie fix memory leak issue when close a pipe.
10 */
11 #include <rthw.h>
12 #include <rtdevice.h>
13 #include <stdint.h>
14
15 #if defined(RT_USING_POSIX)
16 #include <dfs_file.h>
17 #include <dfs_posix.h>
18 #include <dfs_poll.h>
19
pipe_fops_open(struct dfs_fd * fd)20 static int pipe_fops_open(struct dfs_fd *fd)
21 {
22 rt_device_t device;
23 rt_pipe_t *pipe;
24
25 pipe = (rt_pipe_t *)fd->data;
26 if (!pipe) return -1;
27
28 device = &(pipe->parent);
29 rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);
30
31 if (device->ref_count == 0)
32 {
33 pipe->fifo = rt_ringbuffer_create(pipe->bufsz);
34 }
35
36 switch (fd->flags & O_ACCMODE)
37 {
38 case O_RDONLY:
39 pipe->readers ++;
40 break;
41 case O_WRONLY:
42 pipe->writers ++;
43 break;
44 case O_RDWR:
45 pipe->readers ++;
46 pipe->writers ++;
47 break;
48 }
49 device->ref_count ++;
50
51 rt_mutex_release(&(pipe->lock));
52
53 return 0;
54 }
55
pipe_fops_close(struct dfs_fd * fd)56 static int pipe_fops_close(struct dfs_fd *fd)
57 {
58 rt_device_t device;
59 rt_pipe_t *pipe;
60
61 pipe = (rt_pipe_t *)fd->data;
62 if (!pipe) return -1;
63
64 device = &(pipe->parent);
65 rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);
66
67 switch (fd->flags & O_ACCMODE)
68 {
69 case O_RDONLY:
70 pipe->readers --;
71 break;
72 case O_WRONLY:
73 pipe->writers --;
74 break;
75 case O_RDWR:
76 pipe->readers --;
77 pipe->writers --;
78 break;
79 }
80
81 if (pipe->writers == 0)
82 {
83 rt_wqueue_wakeup(&(pipe->reader_queue), (void*)(POLLIN | POLLERR | POLLHUP));
84 }
85
86 if (pipe->readers == 0)
87 {
88 rt_wqueue_wakeup(&(pipe->writer_queue), (void*)(POLLOUT | POLLERR | POLLHUP));
89 }
90
91 if (device->ref_count == 1)
92 {
93 rt_ringbuffer_destroy(pipe->fifo);
94 pipe->fifo = RT_NULL;
95 }
96 device->ref_count --;
97
98 rt_mutex_release(&(pipe->lock));
99
100 return 0;
101 }
102
pipe_fops_ioctl(struct dfs_fd * fd,int cmd,void * args)103 static int pipe_fops_ioctl(struct dfs_fd *fd, int cmd, void *args)
104 {
105 rt_pipe_t *pipe;
106 int ret = 0;
107
108 pipe = (rt_pipe_t *)fd->data;
109
110 switch (cmd)
111 {
112 case FIONREAD:
113 *((int*)args) = rt_ringbuffer_data_len(pipe->fifo);
114 break;
115 case FIONWRITE:
116 *((int*)args) = rt_ringbuffer_space_len(pipe->fifo);
117 break;
118 default:
119 ret = -EINVAL;
120 break;
121 }
122
123 return ret;
124 }
125
pipe_fops_read(struct dfs_fd * fd,void * buf,size_t count)126 static int pipe_fops_read(struct dfs_fd *fd, void *buf, size_t count)
127 {
128 int len = 0;
129 rt_pipe_t *pipe;
130
131 pipe = (rt_pipe_t *)fd->data;
132
133 /* no process has the pipe open for writing, return end-of-file */
134 if (pipe->writers == 0)
135 return 0;
136
137 rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);
138
139 while (1)
140 {
141 if (pipe->writers == 0)
142 {
143 goto out;
144 }
145
146 len = rt_ringbuffer_get(pipe->fifo, buf, count);
147
148 if (len > 0)
149 {
150 break;
151 }
152 else
153 {
154 if (fd->flags & O_NONBLOCK)
155 {
156 len = -EAGAIN;
157 goto out;
158 }
159
160 rt_mutex_release(&pipe->lock);
161 rt_wqueue_wakeup(&(pipe->writer_queue), (void*)POLLOUT);
162 rt_wqueue_wait(&(pipe->reader_queue), 0, -1);
163 rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);
164 }
165 }
166
167 /* wakeup writer */
168 rt_wqueue_wakeup(&(pipe->writer_queue), (void*)POLLOUT);
169
170 out:
171 rt_mutex_release(&pipe->lock);
172 return len;
173 }
174
pipe_fops_write(struct dfs_fd * fd,const void * buf,size_t count)175 static int pipe_fops_write(struct dfs_fd *fd, const void *buf, size_t count)
176 {
177 int len;
178 rt_pipe_t *pipe;
179 int wakeup = 0;
180 int ret = 0;
181 uint8_t *pbuf;
182
183 pipe = (rt_pipe_t *)fd->data;
184
185 if (pipe->readers == 0)
186 {
187 ret = -EPIPE;
188 goto out;
189 }
190
191 if (count == 0)
192 return 0;
193
194 pbuf = (uint8_t*)buf;
195 rt_mutex_take(&pipe->lock, -1);
196
197 while (1)
198 {
199 if (pipe->readers == 0)
200 {
201 if (ret == 0)
202 ret = -EPIPE;
203 break;
204 }
205
206 len = rt_ringbuffer_put(pipe->fifo, pbuf, count - ret);
207 ret += len;
208 pbuf += len;
209 wakeup = 1;
210
211 if (ret == count)
212 {
213 break;
214 }
215 else
216 {
217 if (fd->flags & O_NONBLOCK)
218 {
219 if (ret == 0)
220 {
221 ret = -EAGAIN;
222 }
223
224 break;
225 }
226 }
227
228 rt_mutex_release(&pipe->lock);
229 rt_wqueue_wakeup(&(pipe->reader_queue), (void*)POLLIN);
230 /* pipe full, waiting on suspended write list */
231 rt_wqueue_wait(&(pipe->writer_queue), 0, -1);
232 rt_mutex_take(&pipe->lock, -1);
233 }
234 rt_mutex_release(&pipe->lock);
235
236 if (wakeup)
237 {
238 rt_wqueue_wakeup(&(pipe->reader_queue), (void*)POLLIN);
239 }
240
241 out:
242 return ret;
243 }
244
/**
 * Poll handler for a pipe descriptor.
 *
 * The request is attached to both wait queues of the pipe, then the current
 * readiness is computed for the direction(s) this descriptor was opened for.
 *
 * @param fd  file object; fd->data points to the rt_pipe_t.
 * @param req poll request to attach to the pipe's wait queues.
 *
 * @return event mask: POLLIN if data is buffered and POLLHUP if no writer
 *         exists (read side); POLLOUT if space is available and POLLERR if
 *         no reader exists (write side).
 */
static int pipe_fops_poll(struct dfs_fd *fd, rt_pollreq_t *req)
{
    rt_pipe_t *pipe = (rt_pipe_t *)fd->data;
    int events = 0;
    int access;
    int read_side;
    int write_side;

    rt_poll_add(&(pipe->reader_queue), req);
    rt_poll_add(&(pipe->writer_queue), req);

    access = fd->flags & O_ACCMODE;
    read_side = (access == O_RDONLY) || (access == O_RDWR);
    write_side = (access == O_WRONLY) || (access == O_RDWR);

    if (read_side)
    {
        if (rt_ringbuffer_data_len(pipe->fifo) != 0)
        {
            events |= POLLIN;
        }
        if (pipe->writers == 0)
        {
            events |= POLLHUP;
        }
    }

    if (write_side)
    {
        if (rt_ringbuffer_space_len(pipe->fifo) != 0)
        {
            events |= POLLOUT;
        }
        if (pipe->readers == 0)
        {
            events |= POLLERR;
        }
    }

    return events;
}
294
295 static const struct dfs_file_ops pipe_fops =
296 {
297 pipe_fops_open,
298 pipe_fops_close,
299 pipe_fops_ioctl,
300 pipe_fops_read,
301 pipe_fops_write,
302 RT_NULL,
303 RT_NULL,
304 RT_NULL,
305 pipe_fops_poll,
306 };
307 #endif /* end of RT_USING_POSIX */
308
rt_pipe_open(rt_device_t device,rt_uint16_t oflag)309 rt_err_t rt_pipe_open (rt_device_t device, rt_uint16_t oflag)
310 {
311 rt_pipe_t *pipe = (rt_pipe_t *)device;
312
313 if (device == RT_NULL) return -RT_EINVAL;
314 rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);
315
316 if (pipe->fifo == RT_NULL)
317 {
318 pipe->fifo = rt_ringbuffer_create(pipe->bufsz);
319 }
320
321 rt_mutex_release(&(pipe->lock));
322
323 return RT_EOK;
324 }
325
rt_pipe_close(rt_device_t device)326 rt_err_t rt_pipe_close (rt_device_t device)
327 {
328 rt_pipe_t *pipe = (rt_pipe_t *)device;
329
330 if (device == RT_NULL) return -RT_EINVAL;
331 rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);
332
333 if (device->ref_count == 1)
334 {
335 rt_ringbuffer_destroy(pipe->fifo);
336 pipe->fifo = RT_NULL;
337 }
338
339 rt_mutex_release(&(pipe->lock));
340
341 return RT_EOK;
342 }
343
/**
 * Device-layer read handler of a pipe.
 *
 * Copies whatever is currently buffered, up to count bytes, into buffer.
 * The call does not wait for more data: it stops as soon as the ring buffer
 * yields nothing.
 *
 * @param device the pipe device.
 * @param pos    not used by this handler.
 * @param buffer destination buffer.
 * @param count  maximum number of bytes to read.
 *
 * @return number of bytes copied; 0 if count is 0, or if device is RT_NULL
 *         (in which case errno is set to -EINVAL).
 */
rt_size_t rt_pipe_read (rt_device_t device, rt_off_t pos, void *buffer, rt_size_t count)
{
    rt_pipe_t *pipe;
    uint8_t *dst;
    rt_size_t total = 0;

    if (device == RT_NULL)
    {
        rt_set_errno(-EINVAL);
        return 0;
    }
    if (count == 0)
    {
        return 0;
    }

    pipe = (rt_pipe_t *)device;
    dst = (uint8_t *)buffer;

    rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);

    /* count is non-zero here, so at least one fetch is always attempted */
    do
    {
        int got = rt_ringbuffer_get(pipe->fifo, &dst[total], count - total);
        if (got <= 0)
        {
            break;
        }
        total += got;
    } while (total < count);

    rt_mutex_release(&pipe->lock);

    return total;
}
371
/**
 * Device-layer write handler of a pipe.
 *
 * Pushes up to count bytes from buffer into the ring buffer. The call does
 * not wait for space: it stops as soon as the ring buffer accepts nothing.
 *
 * @param device the pipe device.
 * @param pos    not used by this handler.
 * @param buffer source data.
 * @param count  number of bytes to write.
 *
 * @return number of bytes stored; 0 if count is 0, or if device is RT_NULL
 *         (in which case errno is set to -EINVAL).
 */
rt_size_t rt_pipe_write (rt_device_t device, rt_off_t pos, const void *buffer, rt_size_t count)
{
    rt_pipe_t *pipe;
    uint8_t *src;
    rt_size_t total = 0;

    if (device == RT_NULL)
    {
        rt_set_errno(-EINVAL);
        return 0;
    }
    if (count == 0)
    {
        return 0;
    }

    pipe = (rt_pipe_t *)device;
    src = (uint8_t *)buffer;

    rt_mutex_take(&pipe->lock, -1);

    /* count is non-zero here, so at least one store is always attempted */
    do
    {
        int put = rt_ringbuffer_put(pipe->fifo, &src[total], count - total);
        if (put <= 0)
        {
            break;
        }
        total += put;
    } while (total < count);

    rt_mutex_release(&pipe->lock);

    return total;
}
399
rt_pipe_control(rt_device_t dev,int cmd,void * args)400 rt_err_t rt_pipe_control(rt_device_t dev, int cmd, void *args)
401 {
402 return RT_EOK;
403 }
404
#ifdef RT_USING_DEVICE_OPS
/*
 * Device operation table used when RT_USING_DEVICE_OPS is enabled. It holds
 * the same handlers that rt_pipe_create() otherwise assigns one by one. The
 * initializer is positional and must follow the member order of
 * struct rt_device_ops.
 */
static const struct rt_device_ops pipe_ops =
{
    RT_NULL,            /* init: none */
    rt_pipe_open,       /* open */
    rt_pipe_close,      /* close */
    rt_pipe_read,       /* read */
    rt_pipe_write,      /* write */
    rt_pipe_control,    /* control */
};
#endif
416
rt_pipe_create(const char * name,int bufsz)417 rt_pipe_t *rt_pipe_create(const char *name, int bufsz)
418 {
419 rt_pipe_t *pipe;
420 rt_device_t dev;
421
422 pipe = rt_malloc(sizeof(rt_pipe_t));
423 if (pipe == RT_NULL) return RT_NULL;
424
425 rt_memset(pipe, 0, sizeof(rt_pipe_t));
426 rt_mutex_init(&(pipe->lock), name, RT_IPC_FLAG_FIFO);
427 rt_wqueue_init(&(pipe->reader_queue));
428 rt_wqueue_init(&(pipe->writer_queue));
429
430 RT_ASSERT(bufsz < 0xFFFF);
431 pipe->bufsz = bufsz;
432
433 dev = &(pipe->parent);
434 dev->type = RT_Device_Class_Pipe;
435 #ifdef RT_USING_DEVICE_OPS
436 dev->ops = &pipe_ops;
437 #else
438 dev->init = RT_NULL;
439 dev->open = rt_pipe_open;
440 dev->read = rt_pipe_read;
441 dev->write = rt_pipe_write;
442 dev->close = rt_pipe_close;
443 dev->control = rt_pipe_control;
444 #endif
445
446 dev->rx_indicate = RT_NULL;
447 dev->tx_complete = RT_NULL;
448
449 if (rt_device_register(&(pipe->parent), name, RT_DEVICE_FLAG_RDWR | RT_DEVICE_FLAG_REMOVABLE) != 0)
450 {
451 rt_free(pipe);
452 return RT_NULL;
453 }
454 #ifdef RT_USING_POSIX
455 dev->fops = (void*)&pipe_fops;
456 #endif
457
458 return pipe;
459 }
460
rt_pipe_delete(const char * name)461 int rt_pipe_delete(const char *name)
462 {
463 int result = 0;
464 rt_device_t device;
465
466 device = rt_device_find(name);
467 if (device)
468 {
469 if (device->type == RT_Device_Class_Pipe)
470 {
471 rt_pipe_t *pipe;
472
473 if (device->ref_count != 0)
474 {
475 return -RT_EBUSY;
476 }
477
478 pipe = (rt_pipe_t *)device;
479
480 rt_mutex_detach(&(pipe->lock));
481 rt_device_unregister(device);
482
483 /* close fifo ringbuffer */
484 if (pipe->fifo)
485 {
486 rt_ringbuffer_destroy(pipe->fifo);
487 pipe->fifo = RT_NULL;
488 }
489 rt_free(pipe);
490 }
491 else
492 {
493 result = -ENODEV;
494 }
495 }
496 else
497 {
498 result = -ENODEV;
499 }
500
501 return result;
502 }
503
504 #ifdef RT_USING_POSIX
pipe(int fildes[2])505 int pipe(int fildes[2])
506 {
507 rt_pipe_t *pipe;
508 char dname[8];
509 char dev_name[32];
510 static int pipeno = 0;
511
512 rt_snprintf(dname, sizeof(dname), "pipe%d", pipeno++);
513
514 pipe = rt_pipe_create(dname, PIPE_BUFSZ);
515 if (pipe == RT_NULL)
516 {
517 return -1;
518 }
519
520 rt_snprintf(dev_name, sizeof(dev_name), "/dev/%s", dname);
521 fildes[0] = open(dev_name, O_RDONLY, 0);
522 if (fildes[0] < 0)
523 {
524 return -1;
525 }
526
527 fildes[1] = open(dev_name, O_WRONLY, 0);
528 if (fildes[1] < 0)
529 {
530 close(fildes[0]);
531 return -1;
532 }
533
534 return 0;
535 }
536
mkfifo(const char * path,mode_t mode)537 int mkfifo(const char *path, mode_t mode)
538 {
539 rt_pipe_t *pipe;
540
541 pipe = rt_pipe_create(path, PIPE_BUFSZ);
542 if (pipe == RT_NULL)
543 {
544 return -1;
545 }
546
547 return 0;
548 }
549 #endif
550