# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Input-pipeline utilities for Distribution strategies."""

from tensorflow.python.data.experimental.ops import distribute
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.data.ops.options import AutoShardPolicy
from tensorflow.python.data.util import traverse
from tensorflow.python.framework import op_def_registry
from tensorflow.python.framework import ops


# pylint: disable=protected-access
def auto_shard_dataset(dataset, num_shards, index, num_replicas_in_sync=None):
  """Shards the input pipeline by sharding the underlying list of files.

  Args:
    dataset: A `tf.data.Dataset` instance, typically the result of a series of
      dataset transformations.
    num_shards: A `tf.int64` scalar `tf.Tensor`, representing the number of
      shards operating in parallel. Same usage as in `tf.data.Dataset.shard`.
    index: A `tf.int64` scalar `tf.Tensor`, representing the worker index.
      Same usage as in `tf.data.Dataset.shard`.
    num_replicas_in_sync: An integer representing the total number of replicas
      across all workers. This is used in the rewrite when sharding by data.

  Returns:
    A modified `Dataset` whose input pipeline is sharded across files. The
    input dataset is returned unchanged if we cannot automatically determine
    a good way to shard it.
  """
  if (dataset.options().experimental_distribute.auto_shard_policy !=
      AutoShardPolicy.OFF):
    if num_replicas_in_sync is None:
      num_replicas_in_sync = 1
    # V1 and V2 datasets are rewritten by separate (but equivalent) wrappers.
    if isinstance(dataset, dataset_ops.DatasetV1):
      return distribute._AutoShardDatasetV1(dataset, num_shards, index,
                                            num_replicas_in_sync)
    else:
      return distribute._AutoShardDataset(dataset, num_shards, index,
                                          num_replicas_in_sync)
  else:
    return dataset


def _clone_dataset(dataset):
  """Returns a cloned version of `dataset`."""
  variant_tensor_ops = traverse.obtain_all_variant_tensor_ops(dataset)
  remap_dict = _clone_helper(dataset._variant_tensor.op, variant_tensor_ops)
  new_variant_tensor = remap_dict[dataset._variant_tensor.op].outputs[0]
  return dataset_ops._VariantDataset(new_variant_tensor, dataset.element_spec)


def _get_op_def(op):
  """Returns the `OpDef` for `op`, falling back to the op registry."""
  return op.op_def or op_def_registry.get(op.type)


def _clone_helper(op_to_clone, variant_tensor_ops):
  """Helper method that recursively clones `op_to_clone`.

  Args:
    op_to_clone: The op we want to clone.
    variant_tensor_ops: A list of ops that we have to clone along the way.

  Returns:
    A dictionary mapping old ops to the newly created ops, including
    `op_to_clone` as a key.
80 """ 81 remap_dict = {} 82 for input_tensor in op_to_clone.inputs: 83 input_tensor_op = input_tensor.op 84 if input_tensor_op in variant_tensor_ops: 85 recursive_map = _clone_helper(input_tensor_op, variant_tensor_ops) 86 remap_dict.update(recursive_map) 87 inputs_list = [] 88 for input_tensor in op_to_clone.inputs: 89 input_tensor_op = input_tensor.op 90 if input_tensor_op in remap_dict: 91 remapped_input = remap_dict[input_tensor_op].outputs[0] 92 inputs_list.append(remapped_input) 93 else: 94 inputs_list.append(input_tensor_op.outputs[input_tensor.value_index]) 95 g = ops.get_default_graph() 96 new_op = g.create_op( 97 op_to_clone.type, 98 inputs_list, [o.dtype for o in op_to_clone.outputs], 99 name=op_to_clone.name, 100 attrs=op_to_clone.node_def.attr, 101 op_def=_get_op_def(op_to_clone)) 102 remap_dict[op_to_clone] = new_op 103 return remap_dict 104