"""Selector and proactor event loops for Windows."""

import sys

if sys.platform != 'win32':  # pragma: no cover
    raise ImportError('win32 only')

import _overlapped
import _winapi
import errno
import math
import msvcrt
import socket
import struct
import time
import weakref

from . import events
from . import base_subprocess
from . import futures
from . import exceptions
from . import proactor_events
from . import selector_events
from . import tasks
from . import windows_utils
from .log import logger


__all__ = (
    'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
    'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
    'WindowsProactorEventLoopPolicy',
)


NULL = _winapi.NULL
INFINITE = _winapi.INFINITE
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236

# Initial delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_INIT_DELAY = 0.001

# Maximum delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_MAX_DELAY = 0.100


def _finish_io(trans, key, ov):
    """Completion callback shared by the overlapped send/receive operations.

    Return ov.getresult().  An OSError whose winerror is
    ERROR_NETNAME_DELETED or ERROR_OPERATION_ABORTED is re-raised as
    ConnectionResetError with the same args; any other OSError propagates
    unchanged.  *trans* and *key* are only present to match the
    (transferred, key, ov) callback signature used by IocpProactor.
    """
    try:
        return ov.getresult()
    except OSError as exc:
        if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
                            _overlapped.ERROR_OPERATION_ABORTED):
            raise ConnectionResetError(*exc.args)
        else:
            raise


class _OverlappedFuture(futures.Future):
    """Subclass of Future which represents an overlapped operation.

    Cancelling it will immediately cancel the overlapped operation.
    """

    def __init__(self, ov, *, loop=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the frame of this constructor from the recorded traceback.
            del self._source_traceback[-1]
        self._ov = ov

    def _repr_info(self):
        """Extend the base repr with the overlapped state and address."""
        info = super()._repr_info()
        if self._ov is not None:
            state = 'pending' if self._ov.pending else 'completed'
            info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>')
        return info

    def _cancel_overlapped(self):
        """Cancel the overlapped operation, if any, and forget it.

        An OSError raised by the cancellation is reported to the loop's
        exception handler instead of being propagated.
        """
        if self._ov is None:
            return
        try:
            self._ov.cancel()
        except OSError as exc:
            context = {
                'message': 'Cancelling an overlapped future failed',
                'exception': exc,
                'future': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        self._ov = None

    def cancel(self, msg=None):
        """Cancel the overlapped operation, then the future itself."""
        self._cancel_overlapped()
        return super().cancel(msg=msg)

    def set_exception(self, exception):
        """Set the exception, then cancel the overlapped operation."""
        super().set_exception(exception)
        self._cancel_overlapped()

    def set_result(self, result):
        """Set the result and drop the reference to the overlapped object."""
        super().set_result(result)
        self._ov = None


class _BaseWaitHandleFuture(futures.Future):
    """Subclass of Future which represents a wait handle."""

    def __init__(self, ov, handle, wait_handle, *, loop=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        # Keep a reference to the Overlapped object to keep it alive until the
        # wait is unregistered
        self._ov = ov
        self._handle = handle
        self._wait_handle = wait_handle

        # Should we call UnregisterWaitEx() if the wait completes
        # or is cancelled?
        self._registered = True

    def _poll(self):
        """Return True if the handle is currently signaled."""
        # non-blocking wait: use a timeout of 0 millisecond
        return (_winapi.WaitForSingleObject(self._handle, 0) ==
                _winapi.WAIT_OBJECT_0)

    def _repr_info(self):
        """Extend the base repr with the handle, its state and wait handle."""
        info = super()._repr_info()
        if self._handle is not None:
            # Format the handle only once it is known not to be None: the
            # '#x' format spec raises TypeError on None.
            info.append(f'handle={self._handle:#x}')
            state = 'signaled' if self._poll() else 'waiting'
            info.append(state)
        if self._wait_handle is not None:
            info.append(f'wait_handle={self._wait_handle:#x}')
        return info

    def _unregister_wait_cb(self, fut):
        # The wait was unregistered: it's now safe to destroy the Overlapped
        # object
        self._ov = None

    def _unregister_wait(self):
        """Unregister the wait handle (at most once).

        ERROR_IO_PENDING from UnregisterWait() is not treated as a failure;
        any other OSError is reported to the loop's exception handler and
        the Overlapped reference is kept.
        """
        if not self._registered:
            return
        self._registered = False

        wait_handle = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWait(wait_handle)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING means that the unregister is pending

        self._unregister_wait_cb(None)

    def cancel(self, msg=None):
        """Unregister the wait, then cancel the future."""
        self._unregister_wait()
        return super().cancel(msg=msg)

    def set_exception(self, exception):
        """Unregister the wait, then set the exception."""
        self._unregister_wait()
        super().set_exception(exception)

    def set_result(self, result):
        """Unregister the wait, then set the result."""
        self._unregister_wait()
        super().set_result(result)


class _WaitCancelFuture(_BaseWaitHandleFuture):
    """Subclass of Future which represents a wait for the cancellation of a
    _WaitHandleFuture using an event.
    """

    def __init__(self, ov, event, wait_handle, *, loop=None):
        super().__init__(ov, event, wait_handle, loop=loop)

        # Callback invoked synchronously from set_result()/set_exception();
        # assigned by IocpProactor._wait_cancel().
        self._done_callback = None

    def cancel(self, msg=None):
        """Always raise RuntimeError: this future must not be cancelled.

        *msg* is accepted (and ignored) so that the signature matches
        Future.cancel() and callers passing a message get the RuntimeError
        rather than a TypeError.
        """
        raise RuntimeError("_WaitCancelFuture must not be cancelled")

    def set_result(self, result):
        """Set the result, then call the done callback if one is set."""
        super().set_result(result)
        if self._done_callback is not None:
            self._done_callback(self)

    def set_exception(self, exception):
        """Set the exception, then call the done callback if one is set."""
        super().set_exception(exception)
        if self._done_callback is not None:
            self._done_callback(self)


class _WaitHandleFuture(_BaseWaitHandleFuture):
    """Future for a registered wait on a handle, owned by a proactor.

    Unregistering the wait is asynchronous: an event is passed to
    UnregisterWaitEx() and the proactor waits on it before the cleanup in
    _unregister_wait_cb() runs.
    """

    def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
        super().__init__(ov, handle, wait_handle, loop=loop)
        self._proactor = proactor
        self._unregister_proactor = True
        self._event = _overlapped.CreateEvent(None, True, False, None)
        self._event_fut = None

    def _unregister_wait_cb(self, fut):
        """Release the event and unregister the overlapped from the proactor."""
        if self._event is not None:
            _winapi.CloseHandle(self._event)
            self._event = None
            self._event_fut = None

        # If the wait was cancelled, the wait may never be signalled, so
        # it's required to unregister it. Otherwise, IocpProactor.close() will
        # wait forever for an event which will never come.
        #
        # If the IocpProactor already received the event, it's safe to call
        # _unregister() because we kept a reference to the Overlapped object
        # which is used as a unique key.
        self._proactor._unregister(self._ov)
        self._proactor = None

        super()._unregister_wait_cb(fut)

    def _unregister_wait(self):
        """Start unregistering the wait handle (at most once).

        On success or ERROR_IO_PENDING, ask the proactor to wait for the
        unregistration event and to call _unregister_wait_cb() afterwards.
        Any other OSError is reported to the loop's exception handler.
        """
        if not self._registered:
            return
        self._registered = False

        wait_handle = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWaitEx(wait_handle, self._event)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING is not an error, the wait was unregistered

        self._event_fut = self._proactor._wait_cancel(self._event,
                                                      self._unregister_wait_cb)


class PipeServer:
    """Class representing a pipe server.

    This is much like a bound, listening socket.
    """
    def __init__(self, address):
        self._address = address
        self._free_instances = weakref.WeakSet()
        # initialize the pipe attribute before calling _server_pipe_handle()
        # because this function can raise an exception and the destructor calls
        # the close() method
        self._pipe = None
        self._accept_pipe_future = None
        self._pipe = self._server_pipe_handle(True)

    def _get_unconnected_pipe(self):
        # Create new instance and return previous one.  This ensures
        # that (until the server is closed) there is always at least
        # one pipe handle for address.  Therefore if a client attempts
        # to connect it will not fail with FileNotFoundError.
        tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
        return tmp

    def _server_pipe_handle(self, first):
        # Return a wrapper for a new pipe handle, or None if the server is
        # closed.  *first* adds FILE_FLAG_FIRST_PIPE_INSTANCE to the flags.
        if self.closed():
            return None
        flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
        if first:
            flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        h = _winapi.CreateNamedPipe(
            self._address, flags,
            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
            _winapi.PIPE_WAIT,
            _winapi.PIPE_UNLIMITED_INSTANCES,
            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
        pipe = windows_utils.PipeHandle(h)
        self._free_instances.add(pipe)
        return pipe

    def closed(self):
        """Return True once close() has cleared the address."""
        return (self._address is None)

    def close(self):
        """Cancel any pending accept and close all unconnected instances."""
        if self._accept_pipe_future is not None:
            self._accept_pipe_future.cancel()
            self._accept_pipe_future = None
        # Close all instances which have not been connected to by a client.
        if self._address is not None:
            for pipe in self._free_instances:
                pipe.close()
            self._pipe = None
            self._address = None
            self._free_instances.clear()

    __del__ = close


class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Windows version of selector event loop."""


class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    """Windows version of proactor event loop using IOCP."""

    def __init__(self, proactor=None):
        if proactor is None:
            proactor = IocpProactor()
        super().__init__(proactor)

    def run_forever(self):
        """Run the loop, managing the self-reading future around the run."""
        try:
            assert self._self_reading_future is None
            self.call_soon(self._loop_self_reading)
            super().run_forever()
        finally:
            if self._self_reading_future is not None:
                ov = self._self_reading_future._ov
                self._self_reading_future.cancel()
                # self_reading_future was just cancelled so if it hasn't been
                # finished yet, it never will be (it's possible that it has
                # already finished and its callback is waiting in the queue,
                # where it could still happen if the event loop is restarted).
                # Unregister it otherwise IocpProactor.close will wait for it
                # forever
                if ov is not None:
                    self._proactor._unregister(ov)
                self._self_reading_future = None

    async def create_pipe_connection(self, protocol_factory, address):
        """Connect to the pipe at *address*; return (transport, protocol)."""
        f = self._proactor.connect_pipe(address)
        pipe = await f
        protocol = protocol_factory()
        trans = self._make_duplex_pipe_transport(pipe, protocol,
                                                 extra={'addr': address})
        return trans, protocol

    async def start_serving_pipe(self, protocol_factory, address):
        """Start accepting connections on the pipe at *address*.

        Return a one-element list containing the PipeServer.
        """
        server = PipeServer(address)

        def loop_accept_pipe(f=None):
            # Called once via call_soon() with f=None, then as the done
            # callback of each accept future.
            pipe = None
            try:
                if f:
                    pipe = f.result()
                    server._free_instances.discard(pipe)

                    if server.closed():
                        # A client connected before the server was closed:
                        # drop the client (close the pipe) and exit
                        pipe.close()
                        return

                    protocol = protocol_factory()
                    self._make_duplex_pipe_transport(
                        pipe, protocol, extra={'addr': address})

                pipe = server._get_unconnected_pipe()
                if pipe is None:
                    return

                f = self._proactor.accept_pipe(pipe)
            except BrokenPipeError:
                if pipe and pipe.fileno() != -1:
                    pipe.close()
                self.call_soon(loop_accept_pipe)
            except OSError as exc:
                if pipe and pipe.fileno() != -1:
                    self.call_exception_handler({
                        'message': 'Pipe accept failed',
                        'exception': exc,
                        'pipe': pipe,
                    })
                    pipe.close()
                elif self._debug:
                    logger.warning("Accept pipe failed on pipe %r",
                                   pipe, exc_info=True)
                self.call_soon(loop_accept_pipe)
            except exceptions.CancelledError:
                if pipe:
                    pipe.close()
            else:
                server._accept_pipe_future = f
                f.add_done_callback(loop_accept_pipe)

        self.call_soon(loop_accept_pipe)
        return [server]

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create a subprocess transport and wait until its waiter is done.

        If waiting fails with anything other than SystemExit or
        KeyboardInterrupt, the transport is closed and awaited before the
        exception is re-raised.
        """
        waiter = self.create_future()
        transp = _WindowsSubprocessTransport(self, protocol, args, shell,
                                             stdin, stdout, stderr, bufsize,
                                             waiter=waiter, extra=extra,
                                             **kwargs)
        try:
            await waiter
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            transp.close()
            await transp._wait()
            raise

        return transp


class IocpProactor:
    """Proactor implementation using IOCP."""

    def __init__(self, concurrency=INFINITE):
        self._loop = None
        # Completed futures collected by _poll() and handed out by select().
        self._results = []
        self._iocp = _overlapped.CreateIoCompletionPort(
            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
        # Maps ov.address -> (future, ov, obj, callback) for pending ops.
        self._cache = {}
        self._registered = weakref.WeakSet()
        # Overlapped objects to drop from the cache at the end of _poll().
        self._unregistered = []
        self._stopped_serving = weakref.WeakSet()

    def _check_closed(self):
        """Raise RuntimeError if the proactor has been closed."""
        if self._iocp is None:
            raise RuntimeError('IocpProactor is closed')

    def __repr__(self):
        info = [f'overlapped#={len(self._cache)}',
                f'result#={len(self._results)}']
        if self._iocp is None:
            info.append('closed')
        return f'<{self.__class__.__name__} {" ".join(info)}>'

    def set_loop(self, loop):
        """Set the event loop used to create futures and report errors."""
        self._loop = loop

    def select(self, timeout=None):
        """Return the list of completed futures, polling first if it is empty."""
        if not self._results:
            self._poll(timeout)
        tmp = self._results
        self._results = []
        try:
            return tmp
        finally:
            # Needed to break cycles when an exception occurs.
            tmp = None

    def _result(self, value):
        """Return a new future already completed with *value*."""
        fut = self._loop.create_future()
        fut.set_result(value)
        return fut

    def recv(self, conn, nbytes, flags=0):
        """Start an overlapped receive of up to *nbytes*; return a future.

        A BrokenPipeError raised when starting the operation yields a future
        already completed with b''.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            if isinstance(conn, socket.socket):
                ov.WSARecv(conn.fileno(), nbytes, flags)
            else:
                ov.ReadFile(conn.fileno(), nbytes)
        except BrokenPipeError:
            return self._result(b'')

        return self._register(ov, conn, _finish_io)

    def recv_into(self, conn, buf, flags=0):
        """Start an overlapped receive into *buf*; return a future.

        A BrokenPipeError raised when starting the operation yields a future
        already completed with 0.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            if isinstance(conn, socket.socket):
                ov.WSARecvInto(conn.fileno(), buf, flags)
            else:
                ov.ReadFileInto(conn.fileno(), buf)
        except BrokenPipeError:
            return self._result(0)

        return self._register(ov, conn, _finish_io)

    def recvfrom(self, conn, nbytes, flags=0):
        """Start an overlapped WSARecvFrom of up to *nbytes*; return a future.

        A BrokenPipeError raised when starting the operation yields a future
        already completed with (b'', None).
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            ov.WSARecvFrom(conn.fileno(), nbytes, flags)
        except BrokenPipeError:
            return self._result((b'', None))

        return self._register(ov, conn, _finish_io)

    def recvfrom_into(self, conn, buf, flags=0):
        """Start an overlapped WSARecvFromInto into *buf*; return a future.

        A BrokenPipeError raised when starting the operation yields a future
        already completed with (0, None).
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            ov.WSARecvFromInto(conn.fileno(), buf, flags)
        except BrokenPipeError:
            return self._result((0, None))

        return self._register(ov, conn, _finish_io)

    def sendto(self, conn, buf, flags=0, addr=None):
        """Start an overlapped WSASendTo of *buf* to *addr*; return a future."""
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)

        ov.WSASendTo(conn.fileno(), buf, flags, addr)

        return self._register(ov, conn, _finish_io)

    def send(self, conn, buf, flags=0):
        """Start an overlapped send of *buf*; return a future.

        Sockets use WSASend(); any other object is written with WriteFile().
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        if isinstance(conn, socket.socket):
            ov.WSASend(conn.fileno(), buf, flags)
        else:
            ov.WriteFile(conn.fileno(), buf)

        return self._register(ov, conn, _finish_io)

    def accept(self, listener):
        """Start an overlapped accept on *listener*.

        Return a future whose result is (conn, peer address).  The accept
        socket is closed if the future is cancelled.
        """
        self._register_with_iocp(listener)
        conn = self._get_accept_socket(listener.family)
        ov = _overlapped.Overlapped(NULL)
        ov.AcceptEx(listener.fileno(), conn.fileno())

        def finish_accept(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
            buf = struct.pack('@P', listener.fileno())
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
            conn.settimeout(listener.gettimeout())
            return conn, conn.getpeername()

        async def accept_coro(future, conn):
            # Coroutine closing the accept socket if the future is cancelled
            try:
                await future
            except exceptions.CancelledError:
                conn.close()
                raise

        future = self._register(ov, listener, finish_accept)
        coro = accept_coro(future, conn)
        tasks.ensure_future(coro, loop=self._loop)
        return future

    def connect(self, conn, address):
        """Connect *conn* to *address*; return a future.

        Datagram sockets are connected synchronously and the returned future
        is already completed with None.  Otherwise the future's result is
        *conn* once the overlapped ConnectEx() has finished.
        """
        if conn.type == socket.SOCK_DGRAM:
            # WSAConnect will complete immediately for UDP sockets so we don't
            # need to register any IOCP operation
            _overlapped.WSAConnect(conn.fileno(), address)
            return self._result(None)

        self._register_with_iocp(conn)
        # The socket needs to be locally bound before we call ConnectEx().
        try:
            _overlapped.BindLocal(conn.fileno(), conn.family)
        except OSError as e:
            if e.winerror != errno.WSAEINVAL:
                raise
            # Probably already locally bound; check using getsockname().
            if conn.getsockname()[1] == 0:
                raise
        ov = _overlapped.Overlapped(NULL)
        ov.ConnectEx(conn.fileno(), address)

        def finish_connect(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
            return conn

        return self._register(ov, conn, finish_connect)

    def sendfile(self, sock, file, offset, count):
        """Start an overlapped TransmitFile of *file* on *sock*.

        *offset* is split into low and high 32-bit halves for the call.
        Return a future.
        """
        self._register_with_iocp(sock)
        ov = _overlapped.Overlapped(NULL)
        offset_low = offset & 0xffff_ffff
        offset_high = (offset >> 32) & 0xffff_ffff
        ov.TransmitFile(sock.fileno(),
                        msvcrt.get_osfhandle(file.fileno()),
                        offset_low, offset_high,
                        count, 0, 0)

        return self._register(ov, sock, _finish_io)

    def accept_pipe(self, pipe):
        """Wait for a client to connect to *pipe*; return a future of *pipe*."""
        self._register_with_iocp(pipe)
        ov = _overlapped.Overlapped(NULL)
        connected = ov.ConnectNamedPipe(pipe.fileno())

        if connected:
            # ConnectNamedPipe() failed with ERROR_PIPE_CONNECTED which means
            # that the pipe is connected. There is no need to wait for the
            # completion of the connection.
            return self._result(pipe)

        def finish_accept_pipe(trans, key, ov):
            ov.getresult()
            return pipe

        return self._register(ov, pipe, finish_accept_pipe)

    async def connect_pipe(self, address):
        """Connect to the pipe at *address*; return a PipeHandle.

        Retries while the connection fails with ERROR_PIPE_BUSY, sleeping
        for a delay that doubles up to CONNECT_PIPE_MAX_DELAY between
        attempts.  Any other OSError propagates.
        """
        delay = CONNECT_PIPE_INIT_DELAY
        while True:
            # Unfortunately there is no way to do an overlapped connect to
            # a pipe.  Call CreateFile() in a loop until it doesn't fail with
            # ERROR_PIPE_BUSY.
            try:
                handle = _overlapped.ConnectPipe(address)
                break
            except OSError as exc:
                if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
                    raise

            # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
            delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
            await tasks.sleep(delay)

        return windows_utils.PipeHandle(handle)

    def wait_for_handle(self, handle, timeout=None):
        """Wait for a handle.

        Return a Future object. The result of the future is True if the wait
        completed, or False if the wait did not complete (on timeout).
        """
        return self._wait_for_handle(handle, timeout, False)

    def _wait_cancel(self, event, done_callback):
        """Wait for *event* with no timeout; call *done_callback* when done."""
        fut = self._wait_for_handle(event, None, True)
        # add_done_callback() cannot be used because the wait may only complete
        # in IocpProactor.close(), while the event loop is not running.
        fut._done_callback = done_callback
        return fut

    def _wait_for_handle(self, handle, timeout, _is_cancel):
        """Register a wait on *handle* and return its future.

        *timeout* is in seconds (None waits forever).  *_is_cancel* selects
        a _WaitCancelFuture instead of a _WaitHandleFuture.
        """
        self._check_closed()

        if timeout is None:
            ms = _winapi.INFINITE
        else:
            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)

        # We only create ov so we can use ov.address as a key for the cache.
        ov = _overlapped.Overlapped(NULL)
        wait_handle = _overlapped.RegisterWaitWithQueue(
            handle, self._iocp, ov.address, ms)
        if _is_cancel:
            f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
        else:
            f = _WaitHandleFuture(ov, handle, wait_handle, self,
                                  loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]

        def finish_wait_for_handle(trans, key, ov):
            # Note that this second wait means that we should only use
            # this with handle types where a successful wait has no
            # effect.  So events or processes are all right, but locks
            # or semaphores are not.  Also note if the handle is
            # signalled and then quickly reset, then we may return
            # False even though we have not timed out.
            return f._poll()

        self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
        return f

    def _register_with_iocp(self, obj):
        # To get notifications of finished ops on this object sent to the
        # completion port, we must register the handle.
        if obj not in self._registered:
            self._registered.add(obj)
            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
            # XXX We could also use SetFileCompletionNotificationModes()
            # to avoid sending notifications to completion port of ops
            # that succeed immediately.

    def _register(self, ov, obj, callback):
        """Cache the overlapped operation and return its future.

        If the operation is no longer pending, *callback* is run right away
        to complete the future; the operation is cached in either case.
        """
        self._check_closed()

        # Return a future which will be set with the result of the
        # operation when it completes.  The future's value is actually
        # the value returned by callback().
        f = _OverlappedFuture(ov, loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]
        if not ov.pending:
            # The operation has completed, so no need to postpone the
            # work.  We cannot take this short cut if we need the
            # NumberOfBytes, CompletionKey values returned by
            # PostQueuedCompletionStatus().
            try:
                value = callback(None, None, ov)
            except OSError as e:
                f.set_exception(e)
            else:
                f.set_result(value)
            # Even if GetOverlappedResult() was called, we have to wait for the
            # notification of the completion in GetQueuedCompletionStatus().
            # Register the overlapped operation to keep a reference to the
            # OVERLAPPED object, otherwise the memory is freed and Windows may
            # read uninitialized memory.

        # Register the overlapped operation for later.  Note that
        # we only store obj to prevent it from being garbage
        # collected too early.
        self._cache[ov.address] = (f, ov, obj, callback)
        return f

    def _unregister(self, ov):
        """Unregister an overlapped object.

        Call this method when its future has been cancelled. The event can
        already be signalled (pending in the proactor event queue). It is also
        safe if the event is never signalled (because it was cancelled).
        """
        self._check_closed()
        self._unregistered.append(ov)

    def _get_accept_socket(self, family):
        """Return a new socket of *family* with its timeout set to 0."""
        s = socket.socket(family)
        s.settimeout(0)
        return s

    def _poll(self, timeout=None):
        """Drain the completion port, completing the matching futures.

        Only the first GetQueuedCompletionStatus() call waits (up to
        *timeout* seconds, or forever when None); later calls use a zero
        timeout.  Raise ValueError for a negative or too large timeout.
        """
        if timeout is None:
            ms = INFINITE
        elif timeout < 0:
            raise ValueError("negative timeout")
        else:
            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)
            if ms >= INFINITE:
                raise ValueError("timeout too big")

        while True:
            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
            if status is None:
                break
            ms = 0

            err, transferred, key, address = status
            try:
                f, ov, obj, callback = self._cache.pop(address)
            except KeyError:
                if self._loop.get_debug():
                    self._loop.call_exception_handler({
                        'message': ('GetQueuedCompletionStatus() returned an '
                                    'unexpected event'),
                        'status': ('err=%s transferred=%s key=%#x address=%#x'
                                   % (err, transferred, key, address)),
                    })

                # key is either zero, or it is used to return a pipe
                # handle which should be closed to avoid a leak.
                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
                    _winapi.CloseHandle(key)
                continue

            if obj in self._stopped_serving:
                f.cancel()
            # Don't call the callback if _register() already read the result or
            # if the overlapped has been cancelled
            elif not f.done():
                try:
                    value = callback(transferred, key, ov)
                except OSError as e:
                    f.set_exception(e)
                    self._results.append(f)
                else:
                    f.set_result(value)
                    self._results.append(f)
                finally:
                    f = None

        # Remove unregistered futures
        for ov in self._unregistered:
            self._cache.pop(ov.address, None)
        self._unregistered.clear()

    def _stop_serving(self, obj):
        # obj is a socket or pipe handle.  It will be closed in
        # BaseProactorEventLoop._stop_serving() which will make any
        # pending operations fail quickly.
        self._stopped_serving.add(obj)

    def close(self):
        """Cancel pending operations, wait for them, and close the port.

        Calling close() on an already closed proactor does nothing.
        """
        if self._iocp is None:
            # already closed
            return

        # Cancel remaining registered operations.
        for fut, ov, obj, callback in list(self._cache.values()):
            if fut.cancelled():
                # Nothing to do with cancelled futures
                pass
            elif isinstance(fut, _WaitCancelFuture):
                # _WaitCancelFuture must not be cancelled
                pass
            else:
                try:
                    fut.cancel()
                except OSError as exc:
                    if self._loop is not None:
                        context = {
                            'message': 'Cancelling a future failed',
                            'exception': exc,
                            'future': fut,
                        }
                        if fut._source_traceback:
                            context['source_traceback'] = fut._source_traceback
                        self._loop.call_exception_handler(context)

        # Wait until all cancelled overlapped complete: don't exit with running
        # overlapped to prevent a crash. Display progress every second if the
        # loop is still running.
        msg_update = 1.0
        start_time = time.monotonic()
        next_msg = start_time + msg_update
        while self._cache:
            if next_msg <= time.monotonic():
                logger.debug('%r is running after closing for %.1f seconds',
                             self, time.monotonic() - start_time)
                next_msg = time.monotonic() + msg_update

            # handle a few events, or timeout
            self._poll(msg_update)

        self._results = []

        _winapi.CloseHandle(self._iocp)
        self._iocp = None

    def __del__(self):
        self.close()


class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that watches the process handle via the proactor."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the process and report its exit through _process_exited()."""
        self._proc = windows_utils.Popen(
            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
            bufsize=bufsize, **kwargs)

        def callback(f):
            returncode = self._proc.poll()
            self._process_exited(returncode)

        f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
        f.add_done_callback(callback)


SelectorEventLoop = _WindowsSelectorEventLoop


class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    _loop_factory = SelectorEventLoop


class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    _loop_factory = ProactorEventLoop


DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy