// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright (c) 2004 SuSE, Inc. All Rights Reserved.
 * Written by: Chris Mason <[email protected]>
 * Copyright (C) 2022 SUSE LLC Andrea Cervesato <[email protected]>
 */

/*\
 * [Description]
 *
 * Test creates a series of files and starts AIO operations on them.
 * AIO is done in a rotating loop: first file1.bin gets 8 requests, then
 * file2.bin, then file3.bin etc. As each file finishes writing, the test
 * switches to reads. IO buffers are aligned in case we want to do direct IO.
 */

#define _FILE_OFFSET_BITS 64

#define _GNU_SOURCE
#include "tst_test.h"

#ifdef HAVE_LIBAIO
#include <stdio.h>
#include <errno.h>
#include <assert.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/mman.h>
#include <string.h>
#include <pthread.h>
#include <libaio.h>
#include "tst_safe_pthread.h"
#include "tst_safe_sysv_ipc.h"

#define IO_FREE 0
#define IO_PENDING 1

enum {
	WRITE,
	READ,
	RWRITE,
	RREAD,
	LAST_STAGE,
};

#define USE_MALLOC 0
#define USE_SHM 1
#define USE_SHMFS 2

static char *str_num_files;
static char *str_max_io_submit;
static char *str_num_contexts;
static char *str_context_offset;
static char *str_file_size;
static char *str_rec_len;
static char *str_depth;
static char *str_io_iter;
static char *str_iterations;
static char *str_o_flag;
static char *str_stages;
static char *str_use_shm;
static char *str_num_threads;

static int num_files = 1;
static long long file_size = 1024 * 1024 * 1024;
static long stages;
static unsigned long page_size_mask;
static int o_flag;
static char *latency_stats;
static char *completion_latency_stats;
static int io_iter = 8;
static int iterations = 500;
static int max_io_submit;
static long long rec_len = 64 * 1024;
static int depth = 64;
static int num_threads = 1;
static int num_contexts = 1;
static long long context_offset = 2 * 1024 * 1024;
static char *no_fsync_stages;
static int use_shm;
static int shm_id;
static char *unaligned_buffer;
static char *aligned_buffer;
static int padded_reclen;
static char *verify;
static char *verify_buf;
static char *unlink_files;
/*
 * latencies during io_submit are measured; these are the
 * granularities for deviations
 */
#define DEVIATIONS 6
static int deviations[DEVIATIONS] = { 100, 250, 500, 1000, 5000, 10000 };

struct io_latency {
	double max;
	double min;
	double total_io;
	double total_lat;
	double deviations[DEVIATIONS];
};

/* container for a series of operations to a file */
struct io_oper {
	/* already open file descriptor, valid for whatever operation you want */
	int fd;

	/* starting byte of the operation */
	off_t start;

	/* ending byte of the operation */
	off_t end;

	/* size of the read/write buffer */
	int reclen;

	/* max number of pending requests before a wait is triggered */
	int depth;

	/* current number of pending requests */
	int num_pending;

	/* last error, zero if there were none */
	int last_err;

	/* total number of errors hit. */
	int num_err;

	/* read, write, random, etc */
	int rw;

	/* number of I/O that will get sent to aio */
	int total_ios;

	/* number of I/O we've already sent */
	int started_ios;

	/* last offset used in an io operation */
	off_t last_offset;

	/* stonewalled = 1 when we got cut off before submitting all our I/O */
	int stonewalled;

	/* list management */
	struct io_oper *next;
	struct io_oper *prev;

	struct timeval start_time;

	char *file_name;
};

/* a single io, and all the tracking needed for it */
struct io_unit {
	/* note, iocb must go first! */
	struct iocb iocb;

	/* pointer to parent io operation struct */
	struct io_oper *io_oper;

	/* aligned buffer */
	char *buf;

	/* size of the aligned buffer (record size) */
	int buf_size;

	/* state of this io unit (free, pending, done) */
	int busy;

	/* result of last operation */
	long res;

	struct io_unit *next;

	struct timeval io_start_time; /* time of io_submit */
};

struct thread_info {
	io_context_t io_ctx;
	pthread_t tid;

	/* allocated array of io_unit structs */
	struct io_unit *ios;

	/* list of io units available for io */
	struct io_unit *free_ious;

	/* number of io units in the I/O array */
	int num_global_ios;

	/* number of io units in flight */
	int num_global_pending;

	/* preallocated array of iocb pointers, only used in run_active */
	struct iocb **iocbs;

	/* preallocated array of events */
	struct io_event *events;

	/* size of the events array */
	int num_global_events;

	/* latency stats for io_submit */
	struct io_latency io_submit_latency;

	/* list of operations still in progress, and of those finished */
	struct io_oper *active_opers;
	struct io_oper *finished_opers;

	/* number of files this thread is doing io on */
	int num_files;

	/* how much io this thread did in the last stage */
	double stage_mb_trans;

	/* completion latency stats: I/O time from io_submit until io_getevents */
	struct io_latency io_completion_latency;
};

/* pthread mutexes and other globals for keeping the threads in sync */
static pthread_barrier_t worker_barrier;
static struct timeval global_stage_start_time;
static struct thread_info *global_thread_info;

/*
 * return seconds between start_tv and stop_tv in double precision
 */
static double time_since(struct timeval *start_tv, struct timeval *stop_tv)
{
	double sec, usec;
	double ret;

	sec = stop_tv->tv_sec - start_tv->tv_sec;
	usec = stop_tv->tv_usec - start_tv->tv_usec;
	if (sec > 0 && usec < 0) {
		sec--;
		usec += 1000000;
	}

	ret = sec + usec / (double)1000000;
	if (ret < 0)
		ret = 0;

	return ret;
}

/*
 * return seconds between start_tv and now in double precision
 */
static double time_since_now(struct timeval *start_tv)
{
	struct timeval stop_time;

	gettimeofday(&stop_time, NULL);

	return time_since(start_tv, &stop_time);
}

/*
 * Add latency info to latency struct
 */
static void calc_latency(struct timeval *start_tv, struct timeval *stop_tv,
			 struct io_latency *lat)
{
	double delta;
	int i;

	delta = time_since(start_tv, stop_tv);
	delta = delta * 1000;

	if (delta > lat->max)
		lat->max = delta;

	if (!lat->min || delta < lat->min)
		lat->min = delta;

	lat->total_io++;
	lat->total_lat += delta;

	for (i = 0; i < DEVIATIONS; i++) {
		if (delta < deviations[i]) {
			lat->deviations[i]++;
			break;
		}
	}
}

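/* append an operation to the tail of the circular, doubly linked list */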
static void oper_list_add(struct io_oper *oper, struct io_oper **list)
{
	if (!*list) {
		*list = oper;
		oper->prev = oper->next = oper;
		return;
	}

	oper->prev = (*list)->prev;
	oper->next = *list;

	(*list)->prev->next = oper;
	(*list)->prev = oper;
}

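/* remove an operation from the circular list, clearing the head if it empties */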
static void oper_list_del(struct io_oper *oper, struct io_oper **list)
{
	if ((*list)->next == (*list)->prev && *list == (*list)->next) {
		*list = NULL;
		return;
	}

	oper->prev->next = oper->next;
	oper->next->prev = oper->prev;

	if (*list == oper)
		*list = oper->next;
}

/* worker func to check error fields in the io unit */
static int check_finished_io(struct io_unit *io)
{
	int i;

	if (io->res != io->buf_size) {
		struct stat s;

		SAFE_FSTAT(io->io_oper->fd, &s);

		/*
		 * If file size is large enough for the read, then this short
		 * read is an error.
		 */
		if ((io->io_oper->rw == READ || io->io_oper->rw == RREAD) &&
		    s.st_size > (io->iocb.u.c.offset + io->res)) {

			tst_res(TINFO, "io err %lu (%s) op %d, off %llu size %d",
				io->res, tst_strerrno(-io->res), io->iocb.aio_lio_opcode,
				io->iocb.u.c.offset, io->buf_size);
			io->io_oper->last_err = io->res;
			io->io_oper->num_err++;
			return -1;
		}
	}

	if (verify && io->io_oper->rw == READ) {
		if (memcmp(io->buf, verify_buf, io->io_oper->reclen)) {
			tst_res(TINFO, "verify error, file %s offset %llu contents (offset:bad:good):",
				io->io_oper->file_name, io->iocb.u.c.offset);

			for (i = 0; i < io->io_oper->reclen; i++) {
				if (io->buf[i] != verify_buf[i]) {
					tst_res(TINFO, "%d:%c:%c ", i,
						io->buf[i], verify_buf[i]);
				}
			}
		}
	}

	return 0;
}

/* worker func to check the busy bits and get an io unit ready for use */
static int grab_iou(struct io_unit *io, struct io_oper *oper)
{
	if (io->busy == IO_PENDING)
		return -1;

	io->busy = IO_PENDING;
	io->res = 0;
	io->io_oper = oper;

	return 0;
}

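/* map a stage number to a human readable name */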
static char *stage_name(int rw)
{
	switch (rw) {
	case WRITE:
		return "write";
	case READ:
		return "read";
	case RWRITE:
		return "random write";
	case RREAD:
		return "random read";
	}

	return "unknown";
}

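/* megabytes transferred so far by an operation */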
static inline double oper_mb_trans(struct io_oper *oper)
{
	return ((double)oper->started_ios * (double)oper->reclen) / (double)(1024 * 1024);
}

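/* print runtime and throughput stats for a single operation */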
static void print_time(struct io_oper *oper)
{
	double runtime;
	double tput;
	double mb;

	runtime = time_since_now(&oper->start_time);
	mb = oper_mb_trans(oper);
	tput = mb / runtime;

	tst_res(TINFO, "%s on %s (%.2f MB/s) %.2f MB in %.2fs",
		stage_name(oper->rw), oper->file_name, tput, mb, runtime);
}

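/* print min/avg/max latencies and the deviation histogram, then reset the struct */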
static void print_lat(char *str, struct io_latency *lat)
{
	char out[4 * 1024];
	char *ptr = out;
	double avg = lat->total_lat / lat->total_io;
	int i;
	double total_counted = 0;

	tst_res(TINFO, "%s min %.2f avg %.2f max %.2f", str, lat->min, avg, lat->max);

	for (i = 0; i < DEVIATIONS; i++) {
		ptr += sprintf(ptr, "%.0f < %d", lat->deviations[i], deviations[i]);
		total_counted += lat->deviations[i];
	}

	if (total_counted && lat->total_io - total_counted)
		ptr += sprintf(ptr, " < %.0f", lat->total_io - total_counted);

	tst_res(TINFO, "%s", out);

	memset(lat, 0, sizeof(*lat));
}

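/* print this thread's io_submit latency stats */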
static void print_latency(struct thread_info *t)
{
	struct io_latency *lat = &t->io_submit_latency;

	print_lat("latency", lat);
}

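/* print this thread's io completion latency stats */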
static void print_completion_latency(struct thread_info *t)
{
	struct io_latency *lat = &t->io_completion_latency;

	print_lat("completion latency", lat);
}

/*
 * updates the fields in the io operation struct that belongs to this
 * io unit, and makes the io unit reusable again
 */
static void finish_io(struct thread_info *t, struct io_unit *io, long result,
		      struct timeval *tv_now)
{
	struct io_oper *oper = io->io_oper;

	calc_latency(&io->io_start_time, tv_now, &t->io_completion_latency);
	io->res = result;
	io->busy = IO_FREE;
	io->next = t->free_ious;
	t->free_ious = io;
	oper->num_pending--;
	t->num_global_pending--;
	check_finished_io(io);

	if (oper->num_pending == 0 &&
	    (oper->started_ios == oper->total_ios || oper->stonewalled)) {
		print_time(oper);
	}
}

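/* reap completed events, recycling their io units onto the free list */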
static int read_some_events(struct thread_info *t)
{
	struct io_unit *event_io;
	struct io_event *event;
	int nr;
	int i;
	int min_nr = io_iter;
	struct timeval stop_time;

	if (t->num_global_pending < io_iter)
		min_nr = t->num_global_pending;

	nr = io_getevents(t->io_ctx, min_nr, t->num_global_events, t->events, NULL);
	if (nr <= 0)
		return nr;

	gettimeofday(&stop_time, NULL);

	for (i = 0; i < nr; i++) {
		event = t->events + i;
		event_io = (struct io_unit *)((unsigned long)event->obj);
		finish_io(t, event_io, event->res, &stop_time);
	}

	return nr;
}

/*
 * finds a free io unit, waiting for pending requests if required. returns
 * null if none could be found
 */
static struct io_unit *find_iou(struct thread_info *t, struct io_oper *oper)
{
	struct io_unit *event_io;
	int nr;

retry:
	if (t->free_ious) {
		event_io = t->free_ious;
		t->free_ious = t->free_ious->next;

		if (grab_iou(event_io, oper))
			tst_brk(TBROK, "io unit on free list but not free");

		return event_io;
	}

	nr = read_some_events(t);
	if (nr > 0)
		goto retry;
	else
		tst_res(TINFO, "no free ious after read_some_events");

	return NULL;
}

/*
 * wait for all pending requests for this io operation to finish
 */
static int io_oper_wait(struct thread_info *t, struct io_oper *oper)
{
	struct io_event event;
	struct io_unit *event_io;

	if (!oper)
		return 0;

	if (oper->num_pending == 0)
		goto done;

	/* this func is not speed sensitive, no need to go wild reading
	 * more than one event at a time
	 */
	while (io_getevents(t->io_ctx, 1, 1, &event, NULL) > 0) {
		struct timeval tv_now;

		event_io = (struct io_unit *)((unsigned long)event.obj);

		gettimeofday(&tv_now, NULL);
		finish_io(t, event_io, event.res, &tv_now);

		if (oper->num_pending == 0)
			break;
	}
done:
	if (oper->num_err)
		tst_res(TINFO, "%u errors on oper, last %u", oper->num_err, oper->last_err);

	return 0;
}

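/* pick a page aligned random offset inside the operation's byte range */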
static off_t random_byte_offset(struct io_oper *oper)
{
	off_t num;
	off_t rand_byte = oper->start;
	off_t range;
	off_t offset = 1;

	range = (oper->end - oper->start) / (1024 * 1024);

	if ((page_size_mask + 1) > (1024 * 1024))
		offset = (page_size_mask + 1) / (1024 * 1024);

	if (range < offset)
		range = 0;
	else
		range -= offset;

	/* find a random mb offset */
	num = 1 + (int)((double)range * rand() / (RAND_MAX + 1.0));
	rand_byte += num * 1024 * 1024;

	/* find a random byte offset */
	num = 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX + 1.0));

	/* page align */
	num = (num + page_size_mask) & ~page_size_mask;
	rand_byte += num;

	if (rand_byte + oper->reclen > oper->end)
		rand_byte -= oper->reclen;

	return rand_byte;
}

/*
 * build an aio iocb for an operation, based on oper->rw and the
 * last offset used. This finds the struct io_unit that will be attached
 * to the iocb, and things are ready for submission to aio after this
 * is called.
 *
 * returns null on error
 */
static struct io_unit *build_iocb(struct thread_info *t, struct io_oper *oper)
{
	struct io_unit *io;
	off_t rand_byte;

	io = find_iou(t, oper);
	if (!io)
		tst_brk(TBROK, "unable to find io unit");

	switch (oper->rw) {
	case WRITE:
		io_prep_pwrite(&io->iocb, oper->fd, io->buf, oper->reclen, oper->last_offset);
		oper->last_offset += oper->reclen;
		break;
	case READ:
		io_prep_pread(&io->iocb, oper->fd, io->buf, oper->reclen, oper->last_offset);
		oper->last_offset += oper->reclen;
		break;
	case RREAD:
		rand_byte = random_byte_offset(oper);
		oper->last_offset = rand_byte;
		io_prep_pread(&io->iocb, oper->fd, io->buf, oper->reclen, rand_byte);
		break;
	case RWRITE:
		rand_byte = random_byte_offset(oper);
		oper->last_offset = rand_byte;
		io_prep_pwrite(&io->iocb, oper->fd, io->buf, oper->reclen, rand_byte);
		break;
	}

	return io;
}

/*
 * wait for any pending requests, and then free all ram associated with
 * an operation. returns the last error the operation hit (zero means none)
 */
static int finish_oper(struct thread_info *t, struct io_oper *oper)
{
	unsigned long last_err;

	io_oper_wait(t, oper);

	last_err = oper->last_err;

	if (oper->num_pending > 0)
		tst_res(TINFO, "oper num_pending is %d", oper->num_pending);

	SAFE_CLOSE(oper->fd);
	free(oper);

	return last_err;
}

/*
 * allocates an io operation and fills in all the fields. returns
 * null on error
 */
static struct io_oper *create_oper(int fd, int rw, off_t start, off_t end,
				   int reclen, int depth, char *file_name)
{
	struct io_oper *oper;

	oper = SAFE_MALLOC(sizeof(*oper));
	memset(oper, 0, sizeof(*oper));

	oper->depth = depth;
	oper->start = start;
	oper->end = end;
	oper->last_offset = oper->start;
	oper->fd = fd;
	oper->reclen = reclen;
	oper->rw = rw;
	oper->total_ios = (oper->end - oper->start) / oper->reclen;
	oper->file_name = file_name;

	return oper;
}

/*
 * does setup on num_ios worth of iocbs, but does not actually
 * start any io
 */
static int build_oper(struct thread_info *t, struct io_oper *oper, int num_ios,
		      struct iocb **my_iocbs)
{
	int i;
	struct io_unit *io;

	if (oper->started_ios == 0)
		gettimeofday(&oper->start_time, NULL);

	if (num_ios == 0)
		num_ios = oper->total_ios;

	if ((oper->started_ios + num_ios) > oper->total_ios)
		num_ios = oper->total_ios - oper->started_ios;

	for (i = 0; i < num_ios; i++) {
		io = build_iocb(t, oper);
		if (!io)
			return -1;

		my_iocbs[i] = &io->iocb;
	}

	return num_ios;
}

/*
 * runs through the iocbs in the array provided and updates
 * counters in the associated oper struct
 */
static void update_iou_counters(struct iocb **my_iocbs, int nr, struct timeval *tv_now)
{
	struct io_unit *io;
	int i;

	for (i = 0; i < nr; i++) {
		io = (struct io_unit *)(my_iocbs[i]);
		io->io_oper->num_pending++;
		io->io_oper->started_ios++;
		io->io_start_time = *tv_now; /* set time of io_submit */
	}
}

/* starts some io for a given file, returns zero if all went well */
static int run_built(struct thread_info *t, int num_ios, struct iocb **my_iocbs)
{
	int ret;
	struct timeval start_time;
	struct timeval stop_time;

resubmit:
	gettimeofday(&start_time, NULL);
	ret = io_submit(t->io_ctx, num_ios, my_iocbs);

	gettimeofday(&stop_time, NULL);
	calc_latency(&start_time, &stop_time, &t->io_submit_latency);

	if (ret != num_ios) {
		/* some I/O got through */
		if (ret > 0) {
			update_iou_counters(my_iocbs, ret, &stop_time);
			my_iocbs += ret;
			t->num_global_pending += ret;
			num_ios -= ret;
		}
		/*
		 * we've used all the requests allocated in aio_init, wait and
		 * retry
		 */
		if (ret > 0 || ret == -EAGAIN) {
			int old_ret = ret;

			ret = read_some_events(t);
			if (ret <= 0)
				tst_brk(TBROK, "ret was %d and now is %d", ret, old_ret);

			goto resubmit;
		}

		tst_res(TINFO, "ret %d (%s) on io_submit", ret, tst_strerrno(-ret));
		return -1;
	}

	update_iou_counters(my_iocbs, ret, &stop_time);
	t->num_global_pending += ret;

	return 0;
}

/*
 * changes oper->rw to the next in a command sequence, or returns zero
 * to say this operation is really, completely done for
 */
static int restart_oper(struct io_oper *oper)
{
	int new_rw = 0;

	if (oper->last_err)
		return 0;

	if (oper->rw == WRITE && (stages & (1 << READ)))
		new_rw = READ;

	if (oper->rw == READ && (!new_rw && stages & (1 << RWRITE)))
		new_rw = RWRITE;

	if (oper->rw == RWRITE && (!new_rw && stages & (1 << RREAD)))
		new_rw = RREAD;

	if (new_rw) {
		oper->started_ios = 0;
		oper->last_offset = oper->start;
		oper->stonewalled = 0;

		/*
		 * we're restarting an operation with pending requests, so the
		 * timing info won't be printed by finish_io. Print it here
		 * instead.
		 */
		if (oper->num_pending)
			print_time(oper);

		oper->rw = new_rw;
		return 1;
	}

	return 0;
}

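/* check whether an operation may start io yet: sequential contexts past
 * offset zero wait until the file has grown to their starting offset
 */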
static int oper_runnable(struct io_oper *oper)
{
	struct stat buf;

	/* the first context is always runnable; if started_ios > 0 there is
	 * no need to redo the calculations
	 */
	if (oper->started_ios || oper->start == 0)
		return 1;

	/* only the sequential phases force delays in starting */
	if (oper->rw >= RWRITE)
		return 1;

	SAFE_FSTAT(oper->fd, &buf);
	if (S_ISREG(buf.st_mode) && buf.st_size < oper->start)
		return 0;

	return 1;
}

/*
 * runs through all the io operations on the active list, and starts
 * a chunk of io on each. If any io operations are completely finished,
 * it either switches them to the next stage or puts them on the
 * finished list.
 *
 * this function stops after max_io_submit iocbs are sent down the
 * pipe, even if it has not yet touched all the operations on the
 * active list. Any operations that have finished are moved onto
 * the finished_opers list.
 */
static int run_active_list(struct thread_info *t, int io_iter, int max_io_submit)
{
	struct io_oper *oper;
	struct io_oper *built_opers = NULL;
	struct iocb **my_iocbs = t->iocbs;
	int ret = 0;
	int num_built = 0;

	oper = t->active_opers;

	while (oper) {
		if (!oper_runnable(oper)) {
			oper = oper->next;
			if (oper == t->active_opers)
				break;
			continue;
		}

		ret = build_oper(t, oper, io_iter, my_iocbs);
		if (ret >= 0) {
			my_iocbs += ret;
			num_built += ret;
			oper_list_del(oper, &t->active_opers);
			oper_list_add(oper, &built_opers);
			oper = t->active_opers;
			if (num_built + io_iter > max_io_submit)
				break;
		} else
			break;
	}

	if (num_built) {
		ret = run_built(t, num_built, t->iocbs);
		if (ret < 0)
			tst_brk(TBROK, "error %d on run_built", ret);

		while (built_opers) {
			oper = built_opers;
			oper_list_del(oper, &built_opers);
			oper_list_add(oper, &t->active_opers);
			if (oper->started_ios == oper->total_ios) {
				oper_list_del(oper, &t->active_opers);
				oper_list_add(oper, &t->finished_opers);
			}
		}
	}

	return 0;
}

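/* initialize an aio context able to hold up to n requests */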
static void aio_setup(io_context_t *io_ctx, int n)
{
	int res = io_queue_init(n, io_ctx);

	if (res != 0)
		tst_brk(TBROK, "io_queue_init(%d) returned %d (%s)", n, res, tst_strerrno(-res));
}

/*
 * allocate io operation and event arrays for a given thread
 */
static void setup_ious(struct thread_info *t, int num_files, int depth, int reclen, int max_io_submit)
{
	int i;
	size_t bytes = num_files * depth * sizeof(*t->ios);

	t->ios = SAFE_MALLOC(bytes);

	memset(t->ios, 0, bytes);

	for (i = 0; i < depth * num_files; i++) {
		t->ios[i].buf = aligned_buffer;
		aligned_buffer += padded_reclen;
		t->ios[i].buf_size = reclen;
		if (verify)
			memset(t->ios[i].buf, 'b', reclen);
		else
			memset(t->ios[i].buf, 0, reclen);
		t->ios[i].next = t->free_ious;
		t->free_ious = t->ios + i;
	}

	if (verify) {
		verify_buf = aligned_buffer;
		memset(verify_buf, 'b', reclen);
	}

	t->iocbs = SAFE_MALLOC(sizeof(struct iocb *) * max_io_submit);
	memset(t->iocbs, 0, max_io_submit * sizeof(struct iocb *));

	t->events = SAFE_MALLOC(sizeof(struct io_event) * depth * num_files);
	memset(t->events, 0, num_files * sizeof(struct io_event) * depth);

	t->num_global_ios = num_files * depth;
	t->num_global_events = t->num_global_ios;
}

/*
 * The buffers used for file data are allocated as a single big
 * malloc, and then each thread and operation takes a piece and uses
 * that for file data. This lets us do a large shm or bigpages alloc
 * without trying to find a special place in each thread to map the
 * buffers to
 */
static int setup_shared_mem(int num_threads, int num_files, int depth, int reclen)
{
	char *p = NULL;
	size_t total_ram;

	padded_reclen = (reclen + page_size_mask) / (page_size_mask + 1);
	padded_reclen = padded_reclen * (page_size_mask + 1);
	total_ram = num_files * depth * padded_reclen + num_threads;

	if (verify)
		total_ram += padded_reclen;

	/* for aligning buffer after the allocation */
	total_ram += page_size_mask;

	if (use_shm == USE_MALLOC) {
		p = SAFE_MALLOC(total_ram);
	} else if (use_shm == USE_SHM) {
		shm_id = SAFE_SHMGET(IPC_PRIVATE, total_ram, IPC_CREAT | 0700);
		p = SAFE_SHMAT(shm_id, (char *)0x50000000, 0);
	} else if (use_shm == USE_SHMFS) {
		char mmap_name[16]; /* /dev/shm/ + null + XXXXXX */
		int fd;

		strcpy(mmap_name, "/dev/shm/XXXXXX");
		fd = mkstemp(mmap_name);
		if (fd < 0)
			tst_brk(TBROK, "mkstemp error");

		SAFE_UNLINK(mmap_name);
		SAFE_FTRUNCATE(fd, total_ram);

		shm_id = fd;

		p = SAFE_MMAP((char *)0x50000000, total_ram,
			      PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
	}

	unaligned_buffer = p;
	p = (char *)((intptr_t)(p + page_size_mask) & ~page_size_mask);
	aligned_buffer = p;

	return 0;
}

/*
 * runs through all the thread_info structs and calculates a combined
 * throughput
 */
static void global_thread_throughput(struct thread_info *t, char *this_stage)
{
	int i;
	double runtime = time_since_now(&global_stage_start_time);
	double total_mb = 0;
	double min_trans = 0;

	for (i = 0; i < num_threads; i++) {
		total_mb += global_thread_info[i].stage_mb_trans;

		if (!min_trans || t->stage_mb_trans < min_trans)
			min_trans = t->stage_mb_trans;
	}

	if (total_mb) {
		tst_res(TINFO, "%s throughput (%.2f MB/s)", this_stage, total_mb / runtime);
		tst_res(TINFO, "%.2f MB in %.2fs", total_mb, runtime);
	}
}

/* this is the meat of the state machine. There is a list of
 * active operations structs, and as each one finishes the required
 * io it is moved to a list of finished operations. Once they have
 * all finished whatever stage they were in, they are given the chance
 * to restart and pick a different stage (read/write/random read etc)
 *
 * various timings are printed in between the stages, along with
 * thread synchronization if there is more than one thread.
 */
static void *worker(struct thread_info *t)
{
	struct io_oper *oper;
	char *this_stage = NULL;
	struct timeval stage_time;
	int status = 0;
	int cnt;

	aio_setup(&t->io_ctx, 512);

restart:
	if (num_threads > 1) {
		if (pthread_barrier_wait(&worker_barrier))
			gettimeofday(&global_stage_start_time, NULL);
	}

	if (t->active_opers) {
		this_stage = stage_name(t->active_opers->rw);
		gettimeofday(&stage_time, NULL);
		t->stage_mb_trans = 0;
	}

	cnt = 0;

	/* first we send everything through aio */
	while (t->active_opers && cnt < iterations) {
		run_active_list(t, io_iter, max_io_submit);
		cnt++;
	}

	if (latency_stats)
		print_latency(t);

	if (completion_latency_stats)
		print_completion_latency(t);

	/* then we wait for all the operations to finish */
	oper = t->finished_opers;
	do {
		if (!oper)
			break;
		io_oper_wait(t, oper);
		oper = oper->next;
	} while (oper != t->finished_opers);

	/* then we do an fsync to get the timing for any future operations
	 * right, and check to see if any of these need to get restarted
	 */
	oper = t->finished_opers;
	while (oper) {
		if (!no_fsync_stages)
			SAFE_FSYNC(oper->fd);

		t->stage_mb_trans += oper_mb_trans(oper);

		if (restart_oper(oper)) {
			oper_list_del(oper, &t->finished_opers);
			oper_list_add(oper, &t->active_opers);
			oper = t->finished_opers;
			continue;
		}

		oper = oper->next;

		if (oper == t->finished_opers)
			break;
	}

	if (t->stage_mb_trans && t->num_files > 0) {
		double seconds = time_since_now(&stage_time);

		tst_res(TINFO, "thread %td %s totals (%.2f MB/s) %.2f MB in %.2fs",
			t - global_thread_info, this_stage,
			t->stage_mb_trans / seconds, t->stage_mb_trans, seconds);
	}

	if (num_threads > 1) {
		if (pthread_barrier_wait(&worker_barrier))
			global_thread_throughput(t, this_stage);
	}

	/* someone got restarted, go back to the beginning */
	if (t->active_opers && cnt < iterations)
		goto restart;

	/* finally, free all the ram */
	while (t->finished_opers) {
		oper = t->finished_opers;
		oper_list_del(oper, &t->finished_opers);
		status = finish_oper(t, oper);
	}

	if (t->num_global_pending)
		tst_res(TINFO, "global num pending is %d", t->num_global_pending);

	io_queue_release(t->io_ctx);

	return (void *)(intptr_t)status;
}

typedef void *(*start_routine)(void *);
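/* start one worker thread per thread_info struct and OR together their exit statuses */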
static int run_workers(struct thread_info *t, int num_threads)
{
	void *retval;
	int ret = 0;
	int i;

	pthread_barrier_init(&worker_barrier, NULL, num_threads);

	for (i = 0; i < num_threads; i++)
		SAFE_PTHREAD_CREATE(&t[i].tid, NULL, (start_routine)worker, t + i);

	for (i = 0; i < num_threads; i++) {
		SAFE_PTHREAD_JOIN(t[i].tid, &retval);
		ret |= (intptr_t)retval;
	}

	pthread_barrier_destroy(&worker_barrier);

	return ret;
}

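/* parse and validate the command line options */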
static void setup(void)
{
	int maxaio;
	int stages_i;

	page_size_mask = getpagesize() - 1;

	SAFE_FILE_SCANF("/proc/sys/fs/aio-max-nr", "%d", &maxaio);
	tst_res(TINFO, "Maximum AIO blocks: %d", maxaio);

	if (tst_parse_int(str_num_files, &num_files, 1, INT_MAX))
		tst_brk(TBROK, "Invalid number of files to generate '%s'", str_num_files);

	if (tst_parse_int(str_max_io_submit, &max_io_submit, 0, INT_MAX))
		tst_brk(TBROK, "Invalid number of iocbs '%s'", str_max_io_submit);

	if (max_io_submit > maxaio)
		tst_res(TCONF, "Number of async IO blocks exceeds the maximum (%d)", maxaio);

	if (tst_parse_int(str_num_contexts, &num_contexts, 1, INT_MAX))
		tst_brk(TBROK, "Invalid number of contexts per file '%s'", str_num_contexts);

	if (tst_parse_filesize(str_context_offset, &context_offset, 1, LLONG_MAX))
		tst_brk(TBROK, "Invalid offset between contexts '%s'", str_context_offset);

	if (tst_parse_filesize(str_file_size, &file_size, 1, LLONG_MAX))
		tst_brk(TBROK, "Invalid file size '%s'", str_file_size);

	if (tst_parse_filesize(str_rec_len, &rec_len, 1, LONG_MAX))
		tst_brk(TBROK, "Invalid record size '%s'", str_rec_len);

	if (tst_parse_int(str_depth, &depth, 1, INT_MAX))
		tst_brk(TBROK, "Invalid number of pending aio requests '%s'", str_depth);

	if (tst_parse_int(str_io_iter, &io_iter, 1, INT_MAX))
		tst_brk(TBROK, "Invalid number of I/O per file '%s'", str_io_iter);

	if (tst_parse_int(str_iterations, &iterations, 1, INT_MAX))
		tst_brk(TBROK, "Invalid number of total async I/O '%s'", str_iterations);

	if (tst_parse_int(str_stages, &stages_i, 0, INT_MAX))
		tst_brk(TBROK, "Invalid stage number '%s'", str_stages);

	if (stages_i) {
		stages |= 1 << stages_i;
		tst_res(TINFO, "Adding stage %s", stage_name(stages_i));
	}

	if (tst_parse_int(str_num_threads, &num_threads, 1, INT_MAX))
		tst_brk(TBROK, "Invalid number of threads '%s'", str_num_threads);

	if (str_o_flag) {
		if (tst_fs_type(".") == TST_TMPFS_MAGIC)
			tst_brk(TCONF, "O_DIRECT not supported on tmpfs");
		o_flag = O_DIRECT;
	} else {
		o_flag = O_SYNC;
	}

	if (str_use_shm) {
		if (!strcmp(str_use_shm, "shm")) {
			tst_res(TINFO, "using ipc shm");
			use_shm = USE_SHM;
		} else if (!strcmp(str_use_shm, "shmfs")) {
			tst_res(TINFO, "using /dev/shm for buffers");
			use_shm = USE_SHMFS;
		} else {
			tst_brk(TBROK, "Invalid shm option '%s'", str_use_shm);
		}
	}
}

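/* create the files, spread operations over the threads and run the io stages */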
static void run(void)
{
	char files[num_files][265];
	int first_stage = WRITE;
	struct io_oper *oper;
	int status = 0;
	int open_fds = 0;
	struct thread_info *t;
	int rwfd;
	int i;
	int j;

	/*
	 * make sure we don't try to submit more I/O than we have allocated
	 * memory for
	 */
	if (depth < io_iter) {
		io_iter = depth;
		tst_res(TINFO, "dropping io_iter to %d", io_iter);
	}

	if (num_threads > (num_files * num_contexts)) {
		num_threads = num_files * num_contexts;
		tst_res(TINFO, "Dropping thread count to the number of contexts %d", num_threads);
	}

	t = SAFE_MALLOC(num_threads * sizeof(*t));
	memset(t, 0, num_threads * sizeof(*t));
	global_thread_info = t;

	/* by default, allow a huge number of iocbs to be sent towards
	 * io_submit
	 */
	if (!max_io_submit)
		max_io_submit = num_files * io_iter * num_contexts;

	/*
	 * make sure we don't try to submit more I/O than max_io_submit allows
	 */
	if (max_io_submit < io_iter) {
		io_iter = max_io_submit;
		tst_res(TINFO, "dropping io_iter to %d", io_iter);
	}

	if (!stages) {
		stages = (1 << WRITE) | (1 << READ) | (1 << RREAD) | (1 << RWRITE);
	} else {
		for (i = 0; i < LAST_STAGE; i++) {
			if (stages & (1 << i)) {
				first_stage = i;
				tst_res(TINFO, "starting with %s", stage_name(i));
				break;
			}
		}
	}

	if (file_size < num_contexts * context_offset) {
		tst_brk(TBROK, "file size %ld too small for %d contexts",
			(long)file_size, num_contexts);
	}

	tst_res(TINFO, "file size %ldMB, record size %lldKB, depth %d, I/O per iteration %d",
		(long)(file_size / (1024 * 1024)), rec_len / 1024, depth, io_iter);
	tst_res(TINFO, "max io_submit %d, buffer alignment set to %luKB",
		max_io_submit, (page_size_mask + 1) / 1024);
	tst_res(TINFO, "threads %d files %d contexts %d context offset %ldMB verification %s",
		num_threads, num_files, num_contexts,
		(long)(context_offset / (1024 * 1024)), verify ? "on" : "off");

	/* open all the files and do any required setup for them */
	for (i = 0; i < num_files; i++) {
		int thread_index;

		snprintf(files[i], sizeof(files[i]), "file%d.bin", i);

		for (j = 0; j < num_contexts; j++) {
			thread_index = open_fds % num_threads;
			open_fds++;

			rwfd = SAFE_OPEN(files[i], O_CREAT | O_RDWR | o_flag, 0600);

			oper = create_oper(rwfd, first_stage, j * context_offset,
					   file_size - j * context_offset,
					   rec_len, depth, files[i]);
			if (!oper)
				tst_brk(TBROK, "error in create_oper");

			oper_list_add(oper, &t[thread_index].active_opers);
			t[thread_index].num_files++;
		}
	}

	if (setup_shared_mem(num_threads, num_files * num_contexts, depth, rec_len))
		tst_brk(TBROK, "error in setup_shared_mem");

	for (i = 0; i < num_threads; i++)
		setup_ious(&t[i], t[i].num_files, depth, rec_len, max_io_submit);

	if (num_threads > 1) {
		tst_res(TINFO, "Running multi thread version num_threads: %d", num_threads);
		status = run_workers(t, num_threads);
	} else {
		tst_res(TINFO, "Running single thread version");
		status = (intptr_t)worker(t);
	}

	for (i = 0; i < num_files; i++)
		SAFE_UNLINK(files[i]);

	if (status)
		tst_res(TFAIL, "Test did not pass");
	else
		tst_res(TPASS, "Test passed");
}

static struct tst_test test = {
	.test_all = run,
	.setup = setup,
	.needs_tmpdir = 1,
	.needs_root = 1,
	.max_runtime = 1800,
	.options = (struct tst_option[]){
		{ "a:", &str_iterations, "Total number of async I/O iterations the program will run (default 500)" },
		{ "b:", &str_max_io_submit, "Max number of iocbs to give io_submit at once" },
		{ "c:", &str_num_contexts, "Number of io contexts per file" },
		{ "d:", &str_depth, "Number of pending aio requests for each file (default 64)" },
		{ "e:", &str_io_iter, "Number of I/O per file sent before switching to the next file (default 8)" },
		{ "f:", &str_num_files, "Number of files to generate" },
		{ "g:", &str_context_offset, "Offset between contexts (default 2M)" },
		{ "l", &latency_stats, "Print io_submit latencies after each stage" },
		{ "L", &completion_latency_stats, "Print io completion latencies after each stage" },
		{ "m:", &str_use_shm, "Use ipc shared memory ('shm') or /dev/shm ('shmfs') for io buffers instead of malloc" },
		{ "n", &no_fsync_stages, "No fsyncs between write stage and read stage" },
		{ "o:", &str_stages, "Add an operation to the list: write=0, read=1, random write=2, random read=3" },
		{ "O", &str_o_flag, "Use O_DIRECT" },
		{ "r:", &str_rec_len, "Record size in KB used for each io (default 64K)" },
		{ "s:", &str_file_size, "Size in MB of the test file(s) (default 1024M)" },
		{ "t:", &str_num_threads, "Number of threads to run" },
		{ "u", &unlink_files, "Unlink files after completion" },
		{ "v", &verify, "Verification of bytes written" },
		{},
	},
};
#else
TST_TEST_TCONF("test requires libaio and its development packages");
#endif