1""" 2Python polyfills for itertools 3""" 4 5from __future__ import annotations 6 7import itertools 8from typing import Iterable, Iterator, TypeVar 9 10from ..decorators import substitute_in_graph 11 12 13__all__ = [ 14 "chain", 15 "chain_from_iterable", 16 "islice", 17 "tee", 18] 19 20 21_T = TypeVar("_T") 22 23 24# Reference: https://docs.python.org/3/library/itertools.html#itertools.chain 25@substitute_in_graph(itertools.chain, is_embedded_type=True) # type: ignore[arg-type] 26def chain(*iterables: Iterable[_T]) -> Iterator[_T]: 27 for iterable in iterables: 28 yield from iterable 29 30 31@substitute_in_graph(itertools.chain.from_iterable) # type: ignore[arg-type] 32def chain_from_iterable(iterable: Iterable[Iterable[_T]], /) -> Iterator[_T]: 33 return itertools.chain(*iterable) 34 35 36chain.from_iterable = chain_from_iterable # type: ignore[method-assign] 37 38 39# Reference: https://docs.python.org/3/library/itertools.html#itertools.islice 40@substitute_in_graph(itertools.islice, is_embedded_type=True) # type: ignore[arg-type] 41def islice(iterable: Iterable[_T], /, *args: int | None) -> Iterator[_T]: 42 s = slice(*args) 43 start = 0 if s.start is None else s.start 44 stop = s.stop 45 step = 1 if s.step is None else s.step 46 if start < 0 or (stop is not None and stop < 0) or step <= 0: 47 raise ValueError( 48 "Indices for islice() must be None or an integer: 0 <= x <= sys.maxsize.", 49 ) 50 51 if stop is None: 52 # TODO: use indices = itertools.count() and merge implementation with the else branch 53 # when we support infinite iterators 54 next_i = start 55 for i, element in enumerate(iterable): 56 if i == next_i: 57 yield element 58 next_i += step 59 else: 60 indices = range(max(start, stop)) 61 next_i = start 62 for i, element in zip(indices, iterable): 63 if i == next_i: 64 yield element 65 next_i += step 66 67 68# Reference: https://docs.python.org/3/library/itertools.html#itertools.tee 69@substitute_in_graph(itertools.tee) 70def tee(iterable: Iterable[_T], n: int = 2, /) -> tuple[Iterator[_T], ...]: 71 iterator = iter(iterable) 72 shared_link = [None, None] 73 74 def _tee(link) -> Iterator[_T]: # type: ignore[no-untyped-def] 75 try: 76 while True: 77 if link[1] is None: 78 link[0] = next(iterator) 79 link[1] = [None, None] 80 value, link = link 81 yield value 82 except StopIteration: 83 return 84 85 return tuple(_tee(shared_link) for _ in range(n)) 86