1*cda5da8dSAndroid Build Coastguard Worker# Adapted with permission from the EdgeDB project; 2*cda5da8dSAndroid Build Coastguard Worker# license: PSFL. 3*cda5da8dSAndroid Build Coastguard Worker 4*cda5da8dSAndroid Build Coastguard Worker 5*cda5da8dSAndroid Build Coastguard Worker__all__ = ["TaskGroup"] 6*cda5da8dSAndroid Build Coastguard Worker 7*cda5da8dSAndroid Build Coastguard Workerfrom . import events 8*cda5da8dSAndroid Build Coastguard Workerfrom . import exceptions 9*cda5da8dSAndroid Build Coastguard Workerfrom . import tasks 10*cda5da8dSAndroid Build Coastguard Worker 11*cda5da8dSAndroid Build Coastguard Worker 12*cda5da8dSAndroid Build Coastguard Workerclass TaskGroup: 13*cda5da8dSAndroid Build Coastguard Worker """Asynchronous context manager for managing groups of tasks. 14*cda5da8dSAndroid Build Coastguard Worker 15*cda5da8dSAndroid Build Coastguard Worker Example use: 16*cda5da8dSAndroid Build Coastguard Worker 17*cda5da8dSAndroid Build Coastguard Worker async with asyncio.TaskGroup() as group: 18*cda5da8dSAndroid Build Coastguard Worker task1 = group.create_task(some_coroutine(...)) 19*cda5da8dSAndroid Build Coastguard Worker task2 = group.create_task(other_coroutine(...)) 20*cda5da8dSAndroid Build Coastguard Worker print("Both tasks have completed now.") 21*cda5da8dSAndroid Build Coastguard Worker 22*cda5da8dSAndroid Build Coastguard Worker All tasks are awaited when the context manager exits. 23*cda5da8dSAndroid Build Coastguard Worker 24*cda5da8dSAndroid Build Coastguard Worker Any exceptions other than `asyncio.CancelledError` raised within 25*cda5da8dSAndroid Build Coastguard Worker a task will cancel all remaining tasks and wait for them to exit. 26*cda5da8dSAndroid Build Coastguard Worker The exceptions are then combined and raised as an `ExceptionGroup`. 27*cda5da8dSAndroid Build Coastguard Worker """ 28*cda5da8dSAndroid Build Coastguard Worker def __init__(self): 29*cda5da8dSAndroid Build Coastguard Worker self._entered = False 30*cda5da8dSAndroid Build Coastguard Worker self._exiting = False 31*cda5da8dSAndroid Build Coastguard Worker self._aborting = False 32*cda5da8dSAndroid Build Coastguard Worker self._loop = None 33*cda5da8dSAndroid Build Coastguard Worker self._parent_task = None 34*cda5da8dSAndroid Build Coastguard Worker self._parent_cancel_requested = False 35*cda5da8dSAndroid Build Coastguard Worker self._tasks = set() 36*cda5da8dSAndroid Build Coastguard Worker self._errors = [] 37*cda5da8dSAndroid Build Coastguard Worker self._base_error = None 38*cda5da8dSAndroid Build Coastguard Worker self._on_completed_fut = None 39*cda5da8dSAndroid Build Coastguard Worker 40*cda5da8dSAndroid Build Coastguard Worker def __repr__(self): 41*cda5da8dSAndroid Build Coastguard Worker info = [''] 42*cda5da8dSAndroid Build Coastguard Worker if self._tasks: 43*cda5da8dSAndroid Build Coastguard Worker info.append(f'tasks={len(self._tasks)}') 44*cda5da8dSAndroid Build Coastguard Worker if self._errors: 45*cda5da8dSAndroid Build Coastguard Worker info.append(f'errors={len(self._errors)}') 46*cda5da8dSAndroid Build Coastguard Worker if self._aborting: 47*cda5da8dSAndroid Build Coastguard Worker info.append('cancelling') 48*cda5da8dSAndroid Build Coastguard Worker elif self._entered: 49*cda5da8dSAndroid Build Coastguard Worker info.append('entered') 50*cda5da8dSAndroid Build Coastguard Worker 51*cda5da8dSAndroid Build Coastguard Worker info_str = ' '.join(info) 52*cda5da8dSAndroid Build Coastguard Worker return f'<TaskGroup{info_str}>' 53*cda5da8dSAndroid Build Coastguard Worker 54*cda5da8dSAndroid Build Coastguard Worker async def __aenter__(self): 55*cda5da8dSAndroid Build Coastguard Worker if self._entered: 56*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError( 57*cda5da8dSAndroid Build Coastguard Worker f"TaskGroup {self!r} has been already entered") 58*cda5da8dSAndroid Build Coastguard Worker self._entered = True 59*cda5da8dSAndroid Build Coastguard Worker 60*cda5da8dSAndroid Build Coastguard Worker if self._loop is None: 61*cda5da8dSAndroid Build Coastguard Worker self._loop = events.get_running_loop() 62*cda5da8dSAndroid Build Coastguard Worker 63*cda5da8dSAndroid Build Coastguard Worker self._parent_task = tasks.current_task(self._loop) 64*cda5da8dSAndroid Build Coastguard Worker if self._parent_task is None: 65*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError( 66*cda5da8dSAndroid Build Coastguard Worker f'TaskGroup {self!r} cannot determine the parent task') 67*cda5da8dSAndroid Build Coastguard Worker 68*cda5da8dSAndroid Build Coastguard Worker return self 69*cda5da8dSAndroid Build Coastguard Worker 70*cda5da8dSAndroid Build Coastguard Worker async def __aexit__(self, et, exc, tb): 71*cda5da8dSAndroid Build Coastguard Worker self._exiting = True 72*cda5da8dSAndroid Build Coastguard Worker 73*cda5da8dSAndroid Build Coastguard Worker if (exc is not None and 74*cda5da8dSAndroid Build Coastguard Worker self._is_base_error(exc) and 75*cda5da8dSAndroid Build Coastguard Worker self._base_error is None): 76*cda5da8dSAndroid Build Coastguard Worker self._base_error = exc 77*cda5da8dSAndroid Build Coastguard Worker 78*cda5da8dSAndroid Build Coastguard Worker propagate_cancellation_error = \ 79*cda5da8dSAndroid Build Coastguard Worker exc if et is exceptions.CancelledError else None 80*cda5da8dSAndroid Build Coastguard Worker if self._parent_cancel_requested: 81*cda5da8dSAndroid Build Coastguard Worker # If this flag is set we *must* call uncancel(). 82*cda5da8dSAndroid Build Coastguard Worker if self._parent_task.uncancel() == 0: 83*cda5da8dSAndroid Build Coastguard Worker # If there are no pending cancellations left, 84*cda5da8dSAndroid Build Coastguard Worker # don't propagate CancelledError. 85*cda5da8dSAndroid Build Coastguard Worker propagate_cancellation_error = None 86*cda5da8dSAndroid Build Coastguard Worker 87*cda5da8dSAndroid Build Coastguard Worker if et is not None: 88*cda5da8dSAndroid Build Coastguard Worker if not self._aborting: 89*cda5da8dSAndroid Build Coastguard Worker # Our parent task is being cancelled: 90*cda5da8dSAndroid Build Coastguard Worker # 91*cda5da8dSAndroid Build Coastguard Worker # async with TaskGroup() as g: 92*cda5da8dSAndroid Build Coastguard Worker # g.create_task(...) 93*cda5da8dSAndroid Build Coastguard Worker # await ... # <- CancelledError 94*cda5da8dSAndroid Build Coastguard Worker # 95*cda5da8dSAndroid Build Coastguard Worker # or there's an exception in "async with": 96*cda5da8dSAndroid Build Coastguard Worker # 97*cda5da8dSAndroid Build Coastguard Worker # async with TaskGroup() as g: 98*cda5da8dSAndroid Build Coastguard Worker # g.create_task(...) 99*cda5da8dSAndroid Build Coastguard Worker # 1 / 0 100*cda5da8dSAndroid Build Coastguard Worker # 101*cda5da8dSAndroid Build Coastguard Worker self._abort() 102*cda5da8dSAndroid Build Coastguard Worker 103*cda5da8dSAndroid Build Coastguard Worker # We use while-loop here because "self._on_completed_fut" 104*cda5da8dSAndroid Build Coastguard Worker # can be cancelled multiple times if our parent task 105*cda5da8dSAndroid Build Coastguard Worker # is being cancelled repeatedly (or even once, when 106*cda5da8dSAndroid Build Coastguard Worker # our own cancellation is already in progress) 107*cda5da8dSAndroid Build Coastguard Worker while self._tasks: 108*cda5da8dSAndroid Build Coastguard Worker if self._on_completed_fut is None: 109*cda5da8dSAndroid Build Coastguard Worker self._on_completed_fut = self._loop.create_future() 110*cda5da8dSAndroid Build Coastguard Worker 111*cda5da8dSAndroid Build Coastguard Worker try: 112*cda5da8dSAndroid Build Coastguard Worker await self._on_completed_fut 113*cda5da8dSAndroid Build Coastguard Worker except exceptions.CancelledError as ex: 114*cda5da8dSAndroid Build Coastguard Worker if not self._aborting: 115*cda5da8dSAndroid Build Coastguard Worker # Our parent task is being cancelled: 116*cda5da8dSAndroid Build Coastguard Worker # 117*cda5da8dSAndroid Build Coastguard Worker # async def wrapper(): 118*cda5da8dSAndroid Build Coastguard Worker # async with TaskGroup() as g: 119*cda5da8dSAndroid Build Coastguard Worker # g.create_task(foo) 120*cda5da8dSAndroid Build Coastguard Worker # 121*cda5da8dSAndroid Build Coastguard Worker # "wrapper" is being cancelled while "foo" is 122*cda5da8dSAndroid Build Coastguard Worker # still running. 123*cda5da8dSAndroid Build Coastguard Worker propagate_cancellation_error = ex 124*cda5da8dSAndroid Build Coastguard Worker self._abort() 125*cda5da8dSAndroid Build Coastguard Worker 126*cda5da8dSAndroid Build Coastguard Worker self._on_completed_fut = None 127*cda5da8dSAndroid Build Coastguard Worker 128*cda5da8dSAndroid Build Coastguard Worker assert not self._tasks 129*cda5da8dSAndroid Build Coastguard Worker 130*cda5da8dSAndroid Build Coastguard Worker if self._base_error is not None: 131*cda5da8dSAndroid Build Coastguard Worker raise self._base_error 132*cda5da8dSAndroid Build Coastguard Worker 133*cda5da8dSAndroid Build Coastguard Worker # Propagate CancelledError if there is one, except if there 134*cda5da8dSAndroid Build Coastguard Worker # are other errors -- those have priority. 135*cda5da8dSAndroid Build Coastguard Worker if propagate_cancellation_error and not self._errors: 136*cda5da8dSAndroid Build Coastguard Worker raise propagate_cancellation_error 137*cda5da8dSAndroid Build Coastguard Worker 138*cda5da8dSAndroid Build Coastguard Worker if et is not None and et is not exceptions.CancelledError: 139*cda5da8dSAndroid Build Coastguard Worker self._errors.append(exc) 140*cda5da8dSAndroid Build Coastguard Worker 141*cda5da8dSAndroid Build Coastguard Worker if self._errors: 142*cda5da8dSAndroid Build Coastguard Worker # Exceptions are heavy objects that can have object 143*cda5da8dSAndroid Build Coastguard Worker # cycles (bad for GC); let's not keep a reference to 144*cda5da8dSAndroid Build Coastguard Worker # a bunch of them. 145*cda5da8dSAndroid Build Coastguard Worker try: 146*cda5da8dSAndroid Build Coastguard Worker me = BaseExceptionGroup('unhandled errors in a TaskGroup', self._errors) 147*cda5da8dSAndroid Build Coastguard Worker raise me from None 148*cda5da8dSAndroid Build Coastguard Worker finally: 149*cda5da8dSAndroid Build Coastguard Worker self._errors = None 150*cda5da8dSAndroid Build Coastguard Worker 151*cda5da8dSAndroid Build Coastguard Worker def create_task(self, coro, *, name=None, context=None): 152*cda5da8dSAndroid Build Coastguard Worker """Create a new task in this group and return it. 153*cda5da8dSAndroid Build Coastguard Worker 154*cda5da8dSAndroid Build Coastguard Worker Similar to `asyncio.create_task`. 155*cda5da8dSAndroid Build Coastguard Worker """ 156*cda5da8dSAndroid Build Coastguard Worker if not self._entered: 157*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError(f"TaskGroup {self!r} has not been entered") 158*cda5da8dSAndroid Build Coastguard Worker if self._exiting and not self._tasks: 159*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError(f"TaskGroup {self!r} is finished") 160*cda5da8dSAndroid Build Coastguard Worker if self._aborting: 161*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError(f"TaskGroup {self!r} is shutting down") 162*cda5da8dSAndroid Build Coastguard Worker if context is None: 163*cda5da8dSAndroid Build Coastguard Worker task = self._loop.create_task(coro) 164*cda5da8dSAndroid Build Coastguard Worker else: 165*cda5da8dSAndroid Build Coastguard Worker task = self._loop.create_task(coro, context=context) 166*cda5da8dSAndroid Build Coastguard Worker tasks._set_task_name(task, name) 167*cda5da8dSAndroid Build Coastguard Worker task.add_done_callback(self._on_task_done) 168*cda5da8dSAndroid Build Coastguard Worker self._tasks.add(task) 169*cda5da8dSAndroid Build Coastguard Worker return task 170*cda5da8dSAndroid Build Coastguard Worker 171*cda5da8dSAndroid Build Coastguard Worker # Since Python 3.8 Tasks propagate all exceptions correctly, 172*cda5da8dSAndroid Build Coastguard Worker # except for KeyboardInterrupt and SystemExit which are 173*cda5da8dSAndroid Build Coastguard Worker # still considered special. 174*cda5da8dSAndroid Build Coastguard Worker 175*cda5da8dSAndroid Build Coastguard Worker def _is_base_error(self, exc: BaseException) -> bool: 176*cda5da8dSAndroid Build Coastguard Worker assert isinstance(exc, BaseException) 177*cda5da8dSAndroid Build Coastguard Worker return isinstance(exc, (SystemExit, KeyboardInterrupt)) 178*cda5da8dSAndroid Build Coastguard Worker 179*cda5da8dSAndroid Build Coastguard Worker def _abort(self): 180*cda5da8dSAndroid Build Coastguard Worker self._aborting = True 181*cda5da8dSAndroid Build Coastguard Worker 182*cda5da8dSAndroid Build Coastguard Worker for t in self._tasks: 183*cda5da8dSAndroid Build Coastguard Worker if not t.done(): 184*cda5da8dSAndroid Build Coastguard Worker t.cancel() 185*cda5da8dSAndroid Build Coastguard Worker 186*cda5da8dSAndroid Build Coastguard Worker def _on_task_done(self, task): 187*cda5da8dSAndroid Build Coastguard Worker self._tasks.discard(task) 188*cda5da8dSAndroid Build Coastguard Worker 189*cda5da8dSAndroid Build Coastguard Worker if self._on_completed_fut is not None and not self._tasks: 190*cda5da8dSAndroid Build Coastguard Worker if not self._on_completed_fut.done(): 191*cda5da8dSAndroid Build Coastguard Worker self._on_completed_fut.set_result(True) 192*cda5da8dSAndroid Build Coastguard Worker 193*cda5da8dSAndroid Build Coastguard Worker if task.cancelled(): 194*cda5da8dSAndroid Build Coastguard Worker return 195*cda5da8dSAndroid Build Coastguard Worker 196*cda5da8dSAndroid Build Coastguard Worker exc = task.exception() 197*cda5da8dSAndroid Build Coastguard Worker if exc is None: 198*cda5da8dSAndroid Build Coastguard Worker return 199*cda5da8dSAndroid Build Coastguard Worker 200*cda5da8dSAndroid Build Coastguard Worker self._errors.append(exc) 201*cda5da8dSAndroid Build Coastguard Worker if self._is_base_error(exc) and self._base_error is None: 202*cda5da8dSAndroid Build Coastguard Worker self._base_error = exc 203*cda5da8dSAndroid Build Coastguard Worker 204*cda5da8dSAndroid Build Coastguard Worker if self._parent_task.done(): 205*cda5da8dSAndroid Build Coastguard Worker # Not sure if this case is possible, but we want to handle 206*cda5da8dSAndroid Build Coastguard Worker # it anyways. 207*cda5da8dSAndroid Build Coastguard Worker self._loop.call_exception_handler({ 208*cda5da8dSAndroid Build Coastguard Worker 'message': f'Task {task!r} has errored out but its parent ' 209*cda5da8dSAndroid Build Coastguard Worker f'task {self._parent_task} is already completed', 210*cda5da8dSAndroid Build Coastguard Worker 'exception': exc, 211*cda5da8dSAndroid Build Coastguard Worker 'task': task, 212*cda5da8dSAndroid Build Coastguard Worker }) 213*cda5da8dSAndroid Build Coastguard Worker return 214*cda5da8dSAndroid Build Coastguard Worker 215*cda5da8dSAndroid Build Coastguard Worker if not self._aborting and not self._parent_cancel_requested: 216*cda5da8dSAndroid Build Coastguard Worker # If parent task *is not* being cancelled, it means that we want 217*cda5da8dSAndroid Build Coastguard Worker # to manually cancel it to abort whatever is being run right now 218*cda5da8dSAndroid Build Coastguard Worker # in the TaskGroup. But we want to mark parent task as 219*cda5da8dSAndroid Build Coastguard Worker # "not cancelled" later in __aexit__. Example situation that 220*cda5da8dSAndroid Build Coastguard Worker # we need to handle: 221*cda5da8dSAndroid Build Coastguard Worker # 222*cda5da8dSAndroid Build Coastguard Worker # async def foo(): 223*cda5da8dSAndroid Build Coastguard Worker # try: 224*cda5da8dSAndroid Build Coastguard Worker # async with TaskGroup() as g: 225*cda5da8dSAndroid Build Coastguard Worker # g.create_task(crash_soon()) 226*cda5da8dSAndroid Build Coastguard Worker # await something # <- this needs to be canceled 227*cda5da8dSAndroid Build Coastguard Worker # # by the TaskGroup, e.g. 228*cda5da8dSAndroid Build Coastguard Worker # # foo() needs to be cancelled 229*cda5da8dSAndroid Build Coastguard Worker # except Exception: 230*cda5da8dSAndroid Build Coastguard Worker # # Ignore any exceptions raised in the TaskGroup 231*cda5da8dSAndroid Build Coastguard Worker # pass 232*cda5da8dSAndroid Build Coastguard Worker # await something_else # this line has to be called 233*cda5da8dSAndroid Build Coastguard Worker # # after TaskGroup is finished. 234*cda5da8dSAndroid Build Coastguard Worker self._abort() 235*cda5da8dSAndroid Build Coastguard Worker self._parent_cancel_requested = True 236*cda5da8dSAndroid Build Coastguard Worker self._parent_task.cancel() 237