# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Input-pipeline utilities for Distribution strategies."""

from tensorflow.python.data.experimental.ops import distribute
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.data.ops.options import AutoShardPolicy
from tensorflow.python.data.util import traverse
from tensorflow.python.framework import op_def_registry
from tensorflow.python.framework import ops


# pylint: disable=protected-access
def auto_shard_dataset(dataset, num_shards, index, num_replicas_in_sync=None):
27  """Shard the input pipeline by sharding the underlying list of files.
28
29  Args:
30    dataset: A `tf.data.Dataset` instance, typically the result of a bunch of
31      dataset transformations.
32    num_shards: A `tf.int64` scalar `tf.Tensor`, representing the number of
33        shards operating in parallel. Same usage as in `tf.data.Dataset.shard`.
34    index: A `tf.int64` scalar `tf.Tensor`, representing the worker index.
35      Same usage as in `tf.data.Dataset.shard`.
36    num_replicas_in_sync: An integer representing the total number of replicas
37      across all workers. This is used in the rewrite when sharding by data.
38
39  Returns:
40    A modified `Dataset` obtained by updating the pipeline sharded by the
41    files. The input dataset will be returned if we cannot automatically
42    determine a good way to shard the input dataset.
43  """
  if (dataset.options().experimental_distribute.auto_shard_policy !=
      AutoShardPolicy.OFF):
    if num_replicas_in_sync is None:
      num_replicas_in_sync = 1
    if isinstance(dataset, dataset_ops.DatasetV1):
      return distribute._AutoShardDatasetV1(dataset, num_shards, index,
                                            num_replicas_in_sync)
    else:
      return distribute._AutoShardDataset(dataset, num_shards, index,
                                          num_replicas_in_sync)
  else:
    return dataset
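
# Example (hypothetical usage sketch, not part of this module; the `tf`
# import and file pattern below are assumptions):
#
#   import tensorflow as tf
#
#   # Worker 0 of 2 keeps roughly half of the matched input files, provided
#   # the dataset's auto-shard policy is not OFF.
#   dataset = tf.data.Dataset.list_files("/tmp/train-*.tfrecord")
#   dataset = auto_shard_dataset(dataset, num_shards=2, index=0)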


def _clone_dataset(dataset):
  """Returns a cloned version of `dataset`."""
  variant_tensor_ops = traverse.obtain_all_variant_tensor_ops(dataset)
  remap_dict = _clone_helper(dataset._variant_tensor.op, variant_tensor_ops)
  new_variant_tensor = remap_dict[dataset._variant_tensor.op].outputs[0]
  return dataset_ops._VariantDataset(new_variant_tensor, dataset.element_spec)
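
# Example (hypothetical sketch; graph mode is assumed because the clone is
# built with `Graph.create_op`, and the dataset below is an illustration):
#
#   with ops.Graph().as_default():
#     ds = dataset_ops.Dataset.range(10).map(lambda x: x * 2)
#     cloned = _clone_dataset(ds)  # Independent copy of the dataset's ops.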


def _get_op_def(op):
  # Prefer the OpDef carried by the op itself; fall back to the registry for
  # ops that do not have one attached.
  return op.op_def or op_def_registry.get(op.type)


def _clone_helper(op_to_clone, variant_tensor_ops):
  """Helper that recursively clones `op_to_clone`.

  Args:
    op_to_clone: The op to clone.
    variant_tensor_ops: A list of ops that have to be cloned along the way.

  Returns:
    A dictionary mapping the old ops to the newly created ops, including
    `op_to_clone` as a key.
  """
  remap_dict = {}
  # First clone every upstream variant-tensor op, so that the clones are
  # available to wire in as inputs below.
  for input_tensor in op_to_clone.inputs:
    input_tensor_op = input_tensor.op
    if input_tensor_op in variant_tensor_ops:
      recursive_map = _clone_helper(input_tensor_op, variant_tensor_ops)
      remap_dict.update(recursive_map)
  # Rebuild the input list, substituting cloned ops where they exist.
  inputs_list = []
  for input_tensor in op_to_clone.inputs:
    input_tensor_op = input_tensor.op
    if input_tensor_op in remap_dict:
      remapped_input = remap_dict[input_tensor_op].outputs[0]
      inputs_list.append(remapped_input)
    else:
      inputs_list.append(input_tensor_op.outputs[input_tensor.value_index])
  # Re-create the op in the default graph with the same type, name, and attrs.
  g = ops.get_default_graph()
  new_op = g.create_op(
      op_to_clone.type,
      inputs_list, [o.dtype for o in op_to_clone.outputs],
      name=op_to_clone.name,
      attrs=op_to_clone.node_def.attr,
      op_def=_get_op_def(op_to_clone))
  remap_dict[op_to_clone] = new_op
  return remap_dict
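
# Example (hypothetical sketch mirroring `_clone_dataset` above; the dataset
# is an illustration only):
#
#   with ops.Graph().as_default():
#     ds = dataset_ops.Dataset.range(4).batch(2)
#     variant_ops = traverse.obtain_all_variant_tensor_ops(ds)
#     remap = _clone_helper(ds._variant_tensor.op, variant_ops)
#     # The clone of the final op yields the new dataset's variant tensor.
#     new_variant = remap[ds._variant_tensor.op].outputs[0]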