Dali: question: the difference between DALIGenericIterator and DALICOCOIterator

Created on 28 Aug 2019  ·  18 Comments  ·  Source: NVIDIA/DALI

All 18 comments

Hello, thanks for the question.
I think that the synchronization in the first link is a leftover from some previous version. I'm not aware of any current issues with the version without it.
In general, please use DALIGenericIterator from our repository. It is actively tested and maintained.

I have used DALIGenericIterator for my work, but I found that it doesn't support labels and bboxes with different dimensions (I use the COCOReader operator). So I rewrote it based on DALICOCOIterator, which is where this question comes from.

May I ask you a question?
Is there a good way to solve this problem?

I just wrote some rough code for my work. It works, but I am not satisfied with it.
Whenever the output_map changes, I have to change the iterator code too.

DALIGenericIterator supports the dynamic_shape parameter now. It should be possible to use it to return tensors with a different shape every iteration, like boxes and labels in detection scenarios.

I think there is a bug in DALIGenericIterator.
When I use DALIGenericIterator, I get the following error:
category_tensors[category] = out.as_tensor() RuntimeError: [/opt/dali/dali/pipeline/data/tensor_list.h:386] Assert on "this->IsDenseTensor()" failed: All tensors in the input TensorList must have the same shape and be densely packed.

I think as_tensor() can't be used when the samples have different shapes.
And here is the reason.

Do you agree with me?

I have tested DALICOCOIterator and DALIGenericIterator with everything else kept the same.
DALICOCOIterator works fine.
DALIGenericIterator doesn't work as expected.

You are right. There is an issue with DALIGenericIterator in this context. Thanks for finding it.

Do you plan to fix it soon?

I've looked a bit deeper into this issue, and from the code it seems that the current implementation of DALIGenericIterator was never supposed to work in this context. It relies heavily on the assumption that the TensorLists returned from the pipeline are dense, i.e. all of the samples have the same shape. dynamic_shape gives the ability to change the sample shape from iteration to iteration, but all of the samples in a batch must still have the same shape.
DALICOCOIterator is custom code that was written specifically to alleviate this constraint for detection data. It copies samples one by one, so they don't have to have the same shape. If you need this capability, it's OK to follow an approach like the one in DALICOCOIterator.
To sum up, from the DALI perspective this is an enhancement rather than a bug. It is tracked internally as DALI-1037.
If you are willing to contribute, we would be happy to review and merge your PR.
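To make the dense-batch constraint concrete, here is a minimal sketch in plain Python. The helper name is_dense is hypothetical and only mimics the condition behind the IsDenseTensor assertion quoted above; it is not DALI's implementation, and the shapes are made up for illustration.

```python
def is_dense(shapes):
    """Return True when every sample in the batch has the same shape."""
    return len(set(tuple(s) for s in shapes)) <= 1


# Illustrative shapes: a fixed-size encoding gives every sample the same shape,
# so the whole batch could be viewed as one tensor.
encoded_boxes = [(100, 4), (100, 4), (100, 4)]

# Raw ground-truth boxes: the number of boxes differs from image to image,
# so the batch is ragged and has to be handled sample by sample.
raw_boxes = [(3, 4), (7, 4), (1, 4)]

dense_ok = is_dense(encoded_boxes)
ragged_ok = is_dense(raw_boxes)
```

With these inputs dense_ok is True and ragged_ok is False, which is the situation that triggers the assertion when raw COCO boxes are passed through an iterator that expects dense output.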

I think each sample usually has a different number of labels and bboxes.
So if we use COCOReader, we can't use DALIGenericIterator most of the time.
Do you agree with me?

Yes, it won't go together like that.
COCOReader and other detection related operators were developed to support SSD network. It uses box encoding, so the resulting output has constant size for every sample. That's why this feature we are discussing was not needed. As you may notice, doing it with a single copy is faster. That's why it was done this way in the first place.
That said, it's definitely a desirable feature that would make iterators more versatile.

Hmm, what do you mean by "doing it with a single copy is faster"?
Does it mean setting dynamic_shape=True in DALIGenericIterator and batch_size=1 in COCOReader?
Can box encoding be used in other detection networks which just need ground truth boxes?

The current implementation of the iterator copies all of the samples using one copy call, as they are contiguous in memory. This is faster than issuing a copy for every sample. A separate copy for every sample is necessary when you want to have samples with different shapes in the batch.
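The difference between the two copy strategies can be sketched with plain Python buffers. This is only an illustration of the idea, not how DALI or PyTorch move data; copy_dense and copy_per_sample are hypothetical helper names.

```python
def copy_dense(src, dst):
    """Dense batch: samples sit back to back, so one bulk copy moves them all."""
    dst[:] = src
    return 1  # number of copy calls issued


def copy_per_sample(samples, dsts):
    """Ragged batch: each sample has its own buffer and is copied separately."""
    calls = 0
    for src, dst in zip(samples, dsts):
        dst[:] = src
        calls += 1
    return calls


# Four samples of 3 bytes each, packed contiguously in one buffer.
dense_src = bytearray(range(12))
dense_dst = bytearray(12)
dense_calls = copy_dense(dense_src, dense_dst)

# Four samples of different lengths, each kept in its own buffer.
ragged_src = [bytearray(b"ab"), bytearray(b"cdef"), bytearray(b"g"), bytearray(b"hij")]
ragged_dst = [bytearray(len(s)) for s in ragged_src]
ragged_calls = copy_per_sample(ragged_src, ragged_dst)
```

The dense path issues one call regardless of batch size, while the per-sample path issues one call per sample, which is the overhead described above.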

You can set dynamic_shape=True and batch_size=1 to bypass this whole issue. Please keep in mind that processing batches this small might cause inferior performance.

"Can box encoding be used in other detection networks which just need ground truth boxes?"

The BoxEncoder we have implemented is rather specific to SSD. You can read more about it in the SSD paper. This encoding will not work as is if you need ground truth boxes.

I just wrote a simple DALI iterator for PyTorch. It works for me, but the outputs are not friendly. It's not perfect.
Is it useful for you?

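import logging

import numpy as np
import torch
# feed_ndarray and to_torch_type are assumed to come from DALI's PyTorch plugin,
# as in the DALI releases contemporary with this thread.
from nvidia.dali.plugin.pytorch import feed_ndarray, to_torch_type


class PyTorchIterator(object):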
    """
    General DALI iterator for pyTorch. It can return any number of
    outputs from the DALI pipeline in the form of pyTorch's Tensors.

    Parameters
    ----------
    pipelines : list of nvidia.dali.pipeline.Pipeline
                List of pipelines to use
    output_map : list of str
                 List of strings which maps consecutive outputs
                 of DALI pipelines to user specified name.
                 Outputs will be returned from iterator as dictionary
                 of those names.
                 Each name should be distinct
    size : int
           Number of samples in the epoch (Usually the size of the dataset).
    auto_reset : bool, optional, default = False
                 Whether the iterator resets itself for the next epoch
                 or it requires reset() to be called separately.
    stop_at_epoch : bool, optional, default = False
                 Whether to return a fraction of a full batch of data
                 such that the total entries returned by the
                 iterator == 'size'. Setting this flag to False will
                 cause the iterator to return the first integer multiple
                 of self._num_gpus * self.batch_size which exceeds 'size'.
    dynamic_shape: bool, optional, default = False
                 Whether the shape of the output of the DALI pipeline can
                 change during execution. If True, the pytorch tensor will be resized accordingly
                 if the shape of DALI returned tensors changes during execution.
                 If False, the iterator will fail in case of change.
    """
    def __init__(self,
                 pipelines,
                 output_map,
                 size,
                 auto_reset=False,
                 stop_at_epoch=False,
                 dynamic_shape=False):
        if not isinstance(pipelines, list):
            pipelines = [pipelines]
        assert len(pipelines) > 0, "Number of provided pipelines has to be at least 1"
        self._num_gpus = len(pipelines)
        self.batch_size = pipelines[0].batch_size
        self._size = int(size)
        self._auto_reset = auto_reset
        self._stop_at_epoch = stop_at_epoch
        self._dynamic_shape = dynamic_shape
        self._pipes = pipelines
        # Build all pipelines
        for p in self._pipes:
            p.build()
        # Use double-buffering of data batches
        self._data_batches = [[None, None] for i in range(self._num_gpus)]
        self._counter = 0
        self._current_data_batch = 0
        assert len(set(output_map)) == len(output_map), "output_map names should be distinct"
        self._output_categories = set(output_map)
        self.output_map = output_map

        # We need data about the batches (like shape information),
        # so we need to run a single batch as part of setup to get that info
        for p in self._pipes:
            p.schedule_run()
        self._first_batch = None
        self._first_batch = self.next()

    def __next__(self):
        if self._first_batch is not None:
            batch = self._first_batch
            self._first_batch = None
            return batch
        if self._counter >= self._size:
            if self._auto_reset:
                self.reset()
            raise StopIteration
        # Gather outputs
        outputs = []
        for p in self._pipes:
            outputs.append(p.share_outputs())
        for i in range(self._num_gpus):
            dev_id = self._pipes[i].device_id
            # initialize dict for all output categories
            category_outputs = dict()
            # segregate outputs into categories
            for j, out in enumerate(outputs[i]):
                category_outputs[self.output_map[j]] = out

            # Change DALI TensorLists into Tensors
            category_tensors = dict()
            category_shapes = dict()

            for category, out in category_outputs.items():
                if category not in category_tensors:
                    category_shapes[category] = []
                    category_tensors[category] = []
                category_tensors[category].append(out)
                j = len(category_tensors[category]) - 1
                category_shapes[category].append([])
                for k in range(len(category_tensors[category][j])):
                    category_shapes[category][j].append(category_tensors[category][j].at(k).shape())

            # If we did not yet allocate memory for that batch, do it now
            # TODO: allocate the maximum required space once in the iterator,
            # instead of reallocating every time the shapes change
            if self._data_batches[i][self._current_data_batch] is None:
                category_torch_type = dict()
                category_device = dict()
                torch_gpu_device = torch.device('cuda', dev_id)
                torch_cpu_device = torch.device('cpu')
                # check category and device
                for category in self._output_categories:
                    category_torch_type[category] = to_torch_type[np.dtype(category_tensors[category][0].at(0).dtype())]
                    from nvidia.dali.backend import TensorListGPU
                    if type(category_tensors[category][0]) is TensorListGPU:
                        category_device[category] = torch_gpu_device
                    else:
                        category_device[category] = torch_cpu_device

                pyt_tensors = dict()
                for category in self._output_categories:
                    pyt_tensors[category] = [[torch.zeros(shape,
                                                          dtype=category_torch_type[category],
                                                          device=category_device[category]) for shape in shape_list]
                                             for shape_list in category_shapes[category]]

                self._data_batches[i][self._current_data_batch] = pyt_tensors
            else:
                pyt_tensors = self._data_batches[i][self._current_data_batch]

            # Copy data from DALI Tensors to torch tensors
            for category, tensor in category_tensors.items():
                if self._dynamic_shape and [[list(pyt_tensors[category][j][k].shape)
                        for k in range(len(pyt_tensors[category][j]))]
                        for j in range(len(pyt_tensors[category]))] != category_shapes[category]:  # resize dynamically
                    pyt_tensors[category] = [[torch.zeros(shape,
                                                            dtype=pyt_tensors[category][0][0].dtype,
                                                            device=pyt_tensors[category][0][0].device) for shape in
                                                shape_list]
                                                for shape_list in category_shapes[category]]

                for j, b_list in enumerate(tensor):
                    for k in range(len(b_list)):
                        # Skip empty samples; there is nothing to copy for them
                        if pyt_tensors[category][j][k].shape[0] != 0:
                            feed_ndarray(b_list.at(k), pyt_tensors[category][j][k])


        for p in self._pipes:
            p.release_outputs()
            p.schedule_run()

        copy_db_index = self._current_data_batch
        # Change index for double buffering
        self._current_data_batch = (self._current_data_batch + 1) % 2
        self._counter += self._num_gpus * self.batch_size

        if (self._stop_at_epoch) and (self._counter > self._size):
            # First calculate how much data is required to return exactly self._size entries.
            diff = self._num_gpus * self.batch_size - (self._counter - self._size)
            # Figure out how many GPUs to grab from.
            numGPUs_tograb = int(np.ceil(diff/self.batch_size))
            # Figure out how many results to grab from the last GPU (as a fractional GPU batch may be required to
            # bring us right up to self._size).
            mod_diff = diff % self.batch_size
            data_fromlastGPU = mod_diff if mod_diff else self.batch_size

            # Grab the relevant data.
            # 1) Grab everything from the relevant GPUs.
            # 2) Grab the right data from the last GPU.
            # 3) Append data together correctly and return.
            output = [db[copy_db_index] for db in self._data_batches[0:numGPUs_tograb]]
            output[-1] = output[-1].copy()
            for category in self._output_categories:
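                # Each category holds a list of per-sample tensor lists, so trim the inner lists
                output[-1][category] = [samples[0:data_fromlastGPU] for samples in output[-1][category]]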
            return output

        return [db[copy_db_index] for db in self._data_batches]

    def next(self):
        """
        Returns the next batch of data.
        """
        return self.__next__()

    def __iter__(self):
        return self

    def reset(self):
        """
        Resets the iterator after the full epoch.
        DALI iterators do not support resetting before the end of the epoch
        and will ignore such request.
        """
        if self._counter >= self._size:
            if self._stop_at_epoch:
                self._counter = 0
            else:
                self._counter = self._counter % self._size
            for p in self._pipes:
                p.reset()
        else:
            logging.warning("DALI iterator does not support resetting while epoch is not finished. Ignoring...")
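
The stop_at_epoch arithmetic in __next__ is easier to follow with numbers plugged in. The sketch below repeats the same formulas in a standalone function; last_batch_split is a hypothetical name used only for this illustration, and counter is the value after it has been incremented for the current step.

```python
import math


def last_batch_split(size, counter, num_gpus, batch_size):
    """Mirror the stop_at_epoch arithmetic for the final, partial step."""
    # Samples still needed to reach exactly `size` entries.
    diff = num_gpus * batch_size - (counter - size)
    # Number of GPUs whose output is needed to cover `diff` samples.
    gpus_to_grab = math.ceil(diff / batch_size)
    # Samples taken from the last of those GPUs (a full batch if it divides evenly).
    mod_diff = diff % batch_size
    from_last_gpu = mod_diff if mod_diff else batch_size
    return diff, gpus_to_grab, from_last_gpu


# 2 GPUs with batch_size 4 produce 8 samples per step; after two steps counter == 16.
split_10 = last_batch_split(size=10, counter=16, num_gpus=2, batch_size=4)
split_12 = last_batch_split(size=12, counter=16, num_gpus=2, batch_size=4)
split_14 = last_batch_split(size=14, counter=16, num_gpus=2, batch_size=4)
```

For a dataset of 10 samples, the second step needs only 2 more samples, all from the first GPU. For 14 samples it needs 6: a full batch from the first GPU and 2 samples from the second.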

Hi, I have a question related to this.

My current pipeline output

  • has a different size for each item in a batch (the first dimension always varies, but the other dimensions are always fixed; padding each item along the first dimension is fine),
  • and cannot use tf.sparse_to_dense-like functions, since I am using PyTorch now.

Thus, as expected, I currently get this error:

Assert on "this->IsDenseTensor()" failed: All tensors in the input TensorList must have the same shape and be densely packed.

In this case, is there a new iterator to iterate over the data? Or is the code in the very last answer the recommended solution, as of now?

Hi @YunseokJANG,
You can use the recently introduced pad operator (but you still need to figure out where the data ends and the padding starts) or use the very last answer.
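
The caveat about knowing where the data ends can be handled by carrying the original lengths next to the padded batch. The following is a minimal sketch in plain Python that emulates padding along the first dimension; pad_batch and unpad_batch are hypothetical helpers, not the DALI pad operator.

```python
def pad_batch(samples, fill=0):
    """Pad every sample to the longest length and remember the true lengths."""
    max_len = max(len(s) for s in samples)
    lengths = [len(s) for s in samples]
    padded = [list(s) + [fill] * (max_len - len(s)) for s in samples]
    return padded, lengths


def unpad_batch(padded, lengths):
    """Use the stored lengths to cut the padding off again."""
    return [row[:n] for row, n in zip(padded, lengths)]


samples = [[5, 6], [7, 8, 9, 10], [11]]
padded, lengths = pad_batch(samples)
restored = unpad_batch(padded, lengths)
```

After padding, every row has the same length, so the batch is dense; the lengths list is what lets the consumer separate data from padding.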

Hi @JanuszL , thanks for your kind reply.
This API looks close to what I am looking for.

By the way, I have one additional question related to this. If I want to manipulate the data before feeding it to the model (adding a StartOfSentence or EndOfSentence token in NLP, for instance), do you recommend (or does this framework prefer) manipulating the data within the Pipeline class (maybe with ops.PythonFunction (eg1, eg2)), or implementing it outside of the iterator? I am slightly biased toward the former approach, but I'd like to hear the best practices ;)

Regards,
Yunseok

@YunseokJANG - using PythonFunction strips your pipeline of asynchronous and pipelined execution, and calling back into Python code from the Pipeline imposes some overhead as well. This feature is mainly for prototyping and debugging, not for obtaining performance in a production environment. Doing it outside of the iterator would probably be faster. Still, you can try both approaches (it should not be that difficult to switch from one approach to the other once you have the needed operation implemented).
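
Doing the manipulation outside of the iterator could look like the sketch below: a generator wraps whatever yields the batches and adds the tokens afterwards. The names with_sentence_tokens, sos_id, and eos_id are hypothetical, and a plain list of lists stands in for the real iterator output.

```python
def with_sentence_tokens(batches, sos_id, eos_id):
    """Wrap an iterable of batches and add start/end tokens to every sequence."""
    for batch in batches:
        yield [[sos_id] + list(seq) + [eos_id] for seq in batch]


# Stand-in for an iterator: two batches of variable-length token sequences.
fake_iterator = [[[4, 5], [6]], [[7, 8, 9]]]
wrapped = list(with_sentence_tokens(fake_iterator, sos_id=1, eos_id=2))
```

Because the wrapper only consumes batches, the pipeline itself stays untouched and keeps its asynchronous execution.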

Oh, I see. Thanks for your comments :)
