DALI: Running a pipeline several times on the same batch of images

Created on 25 Sep 2020 · 19 comments · Source: NVIDIA/DALI

Hi, I want to run() a pipeline several times on the same batch of images.
Then I want the pipeline to fetch the next batch and repeat the same operation, and so on.

I am using COCOReader, but I cannot find any argument that stops it from fetching new batches so that I can work on the same batch again and again.

I have tried bringing in a batch of images from outside using an external source (without success). I can show you the code, but first I wanted to ask whether this could be done in another way.

Thanks!


Hi,
Currently, there is no way to tell the reader to repeat the given batch several times.
In one of the next releases (0.27), you will be able to chain DALI pipelines easily, as ExternalSource will accept the output of other DALI pipelines directly (@mzient, right?). So you can have one pipeline with the reader and another with the processing, run the first one only when you need a new set of inputs from the dataset, and run the other one as many times as you like to reuse the same input.
Still, it should already work now with ExternalSource acting as the dataset reader. Could you be more specific about what makes it difficult to get this working the way you want with your present code?
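The two-pipeline approach boils down to a simple scheduling loop: run the reader only when a new batch is needed, and run the processing stage as many times as you like on the batch it produced. The plain-Python sketch below shows only that control flow; make_reader and process are hypothetical stand-ins for the two DALI pipelines, not DALI APIs.

```python
import itertools


def make_reader(num_batches, batch_size):
    """Hypothetical stand-in for the reader pipeline: yields a new batch per call."""
    counter = itertools.count()
    for _ in range(num_batches):
        yield [next(counter) for _ in range(batch_size)]


def process(batch, fixation):
    """Hypothetical stand-in for the processing pipeline (e.g. a crop at a fixation)."""
    return [sample + fixation for sample in batch]


def run_schedule(num_batches, batch_size, repeats):
    results = []
    for batch in make_reader(num_batches, batch_size):  # reader runs once per new batch
        for fixation in range(repeats):                 # processor reuses the same batch
            results.append(process(batch, fixation))
    return results


out = run_schedule(num_batches=2, batch_size=3, repeats=4)
print(len(out))  # 8 processor runs for 2 reader runs
```

In DALI terms, the outer loop corresponds to calling run() on the reader pipeline and the inner loop to calling run() on the processing pipeline, whose external source keeps returning the batch produced by the reader.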

Hi @JanuszL, thank you for your reply.

Here is the code with which I am trying to solve this:

First, I declare a module-level global variable inside the package file:

img_batch = None  # module-level global; assigned from the notebook as NDP.img_batch

The reader is a classic one:

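from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types

# file_root and annotations_file are defined elsewhere in the package
class COCOReader(Pipeline):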
    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        super(COCOReader, self).__init__(batch_size, num_threads, device_id, seed = 15, exec_pipelined=False, exec_async=False, prefetch_queue_depth=1)
        self.input = ops.COCOReader(file_root = file_root, annotations_file = annotations_file,
                                     shard_id = device_id, num_shards = num_gpus, ratio=True, random_shuffle=False)
        self.decode = ops.ImageDecoder(device = "mixed", output_type = types.RGB)

    def define_graph(self):
        inputs, bboxes, labels = self.input()
        images = self.decode(inputs)
        return (images, bboxes, labels)

An image collector as an external source:

class ImageCollector(object):
    def __init__(self, batch_size):
        self.batch_size = batch_size

    def _get_images(self):
        self.img_batch = img_batch

    def __iter__(self):
        self._get_images()
        assert len(self.img_batch) == self.batch_size
        self.i = 0
        self.n = len(self.img_batch)
        return self

    def __next__(self):
        batch = []
        self._get_images()
        for _ in range(self.batch_size):
            img = self.img_batch[self.i]
            batch.append(img)
            self.i = (self.i + 1) % self.n
        return batch

Then the pipeline:

Please note that I feed the images through the image argument below.

class FoveatedRetinalProcessor(Pipeline):
    def __init__(self, batch_size, num_threads, device_id, num_gpus, image, fixation):
        super(FoveatedRetinalProcessor, self).__init__(batch_size, num_threads, device_id, seed = 15, exec_pipelined=False, exec_async=False, prefetch_queue_depth=1)

        self.resize_zero = ops.Resize(device = "gpu", resize_x = 640, resize_y = 640)

        self.rotate = ops.Rotate(device = "gpu")

        self.resize_one  = ops.Resize(device = "gpu", resize_x = 30, resize_y = 30)

        self.crop_zero  = ops.Crop(device = "gpu", crop_h = 640, crop_w = 640)
        self.crop_one   = ops.Crop(device = "gpu", crop_h = 400, crop_w = 400)
        self.crop_two   = ops.Crop(device = "gpu", crop_h = 240, crop_w = 240)
        self.crop_three = ops.Crop(device = "gpu", crop_h = 100, crop_w = 100)
        self.crop_four  = ops.Crop(device = "gpu", crop_h = 30, crop_w = 30)

        self.images_source   = ops.ExternalSource(source = image)
        self.fixation_source = ops.ExternalSource(source = fixation, num_outputs = 3)

    def define_graph(self):
        images = self.images_source()
        crop_pos_x, crop_pos_y, angle = self.fixation_source()

        images   = self.rotate(self.resize_zero(images), angle=angle)

        cropped0 = self.crop_zero(images)
        cropped1 = self.crop_one(cropped0, crop_pos_x=crop_pos_x, crop_pos_y=crop_pos_y)
        cropped2 = self.crop_two(cropped0, crop_pos_x=crop_pos_x, crop_pos_y=crop_pos_y)
        cropped3 = self.crop_three(cropped0, crop_pos_x=crop_pos_x, crop_pos_y=crop_pos_y)
        cropped4 = self.crop_four(cropped0, crop_pos_x=crop_pos_x, crop_pos_y=crop_pos_y)

        sized0   = self.resize_one(cropped0)
        sized1   = self.resize_one(cropped1)
        sized2   = self.resize_one(cropped2)
        sized3   = self.resize_one(cropped3)
        sized4   = self.resize_one(cropped4)

        return (cropped0, cropped1, cropped2, cropped3, cropped4, sized0, sized1, sized2, sized3, sized4)

Then, outside the package, in a notebook, I construct the reader:

from time import time

num_gpus = 1
batch_size = 1024*2
start = time()
pipes = [NDP.COCOReader(batch_size=batch_size, num_threads=2, device_id = device_id, num_gpus = num_gpus)  for device_id in range(num_gpus)]
for pipe in pipes:
    pipe.build()
total_time = time() - start
print("Computation graph built and dataset loaded in %f seconds." % total_time)

Then I fetch the batch of images:

pipe_out = [pipe.run() for pipe in pipes]

images_gpu = pipe_out[0][0]

images_cpu = pipe_out[0][0].as_cpu()
bboxes_cpu = pipe_out[0][1]
labels_cpu = pipe_out[0][2]

Then I assign the batch of images to the global variable and construct the other pipeline.

Note that image is an ImageCollector instance:

NDP.img_batch = images_gpu

NDP.fixation_pos_x = torch.rand(batch_size)
NDP.fixation_pos_y = torch.rand(batch_size)
NDP.fixation_angle = (torch.rand(batch_size)-0.5)*60

image = NDP.ImageCollector(batch_size)
fixation = NDP.FixationCommand(batch_size)

start = time()
pipes = [NDP.FoveatedRetinalProcessor(batch_size=batch_size, num_threads=2, device_id=device_id, num_gpus=num_gpus, image=image, fixation=fixation)  for device_id in range(num_gpus)]
for pipe in pipes:
  pipe.build()

total_time = time() - start
print("Computation graph built and dataset loaded in %f seconds." % total_time)

Finally, I try to run the pipe:

NDP.img_batch = images_gpu

NDP.fixation_pos_x = torch.rand(batch_size)
NDP.fixation_pos_y = torch.rand(batch_size)
NDP.fixation_angle = (torch.rand(batch_size)-0.5)*60

start = time()
pipe_out = [pipe.run() for pipe in pipes]
total_time = time() - start
print("Computation graph built and dataset loaded in %f seconds." % total_time)

and I receive the following error message:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-21-3a20f77f020b> in <module>()
     11 
     12 start = time()
---> 13 pipe_out = [pipe.run() for pipe in pipes]
     14 total_time = time() - start
     15 print("Computation graph built and dataset loaded in %f seconds." % total_time)

<ipython-input-21-3a20f77f020b> in <listcomp>(.0)
     11 
     12 start = time()
---> 13 pipe_out = [pipe.run() for pipe in pipes]
     14 total_time = time() - start
     15 print("Computation graph built and dataset loaded in %f seconds." % total_time)

/usr/local/lib/python3.6/dist-packages/nvidia/dali/pipeline.py in run(self)
    719         """
    720         with self._check_api_type_scope(types.PipelineAPIType.BASIC):
--> 721             self.schedule_run()
    722             return self.outputs()
    723 

/usr/local/lib/python3.6/dist-packages/nvidia/dali/pipeline.py in schedule_run(self)
    636                 self._prefetch()
    637             else:
--> 638                 self._run_once()
    639 
    640     # for the backward compatibility

/usr/local/lib/python3.6/dist-packages/nvidia/dali/pipeline.py in _run_once(self)
    741         try:
    742             if not self._last_iter:
--> 743                 self._iter_setup()
    744                 self._batches_to_consume += 1
    745             # Special case to prevent a deadlock if user didn't release the only buffer

/usr/local/lib/python3.6/dist-packages/nvidia/dali/pipeline.py in _iter_setup(self)
    953 
    954     def _iter_setup(self):
--> 955         self._run_input_callbacks()
    956         self.iter_setup()
    957         self._iter += 1

/usr/local/lib/python3.6/dist-packages/nvidia/dali/pipeline.py in _run_input_callbacks(self)
    950 
    951         for group in self._input_callbacks:
--> 952             group.call_and_feed(self, self._iter)
    953 
    954     def _iter_setup(self):

/usr/local/lib/python3.6/dist-packages/nvidia/dali/external_source.py in call_and_feed(self, pipeline, current_iter)
     83             data = callback_out
     84             op = self.instances[0]
---> 85             pipeline.feed_input(op._name, data, op._layout, self._cuda_stream, self.use_copy_kernel)
     86 
     87 def _is_generator_function(x):

/usr/local/lib/python3.6/dist-packages/nvidia/dali/pipeline.py in feed_input(self, data_node, data, layout, cuda_stream, use_copy_kernel)
    561                 info = CheckDLPackCapsule(datum)
    562                 if not info[0] and not checked:
--> 563                     _check_data_batch(data, self._batch_size, layout)
    564                     checked = True
    565                 if hasattr(datum, "__cuda_array_interface__") or (info[0] and info[1]):

/usr/local/lib/python3.6/dist-packages/nvidia/dali/external_source.py in _check_data_batch(data, batch_size, layout)
      9             "size: {} instead of {}".format(len(data), batch_size))
     10         if len(data) > 0:
---> 11             dim = len(data[0].shape)
     12             for t in data:
     13                 if len(t.shape) != dim:

TypeError: object of type 'method' has no len()
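The last frame of the traceback shows where this fails: the batch check calls len(data[0].shape), and the message means that .shape on the first element is a bound method rather than a tuple. ImageCollector builds its batch by indexing img_batch, which here holds a DALI TensorListGPU, so the elements are DALI tensor objects rather than NumPy arrays (assumption: in the installed DALI version those objects expose shape as a method). The plain-Python sketch below reproduces only that mechanism; ShapeAsMethod, ShapeAsTuple and check_batch are hypothetical stand-ins, not DALI classes.

```python
class ShapeAsMethod:
    """Hypothetical element whose .shape is a method, as the error message suggests."""
    def __init__(self, dims):
        self._dims = tuple(dims)

    def shape(self):
        return self._dims


class ShapeAsTuple:
    """Hypothetical element whose .shape is a plain tuple, like a NumPy array."""
    def __init__(self, dims):
        self.shape = tuple(dims)


def check_batch(data, batch_size):
    """Simplified version of the batch check shown in the traceback."""
    if len(data) != batch_size:
        raise RuntimeError(
            "unexpected batch size: {} instead of {}".format(len(data), batch_size))
    if len(data) > 0:
        dim = len(data[0].shape)  # raises TypeError if .shape is a method
        for t in data:
            if len(t.shape) != dim:
                raise RuntimeError("all samples must have the same number of dimensions")
    return True


ok_batch = [ShapeAsTuple([640, 640, 3]) for _ in range(4)]
print(check_batch(ok_batch, 4))  # True

bad_batch = [ShapeAsMethod([640, 640, 3]) for _ in range(4)]
try:
    check_batch(bad_batch, 4)
except TypeError as e:
    print(e)  # object of type 'method' has no len()
```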

Let me try to reproduce your problem and get back to you soon.

This simple example works with the nightly build (it is not available in the stable release yet):

from nvidia.dali.pipeline import Pipeline
import nvidia.dali.fn as fn
import nvidia.dali.types as types
import numpy as np

batch_size = 2
test_data_shape = [2, 3, 4]
def get_data():
    out = [np.random.randint(0, 255, size = test_data_shape, dtype = np.uint8) for _ in range(batch_size)]
    return out

pipe = Pipeline(batch_size=batch_size, num_threads=3, device_id=0)
outs = fn.external_source(source = get_data, device = "gpu")
pipe.set_outputs(outs)
pipe.build()
dali_out = pipe.run()
print("The first pipe ", dali_out[0].as_cpu().as_array())

def get_data_from_dali():
    return dali_out[0]

pipe2 = Pipeline(batch_size=batch_size, num_threads=3, device_id=0)
outs2 = fn.external_source(source = get_data_from_dali, device = "gpu")
pipe2.set_outputs(outs2)
pipe2.build()
final_out = pipe2.run()
print("The second pipe ", final_out[0].as_cpu().as_array())

Thanks @JanuszL, but I think I do not quite understand your example.

I need to run() a pipeline on the same batch of images several times. If I am not mistaken, in your example I would have to build a new pipeline every time I want to run it.
I also do not know how to apply this strategy to my sample code above, that is, how do I implement it in the pipeline called FoveatedRetinalProcessor?

Thanks!

How about this example:

from nvidia.dali.pipeline import Pipeline
import nvidia.dali.fn as fn
import nvidia.dali.types as types
import numpy as np

batch_size = 2
test_data_shape = [2, 3, 4]
def get_data():
    out = [np.random.randint(0, 255, size = test_data_shape, dtype = np.uint8) for _ in range(batch_size)]
    return out

pipe = Pipeline(batch_size=batch_size, num_threads=3, device_id=0)
outs = fn.external_source(source = get_data, device = "gpu")
pipe.set_outputs(outs)
pipe.build()
dali_out = pipe.run()
print("The first pipe ", dali_out[0].as_cpu().as_array())

class image_collector(object):
    def __init__(self):
        pass
    def __iter__(self):
        return self
    def __next__(self):
        return self.data[0]
    next = __next__

iterator = image_collector()
iterator.data = dali_out

pipe2 = Pipeline(batch_size=batch_size, num_threads=3, device_id=0, prefetch_queue_depth=1)
outs2 = fn.external_source(source = iterator, device = "gpu")
pipe2.set_outputs(outs2)
pipe2.build()
final_out = pipe2.run()
print("The second pipe ", final_out[0].as_cpu().as_array())
final_out = pipe2.run()
print("The second pipe, get the same data ", final_out[0].as_cpu().as_array())

dali_out = pipe.run()
print("The first pipe run again to get new data", dali_out[0].as_cpu().as_array())
iterator.data = dali_out
final_out = pipe2.run()
print("The second pipe get new data from the first one", final_out[0].as_cpu().as_array())

Dear @JanuszL, thank you for your help.

I am afraid I have not been able to test the solution you propose since, for some reason I don't understand, a problem that was solved in a previous question is not working anymore on Google Colab.

Honestly, I do not know whether I modified something I cannot detect right now or whether it has something to do with Google Colab and its compatibility with the DALI library.

I wanted to ask you how to proceed in such a case. Should we re-open the previous question?

Of course, I would like to close out the previous question first, provided it has nothing to do with a compatibility issue with Google Colab.

If this is a compatibility issue, I will have to switch platforms and test it on a cluster. I cannot test it on my own machine since my laptop has no GPU.

Thanks,

Dario

> I am afraid I have not been able to test the solution you propose since, for some reason I don't understand, a problem that was solved in a previous question is not working anymore on Google Colab.

If something is not working anymore, please create a simple, self-contained script we can rerun to debug the problem. The simpler the code, the better.

Thanks @JanuszL, I can simplify it further if you want, but I have tried to be as precise as possible given that the error could be anywhere.

In fact, now that I have put this in a self-contained notebook, it returns different errors!

Furthermore, I sometimes get different errors in different executions of the same code.

I am completely disoriented; yesterday it was working beautifully.

I highly appreciate your help

Attachment: a simple and self-contained script.zip

@dariodematties,
Thank you for the script. I was able to reproduce the problem: there is a regression in version 0.26 in how the DALI ExternalSource operator treats a batch of scalars.
We ask for self-contained scripts because they shorten the reproduction step on our side.
For now, please try changing

torch.rand(batch_size)

to

torch.rand((batch_size, 1))

that is, add this extra dimension.
In the meantime, we will fix the issue and let you know when it is ready.
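The workaround changes only the shape of each sample. Judging by the batch-size error later in this thread, the external source treats the outermost dimension as the batch, so with torch.rand(batch_size) each sample is a 0-D scalar, while torch.rand((batch_size, 1)) makes each sample a 1-element array of shape (1,). The sketch below uses NumPy as a stand-in for torch (an assumption made so it runs without PyTorch or a GPU).

```python
import numpy as np

batch_size = 4

# Shape (batch_size,): splitting the outer dimension into samples gives 0-D scalars.
scalars = np.random.rand(batch_size)
print(scalars.shape, scalars[0].shape)  # (4,) ()

# Shape (batch_size, 1): each sample is now a 1-D array holding a single value.
vectors = np.random.rand(batch_size, 1)
print(vectors.shape, vectors[0].shape)  # (4, 1) (1,)
```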

Wow, I am between crying and laughing!!! Thank you so much @JanuszL

OK, now I can return to the main question we were discussing.

I have not had the chance to test your suggestion yet, @JanuszL. I was thinking of doing this processing inside the pipeline.

The first idea was to bring images from another pipeline and feed them into the foveated pipeline.

But I have another idea: what about making the foveated pipeline read the next batch of images from the COCO dataset according to an external boolean (an external source) and store it in a self.images member?

If that condition is false, for instance, the pipeline would use the self.images member; otherwise it would use COCOReader and fetch the next batch.

Is it possible to implement something like that?
If so, I can invest time in giving it a try.

> If that condition is false, for instance, the pipeline would use the self.images member; otherwise it would use COCOReader and fetch the next batch.

It is not possible to add a condition inside the pipeline, so you need to use the same source/iterator all the time and decide externally what to put there and when.
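Since the graph itself cannot branch, the condition moves into the Python object that feeds ExternalSource. A minimal sketch, assuming the source is an iterator like the ImageCollector in this thread; SwitchableBatchSource and its fetch_new flag are hypothetical names, and the reader is a plain Python iterator standing in for batches produced by the COCO reader pipeline.

```python
class SwitchableBatchSource:
    """Hypothetical ExternalSource-style iterator.

    The pipeline always reads from this same object; whether a new batch is
    fetched is decided here, in Python, before each pipeline run.
    """

    def __init__(self, reader):
        self.reader = reader    # any iterator that yields batches
        self.fetch_new = True   # external boolean set by the caller
        self.current = None

    def __iter__(self):
        return self

    def __next__(self):
        if self.fetch_new or self.current is None:
            self.current = next(self.reader)
            self.fetch_new = False  # keep reusing this batch until asked for a new one
        return self.current


source = SwitchableBatchSource(iter([["a0", "a1"], ["b0", "b1"]]))
print(next(source))  # ['a0', 'a1']  first batch fetched
print(next(source))  # ['a0', 'a1']  same batch reused
source.fetch_new = True
print(next(source))  # ['b0', 'b1']  new batch fetched on request
```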

Dear @JanuszL, I implemented what you suggested.

First I generate the images using this pipeline:

class COCOReader(Pipeline):
    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        super(COCOReader, self).__init__(batch_size, num_threads, device_id, seed = 15, exec_pipelined=False, exec_async=False, prefetch_queue_depth=1)
        self.input = ops.COCOReader(file_root = file_root, annotations_file = annotations_file,
                                     shard_id = device_id, num_shards = num_gpus, ratio=True, random_shuffle=False)
        self.decode = ops.ImageDecoder(device = "mixed", output_type = types.RGB)
        self.resize = ops.Resize(device = "gpu", resize_x = 640, resize_y = 640)

    def define_graph(self):
        inputs, bboxes, labels = self.input()
        images = self.resize(self.decode(inputs))
        return (images, bboxes, labels)

This pipe returns an nvidia.dali.backend_impl.TensorListGPU whose shape is [4, 640, 640, 3] (batch size 4).
I feed it into an iterator as you suggested:

class ImageCollector(object):
    def __init__(self):
        pass
    def __iter__(self):
        return self
    def __next__(self):
        return self.data[0]
    next = __next__

The foveated pipeline is:

class FoveatedRetinalProcessor(Pipeline):
    def __init__(self, batch_size, num_threads, device_id, num_gpus, fixation, images):
        super(FoveatedRetinalProcessor, self).__init__(batch_size, num_threads, device_id, seed = 15, exec_pipelined=False, exec_async=False, prefetch_queue_depth=1)

        self.rotate = ops.Rotate(device = "gpu")

        self.resize_one  = ops.Resize(device = "gpu", resize_x = 30, resize_y = 30)

        self.crop_zero  = ops.Crop(device = "gpu", crop_h = 640, crop_w = 640)
        self.crop_one   = ops.Crop(device = "gpu", crop_h = 400, crop_w = 400)
        self.crop_two   = ops.Crop(device = "gpu", crop_h = 240, crop_w = 240)
        self.crop_three = ops.Crop(device = "gpu", crop_h = 100, crop_w = 100)
        self.crop_four  = ops.Crop(device = "gpu", crop_h = 30, crop_w = 30)

        self.img_batch = ops.ExternalSource(source = images)
        self.fixation_source = ops.ExternalSource(source = fixation, num_outputs = 3)

    def define_graph(self):
        images = self.img_batch()

        crop_pos_x, crop_pos_y, angle = self.fixation_source()

        images   = self.rotate(images, angle=angle)

        cropped0 = self.crop_zero(images)
        cropped1 = self.crop_one(cropped0, crop_pos_x=crop_pos_x, crop_pos_y=crop_pos_y)
        cropped2 = self.crop_two(cropped0, crop_pos_x=crop_pos_x, crop_pos_y=crop_pos_y)
        cropped3 = self.crop_three(cropped0, crop_pos_x=crop_pos_x, crop_pos_y=crop_pos_y)
        cropped4 = self.crop_four(cropped0, crop_pos_x=crop_pos_x, crop_pos_y=crop_pos_y)

        sized0   = self.resize_one(cropped0)
        sized1   = self.resize_one(cropped1)
        sized2   = self.resize_one(cropped2)
        sized3   = self.resize_one(cropped3)
        sized4   = self.resize_one(cropped4)

        return (cropped0, cropped1, cropped2, cropped3, cropped4, sized0, sized1, sized2, sized3, sized4)

In the notebook, I first build the pipe successfully:

NDP.fixation_pos_x = torch.rand((batch_size,1))
NDP.fixation_pos_y = torch.rand((batch_size,1))
NDP.fixation_angle = (torch.rand((batch_size,1))-0.5)*60

images = NDP.ImageCollector()
fixation = NDP.FixationCommand(batch_size)

images.data = images_gpu

start = time()
pipes1 = [NDP.FoveatedRetinalProcessor(batch_size=batch_size, num_threads=2, device_id=device_id, num_gpus=num_gpus, fixation=fixation, images=images)  for device_id in range(num_gpus)]
for pipe1 in pipes1:
  pipe1.build()

total_time = time() - start
print("Computation graph built and dataset loaded in %f seconds." % total_time)

Then, when I want to run it:

NDP.fixation_pos_x = torch.repeat_interleave(torch.Tensor([0.5]), batch_size).view(-1,1)
NDP.fixation_pos_y = torch.repeat_interleave(torch.Tensor([0.5]), batch_size).view(-1,1)
NDP.fixation_angle = torch.repeat_interleave(torch.Tensor([30]), batch_size).view(-1,1)


start = time()
pipe_out1 = [pipe1.run() for pipe1 in pipes1]
total_time = time() - start
print("Computation graph built and dataset loaded in %f seconds." % total_time)

crop_images_cpu0 = pipe_out1[0][0].as_cpu()
crop_images_cpu1 = pipe_out1[0][1].as_cpu()
crop_images_cpu2 = pipe_out1[0][2].as_cpu()
crop_images_cpu3 = pipe_out1[0][3].as_cpu()
crop_images_cpu4 = pipe_out1[0][4].as_cpu()

sized_images_cpu0 = pipe_out1[0][5].as_cpu()
sized_images_cpu1 = pipe_out1[0][6].as_cpu()
sized_images_cpu2 = pipe_out1[0][7].as_cpu()
sized_images_cpu3 = pipe_out1[0][8].as_cpu()
sized_images_cpu4 = pipe_out1[0][9].as_cpu()

It returns the following error:

RuntimeError: The external source callback returned an unexpected batch size: 640 instead of 4

Remember, the shape of the images is [4, 640, 640, 3] (batch size 4).

I would check what you get in the ImageCollector __next__ method. Maybe you are returning a single tensor instead of a TensorList; in your case, try just returning self.data.
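The difference can be seen with NumPy arrays standing in for the DALI objects (an assumption: a DALI TensorList is indexed per sample, like the first axis of these arrays). In the earlier two-pipeline example, iterator.data holds the whole pipeline output, so data[0] is the image batch; here images.data = images_gpu already holds the batch, so data[0] is a single image of shape (640, 640, 3), and its first axis is read as a batch size of 640.

```python
import numpy as np

# Stand-in for images_gpu: a batch of 4 RGB images (640x640 shrunk to 8x8 to stay small).
images_gpu = np.zeros((4, 8, 8, 3), dtype=np.uint8)

# Stand-in for dali_out in the earlier example: the tuple returned by pipe.run().
dali_out = (images_gpu, "bboxes", "labels")

# Earlier example: data is the whole pipeline output, so data[0] is the batch.
print(len(dali_out[0]))    # 4, matches batch_size

# This case: data is already the batch, so data[0] is one image.
print(len(images_gpu[0]))  # 8, its first axis (640 in the real case) is taken as the batch size
```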

Yes @JanuszL, that was the first thing I tried, but things seemed to get even worse in that case:

RuntimeError: Critical error in pipeline:
Error when executing GPU operator Rotate encountered:
[/opt/dali/dali/pipeline/workspace/workspace.h:419] Assert on "tensor_meta.storage_device == StorageDevice::GPU" failed: Input with given index (0) does not have the calling backend type (GPUBackend)

This is what I give to the iterator:

images_gpu.as_cpu().as_array().shape
(4, 640, 640, 3)

Of course, I just do:

images.data = images_gpu

> RuntimeError: Critical error in pipeline: Error when executing GPU operator Rotate encountered: [/opt/dali/dali/pipeline/workspace/workspace.h:419] Assert on "tensor_meta.storage_device == StorageDevice::GPU" failed: Input with given index (0) does not have the calling backend type (GPUBackend)

I guess you mixed up the backends of the Rotate input: it expects data on the GPU, while the data is on the CPU.
Keep in mind that of the outputs (images, bboxes, labels), only images are on the GPU, while bboxes and labels are on the CPU.

However, self.img_batch = ops.ExternalSource(source = images) expects the data to be available on the CPU.
Try self.img_batch = ops.ExternalSource(device="gpu", source = images)

Thanks @JanuszL, it finally worked!!!!

I highly appreciate all your help

Best!

Dario
