# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
# pylint: disable=g-classes-have-attributes
"""Input dataset creator for `model.fit`."""

from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.distribute import distribute_lib
from tensorflow.python.util.tf_export import keras_export


@keras_export('keras.utils.experimental.DatasetCreator', v1=[])
class DatasetCreator(object):
  """Object that returns a `tf.data.Dataset` upon invoking.

  `tf.keras.utils.experimental.DatasetCreator` is designated as a supported type
  for `x`, or the input, in `tf.keras.Model.fit`. Pass an instance of this class
  to `fit` when using a callable (with an `input_context` argument) that returns
  a `tf.data.Dataset`.

  ```python
  model = tf.keras.Sequential([tf.keras.layers.Dense(10)])
  model.compile(tf.keras.optimizers.SGD(), loss="mse")

  def dataset_fn(input_context):
    global_batch_size = 64
    batch_size = input_context.get_per_replica_batch_size(global_batch_size)
    dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat()
    dataset = dataset.shard(
        input_context.num_input_pipelines, input_context.input_pipeline_id)
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(2)
    return dataset

  input_options = tf.distribute.InputOptions(
      experimental_fetch_to_device=True,
      experimental_per_replica_buffer_size=2)
  model.fit(tf.keras.utils.experimental.DatasetCreator(
      dataset_fn, input_options=input_options), epochs=10, steps_per_epoch=10)
  ```

  `Model.fit` usage with `DatasetCreator` is intended to work across all
  `tf.distribute.Strategy`s, as long as `Strategy.scope` is used at model
  creation:

  ```python
  strategy = tf.distribute.experimental.ParameterServerStrategy(
      cluster_resolver)
  with strategy.scope():
    model = tf.keras.Sequential([tf.keras.layers.Dense(10)])
  model.compile(tf.keras.optimizers.SGD(), loss="mse")
  ...
  ```

  Note: When using `DatasetCreator`, `steps_per_epoch` argument in `Model.fit`
  must be provided as the cardinality of such input cannot be inferred.

  Args:
    dataset_fn: A callable that takes a single argument of type
      `tf.distribute.InputContext`, which is used for batch size calculation and
      cross-worker input pipeline sharding (if neither is needed, the
      `InputContext` parameter can be ignored in the `dataset_fn`), and returns
      a `tf.data.Dataset`.
    input_options: Optional `tf.distribute.InputOptions`, used for specific
      options when used with distribution, for example, whether to prefetch
      dataset elements to accelerator device memory or host device memory, and
      prefetch buffer size in the replica device memory. No effect if not used
      with distributed training. See `tf.distribute.InputOptions` for more
      information.
  """

  def __init__(self, dataset_fn, input_options=None):
    """Validates and stores the dataset callable and its input options.

    Args:
      dataset_fn: The callable invoked by `__call__` to build the dataset.
      input_options: `None`, or a `tf.distribute.InputOptions` instance.

    Raises:
      TypeError: If `dataset_fn` is not callable, or if `input_options` is
        neither `None` nor a `tf.distribute.InputOptions`.
    """
    if not callable(dataset_fn):
      raise TypeError('`dataset_fn` for `DatasetCreator` must be a `callable`.')
    # Compare against `None` explicitly rather than relying on truthiness, so
    # that falsy values of the wrong type (e.g. `{}`, `0`, `False`) are
    # rejected here instead of being stored unvalidated.
    if input_options is not None and not isinstance(
        input_options, distribute_lib.InputOptions):
      raise TypeError('`input_options` for `DatasetCreator` must be a '
                      '`tf.distribute.InputOptions`.')

    self.dataset_fn = dataset_fn
    self.input_options = input_options

  def __call__(self, *args, **kwargs):
    """Invokes `dataset_fn` and returns the dataset it produces.

    Args:
      *args: Positional arguments forwarded unchanged to `dataset_fn`.
      **kwargs: Keyword arguments forwarded unchanged to `dataset_fn`.

    Returns:
      The object returned by `dataset_fn`, once verified to be a dataset.

    Raises:
      TypeError: If `dataset_fn` returns something that is not an instance of
        `dataset_ops.DatasetV2`.
    """
    # When a `DatasetCreator` is invoked, it forwards args/kwargs straight to
    # the callable.
    dataset = self.dataset_fn(*args, **kwargs)
    if not isinstance(dataset, dataset_ops.DatasetV2):
      raise TypeError('The `callable` provided to `DatasetCreator` must return '
                      'a Dataset.')
    return dataset