Dali: feed 2dim-list to ops.ExternalSource

Created on 3 Nov 2020 · 9 Comments · Source: NVIDIA/DALI

Hi, I am facing a problem with ops.ExternalSource:

I am training a NN for coco detection and here is my data format: (img, target_list, mask_list, joints_list)

I hope to feed them into a pipeline and run it to see whether I get a speed improvement. But I cannot run the pipeline with the 3 lists. Is the reason that each of these lists has 2 dimensions per image, which makes them incompatible with DALI?

here is my code:

class ExternalInputIterator(object):
    """Endless batch iterator over an indexable dataset of 4-field samples.

    Every sample ``external_data[k]`` is read through indices ``[0]`` to
    ``[3]``. Field 0 goes into the data batch and fields 1-3 go into three
    separate label batches. The read cursor wraps around at the end of the
    dataset, so a non-empty dataset never exhausts the iterator.

    Args:
        batch_size: Number of samples collected per call to ``__next__``.
        device_id: Accepted for interface compatibility; not used here.
        num_gpus: Accepted for interface compatibility; not used here.
        external_data: Sized, indexable, iterable collection of samples.
    """

    def __init__(self, batch_size, device_id, num_gpus, external_data):
        self.batch_size = batch_size
        self.external_data = external_data
        self.data_set_len = len(self.external_data)
        self.iterator = iter(self.external_data)
        self.n = len(self.external_data)
        # Initialise the cursor here so next() also works before iter().
        self.i = 0

    def __iter__(self):
        """Rewind the cursor to the first sample and return ``self``."""
        self.i = 0
        return self

    def __next__(self):
        """Return ``(batch, labels_1, labels_2, labels_3)`` for the next batch.

        Raises:
            StopIteration: Only when the dataset is empty; otherwise the
                cursor wraps and iteration continues indefinitely.
        """
        # Also protects the modulo below from a zero-length dataset.
        if self.i >= self.n:
            raise StopIteration

        batch = []
        labels_1 = []
        labels_2 = []
        labels_3 = []

        for _ in range(self.batch_size):
            # Index with the running cursor (not the loop counter) so that
            # successive batches advance through the dataset.
            sample = self.external_data[self.i]
            batch.append(sample[0])
            labels_1.append(sample[1])
            labels_2.append(sample[2])
            labels_3.append(sample[3])
            self.i = (self.i + 1) % self.n
        return (batch, labels_1, labels_2, labels_3)

    @property
    def size(self,):
        """Number of samples in the dataset."""
        return self.data_set_len

    # Python-2-style alias kept for callers that use ``iterator.next()``.
    next = __next__

class ExternalSourcePipeline(Pipeline):
    """DALI pipeline whose four outputs are fed from a Python iterator.

    Four GPU ``ExternalSource`` operators are created: one for the data
    batch and three for the label batches. ``iter_setup`` pulls one 4-tuple
    from the iterator and feeds each element to its operator.

    Args:
        batch_size: Passed through to ``Pipeline``.
        num_threads: Passed through to ``Pipeline``.
        device_id: Passed through to ``Pipeline``.
        external_data: Iterable whose iterator yields
            ``(images, labels_1, labels_2, labels_3)`` batches.
    """

    def __init__(self, batch_size, num_threads, device_id, external_data):
        super(ExternalSourcePipeline, self).__init__(
            batch_size,
            num_threads,
            device_id,
            seed=12)
        self.input = ops.ExternalSource(device='gpu')
        self.input_label_1 = ops.ExternalSource(device='gpu')
        self.input_label_2 = ops.ExternalSource(device='gpu')
        self.input_label_3 = ops.ExternalSource(device='gpu')

        self.external_data = external_data
        self.iterator = iter(self.external_data)

    def define_graph(self):
        """Instantiate the four external inputs and return them as outputs."""
        self.jpegs = self.input()
        self.labels_1 = self.input_label_1()
        self.labels_2 = self.input_label_2()
        self.labels_3 = self.input_label_3()

        return (self.jpegs, self.labels_1, self.labels_2, self.labels_3)

    def iter_setup(self):
        """Fetch the next batch tuple and feed it to the external inputs.

        Raises:
            StopIteration: When the iterator is exhausted. A fresh iterator
                is created first so the next epoch can start.
        """
        try:
            # Use the built-in next() instead of relying on a Python-2-style
            # ``.next`` alias on the iterator.
            (images, labels_1, labels_2, labels_3) = next(self.iterator)
        except StopIteration:
            self.iterator = iter(self.external_data)
            raise
        self.feed_input(self.jpegs, images)
        self.feed_input(self.labels_1, labels_1)
        self.feed_input(self.labels_2, labels_2)
        self.feed_input(self.labels_3, labels_3)
# Batch iterator over the dataset; the GPU count is taken from cfg.GPUS.
eii = ExternalInputIterator(
    batch_size=images_per_batch,
    device_id=0,
    num_gpus=len(cfg.GPUS),
    external_data=dataset,
)
# NOTE(review): the pipeline receives self.external rather than the eii
# created above -- confirm this is intended.
train_pipe = ExternalSourcePipeline(
    batch_size=self.batch_size,
    num_threads=self.workers,
    device_id=0,
    external_data=self.external,
)

# Build the pipeline, then execute it once.
train_pipe.build()
train_pipe.run()

I have tried ops.ExternalSource(num_outputs=2) but it will raise error:

TypeError: Expected outputs of type compatible with "DataNode". Received output type with name "list" that does not match.

How can I fix the error? Thanks!

question

All 9 comments

This toy example works with DALI 0.28:

import nvidia.dali as dali
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import numpy as np
import nvidia.dali.fn as fn

class ExternalInputIterator(object):
    """Wrap-around batch iterator that splits each sample into four fields.

    A sample is ``external_data[k]``; its elements ``[0]``, ``[1]``, ``[2]``
    and ``[3]`` are appended to the data batch and to the three label
    batches respectively. The cursor is advanced modulo the dataset length,
    so iteration over a non-empty dataset never ends.

    Args:
        batch_size: Samples gathered per batch.
        device_id: Unused; kept so the constructor signature is unchanged.
        num_gpus: Unused; kept so the constructor signature is unchanged.
        external_data: Sized, indexable, iterable collection of samples.
    """

    def __init__(self, batch_size, device_id, num_gpus, external_data):
        self.batch_size = batch_size
        self.external_data = external_data
        self.data_set_len = len(self.external_data)
        self.iterator = iter(self.external_data)
        self.n = len(self.external_data)
        # The cursor must exist even if next() is called before iter().
        self.i = 0

    def __iter__(self):
        """Reset the cursor and return this object as its own iterator."""
        self.i = 0
        return self

    def __next__(self):
        """Collect ``batch_size`` samples starting at the current cursor.

        Returns:
            Tuple ``(batch, labels_1, labels_2, labels_3)`` of lists.

        Raises:
            StopIteration: If the dataset is empty.
        """
        if self.i >= self.n:
            raise StopIteration

        batch, labels_1, labels_2, labels_3 = [], [], [], []
        for _ in range(self.batch_size):
            # Read at the cursor position; using the loop counter here would
            # return the same leading samples on every call.
            sample = self.external_data[self.i]
            batch.append(sample[0])
            labels_1.append(sample[1])
            labels_2.append(sample[2])
            labels_3.append(sample[3])
            self.i = (self.i + 1) % self.n
        return (batch, labels_1, labels_2, labels_3)

    @property
    def size(self,):
        """Length of the underlying dataset."""
        return self.data_set_len

    # Alias so that ``iterator.next()`` keeps working.
    next = __next__

class ExternalSourcePipeline(Pipeline):
    """Pipeline with four GPU ``ExternalSource`` inputs driven by an iterator.

    Args:
        batch_size: Forwarded to ``Pipeline``.
        num_threads: Forwarded to ``Pipeline``.
        device_id: Forwarded to ``Pipeline``.
        external_data: Iterable whose iterator produces 4-tuples
            ``(images, labels_1, labels_2, labels_3)``.
    """

    def __init__(self, batch_size, num_threads, device_id, external_data):
        super(ExternalSourcePipeline, self).__init__(
            batch_size,
            num_threads,
            device_id,
            seed=12)
        self.input = ops.ExternalSource(device='gpu')
        self.input_label_1 = ops.ExternalSource(device='gpu')
        self.input_label_2 = ops.ExternalSource(device='gpu')
        self.input_label_3 = ops.ExternalSource(device='gpu')

        self.external_data = external_data
        self.iterator = iter(self.external_data)

    def define_graph(self):
        """Create the four input nodes and expose them as pipeline outputs."""
        self.jpegs = self.input()
        self.labels_1 = self.input_label_1()
        self.labels_2 = self.input_label_2()
        self.labels_3 = self.input_label_3()

        return (self.jpegs, self.labels_1, self.labels_2, self.labels_3)

    def iter_setup(self):
        """Feed one batch tuple from the iterator into the input nodes.

        Raises:
            StopIteration: Propagated when the iterator is exhausted, after
                the iterator has been re-created for the next pass.
        """
        try:
            # Built-in next() works with any iterator, not only those that
            # define a ``.next`` alias.
            (images, labels_1, labels_2, labels_3) = next(self.iterator)
        except StopIteration:
            self.iterator = iter(self.external_data)
            raise
        self.feed_input(self.jpegs, images)
        self.feed_input(self.labels_1, labels_1)
        self.feed_input(self.labels_2, labels_2)
        self.feed_input(self.labels_3, labels_3)

# Toy dataset: one random uint8 array of shape test_data_shape per sample.
batch_size = 2
test_data_shape = [4, 2]
dataset = [
    np.random.randint(0, 255, size=test_data_shape, dtype=np.uint8)
    for _ in range(batch_size)
]

# The iterator produces the batches; the pipeline consumes them.
eii = ExternalInputIterator(
    batch_size=batch_size,
    device_id=0,
    num_gpus=1,
    external_data=dataset,
)
train_pipe = ExternalSourcePipeline(
    batch_size=batch_size,
    num_threads=4,
    device_id=0,
    external_data=eii,
)

# Build the pipeline, then execute it once.
train_pipe.build()
train_pipe.run()

Hi,

Thanks for your help!

I think I did not describe my problem properly: My labels are 2-dim lists. Specifically:

label_1[0].shape = (23,32,32)
label_1[1].shape = (23,64,64)
label_2[0].shape = (32,32)
label_2[1].shape = (64,64)
label_3[0].shape = (30,23,23)
label_3[1].shape = (30,23,23)

Thus, I cannot feed my labels to ops.ExternalSource. Can you help me with this? Thanks!!

Hi,
ExternalSource can accept only numpy arrays. Please convert your labels to numpy.

@JanuszL Would you add the numpy requirement to the docs? I couldn't find it anywhere.

The data batch produced by source may be anything that's accepted by nvidia.dali.pipeline.Pipeline.feed_input()
image

To be more specific:
The batch may be represented as a single tensor/ndarray or a list of these (the latter allows you to have batches of objects with varying shape).
The constraint is that the whole batch must have the same type and the same rank (number of dimensions).

@FarzanT if you have some spare time feel free to improve the wording in the documentation https://github.com/NVIDIA/DALI/blob/master/dali/python/nvidia/dali/external_source.py.

@mzient Thank you, any chance that it might support nested lists in the future? Consider a case where the iterator can return e.g. 5 inputs to a 5 headed multi-modal model, vs when the iterator is requested to return only a subset of those 5.
If ExternalSource could accept a list of arbitrary size, it would remove the need to create a separate iterator for every combination of the 5 data sources I mentioned.
Right now, external_source.py -> _check_data_batch & _get_batch_shape disallow this.
So one would have to unpack everything at define_graph() before returning, which could then alter how each batch is called during training for each data combination.
Of course, there are ways to get around this but this makes the logic unnecessarily complicated.
Thank you for your time!

@FarzanT I don't quite get it. If the iterator supplies 4 inputs, then it should provide 4 batches. define_graph has little to do with it - if you only need one, you can wrap your iterator into an additional function that will extract the relevant input.
The example below uses a more modern approach to using DALI pipeline and ExternalSource

import nvidia.dali as dali
import nvidia.dali.fn as fn
from nvidia.dali.pipeline import Pipeline

class ExternalInputIterator(object):
    """Cyclic iterator yielding four parallel batches from a sample list.

    For each sample ``external_data[k]``, element ``[0]`` is collected into
    the data batch and elements ``[1]``, ``[2]``, ``[3]`` into the three
    label batches. The position wraps around modulo the dataset length, so
    a non-empty dataset can be iterated without end.

    Args:
        batch_size: How many samples each batch contains.
        device_id: Not used by this class; retained in the signature.
        num_gpus: Not used by this class; retained in the signature.
        external_data: Sized, indexable, iterable collection of samples.
    """

    def __init__(self, batch_size, device_id, num_gpus, external_data):
        self.batch_size = batch_size
        self.external_data = external_data
        self.data_set_len = len(self.external_data)
        self.iterator = iter(self.external_data)
        self.n = len(self.external_data)
        # Start at the first sample so that a bare next(obj), without a
        # preceding iter(obj), does not fail on a missing attribute.
        self.i = 0

    def __iter__(self):
        """Restart from the first sample and return ``self``."""
        self.i = 0
        return self

    def __next__(self):
        """Return the next ``(batch, labels_1, labels_2, labels_3)`` tuple.

        Raises:
            StopIteration: When the dataset contains no samples.
        """
        if self.i >= self.n:
            raise StopIteration

        batch = []
        labels_1 = []
        labels_2 = []
        labels_3 = []

        for _ in range(self.batch_size):
            # Advance through the dataset using the cursor; the loop counter
            # only counts how many samples go into this batch.
            sample = self.external_data[self.i]
            batch.append(sample[0])
            labels_1.append(sample[1])
            labels_2.append(sample[2])
            labels_3.append(sample[3])
            self.i = (self.i + 1) % self.n
        return (batch, labels_1, labels_2, labels_3)

    @property
    def size(self,):
        """Total number of samples available."""
        return self.data_set_len

    # Keeps ``obj.next()`` available alongside the built-in ``next(obj)``.
    next = __next__

import numpy as np  # np.random is used below but this snippet never imports numpy

# Define the sizes first: batch_size is needed to build the dataset.
batch_size = 42
num_threads = 3

# Toy dataset: one random uint8 array of shape test_data_shape per sample.
test_data_shape = [4, 2]
dataset = [np.random.randint(0, 255, size=test_data_shape, dtype=np.uint8) for _ in range(batch_size)]
# Use the same batch size as the pipelines built from this iterator.
eii = ExternalInputIterator(batch_size=batch_size, device_id=0, num_gpus=1, external_data=dataset)

Now, when you want to use all the inputs, you can just write:

# Pipeline exposing every field produced by the iterator.
pipe = Pipeline(batch_size=batch_size, num_threads=num_threads, device_id=0)
with pipe:
    # With num_outputs set, external_source returns one node per field;
    # the first is the data and the remaining ones are the labels.
    jpegs, *labels = fn.external_source(source=eii, num_outputs=4)
    pipe.set_outputs(jpegs, *labels)

If you want to use just 1 output:

# Pipeline exposing only the third field (index 2) of each iterator batch.
pipe = Pipeline(batch_size, num_threads, 0)
# iter() runs the iterator's __iter__, which sets its cursor; calling
# next(eii) on a never-iterated object would fail on the missing cursor.
label_batches = iter(eii)
with pipe:
    # The callable is invoked once per iteration and keeps only field 2.
    labels2 = fn.external_source(source=lambda: next(label_batches)[2])
    pipe.set_outputs(labels2)
Was this page helpful?
0 / 5 - 0 ratings