Pytorch-lightning: Hydra configs with multi GPU DDP training in Pytorch Lightning

Created on 27 Jul 2020  ·  17 Comments  ·  Source: PyTorchLightning/pytorch-lightning

As far as I understand, the DDP backend runs my training script from the beginning for each GPU I use. Is there a way to avoid creating a separate Hydra output directory in each of these processes? Should I somehow block every process except the one with local rank 0? In my case I'm saving model checkpoints and a .yaml file to the default Hydra output directory, but the config file is copied twice while the checkpoints are saved only once. In any case, spawning that many directories is not convenient.

What can I do?

Code

import os
import shutil

import hydra
import pytorch_lightning as pl
from omegaconf import DictConfig
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import WandbLogger


@hydra.main(config_path="train-config.yaml", strict=False)
def train(config: DictConfig) -> None:
    # Remember Hydra's per-run output directory, then switch back to the
    # original working directory so relative paths behave as usual.
    config.hydra_base_dir = os.getcwd()
    original_wd = hydra.utils.get_original_cwd()
    os.chdir(original_wd)

    checkpoint_callback = ModelCheckpoint(
        filepath=config.hydra_base_dir,
        save_top_k=3,
        verbose=True,
        monitor="val_loss",
        mode="min",
    )
    # Keep a copy of the config next to the checkpoints.
    shutil.copy2("train-config.yaml", os.path.join(config.hydra_base_dir, "train-config.yaml"))

    wandb_logger = WandbLogger(
        offline=False,
    )

    model = MyModel(config)  # MyModel is my LightningModule (definition not shown)

    trainer = pl.Trainer(
        max_epochs=config.train.max_epochs,
        gpus=config.train.n_gpu,
        auto_select_gpus=True,
        distributed_backend="ddp",
        checkpoint_callback=checkpoint_callback,
        logger=wandb_logger,
    )

    trainer.fit(model)

What's your environment?

  • OS: Ubuntu 18.04
  • Conda, Python 3.7.7
  • hydra-core==0.11.3
  • pytorch-lightning==0.8.5
  • wandb==0.9.3

Labels: DDP, question, won't fix

Most helpful comment

@rakhimovv when you use ddp, PL starts subprocesses to run the training script (by simply passing the training script's command to https://docs.python.org/3/library/subprocess.html) and gathers the gradients. What @omry means when he recommends using --multirun is that instead of relying on the subprocess module as PL does, you use --multirun to start the subprocesses and gather the gradients. This is not supported by PL currently as far as I know.

ddp_spawn, on the other hand, starts subprocesses using this method: https://pytorch.org/docs/stable/multiprocessing.html#torch.multiprocessing.spawn, which is fundamentally different from directly calling the training script as ddp does.

All 17 comments

I've just found out that the process local rank can be accessed via local_rank = os.environ.get("LOCAL_RANK", 0), because this is how Lightning handles it under the hood. It seems the docs need some clarification on how to work with the different DDP processes.
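For illustration, a minimal sketch of guarding directory-creating work behind rank 0, assuming LOCAL_RANK is set as described above (the helper names are just for the example):

import os
import shutil


def is_rank_zero() -> bool:
    # Lightning sets LOCAL_RANK in the subprocesses it launches; the original
    # process has no LOCAL_RANK and is treated as rank 0 here.
    return int(os.environ.get("LOCAL_RANK", 0)) == 0


def copy_config_once(src: str, dst_dir: str) -> None:
    # Only the rank-0 process copies the config, so the file is written once.
    if is_rank_zero():
        shutil.copy2(src, os.path.join(dst_dir, os.path.basename(src)))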

Nevertheless, it's not possible to delete the Hydra base directory from within the hydra.main-decorated function, which is weird.

cc: @yukw777 @omry

You could configure Hydra run dir via the command line or your config file to be whatever you want it to be, see this.

However, I think the right approach to DDP training with Hydra is to use multirun. With multirun, each running script gets its own subdirectory under the primary working directory by design and not by accident.
I haven't seen anyone do it yet and I do not know how it will work with PL.
I think it's important to create a PL example for DDP, following the great work from @anthonytec2 (#2639), which is still in limbo.

I tried to use the --multirun option.
It does not work with trainer.distributed_backend=ddp,
but it works with trainer.distributed_backend=ddp_spawn, with one exception: running it under Slurm fails.

I am not familiar with the difference between ddp and ddp_spawn in PL.
We may still need to make some changes or at least provide a working example. I am counting on people like you to help create that example.

Are you using the Submitit Launcher plugin to run it on SLURM?
What kind of failure?

@rakhimovv when you use ddp, PL starts subprocesses to run the training script (by simply passing the training script's command to https://docs.python.org/3/library/subprocess.html) and gathers the gradients. What @omry means when he recommends using --multirun is that instead of relying on the subprocess module as PL does, you use --multirun to start the subprocesses and gather the gradients. This is not supported by PL currently as far as I know.

ddp_spawn, on the other hand, starts subprocesses using this method: https://pytorch.org/docs/stable/multiprocessing.html#torch.multiprocessing.spawn, which is fundamentally different from directly calling the training script as ddp does.
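To illustrate the difference, a rough sketch (not Lightning's actual code) of the two launch styles; the worker function and the hard-coded world size are assumptions made for the example:

import os
import subprocess
import sys

import torch.multiprocessing as mp


def worker(rank: int, world_size: int) -> None:
    # In real DDP code this would call init_process_group and run training.
    print(f"worker {rank} of {world_size}")


def launch_like_ddp_spawn(world_size: int = 2) -> None:
    # ddp_spawn style: start fresh Python processes that call `worker` directly.
    mp.spawn(worker, args=(world_size,), nprocs=world_size, join=True)


def launch_like_ddp(world_size: int = 2) -> None:
    # ddp style: re-run the training script itself once per extra GPU,
    # passing the rank through the environment.
    for rank in range(1, world_size):
        env = os.environ.copy()
        env["LOCAL_RANK"] = str(rank)
        subprocess.Popen([sys.executable] + sys.argv, env=env)


if __name__ == "__main__":
    launch_like_ddp_spawn()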

Thanks for the clarification @yukw777. I got it.

I misunderstood at first. In my comment above I meant the situation where I try to run several experiments using the --multirun option, like:
python train.py --multirun trainer.gpus=4 trainer.distributed_backend=ddp_spawn encoder.blocks_num=2,4

@yukw777, do I understand correctly that running --multirun and ddp simultaneously is fundamentally incorrect?

@omry no plugin for Slurm. I did not have a chance to check Submitit or PL's SlurmCluster object. I used just a plain sbatch script.

There are two options I tried:

  1. When I set --ntasks=1 regardless of the number of GPUs used,
    it works; in this case PL manages spawning the processes itself.

  2. When I set --ntasks equal to the number of GPUs used,
    PL delegates starting the subprocesses to Slurm, which does so when the 'srun' command is used inside the sbatch script.
    But I get the error below. It fails during the test phase. In my script, I run fit and test sequentially.

Traceback (most recent call last):
  File "/trinity/home/r.rakhimov/.local/lib/python3.8/site-packages/hydra/_internal/utils.py", line 202, in run_and_report
    return func()
  File "/trinity/home/r.rakhimov/.local/lib/python3.8/site-packages/hydra/_internal/utils.py", line 345, in <lambda>
    lambda: hydra.multirun(
  File "/trinity/home/r.rakhimov/.local/lib/python3.8/site-packages/hydra/_internal/hydra.py", line 132, in multirun
    return sweeper.sweep(arguments=task_overrides)
  File "/trinity/home/r.rakhimov/.local/lib/python3.8/site-packages/hydra/_internal/core_plugins/basic_sweeper.py", line 135, in sweep
    results = self.launcher.launch(batch, initial_job_idx=initial_job_idx)
  File "/trinity/home/r.rakhimov/.local/lib/python3.8/site-packages/hydra/_internal/core_plugins/basic_launcher.py", line 64, in launch
    ret = run_job(
  File "/trinity/home/r.rakhimov/.local/lib/python3.8/site-packages/hydra/core/utils.py", line 123, in run_job
    ret.return_value = task_function(task_cfg)
  File "train_net.py", line 62, in main
    trainer.fit(model)
  File "/trinity/home/r.rakhimov/.local/lib/python3.8/site-packages/pytorch_lightning/trainer/trainer.py", line 1074, in fit
    self.ddp_train(process_idx=task, mp_queue=None, model=model)
  File "/trinity/home/r.rakhimov/.local/lib/python3.8/site-packages/pytorch_lightning/trainer/distrib_data_parallel.py", line 577, in ddp_train
    model = model.configure_ddp(model, device_ids)
  File "/trinity/home/r.rakhimov/.local/lib/python3.8/site-packages/pytorch_lightning/core/lightning.py", line 887, in configure_ddp
    model = LightningDistributedDataParallel(model, device_ids=device_ids, find_unused_parameters=True)
  File "/trinity/home/r.rakhimov/.local/lib/python3.8/site-packages/torch/nn/parallel/distributed.py", line 283, in __init__
    self._distributed_broadcast_coalesced(
  File "/trinity/home/r.rakhimov/.local/lib/python3.8/site-packages/torch/nn/parallel/distributed.py", line 496, in _distributed_broadcast_coalesced
    dist._broadcast_coalesced(self.process_group, tensors, buffer_size)
RuntimeError: Broken pipe
(The identical traceback is repeated two more times, once by each of the other processes.)

The main problem here is that setting --ntasks equal to the number of GPUs used is the only option if I, for example, want to run multi-node (not just multi-GPU) training.

By ntasks you mean the sbatch parameter? I have no intention of supporting it. If you want to use sbatch you are on your own, at least from my perspective.

Try with the submitit plugin and we can discuss further.
Also, please create a _minimal_ example demonstrating the problem (again, using the submitit plugin).

@omry I'm trying to think through what you are suggesting here and what Lightning does.

Lightning currently handles multiple GPUs per node by launching a subprocess for each additional GPU on the node, re-invoking the command in sys.argv (this apparently reduces training times compared to multiprocessing). The use of sys.argv obviously causes issues with Hydra because it will include all the Hydra arguments. For example, if a multirun command is used, each node would spawn a new multirun job. I currently modify the sys.argv commands to get around this.

Would you suggest that the config for each node be a multirun job across the available GPUs (ntasks == ngpus)? If we launch with the submitit launcher with the command task=1,2 --multirun, would that launch two jobs per node? I like this idea, but I don't think Lightning supports it.

I'll think about this because I would like to find a simple approach to this issue. I don't think Lightning can support choosing the GPU for a job. I ran into this issue using mpirun (#2408). It might work to modify the CUDA_VISIBLE_DEVICES per job.
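As a rough sketch of that last idea, one could restrict each job to a single GPU by setting CUDA_VISIBLE_DEVICES before CUDA is initialized; the job_idx argument here is a hypothetical per-job index, not something Hydra or Lightning provides under that name:

import os


def pin_job_to_gpu(job_idx: int) -> None:
    # Hypothetical helper: expose only one physical GPU to this job.
    # Must run before torch initializes CUDA in this process.
    os.environ["CUDA_VISIBLE_DEVICES"] = str(job_idx)
    # Inside the job, the chosen GPU then shows up as cuda:0.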

@rakhimovv sorry for the late reply, but yes you're correct, currently they don't work together.

Here's an example script that outlines the Hydra/Lightning issue with ddp backend

This example simulates how Lightning spawns a process on a node with 2 GPUs (it spawns one extra process alongside the main process). You can see how sys.argv is used here. There will be a log directory for each process.

Also, if you update the example to run multirun, you will see it executes twice: python test_argv.py test=1,2 --multirun

import os
import subprocess
import sys

import hydra
from omegaconf import DictConfig


def spawner(cfg):
    # Rebuild the exact command line this script was started with, resolving
    # the script path because Hydra changes the working directory.
    command = sys.argv
    full_path = hydra.utils.to_absolute_path(command[0])
    command[0] = full_path
    command = [sys.executable] + command
    cwd = hydra.utils.get_original_cwd()

    # Mark the child as a non-zero local rank, mimicking what Lightning does.
    env_copy = os.environ.copy()
    env_copy['LOCAL_RANK'] = '1'
    proc = subprocess.Popen(command, env=env_copy, cwd=cwd)
    proc.wait()  # wait so the child's output is not printed after the parent exits


def objective(cfg):
    if 'LOCAL_RANK' in os.environ:
        # Child process: Hydra has created a second output directory for it.
        print('bar')
    else:
        # Main process: spawn the simulated DDP worker.
        print('foo')
        spawner(cfg)


@hydra.main(config_path='.', config_name='argv.yaml')
def main(cfg: DictConfig):
    objective(cfg)


if __name__ == '__main__':
    main()

Here is argv.yaml

test: 1

@omry I'm trying to think through what you are suggesting here and what Lightning does.

Lightning currently handles multiple GPUs per node by launching a subprocess for each additional GPU on the node, re-invoking the command in sys.argv (this apparently reduces training times compared to multiprocessing). The use of sys.argv obviously causes issues with Hydra because it will include all the Hydra arguments. For example, if a multirun command is used, each node would spawn a new multirun job. I currently modify the sys.argv commands to get around this.

Yes, it is an issue.
It also sounds like you would need to create a Sweeper plugin for this, but I am not sure it's the best course of action.
A Sweeper plugin takes the input overrides (command line) for a multirun and breaks them down into overrides for the individual jobs.

Would you suggest that the config for each node be a multirun job across the available GPUs (ntasks == ngpus)? If we launch with the submitit launcher with the command task=1,2 --multirun, would that launch two jobs per node? I like this idea, but I don't think Lightning supports it.

I'll think about this because I would like to find a simple approach to this issue. I don't think Lightning can support choosing the GPU for a job. I ran into this issue using mpirun (#2408). It might work to modify the CUDA_VISIBLE_DEVICES per job.

I think for multiprocessing you need to treat the application as a single run (not multirun) and let PL do the multiprocessing.
My suggestion to use the Slurm launcher was aimed more at the multi-node case, but to be honest I think this is not going to work right now and more work is needed to enable it.

Hydra can set environment variables for jobs, see this.
It's likely that this will help. In fact - I am specifically calling out RANK in Torch distributed as a use case there.

So it actually works out great to just have a configuration for submitit. For my example above, if you call objective via executor.submit(objective, cfg) it works out great; no need to mess with sys.argv. I think this is because submitit generates a new command (using pickles?) for the submission. I wonder if the Lightning folks would benefit from generating a subprocess similar to how submitit generates the submission?

@jgbos To clarify, the solution is to use submitit, but not with the hydra submitit plugin or hydra --multirun? Do you mind showing a working example using pl?

Also, does this mean there is currently no solution in the case that Slurm is not being used (outside of @omry's suggestion to go down one level of abstraction and deal with Torch distributed ourselves)?

Sorry, but I don't have an example for DDP with Hydra.
Supporting it properly will take some development.

In the meantime, try to get help from people who have been successful using PL DDP with Hydra. I think you can find a few on this issue.

@AlexSchuy I'm still trying to figure out the best options, but there are two steps I take to ensure Lightning DDP works. First, I modify sys.argv in the main function (after Hydra has been initialized) using the following (which should support multirun):

if distributed_backend == 'ddp':
    # Point the child processes at this run's existing Hydra output directory
    # and disable Hydra's logging so they don't create new directories.
    cwd = os.getcwd()
    sys.argv = sys.argv[:1]
    sys.argv.extend([
        f"hydra.run.dir={cwd}",
        "hydra/hydra_logging=disabled",
        "hydra/job_logging=disabled",
    ])
    # Re-apply the original overrides, skipping the sweeper/launcher ones,
    # which only make sense for the parent multirun process.
    overrides = OmegaConf.load('.hydra/overrides.yaml')
    for o in overrides:
        if 'hydra/sweeper' in o:
            continue

        if 'hydra/launcher' in o:
            continue

        sys.argv.append(o)

For launching via submitit, I have a command function, such as train(cfg: DictConfig), that is used for the submission:

job = executor.submit(train, *args, **kwargs)
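For reference, a hedged sketch of what such a submitit submission might look like; the log folder, resource parameters, and the train function body are assumptions for the example, not a verified Hydra + Lightning recipe:

import submitit
from omegaconf import DictConfig


def train(cfg: DictConfig) -> None:
    # Stand-in for the Hydra-configured training entry point described above.
    ...


def submit(cfg: DictConfig) -> None:
    # AutoExecutor pickles the function call for the cluster job, so no
    # sys.argv manipulation is needed for the submitted function itself.
    executor = submitit.AutoExecutor(folder="submitit_logs")  # assumed log folder
    executor.update_parameters(
        nodes=1,
        tasks_per_node=1,  # let PL spawn the per-GPU processes, as in option 1 above
        gpus_per_node=4,
        timeout_min=60,
    )
    job = executor.submit(train, cfg)
    print(job.job_id)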

This issue has been automatically marked as stale because it hasn't had any recent activity. This issue will be closed in 7 days if no further activity occurs. Thank you for your contributions, Pytorch Lightning Team!
