/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#ifndef TENSORFLOW_CORE_COMMON_RUNTIME_PROPAGATOR_STATE_H_
#define TENSORFLOW_CORE_COMMON_RUNTIME_PROPAGATOR_STATE_H_

#include <queue>
#include <vector>

#include "tensorflow/core/common_runtime/entry.h"
#include "tensorflow/core/common_runtime/immutable_executor_state.h"
#include "tensorflow/core/common_runtime/pending_counts.h"
#include "tensorflow/core/framework/allocator.h"
#include "tensorflow/core/framework/control_flow.h"
#include "tensorflow/core/lib/gtl/flatmap.h"
#include "tensorflow/core/lib/gtl/inlined_vector.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"

namespace tensorflow {

typedef gtl::InlinedVector<AllocatorAttributes, 4> AllocatorAttributeVec;

// Represents the ephemeral "edge state" associated with one invocation of
// `Executor::Run()`.
//
// `PropagatorState` is responsible for propagating values along dataflow
// edges in a TensorFlow graph and determining which nodes are runnable. The
// executor primarily updates `PropagatorState` by calling `PropagateOutputs()`
// after processing a node, and `PropagatorState` dispatches `TaggedNode`s by
// adding them to a `TaggedNodeSeq`.
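//
// A rough sketch of how an executor might drive this class (illustrative
// only; `RunKernel` and `root_nodes()` are stand-ins for the executor side
// and are assumptions, not code confirmed by this header):
//
//   PropagatorState propagator(immutable_state, step_id, /*vlog=*/false);
//   PropagatorState::TaggedNodeSeq ready;
//   propagator.ActivateRoots(immutable_state.root_nodes(), &ready);
//   while (!ready.empty()) {
//     PropagatorState::TaggedNode tagged_node = ready.back();
//     ready.pop_back();
//     Entry* inputs = propagator.GetInputTensors(tagged_node);
//     EntryVector outputs = RunKernel(tagged_node, inputs);  // hypothetical
//     propagator.PropagateOutputs(tagged_node, &outputs, &ready);
//   }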
class PropagatorState {
 public:
  PropagatorState(const ImmutableExecutorState& immutable_state,
                  int64_t step_id, bool vlog);
  ~PropagatorState();

 private:
  // Forward declaration so that `TaggedNode` can include a `FrameState*` and
  // an `IterationState*`.
  struct FrameState;
  struct IterationState;

 public:
  // A `TaggedNode` corresponds to a single invocation of a node's kernel,
  // and it is created when the kernel becomes runnable (in a particular
  // iteration of a particular frame).
  struct TaggedNode {
    const NodeItem* node_item;
    FrameState* input_frame;
    IterationState* input_iter;
    bool is_dead;

    TaggedNode() = default;
    TaggedNode(const NodeItem* node_item, FrameState* in_frame,
               IterationState* in_iter, bool dead)
        : node_item(node_item),
          input_frame(in_frame),
          input_iter(in_iter),
          is_dead(dead) {}

    const NodeItem& get_node_item() const { return *node_item; }

    bool get_is_dead() const { return is_dead; }
    int64_t get_iter_num() const;
  };

  // A drop-in replacement for std::deque<TaggedNode>.  We typically don't
  // have that many nodes in the ready queue, so we just use a vector and
  // don't free up memory from the queue as we consume nodes.
  class TaggedNodeReadyQueue {
   public:
    TaggedNodeReadyQueue() : front_index_(0) {}

    void push_back(const TaggedNode& node) { ready_.push_back(node); }
    TaggedNode front() const {
      DCHECK_LT(front_index_, ready_.size());
      return ready_[front_index_];
    }
    void pop_front() {
      DCHECK_LT(front_index_, ready_.size());
      front_index_++;
      if ((front_index_ == ready_.size()) || (front_index_ > kSpillThreshold)) {
        if (front_index_ == ready_.size()) {
          ready_.clear();
        } else {
          // Lots of unused entries at beginning of vector: move everything
          // down to start of vector.
          ready_.erase(ready_.begin(), ready_.begin() + front_index_);
        }
        front_index_ = 0;
      }
    }
    bool empty() const { return ready_.empty(); }
    int size() const { return ready_.size() - front_index_; }

   private:
    // TODO(b/152925936): Re-evaluate these constants with current usage
    // patterns.
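    // Once more than `kSpillThreshold` already-consumed entries accumulate at
    // the front of `ready_`, pop_front() erases that consumed prefix so the
    // vector does not grow without bound while nodes remain queued.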
    static constexpr int kSpillThreshold = 16384;
    gtl::InlinedVector<TaggedNode, 16> ready_;
    int front_index_;
  };

  // TODO(b/152925936): Re-evaluate this constant with current usage patterns.
  typedef gtl::InlinedVector<TaggedNode, 8> TaggedNodeSeq;

 private:
  // The state of an iteration in a particular frame.
  struct IterationState {
    explicit IterationState(int64_t iter_num,
                            const PendingCounts* pending_counts,
                            int total_input_tensors)
        : iter_num(iter_num),
          input_tensors(new Entry[total_input_tensors]),
          outstanding_ops(0),
          outstanding_frame_count(0),
          counts(*pending_counts) {  // Initialize with copy of *pending_counts
    }

    const int64_t
        iter_num;  // The index of this iteration in the enclosing loop.

    // One copy per iteration. For iteration k, i-th node's j-th input is in
    // input_tensors[k][immutable_state_.nodes[i].input_start + j]. An entry is
    // either a tensor pointer (pass-by-reference) or a tensor (pass-by-value).
    //
    // NOTE: No need to protect input_tensors[i] by any locks because it
    // is resized once. Each element of tensors_ is written once by the
    // source node of an edge and is cleared by the destination of the same
    // edge. The latter node is never run concurrently with the former node.
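    //
    // For example (illustrative values only): if node i has input_start == 5
    // and three inputs, then in this iteration its inputs occupy
    // input_tensors[5], input_tensors[6], and input_tensors[7].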
    Entry* input_tensors;

    // The number of outstanding ops for each iteration.
    std::atomic<size_t> outstanding_ops;

    // The number of outstanding frames for each iteration.
    int outstanding_frame_count;
    int pending(PendingCounts::Handle h) { return counts.pending(h); }
    int decrement_pending(PendingCounts::Handle h, int v) {
      return counts.decrement_pending(h, v);
    }
    // Mark a merge node as live.
    // REQUIRES: Node corresponding to "h" is a merge node.
    void mark_live(PendingCounts::Handle h) { counts.mark_live(h); }
    // Mark a node to show that processing has started.
    void mark_started(PendingCounts::Handle h) { counts.mark_started(h); }
    // Mark a node to show that processing has completed.
    void mark_completed(PendingCounts::Handle h) { counts.mark_completed(h); }
    PendingCounts::NodeState node_state(PendingCounts::Handle h) {
      return counts.node_state(h);
    }

    int dead_count(PendingCounts::Handle h) { return counts.dead_count(h); }
    void increment_dead_count(PendingCounts::Handle h) {
      counts.increment_dead_count(h);
    }
    PendingCounts::AdjustResult adjust_for_activation(PendingCounts::Handle h,
                                                      bool increment_dead) {
      return counts.adjust_for_activation(h, increment_dead);
    }
    PendingCounts::AdjustResult adjust_for_activation_atomic(
        PendingCounts::Handle h, bool increment_dead) {
      return counts.adjust_for_activation_atomic(h, increment_dead);
    }

    ~IterationState() { delete[] input_tensors; }

   private:
    PendingCounts counts;
  };

  struct FrameState {
    explicit FrameState(const ImmutableExecutorState& immutable_state,
                        int parallel_iters)
        : immutable_state(immutable_state),
          max_parallel_iterations(parallel_iters),
          num_outstanding_iterations(1),
          iterations(parallel_iters + 1),
          iterations_raw(iterations.data()) {}

    // A new frame is created for each loop. Execution starts at iteration 0.
    // When a value at iteration 0 passes through a NextIteration node,
    // iteration 1 is created and starts running. Note that iteration 0 may
    // still be running so multiple iterations may run in parallel. The
    // frame maintains the state of iterations in several data structures
    // such as pending_count and input_tensors. When iteration 0 completes,
    // we garbage collect the state of iteration 0.
    //
    // A frame instance is considered "done" and can be garbage collected
    // if all its inputs have entered and all its iterations are "done".
    //
    // A frame manages the live iterations of an iterative computation.
    // Iteration i is considered "done" when there are no outstanding ops,
    // frames at iteration i are done, all recvs for this iteration are
    // completed, and iteration i-1 is done. For iteration 0, we instead
    // wait for there to be no more pending inputs of the frame.
    //
    // Frames and iterations are garbage collected once they are done.
    // The state we need to keep around is highly dependent on the
    // parallelism enabled by the scheduler. We may want to have the
    // scheduler dynamically control the outstanding number of live
    // parallel frames and iterations. To reduce the state space, the
    // scheduler might want to schedule ops in inner frames first and
    // lower iterations first.
    //
    // This frame state is mostly initialized lazily on demand so we
    // don't introduce unnecessary overhead.
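    //
    // For example (illustrative; assumes the standard control-flow lowering
    // in which a while loop's parallel_iterations attribute becomes this
    // frame's limit): parallel_iterations=10 yields max_parallel_iterations
    // == 10, so up to 10 IterationStates may be live at once, and further
    // NextIteration inputs are deferred into next_iter_roots until an
    // iteration finishes.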

    // The immutable state of the executor the frame is in.
    const ImmutableExecutorState& immutable_state;

    // The name of this frame, which is the concatenation of its parent
    // frame name, the iteration of the parent frame when this frame was
    // created, and the value of the attr 'frame_name'.
    string frame_name;

    // The unique id for this frame. Generated by fingerprinting
    // frame_name.
    uint64 frame_id;

    // The iteration state of its parent frame when this frame is created.
    // nullptr if there is no parent frame. The frame_name/parent_iter pair
    // uniquely identifies this FrameState.
    IterationState* parent_iter = nullptr;

    // The FrameState of its parent frame.
    FrameState* parent_frame = nullptr;

    // The maximum allowed number of parallel iterations.
    const int max_parallel_iterations;

    // The number of inputs this frame is still waiting for.
    int num_pending_inputs = 0;

    // The highest iteration number we have reached so far in this frame.
    int64_t iteration_count TF_GUARDED_BY(mu) = 0;

    // The number of outstanding iterations.
    int num_outstanding_iterations TF_GUARDED_BY(mu) = 1;

   private:
    // The active iteration states of this frame.
    gtl::InlinedVector<IterationState*, 12> iterations;
    IterationState** const iterations_raw TF_GUARDED_BY(mu);
    IterationState* iterations_first TF_GUARDED_BY(mu);

   public:
    // The NextIteration nodes to enter a new iteration. If the number of
    // outstanding iterations reaches the limit, we will defer the start of
    // the next iteration until the number of outstanding iterations falls
    // below the limit.
    std::vector<std::pair<const NodeItem*, Entry>> next_iter_roots
        TF_GUARDED_BY(mu);

    // The values of the loop invariants for this loop. They are added into
    // this list as they "enter" the frame. When a loop invariant enters,
    // we make it available to all active iterations. When the frame starts
    // a new iteration, we make all the current loop invariants available
    // to the new iteration.
    std::vector<std::pair<const NodeItem*, Entry>> inv_values TF_GUARDED_BY(mu);

    // The list of dead exit node items for the current highest iteration. We
    // will only "execute" the dead exits of the final iteration.
    std::vector<const NodeItem*> dead_exits TF_GUARDED_BY(mu);

    // Static information specific to this frame.
    PendingCounts* pending_counts = nullptr;
    int total_input_tensors = 0;
    std::vector<const NodeItem*>* nodes = nullptr;

    // Lock ordering: ExecutorState.mu_ < mu;
    // during structured traversal: parent_frame->mu < mu.
    mutex mu;

    void InitializeFrameInfo(const ImmutableExecutorState::FrameInfo& finfo);

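    // Note: `iterations` acts as a ring buffer with max_parallel_iterations + 1
    // slots, so iteration `iter` occupies slot `iter % (max_parallel_iterations
    // + 1)`; GetIteration() special-cases iteration 0 via `iterations_first`.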
    inline IterationState* GetIteration(int64_t iter)
        TF_SHARED_LOCKS_REQUIRED(mu) {
      if (TF_PREDICT_TRUE(iter == 0)) {
        return iterations_first;
      } else {
        size_t index = iter % (max_parallel_iterations + 1);
        return iterations_raw[index];
      }
    }

    void SetIteration(int64_t iter, IterationState* state);

    // Adjust the outstanding op count by 'delta' and clean up the iterations in
    // the frame if no more ops are outstanding. Return true iff the execution of
    // the frame is done.
    //
    // Avoids acquiring the lock in the common case that the frame is not done.
    bool AdjustOutstandingOps(IterationState* iter_state, int delta,
                              TaggedNodeSeq* ready);

    bool AdjustOutstandingOpsLocked(IterationState* iter_state, int delta,
                                    TaggedNodeSeq* ready)
        TF_EXCLUSIVE_LOCKS_REQUIRED(mu);

    bool AdjustOutstandingOpsFastPath(IterationState* iter_state, int delta)
        TF_SHARED_LOCKS_REQUIRED(mu);

    // Convenience methods for the above 'Adjust' calls where delta takes the
    // common value of -1.
    bool DecrementOutstandingOps(IterationState* iter_state,
                                 TaggedNodeSeq* ready);

    bool DecrementOutstandingOpsLocked(IterationState* iter_state,
                                       TaggedNodeSeq* ready);

    // Returns true if the computation in the frame is completed.
    bool IsFrameDone();

    // Returns true if the iteration of the frame is completed.
    bool IsIterationDone(IterationState* iter_state)
        TF_SHARED_LOCKS_REQUIRED(mu);

    // Increments the iteration id. If this is a new iteration, initialize it.
    //
    // Returns a pointer to the new iteration.
    IterationState* IncrementIteration(TaggedNodeSeq* ready)
        TF_EXCLUSIVE_LOCKS_REQUIRED(mu);

    // Activate all the deferred NextIteration nodes in a new iteration.
    void ActivateNexts(IterationState* iter_state, TaggedNodeSeq* ready)
        TF_EXCLUSIVE_LOCKS_REQUIRED(mu);

    // Activate all the current loop invariants in a new iteration.
    void ActivateLoopInvs(IterationState* iter_state, TaggedNodeSeq* ready)
        TF_EXCLUSIVE_LOCKS_REQUIRED(mu);

    // Add a new loop invariant and make it available to all active
    // iterations.
    void AddLoopInv(const NodeItem* item, const Entry& entry,
                    TaggedNodeSeq* ready) TF_EXCLUSIVE_LOCKS_REQUIRED(mu);

    // Activate the successors of a node. Contents of *outputs are left in an
    // indeterminate state after returning from this method.
    //
    // In the case that 'item' is a simple node (no merge/control outputs) this
    // will acquire a shared lock and can run concurrently with other
    // invocations.
    //
    // Return true if the frame is done after activation.
    bool ActivateNodesAndAdjustOutstanding(const NodeItem* item,
                                           const bool is_dead,
                                           IterationState* iter_state,
                                           EntryVector* outputs,
                                           TaggedNodeSeq* ready);

    // Same as the above, but requires 'mu' already held in exclusive mode.
    int ActivateNodesLocked(const NodeItem* item, const bool is_dead,
                            IterationState* iter_state, EntryVector* outputs,
                            TaggedNodeSeq* ready)
        TF_EXCLUSIVE_LOCKS_REQUIRED(mu);

    // Cleanup iterations of this frame starting from the given iteration.
    bool CleanupIterations(IterationState* iter_state, TaggedNodeSeq* ready)
        TF_EXCLUSIVE_LOCKS_REQUIRED(mu);

    void DumpIterationState(PropagatorState* parent) {
      mutex_lock l(mu);
      for (IterationState* iteration : iterations) {
        if (iteration) {
          LOG(WARNING) << "  Iteration:";
          parent->DumpIterationState(this, iteration);
        }
      }
    }

    ~FrameState() {
      for (size_t i = 0; i < iterations.size(); ++i) {
        delete iterations[i];
        iterations[i] = nullptr;
      }
    }

   private:
    // REQUIRES: `!item->is_any_consumer_merge_or_control_trigger`.
    // This variant does not use atomic operations to modify the pending counts
    // and thus must hold the exclusive lock.
    int ActivateNodesFastPathLocked(const NodeItem* item, const bool is_dead,
                                    IterationState* iter_state,
                                    EntryVector* outputs, TaggedNodeSeq* ready)
        TF_EXCLUSIVE_LOCKS_REQUIRED(mu) {
      return ActivateNodesFastPathInternal<false>(item, is_dead, iter_state,
                                                  outputs, ready);
    }

    // REQUIRES: `!item->is_any_consumer_merge_or_control_trigger`.
    // This variant uses atomic operations to modify the pending counts.
    int ActivateNodesFastPathShared(const NodeItem* item, const bool is_dead,
                                    IterationState* iter_state,
                                    EntryVector* outputs, TaggedNodeSeq* ready)
        TF_SHARED_LOCKS_REQUIRED(mu) {
      return ActivateNodesFastPathInternal<true>(item, is_dead, iter_state,
                                                 outputs, ready);
    }

    template <bool atomic>
    int ActivateNodesFastPathInternal(const NodeItem* item, const bool is_dead,
                                      IterationState* iter_state,
                                      EntryVector* outputs,
                                      TaggedNodeSeq* ready);

    int ActivateNodesSlowPath(const NodeItem* item, const bool is_dead,
                              IterationState* iter_state, EntryVector* outputs,
                              TaggedNodeSeq* ready)
        TF_EXCLUSIVE_LOCKS_REQUIRED(mu);
  };

 public:
  // Creates and adds a `TaggedNode` for each node in `roots` to `*ready`.
  void ActivateRoots(gtl::ArraySlice<const NodeItem*> roots,
                     TaggedNodeSeq* ready);

  // After processing the outputs, propagates the outputs to their dsts.
  // Contents of *outputs are left in an indeterminate state after
  // returning from this method.
  void PropagateOutputs(const TaggedNode& tagged_node, EntryVector* outputs,
                        TaggedNodeSeq* ready);

  // Returns an array of `Entry` objects corresponding to the inputs of
  // `tagged_node`.
  //
  // NOTE: Thread safety analysis is disabled on this method, because the
  // underlying `IterationState` and its array of `input_tensors` retain the
  // same address while the iteration is live.
  Entry* GetInputTensors(const TaggedNode& tagged_node) const
      TF_NO_THREAD_SAFETY_ANALYSIS {
    return tagged_node.input_iter->input_tensors +
           tagged_node.node_item->input_start;
  }

  FrameAndIter GetFrameAndIter(const TaggedNode& tagged_node) const {
    return {tagged_node.input_frame->frame_id,
            tagged_node.input_iter->iter_num};
  }

  // Provide debugging output of the state of the executor.
  void DumpState();

  // For debugging/logging only.
  void MaybeMarkStarted(const TaggedNode& tagged_node) {
    // TODO(misard) Replace with a finer-grain enabling flag once we add better
    // optional debugging support.
    if (TF_PREDICT_FALSE(vlog_) && VLOG_IS_ON(1)) {
      mutex_lock l(tagged_node.input_frame->mu);
      tagged_node.input_iter->mark_started(
          immutable_state_.pending_ids()[tagged_node.node_item->node_id]);
    }
  }

  void MaybeMarkCompleted(const TaggedNode& tagged_node) {
    // TODO(misard) Replace with a finer-grain enabling flag once we add better
    // optional debugging support.
    if (TF_PREDICT_FALSE(vlog_) && VLOG_IS_ON(1)) {
      mutex_lock l(tagged_node.input_frame->mu);
      tagged_node.input_iter->mark_completed(
          immutable_state_.pending_ids()[tagged_node.node_item->node_id]);
    }
  }

 private:
  // Find an existing or create a new child frame in the frame 'frame' at
  // iteration 'iter'.
  void FindOrCreateChildFrame(FrameState* frame, IterationState* iter_state,
                              const NodeItem& node_item, FrameState** child);

  // Delete a frame. Called when the frame is done.
  void DeleteFrame(FrameState* frame, TaggedNodeSeq* ready);

  // Cleanup frames and iterations starting from frame/iter. Called when
  // a child frame is done.
  void CleanupFramesIterations(FrameState* frame, IterationState* iter_state,
                               TaggedNodeSeq* ready);

  // Provide debugging output about an outstanding iteration in the executor.
  void DumpIterationState(const FrameState* frame, IterationState* iteration);

  const ImmutableExecutorState& immutable_state_;
  const int64_t step_id_;
  const bool vlog_;

  mutex mu_;

  // The root frame in which the execution of this step is started.
  FrameState* root_frame_;

  // Mapping from frame ID to outstanding frames. A new frame is created
  // at some iteration of an active frame. So the unique key for the new
  // child frame is a hash composed of the ID of the parent frame, the
  // iteration number at which the parent frame is creating the new frame,
  // and the name of the new frame from nodedef.
  absl::flat_hash_map<uint64, FrameState*> outstanding_frames_
      TF_GUARDED_BY(mu_);

  TF_DISALLOW_COPY_AND_ASSIGN(PropagatorState);
};

inline int64_t PropagatorState::TaggedNode::get_iter_num() const {
  return input_iter->iter_num;
}

// `OrderedPropagatorState` replaces `PropagatorState`'s `TaggedNodeReadyQueue`
// with a priority queue. This ensures that the order in which we dequeue
// `TaggedNode&`s is stable with respect to ASLR.
//
// This is not always needed: in a multithreaded environment, executions are
// expected to happen nondeterministically anyway. But this nondeterminism can
// be a problem; for example, in use cases that run close to the RAM limit of a
// device, reordering ops can increase memory fragmentation and cause an OOM.
//
// This codepath is enabled using TF_DETERMINISTIC_ORDER=1 in executor.cc.
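//
// For example (illustrative; assumes TF_DETERMINISTIC_ORDER is read from the
// process environment, as the comment above suggests; the script name is
// hypothetical):
//
//   TF_DETERMINISTIC_ORDER=1 python my_training_script.py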
class OrderedPropagatorState : public PropagatorState {
  using PropagatorState::PropagatorState;

 public:
  class TaggedNodeReadyQueue : PropagatorState::TaggedNodeReadyQueue {
   public:
    TaggedNodeReadyQueue() : readyp_(compare) {}
    void push_back(const TaggedNode& node) { readyp_.push(node); }
    TaggedNode front() const { return readyp_.top(); }
    void pop_front() { readyp_.pop(); }
    bool empty() const { return readyp_.empty(); }
    int size() const { return readyp_.size(); }

   private:
    static bool compare(TaggedNode const& lhs, TaggedNode const& rhs) {
      std::tuple<int, uint64, int64_t> lhs_prio{lhs.node_item->node_id,
                                                lhs.input_frame->frame_id,
                                                lhs.input_iter->iter_num};
      std::tuple<int, uint64, int64_t> rhs_prio{rhs.node_item->node_id,
                                                rhs.input_frame->frame_id,
                                                rhs.input_iter->iter_num};
      return lhs_prio < rhs_prio;
    }
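    // Since `compare` is a strict less-than over (node_id, frame_id,
    // iter_num), std::priority_queue pops the largest such tuple first. The
    // ordering is keyed purely on these ids rather than pointer values, which
    // is what keeps the dequeue order stable across runs.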

    std::priority_queue<TaggedNode, std::vector<TaggedNode>, decltype(&compare)>
        readyp_;
  };
};

}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_COMMON_RUNTIME_PROPAGATOR_STATE_H_