xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/asyncio/taskgroups.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1*cda5da8dSAndroid Build Coastguard Worker# Adapted with permission from the EdgeDB project;
2*cda5da8dSAndroid Build Coastguard Worker# license: PSFL.
3*cda5da8dSAndroid Build Coastguard Worker
4*cda5da8dSAndroid Build Coastguard Worker
5*cda5da8dSAndroid Build Coastguard Worker__all__ = ["TaskGroup"]
6*cda5da8dSAndroid Build Coastguard Worker
7*cda5da8dSAndroid Build Coastguard Workerfrom . import events
8*cda5da8dSAndroid Build Coastguard Workerfrom . import exceptions
9*cda5da8dSAndroid Build Coastguard Workerfrom . import tasks
10*cda5da8dSAndroid Build Coastguard Worker
11*cda5da8dSAndroid Build Coastguard Worker
12*cda5da8dSAndroid Build Coastguard Workerclass TaskGroup:
13*cda5da8dSAndroid Build Coastguard Worker    """Asynchronous context manager for managing groups of tasks.
14*cda5da8dSAndroid Build Coastguard Worker
15*cda5da8dSAndroid Build Coastguard Worker    Example use:
16*cda5da8dSAndroid Build Coastguard Worker
17*cda5da8dSAndroid Build Coastguard Worker        async with asyncio.TaskGroup() as group:
18*cda5da8dSAndroid Build Coastguard Worker            task1 = group.create_task(some_coroutine(...))
19*cda5da8dSAndroid Build Coastguard Worker            task2 = group.create_task(other_coroutine(...))
20*cda5da8dSAndroid Build Coastguard Worker        print("Both tasks have completed now.")
21*cda5da8dSAndroid Build Coastguard Worker
22*cda5da8dSAndroid Build Coastguard Worker    All tasks are awaited when the context manager exits.
23*cda5da8dSAndroid Build Coastguard Worker
24*cda5da8dSAndroid Build Coastguard Worker    Any exceptions other than `asyncio.CancelledError` raised within
25*cda5da8dSAndroid Build Coastguard Worker    a task will cancel all remaining tasks and wait for them to exit.
26*cda5da8dSAndroid Build Coastguard Worker    The exceptions are then combined and raised as an `ExceptionGroup`.
27*cda5da8dSAndroid Build Coastguard Worker    """
28*cda5da8dSAndroid Build Coastguard Worker    def __init__(self):
29*cda5da8dSAndroid Build Coastguard Worker        self._entered = False
30*cda5da8dSAndroid Build Coastguard Worker        self._exiting = False
31*cda5da8dSAndroid Build Coastguard Worker        self._aborting = False
32*cda5da8dSAndroid Build Coastguard Worker        self._loop = None
33*cda5da8dSAndroid Build Coastguard Worker        self._parent_task = None
34*cda5da8dSAndroid Build Coastguard Worker        self._parent_cancel_requested = False
35*cda5da8dSAndroid Build Coastguard Worker        self._tasks = set()
36*cda5da8dSAndroid Build Coastguard Worker        self._errors = []
37*cda5da8dSAndroid Build Coastguard Worker        self._base_error = None
38*cda5da8dSAndroid Build Coastguard Worker        self._on_completed_fut = None
39*cda5da8dSAndroid Build Coastguard Worker
40*cda5da8dSAndroid Build Coastguard Worker    def __repr__(self):
41*cda5da8dSAndroid Build Coastguard Worker        info = ['']
42*cda5da8dSAndroid Build Coastguard Worker        if self._tasks:
43*cda5da8dSAndroid Build Coastguard Worker            info.append(f'tasks={len(self._tasks)}')
44*cda5da8dSAndroid Build Coastguard Worker        if self._errors:
45*cda5da8dSAndroid Build Coastguard Worker            info.append(f'errors={len(self._errors)}')
46*cda5da8dSAndroid Build Coastguard Worker        if self._aborting:
47*cda5da8dSAndroid Build Coastguard Worker            info.append('cancelling')
48*cda5da8dSAndroid Build Coastguard Worker        elif self._entered:
49*cda5da8dSAndroid Build Coastguard Worker            info.append('entered')
50*cda5da8dSAndroid Build Coastguard Worker
51*cda5da8dSAndroid Build Coastguard Worker        info_str = ' '.join(info)
52*cda5da8dSAndroid Build Coastguard Worker        return f'<TaskGroup{info_str}>'
53*cda5da8dSAndroid Build Coastguard Worker
54*cda5da8dSAndroid Build Coastguard Worker    async def __aenter__(self):
55*cda5da8dSAndroid Build Coastguard Worker        if self._entered:
56*cda5da8dSAndroid Build Coastguard Worker            raise RuntimeError(
57*cda5da8dSAndroid Build Coastguard Worker                f"TaskGroup {self!r} has been already entered")
58*cda5da8dSAndroid Build Coastguard Worker        self._entered = True
59*cda5da8dSAndroid Build Coastguard Worker
60*cda5da8dSAndroid Build Coastguard Worker        if self._loop is None:
61*cda5da8dSAndroid Build Coastguard Worker            self._loop = events.get_running_loop()
62*cda5da8dSAndroid Build Coastguard Worker
63*cda5da8dSAndroid Build Coastguard Worker        self._parent_task = tasks.current_task(self._loop)
64*cda5da8dSAndroid Build Coastguard Worker        if self._parent_task is None:
65*cda5da8dSAndroid Build Coastguard Worker            raise RuntimeError(
66*cda5da8dSAndroid Build Coastguard Worker                f'TaskGroup {self!r} cannot determine the parent task')
67*cda5da8dSAndroid Build Coastguard Worker
68*cda5da8dSAndroid Build Coastguard Worker        return self
69*cda5da8dSAndroid Build Coastguard Worker
70*cda5da8dSAndroid Build Coastguard Worker    async def __aexit__(self, et, exc, tb):
71*cda5da8dSAndroid Build Coastguard Worker        self._exiting = True
72*cda5da8dSAndroid Build Coastguard Worker
73*cda5da8dSAndroid Build Coastguard Worker        if (exc is not None and
74*cda5da8dSAndroid Build Coastguard Worker                self._is_base_error(exc) and
75*cda5da8dSAndroid Build Coastguard Worker                self._base_error is None):
76*cda5da8dSAndroid Build Coastguard Worker            self._base_error = exc
77*cda5da8dSAndroid Build Coastguard Worker
78*cda5da8dSAndroid Build Coastguard Worker        propagate_cancellation_error = \
79*cda5da8dSAndroid Build Coastguard Worker            exc if et is exceptions.CancelledError else None
80*cda5da8dSAndroid Build Coastguard Worker        if self._parent_cancel_requested:
81*cda5da8dSAndroid Build Coastguard Worker            # If this flag is set we *must* call uncancel().
82*cda5da8dSAndroid Build Coastguard Worker            if self._parent_task.uncancel() == 0:
83*cda5da8dSAndroid Build Coastguard Worker                # If there are no pending cancellations left,
84*cda5da8dSAndroid Build Coastguard Worker                # don't propagate CancelledError.
85*cda5da8dSAndroid Build Coastguard Worker                propagate_cancellation_error = None
86*cda5da8dSAndroid Build Coastguard Worker
87*cda5da8dSAndroid Build Coastguard Worker        if et is not None:
88*cda5da8dSAndroid Build Coastguard Worker            if not self._aborting:
89*cda5da8dSAndroid Build Coastguard Worker                # Our parent task is being cancelled:
90*cda5da8dSAndroid Build Coastguard Worker                #
91*cda5da8dSAndroid Build Coastguard Worker                #    async with TaskGroup() as g:
92*cda5da8dSAndroid Build Coastguard Worker                #        g.create_task(...)
93*cda5da8dSAndroid Build Coastguard Worker                #        await ...  # <- CancelledError
94*cda5da8dSAndroid Build Coastguard Worker                #
95*cda5da8dSAndroid Build Coastguard Worker                # or there's an exception in "async with":
96*cda5da8dSAndroid Build Coastguard Worker                #
97*cda5da8dSAndroid Build Coastguard Worker                #    async with TaskGroup() as g:
98*cda5da8dSAndroid Build Coastguard Worker                #        g.create_task(...)
99*cda5da8dSAndroid Build Coastguard Worker                #        1 / 0
100*cda5da8dSAndroid Build Coastguard Worker                #
101*cda5da8dSAndroid Build Coastguard Worker                self._abort()
102*cda5da8dSAndroid Build Coastguard Worker
103*cda5da8dSAndroid Build Coastguard Worker        # We use while-loop here because "self._on_completed_fut"
104*cda5da8dSAndroid Build Coastguard Worker        # can be cancelled multiple times if our parent task
105*cda5da8dSAndroid Build Coastguard Worker        # is being cancelled repeatedly (or even once, when
106*cda5da8dSAndroid Build Coastguard Worker        # our own cancellation is already in progress)
107*cda5da8dSAndroid Build Coastguard Worker        while self._tasks:
108*cda5da8dSAndroid Build Coastguard Worker            if self._on_completed_fut is None:
109*cda5da8dSAndroid Build Coastguard Worker                self._on_completed_fut = self._loop.create_future()
110*cda5da8dSAndroid Build Coastguard Worker
111*cda5da8dSAndroid Build Coastguard Worker            try:
112*cda5da8dSAndroid Build Coastguard Worker                await self._on_completed_fut
113*cda5da8dSAndroid Build Coastguard Worker            except exceptions.CancelledError as ex:
114*cda5da8dSAndroid Build Coastguard Worker                if not self._aborting:
115*cda5da8dSAndroid Build Coastguard Worker                    # Our parent task is being cancelled:
116*cda5da8dSAndroid Build Coastguard Worker                    #
117*cda5da8dSAndroid Build Coastguard Worker                    #    async def wrapper():
118*cda5da8dSAndroid Build Coastguard Worker                    #        async with TaskGroup() as g:
119*cda5da8dSAndroid Build Coastguard Worker                    #            g.create_task(foo)
120*cda5da8dSAndroid Build Coastguard Worker                    #
121*cda5da8dSAndroid Build Coastguard Worker                    # "wrapper" is being cancelled while "foo" is
122*cda5da8dSAndroid Build Coastguard Worker                    # still running.
123*cda5da8dSAndroid Build Coastguard Worker                    propagate_cancellation_error = ex
124*cda5da8dSAndroid Build Coastguard Worker                    self._abort()
125*cda5da8dSAndroid Build Coastguard Worker
126*cda5da8dSAndroid Build Coastguard Worker            self._on_completed_fut = None
127*cda5da8dSAndroid Build Coastguard Worker
128*cda5da8dSAndroid Build Coastguard Worker        assert not self._tasks
129*cda5da8dSAndroid Build Coastguard Worker
130*cda5da8dSAndroid Build Coastguard Worker        if self._base_error is not None:
131*cda5da8dSAndroid Build Coastguard Worker            raise self._base_error
132*cda5da8dSAndroid Build Coastguard Worker
133*cda5da8dSAndroid Build Coastguard Worker        # Propagate CancelledError if there is one, except if there
134*cda5da8dSAndroid Build Coastguard Worker        # are other errors -- those have priority.
135*cda5da8dSAndroid Build Coastguard Worker        if propagate_cancellation_error and not self._errors:
136*cda5da8dSAndroid Build Coastguard Worker            raise propagate_cancellation_error
137*cda5da8dSAndroid Build Coastguard Worker
138*cda5da8dSAndroid Build Coastguard Worker        if et is not None and et is not exceptions.CancelledError:
139*cda5da8dSAndroid Build Coastguard Worker            self._errors.append(exc)
140*cda5da8dSAndroid Build Coastguard Worker
141*cda5da8dSAndroid Build Coastguard Worker        if self._errors:
142*cda5da8dSAndroid Build Coastguard Worker            # Exceptions are heavy objects that can have object
143*cda5da8dSAndroid Build Coastguard Worker            # cycles (bad for GC); let's not keep a reference to
144*cda5da8dSAndroid Build Coastguard Worker            # a bunch of them.
145*cda5da8dSAndroid Build Coastguard Worker            try:
146*cda5da8dSAndroid Build Coastguard Worker                me = BaseExceptionGroup('unhandled errors in a TaskGroup', self._errors)
147*cda5da8dSAndroid Build Coastguard Worker                raise me from None
148*cda5da8dSAndroid Build Coastguard Worker            finally:
149*cda5da8dSAndroid Build Coastguard Worker                self._errors = None
150*cda5da8dSAndroid Build Coastguard Worker
151*cda5da8dSAndroid Build Coastguard Worker    def create_task(self, coro, *, name=None, context=None):
152*cda5da8dSAndroid Build Coastguard Worker        """Create a new task in this group and return it.
153*cda5da8dSAndroid Build Coastguard Worker
154*cda5da8dSAndroid Build Coastguard Worker        Similar to `asyncio.create_task`.
155*cda5da8dSAndroid Build Coastguard Worker        """
156*cda5da8dSAndroid Build Coastguard Worker        if not self._entered:
157*cda5da8dSAndroid Build Coastguard Worker            raise RuntimeError(f"TaskGroup {self!r} has not been entered")
158*cda5da8dSAndroid Build Coastguard Worker        if self._exiting and not self._tasks:
159*cda5da8dSAndroid Build Coastguard Worker            raise RuntimeError(f"TaskGroup {self!r} is finished")
160*cda5da8dSAndroid Build Coastguard Worker        if self._aborting:
161*cda5da8dSAndroid Build Coastguard Worker            raise RuntimeError(f"TaskGroup {self!r} is shutting down")
162*cda5da8dSAndroid Build Coastguard Worker        if context is None:
163*cda5da8dSAndroid Build Coastguard Worker            task = self._loop.create_task(coro)
164*cda5da8dSAndroid Build Coastguard Worker        else:
165*cda5da8dSAndroid Build Coastguard Worker            task = self._loop.create_task(coro, context=context)
166*cda5da8dSAndroid Build Coastguard Worker        tasks._set_task_name(task, name)
167*cda5da8dSAndroid Build Coastguard Worker        task.add_done_callback(self._on_task_done)
168*cda5da8dSAndroid Build Coastguard Worker        self._tasks.add(task)
169*cda5da8dSAndroid Build Coastguard Worker        return task
170*cda5da8dSAndroid Build Coastguard Worker
171*cda5da8dSAndroid Build Coastguard Worker    # Since Python 3.8 Tasks propagate all exceptions correctly,
172*cda5da8dSAndroid Build Coastguard Worker    # except for KeyboardInterrupt and SystemExit which are
173*cda5da8dSAndroid Build Coastguard Worker    # still considered special.
174*cda5da8dSAndroid Build Coastguard Worker
175*cda5da8dSAndroid Build Coastguard Worker    def _is_base_error(self, exc: BaseException) -> bool:
176*cda5da8dSAndroid Build Coastguard Worker        assert isinstance(exc, BaseException)
177*cda5da8dSAndroid Build Coastguard Worker        return isinstance(exc, (SystemExit, KeyboardInterrupt))
178*cda5da8dSAndroid Build Coastguard Worker
179*cda5da8dSAndroid Build Coastguard Worker    def _abort(self):
180*cda5da8dSAndroid Build Coastguard Worker        self._aborting = True
181*cda5da8dSAndroid Build Coastguard Worker
182*cda5da8dSAndroid Build Coastguard Worker        for t in self._tasks:
183*cda5da8dSAndroid Build Coastguard Worker            if not t.done():
184*cda5da8dSAndroid Build Coastguard Worker                t.cancel()
185*cda5da8dSAndroid Build Coastguard Worker
186*cda5da8dSAndroid Build Coastguard Worker    def _on_task_done(self, task):
187*cda5da8dSAndroid Build Coastguard Worker        self._tasks.discard(task)
188*cda5da8dSAndroid Build Coastguard Worker
189*cda5da8dSAndroid Build Coastguard Worker        if self._on_completed_fut is not None and not self._tasks:
190*cda5da8dSAndroid Build Coastguard Worker            if not self._on_completed_fut.done():
191*cda5da8dSAndroid Build Coastguard Worker                self._on_completed_fut.set_result(True)
192*cda5da8dSAndroid Build Coastguard Worker
193*cda5da8dSAndroid Build Coastguard Worker        if task.cancelled():
194*cda5da8dSAndroid Build Coastguard Worker            return
195*cda5da8dSAndroid Build Coastguard Worker
196*cda5da8dSAndroid Build Coastguard Worker        exc = task.exception()
197*cda5da8dSAndroid Build Coastguard Worker        if exc is None:
198*cda5da8dSAndroid Build Coastguard Worker            return
199*cda5da8dSAndroid Build Coastguard Worker
200*cda5da8dSAndroid Build Coastguard Worker        self._errors.append(exc)
201*cda5da8dSAndroid Build Coastguard Worker        if self._is_base_error(exc) and self._base_error is None:
202*cda5da8dSAndroid Build Coastguard Worker            self._base_error = exc
203*cda5da8dSAndroid Build Coastguard Worker
204*cda5da8dSAndroid Build Coastguard Worker        if self._parent_task.done():
205*cda5da8dSAndroid Build Coastguard Worker            # Not sure if this case is possible, but we want to handle
206*cda5da8dSAndroid Build Coastguard Worker            # it anyways.
207*cda5da8dSAndroid Build Coastguard Worker            self._loop.call_exception_handler({
208*cda5da8dSAndroid Build Coastguard Worker                'message': f'Task {task!r} has errored out but its parent '
209*cda5da8dSAndroid Build Coastguard Worker                           f'task {self._parent_task} is already completed',
210*cda5da8dSAndroid Build Coastguard Worker                'exception': exc,
211*cda5da8dSAndroid Build Coastguard Worker                'task': task,
212*cda5da8dSAndroid Build Coastguard Worker            })
213*cda5da8dSAndroid Build Coastguard Worker            return
214*cda5da8dSAndroid Build Coastguard Worker
215*cda5da8dSAndroid Build Coastguard Worker        if not self._aborting and not self._parent_cancel_requested:
216*cda5da8dSAndroid Build Coastguard Worker            # If parent task *is not* being cancelled, it means that we want
217*cda5da8dSAndroid Build Coastguard Worker            # to manually cancel it to abort whatever is being run right now
218*cda5da8dSAndroid Build Coastguard Worker            # in the TaskGroup.  But we want to mark parent task as
219*cda5da8dSAndroid Build Coastguard Worker            # "not cancelled" later in __aexit__.  Example situation that
220*cda5da8dSAndroid Build Coastguard Worker            # we need to handle:
221*cda5da8dSAndroid Build Coastguard Worker            #
222*cda5da8dSAndroid Build Coastguard Worker            #    async def foo():
223*cda5da8dSAndroid Build Coastguard Worker            #        try:
224*cda5da8dSAndroid Build Coastguard Worker            #            async with TaskGroup() as g:
225*cda5da8dSAndroid Build Coastguard Worker            #                g.create_task(crash_soon())
226*cda5da8dSAndroid Build Coastguard Worker            #                await something  # <- this needs to be canceled
227*cda5da8dSAndroid Build Coastguard Worker            #                                 #    by the TaskGroup, e.g.
228*cda5da8dSAndroid Build Coastguard Worker            #                                 #    foo() needs to be cancelled
229*cda5da8dSAndroid Build Coastguard Worker            #        except Exception:
230*cda5da8dSAndroid Build Coastguard Worker            #            # Ignore any exceptions raised in the TaskGroup
231*cda5da8dSAndroid Build Coastguard Worker            #            pass
232*cda5da8dSAndroid Build Coastguard Worker            #        await something_else     # this line has to be called
233*cda5da8dSAndroid Build Coastguard Worker            #                                 # after TaskGroup is finished.
234*cda5da8dSAndroid Build Coastguard Worker            self._abort()
235*cda5da8dSAndroid Build Coastguard Worker            self._parent_cancel_requested = True
236*cda5da8dSAndroid Build Coastguard Worker            self._parent_task.cancel()
237