Keras: TF/threads: Must call train_on_batch and predict_on_batch before starting threads, get errors otherwise

Created on 29 Jan 2017 · 13 comments · Source: keras-team/keras

If I use models in other threads I get errors. If I run them first in the main thread (calling train_on_batch and predict_on_batch once) it works. But I cannot do that for my real models, because I only get training data after I evaluate a few times (simulation stuff).

I tried tfGraph.as_default() and global/local_variables_initializer(); those just produce different errors for every combination. See four examples:

Just spawning threads:
ValueError: Tensor("Const:0", shape=(), dtype=float32) must be from the same graph as Tensor("sub_3:0", shape=(), dtype=float32).

Using: with tfGraph.as_default()
IndexError: pop from empty list

Using: tfGraph.as_default() and global/local_variables_initializer()
InvalidArgumentError (see above for traceback): You must feed a value for placeholder tensor 'keras_learning_phase' with dtype bool

Using: tfGraph.as_default() and global/local_variables_initializer() and K.manual_variable_initialization(True) [not shown in 2nd Gist]:
FailedPreconditionError (see above for traceback): Attempting to use uninitialized value dense_1_W

Gist: test.py (run without arguments; it could also test sequential execution or processes, but those are commented out at the moment to show the bug). If you remove the comment on line 209 it works; that calls warmup(), which runs train_on_batch and predict_on_batch before starting the threads:
https://gist.github.com/droid666/eebf14dc8e92f3c4bccb92a1c0fd4279

Same with "tfGraph.as_default()" and "global/local_variables_initializer()" at lines 134-143:
https://gist.github.com/droid666/3353c8f1f225a1245fab59f4d1570e89

Sorry if I am just doing it wrong (I am fairly sure global/local_variables_initializer() is used incorrectly).

Edit: I also cannot just do the following, because I have complex custom loss functions, so y_predict and y_train are very different in size and content:

for m in myModels:
    x = np.zeros((1, m.inSize))
    y = m.model.predict_on_batch(x)
    m.model.train_on_batch(x, y)


All 13 comments

This can be tricky. Try setting sessions and graphs as defaults in with statements:

import numpy as np
import tensorflow as tf

from keras import backend as K
from keras.layers import Dense
from keras.models import Sequential
from threading import Thread

tf_session = K.get_session() # this creates a new session since one doesn't exist already.
tf_graph = tf.get_default_graph()

X_train = np.array([[-1,-1],[-1,1],[1,-1],[1,1]]).astype('float32')
Y_train = np.array([1, -1, -1, 1]).astype('float32')
X_test = np.array([[-1,-1],[1,1]]).astype('float32')

def create_play_model():
    model = Sequential()
    model.add(Dense(3,  input_shape=(2,), activation='tanh'))
    model.add(Dense(1, activation='tanh'))
    model.compile(optimizer='adadelta', loss='mean_squared_error')
    model.fit(X_train, Y_train, nb_epoch=1000, verbose=0)
    return model

with tf_session.as_default():
    with tf_graph.as_default():
        model = create_play_model()

model2 = None
def func_to_run_in_other_thread(model, tf_session=tf.get_default_session(), tf_graph=tf.get_default_graph()):
    with tf_session.as_default():
        with tf_graph.as_default():
            Y  = model.predict(X_test)
            print(Y)
            print("Success1! I can train in the main thread and predict in another!")

            global model2
            model2 = create_play_model()

thread = Thread(target=func_to_run_in_other_thread, args=(model, tf_session, tf_graph))
thread.start()
thread.join()

Y = model2.predict(X_test)
print(Y)
print("Success2! I can train in another thread and predict in the main thread!")

https://www.tensorflow.org/api_docs/python/client/session_management

Thank you, I tried this variant, but it does not work in my case. If I change your example to use two threads, one for training and one for evaluation, it fails too:

Predict throws:

/lib/python2.7/site-packages/tensorflow/python/framework/ops.py", line 2947, in device
    self._device_function_stack.pop()
IndexError: pop from empty list

Train throws:

lib/python2.7/site-packages/tensorflow/python/framework/ops.py", line 3082, in _pop_control_dependencies_controller
    assert self._control_dependencies_stack[-1] is controller
AssertionError

Here is the modified example:
import numpy as np
import tensorflow as tf

from keras import backend as K
from keras.layers import Dense
from keras.models import Sequential
from threading import Thread

tf_session = K.get_session() # this creates a new session since one doesn't exist already.
tf_graph = tf.get_default_graph()

X_train = np.array([[-1,-1],[-1,1],[1,-1],[1,1]]).astype('float32')
Y_train = np.array([1, -1, -1, 1]).astype('float32')
X_test = np.array([[-1,-1],[1,1]]).astype('float32')

def create_play_model():
    model2 = Sequential()
    model2.add(Dense(3,  input_shape=(2,), activation='tanh'))
    model2.add(Dense(1, activation='tanh'))
    model2.compile(optimizer='adadelta', loss='mean_squared_error')
    #model2.fit(X_train, Y_train, nb_epoch=1000, verbose=0)
    return model2

with tf_session.as_default():
    with tf_graph.as_default():
        model = create_play_model()

def func_to_run_in_other_thread(model, tf_session=tf.get_default_session(), tf_graph=tf.get_default_graph()):
    with tf_session.as_default():
        with tf_graph.as_default():
            for i in range(0, 1000):
                Y  = model.predict(X_test)
            print(Y)
            print("Success! I can predict in this thread and train in another!")


def func_to_train_in_other_thread(model, tf_session=tf.get_default_session(), tf_graph=tf.get_default_graph()):
    with tf_session.as_default():
        with tf_graph.as_default():
            model.fit(X_train, Y_train, nb_epoch=1000, verbose=0)
            print "Success! I can train in this thread and predict in another!"

# Uncomment to make it work: if train and eval are each run once before starting the threads, it works
#model.fit(X_train, Y_train, nb_epoch=1, verbose=0)
#Y  = model.predict(X_test)

thread1 = Thread(target=func_to_train_in_other_thread, args=(model, tf_session, tf_graph))
thread2 = Thread(target=func_to_run_in_other_thread, args=(model, tf_session, tf_graph))
thread1.start()
thread2.start()
thread1.join()
thread2.join()

print "Done."

Note: if I add a sleep between starting training and starting eval, it works. The inverse case works too (start eval, sleep, start training).

There is some kind of corruption if two threads start using the model at the same time. Some kind of setup is done on the first eval and first training call that is not thread safe. After those initial calls it is safe to start multiple threads at once.

thread1.start()
import time
time.sleep(1)
thread2.start()

In my code above the two threads never run the model at the same time. There seems to be a race condition that shows up with larger models. I recreated the same two bugs using a larger model. Both errors are related to some stack in the ops module being in the wrong state for just a second. A simple lock somewhere could probably fix it. But given how simple the workaround is, I'm not sure it's worth hunting down.
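For anyone who wants to avoid the sleep, here is a minimal sketch of that lock idea. It assumes the unsafe setup is the lazy construction of the train/predict functions; _make_predict_function and _make_train_function are private Keras internals (present in Keras 1.x/2.x but not public API), so treat this as a workaround sketch, not a supported solution:

import threading

setup_lock = threading.Lock()

def thread_safe_warmup(model):
    # Serialize the one-time setup Keras otherwise performs lazily on the
    # first predict/train call. No real data is needed, which also avoids
    # the custom-loss problem from the original post.
    with setup_lock:
        model._make_predict_function()
        model._make_train_function()

# Warm up every model (e.g. in the main thread) before spawning threads;
# the lock keeps it safe even if threads call it concurrently instead:
for m in myModels:
    thread_safe_warmup(m.model)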

I am facing this issue as well, even though I have two completely different models being trained on two threads.
I too get the stack-in-the-ops-module error. Maybe there are global ops which need locks.

I had to add a sleep between starting the two threads to get it to work.
thread1.start()
time.sleep(1)
thread2.start()

Hello, I am having the same problem.
How did you solve it?

Thanks!

Hi!
Having the same issue here: calling time.sleep takes care of the error, but I'd like to tackle the race condition properly. Does anybody know where it could come from?

Hi! I tried training several models in parallel thanks to joblib, and indeed adding a time.sleep(0.1) worked! What kind of black magic is that?

Here's the code in case anyone has the same issue:

def f(g):
    time.sleep(0.1)
    model = create_model(g)

    model.fit(x=fm[indices.train], y=labels_0based[str(i)][indices.train],
              validation_data=(fm[indices.test], labels_0based[str(i)][indices.test]),
              epochs=g['epochs'],
              verbose=1)

    return model.history

hparams_array = params_array_per_node[foo]
entries = np.array(Parallel(n_jobs=-1, prefer="threads")(delayed(f)(g) for g in hparams_array)).flatten()

Hello @AlexandreRozier,

(I think your code is not properly formatted.)

You are able to train different models at the same time just by adding the time.sleep(0.1)? I am quite impressed. Have you had any closed-session errors or other TensorFlow errors during parallel training?

Which version of TensorFlow are you using (1.14, or the 2.0 beta)?

Thanks!

Hi!
I think the reason time.sleep worked for me is that I called the function twice simultaneously. Please make sure this does not happen, or use use_locking=True in your Optimizer. Also, I think you should not have this error if you are using eager execution.
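For reference, a rough sketch of that use_locking idea, assuming Keras 2.x on the TensorFlow 1.x backend (TFOptimizer is Keras's wrapper for native TF optimizers, and model stands for any model from the examples above):

import tensorflow as tf
from keras.optimizers import TFOptimizer

# use_locking=True tells the TF optimizer to lock variables during updates.
locking_opt = TFOptimizer(tf.train.AdamOptimizer(learning_rate=0.001, use_locking=True))
model.compile(optimizer=locking_opt, loss='mean_squared_error')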

@adr-arroyo I updated the formatting, thanks :)

I thought a simpler example of what works for me might be helpful:

import time

import tensorflow
from joblib import Parallel, delayed

def parallel_execution(x, y):
    time.sleep(0.01)
    with tensorflow.Session().as_default():
        # `model` is assumed to be defined beforehand and shared between workers.
        history = model.fit(x=x, y=y)
        return history

results = Parallel(n_jobs=-1, require='sharedmem')(delayed(parallel_execution)(x[i], y[i]) for i in range(10))

This allows you to leverage multi-core training on the CPU :)
Maybe the time.sleep is irrelevant, but I didn't test it further.

I faced a similar problem: running models in different threads in parallel. The answer in https://github.com/keras-team/keras/issues/9424#issuecomment-367300516 helped me.
I'd like to share my solution to @droid666's problem:

import numpy as np
import tensorflow as tf

from keras import backend as K
from keras.layers import Dense
from keras.models import Sequential, model_from_json
from threading import Thread

X_train = np.array([[-1, -1], [-1, 1], [1, -1], [1, 1]]).astype('float32')
Y_train = np.array([1, -1, -1, 1]).astype('float32')
X_test = np.array([[-1, -1], [1, 1]]).astype('float32')


def create_play_model():
    model2 = Sequential()
    model2.add(Dense(3, input_shape=(2,), activation='tanh'))
    model2.add(Dense(1, activation='tanh'))
    model2.compile(optimizer='adadelta', loss='mean_squared_error')
    model2.fit(X_train, Y_train, epochs=1000, verbose=0)
    return model2


def func_to_run_in_other_thread(architecture, weights):
    print('start func_to_run_in_other_thread')
    with tf.Session(graph=tf.Graph()) as sess:
        # Build model.
        model = model_from_json(architecture)
        model.set_weights(weights)
        model.compile(optimizer='rmsprop', loss='mse')

        for i in range(0, 1000):
            Y = model.predict(X_test)
        print(Y)
        print("Success! I can predict in this thread and train in another!")
    print('end func_to_run_in_other_thread')


def func_to_train_in_other_thread(architecture, weights):
    print('start func_to_train_in_other_thread')
    with tf.Session(graph=tf.Graph()) as sess:
        # Build model.
        model = model_from_json(architecture)
        model.set_weights(weights)
        model.compile(optimizer='rmsprop', loss='mse')

        model.fit(X_train, Y_train, epochs=1000, verbose=0)
        print("Success! I can train in this thread and predict in another!")
    print('end func_to_train_in_other_thread')


if __name__ == "__main__":
    model = create_play_model()
    # Get the network in a portable format.
    architecture = model.to_json()
    weights = model.get_weights()

    thread1 = Thread(target=func_to_train_in_other_thread, args=(architecture, weights))
    thread2 = Thread(target=func_to_run_in_other_thread, args=(architecture, weights))
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()

    print("Done.")

The solution was to pass not the compiled model to the thread, but the architecture and weights separately, and to use a new TensorFlow graph for each thread instead of the default one.
The time.sleep() solution does not actually help; it only makes the threads run one after another (if the sleep time is long enough).
Please correct me if something is wrong.

Hello @GusevaAnna
Thanks for the post!

Your solution is more elegant than just adding some time.sleep(), even though it is more elaborate. I would also like to add that if you are using TensorFlow 2.0, the Session is not needed any more, so hopefully using only the Graph also works.
One disadvantage that I see is that you need to compile the model every time you want to use it (in a different thread), which will introduce some delay.
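Something like this, I imagine, would be enough in TF 2.x (an untested sketch on my side; architecture, weights, X_train and Y_train are taken from your example above):

from threading import Thread
from tensorflow.keras.models import model_from_json

def train_in_thread(architecture, weights):
    # Rebuild the model from the portable JSON + weights; under eager
    # execution no Session or explicit Graph handling is needed.
    model = model_from_json(architecture)
    model.set_weights(weights)
    model.compile(optimizer='rmsprop', loss='mse')
    model.fit(X_train, Y_train, epochs=1000, verbose=0)

Thread(target=train_in_thread, args=(architecture, weights)).start()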

Now that Keras is part of TF, let us hope that a future update of TF 2.0 allows parallel model training/prediction without causing us so much trouble!

Cheers!

