PyTorch Lightning: DDP bug with ModelCheckpoint on .ckpt file saving

Created on 3 Nov 2020 · 10 comments · Source: PyTorchLightning/pytorch-lightning

🐛 Bug


In DDP mode with 2 GPUs, trainer.test(model) starts before the best model from the preceding trainer.fit(model) has been saved to disk. It seems trainer.fit() and trainer.test() are not synchronized across processes in DDP mode.

Please reproduce using the BoringModel and post here.

The BoringModel template doesn't support a real 2-GPU DDP run.

To Reproduce

bm_cb = ModelCheckpoint(
        monitor='val_loss',
        filepath= './bestmodel/bestmodel-{epoch:02d}-{val_loss:.2f}',
        save_top_k=1,
        save_weights_only=True,
        mode='min'
)
trainer = pl.Trainer(gpus=2,
        max_epochs=20,
        checkpoint_callback=bm_cb,
        accelerator='ddp'
)

trainer.fit(model)
print(bm_cb.best_model_path)
# example output: "./bestmodel/bestmodel-epoch=19-val_loss=0.02.ckpt"
model.load_from_checkpoint(bm_cb.best_model_path)
# the error is raised here:
# No such file or directory: ./bestmodel/bestmodel-epoch=19-val_loss=0.02.ckpt
test_result = trainer.test(model)

Expected behavior

The checkpoint actually saved on disk is "./bestmodel/bestmodel-epoch=18-val_loss=0.03.ckpt".
However, the file "./bestmodel/bestmodel-epoch=19-val_loss=0.02.ckpt" reported by best_model_path is not on disk after trainer.fit returns.
The expected behavior is that trainer.fit does not return until the checkpoint has finished saving to disk.

I suspect this bug can be reproduced whenever the last training epoch produces the best model.
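
To make the mismatch visible, here is a small diagnostic sketch (my own addition, not part of the original report) that can be appended after trainer.fit() in the reproduction snippet above. It adds an explicit barrier, in case the two processes are merely racing, and then prints what each rank believes the best checkpoint is and whether that file actually exists. The names trainer, model, and bm_cb refer to the snippet above; trainer.global_rank is assumed to be available as in Lightning 1.0.x.

import os
import torch.distributed as dist

trainer.fit(model)

# If the two DDP processes were only racing, a collective barrier after fit()
# would make every rank wait until rank 0 has finished writing the file.
if dist.is_available() and dist.is_initialized():
    dist.barrier()

# best_model_path is tracked independently in each process; print what each
# rank believes the best checkpoint is and whether that file really exists.
path = bm_cb.best_model_path
print(f"rank={trainer.global_rank} best_model_path={path} "
      f"exists={os.path.exists(path)}")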

Environment

  • PyTorch version: 1.6.0+cu92
  • PyTorch Lightning version: 1.0.4
  • OS (e.g., Linux): Red Hat
  • How you installed PyTorch (conda, pip, source): pip
  • Python version: 3.6.8
  • CUDA/cuDNN version: cuda/10.1.243-gcc-9.2.0-gl7b4mh
  • GPU models and configuration: 2 NVIDIA V100 GPUs
  • The job runs on a Slurm cluster with one node, 2 GPUs, 56 CPUs, mem-per-cpu=2G
Labels: DDP, bug / fix, help wanted

All 10 comments

Hi! Thanks for your contribution, great first issue!

Hey, thanks for the issue! Would you be able to create a reproducible test with DDP on 2 GPUs using this? https://github.com/PyTorchLightning/pytorch-lightning/blob/master/pl_examples/bug_report_model.py

It will help us debug much faster!

While editing this issue, I got another "No such file or directory" error caused by ModelCheckpoint.best_model_path.

This time, best_model_path points to the best model at epoch 42, but the checkpoint on disk is the one from epoch 43 instead of epoch 42.

The code that causes the bug is the same, except that max_epochs is set to 50 in the Trainer initialization. Training actually runs to epoch 49, which is the last epoch (epoch numbering starts from 0).

Hey, thanks for the issue! Would you be able to create a reproducible test with DDP on 2 GPUs using this? https://github.com/PyTorchLightning/pytorch-lightning/blob/master/pl_examples/bug_report_model.py

It will help us debug much faster!

I will try that

Oops, I accidentally clicked the close button.

Thanks. I haven't been able to reproduce this issue on my side; this (slightly different) script works fine on my end:

import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.datasets import load_iris
from sklearn.metrics import accuracy_score
from torch.utils.data import DataLoader, random_split, TensorDataset

import pytorch_lightning as pl
from pytorch_lightning import seed_everything
from pytorch_lightning.callbacks import ModelCheckpoint


class IrisClassification(pl.LightningModule):
    def __init__(self):
        super(IrisClassification, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(4, 10),
            nn.ReLU(),
            nn.Linear(10, 3),
            nn.ReLU(),
            nn.Softmax(dim=1)
        )
        self.cross_entropy_loss = nn.CrossEntropyLoss()

    def forward(self, x):
        x = self.model(x)
        return x

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=0.02)

    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self.forward(x)
        loss = self.cross_entropy_loss(logits, y)
        self.log('loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self.forward(x)
        loss = F.cross_entropy(logits, y)
        self.log("val_loss", loss, on_epoch=True, prog_bar=True)

    def test_step(self, batch, batch_idx):
        x, y = batch
        logits = self.forward(x)
        loss = F.cross_entropy(logits, y)
        a, y_hat = torch.max(logits, dim=1)
        test_acc = accuracy_score(y_hat.cpu(), y.cpu())
        self.log_dict(
            dictionary={
                "test_loss": loss,
                "test_acc": torch.tensor(test_acc)
            },
            prog_bar=True,
            on_epoch=True
        )


def run_test():
    model = IrisClassification()
    seed_everything(42)

    iris = load_iris()
    df = iris.data
    target = iris["target"]

    data = torch.Tensor(df).float()
    labels = torch.Tensor(target).long()

    data_set = TensorDataset(data, labels)
    train_set, val_set = random_split(data_set, [130, 20])
    train_set, test_set = random_split(train_set, [110, 20])
    train_loader = DataLoader(dataset=train_set, batch_size=8, num_workers=32)
    val_loader = DataLoader(dataset=val_set, batch_size=8, num_workers=32)
    test_loader = DataLoader(dataset=test_set, batch_size=8, num_workers=32)
    bm_cb = ModelCheckpoint(
        monitor='val_loss',
        filepath='./bestmodel/bestmodel-{epoch:02d}-{val_loss:.2f}',
        save_top_k=1,
        save_weights_only=True,
        mode='min'
    )
    trainer = pl.Trainer(
        max_epochs=10,
        gpus=2,
        accelerator='ddp',
        callbacks=[bm_cb]
    )

    trainer.fit(model, train_loader, val_loader)
    print(bm_cb.best_model_path)
    # example output: "./bestmodel/bestmodel-epoch=19-val_loss=0.02.ckpt"
    model.load_from_checkpoint(bm_cb.best_model_path)
    # in the original report, the error is raised here:
    # No such file or directory: ./bestmodel/bestmodel-epoch=19-val_loss=0.02.ckpt
    test_result = trainer.test(model, test_loader)


if __name__ == '__main__':
    run_test()
Here is my attempt to reproduce it with the BoringModel from bug_report_model.py:

import os
from pytorch_lightning.accelerators import accelerator
from pytorch_lightning.callbacks.progress import ProgressBar
import torch
from torch.utils.data import Dataset
from pytorch_lightning import Trainer, LightningModule
from pytorch_lightning.callbacks import ModelCheckpoint


class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size)

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return self.len


class BoringModel(LightningModule):

    def __init__(self):
        """
        Testing PL Module
        Use as follows:
        - subclass
        - modify the behavior for what you want
        class TestModel(BaseTestModel):
            def training_step(...):
                # do your own thing
        or:
        model = BaseTestModel()
        model.training_epoch_end = None
        """
        super().__init__()
        self.layer = torch.nn.Linear(32, 2)

    def forward(self, x):
        return self.layer(x)

    def loss(self, batch, prediction):
        # An arbitrary loss to have a loss that updates the model weights during `Trainer.fit` calls
        return torch.nn.functional.mse_loss(prediction, torch.ones_like(prediction))

    def step(self, x):
        x = self.layer(x)
        out = torch.nn.functional.mse_loss(x, torch.ones_like(x))
        return out

    def training_step(self, batch, batch_idx):
        output = self.layer(batch)
        loss = self.loss(batch, output)
        return {"loss": loss}

    def training_step_end(self, training_step_outputs):
        return training_step_outputs

    def training_epoch_end(self, outputs) -> None:
        torch.stack([x["loss"] for x in outputs]).mean()

    def validation_step(self, batch, batch_idx):
        output = self.layer(batch)
        loss = self.loss(batch, output)
        self.log('val_loss', loss, prog_bar=True)
        return {"x": loss}

    def validation_epoch_end(self, outputs) -> None:
        torch.stack([x['x'] for x in outputs]).mean()

    def test_step(self, batch, batch_idx):
        output = self.layer(batch)
        loss = self.loss(batch, output)
        return {"y": loss}

    def test_epoch_end(self, outputs) -> None:
        torch.stack([x["y"] for x in outputs]).mean()

    def configure_optimizers(self):
        optimizer = torch.optim.SGD(self.layer.parameters(), lr=0.1)
        lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1)
        return [optimizer], [lr_scheduler]


#  NOTE: If you are using a cmd line to run your script,
#  provide the cmd line as below.
#  opt = "--max_epochs 1 --limit_train_batches 1".split(" ")
#  parser = ArgumentParser()
#  args = parser.parse_args(opt)

def run_test():

    class TestModel(BoringModel):

        def on_train_epoch_start(self) -> None:
            print('override any method to prove your bug')

    # fake data
    train_data = torch.utils.data.DataLoader(RandomDataset(32, 64))
    val_data = torch.utils.data.DataLoader(RandomDataset(32, 64))
    test_data = torch.utils.data.DataLoader(RandomDataset(32, 64))

    # model
    model = TestModel()
    bm_cb = ModelCheckpoint(
        monitor='val_loss',
        filepath= './bestmodel-{epoch:02d}-{val_loss:.2f}',
        save_top_k=1,
        save_weights_only=True,
        mode='min'
    )
    trainer = Trainer(
        gpus=2,
        max_epochs=20,
        progress_bar_refresh_rate=20,
        flush_logs_every_n_steps=20,
        checkpoint_callback=bm_cb,
        accelerator='ddp'
    )

    trainer.fit(model, train_data, val_data)
    print('load from checkpoint', bm_cb.best_model_path)
    model.load_from_checkpoint(bm_cb.best_model_path)
    trainer.test(model, test_dataloaders=test_data)


if __name__ == '__main__':
    run_test()

The above code causes the following error:

load from checkpoint /fs1/epscor/home/zluo_epscor/plant_disease/ensemble_DL/bestmodel-epoch=07-val_loss=0.50.ckpt
Traceback (most recent call last):
  File "/fs1/epscor/home/zluo_epscor/plant_disease/ensemble_DL/bug_report.py", line 150, in <module>
    run_test()
  File "/fs1/epscor/home/zluo_epscor/plant_disease/ensemble_DL/bug_report.py", line 145, in run_test
    model.load_from_checkpoint(bm_cb.best_model_path)
  File "/fs1/epscor/home/zluo_epscor/plant_disease/ensemble_DL/venv/lib64/python3.6/site-packages/pytorch_lightning/core/saving.py", line 132, in load_from_checkpoint
    checkpoint = pl_load(checkpoint_path, map_location=lambda storage, loc: storage)
  File "/fs1/epscor/home/zluo_epscor/plant_disease/ensemble_DL/venv/lib64/python3.6/site-packages/pytorch_lightning/utilities/cloud_io.py", line 31, in load
    with fs.open(path_or_url, "rb") as f:
  File "/fs1/epscor/home/zluo_epscor/plant_disease/ensemble_DL/venv/lib64/python3.6/site-packages/fsspec/spec.py", line 903, in open
    **kwargs
  File "/fs1/epscor/home/zluo_epscor/plant_disease/ensemble_DL/venv/lib64/python3.6/site-packages/fsspec/implementations/local.py", line 115, in _open
    return LocalFileOpener(path, mode, fs=self, **kwargs)
  File "/fs1/epscor/home/zluo_epscor/plant_disease/ensemble_DL/venv/lib64/python3.6/site-packages/fsspec/implementations/local.py", line 197, in __init__
    self._open()
  File "/fs1/epscor/home/zluo_epscor/plant_disease/ensemble_DL/venv/lib64/python3.6/site-packages/fsspec/implementations/local.py", line 202, in _open
    self.f = open(self.path, mode=self.mode)
FileNotFoundError: [Errno 2] No such file or directory: '/fs1/epscor/home/zluo_epscor/plant_disease/ensemble_DL/bestmodel-epoch=07-val_loss=0.50.ckpt'
#!/bin/bash

#SBATCH --job-name gpu   ## name that will show up in the queue
#SBATCH --output ./slurmout/slurm_test-%j.out   ## filename of the output; the %j is equal to jobID; default is slurm-[jobID].out
##SBATCH --error ./slurmout/slurm-%j_error.out
#SBATCH --ntasks=1  ## number of tasks (analyses) to run
#SBATCH --cpus-per-task=16  ## the number of threads allocated to each task
#SBATCH --mem-per-cpu=2G   # memory per CPU core
#SBATCH --gres=gpu:2  ## number of GPUs per node
#SBATCH --partition=anonymos ## the partitions to run in (comma separated)
#SBATCH --time=0-00:30:00  ## time for analysis (day-hour:min:sec)

## Load modules
module load cuda/10.1.243-gcc-9.2.0-gl7b4mh
## Insert code, and run your programs here (use 'srun').
srun python3 bug_report.py

This is the Slurm submission script.

Hey @lzrpotato, when you log your val loss, make sure you add the flag to sync it across processes:

self.log('val_loss', loss, prog_bar=True, sync_dist=True)

This fixed this bug report for me. I'll find where this is covered in the docs and clarify it :)

https://pytorch-lightning.readthedocs.io/en/latest/lightning_module.html#log

LightningModule.log(name, value, prog_bar=False, logger=True, on_step=None, on_epoch=None, reduce_fx=torch.mean, tbptt_reduce_fx=torch.mean, tbptt_pad_token=0, enable_graph=False, sync_dist=False, sync_dist_op='mean', sync_dist_group=None)

This explained the self.log API.
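
Applied to the validation_step of the BoringModel reproduction above, the suggested change would look like this (a sketch of the one-line change, for reference; not a verified run):

    def validation_step(self, batch, batch_idx):
        output = self.layer(batch)
        loss = self.loss(batch, output)
        # sync_dist=True reduces val_loss across the two DDP processes, so every
        # rank's ModelCheckpoint sees the same value and builds the same filename
        self.log('val_loss', loss, prog_bar=True, sync_dist=True)
        return {"x": loss}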

@SeanNaren Thank you for your help.
