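Thanks for developing this awesome project!
I have a question about DALIGenericIterator.
The PyTorch DALIGenericIterator seems to place all output tensors on the GPU.
I checked my pipeline directly with `pipe.build()` and `pipe.run()`, and as shown below it works as expected: the first five outputs are GPU TensorLists and the rest are CPU TensorLists.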
<nvidia.dali.backend_impl.TensorListGPU object at 0x7f6a3010a0b0>
<nvidia.dali.backend_impl.TensorListGPU object at 0x7f6a3010a0f0>
<nvidia.dali.backend_impl.TensorListGPU object at 0x7f6a3010a070>
<nvidia.dali.backend_impl.TensorListGPU object at 0x7f6a3010a030>
<nvidia.dali.backend_impl.TensorListGPU object at 0x7f6a3010a170>
<nvidia.dali.backend_impl.TensorListCPU object at 0x7f6a3010a1b0>
<nvidia.dali.backend_impl.TensorListCPU object at 0x7f6a3010a1f0>
<nvidia.dali.backend_impl.TensorListCPU object at 0x7f6a3010a230>
<nvidia.dali.backend_impl.TensorListCPU object at 0x7f6a3010a270>
<nvidia.dali.backend_impl.TensorListCPU object at 0x7f6a3010a2b0>
<nvidia.dali.backend_impl.TensorListCPU object at 0x7f6a3010a2f0>
<nvidia.dali.backend_impl.TensorListCPU object at 0x7f6a3010a330>
<nvidia.dali.backend_impl.TensorListCPU object at 0x7f6a3010a370>
<nvidia.dali.backend_impl.TensorListCPU object at 0x7f6a3010a3b0>
However, with DALIGenericIterator, every TensorList ends up on the GPU.
Is this a bug in DALI?
Hi,
According to https://github.com/NVIDIA/DALI/blob/master/dali/python/nvidia/dali/plugin/pytorch.py#L205 each Torch tensor should be allocated on the device that matches its DALI backend (CPU or GPU).
Could you share a simple, self-contained repro to show the problem?
@JanuszL
Finally, I managed to reproduce it!
Maybe this is caused by my misuse of PyTorch Lightning (PL).
Everything works fine without PL.
import types
import cv2
import numpy as np
import nvidia.dali.ops as ops
import nvidia.dali.types as types
import torch
import torch.nn as nn
from nvidia.dali.pipeline import Pipeline
from nvidia.dali.plugin.pytorch import DALIGenericIterator
from overrides import overrides
from pytorch_lightning import Trainer
from pytorch_lightning.core.lightning import LightningModule
class MyPipeline(Pipeline):
    """DALI pipeline that decodes and augments images supplied by ``DaliPipeFeeder``.

    Images go through decode -> rotate -> slice -> resize -> color twist ->
    normalize on the GPU. Four label maps are copied to the GPU with
    ``.gpu()``; every other external-source output is returned unchanged and
    therefore stays a CPU TensorList.
    """

    def __init__(self, rank, batch_size):
        """Create the pipeline and its operators.

        Args:
            rank (int): GPU index the pipeline runs on (passed as ``device_id``).
            batch_size (int): number of samples per batch.
        """
        # Per-channel mean/std rescaled from [0, 1] to the [0, 255] pixel range
        # (presumably the standard ImageNet statistics).
        mean = [m * 255 for m in (0.485, 0.456, 0.406)]
        std = [s * 255 for s in (0.229, 0.224, 0.225)]
        interp_type = types.INTERP_LINEAR

        super().__init__(batch_size=batch_size,
                         num_threads=2,
                         device_id=rank,
                         seed=777,
                         exec_async=True,
                         exec_pipelined=True,
                         prefetch_queue_depth=2)

        # Inputs: the feeder yields a 17-tuple per batch, so num_outputs must match.
        feeder = DaliPipeFeeder(batch_size)
        self.external_src = ops.ExternalSource(source=feeder,
                                               num_outputs=17,
                                               device='cpu')

        # Random parameters for the color augmentation.
        self.rand_contrast = ops.Uniform(range=[0.5, 1.5])
        self.rand_hue = ops.Uniform(range=[-36.0, 36.0])
        self.rand_saturation = ops.Uniform(range=[0.975, 1.025])
        self.rand_brightness = ops.Uniform(range=[0.9375, 1.0625])

        # Image operators ('mixed' decoding: CPU input, GPU output).
        self.jpeg_decoder = ops.ImageDecoder(device='mixed',
                                             output_type=types.RGB)
        self.rotate = ops.WarpAffine(device='gpu',
                                     interp_type=interp_type,
                                     fill_value=0.)
        # Anchors/shapes from the feeder are absolute pixel values, not normalized.
        self.slice = ops.Slice(device='gpu',
                               normalized_anchor=False,
                               normalized_shape=False)
        self.resize = ops.Resize(device='gpu',
                                 interp_type=interp_type,
                                 resize_x=1024,
                                 resize_y=1024)
        self.color_twist = ops.ColorTwist(device='gpu')
        self.normalize = ops.CropMirrorNormalize(device='gpu',
                                                 mirror=0,
                                                 pad_output=False,
                                                 mean=mean,
                                                 std=std,
                                                 dtype=types.FLOAT,
                                                 output_layout=types.NCHW)

    def define_graph(self):
        """Define the augmentation graph.

        The rotation matrices, crop anchors and crop sizes are consumed by the
        graph and are NOT returned.

        Returns:
            tuple: 14 outputs, in this order:
                imgs: decoded, augmented, normalized images (GPU, NCHW layout).
                heat_chars, heat_links, heat_upvs, w_masks: label maps copied
                    to the GPU.
                err_chars, is_word, num_chars, num_words, c_quads, c_indices,
                w_quads, w_indices, w_lengths: passed through unchanged from
                    the external source (CPU).
        """
        jpegs, heat_chars, heat_links, heat_upvs, w_masks, rot_matrices, \
            crop_begins, crop_sizes, err_chars, is_word, \
            num_chars, num_words, c_quads, c_indices, \
            w_quads, w_indices, w_lengths = self.external_src()

        # Only these label maps are moved to the GPU; the remaining pass-through
        # outputs intentionally stay on the CPU.
        heat_chars = heat_chars.gpu()
        heat_links = heat_links.gpu()
        heat_upvs = heat_upvs.gpu()
        w_masks = w_masks.gpu()

        imgs = self.jpeg_decoder(jpegs)
        imgs = self.rotate(imgs, matrix=rot_matrices)
        imgs = self.slice(imgs, crop_begins, crop_sizes)
        imgs = self.resize(imgs)
        imgs = self._color_augmentation(imgs)
        imgs = self.normalize(imgs)

        return imgs, heat_chars, heat_links, \
            heat_upvs, w_masks, err_chars, is_word, \
            num_chars, num_words, c_quads, c_indices, \
            w_quads, w_indices, w_lengths

    def _color_augmentation(self, imgs):
        """Apply a random saturation/contrast/brightness/hue twist to ``imgs``."""
        saturation = self.rand_saturation()
        contrast = self.rand_contrast()
        brightness = self.rand_brightness()
        hue = self.rand_hue()
        imgs = self.color_twist(imgs,
                                saturation=saturation,
                                contrast=contrast,
                                brightness=brightness,
                                hue=hue)
        return imgs
class DaliPipeFeeder:
    """Endless iterator of random synthetic batches for DALI's ``ExternalSource``.

    Each ``next()`` returns a 17-tuple of lists, each holding ``batch_size``
    per-sample numpy arrays, in the order ``MyPipeline.define_graph`` unpacks
    them.
    """

    def __init__(self, batch_size):
        """Args:
            batch_size (int): number of samples produced per ``next()`` call.
        """
        self.batch_size = batch_size

    def __iter__(self):
        return self

    def __next__(self):
        """Build one synthetic batch (never raises ``StopIteration``)."""
        jpegs, heat_chars, heat_links, heat_upvs, w_masks = [], [], [], [], []
        rot_matrices, crop_begins, crop_sizes = [], [], []
        err_chars, is_word_list = [], []
        c_quads_batch, c_indices_batch = [], []
        w_quads_batch, w_indices_batch, w_lengths_batch = [], [], []
        num_chars, num_words = [], []

        # Encode one random image in memory. Round-tripping through a fixed
        # 'tmp.jpg' leaked an open file handle per batch and raced when several
        # DDP ranks shared the same working directory.
        img = (np.random.random((1024, 1024, 3)) * 255).astype(np.uint8)
        ok, buffer = cv2.imencode('.jpg', img)
        if not ok:
            raise RuntimeError('failed to JPEG-encode the synthetic image')
        encoded = buffer.reshape(-1)  # imencode returns an (N, 1) uint8 array

        for _ in range(self.batch_size):
            jpegs.append(np.copy(encoded))
            heat_chars.append(np.random.random((512, 512)).astype(np.float32))
            heat_links.append(np.random.random((512, 512)).astype(np.float32))
            heat_upvs.append(np.random.random((512, 512)).astype(np.float32))
            w_masks.append(np.random.random((512, 512)).astype(np.float32))
            # Rotation angle in [0, 1) degrees around (512, 512), scale 1.
            angle = np.random.random()
            rot_matrix = cv2.getRotationMatrix2D((512, 512), angle, 1)
            rot_matrix = rot_matrix.astype(np.float32)
            rot_matrices.append(rot_matrix)
            crop_begins.append(np.array([0, 0], dtype=np.float32))
            crop_sizes.append(np.array([100, 100], dtype=np.float32))
            err_chars.append(np.array([-1], dtype=np.float32))
            is_word_list.append(np.array([True], dtype=np.int32))
            num_chars.append(np.array([50], dtype=np.int32))
            num_words.append(np.array([10], dtype=np.int32))
            c_quads_batch.append(np.random.random((1024, 4, 2)).astype(np.float32))
            # NOTE(review): values in [0, 1) truncate to 0 when cast to int32,
            # so these index/length arrays are all zeros; fine for a device
            # repro, but not realistic data.
            c_indices_batch.append(np.random.random((1024, 1)).astype(np.int32))
            w_quads_batch.append(np.random.random((1024, 4, 2)).astype(np.float32))
            w_indices_batch.append(np.random.random((1024, 1)).astype(np.int32))
            w_lengths_batch.append(np.random.random((1024, 1)).astype(np.int32))

        return jpegs, heat_chars, heat_links, heat_upvs, w_masks, \
            rot_matrices, crop_begins, crop_sizes, err_chars, is_word_list, \
            num_chars, num_words, \
            c_quads_batch, c_indices_batch, \
            w_quads_batch, w_indices_batch, w_lengths_batch
class MyLightningModule(LightningModule):
    """Minimal LightningModule that trains ``net`` on batches from a DALI iterator."""

    def __init__(self, net):
        """Args:
            net (torch.nn.Module): network whose parameters are optimized.
        """
        super().__init__()
        self.net = net
        self.loader = None  # DALIGenericIterator, created in setup()

    @overrides
    def setup(self, stage):
        """Build one DALI pipeline for this process's local rank and wrap it."""
        pipe = MyPipeline(rank=self.local_rank, batch_size=4)
        # Must list the pipeline's 14 outputs in define_graph's return order.
        dali_output_map = ['imgs', 'gt_chars', 'gt_links', 'gt_upvs',
                           'w_masks', 'err_chars', 'is_word',
                           'num_chars', 'num_words', 'c_quads', 'c_indices',
                           'w_quads', 'w_indices', 'w_lengths']
        dali_iter = DALIGenericIterator(pipelines=[pipe],
                                        output_map=dali_output_map,
                                        size=9999999999)
        self.loader = dali_iter

    @overrides
    def transfer_batch_to_device(self, batch, device=None, dataloader_idx=0):
        """Return the DALI batch unchanged instead of moving it to ``device``.

        Lightning's default hook moves every tensor in the batch to the
        module's device, which is why the CPU outputs (``err_chars``,
        ``is_word``, ...) showed up on ``cuda:0``. DALIGenericIterator has
        already put each tensor on the device of its pipeline output (GPU for
        ``.gpu()``/GPU-operator outputs, CPU otherwise), so nothing needs to
        move.

        ``device`` and ``dataloader_idx`` are accepted only so the signature
        matches the hook across Lightning versions; both are ignored.

        NOTE(review): some Lightning versions/accelerators do not invoke this
        hook (see PyTorch Lightning issue #2350); confirm with the device
        prints in training_step.
        """
        return batch

    @overrides
    def train_dataloader(self):
        return self.loader

    @overrides
    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.net.parameters())
        return optimizer

    @staticmethod
    def process_batch(dali_out):
        """Unpack one pipeline's output dict into training inputs.

        Args:
            dali_out (dict): maps ``output_map`` names to torch tensors for a
                single pipeline.

        Returns:
            tuple: ``(imgs, gt_chars, gt_links, gt_upvs, w_masks, err_char,
            num_word)``. The first five are the corresponding dict entries
            unchanged (GPU outputs of the pipeline). ``err_char`` is the sum of
            ``err_chars`` and ``num_word`` is the sum of ``is_word`` over the
            batch; both are 0-dim tensors on the same device as their inputs
            (CPU when the batch was not moved).
        """
        imgs = dali_out['imgs']
        gt_chars = dali_out['gt_chars']
        gt_links = dali_out['gt_links']
        gt_upvs = dali_out['gt_upvs']
        w_masks = dali_out['w_masks']
        err_char = torch.sum(dali_out['err_chars'])
        num_word = torch.sum(dali_out['is_word'])
        return imgs, gt_chars, gt_links, gt_upvs, w_masks, err_char, num_word

    @overrides
    def training_step(self, batch, batch_idx):  # pylint: disable=W0613,W0221
        """Debug step: print the device of each processed output, then exit.

        Args:
            batch (list): DALIGenericIterator output, one dict per pipeline;
                only ``batch[0]`` is used since a single pipeline is configured.
            batch_idx (int): batch index (unused).

        Returns:
            Nothing in practice: ``exit()`` terminates the process after the
            first batch.
        """
        datum = self.process_batch(batch[0])
        imgs, gt_chars, gt_links, gt_upvs, weights, err_char, num_words = datum
        print('imgs', imgs.device, flush=True)
        print('gt_chars', gt_chars.device, flush=True)
        print('gt_links', gt_links.device, flush=True)
        print('gt_upvs', gt_upvs.device, flush=True)
        print('weights', weights.device, flush=True)
        print('err_char', err_char.device, flush=True)
        print('num_words', num_words.device, flush=True)
        exit()  # intentional: stop after one batch in this repro
        return None  # unreachable while the exit() above is in place
if __name__ == '__main__':
    # Names for the pipeline's 14 outputs, in define_graph's return order.
    output_names = ['imgs', 'gt_chars', 'gt_links', 'gt_upvs',
                    'w_masks', 'err_chars', 'is_word',
                    'num_chars', 'num_words', 'c_quads', 'c_indices',
                    'w_quads', 'w_indices', 'w_lengths']

    # Step 1: run a pipeline directly and show its raw DALI TensorLists.
    direct_pipe = MyPipeline(rank=0, batch_size=4)
    direct_pipe.build()
    print(direct_pipe.run())
    print('=' * 80)

    # Step 2: wrap a fresh pipeline in DALIGenericIterator and report the
    # torch device of every output for the first two batches.
    iter_pipe = MyPipeline(rank=0, batch_size=4)
    generic_iter = DALIGenericIterator(pipelines=[iter_pipe],
                                       output_map=output_names,
                                       size=9999999999)
    for step, sample in enumerate(generic_iter):
        if step == 2:
            break
        print('=' * 80)
        first_pipe_out = sample[0]
        for name in output_names:
            print(name, first_pipe_out[name].device)

    # Step 3: feed the same data through PyTorch Lightning.
    lightning_trainer = Trainer(
        checkpoint_callback=False,
        gpus=torch.cuda.device_count(),
        max_steps=100,
        accelerator='ddp',
        sync_batchnorm=True,
        precision=16,
    )
    tiny_net = nn.Sequential(
        nn.Conv2d(3, 64, 3, 1, 1),
        nn.BatchNorm2d(64),
        nn.ReLU(inplace=True),
    )
    lightning_trainer.fit(MyLightningModule(tiny_net))
And the result is as follows:
dh@dh-desktop:naver/CRAFTS-pytorch ‹Model_201223_addE2ESATRN*›$ python dali_debug.py
(<nvidia.dali.backend_impl.TensorListGPU object at 0x7f6243047c70>, <nvidia.dali.backend_impl.TensorListGPU object at 0x7f6243047ab0>, <nvidia.dali.backend_impl.TensorListGPU object at 0x7f6243047e30>, <nvidia.dali.backend_impl.TensorListGPU object at 0x7f6242f5d270>, <nvidia.dali.backend_impl.TensorListGPU object at 0x7f6242f5d5f0>, <nvidia.dali.backend_impl.TensorListCPU object at 0x7f6242f5d4b0>, <nvidia.dali.backend_impl.TensorListCPU object at 0x7f6242f5d430>, <nvidia.dali.backend_impl.TensorListCPU object at 0x7f6242f5d3f0>, <nvidia.dali.backend_impl.TensorListCPU object at 0x7f6242f5d5b0>, <nvidia.dali.backend_impl.TensorListCPU object at 0x7f6242f5d6b0>, <nvidia.dali.backend_impl.TensorListCPU object at 0x7f6242f5d6f0>, <nvidia.dali.backend_impl.TensorListCPU object at 0x7f6242f5d730>, <nvidia.dali.backend_impl.TensorListCPU object at 0x7f6242f5d770>, <nvidia.dali.backend_impl.TensorListCPU object at 0x7f6242f5d7b0>)
================================================================================
/home/dh/.local/lib/python3.8/site-packages/nvidia/dali/plugin/base_iterator.py:156: Warning: Please set `reader_name` and don't set last_batch_padded and size manually whenever possible. This may lead, in some situations, to miss some samples or return duplicated ones. Check the Sharding section of the documentation for more details.
_iterator_deprecation_warning()
================================================================================
imgs cuda:0
gt_chars cuda:0
gt_links cuda:0
gt_upvs cuda:0
w_masks cuda:0
err_chars cpu
is_word cpu
num_chars cpu
num_words cpu
c_quads cpu
c_indices cpu
w_quads cpu
w_indices cpu
w_lengths cpu
================================================================================
imgs cuda:0
gt_chars cuda:0
gt_links cuda:0
gt_upvs cuda:0
w_masks cuda:0
err_chars cpu
is_word cpu
num_chars cpu
num_words cpu
c_quads cpu
c_indices cpu
w_quads cpu
w_indices cpu
w_lengths cpu
GPU available: True, used: True
TPU available: None, using: 0 TPU cores
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
Using native 16bit precision.
Missing logger folder: /home/dh/Desktop/naver/CRAFTS-pytorch/lightning_logs
initializing ddp: GLOBAL_RANK: 0, MEMBER: 1/1
| Name | Type | Params
------------------------------------
0 | net | Sequential | 1.9 K
------------------------------------
1.9 K Trainable params
0 Non-trainable params
1.9 K Total params
Epoch 0: 0%| | 0/2500000000 [00:00<?, ?it/s]imgs cuda:0
gt_chars cuda:0
gt_links cuda:0
gt_upvs cuda:0
weights cuda:0
err_char cuda:0
num_words cuda:0
Epoch 0: 0%| | 0/2500000000 [00:00<?, ?it/s]
As you can see, `err_char` and `num_words` are on cuda:0 under PL, even though the iterator alone returns the underlying `err_chars` and `is_word` tensors on the CPU.
This can be solved by overriding the `transfer_batch_to_device()` hook in PL:
https://pytorch-lightning.readthedocs.io/en/latest/lightning_module.html?highlight=transfer_batch_to_device#transfer-batch-to-device
However, it seems there is currently a bug with that hook:
https://github.com/PyTorchLightning/pytorch-lightning/issues/2350
I'm glad you managed to narrow down the issue.