Ray: Running Ray with PyTorch Lightning in a SLURM job causes failure with error "ValueError: signal only works in main thread"

Created on 24 Sep 2020  ·  8 Comments  ·  Source: ray-project/ray

Hi,

I am a newbie trying to integrate Ray with PyTorch Lightning. I followed the instructions at https://docs.ray.io/en/master/tune/tutorials/tune-pytorch-lightning.html when setting up hyperparameter tuning with Ray Tune. However, I encountered two issues while using Ray.

ISSUE 1

Importing Ray Tune's PyTorch Lightning integration module throws an error with Ray version 0.8.7.

Cause:
from ray.tune.integration.pytorch_lightning import TuneReportCallback, TuneReportCheckpointCallback

Error:
ModuleNotFoundError: No module named 'ray.tune.integration.pytorch_lightning'

Module versions:
ray 0.8.7
tensorflow 2.1.0
python 3.7.4

ISSUE 1: FIX

I fixed this by installing ray using a wheel.
ray 0.9.0.dev0

ISSUE 2

With the new Ray version, when I submit a SLURM job to run the tuning, I get the following error:

ray.tune.error.TuneError: Trial raised an exception. Traceback:
ray::ImplicitFunc.train() (pid=4432, ip=172.26.92.190)
  File "/home/user/.local/lib/python3.7/site-packages/ray/tune/function_runner.py", line 227, in run
    self._entrypoint()
  File "/home/user/.local/lib/python3.7/site-packages/ray/tune/function_runner.py", line 290, in entrypoint
    self._status_reporter.get_checkpoint())
  File "/home/user/.local/lib/python3.7/site-packages/ray/tune/function_runner.py", line 497, in _trainable_func
    output = train_func(config)
  File "tune.py", line 261, in train_run
    trainer.fit(model)
  File "/home/user/.local/lib/python3.7/site-packages/pytorch_lightning/trainer/states.py", line 48, in wrapped_fn
    result = fn(self, *args, **kwargs)
  File "/home/user/.local/lib/python3.7/site-packages/pytorch_lightning/trainer/trainer.py", line 1073, in fit
    results = self.accelerator_backend.train(model)
  File "/home/user/.local/lib/python3.7/site-packages/pytorch_lightning/accelerators/gpu_backend.py", line 51, in train
    results = self.trainer.run_pretrain_routine(model)
  File "/home/user/.local/lib/python3.7/site-packages/pytorch_lightning/trainer/trainer.py", line 1184, in run_pretrain_routine
    self.register_slurm_signal_handlers()
  File "/home/user/.local/lib/python3.7/site-packages/pytorch_lightning/trainer/training_io.py", line 240, in register_slurm_signal_handlers
    signal.signal(signal.SIGUSR1, self.sig_handler)
  File "/usr/local/easybuild-2019/easybuild/software/mpi/gcc/8.3.0/openmpi/3.1.4/python/3.7.4/lib/python3.7/signal.py", line 47, in signal
    handler = _signal.signal(_enum_to_int(signalnum), _enum_to_int(handler))
ValueError: signal only works in main thread

Can I get some advice on how to proceed after this?

question

All 8 comments

Can you post a version of your SLURM script and python script?

It seems like you shouldn't be using the SLURMConnector with Ray Tune - after all, I think Ray will solve most of the SLURM problems that PTL aims to address.

We should put together some documentation on how to use Pytorch/PTL Distributed + Ray Tune + SLURM (cc @krfricke ).

Slurm script:

#!/bin/bash

# Resource request: a single task on one node with 4 GPUs and 8 CPU cores,
# and a wall-clock limit of 7 days (format: days-hours:minutes:seconds).
#SBATCH --gres=gpu:4
#SBATCH --nodes=1
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=8
#SBATCH --time=7-0:00:00


# Scheduler-level stdout/stderr files; %j is replaced by the job ID.
# NOTE(review): the ./logs directory presumably has to exist before
# submission -- confirm on the target cluster.
#SBATCH --output="./logs/output-%j.txt"
#SBATCH --error="./logs/%j.err"


# Start from a clean module environment, then load the Python 3.7.4 stack.
module purge
module load scikit-learn/0.21.3-python-3.7.4
module load python/3.7.4

# Run the tuning script unbuffered (-u) and time it. Both stdout and stderr
# of the Python process are redirected to tune_output.txt.

time python -u tune.py &> "tune_output.txt"


tune.py

import sys
import torch
from torch import optim, nn
import pytorch_lightning as pl
from torchvision import transforms
import torchvision.utils as vutils
from torch.utils.data import DataLoader
# from typing import List, Callable, Union, Any, TypeVar, Tuple
import torch.nn.functional as F
from os import makedirs, listdir
from os.path import join, exists, isfile
import torch.backends.cudnn as cudnn
from pytorch_lightning import Trainer
from test_tube import Experiment
from abc import abstractmethod
import numpy as np
import pandas as pd
import tfrecord
# from pytorch_lightning.loggers.test_tube import TestTubeLogger
from torch.utils.data import Dataset
from natsort import natsorted
from PIL import Image
from model_segnet_2 import SegNet

import shutil
from functools import partial
from tempfile import mkdtemp
from pytorch_lightning.loggers import TensorBoardLogger
from pytorch_lightning.utilities.cloud_io import load as pl_load
from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler, PopulationBasedTraining
from ray.tune.integration.pytorch_lightning import TuneReportCallback, TuneReportCheckpointCallback

# Tensor = TypeVar('torch.tensor')

class CustomDataSet(Dataset):
    """Image dataset driven by a CSV index.

    The CSV file is loaded with pandas and is expected to provide a ``name``
    column (image file name relative to ``img_dir``) and a ``label`` column.
    """

    def __init__(self, csv_file, img_dir, transform):
        """Load the CSV index and remember where images live and how to transform them."""
        self.data = pd.read_csv(csv_file)
        self.img_dir, self.transform = img_dir, transform

    def __len__(self):
        """Return the number of rows in the CSV index."""
        return len(self.data.index)

    def __getitem__(self, idx):
        """Return ``(transformed_image, label)`` for the row at ``idx``."""
        file_name = self.data.name[idx]
        rgb_image = Image.open(join(self.img_dir, file_name)).convert("RGB")
        # The tuple is evaluated left to right: transform first, label second.
        return self.transform(rgb_image), self.data.label[idx]


class ExperimentAE(pl.LightningModule):
    """LightningModule that trains a ``SegNet`` autoencoder on an MSE reconstruction loss.

    Hyperparameters are read from ``params``:

    * ``lr`` -- learning rate for Adam.
    * ``batch_size`` -- batch size for the train and validation loaders.
    * ``scheduler_gamma`` -- decay factor for ``ExponentialLR``; ``None``
      disables the scheduler.
    """

    def __init__(self,
                 params: dict,
                 **kwargs) -> None:
        """Store the hyperparameters and build the wrapped ``SegNet`` model.

        Extra keyword arguments are accepted but not used.
        """
        super().__init__()

        self.params = params
        self.model = SegNet()

    def forward(self, z):
        """Run the wrapped model on ``z`` and return its raw output.

        The previous body called ``self.decoder``, an attribute this class
        never defines, so any call raised ``AttributeError``.
        NOTE(review): delegating to ``self.model`` is the closest working
        behaviour; confirm callers do not expect the reconstruction alone.
        """
        return self.model(z)

    def _run_step(self, x):
        """Return the ``(x_hat, z)`` pair produced by the model for input ``x``."""
        x_hat, z = self.model(x)
        return x_hat, z

    def generate(self, x):
        """
        Given an input image x, returns the reconstructed image
        :param x: (Tensor) [B x C x H x W]
        :return: (Tensor) [B x C x H x W]
        """
        return self._run_step(x)[0]

    def step(self, batch, batch_idx):
        """Compute the reconstruction loss for one batch.

        :param batch: ``(images, labels)`` pair; the labels are not used.
        :param batch_idx: index of the batch (unused).
        :return: dict with ``recon_loss`` and ``loss`` (the same tensor).
        """
        x, _ = batch
        self.curr_device = x.device
        x_hat, _ = self._run_step(x)

        recon_loss = F.mse_loss(x_hat, x, reduction='mean')

        return {"recon_loss": recon_loss, "loss": recon_loss}

    def training_step(self, batch, batch_idx):
        """Return the training loss tensor together with its log entry."""
        # step() returns a dict; the trainer needs the loss tensor itself
        # under "loss", not the whole dict.
        train_loss = self.step(batch, batch_idx)["loss"]
        logs = {"ptl/train_loss": train_loss}
        return {"loss": train_loss, "log": logs}

    def validation_step(self, batch, batch_idx):
        """Return the loss dict produced by ``step`` for one validation batch."""
        return self.step(batch, batch_idx)

    def validation_end(self, outputs):
        """Average the per-batch validation losses and expose them as ``val_loss``.

        NOTE(review): newer PyTorch Lightning releases name this hook
        ``validation_epoch_end`` -- confirm which one the installed version calls.
        """
        avg_loss = torch.stack([x['loss'] for x in outputs]).mean()
        tensorboard_logs = {'val_loss': avg_loss}
        return {'val_loss': avg_loss, 'log': tensorboard_logs}

    def configure_optimizers(self):
        """Build the Adam optimizer and, when configured, an exponential LR scheduler.

        :return: ``[optimizer]`` when ``scheduler_gamma`` is ``None``,
                 otherwise ``([optimizer], [scheduler])``.
        """
        optimizer = optim.Adam(self.model.parameters(), lr=self.params['lr'])

        gamma = self.params['scheduler_gamma']
        if gamma is None:
            return [optimizer]

        scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=gamma)
        return [optimizer], [scheduler]

    def train_dataloader(self):
        """Build the shuffled training loader and record the number of training images."""
        transform = self.data_transforms(train=True)
        img_dir = "/data/brca/"
        train_csv = "/original/train.csv"

        dataset = CustomDataSet(train_csv, img_dir, transform=transform)
        loader = DataLoader(dataset, shuffle=True, batch_size=self.params['batch_size'], num_workers=4)

        self.num_train_imgs = len(dataset)

        return loader

    def val_dataloader(self):
        """Build the validation loader and record the number of validation images."""
        transform = self.data_transforms(train=False)

        img_dir = "/data/brca/"
        val_csv = "/original/validation.csv"

        val_dataset = CustomDataSet(val_csv, img_dir, transform=transform)
        self.valid_dataloader = DataLoader(val_dataset, shuffle=False, batch_size=self.params['batch_size'],
                                           num_workers=4)

        # Count images (dataset length), matching train_dataloader; the
        # loader's length would be the number of batches instead.
        self.num_val_imgs = len(val_dataset)

        return self.valid_dataloader

    def test_dataloader(self):
        """No test set is configured."""
        pass

    def data_transforms(self, train=True):
        """Return the preprocessing pipeline for training or validation images.

        Both pipelines resize to 144, convert to a tensor and rescale values
        with ``2 * X - 1``; the training pipeline additionally applies random
        horizontal and vertical flips before the tensor conversion.
        """
        set_range = transforms.Lambda(lambda X: 2 * X - 1.)

        steps = [transforms.Resize(144)]
        if train:
            steps += [
                transforms.RandomHorizontalFlip(),
                transforms.RandomVerticalFlip(),
            ]
        steps += [transforms.ToTensor(), set_range]

        return transforms.Compose(steps)


def train_run(config_params, num_epochs=10, num_gpus=1):
    """Train one ``ExperimentAE`` trial and report ``val_loss`` to Ray Tune as ``loss``.

    :param config_params: hyperparameter dict sampled by Ray Tune for this trial.
    :param num_epochs: maximum number of training epochs.
    :param num_gpus: number of GPUs handed to the Lightning ``Trainer``.
    """
    import os

    # Inside a SLURM job, PyTorch Lightning tries to register a SIGUSR1
    # handler during fit(). Ray Tune does not run this function on the main
    # thread, so that call fails with "ValueError: signal only works in main
    # thread". Overriding SLURM_JOB_NAME with "bash" is the workaround
    # confirmed for this failure; it is only applied when SLURM is detected.
    if "SLURM_JOB_NAME" in os.environ:
        os.environ["SLURM_JOB_NAME"] = "bash"

    model = ExperimentAE(params=config_params)

    # Log into the trial directory so each trial keeps its own TensorBoard files.
    trial_logger = TensorBoardLogger(
        save_dir=tune.get_trial_dir(), name="", version=".")

    # Forward the "val_loss" metric to Tune under the name "loss" after
    # every validation run.
    report_callback = TuneReportCallback(
        {
            "loss": "val_loss",
        },
        on="validation_end")

    trainer = Trainer(
        max_epochs=num_epochs,
        gpus=num_gpus,
        logger=trial_logger,
        progress_bar_refresh_rate=0,
        callbacks=[report_callback])

    trainer.fit(model)


def tune_run(num_samples=20, num_epochs=10, gpus_per_trial=1):
    """Launch the Ray Tune hyperparameter search over ``train_run``.

    :param num_samples: number of trials to sample from the search space.
    :param num_epochs: maximum epochs per trial; also the ASHA ``max_t``.
    :param gpus_per_trial: GPUs reserved for (and used by) each trial.
    """
    tune_config = {
        # loguniform takes (lower, upper[, base]). The previous call,
        # loguniform(1e-4, 1e-5, 1e-3), passed the bounds reversed and 1e-3 as
        # the base. That appears to sample between 1e-5 and 1e-4, so the same
        # range is spelled out here with the default base.
        # NOTE(review): widen the upper bound if 1e-3 was meant as a limit.
        "lr": tune.loguniform(1e-5, 1e-4),
        "batch_size": tune.choice([2, 4, 8]),
        # 'weight_decay': 0.0,
        'scheduler_gamma': tune.choice([1, 0.95, 0.9, 0.85, 0.6]),
    }

    # Tie ASHA's maximum budget to the epoch count instead of a fixed 10, so
    # the scheduler stays in step with the trainer when num_epochs changes.
    scheduler = ASHAScheduler(
        metric="loss",
        mode="min",
        max_t=num_epochs,
        grace_period=1,
        reduction_factor=2)

    reporter = CLIReporter(
        parameter_columns=["lr", "batch_size"],
        metric_columns=["loss", "training_iteration"]
    )

    # Bind the non-tuned arguments; Tune supplies the sampled config.
    trainable = partial(
        train_run,
        num_epochs=num_epochs,
        num_gpus=gpus_per_trial
    )

    tune.run(
        trainable,
        resources_per_trial={
            "cpu": 1,
            "gpu": gpus_per_trial
        },
        config=tune_config,
        num_samples=num_samples,
        scheduler=scheduler,
        progress_reporter=reporter,
        name="tune_segnet_v1"
    )


if __name__ == "__main__":
    # Script entry point: run the search with 20 trials of up to 10 epochs,
    # one GPU per trial.
    search_settings = {
        "num_samples": 20,
        "num_epochs": 10,
        "gpus_per_trial": 1,
    }
    tune_run(**search_settings)


Can you try this hack? Add

import os

# Workaround: override the SLURM job name so PyTorch Lightning does not try
# to register its SLURM signal handler, which fails inside a Ray Tune trial
# with "ValueError: signal only works in main thread".
os.environ["SLURM_JOB_NAME"] = "bash"

to your Python script?

Hi @richardliaw ,

The hack seems to have fixed it.

Thank you!

It's very terrible :) ! If it works for you, maybe you can submit a patch to Pytorch Lightning for a way to disable the SLURM detection?

Yes true, I will look into that.
Before that, I have some bugs to fix in my Ray run. It's still not running as expected 😓

Feel free to post (perhaps use a new github issue!)

Ah fixed it (phew)! It was a bug in my code.

Was this page helpful?
0 / 5 - 0 ratings