Hi,
It seems that there is no example of a multi-label dataloader. How can I create a multi-label PyTorch dataloader, please?
Hi,
There is a simple but less performant way: just use the external source. Another workaround is to use file_list in the file reader, assign a unique label to each file as an ID, and then use that ID to look up the real set of labels.
Also, you could use a data container such as TFRecord or RecordIO, where labels can be stored as a list of values instead of scalars.
This could be a good place for an external contribution: extending FileReader to support multiple labels would be more than welcome.
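The file_list workaround mentioned above can be sketched in plain Python. This is only an illustration: the annotation dictionary `multi_labels` and the helpers `build_file_list` and `translate` are hypothetical names, and the sketch assumes the file reader accepts one "path label" pair per line. Each file gets a unique integer ID as its scalar label, and the ID is translated back to the real label set after the reader returns it.

```python
# Hypothetical multi-label annotations: file name -> list of labels.
multi_labels = {
    "img_000.jpg": [0, 3],
    "img_001.jpg": [1],
    "img_002.jpg": [2, 3, 4],
}


def build_file_list(annotations):
    """Assign a unique integer ID to each file.

    Returns the text of a file list (one "path id" pair per line)
    and a table that maps each ID back to the real label list.
    """
    lines = []
    id_to_labels = {}
    for file_id, (path, labels) in enumerate(sorted(annotations.items())):
        lines.append(f"{path} {file_id}")
        id_to_labels[file_id] = list(labels)
    return "\n".join(lines) + "\n", id_to_labels


def translate(batch_ids, table):
    """Translate the scalar IDs returned by the reader into label lists."""
    return [table[int(i)] for i in batch_ids]


file_list_text, id_to_labels = build_file_list(multi_labels)
print(file_list_text)
print(translate([2, 0], id_to_labels))
```

The lookup table has to be kept next to the training loop, since the reader itself only ever sees the scalar IDs.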
Thanks a lot, I am learning how to use the external source. I have run into two questions, and I would appreciate more help from you.
Hi,
1) It depends. We haven't measured this. What I can say is that I would expect data reading through ExternalSource to be slower than the other readers in DALI, and probably slower than the PyTorch dataloader. But once your data is loaded, processing on the GPU would be faster, and processing on the CPU should be no slower and sometimes faster. So if your pipeline is dominated by processing rather than reading, it should still be faster even if you use ExternalSource.
2) There are a couple of ways to do so:
# Option 1: two operators created with the same seed
        ...
        self.rrc_img = ops.RandomResizedCrop(device="gpu", size=(224, 224), seed=seed)
        self.rrc_labels = ops.RandomResizedCrop(device="gpu", size=(224, 224), seed=seed)
        ...
    def define_graph(self):
        ...
        images = self.rrc_img(images)
        labels = self.rrc_labels(labels)
        ...
# Option 2: two operators driven by the same random argument
        self.flip_img = ops.Flip(device="gpu")
        self.flip_labels = ops.Flip(device="gpu")
        self.flip_coin = ops.CoinFlip(probability=0.5)
        ...
    def define_graph(self):
        ...
        coin_rnd = self.flip_coin()
        images = self.flip_img(images, horizontal=coin_rnd)
        labels = self.flip_labels(labels, horizontal=coin_rnd)
        ...
# Option 3: one operator applied to a list of inputs
        self.flip = ops.Flip(device="gpu")
        self.flip_coin = ops.CoinFlip(probability=0.5)
        ...
    def define_graph(self):
        ...
        coin_rnd = self.flip_coin()
        outs = self.flip([images, labels], horizontal=coin_rnd)
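What these options have in common is that the image and its label map must see the same random decision. That idea can be illustrated without DALI; the following is a plain-Python sketch that uses nested lists as stand-in images. `hflip` and `paired_flip` are hypothetical helpers, not DALI operators.

```python
import random


def hflip(image):
    """Mirror a 2-D list horizontally."""
    return [row[::-1] for row in image]


def paired_flip(image, label, rng, probability=0.5):
    """Draw one coin and apply the same decision to both inputs."""
    flip = rng.random() < probability
    if flip:
        return hflip(image), hflip(label), flip
    return image, label, flip


rng = random.Random(0)
image = [[1, 2, 3], [4, 5, 6]]
label = [[0, 0, 1], [1, 0, 0]]
for _ in range(4):
    out_image, out_label, flipped = paired_flip(image, label, rng)
    # Either both are flipped or neither is, so they stay aligned.
    assert (out_image == hflip(image)) == flipped
    assert (out_label == hflip(label)) == flipped
```

Drawing the coin once and passing it to both transforms corresponds to feeding the same random argument to both operators.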
I am using external source to process a large number of images and labels. In my experience, JPEG decoding on the GPU alone can offer a reasonable speedup (8-10x) compared to the PyTorch dataloader, especially if the dataset is large.
Beyond that, I expect further speedups for augmentations that the PyTorch approach performs on the CPU and that DALI can run on the GPU.
I can play with dali on my task now. I am closing this. Thanks again for support!!!
Sorry, I do not fully understand how external source works yet, so I am reopening this issue:
I notice from the tutorial that the next method of the ExternalInputIterator class is actually an infinite one, since it does not have a raise in it. Is this the correct design, or should I add a raise in my own implementation for our dataset?
Also, when I use torch.distributed to spread my model over multiple GPUs, do I need to take care of splitting the dataset and distributing it to the different GPUs myself?
Hi,
The raise is optional. If you raise StopIteration inside iter_setup, DALI will handle it, process all the remaining data, and then raise StopIteration on its own so that the user knows all data has been processed. If you don't, that is also fine, but you need to count the number of iterations needed for your training on your own.
To answer the second question: with ExternalSource you need to split your data on your own. Otherwise, the DALI readers are responsible for that: if you use FileReader or MXNetReader, the num_shards and shard_id arguments tell DALI what to do. ExternalSource accepts a raw numpy array at every iteration, so it is hard to ask DALI to split it between different processes or even nodes.
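Splitting the data yourself can be as simple as giving each process a contiguous slice of the sample indices. The helper below is a minimal sketch, not part of DALI; `shard_indices` is a hypothetical name, and it assumes `shard_id` is the process rank and `num_shards` is the world size.

```python
def shard_indices(num_samples, num_shards, shard_id):
    """Return the contiguous block of sample indices owned by one shard."""
    if not 0 <= shard_id < num_shards:
        raise ValueError("shard_id must be in [0, num_shards)")
    # Integer arithmetic keeps the shards disjoint and covers every sample,
    # even when num_samples is not divisible by num_shards.
    start = num_samples * shard_id // num_shards
    end = num_samples * (shard_id + 1) // num_shards
    return list(range(start, end))


for shard_id in range(3):
    print(shard_id, shard_indices(10, 3, shard_id))
# 0 [0, 1, 2]
# 1 [3, 4, 5]
# 2 [6, 7, 8, 9]
```

Shards may differ in size by one sample, so each process may need to agree on a common number of iterations per epoch.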
Thanks a lot!!
What would DALIClassificationIterator do when the pipe object raises a StopIteration, please?
I am thinking of creating an iterator object inside the __init__() method of the pipe class and calling its next method inside the iter_setup() method. But I do not know what would happen when an epoch is finished and a StopIteration is raised. Will the DALIClassificationIterator create a new pipe object when another round of for batch in loader: is executed?
What I mean is like this:
class iterator(object):
    def __init__(self):
        ...
    def __next__(self):
        ...
        raise StopIteration
    def __iter__(self):
        shuffle()
        split()
        return self

class Pipe(Pipeline):
    def __init__(self):
        super().__init__(...)  # batch size, thread count, device id
        self.iter = iterator()
        self.input = ops.ExternalSource()
    def define_graph(self):
        ...
        self.batch = self.input()
        return self.batch
    def iter_setup(self):
        batch = next(self.iter)
        self.feed_input(self.batch, batch)

pipe = Pipe()
pipe.build()
loader = DALIClassificationIterator(pipe)
for e in range(n_epoch):
    for batch in loader:  ## Will a new iterator and a new pipe object be created here with new random settings?
        ...
Can this work as expected, please? If I want to shuffle and split the dataset for different GPUs, should I do this in the definition of the iterator?
Hi,
DALIClassificationIterator raises StopIteration when all the data in the processing pipeline is exhausted (either because iter_setup raised it or because the number of samples returned by the DALI iterator reached the size argument). In that case, it is enough to call reset or to use auto_reset=True.
Regarding the second part of your question, the following creates and builds the pipeline:
pipe = Pipe()
loader = DALIClassificationIterator(pipe)
So the code should look like:
loader = DALIClassificationIterator(pipe, (...), auto_reset=True)
for e in range(n_epoch):
    for batch in loader:  ## Will a new iterator and a new pipe object be created here with new random
        ...
or
for e in range(n_epoch):
    for batch in loader:  ## Will a new iterator and a new pipe object be created here with new random
        ...
    loader.reset()
And your iterator should take care of the randomness.
Thanks, am I correct to handle the randomness in the __iter__ method of the iterator? And should I add self.iter = iterator() in the __init__ of the pipe class?
Hi,
Yes, in your case the iterator should be fully responsible for the randomness. You could do the random things inside __iter__, or preshuffle the data in the constructor and then return it sequentially in the __iter__.
And again, yes, self.iter = iterator() should do.
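An iterator that owns its randomness, as described above, could look roughly like this. It is a plain-Python sketch rather than DALI code: `EpochIterator` is a hypothetical class that yields batches of sample indices, reshuffles in `__iter__`, and raises StopIteration at the end of each epoch. It assumes the last partial batch is dropped.

```python
import random


class EpochIterator:
    """Yields batches of indices and reshuffles at the start of every epoch."""

    def __init__(self, num_samples, batch_size, seed=0):
        self.indices = list(range(num_samples))
        self.batch_size = batch_size
        self.rng = random.Random(seed)
        self.cursor = 0

    def __iter__(self):
        # Called once per epoch: reshuffle and rewind.
        self.rng.shuffle(self.indices)
        self.cursor = 0
        return self

    def __next__(self):
        # Check for exhaustion before doing any work; a partial batch is dropped.
        if self.cursor + self.batch_size > len(self.indices):
            raise StopIteration
        batch = self.indices[self.cursor:self.cursor + self.batch_size]
        self.cursor += self.batch_size
        return batch


it = EpochIterator(num_samples=10, batch_size=4)
for epoch in range(2):
    batches = list(it)  # list() calls iter(it), which reshuffles
    print(epoch, batches)
```

When such an iterator is driven from iter_setup via next(), something still has to call iter() on it again after each epoch; how that is wired into the loader's reset is left open here.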
Hi,
I am using dali to write a 'balanced dataloader', which means that I sample P categories with K samples of each category to compose a P*K batch. Imitating the tutorial, my code goes like this:
import os.path as osp
import random
import numpy as np
from nvidia.dali.plugin.pytorch import DALIClassificationIterator
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types

class DeepFashionIterator(object):
    def __init__(self, P, K, shard_id, num_shards, shuffle, mode='train'):
        self.rootpth = '/dataset/deepfashion/ISCR'
        annfile = osp.join(self.rootpth, 'Eval/list_eval_partition.txt')
        self.P = P
        self.K = K
        self.shard_id = shard_id
        self.num_shards = num_shards
        self.do_shuffle = shuffle
        self.mode = mode
        ...

    def __iter__(self):
        if self.do_shuffle:
            self.shuffle()
        return self

    def __next__(self):
        ...
        # one batch includes P categories with each category have K samples
        samples = [np.frombuffer(...) for _ in range(...)]
        labels = [np.array(idx, np.uint8) for idx in pids for _ in range(K)]
        return samples, labels

    next = __next__

class DeepFashion(Pipeline):
    def __init__(self, P, K, num_threads, shuffle, device_id, local_rank, world_size, mode='train'):
        # why we need to assign batch size to Pipeline since I assigned it to iterator?
        super(DeepFashion, self).__init__(P*K, num_threads, device_id, seed=12+device_id)
        self.input_ims = ops.ExternalSource()
        self.input_lbs = ops.ExternalSource()
        self.decoder = ops.nvJPEGDecoder(
            device='mixed',
            output_type=types.RGB,
        )
        self.resize = ops.Resize(
            device='gpu',
            image_type=types.RGB,
            interp_type=types.INTERP_CUBIC,
            resize_x=128,
            resize_y=256,
        )
        self.norm = ops.CropMirrorNormalize(
            device="gpu",
            crop=(256, 128),
            mean=[0.485*255, 0.456*255, 0.406*255],
            std=[0.229*255, 0.224*255, 0.225*255],
            image_type=types.RGB,
            output_dtype=types.FLOAT,
            output_layout=types.NCHW,
        )
        self.iterator = DeepFashionIterator(P, K, shard_id=local_rank, num_shards=world_size, shuffle=shuffle, mode=mode)

    def define_graph(self):
        self.jpegs = self.input_ims()
        self.lbs = self.input_lbs()
        ims = self.decoder(self.jpegs)
        ims = self.resize(ims)
        ims = self.norm(ims.gpu())
        return ims, self.lbs

    def iter_setup(self):
        imgs, labels = self.iterator.next()
        self.feed_input(self.jpegs, imgs)
        self.feed_input(self.lbs, labels)

def get_train_loader(P, K, device_id, n_threads, local_rank, world_size):
    pipe = DeepFashion(
        P=P,
        K=K,
        num_threads=n_threads,
        shuffle=True,
        device_id=device_id,
        local_rank=local_rank,
        world_size=world_size,
        mode='train'
    )
    pipe.build()
    loader = DALIClassificationIterator(
        pipe,
        size=1e8,  # use a big number so that I could let raise StopIteration work
        stop_at_epoch=False)
    return loader

if __name__ == '__main__':
    loader = get_train_loader(18, 4, 0, 4, 0, 1)
    for i, data in enumerate(loader):
        data = data[0]
        img, lbs = data['data'], data['label'].squeeze().long()
        print(img.size())
I know it is a bit long, but I cannot work out which line causes the Segmentation fault (core dumped). Would you please point out where the error comes from?
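For reference, the P*K sampling described in this post can be sketched independently of DALI. This is a minimal sketch under stated assumptions: `sample_pk_batch` and the toy `items_by_class` dictionary are hypothetical, and classes with fewer than K items are sampled with replacement.

```python
import random


def sample_pk_batch(items_by_class, P, K, rng):
    """Pick P classes, then K items per class, giving a batch of P*K samples."""
    class_ids = rng.sample(sorted(items_by_class), P)
    samples, labels = [], []
    for cid in class_ids:
        group = items_by_class[cid]
        if len(group) >= K:
            picked = rng.sample(group, K)     # without replacement
        else:
            picked = rng.choices(group, k=K)  # with replacement for small classes
        samples.extend(picked)
        labels.extend([cid] * K)
    return samples, labels


# Toy data: class id -> list of file names.
items_by_class = {
    0: ["a0.jpg", "a1.jpg", "a2.jpg"],
    1: ["b0.jpg"],
    2: ["c0.jpg", "c1.jpg"],
}
samples, labels = sample_pk_batch(items_by_class, P=2, K=2, rng=random.Random(0))
print(samples, labels)
```

Keeping the sampling in a small function like this makes it possible to test the batch composition separately from file reading and decoding.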
Hi,
Such dummy code works for me:
import os.path as osp
import random
import numpy as np
from nvidia.dali.plugin.pytorch import DALIClassificationIterator
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types

class DeepFashionIterator(object):
    def __init__(self, P, K, shard_id, num_shards, shuffle, mode='train'):
        self.rootpth = '/dataset/deepfashion/ISCR'
        annfile = osp.join(self.rootpth, 'Eval/list_eval_partition.txt')
        self.P = P
        self.K = K
        self.shard_id = shard_id
        self.num_shards = num_shards
        self.do_shuffle = shuffle
        self.mode = mode
        #...

    def __iter__(self):
        if self.do_shuffle:
            self.shuffle()
        return self

    def __next__(self):
        #...
        # one batch includes P categories with each category have K samples
        #samples = [np.frombuffer(...) for _ in range(...)]
        #labels = [np.array(idx, np.uint8) for idx in pids for _ in range(K)]
        samples = [np.array([1], np.uint8) for idx in range(self.P) for _ in range(self.K)]
        labels = [np.array([1], np.uint8) for idx in range(self.P) for _ in range(self.K)]
        return samples, labels

    next = __next__

class DeepFashion(Pipeline):
    def __init__(self, P, K, num_threads, shuffle, device_id, local_rank, world_size, mode='train'):
        # why we need to assign batch size to Pipeline since I assigned it to iterator?
        super(DeepFashion, self).__init__(P*K, num_threads, device_id, seed=12+device_id)
        self.input_ims = ops.ExternalSource()
        self.input_lbs = ops.ExternalSource()
        self.decoder = ops.nvJPEGDecoder(
            device='mixed',
            output_type=types.RGB,
        )
        self.resize = ops.Resize(
            device='gpu',
            image_type=types.RGB,
            interp_type=types.INTERP_CUBIC,
            resize_x=128,
            resize_y=256,
        )
        self.norm = ops.CropMirrorNormalize(
            device="gpu",
            crop=(256, 128),
            mean=[0.485*255, 0.456*255, 0.406*255],
            std=[0.229*255, 0.224*255, 0.225*255],
            image_type=types.RGB,
            output_dtype=types.FLOAT,
            output_layout=types.NCHW,
        )
        self.iterator = DeepFashionIterator(P, K, shard_id=local_rank, num_shards=world_size, shuffle=shuffle, mode=mode)

    def define_graph(self):
        self.jpegs = self.input_ims()
        self.lbs = self.input_lbs()
        ims = self.decoder(self.jpegs)
        ims = self.resize(ims)
        ims = self.norm(ims.gpu())
        return self.jpegs, self.lbs

    def iter_setup(self):
        imgs, labels = self.iterator.next()
        self.feed_input(self.jpegs, imgs)
        self.feed_input(self.lbs, labels)

def get_train_loader(P, K, device_id, n_threads, local_rank, world_size):
    pipe = DeepFashion(
        P=P,
        K=K,
        num_threads=n_threads,
        shuffle=True,
        device_id=device_id,
        local_rank=local_rank,
        world_size=world_size,
        mode='train'
    )
    pipe.build()
    loader = DALIClassificationIterator(
        pipe,
        size=1e8,  # use a big number so that I could let raise StopIteration work
        stop_at_epoch=False)
    return loader

if __name__ == '__main__':
    loader = get_train_loader(18, 4, 0, 4, 0, 1)
    for i, data in enumerate(loader):
        data = data[0]
        img, lbs = data['data'], data['label'].squeeze().long()
        print(img.size())
So I guess the problem may be in the code you have not provided. If you could provide something self-contained, without missing pieces that prevent it from running, it would be easier to tell more.
To answer # why we need to assign a batch size to the pipeline since I assigned it to iterator?: it is required because the DALI pipeline has no knowledge of the iterator you have provided. The batch size could be learned in iter_setup, but DALI needs it earlier, during pipeline creation (to properly set up operators, etc.).
Hi,
I did not post all the code because I did not want to trouble you with the redundant parts, but I am happy to share the full code:
import os.path as osp
import random
import numpy as np
from nvidia.dali.plugin.pytorch import DALIClassificationIterator
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types

class DeepFashionIterator(object):
    def __init__(self, P, K, shard_id, num_shards, shuffle, mode='train'):
        self.rootpth = '/mnt/projects/yd/dataset/deepfashion/ISCR'
        annfile = osp.join(self.rootpth, 'Eval/list_eval_partition.txt')
        self.P = P
        self.K = K
        self.shard_id = shard_id
        self.num_shards = num_shards
        self.do_shuffle = shuffle
        self.mode = mode
        assert mode in ('train', 'query', 'gallery')
        with open(annfile, 'r') as fr:
            lines = fr.read().splitlines()[2:]
        pnames = []
        for line in lines:
            tokens = line.split()
            if not tokens[2] == mode: continue
            pnames.append(tokens[1])
        pnames = list(set(pnames))
        self.n_classes = len(pnames)
        self.pname_pid = {el: i for i, el in enumerate(pnames)}
        self.pid_items = [[] for _ in range(self.n_classes)]
        for line in lines:
            tokens = line.split()
            item, pname, cat = tokens
            if not cat == mode: continue
            pid = self.pname_pid[pname]
            self.pid_items[pid].append(item)
        self.pids = list(range(self.n_classes))
        if self.do_shuffle:
            self.shuffle()
        self.group_ids = self.split()
        self.curr_id_group = self.group_ids[self.shard_id]
        self.curr = 0

    def shuffle(self):
        list(map(random.shuffle, self.pid_items))
        random.shuffle(self.pids)

    def split(self):
        split_len = len(self.pids) // self.num_shards
        group_ids = []
        [group_ids.append(self.pids[i:i+split_len]) for i in range(self.num_shards)]
        return group_ids

    def __iter__(self):
        if self.do_shuffle:
            self.shuffle()
        self.group_ids = self.split()
        return self

    def __next__(self):
        pids = self.curr_id_group[self.curr:self.curr+self.P]
        def pick_samples(container):
            def pick(idx):
                group = self.pid_items[idx]
                if len(group) >= self.K:
                    container.extend(random.sample(group, k=self.K))
                else:
                    container.extend(random.choices(group, k=self.K))
            return pick
        def get_raw_im(pth):
            with open(osp.join(self.rootpth, pth), 'rb') as fr:
                raw = np.frombuffer(fr.read(), dtype=np.uint8)
            return raw
        samples = []
        list(map(pick_samples(samples), pids))
        samples = list(map(get_raw_im, samples))
        self.curr += self.P
        if self.curr > len(self.curr_id_group): raise StopIteration
        labels = [np.array(idx, np.uint8) for idx in pids for _ in range(4)]
        return samples, labels

    next = __next__

class DeepFashion(Pipeline):
    def __init__(self, P, K, num_threads, shuffle, device_id, local_rank, world_size, mode='train'):
        # why we need to assign batch size to Pipeline since I assigned it to iterator?
        super(DeepFashion, self).__init__(P*K, num_threads, device_id, seed=12+device_id)
        self.input_ims = ops.ExternalSource()
        self.input_lbs = ops.ExternalSource()
        self.decoder = ops.nvJPEGDecoder(
            device='mixed',
            output_type=types.RGB,
        )
        self.resize = ops.Resize(
            device='gpu',
            image_type=types.RGB,
            interp_type=types.INTERP_CUBIC,
            resize_x=128,
            resize_y=256,
        )
        self.norm = ops.CropMirrorNormalize(
            device="gpu",
            crop=(256, 128),
            mean=[0.485*255, 0.456*255, 0.406*255],
            std=[0.229*255, 0.224*255, 0.225*255],
            image_type=types.RGB,
            output_dtype=types.FLOAT,
            output_layout=types.NCHW,
        )
        self.iterator = DeepFashionIterator(P, K, shard_id=local_rank, num_shards=world_size, shuffle=shuffle, mode=mode)

    def define_graph(self):
        self.jpegs = self.input_ims()
        self.lbs = self.input_lbs()
        ims = self.decoder(self.jpegs)
        ims = self.resize(ims)
        ims = self.norm(ims.gpu())
        return ims, self.lbs

    def iter_setup(self):
        imgs, labels = self.iterator.next()
        self.feed_input(self.jpegs, imgs)
        self.feed_input(self.lbs, labels)

def get_train_loader(P, K, device_id, n_threads, local_rank, world_size):
    pipe = DeepFashion(
        P=P,
        K=K,
        num_threads=n_threads,
        shuffle=True,
        device_id=device_id,
        local_rank=local_rank,
        world_size=world_size,
        mode='train'
    )
    pipe.build()
    loader = DALIClassificationIterator(
        pipe,
        size=1e8,  # use a big number so that I could let raise StopIteration work
        stop_at_epoch=False)
    return loader

if __name__ == '__main__':
    loader = get_train_loader(18, 4, 0, 4, 0, 1)
    for i, data in enumerate(loader):
        data = data[0]
        img, lbs = data['data'], data['label'].squeeze().long()
        print(img.size())
Would you please show me how I could make this work?
@CoinCheung - as a next step, try removing nonessential parts to see whether it still works. For example, remove:
Thanks for support, I am closing this :)