from types import GenericAlias

__all__ = ["TopologicalSorter", "CycleError"]

# Sentinel values stored in _NodeInfo.npredecessors once a node has left the
# "waiting on predecessors" state (where the value is a count >= 0).
_NODE_OUT = -1
_NODE_DONE = -2


class _NodeInfo:
    """Per-node bookkeeping record used internally by TopologicalSorter."""

    __slots__ = "node", "npredecessors", "successors"

    def __init__(self, node):
        # The node this class is augmenting.
        self.node = node

        # Number of predecessors, generally >= 0. When this value falls to 0,
        # and is returned by get_ready(), this is set to _NODE_OUT and when the
        # node is marked done by a call to done(), set to _NODE_DONE.
        self.npredecessors = 0

        # List of successor nodes. The list can contain duplicated elements as
        # long as they're all reflected in the successor's npredecessors attribute.
        self.successors = []


class CycleError(ValueError):
    """Subclass of ValueError raised by TopologicalSorter.prepare if cycles
    exist in the working graph.

    If multiple cycles exist, only one undefined choice among them will be reported
    and included in the exception. The detected cycle can be accessed via the second
    element in the *args* attribute of the exception instance and consists of a list
    of nodes, such that each node is, in the graph, an immediate predecessor of the
    next node in the list. In the reported list, the first and the last node will be
    the same, to make it clear that it is cyclic.
    """


class TopologicalSorter:
    """Provides functionality to topologically sort a graph of hashable nodes"""

    def __init__(self, graph=None):
        """Create a sorter, optionally seeded from *graph*.

        If *graph* is given, it must provide ``items()`` yielding
        ``(node, predecessors)`` pairs where *predecessors* is an iterable;
        each pair is passed on to "add".
        """
        # Maps each node to its _NodeInfo record.
        self._node2info = {}
        # None until prepare() is called; afterwards the list of nodes that
        # are ready but have not yet been handed out by get_ready().
        self._ready_nodes = None
        # Number of nodes returned by get_ready() so far.
        self._npassedout = 0
        # Number of nodes marked as processed through done() so far.
        self._nfinished = 0

        if graph is not None:
            for node, predecessors in graph.items():
                self.add(node, *predecessors)

    def _get_nodeinfo(self, node):
        """Return the _NodeInfo for *node*, creating and registering it if missing."""
        if (result := self._node2info.get(node)) is None:
            self._node2info[node] = result = _NodeInfo(node)
        return result

    def add(self, node, *predecessors):
        """Add a new node and its predecessors to the graph.

        Both the *node* and all elements in *predecessors* must be hashable.

        If called multiple times with the same node argument, the set of dependencies
        will be the union of all dependencies passed in.

        It is possible to add a node with no dependencies (*predecessors* is not provided)
        as well as provide a dependency twice. If a node that has not been provided before
        is included among *predecessors* it will be automatically added to the graph with
        no predecessors of its own.

        Raises ValueError if called after "prepare".
        """
        if self._ready_nodes is not None:
            raise ValueError("Nodes cannot be added after a call to prepare()")

        # Create the node -> predecessor edges
        nodeinfo = self._get_nodeinfo(node)
        nodeinfo.npredecessors += len(predecessors)

        # Create the predecessor -> node edges
        for pred in predecessors:
            pred_info = self._get_nodeinfo(pred)
            pred_info.successors.append(node)

    def prepare(self):
        """Mark the graph as finished and check for cycles in the graph.

        If any cycle is detected, "CycleError" will be raised, but "get_ready" can
        still be used to obtain as many nodes as possible until cycles block more
        progress. After a call to this function, the graph cannot be modified and
        therefore no more nodes can be added using "add".

        Raises ValueError if called more than once.
        """
        if self._ready_nodes is not None:
            raise ValueError("cannot prepare() more than once")

        self._ready_nodes = [
            i.node for i in self._node2info.values() if i.npredecessors == 0
        ]
        # ready_nodes is set before we look for cycles on purpose:
        # if the user wants to catch the CycleError, that's fine,
        # they can continue using the instance to grab as many
        # nodes as possible before cycles block more progress
        cycle = self._find_cycle()
        if cycle:
            raise CycleError("nodes are in a cycle", cycle)

    def get_ready(self):
        """Return a tuple of all the nodes that are ready.

        Initially it returns all nodes with no predecessors; once those are marked
        as processed by calling "done", further calls will return all new nodes that
        have all their predecessors already processed. Once no more progress can be made,
        empty tuples are returned.

        Raises ValueError if called without calling "prepare" previously.
        """
        if self._ready_nodes is None:
            raise ValueError("prepare() must be called first")

        # Get the nodes that are ready and mark them
        result = tuple(self._ready_nodes)
        n2i = self._node2info
        for node in result:
            n2i[node].npredecessors = _NODE_OUT

        # Clean the list of nodes that are ready and update
        # the counter of nodes that we have returned.
        self._ready_nodes.clear()
        self._npassedout += len(result)

        return result

    def is_active(self):
        """Return ``True`` if more progress can be made and ``False`` otherwise.

        Progress can be made if cycles do not block the resolution and either there
        are still nodes ready that haven't yet been returned by "get_ready" or the
        number of nodes marked "done" is less than the number that have been returned
        by "get_ready".

        Raises ValueError if called without calling "prepare" previously.
        """
        if self._ready_nodes is None:
            raise ValueError("prepare() must be called first")
        return self._nfinished < self._npassedout or bool(self._ready_nodes)

    def __bool__(self):
        """Return the result of "is_active" (so it raises before "prepare")."""
        return self.is_active()

    def done(self, *nodes):
        """Marks a set of nodes returned by "get_ready" as processed.

        This method unblocks any successor of each node in *nodes* for being returned
        in the future by a call to "get_ready".

        Raises :exc:`ValueError` if any node in *nodes* has already been marked as
        processed by a previous call to this method, if a node was not added to the
        graph by using "add" or if called without calling "prepare" previously or if
        node has not yet been returned by "get_ready".
        """

        if self._ready_nodes is None:
            raise ValueError("prepare() must be called first")

        n2i = self._node2info

        for node in nodes:

            # Check if we know about this node (it was added previously using add())
            if (nodeinfo := n2i.get(node)) is None:
                raise ValueError(f"node {node!r} was not added using add()")

            # If the node has not been returned (marked as ready) previously,
            # inform the user.
            stat = nodeinfo.npredecessors
            if stat != _NODE_OUT:
                if stat >= 0:
                    raise ValueError(
                        f"node {node!r} was not passed out (still not ready)"
                    )
                elif stat == _NODE_DONE:
                    raise ValueError(f"node {node!r} was already marked done")
                else:
                    assert False, f"node {node!r}: unknown status {stat}"

            # Mark the node as processed
            nodeinfo.npredecessors = _NODE_DONE

            # Go to all the successors and reduce the number of predecessors,
            # collecting all the ones that are ready to be returned in the next
            # get_ready() call.
            for successor in nodeinfo.successors:
                successor_info = n2i[successor]
                successor_info.npredecessors -= 1
                if successor_info.npredecessors == 0:
                    self._ready_nodes.append(successor)
            self._nfinished += 1

    def _find_cycle(self):
        """Return one cycle in the graph as a list of nodes, or None if there is none.

        The search is an iterative depth-first walk along successor edges. The
        returned list starts and ends with the same node.
        """
        n2i = self._node2info
        # Nodes on the current depth-first path, in order.
        stack = []
        # For each entry of `stack`, the bound __next__ of its successor iterator.
        itstack = []
        # Every node visited so far, on the current path or not.
        seen = set()
        # Maps each node on the current path to its index in `stack`.
        node2stacki = {}

        for node in n2i:
            if node in seen:
                continue

            while True:
                if node in seen:
                    # If we have seen already the node and is in the
                    # current stack we have found a cycle.
                    if node in node2stacki:
                        return stack[node2stacki[node] :] + [node]
                    # else go on to get next successor
                else:
                    seen.add(node)
                    itstack.append(iter(n2i[node].successors).__next__)
                    node2stacki[node] = len(stack)
                    stack.append(node)

                # Backtrack to the topmost stack entry with
                # at least another successor.
                while stack:
                    try:
                        node = itstack[-1]()
                        break
                    except StopIteration:
                        del node2stacki[stack.pop()]
                        itstack.pop()
                else:
                    # The path is exhausted: move on to the next unvisited root.
                    break
        return None

    def static_order(self):
        """Returns an iterable of nodes in a topological order.

        The particular order that is returned may depend on the specific
        order in which the items were inserted in the graph.

        Using this method does not require to call "prepare" or "done". If any
        cycle is detected, :exc:`CycleError` will be raised.
        """
        # This is a generator, so prepare() (and any CycleError it raises)
        # only runs once iteration starts.
        self.prepare()
        while self.is_active():
            node_group = self.get_ready()
            yield from node_group
            self.done(*node_group)

    __class_getitem__ = classmethod(GenericAlias)