Keras: Keras is not multi-processing safe.

Created on 17 Apr 2018 · 19 Comments · Source: keras-team/keras

It seems that Keras cannot run prediction in multiple processes simultaneously.
None of the hacks and workarounds mentioned in other issues actually seem to resolve this.

Each process has its own Keras and TensorFlow import, yet no matter how it is done, execution either hangs on predict() (if the model is created outside the child) or (for some inexplicable reason) hangs on setting the weights of the model.

Does anyone have any examples of prediction working across multiple processes?

(e.g., with Python's ProcessPoolExecutor)

If this is not possible - exactly which bit of the Keras/Tensorflow code causes this?

All 19 comments

A tensorflow session cannot be shared across processes.
If it helps, one approach is to set up a worker that is the "owner" of the model and have your other components query this worker (as in https://github.com/Dref360/tuto_keras_web).

Each child process is in charge of its own Keras environment (via local imports), TensorFlow session, and loaded model. Certainly there is no explicit passing of a session object from initiator to child.

Are you saying there is some global session being set up in the background despite this, that is beyond control?

Additionally, previous issues have warned against having a certain process own a model and child processes predict on it due to the deadlocks this causes in the predict function. Are there any examples of this not in a web server context?

Keras creates a global session (in keras.backend), so you need to create a new session in each process. Otherwise, the session in the main process will accidentally be shared with its children.
You can set the new session with K.set_session().

As for the example, the deadlock occurs because other processes try to access the Session. If you set up a Queue like in the example you should be fine.
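The "owner" arrangement described in this exchange could look roughly like the sketch below. It is only an illustration under stated assumptions: fake_predict stands in for a real model.predict, and the names model_owner and query are hypothetical, not taken from the linked repository. One process owns the model and serves requests from a shared queue; each client gets its answer on its own response queue.

```python
import multiprocessing as mp


def fake_predict(batch):
    # Stand-in for model.predict (hypothetical): doubles every input value.
    return [2 * x for x in batch]


def model_owner(requests, responses):
    # Real code would import Keras and load the model here, so that only
    # this process ever touches the session.
    while True:
        item = requests.get()
        if item is None:  # sentinel: shut the owner down
            break
        client_id, batch = item
        responses[client_id].put(fake_predict(batch))


def query(client_id, batch, requests, responses):
    # Send one request to the owner and wait for the matching answer.
    requests.put((client_id, batch))
    return responses[client_id].get()


if __name__ == "__main__":
    requests = mp.Queue()
    responses = {0: mp.Queue(), 1: mp.Queue()}
    owner = mp.Process(target=model_owner, args=(requests, responses))
    owner.start()
    print(query(0, [1, 2, 3], requests, responses))
    print(query(1, [10], requests, responses))
    requests.put(None)
    owner.join()
```

The owner serializes all predictions, which is the bottleneck raised later in the thread; the benefit is that no session is ever shared between processes.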

@Dref360
Thanks for this, but unfortunately I am already setting the tensorflow session in the child.
It does not resolve the deadlock.

I would rather avoid having to use a queue of multiple children accessing the same model as this places a bottleneck on what I am trying to do.

Here is a minimal example.
I can only test on CPU right now, but it should work.

import os

os.environ['CUDA_VISIBLE_DEVICES'] = ''

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Queue

queue = Queue()

def get_weights():
    from keras.models import Sequential
    import keras.backend as K
    import tensorflow as tf
    K.set_session(tf.Session())
    from keras.layers import Dense
    model = Sequential()
    model.add(Dense(512, input_shape=(500,)))
    model.compile('sgd', 'mse')
    return model.get_weights()

def pool_job(weights):
    from keras.models import Sequential
    import keras.backend as K
    import tensorflow as tf
    K.set_session(tf.Session())
    from keras.layers import Dense
    import numpy as np

    model = Sequential()
    model.add(Dense(512, input_shape=(500,)))
    model.compile('sgd', 'mse')
    model.set_weights(weights)
    while True:
        model.fit(np.ones([10, 500]), np.ones([10, 512]), 10, epochs=1)
        queue.put(os.getpid())


pool = ProcessPoolExecutor(3)
weights = pool.submit(get_weights).result()
[pool.submit(pool_job, weights)]

while True:
    print("Got:", queue.get(block=True))

EDIT: Added how to get the weights

The hang occurs for me when loading weights into a new model from a pretrained model. The code above will not show the issue because it never does that.

In that case, no, it would not be feasible. TensorFlow is not fork-safe, so if you define a pretrained model somewhere in the parent process, TF will already have initialized a bunch of internal state.

One thing you could do is to get the weights from another process. The key here is to never import Tensorflow in the main process. (I edited the code above)
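One way to check the "never import TensorFlow in the main process" rule is to inspect sys.modules in the parent just before the pool is created. The helper below is a minimal sketch; the name loaded_in_this_process is hypothetical, and it only detects imports (direct or through another package), not every kind of inherited state.

```python
import sys


def loaded_in_this_process(packages=("tensorflow", "keras")):
    """Return the entries of `packages` that this process has already imported."""
    # sys.modules also lists submodules such as "tensorflow.python",
    # so compare against the top-level package names only.
    top_level = {name.split(".")[0] for name in list(sys.modules)}
    return [pkg for pkg in packages if pkg in top_level]


if __name__ == "__main__":
    # Call this in the parent right before creating the process pool.
    leaked = loaded_in_this_process()
    if leaked:
        print("Warning: the parent process already imported", leaked)
    else:
        print("Parent is clean; workers can import TensorFlow themselves.")
```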

TensorFlow is never imported into the main process, as far as I'm aware!
The model is loaded from disk.

Here is some code that reproduces the issue:

https://pastebin.com/QhbWX9H2

As far as I can tell, all loading of tensorflow, keras, and the model occurs within the forked process - and is never loaded into the main process that calls the fork.

(test_number) here is simply an image to pass in for inference.

Are you on master? I have a similar setting and it works.

For reference :

import os

os.environ['CUDA_VISIBLE_DEVICES'] = ''

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Queue
import numpy as np

queue = Queue()


class Network():
    def __init__(self, path):
        from keras.models import load_model
        import keras.backend as K
        import tensorflow as tf
        K.set_session(tf.Session())
        self.model = load_model(path)

    def predict(self, img, **kwargs):
        return self.model.predict(img, verbose=0)


def pool_job(path):
    model = Network(path)
    while True:
        model.predict(np.ones([10, 500]))
        queue.put(os.getpid())


pool = ProcessPoolExecutor(3)
# weight.h5 is from model.save('weight.h5')
[pool.submit(pool_job, 'weight.h5') for _ in range(3)]

while True:
    print("Got:", queue.get(block=True))

This still hangs for me when it tries to load in the weights for the model.

My Keras is up to date and my Tensorflow is version 1.7.0.

The hang occurs when creating the model (model = Network(path)), so nothing is ever added to the queue.

I'm beginning to think there is a serious bug in Keras or Tensorflow and this is simply impossible.

What happens if you run the code that I posted above? Does that still work? What OS are you running this code on?

I'm having the same problem, and it used to work but isn't anymore (I'm trying to find exactly what I changed but no luck so far...)

With multiprocessing, it hangs at this point:

model = Sequential()
model.add(LSTM(...)) # hangs here

Edit: Fixed my issue. In my case, the fix was simply removing the import of nltk.

Regardless this remains an issue for me.

Hi @Dref360,

Thank you very much for sharing your minimal solution with us.
Actually, I was just looking for something similar that would help me pass Keras objects to subprocesses across a "pool of workers".
Do you interrupt the while True loop by hand?

Thanks in advance
Giuseppe

You can interrupt the loop with a multiprocessing.Event. (see keras/utils/data_utils.py, it's widely used)
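A minimal sketch of that idea, using only the standard library: the worker checks a multiprocessing.Event on every iteration instead of looping forever. The os.getpid() payload is a stand-in for a real prediction, and the function name worker is hypothetical.

```python
import multiprocessing as mp
import os
import time


def worker(stop_event, results):
    # Loop until the parent sets the event, instead of `while True`.
    while not stop_event.is_set():
        results.put(os.getpid())  # stand-in for model.predict(...) plus reporting
        time.sleep(0.1)


if __name__ == "__main__":
    stop_event = mp.Event()
    results = mp.Queue()
    proc = mp.Process(target=worker, args=(stop_event, results))
    proc.start()
    for _ in range(3):
        print("Got:", results.get())
    stop_event.set()  # ask the worker to finish its current iteration and exit
    # Only a few small items can still be in the queue here, so join() returns
    # promptly; with large payloads, drain the queue before joining.
    proc.join()
```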

Hi @Dref360, thanks a lot for the reply!

I will briefly explain my case and paste below a snippet of code that I have written for this purpose, following your suggestions above.

The computer I'm working with has N CPU cores, so I'd like to
perform the evaluation in N separate processes. Every subprocess
should contain its own neural network and:

  1. Receive the coordinates in data space to be evaluated.
  2. Report back to the main process the result of the evaluation.

Taking the MNIST example in Keras, this is what I have written so far:

import os
import time
import signal 
import multiprocessing as mp
os.environ['CUDA_VISIBLE_DEVICES'] = ''
from concurrent.futures import ProcessPoolExecutor

CPU_COUNT = mp.cpu_count() # how many cpus on the machine? 
print("how many cpus: {}".format(CPU_COUNT))
queue = mp.Queue() # define an output queue 

def get_weights_MNIST():
    """
    This should be the main process, 
    where you define/save your model
    """
    print("Building and compiling the model in the 'mother' process")
    from keras import layers
    from keras import models
    import keras.backend as K  
    import tensorflow as tf
    K.set_session(tf.Session())
    model = models.Sequential()
    model.add(layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.Flatten())
    model.add(layers.Dense(64, activation='relu'))
    model.add(layers.Dense(10, activation='softmax'))
    model.compile(optimizer='rmsprop', loss='categorical_crossentropy',
        metrics=['accuracy'])
    print("Model summary:")
    model.summary()
    return model.get_weights() # returns a list of all weight tensors in the model, as Numpy arrays


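# quit and foundit are never used, and the call at the bottom passes only weights,
# so they default to None here
def pool_job_MNIST(weights, quit=None, foundit=None):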
    """
    This should be the child process, 
    where you load/run within another process.
    It will contain whatever code should be run on 
    multiple processors.
    """
    print("This is me, your child feeding from you Mum")
    from keras import layers
    from keras import models
    import keras.backend as K
    import tensorflow as tf
    K.set_session(tf.Session())
    from keras.datasets import mnist 
    from keras.utils import to_categorical

    (train_images, train_labels), (test_images, test_labels) = mnist.load_data()
    train_images = train_images.reshape((60000, 28, 28, 1))
    train_images = train_images.astype('float32') / 255

    test_images = test_images.reshape((10000, 28, 28, 1))
    test_images = test_images.astype('float32') / 255

    train_labels = to_categorical(train_labels)
    test_labels = to_categorical(test_labels)

    model = models.Sequential()
    model.add(layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D((2, 2)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.Flatten())
    model.add(layers.Dense(64, activation='relu'))
    model.add(layers.Dense(10, activation='softmax'))

    model.compile(optimizer='rmsprop', loss='categorical_crossentropy',
        metrics=['accuracy'])

    model.set_weights(weights) 
    # sets the values of the weights of the model, from a list of Numpy arrays. 
    # The arrays in the list should have the same shape as those returned by get_weights().

    while True:
        model.fit(train_images, train_labels, epochs=5, batch_size=64, verbose=1)
        test_loss, test_acc = model.evaluate(test_images, test_labels)
        print("test accuracy {}".format(test_acc))
        queue.put(os.getpid())
        # no queue.join() here: multiprocessing.Queue has no such method (only JoinableQueue does)
        #print("process id:", queue.get(os.getpid()))

if __name__ == "__main__":
    pool = ProcessPoolExecutor(max_workers=CPU_COUNT) # create a pool of workers 
    weights = pool.submit(get_weights_MNIST).result() # build the model in a worker and fetch its weights
        # [pool.submit(pool_job_MNIST, weights) for p in range(CPU_COUNT)]
    [pool.submit(pool_job_MNIST, weights)]
    # with ProcessPoolExecutor(max_workers=CPU_COUNT) as pool:
    #   pool.submit(pool_job_MNIST, weights)

    # queue.get(block=True) waits for the next item, so no empty() polling is needed
    while True:
        print("Process Job id:", queue.get(block=True))

    #pool.close()
    #pool.terminate()
    exit()

Anything wrong? Am I missing something? Thanks in advance!
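The requirement described in this comment (every subprocess holds its own network, receives a point to evaluate, and reports the result back) can also be sketched with a pool initializer, which avoids the shared global queue. This is an assumption-laden illustration, not a fix confirmed in the thread: toy_model stands in for a Keras model, and init_worker and evaluate are hypothetical names. The initializer argument of ProcessPoolExecutor requires Python 3.7 or later.

```python
import os
from concurrent.futures import ProcessPoolExecutor

_model = None  # one instance per worker process, set by init_worker


def toy_model(point):
    # Stand-in for a network (hypothetical): sum of squared coordinates.
    return sum(x * x for x in point)


def init_worker():
    # Runs once in every worker. Real code would import Keras and
    # build or load the model here, inside the child process.
    global _model
    _model = toy_model


def evaluate(point):
    # Receives a point in data space and reports (pid, result) to the parent.
    return os.getpid(), _model(point)


if __name__ == "__main__":
    points = [(0.0, 1.0), (1.0, 2.0), (3.0, 4.0)]
    with ProcessPoolExecutor(max_workers=2, initializer=init_worker) as pool:
        for pid, value in pool.map(evaluate, points):
            print("worker", pid, "->", value)
```

Results come back through the futures machinery as return values, so the parent does not need to poll a queue.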

@Dref360 I have the same problem as described above. Thanks for your advice; the code in your repository does work!

Is there any resolution on how to use multiprocessing with Keras?

As of TF 1.10, the library seems to be somewhat forkable. So you will have to test what you can do.

Also, something you can try is:
multiprocessing.set_start_method('spawn', force=True) if you're on UNIX and using Python3.
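As a hedged sketch of the 'spawn' suggestion: instead of changing the global start method, a spawn context can be passed to a single pool (the mp_context argument needs Python 3.7 or later). The square function is a placeholder for real work; with spawn, each worker starts a fresh interpreter, so it does not inherit state the parent had at fork time.

```python
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


def square(x):
    # Placeholder for work that would import Keras inside the worker.
    return x * x


def run_with_spawn(values):
    # A context object limits the 'spawn' choice to this pool only.
    ctx = mp.get_context("spawn")
    with ProcessPoolExecutor(max_workers=2, mp_context=ctx) as pool:
        return list(pool.map(square, values))


if __name__ == "__main__":
    # The guard matters with 'spawn': workers re-import this module on startup.
    print(run_with_spawn([1, 2, 3]))
```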

It's an old issue, so I feel a little bad for necromancing ... then again I really like playing casters.

"It seems that Keras cannot run prediction in multiple processes simultaneously."

Don't do this :) I strongly believe that calling model.predict in different subprocesses is an anti-pattern, and you should instead refactor your code to batch up the calls and then call model.predict once with an entire batch of inputs.

"Does anyone have any examples of prediction working across multiple processes?"

For simple jobs with small state, it may look like this:

import tensorflow as tf
from multiprocessing import Pool
from time import sleep
from tensorflow.keras.layers import Dense
from tensorflow.keras.initializers import Constant


def preprocess(data_in):
    # do stuff
    sleep(3)
    data_out = data_in
    return data_out


def postprocess(data_in):
    # do stuff
    sleep(2)
    data_out = data_in
    return data_out


# the original function for reference
def job_fn(model, query):
    model_input = preprocess(query)
    predictions = model.predict(model_input)[0]
    result = postprocess(predictions)
    return result


if __name__ == "__main__":
    steve = tf.keras.models.Sequential()
    steve.add(Dense(1, use_bias=False, kernel_initializer=Constant(1)))
    jobs = [[idx] for idx in range(100)]

    # # sequential (single thread) code for reference
    # sequential_processing = map(job_fn, [steve] * 100, jobs)
    # for result in sequential_processing:
    #     print(result)

    # concurrent version that aggregates predictions
    with Pool() as workers:
        model_input = workers.map(preprocess, jobs)
        predictions = steve.predict(model_input)
        result = workers.map(postprocess, predictions)
    print(result)

If you have multiple calls to model.predict in your job, big state that you don't want to pickle, or a need for locks/semaphores/mutexes, you can refactor the job as a generator that yields CPU jobs and GPU jobs. You would then write a small scheduler that processes the generator in green threads until it encounters expensive, parallelizable CPU work or a prediction. At that point, the job suspends itself via result = yield (job_description), and the scheduler resumes it later via gen.send(result).

I am using this in my implementation of expert iteration (a variant of the AlphaGo self-play idea) to train an RL agent to play Hex. The code for it is online (https://github.com/FirefoxMetzger/hex-agents), but perhaps not commented enough to be clear. The function that starts the job is here
(Note that this is a recursive generator that performs MCTS while using model.predict to choose leaves to expand; isn't Python a fun language? :) )
The (rather basic and hacky) scheduler is here
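A much simplified sketch of the generator idea follows. It is not the code from the linked repository: it handles only prediction requests (no CPU job dispatch, no green threads), fake_predict stands in for model.predict, and the names job, advance, and run_batched are hypothetical. Each job yields whenever it needs the model; the scheduler collects one request per waiting job, predicts on the whole batch once, and sends each result back.

```python
def fake_predict(batch):
    # Stand-in for model.predict on a whole batch (hypothetical).
    return [2 * x for x in batch]


def job(query):
    # The job suspends at every point where it needs the model.
    first = yield query        # request a prediction for `query`
    second = yield first + 1   # a second prediction that depends on the first
    return second


def advance(idx, gen, value, waiting, results):
    # Resume one job; either it asks for another prediction or it finishes.
    try:
        waiting.append((idx, gen, gen.send(value)))
    except StopIteration as done:
        results[idx] = done.value


def run_batched(jobs, predict):
    results = [None] * len(jobs)
    waiting = []
    for idx, gen in enumerate(jobs):
        advance(idx, gen, None, waiting, results)  # run up to the first yield
    while waiting:
        batch, waiting = waiting, []
        # One predict call serves every job that is currently waiting.
        predictions = predict([request for _, _, request in batch])
        for (idx, gen, _), prediction in zip(batch, predictions):
            advance(idx, gen, prediction, waiting, results)
    return results


if __name__ == "__main__":
    print(run_batched([job(q) for q in [1, 2, 3]], fake_predict))
```

With three jobs that each need two predictions, the model is called twice in total instead of six times.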
