xref: /aosp_15_r20/external/pytorch/torch/_dynamo/polyfills/itertools.py (revision da0073e96a02ea20f0ac840b70461e3646d07c45)
1"""
2Python polyfills for itertools
3"""
4
5from __future__ import annotations
6
7import itertools
8from typing import Iterable, Iterator, TypeVar
9
10from ..decorators import substitute_in_graph
11
12
# Names exported by ``from <this module> import *``; one entry per polyfill
# function defined below.
__all__ = [
    "chain",
    "chain_from_iterable",
    "islice",
    "tee",
]
19
20
# Element type of the iterables accepted and produced by the polyfills below.
_T = TypeVar("_T")
22
23
24# Reference: https://docs.python.org/3/library/itertools.html#itertools.chain
25@substitute_in_graph(itertools.chain, is_embedded_type=True)  # type: ignore[arg-type]
def chain(*iterables: Iterable[_T]) -> Iterator[_T]:
    """Yield every element of each argument in turn, left to right.

    Each argument is only iterated once the previous one is exhausted.
    """
    for source in iterables:
        # Delegate to the current source until it runs out.
        yield from source
29
30
31@substitute_in_graph(itertools.chain.from_iterable)  # type: ignore[arg-type]
def chain_from_iterable(iterable: Iterable[Iterable[_T]], /) -> Iterator[_T]:
    """Yield the elements of each iterable produced by ``iterable``, in order.

    The outer iterable is consumed lazily: the next inner iterable is only
    requested once the current one is exhausted.
    """
    # Deliberately not ``itertools.chain(*iterable)``: argument unpacking
    # drains the whole outer iterable before the first element is produced,
    # which never finishes for an infinite outer iterable and runs its side
    # effects earlier than a lazy consumer would.
    for inner in iterable:
        yield from inner
34
35
# Attach the polyfill as an attribute of ``chain`` so that the expression
# ``chain.from_iterable`` resolves to ``chain_from_iterable`` on this function.
chain.from_iterable = chain_from_iterable  # type: ignore[method-assign]
37
38
39# Reference: https://docs.python.org/3/library/itertools.html#itertools.islice
40@substitute_in_graph(itertools.islice, is_embedded_type=True)  # type: ignore[arg-type]
def islice(iterable: Iterable[_T], /, *args: int | None) -> Iterator[_T]:
    """Yield selected elements of ``iterable`` by position.

    ``args`` is interpreted the way ``slice()`` interprets its arguments:
    ``(stop,)``, ``(start, stop)`` or ``(start, stop, step)``. A ``None``
    start means 0, a ``None`` step means 1, and a ``None`` stop means "until
    ``iterable`` is exhausted".

    This is a generator function, so the arguments are validated when the
    first element is requested, not when ``islice`` is called.

    Raises:
        ValueError: if start or stop is negative, or if step is not positive.
    """
    s = slice(*args)
    start = 0 if s.start is None else s.start
    stop = s.stop
    step = 1 if s.step is None else s.step
    if start < 0 or (stop is not None and stop < 0):
        raise ValueError(
            "Indices for islice() must be None or an integer: 0 <= x <= sys.maxsize.",
        )
    if step <= 0:
        # A bad step gets its own message: the indices message above claims
        # that 0 is an acceptable value, which is not true for step.
        raise ValueError("Step for islice() must be a positive integer or None.")

    if stop is None:
        # TODO: use indices = itertools.count() and merge implementation with the else branch
        #       when we support infinite iterators
        next_i = start
        for i, element in enumerate(iterable):
            if i == next_i:
                yield element
                next_i += step
    else:
        # Iterate up to max(start, stop) so that, when start > stop, the first
        # ``start`` elements are still consumed from ``iterable`` even though
        # nothing is yielded. ``indices`` comes first in zip() so that no extra
        # element is pulled from ``iterable`` once the range is exhausted.
        indices = range(max(start, stop))
        next_i = start
        for i, element in zip(indices, iterable):
            if i == next_i:
                yield element
                next_i += step
66
67
68# Reference: https://docs.python.org/3/library/itertools.html#itertools.tee
69@substitute_in_graph(itertools.tee)
def tee(iterable: Iterable[_T], n: int = 2, /) -> tuple[Iterator[_T], ...]:
    """Return ``n`` independent iterators over a single pass of ``iterable``.

    Values pulled from the source are stored in a singly linked list of
    ``[value, next_link]`` cells shared by all returned iterators, so each
    iterator sees every value no matter in which order they are advanced.

    Args:
        iterable: the source; ``iter()`` is called on it exactly once.
        n: number of iterators to return (default 2).

    Returns:
        A tuple of ``n`` iterators; empty when ``n`` is 0.

    Raises:
        ValueError: if ``n`` is negative.
    """
    if n < 0:
        # range(n) is empty for negative n, so without this check the call
        # would quietly return () where itertools.tee raises.
        raise ValueError("n must be >= 0")

    iterator = iter(iterable)
    # Head cell of the shared list; ``link[1] is None`` marks a cell whose
    # value has not been fetched from the source yet.
    shared_link = [None, None]

    def _tee(link) -> Iterator[_T]:  # type: ignore[no-untyped-def]
        try:
            while True:
                if link[1] is None:
                    # First iterator to reach this cell fetches the value and
                    # appends a fresh empty cell for the one after it.
                    link[0] = next(iterator)
                    link[1] = [None, None]
                # Read the value and advance this iterator's own cursor.
                value, link = link
                yield value
        except StopIteration:
            # Source exhausted: end this iterator.
            return

    return tuple(_tee(shared_link) for _ in range(n))
85    return tuple(_tee(shared_link) for _ in range(n))
86