1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements. See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 *
17 ******************************************************************************
18 *
19 * This implementation is based on a design by John Brooks (IBM Pok) which uses
20 * the z/OS sockets async i/o facility. When a
21 * socket is added to the pollset, an async poll is issued for that individual
22 * socket. It specifies that the kernel should send an IPC message when the
23 * socket becomes ready. The IPC messages are sent to a single message queue
24 * that is part of the pollset. apr_pollset_poll waits on the arrival of IPC
25 * messages or the specified timeout.
26 *
27 * Since z/OS does not support async i/o for pipes or files at present, this
28 * implementation falls back to using ordinary poll() when
29 * APR_POLLSET_THREADSAFE is unset.
30 *
31 * Greg Ames
32 * April 2012
33 */
34
35 #include "apr.h"
36 #include "apr_hash.h"
37 #include "apr_poll.h"
38 #include "apr_time.h"
39 #include "apr_portable.h"
40 #include "apr_arch_inherit.h"
41 #include "apr_arch_file_io.h"
42 #include "apr_arch_networkio.h"
43 #include "apr_arch_poll_private.h"
44
45 #ifdef HAVE_AIO_MSGQ
46
47 #include <sys/msg.h> /* msgget etc */
48 #include <time.h> /* timestruct */
49 #include <poll.h> /* pollfd */
50 #include <limits.h> /* MAX_INT */
51
/*
 * Provider-private pollset state, reached through apr_pollset_t::p.
 *
 * asio_pollset_create() sets up a different subset of these fields depending
 * on whether APR_POLLSET_THREADSAFE was requested:
 *   - with the flag:    ring_lock, msg_q, elems, free_ring, prior_ready_ring
 *   - without the flag: pollset, query_set
 * result_set and size are set up in both cases.
 */
struct apr_pollset_private_t
{
    int             msg_q;              /* IPC message queue. The z/OS kernel sends messages
                                         * to this queue when our async polls on individual
                                         * file descriptors complete
                                         */
    apr_pollfd_t    *result_set;        /* array of 'size' entries returned to the caller
                                         * by the poll functions
                                         */
    apr_uint32_t    size;               /* capacity passed to asio_pollset_create() */

#if APR_HAS_THREADS
    /* A thread mutex to protect operations on the rings and the hash */
    apr_thread_mutex_t *ring_lock;
#endif

    /* A hash of all active elements used for O(1) _remove operations;
     * keyed by the socket's file descriptor (an int)
     */
    apr_hash_t      *elems;

    /* ready_ring:       elements whose async poll has completed; rebuilt on
     *                   every asio_pollset_poll() call
     * prior_ready_ring: everything that was on ready_ring when the previous
     *                   asio_pollset_poll() returned; these elements are
     *                   re-polled (or recycled if removed) on the next call
     * free_ring:        elements available for reuse by asio_pollset_add()
     */
    APR_RING_HEAD(ready_ring_t,       asio_elem_t)      ready_ring;
    APR_RING_HEAD(prior_ready_ring_t, asio_elem_t)      prior_ready_ring;
    APR_RING_HEAD(free_ring_t,        asio_elem_t)      free_ring;

    /* for pipes etc with no asio */
    struct pollfd   *pollset;           /* array handed to poll() */
    apr_pollfd_t    *query_set;         /* APR descriptors, parallel to 'pollset' */
};
77
/* Life-cycle state of an asio_elem_t. */
typedef enum {
    ASIO_INIT = 0,      /* an async poll has been issued for the element and
                         * its completion has not been processed yet
                         */
    ASIO_REMOVED,       /* the caller removed the descriptor; the element is
                         * recycled onto the free ring once it is seen again
                         */
    ASIO_COMPLETE       /* the async poll completed; the element sits on the
                         * ready ring (later the prior ready ring)
                         */
} asio_state_e;
83
typedef struct asio_elem_t asio_elem_t;

/*
 * Message placed on the pollset's IPC queue when an async poll completes.
 * The payload after msg_type is a single pointer back to the owning element,
 * which is why the receive calls use sizeof(asio_elem_t *) as the size.
 */
struct asio_msgbuf_t {
    long         msg_type;       /* must be > 0 */
    asio_elem_t *msg_elem;       /* element this completion belongs to */
};
90
/*
 * One descriptor tracked by an async (APR_POLLSET_THREADSAFE) pollset.
 * Elements are allocated from the pollset's pool and reused via the free ring.
 */
struct asio_elem_t
{
    APR_RING_ENTRY(asio_elem_t) link;       /* linkage for the ready, prior ready
                                             * and free rings
                                             */
    apr_pollfd_t                pfd;        /* copy of the caller's descriptor */
    struct pollfd               os_pfd;     /* fd/events polled asynchronously;
                                             * a.aio_buf points here
                                             */
    struct aiocb                a;          /* control block passed to asyncio() */
    asio_state_e                state;      /* see asio_state_e */
    struct asio_msgbuf_t        msg;        /* completion message; a.aio_msgev_addr
                                             * points here
                                             */
};
100
#define DEBUG 0

/* DEBUG settings: 0 - no debug messages at all,
 *                 1 - should not occur messages,
 *                 2 - apr_pollset_* entry and exit messages,
 *                 3 - state changes, memory usage,
 *                 4 - z/OS, APR, and internal calls,
 *                 5 - everything else except the timer pop path,
 *                 6 - everything, including the Event 1 sec timer pop path
 *
 *  each DEBUG level includes all messages produced by lower numbered levels
 */

#if DEBUG

#include <assert.h>
#include <unistd.h>  /* getpid */

/* Declares the scratch buffer the DBG_CORE* macros format into.  It must
 * appear among the local declarations of every function that uses DBGn().
 */
#define DBG_BUFF char dbg_msg_buff[256];

/* Opens an "if (level enabled) {" block; DBG_END supplies the closing brace. */
#define DBG_TEST(lvl) if (lvl <= DEBUG) {

/* DBG_CORE..DBG_CORE4: format "<pid> <function> <msg>" with 0..4 extra
 * arguments into dbg_msg_buff, then write it to stderr.
 * NOTE(review): these paste __FUNCTION__ between string literals, which
 * requires a compiler that treats __FUNCTION__ as a string literal - confirm
 * for the target compiler.
 */
#define DBG_CORE(msg)               sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
                                        " "  msg, getpid()),                   \
                                    fprintf(stderr, "%s", dbg_msg_buff);
#define DBG_CORE1(msg, var1)        sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
                                        " " msg, getpid(), var1),              \
                                    fprintf(stderr, "%s", dbg_msg_buff);
#define DBG_CORE2(msg, var1, var2)  sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
                                        " " msg, getpid(), var1, var2),        \
                                    fprintf(stderr, "%s", dbg_msg_buff);
#define DBG_CORE3(msg, var1, var2, var3)                                       \
                                    sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
                                        " " msg, getpid(), var1, var2, var3),  \
                                    fprintf(stderr, "%s", dbg_msg_buff);
#define DBG_CORE4(msg, var1, var2, var3, var4)                                 \
                                    sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
                                        " " msg, getpid(), var1, var2, var3, var4),\
                                    fprintf(stderr, "%s", dbg_msg_buff);

#define DBG_END }

/* DBG..DBG4: emit the message only when 'lvl' is at or below DEBUG. */
#define DBG(lvl, msg)   DBG_TEST(lvl)   \
                            DBG_CORE(msg) \
                        DBG_END

#define DBG1(lvl, msg, var1)    DBG_TEST(lvl)           \
                                    DBG_CORE1(msg, var1)  \
                                DBG_END

#define DBG2(lvl, msg, var1, var2)      DBG_TEST(lvl)   \
                                            DBG_CORE2(msg, var1, var2) \
                                        DBG_END

#define DBG3(lvl, msg, var1, var2, var3)                \
                        DBG_TEST(lvl)                   \
                            DBG_CORE3(msg, var1, var2, var3) \
                        DBG_END

#define DBG4(lvl, msg, var1, var2, var3, var4)          \
                        DBG_TEST(lvl)                   \
                            DBG_CORE4(msg, var1, var2, var3, var4) \
                        DBG_END

#else  /* DEBUG is 0 */
/* With debugging off, DBG_BUFF declares nothing and every DBGn() call is a
 * no-op expression.
 */
#define DBG_BUFF
#define DBG(lvl, msg)                            ((void)0)
#define DBG1(lvl, msg, var1)                     ((void)0)
#define DBG2(lvl, msg, var1, var2)               ((void)0)
#define DBG3(lvl, msg, var1, var2, var3)         ((void)0)
#define DBG4(lvl, msg, var1, var2, var3, var4)   ((void)0)

#endif /* DEBUG */
174
asyncio(struct aiocb * a)175 static int asyncio(struct aiocb *a)
176 {
177 DBG_BUFF
178 int rv;
179
180 #ifdef _LP64
181 #define AIO BPX4AIO
182 #else
183 #define AIO BPX1AIO
184 #endif
185
186 AIO(sizeof(struct aiocb), a, &rv, &errno, __err2ad());
187 DBG2(4, "BPX4AIO aiocb %p rv %d\n",
188 a, rv);
189 #ifdef DEBUG
190 if (rv < 0) {
191 DBG2(4, "errno %d errnojr %08x\n",
192 errno, *__err2ad());
193 }
194 #endif
195 return rv;
196 }
197
/*
 * Convert an APR event mask (APR_POLL*) into the matching poll() event
 * mask (POLL*).  Bits without a counterpart in the table are dropped.
 */
static apr_int16_t get_event(apr_int16_t event)
{
    DBG_BUFF
    static const struct {
        apr_int16_t apr_flag;
        apr_int16_t os_flag;
    } xlate[] = {
        { APR_POLLIN,   POLLIN   },
        { APR_POLLPRI,  POLLPRI  },
        { APR_POLLOUT,  POLLOUT  },
        { APR_POLLERR,  POLLERR  },
        { APR_POLLHUP,  POLLHUP  },
        { APR_POLLNVAL, POLLNVAL }
    };
    apr_int16_t os_events = 0;
    unsigned int k;

    DBG(4, "entered\n");

    for (k = 0; k < sizeof(xlate) / sizeof(xlate[0]); k++) {
        if (event & xlate[k].apr_flag) {
            os_events |= xlate[k].os_flag;
        }
    }

    DBG(4, "exiting\n");
    return os_events;
}
220
/*
 * Convert a poll() returned-event mask (POLL*) into the matching APR
 * event mask (APR_POLL*).  This is the inverse mapping of get_event().
 */
static apr_int16_t get_revent(apr_int16_t event)
{
    DBG_BUFF
    apr_int16_t apr_events = 0;

    DBG(4, "entered\n");

    apr_events |= (event & POLLIN)   ? APR_POLLIN   : 0;
    apr_events |= (event & POLLPRI)  ? APR_POLLPRI  : 0;
    apr_events |= (event & POLLOUT)  ? APR_POLLOUT  : 0;
    apr_events |= (event & POLLERR)  ? APR_POLLERR  : 0;
    apr_events |= (event & POLLHUP)  ? APR_POLLHUP  : 0;
    apr_events |= (event & POLLNVAL) ? APR_POLLNVAL : 0;

    DBG(4, "exiting\n");
    return apr_events;
}
243
asio_pollset_cleanup(apr_pollset_t * pollset)244 static apr_status_t asio_pollset_cleanup(apr_pollset_t *pollset)
245 {
246 DBG_BUFF
247 int rv;
248
249 DBG(4, "entered\n");
250 rv = msgctl(pollset->p->msg_q, IPC_RMID, NULL);
251
252 DBG1(4, "exiting, msgctl(IPC_RMID) returned %d\n", rv);
253 return rv;
254 }
255
asio_pollset_create(apr_pollset_t * pollset,apr_uint32_t size,apr_pool_t * p,apr_uint32_t flags)256 static apr_status_t asio_pollset_create(apr_pollset_t *pollset,
257 apr_uint32_t size,
258 apr_pool_t *p,
259 apr_uint32_t flags)
260 {
261 DBG_BUFF
262 apr_status_t rv;
263 apr_pollset_private_t *priv;
264
265 DBG1(2, "entered, flags: %x\n", flags);
266
267 priv = pollset->p = apr_palloc(p, sizeof(*priv));
268
269 if (flags & APR_POLLSET_THREADSAFE) {
270 #if APR_HAS_THREADS
271 if (rv = apr_thread_mutex_create(&(priv->ring_lock),
272 APR_THREAD_MUTEX_DEFAULT,
273 p) != APR_SUCCESS) {
274 DBG1(1, "apr_thread_mutex_create returned %d\n", rv);
275 pollset->p = NULL;
276 return rv;
277 }
278 rv = msgget(IPC_PRIVATE, S_IWUSR+S_IRUSR); /* user r/w perms */
279 if (rv < 0) {
280 #if DEBUG
281 perror(__FUNCTION__ " msgget returned < 0 ");
282 #endif
283 pollset->p = NULL;
284 return rv;
285 }
286
287 DBG2(4, "pollset %p msgget was OK, rv=%d\n", pollset, rv);
288 priv->msg_q = rv;
289 priv->elems = apr_hash_make(p);
290
291 APR_RING_INIT(&priv->free_ring, asio_elem_t, link);
292 APR_RING_INIT(&priv->prior_ready_ring, asio_elem_t, link);
293
294 #else /* APR doesn't have threads but caller wants a threadsafe pollset */
295 pollset->p = NULL;
296 return APR_ENOTIMPL;
297 #endif
298
299 } else { /* APR_POLLSET_THREADSAFE not set, i.e. no async i/o,
300 * init fields only needed in old style pollset
301 */
302
303 priv->pollset = apr_palloc(p, size * sizeof(struct pollfd));
304 priv->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
305
306 if ((!priv->pollset) || (!priv->query_set)) {
307 pollset->p = NULL;
308 return APR_ENOMEM;
309 }
310 }
311
312 pollset->nelts = 0;
313 pollset->flags = flags;
314 pollset->pool = p;
315 priv->size = size;
316 priv->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
317 if (!priv->result_set) {
318 if (flags & APR_POLLSET_THREADSAFE) {
319 msgctl(priv->msg_q, IPC_RMID, NULL);
320 }
321 pollset->p = NULL;
322 return APR_ENOMEM;
323 }
324
325 DBG2(2, "exiting, pollset: %p, type: %s\n",
326 pollset,
327 flags & APR_POLLSET_THREADSAFE ? "async" : "POSIX");
328
329
330 return APR_SUCCESS;
331
332 } /* end of asio_pollset_create */
333
posix_add(apr_pollset_t * pollset,const apr_pollfd_t * descriptor)334 static apr_status_t posix_add(apr_pollset_t *pollset,
335 const apr_pollfd_t *descriptor)
336 {
337 DBG_BUFF
338 int fd;
339 apr_pool_t *p = pollset->pool;
340 apr_pollset_private_t *priv = pollset->p;
341
342 DBG(4, "entered\n");
343
344 if (pollset->nelts == priv->size) {
345 return APR_ENOMEM;
346 }
347
348 priv->query_set[pollset->nelts] = *descriptor;
349 if (descriptor->desc_type == APR_POLL_SOCKET) {
350 fd = descriptor->desc.s->socketdes;
351 }
352 else {
353 fd = descriptor->desc.f->filedes;
354 }
355
356 priv->pollset[pollset->nelts].fd = fd;
357
358 priv->pollset[pollset->nelts].events =
359 get_event(descriptor->reqevents);
360
361 pollset->nelts++;
362
363 DBG2(4, "exiting, fd %d added to pollset %p\n", fd, pollset);
364
365 return APR_SUCCESS;
366 } /* end of posix_add */
367
368
asio_pollset_add(apr_pollset_t * pollset,const apr_pollfd_t * descriptor)369 static apr_status_t asio_pollset_add(apr_pollset_t *pollset,
370 const apr_pollfd_t *descriptor)
371 {
372 DBG_BUFF
373 asio_elem_t *elem;
374 apr_status_t rv = APR_SUCCESS;
375 apr_pollset_private_t *priv = pollset->p;
376
377 pollset_lock_rings();
378 DBG(2, "entered\n");
379
380 if (pollset->flags & APR_POLLSET_THREADSAFE) {
381
382 if (!APR_RING_EMPTY(&(priv->free_ring), asio_elem_t, link)) {
383 elem = APR_RING_FIRST(&(priv->free_ring));
384 APR_RING_REMOVE(elem, link);
385 DBG1(3, "used recycled memory at %08p\n", elem);
386 elem->state = ASIO_INIT;
387 elem->a.aio_cflags = 0;
388 }
389 else {
390 elem = (asio_elem_t *) apr_pcalloc(pollset->pool, sizeof(asio_elem_t));
391 DBG1(3, "alloced new memory at %08p\n", elem);
392
393 elem->a.aio_notifytype = AIO_MSGQ;
394 elem->a.aio_msgev_qid = priv->msg_q;
395 DBG1(5, "aio_msgev_quid = %d \n", elem->a.aio_msgev_qid);
396 elem->a.aio_msgev_size = sizeof(asio_elem_t *);
397 elem->a.aio_msgev_flag = 0; /* wait if queue is full */
398 elem->a.aio_msgev_addr = &(elem->msg);
399 elem->a.aio_buf = &(elem->os_pfd);
400 elem->a.aio_nbytes = 1; /* number of pfds to poll */
401 elem->msg.msg_type = 1;
402 elem->msg.msg_elem = elem;
403 }
404
405 /* z/OS only supports async I/O for sockets for now */
406 elem->os_pfd.fd = descriptor->desc.s->socketdes;
407
408 APR_RING_ELEM_INIT(elem, link);
409 elem->a.aio_cmd = AIO_SELPOLL;
410 elem->a.aio_cflags &= ~AIO_OK2COMPIMD; /* not OK to complete inline*/
411 elem->pfd = *descriptor;
412 elem->os_pfd.events = get_event(descriptor->reqevents);
413
414 if (0 != asyncio(&elem->a)) {
415 rv = errno;
416 DBG3(4, "pollset %p asio failed fd %d, errno %p\n",
417 pollset, elem->os_pfd.fd, rv);
418 #if DEBUG
419 perror(__FUNCTION__ " asio failure");
420 #endif
421 }
422 else {
423 DBG2(4, "good asio call, adding fd %d to pollset %p\n",
424 elem->os_pfd.fd, pollset);
425
426 pollset->nelts++;
427 apr_hash_set(priv->elems, &(elem->os_pfd.fd), sizeof(int), elem);
428 }
429 }
430 else {
431 /* APR_POLLSET_THREADSAFE isn't set. use POSIX poll in case
432 * pipes or files are used with this pollset
433 */
434
435 rv = posix_add(pollset, descriptor);
436 }
437
438 DBG1(2, "exiting, rv = %d\n", rv);
439
440 pollset_unlock_rings();
441 return rv;
442 } /* end of asio_pollset_add */
443
posix_remove(apr_pollset_t * pollset,const apr_pollfd_t * descriptor)444 static posix_remove(apr_pollset_t *pollset, const apr_pollfd_t *descriptor)
445 {
446 DBG_BUFF
447 apr_uint32_t i;
448 apr_pollset_private_t *priv = pollset->p;
449
450 DBG(4, "entered\n");
451 for (i = 0; i < pollset->nelts; i++) {
452 if (descriptor->desc.s == priv->query_set[i].desc.s) {
453 /* Found an instance of the fd: remove this and any other copies */
454 apr_uint32_t dst = i;
455 apr_uint32_t old_nelts = pollset->nelts;
456 pollset->nelts--;
457 for (i++; i < old_nelts; i++) {
458 if (descriptor->desc.s == priv->query_set[i].desc.s) {
459 pollset->nelts--;
460 }
461 else {
462 priv->pollset[dst] = priv->pollset[i];
463 priv->query_set[dst] = priv->query_set[i];
464 dst++;
465 }
466 }
467 DBG(4, "returning OK\n");
468 return APR_SUCCESS;
469 }
470 }
471
472 DBG(1, "returning APR_NOTFOUND\n");
473 return APR_NOTFOUND;
474
475 } /* end of posix_remove */
476
asio_pollset_remove(apr_pollset_t * pollset,const apr_pollfd_t * descriptor)477 static apr_status_t asio_pollset_remove(apr_pollset_t *pollset,
478 const apr_pollfd_t *descriptor)
479 {
480 DBG_BUFF
481 asio_elem_t *elem;
482 apr_status_t rv = APR_SUCCESS;
483 apr_pollset_private_t *priv = pollset->p;
484 struct aiocb cancel_a; /* AIO_CANCEL is synchronous, so autodata works fine */
485
486 int fd;
487
488 DBG(2, "entered\n");
489
490 if (!(pollset->flags & APR_POLLSET_THREADSAFE)) {
491 return posix_remove(pollset, descriptor);
492 }
493
494 pollset_lock_rings();
495
496 #if DEBUG
497 assert(descriptor->desc_type == APR_POLL_SOCKET);
498 #endif
499 /* zOS 1.12 doesn't support files for async i/o */
500 fd = descriptor->desc.s->socketdes;
501
502 elem = apr_hash_get(priv->elems, &(fd), sizeof(int));
503 if (elem == NULL) {
504 DBG1(1, "couldn't find fd %d\n", fd);
505 rv = APR_NOTFOUND;
506 } else {
507 DBG1(5, "hash found fd %d\n", fd);
508 /* delete this fd from the hash */
509 apr_hash_set(priv->elems, &(fd), sizeof(int), NULL);
510
511 if (elem->state == ASIO_INIT) {
512 /* asyncio call to cancel */
513 cancel_a.aio_cmd = AIO_CANCEL;
514 cancel_a.aio_buf = &elem->a; /* point to original aiocb */
515
516 cancel_a.aio_cflags = 0;
517 cancel_a.aio_cflags2 = 0;
518
519 /* we want the original aiocb to show up on the pollset message queue
520 * before recycling its memory to eliminate race conditions
521 */
522
523 rv = asyncio(&cancel_a);
524 DBG1(4, "asyncio returned %d\n", rv);
525
526 #if DEBUG
527 assert(rv == 1);
528 #endif
529 }
530 elem->state = ASIO_REMOVED;
531 rv = APR_SUCCESS;
532 }
533
534 DBG1(2, "exiting, rv: %d\n", rv);
535
536 pollset_unlock_rings();
537
538 return rv;
539 } /* end of asio_pollset_remove */
540
posix_poll(apr_pollset_t * pollset,apr_interval_time_t timeout,apr_int32_t * num,const apr_pollfd_t ** descriptors)541 static posix_poll(apr_pollset_t *pollset,
542 apr_interval_time_t timeout,
543 apr_int32_t *num,
544 const apr_pollfd_t **descriptors)
545 {
546 DBG_BUFF
547 int rv;
548 apr_uint32_t i, j;
549 apr_pollset_private_t *priv = pollset->p;
550
551 DBG(4, "entered\n");
552
553 if (timeout > 0) {
554 timeout /= 1000;
555 }
556 rv = poll(priv->pollset, pollset->nelts, timeout);
557 (*num) = rv;
558 if (rv < 0) {
559 return apr_get_netos_error();
560 }
561 if (rv == 0) {
562 return APR_TIMEUP;
563 }
564 j = 0;
565 for (i = 0; i < pollset->nelts; i++) {
566 if (priv->pollset[i].revents != 0) {
567 priv->result_set[j] = priv->query_set[i];
568 priv->result_set[j].rtnevents =
569 get_revent(priv->pollset[i].revents);
570 j++;
571 }
572 }
573 if (descriptors)
574 *descriptors = priv->result_set;
575
576 DBG(4, "exiting ok\n");
577 return APR_SUCCESS;
578
579 } /* end of posix_poll */
580
/*
 * Handle one completion message taken from the pollset's message queue.
 *
 * The element named by the message is moved to the free ring if the caller
 * already removed it, or marked ASIO_COMPLETE and appended to the ready ring
 * if its async poll was still pending.  Any other state is unexpected.
 *
 * Both call sites in asio_pollset_poll() invoke this with the ring lock held.
 */
static void process_msg(apr_pollset_t *pollset, struct asio_msgbuf_t *msg)
{
    DBG_BUFF
    asio_elem_t *elem = msg->msg_elem;

    switch(elem->state) {
    case ASIO_REMOVED:
        DBG2(5, "for cancelled elem, recycling memory - elem %08p, fd %d\n",
                elem, elem->os_pfd.fd);
        APR_RING_INSERT_TAIL(&(pollset->p->free_ring), elem,
                             asio_elem_t, link);
        break;
    case ASIO_INIT:
        DBG2(4, "adding to ready ring: elem %08p, fd %d\n",
                elem, elem->os_pfd.fd);
        elem->state = ASIO_COMPLETE;
        APR_RING_INSERT_TAIL(&(pollset->p->ready_ring), elem,
                             asio_elem_t, link);
        break;
    default:
        DBG3(1, "unexpected state: elem %08p, fd %d, state %d\n",
            elem, elem->os_pfd.fd, elem->state);
#if DEBUG
        assert(0);
#endif
        break;
    }
}
608
asio_pollset_poll(apr_pollset_t * pollset,apr_interval_time_t timeout,apr_int32_t * num,const apr_pollfd_t ** descriptors)609 static apr_status_t asio_pollset_poll(apr_pollset_t *pollset,
610 apr_interval_time_t timeout,
611 apr_int32_t *num,
612 const apr_pollfd_t **descriptors)
613 {
614 DBG_BUFF
615 int i, ret;
616 asio_elem_t *elem, *next_elem;
617 struct asio_msgbuf_t msg_buff;
618 struct timespec tv;
619 apr_status_t rv = APR_SUCCESS;
620 apr_pollset_private_t *priv = pollset->p;
621
622 DBG(6, "entered\n"); /* chatty - traces every second w/Event */
623
624 if ((pollset->flags & APR_POLLSET_THREADSAFE) == 0 ) {
625 return posix_poll(pollset, timeout, num, descriptors);
626 }
627
628 pollset_lock_rings();
629 APR_RING_INIT(&(priv->ready_ring), asio_elem_t, link);
630
631 while (!APR_RING_EMPTY(&(priv->prior_ready_ring), asio_elem_t, link)) {
632 elem = APR_RING_FIRST(&(priv->prior_ready_ring));
633 DBG3(5, "pollset %p elem %p fd %d on prior ready ring\n",
634 pollset,
635 elem,
636 elem->os_pfd.fd);
637
638 APR_RING_REMOVE(elem, link);
639
640 /*
641 * since USS does not remember what's in our pollset, we have
642 * to re-add fds which have not been apr_pollset_remove'd
643 *
644 * there may have been too many ready fd's to return in the
645 * result set last time. re-poll inline for both cases
646 */
647
648 if (elem->state == ASIO_REMOVED) {
649
650 /*
651 * async i/o is done since it was found on prior_ready
652 * the state says the caller is done with it too
653 * so recycle the elem
654 */
655
656 APR_RING_INSERT_TAIL(&(priv->free_ring), elem,
657 asio_elem_t, link);
658 continue; /* do not re-add if it has been _removed */
659 }
660
661 elem->state = ASIO_INIT;
662 elem->a.aio_cflags = AIO_OK2COMPIMD;
663
664 if (0 != (ret = asyncio(&elem->a))) {
665 if (ret == 1) {
666 DBG(4, "asyncio() completed inline\n");
667 /* it's ready now */
668 elem->state = ASIO_COMPLETE;
669 APR_RING_INSERT_TAIL(&(priv->ready_ring), elem, asio_elem_t,
670 link);
671 }
672 else {
673 DBG2(1, "asyncio() failed, ret: %d, errno: %d\n",
674 ret, errno);
675 pollset_unlock_rings();
676 return errno;
677 }
678 }
679 DBG1(4, "asyncio() completed rc %d\n", ret);
680 }
681
682 DBG(6, "after prior ready loop\n"); /* chatty w/timeouts, hence 6 */
683
684 /* Gather async poll completions that have occurred since the last call */
685 while (0 < msgrcv(priv->msg_q, &msg_buff, sizeof(asio_elem_t *), 0,
686 IPC_NOWAIT)) {
687 process_msg(pollset, &msg_buff);
688 }
689
690 /* Suspend if nothing is ready yet. */
691 if (APR_RING_EMPTY(&(priv->ready_ring), asio_elem_t, link)) {
692
693 if (timeout >= 0) {
694 tv.tv_sec = apr_time_sec(timeout);
695 tv.tv_nsec = apr_time_usec(timeout) * 1000;
696 } else {
697 tv.tv_sec = INT_MAX; /* block until something is ready */
698 }
699
700 DBG2(6, "nothing on the ready ring "
701 "- blocking for %d seconds %d ns\n",
702 tv.tv_sec, tv.tv_nsec);
703
704 pollset_unlock_rings(); /* allow other apr_pollset_* calls while blocked */
705
706 if (0 >= (ret = __msgrcv_timed(priv->msg_q, &msg_buff,
707 sizeof(asio_elem_t *), 0, NULL, &tv))) {
708 #if DEBUG
709 if (errno == EAGAIN) {
710 DBG(6, "__msgrcv_timed timed out\n"); /* timeout path, so 6 */
711 }
712 else {
713 DBG(1, "__msgrcv_timed failed!\n");
714 }
715 #endif
716 return (errno == EAGAIN) ? APR_TIMEUP : errno;
717 }
718
719 pollset_lock_rings();
720
721 process_msg(pollset, &msg_buff);
722 }
723
724 APR_RING_INIT(&priv->prior_ready_ring, asio_elem_t, link);
725
726 (*num) = 0;
727 elem = APR_RING_FIRST(&(priv->ready_ring));
728
729 for (i = 0;
730
731 i < priv->size
732 && elem != APR_RING_SENTINEL(&(priv->ready_ring), asio_elem_t, link);
733 i++) {
734 DBG2(5, "ready ring: elem %08p, fd %d\n", elem, elem->os_pfd.fd);
735
736 priv->result_set[i] = elem->pfd;
737 priv->result_set[i].rtnevents
738 = get_revent(elem->os_pfd.revents);
739 (*num)++;
740
741 elem = APR_RING_NEXT(elem, link);
742
743 #if DEBUG
744 if (elem == APR_RING_SENTINEL(&(priv->ready_ring), asio_elem_t, link)) {
745 DBG(5, "end of ready ring reached\n");
746 }
747 #endif
748 }
749
750 if (descriptors) {
751 *descriptors = priv->result_set;
752 }
753
754 /* if the result size is too small, remember which descriptors
755 * haven't had results reported yet. we will look
756 * at these descriptors on the next apr_pollset_poll call
757 */
758
759 APR_RING_CONCAT(&priv->prior_ready_ring, &(priv->ready_ring), asio_elem_t, link);
760
761 DBG1(2, "exiting, rv = %d\n", rv);
762
763 pollset_unlock_rings();
764
765 return rv;
766 } /* end of asio_pollset_poll */
767
/* Provider table for this implementation.  The initializers are positional:
 * the create, add, remove, poll and cleanup functions defined above, followed
 * by the provider's name.
 */
static apr_pollset_provider_t impl = {
    asio_pollset_create,
    asio_pollset_add,
    asio_pollset_remove,
    asio_pollset_poll,
    asio_pollset_cleanup,
    "asio"
};

/* The only symbol this file exports: a pointer to the provider table. */
apr_pollset_provider_t *apr_pollset_provider_aio_msgq = &impl;
778
779 #endif /* HAVE_AIO_MSGQ */
780