1"""Utilities for with-statement contexts.  See PEP 343."""
2import abc
3import os
4import sys
5import _collections_abc
6from collections import deque
7from functools import wraps
8from types import MethodType, GenericAlias
9
# Names exported by a star-import of this module.
# NOTE(review): AsyncContextDecorator is defined in this module but is not
# listed here -- confirm whether that omission is intentional.
__all__ = ["asynccontextmanager", "contextmanager", "closing", "nullcontext",
           "AbstractContextManager", "AbstractAsyncContextManager",
           "AsyncExitStack", "ContextDecorator", "ExitStack",
           "redirect_stdout", "redirect_stderr", "suppress", "aclosing",
           "chdir"]
15
16
class AbstractContextManager(abc.ABC):
    """Abstract base class for objects usable in a ``with`` statement.

    ``__exit__`` is abstract and must be supplied by subclasses; a default
    ``__enter__`` that returns the instance is provided.
    """

    # Allows subscripted forms such as AbstractContextManager[T].
    __class_getitem__ = classmethod(GenericAlias)

    def __enter__(self):
        """Enter the runtime context; this default returns `self`."""
        return self

    @abc.abstractmethod
    def __exit__(self, exc_type, exc_value, traceback):
        """Exit the runtime context; this default implementation returns None."""
        return None

    @classmethod
    def __subclasshook__(cls, C):
        # The structural check is only performed for this exact ABC; classes
        # derived from it fall back to the regular subclass machinery.
        if cls is not AbstractContextManager:
            return NotImplemented
        return _collections_abc._check_methods(C, "__enter__", "__exit__")
37
38
class AbstractAsyncContextManager(abc.ABC):
    """Abstract base class for objects usable in an ``async with`` statement.

    ``__aexit__`` is abstract and must be supplied by subclasses; a default
    ``__aenter__`` that returns the instance is provided.
    """

    # Allows subscripted forms such as AbstractAsyncContextManager[T].
    __class_getitem__ = classmethod(GenericAlias)

    async def __aenter__(self):
        """Enter the runtime context; this default returns `self`."""
        return self

    @abc.abstractmethod
    async def __aexit__(self, exc_type, exc_value, traceback):
        """Exit the runtime context; this default implementation returns None."""
        return None

    @classmethod
    def __subclasshook__(cls, C):
        # The structural check is only performed for this exact ABC; classes
        # derived from it fall back to the regular subclass machinery.
        if cls is not AbstractAsyncContextManager:
            return NotImplemented
        return _collections_abc._check_methods(C, "__aenter__", "__aexit__")
60
61
class ContextDecorator(object):
    "Mixin/base class that lets a context manager also be used as a decorator."

    def _recreate_cm(self):
        """Return the context manager to use for one decorated call.

        The default hands back this very instance.  A one-shot context
        manager such as _GeneratorContextManager overrides this hook to
        build a fresh copy, which is what allows it to be used as a
        decorator.

        Private interface meant only for _GeneratorContextManager.
        See issue #11647 for details.
        """
        return self

    def __call__(self, func):
        # Each call of the decorated function runs inside its own
        # ``with`` block around the wrapped callable.
        def inner(*args, **kwds):
            with self._recreate_cm():
                return func(*args, **kwds)
        return wraps(func)(inner)
83
84
class AsyncContextDecorator(object):
    "Mixin/base class that lets an async context manager also be used as a decorator."

    def _recreate_cm(self):
        """Return the context manager to use for one decorated call.

        The default hands back this very instance.
        """
        return self

    def __call__(self, func):
        # Each call of the decorated coroutine function awaits the wrapped
        # callable inside its own ``async with`` block.
        async def inner(*args, **kwds):
            async with self._recreate_cm():
                return await func(*args, **kwds)
        return wraps(func)(inner)
99
100
101class _GeneratorContextManagerBase:
102    """Shared functionality for @contextmanager and @asynccontextmanager."""
103
104    def __init__(self, func, args, kwds):
105        self.gen = func(*args, **kwds)
106        self.func, self.args, self.kwds = func, args, kwds
107        # Issue 19330: ensure context manager instances have good docstrings
108        doc = getattr(func, "__doc__", None)
109        if doc is None:
110            doc = type(self).__doc__
111        self.__doc__ = doc
112        # Unfortunately, this still doesn't provide good help output when
113        # inspecting the created context manager instances, since pydoc
114        # currently bypasses the instance docstring and shows the docstring
115        # for the class instead.
116        # See http://bugs.python.org/issue19404 for more details.
117
118    def _recreate_cm(self):
119        # _GCMB instances are one-shot context managers, so the
120        # CM must be recreated each time a decorated function is
121        # called
122        return self.__class__(self.func, self.args, self.kwds)
123
124
class _GeneratorContextManager(
    _GeneratorContextManagerBase,
    AbstractContextManager,
    ContextDecorator,
):
    """Helper for @contextmanager decorator."""

    # The docstring above is deliberately left short: the base class
    # __init__ falls back to type(self).__doc__ for the instance docstring
    # when the wrapped function has none.

    def __enter__(self):
        """Advance the generator to its first yield and return that value.

        Raises RuntimeError if the generator finishes without yielding.
        """
        # do not keep args and kwds alive unnecessarily
        # they are only needed for recreation, which is not possible anymore
        del self.args, self.kwds, self.func
        try:
            return next(self.gen)
        except StopIteration:
            raise RuntimeError("generator didn't yield") from None

    def __exit__(self, typ, value, traceback):
        """Resume the generator so that its cleanup code runs.

        When typ is None the generator is resumed with next() and is
        expected to finish; otherwise the exception is thrown into it.

        Returns a true value when the generator swallowed the exception and
        False when the exception should keep propagating.  Raises
        RuntimeError if the generator yields again instead of finishing; in
        that case the generator is closed first so it is not left suspended.
        """
        if typ is None:
            try:
                next(self.gen)
            except StopIteration:
                return False
            else:
                # The generator yielded a second time.  Close it so that it
                # does not stay suspended, then report the misuse.
                try:
                    raise RuntimeError("generator didn't stop")
                finally:
                    self.gen.close()
        else:
            if value is None:
                # Need to force instantiation so we can reliably
                # tell if we get the same exception back
                value = typ()
            try:
                # Use the single-argument form: the (type, value, traceback)
                # signature of throw() is deprecated since Python 3.12.
                self.gen.throw(value)
            except StopIteration as exc:
                # Suppress StopIteration *unless* it's the same exception that
                # was passed to throw().  This prevents a StopIteration
                # raised inside the "with" statement from being suppressed.
                return exc is not value
            except RuntimeError as exc:
                # Don't re-raise the passed in exception. (issue27122)
                if exc is value:
                    exc.__traceback__ = traceback
                    return False
                # Avoid suppressing if a StopIteration exception
                # was passed to throw() and later wrapped into a RuntimeError
                # (see PEP 479 for sync generators; async generators also
                # have this behavior). But do this only if the exception wrapped
                # by the RuntimeError is actually Stop(Async)Iteration (see
                # issue29692).
                if (
                    isinstance(value, StopIteration)
                    and exc.__cause__ is value
                ):
                    value.__traceback__ = traceback
                    return False
                raise
            except BaseException as exc:
                # only re-raise if it's *not* the exception that was
                # passed to throw(), because __exit__() must not raise
                # an exception unless __exit__() itself failed.  But throw()
                # has to raise the exception to signal propagation, so this
                # fixes the impedance mismatch between the throw() protocol
                # and the __exit__() protocol.
                if exc is not value:
                    raise
                exc.__traceback__ = traceback
                return False
            # throw() returned normally, i.e. the generator yielded again.
            # Close it so that it does not stay suspended, then report it.
            try:
                raise RuntimeError("generator didn't stop after throw()")
            finally:
                self.gen.close()
191
class _AsyncGeneratorContextManager(
    _GeneratorContextManagerBase,
    AbstractAsyncContextManager,
    AsyncContextDecorator,
):
    """Helper for @asynccontextmanager decorator."""

    # The docstring above is deliberately left short: the base class
    # __init__ falls back to type(self).__doc__ for the instance docstring
    # when the wrapped function has none.

    async def __aenter__(self):
        """Advance the async generator to its first yield and return that value.

        Raises RuntimeError if the generator finishes without yielding.
        """
        # do not keep args and kwds alive unnecessarily
        # they are only needed for recreation, which is not possible anymore
        del self.args, self.kwds, self.func
        try:
            return await anext(self.gen)
        except StopAsyncIteration:
            raise RuntimeError("generator didn't yield") from None

    async def __aexit__(self, typ, value, traceback):
        """Resume the async generator so that its cleanup code runs.

        When typ is None the generator is resumed with anext() and is
        expected to finish; otherwise the exception is thrown into it.

        Returns a true value when the generator swallowed the exception and
        False when the exception should keep propagating.  Raises
        RuntimeError if the generator yields again instead of finishing; in
        that case the generator is closed first so it is not left suspended.
        """
        if typ is None:
            try:
                await anext(self.gen)
            except StopAsyncIteration:
                return False
            else:
                # The generator yielded a second time.  Close it so that it
                # does not stay suspended, then report the misuse.
                try:
                    raise RuntimeError("generator didn't stop")
                finally:
                    await self.gen.aclose()
        else:
            if value is None:
                # Need to force instantiation so we can reliably
                # tell if we get the same exception back
                value = typ()
            try:
                # Use the single-argument form: the (type, value, traceback)
                # signature of athrow() is deprecated since Python 3.12.
                await self.gen.athrow(value)
            except StopAsyncIteration as exc:
                # Suppress StopAsyncIteration *unless* it's the same exception
                # that was passed to athrow().  This prevents a
                # StopAsyncIteration raised inside the "async with" statement
                # from being suppressed.
                return exc is not value
            except RuntimeError as exc:
                # Don't re-raise the passed in exception. (issue27122)
                if exc is value:
                    exc.__traceback__ = traceback
                    return False
                # Avoid suppressing if a Stop(Async)Iteration exception
                # was passed to athrow() and later wrapped into a RuntimeError
                # (see PEP 479 for sync generators; async generators also
                # have this behavior). But do this only if the exception wrapped
                # by the RuntimeError is actually Stop(Async)Iteration (see
                # issue29692).
                if (
                    isinstance(value, (StopIteration, StopAsyncIteration))
                    and exc.__cause__ is value
                ):
                    value.__traceback__ = traceback
                    return False
                raise
            except BaseException as exc:
                # only re-raise if it's *not* the exception that was
                # passed to athrow(), because __aexit__() must not raise
                # an exception unless __aexit__() itself failed.  But athrow()
                # has to raise the exception to signal propagation, so this
                # fixes the impedance mismatch between the athrow() protocol
                # and the __aexit__() protocol.
                if exc is not value:
                    raise
                exc.__traceback__ = traceback
                return False
            # athrow() returned normally, i.e. the generator yielded again.
            # Close it so that it does not stay suspended, then report it.
            try:
                raise RuntimeError("generator didn't stop after athrow()")
            finally:
                await self.gen.aclose()
258
259
def contextmanager(func):
    """@contextmanager decorator.

    Wraps a generator function so that calling it returns a
    _GeneratorContextManager built from the function and the call
    arguments.

    Typical usage:

        @contextmanager
        def some_generator(<arguments>):
            <setup>
            try:
                yield <value>
            finally:
                <cleanup>

    With that definition, this:

        with some_generator(<arguments>) as <variable>:
            <body>

    behaves like this:

        <setup>
        try:
            <variable> = <value>
            <body>
        finally:
            <cleanup>
    """
    def helper(*args, **kwds):
        # The generator function and its arguments are handed over as-is.
        return _GeneratorContextManager(func, args, kwds)

    return wraps(func)(helper)
291
292
def asynccontextmanager(func):
    """@asynccontextmanager decorator.

    Wraps an async generator function so that calling it returns an
    _AsyncGeneratorContextManager built from the function and the call
    arguments.

    Typical usage:

        @asynccontextmanager
        async def some_async_generator(<arguments>):
            <setup>
            try:
                yield <value>
            finally:
                <cleanup>

    With that definition, this:

        async with some_async_generator(<arguments>) as <variable>:
            <body>

    behaves like this:

        <setup>
        try:
            <variable> = <value>
            <body>
        finally:
            <cleanup>
    """
    def helper(*args, **kwds):
        # The generator function and its arguments are handed over as-is.
        return _AsyncGeneratorContextManager(func, args, kwds)

    return wraps(func)(helper)
324
325
class closing(AbstractContextManager):
    """Context manager that calls ``close()`` on an object when the block ends.

    Writing:

        with closing(<module>.open(<arguments>)) as f:
            <block>

    has the same effect as:

        f = <module>.open(<arguments>)
        try:
            <block>
        finally:
            f.close()

    """

    def __init__(self, thing):
        # The wrapped object; it only has to provide a close() method.
        self.thing = thing

    def __enter__(self):
        # The ``as`` target receives the wrapped object itself.
        return self.thing

    def __exit__(self, *exc_info):
        # Exception details are ignored; the object is closed regardless.
        wrapped = self.thing
        wrapped.close()
349
350
class aclosing(AbstractAsyncContextManager):
    """Async context manager that awaits ``aclose()`` on an object when the
    block ends, e.g. to finalize an async generator.

    Writing:

        async with aclosing(<module>.fetch(<arguments>)) as agen:
            <block>

    has the same effect as:

        agen = <module>.fetch(<arguments>)
        try:
            <block>
        finally:
            await agen.aclose()

    """

    def __init__(self, thing):
        # The wrapped object; it only has to provide an aclose() method.
        self.thing = thing

    async def __aenter__(self):
        # The ``as`` target receives the wrapped object itself.
        return self.thing

    async def __aexit__(self, *exc_info):
        # Exception details are ignored; the object is closed regardless.
        wrapped = self.thing
        await wrapped.aclose()
375
376
class _RedirectStream(AbstractContextManager):
    # Shared implementation for the stream redirection context managers.
    # Subclasses set _stream to the name of the sys attribute to replace.

    _stream = None

    def __init__(self, new_target):
        self._new_target = new_target
        # A list (used as a stack) of the replaced targets keeps this
        # context manager re-entrant.
        self._old_targets = []

    def __enter__(self):
        # Remember what is currently installed, then install the new target.
        current = getattr(sys, self._stream)
        self._old_targets.append(current)
        setattr(sys, self._stream, self._new_target)
        return self._new_target

    def __exit__(self, exctype, excinst, exctb):
        # Put back the most recently replaced target.
        previous = self._old_targets.pop()
        setattr(sys, self._stream, previous)
393
394
class redirect_stdout(_RedirectStream):
    """Context manager that temporarily points sys.stdout at another file.

        # Sending help() output to stderr
        with redirect_stdout(sys.stderr):
            help(dir)

        # Writing help() output to a file
        with open('help.txt', 'w') as f:
            with redirect_stdout(f):
                help(pow)
    """

    # Name of the sys attribute swapped by the base class.
    _stream = "stdout"
409
410
class redirect_stderr(_RedirectStream):
    """Context manager that temporarily points sys.stderr at another file."""

    # Name of the sys attribute swapped by the base class.
    _stream = "stderr"
415
416
class suppress(AbstractContextManager):
    """Context manager that swallows the given exception types.

    Once an exception has been suppressed, execution continues with the
    first statement after the with statement.

         with suppress(FileNotFoundError):
             os.remove(somefile)
         # Execution still resumes here if the file was already removed
    """

    def __init__(self, *exceptions):
        # Tuple of exception classes, later handed to issubclass().
        self._exceptions = exceptions

    def __enter__(self):
        return None

    def __exit__(self, exctype, excinst, exctb):
        # CPython's own exception matching only considers the concrete
        # type hierarchy and ignores the instance/subclass checking hooks,
        # unlike isinstance and issubclass.  Guido regards that as a bug
        # rather than a feature, but it is hard to fix because of various
        # internal implementation details.  suppress therefore offers the
        # simpler issubclass based semantics instead of reproducing the
        # interpreter's limitations exactly.
        #
        # See http://bugs.python.org/issue12029 for more details
        if exctype is None:
            return False
        return issubclass(exctype, self._exceptions)
445
446
447class _BaseExitStack:
448    """A base class for ExitStack and AsyncExitStack."""
449
450    @staticmethod
451    def _create_exit_wrapper(cm, cm_exit):
452        return MethodType(cm_exit, cm)
453
454    @staticmethod
455    def _create_cb_wrapper(callback, /, *args, **kwds):
456        def _exit_wrapper(exc_type, exc, tb):
457            callback(*args, **kwds)
458        return _exit_wrapper
459
460    def __init__(self):
461        self._exit_callbacks = deque()
462
463    def pop_all(self):
464        """Preserve the context stack by transferring it to a new instance."""
465        new_stack = type(self)()
466        new_stack._exit_callbacks = self._exit_callbacks
467        self._exit_callbacks = deque()
468        return new_stack
469
470    def push(self, exit):
471        """Registers a callback with the standard __exit__ method signature.
472
473        Can suppress exceptions the same way __exit__ method can.
474        Also accepts any object with an __exit__ method (registering a call
475        to the method instead of the object itself).
476        """
477        # We use an unbound method rather than a bound method to follow
478        # the standard lookup behaviour for special methods.
479        _cb_type = type(exit)
480
481        try:
482            exit_method = _cb_type.__exit__
483        except AttributeError:
484            # Not a context manager, so assume it's a callable.
485            self._push_exit_callback(exit)
486        else:
487            self._push_cm_exit(exit, exit_method)
488        return exit  # Allow use as a decorator.
489
490    def enter_context(self, cm):
491        """Enters the supplied context manager.
492
493        If successful, also pushes its __exit__ method as a callback and
494        returns the result of the __enter__ method.
495        """
496        # We look up the special methods on the type to match the with
497        # statement.
498        cls = type(cm)
499        try:
500            _enter = cls.__enter__
501            _exit = cls.__exit__
502        except AttributeError:
503            raise TypeError(f"'{cls.__module__}.{cls.__qualname__}' object does "
504                            f"not support the context manager protocol") from None
505        result = _enter(cm)
506        self._push_cm_exit(cm, _exit)
507        return result
508
509    def callback(self, callback, /, *args, **kwds):
510        """Registers an arbitrary callback and arguments.
511
512        Cannot suppress exceptions.
513        """
514        _exit_wrapper = self._create_cb_wrapper(callback, *args, **kwds)
515
516        # We changed the signature, so using @wraps is not appropriate, but
517        # setting __wrapped__ may still help with introspection.
518        _exit_wrapper.__wrapped__ = callback
519        self._push_exit_callback(_exit_wrapper)
520        return callback  # Allow use as a decorator
521
522    def _push_cm_exit(self, cm, cm_exit):
523        """Helper to correctly register callbacks to __exit__ methods."""
524        _exit_wrapper = self._create_exit_wrapper(cm, cm_exit)
525        self._push_exit_callback(_exit_wrapper, True)
526
527    def _push_exit_callback(self, callback, is_sync=True):
528        self._exit_callbacks.append((is_sync, callback))
529
530
531# Inspired by discussions on http://bugs.python.org/issue13585
class ExitStack(_BaseExitStack, AbstractContextManager):
    """Context manager for dynamic management of a stack of exit callbacks.

    For example:
        with ExitStack() as stack:
            files = [stack.enter_context(open(fname)) for fname in filenames]
            # All opened files will automatically be closed at the end of
            # the with statement, even if attempts to open files later
            # in the list raise an exception.
    """

    def __enter__(self):
        """Return the stack itself."""
        return self

    def __exit__(self, *exc_details):
        """Run all registered callbacks, most recently registered first.

        Each callback is called with the current exception details.  A
        callback returning a true value clears those details for the
        callbacks that follow; a callback that raises replaces them with
        the new exception.  If an exception is still pending once every
        callback has run, it is re-raised.  Otherwise the result is true
        only if an exception was passed in and some callback suppressed it.
        """
        received_exc = exc_details[0] is not None

        # We manipulate the exception state so it behaves as though
        # we were actually nesting multiple with statements
        # (frame_exc is the exception being handled when __exit__ was
        # entered, if any)
        frame_exc = sys.exc_info()[1]
        def _fix_exception_context(new_exc, old_exc):
            # Context may not be correct, so find the end of the chain
            while 1:
                exc_context = new_exc.__context__
                if exc_context is None or exc_context is old_exc:
                    # Context is already set correctly (see issue 20317)
                    return
                if exc_context is frame_exc:
                    break
                new_exc = exc_context
            # Change the end of the chain to point to the exception
            # we expect it to reference
            new_exc.__context__ = old_exc

        # Callbacks are invoked in LIFO order to match the behaviour of
        # nested context managers
        suppressed_exc = False
        pending_raise = False
        while self._exit_callbacks:
            is_sync, cb = self._exit_callbacks.pop()
            # This synchronous stack can only run synchronous callbacks
            assert is_sync
            try:
                if cb(*exc_details):
                    # The callback asked for the exception to be suppressed,
                    # so the remaining callbacks see no exception at all
                    suppressed_exc = True
                    pending_raise = False
                    exc_details = (None, None, None)
            except:
                # The bare except is deliberate: whatever the callback
                # raised becomes the exception seen by the remaining
                # callbacks and is re-raised at the end if still pending
                new_exc_details = sys.exc_info()
                # simulate the stack of exceptions by setting the context
                _fix_exception_context(new_exc_details[1], exc_details[1])
                pending_raise = True
                exc_details = new_exc_details
        if pending_raise:
            try:
                # bare "raise exc_details[1]" replaces our carefully
                # set-up context
                fixed_ctx = exc_details[1].__context__
                raise exc_details[1]
            except BaseException:
                # Restore the context saved above before propagating
                exc_details[1].__context__ = fixed_ctx
                raise
        return received_exc and suppressed_exc

    def close(self):
        """Immediately unwind the context stack."""
        self.__exit__(None, None, None)
598
599
600# Inspired by discussions on https://bugs.python.org/issue29302
class AsyncExitStack(_BaseExitStack, AbstractAsyncContextManager):
    """Async context manager for dynamic management of a stack of exit
    callbacks.

    For example:
        async with AsyncExitStack() as stack:
            connections = [await stack.enter_async_context(get_connection())
                for i in range(5)]
            # All opened connections will automatically be released at the
            # end of the async with statement, even if attempts to open a
            # connection later in the list raise an exception.
    """

    @staticmethod
    def _create_async_exit_wrapper(cm, cm_exit):
        # Bind the __aexit__ function looked up on the type to its instance.
        return MethodType(cm_exit, cm)

    @staticmethod
    def _create_async_cb_wrapper(callback, /, *args, **kwds):
        # Adapt an arbitrary awaitable-returning callback to the __aexit__
        # call signature; the exception details are dropped and nothing is
        # returned.
        async def _exit_wrapper(exc_type, exc, tb):
            await callback(*args, **kwds)
        return _exit_wrapper

    async def enter_async_context(self, cm):
        """Enters the supplied async context manager.

        If successful, also pushes its __aexit__ method as a callback and
        returns the result of the __aenter__ method.

        Raises TypeError if the type of cm lacks __aenter__ or __aexit__.
        """
        # The special methods are looked up on the type, not the instance.
        cls = type(cm)
        try:
            _enter = cls.__aenter__
            _exit = cls.__aexit__
        except AttributeError:
            raise TypeError(f"'{cls.__module__}.{cls.__qualname__}' object does "
                            f"not support the asynchronous context manager protocol"
                           ) from None
        result = await _enter(cm)
        self._push_async_cm_exit(cm, _exit)
        return result

    def push_async_exit(self, exit):
        """Registers a coroutine function with the standard __aexit__ method
        signature.

        Can suppress exceptions the same way __aexit__ method can.
        Also accepts any object with an __aexit__ method (registering a call
        to the method instead of the object itself).
        """
        _cb_type = type(exit)
        try:
            exit_method = _cb_type.__aexit__
        except AttributeError:
            # Not an async context manager, so assume it's a coroutine function
            # (registered with is_sync=False so that __aexit__ awaits it)
            self._push_exit_callback(exit, False)
        else:
            self._push_async_cm_exit(exit, exit_method)
        return exit  # Allow use as a decorator

    def push_async_callback(self, callback, /, *args, **kwds):
        """Registers an arbitrary coroutine function and arguments.

        Cannot suppress exceptions.
        """
        _exit_wrapper = self._create_async_cb_wrapper(callback, *args, **kwds)

        # We changed the signature, so using @wraps is not appropriate, but
        # setting __wrapped__ may still help with introspection.
        _exit_wrapper.__wrapped__ = callback
        self._push_exit_callback(_exit_wrapper, False)
        return callback  # Allow use as a decorator

    async def aclose(self):
        """Immediately unwind the context stack."""
        await self.__aexit__(None, None, None)

    def _push_async_cm_exit(self, cm, cm_exit):
        """Helper to correctly register coroutine function to __aexit__
        method."""
        _exit_wrapper = self._create_async_exit_wrapper(cm, cm_exit)
        self._push_exit_callback(_exit_wrapper, False)

    async def __aenter__(self):
        """Return the stack itself."""
        return self

    async def __aexit__(self, *exc_details):
        """Run all registered callbacks, most recently registered first.

        Synchronous callbacks are called directly and asynchronous ones are
        awaited, each with the current exception details.  A callback
        returning a true value clears those details for the callbacks that
        follow; a callback that raises replaces them with the new
        exception.  If an exception is still pending once every callback
        has run, it is re-raised.  Otherwise the result is true only if an
        exception was passed in and some callback suppressed it.
        """
        received_exc = exc_details[0] is not None

        # We manipulate the exception state so it behaves as though
        # we were actually nesting multiple with statements
        # (frame_exc is the exception being handled when __aexit__ was
        # entered, if any)
        frame_exc = sys.exc_info()[1]
        def _fix_exception_context(new_exc, old_exc):
            # Context may not be correct, so find the end of the chain
            while 1:
                exc_context = new_exc.__context__
                if exc_context is None or exc_context is old_exc:
                    # Context is already set correctly (see issue 20317)
                    return
                if exc_context is frame_exc:
                    break
                new_exc = exc_context
            # Change the end of the chain to point to the exception
            # we expect it to reference
            new_exc.__context__ = old_exc

        # Callbacks are invoked in LIFO order to match the behaviour of
        # nested context managers
        suppressed_exc = False
        pending_raise = False
        while self._exit_callbacks:
            is_sync, cb = self._exit_callbacks.pop()
            try:
                # The flag stored at registration time decides whether the
                # callback's result has to be awaited
                if is_sync:
                    cb_suppress = cb(*exc_details)
                else:
                    cb_suppress = await cb(*exc_details)

                if cb_suppress:
                    # The callback asked for the exception to be suppressed,
                    # so the remaining callbacks see no exception at all
                    suppressed_exc = True
                    pending_raise = False
                    exc_details = (None, None, None)
            except:
                # The bare except is deliberate: whatever the callback
                # raised becomes the exception seen by the remaining
                # callbacks and is re-raised at the end if still pending
                new_exc_details = sys.exc_info()
                # simulate the stack of exceptions by setting the context
                _fix_exception_context(new_exc_details[1], exc_details[1])
                pending_raise = True
                exc_details = new_exc_details
        if pending_raise:
            try:
                # bare "raise exc_details[1]" replaces our carefully
                # set-up context
                fixed_ctx = exc_details[1].__context__
                raise exc_details[1]
            except BaseException:
                # Restore the context saved above before propagating
                exc_details[1].__context__ = fixed_ctx
                raise
        return received_exc and suppressed_exc
738
739
class nullcontext(AbstractContextManager, AbstractAsyncContextManager):
    """Do-nothing context manager, usable with both ``with`` and ``async with``.

    Handy as a placeholder when a block of code only sometimes needs a
    real context manager:

    cm = optional_cm if condition else nullcontext()
    with cm:
        # Perform operation, using optional_cm if condition is True
    """

    def __init__(self, enter_result=None):
        # Value handed back by __enter__ / __aenter__.
        self.enter_result = enter_result

    def __enter__(self):
        result = self.enter_result
        return result

    def __exit__(self, *excinfo):
        # Nothing to clean up; exceptions are never suppressed.
        return None

    async def __aenter__(self):
        result = self.enter_result
        return result

    async def __aexit__(self, *excinfo):
        # Nothing to clean up; exceptions are never suppressed.
        return None
765
766
class chdir(AbstractContextManager):
    """Non thread-safe context manager that switches the current working
    directory for the duration of the block."""

    def __init__(self, path):
        self.path = path
        # Used as a stack of the directories to go back to, one entry per
        # __enter__ call.
        self._old_cwd = []

    def __enter__(self):
        # Record where we are before moving to the target directory.
        previous = os.getcwd()
        self._old_cwd.append(previous)
        os.chdir(self.path)

    def __exit__(self, *excinfo):
        # Go back to the directory recorded by the matching __enter__.
        previous = self._old_cwd.pop()
        os.chdir(previous)
780