// SPDX-License-Identifier: MIT or LGPL-2.1-only

#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

#include "ublksrv_priv.h"

#define aio_log ublk_log

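/*
 * The submit/complete workers below are meant to be driven from the
 * backend's own pthread context.  A rough usage sketch only; the
 * submit_one() and wait_for_backend_io() callbacks are hypothetical
 * backend code, not part of this library:
 *
 *	struct aio_list done;
 *
 *	while (!ublksrv_aio_ctx_dead(ctx)) {
 *		aio_list_init(&done);
 *		ublksrv_aio_submit_worker(ctx, submit_one, &done);
 *		wait_for_backend_io(ctx, &done);
 *		ublksrv_aio_complete_worker(ctx, &done);
 *	}
 */

/*
 * Drain the context's submission list and pass each request to @fn.
 * Requests for which @fn returns non-zero are moved to @done; a
 * negative return also records the error in ->res.  Loops until the
 * submission list is observed empty and returns the number of
 * requests handled.
 */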
int ublksrv_aio_submit_worker(struct ublksrv_aio_ctx *ctx,
		ublksrv_aio_submit_fn *fn, struct aio_list *done)
{
	struct ublksrv_aio *req = NULL;
	unsigned long long data;
	struct aio_list sl;
	int total = 0;
	bool more;

	aio_list_init(&sl);
again:
	pthread_spin_lock(&ctx->submit.lock);
	aio_list_splice(&ctx->submit.list, &sl);
	pthread_spin_unlock(&ctx->submit.lock);

	while ((req = aio_list_pop(&sl))) {
		int ret = fn(ctx, req);

		/*
		 * Submission failed, so set the result for this request;
		 * otherwise it is the user's responsibility to set the
		 * correct ->res once the request is completed.
		 */
		if (ret < 0) {
			req->res = ret;
			aio_log("ublk aio submission fail, %d\n", ret);
		}
		total += 1;
		if (ret && done)
			aio_list_add(done, req);
	}

	ublk_ignore_result(read(ctx->efd, &data, 8));

	pthread_spin_lock(&ctx->submit.lock);
	more = !aio_list_empty(&ctx->submit.list);
	pthread_spin_unlock(&ctx->submit.lock);
	if (more)
		goto again;

	return total;
}

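/* Splice @list onto @q's per-queue completion list under its lock. */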
static void move_to_queue_complete_list(struct ublksrv_aio_ctx *ctx,
		struct _ublksrv_queue *q, struct aio_list *list)
{
	struct ublksrv_aio_list *compl;

	if (aio_list_empty(list))
		return;

	compl = &ctx->complete[q->q_id];
	pthread_spin_lock(&compl->lock);
	aio_list_splice(list, &compl->list);
	pthread_spin_unlock(&compl->lock);
}

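/*
 * Move requests in @completed to the completion lists of their owning
 * queues and notify those queues via ublksrv_queue_send_event().  With
 * UBLKSRV_AIO_QUEUE_WIDE all requests are assumed to belong to the
 * queue of the first request; otherwise the list is partitioned by
 * queue id and each affected queue gets its own event.
 */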
void ublksrv_aio_complete_worker(struct ublksrv_aio_ctx *ctx,
		struct aio_list *completed)
{
	struct aio_list this, others;
	struct ublksrv_aio *req = NULL;
	struct _ublksrv_queue *this_q = NULL;

	if (aio_list_empty(completed))
		return;

	if (ctx->flags & UBLKSRV_AIO_QUEUE_WIDE) {
		const struct ublksrv_queue *tq = ublksrv_get_queue(ctx->dev,
				ublksrv_aio_qid(completed->head->id));

		this_q = tq_to_local(tq);
		move_to_queue_complete_list(ctx, this_q, completed);
		ublksrv_queue_send_event(tq);
		return;
	}

	aio_list_init(&this);
	aio_list_init(&others);

	while (!aio_list_empty(completed)) {
		const struct ublksrv_queue *tq = ublksrv_get_queue(ctx->dev,
				ublksrv_aio_qid(completed->head->id));

		this_q = tq_to_local(tq);
		while ((req = aio_list_pop(completed))) {
			const struct ublksrv_queue *q = ublksrv_get_queue(
					ctx->dev, ublksrv_aio_qid(req->id));

			if (q == local_to_tq(this_q))
				aio_list_add(&this, req);
			else
				aio_list_add(&others, req);
		}

		move_to_queue_complete_list(ctx, this_q, &this);
		ublksrv_queue_send_event(tq);
		aio_list_splice(&others, completed);
	}
}

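/*
 * Allocate and set up an aio context for @dev: one completion list per
 * hardware queue, a shared submission list and an eventfd used to wake
 * up the submission worker.  Requires UBLKSRV_F_NEED_EVENTFD in the
 * device's ublksrv_flags; returns NULL on failure.
 */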
struct ublksrv_aio_ctx *ublksrv_aio_ctx_init(const struct ublksrv_dev *dev,
		unsigned flags)
{
	unsigned nr_hw_queues = tdev_to_local(dev)->ctrl_dev->dev_info.nr_hw_queues;
	struct ublksrv_aio_ctx *ctx;
	int i;

	if (!(tdev_to_local(dev)->ctrl_dev->dev_info.ublksrv_flags & UBLKSRV_F_NEED_EVENTFD))
		return NULL;

	ctx = calloc(1, sizeof(*ctx));
	if (!ctx)
		return NULL;

	ctx->complete = malloc(nr_hw_queues * sizeof(struct ublksrv_aio_list));
	if (!ctx->complete) {
		free(ctx);
		return NULL;
	}
	for (i = 0; i < nr_hw_queues; i++)
		ublksrv_aio_init_list(&ctx->complete[i]);

	ublksrv_aio_init_list(&ctx->submit);

	ctx->flags = flags;
	ctx->dev = dev;
	ctx->dead = false;
	ctx->efd = eventfd(0, EFD_NONBLOCK);
	if (ctx->efd < 0) {
		free(ctx->complete);
		free(ctx);
		return NULL;
	}

	return ctx;
}

/* called before pthread_join() of the pthread context */
void ublksrv_aio_ctx_shutdown(struct ublksrv_aio_ctx *ctx)
{
	unsigned long long data = 1;
	int ret;

	ctx->dead = true;
	ret = write(ctx->efd, &data, 8);
	if (ret != 8)
		ublk_err("%s:%d write fail %d/%d\n",
				__func__, __LINE__, ret, 8);
}

/* called after pthread_join() of the pthread context returns */
void ublksrv_aio_ctx_deinit(struct ublksrv_aio_ctx *ctx)
{
	close(ctx->efd);
	free(ctx);
}

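/*
 * Allocate a request with @payload_size bytes of trailing payload; the
 * total size is rounded up to an 8-byte boundary.
 */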
struct ublksrv_aio *ublksrv_aio_alloc_req(struct ublksrv_aio_ctx *ctx,
		int payload_size)
{
	const int sz = (sizeof(struct ublksrv_aio) + payload_size + 7) & ~0x7;

	return (struct ublksrv_aio *)calloc(1, sz);
}

void ublksrv_aio_free_req(struct ublksrv_aio_ctx *ctx, struct ublksrv_aio *req)
{
	free(req);
}

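/*
 * Record @ctx in the queue's local ctx table so the wakeup can be
 * deferred and batched.  Returns true if @ctx is already tracked or
 * was added; false if the table is full, in which case the caller has
 * to kick the context's eventfd directly.
 */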
static bool ublksrv_aio_add_ctx_for_submit(struct _ublksrv_queue *q,
		struct ublksrv_aio_ctx *ctx)
{
	int i = 0;

	for (i = 0; i < q->nr_ctxs; i++) {
		if (q->ctxs[i] == ctx)
			return true;
	}

	if (q->nr_ctxs < UBLKSRV_NR_CTX_BATCH - 1) {
		q->ctxs[q->nr_ctxs++] = ctx;
		return true;
	}

	return false;
}

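/*
 * Queue @req on the context's submission list.  If the context can't
 * be recorded in the queue's ctx table for a deferred wakeup, kick the
 * context's eventfd right away.
 */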
void ublksrv_aio_submit_req(struct ublksrv_aio_ctx *ctx,
		const struct ublksrv_queue *tq, struct ublksrv_aio *req)
{
	struct _ublksrv_queue *q = tq_to_local(tq);
	unsigned long long data = 1;

	pthread_spin_lock(&ctx->submit.lock);
	aio_list_add(&ctx->submit.list, req);
	pthread_spin_unlock(&ctx->submit.lock);

	if (!ublksrv_aio_add_ctx_for_submit(q, ctx)) {
		int ret = write(ctx->efd, &data, 8);

		if (ret != 8)
			ublk_err("%s:%d write fail %d/%d\n",
					__func__, __LINE__, ret, 8);
	}
}

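/* Move all completed requests belonging to @q from @ctx into @al. */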
void ublksrv_aio_get_completed_reqs(struct ublksrv_aio_ctx *ctx,
		const struct ublksrv_queue *q,
		struct aio_list *al)
{
	struct ublksrv_aio_list *compl = &ctx->complete[q->q_id];

	pthread_spin_lock(&compl->lock);
	aio_list_splice(&compl->list, al);
	pthread_spin_unlock(&compl->lock);
}

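/*
 * Typically called from the target's event handling path: take over
 * this queue's completed requests, acknowledge the queue event, then
 * complete each ublk io with its ->res and free the aio request.
 */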
void ublksrv_aio_handle_event(struct ublksrv_aio_ctx *ctx,
		const struct ublksrv_queue *q)
{
	struct ublksrv_aio_list *compl = &ctx->complete[q->q_id];
	struct ublksrv_aio *req;
	struct aio_list al;

	aio_list_init(&al);
	pthread_spin_lock(&compl->lock);
	aio_list_splice(&compl->list, &al);
	ublksrv_queue_handled_event(q);
	pthread_spin_unlock(&compl->lock);

	while ((req = aio_list_pop(&al))) {
		ublksrv_complete_io(q, ublksrv_aio_tag(req->id),
				req->res);
		ublksrv_aio_free_req(ctx, req);
	}
}

int ublksrv_aio_get_efd(struct ublksrv_aio_ctx *ctx)
{
	return ctx->efd;
}

void ublksrv_aio_set_ctx_data(struct ublksrv_aio_ctx *ctx, void *data)
{
	ctx->ctx_data = data;
}

void *ublksrv_aio_get_ctx_data(struct ublksrv_aio_ctx *ctx)
{
	return ctx->ctx_data;
}

bool ublksrv_aio_ctx_dead(struct ublksrv_aio_ctx *ctx)
{
	return ctx->dead;
}

const struct ublksrv_dev *ublksrv_aio_get_dev(struct ublksrv_aio_ctx *ctx)
{
	return ctx->dev;
}