xref: /aosp_15_r20/external/ltp/testcases/kernel/io/ltp-aiodio/aio-stress.c (revision 49cdfc7efb34551c7342be41a7384b9c40d7cab7)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright (c) 2004 SuSE, Inc.  All Rights Reserved.
4  *               Written by: Chris Mason <[email protected]>
5  * Copyright (C) 2022 SUSE LLC Andrea Cervesato <[email protected]>
6  */
7 
8 /*\
9  * [Description]
10  *
 * Test creates a series of files and starts AIO operations on them.
12  * AIO is done in a rotating loop: first file1.bin gets 8 requests, then
13  * file2.bin, then file3.bin etc. As each file finishes writing, test switches
14  * to reads. IO buffers are aligned in case we want to do direct IO.
15  */
16 
17 #define _FILE_OFFSET_BITS 64
18 
19 #define _GNU_SOURCE
20 #include "tst_test.h"
21 
22 #ifdef HAVE_LIBAIO
23 #include <stdio.h>
24 #include <errno.h>
25 #include <assert.h>
26 #include <stdlib.h>
27 #include <sys/types.h>
28 #include <sys/stat.h>
29 #include <fcntl.h>
30 #include <unistd.h>
31 #include <sys/time.h>
32 #include <sys/ipc.h>
33 #include <sys/shm.h>
34 #include <sys/mman.h>
35 #include <string.h>
36 #include <pthread.h>
37 #include <libaio.h>
38 #include "tst_safe_pthread.h"
39 #include "tst_safe_sysv_ipc.h"
40 
/*
 * Values of struct io_unit.busy: a unit is IO_PENDING from the moment
 * grab_iou() hands it out until finish_io() marks it IO_FREE again.
 */
#define IO_FREE 0
#define IO_PENDING 1

/*
 * I/O stages, stored in struct io_oper.rw. The values are also used as bit
 * positions in the 'stages' mask, see restart_oper().
 */
enum {
	WRITE,
	READ,
	RWRITE,
	RREAD,
	LAST_STAGE,
};

/* Values of use_shm: how setup_shared_mem() obtains the I/O buffer. */
#define USE_MALLOC 0
#define USE_SHM 1
#define USE_SHMFS 2
55 
/*
 * Raw option strings. setup() converts str_num_files, str_max_io_submit and
 * str_num_contexts with tst_parse_int(); presumably the remaining ones are
 * parsed the same way further down in setup() - TODO confirm.
 */
static char *str_num_files;
static char *str_max_io_submit;
static char *str_num_contexts;
static char *str_context_offset;
static char *str_file_size;
static char *str_rec_len;
static char *str_depth;
static char *str_io_iter;
static char *str_iterations;
static char *str_o_flag;
static char *str_stages;
static char *str_use_shm;
static char *str_num_threads;

static int num_files = 1;
static long long file_size = 1024 * 1024 * 1024;
/* bitmask of enabled stages, tested as (1 << READ) etc. in restart_oper() */
static long stages;
/* getpagesize() - 1, set in setup() */
static unsigned long page_size_mask;
static int o_flag;
/* when non-NULL, worker() prints io_submit / completion latency stats */
static char *latency_stats;
static char *completion_latency_stats;
/* number of iocbs built per operation on each pass of run_active_list() */
static int io_iter = 8;
/* upper bound on run_active_list() passes per stage in worker() */
static int iterations = 500;
/* size of the per-thread iocb pointer array, see setup_ious() */
static int max_io_submit;
static long long rec_len = 64 * 1024;
static int depth = 64;
static int num_threads = 1;
static int num_contexts = 1;
static long long context_offset = 2 * 1024 * 1024;
/* when NULL, worker() fsyncs every finished operation between stages */
static char *no_fsync_stages;
/* one of USE_MALLOC, USE_SHM, USE_SHMFS */
static int use_shm;
/* set by setup_shared_mem(); holds the mkstemp() fd in the USE_SHMFS case */
static int shm_id;
/* start of the buffer as allocated, and the page aligned cursor into it */
static char *unaligned_buffer;
static char *aligned_buffer;
/* record length rounded up to a multiple of the page size */
static int padded_reclen;
/* when non-NULL, sequential reads are compared against verify_buf */
static char *verify;
static char *verify_buf;
static char *unlink_files;
94 
/*
 * latencies during io_submit are measured, these are the
 * granularities for deviations: calc_latency() compares a latency in
 * milliseconds against each bound in turn and counts it in the first
 * bucket whose bound is larger.
 */
#define DEVIATIONS 6
static int deviations[DEVIATIONS] = { 100, 250, 500, 1000, 5000, 10000 };
101 
/*
 * Accumulated latency statistics, filled in by calc_latency() and reset by
 * print_lat(). All latencies are in milliseconds.
 */
struct io_latency {
	/* largest and smallest latency seen; min == 0 is treated as "not set" */
	double max;
	double min;
	/* number of samples and the sum of their latencies */
	double total_io;
	double total_lat;
	/* per-bucket sample counts, bucket i is bounded by deviations[i] */
	double deviations[DEVIATIONS];
};
109 
/*
 * container for a series of operations to a file; instances are linked
 * into circular doubly linked lists by oper_list_add()/oper_list_del()
 */
struct io_oper {
	/* already open file descriptor, valid for whatever operation you want
	 */
	int fd;

	/* starting byte of the operation */
	off_t start;

	/* ending byte of the operation */
	off_t end;

	/* size of the read/write buffer */
	int reclen;

	/* max number of pending requests before a wait is triggered */
	int depth;

	/* current number of pending requests */
	int num_pending;

	/* last error, zero if there were none; set from io_unit.res */
	int last_err;

	/* total number of errors hit. */
	int num_err;

	/* current stage: WRITE, READ, RWRITE or RREAD */
	int rw;

	/* number of I/O that will get sent to aio: (end - start) / reclen */
	int total_ios;

	/* number of I/O we've already sent */
	int started_ios;

	/* last offset used in an io operation */
	off_t last_offset;

	/* stonewalled = 1 when we got cut off before submitting all our I/O */
	int stonewalled;

	/* list management */
	struct io_oper *next;
	struct io_oper *prev;

	/* taken by build_oper() when the first I/O of a stage is built */
	struct timeval start_time;

	/* name used in log messages; not freed by finish_oper() */
	char *file_name;
};
160 
/* a single io, and all the tracking needed for it */
struct io_unit {
	/*
	 * note, iocb must go first! Pointers to the iocb are cast back to
	 * struct io_unit pointers, e.g. in update_iou_counters().
	 */
	struct iocb iocb;

	/* pointer to parent io operation struct */
	struct io_oper *io_oper;

	/* aligned buffer */
	char *buf;

	/* size of the aligned buffer (record size) */
	int buf_size;

	/* state of this io unit: IO_FREE or IO_PENDING */
	int busy;

	/* result of last operation, as reported in the io_event */
	long res;

	/* next unit on the thread's free list */
	struct io_unit *next;

	struct timeval io_start_time; /* time of io_submit */
};
185 
/* per-thread state: the AIO context plus everything worker() operates on */
struct thread_info {
	io_context_t io_ctx;
	pthread_t tid;

	/* allocated array of io_unit structs */
	struct io_unit *ios;

	/* list of io units available for io */
	struct io_unit *free_ious;

	/* number of io units in the I/O array */
	int num_global_ios;

	/* number of io units in flight */
	int num_global_pending;

	/* preallocated array of iocb pointers, only used in run_active */
	struct iocb **iocbs;

	/* preallocated array of events */
	struct io_event *events;

	/* size of the events array */
	int num_global_events;

	/* latency stats for io_submit */
	struct io_latency io_submit_latency;

	/* list of operations still in progress, and of those finished */
	struct io_oper *active_opers;
	struct io_oper *finished_opers;

	/* number of files this thread is doing io on */
	int num_files;

	/* how much io this thread did in the last stage */
	double stage_mb_trans;

	/* latency completion stats i/o time from io_submit until io_getevents */
	struct io_latency io_completion_latency;
};
227 
/*
 * Globals for keeping the threads in sync: the barrier all workers wait on
 * around each stage, the stage start time recorded by one of them, and the
 * array of per-thread state indexed by thread number.
 */
static pthread_barrier_t worker_barrier;
static struct timeval global_stage_start_time;
static struct thread_info *global_thread_info;
232 
/*
 * Elapsed time from start_tv to stop_tv in seconds, double precision.
 * An interval that would come out negative is reported as 0.
 */
static double time_since(struct timeval *start_tv, struct timeval *stop_tv)
{
	double whole = stop_tv->tv_sec - start_tv->tv_sec;
	double micro = stop_tv->tv_usec - start_tv->tv_usec;
	double elapsed;

	/* borrow one second so the microsecond part is not negative */
	if (whole > 0 && micro < 0) {
		whole -= 1;
		micro += 1000000;
	}

	elapsed = whole + micro / (double)1000000;

	return elapsed < 0 ? 0 : elapsed;
}
254 
/*
 * Elapsed time from start_tv to the current time of day, in seconds.
 */
static double time_since_now(struct timeval *start_tv)
{
	struct timeval now;

	gettimeofday(&now, NULL);

	return time_since(start_tv, &now);
}
266 
267 /*
268  * Add latency info to latency struct
269  */
calc_latency(struct timeval * start_tv,struct timeval * stop_tv,struct io_latency * lat)270 static void calc_latency(struct timeval *start_tv, struct timeval *stop_tv,
271 			 struct io_latency *lat)
272 {
273 	double delta;
274 	int i;
275 
276 	delta = time_since(start_tv, stop_tv);
277 	delta = delta * 1000;
278 
279 	if (delta > lat->max)
280 		lat->max = delta;
281 
282 	if (!lat->min || delta < lat->min)
283 		lat->min = delta;
284 
285 	lat->total_io++;
286 	lat->total_lat += delta;
287 
288 	for (i = 0; i < DEVIATIONS; i++) {
289 		if (delta < deviations[i]) {
290 			lat->deviations[i]++;
291 			break;
292 		}
293 	}
294 }
295 
oper_list_add(struct io_oper * oper,struct io_oper ** list)296 static void oper_list_add(struct io_oper *oper, struct io_oper **list)
297 {
298 	if (!*list) {
299 		*list = oper;
300 		oper->prev = oper->next = oper;
301 		return;
302 	}
303 
304 	oper->prev = (*list)->prev;
305 	oper->next = *list;
306 
307 	(*list)->prev->next = oper;
308 	(*list)->prev = oper;
309 }
310 
oper_list_del(struct io_oper * oper,struct io_oper ** list)311 static void oper_list_del(struct io_oper *oper, struct io_oper **list)
312 {
313 	if ((*list)->next == (*list)->prev && *list == (*list)->next) {
314 		*list = NULL;
315 		return;
316 	}
317 
318 	oper->prev->next = oper->next;
319 	oper->next->prev = oper->prev;
320 
321 	if (*list == oper)
322 		*list = oper->next;
323 }
324 
325 /* worker func to check error fields in the io unit */
check_finished_io(struct io_unit * io)326 static int check_finished_io(struct io_unit *io)
327 {
328 	int i;
329 
330 	if (io->res != io->buf_size) {
331 		struct stat s;
332 
333 		SAFE_FSTAT(io->io_oper->fd, &s);
334 
335 		/*
336 		 * If file size is large enough for the read, then this short
337 		 * read is an error.
338 		 */
339 		if ((io->io_oper->rw == READ || io->io_oper->rw == RREAD) &&
340 		    s.st_size > (io->iocb.u.c.offset + io->res)) {
341 
342 			tst_res(TINFO, "io err %lu (%s) op %d, off %llu size %d",
343 				io->res, tst_strerrno(-io->res), io->iocb.aio_lio_opcode,
344 				io->iocb.u.c.offset, io->buf_size);
345 			io->io_oper->last_err = io->res;
346 			io->io_oper->num_err++;
347 			return -1;
348 		}
349 	}
350 
351 	if (verify && io->io_oper->rw == READ) {
352 		if (memcmp(io->buf, verify_buf, io->io_oper->reclen)) {
353 			tst_res(TINFO, "verify error, file %s offset %llu contents (offset:bad:good):",
354 				io->io_oper->file_name, io->iocb.u.c.offset);
355 
356 			for (i = 0; i < io->io_oper->reclen; i++) {
357 				if (io->buf[i] != verify_buf[i]) {
358 					tst_res(TINFO, "%d:%c:%c ", i,
359 						io->buf[i], verify_buf[i]);
360 				}
361 			}
362 		}
363 	}
364 
365 	return 0;
366 }
367 
368 /* worker func to check the busy bits and get an io unit ready for use */
grab_iou(struct io_unit * io,struct io_oper * oper)369 static int grab_iou(struct io_unit *io, struct io_oper *oper)
370 {
371 	if (io->busy == IO_PENDING)
372 		return -1;
373 
374 	io->busy = IO_PENDING;
375 	io->res = 0;
376 	io->io_oper = oper;
377 
378 	return 0;
379 }
380 
stage_name(int rw)381 static char *stage_name(int rw)
382 {
383 	switch (rw) {
384 	case WRITE:
385 		return "write";
386 	case READ:
387 		return "read";
388 	case RWRITE:
389 		return "random write";
390 	case RREAD:
391 		return "random read";
392 	}
393 
394 	return "unknown";
395 }
396 
oper_mb_trans(struct io_oper * oper)397 static inline double oper_mb_trans(struct io_oper *oper)
398 {
399 	return ((double)oper->started_ios * (double)oper->reclen) / (double)(1024 * 1024);
400 }
401 
print_time(struct io_oper * oper)402 static void print_time(struct io_oper *oper)
403 {
404 	double runtime;
405 	double tput;
406 	double mb;
407 
408 	runtime = time_since_now(&oper->start_time);
409 	mb = oper_mb_trans(oper);
410 	tput = mb / runtime;
411 
412 	tst_res(TINFO, "%s on %s (%.2f MB/s) %.2f MB in %.2fs",
413 		stage_name(oper->rw), oper->file_name, tput, mb, runtime);
414 }
415 
print_lat(char * str,struct io_latency * lat)416 static void print_lat(char *str, struct io_latency *lat)
417 {
418 	char out[4 * 1024];
419 	char *ptr = out;
420 	double avg = lat->total_lat / lat->total_io;
421 	int i;
422 	double total_counted = 0;
423 
424 	tst_res(TINFO, "%s min %.2f avg %.2f max %.2f", str, lat->min, avg, lat->max);
425 
426 	for (i = 0; i < DEVIATIONS; i++) {
427 		ptr += sprintf(ptr, "%.0f < %d", lat->deviations[i], deviations[i]);
428 		total_counted += lat->deviations[i];
429 	}
430 
431 	if (total_counted && lat->total_io - total_counted)
432 		ptr += sprintf(ptr, " < %.0f", lat->total_io - total_counted);
433 
434 	tst_res(TINFO, "%s", out);
435 
436 	memset(lat, 0, sizeof(*lat));
437 }
438 
print_latency(struct thread_info * t)439 static void print_latency(struct thread_info *t)
440 {
441 	struct io_latency *lat = &t->io_submit_latency;
442 
443 	print_lat("latency", lat);
444 }
445 
print_completion_latency(struct thread_info * t)446 static void print_completion_latency(struct thread_info *t)
447 {
448 	struct io_latency *lat = &t->io_completion_latency;
449 
450 	print_lat("completion latency", lat);
451 }
452 
453 /*
454  * updates the fields in the io operation struct that belongs to this
455  * io unit, and make the io unit reusable again
456  */
finish_io(struct thread_info * t,struct io_unit * io,long result,struct timeval * tv_now)457 static void finish_io(struct thread_info *t, struct io_unit *io, long result,
458 		      struct timeval *tv_now)
459 {
460 	struct io_oper *oper = io->io_oper;
461 
462 	calc_latency(&io->io_start_time, tv_now, &t->io_completion_latency);
463 	io->res = result;
464 	io->busy = IO_FREE;
465 	io->next = t->free_ious;
466 	t->free_ious = io;
467 	oper->num_pending--;
468 	t->num_global_pending--;
469 	check_finished_io(io);
470 
471 	if (oper->num_pending == 0 &&
472 	    (oper->started_ios == oper->total_ios || oper->stonewalled)) {
473 		print_time(oper);
474 	}
475 }
476 
read_some_events(struct thread_info * t)477 static int read_some_events(struct thread_info *t)
478 {
479 	struct io_unit *event_io;
480 	struct io_event *event;
481 	int nr;
482 	int i;
483 	int min_nr = io_iter;
484 	struct timeval stop_time;
485 
486 	if (t->num_global_pending < io_iter)
487 		min_nr = t->num_global_pending;
488 
489 	nr = io_getevents(t->io_ctx, min_nr, t->num_global_events, t->events, NULL);
490 	if (nr <= 0)
491 		return nr;
492 
493 	gettimeofday(&stop_time, NULL);
494 
495 	for (i = 0; i < nr; i++) {
496 		event = t->events + i;
497 		event_io = (struct io_unit *)((unsigned long)event->obj);
498 		finish_io(t, event_io, event->res, &stop_time);
499 	}
500 
501 	return nr;
502 }
503 
504 /*
505  * finds a free io unit, waiting for pending requests if required.  returns
506  * null if none could be found
507  */
find_iou(struct thread_info * t,struct io_oper * oper)508 static struct io_unit *find_iou(struct thread_info *t, struct io_oper *oper)
509 {
510 	struct io_unit *event_io;
511 	int nr;
512 
513 retry:
514 	if (t->free_ious) {
515 		event_io = t->free_ious;
516 		t->free_ious = t->free_ious->next;
517 
518 		if (grab_iou(event_io, oper))
519 			tst_brk(TBROK, "io unit on free list but not free");
520 
521 		return event_io;
522 	}
523 
524 	nr = read_some_events(t);
525 	if (nr > 0)
526 		goto retry;
527 	else
528 		tst_res(TINFO, "no free ious after read_some_events");
529 
530 	return NULL;
531 }
532 
533 /*
534  * wait for all pending requests for this io operation to finish
535  */
io_oper_wait(struct thread_info * t,struct io_oper * oper)536 static int io_oper_wait(struct thread_info *t, struct io_oper *oper)
537 {
538 	struct io_event event;
539 	struct io_unit *event_io;
540 
541 	if (!oper)
542 		return 0;
543 
544 	if (oper->num_pending == 0)
545 		goto done;
546 
547 		/* this func is not speed sensitive, no need to go wild reading
548 		 * more than one event at a time
549 		 */
550 	while (io_getevents(t->io_ctx, 1, 1, &event, NULL) > 0) {
551 		struct timeval tv_now;
552 
553 		event_io = (struct io_unit *)((unsigned long)event.obj);
554 
555 		gettimeofday(&tv_now, NULL);
556 		finish_io(t, event_io, event.res, &tv_now);
557 
558 		if (oper->num_pending == 0)
559 			break;
560 	}
561 done:
562 	if (oper->num_err)
563 		tst_res(TINFO, "%u errors on oper, last %u", oper->num_err, oper->last_err);
564 
565 	return 0;
566 }
567 
random_byte_offset(struct io_oper * oper)568 static off_t random_byte_offset(struct io_oper *oper)
569 {
570 	off_t num;
571 	off_t rand_byte = oper->start;
572 	off_t range;
573 	off_t offset = 1;
574 
575 	range = (oper->end - oper->start) / (1024 * 1024);
576 
577 	if ((page_size_mask + 1) > (1024 * 1024))
578 		offset = (page_size_mask + 1) / (1024 * 1024);
579 
580 	if (range < offset)
581 		range = 0;
582 	else
583 		range -= offset;
584 
585 	/* find a random mb offset */
586 	num = 1 + (int)((double)range * rand() / (RAND_MAX + 1.0));
587 	rand_byte += num * 1024 * 1024;
588 
589 	/* find a random byte offset */
590 	num = 1 + (int)((double)(1024 * 1024) * rand() / (RAND_MAX + 1.0));
591 
592 	/* page align */
593 	num = (num + page_size_mask) & ~page_size_mask;
594 	rand_byte += num;
595 
596 	if (rand_byte + oper->reclen > oper->end)
597 		rand_byte -= oper->reclen;
598 
599 	return rand_byte;
600 }
601 
602 /*
603  * build an aio iocb for an operation, based on oper->rw and the
604  * last offset used.  This finds the struct io_unit that will be attached
605  * to the iocb, and things are ready for submission to aio after this
606  * is called.
607  *
608  * returns null on error
609  */
build_iocb(struct thread_info * t,struct io_oper * oper)610 static struct io_unit *build_iocb(struct thread_info *t, struct io_oper *oper)
611 {
612 	struct io_unit *io;
613 	off_t rand_byte;
614 
615 	io = find_iou(t, oper);
616 	if (!io)
617 		tst_brk(TBROK, "unable to find io unit");
618 
619 	switch (oper->rw) {
620 	case WRITE:
621 		io_prep_pwrite(&io->iocb, oper->fd, io->buf, oper->reclen, oper->last_offset);
622 		oper->last_offset += oper->reclen;
623 		break;
624 	case READ:
625 		io_prep_pread(&io->iocb, oper->fd, io->buf, oper->reclen, oper->last_offset);
626 		oper->last_offset += oper->reclen;
627 		break;
628 	case RREAD:
629 		rand_byte = random_byte_offset(oper);
630 		oper->last_offset = rand_byte;
631 		io_prep_pread(&io->iocb, oper->fd, io->buf, oper->reclen, rand_byte);
632 		break;
633 	case RWRITE:
634 		rand_byte = random_byte_offset(oper);
635 		oper->last_offset = rand_byte;
636 		io_prep_pwrite(&io->iocb, oper->fd, io->buf, oper->reclen, rand_byte);
637 
638 		break;
639 	}
640 
641 	return io;
642 }
643 
644 /*
645  * wait for any pending requests, and then free all ram associated with
646  * an operation.  returns the last error the operation hit (zero means none)
647  */
finish_oper(struct thread_info * t,struct io_oper * oper)648 static int finish_oper(struct thread_info *t, struct io_oper *oper)
649 {
650 	unsigned long last_err;
651 
652 	io_oper_wait(t, oper);
653 
654 	last_err = oper->last_err;
655 
656 	if (oper->num_pending > 0)
657 		tst_res(TINFO, "oper num_pending is %d", oper->num_pending);
658 
659 	SAFE_CLOSE(oper->fd);
660 	free(oper);
661 
662 	return last_err;
663 }
664 
665 /*
666  * allocates an io operation and fills in all the fields.  returns
667  * null on error
668  */
create_oper(int fd,int rw,off_t start,off_t end,int reclen,int depth,char * file_name)669 static struct io_oper *create_oper(int fd, int rw, off_t start, off_t end,
670 				   int reclen, int depth, char *file_name)
671 {
672 	struct io_oper *oper;
673 
674 	oper = SAFE_MALLOC(sizeof(*oper));
675 	memset(oper, 0, sizeof(*oper));
676 
677 	oper->depth = depth;
678 	oper->start = start;
679 	oper->end = end;
680 	oper->last_offset = oper->start;
681 	oper->fd = fd;
682 	oper->reclen = reclen;
683 	oper->rw = rw;
684 	oper->total_ios = (oper->end - oper->start) / oper->reclen;
685 	oper->file_name = file_name;
686 
687 	return oper;
688 }
689 
690 /*
691  * does setup on num_ios worth of iocbs, but does not actually
692  * start any io
693  */
build_oper(struct thread_info * t,struct io_oper * oper,int num_ios,struct iocb ** my_iocbs)694 static int build_oper(struct thread_info *t, struct io_oper *oper, int num_ios,
695 		      struct iocb **my_iocbs)
696 {
697 	int i;
698 	struct io_unit *io;
699 
700 	if (oper->started_ios == 0)
701 		gettimeofday(&oper->start_time, NULL);
702 
703 	if (num_ios == 0)
704 		num_ios = oper->total_ios;
705 
706 	if ((oper->started_ios + num_ios) > oper->total_ios)
707 		num_ios = oper->total_ios - oper->started_ios;
708 
709 	for (i = 0; i < num_ios; i++) {
710 		io = build_iocb(t, oper);
711 		if (!io)
712 			return -1;
713 
714 		my_iocbs[i] = &io->iocb;
715 	}
716 
717 	return num_ios;
718 }
719 
720 /*
721  * runs through the iocbs in the array provided and updates
722  * counters in the associated oper struct
723  */
update_iou_counters(struct iocb ** my_iocbs,int nr,struct timeval * tv_now)724 static void update_iou_counters(struct iocb **my_iocbs, int nr, struct timeval *tv_now)
725 {
726 	struct io_unit *io;
727 	int i;
728 
729 	for (i = 0; i < nr; i++) {
730 		io = (struct io_unit *)(my_iocbs[i]);
731 		io->io_oper->num_pending++;
732 		io->io_oper->started_ios++;
733 		io->io_start_time = *tv_now; /* set time of io_submit */
734 	}
735 }
736 
737 /* starts some io for a given file, returns zero if all went well */
run_built(struct thread_info * t,int num_ios,struct iocb ** my_iocbs)738 static int run_built(struct thread_info *t, int num_ios, struct iocb **my_iocbs)
739 {
740 	int ret;
741 	struct timeval start_time;
742 	struct timeval stop_time;
743 
744 resubmit:
745 	gettimeofday(&start_time, NULL);
746 	ret = io_submit(t->io_ctx, num_ios, my_iocbs);
747 
748 	gettimeofday(&stop_time, NULL);
749 	calc_latency(&start_time, &stop_time, &t->io_submit_latency);
750 
751 	if (ret != num_ios) {
752 		/* some I/O got through */
753 		if (ret > 0) {
754 			update_iou_counters(my_iocbs, ret, &stop_time);
755 			my_iocbs += ret;
756 			t->num_global_pending += ret;
757 			num_ios -= ret;
758 		}
759 		/*
760 		 * we've used all the requests allocated in aio_init, wait and
761 		 * retry
762 		 */
763 		if (ret > 0 || ret == -EAGAIN) {
764 			int old_ret = ret;
765 
766 			ret = read_some_events(t);
767 			if (ret <= 0)
768 				tst_brk(TBROK, "ret was %d and now is %d", ret, old_ret);
769 
770 			goto resubmit;
771 		}
772 
773 		tst_res(TINFO, "ret %d (%s) on io_submit", ret, tst_strerrno(-ret));
774 		return -1;
775 	}
776 
777 	update_iou_counters(my_iocbs, ret, &stop_time);
778 	t->num_global_pending += ret;
779 
780 	return 0;
781 }
782 
783 /*
784  * changes oper->rw to the next in a command sequence, or returns zero
785  * to say this operation is really, completely done for
786  */
restart_oper(struct io_oper * oper)787 static int restart_oper(struct io_oper *oper)
788 {
789 	int new_rw = 0;
790 
791 	if (oper->last_err)
792 		return 0;
793 
794 	if (oper->rw == WRITE && (stages & (1 << READ)))
795 		new_rw = READ;
796 
797 	if (oper->rw == READ && (!new_rw && stages & (1 << RWRITE)))
798 		new_rw = RWRITE;
799 
800 	if (oper->rw == RWRITE && (!new_rw && stages & (1 << RREAD)))
801 		new_rw = RREAD;
802 
803 	if (new_rw) {
804 		oper->started_ios = 0;
805 		oper->last_offset = oper->start;
806 		oper->stonewalled = 0;
807 
808 		/*
809 		 * we're restarting an operation with pending requests, so the
810 		 * timing info won't be printed by finish_io.  Printing it here
811 		 */
812 		if (oper->num_pending)
813 			print_time(oper);
814 
815 		oper->rw = new_rw;
816 		return 1;
817 	}
818 
819 	return 0;
820 }
821 
oper_runnable(struct io_oper * oper)822 static int oper_runnable(struct io_oper *oper)
823 {
824 	struct stat buf;
825 
826 	/* first context is always runnable, if started_ios > 0, no need to
827 	 * redo the calculations
828 	 */
829 	if (oper->started_ios || oper->start == 0)
830 		return 1;
831 
832 	/* only the sequential phases force delays in starting */
833 	if (oper->rw >= RWRITE)
834 		return 1;
835 
836 	SAFE_FSTAT(oper->fd, &buf);
837 	if (S_ISREG(buf.st_mode) && buf.st_size < oper->start)
838 		return 0;
839 
840 	return 1;
841 }
842 
/*
 * runs through all the io operations on the active list, and starts
 * a chunk of io on each.  If any io operations are completely finished,
 * it either switches them to the next stage or puts them on the
 * finished list.
 *
 * this function stops after max_io_submit iocbs are sent down the
 * pipe, even if it has not yet touched all the operations on the
 * active list.  Any operations that have finished are moved onto
 * the finished_opers list.
 *
 * The iocbs are collected in t->iocbs. The test is aborted with TBROK
 * when run_built() fails. Always returns 0.
 */
static int run_active_list(struct thread_info *t, int io_iter, int max_io_submit)
{
	struct io_oper *oper;
	/* operations that got iocbs built in this pass, parked off the active list */
	struct io_oper *built_opers = NULL;
	/* next free slot in t->iocbs */
	struct iocb **my_iocbs = t->iocbs;
	int ret = 0;
	int num_built = 0;

	oper = t->active_opers;

	while (oper) {
		/* skip operations that must wait; stop once the list wrapped around */
		if (!oper_runnable(oper)) {
			oper = oper->next;
			if (oper == t->active_opers)
				break;
			continue;
		}

		ret = build_oper(t, oper, io_iter, my_iocbs);
		if (ret >= 0) {
			my_iocbs += ret;
			num_built += ret;
			/*
			 * moving the operation away shrinks the active list, so
			 * restarting from its head visits each operation once
			 */
			oper_list_del(oper, &t->active_opers);
			oper_list_add(oper, &built_opers);
			oper = t->active_opers;
			/* another chunk of io_iter iocbs would not fit anymore */
			if (num_built + io_iter > max_io_submit)
				break;
		} else
			break;
	}

	/*
	 * NOTE(review): operations parked on built_opers are only moved back
	 * when num_built is non-zero; looks like they would be dropped if
	 * build_oper() returned 0 for all of them - confirm whether that can
	 * happen.
	 */
	if (num_built) {
		ret = run_built(t, num_built, t->iocbs);
		if (ret < 0)
			tst_brk(TBROK, "error %d on run_built", ret);

		while (built_opers) {
			oper = built_opers;
			oper_list_del(oper, &built_opers);
			oper_list_add(oper, &t->active_opers);
			/* everything submitted: the operation only waits for completions */
			if (oper->started_ios == oper->total_ios) {
				oper_list_del(oper, &t->active_opers);
				oper_list_add(oper, &t->finished_opers);
			}
		}
	}

	return 0;
}
903 
/*
 * Initialize the AIO context io_ctx for up to n events with
 * io_queue_init(); the test is aborted with TBROK on failure.
 */
static void aio_setup(io_context_t *io_ctx, int n)
{
	int err = io_queue_init(n, io_ctx);

	if (!err)
		return;

	tst_brk(TBROK, "io_queue_setup(%d) returned %d (%s)", n, err, tst_strerrno(-err));
}
911 
912 /*
913  * allocate io operation and event arrays for a given thread
914  */
setup_ious(struct thread_info * t,int num_files,int depth,int reclen,int max_io_submit)915 static void setup_ious(struct thread_info *t, int num_files, int depth, int reclen, int max_io_submit)
916 {
917 	int i;
918 	size_t bytes = num_files * depth * sizeof(*t->ios);
919 
920 	t->ios = SAFE_MALLOC(bytes);
921 
922 	memset(t->ios, 0, bytes);
923 
924 	for (i = 0; i < depth * num_files; i++) {
925 		t->ios[i].buf = aligned_buffer;
926 		aligned_buffer += padded_reclen;
927 		t->ios[i].buf_size = reclen;
928 		if (verify)
929 			memset(t->ios[i].buf, 'b', reclen);
930 		else
931 			memset(t->ios[i].buf, 0, reclen);
932 		t->ios[i].next = t->free_ious;
933 		t->free_ious = t->ios + i;
934 	}
935 
936 	if (verify) {
937 		verify_buf = aligned_buffer;
938 		memset(verify_buf, 'b', reclen);
939 	}
940 
941 	t->iocbs = SAFE_MALLOC(sizeof(struct iocb *) * max_io_submit);
942 	memset(t->iocbs, 0, max_io_submit * sizeof(struct iocb *));
943 
944 	t->events = SAFE_MALLOC(sizeof(struct io_event) * depth * num_files);
945 	memset(t->events, 0, num_files * sizeof(struct io_event) * depth);
946 
947 	t->num_global_ios = num_files * depth;
948 	t->num_global_events = t->num_global_ios;
949 }
950 
951 /*
952  * The buffers used for file data are allocated as a single big
953  * malloc, and then each thread and operation takes a piece and uses
954  * that for file data.  This lets us do a large shm or bigpages alloc
955  * and without trying to find a special place in each thread to map the
956  * buffers to
957  */
setup_shared_mem(int num_threads,int num_files,int depth,int reclen)958 static int setup_shared_mem(int num_threads, int num_files, int depth, int reclen)
959 {
960 	char *p = NULL;
961 	size_t total_ram;
962 
963 	padded_reclen = (reclen + page_size_mask) / (page_size_mask + 1);
964 	padded_reclen = padded_reclen * (page_size_mask + 1);
965 	total_ram = num_files * depth * padded_reclen + num_threads;
966 
967 	if (verify)
968 		total_ram += padded_reclen;
969 
970 	/* for aligning buffer after the allocation */
971 	total_ram += page_size_mask;
972 
973 	if (use_shm == USE_MALLOC) {
974 		p = SAFE_MALLOC(total_ram);
975 	} else if (use_shm == USE_SHM) {
976 		SAFE_SHMGET(IPC_PRIVATE, total_ram, IPC_CREAT | 0700);
977 		p = SAFE_SHMAT(shm_id, (char *)0x50000000, 0);
978 	} else if (use_shm == USE_SHMFS) {
979 		char mmap_name[16]; /* /dev/shm/ + null + XXXXXX */
980 		int fd;
981 
982 		strcpy(mmap_name, "/dev/shm/XXXXXX");
983 		fd = mkstemp(mmap_name);
984 		if (fd < 0)
985 			tst_brk(TBROK, "mkstemp error");
986 
987 		SAFE_UNLINK(mmap_name);
988 		SAFE_FTRUNCATE(fd, total_ram);
989 
990 		shm_id = fd;
991 
992 		p = SAFE_MMAP((char *)0x50000000, total_ram,
993 			      PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
994 	}
995 
996 	unaligned_buffer = p;
997 	p = (char *)((intptr_t)(p + page_size_mask) & ~page_size_mask);
998 	aligned_buffer = p;
999 
1000 	return 0;
1001 }
1002 
1003 /*
1004  * runs through all the thread_info structs and calculates a combined
1005  * throughput
1006  */
global_thread_throughput(struct thread_info * t,char * this_stage)1007 static void global_thread_throughput(struct thread_info *t, char *this_stage)
1008 {
1009 	int i;
1010 	double runtime = time_since_now(&global_stage_start_time);
1011 	double total_mb = 0;
1012 	double min_trans = 0;
1013 
1014 	for (i = 0; i < num_threads; i++) {
1015 		total_mb += global_thread_info[i].stage_mb_trans;
1016 
1017 		if (!min_trans || t->stage_mb_trans < min_trans)
1018 			min_trans = t->stage_mb_trans;
1019 	}
1020 
1021 	if (total_mb) {
1022 		tst_res(TINFO, "%s throughput (%.2f MB/s)", this_stage, total_mb / runtime);
1023 		tst_res(TINFO, "%.2f MB in %.2fs", total_mb, runtime);
1024 	}
1025 }
1026 
/* this is the meat of the state machine.  There is a list of
 * active operations structs, and as each one finishes the required
 * io it is moved to a list of finished operations.  Once they have
 * all finished whatever stage they were in, they are given the chance
 * to restart and pick a different stage (read/write/random read etc)
 *
 * various timings are printed in between the stages, along with
 * thread synchronization if there are more than one threads.
 *
 * Thread entry point, started by run_workers(). The return value is the
 * result of the last finish_oper() call, carried in the pointer.
 */
static int *worker(struct thread_info *t)
{
	struct io_oper *oper;
	char *this_stage = NULL;
	struct timeval stage_time;
	int status = 0;
	int cnt;

	aio_setup(&t->io_ctx, 512);

restart:
	/*
	 * line up all threads at the start of a stage; the one thread for
	 * which pthread_barrier_wait() returns non-zero records the start time
	 */
	if (num_threads > 1) {
		if (pthread_barrier_wait(&worker_barrier))
			gettimeofday(&global_stage_start_time, NULL);
	}

	if (t->active_opers) {
		this_stage = stage_name(t->active_opers->rw);
		gettimeofday(&stage_time, NULL);
		t->stage_mb_trans = 0;
	}

	cnt = 0;

	/* first we send everything through aio */
	while (t->active_opers && cnt < iterations) {
		run_active_list(t, io_iter, max_io_submit);
		cnt++;
	}

	if (latency_stats)
		print_latency(t);

	if (completion_latency_stats)
		print_completion_latency(t);

	/* then we wait for all the operations to finish */
	oper = t->finished_opers;
	do {
		if (!oper)
			break;
		io_oper_wait(t, oper);
		oper = oper->next;
	} while (oper != t->finished_opers);

	/* then we do an fsync to get the timing for any future operations
	 * right, and check to see if any of these need to get restarted
	 */
	oper = t->finished_opers;
	while (oper) {
		if (!no_fsync_stages)
			SAFE_FSYNC(oper->fd);

		t->stage_mb_trans += oper_mb_trans(oper);

		/*
		 * a restarted operation leaves the finished list, so the walk
		 * starts over from the (new) head of that list
		 */
		if (restart_oper(oper)) {
			oper_list_del(oper, &t->finished_opers);
			oper_list_add(oper, &t->active_opers);
			oper = t->finished_opers;
			continue;
		}

		oper = oper->next;

		if (oper == t->finished_opers)
			break;
	}

	if (t->stage_mb_trans && t->num_files > 0) {
		double seconds = time_since_now(&stage_time);

		tst_res(TINFO, "thread %td %s totals (%.2f MB/s) %.2f MB in %.2fs",
			t - global_thread_info, this_stage,
			t->stage_mb_trans / seconds, t->stage_mb_trans, seconds);
	}

	/* wait for everybody to finish the stage, one thread prints the totals */
	if (num_threads > 1) {
		if (pthread_barrier_wait(&worker_barrier))
			global_thread_throughput(t, this_stage);
	}

	/* someone got restarted, go back to the beginning */
	if (t->active_opers && cnt < iterations)
		goto restart;

	/* finally, free all the ram */
	while (t->finished_opers) {
		oper = t->finished_opers;
		oper_list_del(oper, &t->finished_opers);
		status = finish_oper(t, oper);
	}

	if (t->num_global_pending)
		tst_res(TINFO, "global num pending is %d", t->num_global_pending);

	io_queue_release(t->io_ctx);

	return (void *)(intptr_t)status;
}
1135 
1136 typedef void *(*start_routine)(void *);
run_workers(struct thread_info * t,int num_threads)1137 static int run_workers(struct thread_info *t, int num_threads)
1138 {
1139 	void *retval;
1140 	int ret = 0;
1141 	int i;
1142 
1143 	pthread_barrier_init(&worker_barrier, NULL, num_threads);
1144 
1145 	for (i = 0; i < num_threads; i++)
1146 		SAFE_PTHREAD_CREATE(&t[i].tid, NULL, (start_routine)worker, t + i);
1147 
1148 	for (i = 0; i < num_threads; i++) {
1149 		SAFE_PTHREAD_JOIN(t[i].tid, &retval);
1150 		ret |= (intptr_t)retval;
1151 	}
1152 
1153 	pthread_barrier_destroy(&worker_barrier);
1154 
1155 	return ret;
1156 }
1157 
setup(void)1158 static void setup(void)
1159 {
1160 	int maxaio;
1161 	int stages_i;
1162 
1163 	page_size_mask = getpagesize() - 1;
1164 
1165 	SAFE_FILE_SCANF("/proc/sys/fs/aio-max-nr", "%d", &maxaio);
1166 	tst_res(TINFO, "Maximum AIO blocks: %d", maxaio);
1167 
1168 	if (tst_parse_int(str_num_files, &num_files, 1, INT_MAX))
1169 		tst_brk(TBROK, "Invalid number of files to generate '%s'", str_num_files);
1170 
1171 	if (tst_parse_int(str_max_io_submit, &max_io_submit, 0, INT_MAX))
1172 		tst_brk(TBROK, "Invalid number of iocbs '%s'", str_max_io_submit);
1173 
1174 	if (max_io_submit > maxaio)
1175 		tst_res(TCONF, "Number of async IO blocks passed the maximum (%d)", maxaio);
1176 
1177 	if (tst_parse_int(str_num_contexts, &num_contexts, 1, INT_MAX))
1178 		tst_brk(TBROK, "Invalid number of contexts per file '%s'", str_num_contexts);
1179 
1180 	if (tst_parse_filesize(str_context_offset, &context_offset, 1, LLONG_MAX))
1181 		tst_brk(TBROK, "Invalid offset between contexts '%s'", str_context_offset);
1182 
1183 	if (tst_parse_filesize(str_file_size, &file_size, 1, LLONG_MAX))
1184 		tst_brk(TBROK, "Invalid file size '%s'", str_file_size);
1185 
1186 	if (tst_parse_filesize(str_rec_len, &rec_len, 1, LONG_MAX))
1187 		tst_brk(TBROK, "Invalid record size '%s'", str_rec_len);
1188 
1189 	if (tst_parse_int(str_depth, &depth, 1, INT_MAX))
1190 		tst_brk(TBROK, "Invalid number of pending aio requests '%s'", str_depth);
1191 
1192 	if (tst_parse_int(str_io_iter, &io_iter, 1, INT_MAX))
1193 		tst_brk(TBROK, "Invalid number of I/O per file '%s'", str_io_iter);
1194 
1195 	if (tst_parse_int(str_iterations, &iterations, 1, INT_MAX))
1196 		tst_brk(TBROK, "Invalid number of total ayncs I/O '%s'", str_iterations);
1197 
1198 	if (tst_parse_int(str_stages, &stages_i, 0, INT_MAX))
1199 		tst_brk(TBROK, "Invalid stage number '%s'", str_stages);
1200 
1201 	if (stages_i) {
1202 		stages |= 1 << stages_i;
1203 		tst_res(TINFO, "Adding stage %s", stage_name(stages_i));
1204 	}
1205 
1206 	if (tst_parse_int(str_num_threads, &num_threads, 1, INT_MAX))
1207 		tst_brk(TBROK, "Invalid number of threads '%s'", str_num_threads);
1208 
1209 	if (str_o_flag) {
1210 		if (tst_fs_type(".") == TST_TMPFS_MAGIC)
1211 			tst_brk(TCONF, "O_DIRECT not supported on tmpfs");
1212 		o_flag = O_DIRECT;
1213 	} else {
1214 		o_flag = O_SYNC;
1215 	}
1216 
1217 	if (str_use_shm) {
1218 		if (!strcmp(str_use_shm, "shm")) {
1219 			tst_res(TINFO, "using ipc shm");
1220 			use_shm = USE_SHM;
1221 		} else if (!strcmp(str_use_shm, "shmfs")) {
1222 			tst_res(TINFO, "using /dev/shm for buffers");
1223 			use_shm = USE_SHMFS;
1224 		} else {
1225 			tst_brk(TBROK, "Invalid shm option '%s'", str_use_shm);
1226 		}
1227 	}
1228 }
1229 
run(void)1230 static void run(void)
1231 {
1232 	char files[num_files][265];
1233 	int first_stage = WRITE;
1234 	struct io_oper *oper;
1235 	int status = 0;
1236 	int open_fds = 0;
1237 	struct thread_info *t;
1238 	int rwfd;
1239 	int i;
1240 	int j;
1241 
1242 	/*
1243 	 * make sure we don't try to submit more I/O than we have allocated
1244 	 * memory for
1245 	 */
1246 	if (depth < io_iter) {
1247 		io_iter = depth;
1248 		tst_res(TINFO, "dropping io_iter to %d", io_iter);
1249 	}
1250 
1251 	if (num_threads > (num_files * num_contexts)) {
1252 		num_threads = num_files * num_contexts;
1253 		tst_res(TINFO, "Dropping thread count to the number of contexts %d", num_threads);
1254 	}
1255 
1256 	t = SAFE_MALLOC(num_threads * sizeof(*t));
1257 	memset(t, 0, num_threads * sizeof(*t));
1258 	global_thread_info = t;
1259 
1260 	/* by default, allow a huge number of iocbs to be sent towards
1261 	 * io_submit
1262 	 */
1263 	if (!max_io_submit)
1264 		max_io_submit = num_files * io_iter * num_contexts;
1265 
1266 	/*
1267 	 * make sure we don't try to submit more I/O than max_io_submit allows
1268 	 */
1269 	if (max_io_submit < io_iter) {
1270 		io_iter = max_io_submit;
1271 		tst_res(TINFO, "dropping io_iter to %d", io_iter);
1272 	}
1273 
1274 	if (!stages) {
1275 		stages = (1 << WRITE) | (1 << READ) | (1 << RREAD) | (1 << RWRITE);
1276 	} else {
1277 		for (i = 0; i < LAST_STAGE; i++) {
1278 			if (stages & (1 << i)) {
1279 				first_stage = i;
1280 				tst_res(TINFO, "starting with %s", stage_name(i));
1281 				break;
1282 			}
1283 		}
1284 	}
1285 
1286 	if (file_size < num_contexts * context_offset) {
1287 		tst_brk(TBROK, "file size %ld too small for %d contexts",
1288 			(long)file_size, num_contexts);
1289 	}
1290 
1291 	tst_res(TINFO, "file size %ldMB, record size %lldKB, depth %d, I/O per iteration %d",
1292 		(long)(file_size / (1024 * 1024)), rec_len / 1024, depth, io_iter);
1293 	tst_res(TINFO, "max io_submit %d, buffer alignment set to %luKB",
1294 		max_io_submit, (page_size_mask + 1) / 1024);
1295 	tst_res(TINFO, "threads %d files %d contexts %d context offset %ldMB verification %s",
1296 		num_threads, num_files, num_contexts,
1297 		(long)(context_offset / (1024 * 1024)), verify ? "on" : "off");
1298 
1299 	/* open all the files and do any required setup for them */
1300 	for (i = 0; i < num_files; i++) {
1301 		int thread_index;
1302 
1303 		snprintf(files[i], sizeof(files[i]), "file%d.bin", i);
1304 
1305 		for (j = 0; j < num_contexts; j++) {
1306 			thread_index = open_fds % num_threads;
1307 			open_fds++;
1308 
1309 			rwfd = SAFE_OPEN(files[i], O_CREAT | O_RDWR | o_flag, 0600);
1310 
1311 			oper = create_oper(rwfd, first_stage, j * context_offset,
1312 					   file_size - j * context_offset,
1313 					   rec_len, depth, files[i]);
1314 			if (!oper)
1315 				tst_brk(TBROK, "error in create_oper");
1316 
1317 			oper_list_add(oper, &t[thread_index].active_opers);
1318 			t[thread_index].num_files++;
1319 		}
1320 	}
1321 
1322 	if (setup_shared_mem(num_threads, num_files * num_contexts, depth, rec_len))
1323 		tst_brk(TBROK, "error in setup_shared_mem");
1324 
1325 	for (i = 0; i < num_threads; i++)
1326 		setup_ious(&t[i], t[i].num_files, depth, rec_len, max_io_submit);
1327 
1328 	if (num_threads > 1) {
1329 		tst_res(TINFO, "Running multi thread version num_threads: %d", num_threads);
1330 		status = run_workers(t, num_threads);
1331 	} else {
1332 		tst_res(TINFO, "Running single thread version");
1333 		status = (intptr_t)worker(t);
1334 	}
1335 
1336 	for (i = 0; i < num_files; i++)
1337 		SAFE_UNLINK(files[i]);
1338 
1339 	if (status)
1340 		tst_res(TFAIL, "Test did not pass");
1341 	else
1342 		tst_res(TPASS, "Test passed");
1343 }
1344 
1345 static struct tst_test test = {
1346 	.test_all = run,
1347 	.setup = setup,
1348 	.needs_tmpdir = 1,
1349 	.needs_root = 1,
1350 	.max_runtime = 1800,
1351 	.options = (struct tst_option[]){
1352 		{ "a:", &str_iterations, "Total number of ayncs I/O the program will run (default 500)" },
1353 		{ "b:", &str_max_io_submit, "Max number of iocbs to give io_submit at once" },
1354 		{ "c:", &str_num_contexts, "Number of io contexts per file" },
1355 		{ "d:", &str_depth, "Number of pending aio requests for each file (default 64)" },
1356 		{ "e:", &str_io_iter, "Number of I/O per file sent before switching to the next file (default 8)" },
1357 		{ "f:", &str_num_files, "Number of files to generate" },
1358 		{ "g:", &str_context_offset, "Offset between contexts (default 2M)" },
1359 		{ "l", &latency_stats, "Print io_submit latencies after each stage" },
1360 		{ "L", &completion_latency_stats, "Print io completion latencies after each stage" },
1361 		{ "m", &str_use_shm, "SHM use ipc shared memory for io buffers instead of malloc" },
1362 		{ "n", &no_fsync_stages, "No fsyncs between write stage and read stage" },
1363 		{ "o:", &str_stages, "Add an operation to the list: write=0, read=1, random write=2, random read=3" },
1364 		{ "O", &str_o_flag, "Use O_DIRECT" },
1365 		{ "r:", &str_rec_len, "Record size in KB used for each io (default 64K)" },
1366 		{ "s:", &str_file_size, "Size in MB of the test file(s) (default 1024M)" },
1367 		{ "t:", &str_num_threads, "Number of threads to run" },
1368 		{ "u", &unlink_files, "Unlink files after completion" },
1369 		{ "v", &verify, "Verification of bytes written" },
1370 		{},
1371 	},
1372 };
1373 #else
1374 TST_TEST_TCONF("test requires libaio and its development packages");
1375 #endif
1376