The TensorFlow Dataset API provides various facilities for creating scalable input pipelines for TensorFlow models, including:
Reading data from a variety of formats including CSV files and TFRecords files (the standard binary format for TensorFlow training data).
Transforming datasets in a variety of ways including mapping arbitrary functions against them.
Shuffling, batching, and repeating datasets over a number of epochs.
Streaming interface to data for reading arbitrarily large datasets.
Reading and transforming data are TensorFlow graph operations, so they are executed in C++ and in parallel with model training.
The R interface to TensorFlow datasets provides access to the Dataset API, including high-level convenience functions for easy integration with the keras R package.
To use tfdatasets you need to install both the R package and TensorFlow itself.
First, install the tfdatasets R package from GitHub as follows:
devtools::install_github("rstudio/tfdatasets")
Then, use the install_tensorflow() function to install TensorFlow:
library(tfdatasets)
install_tensorflow()
To create a dataset, use one of the dataset creation functions. Datasets can be created from delimited text files, TFRecords files, and in-memory data.
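For in-memory data, you can pass an R data frame directly to the tensor_slices_dataset() function. A minimal sketch (this function isn't discussed further in this section):
library(tfdatasets)

# create a dataset whose elements are the rows of an in-memory data frame
dataset <- tensor_slices_dataset(mtcars)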
For example, to create a dataset from a text file, first create a specification for how records will be decoded from the file, then call text_line_dataset() with the file to be read and the specification:
library(tfdatasets)
# create specification for parsing records from an example file
iris_spec <- csv_record_spec("iris.csv")

# read the dataset
dataset <- text_line_dataset("iris.csv", record_spec = iris_spec)
# take a glimpse at the dataset
str(dataset)
<MapDataset shapes: {Sepal.Length: (), Sepal.Width: (), Petal.Length: (), Petal.Width: (),
Species: ()}, types: {Sepal.Length: tf.float32, Sepal.Width: tf.float32, Petal.Length:
tf.float32, Petal.Width: tf.float32, Species: tf.int32}>
In the example above, the csv_record_spec() function is passed an example file which is used to automatically detect column names and types (done by reading up to the first 1,000 lines of the file). You can also provide explicit column names and/or data types using the names and types parameters (note that in this case we don’t pass an example file):
# provide column names and types explicitly
iris_spec <- csv_record_spec(
  names = c("SepalLength", "SepalWidth", "PetalLength", "PetalWidth", "Species"),
  types = c("double", "double", "double", "double", "integer"),
  skip = 1
)

# read the dataset
dataset <- text_line_dataset("iris.csv", record_spec = iris_spec)
Note that we’ve also specified skip = 1 to indicate that the first row of the CSV (which contains the column names) should be skipped.
Supported column types are integer, double, and character. You can also provide types in a more compact form using single-letter abbreviations (e.g. types = "dddi"). For example:
<- csv_record_spec("mtcars.csv", types = "dididddiiii") mtcars_spec
Decoding lines of text into a record can be computationally expensive. You can parallelize these computations using the parallel_records parameter. For example:
<- text_line_dataset("iris.csv", record_spec = iris_spec, parallel_records = 4) dataset
You can also parallelize the reading of data from storage by requesting that a buffer of records be prefetched. You do this with the dataset_prefetch() function. For example:
<- text_line_dataset("iris.csv", record_spec = iris_spec, parallel_records = 4) %>%
dataset dataset_batch(128) %>%
dataset_prefetch(1)
This code will result in the prefetching of a single batch of data on a background thread (i.e. in parallel with training operations).
If you have multiple input files, you can also parallelize reading of these files both across multiple machines (sharding) and/or on multiple threads per-machine (parallel reads with interleaving). See the section on Reading Multiple Files below for additional details.
You can read datasets from TFRecords files using the tfrecord_dataset() function.
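In the simplest case you just pass the path to a TFRecords file (the path below is only a placeholder):
# read serialized example records from a TFRecords file
dataset <- tfrecord_dataset("data/file1.tfrecord")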
In many cases you’ll want to map the records in the dataset into a set of named columns. You can do this using the dataset_map() function along with the tf$parse_single_example() function. For example:
# Creates a dataset that reads all of the examples from two files, and extracts
# the image and label features.
<- c("/var/data/file1.tfrecord", "/var/data/file2.tfrecord")
filenames <- tfrecord_dataset(filenames) %>%
dataset dataset_map(function(example_proto) {
<- list(
features image = tf$FixedLenFeature(shape(), tf$string),
label = tf$FixedLenFeature(shape(), tf$int32)
)$parse_single_example(example_proto, features)
tf })
You can parallelize reading of TFRecord files using the num_parallel_reads option, for example:
<- c("/var/data/file1.tfrecord", "/var/data/file2.tfrecord")
filenames <- tfrecord_dataset(filenames, num_parallel_reads = 4) dataset
You can read datasets from SQLite databases using the sqlite_dataset() function. To use sqlite_dataset() you provide the filename of the database, a SQL query to execute, and a sql_record_spec() that describes the names and TensorFlow types of the columns within the query. For example:
library(tfdatasets)
record_spec <- sql_record_spec(
  names = c("disp", "drat", "vs", "gear", "mpg", "qsec", "hp", "am", "wt", "carb", "cyl"),
  types = c(tf$float64, tf$int32, tf$float64, tf$int32, tf$float64, tf$float64,
            tf$float64, tf$int32, tf$int32, tf$int32, tf$int32)
)

dataset <- sqlite_dataset(
  "data/mtcars.sqlite3",
  "select * from mtcars",
  record_spec
)

dataset
<MapDataset shapes: {disp: (), drat: (), vs: (), gear: (), mpg: (), qsec: (), hp: (), am: (),
wt: (), carb: (), cyl: ()}, types: {disp: tf.float64, drat: tf.int32, vs: tf.float64, gear:
tf.int32, mpg: tf.float64, qsec: tf.float64, hp: tf.float64, am: tf.int32, wt: tf.int32, carb:
tf.int32, cyl: tf.int32}>
Note that for floating point data you must use tf$float64 (reading tf$float32 is not supported for SQLite databases).
You can map arbitrary transformation functions onto dataset records using the dataset_map() function. For example, to transform the “Species” column into a one-hot encoded vector you would do this:
dataset <- dataset %>%
  dataset_map(function(record) {
    record$Species <- tf$one_hot(record$Species, 3L)
    record
  })
Note that while dataset_map() is defined using an R function, there are some special constraints on this function which allow it to execute not within R but rather within the TensorFlow graph.

For a dataset created with the csv_dataset() function, the passed record will be a named list of tensors (one for each column of the dataset). The return value should be another set of tensors which were created from TensorFlow functions (e.g. tf$one_hot as illustrated above). This function will be converted to a TensorFlow graph operation that performs the transformation within native code.
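For instance, here is a sketch of another transformation built entirely from tensor operations, assuming the iris dataset from earlier (the rescaling applied to Petal.Width is purely illustrative):
dataset <- dataset %>%
  dataset_map(function(record) {
    # every operation here is a TensorFlow op, so the whole function can be
    # translated into a graph transformation
    record$Petal.Width <- record$Petal.Width * 10
    record$Species <- tf$cast(record$Species, tf$float32)
    record
  })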
If these transformations are computationally expensive they can be executed on multiple threads using the num_parallel_calls parameter. For example:
dataset <- dataset %>%
  dataset_map(num_parallel_calls = 4, function(record) {
    record$Species <- tf$one_hot(record$Species, 3L)
    record
  })
You can control the maximum number of processed elements that will be buffered when processing in parallel using the dataset_prefetch() transformation. For example:
dataset <- dataset %>%
  dataset_map(num_parallel_calls = 4, function(record) {
    record$Species <- tf$one_hot(record$Species, 3L)
    record
  }) %>%
  dataset_prefetch(1)
If you are batching your data for training, you can optimize performance using the dataset_map_and_batch() function (which fuses together the map and batch operations). For example:
dataset <- dataset %>%
  dataset_map_and_batch(batch_size = 128, function(record) {
    record$Species <- tf$one_hot(record$Species, 3L)
    record
  }) %>%
  dataset_prefetch(1)
You can filter the elements of a dataset using the dataset_filter() function, which takes a predicate function that returns a boolean tensor for records that should be included. For example:
<- csv_dataset("mtcars.csv") %>%
dataset dataset_filter(function(record) {
$mpg >= 20
record
})
<- csv_dataset("mtcars.csv") %>%
dataset dataset_filter(function(record) {
$mpg >= 20 & record$cyl >= 6L
record })
Note that the functions used inside the predicate must be tensor operations (e.g. tf$not_equal, tf$less, etc.). R generic methods for relational operators (e.g. <, >, <=, etc.) and logical operators (e.g. !, &, |, etc.) are provided so you can use shorthand syntax for most common comparisons (as illustrated above).
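For instance, the second filter above could also be written as a sketch using explicit TensorFlow ops in place of the operator shorthand (both forms build the same graph operations):
dataset <- csv_dataset("mtcars.csv") %>%
  dataset_filter(function(record) {
    tf$logical_and(
      tf$greater_equal(record$mpg, 20),
      tf$greater_equal(record$cyl, 6L)
    )
  })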
A common transformation is taking a column-oriented dataset (e.g. one created by csv_dataset() or tfrecord_dataset()) and transforming it into a two-element list with features (“x”) and response (“y”). You can use the dataset_prepare() function to do this type of transformation. For example:
<- text_line_dataset("mtcars.csv", record_spec = mtcars_spec) %>%
mtcars_dataset dataset_prepare(x = c(mpg, disp), y = cyl)
<- text_line_dataset("iris.csv", record_spec = iris_spec) %>%
iris_dataset dataset_prepare(x = -Species, y = Species)
The dataset_prepare() function also accepts standard R formula syntax for defining features and response:
<- text_line_dataset("mtcars.csv", record_spec = mtcars_spec) %>%
mtcars_dataset dataset_prepare(cyl ~ mpg + disp)
If you are batching your data for training, you can add a batch_size parameter to fuse together the dataset_prepare() and dataset_batch() steps (which generally results in faster training). For example:
<- text_line_dataset("mtcars.csv", record_spec = mtcars_spec) %>%
mtcars_dataset dataset_prepare(cyl ~ mpg + disp, batch_size = 16)
There are several functions which control how batches are drawn from the dataset. For example, the following specifies that data will be drawn in batches of 128 from a shuffled window of 1000 records, and that the dataset will be repeated for 10 epochs:
dataset <- dataset %>%
  dataset_shuffle(1000) %>%
  dataset_repeat(10) %>%
  dataset_batch(128)
Note that you can optimize performance by fusing the shuffle and repeat operations into a single step using the dataset_shuffle_and_repeat() function. For example:
dataset <- dataset %>%
  dataset_shuffle_and_repeat(buffer_size = 1000, count = 10) %>%
  dataset_batch(128)
Earlier we alluded to the dataset_prefetch() function, which enables you to ensure that a given number of records (or batches of records) are prefetched in parallel so they are ready to go when the next batch is processed. For example:
dataset <- dataset %>%
  dataset_map_and_batch(batch_size = 128, function(record) {
    record$Species <- tf$one_hot(record$Species, 3L)
    record
  }) %>%
  dataset_prefetch(1)
If you are using a GPU for training, you can also use the dataset_prefetch_to_device() function to specify that the parallel prefetch operation stage the data directly into GPU memory. For example:
dataset <- dataset %>%
  dataset_map_and_batch(batch_size = 128, function(record) {
    record$Species <- tf$one_hot(record$Species, 3L)
    record
  }) %>%
  dataset_prefetch_to_device("/gpu:0")
In this case the buffer size for prefetches is determined automatically (you can specify it manually using the buffer_size parameter).
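For example, a sketch that sets the buffer size explicitly (the value of 2 below is illustrative only):
dataset <- dataset %>%
  dataset_prefetch_to_device("/gpu:0", buffer_size = 2)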
Here’s a complete example of using the various dataset transformation functions together. We’ll read the mtcars dataset from a CSV, filter it on some threshold values, map it into x and y components for modeling, and specify desired shuffling and batch iteration behavior:
<- text_line_dataset("mtcars.csv", record_spec = mtcars_spec) %>%
dataset dataset_filter(function(record) {
$mpg >= 20 & record$cyl >= 6L
record%>%
}) dataset_shuffle_and_repeat(buffer_size = 1000, count = 10) %>%
dataset_prepare(cyl ~ mpg + disp, batch_size = 128) %>%
dataset_prefetch(1)
The method for reading data from a TensorFlow Dataset varies depending upon which API you are using to build your models. If you are using the keras package, then TensorFlow Datasets can be used much like in-memory R matrices and arrays. If you are using the lower-level tensorflow core API, then you’ll use explicit dataset iteration functions.
The sections below provide additional details and examples for each of the supported APIs.
IMPORTANT NOTE: Using TensorFlow Datasets with Keras requires that you are running the very latest versions of Keras (v2.2) and TensorFlow (v1.9). You can ensure that you have the latest versions of the core Keras and TensorFlow libraries with:
library(keras)
install_keras()
Keras models are often trained by passing in-memory arrays directly to the fit() function. For example:
model %>% fit(
  x_train, y_train,
  epochs = 30,
  batch_size = 128
)
However, this requires loading data into an R data frame or matrix before calling fit(). You can use the train_on_batch() function to stream data one batch at a time, but the reading and processing of the input data is still done serially and outside of native code.
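For instance, a sketch of batch-at-a-time training (the get_next_batch() helper below is hypothetical and stands in for whatever R code reads and preprocesses your data):
for (i in 1:500) {
  # get_next_batch() is a hypothetical helper returning list(x = ..., y = ...)
  batch <- get_next_batch()
  model %>% train_on_batch(batch$x, batch$y)
}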
Alternatively, Keras enables you to pass a dataset directly as the x argument to fit() and evaluate(). Here’s a complete example that uses datasets to read from TFRecord files containing MNIST digits:
library(keras)
library(tfdatasets)
batch_size = 128
steps_per_epoch = 500

# function to read and preprocess mnist dataset
mnist_dataset <- function(filename) {
  dataset <- tfrecord_dataset(filename) %>%
    dataset_map(function(example_proto) {

      # parse record
      features <- tf$parse_single_example(
        example_proto,
        features = list(
          image_raw = tf$FixedLenFeature(shape(), tf$string),
          label = tf$FixedLenFeature(shape(), tf$int64)
        )
      )

      # preprocess image
      image <- tf$decode_raw(features$image_raw, tf$uint8)
      image <- tf$cast(image, tf$float32) / 255

      # convert label to one-hot
      label <- tf$one_hot(tf$cast(features$label, tf$int32), 10L)

      # return
      list(image, label)
    }) %>%
    dataset_repeat() %>%
    dataset_shuffle(1000) %>%
    dataset_batch(batch_size, drop_remainder = TRUE) %>%
    dataset_prefetch(1)
}
model <- keras_model_sequential() %>%
  layer_dense(units = 256, activation = 'relu', input_shape = c(784)) %>%
  layer_dropout(rate = 0.4) %>%
  layer_dense(units = 128, activation = 'relu') %>%
  layer_dropout(rate = 0.3) %>%
  layer_dense(units = 10, activation = 'softmax')
model %>% compile(
  loss = 'categorical_crossentropy',
  optimizer = optimizer_rmsprop(),
  metrics = c('accuracy')
)
history <- model %>% fit(
  mnist_dataset("mnist/train.tfrecords"),
  steps_per_epoch = steps_per_epoch,
  epochs = 20,
  validation_data = mnist_dataset("mnist/validation.tfrecords"),
  validation_steps = steps_per_epoch
)

score <- model %>% evaluate(
  mnist_dataset("mnist/test.tfrecords"),
  steps = steps_per_epoch
)

print(score)
Note that all data preprocessing (e.g. one-hot encoding of the response variable) is done within the dataset_map() operation.

Also note that we pass drop_remainder = TRUE to the dataset_batch() function (this is to make sure that all batches are of equal size, a requirement for Keras tensor inputs).
You read batches of data from a dataset by using tensors that yield the next batch. You can obtain this tensor from a dataset via the make_iterator_one_shot() and iterator_get_next() functions. For example:
<- text_line_dataset("mtcars.csv", record_spec = mtcars_spec) %>%
dataset dataset_prepare(cyl ~ mpg + disp) %>%
dataset_shuffle(20) %>%
dataset_batch(5)
<- make_iterator_one_shot(dataset)
iter <- iterator_get_next(iter)
next_batch next_batch
$x
Tensor("IteratorGetNext_13:0", shape=(?, 2), dtype=float32)
$y
Tensor("IteratorGetNext_13:1", shape=(?,), dtype=int32)
As you can see, next_batch isn’t the data itself but rather a tensor that will yield the next batch of data when it is evaluated:
sess <- tf$Session()
sess$run(next_batch)
$x
[,1] [,2]
[1,] 21.0 160
[2,] 21.0 160
[3,] 22.8 108
[4,] 21.4 258
[5,] 18.7 360
$y
[1] 6 6 4 6 8
If you are iterating over a dataset using these functions, you will need to determine at what point to stop iteration. One approach is to use the dataset_repeat() function to create a dataset that yields values infinitely. For example:
library(tfdatasets)
sess <- tf$Session()

mtcars_spec <- csv_record_spec("mtcars.csv")
dataset <- text_line_dataset("mtcars.csv", record_spec = mtcars_spec) %>%
  dataset_shuffle(5000) %>%
  dataset_repeat() %>%   # repeat infinitely
  dataset_prepare(x = c(mpg, disp), y = cyl) %>%
  dataset_batch(128)

iter <- make_iterator_one_shot(dataset)
next_batch <- iterator_get_next(iter)
steps <- 200
for (i in 1:steps) {
  # use next_batch for training, etc.
  # (note that you need to actually use the next_batch, e.g. by passing it to a
  # function that consumes a tensor or by running it explicitly, in order to
  # advance to the next batch)
}
In this case the steps variable is used to determine when to stop drawing new batches of training data (we could equally have included code to detect a learning plateau or any other custom method of determining when to stop training).
Another approach is to detect when all batches have been yielded from the dataset. When a dataset iterator reaches the end, an out-of-range runtime error will occur. You can catch and ignore the error when it occurs by using out_of_range_handler as the error argument to tryCatch(). For example:
library(tfdatasets)
sess <- tf$Session()

mtcars_spec <- csv_record_spec("mtcars.csv")
dataset <- text_line_dataset("mtcars.csv", record_spec = mtcars_spec) %>%
  dataset_prepare(x = c(mpg, disp), y = cyl) %>%
  dataset_batch(128) %>%
  dataset_repeat(10)

iter <- make_iterator_one_shot(dataset)
next_batch <- iterator_get_next(iter)

tryCatch({
  while (TRUE) {
    batch <- sess$run(next_batch)
    str(batch)
  }
}, error = out_of_range_handler)
You can write this iteration more elegantly using the until_out_of_range() function, which automatically handles the error and provides the while(TRUE) around an expression:
until_out_of_range({
  batch <- sess$run(next_batch)
  str(batch)
})
When running under eager execution, you organize the code a bit differently (since you don’t need to explicitly run() tensors):
iter <- make_iterator_one_shot(dataset)

until_out_of_range({
  batch <- iterator_get_next(iter)
  str(batch)
})
If you have multiple input files you can process them in parallel both across machines (sharding) and/or on multiple threads per machine (parallel reads with interleaving). The read_files() function provides a high-level interface to parallel file reading.

The read_files() function takes a set of files and a read function along with various options to orchestrate parallel reading. For example, the following reads all CSV files in a directory using the text_line_dataset() function:
<- read_files("data/*.csv", text_line_dataset, record_spec = mtcars_spec,
dataset parallel_files = 4, parallel_interleave = 16) %>%
dataset_prefetch(5000) %>%
dataset_shuffle_and_repeat(buffer_size = 1000, count = 3) %>%
dataset_batch(128)
The parallel_files argument requests that 4 files be processed in parallel and the parallel_interleave argument requests that blocks of 16 consecutive records from each file be interleaved in the resulting dataset.
Note that because we are processing files in parallel we do not pass the parallel_records argument to text_line_dataset(), since we are already parallelizing at the file level.
If you are training on multiple machines and the training supervisor passes a shard index to your training script, you can also parallelize reading by sharding the file list. For example:
# command line flags for training script (shard info is passed by training
# supervisor that executes the script)
FLAGS <- flags(
  flag_integer("num_shards", 1),
  flag_integer("shard_index", 1)
)

# forward shard info to read_files
dataset <- read_files("data/*.csv", text_line_dataset, record_spec = mtcars_spec,
                      parallel_files = 4, parallel_interleave = 16,
                      num_shards = FLAGS$num_shards, shard_index = FLAGS$shard_index) %>%
  dataset_shuffle_and_repeat(buffer_size = 1000, count = 3) %>%
  dataset_batch(128) %>%
  dataset_prefetch(1)