xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/graphlib.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1from types import GenericAlias
2
__all__ = ["TopologicalSorter", "CycleError"]

# Sentinel states stored in _NodeInfo.npredecessors once a node is no longer
# waiting on predecessors. Both are negative so they are distinct from an
# ordinary (non-negative) predecessor count.
_NODE_OUT = -1  # node was returned by get_ready() but not yet marked done
_NODE_DONE = -2  # node was marked as processed by done()
7
8
class _NodeInfo:
    """Per-node bookkeeping record kept by the sorter for every graph node."""

    __slots__ = ("node", "npredecessors", "successors")

    def __init__(self, node):
        # Nodes that depend on this one. Duplicates are allowed, provided
        # each occurrence is also counted in that successor's npredecessors.
        self.successors = []

        # Count of predecessors still outstanding (generally >= 0). Once the
        # count reaches 0 and the node is handed out by get_ready() it becomes
        # _NODE_OUT; after done() is called for the node it becomes _NODE_DONE.
        self.npredecessors = 0

        # The graph node that this record augments.
        self.node = node
24
25
class CycleError(ValueError):
    """ValueError subclass raised by TopologicalSorter.prepare when the
    working graph contains at least one cycle.

    When several cycles exist, exactly one of them (which one is undefined) is
    reported. The detected cycle is available as the second element of the
    exception's *args* attribute: a list of nodes in which every node is, in
    the graph, an immediate predecessor of the node that follows it. The first
    and the last entries of that list are the same node, which makes the
    cyclic nature of the path explicit.
    """
39
40
class TopologicalSorter:
    """Provides functionality to topologically sort a graph of hashable nodes.

    Nodes are registered with "add" (or through the *graph* argument of the
    constructor). The order can then be consumed in one shot through
    "static_order", or incrementally: call "prepare" once, then alternate
    "get_ready" and "done" for as long as "is_active" returns true.
    """

    def __init__(self, graph=None):
        """Create a sorter, optionally pre-populated from *graph*.

        If *graph* is not None it must provide an ``items()`` method yielding
        ``(node, predecessors)`` pairs, where *predecessors* is an iterable of
        nodes; every pair is forwarded to "add".
        """
        # Maps each node to its _NodeInfo bookkeeping record.
        self._node2info = {}
        # None until prepare() has been called; afterwards the list of nodes
        # that are ready but have not yet been returned by get_ready().
        self._ready_nodes = None
        # Number of nodes returned by get_ready() so far.
        self._npassedout = 0
        # Number of nodes marked as processed through done() so far.
        self._nfinished = 0

        if graph is not None:
            for node, predecessors in graph.items():
                self.add(node, *predecessors)

    def _get_nodeinfo(self, node):
        """Return the _NodeInfo record for *node*, creating it on first use."""
        if (result := self._node2info.get(node)) is None:
            self._node2info[node] = result = _NodeInfo(node)
        return result

    def add(self, node, *predecessors):
        """Add a new node and its predecessors to the graph.

        Both the *node* and all elements in *predecessors* must be hashable.

        If called multiple times with the same node argument, the set of dependencies
        will be the union of all dependencies passed in.

        It is possible to add a node with no dependencies (*predecessors* is not provided)
        as well as provide a dependency twice. If a node that has not been provided before
        is included among *predecessors* it will be automatically added to the graph with
        no predecessors of its own.

        Raises ValueError if called after "prepare".
        """
        if self._ready_nodes is not None:
            raise ValueError("Nodes cannot be added after a call to prepare()")

        # Create the node -> predecessor edges. Duplicated predecessors are
        # counted once per occurrence; this stays consistent because every
        # occurrence is also appended to the predecessor's successor list.
        nodeinfo = self._get_nodeinfo(node)
        nodeinfo.npredecessors += len(predecessors)

        # Create the predecessor -> node edges
        for pred in predecessors:
            pred_info = self._get_nodeinfo(pred)
            pred_info.successors.append(node)

    def prepare(self):
        """Mark the graph as finished and check for cycles in the graph.

        If any cycle is detected, "CycleError" will be raised, but "get_ready" can
        still be used to obtain as many nodes as possible until cycles block more
        progress. After a call to this function, the graph cannot be modified and
        therefore no more nodes can be added using "add".

        Raises ValueError if called more than once.
        """
        if self._ready_nodes is not None:
            raise ValueError("cannot prepare() more than once")

        self._ready_nodes = [
            i.node for i in self._node2info.values() if i.npredecessors == 0
        ]
        # ready_nodes is set before we look for cycles on purpose:
        # if the user wants to catch the CycleError, that's fine,
        # they can continue using the instance to grab as many
        # nodes as possible before cycles block more progress
        cycle = self._find_cycle()
        if cycle:
            raise CycleError("nodes are in a cycle", cycle)

    def get_ready(self):
        """Return a tuple of all the nodes that are ready.

        Initially it returns all nodes with no predecessors; once those are marked
        as processed by calling "done", further calls will return all new nodes that
        have all their predecessors already processed. Once no more progress can be made,
        empty tuples are returned.

        Raises ValueError if called without calling "prepare" previously.
        """
        if self._ready_nodes is None:
            raise ValueError("prepare() must be called first")

        # Get the nodes that are ready and mark them
        result = tuple(self._ready_nodes)
        n2i = self._node2info
        for node in result:
            n2i[node].npredecessors = _NODE_OUT

        # Clean the list of nodes that are ready and update
        # the counter of nodes that we have returned.
        self._ready_nodes.clear()
        self._npassedout += len(result)

        return result

    def is_active(self):
        """Return ``True`` if more progress can be made and ``False`` otherwise.

        Progress can be made if cycles do not block the resolution and either there
        are still nodes ready that haven't yet been returned by "get_ready" or the
        number of nodes marked "done" is less than the number that have been returned
        by "get_ready".

        Raises ValueError if called without calling "prepare" previously.
        """
        if self._ready_nodes is None:
            raise ValueError("prepare() must be called first")
        return self._nfinished < self._npassedout or bool(self._ready_nodes)

    def __bool__(self):
        """Truth value of the sorter; equivalent to calling "is_active"."""
        return self.is_active()

    def done(self, *nodes):
        """Marks a set of nodes returned by "get_ready" as processed.

        This method unblocks any successor of each node in *nodes* for being returned
        in the future by a call to "get_ready".

        Raises :exc:`ValueError` if any node in *nodes* has already been marked as
        processed by a previous call to this method, if a node was not added to the
        graph by using "add" or if called without calling "prepare" previously or if
        node has not yet been returned by "get_ready".

        Nodes are handled one at a time in the order given, so nodes listed before
        an offending one have already been marked as processed when the error is
        raised.
        """

        if self._ready_nodes is None:
            raise ValueError("prepare() must be called first")

        n2i = self._node2info

        for node in nodes:

            # Check if we know about this node (it was added previously using add())
            if (nodeinfo := n2i.get(node)) is None:
                raise ValueError(f"node {node!r} was not added using add()")

            # If the node has not been returned (marked as ready) previously, inform the user.
            stat = nodeinfo.npredecessors
            if stat != _NODE_OUT:
                if stat >= 0:
                    raise ValueError(
                        f"node {node!r} was not passed out (still not ready)"
                    )
                elif stat == _NODE_DONE:
                    raise ValueError(f"node {node!r} was already marked done")
                else:
                    # Internal invariant violation. Raised explicitly (rather
                    # than via an assert statement) so that the check is not
                    # stripped when Python runs with -O.
                    raise AssertionError(f"node {node!r}: unknown status {stat}")

            # Mark the node as processed
            nodeinfo.npredecessors = _NODE_DONE

            # Go to all the successors and reduce the number of predecessors, collecting all the ones
            # that are ready to be returned in the next get_ready() call.
            for successor in nodeinfo.successors:
                successor_info = n2i[successor]
                successor_info.npredecessors -= 1
                if successor_info.npredecessors == 0:
                    self._ready_nodes.append(successor)
            self._nfinished += 1

    def _find_cycle(self):
        """Return one cycle of the graph as a list of nodes, or None.

        Performs an iterative depth-first traversal along successor edges.
        A returned list starts and ends with the same node, and each node in
        it is followed by one of its successors.
        """
        n2i = self._node2info
        # Nodes on the current depth-first path, in visiting order.
        stack = []
        # Parallel to ``stack``: for each node on the path, the bound
        # ``__next__`` of an iterator over that node's successors.
        itstack = []
        # Every node that has been visited at least once.
        seen = set()
        # Maps each node currently on the path to its index in ``stack``.
        node2stacki = {}

        for node in n2i:
            if node in seen:
                continue

            while True:
                if node in seen:
                    # If we have seen already the node and is in the
                    # current stack we have found a cycle.
                    if node in node2stacki:
                        return stack[node2stacki[node] :] + [node]
                    # else go on to get next successor
                else:
                    seen.add(node)
                    itstack.append(iter(n2i[node].successors).__next__)
                    node2stacki[node] = len(stack)
                    stack.append(node)

                # Backtrack to the topmost stack entry with
                # at least another successor.
                while stack:
                    try:
                        node = itstack[-1]()
                        break
                    except StopIteration:
                        del node2stacki[stack.pop()]
                        itstack.pop()
                else:
                    # The path is exhausted: everything reachable from the
                    # current start node has been explored without a cycle.
                    break
        return None

    def static_order(self):
        """Returns an iterable of nodes in a topological order.

        The particular order that is returned may depend on the specific
        order in which the items were inserted in the graph.

        Using this method does not require to call "prepare" or "done". If any
        cycle is detected, :exc:`CycleError` will be raised.

        This is a generator function, so "prepare" runs (and any error from it
        is raised) only once iteration of the returned iterable begins.
        """
        self.prepare()
        while self.is_active():
            node_group = self.get_ready()
            yield from node_group
            self.done(*node_group)

    # Allows the class to be subscripted (e.g. ``TopologicalSorter[int]``),
    # producing a types.GenericAlias.
    __class_getitem__ = classmethod(GenericAlias)
251