1"""Selector and proactor event loops for Windows."""
2
3import sys
4
5if sys.platform != 'win32':  # pragma: no cover
6    raise ImportError('win32 only')
7
8import _overlapped
9import _winapi
10import errno
11import math
12import msvcrt
13import socket
14import struct
15import time
16import weakref
17
18from . import events
19from . import base_subprocess
20from . import futures
21from . import exceptions
22from . import proactor_events
23from . import selector_events
24from . import tasks
25from . import windows_utils
26from .log import logger
27
28
29__all__ = (
30    'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
31    'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
32    'WindowsProactorEventLoopPolicy',
33)
34
35
36NULL = _winapi.NULL
37INFINITE = _winapi.INFINITE
38ERROR_CONNECTION_REFUSED = 1225
39ERROR_CONNECTION_ABORTED = 1236
40
41# Initial delay in seconds for connect_pipe() before retrying to connect
42CONNECT_PIPE_INIT_DELAY = 0.001
43
44# Maximum delay in seconds for connect_pipe() before retrying to connect
45CONNECT_PIPE_MAX_DELAY = 0.100
46
47
48class _OverlappedFuture(futures.Future):
49    """Subclass of Future which represents an overlapped operation.
50
51    Cancelling it will immediately cancel the overlapped operation.
52    """
53
54    def __init__(self, ov, *, loop=None):
55        super().__init__(loop=loop)
56        if self._source_traceback:
57            del self._source_traceback[-1]
58        self._ov = ov
59
60    def _repr_info(self):
61        info = super()._repr_info()
62        if self._ov is not None:
63            state = 'pending' if self._ov.pending else 'completed'
64            info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>')
65        return info
66
67    def _cancel_overlapped(self):
68        if self._ov is None:
69            return
70        try:
71            self._ov.cancel()
72        except OSError as exc:
73            context = {
74                'message': 'Cancelling an overlapped future failed',
75                'exception': exc,
76                'future': self,
77            }
78            if self._source_traceback:
79                context['source_traceback'] = self._source_traceback
80            self._loop.call_exception_handler(context)
81        self._ov = None
82
83    def cancel(self, msg=None):
84        self._cancel_overlapped()
85        return super().cancel(msg=msg)
86
87    def set_exception(self, exception):
88        super().set_exception(exception)
89        self._cancel_overlapped()
90
91    def set_result(self, result):
92        super().set_result(result)
93        self._ov = None
94
95
96class _BaseWaitHandleFuture(futures.Future):
97    """Subclass of Future which represents a wait handle."""
98
99    def __init__(self, ov, handle, wait_handle, *, loop=None):
100        super().__init__(loop=loop)
101        if self._source_traceback:
102            del self._source_traceback[-1]
103        # Keep a reference to the Overlapped object to keep it alive until the
104        # wait is unregistered
105        self._ov = ov
106        self._handle = handle
107        self._wait_handle = wait_handle
108
109        # Should we call UnregisterWaitEx() if the wait completes
110        # or is cancelled?
111        self._registered = True
112
113    def _poll(self):
114        # non-blocking wait: use a timeout of 0 millisecond
115        return (_winapi.WaitForSingleObject(self._handle, 0) ==
116                _winapi.WAIT_OBJECT_0)
117
118    def _repr_info(self):
119        info = super()._repr_info()
120        info.append(f'handle={self._handle:#x}')
121        if self._handle is not None:
122            state = 'signaled' if self._poll() else 'waiting'
123            info.append(state)
124        if self._wait_handle is not None:
125            info.append(f'wait_handle={self._wait_handle:#x}')
126        return info
127
128    def _unregister_wait_cb(self, fut):
129        # The wait was unregistered: it's not safe to destroy the Overlapped
130        # object
131        self._ov = None
132
133    def _unregister_wait(self):
134        if not self._registered:
135            return
136        self._registered = False
137
138        wait_handle = self._wait_handle
139        self._wait_handle = None
140        try:
141            _overlapped.UnregisterWait(wait_handle)
142        except OSError as exc:
143            if exc.winerror != _overlapped.ERROR_IO_PENDING:
144                context = {
145                    'message': 'Failed to unregister the wait handle',
146                    'exception': exc,
147                    'future': self,
148                }
149                if self._source_traceback:
150                    context['source_traceback'] = self._source_traceback
151                self._loop.call_exception_handler(context)
152                return
153            # ERROR_IO_PENDING means that the unregister is pending
154
155        self._unregister_wait_cb(None)
156
157    def cancel(self, msg=None):
158        self._unregister_wait()
159        return super().cancel(msg=msg)
160
161    def set_exception(self, exception):
162        self._unregister_wait()
163        super().set_exception(exception)
164
165    def set_result(self, result):
166        self._unregister_wait()
167        super().set_result(result)
168
169
170class _WaitCancelFuture(_BaseWaitHandleFuture):
171    """Subclass of Future which represents a wait for the cancellation of a
172    _WaitHandleFuture using an event.
173    """
174
175    def __init__(self, ov, event, wait_handle, *, loop=None):
176        super().__init__(ov, event, wait_handle, loop=loop)
177
178        self._done_callback = None
179
180    def cancel(self):
181        raise RuntimeError("_WaitCancelFuture must not be cancelled")
182
183    def set_result(self, result):
184        super().set_result(result)
185        if self._done_callback is not None:
186            self._done_callback(self)
187
188    def set_exception(self, exception):
189        super().set_exception(exception)
190        if self._done_callback is not None:
191            self._done_callback(self)
192
193
194class _WaitHandleFuture(_BaseWaitHandleFuture):
195    def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
196        super().__init__(ov, handle, wait_handle, loop=loop)
197        self._proactor = proactor
198        self._unregister_proactor = True
199        self._event = _overlapped.CreateEvent(None, True, False, None)
200        self._event_fut = None
201
202    def _unregister_wait_cb(self, fut):
203        if self._event is not None:
204            _winapi.CloseHandle(self._event)
205            self._event = None
206            self._event_fut = None
207
208        # If the wait was cancelled, the wait may never be signalled, so
209        # it's required to unregister it. Otherwise, IocpProactor.close() will
210        # wait forever for an event which will never come.
211        #
212        # If the IocpProactor already received the event, it's safe to call
213        # _unregister() because we kept a reference to the Overlapped object
214        # which is used as a unique key.
215        self._proactor._unregister(self._ov)
216        self._proactor = None
217
218        super()._unregister_wait_cb(fut)
219
220    def _unregister_wait(self):
221        if not self._registered:
222            return
223        self._registered = False
224
225        wait_handle = self._wait_handle
226        self._wait_handle = None
227        try:
228            _overlapped.UnregisterWaitEx(wait_handle, self._event)
229        except OSError as exc:
230            if exc.winerror != _overlapped.ERROR_IO_PENDING:
231                context = {
232                    'message': 'Failed to unregister the wait handle',
233                    'exception': exc,
234                    'future': self,
235                }
236                if self._source_traceback:
237                    context['source_traceback'] = self._source_traceback
238                self._loop.call_exception_handler(context)
239                return
240            # ERROR_IO_PENDING is not an error, the wait was unregistered
241
242        self._event_fut = self._proactor._wait_cancel(self._event,
243                                                      self._unregister_wait_cb)
244
245
246class PipeServer(object):
247    """Class representing a pipe server.
248
249    This is much like a bound, listening socket.
250    """
251    def __init__(self, address):
252        self._address = address
253        self._free_instances = weakref.WeakSet()
254        # initialize the pipe attribute before calling _server_pipe_handle()
255        # because this function can raise an exception and the destructor calls
256        # the close() method
257        self._pipe = None
258        self._accept_pipe_future = None
259        self._pipe = self._server_pipe_handle(True)
260
261    def _get_unconnected_pipe(self):
262        # Create new instance and return previous one.  This ensures
263        # that (until the server is closed) there is always at least
264        # one pipe handle for address.  Therefore if a client attempt
265        # to connect it will not fail with FileNotFoundError.
266        tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
267        return tmp
268
269    def _server_pipe_handle(self, first):
270        # Return a wrapper for a new pipe handle.
271        if self.closed():
272            return None
273        flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
274        if first:
275            flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
276        h = _winapi.CreateNamedPipe(
277            self._address, flags,
278            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
279            _winapi.PIPE_WAIT,
280            _winapi.PIPE_UNLIMITED_INSTANCES,
281            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
282            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
283        pipe = windows_utils.PipeHandle(h)
284        self._free_instances.add(pipe)
285        return pipe
286
287    def closed(self):
288        return (self._address is None)
289
290    def close(self):
291        if self._accept_pipe_future is not None:
292            self._accept_pipe_future.cancel()
293            self._accept_pipe_future = None
294        # Close all instances which have not been connected to by a client.
295        if self._address is not None:
296            for pipe in self._free_instances:
297                pipe.close()
298            self._pipe = None
299            self._address = None
300            self._free_instances.clear()
301
302    __del__ = close
303
304
305class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
306    """Windows version of selector event loop."""
307
308
309class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
310    """Windows version of proactor event loop using IOCP."""
311
312    def __init__(self, proactor=None):
313        if proactor is None:
314            proactor = IocpProactor()
315        super().__init__(proactor)
316
317    def run_forever(self):
318        try:
319            assert self._self_reading_future is None
320            self.call_soon(self._loop_self_reading)
321            super().run_forever()
322        finally:
323            if self._self_reading_future is not None:
324                ov = self._self_reading_future._ov
325                self._self_reading_future.cancel()
326                # self_reading_future was just cancelled so if it hasn't been
327                # finished yet, it never will be (it's possible that it has
328                # already finished and its callback is waiting in the queue,
329                # where it could still happen if the event loop is restarted).
330                # Unregister it otherwise IocpProactor.close will wait for it
331                # forever
332                if ov is not None:
333                    self._proactor._unregister(ov)
334                self._self_reading_future = None
335
336    async def create_pipe_connection(self, protocol_factory, address):
337        f = self._proactor.connect_pipe(address)
338        pipe = await f
339        protocol = protocol_factory()
340        trans = self._make_duplex_pipe_transport(pipe, protocol,
341                                                 extra={'addr': address})
342        return trans, protocol
343
344    async def start_serving_pipe(self, protocol_factory, address):
345        server = PipeServer(address)
346
347        def loop_accept_pipe(f=None):
348            pipe = None
349            try:
350                if f:
351                    pipe = f.result()
352                    server._free_instances.discard(pipe)
353
354                    if server.closed():
355                        # A client connected before the server was closed:
356                        # drop the client (close the pipe) and exit
357                        pipe.close()
358                        return
359
360                    protocol = protocol_factory()
361                    self._make_duplex_pipe_transport(
362                        pipe, protocol, extra={'addr': address})
363
364                pipe = server._get_unconnected_pipe()
365                if pipe is None:
366                    return
367
368                f = self._proactor.accept_pipe(pipe)
369            except BrokenPipeError:
370                if pipe and pipe.fileno() != -1:
371                    pipe.close()
372                self.call_soon(loop_accept_pipe)
373            except OSError as exc:
374                if pipe and pipe.fileno() != -1:
375                    self.call_exception_handler({
376                        'message': 'Pipe accept failed',
377                        'exception': exc,
378                        'pipe': pipe,
379                    })
380                    pipe.close()
381                elif self._debug:
382                    logger.warning("Accept pipe failed on pipe %r",
383                                   pipe, exc_info=True)
384                self.call_soon(loop_accept_pipe)
385            except exceptions.CancelledError:
386                if pipe:
387                    pipe.close()
388            else:
389                server._accept_pipe_future = f
390                f.add_done_callback(loop_accept_pipe)
391
392        self.call_soon(loop_accept_pipe)
393        return [server]
394
395    async def _make_subprocess_transport(self, protocol, args, shell,
396                                         stdin, stdout, stderr, bufsize,
397                                         extra=None, **kwargs):
398        waiter = self.create_future()
399        transp = _WindowsSubprocessTransport(self, protocol, args, shell,
400                                             stdin, stdout, stderr, bufsize,
401                                             waiter=waiter, extra=extra,
402                                             **kwargs)
403        try:
404            await waiter
405        except (SystemExit, KeyboardInterrupt):
406            raise
407        except BaseException:
408            transp.close()
409            await transp._wait()
410            raise
411
412        return transp
413
414
415class IocpProactor:
416    """Proactor implementation using IOCP."""
417
418    def __init__(self, concurrency=INFINITE):
419        self._loop = None
420        self._results = []
421        self._iocp = _overlapped.CreateIoCompletionPort(
422            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
423        self._cache = {}
424        self._registered = weakref.WeakSet()
425        self._unregistered = []
426        self._stopped_serving = weakref.WeakSet()
427
428    def _check_closed(self):
429        if self._iocp is None:
430            raise RuntimeError('IocpProactor is closed')
431
432    def __repr__(self):
433        info = ['overlapped#=%s' % len(self._cache),
434                'result#=%s' % len(self._results)]
435        if self._iocp is None:
436            info.append('closed')
437        return '<%s %s>' % (self.__class__.__name__, " ".join(info))
438
439    def set_loop(self, loop):
440        self._loop = loop
441
442    def select(self, timeout=None):
443        if not self._results:
444            self._poll(timeout)
445        tmp = self._results
446        self._results = []
447        try:
448            return tmp
449        finally:
450            # Needed to break cycles when an exception occurs.
451            tmp = None
452
453    def _result(self, value):
454        fut = self._loop.create_future()
455        fut.set_result(value)
456        return fut
457
458    def recv(self, conn, nbytes, flags=0):
459        self._register_with_iocp(conn)
460        ov = _overlapped.Overlapped(NULL)
461        try:
462            if isinstance(conn, socket.socket):
463                ov.WSARecv(conn.fileno(), nbytes, flags)
464            else:
465                ov.ReadFile(conn.fileno(), nbytes)
466        except BrokenPipeError:
467            return self._result(b'')
468
469        def finish_recv(trans, key, ov):
470            try:
471                return ov.getresult()
472            except OSError as exc:
473                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
474                                    _overlapped.ERROR_OPERATION_ABORTED):
475                    raise ConnectionResetError(*exc.args)
476                else:
477                    raise
478
479        return self._register(ov, conn, finish_recv)
480
481    def recv_into(self, conn, buf, flags=0):
482        self._register_with_iocp(conn)
483        ov = _overlapped.Overlapped(NULL)
484        try:
485            if isinstance(conn, socket.socket):
486                ov.WSARecvInto(conn.fileno(), buf, flags)
487            else:
488                ov.ReadFileInto(conn.fileno(), buf)
489        except BrokenPipeError:
490            return self._result(0)
491
492        def finish_recv(trans, key, ov):
493            try:
494                return ov.getresult()
495            except OSError as exc:
496                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
497                                    _overlapped.ERROR_OPERATION_ABORTED):
498                    raise ConnectionResetError(*exc.args)
499                else:
500                    raise
501
502        return self._register(ov, conn, finish_recv)
503
504    def recvfrom(self, conn, nbytes, flags=0):
505        self._register_with_iocp(conn)
506        ov = _overlapped.Overlapped(NULL)
507        try:
508            ov.WSARecvFrom(conn.fileno(), nbytes, flags)
509        except BrokenPipeError:
510            return self._result((b'', None))
511
512        def finish_recv(trans, key, ov):
513            try:
514                return ov.getresult()
515            except OSError as exc:
516                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
517                                    _overlapped.ERROR_OPERATION_ABORTED):
518                    raise ConnectionResetError(*exc.args)
519                else:
520                    raise
521
522        return self._register(ov, conn, finish_recv)
523
524    def recvfrom_into(self, conn, buf, flags=0):
525        self._register_with_iocp(conn)
526        ov = _overlapped.Overlapped(NULL)
527        try:
528            ov.WSARecvFromInto(conn.fileno(), buf, flags)
529        except BrokenPipeError:
530            return self._result((0, None))
531
532        def finish_recv(trans, key, ov):
533            try:
534                return ov.getresult()
535            except OSError as exc:
536                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
537                                    _overlapped.ERROR_OPERATION_ABORTED):
538                    raise ConnectionResetError(*exc.args)
539                else:
540                    raise
541
542        return self._register(ov, conn, finish_recv)
543
544    def sendto(self, conn, buf, flags=0, addr=None):
545        self._register_with_iocp(conn)
546        ov = _overlapped.Overlapped(NULL)
547
548        ov.WSASendTo(conn.fileno(), buf, flags, addr)
549
550        def finish_send(trans, key, ov):
551            try:
552                return ov.getresult()
553            except OSError as exc:
554                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
555                                    _overlapped.ERROR_OPERATION_ABORTED):
556                    raise ConnectionResetError(*exc.args)
557                else:
558                    raise
559
560        return self._register(ov, conn, finish_send)
561
562    def send(self, conn, buf, flags=0):
563        self._register_with_iocp(conn)
564        ov = _overlapped.Overlapped(NULL)
565        if isinstance(conn, socket.socket):
566            ov.WSASend(conn.fileno(), buf, flags)
567        else:
568            ov.WriteFile(conn.fileno(), buf)
569
570        def finish_send(trans, key, ov):
571            try:
572                return ov.getresult()
573            except OSError as exc:
574                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
575                                    _overlapped.ERROR_OPERATION_ABORTED):
576                    raise ConnectionResetError(*exc.args)
577                else:
578                    raise
579
580        return self._register(ov, conn, finish_send)
581
582    def accept(self, listener):
583        self._register_with_iocp(listener)
584        conn = self._get_accept_socket(listener.family)
585        ov = _overlapped.Overlapped(NULL)
586        ov.AcceptEx(listener.fileno(), conn.fileno())
587
588        def finish_accept(trans, key, ov):
589            ov.getresult()
590            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
591            buf = struct.pack('@P', listener.fileno())
592            conn.setsockopt(socket.SOL_SOCKET,
593                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
594            conn.settimeout(listener.gettimeout())
595            return conn, conn.getpeername()
596
597        async def accept_coro(future, conn):
598            # Coroutine closing the accept socket if the future is cancelled
599            try:
600                await future
601            except exceptions.CancelledError:
602                conn.close()
603                raise
604
605        future = self._register(ov, listener, finish_accept)
606        coro = accept_coro(future, conn)
607        tasks.ensure_future(coro, loop=self._loop)
608        return future
609
610    def connect(self, conn, address):
611        if conn.type == socket.SOCK_DGRAM:
612            # WSAConnect will complete immediately for UDP sockets so we don't
613            # need to register any IOCP operation
614            _overlapped.WSAConnect(conn.fileno(), address)
615            fut = self._loop.create_future()
616            fut.set_result(None)
617            return fut
618
619        self._register_with_iocp(conn)
620        # The socket needs to be locally bound before we call ConnectEx().
621        try:
622            _overlapped.BindLocal(conn.fileno(), conn.family)
623        except OSError as e:
624            if e.winerror != errno.WSAEINVAL:
625                raise
626            # Probably already locally bound; check using getsockname().
627            if conn.getsockname()[1] == 0:
628                raise
629        ov = _overlapped.Overlapped(NULL)
630        ov.ConnectEx(conn.fileno(), address)
631
632        def finish_connect(trans, key, ov):
633            ov.getresult()
634            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
635            conn.setsockopt(socket.SOL_SOCKET,
636                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
637            return conn
638
639        return self._register(ov, conn, finish_connect)
640
641    def sendfile(self, sock, file, offset, count):
642        self._register_with_iocp(sock)
643        ov = _overlapped.Overlapped(NULL)
644        offset_low = offset & 0xffff_ffff
645        offset_high = (offset >> 32) & 0xffff_ffff
646        ov.TransmitFile(sock.fileno(),
647                        msvcrt.get_osfhandle(file.fileno()),
648                        offset_low, offset_high,
649                        count, 0, 0)
650
651        def finish_sendfile(trans, key, ov):
652            try:
653                return ov.getresult()
654            except OSError as exc:
655                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
656                                    _overlapped.ERROR_OPERATION_ABORTED):
657                    raise ConnectionResetError(*exc.args)
658                else:
659                    raise
660        return self._register(ov, sock, finish_sendfile)
661
662    def accept_pipe(self, pipe):
663        self._register_with_iocp(pipe)
664        ov = _overlapped.Overlapped(NULL)
665        connected = ov.ConnectNamedPipe(pipe.fileno())
666
667        if connected:
668            # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
669            # that the pipe is connected. There is no need to wait for the
670            # completion of the connection.
671            return self._result(pipe)
672
673        def finish_accept_pipe(trans, key, ov):
674            ov.getresult()
675            return pipe
676
677        return self._register(ov, pipe, finish_accept_pipe)
678
679    async def connect_pipe(self, address):
680        delay = CONNECT_PIPE_INIT_DELAY
681        while True:
682            # Unfortunately there is no way to do an overlapped connect to
683            # a pipe.  Call CreateFile() in a loop until it doesn't fail with
684            # ERROR_PIPE_BUSY.
685            try:
686                handle = _overlapped.ConnectPipe(address)
687                break
688            except OSError as exc:
689                if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
690                    raise
691
692            # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
693            delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
694            await tasks.sleep(delay)
695
696        return windows_utils.PipeHandle(handle)
697
698    def wait_for_handle(self, handle, timeout=None):
699        """Wait for a handle.
700
701        Return a Future object. The result of the future is True if the wait
702        completed, or False if the wait did not complete (on timeout).
703        """
704        return self._wait_for_handle(handle, timeout, False)
705
706    def _wait_cancel(self, event, done_callback):
707        fut = self._wait_for_handle(event, None, True)
708        # add_done_callback() cannot be used because the wait may only complete
709        # in IocpProactor.close(), while the event loop is not running.
710        fut._done_callback = done_callback
711        return fut
712
713    def _wait_for_handle(self, handle, timeout, _is_cancel):
714        self._check_closed()
715
716        if timeout is None:
717            ms = _winapi.INFINITE
718        else:
719            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
720            # round away from zero to wait *at least* timeout seconds.
721            ms = math.ceil(timeout * 1e3)
722
723        # We only create ov so we can use ov.address as a key for the cache.
724        ov = _overlapped.Overlapped(NULL)
725        wait_handle = _overlapped.RegisterWaitWithQueue(
726            handle, self._iocp, ov.address, ms)
727        if _is_cancel:
728            f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
729        else:
730            f = _WaitHandleFuture(ov, handle, wait_handle, self,
731                                  loop=self._loop)
732        if f._source_traceback:
733            del f._source_traceback[-1]
734
735        def finish_wait_for_handle(trans, key, ov):
736            # Note that this second wait means that we should only use
737            # this with handles types where a successful wait has no
738            # effect.  So events or processes are all right, but locks
739            # or semaphores are not.  Also note if the handle is
740            # signalled and then quickly reset, then we may return
741            # False even though we have not timed out.
742            return f._poll()
743
744        self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
745        return f
746
747    def _register_with_iocp(self, obj):
748        # To get notifications of finished ops on this objects sent to the
749        # completion port, were must register the handle.
750        if obj not in self._registered:
751            self._registered.add(obj)
752            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
753            # XXX We could also use SetFileCompletionNotificationModes()
754            # to avoid sending notifications to completion port of ops
755            # that succeed immediately.
756
757    def _register(self, ov, obj, callback):
758        self._check_closed()
759
760        # Return a future which will be set with the result of the
761        # operation when it completes.  The future's value is actually
762        # the value returned by callback().
763        f = _OverlappedFuture(ov, loop=self._loop)
764        if f._source_traceback:
765            del f._source_traceback[-1]
766        if not ov.pending:
767            # The operation has completed, so no need to postpone the
768            # work.  We cannot take this short cut if we need the
769            # NumberOfBytes, CompletionKey values returned by
770            # PostQueuedCompletionStatus().
771            try:
772                value = callback(None, None, ov)
773            except OSError as e:
774                f.set_exception(e)
775            else:
776                f.set_result(value)
777            # Even if GetOverlappedResult() was called, we have to wait for the
778            # notification of the completion in GetQueuedCompletionStatus().
779            # Register the overlapped operation to keep a reference to the
780            # OVERLAPPED object, otherwise the memory is freed and Windows may
781            # read uninitialized memory.
782
783        # Register the overlapped operation for later.  Note that
784        # we only store obj to prevent it from being garbage
785        # collected too early.
786        self._cache[ov.address] = (f, ov, obj, callback)
787        return f
788
789    def _unregister(self, ov):
790        """Unregister an overlapped object.
791
792        Call this method when its future has been cancelled. The event can
793        already be signalled (pending in the proactor event queue). It is also
794        safe if the event is never signalled (because it was cancelled).
795        """
796        self._check_closed()
797        self._unregistered.append(ov)
798
799    def _get_accept_socket(self, family):
800        s = socket.socket(family)
801        s.settimeout(0)
802        return s
803
804    def _poll(self, timeout=None):
805        if timeout is None:
806            ms = INFINITE
807        elif timeout < 0:
808            raise ValueError("negative timeout")
809        else:
810            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
811            # round away from zero to wait *at least* timeout seconds.
812            ms = math.ceil(timeout * 1e3)
813            if ms >= INFINITE:
814                raise ValueError("timeout too big")
815
816        while True:
817            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
818            if status is None:
819                break
820            ms = 0
821
822            err, transferred, key, address = status
823            try:
824                f, ov, obj, callback = self._cache.pop(address)
825            except KeyError:
826                if self._loop.get_debug():
827                    self._loop.call_exception_handler({
828                        'message': ('GetQueuedCompletionStatus() returned an '
829                                    'unexpected event'),
830                        'status': ('err=%s transferred=%s key=%#x address=%#x'
831                                   % (err, transferred, key, address)),
832                    })
833
834                # key is either zero, or it is used to return a pipe
835                # handle which should be closed to avoid a leak.
836                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
837                    _winapi.CloseHandle(key)
838                continue
839
840            if obj in self._stopped_serving:
841                f.cancel()
842            # Don't call the callback if _register() already read the result or
843            # if the overlapped has been cancelled
844            elif not f.done():
845                try:
846                    value = callback(transferred, key, ov)
847                except OSError as e:
848                    f.set_exception(e)
849                    self._results.append(f)
850                else:
851                    f.set_result(value)
852                    self._results.append(f)
853                finally:
854                    f = None
855
856        # Remove unregistered futures
857        for ov in self._unregistered:
858            self._cache.pop(ov.address, None)
859        self._unregistered.clear()
860
861    def _stop_serving(self, obj):
862        # obj is a socket or pipe handle.  It will be closed in
863        # BaseProactorEventLoop._stop_serving() which will make any
864        # pending operations fail quickly.
865        self._stopped_serving.add(obj)
866
867    def close(self):
868        if self._iocp is None:
869            # already closed
870            return
871
872        # Cancel remaining registered operations.
873        for fut, ov, obj, callback in list(self._cache.values()):
874            if fut.cancelled():
875                # Nothing to do with cancelled futures
876                pass
877            elif isinstance(fut, _WaitCancelFuture):
878                # _WaitCancelFuture must not be cancelled
879                pass
880            else:
881                try:
882                    fut.cancel()
883                except OSError as exc:
884                    if self._loop is not None:
885                        context = {
886                            'message': 'Cancelling a future failed',
887                            'exception': exc,
888                            'future': fut,
889                        }
890                        if fut._source_traceback:
891                            context['source_traceback'] = fut._source_traceback
892                        self._loop.call_exception_handler(context)
893
894        # Wait until all cancelled overlapped complete: don't exit with running
895        # overlapped to prevent a crash. Display progress every second if the
896        # loop is still running.
897        msg_update = 1.0
898        start_time = time.monotonic()
899        next_msg = start_time + msg_update
900        while self._cache:
901            if next_msg <= time.monotonic():
902                logger.debug('%r is running after closing for %.1f seconds',
903                             self, time.monotonic() - start_time)
904                next_msg = time.monotonic() + msg_update
905
906            # handle a few events, or timeout
907            self._poll(msg_update)
908
909        self._results = []
910
911        _winapi.CloseHandle(self._iocp)
912        self._iocp = None
913
914    def __del__(self):
915        self.close()
916
917
918class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
919
920    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
921        self._proc = windows_utils.Popen(
922            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
923            bufsize=bufsize, **kwargs)
924
925        def callback(f):
926            returncode = self._proc.poll()
927            self._process_exited(returncode)
928
929        f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
930        f.add_done_callback(callback)
931
932
933SelectorEventLoop = _WindowsSelectorEventLoop
934
935
936class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
937    _loop_factory = SelectorEventLoop
938
939
940class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
941    _loop_factory = ProactorEventLoop
942
943
944DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy
945