Dali: Pipeline.run() errors if input from ops.ExternalSource is not returned

Created on 15 Jan 2019 · 8 Comments · Source: NVIDIA/DALI

For a pipeline with multiple inputs from an external source, we can write a custom iterator class as described here (sample code included at the end of this post).

What I find odd is that if we change the return value of define_graph to just self.batch instead of [self.batch] + [self.bboxes], pipe.run() throws RuntimeError: [/.../dali/dali/pipeline/op_graph.h:226] Assert on "it != tensor_producers_.end()" failed: Tensor with name "ExternalSource_id_1_output_0_cpu" has no known source. A couple of suggestions/questions about this:

  1. Could we change the error message to something more informative, like "Tensor MyTensor from ops.ExternalSource() must be returned from define_graph in order for the pipeline to be valid"? In my example, the tensor _does_ have a known source, so the current message isn't appropriate.
  2. Why does the pipeline require all inputs from ops.ExternalSource to be returned in define_graph? I have some cases where I'd like to send image metadata (e.g. number of channels, pixel dimensions, etc.) to the pipeline, but don't necessarily need it returned. Being forced to add it to the returned tensor list makes things more cluttered than necessary.
import numpy as np
import nvidia.dali.ops as ops
from nvidia.dali.pipeline import Pipeline

class ExternalInput():
    """Dummy iterator that returns random NumPy arrays."""
    def __init__(self, batch_size):
        self.batch_size = batch_size

    def __iter__(self):
        self.i = 0
        self.n = 10
        return self

    def __next__(self):
        batch = []
        bboxes = []
        if self.i < self.n:
            for _ in range(self.batch_size):
                batch.append(np.random.rand(3))
                bboxes.append(np.random.rand(4))
                self.i += 1
            return batch, bboxes
        else:
            raise StopIteration

    next = __next__

class MyPipe(Pipeline):
    """Pipeline to receive output from ExternalInput."""
    def __init__(self, batch_size, num_threads, device_id):
        super(MyPipe, self).__init__(batch_size, num_threads, device_id, seed=12)
        self.input_batch = ops.ExternalSource()
        self.input_bboxes = ops.ExternalSource()
        self.iterator = iter(ExternalInput(batch_size))

    def define_graph(self):
        self.batch = self.input_batch()
        self.bboxes = self.input_bboxes()
        return [self.batch] + [self.bboxes]  # error if self.bboxes is not returned

    def iter_setup(self):
        batch, bboxes = self.iterator.next()
        self.feed_input(self.batch, batch)
        self.feed_input(self.bboxes, bboxes)

if __name__ == '__main__':

    pipe = MyPipe(
        batch_size=5,
        num_threads=1,
        device_id=0)
    pipe.build()
    pipe_out = pipe.run()
    print(pipe_out) # Two tensors, as expected

All 8 comments

That looks like a bug - you should not have to return those values from the pipeline for it to be valid.

Tracked as DALI-489

From a brief look at the code, I think we may prune unused edges in the graph. So if you are not using self.bboxes, this edge is removed, but feed_input then tries to push data through that edge reference, which produces this error. We could either 1) add a check to feed_input that the given edge exists at all and do nothing if it doesn't, or 2) just avoid pruning ExternalSource. Personally I would vote for 1) to be consistent.
@Kh4L - you did ExternalSource, any thoughts?
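
The hypothesis above can be sketched with a toy graph in plain Python. This is only an illustration of the idea, not DALI's actual graph code: the dictionary layout and the helper names `prune`, `feed_input_strict`, and `feed_input_lenient` are made up for this example.

```python
# Toy model only: this is NOT DALI's real implementation.
# A graph maps each tensor name to the names of the tensors it consumes.

def prune(graph, outputs):
    """Keep only the nodes reachable backwards from the pipeline outputs."""
    keep, stack = set(), list(outputs)
    while stack:
        name = stack.pop()
        if name not in keep:
            keep.add(name)
            stack.extend(graph[name])  # also keep this node's inputs
    return {name: inputs for name, inputs in graph.items() if name in keep}


def feed_input_strict(graph, name):
    """Mimics the reported behavior: fail if the edge was pruned."""
    if name not in graph:
        raise RuntimeError('Tensor with name "%s" has no known source' % name)
    return "fed"


def feed_input_lenient(graph, name):
    """Option 1) from the comment: silently skip edges that were pruned."""
    if name not in graph:
        return "skipped"
    return "fed"


# Two external sources, but define_graph returns only "batch".
graph = {"batch": [], "bboxes": []}
pruned = prune(graph, outputs=["batch"])

try:
    strict_result = feed_input_strict(pruned, "bboxes")
except RuntimeError as err:
    strict_result = str(err)

lenient_result = feed_input_lenient(pruned, "bboxes")
print(sorted(pruned), strict_result, lenient_result)
```

Option 2) would correspond to exempting the external-source nodes inside `prune`, so that "bboxes" is never dropped in the first place.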

Maybe I misunderstand your suggestion, but I don't think option 1) works for my use case. Doing nothing when feed_input can't find its edge means that this input to the pipeline will not be accessible in future runs. I think ops.ExternalSource() should not be pruned, so that you can still pass input without having to return it from the pipeline.

I mean skipping feed_input for an input that is not used. So if you are not using self.bboxes (neither returning it as an output from DALI nor passing it as an input to any operation), then self.feed_input(self.bboxes, bboxes) would be a no-op from your perspective.

I see now. In my case, I'd be using self.bboxes as input to a pipeline operator (but not as pipeline output).
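
To make the distinction concrete, here is a small sketch in plain Python, again a toy model rather than DALI code. The operator name "crop" and the helper `reachable` are hypothetical. It suggests that an external source consumed by an operator stays reachable from the outputs, so only a source that nothing uses would be affected by the no-op behavior.

```python
# Toy model only: node names and the "crop" operator are hypothetical.

def reachable(graph, outputs):
    """Return every node that at least one pipeline output depends on."""
    seen, stack = set(), list(outputs)
    while stack:
        name = stack.pop()
        if name not in seen:
            seen.add(name)
            stack.extend(graph[name])  # follow edges back to the inputs
    return seen


# Each node maps to the list of nodes it takes as input.
graph = {
    "batch": [],                  # external source
    "bboxes": [],                 # external source
    "crop": ["batch", "bboxes"],  # operator consuming both sources
}

# Case A: only the operator output is returned; bboxes is still needed.
used = reachable(graph, outputs=["crop"])

# Case B: only batch is returned; nothing depends on bboxes any more.
unused = reachable(graph, outputs=["batch"])

print(sorted(used))
print(sorted(unused))
```

In case A, "bboxes" survives because "crop" depends on it, even though it is never returned from the pipeline.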

Just made a fresh local build after the merge for #453 and this is fixed!
