DALI: How to read a set of frames and apply the same operations to them?

Created on 28 May 2020 · 12 Comments · Source: NVIDIA/DALI

I am trying to sample arbitrary frames from videos, so I extracted the frames and wrote a custom ExternalInputIterator that yields the frames of each sequence together with the corresponding label.

class ExternalInputIterator(object):
    ...

    def __iter__(self):
        self.i = 0
        random.shuffle(self.sequences_paths)
        self.n = len(self.sequences_paths)
        return self

    def __next__(self):
        batch_data = []
        batch_labels = []
        for _ in range(self.batch_size):
            sequence, label = read_seqeunces_as_stream(self.sequences_paths[self.i])
            batch_data.append(sequence)    # sequence is a list of encoded image buffers read with numpy.frombuffer
            batch_labels.append(label)
            self.i = (self.i + 1) % self.n
        return (batch_data, batch_labels)

    next = __next__

Now I need to read these frames into a pipeline and apply the same operations to them. Following #1579, I created a pipeline as follows:

class HybridPipe(Pipeline):
    def __init__(self, sequences_paths, batch_size, num_threads, device_id):
        super(HybridPipe, self).__init__(batch_size, num_threads, device_id, seed=device_id)
        self.external_data = ExternalInputIterator(sequences_paths, batch_size)
        self.iterator = iter(self.external_data)
        self.input = [ops.ExternalSource() for _ in range(sequence_size)]
        self.input_label = ops.ExternalSource()
        self.decode = ops.ImageDecoder(device='mixed', output_type=types.RGB)
        self.crop = ops.Crop(device='gpu', crop=(224, 224))

    def define_graph(self):
        self.batch_data = self.input
        self.labels = self.input_label()
        batch_frames = []
        for sequence in self.batch_data:
            frames = sequence()
            out = self.decode(frames)
            out = self.crop(out)
            batch_frames.append(out)
        return (batch_frames, self.labels)

    def iter_setup(self):
        try:
            (batch_data, labels) = self.iterator.next()
            for i in range(sequence_size):
                self.feed_input(self.batch_data[i], batch_data[i])
            self.feed_input(self.labels, labels)
        except StopIteration:
            self.iterator = iter(self.external_data)
            raise StopIteration

However, it raises an error:

TypeError: Expected outputs of type compatible with "EdgeReference". Received output type with name "list" that does not match.

And if I feed batch_data to self.batch_data directly and try to iterate over self.batch_data in define_graph(), it raises:

TypeError: '_EdgeReference' object is not iterable

So, can I feed batch_data into the pipeline and process each sequence by iterating over it, or by using sequence processing?

Thanks!


All 12 comments

Hi,

Regarding the first error, DALI expects define_graph to return either a single operator output or a flat iterable of operator outputs. Returning a tuple with a list inside it confuses DALI. What you can do instead is return (*batch_data, self.labels).
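To make the difference between the two return values concrete, here is a minimal plain-Python sketch. The strings are hypothetical placeholders standing in for DALI graph nodes; no DALI code is involved, it only shows what the star-unpacking does to the shape of the returned tuple.

```python
# Hypothetical placeholders standing in for operator outputs (graph nodes).
batch_frames = ["frames_0", "frames_1", "frames_2"]
labels = "labels"

# A tuple that contains a list: the shape that triggered the TypeError above.
nested = (batch_frames, labels)

# A flat tuple with one entry per output: the shape define_graph expects.
flat = (*batch_frames, labels)

print(len(nested), nested)
print(len(flat), flat)
```

With N frame positions, the flat form yields N + 1 pipeline outputs, the last of which is the labels.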

Regarding the second error: you need to pass a single operator output to the feed_input method, not an iterable. I would also suggest familiarizing yourself with the most recent and recommended way of using the ExternalSource operator.

Hello!
It seems that you're trying to iterate over the sequences in define_graph, but the objects you operate on in define_graph are merely graph nodes.
Currently there's no efficient way to decode a sequence of separate frames.

What @mzient means is that this is probably not the best way to achieve your goal, but it may be the only one that is possible in DALI right now.
In your case, you need to store a batch of 1st frames, a batch of 2nd frames, and so on. It is also memory-inefficient, as DALI would create multiple ImageDecoder instances, each of which may consume hundreds of MB of GPU memory.

The code in define_graph does not constitute a valid DALI pipeline, so we're guessing what the intention was. @esdream Could you elaborate on what your pipeline's desired output is? If you want:
output0 = batch_of_frames_0
output1 = batch_of_frames_1
output2 = batch_of_frames_2
...
outputN = batch_of_frames_N

then it's doable, but it will consume a lot of memory and you have to know the sequence length at the point of graph definition (before the pipeline's had a chance to touch the data).

If you want to assemble the sequences inside the DALI pipeline, then unfortunately we don't have an operator that would concatenate individual frames to produce sequences.

With small corrections it should work:

class HybridPipe(Pipeline):
    def __init__(self, sequences_paths, batch_size, num_threads, device_id):
        super(HybridPipe, self).__init__(batch_size, num_threads, device_id, seed=device_id)
        self.external_data = ExternalInputIterator(sequences_paths, batch_size)
        self.iterator = iter(self.external_data)
        self.input = [ops.ExternalSource() for _ in range(sequence_size)]
        self.input_label = ops.ExternalSource()
        self.decode = ops.ImageDecoder(device='mixed', output_type=types.RGB)
        self.crop = ops.Crop(device='gpu', crop=(224, 224))

    def define_graph(self):
        self.batch_data = [i() for i in self.input]
        self.labels = self.input_label()
        batch_frames = []
        for frames in self.batch_data:
            out = self.decode(frames)
            out = self.crop(out)
            batch_frames.append(out)
        return (*batch_frames, self.labels)

    def iter_setup(self):
        try:
            (batch_data, labels) = self.iterator.next()
            for i in range(sequence_size):
                self.feed_input(self.batch_data[i], batch_data[i])
            self.feed_input(self.labels, labels)
        except StopIteration:
            self.iterator = iter(self.external_data)
            raise StopIteration

@mzient Thanks for your reply. I have a fixed sequence length N, so the expected outputs for
one batch are as follows (assuming batch_size = M):
[[img_1, img_2, img_3, ..., img_N], label1],
[[img_1, img_2, img_3, ..., img_N], label2],
...
[[img_1, img_2, img_3, ..., img_N], labelM]

But any pipeline output format is acceptable, because I can assemble the expected data by running pipeline.run() repeatedly. I have solved the feeding and output problems.

@JanuszL Thanks! Inspired by your code and @mzient's comments, I modified the outputs of ExternalInputIterator:

class ExternalInputIterator(object):
    ...

    def __iter__(self):
        self.i = 0
        random.shuffle(self.sequences_paths)
        self.n = len(self.sequences_paths)
        return self

    def __next__(self):
        batch = []
        batch_labels = []
        for _ in range(self.batch_size):
            sequence, label = read_seqeunces_as_stream(self.sequences_paths[self.i])
            batch.append(sequence)    # sequence is a list of encoded image buffers read with numpy.frombuffer
            batch_labels.append(label)
            self.i = (self.i + 1) % self.n

        batch_data = []
        for x in range(self.sequence_length):
            elem = []
            for y in range(self.batch_size):
                elem.append(batch[y][x])   # each elem holds the batch of x-th frames (x is the frame index within a sequence)
            batch_data.append(elem)

        return (batch_data, batch_labels)

    next = __next__
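The nested loops at the end of __next__ transpose the batch from sequence-major to frame-major order. As a sketch only, the same step can be written with zip; the helper name to_frame_major and the string placeholders are illustrative and not part of the original code.

```python
def to_frame_major(batch):
    """Turn batch[sequence][frame] into result[frame][sequence]."""
    lengths = {len(sequence) for sequence in batch}
    if len(lengths) != 1:
        # zip() would silently truncate to the shortest sequence otherwise.
        raise ValueError("all sequences must have the same, non-zero batch of frames")
    return [list(frames) for frames in zip(*batch)]


# Two sequences of three frames each (placeholders for encoded image buffers).
batch = [
    ["seq1_img1", "seq1_img2", "seq1_img3"],
    ["seq2_img1", "seq2_img2", "seq2_img3"],
]

frame_major = to_frame_major(batch)
for frames in frame_major:
    print(frames)
```

Each inner list of the result holds one frame position across the whole batch, which is the shape a single ExternalSource input is fed with.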

Then I fed batch_data into the pipeline through a list of ExternalSource() operators and unpacked batch_out into the outputs:

class HybridPipe(Pipeline):
    def __init__(self, sequences_paths, batch_size, sequence_length, num_threads, device_id):
        super(HybridPipe, self).__init__(batch_size, num_threads, device_id, seed=device_id)
        self.external_data = ExternalInputIterator(sequences_paths, batch_size)
        self.iterator = iter(self.external_data)
        self.sequence_length = sequence_length
        self.inputs = [ops.ExternalSource() for _ in range(self.sequence_length)]
        self.input_label = ops.ExternalSource()
        self.decode = ops.ImageDecoder(device='mixed', output_type=types.RGB)
        self.resize = ops.Resize(device='gpu', resize_shorter=256)
        self.crop = ops.Crop(device='gpu', crop=(224, 224))

    def define_graph(self):
        inputs = self.inputs
        self.sequence_data = []
        for batch in inputs:
            elem = batch()
            self.sequence_data.append(elem)

        batch_out = []
        for i in range(self.sequence_length):
            out = self.decode(self.sequence_data[i])
            out = self.resize(out)
            out = self.crop(out)
            batch_out.append(out)

        self.labels = self.input_label()  # matches the attribute defined in __init__
        return (*batch_out, self.labels)

    def iter_setup(self):
        try:
            (batch_data, labels) = self.iterator.next()
            for i in range(self.sequence_length):
                self.feed_input(self.sequence_data[i], batch_data[i])  # sequence_data holds the ExternalSource outputs
            self.feed_input(self.labels, labels)
        except StopIteration:
            self.iterator = iter(self.external_data)
            raise StopIteration

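Finally, I can get each sample from pipe.run():

pipe = HybridPipe(sequences_paths, batch_size, sequence_length, num_threads, device_id)
pipe.build()

pipe_out = pipe.run()
for i in range(batch_size):
    sequence = []
    for j in range(sequence_length):
        # output j holds the batch of j-th frames; take the i-th sample from it
        sequence.append(pipe_out[j].as_cpu().at(i))
    label = pipe_out[sequence_length].at(i)  # labels are the last output
    sample = [sequence, label]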

Now I want to know: can I apply the same random resize and crop to all frames in the same sequence? That means I would have to process every frame in batch_of_frames_N separately. But in the HybridPipe object, batch_of_frames_N (self.sequence_data[i]) is an _EdgeReference, and it seems I cannot separate its elements.

Can I apply the same random resize and crop to all frames in the same sequence?

If you apply the same operator instance to any input (or any iterable of inputs), the operator is executed with the same internal state each time.
So this executes the same transformation on both inputs:

    self.operation = SomeRandomOperation(...)  # in __init__

def define_graph(self):
    (...)
    out_0 = self.operation(in_0)
    out_1 = self.operation(in_1)
    return out_0, out_1

is equivalent to:

    self.operation = SomeRandomOperation(...)  # in __init__

def define_graph(self):
    (...)
    ins = [in_0, in_1]
    outs = self.operation(ins)
    return outs

If in_0 and in_1 in the above examples are the same, then out_0 and out_1 will be the same as well.
To make them differ, you would need to create two operator instances:

    self.operation_0 = SomeRandomOperation(...)  # in __init__
    self.operation_1 = SomeRandomOperation(...)

def define_graph(self):
    (...)
    out_0 = self.operation_0(in_0)
    out_1 = self.operation_1(in_1)
    return out_0, out_1

Even if in_0 and in_1 in the above example are the same, out_0 and out_1 will differ.
In your case you can simply do:

    def define_graph(self):
        self.batch_data = [i() for i in self.input]
        self.labels = self.input_label()
        out = self.decode(self.batch_data)
        batch_frames = self.crop(out)
        return (*batch_frames, self.labels)

@JanuszL
I know how to process batch_data in define_graph, but what I actually want is to apply the same operations to images in the same sequence and different operations to different sequences.
For example, assume that batch_size = 4 and sequence_length = 15. I then need to feed the following data into the pipeline in one batch:
[
[seq1_img1, seq1_img2, ..., seq1_img15], seq1_label,
[seq2_img1, seq2_img2, ..., seq2_img15], seq2_label,
[seq3_img1, seq3_img2, ..., seq3_img15], seq3_label,
[seq4_img1, seq4_img2, ..., seq4_img15], seq4_label,
]

Idea 1:

If I declare batch_size inputs in the pipeline's __init__:

self.inputs = [ops.ExternalSource() for _ in range(batch_size)]

And call them in define_graph:

self.batch_data = [i() for i in self.inputs]

Finally I feed data in iter_setup:

for i in range(self.batch_size):
        self.feed_input(self.batch_data[i], batch_data[i])  # batch_data[i] = sequence i, which includes 15 images

It raises an error when I feed the data:

Data list provided to feed_input needs to have batch_size length

So I concluded that every batch_data must be fed batch_size elements (right?).

Idea 2:

Then I modified the outputs of ExternalInputIterator as mentioned above, and declared sequence_length inputs in the pipeline's __init__:

self.inputs = [ops.ExternalSource() for _ in range(sequence_length)]

And feed the data in iter_setup:

for i in range(self.sequence_length):
        self.feed_input(self.batch_data[i], batch_data[i])  # batch_data[i] = [seq1_img1, seq2_img1, seq3_img1, seq4_img1]

Now in define_graph, self.batch_data[i] gives me a batch of the i-th images, but each image comes from a different sequence.
So I want to know:

  1. In idea 1, can I feed a batch_data whose size is not equal to batch_size?
  2. In idea 2, is it possible to separate self.batch_data[i] and apply different operations to every img[i] (seq1_img1, seq2_img1, seq3_img1, seq4_img1)?

So I concluded that every batch_data must be fed batch_size elements (right?).

Yes, that is correct.

In idea 1, can I feed a batch_data whose size is not equal to batch_size?

Sorry, you cannot do that.
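Since every input must receive exactly batch_size items, a small plain-Python pre-flight check can catch a wrong layout before feed_input does. This is only a sketch: check_feed_shapes is a hypothetical helper, and the strings stand in for encoded image buffers.

```python
def check_feed_shapes(batch_data, batch_size):
    """Raise ValueError unless every input list holds exactly batch_size items."""
    for index, items in enumerate(batch_data):
        if len(items) != batch_size:
            raise ValueError(
                f"input {index} has {len(items)} items, expected {batch_size}"
            )


batch_size = 4
sequence_length = 15

# Idea 1 layout: batch_size lists of sequence_length frames each.
sequence_major = [
    [f"seq{s}_img{f}" for f in range(1, sequence_length + 1)]
    for s in range(1, batch_size + 1)
]
# Idea 2 layout: sequence_length lists of batch_size frames each.
frame_major = [list(frames) for frames in zip(*sequence_major)]

check_feed_shapes(frame_major, batch_size)  # passes silently

try:
    check_feed_shapes(sequence_major, batch_size)
    idea_1_ok = True
except ValueError as error:
    idea_1_ok = False
    print(error)
```

Only the frame-major layout of idea 2 passes the check, which matches the answer above.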

In idea 2, is it possible to separate self.batch_data[i] and apply different operations to every img[i] (seq1_img1, seq2_img1, seq3_img1, seq4_img1)?

Definitely. That is how DALI applies random operations: per sample, not per batch.
If you have:

  • an operator with internal randomness, like RandomResizedCrop, it will generate different parameters for every sample in the batch. If you do something like:
def define_graph(self):
    self.batch_data = [i() for i in self.input]
    out = self.RandomResizedCrop(self.batch_data)

This applies the same transformation to the first sample of every input provided, and a different one to the second sample of every input. So all frames in a sequence get the same transformation, but every sequence gets a different one.

  • an operator that is driven by another operator, like Slice, which can be fed by a Uniform operator, it will work in the same way as in the first case. The same value from Uniform is applied to every frame in a given sequence, but different values are used for different sequences.

So, as mentioned in my previous answer

    self.operation = SomeRandomOperation(...)  # in __init__

def define_graph(self):
    (...)
    out_0 = self.operation(in_0)
    out_1 = self.operation(in_1)
    return out_0, out_1

Samples seq1_img1 and seq1_img2 in the above example (the same position in in_0 and in_1) will get the same random transformation applied (with the same internal operator state).
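The behaviour described in this answer can be pictured with a toy model in plain Python. This is not DALI code and says nothing about how DALI is implemented: ToyRandomCrop is a hypothetical class, and the assumption that every call restarts from the instance seed is made purely to mirror the "same internal state" explanation above.

```python
import random


class ToyRandomCrop:
    """Toy stand-in for a random operator. This is NOT DALI code."""

    def __init__(self, seed, max_offset=1000):
        self.seed = seed
        self.max_offset = max_offset

    def __call__(self, batch):
        # Assumption for illustration: every call restarts from the instance
        # seed, so the i-th sample always receives the i-th drawn offset.
        rng = random.Random(self.seed)
        offsets = rng.sample(range(self.max_offset), k=len(batch))
        return list(zip(batch, offsets))


# Frame-major inputs, as in idea 2: one batch per frame position.
frames_1 = ["seq1_img1", "seq2_img1", "seq3_img1", "seq4_img1"]
frames_2 = ["seq1_img2", "seq2_img2", "seq3_img2", "seq4_img2"]

crop = ToyRandomCrop(seed=12)
out_1 = crop(frames_1)
out_2 = crop(frames_2)

offsets_1 = [offset for _, offset in out_1]
offsets_2 = [offset for _, offset in out_2]

# Same sequence, different frame position: identical offset.
print(offsets_1 == offsets_2)
# Different sequences: distinct offsets (sample() draws without repetition).
print(len(set(offsets_1)) == len(offsets_1))
```

In this model, reusing one instance ties the parameters to the sample position in the batch, so frames of one sequence move together while sequences differ from each other.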

@JanuszL It works great. Thank you!

To help others, the full pipeline code is shown below:

class HybridPipe(Pipeline):
    def __init__(self, sequences_paths, batch_size, num_threads, device_id=0, dali_cpu=True):
        super(HybridPipe, self).__init__(batch_size, num_threads, device_id, seed=12 + device_id)
        self.external_data = ExternalInputIterator(sequences_paths, batch_size, device_id=device_id)
        self.iterator = iter(self.external_data)
        # one input per frame position; sequence_length is the number of frames per sequence
        self.inputs = [ops.ExternalSource() for _ in range(sequence_length)]
        self.input_labels = ops.ExternalSource()

        decoder_device = 'cpu' if dali_cpu else 'mixed'
        device_memory_padding = 1048576 if decoder_device == 'mixed' else 0
        host_memory_padding = 140544512 if decoder_device == 'mixed' else 0

        self.decode = ops.ImageDecoder(device=decoder_device, output_type=types.RGB)
        self.resize = ops.Resize(device='gpu', resize_shorter=224)

    def define_graph(self):
        self.batch_data = [i() for i in self.inputs]
        self.labels = self.input_labels()
        out = self.decode(self.batch_data)
        out = [out_elem.gpu() for out_elem in out]
        out = self.resize(out)
        return (*out, self.labels)

    def iter_setup(self):
        try:
            (batch_data, labels) = self.iterator.next()
            for i in range(sequence_length):  # one feed per frame position
                self.feed_input(self.batch_data[i], batch_data[i])
            self.feed_input(self.labels, labels)

        except StopIteration:
            self.iterator = iter(self.external_data)
            raise StopIteration