Ignite: ProgressBar ETA with IterableDataset where __len__ undefined

Created on 22 Dec 2020 · 9 Comments · Source: pytorch/ignite

โ“ Questions/Help/Support

I have used ignite successfully with regular Dataset/TensorDataset classes in the past. These have a fixed length and are tied to a DataLoader with a DistributedSampler. Keeping all other training hyper-parameters equal, I have always noticed that the ETA displayed by the ProgressBar decreases when I increase the number of nodes/GPUs.

Then, I switched to an IterableDataset where the length was computable in advance, so __len__ was defined. There is no DistributedSampler in this case because the dataset is iterable: the data files are grouped into distinct subsets in advance and assigned to different ranks. In this scenario too, I noticed that, keeping all else equal, the ETA displayed by ProgressBar decreases when the number of nodes/GPUs increases. There is some earlier discussion of this here: https://github.com/pytorch/ignite/issues/1263.

Finally, I came across the setting where I had a massive dataset where the length (i.e., number of data-points) was not computable in advance. So I removed the __len__ definition, making the IterableDataset more generic.
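
As a minimal sketch of the setup described here, assuming a strided assignment of files to ranks (the thread does not say how the files were actually grouped), the names shard_for_rank and StreamingShard below are hypothetical and use plain Python instead of torch's IterableDataset so the idea stands alone:

```python
from typing import Iterator, List, Sequence


def shard_for_rank(files: Sequence[str], rank: int, world_size: int) -> List[str]:
    """Give each rank a disjoint, strided subset of the file list (assumed scheme)."""
    if not 0 <= rank < world_size:
        raise ValueError("rank must satisfy 0 <= rank < world_size")
    return list(files[rank::world_size])


class StreamingShard:
    """Iterable with no __len__: the number of records is unknown up front."""

    def __init__(self, files: Sequence[str], rank: int, world_size: int):
        self.files = shard_for_rank(files, rank, world_size)

    def __iter__(self) -> Iterator[str]:
        for path in self.files:
            # A real dataset would open `path` and yield parsed records.
            # Two placeholder records per file keep the sketch self-contained.
            for i in range(2):
                yield f"{path}#record{i}"
```

Because such an iterable exposes no length, the number of iterations per epoch has to be supplied explicitly, which is why a fixed epoch_length comes into play in the next paragraph.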

Unfortunately, in this final setting, the ETA displayed by ProgressBar does not decrease when the number of nodes/GPUs increases. I tried training for a fixed 50000 iterations, i.e., an epoch_length of 50000. If I train on 1 GPU, the ETA is much lower than if I train on more than 1 GPU. The overall time taken per iteration is also much lower when 1 GPU is used.

I'm confused by this behavior; it doesn't seem like I'm doing anything incorrect. Could you please explain what may be happening?

@vfdev-5

question

All 9 comments

@g-karthik thanks for reporting this! Let me try to reproduce this behaviour to see what happens.
A few questions to better understand your use case:

  • If I take the code snippet from #1263 with the iterable dataset defined as below, does it look like your use case?
  • How do you spawn multiple distributed processes (torch.distributed.launch or mp.spawn)?
  • Do you use multiprocessing for your DataLoader over your IterableDataset?
  • Which ignite version are you using?

Anyway, I'll try to check a simplified situation on my side and then post my code...

EDIT: thinking more about that, shouldn't we scale epoch_length by the number of participating processes: epoch_length / idist.get_world_size()? The same happens implicitly when we use a distributed sampler, where len(dataloader) is reduced by the world size.

Here is a code snippet showing how I see this, such that we can see the ETA and time reduction when using >1 procs:

Code

import math
import time

import torch
from torch.utils.data import DataLoader, IterableDataset

import ignite
import ignite.distributed as idist
from ignite.engine import Engine, Events
from ignite.contrib.handlers import ProgressBar


class MyIterableDataset(IterableDataset):
    def __init__(self, size=10000):
        super().__init__()
        size = int(size / idist.get_world_size())
        time.sleep(0.0001 * idist.get_rank())
        print(idist.get_rank(), "- dataset size per rank: ", size)        
        self.start = idist.get_rank() * size
        self.end = (idist.get_rank() + 1) * size

    def __iter__(self):
        worker_info = torch.utils.data.get_worker_info()
        if worker_info is None:
            iter_start = self.start
            iter_end = self.end
            return iter(self[i] for i in range(iter_start, iter_end))

        num_workers = worker_info.num_workers
        iter_start = self.start + worker_info.id
        iter_end = self.end
        return iter(self[i] for i in range(iter_start, iter_end, num_workers))

    def __getitem__(self, i):
        time.sleep(0.001)
        return i


def training(local_rank):
    num_workers = 4
    ds = MyIterableDataset()
    data_loader = DataLoader(ds, num_workers=num_workers, shuffle=False)


    def foo(e, b):
        time.sleep(0.0005)


    engine = Engine(foo)
    if idist.get_rank() == 0:
        ProgressBar().attach(engine)

    @engine.on(Events.EPOCH_COMPLETED)
    @idist.one_rank_only()
    def show_times():
        print("Epoch taken: ", engine.state.times[Events.EPOCH_COMPLETED], "last element:", engine.state.batch)

    epoch_length = 5000 // idist.get_world_size()
    if idist.get_rank() == 0:
        print("Epoch length: ", epoch_length)

    engine.run(data_loader, epoch_length=epoch_length, max_epochs=2)

    idist.barrier()

    time.sleep(0.001 * idist.get_rank())
    print(idist.get_rank(), "- Completed taken per proc: ", engine.state.times[Events.COMPLETED], "last element: ", engine.state.batch)


if __name__ == "__main__":

    with idist.Parallel(backend="nccl") as parallel:

        if idist.get_rank() == 0:
            print(torch.__version__, ignite.__version__)

        parallel.run(training)

1 Proc execution:

# python test.py 
ignite.distributed.launcher.Parallel INFO: Initialized processing group with backend: 'nccl'
1.7.1 0.5.0.dev20201222
ignite.distributed.launcher.Parallel INFO: - Run '<function training at 0x7f4cb2f71ca0>' in 1 processes
0 - dataset size per rank:  10000
Epoch length:  5000
Epoch taken:  8.64072322845459 last element: tensor([4999])                                                                                                                     
Epoch taken:  8.558730602264404 last element: tensor([9999])                                                                                                                    
0 - Completed taken per proc:  17.253478527069092 last element:  tensor([9999])
ignite.distributed.launcher.Parallel INFO: End of run
ignite.distributed.launcher.Parallel INFO: Finalized processing group with backend: 'nccl'

2 Procs execution:

# python -u -m torch.distributed.launch --nproc_per_node=2 --use_env test.py 
*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
*****************************************
ignite.distributed.launcher.Parallel INFO: Initialized processing group with backend: 'nccl'
1.7.1 0.5.0.dev20201222
ignite.distributed.launcher.Parallel INFO: - Run '<function training at 0x7f709bc12ca0>' in 2 processes
0 - dataset size per rank:  5000
1 - dataset size per rank:  5000
Epoch length:  2500
Epoch taken:  4.561241626739502 last element: tensor([2499])                                                                                                                    
Epoch taken:  4.2690019607543945 last element: tensor([4999])                                                                                                                   
0 - Completed taken per proc:  8.88534164428711 last element:  tensor([4999])
ignite.distributed.launcher.Parallel INFO: End of run
ignite.distributed.launcher.Parallel INFO: Finalized processing group with backend: 'nccl'
1 - Completed taken per proc:  7.397317409515381 last element:  tensor([9999])
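
As a rough check on the two runs above, the first-epoch times from the logs can be divided by the corresponding epoch length. The sketch below only redoes that arithmetic with the logged numbers; the variable names are illustrative:

```python
# (epoch_length, seconds for the first epoch), copied from the logs above
runs = {
    1: (5000, 8.64072322845459),   # 1 process
    2: (2500, 4.561241626739502),  # 2 processes
}

# Seconds spent per iteration in each run
per_iter = {procs: seconds / length for procs, (length, seconds) in runs.items()}

# Ratio of epoch wall-clock time, 1 process vs 2 processes
speedup = runs[1][1] / runs[2][1]

for procs, value in per_iter.items():
    print(f"{procs} proc(s): {value * 1000:.3f} ms per iteration")
print(f"epoch speedup with 2 procs: {speedup:.2f}x")
```

The per-iteration time stays close to the same in both runs, so the shorter epoch (and hence the shorter ETA) comes from each process running half as many iterations, not from faster iterations.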

@g-karthik let me know what you think

@vfdev-5 Hmm I see, I wasn't downscaling the epoch_length by world size. This does seem to happen implicitly in DistributedSampler, which cannot be used with IterableDataset. I think this solution makes sense!
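
The downscaling discussed here can be sketched as a small helper. The name per_rank_epoch_length is hypothetical, and clamping the result to at least 1 is an added assumption that is not part of the thread; the floor division mirrors the snippet above:

```python
def per_rank_epoch_length(total_iterations: int, world_size: int) -> int:
    """Split a global iteration budget evenly across participating processes."""
    if world_size < 1:
        raise ValueError("world_size must be >= 1")
    # Floor division, as in `5000 // idist.get_world_size()` above.
    # The max(1, ...) guard is an assumption to avoid a zero-length epoch.
    return max(1, total_iterations // world_size)


# Example: a 50000-iteration budget on 8 processes
epoch_length = per_rank_epoch_length(50000, 8)
```

The result would then be passed as epoch_length to engine.run(...), with the world size taken from idist.get_world_size() or torch.distributed.get_world_size().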

BTW, do we really need to use idist? I don't use it in my code (I just use plain torch.distributed directly) but I get the following warning:

UserWarning: Local rank information for native distributed setting will be initialized
using heuristic approach based on hostname which can be different of real setup.
Please, either set `os.environ['LOCAL_RANK']` or use `idist.set_local_rank(local_rank)`
with correct local rank index.

What is this warning about?

> BTW, do we really need to use idist? I don't use it in my code (I just use plain torch.distributed directly)

Using idist is optional. idist provides an abstraction over different distributed backends, so the same code can run with native torch, horovod or xla (https://pytorch.org/ignite/master/distributed.html#ignite-distributed), and it offers some other helper methods like idist.auto_*.

> but I get the following warning:

With the native torch distributed framework, the local rank cannot be fetched from torch.distributed; it is passed either as an env var or as an argument to the application. The warning says that ignite could not fetch the local rank from the env var and will determine it using a heuristic approach, which may fail in some corner cases. To avoid unexpected behaviour, we can either pass the local rank as an env var with the --use_env arg when launching with torch.distributed.launch, or call idist.set_local_rank(local_rank) in the code, where local_rank is the argument from the arg parser (if --use_env is not used).
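
A minimal sketch of the two options, using only the standard library: resolve_local_rank is a hypothetical helper, and falling back to 0 when neither source is present is an assumption. It reads LOCAL_RANK from the environment (the --use_env case) and otherwise parses the --local_rank argument that torch.distributed.launch passes to the script:

```python
import argparse
import os
from typing import Mapping, Optional, Sequence


def resolve_local_rank(
    argv: Optional[Sequence[str]] = None,
    environ: Optional[Mapping[str, str]] = None,
) -> int:
    """Return the local rank from LOCAL_RANK or from a --local_rank argument."""
    environ = os.environ if environ is None else environ
    if "LOCAL_RANK" in environ:
        # Set by the launcher when --use_env is given
        return int(environ["LOCAL_RANK"])
    parser = argparse.ArgumentParser()
    # Default of 0 is an assumption for single-process runs
    parser.add_argument("--local_rank", type=int, default=0)
    args, _ = parser.parse_known_args(argv)
    return args.local_rank
```

In a training script, the returned value would be handed to idist.set_local_rank(local_rank) before any ignite code needs it, as the warning message suggests.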

@vfdev-5 gotcha, but why would idist even be invoked in my case, leading to that warning? I am not even importing it explicitly in my code, I have been using ignite for its other features - in fact, I did not even know about the idist feature until I saw this warning.

Once I get a better understanding of this, I'll check the phrasing of the warning on the PR.

@g-karthik we also use idist internally to compute metrics in a distributed setting, e.g. the all_reduce part across all participating processes. In previous versions we used torch.distributed and asked for the correct device for the metrics. Now we use idist for everything... and indeed this can lead to such warnings if we cannot determine the local rank for torch distributed.

@vfdev-5 oh, I see, I am using MetricsLambda with a callable method to perform a torch.distributed.all_reduce of some of the metrics like NLL and Accuracy, like this one with average_distributed_scalar().

Does this mean I necessarily need to stop doing that and switch to using idist.set_local_rank() with my local_rank, so that the sync_all_reduce decorator for the metrics gets triggered? Am I missing something else that needs to be upgraded?

I think it'd be cool if you could do a PR for the above repo to allow for these changes in ignite. It is a useful example repo to highlight new ignite functionality.

> Does this mean I need to necessarily stop doing that and switch to using idist.set_local_rank() with my local_rank, so that the sync_all_reduce decorator for the metrics get triggered?

Either with idist.set_local_rank(...) or simply python -m torch.distributed.launch --nproc_per_node=8 --use_env ./train.py.

> Am I missing something else that needs to be upgraded?

Optionally, we can use idist.auto_model and idist.auto_dataloader to simplify DDP and distributed sampler creation and batch scaling...

> I think it'd be cool if you could do a PR for the above repo to allow for these changes in ignite. It is a useful example repo to highlight new ignite functionality.

Yes, I agree! The problem is that I'm a bit far from NLP and will need a bit of assistance :)

I believe we can close this issue as solved. Feel free to ask other related questions here or in another issue.
