/* SPDX-License-Identifier: MIT */
/*
 * Description: Test two ring deadlock. A buggy kernel will end up
 *		having io_wq_* workers pending, as the circular reference
 *		will prevent full exit.
 *
 * Based on a test case from Josef <[email protected]>
 *
 */
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <sys/eventfd.h>
#include <pthread.h>

#include "liburing.h"
#include "../src/syscall.h"
25
/* Request tags stored in each SQE's user_data so that completions can
 * be dispatched by operation type. */
enum {
	ACCEPT,
	READ,
	WRITE,
	POLLING_IN,
	POLLING_RDHUP,
	CLOSE,
	EVENTFD_READ,
};

/* Per-request context, packed into the 64-bit SQE user_data field via
 * memcpy (fd + type + bid == 8 bytes exactly). */
typedef struct conn_info {
	__u32 fd;	/* fd the request operates on */
	__u16 type;	/* one of the request tags above */
	__u16 bid;	/* buffer id; unused in this test */
} conn_info;

/* Destination for the 8-byte eventfd reads armed by both threads. */
static char read_eventfd_buffer[8];

/* Serializes SQE preparation/submission on the client ring, which is
 * touched by both the client and the server thread. */
static pthread_mutex_t lock;
/* The client thread's ring; the server thread also submits on it to
 * create the cross-ring reference this test exercises. */
static struct io_uring *client_ring;

static int client_eventfd = -1;
48
setup_io_uring(struct io_uring * ring)49 int setup_io_uring(struct io_uring *ring)
50 {
51 struct io_uring_params p = { };
52 int ret;
53
54 ret = io_uring_queue_init_params(8, ring, &p);
55 if (ret) {
56 fprintf(stderr, "Unable to setup io_uring: %s\n",
57 strerror(-ret));
58 return 1;
59 }
60 return 0;
61 }
62
add_socket_eventfd_read(struct io_uring * ring,int fd)63 static void add_socket_eventfd_read(struct io_uring *ring, int fd)
64 {
65 struct io_uring_sqe *sqe;
66 conn_info conn_i = {
67 .fd = fd,
68 .type = EVENTFD_READ,
69 };
70
71 sqe = io_uring_get_sqe(ring);
72 io_uring_prep_read(sqe, fd, &read_eventfd_buffer, 8, 0);
73 io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
74
75 memcpy(&sqe->user_data, &conn_i, sizeof(conn_i));
76 }
77
add_socket_pollin(struct io_uring * ring,int fd)78 static void add_socket_pollin(struct io_uring *ring, int fd)
79 {
80 struct io_uring_sqe *sqe;
81 conn_info conn_i = {
82 .fd = fd,
83 .type = POLLING_IN,
84 };
85
86 sqe = io_uring_get_sqe(ring);
87 io_uring_prep_poll_add(sqe, fd, POLL_IN);
88
89 memcpy(&sqe->user_data, &conn_i, sizeof(conn_i));
90 }
91
/*
 * Server thread: creates a non-blocking TCP listen socket on an
 * ephemeral port, then on its own ring arms an async eventfd read and
 * a POLLIN poll on the listen socket.  On each ACCEPT-tagged CQE it
 * re-arms the listen poll and — while holding 'lock' — submits on the
 * *client* thread's ring, producing the cross-ring submission this
 * regression test exercises.  The loop never exits; the process is
 * torn down by the SIGALRM handler installed in main().
 *
 * NOTE(review): socket()/setsockopt()/eventfd() returns are unchecked;
 * acceptable for a short-lived kernel regression test.
 */
static void *server_thread(void *arg)
{
	struct sockaddr_in serv_addr;
	int port = 0;	/* 0: let the kernel pick an ephemeral port */
	int sock_listen_fd, evfd;
	const int val = 1;
	struct io_uring ring;

	sock_listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
	setsockopt(sock_listen_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));

	memset(&serv_addr, 0, sizeof(serv_addr));
	serv_addr.sin_family = AF_INET;
	serv_addr.sin_port = htons(port);
	serv_addr.sin_addr.s_addr = INADDR_ANY;

	evfd = eventfd(0, EFD_CLOEXEC);

	// bind and listen
	if (bind(sock_listen_fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
		perror("Error binding socket...\n");
		exit(1);
	}
	if (listen(sock_listen_fd, 1) < 0) {
		perror("Error listening on socket...\n");
		exit(1);
	}

	setup_io_uring(&ring);
	add_socket_eventfd_read(&ring, evfd);
	add_socket_pollin(&ring, sock_listen_fd);

	while (1) {
		struct io_uring_cqe *cqe;
		unsigned head;
		unsigned count = 0;

		io_uring_submit_and_wait(&ring, 1);

		io_uring_for_each_cqe(&ring, head, cqe) {
			struct conn_info conn_i;

			count++;
			/* unpack the conn_info stored in user_data */
			memcpy(&conn_i, &cqe->user_data, sizeof(conn_i));

			if (conn_i.type == ACCEPT) {
				int sock_conn_fd = cqe->res;
				// only read when there is no error, >= 0
				if (sock_conn_fd > 0) {
					/* re-arm the listen-socket poll */
					add_socket_pollin(&ring, sock_listen_fd);

					/* cross-ring submit on the client
					 * thread's ring, under 'lock' */
					pthread_mutex_lock(&lock);
					io_uring_submit(client_ring);
					pthread_mutex_unlock(&lock);

				}
			} else if (conn_i.type == POLLING_IN) {
				/* stop scanning; re-submit on next pass */
				break;
			}
		}
		/* retire every CQE we inspected */
		io_uring_cq_advance(&ring, count);
	}
}
155
/*
 * Client thread: owns the ring published through the 'client_ring'
 * global, which the server thread also submits on (serialized by
 * 'lock').  Arms an async read on its own eventfd, then loops:
 * submit pending SQEs under the lock, wait for at least one
 * completion via a raw io_uring_enter() (wait only, no submit —
 * submission must stay under the lock), and dispatch CQEs by the
 * type packed into user_data.  Never returns; the process exits via
 * the SIGALRM handler installed in main().
 */
static void *client_thread(void *arg)
{
	struct io_uring ring;
	int ret;

	setup_io_uring(&ring);
	/* publish our ring so the server thread can submit on it */
	client_ring = &ring;

	client_eventfd = eventfd(0, EFD_CLOEXEC);
	pthread_mutex_lock(&lock);
	add_socket_eventfd_read(&ring, client_eventfd);
	pthread_mutex_unlock(&lock);

	while (1) {
		struct io_uring_cqe *cqe;
		unsigned head;
		unsigned count = 0;

		/* submission is shared with the server thread */
		pthread_mutex_lock(&lock);
		io_uring_submit(&ring);
		pthread_mutex_unlock(&lock);

		/* wait for >= 1 completion without submitting */
		ret = __sys_io_uring_enter(ring.ring_fd, 0, 1, IORING_ENTER_GETEVENTS, NULL);
		if (ret < 0) {
			perror("Error io_uring_enter...\n");
			exit(1);
		}

		// go through all CQEs
		io_uring_for_each_cqe(&ring, head, cqe) {
			struct conn_info conn_i;
			int type;

			count++;
			/* unpack the conn_info stored in user_data */
			memcpy(&conn_i, &cqe->user_data, sizeof(conn_i));

			type = conn_i.type;
			if (type == READ) {
				pthread_mutex_lock(&lock);

				if (cqe->res <= 0) {
					// connection closed or error
					shutdown(conn_i.fd, SHUT_RDWR);
				} else {
					/* data available: stop scanning */
					pthread_mutex_unlock(&lock);
					break;
				}
				add_socket_pollin(&ring, conn_i.fd);
				pthread_mutex_unlock(&lock);
			} else if (type == WRITE) {
			} else if (type == POLLING_IN) {
				break;
			} else if (type == POLLING_RDHUP) {
				break;
			} else if (type == CLOSE) {
			} else if (type == EVENTFD_READ) {
				/* re-arm the async eventfd read */
				add_socket_eventfd_read(&ring, client_eventfd);
			}
		}

		io_uring_cq_advance(&ring, count);
	}
}
219
/*
 * SIGALRM handler: the test ran its full duration without deadlocking,
 * so terminate the whole process (all threads) with success.
 */
static void sig_alrm(int sig)
{
	(void)sig;	/* unused */
	exit(0);
}
224
main(int argc,char * argv[])225 int main(int argc, char *argv[])
226 {
227 pthread_t server_thread_t, client_thread_t;
228 struct sigaction act;
229
230 if (argc > 1)
231 return 0;
232
233 if (pthread_mutex_init(&lock, NULL) != 0) {
234 printf("\n mutex init failed\n");
235 return 1;
236 }
237
238 pthread_create(&server_thread_t, NULL, &server_thread, NULL);
239 pthread_create(&client_thread_t, NULL, &client_thread, NULL);
240
241 memset(&act, 0, sizeof(act));
242 act.sa_handler = sig_alrm;
243 act.sa_flags = SA_RESTART;
244 sigaction(SIGALRM, &act, NULL);
245 alarm(1);
246
247 pthread_join(server_thread_t, NULL);
248 return 0;
249 }
250