Keras: Memory consumption when using fit_generator

Created on 2 Sep 2016 · 28 comments · Source: keras-team/keras

Hi,

I have noticed that when I use fit_generator with pickle_safe=True, more than one worker, and a max generator queue size greater than 2, the Python processes accumulate memory until execution hangs because no more memory is available (I have 40 GB of RAM).

This does not happen when pickle_safe is set to False.

Most helpful comment

Had the same issue. Added gc.collect() to the end of my custom generator and it helped to get rid of memory errors.

All 28 comments

The process should accumulate memory as the queue is being filled. If you run out of memory, consider reducing max_q_size.

pickle_safe=False will use threading instead of multiprocessing, which is lighter on memory use but slower.
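
For reference, in the Keras 1.x API used in this thread that advice might look like the following sketch (`model`, `my_generator`, and the numbers are placeholders):

```
# Sketch only: reduce the queue and fall back to threading (Keras 1.x names).
model.fit_generator(
    my_generator,
    samples_per_epoch=10000,
    nb_epoch=10,
    max_q_size=2,        # smaller queue -> fewer batches held in memory at once
    nb_worker=4,
    pickle_safe=False,   # threads instead of processes: lighter on memory, but slower
)
```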

I have the same problem. When I set pickle_safe=True, memory keeps accumulating, which eventually leads to an out-of-memory error. However, when I use fit_generator with pickle_safe=False, no memory error is raised.
I wonder why this happens.

I've run into the same thing and noticed that many child processes are being created (1 per training epoch) but those are not being joined after the epoch, resulting in a memory leak.

I am facing the same problem. My data is about 167 GB, and with two processes it costs 123 GB + 106 GB. I think the main problem is that new batches are added to the multiprocessing queue, which pickles the data. This is really slow and costs a lot of memory.

A better idea would be to keep this data in a big pre-allocated array sized according to max_q_size and put only the index into the queue.
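
A rough sketch of what that could look like (my own illustration, not tested; workers must be forked after the shared objects are created so they inherit them):

```
import numpy as np
from multiprocessing import Array, Queue

# Workers write batches into a pre-allocated shared array and pass only slot
# indices through the queues, so the batch data itself is never pickled.
# All sizes and names here are illustrative.
max_q_size, batch_shape = 4, (32, 224, 224, 3)
slots = Array('f', max_q_size * int(np.prod(batch_shape)), lock=False)
shared = np.frombuffer(slots, dtype=np.float32).reshape((max_q_size,) + batch_shape)
free_slots, ready_slots = Queue(), Queue()
for s in range(max_q_size):
    free_slots.put(s)

def producer(make_batch):                 # runs inside a forked worker Process
    while True:
        s = free_slots.get()              # wait for a free slot
        shared[s] = make_batch()          # write the batch in place
        ready_slots.put(s)                # hand only the index to the consumer

def next_batch():                         # called by the training-side generator
    s = ready_slots.get()
    batch = shared[s].copy()              # copy out before releasing the slot
    free_slots.put(s)
    return batch
```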

Yeah, refactoring would be welcome. I've moved over to setting max_q_size=1, nb_worker=1 and using an external multiprocessing.Queue instead.
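
A minimal sketch of that kind of setup, under my own assumptions (the batch contents, worker count, and queue size are placeholders):

```
import numpy as np
from multiprocessing import Process, Queue

def make_batch():
    # placeholder: build one (x, y) batch here
    x = np.random.rand(32, 224, 224, 3).astype('float32')
    y = np.random.randint(0, 10, size=(32,))
    return x, y

q = Queue(maxsize=10)

def producer():
    while True:
        q.put(make_batch())

for _ in range(4):
    p = Process(target=producer)
    p.daemon = True            # workers die together with the main process
    p.start()

def queue_generator():
    # trivial generator for Keras: it only drains the external queue
    while True:
        yield q.get()

# model.fit_generator(queue_generator(), samples_per_epoch=..., nb_epoch=...,
#                     max_q_size=1, nb_worker=1, pickle_safe=False)
```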

Perhaps Keras would benefit from looking into Dask integration like tflearn instead.

I watched this video and it looks nice.

The big advantage is that it can utilize multiple GPUs, and one GPU for multiple processes. But I am not sure it is compatible with Theano.

My thought was that you'd run the Dask graph on CPU and leave the GPU alone for Keras and model training. I'd bet most preprocessing is faster than the training graph, otherwise one could also use joblib.Memory or something. Of course, it should also be possible to just add all preprocessing into Theano/TensorFlow's graph and set device properly, but that might be overkill.
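
If it helps anyone, a hedged sketch of the joblib.Memory idea (the cache directory, image size, and preprocessing body are arbitrary choices of mine):

```
import cv2
from joblib import Memory

memory = Memory('/tmp/preproc_cache', verbose=0)

@memory.cache
def preprocess(path):
    # expensive CPU-side work runs once per path, then is read back from the disk cache
    img = cv2.imread(path)
    return cv2.resize(img, (224, 224)).astype('float32') / 255.0
```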

I agree; in my case the preprocessing is just normalization, which can be added to the model directly.
I also found that although free memory is low, most of the memory is used for disk cache. This is reasonable since I use mmap to load the data.

@yanyongluan Did you find out the reason for the problem?

I encounter the same problem using fit_generator on a large training set with pickle_safe=False and a threading generator: it just keeps allocating RAM until it eventually goes to swap. Has anyone found a solution?

@lucabergamini Have you found a solution? I ran into the same issue, using Keras 2.0.2 and Tensorflow 1.1.0. I've set pickle_safe=False, workers=1, but the memory usage just keeps growing.

I tried downgrading to Tensorflow 1.0.1 and Keras 1.1.1, but without success.

@riley-x Eventually I switched to another machine and I'm no longer experiencing the memory allocation bug, but I also changed my code a lot to implement multi-threading. Anyway, my software configuration is the following:

  • tensorflow==1.2.0rc0

  • Keras==2.0.4

And this is what my generator class looks like now (I make some GC calls, but honestly I don't know whether they make any difference or whether I just left them there from a previous version; you can try removing them and see if anything changes):

# Imports and the class wrapper are added here for completeness; the class
# name TrainGenerator is assumed (the original comment omitted it).
import gc
import os
import threading

import cv2
import numpy


class TrainGenerator(object):
    def __init__(self, base_folder, generator_batch, label_size, image_shape):
        self.train_data_list = numpy.asarray(sorted([i for i in os.listdir(base_folder) if i.endswith(".jpg")]))
        self.train_labels_list = numpy.asarray(sorted([i for i in os.listdir(base_folder) if i.endswith(".npy") and "_p" not in i]))
        self.generator_batch = generator_batch
        self.base_folder = base_folder
        self.label_size = label_size
        assert len(self.train_data_list) == len(self.train_labels_list)
        self.lock = threading.Lock()
        self.image_shape = image_shape
        self.i = 0

    def __iter__(self):
        return self

    def get_len(self):
        return len(self.train_data_list)

    def next(self):
        # LOCK: index bookkeeping and shuffling are protected by the lock
        with self.lock:
            if self.i == 0 or len(
                    self.train_data_list[self.i * self.generator_batch:(self.i + 1) * self.generator_batch]) == 0:
                self.i = 0
                # shuffle data and labels with the same permutation
                perm = numpy.random.permutation(self.get_len())
                self.train_data_list = self.train_data_list[perm]
                self.train_labels_list = self.train_labels_list[perm]

            train_data_list_t = self.train_data_list[self.i * self.generator_batch:(self.i + 1) * self.generator_batch]
            train_labels_list_t = self.train_labels_list[
                                  self.i * self.generator_batch:(self.i + 1) * self.generator_batch]
            self.i += 1
            # GC every ~100 batches
            if (self.i + 1) % 100 == 0:
                for i in xrange(10):
                    gc.collect()

        # NO LOCK: the actual I/O and decoding happen outside the lock

        train_data = numpy.zeros((len(train_data_list_t), self.image_shape[0], self.image_shape[1], 3), dtype="float16")
        train_labels = numpy.zeros((len(train_labels_list_t), self.label_size, 4, 2), dtype="float16")
        for j in xrange(len(train_data_list_t)):
            image_t = cv2.imread(self.base_folder + train_data_list_t[j])
            label_t = numpy.load(self.base_folder + train_labels_list_t[j])[0:self.label_size]
            # DATA AUGMENTATION HERE
            train_data[j] = image_t
            train_labels[j] = label_t
        return (train_data, train_labels)
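
For reference, a minimal usage sketch for a generator like this (the class name, paths, and sizes are placeholders; the argument names follow the newer Keras 2 API, while older versions use pickle_safe instead of use_multiprocessing):

```
gen = TrainGenerator(base_folder="data/train/", generator_batch=16,
                     label_size=4, image_shape=(256, 256))
model.fit_generator(gen,
                    steps_per_epoch=gen.get_len() // 16,
                    epochs=10,
                    workers=1,
                    use_multiprocessing=False)   # threads only, as in the comment above
```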

I'm having similar issues with the latest TF and Keras (1.2 and 2.0.4).

I define my generator:

import numpy as np
from keras.utils import to_categorical  # imports added for completeness

def generate_data(N=32):
    count = 0  # batch counter
    while True:
        x = []
        y = []

        # generate_action_space, cube2np and possible_moves are defined elsewhere
        D = generate_action_space(N)
        for d in D:
            x.append(cube2np(d[0]))
            y.append(to_categorical(possible_moves.index(str(d[1])), len(possible_moves)))

        x = np.asarray(x)
        x = x.reshape(x.shape[0], 18, 3, 1)
        x = x.astype('float32')

        y = np.asarray(y)
        y = y.reshape(y.shape[0], y.shape[2])

        count += 1

        yield (x, y)

If I use fit_generator directly:

```
model.fit_generator(generator=generate_data(32), steps_per_epoch=50,
                    epochs=30, verbose=2, validation_data=None, callbacks=[tbCallBack],
                    max_queue_size=1, use_multiprocessing=True, workers=6, initial_epoch=0)
```

I find that my RAM usage increases continually until my system runs out of memory.

A workaround that I found is calling fit_generator inside a for loop:

```
for j in range(10):
    model.fit_generator(generator=generate_data(32), steps_per_epoch=50,
                        epochs=3, verbose=2, validation_data=None, callbacks=[tbCallBack],
                        max_queue_size=1, use_multiprocessing=True, workers=6, initial_epoch=0)
```

This limits my RAM usage and I am able to train as much as I want. My accuracy seems to be steadily increasing. However, this doesn't play nicely with the TensorBoard callback, and it feels like a bodge that might not be the best way to implement this. Does anyone have suggestions on how to make it work with TensorBoard, and whether there is a better way to do this?
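
One possible way to keep TensorBoard's epoch numbering consistent across such a loop (a sketch reusing the names from the snippet above, not verified against this exact setup) is to advance epochs and initial_epoch together so each call continues where the previous one stopped:

```
epochs_per_call = 3
for j in range(10):
    model.fit_generator(generator=generate_data(32), steps_per_epoch=50,
                        epochs=(j + 1) * epochs_per_call,   # absolute end epoch
                        initial_epoch=j * epochs_per_call,  # continue the numbering
                        verbose=2, callbacks=[tbCallBack],
                        max_queue_size=1, use_multiprocessing=True, workers=6)
```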

I also have an issue I believe is the same as or related to this one.

If I use fit_generator with use_multiprocessing in a Jupyter notebook, then stop the execution (the equivalent of a KeyboardInterrupt) and restart the kernel, then Tensorflow still has GPU memory allocated. This keeps me from running code again until I restart Jupyter. Usually this wouldn't be a problem, but it's an issue if I have other code in the middle of the long training process.

Any solution to this yet?

Same problem here, running into ResourceExhaustedError: OOM.

Same here, running into an OOM after a few iterations. I am using a Sequence generator in my case:

    def __getitem__(self, i):        
        for n, (image_path, label_path) in enumerate(zip(self.image_path_list[i*self.batch_size:(i+1)*self.batch_size], self.label_path_list[i*self.batch_size:(i+1)*self.batch_size])):

            image = cv2.imread(image_path, 1)
            label = cv2.imread(label_path, 0)
            if self.resize_shape:
                image = cv2.resize(image, self.resize_shape)
                label = cv2.resize(label, self.resize_shape)

            # Do augmentation (only if training)
            if self.mode == 'training':
                if self.horizontal_flip and random.randint(0,1):
                    image = cv2.flip(image, 1)
                    label = cv2.flip(label, 1)
                if self.vertical_flip and random.randint(0,1):
                    image = cv2.flip(image, 0)
                    label = cv2.flip(label, 0)
                if self.brightness:
                    factor = 1.0 + abs(random.gauss(mu=0, sigma=self.brightness))
                    if random.randint(0,1):
                        factor = 1.0/factor
                    image = (255.0*((image/255.0)**factor)).astype(np.uint8)
                if self.rotation:
                    angle = random.gauss(mu=0.0, sigma=self.rotation)
                else:
                    angle = 0.0
                if self.zoom:
                    scale = random.gauss(mu=1.0, sigma=self.zoom)
                else:
                    scale = 1.0
                if self.rotation or self.zoom:
                    M = cv2.getRotationMatrix2D((image.shape[1]//2, image.shape[0]//2), angle, scale)
                    image = cv2.warpAffine(image, M, image.shape[:2])
                    label = cv2.warpAffine(label, M, label.shape[:2])
                if self.crop_shape:
                    image, label = _random_crop(image, label, self.crop_shape)

            self.X[n] = image
            self.Y1[n] = to_categorical(cv2.resize(label, (label.shape[1]//4, label.shape[0]//4)), self.n_classes).reshape((label.shape[0]//4, label.shape[1]//4, -1))   
            self.Y2[n] = to_categorical(cv2.resize(label, (label.shape[1]//8, label.shape[0]//8)), self.n_classes).reshape((label.shape[0]//8, label.shape[1]//8, -1))
            self.Y3[n] = to_categorical(cv2.resize(label, (label.shape[1]//16, label.shape[0]//16)), self.n_classes).reshape((label.shape[0]//16, label.shape[1]//16, -1))         

        return self.X, [self.Y1, self.Y2, self.Y3]

net.fit_generator(train_generator, len(train_generator), opt.epochs,
                  callbacks=[checkpoint, tensorboard, lr_decay],
                  validation_data=val_generator, validation_steps=len(val_generator),
                  workers=opt.n_cpu, use_multiprocessing=True, shuffle=True)

Are you aware of this issue @fchollet?

Had the same issue. Added gc.collect() to the end of my custom generator and it helped to get rid of memory errors.
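
In generator form that might look something like this (a minimal sketch; the batch contents are placeholders):

```
import gc
import numpy as np

def my_generator(batch_size=32):
    while True:
        # placeholder batch; in practice this is whatever the generator loads
        x = np.random.rand(batch_size, 100, 100, 3).astype('float32')
        y = np.random.randint(0, 10, size=(batch_size,))
        yield x, y
        gc.collect()   # force a collection after each batch has been consumed
```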

Exact same issue here. Does anyone have a solution?

@oleg-panichev's solution worked for me, but only after setting use_multiprocessing to False.

Hello there. I had the same problem and ran out of memory after some time. I stuck with the solution from @carlthome and wrote a custom multiprocessing queue, which I use with fit_generator and use_multiprocessing=False. To save some of you some hassle, the code is attached below. Since each batch is generated randomly in my application, the current implementation does not support indexed batches.
For me this implementation works and I have not experienced out-of-memory problems since. However, as documented in https://github.com/keras-team/keras/issues/6499#issuecomment-351068255, Python's multiprocessing needs to be handled with care.

from keras.utils import Sequence    
from multiprocessing import Process, Queue
from abc import ABCMeta, abstractmethod

class ClBatchGenerator(Process):
    __metaclass__ = ABCMeta

    @abstractmethod
    def generateBatch(self):
        "Please implement me."
        batch = None
        return batch

    def __init__(self, batchQueue, notificationQueue):
        Process.__init__(self)
        self._batchQueue = batchQueue
        self._notificationQueue = notificationQueue

    def run(self):
        while True:
            msg = self._notificationQueue.get()
            if msg=="terminate":
                break
            elif msg=="generateBatch":
                batch = self.generateBatch()
                self._batchQueue.put(batch)
            else:
                raise ValueError("Message %s unknown"%msg)

class ClMPDataGenerator(Sequence):
    """
    Multiprocessing data generator to be passed to fit_generator. 
    This spawns multiple instances of ClBatchGenerator.
    """

    def __init__(self, workers, max_queue_size, steps_per_epoch):
        self._workers = workers
        self._max_queue_size = max_queue_size
        self._steps_per_epoch = steps_per_epoch
        self._batchQueue = Queue(max_queue_size)
        self._notificationQueue = Queue()
        self._workerList = []
        for i in range(workers):
            oWorker = ClBatchGenerator(self._batchQueue, self._notificationQueue)
            oWorker.daemon = True #makes sure the workers terminate if the main process is exited
            oWorker.start()
            self._workerList.append(oWorker)
        for i in range(max_queue_size):
            self._notificationQueue.put("generateBatch")

    def __len__(self):
        return self._steps_per_epoch

    def __getitem__(self, idx):
        batch = self._batchQueue.get()
        self._notificationQueue.put("generateBatch")
        return batch

    def __del__(self):
        #put termination message
        for i in range(self._workers):
            self._notificationQueue.put("terminate")
        #empty queues, otherwise the workers won't terminate
        for i in range(self._max_queue_size):
            self._batchQueue.get(timeout=10)
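
For anyone wondering how this is meant to be wired up, here is a usage sketch based on my reading of the snippet above (the concrete worker, batch contents, and all numbers are illustrative, not part of the original comment):

```
import numpy as np

# A concrete worker: override generateBatch with real data loading/augmentation.
class RandomBatchGenerator(ClBatchGenerator):
    def generateBatch(self):
        x = np.random.rand(32, 64, 64, 3).astype('float32')   # dummy images
        y = np.random.randint(0, 10, size=(32,))               # dummy labels
        return x, y

# As posted, ClMPDataGenerator spawns ClBatchGenerator itself, so you would
# point it at your concrete subclass (e.g. swap the class name in its __init__)
# and then do something like:
#
#   train_gen = ClMPDataGenerator(workers=4, max_queue_size=8, steps_per_epoch=100)
#   model.fit_generator(train_gen, epochs=10, workers=1, use_multiprocessing=False)
```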

@check0104 Thanks for sharing. Do you by any chance still have the code that was causing the OOM, so we can use it as a reference for debugging? I am putting together a sample of my own and would love to see whether the issue appears with a similar configuration.

I'm having the same problem. If I set a higher number of workers, the training process seems faster at first, but then it gets stuck. It looks like the GPU is not able to consume all the data in the queue, yet it never reaches 100% utilisation. Did someone find a solution?

    model.fit_generator(generator=training_generator,
                        validation_data=validation_generator,
                        epochs=epochs,
                        verbose=1,
                        max_queue_size=10,
                        use_multiprocessing=True,
                        workers=workers,
                        shuffle=False,
                        callbacks=callbacks)

I have just discovered that the memory (RAM) consumption keeps increasing during training. Then, at a certain point, memory fills up and the training process gets stuck. I suspect the problem is in the generator, where I use h5py to read an image from one of the .h5 files (the __data_generation method), but I don't know how to solve it.

import keras
import numpy as np
import h5py
import os


class DataGeneratorC(keras.utils.Sequence):
    'Generates data for Keras'
    def __init__(self, h5files, batch_size=32, shuffle=True):
        self.batch_size = batch_size
        self.h5files = h5files
        self.indexes = np.array([], dtype=np.int64).reshape(0, 3)
        self.shuffle = shuffle
        self.n_images = 0
        self.generate_indexes()
        self.on_epoch_end()

    def __len__(self):
        'Denotes the number of batches per epoch'
        # total number of images in the dataset
        return int(np.floor(self.n_images/self.batch_size))

    def __getitem__(self, index):
        'Generate one batch of data'
        # index goes from 0 to the number of batches
        # Generate indexes of the batch
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]

        # Generate data
        x, y = self.__data_generation(indexes)

        return x, y

    def generate_indexes(self):
        # content not important
        pass

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        if self.shuffle:
            np.random.shuffle(self.indexes) 

    def __data_generation(self, indexes):
        'Generates data containing batch_size samples'
        # Initialization
        x = np.empty([self.batch_size, 100, 100])
        y = np.empty([self.batch_size], dtype=int)

        # Generate data
        for i, row in enumerate(indexes):
            filename = self.h5files[row[0]]

            h5f = h5py.File(filename, 'r')
            # Store image
            x[i, ] = h5f['section/array_images'][row[1]]
            h5f.close()
            # Store class
            y[i] = row[2]

        x = x.reshape(x.shape[0], 1, 100, 100)

        return x, y

Hello,
This issue is over a year old, you may want to submit a new issue with an up-to-date version of Keras/TF.

Please submit a reproducible example.
Side note: HDF5 files are not easily shareable across processes, and this may be the cause of your issue.
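
One common workaround for that caveat (my own suggestion, not from this thread) is to open each HDF5 file lazily inside the worker and cache the handle, so every worker process ends up with its own file objects instead of repeatedly reopening or sharing them:

```
import h5py

class LazyH5Reader(object):
    """Opens each HDF5 file once per process, on first use."""
    def __init__(self, filenames):
        self.filenames = filenames
        self._handles = {}   # populated lazily, so each worker gets its own handles

    def read(self, file_idx, image_idx):
        if file_idx not in self._handles:
            self._handles[file_idx] = h5py.File(self.filenames[file_idx], 'r')
        return self._handles[file_idx]['section/array_images'][image_idx]
```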

I have just discovered that the memory (RAM) consumption keeps increasing during training. [...] I suspect the problem is in the generator, where I use h5py to read an image from one of the .h5 files. (generator code quoted above)

Have you found out how to solve it? I ran into the exact same issue. My generator is pretty similar to yours. It doesn't matter if I use just one worker, set max_q to 1, and turn multiprocessing off: memory just keeps filling up with every batch.

I would really appreciate any help. As it is right now, fit_generator is a worse choice than the plain fit function.

Since for my application each batch is generated randomly, the current implementation does not support indexed batches.
https://github.com/keras-team/keras/issues/3675#issuecomment-352185331
@csandmann Isn't there a chance of batches being repeated? How do you keep track of batches?
