Keras: fit_generator seems dead lock

Created on 2 Jun 2018  路  24Comments  路  Source: keras-team/keras

  • [. ] Check that you are up-to-date with the master branch of Keras. You can update with:
    pip install git+git://github.com/keras-team/keras.git --upgrade --no-deps

  • [. ] If running on TensorFlow, check that you are up-to-date with the latest version. The installation instructions can be found here.

  • [. ] If running on Theano, check that you are up-to-date with the master branch of Theano. You can update with:
    pip install git+git://github.com/Theano/Theano.git --upgrade --no-deps

  • [. ] Provide a link to a GitHub Gist of a Python script that can reproduce your issue (or just copy the script here if it is short).


I'm using fit_generator with multiprocessing. The first epoch is normal, but second epoch did not finish for a long time. The CPU usage of these process is quite low (about 0%). After I kill the process, it is found that main process is waiting for a lock.
Ubuntu 16.04
Python 3.6.5
Keras 2.1.6
TensorFlow 1.8.0 with GPU

Some Log:

2018-06-02 10:08:08.448699: Total params: 21,519,259
2018-06-02 10:08:08.448710: Trainable params: 1,514,965
2018-06-02 10:08:08.448718: Non-trainable params: 20,004,294
2018-06-02 10:08:10.959486: Epoch 1/50
2018-06-02 10:28:38.247401:  - 1227s - loss: 0.2332 - rmse: 0.2332 - val_loss: 0.2218 - val_rmse: 0.2218
2018-06-02 10:28:38.248712: 
Epoch 00001: val_rmse improved from inf to 0.22183, saving model to ../log/2018-06-02 10:03:57/keras_nn_0.hdf5
2018-06-02 10:28:40.006534: Epoch 2/50
2018-06-02 11:23:44.784851: Traceback (most recent call last):

Main Process Trackback:

2018-06-02 11:23:44.785217: KeyboardInterrupt
2018-06-02 11:23:44.803122: Traceback (most recent call last):
2018-06-02 11:23:44.803174:   File "/home/**********/image_model.py", line 498, in <module>
2018-06-02 11:23:44.803383: validation_data=CustomSequence(X_valid, y_valid, batch_size),
2018-06-02 11:23:44.803390:   File "/home/****/miniconda3/lib/python3.6/site-packages/keras/legacy/interfaces.py", line 91, in wrapper
2018-06-02 11:23:44.803792: return func(*args, **kwargs)
2018-06-02 11:23:44.803800:   File "/home/****/miniconda3/lib/python3.6/site-packages/keras/engine/training.py", line 2194, in fit_generator
2018-06-02 11:23:44.805133: generator_output = next(output_generator)
2018-06-02 11:23:44.805141:   File "/home/****/miniconda3/lib/python3.6/site-packages/keras/utils/data_utils.py", line 578, in get
2018-06-02 11:23:44.805721: inputs = self.queue.get(block=True).get()
2018-06-02 11:23:44.805728:   File "/home/****/miniconda3/lib/python3.6/multiprocessing/pool.py", line 638, in get
2018-06-02 11:23:44.805851: self.wait(timeout)
2018-06-02 11:23:44.805857:   File "/home/****/miniconda3/lib/python3.6/multiprocessing/pool.py", line 635, in wait
2018-06-02 11:23:44.805962: self._event.wait(timeout)
2018-06-02 11:23:44.805969:   File "/home/****/miniconda3/lib/python3.6/threading.py", line 551, in wait
2018-06-02 11:23:44.806527: signaled = self._cond.wait(timeout)
2018-06-02 11:23:44.806534:   File "/home/****/miniconda3/lib/python3.6/threading.py", line 295, in wait 
2018-06-02 11:23:44.806606: waiter.acquire()
2018-06-02 11:23:44.806620: KeyboardInterrupt

Releated Code:

class CustomSequence(Sequence):
  def __init__(self, df, y_set=None, batch_size=batch_size, isTrainDataOrTest=True):
    self.X, self.y = df, y_set
    self.batch_size = batch_size
    self.isTrainDataOrTest = isTrainDataOrTest
    self.counter = 0
    self.epoch_counter = 0

  def __len__(self):
    return int(np.ceil(len(self.X) / self.batch_size))

  def __getitem__(self, idx):
    # print(f'Batch #: {idx}')
    # print(f'From {idx * self.batch_size} to {(idx + 1) * self.batch_size}')
    last_index = (idx + 1) * self.batch_size
    if last_index > self.X.shape[0]:
      last_index = self.X.shape[0]
    batch_x = self.X[idx * self.batch_size:last_index]
    title = tokenizer.texts_to_sequences(batch_x['title'])
    title = sequence.pad_sequences(title, maxlen=title_maxlen)
    desp = tokenizer.texts_to_sequences(batch_x['description'])
    desp = sequence.pad_sequences(desp, maxlen=desp_maxlen)
    img_data = np.zeros(shape=(batch_x.shape[0], 128, 128, 3))
    for index, im_id in enumerate(batch_x['image']):
      img_data[index] = get_image_data(im_id, self.isTrainDataOrTest)

    # print(f'Returned batch size: {len(batch_x)}')
    self.counter += 1
    if self.isTrainDataOrTest:
      batch_y = self.y[idx * self.batch_size: last_index]
      return [batch_x[categorical], batch_x[continous], title, desp, img_data], np.array(batch_y)
    else:
      return [batch_x[categorical], batch_x[continous], title, desp, img_data]


  def on_epoch_end(self):
    print('\nEpoch end: ' + str(self.epoch_counter) + ' counter: ' + str(self.counter))
    self.epoch_counter += 1


 model.fit_generator(generator=CustomSequence(X_train, y_train, batch_size),
                      epochs=50,
                      verbose=2,
                      use_multiprocessing=True,
                      workers=4,
                      validation_data=CustomSequence(X_valid, y_valid, batch_size),
                      callbacks=[check_point, early_stop, rlrop])

Most helpful comment

I was facing the same issue here. What did the trick for me was switching the multiprocessing library process starting method to 'spawn' or 'forkserver', from the default 'fork' method, as pointed out by @saulvargas. At the end of each epoch, the multiprocessing library complains about some leaked semaphores, which it releases, but the show goes on.

The main script should look like:

if __name__ == '__main__:
  import multiprocessing as mp
  mp.set_start_method('spawn')
  <import everything else and run your code>

I am using Keras 2.3.30 with Tensorflow 1.13.1 (same results with CPU and GPU versions alike) under Linux (Arch). My input pipeline uses only numpy, scipy.ndimage and pydicom. Everything is installed in a python virtual env.

The PR cited above, which is merged into the Keras master branch by now, did not help much. It simply uses a 30 second timeout to detect when the worker process pool hangs for some batch, prints a warning message and falls back to sequential code for that batch.

The fix was designed for the case when the culprit for the hanging is the data generating code in the worker processes for some batch, but that is not the case with me. When hanging does happen, it happens at the very beginning of epochs, or when starting validation, when the worker processes pool is spawned, and they never get to the actual data generating code. They hang even before initialization code in keras/utils/data_utils.py gets called. In that way, with the fix in the PR, you end up getting a sequential input pipeline that sleeps for 30 seconds before producing each batch.

All 24 comments

I reproduce this problem by rerun the script. After setting use_multiprocessing=False, the problem disappear, but the speed decrease a lot.

Some questions :

  • What's inside get_image_data
  • Are you reading in a file (hdf5, json, csv) at the same time?
  • What's rlprop?
  • Type of X_train, y_train?

Thank's for answering.
python def get_image_data(input_param): im_id, train = input_param if train: img_file = '../../train_jpg/{}.jpg'.format(im_id) else: img_file = '../../test_jpg/{}.jpg'.format(im_id) img = cv2.imread(img_file) if img is None: print(f"Error: FOUND empty image path: {img_file}") img = np.zeros(shape=(img_width, img_height, 3)) target_size = (img_width, img_height) if img.size != target_size: img = cv2.resize(img, target_size, interpolation=cv2.INTER_CUBIC) return img

I'm not 100% sure, but it seems no file io when training.

rlprop is a ReduceLROnPlateau

X_train is a Pandas.DataFrame, y_train is a numpy.array.

The worker is waiting for the result of your Sequence.

Could you try to avoid using Dataframe? Just a plain numpy array or a list would be good.

I didn't catch up. There seems be no blocking operation in my Sequence.

Where is the difference between DataFrame and numpy.array?

My computer is busy now, I would try use numpy.array later, maybe tomorrow.

I've noticed the same issue twice now. fit_generator completes an epoch and sometimes dead lock. It has happened to me on the fifth epoch as well as on the third. ModelCheckpoint was triggered on both of those instances and ReduceLROnPlateau on one of them. I'm using the following callbacks:

ReduceLROnPlateau(monitor='val_mask_instances_loss',
                  factor=0.3, verbose=1, min_lr=1e-5, patience=2)
TensorBoard(log_dir='some_path', batch_size=BATCH_SIZE))
ModelCheckpoint(filepath='some_path', verbose=1,
                period=1, save_best_only=True, save_weights_only=False))

My generator is a custom generator that simply loads numpy arrays using numpy and images using cv2 and does some basic normalization as well as conversion from sparse matrices.

I'm getting the same process trace as @mutexliu.

Specs:
Python 3.5.2
Keras 2.1.6
TensorFlow 1.8
Ubuntu 16.04

Noticed that we're both using cv2 in our generators, perhaps that is a part of the problem?

@mutexliu As you can see from the stacktrace, you were waiting at : ApplyResult.get which waits for the result of the Sequence.
You said that your Sequence works while using thread (use_multiprocessing=False), pandas Dataframe may not be shareable across processes. Your Sequence seems fine otherwise.

For both of you, I'm always using cv2 in my Sequence and I haven't got into any problem.

I came across the same problem that second epoch does not terminate. My training data exists in multiple parquet files, and every epoch I traverse all the files to generate numpy array by batch. I use the parameter use_multiprocessing by default.
python: 2.7
keras: 2.1.6
tensorflow: 1.7

Same issue here. My workaround consisted on developing my own parallel generator mechanism, making sure that new processes were started with the "forkserver" method instead of the default "fork", which copies resources from the parent process (threads, locks, etc. which are _likely_ behind the deadlocks problems).

+1

+1

I think my issue is related to this, I'm using multiple workers as well. I have two test system, on equipped with V-100 GPUs and one with K-20 GPUs. Multiprocessing works fine with one GPU on both systems. Two V-100 work fine as well, but two K-20 will run into a deadlock before the validation process of the first epoch.

https://gist.github.com/PowerOfCreation/b0940d377180d0131f51a538152f788b

Sadly, I'm on a HPC cluster and can't control the installed Tensorflow version, I'm on Tensorflow version 1.10 and Keras version 2.1.6.

In my case, call the generator once before using it would fix this problem.

# call generator once to avoid dead lock
next(train_gen)
history = m.fit_generator(train_gen)

Keras (2.1.5)
tensorflow-gpu (1.12.0)

In my case, call the generator once before using it would fix this problem.

# call generator once to avoid dead lock
next(train_gen)
history = m.fit_generator(train_gen)

Keras (2.1.5)
tensorflow-gpu (1.12.0)

Doesn't work for me, the problem persists. I have done further testing and it seems like I can run the fit_generator fine with 2xV-100 GPUs, but 2xK-20 or 2xK-80 GPUs will result in a deadlock. My log:
https://gist.github.com/PowerOfCreation/e831022bb0c5dde36f701c5eac5e9ea3

We are running into this issue a lot, after writing a subclass of keras.utils.Sequence which works with np.arrays. Our trainings hang on that same self.queue.get(block=True).get() in data_utils.py. Had the issue on 2.1.6 and 2.2.4

Also running into this issue a lot with my colleague, don't know what to do...

@Dref360 if it is of any help, we logged a few things in our subclass of Sequence, notably at the very start of the __getitem__ method and just before its return statement (which does not do any compute). At the start of an epoch, it gets called two times and then hangs. Both calls reach the return statement as our logs show (the #xxxx shows the requested index):

Epoch 64/1500
======= in Sequence subclass: getitem #3702
======= in Sequence subclass: getitem #5062
======= in Sequence subclass: getitem #3702 just before return
======= in Sequence subclass: getitem #5062 just before return

Then we hang forever.

I'd say this rules out our Subclass causing the unexpected wait?

We've been trying to reproduce this on a toy example but no luck so far.

Same issue. Also using fit_generator and multiprocessing. For some reason the same code works when I run it from a jupyter notebook (with the kernel pointing at the same conda environment from where I get the failure)

I was facing the same issue here. What did the trick for me was switching the multiprocessing library process starting method to 'spawn' or 'forkserver', from the default 'fork' method, as pointed out by @saulvargas. At the end of each epoch, the multiprocessing library complains about some leaked semaphores, which it releases, but the show goes on.

The main script should look like:

if __name__ == '__main__:
  import multiprocessing as mp
  mp.set_start_method('spawn')
  <import everything else and run your code>

I am using Keras 2.3.30 with Tensorflow 1.13.1 (same results with CPU and GPU versions alike) under Linux (Arch). My input pipeline uses only numpy, scipy.ndimage and pydicom. Everything is installed in a python virtual env.

The PR cited above, which is merged into the Keras master branch by now, did not help much. It simply uses a 30 second timeout to detect when the worker process pool hangs for some batch, prints a warning message and falls back to sequential code for that batch.

The fix was designed for the case when the culprit for the hanging is the data generating code in the worker processes for some batch, but that is not the case with me. When hanging does happen, it happens at the very beginning of epochs, or when starting validation, when the worker processes pool is spawned, and they never get to the actual data generating code. They hang even before initialization code in keras/utils/data_utils.py gets called. In that way, with the fix in the PR, you end up getting a sequential input pipeline that sleeps for 30 seconds before producing each batch.

On my case, where I run multiprocessing in module file for example my_util.py
then I need to pre-set the method as
`mp.set_forkserver_preload(my_util)'

other wise mp.set_start_method('forkserver') works under if __name__ == '__main__' (for short)
by default it will be fork and not really safe for within-threaded processes.

I got this issue because I have a bug in my own valid data generator, which has 0 lengths of data. Everything works after I fixed that bug.

Same issue here. My workaround consisted on developing my own parallel generator mechanism, making sure that new processes were started with the "forkserver" method instead of the default "fork", which copies resources from the parent process (threads, locks, etc. which are _likely_ behind the deadlocks problems).

i did the same but with large batches the efficiency is bad because data needs to be copied around between processes. I tested the multiprocessing queue, zmq, plain socket, pipe, none of it can achieve the parallelism like tf.data.Dataset. However, since I'm working on image, tf image augmentation is very limited. This is very annoying.

@ysyyork I was having same problem. With large tensors, data copy takes lot of time and blocks working process. As solution, in Python 3.8, you can use multiprocessing.shared_memory to share numpy arrays across multiple processes (zero copy). This solved my problem and made my input pipeline blazingly fast.

EDIT: A rough code looks like this

def shared_mem_multiprocessing(sequence, workers=2, queue_max_size=16):
    from multiprocessing import Process, Queue, shared_memory, managers
    queue = Queue(maxsize=queue_max_size)
    manager = managers.SharedMemoryManager()
    manager.start()


    def worker(sequence, idxs):
        for i in idxs:
            x = sequence[i]
            a = x[0] # Taking x only from (x,y)
            shm = manager.SharedMemory(size=a.nbytes)
            b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
            b[:] = a[:]
            queue.put((a.shape, a.dtype, shm.name))
            shm.close()
            del shm


    idxs = np.array_split(np.arange(len(sequence)), workers)
    args = zip([sequence] * workers, idxs)
    processes = [Process(target=worker, args=(s, i)) for s, i in args]
    _ = [p.start() for p in processes]


    for i in range(len(sequence)):
        shape, dtype, shm_name = queue.get(block=True)
        existing_shm = shared_memory.SharedMemory(name=shm_name)
        arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
        arr2 = tf.convert_to_tensor(arr)
        yield arr2
        del arr
        existing_shm.close()
        existing_shm.unlink()


    manager.shutdown()
    manager.join()
    print("Joining all the processed")
    _ = [p.join() for p in processes]
    print("Closing all the processed")
    _ = [p.close() for p in processes]

Hope it helps.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

anjishnu picture anjishnu  路  3Comments

harishkrishnav picture harishkrishnav  路  3Comments

NancyZxll picture NancyZxll  路  3Comments

nryant picture nryant  路  3Comments

somewacko picture somewacko  路  3Comments