System information
Describe the current behavior
I want to use a generator with model.fit_generator(). Therefore, I implemented a generator function with yield, as described in fchollet's deep learning book (see Code Cell [9] in the book's example notebook).
When I ran my code with workers=2, I got a
ValueError: generator already executing
and a
RuntimeError: Your generator is NOT thread-safe. Keras requires a thread-safe generator when `use_multiprocessing=False, workers > 1`.
I then tried to make my generator function thread-safe, as described in this Stack Overflow post and in Anand's blog post.
This made the errors go away, but now I get an
AttributeError: 'ThreadSafeIterator' object has no attribute 'shape'
instead. To be more precise, when I run the code below, the output is:
Model: "sequential"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
dense (Dense) (None, 512) 401920
_________________________________________________________________
dropout (Dropout) (None, 512) 0
_________________________________________________________________
dense_1 (Dense) (None, 512) 262656
_________________________________________________________________
dropout_1 (Dropout) (None, 512) 0
_________________________________________________________________
dense_2 (Dense) (None, 10) 5130
=================================================================
Total params: 669,706
Trainable params: 669,706
Non-trainable params: 0
_________________________________________________________________
Traceback (most recent call last):
  File "w:/GIT_WORKSPACES/MyProject/SampleCode.py", line 74, in <module>
    workers=NUM_WORKERS)
  File "C:\Users\XXX\AppData\Local\Continuum\anaconda3\envs\tf-2-env\lib\site-packages\tensorflow\python\keras\engine\training.py", line 1176, in fit_generator
    steps_name='steps_per_epoch')
  File "C:\Users\XXX\AppData\Local\Continuum\anaconda3\envs\tf-2-env\lib\site-packages\tensorflow\python\keras\engine\training_generator.py", line 144, in model_iteration
    shuffle=shuffle)
  File "C:\Users\XXX\AppData\Local\Continuum\anaconda3\envs\tf-2-env\lib\site-packages\tensorflow\python\keras\engine\training_generator.py", line 480, in convert_to_generator_like
    num_samples = int(nest.flatten(data)[0].shape[0])
AttributeError: 'ThreadSafeIterator' object has no attribute 'shape'
I have also read issue #12586, but the solution there does not apply to my problem, because I am not subclassing Sequence; instead, I use a custom iterator class to make my generator thread-safe.
Also note that the code below runs without any problems when either USE_GENERATOR = False or NUM_WORKERS = 1 is set.
Describe the expected behavior
Run the code below without an error.
Code example
import numpy as np
import threading
from tensorflow import keras
# ===============================================================
# Modify this:
# ===============================================================
USE_GENERATOR = True
NUM_WORKERS = 2
# ===============================================================
batch_size = 128
num_classes = 10
num_epochs = 5
# ===============================================================
class ThreadSafeIterator:
    """
    Takes an iterator/generator and makes it thread-safe by
    serializing calls to the `next` method of the given iterator/generator.
    """
    def __init__(self, it):
        self.it = it
        self.lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            return self.it.__next__()


def thread_safe_generator(f):
    """A decorator that takes a generator function and makes it thread-safe."""
    def g(*a, **kw):
        return ThreadSafeIterator(f(*a, **kw))
    return g


@thread_safe_generator
def create_generator(x, y):
    assert x.shape[0] == y.shape[0]
    i = 0
    while True:
        batch_slice = np.arange(i, i + batch_size) % x.shape[0]
        x_batch = x[batch_slice, ...]
        y_batch = y[batch_slice, ...]
        i = i + 1
        yield x_batch, y_batch
# ===============================================================
(x_train, y_train), (x_val, y_val) = keras.datasets.mnist.load_data()
# ===============================================================
x_train = x_train.reshape(60000, 28 * 28).astype('float32')/255
x_val = x_val.reshape(10000, 28 * 28).astype('float32')/255
y_train = keras.utils.to_categorical(y_train, num_classes)
y_val = keras.utils.to_categorical(y_val, num_classes)
# ===============================================================
model = keras.models.Sequential()
model.add(keras.layers.Dense(512, activation='relu', input_shape=(28 * 28,)))
model.add(keras.layers.Dropout(0.2))
model.add(keras.layers.Dense(512, activation='relu'))
model.add(keras.layers.Dropout(0.2))
model.add(keras.layers.Dense(10, activation='softmax'))
model.summary()
model.compile(loss='categorical_crossentropy', optimizer='rmsprop', metrics=['accuracy'])
# ===============================================================
if USE_GENERATOR:
    train_gen = create_generator(x_train, y_train)
    steps_per_epoch = int(np.ceil(x_train.shape[0] / batch_size))
    history = model.fit_generator(train_gen,
                                  steps_per_epoch=steps_per_epoch,
                                  epochs=num_epochs,
                                  validation_data=(x_val, y_val),
                                  workers=NUM_WORKERS)
else:
    history = model.fit(x_train, y_train,
                        batch_size=batch_size,
                        epochs=num_epochs,
                        validation_data=(x_val, y_val),
                        workers=NUM_WORKERS)
# ===============================================================
score = model.evaluate(x_val, y_val, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])
Edit: I've checked on another machine: the same problem also occurs with the GPU version and NUM_WORKERS=1, using an NVIDIA RTX 2070 with CUDA 10.1.
I just found out that the Keras repo itself uses the above generator pattern in its utils and unit tests; see this repo search or any of the following definitions with their respective usages:
So, in principle, my approach should work, since the Keras team uses the same implementation technique. What is the problem then, and why do I get an AttributeError: 'ThreadSafeIterator' object has no attribute 'shape'?
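For what it's worth, here is a minimal, self-contained check of what I assume is going on (my guess, based on the traceback, is that TF.Keras only treats real Python generator objects or keras.utils.Sequence instances as generator input, and otherwise falls back to the NumPy-array code path that expects a .shape attribute):
import inspect
import threading

class ThreadSafeIterator:                 # same wrapper as in the code example above
    def __init__(self, it):
        self.it = it
        self.lock = threading.Lock()
    def __iter__(self):
        return self
    def __next__(self):
        with self.lock:
            return next(self.it)

raw_gen = (i for i in range(3))           # a real Python generator object
wrapped = ThreadSafeIterator(raw_gen)     # the thread-safe wrapper

print(inspect.isgenerator(raw_gen))       # True  -> recognized as a generator
print(inspect.isgenerator(wrapped))       # False -> presumably handled as array-like data,
                                          #          hence the missing 'shape' attribute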
@svdHero I assume you're using TF.Keras and not Keras.
The underlying implementation is quite different, and while everything is fine using Keras, your thread-safe generator will fail (and no longer be considered a generator) in TF.Keras.
The TF team actually fixed the issue 2 months ago here:
https://github.com/tensorflow/tensorflow/commit/177b6056239805e4de0a8e8e9c258edfa4a21099#diff-ba2b45d565934dc5120efeef5457ea2eR109-R114
The added "is_iterator()" will make your implementation work here.
It's not in TF 2.0.0 or TF 1.15.0rc3 unfortunately but it's in master.
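Roughly, the idea of that check (not the actual TF code, just the gist as I understand it) is to accept any object implementing the iterator protocol instead of only objects of types.GeneratorType, which is why your wrapper would then be picked up:
import collections.abc

def looks_like_iterator(x):
    # Illustrative approximation only: accept anything that implements
    # __iter__ and __next__, not just real generator objects.
    return isinstance(x, collections.abc.Iterator)

# With such a check, a ThreadSafeIterator instance passes, because it
# defines both __iter__ and __next__.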
Switching to Sequence might be a good bet long term.
There's a good tutorial here:
https://stanford.edu/~shervine/blog/keras-how-to-generate-data-on-the-fly
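As an illustration of that suggestion, a minimal keras.utils.Sequence replacement for the generator above could look roughly like this (a sketch only; the class name MNISTSequence and the usage lines are mine, not tested against your exact setup):
import math
from tensorflow import keras

class MNISTSequence(keras.utils.Sequence):
    """Batch provider that is safe to use with workers > 1."""
    def __init__(self, x, y, batch_size):
        self.x = x
        self.y = y
        self.batch_size = batch_size

    def __len__(self):
        # Number of batches per epoch.
        return math.ceil(self.x.shape[0] / self.batch_size)

    def __getitem__(self, idx):
        # Return one batch; the last one may be smaller than batch_size.
        batch = slice(idx * self.batch_size, (idx + 1) * self.batch_size)
        return self.x[batch], self.y[batch]

# Possible usage, replacing the create_generator branch (no steps_per_epoch needed):
# train_seq = MNISTSequence(x_train, y_train, batch_size)
# history = model.fit_generator(train_seq,
#                               epochs=num_epochs,
#                               validation_data=(x_val, y_val),
#                               workers=NUM_WORKERS)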