Dali: Question about batches supported by dali

Created on 26 Oct 2018  路  17Comments  路  Source: NVIDIA/DALI

When I read the multiple official documents, such as the "Using Tensorflow DALI plugin: simple example", I feel like the code does not preprocess different batches (on a single gpu), but it rather uses batches with random pictures (or the same ones over and over again). I have copied a part from the document here. I guess my question is what happens inside each iteration of the loop below, especially the actual run of dali_tf tensors with respect to the batches? Is there a way to make sure we feed a new batch in each iteration?

with tf.Session(config=config) as sess:
    all_img_per_sec = []
    total_batch_size = BATCH_SIZE * DEVICES

    for i in range(ITERATIONS):
        start_time = time.time()

        # The actual run with our dali_tf tensors
        res = sess.run([images, labels])

        elapsed_time = time.time() - start_time
        img_per_sec = total_batch_size / elapsed_time
        if i > BURNIN_STEPS:
            all_img_per_sec.append(img_per_sec)
            print("\t%7.1f img/s" %  img_per_sec)
question

Most helpful comment

251 should address your questions.

All 17 comments

Hi @sosherio
First, different batches with different images are used at each iterations, as you would expect.
But I think that I should clear up some details so you can use DALI. The _TensorFlow_ tensors image and labels in the example you pasted come from here:

image, label = daliop(serialized_pipeline = serialized_pipes[d],
            shape = [BATCH_SIZE, 3, 224, 224],
            image_type = tf.int32,
            label_type = tf.float32)

Keep in mind that here, daliop is basically a wrapper to the pipeline, that you would have defined then serialized to serialized_pipes (simply by using pipe.serialize() on the DALI pipeline you define).

This pipeline is defined as such.

The behavior that dictates having "different batches at each iteration" or not comes from the first Operator that is a Reader op (MXNetReader that reads in the example). You can look at the implem of this operator.

TL; DR* at every iteration of TF,: images and labels TF Tensors are called - > DALI pipeline is executed by them -> the reader op which is the first op of the pipeline to be executed handles the fetching/loading of your data.
In the end, which input image you get at each iteration depends on the Reader implem. And our Reader implems fetch batch times new random images at each iteration. That's to say the stochasticity that SGD & co need. :+1:

Hi,
We will provide full TF RN50 example soon. So it should allow you to test how it works in the practice.

251 should address your questions.

Thank you for your replies. They were very helpful. I will look into #251 as well.

The Reader op I was using in my example was FileReader, but I believe that one also ensures that the batches are different. The way I understand it is that it uses a buffer (with a size I can specify) to load images and feed multiple of them in each iteration depending on the batch size, and then replaces the used ones with new random ones from all images. Does it mean that the same image can actually show up in two or more consecutive batches in theory (despite being very unlikely if the number of images is large)?

Hi,
Regarding randomness - it depends on random_shuffle argument provided to the reader.
If you skip at each iteration batch_size number of input images are read and processed by DALI. When you read all images, DALI starts over reading from the begging - so in such case, it is not possible to have two the same images in the batch (unless your batch is bigger than dataset). If you set random_shuffle to True, DALI internal buffer (default value is set to 1024, you can change it by initial_fill parameter) is filled by reading consecutive images form data set, and then randomly one by images are picked from that buffer to fill batch. In the same time images are replaced by next samples read from the input data set. Please refer to dali/pipeline/operators/reader/loader/loader.h.

I'd like to test DALI with Keras and see if they can work together. Would it be possible for me to define a generator and use the above daliop inside my generator to yield the images and labels TF Tensors above, and then I use this generator for training the network, something like model.fit_generator(generator = generator_using_dali)?

Hi,
The yielded objects images and labels would be TensorFlow tensors, and model.fit_generator expect a generator that yields numpy arrays. Thus, using our TF op wouldn't work in this configuration.

However you can achieve what you want by using the vanilla DALI.
First, define the pipeline

class YourPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(YourPipeline, self).__init__(batch_size, num_threads, device_id)
        self.input = ops.CaffeReader(path = FOLDER)
        self.decode= ops.nvJPEGDecoder(device = "mixed", output_type = types.RGB)

    def define_graph(self):
        self.jpegs, self.labels = self.input()
        images = self.decode(self.jpegs)
        return (output, self.labels)

    def iter_setup(self):
        pass

then, define your generator

def GeneratorUsingDali(batch_size):
  pipe = YourPipeline(batch_size=batch_size, num_threads=4, device_id = 0)
  pipe.build()
  pipe_out = pipe.run()
 while True:
   yield pipe_out[0], pipe_out[1]

and finally,

model.fit_generator(generator = GeneratorUsingDali(batch_size=BATCH_SIZE))

Thank for the reply.

However, I'm not sure how this would solve the issue as both pipe_out[0] and pipe_out[1] are TensorListGPU objects and not compatible with fit_generator.

When I tried the above code, I ran into the following error:

  File "dali-train.py", line 62, in main
    model.fit_generator(generator = GeneratorUsingDali(batch_size=test_batch_size), steps_per_epoch=100, epochs=3)
  File "/usr/local/lib/python3.5/dist-packages/keras/legacy/interfaces.py", line 91, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/keras/engine/training.py", line 1415, in fit_generator
    initial_epoch=initial_epoch)
  File "/usr/local/lib/python3.5/dist-packages/keras/engine/training_generator.py", line 206, in fit_generator
    batch_size = x.shape[0]
AttributeError: 'nvidia.dali.backend_impl.TensorListGPU' object has no attribute 'shape'

I also tried changing the yield command to yield pipe_out[0].as_tensor(), pipe_out[1].as_tensor(). However, it gave me a different error:

  File "dali-train.py", line 62, in main
    model.fit_generator(generator = GeneratorUsingDali(batch_size=test_batch_size), steps_per_epoch=100, epochs=3)
  File "/usr/local/lib/python3.5/dist-packages/keras/legacy/interfaces.py", line 91, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/keras/engine/training.py", line 1415, in fit_generator
    initial_epoch=initial_epoch)
  File "/usr/local/lib/python3.5/dist-packages/keras/engine/training_generator.py", line 197, in fit_generator
    if x is None or len(x) == 0:
TypeError: object of type 'nvidia.dali.backend_impl.TensorGPU' has no len()

Indeed, I forgot a step to get the numpy arrays, sorry!

The best way to get your the batches as CPU lists is:

pipe_out = pipe.run()
images_out = pipe_out[0].asCPU() # <---
labels_out = pipe_out[1]

# Getting arrays with .at
images_list = [images_out.at(i) for i in range(batch_size)]
labels_list = [labels_out.at(i) for i in range(batch_size)]

yield images_list, labels_list

Hope this will help you.

Thanks - my code works with Dali now.

However, the batches provided by this scheme are always the same. What happened was that I saw a 100% accuracy on my training data (and around 50% on validation) and got suspicious, so I printed the content of batches (right before yielding them) and noticed that they turn out to be exactly the same every time.

I have pasted my code here. I was wondering if you could let me know what I am doing wrong here with Dali. It works fine when I have a regular generator.

class mixedPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(mixedPipeline, self).__init__(batch_size, num_threads, device_id)
        self.input = ops.FileReader(file_root = image_dir, random_shuffle = True, initial_fill = 1024)
        self.decode = ops.nvJPEGDecoder(device = "mixed", output_type = types.RGB)
        self.resize = ops.Resize(device = "gpu", image_type = types.RGB, resize_x=width, resize_y=height)
        self.rotate = ops.Rotate(device = "gpu")
        self.rng = ops.Uniform(range = (-10.0, 10.0))
        self.cast = ops.Cast(device = "gpu", dtype = types.INT32)

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        images_resized = self.resize(images)
        rotated_images = self.rotate(images_resized, angle = self.rng())
        output = self.cast(rotated_images)
        return (output, labels.gpu())

    def iter_setup(self):
        pass

def GeneratorUsingDali(batch_size):
  pipe = mixedPipeline(batch_size=test_batch_size, num_threads=4, device_id = 0)
  pipe.build()
  pipe_out = pipe.run()
  images_out = pipe_out[0].asCPU()
  labels_out = pipe_out[1].asCPU()

  images_list = np.array(images_out.as_tensor())
  labels_list = np.array(labels_out.as_tensor())

  labels_list_final = keras.utils.to_categorical(labels_list)

  while True:
    yield images_list, labels_list_final

def main():
    from get_model import get_model
    model = get_model()

    from get_dataset import get_dali_validation_set
    X_val, Y_val = get_dali_validation_set()

    model.fit_generator(generator = GeneratorUsingDali(batch_size=test_batch_size), steps_per_epoch = math.floor(num_dataset_samples/test_batch_size), epochs=1, validation_data=(X_val, Y_val))

    return model

I actually figured it out. I needed to have everything after pipe.build inside the while loop.

@sosherio I was about to write it but I'm happy you managed to make this working.
pipe.run() is the things that asks Dali to generate next batch of images.

304 should easy up a bit this use case as it will be enough to:

images_list = images_out.as_array()
labels_list = labels_out.as_array()

Thanks - it is a useful method.

I'll leave a message here just in case anyone else would like to know how to use DALI with tf.keras. It appears that instead of defining a custom generator, moving results of the pipeline to cpu and the feeding to the model again you could do something like this:

pipe = ... #your pipe
daliop = dali_tf.DALIIterator()
pipe.build() #to get epoch size
epoch_size = list(pipe.epoch_size().values())[0] // batch_size

with tf.device("/gpu:{}".format(device_id)):
    images, labels = daliop(
        pipeline=pipe,
        shapes=[(batch_size, height, width, 3), (batch_size, 1)],
        dtypes=[tf.float32, tf.int64],
        device_id=device_id)
    # Change to 0-based (don't use background class)
    labels -= 1
    one_hot_labels = tf.squeeze(tf.one_hot(labels, 1000))

model = VGG16(input_tensor=images) # set dali tensor as input to the pipeline
model.compile('adam', loss='categorical_crossentropy', 
    target_tensors=[one_hot]) #set dali tensor as output to calculate loss
model.fit(epochs=1, steps_per_epoch=ep_s)

That's it. @JanuszL if you are interested, maybe you could add this keras example to one of the TF notebooks.

@bonlime - thanks for the code snippet. We will try to put together some example with it (tracked as DALI-981). On the other hand, as you are the author of this code maybe you can prepare some simple example, issue a PR and became a DALI coauthor?

from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types
import nvidia.dali.plugin.tf as dali_tf


class AugmentationPipeline(Pipeline):
    def __init__(self, root_dir, batch_size, num_threads, device_id):
        super().__init__(batch_size, num_threads, device_id, seed=12)

        self.random_angle = ops.Uniform(range=(0, 360.0))
        self.random = ops.Uniform(range=(0.5, 1.5))
        self.random_coin = ops.CoinFlip()

        self.input = ops.FileReader(file_root=root_dir, random_shuffle=True)

        self.decode = ops.ImageDecoder(device='mixed')
        self.rotate = ops.Rotate(device='gpu', interp_type=types.INTERP_LINEAR)
        self.crop = ops.Crop(device='gpu', crop=(350, 350))
        self.crop2 = ops.RandomResizedCrop(
            device='gpu',
            size=(224, 224),
            interp_type=types.INTERP_LINEAR,
            random_area=(0.3, 1.0),
        )
        self.flip = ops.Flip(device='gpu')
        self.color_twist = ops.ColorTwist(device='gpu')

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        images = self.rotate(images, angle=self.random_angle())
        images = self.crop(images)
        images = self.crop2(images)
        images = self.flip(images, horizontal=self.random_coin())
        images = self.color_twist(
            images,
            hue=self.random(),
            saturation=self.random(),
            contrast=self.random(),
            brightness=self.random(),
        )
        return (images, labels.gpu())


batch_size = 32
device_id = 0
pipe = AugmentationPipeline('PATH_TO_DATASET/train', batch_size, 6, device_id)
daliop = dali_tf.DALIIterator()
pipe.build()  # to get epoch size
epoch_size = list(pipe.epoch_size().values())[0] // batch_size

with tf.device("/gpu:{}".format(device_id)):
    images_tensor, labels_tensor = daliop(
        pipeline=pipe,
        shapes=[(batch_size, 224, 224, 3), (batch_size, 1)],
        dtypes=[tf.uint8, tf.int32],
        device_id=device_id,
    )
    images_tensor = tf.cast(images_tensor, tf.float32) / 127.5 - 1.

from MobileNetV3 import MobileNetV3Small
from MobileNetV3.base import Base

base = Base(layers=L, backend=K)

LAST_CONV_FILTERS = 256

def compose(*layers):
    layers = list(layers)
    ret = input_layer = layers.pop(0)
    for i in layers:
        ret = i(ret)
    return M.Model(input_layer, ret)

layers = [
    MobileNetV3Small(
        input_shape=(None, None, 3),
        include_top=False,
    ),
    L.GlobalAveragePooling2D(),
    L.Reshape((1, 1, 576)),
    L.Conv2D(256, (1, 1)),
    L.Activation(base.hard_swish),
    L.Conv2D(2, (1, 1)),
    L.Flatten(),
    L.Activation('softmax'),
]

train_model = compose(
    L.Input(tensor=images_tensor, shape=(224, 224, 3)),
    *layers,
)
train_model.compile(RAdam(), loss='sparse_categorical_crossentropy',  target_tensors=[labels_tensor])

val_model = compose(
    L.Input(shape=(224, 224, 3)),
    *layers,
)
val_model.compile(RAdam(), loss='sparse_categorical_crossentropy', metrics=['acc'])


class CenterCropPipeline(Pipeline):
    def __init__(self, root_dir, batch_size, num_threads, device_id):
        super().__init__(batch_size, num_threads, device_id, seed=12)
        self.input = ops.FileReader(file_root=root_dir, random_shuffle=True)
        self.decode = ops.ImageDecoder(device='cpu')
        self.crop = ops.Crop(device='cpu', crop=(224, 224))

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        images = self.crop(images)
        return (images, labels)

from glob import glob
val_size = len(glob('PATH_TO_DATASET/val/*/*.jpg'))
val_pipeline = CenterCropPipeline('PATH_TO_DATASET/val', val_size, 6, device_id)
val_pipeline.build()
val_images, val_labels = val_pipeline.run()
val_images = val_images.as_array() / 127.5 - 1.
val_labels = val_labels.as_array()


class EvaluateModel(CB.Callback):

    def __init__(self, model, X, y):
        super().__init__()
        self.val_model = model
        self.X, self.y = X, y

    def on_epoch_end(self, epoch, logs={}):
        results = self.val_model.evaluate(self.X, self.y, verbose=0)
        for result, name in zip(results, self.val_model.metrics_names):
            logs['val_' + name] = result
        msg = '\r%s' % " - ".join([f"{k}: {v:.4f}" for k, v in logs.items()])
        print(msg + ' ' * (80 - len(msg)))


train_model.fit(
    epochs=500,
    steps_per_epoch=epoch_size,
    callbacks=[
        EvaluateModel(val_model, val_images, val_labels),
        CB.EarlyStopping(patience=50, restore_best_weights=True),
        CB.ModelCheckpoint('checkpoints/mobilenetv3-small-2.weights.{epoch:03d}-{val_loss:.4f}.h5', save_weights_only=True, save_best_only=True),
        CB.TensorBoard(f'logs/MobileNetV3Small-2(f={LAST_CONV_FILTERS},bs={batch_size})'),
        #CB.ReduceLROnPlateau(patience=10, cooldown=20, verbose=1),
    ],
)
Was this page helpful?
0 / 5 - 0 ratings