1 /* 2 * Copyright (c) 2006-2018, RT-Thread Development Team 3 * 4 * SPDX-License-Identifier: Apache-2.0 5 * 6 * Change Logs: 7 * Date Author Notes 8 * 2012-09-30 Bernard first version. 9 * 2017-11-08 JasonJiaJie fix memory leak issue when close a pipe. 10 */ 11 #include <rthw.h> 12 #include <rtdevice.h> 13 #include <stdint.h> 14 15 #if defined(RT_USING_POSIX) 16 #include <dfs_file.h> 17 #include <dfs_posix.h> 18 #include <dfs_poll.h> 19 20 static int pipe_fops_open(struct dfs_fd *fd) 21 { 22 rt_device_t device; 23 rt_pipe_t *pipe; 24 25 pipe = (rt_pipe_t *)fd->data; 26 if (!pipe) return -1; 27 28 device = &(pipe->parent); 29 rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER); 30 31 if (device->ref_count == 0) 32 { 33 pipe->fifo = rt_ringbuffer_create(pipe->bufsz); 34 } 35 36 switch (fd->flags & O_ACCMODE) 37 { 38 case O_RDONLY: 39 pipe->readers ++; 40 break; 41 case O_WRONLY: 42 pipe->writers ++; 43 break; 44 case O_RDWR: 45 pipe->readers ++; 46 pipe->writers ++; 47 break; 48 } 49 device->ref_count ++; 50 51 rt_mutex_release(&(pipe->lock)); 52 53 return 0; 54 } 55 56 static int pipe_fops_close(struct dfs_fd *fd) 57 { 58 rt_device_t device; 59 rt_pipe_t *pipe; 60 61 pipe = (rt_pipe_t *)fd->data; 62 if (!pipe) return -1; 63 64 device = &(pipe->parent); 65 rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER); 66 67 switch (fd->flags & O_ACCMODE) 68 { 69 case O_RDONLY: 70 pipe->readers --; 71 break; 72 case O_WRONLY: 73 pipe->writers --; 74 break; 75 case O_RDWR: 76 pipe->readers --; 77 pipe->writers --; 78 break; 79 } 80 81 if (pipe->writers == 0) 82 { 83 rt_wqueue_wakeup(&(pipe->reader_queue), (void*)(POLLIN | POLLERR | POLLHUP)); 84 } 85 86 if (pipe->readers == 0) 87 { 88 rt_wqueue_wakeup(&(pipe->writer_queue), (void*)(POLLOUT | POLLERR | POLLHUP)); 89 } 90 91 if (device->ref_count == 1) 92 { 93 rt_ringbuffer_destroy(pipe->fifo); 94 pipe->fifo = RT_NULL; 95 } 96 device->ref_count --; 97 98 rt_mutex_release(&(pipe->lock)); 99 100 return 0; 101 } 102 103 static int pipe_fops_ioctl(struct dfs_fd *fd, int cmd, void *args) 104 { 105 rt_pipe_t *pipe; 106 int ret = 0; 107 108 pipe = (rt_pipe_t *)fd->data; 109 110 switch (cmd) 111 { 112 case FIONREAD: 113 *((int*)args) = rt_ringbuffer_data_len(pipe->fifo); 114 break; 115 case FIONWRITE: 116 *((int*)args) = rt_ringbuffer_space_len(pipe->fifo); 117 break; 118 default: 119 ret = -EINVAL; 120 break; 121 } 122 123 return ret; 124 } 125 126 static int pipe_fops_read(struct dfs_fd *fd, void *buf, size_t count) 127 { 128 int len = 0; 129 rt_pipe_t *pipe; 130 131 pipe = (rt_pipe_t *)fd->data; 132 133 /* no process has the pipe open for writing, return end-of-file */ 134 if (pipe->writers == 0) 135 return 0; 136 137 rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER); 138 139 while (1) 140 { 141 if (pipe->writers == 0) 142 { 143 goto out; 144 } 145 146 len = rt_ringbuffer_get(pipe->fifo, buf, count); 147 148 if (len > 0) 149 { 150 break; 151 } 152 else 153 { 154 if (fd->flags & O_NONBLOCK) 155 { 156 len = -EAGAIN; 157 goto out; 158 } 159 160 rt_mutex_release(&pipe->lock); 161 rt_wqueue_wakeup(&(pipe->writer_queue), (void*)POLLOUT); 162 rt_wqueue_wait(&(pipe->reader_queue), 0, -1); 163 rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER); 164 } 165 } 166 167 /* wakeup writer */ 168 rt_wqueue_wakeup(&(pipe->writer_queue), (void*)POLLOUT); 169 170 out: 171 rt_mutex_release(&pipe->lock); 172 return len; 173 } 174 175 static int pipe_fops_write(struct dfs_fd *fd, const void *buf, size_t count) 176 { 177 int len; 178 rt_pipe_t *pipe; 179 int wakeup = 0; 180 int ret = 0; 181 uint8_t *pbuf; 182 183 pipe = (rt_pipe_t *)fd->data; 184 185 if (pipe->readers == 0) 186 { 187 ret = -EPIPE; 188 goto out; 189 } 190 191 if (count == 0) 192 return 0; 193 194 pbuf = (uint8_t*)buf; 195 rt_mutex_take(&pipe->lock, -1); 196 197 while (1) 198 { 199 if (pipe->readers == 0) 200 { 201 if (ret == 0) 202 ret = -EPIPE; 203 break; 204 } 205 206 len = rt_ringbuffer_put(pipe->fifo, pbuf, count - ret); 207 ret += len; 208 pbuf += len; 209 wakeup = 1; 210 211 if (ret == count) 212 { 213 break; 214 } 215 else 216 { 217 if (fd->flags & O_NONBLOCK) 218 { 219 if (ret == 0) 220 { 221 ret = -EAGAIN; 222 } 223 224 break; 225 } 226 } 227 228 rt_mutex_release(&pipe->lock); 229 rt_wqueue_wakeup(&(pipe->reader_queue), (void*)POLLIN); 230 /* pipe full, waiting on suspended write list */ 231 rt_wqueue_wait(&(pipe->writer_queue), 0, -1); 232 rt_mutex_take(&pipe->lock, -1); 233 } 234 rt_mutex_release(&pipe->lock); 235 236 if (wakeup) 237 { 238 rt_wqueue_wakeup(&(pipe->reader_queue), (void*)POLLIN); 239 } 240 241 out: 242 return ret; 243 } 244 245 static int pipe_fops_poll(struct dfs_fd *fd, rt_pollreq_t *req) 246 { 247 int mask = 0; 248 rt_pipe_t *pipe; 249 int mode = 0; 250 pipe = (rt_pipe_t *)fd->data; 251 252 rt_poll_add(&(pipe->reader_queue), req); 253 rt_poll_add(&(pipe->writer_queue), req); 254 255 switch (fd->flags & O_ACCMODE) 256 { 257 case O_RDONLY: 258 mode = 1; 259 break; 260 case O_WRONLY: 261 mode = 2; 262 break; 263 case O_RDWR: 264 mode = 3; 265 break; 266 } 267 268 if (mode & 1) 269 { 270 if (rt_ringbuffer_data_len(pipe->fifo) != 0) 271 { 272 mask |= POLLIN; 273 } 274 if (pipe->writers == 0) 275 { 276 mask |= POLLHUP; 277 } 278 } 279 280 if (mode & 2) 281 { 282 if (rt_ringbuffer_space_len(pipe->fifo) != 0) 283 { 284 mask |= POLLOUT; 285 } 286 if (pipe->readers == 0) 287 { 288 mask |= POLLERR; 289 } 290 } 291 292 return mask; 293 } 294 295 static const struct dfs_file_ops pipe_fops = 296 { 297 pipe_fops_open, 298 pipe_fops_close, 299 pipe_fops_ioctl, 300 pipe_fops_read, 301 pipe_fops_write, 302 RT_NULL, 303 RT_NULL, 304 RT_NULL, 305 pipe_fops_poll, 306 }; 307 #endif /* end of RT_USING_POSIX */ 308 309 rt_err_t rt_pipe_open (rt_device_t device, rt_uint16_t oflag) 310 { 311 rt_pipe_t *pipe = (rt_pipe_t *)device; 312 313 if (device == RT_NULL) return -RT_EINVAL; 314 rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER); 315 316 if (pipe->fifo == RT_NULL) 317 { 318 pipe->fifo = rt_ringbuffer_create(pipe->bufsz); 319 } 320 321 rt_mutex_release(&(pipe->lock)); 322 323 return RT_EOK; 324 } 325 326 rt_err_t rt_pipe_close (rt_device_t device) 327 { 328 rt_pipe_t *pipe = (rt_pipe_t *)device; 329 330 if (device == RT_NULL) return -RT_EINVAL; 331 rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER); 332 333 if (device->ref_count == 1) 334 { 335 rt_ringbuffer_destroy(pipe->fifo); 336 pipe->fifo = RT_NULL; 337 } 338 339 rt_mutex_release(&(pipe->lock)); 340 341 return RT_EOK; 342 } 343 344 rt_size_t rt_pipe_read (rt_device_t device, rt_off_t pos, void *buffer, rt_size_t count) 345 { 346 uint8_t *pbuf; 347 rt_size_t read_bytes = 0; 348 rt_pipe_t *pipe = (rt_pipe_t *)device; 349 350 if (device == RT_NULL) 351 { 352 rt_set_errno(-EINVAL); 353 return 0; 354 } 355 if (count == 0) return 0; 356 357 pbuf = (uint8_t*)buffer; 358 rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER); 359 360 while (read_bytes < count) 361 { 362 int len = rt_ringbuffer_get(pipe->fifo, &pbuf[read_bytes], count - read_bytes); 363 if (len <= 0) break; 364 365 read_bytes += len; 366 } 367 rt_mutex_release(&pipe->lock); 368 369 return read_bytes; 370 } 371 372 rt_size_t rt_pipe_write (rt_device_t device, rt_off_t pos, const void *buffer, rt_size_t count) 373 { 374 uint8_t *pbuf; 375 rt_size_t write_bytes = 0; 376 rt_pipe_t *pipe = (rt_pipe_t *)device; 377 378 if (device == RT_NULL) 379 { 380 rt_set_errno(-EINVAL); 381 return 0; 382 } 383 if (count == 0) return 0; 384 385 pbuf = (uint8_t*)buffer; 386 rt_mutex_take(&pipe->lock, -1); 387 388 while (write_bytes < count) 389 { 390 int len = rt_ringbuffer_put(pipe->fifo, &pbuf[write_bytes], count - write_bytes); 391 if (len <= 0) break; 392 393 write_bytes += len; 394 } 395 rt_mutex_release(&pipe->lock); 396 397 return write_bytes; 398 } 399 400 rt_err_t rt_pipe_control(rt_device_t dev, int cmd, void *args) 401 { 402 return RT_EOK; 403 } 404 405 #ifdef RT_USING_DEVICE_OPS 406 const static struct rt_device_ops pipe_ops = 407 { 408 RT_NULL, 409 rt_pipe_open, 410 rt_pipe_close, 411 rt_pipe_read, 412 rt_pipe_write, 413 rt_pipe_control, 414 }; 415 #endif 416 417 rt_pipe_t *rt_pipe_create(const char *name, int bufsz) 418 { 419 rt_pipe_t *pipe; 420 rt_device_t dev; 421 422 pipe = rt_malloc(sizeof(rt_pipe_t)); 423 if (pipe == RT_NULL) return RT_NULL; 424 425 rt_memset(pipe, 0, sizeof(rt_pipe_t)); 426 rt_mutex_init(&(pipe->lock), name, RT_IPC_FLAG_FIFO); 427 rt_wqueue_init(&(pipe->reader_queue)); 428 rt_wqueue_init(&(pipe->writer_queue)); 429 430 RT_ASSERT(bufsz < 0xFFFF); 431 pipe->bufsz = bufsz; 432 433 dev = &(pipe->parent); 434 dev->type = RT_Device_Class_Pipe; 435 #ifdef RT_USING_DEVICE_OPS 436 dev->ops = &pipe_ops; 437 #else 438 dev->init = RT_NULL; 439 dev->open = rt_pipe_open; 440 dev->read = rt_pipe_read; 441 dev->write = rt_pipe_write; 442 dev->close = rt_pipe_close; 443 dev->control = rt_pipe_control; 444 #endif 445 446 dev->rx_indicate = RT_NULL; 447 dev->tx_complete = RT_NULL; 448 449 if (rt_device_register(&(pipe->parent), name, RT_DEVICE_FLAG_RDWR | RT_DEVICE_FLAG_REMOVABLE) != 0) 450 { 451 rt_free(pipe); 452 return RT_NULL; 453 } 454 #ifdef RT_USING_POSIX 455 dev->fops = (void*)&pipe_fops; 456 #endif 457 458 return pipe; 459 } 460 461 int rt_pipe_delete(const char *name) 462 { 463 int result = 0; 464 rt_device_t device; 465 466 device = rt_device_find(name); 467 if (device) 468 { 469 if (device->type == RT_Device_Class_Pipe) 470 { 471 rt_pipe_t *pipe; 472 473 if (device->ref_count != 0) 474 { 475 return -RT_EBUSY; 476 } 477 478 pipe = (rt_pipe_t *)device; 479 480 rt_mutex_detach(&(pipe->lock)); 481 rt_device_unregister(device); 482 483 /* close fifo ringbuffer */ 484 if (pipe->fifo) 485 { 486 rt_ringbuffer_destroy(pipe->fifo); 487 pipe->fifo = RT_NULL; 488 } 489 rt_free(pipe); 490 } 491 else 492 { 493 result = -ENODEV; 494 } 495 } 496 else 497 { 498 result = -ENODEV; 499 } 500 501 return result; 502 } 503 504 #ifdef RT_USING_POSIX 505 int pipe(int fildes[2]) 506 { 507 rt_pipe_t *pipe; 508 char dname[8]; 509 char dev_name[32]; 510 static int pipeno = 0; 511 512 rt_snprintf(dname, sizeof(dname), "pipe%d", pipeno++); 513 514 pipe = rt_pipe_create(dname, PIPE_BUFSZ); 515 if (pipe == RT_NULL) 516 { 517 return -1; 518 } 519 520 rt_snprintf(dev_name, sizeof(dev_name), "/dev/%s", dname); 521 fildes[0] = open(dev_name, O_RDONLY, 0); 522 if (fildes[0] < 0) 523 { 524 return -1; 525 } 526 527 fildes[1] = open(dev_name, O_WRONLY, 0); 528 if (fildes[1] < 0) 529 { 530 close(fildes[0]); 531 return -1; 532 } 533 534 return 0; 535 } 536 537 int mkfifo(const char *path, mode_t mode) 538 { 539 rt_pipe_t *pipe; 540 541 pipe = rt_pipe_create(path, PIPE_BUFSZ); 542 if (pipe == RT_NULL) 543 { 544 return -1; 545 } 546 547 return 0; 548 } 549 #endif 550