Hi, I want to run() a pipeline several times on the same batch of images.
Then enable the pipeline to bring the next batch and do the same operation and so on
I am using CocoReader but I cannot see any argument to stop incoming batches and work using the same batch again and again
I have tried bringing a batch of images from outside using external source (no success), I can show you the code but I wanted to ask you if this could be done in another way first.
Thanks!
Hi,
Currently, there is no way to tell the reader to repeat the given batch several times.
In one of the next releases (0.27), you will be able to easily chain DALI pipelines, as ExternalSource will accept the output of other DALI pipelines directly (@mzient, right?). So you can have one pipeline with the reader and another with the processing, run the first one only when you need a new set of dataset inputs, and run the other one as many times as you like to reuse the same input.
Still, it should work now with the ExternalSource as the dataset reader. Could you be more specific about what makes it difficult to get that working the way you like with your present code?
Hi @JanuszL, thank you for your reply.
I will post here the code with which I am trying to solve this:
First, I declare a variable as global inside the package file:
global img_batch  # NOTE: at module level `global` is a no-op; the name only exists once something assigns it (the notebook later does `NDP.img_batch = ...`)
the reader is a classic one:
class COCOReader(Pipeline):
    """DALI pipeline that reads COCO samples and decodes the images.

    The pipeline is created with a fixed seed, non-pipelined and synchronous
    execution, and a prefetch queue depth of 1. ``file_root`` and
    ``annotations_file`` are read from the enclosing module.
    """

    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        super(COCOReader, self).__init__(
            batch_size,
            num_threads,
            device_id,
            seed=15,
            exec_pipelined=False,
            exec_async=False,
            prefetch_queue_depth=1,
        )
        # The dataset is split into ``num_gpus`` shards and this pipeline
        # reads the shard whose index equals its ``device_id``.
        self.input = ops.COCOReader(
            file_root=file_root,
            annotations_file=annotations_file,
            shard_id=device_id,
            num_shards=num_gpus,
            ratio=True,
            random_shuffle=False,
        )
        self.decode = ops.ImageDecoder(device="mixed", output_type=types.RGB)

    def define_graph(self):
        """Return ``(decoded images, bboxes, labels)`` for one batch."""
        encoded, bboxes, labels = self.input()
        decoded = self.decode(encoded)
        return (decoded, bboxes, labels)
An image collector as an external source:
class ImageCollector(object):
    """Endless iterator that serves items of the module-level ``img_batch``.

    Every ``next()`` call re-reads ``img_batch`` and returns a list of
    ``batch_size`` items taken from it, starting at the running cursor
    ``self.i`` and wrapping around at ``self.n``. It never raises
    ``StopIteration``.
    """

    def __init__(self, batch_size):
        self.batch_size = batch_size

    def _get_images(self):
        # Snapshot whatever the module global currently points to.
        self.img_batch = img_batch

    def __iter__(self):
        self._get_images()
        count = len(self.img_batch)
        assert count == self.batch_size
        # Reset the cursor; ``n`` is the wrap-around length captured here.
        self.i, self.n = 0, count
        return self

    def __next__(self):
        self._get_images()
        collected = []
        for _step in range(self.batch_size):
            cursor = self.i
            collected.append(self.img_batch[cursor])
            self.i = (cursor + 1) % self.n
        return collected
Then the pipeline:
Please, note that I feed images through image argument below
class FoveatedRetinalProcessor(Pipeline):
    """DALI pipeline producing five nested crops and their 30x30 versions.

    Images come from the ``image`` external source; the ``fixation`` external
    source supplies three outputs: crop x position, crop y position and
    rotation angle. ``num_gpus`` is accepted but not used by this class.
    """

    def __init__(self, batch_size, num_threads, device_id, num_gpus, image, fixation):
        super(FoveatedRetinalProcessor, self).__init__(
            batch_size,
            num_threads,
            device_id,
            seed=15,
            exec_pipelined=False,
            exec_async=False,
            prefetch_queue_depth=1,
        )
        self.resize_zero = ops.Resize(device="gpu", resize_x=640, resize_y=640)
        self.rotate = ops.Rotate(device="gpu")
        self.resize_one = ops.Resize(device="gpu", resize_x=30, resize_y=30)
        # Square crop windows, from the full 640 frame down to 30 pixels.
        self.crop_zero = ops.Crop(device="gpu", crop_h=640, crop_w=640)
        self.crop_one = ops.Crop(device="gpu", crop_h=400, crop_w=400)
        self.crop_two = ops.Crop(device="gpu", crop_h=240, crop_w=240)
        self.crop_three = ops.Crop(device="gpu", crop_h=100, crop_w=100)
        self.crop_four = ops.Crop(device="gpu", crop_h=30, crop_w=30)
        self.images_source = ops.ExternalSource(source=image)
        self.fixation_source = ops.ExternalSource(source=fixation, num_outputs=3)

    def define_graph(self):
        """Return the five crops followed by their five resized versions."""
        raw = self.images_source()
        crop_pos_x, crop_pos_y, angle = self.fixation_source()
        rotated = self.rotate(self.resize_zero(raw), angle=angle)

        # The 640 crop is taken without explicit position arguments; the
        # smaller crops are all cut out of it at the fixation position.
        base = self.crop_zero(rotated)
        anchor = {"crop_pos_x": crop_pos_x, "crop_pos_y": crop_pos_y}
        inner_crops = (self.crop_one, self.crop_two, self.crop_three, self.crop_four)
        crops = [base] + [crop(base, **anchor) for crop in inner_crops]

        sized = [self.resize_one(crop) for crop in crops]
        return tuple(crops + sized)
Then outside the package in a notebook
I construct the reader
# Single-GPU setup with a batch of 2048 samples.
num_gpus = 1
batch_size = 2 * 1024

start = time()
# One reader pipeline per GPU.
pipes = [
    NDP.COCOReader(
        batch_size=batch_size,
        num_threads=2,
        device_id=device_id,
        num_gpus=num_gpus,
    )
    for device_id in range(num_gpus)
]
for pipe in pipes:
    pipe.build()
total_time = time() - start
print("Computation graph built and dataset loaded in %f seconds." % total_time)
then I bring the batch of images
# Run every reader once; only the first pipeline's outputs are used below.
pipe_out = [pipe.run() for pipe in pipes]
# The reader returns (images, bboxes, labels), in that order.
images_gpu, bboxes_cpu, labels_cpu = pipe_out[0]
images_cpu = images_gpu.as_cpu()
Then I write the global variable with the batch of images and construct the other pipeline
Note image from ImageCollector
# Publish the inputs as attributes of the NDP module, where the external
# sources pick them up.
NDP.img_batch = images_gpu
NDP.fixation_pos_x = torch.rand(batch_size)
NDP.fixation_pos_y = torch.rand(batch_size)
NDP.fixation_angle = (torch.rand(batch_size) - 0.5) * 60

image = NDP.ImageCollector(batch_size)
fixation = NDP.FixationCommand(batch_size)

start = time()
pipes = [
    NDP.FoveatedRetinalProcessor(
        batch_size=batch_size,
        num_threads=2,
        device_id=device_id,
        num_gpus=num_gpus,
        image=image,
        fixation=fixation,
    )
    for device_id in range(num_gpus)
]
for pipe in pipes:
    pipe.build()
total_time = time() - start
print("Computation graph built and dataset loaded in %f seconds." % total_time)
Finally I try to run the pipe
# Refresh the module-level inputs before running the processing pipelines.
NDP.img_batch = images_gpu
NDP.fixation_pos_x = torch.rand(batch_size)
NDP.fixation_pos_y = torch.rand(batch_size)
NDP.fixation_angle = (torch.rand(batch_size) - 0.5) * 60

start = time()
pipe_out = [pipe.run() for pipe in pipes]
total_time = time() - start
print("Computation graph built and dataset loaded in %f seconds." % total_time)
and, I receive the following error message:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-21-3a20f77f020b> in <module>()
11
12 start = time()
---> 13 pipe_out = [pipe.run() for pipe in pipes]
14 total_time = time() - start
15 print("Computation graph built and dataset loaded in %f seconds." % total_time)
8 frames
<ipython-input-21-3a20f77f020b> in <listcomp>(.0)
11
12 start = time()
---> 13 pipe_out = [pipe.run() for pipe in pipes]
14 total_time = time() - start
15 print("Computation graph built and dataset loaded in %f seconds." % total_time)
/usr/local/lib/python3.6/dist-packages/nvidia/dali/pipeline.py in run(self)
719 """
720 with self._check_api_type_scope(types.PipelineAPIType.BASIC):
--> 721 self.schedule_run()
722 return self.outputs()
723
/usr/local/lib/python3.6/dist-packages/nvidia/dali/pipeline.py in schedule_run(self)
636 self._prefetch()
637 else:
--> 638 self._run_once()
639
640 # for the backward compatibility
/usr/local/lib/python3.6/dist-packages/nvidia/dali/pipeline.py in _run_once(self)
741 try:
742 if not self._last_iter:
--> 743 self._iter_setup()
744 self._batches_to_consume += 1
745 # Special case to prevent a deadlock if user didn't release the only buffer
/usr/local/lib/python3.6/dist-packages/nvidia/dali/pipeline.py in _iter_setup(self)
953
954 def _iter_setup(self):
--> 955 self._run_input_callbacks()
956 self.iter_setup()
957 self._iter += 1
/usr/local/lib/python3.6/dist-packages/nvidia/dali/pipeline.py in _run_input_callbacks(self)
950
951 for group in self._input_callbacks:
--> 952 group.call_and_feed(self, self._iter)
953
954 def _iter_setup(self):
/usr/local/lib/python3.6/dist-packages/nvidia/dali/external_source.py in call_and_feed(self, pipeline, current_iter)
83 data = callback_out
84 op = self.instances[0]
---> 85 pipeline.feed_input(op._name, data, op._layout, self._cuda_stream, self.use_copy_kernel)
86
87 def _is_generator_function(x):
/usr/local/lib/python3.6/dist-packages/nvidia/dali/pipeline.py in feed_input(self, data_node, data, layout, cuda_stream, use_copy_kernel)
561 info = CheckDLPackCapsule(datum)
562 if not info[0] and not checked:
--> 563 _check_data_batch(data, self._batch_size, layout)
564 checked = True
565 if hasattr(datum, "__cuda_array_interface__") or (info[0] and info[1]):
/usr/local/lib/python3.6/dist-packages/nvidia/dali/external_source.py in _check_data_batch(data, batch_size, layout)
9 "size: {} instead of {}".format(len(data), batch_size))
10 if len(data) > 0:
---> 11 dim = len(data[0].shape)
12 for t in data:
13 if len(t.shape) != dim:
TypeError: object of type 'method' has no len()
Let me try to repro your problem and get back to you soon.
This simple example works with the nightly build (it is not available in the stable release yet):
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.fn as fn
import nvidia.dali.types as types
import numpy as np
batch_size = 2
test_data_shape = [2, 3, 4]


def get_data():
    """Return a new batch: ``batch_size`` random uint8 arrays of ``test_data_shape``."""
    samples = []
    for _ in range(batch_size):
        samples.append(
            np.random.randint(0, 255, size=test_data_shape, dtype=np.uint8)
        )
    return samples


# First pipeline: random data fed through an external source on the GPU.
pipe = Pipeline(batch_size=batch_size, num_threads=3, device_id=0)
outs = fn.external_source(source=get_data, device="gpu")
pipe.set_outputs(outs)
pipe.build()
dali_out = pipe.run()
print("The first pipe ", dali_out[0].as_cpu().as_array())


def get_data_from_dali():
    """Hand the first output of the first pipeline to the second pipeline."""
    return dali_out[0]


# Second pipeline: its external source consumes the first pipeline's output.
pipe2 = Pipeline(batch_size=batch_size, num_threads=3, device_id=0)
outs2 = fn.external_source(source=get_data_from_dali, device="gpu")
pipe2.set_outputs(outs2)
pipe2.build()
final_out = pipe2.run()
print("The second pipe ", final_out[0].as_cpu().as_array())
Thanks @JanuszL, I do not understand your example quite well I think.
I need to run() a pipeline on the same batch of images several times. If I am not wrong in your example I have to build a new pipeline every time I want to run it.
I do not know how to implement such a strategy in my sample code above either
That is, how do I implement it to the pipeline called FoveatedRetinalProcessor?
Thanks!
How about an example like this:
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.fn as fn
import nvidia.dali.types as types
import numpy as np
batch_size = 2
test_data_shape = [2, 3, 4]


def get_data():
    """Return a new batch: ``batch_size`` random uint8 arrays of ``test_data_shape``."""
    samples = []
    for _ in range(batch_size):
        samples.append(
            np.random.randint(0, 255, size=test_data_shape, dtype=np.uint8)
        )
    return samples


# First pipeline: random data fed through an external source on the GPU.
pipe = Pipeline(batch_size=batch_size, num_threads=3, device_id=0)
outs = fn.external_source(source=get_data, device="gpu")
pipe.set_outputs(outs)
pipe.build()
dali_out = pipe.run()
print("The first pipe ", dali_out[0].as_cpu().as_array())
class image_collector(object):
    """Endless iterator that keeps returning the first entry of ``self.data``.

    ``data`` is not set by the constructor; the caller assigns it on the
    instance and may replace it at any time to change what is served next.
    """

    def __init__(self):
        """Create the collector; ``data`` must be assigned by the caller."""

    def __iter__(self):
        return self

    def __next__(self):
        # Never raises StopIteration: the same entry is served on every call.
        current = self.data
        return current[0]

    # Python 2 style alias for the iterator protocol.
    next = __next__
# The collector serves whatever the first pipeline produced last.
iterator = image_collector()
iterator.data = dali_out

pipe2 = Pipeline(
    batch_size=batch_size,
    num_threads=3,
    device_id=0,
    prefetch_queue_depth=1,
)
outs2 = fn.external_source(source=iterator, device="gpu")
pipe2.set_outputs(outs2)
pipe2.build()

# Two runs of the second pipeline without touching the first one.
final_out = pipe2.run()
print("The second pipe ", final_out[0].as_cpu().as_array())
final_out = pipe2.run()
print("The second pipe, get the same data ", final_out[0].as_cpu().as_array())

# Run the first pipeline again and hand its new output to the collector.
dali_out = pipe.run()
print("The first pipe run again to get new data", dali_out[0].as_cpu().as_array())
iterator.data = dali_out
final_out = pipe2.run()
print("The second pipe get new data from the first one", final_out[0].as_cpu().as_array())
Dear @JanuszL , thank you for your help.
I am afraid I have not been able to test the solution you propose since for some reason that I don't know, a problem solved in a previous question is not working any more on Google Colab.
Honestly, I do not know if I modified something I cannot detect right now or it has something to do with google colab and its compatibility with DALI library.
I wanted to ask you how to proceed in such a case. Should we re-open the previous question
I would like to close the previous question first, of course, provided it has nothing to do with a compatibility issue with Google Colab.
If this is a compatibility issue, I will have to change platforms and test it on a cluster. I cannot test it on my machine since I have no GPUs in my laptop.
Thanks,
Dario
I am afraid I have not been able to test the solution you propose since for some reason that I don't know, a problem solved in a previous question is not working any more on Google Colab.
If something is not working anymore please create a simple and self-contained script we can rerun to debug the problem. The simpler the code the better.
Thanks @JanuszL , I can simplify it more if you want but I have tried to be as precise as possible given that the error could be everywhere.
In fact, now that I put this in a self-contained notebook it returns different errors!
Furthermore, I got different errors in different executions of the same code sometimes
I am completely disorientated, yesterday it was working beautifully
I highly appreciate your help
@dariodematties,
Thank you for the script. I was able to reproduce the problem: there is a regression in version 0.26 in how the DALI ExternalSource operator treats a batch of scalars.
We ask for these self-contained scripts because they shorten the reproduction step on our side.
For now please try to change:
from
torch.rand(batch_size)
to
torch.rand((batch_size, 1))
by adding this additional dimension.
In the meantime, we will fix the issue and let you know when it is ready.
Wow, I am between crying and laughing!!! Thank you so much @JanuszL
Ok, now I can return to the main question we were approaching
I have not had the chance to test your suggestion yet @JanuszL. I was thinking of conducting this treatment inside the pipeline
The first idea was to bring images from another pipeline and feeding them into the foveated pipeline
But I have another idea, what about making the foveated pipeline to read the following batch of images from coco dataset according to an external boolean (external source) and put it in a self.images member
If such condition is false--for instance--the pipeline will use the self.images member otherwise it will use CocoReader and bring the following batch
Is it possible to implement something like that?
If so, I can invest time in giving it a try
If such condition is false--for instance--the pipeline will use the self.images member otherwise it will use CocoReader and bring the following batch
It is not possible to add a condition inside the pipeline. So you need to use the same source/iterator all the time and externally decide what and when to put there.
https://github.com/NVIDIA/DALI/pull/2318 should fix the issue.
Dear @JanuszL, I implemented what you suggested.
First I generate the images using this pipeline:
class COCOReader(Pipeline):
    """DALI pipeline that reads COCO samples, decodes and resizes the images.

    The pipeline is created with a fixed seed, non-pipelined and synchronous
    execution, and a prefetch queue depth of 1. ``file_root`` and
    ``annotations_file`` are read from the enclosing module.
    """

    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        super(COCOReader, self).__init__(
            batch_size,
            num_threads,
            device_id,
            seed=15,
            exec_pipelined=False,
            exec_async=False,
            prefetch_queue_depth=1,
        )
        # The dataset is split into ``num_gpus`` shards and this pipeline
        # reads the shard whose index equals its ``device_id``.
        self.input = ops.COCOReader(
            file_root=file_root,
            annotations_file=annotations_file,
            shard_id=device_id,
            num_shards=num_gpus,
            ratio=True,
            random_shuffle=False,
        )
        self.decode = ops.ImageDecoder(device="mixed", output_type=types.RGB)
        self.resize = ops.Resize(device="gpu", resize_x=640, resize_y=640)

    def define_graph(self):
        """Return ``(640x640 images, bboxes, labels)`` for one batch."""
        encoded, bboxes, labels = self.input()
        decoded = self.decode(encoded)
        resized = self.resize(decoded)
        return (resized, bboxes, labels)
This pipe returns a nvidia.dali.backend_impl.TensorListGPU whose shape is [4,640,640,3] (batch size 4)
I feed it in an iterator as you suggested:
class ImageCollector(object):
    """Endless iterator that keeps returning the first entry of ``self.data``.

    ``data`` is not set by the constructor; the caller assigns it on the
    instance and may replace it at any time to change what is served next.
    """

    def __init__(self):
        """Create the collector; ``data`` must be assigned by the caller."""

    def __iter__(self):
        return self

    def __next__(self):
        # Never raises StopIteration: the same entry is served on every call.
        current = self.data
        return current[0]

    # Python 2 style alias for the iterator protocol.
    next = __next__
The foveated pipeline is:
class FoveatedRetinalProcessor(Pipeline):
    """DALI pipeline producing five nested crops and their 30x30 versions.

    Images come from the ``images`` external source; the ``fixation`` external
    source supplies three outputs: crop x position, crop y position and
    rotation angle. ``num_gpus`` is accepted but not used by this class.
    """

    def __init__(self, batch_size, num_threads, device_id, num_gpus, fixation, images):
        super(FoveatedRetinalProcessor, self).__init__(
            batch_size,
            num_threads,
            device_id,
            seed=15,
            exec_pipelined=False,
            exec_async=False,
            prefetch_queue_depth=1,
        )
        self.rotate = ops.Rotate(device="gpu")
        self.resize_one = ops.Resize(device="gpu", resize_x=30, resize_y=30)
        # Square crop windows, from the full 640 frame down to 30 pixels.
        self.crop_zero = ops.Crop(device="gpu", crop_h=640, crop_w=640)
        self.crop_one = ops.Crop(device="gpu", crop_h=400, crop_w=400)
        self.crop_two = ops.Crop(device="gpu", crop_h=240, crop_w=240)
        self.crop_three = ops.Crop(device="gpu", crop_h=100, crop_w=100)
        self.crop_four = ops.Crop(device="gpu", crop_h=30, crop_w=30)
        self.img_batch = ops.ExternalSource(source=images)
        self.fixation_source = ops.ExternalSource(source=fixation, num_outputs=3)

    def define_graph(self):
        """Return the five crops followed by their five resized versions."""
        raw = self.img_batch()
        crop_pos_x, crop_pos_y, angle = self.fixation_source()
        rotated = self.rotate(raw, angle=angle)

        # The 640 crop is taken without explicit position arguments; the
        # smaller crops are all cut out of it at the fixation position.
        base = self.crop_zero(rotated)
        anchor = {"crop_pos_x": crop_pos_x, "crop_pos_y": crop_pos_y}
        inner_crops = (self.crop_one, self.crop_two, self.crop_three, self.crop_four)
        crops = [base] + [crop(base, **anchor) for crop in inner_crops]

        sized = [self.resize_one(crop) for crop in crops]
        return tuple(crops + sized)
On the notebook first I successfully build the pipe
# Fixation inputs carry an explicit trailing dimension of size 1.
NDP.fixation_pos_x = torch.rand((batch_size, 1))
NDP.fixation_pos_y = torch.rand((batch_size, 1))
NDP.fixation_angle = (torch.rand((batch_size, 1)) - 0.5) * 60

images = NDP.ImageCollector()
fixation = NDP.FixationCommand(batch_size)
# The collector serves whatever is stored in its ``data`` attribute.
images.data = images_gpu

start = time()
pipes1 = [
    NDP.FoveatedRetinalProcessor(
        batch_size=batch_size,
        num_threads=2,
        device_id=device_id,
        num_gpus=num_gpus,
        fixation=fixation,
        images=images,
    )
    for device_id in range(num_gpus)
]
for pipe1 in pipes1:
    pipe1.build()
total_time = time() - start
print("Computation graph built and dataset loaded in %f seconds." % total_time)
Then, when I want to run it:
# Same fixation for every sample: position (0.5, 0.5), angle 30.
NDP.fixation_pos_x = torch.repeat_interleave(torch.Tensor([0.5]), batch_size).view(-1, 1)
NDP.fixation_pos_y = torch.repeat_interleave(torch.Tensor([0.5]), batch_size).view(-1, 1)
NDP.fixation_angle = torch.repeat_interleave(torch.Tensor([30]), batch_size).view(-1, 1)

start = time()
pipe_out1 = [pipe1.run() for pipe1 in pipes1]
total_time = time() - start
print("Computation graph built and dataset loaded in %f seconds." % total_time)

# The first pipeline returns five crops followed by five resized images;
# copy each of the ten outputs to the CPU in that order.
(
    crop_images_cpu0,
    crop_images_cpu1,
    crop_images_cpu2,
    crop_images_cpu3,
    crop_images_cpu4,
    sized_images_cpu0,
    sized_images_cpu1,
    sized_images_cpu2,
    sized_images_cpu3,
    sized_images_cpu4,
) = [output.as_cpu() for output in pipe_out1[0]]
It returns the following error
RuntimeError: The external source callback returned an unexpected batch size: 640 instead of 4
remember, the shape of the images is: [4,640,640,3] (batch size 4)
I would check what you get in ImageCollector __next__ method. Maybe you are returning single tensor instead of TensorList, try just return self.data in your case.
Yes @JanuszL, that was the first thing I tried, but things seemed to turn even worse in that case
RuntimeError: Critical error in pipeline:
Error when executing GPU operator Rotate encountered:
[/opt/dali/dali/pipeline/workspace/workspace.h:419] Assert on "tensor_meta.storage_device == StorageDevice::GPU" failed: Input with given index (0) does not have the calling backend type (GPUBackend)
This is what I give to the iterator
images_gpu.as_cpu().as_array().shape
(4, 640, 640, 3)
Of course I just do
images.data = images_gpu
RuntimeError: Critical error in pipeline:
Error when executing GPU operator Rotate encountered:
[/opt/dali/dali/pipeline/workspace/workspace.h:419] Assert on "tensor_meta.storage_device == StorageDevice::GPU" failed: Input with given index (0) does not have the calling backend type (GPUBackend)
I guess you mixed backends of the Rotate input. It expects data on the GPU while it is on the CPU.
Keep in mind that of (images, bboxes, labels), only images are on the GPU, while bboxes and labels are on the CPU.
However, self.img_batch = ops.ExternalSource(source = images) asks for the data to be available on the CPU.
Try self.img_batch = ops.ExternalSource(device="gpu", source = images)
Thanks @JanuszL, it finally worked!!!!
I highly appreciate all your help
Best!
Dario