xref: /nrf52832-nimble/rt-thread/components/drivers/src/pipe.c (revision 104654410c56c573564690304ae786df310c91fc)
1 /*
2  * Copyright (c) 2006-2018, RT-Thread Development Team
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  *
6  * Change Logs:
7  * Date           Author       Notes
8  * 2012-09-30     Bernard      first version.
9  * 2017-11-08     JasonJiaJie  fix memory leak issue when close a pipe.
10  */
11 #include <rthw.h>
12 #include <rtdevice.h>
13 #include <stdint.h>
14 
15 #if defined(RT_USING_POSIX)
16 #include <dfs_file.h>
17 #include <dfs_posix.h>
18 #include <dfs_poll.h>
19 
pipe_fops_open(struct dfs_fd * fd)20 static int pipe_fops_open(struct dfs_fd *fd)
21 {
22     rt_device_t device;
23     rt_pipe_t *pipe;
24 
25     pipe = (rt_pipe_t *)fd->data;
26     if (!pipe) return -1;
27 
28     device = &(pipe->parent);
29     rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);
30 
31     if (device->ref_count == 0)
32     {
33         pipe->fifo = rt_ringbuffer_create(pipe->bufsz);
34     }
35 
36     switch (fd->flags & O_ACCMODE)
37     {
38     case O_RDONLY:
39         pipe->readers ++;
40         break;
41     case O_WRONLY:
42         pipe->writers ++;
43         break;
44     case O_RDWR:
45         pipe->readers ++;
46         pipe->writers ++;
47         break;
48     }
49     device->ref_count ++;
50 
51     rt_mutex_release(&(pipe->lock));
52 
53     return 0;
54 }
55 
pipe_fops_close(struct dfs_fd * fd)56 static int pipe_fops_close(struct dfs_fd *fd)
57 {
58     rt_device_t device;
59     rt_pipe_t *pipe;
60 
61     pipe = (rt_pipe_t *)fd->data;
62     if (!pipe) return -1;
63 
64     device = &(pipe->parent);
65     rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);
66 
67     switch (fd->flags & O_ACCMODE)
68     {
69     case O_RDONLY:
70         pipe->readers --;
71         break;
72     case O_WRONLY:
73         pipe->writers --;
74         break;
75     case O_RDWR:
76         pipe->readers --;
77         pipe->writers --;
78         break;
79     }
80 
81     if (pipe->writers == 0)
82     {
83         rt_wqueue_wakeup(&(pipe->reader_queue), (void*)(POLLIN | POLLERR | POLLHUP));
84     }
85 
86     if (pipe->readers == 0)
87     {
88         rt_wqueue_wakeup(&(pipe->writer_queue), (void*)(POLLOUT | POLLERR | POLLHUP));
89     }
90 
91     if (device->ref_count == 1)
92     {
93         rt_ringbuffer_destroy(pipe->fifo);
94         pipe->fifo = RT_NULL;
95     }
96     device->ref_count --;
97 
98     rt_mutex_release(&(pipe->lock));
99 
100     return 0;
101 }
102 
pipe_fops_ioctl(struct dfs_fd * fd,int cmd,void * args)103 static int pipe_fops_ioctl(struct dfs_fd *fd, int cmd, void *args)
104 {
105     rt_pipe_t *pipe;
106     int ret = 0;
107 
108     pipe = (rt_pipe_t *)fd->data;
109 
110     switch (cmd)
111     {
112     case FIONREAD:
113         *((int*)args) = rt_ringbuffer_data_len(pipe->fifo);
114         break;
115     case FIONWRITE:
116         *((int*)args) = rt_ringbuffer_space_len(pipe->fifo);
117         break;
118     default:
119         ret = -EINVAL;
120         break;
121     }
122 
123     return ret;
124 }
125 
pipe_fops_read(struct dfs_fd * fd,void * buf,size_t count)126 static int pipe_fops_read(struct dfs_fd *fd, void *buf, size_t count)
127 {
128     int len = 0;
129     rt_pipe_t *pipe;
130 
131     pipe = (rt_pipe_t *)fd->data;
132 
133     /* no process has the pipe open for writing, return end-of-file */
134     if (pipe->writers == 0)
135         return 0;
136 
137     rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);
138 
139     while (1)
140     {
141         if (pipe->writers == 0)
142         {
143             goto out;
144         }
145 
146         len = rt_ringbuffer_get(pipe->fifo, buf, count);
147 
148         if (len > 0)
149         {
150             break;
151         }
152         else
153         {
154             if (fd->flags & O_NONBLOCK)
155             {
156                 len = -EAGAIN;
157                 goto out;
158             }
159 
160             rt_mutex_release(&pipe->lock);
161             rt_wqueue_wakeup(&(pipe->writer_queue), (void*)POLLOUT);
162             rt_wqueue_wait(&(pipe->reader_queue), 0, -1);
163             rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);
164         }
165     }
166 
167     /* wakeup writer */
168     rt_wqueue_wakeup(&(pipe->writer_queue), (void*)POLLOUT);
169 
170 out:
171     rt_mutex_release(&pipe->lock);
172     return len;
173 }
174 
pipe_fops_write(struct dfs_fd * fd,const void * buf,size_t count)175 static int pipe_fops_write(struct dfs_fd *fd, const void *buf, size_t count)
176 {
177     int len;
178     rt_pipe_t *pipe;
179     int wakeup = 0;
180     int ret = 0;
181     uint8_t *pbuf;
182 
183     pipe = (rt_pipe_t *)fd->data;
184 
185     if (pipe->readers == 0)
186     {
187         ret = -EPIPE;
188         goto out;
189     }
190 
191     if (count == 0)
192         return 0;
193 
194     pbuf = (uint8_t*)buf;
195     rt_mutex_take(&pipe->lock, -1);
196 
197     while (1)
198     {
199         if (pipe->readers == 0)
200         {
201             if (ret == 0)
202                 ret = -EPIPE;
203             break;
204         }
205 
206         len = rt_ringbuffer_put(pipe->fifo, pbuf, count - ret);
207         ret +=  len;
208         pbuf += len;
209         wakeup = 1;
210 
211         if (ret == count)
212         {
213             break;
214         }
215         else
216         {
217             if (fd->flags & O_NONBLOCK)
218             {
219                 if (ret == 0)
220                 {
221                     ret = -EAGAIN;
222                 }
223 
224                 break;
225             }
226         }
227 
228         rt_mutex_release(&pipe->lock);
229         rt_wqueue_wakeup(&(pipe->reader_queue), (void*)POLLIN);
230         /* pipe full, waiting on suspended write list */
231         rt_wqueue_wait(&(pipe->writer_queue), 0, -1);
232         rt_mutex_take(&pipe->lock, -1);
233     }
234     rt_mutex_release(&pipe->lock);
235 
236     if (wakeup)
237     {
238         rt_wqueue_wakeup(&(pipe->reader_queue), (void*)POLLIN);
239     }
240 
241 out:
242     return ret;
243 }
244 
/**
 * Poll a pipe descriptor.
 *
 * The request is registered on both wait queues of the pipe, then the
 * current readiness is computed from the access mode of the descriptor:
 *  - readable side: POLLIN when data is buffered, POLLHUP when no writer
 *    has the pipe open;
 *  - writable side: POLLOUT when free space exists, POLLERR when no reader
 *    has the pipe open.
 *
 * @param fd  file descriptor object; fd->data is expected to hold the pipe.
 * @param req poll request to attach to the pipe wait queues.
 *
 * @return bit mask of the events that are currently pending.
 */
static int pipe_fops_poll(struct dfs_fd *fd, rt_pollreq_t *req)
{
    rt_pipe_t *pipe = (rt_pipe_t *)fd->data;
    int access;
    int readable_end;
    int writable_end;
    int events = 0;

    rt_poll_add(&(pipe->reader_queue), req);
    rt_poll_add(&(pipe->writer_queue), req);

    access       = fd->flags & O_ACCMODE;
    readable_end = (access == O_RDONLY) || (access == O_RDWR);
    writable_end = (access == O_WRONLY) || (access == O_RDWR);

    if (readable_end)
    {
        if (rt_ringbuffer_data_len(pipe->fifo) != 0)
        {
            events |= POLLIN;
        }
        if (pipe->writers == 0)
        {
            events |= POLLHUP;
        }
    }

    if (writable_end)
    {
        if (rt_ringbuffer_space_len(pipe->fifo) != 0)
        {
            events |= POLLOUT;
        }
        if (pipe->readers == 0)
        {
            events |= POLLERR;
        }
    }

    return events;
}
294 
295 static const struct dfs_file_ops pipe_fops =
296 {
297     pipe_fops_open,
298     pipe_fops_close,
299     pipe_fops_ioctl,
300     pipe_fops_read,
301     pipe_fops_write,
302     RT_NULL,
303     RT_NULL,
304     RT_NULL,
305     pipe_fops_poll,
306 };
307 #endif /* end of RT_USING_POSIX */
308 
rt_pipe_open(rt_device_t device,rt_uint16_t oflag)309 rt_err_t  rt_pipe_open (rt_device_t device, rt_uint16_t oflag)
310 {
311     rt_pipe_t *pipe = (rt_pipe_t *)device;
312 
313     if (device == RT_NULL) return -RT_EINVAL;
314     rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);
315 
316     if (pipe->fifo == RT_NULL)
317     {
318         pipe->fifo = rt_ringbuffer_create(pipe->bufsz);
319     }
320 
321     rt_mutex_release(&(pipe->lock));
322 
323     return RT_EOK;
324 }
325 
rt_pipe_close(rt_device_t device)326 rt_err_t  rt_pipe_close  (rt_device_t device)
327 {
328     rt_pipe_t *pipe = (rt_pipe_t *)device;
329 
330     if (device == RT_NULL) return -RT_EINVAL;
331     rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);
332 
333     if (device->ref_count == 1)
334     {
335         rt_ringbuffer_destroy(pipe->fifo);
336         pipe->fifo = RT_NULL;
337     }
338 
339     rt_mutex_release(&(pipe->lock));
340 
341     return RT_EOK;
342 }
343 
/**
 * Device-layer read handler of a pipe.
 *
 * Copies what is currently buffered in the FIFO into buffer, up to count
 * bytes, and stops as soon as the FIFO yields nothing more.
 *
 * @param device pipe device.
 * @param pos    not used.
 * @param buffer destination buffer.
 * @param count  maximum number of bytes to read.
 *
 * @return number of bytes copied; 0 when count is 0 or when device is
 *         RT_NULL (errno is set to -EINVAL in the latter case).
 */
rt_size_t rt_pipe_read   (rt_device_t device, rt_off_t pos, void *buffer, rt_size_t count)
{
    rt_pipe_t *pipe = (rt_pipe_t *)device;
    uint8_t *dst;
    rt_size_t total;
    int chunk;

    if (device == RT_NULL)
    {
        rt_set_errno(-EINVAL);
        return 0;
    }
    if (count == 0)
    {
        return 0;
    }

    dst = (uint8_t*)buffer;

    rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);
    for (total = 0; total < count; total += chunk)
    {
        chunk = rt_ringbuffer_get(pipe->fifo, dst + total, count - total);
        if (chunk <= 0)
        {
            break;
        }
    }
    rt_mutex_release(&pipe->lock);

    return total;
}
371 
/**
 * Device-layer write handler of a pipe.
 *
 * Copies bytes from buffer into the FIFO, up to count bytes, and stops as
 * soon as the FIFO accepts nothing more.
 *
 * @param device pipe device.
 * @param pos    not used.
 * @param buffer source buffer.
 * @param count  number of bytes to write.
 *
 * @return number of bytes stored; 0 when count is 0 or when device is
 *         RT_NULL (errno is set to -EINVAL in the latter case).
 */
rt_size_t rt_pipe_write  (rt_device_t device, rt_off_t pos, const void *buffer, rt_size_t count)
{
    rt_pipe_t *pipe = (rt_pipe_t *)device;
    uint8_t *src;
    rt_size_t total;
    int chunk;

    if (device == RT_NULL)
    {
        rt_set_errno(-EINVAL);
        return 0;
    }
    if (count == 0)
    {
        return 0;
    }

    src = (uint8_t*)buffer;

    rt_mutex_take(&pipe->lock, -1);
    for (total = 0; total < count; total += chunk)
    {
        chunk = rt_ringbuffer_put(pipe->fifo, src + total, count - total);
        if (chunk <= 0)
        {
            break;
        }
    }
    rt_mutex_release(&pipe->lock);

    return total;
}
399 
rt_pipe_control(rt_device_t dev,int cmd,void * args)400 rt_err_t  rt_pipe_control(rt_device_t dev, int cmd, void *args)
401 {
402     return RT_EOK;
403 }
404 
405 #ifdef RT_USING_DEVICE_OPS
406 const static struct rt_device_ops pipe_ops =
407 {
408     RT_NULL,
409     rt_pipe_open,
410     rt_pipe_close,
411     rt_pipe_read,
412     rt_pipe_write,
413     rt_pipe_control,
414 };
415 #endif
416 
rt_pipe_create(const char * name,int bufsz)417 rt_pipe_t *rt_pipe_create(const char *name, int bufsz)
418 {
419     rt_pipe_t *pipe;
420     rt_device_t dev;
421 
422     pipe = rt_malloc(sizeof(rt_pipe_t));
423     if (pipe == RT_NULL) return RT_NULL;
424 
425     rt_memset(pipe, 0, sizeof(rt_pipe_t));
426     rt_mutex_init(&(pipe->lock), name, RT_IPC_FLAG_FIFO);
427     rt_wqueue_init(&(pipe->reader_queue));
428     rt_wqueue_init(&(pipe->writer_queue));
429 
430     RT_ASSERT(bufsz < 0xFFFF);
431     pipe->bufsz = bufsz;
432 
433     dev = &(pipe->parent);
434     dev->type = RT_Device_Class_Pipe;
435 #ifdef RT_USING_DEVICE_OPS
436     dev->ops         = &pipe_ops;
437 #else
438     dev->init        = RT_NULL;
439     dev->open        = rt_pipe_open;
440     dev->read        = rt_pipe_read;
441     dev->write       = rt_pipe_write;
442     dev->close       = rt_pipe_close;
443     dev->control     = rt_pipe_control;
444 #endif
445 
446     dev->rx_indicate = RT_NULL;
447     dev->tx_complete = RT_NULL;
448 
449     if (rt_device_register(&(pipe->parent), name, RT_DEVICE_FLAG_RDWR | RT_DEVICE_FLAG_REMOVABLE) != 0)
450     {
451         rt_free(pipe);
452         return RT_NULL;
453     }
454 #ifdef RT_USING_POSIX
455     dev->fops = (void*)&pipe_fops;
456 #endif
457 
458     return pipe;
459 }
460 
rt_pipe_delete(const char * name)461 int rt_pipe_delete(const char *name)
462 {
463     int result = 0;
464     rt_device_t device;
465 
466     device = rt_device_find(name);
467     if (device)
468     {
469         if (device->type == RT_Device_Class_Pipe)
470         {
471             rt_pipe_t *pipe;
472 
473             if (device->ref_count != 0)
474             {
475                 return -RT_EBUSY;
476             }
477 
478             pipe = (rt_pipe_t *)device;
479 
480             rt_mutex_detach(&(pipe->lock));
481             rt_device_unregister(device);
482 
483             /* close fifo ringbuffer */
484             if (pipe->fifo)
485             {
486                 rt_ringbuffer_destroy(pipe->fifo);
487                 pipe->fifo = RT_NULL;
488             }
489             rt_free(pipe);
490         }
491         else
492         {
493             result = -ENODEV;
494         }
495     }
496     else
497     {
498         result = -ENODEV;
499     }
500 
501     return result;
502 }
503 
504 #ifdef RT_USING_POSIX
pipe(int fildes[2])505 int pipe(int fildes[2])
506 {
507     rt_pipe_t *pipe;
508     char dname[8];
509     char dev_name[32];
510     static int pipeno = 0;
511 
512     rt_snprintf(dname, sizeof(dname), "pipe%d", pipeno++);
513 
514     pipe = rt_pipe_create(dname, PIPE_BUFSZ);
515     if (pipe == RT_NULL)
516     {
517         return -1;
518     }
519 
520     rt_snprintf(dev_name, sizeof(dev_name), "/dev/%s", dname);
521     fildes[0] = open(dev_name, O_RDONLY, 0);
522     if (fildes[0] < 0)
523     {
524         return -1;
525     }
526 
527     fildes[1] = open(dev_name, O_WRONLY, 0);
528     if (fildes[1] < 0)
529     {
530         close(fildes[0]);
531         return -1;
532     }
533 
534     return 0;
535 }
536 
/**
 * Create a named pipe.
 *
 * The pipe device is created with path as its name and PIPE_BUFSZ as its
 * buffer size.
 *
 * @param path name given to the pipe device.
 * @param mode not used.
 *
 * @return 0 on success, -1 when the pipe cannot be created.
 */
int mkfifo(const char *path, mode_t mode)
{
    (void)mode;

    return (rt_pipe_create(path, PIPE_BUFSZ) != RT_NULL) ? 0 : -1;
}
549 #endif
550