Dali: How could I create a multi-label pytorch dataloader?

Created on 13 May 2019  路  16Comments  路  Source: NVIDIA/DALI

Hi,

It seems that there is no example of multi label dataloader. How could I create a multi label pytorch dataloader please?

enhancement external contribution welcome question

All 16 comments

Hi,
There is a simple but less performant way - just use the external source. Another workaround is to use file_list in the file reader, assign a unique label to each file as an ID, then use it to translate the real set of labels.
Also, you could use data container - like TFRecord or RecordIO where labels could be stored as a list of values instead of scalars.
This could be a good place where the external contribution is more than welcome to extend FileReader to support multiple labels.

Thanks a lot, I am learning how to use external source. Here I met two questions, and I wish I could have more help from you.

  1. Is the external source approach still more efficient than pytorch dataloader?
  2. when I use it in the task of semantic segmentations, the input image and label image are not only loaded, but also identical crop and flip augmentations are required. How could I make sure the crop and flip augmentations are same please?

Hi,
1) It depends. We haven't measured this. What can I say is that I would expect that data reading by the ExternalSource would be slower than other readers in DALI and probably than pytorch dataloader. But when your data is already loaded processing on the GPU would be faster, on CPU should be not slower and sometimes faster. So if your data processing pipeline is dominated by the processing it should be still faster to even if you use ExternalSource.
2) There is a couple of ways to do so:

  • if you using some operator with the internal randomness, like RandomResizedCrop, you can set the same seed to both instances of it
  ...
  self.rrc_img = ops.RandomResizedCrop(device="gpu", size =(224,224), seed=seed)
  self.rrc_labels = ops.RandomResizedCrop(device="gpu", size =(224,224), seed=seed)
  ...
def define_graph(self):
  ...
  images = self.rrc_img(images)
  labels = self.rrc_labels(labels)
  • you can use the same support operator that generates random numbers
  ...
  self.flip_img = ops.Flip(device="gpu")
  self.flip_labels = ops.Flip(device="gpu")
  self.flip_coin = ops.CoinFlip(probability=0.5)
  ...
def define_graph(self):
  ...
  coin_rnd = self.flip_coin()
  images = self.flip(images, horizontal=coin_rnd)
  labels = self.flip(labels, horizontal=coin_rnd)
  • or the easiest way is to provide list of inputs to given op
  ...
  self.flip = ops.Flip(device="gpu")
  self.flip_coin = ops.CoinFlip(probability=0.5)
  ...
def define_graph(self):
  ...
  coin_rnd = self.flip_coin()
  outs = self.flip([images, labels] , horizontal=coin_rnd)

I am using external source for processing large number of images and labels. in my experience, jpeg decoding on gpu alone can offer reasonable speed ups(8-10x) as compared to the pytorch dataloader specially if the dataset is large.

After that I expect more speeds if there are augmentations that are performed on CPU using pytorch approach which dali can process on GPU.

I can play with dali on my task now. I am closing this. Thanks again for support!!!

Sorry, I am not really fully understand the principle of external source, so I reopen this issue:

I notice from the tutorial that the next method of the ExternalInputIterator class is actually an infinite one, since it does not have a raise in it. Is this the correct design or should I add a raise in my implementation on our own dataset ?

Also when I use torch.distributed to spread my model on multi gpus, do I need to take care of how I split the dataset and distribute them to different gpus.

Hi,
The raise is optional, if you throw StopIteration inside iter_setup DALI will handle it, process all data and then rise StopIteration on its own so user know that all data is processed. If you don't do it it is also fine but you need to count the amount of iteration needed for your training on your own.
Answering the second question - in case of ExternalSource you need to split your data on your own. In DALI readers are responsible for that. If you use FileReder or MXNetReader then num_shards and shard_id arguments tell DALI what to do. For the ExternalSource you are on your own, as ExternalSource accepts raw numpy array at every iteration so it is hard to ask DALI to split it between different processes or even nodes.

Thanks a lot!!

What would DALIClassificationIterator do when the pipe object raise an stopIteration please?
I am thinking of creating an iterator object inside the __init__() method of the pipe class, and call its next method inside the iter_setup() method. But I do not know what would happen when an epoch is accomplished and a stopIteration is raised. Will the DALIClassificationIterator create a new pipe object when another round of for batch in loader: is executed?

What I mean is like this:

class iterator(object):
    def __init__(self):
        ...
    def __next__(self):
         ...
         raise stopIteration
    def __iter__(self):
        shuffle()
        split()
        return self

class Pipe(Pipeline):
     def __init__(self):
          self.iter = iterator()
          self.input = ops.ExternalSource()
     def define_grapth(self):
           ...
          return self.batch
    def iter_setup(self):
           batch = next(self.iter)
           self.feed_input(self.batch, batch)

pipe = Pipe()
pipe.build()
loader = DALIClassificationIterator(pipe)

for e in range(n_epoch):
    for batch in loader:  ## Will a new iterator and a new pipe object be created here with new random settings?
        .....

Can this work as expected please? If I want to shuffle and split the dataset for different gpus, should I do this in the definition of the iterator?

Hi,
DALIClassificationIterator would throw the StopIteration when all the data in the processing pipeline is exhausted (or because iter_setup did that or the number of requested samples returned by DALI iterator crossed size argument). In that case, it is enough to call reset or use auto_reset=True.
Regarding the second part of your question:

pipe = Pipe()
loader = DALIClassificationIterator(pipe)

Creates and build the pipeline.
So the code should look like:


DALIClassificationIterator(pipe, (...), auto_reset=True)
for e in range(n_epoch):
    for batch in loader:  ## Will a new iterator and a new pipe object be created here with new random 
        ...

or

for e in range(n_epoch):
    for batch in loader:  ## Will a new iterator and a new pipe object be created here with new random 
        ...
    loader.reset()

And your iterator should deal with dealing with the randomness.

Thanks, am I correct to handle the randomness in the __iter__ method of the iterator? And should I add self.iter = iterator() in the __init__ of the pipe class?

Hi,
Yes, in your case iterator should be fully responsible for the randomness. You could do random things inside __iter__ or preshuffle data in the constructor and then return them sequentially in the __iter__.
And again, yes self.iter = iterator() should do.

Hi,
I am using dali to write a 'balanced dataloader', which means that I sample P categories with K samples of each category to compose a P*K batch. Imitating the tutorial, my code goes like this:

import os.path as osp
import random
import numpy as np

from nvidia.dali.plugin.pytorch import DALIClassificationIterator
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types


class DeepFashionIterator(object):
    def __init__(self, P, K, shard_id, num_shards, shuffle, mode='train'):
        self.rootpth = '/dataset/deepfashion/ISCR'
        annfile = osp.join(self.rootpth, 'Eval/list_eval_partition.txt')
        self.P = P
        self.K = K
        self.shard_id = shard_id
        self.num_shards = num_shards
        self.do_shuffle = shuffle
        self.mode = mode
        ...

    def __iter__(self):
        if self.do_shuffle:
            self.shuffle()
        return self

    def __next__(self):
        ...
        # one batch includes P categories with each category have K samples
        samples = [np.frombuffer(...) for _ in range(...)]
        labels = [np.array(idx, np.uint8) for idx in pids for _ in range(K)]
        return samples, labels
    next = __next__


class DeepFashion(Pipeline):
    def __init__(self, P, K, num_threads, shuffle, device_id, local_rank, world_size, mode='train'):
        # why we need to assign batch size to Pipline since I assigned it to iterator?
        super(DeepFashion, self).__init__(P*K, num_threads, device_id, seed=12+device_id )
        self.input_ims = ops.ExternalSource()
        self.input_lbs = ops.ExternalSource()
        self.decoder = ops.nvJPEGDecoder(
            device='mixed',
            output_type=types.RGB,
            )
        self.resize = ops.Resize(
            device='gpu',
            image_type=types.RGB,
            interp_type=types.INTERP_CUBIC,
            resize_x=128,
            resize_y=256,
            )
        self.norm = ops.CropMirrorNormalize(
            device="gpu",
            crop=(256, 128),
            mean=[0.485*255, 0.456*255, 0.406*255],
            std=[0.229*255, 0.224*255, 0.225*255],
            image_type=types.RGB,
            output_dtype=types.FLOAT,
            output_layout=types.NCHW,
            )
        self.iterator = DeepFashionIterator(P, K, shard_id=local_rank, num_shards=world_size, shuffle=shuffle, mode=mode)

    def define_graph(self):
        self.jpegs = self.input_ims()
        self.lbs = self.input_lbs()
        ims = self.decoder(self.jpegs)
        ims = self.resize(ims)
        ims = self.norm(ims.gpu())
        return ims, self.lbs

    def iter_setup(self):
        imgs, labels = self.iterator.next()
        self.feed_input(self.jpegs, imgs)
        self.feed_input(self.lbs, labels)


def get_train_loader(P, K, device_id, n_threads, local_rank, world_size):
    pipe = DeepFashion(
        P=P,
        K=K,
        num_threads=n_threads,
        shuffle=True,
        device_id=device_id,
        local_rank=local_rank,
        world_size=world_size,
        mode='train'
        )
    pipe.build()
    loader = DALIClassificationIterator(
        pipe,
        size=1e8, # use a big number so that I could let raise StopIteration work
        stop_at_epoch=False)
    return loader

if __name__ == '__main__':
    loader = get_train_loader(18, 4, 0, 4, 0, 1)
    for i, data in enumerate(loader):
        data = data[0]
        img, lbs = data['data'], data['label'].squeeze().long()
        print(img.size())

I know it is a bit too long, but I cannot decide which line brings the error of Segmentation fault(Core dumped). Would you please point out where the error comes from?

Hi,
Such dummy code works for me:

import os.path as osp
import random
import numpy as np

from nvidia.dali.plugin.pytorch import DALIClassificationIterator
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types


class DeepFashionIterator(object):
    def __init__(self, P, K, shard_id, num_shards, shuffle, mode='train'):
        self.rootpth = '/dataset/deepfashion/ISCR'
        annfile = osp.join(self.rootpth, 'Eval/list_eval_partition.txt')
        self.P = P
        self.K = K
        self.shard_id = shard_id
        self.num_shards = num_shards
        self.do_shuffle = shuffle
        self.mode = mode
        #...

    def __iter__(self):
        if self.do_shuffle:
            self.shuffle()
        return self

    def __next__(self):
        #...
        # one batch includes P categories with each category have K samples
        #samples = [np.frombuffer(...) for _ in range(...)]
        #labels = [np.array(idx, np.uint8) for idx in pids for _ in range(K)]
        samples = [np.array([1],np.uint8) for idx in range(self.P) for _ in range(self.K)]
        labels = [np.array([1],np.uint8) for idx in range(self.P) for _ in range(self.K)]
        return samples, labels
    next = __next__


class DeepFashion(Pipeline):
    def __init__(self, P, K, num_threads, shuffle, device_id, local_rank, world_size, mode='train'):
        # why we need to assign batch size to Pipline since I assigned it to iterator?
        super(DeepFashion, self).__init__(P*K, num_threads, device_id, seed=12+device_id )
        self.input_ims = ops.ExternalSource()
        self.input_lbs = ops.ExternalSource()
        self.decoder = ops.nvJPEGDecoder(
            device='mixed',
            output_type=types.RGB,
            )
        self.resize = ops.Resize(
            device='gpu',
            image_type=types.RGB,
            interp_type=types.INTERP_CUBIC,
            resize_x=128,
            resize_y=256,
            )
        self.norm = ops.CropMirrorNormalize(
            device="gpu",
            crop=(256, 128),
            mean=[0.485*255, 0.456*255, 0.406*255],
            std=[0.229*255, 0.224*255, 0.225*255],
            image_type=types.RGB,
            output_dtype=types.FLOAT,
            output_layout=types.NCHW,
            )
        self.iterator = DeepFashionIterator(P, K, shard_id=local_rank, num_shards=world_size, shuffle=shuffle, mode=mode)

    def define_graph(self):
        self.jpegs = self.input_ims()
        self.lbs = self.input_lbs()
        ims = self.decoder(self.jpegs)
        ims = self.resize(ims)
        ims = self.norm(ims.gpu())
        return self.jpegs, self.lbs

    def iter_setup(self):
        imgs, labels = self.iterator.next()
        self.feed_input(self.jpegs, imgs)
        self.feed_input(self.lbs, labels)


def get_train_loader(P, K, device_id, n_threads, local_rank, world_size):
    pipe = DeepFashion(
        P=P,
        K=K,
        num_threads=n_threads,
        shuffle=True,
        device_id=device_id,
        local_rank=local_rank,
        world_size=world_size,
        mode='train'
        )
    pipe.build()
    loader = DALIClassificationIterator(
        pipe,
        size=1e8, # use a big number so that I could let raise StopIteration work
        stop_at_epoch=False)
    return loader

if __name__ == '__main__':
    loader = get_train_loader(18, 4, 0, 4, 0, 1)
    for i, data in enumerate(loader):
        data = data[0]
        img, lbs = data['data'], data['label'].squeeze().long()
        print(img.size())

So I guess there may be a problem in the code you have not provided. If you could provide something self-contained that doesn't have missing pieces preventing it from running it would be easier to tell more.
Answering to # why we need to assign a batch size to the pipeline since I assigned it to iterator? - it is required as DALI pipeline doesn't have any knowledge about the iterator you have provided. It can be learned in iter_setup, but DALI needs it during pipeline creation before that (to properly setup operators and etc).

HI,
I did not post all the code because I did not want to bring you so much trouble with these redundant contents, but I am pleased to share the full code:

import os.path as osp
import random
import numpy as np

from nvidia.dali.plugin.pytorch import DALIClassificationIterator
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types


class DeepFashionIterator(object):
    def __init__(self, P, K, shard_id, num_shards, shuffle, mode='train'):
        self.rootpth = '/mnt/projects/yd/dataset/deepfashion/ISCR'
        annfile = osp.join(self.rootpth, 'Eval/list_eval_partition.txt')
        self.P = P
        self.K = K
        self.shard_id = shard_id
        self.num_shards = num_shards
        self.do_shuffle = shuffle
        self.mode = mode

        assert mode in ('train', 'query', 'gallery')
        with open(annfile, 'r') as fr:
            lines = fr.read().splitlines()[2:]
        pnames = []
        for line in lines:
            tokens = line.split()
            if not tokens[2] == mode: continue
            pnames.append(tokens[1])
        pnames = list(set(pnames))
        self.n_classes = len(pnames)
        self.pname_pid = {el: i for i, el in enumerate(pnames)}

        self.pid_items = [[] for _ in range(self.n_classes)]
        for line in lines:
            tokens = line.split()
            item, pname, cat = tokens
            if not cat == mode: continue
            pid = self.pname_pid[pname]
            self.pid_items[pid].append(item)
        self.pids = list(range(self.n_classes))

        if self.do_shuffle:
            self.shuffle()
        self.group_ids = self.split()

        self.curr_id_group = self.group_ids[self.shard_id]
        self.curr = 0

    def shuffle(self):
        list(map(random.shuffle, self.pid_items))
        random.shuffle(self.pids)

    def split(self):
        split_len = len(self.pids) // self.num_shards
        group_ids = []
        [group_ids.append(self.pids[i:i+split_len]) for i in range(self.num_shards)]
        return group_ids

    def __iter__(self):
        if self.do_shuffle:
            self.shuffle()
        self.group_ids = self.split()
        return self

    def __next__(self):
        pids = self.curr_id_group[self.curr:self.curr+self.P]
        def pick_samples(container):
            def pick(idx):
                group = self.pid_items[idx]
                if len(group) >= self.K:
                    container.extend(random.sample(group, k=self.K))
                else:
                    container.extend(random.choices(group, k=self.K))
            return pick
        def get_raw_im(pth):
            with open(osp.join(self.rootpth, pth), 'rb') as fr:
                raw = np.frombuffer(fr.read(), dtype=np.uint8)
            return raw
        samples = []
        list(map(pick_samples(samples), pids))
        samples = list(map(get_raw_im, samples))
        self.curr += self.P
        if self.curr > len(self.curr_id_group): raise StopIteration
        labels = [np.array(idx, np.uint8) for idx in pids for _ in range(4)]
        return samples, labels
    next = __next__


class DeepFashion(Pipeline):
    def __init__(self, P, K, num_threads, shuffle, device_id, local_rank, world_size, mode='train'):
        # why we need to assign batch size to Pipline since I assigned it to iterator?
        super(DeepFashion, self).__init__(P*K, num_threads, device_id, seed=12+device_id )
        self.input_ims = ops.ExternalSource()
        self.input_lbs = ops.ExternalSource()
        self.decoder = ops.nvJPEGDecoder(
            device='mixed',
            output_type=types.RGB,
            )
        self.resize = ops.Resize(
            device='gpu',
            image_type=types.RGB,
            interp_type=types.INTERP_CUBIC,
            resize_x=128,
            resize_y=256,
            )
        self.norm = ops.CropMirrorNormalize(
            device="gpu",
            crop=(256, 128),
            mean=[0.485*255, 0.456*255, 0.406*255],
            std=[0.229*255, 0.224*255, 0.225*255],
            image_type=types.RGB,
            output_dtype=types.FLOAT,
            output_layout=types.NCHW,
            )
        self.iterator = DeepFashionIterator(P, K, shard_id=local_rank, num_shards=world_size, shuffle=shuffle, mode=mode)

    def define_graph(self):
        self.jpegs = self.input_ims()
        self.lbs = self.input_lbs()
        ims = self.decoder(self.jpegs)
        ims = self.resize(ims)
        ims = self.norm(ims.gpu())
        return ims, self.lbs

    def iter_setup(self):
        imgs, labels = self.iterator.next()
        self.feed_input(self.jpegs, imgs)
        self.feed_input(self.lbs, labels)


def get_train_loader(P, K, device_id, n_threads, local_rank, world_size):
    pipe = DeepFashion(
        P=P,
        K=K,
        num_threads=n_threads,
        shuffle=True,
        device_id=device_id,
        local_rank=local_rank,
        world_size=world_size,
        mode='train'
        )
    pipe.build()
    loader = DALIClassificationIterator(
        pipe,
        size=1e8, # use a big number so that I could let raise StopIteration work
        stop_at_epoch=False)
    return loader

if __name__ == '__main__':
    loader = get_train_loader(18, 4, 0, 4, 0, 1)
    for i, data in enumerate(loader):
        data = data[0]
        img, lbs = data['data'], data['label'].squeeze().long()
        print(img.size())

Would you please show me how could I make this work?

@CoinCheung - as a next step try to remove nonessential parts to see if that still works. Like remove:

  • remove shuffling from your custom iterator
  • skip images and see if DALI pipeline works only for labels
    When you have core code and it still doesn't work please get back to us. It is very likely that in the meantime you will be able to solve the problem on your own.
    You can also do the opposite. Check my dummy code and slowly extend it until it stops working.

Thanks for support, I am closing this :)

Was this page helpful?
0 / 5 - 0 ratings