xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/asyncio/base_tasks.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1import linecache
2import reprlib
3import traceback
4
5from . import base_futures
6from . import coroutines
7
8
9def _task_repr_info(task):
10    info = base_futures._future_repr_info(task)
11
12    if task.cancelling() and not task.done():
13        # replace status
14        info[0] = 'cancelling'
15
16    info.insert(1, 'name=%r' % task.get_name())
17
18    coro = coroutines._format_coroutine(task._coro)
19    info.insert(2, f'coro=<{coro}>')
20
21    if task._fut_waiter is not None:
22        info.insert(3, f'wait_for={task._fut_waiter!r}')
23    return info
24
25
26@reprlib.recursive_repr()
27def _task_repr(task):
28    info = ' '.join(_task_repr_info(task))
29    return f'<{task.__class__.__name__} {info}>'
30
31
32def _task_get_stack(task, limit):
33    frames = []
34    if hasattr(task._coro, 'cr_frame'):
35        # case 1: 'async def' coroutines
36        f = task._coro.cr_frame
37    elif hasattr(task._coro, 'gi_frame'):
38        # case 2: legacy coroutines
39        f = task._coro.gi_frame
40    elif hasattr(task._coro, 'ag_frame'):
41        # case 3: async generators
42        f = task._coro.ag_frame
43    else:
44        # case 4: unknown objects
45        f = None
46    if f is not None:
47        while f is not None:
48            if limit is not None:
49                if limit <= 0:
50                    break
51                limit -= 1
52            frames.append(f)
53            f = f.f_back
54        frames.reverse()
55    elif task._exception is not None:
56        tb = task._exception.__traceback__
57        while tb is not None:
58            if limit is not None:
59                if limit <= 0:
60                    break
61                limit -= 1
62            frames.append(tb.tb_frame)
63            tb = tb.tb_next
64    return frames
65
66
67def _task_print_stack(task, limit, file):
68    extracted_list = []
69    checked = set()
70    for f in task.get_stack(limit=limit):
71        lineno = f.f_lineno
72        co = f.f_code
73        filename = co.co_filename
74        name = co.co_name
75        if filename not in checked:
76            checked.add(filename)
77            linecache.checkcache(filename)
78        line = linecache.getline(filename, lineno, f.f_globals)
79        extracted_list.append((filename, lineno, name, line))
80
81    exc = task._exception
82    if not extracted_list:
83        print(f'No stack for {task!r}', file=file)
84    elif exc is not None:
85        print(f'Traceback for {task!r} (most recent call last):', file=file)
86    else:
87        print(f'Stack for {task!r} (most recent call last):', file=file)
88
89    traceback.print_list(extracted_list, file=file)
90    if exc is not None:
91        for line in traceback.format_exception_only(exc.__class__, exc):
92            print(line, file=file, end='')
93