Hi, I am trying to use the ImageDataGenerator class inside a custom generator, but the problem is that when I do, training takes far too long.
This is the generator without the image augmentation:
import numpy as np
import keras

class DataGenerator(keras.utils.Sequence):
    'Generates data for Keras'
    def __init__(self, features, targets,
                 n_classes=99, batch_size=32, shuffle=True):
        'Initialization'
        self.n_classes = n_classes
        self.n_vals = len(targets)
        self.list_IDs = np.arange(self.n_vals)
        self.batch_size = batch_size
        self.features = features
        self.shuffle = shuffle
        self.targets = targets
        self.targets_mc = keras.utils.to_categorical(targets, num_classes=self.n_classes)
        self.indexes = np.arange(len(self.list_IDs))

    def __len__(self):
        'Denotes the number of batches per epoch'
        # floor: any final partial batch is dropped
        return int(np.floor(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        'Generate one batch of data'
        # Generate indexes of the batch
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        # Find list of IDs
        list_IDs_temp = [self.list_IDs[k] for k in indexes]
        # Generate data
        X, y = self.__data_generation(list_IDs_temp)
        return X, y

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_IDs_temp):
        'Generates data containing batch_size samples'  # X : (n_samples, *dim, n_channels)
        # Plain slicing: all data is already in memory
        X = self.features[list_IDs_temp]
        y = self.targets_mc[list_IDs_temp]
        return X, y
When using the previous code to train a network, each epoch takes about 2 seconds:
batch_size = 16
params = {'batch_size': batch_size,
          'n_classes': 99,
          'shuffle': True}

images_training_generator = DataGenerator(X_img_tr, y_tr, **params)
images_validation_generator = DataGenerator(X_img_val, y_val, **params)

model = images_model()
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.fit_generator(images_training_generator,
                    validation_data=images_validation_generator,
                    epochs=100,
                    callbacks=callbacks)
Epoch 1/100
49/49 [==============================] - 3s 67ms/step - loss: 4.2916 - acc: 0.0689 - val_loss: 3.1654 - val_acc: 0.1979
Epoch 2/100
49/49 [==============================] - 2s 44ms/step - loss: 2.2260 - acc: 0.4247 - val_loss: 2.0634 - val_acc: 0.4375
Epoch 3/100
49/49 [==============================] - 2s 45ms/step - loss: 1.1918 - acc: 0.6467 - val_loss: 1.9450 - val_acc: 0.4792
Now I try to implement image augmentation with almost the same generator:
import numpy as np
import keras
from keras.preprocessing.image import ImageDataGenerator

class ImageGenerator(keras.utils.Sequence):
    'Generates data for Keras'
    def __init__(self, features, targets,
                 n_classes=99,
                 batch_size=32,
                 shuffle=True,
                 repeats=1):
        self.n_classes = n_classes
        self.n_vals = len(targets)
        # Since we are using data augmentation, we repeat how many times each
        # image is shown per epoch. The same original image can be rotated or
        # flipped differently each time, so it is not the "same" image.
        # (See the short np.repeat illustration after this class.)
        # NOTE: strictly speaking, these should be only the valid images if we
        # want to do things properly.
        self.list_IDs = np.repeat(np.arange(self.n_vals), repeats)
        self.batch_size = batch_size
        self.features = features
        self.shuffle = shuffle
        self.targets = targets
        self.targets_mc = keras.utils.to_categorical(targets, num_classes=self.n_classes)
        self.indexes = np.arange(len(self.list_IDs))
        self.agumentator = ImageDataGenerator(
            # featurewise_center=True,
            # featurewise_std_normalization=True,
            rotation_range=30,
            zoom_range=0.1,
            width_shift_range=0.01,
            height_shift_range=0.01,
            horizontal_flip=True,
            vertical_flip=True)

    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(np.floor(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        'Generate one batch of data'
        # Generate indexes of the batch
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        # Find list of IDs
        list_IDs_temp = [self.list_IDs[k] for k in indexes]
        # Generate data
        X, y = self.__data_generation(list_IDs_temp)
        return X, y

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_IDs_temp):
        'Generates data containing batch_size samples'  # X : (n_samples, *dim, n_channels)
        X = self.features[list_IDs_temp]
        y = self.targets_mc[list_IDs_temp]
        # The only change vs. DataGenerator: push the batch through
        # ImageDataGenerator.flow() and pull one augmented batch back out.
        X, y = self.agumentator.flow(X, y, batch_size=self.batch_size).next()
        return X, y
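(A quick illustration of the repeats mechanism referenced in the comment above; a standalone sketch, not part of the training code:)

import numpy as np

# With repeats=2 every image ID appears twice per epoch, so each original
# image receives two independently sampled augmentations.
print(np.repeat(np.arange(4), 2))  # -> [0 0 1 1 2 2 3 3]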
Note that the only difference between the two generators is at the end of the method __data_generation.
Well, so when I use this generator to train a network:
augmentation_image_repetition = 1
batch_size = 16
params = {'batch_size': batch_size,
          'n_classes': 99,
          'shuffle': True}

images_validation_generator = DataGenerator(X_img_val, y_val, **params)  # <-- no augmentation
params['repeats'] = augmentation_image_repetition
images_training_generator = ImageGenerator(X_img_tr, y_tr, **params)

model.fit_generator(images_training_generator,
                    validation_data=images_validation_generator,
                    epochs=100,
                    callbacks=callbacks)
each epoch takes about 11 seconds:
Epoch 1/100
49/49 [==============================] - 11s 227ms/step - loss: 4.5657 - acc: 0.0153 - val_loss: 4.1507 - val_acc: 0.0625
Epoch 2/100
49/49 [==============================] - 10s 214ms/step - loss: 3.9298 - acc: 0.0867 - val_loss: 3.1752 - val_acc: 0.1771
Epoch 3/100
49/49 [==============================] - 11s 218ms/step - loss: 3.2304 - acc: 0.1531 - val_loss: 2.9212 - val_acc: 0.2292
I tried passing the arguments
use_multiprocessing=True,
max_queue_size=10,
workers=4
to fit_generator to speed up the process. It helps a little, but training is still about 4 times slower than without the augmentation.
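(For reference, the full call presumably looked roughly like this; a reconstruction, not verbatim, with callbacks and the generators as defined above:)

model.fit_generator(images_training_generator,
                    validation_data=images_validation_generator,
                    epochs=100,
                    callbacks=callbacks,
                    use_multiprocessing=True,  # worker processes instead of the main thread
                    max_queue_size=10,         # batches buffered ahead of the GPU
                    workers=4)                 # parallel batch producers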
I am probably implementing something wrong, but I couldn't figure out what by googling.
So any piece of advice to make the training faster would be much appreciated.
You could use a ThreadPool? I do not know how this will behave on your computer with the GIL, though. Data augmentation is expensive; it involves a lot of matrix computations.
I got a 3x speedup using this.
Snippet (switch between __data_generation_threads and __data_generation to see the speedup):
from multiprocessing.pool import ThreadPool

import keras
import numpy as np
from keras.preprocessing.image import ImageDataGenerator
from keras.utils import OrderedEnqueuer

class ImageGenerator(keras.utils.Sequence):
    'Generates data for Keras'
    def __init__(self, features, targets,
                 n_classes=99,
                 batch_size=32,
                 shuffle=True,
                 repeats=1):
        self.n_classes = n_classes
        self.n_vals = len(targets)
        # Since we are using data augmentation, we repeat how many times each
        # image is shown per epoch; the same original image can be rotated or
        # flipped differently each time, so it is not the "same" image.
        # NOTE: strictly speaking, these should be only the valid images if we
        # want to do things properly.
        self.list_IDs = np.repeat(np.arange(self.n_vals), repeats)
        self.batch_size = batch_size
        self.features = features
        self.shuffle = shuffle
        self.targets = targets
        self.targets_mc = keras.utils.to_categorical(targets, num_classes=self.n_classes)
        self.indexes = np.arange(len(self.list_IDs))
        self.pool = None
        self.agumentator = ImageDataGenerator(
            # featurewise_center=True,
            # featurewise_std_normalization=True,
            rotation_range=30,
            zoom_range=0.1,
            width_shift_range=0.01,
            height_shift_range=0.01,
            horizontal_flip=True,
            vertical_flip=True)

    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(np.floor(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        'Generate one batch of data'
        # Create the pool lazily so it lives in the consuming worker
        if self.pool is None:
            self.pool = ThreadPool(6)
        # Generate indexes of the batch
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        # Find list of IDs
        list_IDs_temp = [self.list_IDs[k] for k in indexes]
        # Generate data
        X, y = self.__data_generation_threads(list_IDs_temp)
        return X, y

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_IDs_temp):
        'Generates data containing batch_size samples'  # X : (n_samples, *dim, n_channels)
        X = self.features[list_IDs_temp]
        y = self.targets_mc[list_IDs_temp]
        X = np.array(self.agumentator.flow(X, batch_size=self.batch_size, shuffle=self.shuffle).next())
        return X, y

    def __data_generation_threads(self, list_IDs_temp):
        'Generates data containing batch_size samples'  # X : (n_samples, *dim, n_channels)
        X = self.features[list_IDs_temp]
        y = self.targets_mc[list_IDs_temp]
        # Augment each image of the batch in parallel instead of using flow()
        X = np.array(self.pool.map(
            lambda xi: self.agumentator.random_transform(self.agumentator.standardize(xi)), X))
        return X, y

X_data = np.ones([10000, 100, 100, 3])
y_data = np.ones([10000])

g = ImageGenerator(X_data, y_data)

import time

t = time.time()
# OrderedEnqueuer is the same machinery fit_generator uses internally to feed
# a Sequence when workers > 0, so this benchmarks the input pipeline alone.
enqueuer = OrderedEnqueuer(g, True)
enqueuer.start(4)
gen = enqueuer.get()
for _ in range(len(g)):
    next(gen)
print("Took", time.time() - t)
Hey @Dref360, thank you very much for your response.
I slightly modified the code that you provided so I can switch between using __data_generation_threads and __data_generation with exactly the same class:
from multiprocessing.pool import ThreadPool

import keras
import numpy as np
from keras.preprocessing.image import ImageDataGenerator
from keras.utils import OrderedEnqueuer

class ImageGeneratorParallel(keras.utils.Sequence):
    'Generates data for Keras'
    def __init__(self, features, targets,
                 n_classes=99,
                 batch_size=32,
                 shuffle=True,
                 repeats=1,
                 parallel=True):
        self.n_classes = n_classes
        self.n_vals = len(targets)
        # Since we are using data augmentation, we repeat how many times each
        # image is shown per epoch; the same original image can be rotated or
        # flipped differently each time, so it is not the "same" image.
        # NOTE: strictly speaking, these should be only the valid images if we
        # want to do things properly.
        self.list_IDs = np.repeat(np.arange(self.n_vals), repeats)
        self.batch_size = batch_size
        self.features = features
        self.shuffle = shuffle
        self.targets = targets
        self.targets_mc = keras.utils.to_categorical(targets, num_classes=self.n_classes)
        self.indexes = np.arange(len(self.list_IDs))
        self.pool = None
        self.parallel = parallel
        self.agumentator = ImageDataGenerator(
            # featurewise_center=True,
            # featurewise_std_normalization=True,
            rotation_range=30,
            zoom_range=0.1,
            width_shift_range=0.01,
            height_shift_range=0.01,
            horizontal_flip=True,
            vertical_flip=True)

    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(np.floor(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        'Generate one batch of data'
        if self.parallel:
            if self.pool is None:
                self.pool = ThreadPool(4)
        # Generate indexes of the batch
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        # Find list of IDs
        list_IDs_temp = [self.list_IDs[k] for k in indexes]
        # Generate data
        if self.parallel:
            X, y = self.__data_generation_threads(list_IDs_temp)
        else:
            X, y = self.__data_generation(list_IDs_temp)
        return X, y

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_IDs_temp):
        'Generates data containing batch_size samples'  # X : (n_samples, *dim, n_channels)
        X = self.features[list_IDs_temp]
        y = self.targets_mc[list_IDs_temp]
        # Caution: flow() with shuffle=True reorders X while y is left
        # untouched, so labels no longer match the images in this variant.
        X = np.array(self.agumentator.flow(X, batch_size=self.batch_size, shuffle=self.shuffle).next())
        return X, y

    def __data_generation_threads(self, list_IDs_temp):
        'Generates data containing batch_size samples'  # X : (n_samples, *dim, n_channels)
        X = self.features[list_IDs_temp]
        y = self.targets_mc[list_IDs_temp]
        X = np.array(self.pool.map(
            lambda xi: self.agumentator.random_transform(self.agumentator.standardize(xi)), X))
        return X, y
When using parallel=True (your suggestion) I obtained the following training times:
Epoch 1/100
49/49 [==============================] - 19s 387ms/step - loss: 4.5787 - acc: 0.0191 - val_loss: 4.2784 - val_acc: 0.0208
Epoch 2/100
49/49 [==============================] - 15s 312ms/step - loss: 4.0272 - acc: 0.0702 - val_loss: 3.2079 - val_acc: 0.2083
Epoch 3/100
49/49 [==============================] - 17s 340ms/step - loss: 3.2917 - acc: 0.1416 - val_loss: 2.7439 - val_acc: 0.2812
Epoch 4/100
49/49 [==============================] - 15s 312ms/step - loss: 2.9124 - acc: 0.1722 - val_loss: 2.4835 - val_acc:
and when using parallel=False I obtained:
Epoch 1/100
49/49 [==============================] - 11s 232ms/step - loss: 4.6051 - acc: 0.0051 - val_loss: 4.5955 - val_acc: 0.0208
Epoch 2/100
49/49 [==============================] - 11s 215ms/step - loss: 4.5961 - acc: 0.0064 - val_loss: 4.5952 - val_acc: 0.0104
Epoch 3/100
49/49 [==============================] - 11s 220ms/step - loss: 4.5961 - acc: 0.0077 - val_loss: 4.5951 - val_acc: 0.0104
so apparently this is slowing the training down even more :( .
Using htop and nvidia-smi I noticed that with the code you provided, the CPUs are much more loaded but the GPU less.
Additionally, GPU usage toggles between 0% and 16% with your code, and between 3% and 25% with parallel=False. I suspect the slow training has to do with how Keras interleaves the augmentation and the training.
I was hoping to run the augmentation and the training in parallel, so that by the time a batch finishes, the next batch is already processed and augmented, but probably this is too complicated to do with Keras.
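(Keras does expose this producer/consumer machinery directly; a minimal sketch using the OrderedEnqueuer imported above, assuming g is an instance of one of the Sequence classes:)

from keras.utils import OrderedEnqueuer

# Worker processes keep up to max_queue_size augmented batches ready
# while the GPU consumes them one at a time.
enqueuer = OrderedEnqueuer(g, use_multiprocessing=True)
enqueuer.start(workers=4, max_queue_size=10)
batches = enqueuer.get()          # generator yielding ready-made batches
X_batch, y_batch = next(batches)  # already augmented by the time it's needed
enqueuer.stop()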
To rule out fit_generator itself as the source of the slowdown, I tried using the ImageDataGenerator with fit_generator directly, without my custom generator:
from keras.utils import to_categorical

model = images_model()
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

augmentation_image_repetition = 1
batch_size = 16
params = {'batch_size': batch_size,
          'n_classes': 99,
          'shuffle': True}

images_validation_generator = DataGenerator(X_img_val, y_val, **params)  # <-- no augmentation

agumentator = ImageDataGenerator(
    # featurewise_center=True,
    # featurewise_std_normalization=True,
    rotation_range=30,
    zoom_range=0.1,
    width_shift_range=0.01,
    height_shift_range=0.01,
    horizontal_flip=True,
    vertical_flip=True)

y_tr_cat = to_categorical(y_tr, 99)
steps = int(np.ceil(X_img_tr.shape[0] / batch_size))

model.fit_generator(agumentator.flow(X_img_tr, y=y_tr_cat, batch_size=batch_size),
                    validation_data=images_validation_generator,
                    epochs=100,
                    callbacks=callbacks,
                    steps_per_epoch=steps)
and I obtained almost the same times:
Epoch 1/100
50/50 [==============================] - 12s 234ms/step - loss: 4.5721 - acc: 0.0200 - val_loss: 4.1956 - val_acc: 0.0625
Epoch 2/100
50/50 [==============================] - 9s 185ms/step - loss: 3.9310 - acc: 0.0875 - val_loss: 3.1242 - val_acc: 0.1875
Epoch 3/100
50/50 [==============================] - 10s 195ms/step - loss: 3.1395 - acc: 0.1638 - val_loss: 2.7737 - val_acc: 0.3125
Epoch 4/100
50/50 [==============================] - 10s 197ms/step - loss: 2.8417 - acc: 0.2212 - val_loss: 2.4699 - val_acc: 0.2917
So, I don't know how to speed things up further.
The augmentation and the training are done in parallel when workers > 1.
Well, unfortunately, augmentation runs on the CPU if you use ImageDataGenerator, and the CPU is not "parallel" enough for that much matrix math. Every augmentation pass is very time-consuming. Try reducing the number of transformation types, or do them with the GPU version of OpenCV. Hope it helps.
If somebody could add a "GPU" mode for ImageDataGenerator, it would be very useful.
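(A minimal CPU sketch of that idea with OpenCV; the augment helper and its parameters are illustrative, and a cv2.cuda build would be needed to move the warps onto the GPU:)

import cv2
import numpy as np

def augment(img, max_angle=30):
    'One random rotation plus an optional horizontal flip, done with OpenCV.'
    h, w = img.shape[:2]
    angle = np.random.uniform(-max_angle, max_angle)
    M = cv2.getRotationMatrix2D((w / 2.0, h / 2.0), angle, 1.0)
    out = cv2.warpAffine(img, M, (w, h))  # typically faster than scipy-based warps
    if np.random.rand() < 0.5:
        out = cv2.flip(out, 1)            # horizontal flip
    return out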
I was wondering if it is possible to parallelise between the GPU and the CPU.
While the GPU trains, the CPU augments the data, and you save that time since you don't have to wait until training has finished to start augmenting the images.
If you do the augmentation in a data Sequence or data generator and use fit_generator, you get the parallelism you want by setting:
.fit_generator(
    ...
    max_queue_size=100,
    workers=10,  # set a proper value > 1
    use_multiprocessing=True,
)
fit_generator will keep preparing extra data on the CPU until the queue is full while the GPU is working.
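(A complete version of that call might look like the following; the model and generator names are assumed from earlier in the thread:)

model.fit_generator(images_training_generator,  # a Sequence that augments in __getitem__
                    validation_data=images_validation_generator,
                    epochs=100,
                    max_queue_size=100,          # batches buffered ahead of the GPU
                    workers=10,                  # CPU workers preparing batches in parallel
                    use_multiprocessing=True)    # processes instead of threads, avoiding the GIL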
@ziyigogogo This worked well for me! Thanks :)