// SPDX-License-Identifier: MIT or LGPL-2.1-only

/*
 * Note: the names of the original angle-bracket includes were lost; the
 * headers below are an assumption, inferred from the APIs used in this
 * file (eventfd(), prctl(), mmap()/madvise(), setpriority(), poll flags).
 */
#include <sys/eventfd.h>
#include <sys/prctl.h>
#include <sys/mman.h>
#include <sys/resource.h>
#include <poll.h>

#include "ublksrv_priv.h"
#include "ublksrv_aio.h"

bool ublksrv_is_recovering(const struct ublksrv_ctrl_dev *ctrl_dev)
{
	return ctrl_dev->tgt_argc == -1;
}

static inline struct ublksrv_io_desc *ublksrv_get_iod(
		const struct _ublksrv_queue *q, int tag)
{
	return (struct ublksrv_io_desc *)
		&(q->io_cmd_buf[tag * sizeof(struct ublksrv_io_desc)]);
}

/*
 * /dev/ublkbN shares the same lifetime with the ublk io daemon:
 *
 * 1) IO from /dev/ublkbN is handled by the io daemon directly
 *
 * 2) io cmd buffer is allocated from ublk driver, mapped to
 * io daemon vm space via mmap, and each hw queue has its own
 * io cmd buffer
 *
 * 3) io buffers are pre-allocated from the io daemon and passed
 * to ublk driver via io command, meantime ublk driver may choose
 * to pin these user pages before starting device
 *
 * Each /dev/ublkcN is owned by only one io daemon, and can't be
 * opened by any other daemon. And the io daemon uses its allocated
 * io_uring to communicate with ublk driver.
 *
 * For each request of /dev/ublkbN, the io daemon submits one
 * sqe for both fetching IO from ublk driver and committing IO result
 * to ublk driver, and the io daemon has to issue all sqes
 * to /dev/ublkcN before sending START_DEV to /dev/ublk-control.
 *
 * After STOP_DEV is sent to /dev/ublk-control, the ublk driver needs
 * to freeze the request queue and complete all pending sqes,
 * meantime telling the io daemon via cqe->res not to issue sqes
 * any more, and also deleting /dev/ublkbN. After the io daemon figures
 * out that all sqes have been freed, it exits. Then STOP_DEV returns.
 */

/*
 * If the ublksrv queue has been idle for the past 20 seconds, start to
 * discard pages mapped to io buffers via madvise(MADV_DONTNEED), so these
 * pages can be made available to others without being swapped out
 */
#define UBLKSRV_IO_IDLE_SECS	20

static int __ublksrv_tgt_init(struct _ublksrv_dev *dev, const char *type_name,
		const struct ublksrv_tgt_type *ops, int type,
		int argc, char *argv[])
{
	struct ublksrv_tgt_info *tgt = &dev->tgt;
	int ret;

	if (!ops)
		return -EINVAL;

	if (strcmp(ops->name, type_name))
		return -EINVAL;

	if (!ops->init_tgt)
		return -EINVAL;
	if (!ops->handle_io_async)
		return -EINVAL;
	if (!ops->alloc_io_buf ^ !ops->free_io_buf)
		return -EINVAL;

	optind = 0;	/* so that we can parse our arguments */
	tgt->ops = ops;

	if (!ublksrv_is_recovering(dev->ctrl_dev))
		ret = ops->init_tgt(local_to_tdev(dev), type, argc, argv);
	else {
		if (ops->recovery_tgt)
			ret = ops->recovery_tgt(local_to_tdev(dev), type);
		else
			ret = -ENOTSUP;
	}
	if (ret) {
		tgt->ops = NULL;
		return ret;
	}
	return 0;
}

static int ublksrv_tgt_init(struct _ublksrv_dev *dev, const char *type_name,
		const struct ublksrv_tgt_type *ops,
		int argc, char *argv[])
{
	if (type_name == NULL)
		return -EINVAL;

	if (ops)
		return __ublksrv_tgt_init(dev, type_name, ops,
				ops->type, argc, argv);

	return -EINVAL;
}
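/*
 * Illustrative sketch only (not part of the library): the minimal callback
 * set that __ublksrv_tgt_init() above will accept from a target.  The
 * "example_*" names are hypothetical; .alloc_io_buf and .free_io_buf are
 * optional, but have to be provided as a pair.
 *
 *	static const struct ublksrv_tgt_type example_tgt_type = {
 *		.name			= "example",
 *		.init_tgt		= example_init_tgt,
 *		.handle_io_async	= example_handle_io_async,
 *		// .recovery_tgt is only needed for user recovery support
 *	};
 */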
static inline void ublksrv_tgt_exit(struct ublksrv_tgt_info *tgt)
{
	int i;

	for (i = 1; i < tgt->nr_fds; i++)
		close(tgt->fds[i]);
}

static void ublksrv_tgt_deinit(struct _ublksrv_dev *dev)
{
	struct ublksrv_tgt_info *tgt = &dev->tgt;

	ublksrv_tgt_exit(tgt);

	if (tgt->ops && tgt->ops->deinit_tgt)
		tgt->ops->deinit_tgt(local_to_tdev(dev));
}

static inline int ublksrv_queue_io_cmd(struct _ublksrv_queue *q,
		struct ublk_io *io, unsigned tag)
{
	struct ublksrv_io_cmd *cmd;
	struct io_uring_sqe *sqe;
	unsigned int cmd_op = 0;
	__u64 user_data;

	/* only freed io can be issued */
	if (!(io->flags & UBLKSRV_IO_FREE))
		return 0;

	/* we issue because we need either fetching or committing */
	if (!(io->flags & (UBLKSRV_NEED_FETCH_RQ | UBLKSRV_NEED_GET_DATA |
					UBLKSRV_NEED_COMMIT_RQ_COMP)))
		return 0;

	if (io->flags & UBLKSRV_NEED_GET_DATA)
		cmd_op = UBLK_IO_NEED_GET_DATA;
	else if (io->flags & UBLKSRV_NEED_COMMIT_RQ_COMP)
		cmd_op = UBLK_IO_COMMIT_AND_FETCH_REQ;
	else if (io->flags & UBLKSRV_NEED_FETCH_RQ)
		cmd_op = UBLK_IO_FETCH_REQ;

	sqe = io_uring_get_sqe(&q->ring);
	if (!sqe) {
		ublk_err("%s: run out of sqe %d, tag %d\n",
				__func__, q->q_id, tag);
		return -1;
	}

	cmd = (struct ublksrv_io_cmd *)ublksrv_get_sqe_cmd(sqe);

	if (cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ)
		cmd->result = io->result;

	if (q->state & UBLKSRV_QUEUE_IOCTL_OP)
		cmd_op = _IOWR('u', _IOC_NR(cmd_op), struct ublksrv_io_cmd);

	/* These fields should be written once, never change */
	ublksrv_set_sqe_cmd_op(sqe, cmd_op);
	sqe->fd		= 0;	/*dev->cdev_fd*/
	sqe->opcode	= IORING_OP_URING_CMD;
	sqe->flags	= IOSQE_FIXED_FILE;
	sqe->rw_flags	= 0;
	cmd->tag	= tag;
	if (!(q->state & UBLKSRV_USER_COPY))
		cmd->addr	= (__u64)io->buf_addr;
	else
		cmd->addr	= 0;
	cmd->q_id	= q->q_id;

	user_data = build_user_data(tag, _IOC_NR(cmd_op), 0, 0);
	io_uring_sqe_set_data64(sqe, user_data);

	io->flags = 0;

	q->cmd_inflight += 1;

	ublk_dbg(UBLK_DBG_IO_CMD, "%s: (qid %d tag %u cmd_op %u) iof %x stopping %d\n",
			__func__, q->q_id, tag, cmd_op,
			io->flags, !!(q->state & UBLKSRV_QUEUE_STOPPING));
	return 1;
}

int ublksrv_complete_io(const struct ublksrv_queue *tq, unsigned tag, int res)
{
	struct _ublksrv_queue *q = tq_to_local(tq);
	struct ublk_io *io = &q->ios[tag];

	ublksrv_mark_io_done(io, res);

	return ublksrv_queue_io_cmd(q, io, tag);
}

/*
 * eventfd is treated as a special target IO which has to be queued
 * when the queue is set up
 */
static inline int __ublksrv_queue_event(struct _ublksrv_queue *q)
{
	if (q->efd >= 0) {
		struct io_uring_sqe *sqe;
		__u64 user_data = build_eventfd_data();

		if (q->state & UBLKSRV_QUEUE_STOPPING)
			return -EINVAL;

		sqe = io_uring_get_sqe(&q->ring);
		if (!sqe) {
			ublk_err("%s: queue %d run out of sqe\n",
				__func__, q->q_id);
			return -1;
		}

		io_uring_prep_poll_add(sqe, q->efd, POLLIN);
		io_uring_sqe_set_data64(sqe, user_data);
	}
	return 0;
}

/*
 * This API is supposed to be called in ->handle_event() after current
 * events are handled.
 */
int ublksrv_queue_handled_event(const struct ublksrv_queue *tq)
{
	struct _ublksrv_queue *q = tq_to_local(tq);

	if (q->efd >= 0) {
		uint64_t data;
		const int cnt = sizeof(uint64_t);

		/* read has to be done, otherwise the poll event won't be stopped */
		if (read(q->efd, &data, cnt) != cnt)
			ublk_err("%s: read wrong bytes from eventfd\n",
					__func__);
		/*
		 * event needs to be issued immediately, since other io may
		 * rely on it
		 */
		if (!__ublksrv_queue_event(q))
			io_uring_submit_and_wait(&q->ring, 0);
	}
	return 0;
}

/*
 * Send event to the io command uring context, so that the queue pthread
 * can be woken up for handling io, then ->handle_event() will be
 * called to notify target code.
 *
 * This API is usually called from another context.
 */
int ublksrv_queue_send_event(const struct ublksrv_queue *tq)
{
	struct _ublksrv_queue *q = tq_to_local(tq);

	if (q->efd >= 0) {
		uint64_t data = 1;
		const int cnt = sizeof(uint64_t);

		if (write(q->efd, &data, cnt) != cnt) {
			ublk_err("%s: wrote wrong bytes to eventfd\n",
					__func__);
			return -EPIPE;
		}
	}
	return 0;
}
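/*
 * Usage sketch (illustrative only, assuming a hypothetical worker thread):
 * a target that finishes IO outside the queue pthread typically pairs the
 * two eventfd helpers above as follows.
 *
 *	// worker thread: record the result somewhere, then wake the queue
 *	ublksrv_queue_send_event(q);
 *
 *	// queue pthread, inside the target's ->handle_event() callback:
 *	//	for each finished request:
 *	//		ublksrv_complete_io(q, tag, res);
 *	//	ublksrv_queue_handled_event(q);	// re-arm the eventfd poll
 */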
/*
 * Issue all available commands to /dev/ublkcN; the exact cmd is figured
 * out in queue_io_cmd() with the help of each io's flags.
 *
 * todo: queue io commands with batching
 */
static void ublksrv_submit_fetch_commands(struct _ublksrv_queue *q)
{
	int i = 0;

	for (i = 0; i < q->q_depth; i++)
		ublksrv_queue_io_cmd(q, &q->ios[i], i);

	__ublksrv_queue_event(q);
}

static int ublksrv_queue_is_done(struct _ublksrv_queue *q)
{
	return (q->state & UBLKSRV_QUEUE_STOPPING) &&
		!io_uring_sq_ready(&q->ring);
}

/* used for allocating zero copy vma space */
static inline int ublk_queue_single_io_buf_size(struct _ublksrv_dev *dev)
{
	unsigned max_io_sz = dev->ctrl_dev->dev_info.max_io_buf_bytes;
	unsigned int page_sz = getpagesize();

	return round_up(max_io_sz, page_sz);
}

static inline int ublk_queue_io_buf_size(struct _ublksrv_dev *dev)
{
	unsigned depth = dev->ctrl_dev->dev_info.queue_depth;

	return ublk_queue_single_io_buf_size(dev) * depth;
}

static inline int ublk_io_buf_size(struct _ublksrv_dev *dev)
{
	unsigned nr_queues = dev->ctrl_dev->dev_info.nr_hw_queues;

	return ublk_queue_io_buf_size(dev) * nr_queues;
}

static int ublksrv_queue_cmd_buf_sz(struct _ublksrv_queue *q)
{
	int size = q->q_depth * sizeof(struct ublksrv_io_desc);
	unsigned int page_sz = getpagesize();

	return round_up(size, page_sz);
}
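/*
 * Worked example (illustrative, assuming a 4096-byte page and the 24-byte
 * struct ublksrv_io_desc from the ublk UAPI): with q_depth = 128 the
 * descriptor array takes 128 * 24 = 3072 bytes, so the per-queue io cmd
 * buffer mmap done in ublksrv_queue_init() covers one page after round_up().
 */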
int ublksrv_queue_unconsumed_cqes(const struct ublksrv_queue *tq)
{
	if (tq->ring_ptr)
		return io_uring_cq_ready(tq->ring_ptr);

	return -1;
}

void ublksrv_queue_deinit(const struct ublksrv_queue *tq)
{
	struct _ublksrv_queue *q = tq_to_local(tq);
	int i;
	int nr_ios = q->dev->tgt.extra_ios + q->q_depth;

	if (q->dev->tgt.ops->deinit_queue)
		q->dev->tgt.ops->deinit_queue(tq);

	if (q->efd >= 0)
		close(q->efd);

	io_uring_unregister_ring_fd(&q->ring);

	if (q->ring.ring_fd > 0) {
		io_uring_unregister_files(&q->ring);
		close(q->ring.ring_fd);
		q->ring.ring_fd = -1;
	}
	if (q->io_cmd_buf) {
		munmap(q->io_cmd_buf, ublksrv_queue_cmd_buf_sz(q));
		q->io_cmd_buf = NULL;
	}
	for (i = 0; i < nr_ios; i++) {
		if (q->ios[i].buf_addr) {
			if (q->dev->tgt.ops->free_io_buf)
				q->dev->tgt.ops->free_io_buf(tq,
						q->ios[i].buf_addr, i);
			else
				free(q->ios[i].buf_addr);
			q->ios[i].buf_addr = NULL;
		}
		free(q->ios[i].data.private_data);
	}
	q->dev->__queues[q->q_id] = NULL;
	free(q);
}

void ublksrv_build_cpu_str(char *buf, int len, const cpu_set_t *cpuset)
{
	int nr_cores = sysconf(_SC_NPROCESSORS_ONLN);
	int i, offset = 0;

	for (i = 0; i < nr_cores; i++) {
		int n;

		if (!CPU_ISSET(i, cpuset))
			continue;
		n = snprintf(&buf[offset], len - offset, "%d ", i);
		if (n < 0 || n >= len - offset)
			break;
		offset += n;
	}
}

static void ublksrv_set_sched_affinity(struct _ublksrv_dev *dev,
		unsigned short q_id)
{
	const struct ublksrv_ctrl_dev *cdev = dev->ctrl_dev;
	unsigned dev_id = cdev->dev_info.dev_id;
	cpu_set_t *cpuset = ublksrv_get_queue_affinity(cdev, q_id);
	pthread_t thread = pthread_self();
	int ret;

	ret = pthread_setaffinity_np(thread, sizeof(cpu_set_t), cpuset);
	if (ret)
		ublk_err("ublk dev %u queue %u set affinity failed",
				dev_id, q_id);
}

static void ublksrv_kill_eventfd(struct _ublksrv_queue *q)
{
	if ((q->state & UBLKSRV_QUEUE_STOPPING) && q->efd >= 0) {
		uint64_t data = 1;
		int ret;

		ret = write(q->efd, &data, sizeof(uint64_t));
		if (ret != sizeof(uint64_t))
			ublk_err("%s:%d write fail %d/%zu\n",
					__func__, __LINE__, ret,
					sizeof(uint64_t));
	}
}

/*
 * Return 0 on success, or negative errno on failure
 */
static int ublksrv_setup_eventfd(struct _ublksrv_queue *q)
{
	const struct ublksrv_ctrl_dev_info *info = &q->dev->ctrl_dev->dev_info;

	if (!(info->ublksrv_flags & UBLKSRV_F_NEED_EVENTFD)) {
		q->efd = -1;
		return 0;
	}

	if (q->dev->tgt.tgt_ring_depth == 0) {
		ublk_err("ublk dev %d queue %d zero tgt queue depth",
			info->dev_id, q->q_id);
		return -EINVAL;
	}

	if (!q->dev->tgt.ops->handle_event) {
		ublk_err("ublk dev %d/%d not define ->handle_event",
			info->dev_id, q->q_id);
		return -EINVAL;
	}

	q->efd = eventfd(0, 0);
	if (q->efd < 0)
		return -errno;
	return 0;
}

static void ublksrv_queue_adjust_uring_io_wq_workers(struct _ublksrv_queue *q)
{
	struct _ublksrv_dev *dev = q->dev;
	unsigned int val[2] = {0, 0};
	int ret;

	if (!dev->tgt.iowq_max_workers[0] && !dev->tgt.iowq_max_workers[1])
		return;

	ret = io_uring_register_iowq_max_workers(&q->ring, val);
	if (ret)
		ublk_err("%s: register iowq max workers failed %d\n",
				__func__, ret);

	if (!dev->tgt.iowq_max_workers[0])
		dev->tgt.iowq_max_workers[0] = val[0];
	if (!dev->tgt.iowq_max_workers[1])
		dev->tgt.iowq_max_workers[1] = val[1];

	ret = io_uring_register_iowq_max_workers(&q->ring,
			dev->tgt.iowq_max_workers);
	if (ret)
		ublk_err("%s: register iowq max workers failed %d\n",
				__func__, ret);
}

static void ublksrv_calculate_depths(const struct _ublksrv_dev *dev,
		int *ring_depth, int *cq_depth, int *nr_ios)
{
	const struct ublksrv_ctrl_dev *cdev = dev->ctrl_dev;
	/*
	 * eventfd consumes one extra sqe, and it can be thought of as one
	 * target depth
	 */
	int aio_depth = (cdev->dev_info.ublksrv_flags & UBLKSRV_F_NEED_EVENTFD)
		? 1 : 0;
	int depth = cdev->dev_info.queue_depth;
	int tgt_depth = dev->tgt.tgt_ring_depth + aio_depth;

	*nr_ios = depth + dev->tgt.extra_ios;

	/*
	 * queue_depth represents the max count of io commands issued from
	 * the ublk driver.
	 *
	 * After an io command is fetched from the ublk driver, the sqe
	 * consumed for fetching it becomes available for target usage, so
	 * the uring depth can be set to max(queue_depth, tgt_depth).
	 */
	depth = depth > tgt_depth ? depth : tgt_depth;

	*ring_depth = depth;
	*cq_depth = dev->cq_depth ? dev->cq_depth : depth;
}
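/*
 * Worked example (illustrative numbers only): with queue_depth = 128,
 * tgt_ring_depth = 32 and UBLKSRV_F_NEED_EVENTFD set, tgt_depth is 33,
 * so *ring_depth = max(128, 33) = 128, *nr_ios = 128 + extra_ios, and
 * *cq_depth stays 128 unless the target overrides it via
 * ublksrv_dev_set_cq_depth().
 */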
const struct ublksrv_queue *ublksrv_queue_init(const struct ublksrv_dev *tdev,
		unsigned short q_id, void *queue_data)
{
	struct _ublksrv_dev *dev = tdev_to_local(tdev);
	struct _ublksrv_queue *q;
	const struct ublksrv_ctrl_dev *ctrl_dev = dev->ctrl_dev;
	int depth = ctrl_dev->dev_info.queue_depth;
	int i, ret = -1;
	int cmd_buf_size, io_buf_size;
	unsigned long off;
	int io_data_size = round_up(dev->tgt.io_data_size,
			sizeof(unsigned long));
	int ring_depth, cq_depth, nr_ios;

	ublksrv_calculate_depths(dev, &ring_depth, &cq_depth, &nr_ios);

	/*
	 * Too many extra ios
	 */
	if (nr_ios > depth * 3)
		return NULL;

	q = (struct _ublksrv_queue *)malloc(sizeof(struct _ublksrv_queue) +
			sizeof(struct ublk_io) * nr_ios);
	dev->__queues[q_id] = q;

	q->tgt_ops = dev->tgt.ops;	//cache ops for fast path
	q->dev = dev;
	if (ctrl_dev->dev_info.flags & UBLK_F_CMD_IOCTL_ENCODE)
		q->state = UBLKSRV_QUEUE_IOCTL_OP;
	else
		q->state = 0;
	if (ctrl_dev->dev_info.flags & UBLK_F_USER_COPY)
		q->state |= UBLKSRV_USER_COPY;
	q->q_id = q_id;
	/* FIXME: depth has to be a power of 2 */
	q->q_depth = depth;
	q->io_cmd_buf = NULL;
	q->cmd_inflight = 0;
	q->tid = ublksrv_gettid();

	cmd_buf_size = ublksrv_queue_cmd_buf_sz(q);
	off = UBLKSRV_CMD_BUF_OFFSET +
		q_id * (UBLK_MAX_QUEUE_DEPTH * sizeof(struct ublksrv_io_desc));
	q->io_cmd_buf = (char *)mmap(0, cmd_buf_size, PROT_READ,
			MAP_SHARED | MAP_POPULATE, dev->cdev_fd, off);
	if (q->io_cmd_buf == MAP_FAILED) {
		ublk_err("ublk dev %d queue %d map io_cmd_buf failed",
				q->dev->ctrl_dev->dev_info.dev_id, q->q_id);
		goto fail;
	}

	io_buf_size = ctrl_dev->dev_info.max_io_buf_bytes;
	for (i = 0; i < nr_ios; i++) {
		q->ios[i].buf_addr = NULL;

		/* extra ios don't need io buffers allocated */
		if (i >= q->q_depth)
			goto skip_alloc_buf;

		if (dev->tgt.ops->alloc_io_buf)
			q->ios[i].buf_addr =
				dev->tgt.ops->alloc_io_buf(local_to_tq(q),
						i, io_buf_size);
		else if (posix_memalign((void **)&q->ios[i].buf_addr,
					getpagesize(), io_buf_size)) {
			ublk_err("ublk dev %d queue %d io %d posix_memalign failed",
					q->dev->ctrl_dev->dev_info.dev_id,
					q->q_id, i);
			goto fail;
		}
		//q->ios[i].buf_addr = malloc(io_buf_size);
		if (!q->ios[i].buf_addr) {
			ublk_err("ublk dev %d queue %d io %d alloc io_buf failed",
					q->dev->ctrl_dev->dev_info.dev_id,
					q->q_id, i);
			goto fail;
		}
 skip_alloc_buf:
		q->ios[i].flags = UBLKSRV_NEED_FETCH_RQ | UBLKSRV_IO_FREE;
		q->ios[i].data.private_data = malloc(io_data_size);
		q->ios[i].data.tag = i;
		if (i < q->q_depth)
			q->ios[i].data.iod = ublksrv_get_iod(q, i);
		else
			q->ios[i].data.iod = NULL;
		//ublk_assert(io_data_size ^ (unsigned long)q->ios[i].data.private_data);
	}

	ret = ublksrv_setup_ring(&q->ring, ring_depth, cq_depth,
			IORING_SETUP_SQE128 | IORING_SETUP_COOP_TASKRUN);
	if (ret < 0) {
		ublk_err("ublk dev %d queue %d setup io_uring failed %d",
				q->dev->ctrl_dev->dev_info.dev_id, q->q_id, ret);
		goto fail;
	}
	q->ring_ptr = &q->ring;

	ret = io_uring_register_files(&q->ring, dev->tgt.fds,
			dev->tgt.nr_fds + 1);
	if (ret) {
		ublk_err("ublk dev %d queue %d register files failed %d",
				q->dev->ctrl_dev->dev_info.dev_id, q->q_id, ret);
		goto fail;
	}

	io_uring_register_ring_fd(&q->ring);

	/*
	 * N.B. PR_SET_IO_FLUSHER was added with Linux 5.6+.
	 */
#if defined(PR_SET_IO_FLUSHER)
	if (prctl(PR_SET_IO_FLUSHER, 0, 0, 0, 0) != 0)
		ublk_err("ublk dev %d queue %d set_io_flusher failed",
			q->dev->ctrl_dev->dev_info.dev_id, q->q_id);
#endif

	ublksrv_queue_adjust_uring_io_wq_workers(q);

	q->private_data = queue_data;

	if (ctrl_dev->tgt_ops->init_queue) {
		if (ctrl_dev->tgt_ops->init_queue(local_to_tq(q),
					&q->private_data))
			goto fail;
	}

	if (ctrl_dev->queues_cpuset)
		ublksrv_set_sched_affinity(dev, q_id);
	setpriority(PRIO_PROCESS, getpid(), -20);

	ret = ublksrv_setup_eventfd(q);
	if (ret < 0) {
		ublk_err("ublk dev %d queue %d setup eventfd failed: %s",
			q->dev->ctrl_dev->dev_info.dev_id, q->q_id,
			strerror(-ret));
		goto fail;
	}

	/* submit all io commands to ublk driver */
	ublksrv_submit_fetch_commands(q);

	return (struct ublksrv_queue *)q;
 fail:
	ublksrv_queue_deinit(local_to_tq(q));
	ublk_err("ublk dev %d queue %d failed",
			ctrl_dev->dev_info.dev_id, q_id);
	return NULL;
}

static int ublksrv_create_pid_file(struct _ublksrv_dev *dev)
{
	int dev_id = dev->ctrl_dev->dev_info.dev_id;
	char pid_file[64];
	int ret, pid_fd;

	if (!dev->ctrl_dev->run_dir)
		return 0;

	/* create pid file and lock it, so that others can't */
	snprintf(pid_file, 64, "%s/%d.pid", dev->ctrl_dev->run_dir, dev_id);

	ret = create_pid_file(pid_file, &pid_fd);
	if (ret < 0) {
		/* -1 means the file is locked, and we need to remove it */
		if (ret == -1) {
			close(pid_fd);
			unlink(pid_file);
		}
		return ret;
	}
	dev->pid_file_fd = pid_fd;
	return 0;
}

static void ublksrv_remove_pid_file(const struct _ublksrv_dev *dev)
{
	int dev_id = dev->ctrl_dev->dev_info.dev_id;
	char pid_file[64];

	if (!dev->ctrl_dev->run_dir)
		return;

	close(dev->pid_file_fd);
	snprintf(pid_file, 64, "%s/%d.pid", dev->ctrl_dev->run_dir, dev_id);
	unlink(pid_file);
}

void ublksrv_dev_deinit(const struct ublksrv_dev *tdev)
{
	struct _ublksrv_dev *dev = tdev_to_local(tdev);

	ublksrv_remove_pid_file(dev);

	ublksrv_tgt_deinit(dev);
	free(dev->thread);

	if (dev->cdev_fd >= 0) {
		close(dev->cdev_fd);
		dev->cdev_fd = -1;
	}
	free(dev);
}
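/*
 * Usage sketch (illustrative only): the device-level lifecycle implemented
 * by ublksrv_dev_init()/ublksrv_dev_deinit(), assuming the caller already
 * owns a ublksrv_ctrl_dev and omitting error handling:
 *
 *	const struct ublksrv_dev *dev = ublksrv_dev_init(ctrl_dev);
 *
 *	// start one pthread per hw queue, each driving
 *	// ublksrv_queue_init() + ublksrv_process_io()
 *
 *	ublksrv_dev_deinit(dev);
 */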
const struct ublksrv_dev *ublksrv_dev_init(const struct ublksrv_ctrl_dev *ctrl_dev)
{
	int dev_id = ctrl_dev->dev_info.dev_id;
	char buf[64];
	int ret = -1;
	struct _ublksrv_dev *dev = (struct _ublksrv_dev *)calloc(1,
			sizeof(*dev));
	struct ublksrv_tgt_info *tgt;

	if (!dev)
		return local_to_tdev(dev);

	tgt = &dev->tgt;
	dev->ctrl_dev = ctrl_dev;
	dev->cdev_fd = -1;

	snprintf(buf, 64, "%s%d", UBLKC_DEV, dev_id);
	dev->cdev_fd = open(buf, O_RDWR | O_NONBLOCK);
	if (dev->cdev_fd < 0) {
		ublk_err("can't open %s, ret %d\n", buf, dev->cdev_fd);
		goto fail;
	}

	tgt->fds[0] = dev->cdev_fd;

	ret = ublksrv_tgt_init(dev, ctrl_dev->tgt_type, ctrl_dev->tgt_ops,
			ctrl_dev->tgt_argc, ctrl_dev->tgt_argv);
	if (ret) {
		ublk_err("can't init tgt %d/%s/%d, ret %d\n",
				dev_id, ctrl_dev->tgt_type,
				ctrl_dev->tgt_argc, ret);
		goto fail;
	}

	ret = ublksrv_create_pid_file(dev);
	if (ret) {
		ublk_err("can't create pid file for dev %d, ret %d\n",
				dev_id, ret);
		goto fail;
	}

	return local_to_tdev(dev);
fail:
	ublksrv_dev_deinit(local_to_tdev(dev));
	return NULL;
}

/* Be careful: a target io may not have a ublk_io associated with it */
static inline void ublksrv_handle_tgt_cqe(struct _ublksrv_queue *q,
		struct io_uring_cqe *cqe)
{
	unsigned tag = user_data_to_tag(cqe->user_data);

	if (cqe->res < 0 && cqe->res != -EAGAIN) {
		ublk_err("%s: failed tgt io: res %d qid %u tag %u, cmd_op %u\n",
			__func__, cqe->res, q->q_id,
			user_data_to_tag(cqe->user_data),
			user_data_to_op(cqe->user_data));
	}

	if (is_eventfd_io(cqe->user_data)) {
		if (q->tgt_ops->handle_event)
			q->tgt_ops->handle_event(local_to_tq(q));
	} else {
		if (q->tgt_ops->tgt_io_done)
			q->tgt_ops->tgt_io_done(local_to_tq(q),
					&q->ios[tag].data, cqe);
	}
}
static void ublksrv_handle_cqe(struct io_uring *r,
		struct io_uring_cqe *cqe, void *data)
{
	struct _ublksrv_queue *q = container_of(r, struct _ublksrv_queue, ring);
	unsigned tag = user_data_to_tag(cqe->user_data);
	unsigned cmd_op = user_data_to_op(cqe->user_data);
	int fetch = (cqe->res != UBLK_IO_RES_ABORT) &&
		!(q->state & UBLKSRV_QUEUE_STOPPING);
	struct ublk_io *io;

	ublk_dbg(UBLK_DBG_IO_CMD, "%s: res %d (qid %d tag %u cmd_op %u target %d event %d) stopping %d\n",
			__func__, cqe->res, q->q_id, tag, cmd_op,
			is_target_io(cqe->user_data),
			is_eventfd_io(cqe->user_data),
			(q->state & UBLKSRV_QUEUE_STOPPING));

	/* Don't retrieve io in case of target io */
	if (is_target_io(cqe->user_data)) {
		ublksrv_handle_tgt_cqe(q, cqe);
		return;
	}

	io = &q->ios[tag];
	q->cmd_inflight--;

	if (!fetch) {
		q->state |= UBLKSRV_QUEUE_STOPPING;
		io->flags &= ~UBLKSRV_NEED_FETCH_RQ;
	}

	/*
	 * So far, only sync tgt io handling is implemented.
	 *
	 * todo: support async tgt io handling via io_uring, and the ublksrv
	 * daemon can poll on both rings.
	 */
	if (cqe->res == UBLK_IO_RES_OK) {
		//ublk_assert(tag < q->q_depth);
		q->tgt_ops->handle_io_async(local_to_tq(q), &io->data);
	} else if (cqe->res == UBLK_IO_RES_NEED_GET_DATA) {
		io->flags |= UBLKSRV_NEED_GET_DATA | UBLKSRV_IO_FREE;
		ublksrv_queue_io_cmd(q, io, tag);
	} else {
		/*
		 * COMMIT_REQ will be completed immediately since no fetching
		 * piggyback is required.
		 *
		 * Marking IO_FREE only, then this io won't be issued since
		 * we only issue io with (UBLKSRV_IO_FREE | UBLKSRV_NEED_*)
		 */
		io->flags = UBLKSRV_IO_FREE;
	}
}

static int ublksrv_reap_events_uring(struct io_uring *r)
{
	struct io_uring_cqe *cqe;
	unsigned head;
	int count = 0;

	io_uring_for_each_cqe(r, head, cqe) {
		ublksrv_handle_cqe(r, cqe, NULL);
		count += 1;
	}
	io_uring_cq_advance(r, count);

	return count;
}

static void ublksrv_queue_discard_io_pages(struct _ublksrv_queue *q)
{
	const struct ublksrv_ctrl_dev *cdev = q->dev->ctrl_dev;
	unsigned int io_buf_size = cdev->dev_info.max_io_buf_bytes;
	int i = 0;

	for (i = 0; i < q->q_depth; i++)
		madvise(q->ios[i].buf_addr, io_buf_size, MADV_DONTNEED);
}

static void ublksrv_queue_idle_enter(struct _ublksrv_queue *q)
{
	if (q->state & UBLKSRV_QUEUE_IDLE)
		return;

	ublk_dbg(UBLK_DBG_QUEUE, "dev%d-q%d: enter idle %x\n",
			q->dev->ctrl_dev->dev_info.dev_id, q->q_id, q->state);
	ublksrv_queue_discard_io_pages(q);
	q->state |= UBLKSRV_QUEUE_IDLE;

	if (q->tgt_ops->idle_fn)
		q->tgt_ops->idle_fn(local_to_tq(q), true);
}

static inline void ublksrv_queue_idle_exit(struct _ublksrv_queue *q)
{
	if (q->state & UBLKSRV_QUEUE_IDLE) {
		ublk_dbg(UBLK_DBG_QUEUE, "dev%d-q%d: exit idle %x\n",
			q->dev->ctrl_dev->dev_info.dev_id, q->q_id, q->state);
		q->state &= ~UBLKSRV_QUEUE_IDLE;
		if (q->tgt_ops->idle_fn)
			q->tgt_ops->idle_fn(local_to_tq(q), false);
	}
}

static void ublksrv_reset_aio_batch(struct _ublksrv_queue *q)
{
	q->nr_ctxs = 0;
}

static void ublksrv_submit_aio_batch(struct _ublksrv_queue *q)
{
	int i;

	for (i = 0; i < q->nr_ctxs; i++) {
		struct ublksrv_aio_ctx *ctx = q->ctxs[i];
		uint64_t data = 1;
		int ret;

		ret = write(ctx->efd, &data, sizeof(uint64_t));
		if (ret != sizeof(uint64_t))
			ublk_err("%s:%d write fail ctx[%d]: %d/%zu\n",
					__func__, __LINE__, i, ret,
					sizeof(uint64_t));
	}
}
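/*
 * Usage sketch (illustrative only): ublksrv_process_io() below is meant to
 * be called in a loop from a dedicated per-queue pthread, roughly:
 *
 *	const struct ublksrv_queue *q = ublksrv_queue_init(dev, q_id, NULL);
 *
 *	while (ublksrv_process_io(q) >= 0)
 *		;
 *	ublksrv_queue_deinit(q);
 *
 * The loop ends once the queue is stopping and no sqes are left, in which
 * case -ENODEV is returned.
 */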
int ublksrv_process_io(const struct ublksrv_queue *tq)
{
	struct _ublksrv_queue *q = tq_to_local(tq);
	int ret, reapped;
	struct __kernel_timespec ts = {
		.tv_sec = UBLKSRV_IO_IDLE_SECS,
		.tv_nsec = 0
	};
	struct __kernel_timespec *tsp = (q->state & UBLKSRV_QUEUE_IDLE) ?
		NULL : &ts;
	struct io_uring_cqe *cqe;

	ublk_dbg(UBLK_DBG_QUEUE, "dev%d-q%d: to_submit %d inflight %u/%u stopping %d\n",
				q->dev->ctrl_dev->dev_info.dev_id,
				q->q_id, io_uring_sq_ready(&q->ring),
				q->cmd_inflight, q->tgt_io_inflight,
				(q->state & UBLKSRV_QUEUE_STOPPING));

	if (ublksrv_queue_is_done(q))
		return -ENODEV;

	ret = io_uring_submit_and_wait_timeout(&q->ring, &cqe, 1, tsp, NULL);

	ublksrv_reset_aio_batch(q);
	reapped = ublksrv_reap_events_uring(&q->ring);
	ublksrv_submit_aio_batch(q);

	if (q->tgt_ops->handle_io_background)
		q->tgt_ops->handle_io_background(local_to_tq(q),
				io_uring_sq_ready(&q->ring));

	ublk_dbg(UBLK_DBG_QUEUE, "submit result %d, reapped %d stop %d idle %d",
			ret, reapped, (q->state & UBLKSRV_QUEUE_STOPPING),
			(q->state & UBLKSRV_QUEUE_IDLE));

	if ((q->state & UBLKSRV_QUEUE_STOPPING))
		ublksrv_kill_eventfd(q);
	else {
		if (ret == -ETIME && reapped == 0 &&
				!io_uring_sq_ready(&q->ring))
			ublksrv_queue_idle_enter(q);
		else
			ublksrv_queue_idle_exit(q);
	}

	return reapped;
}

const struct ublksrv_queue *ublksrv_get_queue(const struct ublksrv_dev *dev,
		int q_id)
{
	return (const struct ublksrv_queue *)tdev_to_local(dev)->__queues[q_id];
}

/* called in ublksrv process context */
void ublksrv_apply_oom_protection()
{
	char oom_score_adj_path[64];
	pid_t pid = getpid();
	int fd;

	snprintf(oom_score_adj_path, 64, "/proc/%d/oom_score_adj", pid);

	fd = open(oom_score_adj_path, O_RDWR);
	if (fd > 0) {
		char val[32];
		int len, ret;

		len = snprintf(val, 32, "%d", -1000);
		ret = write(fd, val, len);
		if (ret != len)
			ublk_err("%s:%d write fail %d/%d\n",
					__func__, __LINE__, ret, len);
		close(fd);
	}
}

const struct ublksrv_ctrl_dev *ublksrv_get_ctrl_dev(
		const struct ublksrv_dev *dev)
{
	return tdev_to_local(dev)->ctrl_dev;
}

int ublksrv_get_pidfile_fd(const struct ublksrv_dev *dev)
{
	return tdev_to_local(dev)->pid_file_fd;
}

void *ublksrv_io_private_data(const struct ublksrv_queue *tq, int tag)
{
	struct _ublksrv_queue *q = tq_to_local(tq);

	return q->ios[tag].data.private_data;
}

unsigned int ublksrv_queue_state(const struct ublksrv_queue *q)
{
	return tq_to_local(q)->state;
}

const struct ublk_io_data *
ublksrv_queue_get_io_data(const struct ublksrv_queue *tq, int tag)
{
	struct _ublksrv_queue *q = tq_to_local(tq);

	return &q->ios[tag].data;
}

void *ublksrv_queue_get_io_buf(const struct ublksrv_queue *tq, int tag)
{
	struct _ublksrv_queue *q = tq_to_local(tq);

	if (tag < q->q_depth)
		return q->ios[tag].buf_addr;
	return NULL;
}

/*
 * The default io_uring cq depth equals queue depth plus
 * .tgt_ring_depth, which is usually enough for typical ublk targets,
 * such as loop and qcow2, but it may not be enough for nbd with send_zc,
 * which needs extra cqes for buffer notification.
 *
 * So add an API to allow the target to override the default io_uring
 * cq depth.
 */
void ublksrv_dev_set_cq_depth(struct ublksrv_dev *tdev, int cq_depth)
{
	tdev_to_local(tdev)->cq_depth = cq_depth;
}

int ublksrv_dev_get_cq_depth(struct ublksrv_dev *tdev)
{
	return tdev_to_local(tdev)->cq_depth;
}
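/*
 * Usage sketch (illustrative only, assuming a non-const device handle): a
 * target that needs more completion-queue room, e.g. one extra cqe per
 * inflight io, can override the default before any queue is initialized:
 *
 *	ublksrv_dev_set_cq_depth(dev, 2 * dev_info->queue_depth);
 */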