// SPDX-License-Identifier: MIT or LGPL-2.1-only
/* xref: /aosp_15_r20/external/ublksrv/lib/ublksrv.c (revision 94c4a1e103eb1715230460aab379dff275992c20) */

#include <config.h>
#include <sys/mman.h>
#include <sys/time.h>
#include <sys/resource.h>

#include "ublksrv_priv.h"
#include "ublksrv_aio.h"

bool ublksrv_is_recovering(const struct ublksrv_ctrl_dev *ctrl_dev)
{
	return ctrl_dev->tgt_argc == -1;
}

static inline struct ublksrv_io_desc *ublksrv_get_iod(
		const struct _ublksrv_queue *q, int tag)
{
	return (struct ublksrv_io_desc *)
		&(q->io_cmd_buf[tag * sizeof(struct ublksrv_io_desc)]);
}

/*
 * /dev/ublkbN shares the same lifetime as the ublk io daemon:
 *
 * 1) IO from /dev/ublkbN is handled by the io daemon directly
 *
 * 2) the io cmd buffer is allocated by the ublk driver, mapped into the
 * io daemon's vm space via mmap, and each hw queue has its own
 * io cmd buffer
 *
 * 3) io buffers are pre-allocated by the io daemon and passed
 * to the ublk driver via io command; meanwhile the ublk driver may
 * choose to pin these user pages before starting the device
 *
 * Each /dev/ublkcN is owned by only one io daemon, and can't be
 * opened by any other daemon. The io daemon uses its allocated
 * io_uring to communicate with the ublk driver.
 *
 * For each request of /dev/ublkbN, the io daemon submits one
 * sqe for both fetching IO from the ublk driver and committing the IO
 * result to the ublk driver, and the io daemon has to issue all sqes
 * to /dev/ublkcN before sending START_DEV to /dev/ublk-control.
 *
 * After STOP_DEV is sent to /dev/ublk-control, the ublk driver needs
 * to freeze the request queue and complete all pending sqes, meanwhile
 * telling the io daemon via cqe->res not to issue sqes any more; it
 * also deletes /dev/ublkbN. After the io daemon figures out that all
 * sqes have been freed, it exits. Then STOP_DEV returns.
 */
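/*
 * Hedged usage sketch (not part of this library's API surface): a minimal
 * single-queue io daemon built on the functions defined in this file.  It
 * assumes the struct ublksrv_ctrl_dev has already been created and the
 * device added/started via the separate ctrl APIs, and it ignores queue
 * pthreads, CPU affinity, recovery and error reporting for brevity.
 */
#if 0	/* example only */
static void example_queue_loop(const struct ublksrv_ctrl_dev *ctrl_dev)
{
	const struct ublksrv_dev *dev = ublksrv_dev_init(ctrl_dev);
	const struct ublksrv_queue *q;

	if (!dev)
		return;

	q = ublksrv_queue_init(dev, 0 /* q_id */, NULL);
	if (q) {
		/* returns -ENODEV once the queue is stopped and drained */
		while (ublksrv_process_io(q) >= 0)
			;
		ublksrv_queue_deinit(q);
	}
	ublksrv_dev_deinit(dev);
}
#endif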

/*
 * If the ublksrv queue has been idle for the past 20 seconds, start to
 * discard the pages mapped to the io buffers via madvise(MADV_DONTNEED),
 * so these pages become available to others without needing to be
 * swapped out
 */
#define UBLKSRV_IO_IDLE_SECS    20

static int __ublksrv_tgt_init(struct _ublksrv_dev *dev, const char *type_name,
		const struct ublksrv_tgt_type *ops, int type,
		int argc, char *argv[])
{
	struct ublksrv_tgt_info *tgt = &dev->tgt;
	int ret;

	if (!ops)
		return -EINVAL;

	if (strcmp(ops->name, type_name))
		return -EINVAL;

	if (!ops->init_tgt)
		return -EINVAL;
	if (!ops->handle_io_async)
		return -EINVAL;
	if (!ops->alloc_io_buf ^ !ops->free_io_buf)
		return -EINVAL;

	optind = 0;     /* so that we can parse our arguments */
	tgt->ops = ops;

	if (!ublksrv_is_recovering(dev->ctrl_dev))
		ret = ops->init_tgt(local_to_tdev(dev), type, argc, argv);
	else {
		if (ops->recovery_tgt)
			ret = ops->recovery_tgt(local_to_tdev(dev), type);
		else
			ret = -ENOTSUP;
	}
	if (ret) {
		tgt->ops = NULL;
		return ret;
	}
	return 0;
}

static int ublksrv_tgt_init(struct _ublksrv_dev *dev, const char *type_name,
		const struct ublksrv_tgt_type *ops,
		int argc, char *argv[])
{
	if (type_name == NULL)
		return -EINVAL;

	if (ops)
		return __ublksrv_tgt_init(dev, type_name, ops,
				ops->type, argc, argv);

	return -EINVAL;
}

static inline void ublksrv_tgt_exit(struct ublksrv_tgt_info *tgt)
{
	int i;

	for (i = 1; i < tgt->nr_fds; i++)
		close(tgt->fds[i]);
}

static void ublksrv_tgt_deinit(struct _ublksrv_dev *dev)
{
	struct ublksrv_tgt_info *tgt = &dev->tgt;

	ublksrv_tgt_exit(tgt);

	if (tgt->ops && tgt->ops->deinit_tgt)
		tgt->ops->deinit_tgt(local_to_tdev(dev));
}

static inline int ublksrv_queue_io_cmd(struct _ublksrv_queue *q,
		struct ublk_io *io, unsigned tag)
{
	struct ublksrv_io_cmd *cmd;
	struct io_uring_sqe *sqe;
	unsigned int cmd_op = 0;
	__u64 user_data;

	/* only a freed io can be issued */
	if (!(io->flags & UBLKSRV_IO_FREE))
		return 0;

	/* we issue because we need either fetching or committing */
	if (!(io->flags &
		(UBLKSRV_NEED_FETCH_RQ | UBLKSRV_NEED_GET_DATA |
		 UBLKSRV_NEED_COMMIT_RQ_COMP)))
		return 0;

	if (io->flags & UBLKSRV_NEED_GET_DATA)
		cmd_op = UBLK_IO_NEED_GET_DATA;
	else if (io->flags & UBLKSRV_NEED_COMMIT_RQ_COMP)
		cmd_op = UBLK_IO_COMMIT_AND_FETCH_REQ;
	else if (io->flags & UBLKSRV_NEED_FETCH_RQ)
		cmd_op = UBLK_IO_FETCH_REQ;

	sqe = io_uring_get_sqe(&q->ring);
	if (!sqe) {
		ublk_err("%s: run out of sqe %d, tag %d\n",
				__func__, q->q_id, tag);
		return -1;
	}

	cmd = (struct ublksrv_io_cmd *)ublksrv_get_sqe_cmd(sqe);

	if (cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ)
		cmd->result = io->result;

	if (q->state & UBLKSRV_QUEUE_IOCTL_OP)
		cmd_op = _IOWR('u', _IOC_NR(cmd_op), struct ublksrv_io_cmd);

	/* These fields should be written once and never change */
	ublksrv_set_sqe_cmd_op(sqe, cmd_op);
	sqe->fd		= 0;	/* dev->cdev_fd */
	sqe->opcode	= IORING_OP_URING_CMD;
	sqe->flags	= IOSQE_FIXED_FILE;
	sqe->rw_flags	= 0;
	cmd->tag	= tag;
	if (!(q->state & UBLKSRV_USER_COPY))
		cmd->addr	= (__u64)io->buf_addr;
	else
		cmd->addr	= 0;
	cmd->q_id	= q->q_id;

	user_data = build_user_data(tag, _IOC_NR(cmd_op), 0, 0);
	io_uring_sqe_set_data64(sqe, user_data);

	io->flags = 0;

	q->cmd_inflight += 1;

	ublk_dbg(UBLK_DBG_IO_CMD, "%s: (qid %d tag %u cmd_op %u) iof %x stopping %d\n",
			__func__, q->q_id, tag, cmd_op,
			io->flags, !!(q->state & UBLKSRV_QUEUE_STOPPING));
	return 1;
}

int ublksrv_complete_io(const struct ublksrv_queue *tq, unsigned tag, int res)
{
	struct _ublksrv_queue *q = tq_to_local(tq);

	struct ublk_io *io = &q->ios[tag];

	ublksrv_mark_io_done(io, res);

	return ublksrv_queue_io_cmd(q, io, tag);
}
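/*
 * Hedged sketch of a target's ->handle_io_async() callback that completes
 * every request inline via ublksrv_complete_io() (a "null"-style target).
 * The example_* name is hypothetical; real targets would dispatch on the
 * op encoded in data->iod->op_flags and perform actual I/O, possibly
 * completing later from ->tgt_io_done() or an event handler.
 */
#if 0	/* example only */
static int example_handle_io_async(const struct ublksrv_queue *q,
				   const struct ublk_io_data *data)
{
	/* pretend the whole request succeeded: byte count as the result */
	int res = data->iod->nr_sectors << 9;

	/* queues COMMIT_AND_FETCH_REQ for this tag on the next ring submit */
	ublksrv_complete_io(q, data->tag, res);
	return 0;
}
#endif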

/*
 * eventfd is treated as a special target IO which has to be queued
 * when the queue is set up
 */
static inline int __ublksrv_queue_event(struct _ublksrv_queue *q)
{
	if (q->efd >= 0) {
		struct io_uring_sqe *sqe;
		__u64 user_data = build_eventfd_data();

		if (q->state & UBLKSRV_QUEUE_STOPPING)
			return -EINVAL;

		sqe = io_uring_get_sqe(&q->ring);
		if (!sqe) {
			ublk_err("%s: queue %d run out of sqe\n",
				__func__, q->q_id);
			return -1;
		}

		io_uring_prep_poll_add(sqe, q->efd, POLLIN);
		io_uring_sqe_set_data64(sqe, user_data);
	}
	return 0;
}

/*
 * This API is supposed to be called in ->handle_event() after the current
 * events are handled.
 */
int ublksrv_queue_handled_event(const struct ublksrv_queue *tq)
{
	struct _ublksrv_queue *q = tq_to_local(tq);

	if (q->efd >= 0) {
		uint64_t data;
		const int cnt = sizeof(uint64_t);

		/* the read has to be done, otherwise the poll event won't be stopped */
		if (read(q->efd, &data, cnt) != cnt)
			ublk_err("%s: read wrong bytes from eventfd\n",
					__func__);
		/*
		 * the event needs to be issued immediately, since other io
		 * may rely on it
		 */
		if (!__ublksrv_queue_event(q))
			io_uring_submit_and_wait(&q->ring, 0);
	}
	return 0;
}

/*
 * Send an event to the io command uring context, so that the queue pthread
 * can be woken up for handling io, then ->handle_event() will be
 * called to notify the target code.
 *
 * This API is usually called from another context.
 */
int ublksrv_queue_send_event(const struct ublksrv_queue *tq)
{
	struct _ublksrv_queue *q = tq_to_local(tq);

	if (q->efd >= 0) {
		uint64_t data = 1;
		const int cnt = sizeof(uint64_t);

		if (write(q->efd, &data, cnt) != cnt) {
			ublk_err("%s: wrote wrong bytes to eventfd\n",
					__func__);
			return -EPIPE;
		}
	}
	return 0;
}
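/*
 * Hedged sketch of the eventfd notification pattern for targets that
 * complete I/O from a separate worker thread.  The function names are
 * hypothetical and the ->handle_event() prototype below merely mirrors
 * how it is invoked in this file; see ublksrv.h for the authoritative
 * callback signatures.
 */
#if 0	/* example only */
/* worker thread: record the result somewhere, then poke the queue */
static void example_worker_done(const struct ublksrv_queue *q)
{
	ublksrv_queue_send_event(q);	/* wakes up ublksrv_process_io() */
}

/* queue thread: target's ->handle_event() callback */
static void example_handle_event(const struct ublksrv_queue *q)
{
	/* reap worker results here and call ublksrv_complete_io() ... */

	ublksrv_queue_handled_event(q);	/* drain eventfd and re-arm poll */
}
#endif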

/*
 * Issue all available commands to /dev/ublkcN; the exact cmd is figured
 * out in ublksrv_queue_io_cmd() with the help of each io->flags.
 *
 * todo: queue io commands with batching
 */
static void ublksrv_submit_fetch_commands(struct _ublksrv_queue *q)
{
	int i = 0;

	for (i = 0; i < q->q_depth; i++)
		ublksrv_queue_io_cmd(q, &q->ios[i], i);

	__ublksrv_queue_event(q);
}

static int ublksrv_queue_is_done(struct _ublksrv_queue *q)
{
	return (q->state & UBLKSRV_QUEUE_STOPPING) &&
		!io_uring_sq_ready(&q->ring);
}

/* used for allocating zero copy vma space */
static inline int ublk_queue_single_io_buf_size(struct _ublksrv_dev *dev)
{
	unsigned max_io_sz = dev->ctrl_dev->dev_info.max_io_buf_bytes;
	unsigned int page_sz = getpagesize();

	return round_up(max_io_sz, page_sz);
}

static inline int ublk_queue_io_buf_size(struct _ublksrv_dev *dev)
{
	unsigned depth = dev->ctrl_dev->dev_info.queue_depth;

	return ublk_queue_single_io_buf_size(dev) * depth;
}

static inline int ublk_io_buf_size(struct _ublksrv_dev *dev)
{
	unsigned nr_queues = dev->ctrl_dev->dev_info.nr_hw_queues;

	return ublk_queue_io_buf_size(dev) * nr_queues;
}

static int ublksrv_queue_cmd_buf_sz(struct _ublksrv_queue *q)
{
	int size = q->q_depth * sizeof(struct ublksrv_io_desc);
	unsigned int page_sz = getpagesize();

	return round_up(size, page_sz);
}

int ublksrv_queue_unconsumed_cqes(const struct ublksrv_queue *tq)
{
	if (tq->ring_ptr)
		return io_uring_cq_ready(tq->ring_ptr);

	return -1;
}

void ublksrv_queue_deinit(const struct ublksrv_queue *tq)
{
	struct _ublksrv_queue *q = tq_to_local(tq);
	int i;
	int nr_ios = q->dev->tgt.extra_ios + q->q_depth;

	if (q->dev->tgt.ops->deinit_queue)
		q->dev->tgt.ops->deinit_queue(tq);

	if (q->efd >= 0)
		close(q->efd);

	io_uring_unregister_ring_fd(&q->ring);

	if (q->ring.ring_fd > 0) {
		io_uring_unregister_files(&q->ring);
		close(q->ring.ring_fd);
		q->ring.ring_fd = -1;
	}
	if (q->io_cmd_buf) {
		munmap(q->io_cmd_buf, ublksrv_queue_cmd_buf_sz(q));
		q->io_cmd_buf = NULL;
	}
	for (i = 0; i < nr_ios; i++) {
		if (q->ios[i].buf_addr) {
			if (q->dev->tgt.ops->free_io_buf)
				q->dev->tgt.ops->free_io_buf(tq,
						q->ios[i].buf_addr, i);
			else
				free(q->ios[i].buf_addr);
			q->ios[i].buf_addr = NULL;
		}
		free(q->ios[i].data.private_data);
	}
	q->dev->__queues[q->q_id] = NULL;
	free(q);
}

void ublksrv_build_cpu_str(char *buf, int len, const cpu_set_t *cpuset)
{
	int nr_cores = sysconf(_SC_NPROCESSORS_ONLN);
	int i, offset = 0;

	for (i = 0; i < nr_cores; i++) {
		int n;

		if (!CPU_ISSET(i, cpuset))
			continue;
		n = snprintf(&buf[offset], len - offset, "%d ", i);
		if (n < 0 || n >= len - offset)
			break;
		offset += n;
	}
}

static void ublksrv_set_sched_affinity(struct _ublksrv_dev *dev,
		unsigned short q_id)
{
	const struct ublksrv_ctrl_dev *cdev = dev->ctrl_dev;
	unsigned dev_id = cdev->dev_info.dev_id;
	cpu_set_t *cpuset = ublksrv_get_queue_affinity(cdev, q_id);
	pthread_t thread = pthread_self();
	int ret;

	ret = pthread_setaffinity_np(thread, sizeof(cpu_set_t), cpuset);
	if (ret)
		ublk_err("ublk dev %u queue %u set affinity failed",
				dev_id, q_id);
}

static void ublksrv_kill_eventfd(struct _ublksrv_queue *q)
{
	if ((q->state & UBLKSRV_QUEUE_STOPPING) && q->efd >= 0) {
		uint64_t data = 1;
		int ret;

		ret = write(q->efd, &data, sizeof(uint64_t));
		if (ret != sizeof(uint64_t))
			ublk_err("%s:%d write fail %d/%zu\n",
					__func__, __LINE__, ret, sizeof(uint64_t));
	}
}

/*
 * Set up the eventfd; return 0 or a negative errno
 */
static int ublksrv_setup_eventfd(struct _ublksrv_queue *q)
{
	const struct ublksrv_ctrl_dev_info *info = &q->dev->ctrl_dev->dev_info;

	if (!(info->ublksrv_flags & UBLKSRV_F_NEED_EVENTFD)) {
		q->efd = -1;
		return 0;
	}

	if (q->dev->tgt.tgt_ring_depth == 0) {
		ublk_err("ublk dev %d queue %d zero tgt queue depth",
			info->dev_id, q->q_id);
		return -EINVAL;
	}

	if (!q->dev->tgt.ops->handle_event) {
		ublk_err("ublk dev %d/%d not define ->handle_event",
			info->dev_id, q->q_id);
		return -EINVAL;
	}

	q->efd = eventfd(0, 0);
	if (q->efd < 0)
		return -errno;
	return 0;
}

static void ublksrv_queue_adjust_uring_io_wq_workers(struct _ublksrv_queue *q)
{
	struct _ublksrv_dev *dev = q->dev;
	unsigned int val[2] = {0, 0};
	int ret;

	if (!dev->tgt.iowq_max_workers[0] && !dev->tgt.iowq_max_workers[1])
		return;

	ret = io_uring_register_iowq_max_workers(&q->ring, val);
	if (ret)
		ublk_err("%s: register iowq max workers failed %d\n",
				__func__, ret);

	if (!dev->tgt.iowq_max_workers[0])
		dev->tgt.iowq_max_workers[0] = val[0];
	if (!dev->tgt.iowq_max_workers[1])
		dev->tgt.iowq_max_workers[1] = val[1];

	ret = io_uring_register_iowq_max_workers(&q->ring,
			dev->tgt.iowq_max_workers);
	if (ret)
		ublk_err("%s: register iowq max workers failed %d\n",
				__func__, ret);
}

static void ublksrv_calculate_depths(const struct _ublksrv_dev *dev,
		int *ring_depth, int *cq_depth, int *nr_ios)
{
	const struct ublksrv_ctrl_dev *cdev = dev->ctrl_dev;

	/*
	 * eventfd consumes one extra sqe, and it can be thought of as one
	 * extra unit of target depth
	 */
	int aio_depth = (cdev->dev_info.ublksrv_flags & UBLKSRV_F_NEED_EVENTFD)
		? 1 : 0;
	int depth = cdev->dev_info.queue_depth;
	int tgt_depth = dev->tgt.tgt_ring_depth + aio_depth;

	*nr_ios = depth + dev->tgt.extra_ios;

	/*
	 * queue_depth represents the max count of io commands issued from the
	 * ublk driver.
	 *
	 * After an io command is fetched from the ublk driver, the sqe consumed
	 * for fetching it becomes available for target usage, so the uring
	 * depth can be set to max(queue_depth, tgt_depth).
	 */
	depth = depth > tgt_depth ? depth : tgt_depth;
	*ring_depth = depth;
	*cq_depth = dev->cq_depth ? dev->cq_depth : depth;
}
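/*
 * Worked example with hypothetical values: queue_depth = 128,
 * tgt_ring_depth = 64, extra_ios = 0 and UBLKSRV_F_NEED_EVENTFD set.
 * Then aio_depth = 1, tgt_depth = 64 + 1 = 65, nr_ios = 128 + 0 = 128,
 * ring_depth = max(128, 65) = 128, and cq_depth defaults to 128 unless
 * the target overrides it via ublksrv_dev_set_cq_depth().
 */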

const struct ublksrv_queue *ublksrv_queue_init(const struct ublksrv_dev *tdev,
		unsigned short q_id, void *queue_data)
{
	struct _ublksrv_dev *dev = tdev_to_local(tdev);
	struct _ublksrv_queue *q;
	const struct ublksrv_ctrl_dev *ctrl_dev = dev->ctrl_dev;
	int depth = ctrl_dev->dev_info.queue_depth;
	int i, ret = -1;
	int cmd_buf_size, io_buf_size;
	unsigned long off;
	int io_data_size = round_up(dev->tgt.io_data_size,
			sizeof(unsigned long));
	int ring_depth, cq_depth, nr_ios;

	ublksrv_calculate_depths(dev, &ring_depth, &cq_depth, &nr_ios);

	/*
	 * Too many extra ios
	 */
	if (nr_ios > depth * 3)
		return NULL;

	q = (struct _ublksrv_queue *)malloc(sizeof(struct _ublksrv_queue) +
			sizeof(struct ublk_io) * nr_ios);
	if (!q)
		return NULL;
	dev->__queues[q_id] = q;

	q->tgt_ops = dev->tgt.ops;	/* cache ops for the fast path */
	q->dev = dev;
	if (ctrl_dev->dev_info.flags & UBLK_F_CMD_IOCTL_ENCODE)
		q->state = UBLKSRV_QUEUE_IOCTL_OP;
	else
		q->state = 0;
	if (ctrl_dev->dev_info.flags & UBLK_F_USER_COPY)
		q->state |= UBLKSRV_USER_COPY;
	q->q_id = q_id;
	/* FIXME: depth has to be a power of 2 */
	q->q_depth = depth;
	q->io_cmd_buf = NULL;
	q->cmd_inflight = 0;
	q->tid = ublksrv_gettid();

	cmd_buf_size = ublksrv_queue_cmd_buf_sz(q);
	off = UBLKSRV_CMD_BUF_OFFSET +
		q_id * (UBLK_MAX_QUEUE_DEPTH * sizeof(struct ublksrv_io_desc));
	q->io_cmd_buf = (char *)mmap(0, cmd_buf_size, PROT_READ,
			MAP_SHARED | MAP_POPULATE, dev->cdev_fd, off);
	if (q->io_cmd_buf == MAP_FAILED) {
		ublk_err("ublk dev %d queue %d map io_cmd_buf failed",
				q->dev->ctrl_dev->dev_info.dev_id, q->q_id);
		goto fail;
	}

	io_buf_size = ctrl_dev->dev_info.max_io_buf_bytes;
	for (i = 0; i < nr_ios; i++) {
		q->ios[i].buf_addr = NULL;

		/* extra ios don't need an io buffer */
		if (i >= q->q_depth)
			goto skip_alloc_buf;

		if (dev->tgt.ops->alloc_io_buf)
			q->ios[i].buf_addr =
				dev->tgt.ops->alloc_io_buf(local_to_tq(q),
					i, io_buf_size);
		else
			if (posix_memalign((void **)&q->ios[i].buf_addr,
						getpagesize(), io_buf_size)) {
				ublk_err("ublk dev %d queue %d io %d posix_memalign failed",
						q->dev->ctrl_dev->dev_info.dev_id, q->q_id, i);
				goto fail;
			}
		//q->ios[i].buf_addr = malloc(io_buf_size);
		if (!q->ios[i].buf_addr) {
			ublk_err("ublk dev %d queue %d io %d alloc io_buf failed",
					q->dev->ctrl_dev->dev_info.dev_id, q->q_id, i);
			goto fail;
		}
skip_alloc_buf:
		q->ios[i].flags = UBLKSRV_NEED_FETCH_RQ | UBLKSRV_IO_FREE;
		q->ios[i].data.private_data = malloc(io_data_size);
		q->ios[i].data.tag = i;
		if (i < q->q_depth)
			q->ios[i].data.iod = ublksrv_get_iod(q, i);
		else
			q->ios[i].data.iod = NULL;

		//ublk_assert(io_data_size ^ (unsigned long)q->ios[i].data.private_data);
	}

	ret = ublksrv_setup_ring(&q->ring, ring_depth, cq_depth,
			IORING_SETUP_SQE128 | IORING_SETUP_COOP_TASKRUN);
	if (ret < 0) {
		ublk_err("ublk dev %d queue %d setup io_uring failed %d",
				q->dev->ctrl_dev->dev_info.dev_id, q->q_id, ret);
		goto fail;
	}

	q->ring_ptr = &q->ring;

	ret = io_uring_register_files(&q->ring, dev->tgt.fds,
			dev->tgt.nr_fds + 1);
	if (ret) {
		ublk_err("ublk dev %d queue %d register files failed %d",
				q->dev->ctrl_dev->dev_info.dev_id, q->q_id, ret);
		goto fail;
	}

	io_uring_register_ring_fd(&q->ring);

	/*
	 * N.B. PR_SET_IO_FLUSHER was added with Linux 5.6+.
	 */
#if defined(PR_SET_IO_FLUSHER)
	if (prctl(PR_SET_IO_FLUSHER, 0, 0, 0, 0) != 0)
		ublk_err("ublk dev %d queue %d set_io_flusher failed",
			q->dev->ctrl_dev->dev_info.dev_id, q->q_id);
#endif

	ublksrv_queue_adjust_uring_io_wq_workers(q);

	q->private_data = queue_data;

	if (ctrl_dev->tgt_ops->init_queue) {
		if (ctrl_dev->tgt_ops->init_queue(local_to_tq(q),
					&q->private_data))
			goto fail;
	}

	if (ctrl_dev->queues_cpuset)
		ublksrv_set_sched_affinity(dev, q_id);

	setpriority(PRIO_PROCESS, getpid(), -20);

	ret = ublksrv_setup_eventfd(q);
	if (ret < 0) {
		ublk_err("ublk dev %d queue %d setup eventfd failed: %s",
			q->dev->ctrl_dev->dev_info.dev_id, q->q_id,
			strerror(-ret));
		goto fail;
	}

	/* submit all io commands to the ublk driver */
	ublksrv_submit_fetch_commands(q);

	return (struct ublksrv_queue *)q;
 fail:
	ublksrv_queue_deinit(local_to_tq(q));
	ublk_err("ublk dev %d queue %d failed",
			ctrl_dev->dev_info.dev_id, q_id);
	return NULL;
}

static int ublksrv_create_pid_file(struct _ublksrv_dev *dev)
{
	int dev_id = dev->ctrl_dev->dev_info.dev_id;
	char pid_file[64];
	int ret, pid_fd;

	if (!dev->ctrl_dev->run_dir)
		return 0;

	/* create the pid file and lock it, so that others can't */
	snprintf(pid_file, 64, "%s/%d.pid", dev->ctrl_dev->run_dir, dev_id);

	ret = create_pid_file(pid_file, &pid_fd);
	if (ret < 0) {
		/* -1 means the file is locked, and we need to remove it */
		if (ret == -1) {
			close(pid_fd);
			unlink(pid_file);
		}
		return ret;
	}
	dev->pid_file_fd = pid_fd;
	return 0;
}

static void ublksrv_remove_pid_file(const struct _ublksrv_dev *dev)
{
	int dev_id = dev->ctrl_dev->dev_info.dev_id;
	char pid_file[64];

	if (!dev->ctrl_dev->run_dir)
		return;

	close(dev->pid_file_fd);
	snprintf(pid_file, 64, "%s/%d.pid", dev->ctrl_dev->run_dir, dev_id);
	unlink(pid_file);
}

void ublksrv_dev_deinit(const struct ublksrv_dev *tdev)
{
	struct _ublksrv_dev *dev = tdev_to_local(tdev);

	ublksrv_remove_pid_file(dev);

	ublksrv_tgt_deinit(dev);
	free(dev->thread);

	if (dev->cdev_fd >= 0) {
		close(dev->cdev_fd);
		dev->cdev_fd = -1;
	}
	free(dev);
}

const struct ublksrv_dev *ublksrv_dev_init(const struct ublksrv_ctrl_dev *ctrl_dev)
{
	int dev_id = ctrl_dev->dev_info.dev_id;
	char buf[64];
	int ret = -1;
	struct _ublksrv_dev *dev = (struct _ublksrv_dev *)calloc(1, sizeof(*dev));
	struct ublksrv_tgt_info *tgt;

	if (!dev)
		return local_to_tdev(dev);

	tgt = &dev->tgt;
	dev->ctrl_dev = ctrl_dev;
	dev->cdev_fd = -1;

	snprintf(buf, 64, "%s%d", UBLKC_DEV, dev_id);
	dev->cdev_fd = open(buf, O_RDWR | O_NONBLOCK);
	if (dev->cdev_fd < 0) {
		ublk_err("can't open %s, ret %d\n", buf, dev->cdev_fd);
		goto fail;
	}

	tgt->fds[0] = dev->cdev_fd;

	ret = ublksrv_tgt_init(dev, ctrl_dev->tgt_type, ctrl_dev->tgt_ops,
			ctrl_dev->tgt_argc, ctrl_dev->tgt_argv);
	if (ret) {
		ublk_err("can't init tgt %d/%s/%d, ret %d\n",
				dev_id, ctrl_dev->tgt_type, ctrl_dev->tgt_argc,
				ret);
		goto fail;
	}

	ret = ublksrv_create_pid_file(dev);
	if (ret) {
		ublk_err("can't create pid file for dev %d, ret %d\n",
				dev_id, ret);
		goto fail;
	}

	return local_to_tdev(dev);
fail:
	ublksrv_dev_deinit(local_to_tdev(dev));
	return NULL;
}

/* Be careful: a target io may not have a ublk_io associated with it */
static inline void ublksrv_handle_tgt_cqe(struct _ublksrv_queue *q,
		struct io_uring_cqe *cqe)
{
	unsigned tag = user_data_to_tag(cqe->user_data);

	if (cqe->res < 0 && cqe->res != -EAGAIN) {
		ublk_err("%s: failed tgt io: res %d qid %u tag %u, cmd_op %u\n",
			__func__, cqe->res, q->q_id,
			user_data_to_tag(cqe->user_data),
			user_data_to_op(cqe->user_data));
	}

	if (is_eventfd_io(cqe->user_data)) {
		if (q->tgt_ops->handle_event)
			q->tgt_ops->handle_event(local_to_tq(q));
	} else {
		if (q->tgt_ops->tgt_io_done)
			q->tgt_ops->tgt_io_done(local_to_tq(q),
					&q->ios[tag].data, cqe);
	}
}

static void ublksrv_handle_cqe(struct io_uring *r,
		struct io_uring_cqe *cqe, void *data)
{
	struct _ublksrv_queue *q = container_of(r, struct _ublksrv_queue, ring);
	unsigned tag = user_data_to_tag(cqe->user_data);
	unsigned cmd_op = user_data_to_op(cqe->user_data);
	int fetch = (cqe->res != UBLK_IO_RES_ABORT) &&
		!(q->state & UBLKSRV_QUEUE_STOPPING);
	struct ublk_io *io;

	ublk_dbg(UBLK_DBG_IO_CMD, "%s: res %d (qid %d tag %u cmd_op %u target %d event %d) stopping %d\n",
			__func__, cqe->res, q->q_id, tag, cmd_op,
			is_target_io(cqe->user_data),
			is_eventfd_io(cqe->user_data),
			(q->state & UBLKSRV_QUEUE_STOPPING));

	/* Don't retrieve io in case of target io */
	if (is_target_io(cqe->user_data)) {
		ublksrv_handle_tgt_cqe(q, cqe);
		return;
	}

	io = &q->ios[tag];
	q->cmd_inflight--;

	if (!fetch) {
		q->state |= UBLKSRV_QUEUE_STOPPING;
		io->flags &= ~UBLKSRV_NEED_FETCH_RQ;
	}

	/*
	 * So far, only the sync tgt io handling is implemented.
	 *
	 * todo: support async tgt io handling via io_uring, so the ublksrv
	 * daemon can poll on both rings.
	 */
	if (cqe->res == UBLK_IO_RES_OK) {
		//ublk_assert(tag < q->q_depth);
		q->tgt_ops->handle_io_async(local_to_tq(q), &io->data);
	} else if (cqe->res == UBLK_IO_RES_NEED_GET_DATA) {
		io->flags |= UBLKSRV_NEED_GET_DATA | UBLKSRV_IO_FREE;
		ublksrv_queue_io_cmd(q, io, tag);
	} else {
		/*
		 * COMMIT_REQ will be completed immediately since no fetching
		 * piggyback is required.
		 *
		 * Mark IO_FREE only; then this io won't be issued since
		 * we only issue an io with (UBLKSRV_IO_FREE | UBLKSRV_NEED_*)
		 */
		io->flags = UBLKSRV_IO_FREE;
	}
}

static int ublksrv_reap_events_uring(struct io_uring *r)
{
	struct io_uring_cqe *cqe;
	unsigned head;
	int count = 0;

	io_uring_for_each_cqe(r, head, cqe) {
		ublksrv_handle_cqe(r, cqe, NULL);
		count += 1;
	}
	io_uring_cq_advance(r, count);

	return count;
}

static void ublksrv_queue_discard_io_pages(struct _ublksrv_queue *q)
{
	const struct ublksrv_ctrl_dev *cdev = q->dev->ctrl_dev;
	unsigned int io_buf_size = cdev->dev_info.max_io_buf_bytes;
	int i = 0;

	for (i = 0; i < q->q_depth; i++)
		madvise(q->ios[i].buf_addr, io_buf_size, MADV_DONTNEED);
}

static void ublksrv_queue_idle_enter(struct _ublksrv_queue *q)
{
	if (q->state & UBLKSRV_QUEUE_IDLE)
		return;

	ublk_dbg(UBLK_DBG_QUEUE, "dev%d-q%d: enter idle %x\n",
			q->dev->ctrl_dev->dev_info.dev_id, q->q_id, q->state);
	ublksrv_queue_discard_io_pages(q);
	q->state |= UBLKSRV_QUEUE_IDLE;

	if (q->tgt_ops->idle_fn)
		q->tgt_ops->idle_fn(local_to_tq(q), true);
}

static inline void ublksrv_queue_idle_exit(struct _ublksrv_queue *q)
{
	if (q->state & UBLKSRV_QUEUE_IDLE) {
		ublk_dbg(UBLK_DBG_QUEUE, "dev%d-q%d: exit idle %x\n",
			q->dev->ctrl_dev->dev_info.dev_id, q->q_id, q->state);
		q->state &= ~UBLKSRV_QUEUE_IDLE;
		if (q->tgt_ops->idle_fn)
			q->tgt_ops->idle_fn(local_to_tq(q), false);
	}
}

static void ublksrv_reset_aio_batch(struct _ublksrv_queue *q)
{
	q->nr_ctxs = 0;
}

static void ublksrv_submit_aio_batch(struct _ublksrv_queue *q)
{
	int i;

	for (i = 0; i < q->nr_ctxs; i++) {
		struct ublksrv_aio_ctx *ctx = q->ctxs[i];
		uint64_t data = 1;
		int ret;

		ret = write(ctx->efd, &data, sizeof(uint64_t));
		if (ret != sizeof(uint64_t))
			ublk_err("%s:%d write fail ctx[%d]: %d/%zu\n",
					__func__, __LINE__, i, ret, sizeof(uint64_t));
	}
}

int ublksrv_process_io(const struct ublksrv_queue *tq)
{
	struct _ublksrv_queue *q = tq_to_local(tq);
	int ret, reapped;
	struct __kernel_timespec ts = {
		.tv_sec = UBLKSRV_IO_IDLE_SECS,
		.tv_nsec = 0
	};
	struct __kernel_timespec *tsp = (q->state & UBLKSRV_QUEUE_IDLE) ?
		NULL : &ts;
	struct io_uring_cqe *cqe;

	ublk_dbg(UBLK_DBG_QUEUE, "dev%d-q%d: to_submit %d inflight %u/%u stopping %d\n",
				q->dev->ctrl_dev->dev_info.dev_id,
				q->q_id, io_uring_sq_ready(&q->ring),
				q->cmd_inflight, q->tgt_io_inflight,
				(q->state & UBLKSRV_QUEUE_STOPPING));

	if (ublksrv_queue_is_done(q))
		return -ENODEV;

	ret = io_uring_submit_and_wait_timeout(&q->ring, &cqe, 1, tsp, NULL);

	ublksrv_reset_aio_batch(q);
	reapped = ublksrv_reap_events_uring(&q->ring);
	ublksrv_submit_aio_batch(q);

	if (q->tgt_ops->handle_io_background)
		q->tgt_ops->handle_io_background(local_to_tq(q),
				io_uring_sq_ready(&q->ring));

	ublk_dbg(UBLK_DBG_QUEUE, "submit result %d, reapped %d stop %d idle %d",
			ret, reapped, (q->state & UBLKSRV_QUEUE_STOPPING),
			(q->state & UBLKSRV_QUEUE_IDLE));

	if ((q->state & UBLKSRV_QUEUE_STOPPING))
		ublksrv_kill_eventfd(q);
	else {
		if (ret == -ETIME && reapped == 0 &&
				!io_uring_sq_ready(&q->ring))
			ublksrv_queue_idle_enter(q);
		else
			ublksrv_queue_idle_exit(q);
	}

	return reapped;
}

const struct ublksrv_queue *ublksrv_get_queue(const struct ublksrv_dev *dev,
		int q_id)
{
	return (const struct ublksrv_queue *)tdev_to_local(dev)->__queues[q_id];
}

/* called in ublksrv process context */
void ublksrv_apply_oom_protection()
{
	char oom_score_adj_path[64];
	pid_t pid = getpid();
	int fd;

	snprintf(oom_score_adj_path, 64, "/proc/%d/oom_score_adj", pid);

	fd = open(oom_score_adj_path, O_RDWR);
	if (fd > 0) {
		char val[32];
		int len, ret;

		len = snprintf(val, 32, "%d", -1000);
		ret = write(fd, val, len);
		if (ret != len)
			ublk_err("%s:%d write fail %d/%d\n",
					__func__, __LINE__, ret, len);
		close(fd);
	}
}

const struct ublksrv_ctrl_dev *ublksrv_get_ctrl_dev(
		const struct ublksrv_dev *dev)
{
	return tdev_to_local(dev)->ctrl_dev;
}

int ublksrv_get_pidfile_fd(const struct ublksrv_dev *dev)
{
	return tdev_to_local(dev)->pid_file_fd;
}

void *ublksrv_io_private_data(const struct ublksrv_queue *tq, int tag)
{
	struct _ublksrv_queue *q = tq_to_local(tq);

	return q->ios[tag].data.private_data;
}

unsigned int ublksrv_queue_state(const struct ublksrv_queue *q)
{
	return tq_to_local(q)->state;
}

const struct ublk_io_data *
ublksrv_queue_get_io_data(const struct ublksrv_queue *tq, int tag)
{
	struct _ublksrv_queue *q = tq_to_local(tq);

	return &q->ios[tag].data;
}

void *ublksrv_queue_get_io_buf(const struct ublksrv_queue *tq, int tag)
{
	struct _ublksrv_queue *q = tq_to_local(tq);

	if (tag < q->q_depth)
		return q->ios[tag].buf_addr;
	return NULL;
}

/*
 * The default io_uring cq depth equals the queue depth plus
 * .tgt_ring_depth, which is usually enough for typical ublk targets,
 * such as loop and qcow2, but it may not be enough for nbd with send_zc,
 * which needs an extra cqe for the buffer notification.
 *
 * So add an API to allow the target to override the default io_uring cq
 * depth.
 */
void ublksrv_dev_set_cq_depth(struct ublksrv_dev *tdev, int cq_depth)
{
	tdev_to_local(tdev)->cq_depth = cq_depth;
}

int ublksrv_dev_get_cq_depth(struct ublksrv_dev *tdev)
{
	return tdev_to_local(tdev)->cq_depth;
}
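/*
 * Hedged usage sketch: a target that expects extra completions (for
 * instance one notification cqe per request) could enlarge the cq ring
 * after ublksrv_dev_init() and before any ublksrv_queue_init() call,
 * since the value is consumed in ublksrv_calculate_depths().  The
 * example_* name and the factor of 2 are illustrative assumptions, not
 * recommendations from this library.
 */
#if 0	/* example only */
static void example_bump_cq_depth(struct ublksrv_dev *dev, int ring_depth)
{
	if (ublksrv_dev_get_cq_depth(dev) < 2 * ring_depth)
		ublksrv_dev_set_cq_depth(dev, 2 * ring_depth);
}
#endif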