Keras: Issue while using keras pre-trained model with pyspark

Created on 8 Dec 2017 · 5 comments · Source: keras-team/keras

I am trying to load the ResNet50 pre-trained Keras model in PySpark. I get the following error when I try to execute the model.predict() function.

'PicklingError: Could not serialize object: TypeError: can't pickle _thread.lock objects'

I tried to add serialization to the Keras model as per the following blog post:
http://zachmoshe.com/2017/04/03/pickling-keras-models.html

However, I then get the following error:

AttributeError: 'Model' object has no attribute '_feed_input_names'

Has anybody faced a similar issue and found a solution?
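For context, the underlying error is easy to reproduce without Keras at all: the model object holds references to thread locks (via its TensorFlow session), and the standard `pickle` module refuses those. A minimal stand-in sketch (`ModelLike` is a hypothetical toy class, not part of Keras):

```python
import pickle
import threading

class ModelLike:
    """Toy stand-in: like a Keras Model, it holds an unpicklable lock."""
    def __init__(self):
        self.lock = threading.Lock()

try:
    pickle.dumps(ModelLike())
except TypeError as err:
    # Python raises the same kind of error Spark surfaces when it
    # tries to serialize the model for broadcast.
    print(type(err).__name__)  # -> TypeError
```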

Most helpful comment

I am not really experienced on this topic, but let me try to reproduce with a bit of code what @mandareln is trying; afterwards I will explain my own problem.

```
from keras.models import load_model

# Credit to Zach Moshe: http://zachmoshe.com/2017/04/03/pickling-keras-models.html
import tempfile
import keras.models

def make_keras_picklable():
    def __getstate__(self):
        model_str = ""
        with tempfile.NamedTemporaryFile(suffix='.hdf5', delete=True) as fd:
            keras.models.save_model(self, fd.name, overwrite=True)
            model_str = fd.read()
        d = {'model_str': model_str}
        return d

    def __setstate__(self, state):
        with tempfile.NamedTemporaryFile(suffix='.hdf5', delete=True) as fd:
            fd.write(state['model_str'])
            fd.flush()
            model = keras.models.load_model(fd.name)
        self.__dict__ = model.__dict__

    cls = keras.models.Model
    cls.__getstate__ = __getstate__
    cls.__setstate__ = __setstate__

make_keras_picklable()

path_to_model = "path/to/model.h5"
model = load_model(path_to_model)
model_bcast = sc.broadcast(model)

# Assume we already have an RDD called my_rdd with the desired information

def map_for_partition_predict(partition):
    # Patch pickling on the worker too, so the broadcast model deserializes.
    make_keras_picklable()
    for row in partition:
        prediction = model_bcast.value.predict(row)
        yield prediction

my_rdd = my_rdd.mapPartitions(map_for_partition_predict)
```
This piece of code should achieve what we are trying to do here: use a pre-trained model in a PySpark environment to predict values on an RDD.

However, in my case I end up with OOM errors, and I am not really sure why. My guess is that each executor tries to instantiate a separate TensorFlow backend, consuming all the resources available on my GPUs.

All 5 comments

You have to run `make_keras_picklable()` on each worker as well. Otherwise, the `__setstate__` method of the `Model` class on the worker node is not patched, and the model will not be deserialized as expected.
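The reason this matters: pickle looks up `__getstate__`/`__setstate__` on the class at (de)serialization time, so the patch must have been applied in whichever process runs `pickle.loads`. The same pattern can be sketched self-contained on a toy class, with no Keras involved (`Wrapper`, `payload`, and `make_wrapper_picklable` are illustrative names):

```python
import pickle
import threading

class Wrapper:
    """Toy class that, like a Keras Model, owns an unpicklable lock."""
    def __init__(self, payload):
        self.payload = payload
        self.lock = threading.Lock()

def make_wrapper_picklable():
    # Same trick as make_keras_picklable(): serialize only what can be
    # rebuilt, and reconstruct the rest inside __setstate__.
    def __getstate__(self):
        return {'payload': self.payload}   # drop the lock

    def __setstate__(self, state):
        self.payload = state['payload']
        self.lock = threading.Lock()       # recreate it on load

    Wrapper.__getstate__ = __getstate__
    Wrapper.__setstate__ = __setstate__

make_wrapper_picklable()
blob = pickle.dumps(Wrapper(42))   # works now
restored = pickle.loads(blob)      # needs the patch applied in THIS process too
print(restored.payload)            # -> 42
```

In Spark, the driver runs `dumps` and each worker runs `loads`, which is why the patch has to execute on both sides.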

@LiamHe Would you mind explaining in more detail? Thank you.

I am not really experienced on this topic, but let me try to reproduce with a bit of code what @mandareln is trying; afterwards I will explain my own problem.

```
from keras.models import load_model

# Credit to Zach Moshe: http://zachmoshe.com/2017/04/03/pickling-keras-models.html
import tempfile
import keras.models

def make_keras_picklable():
    def __getstate__(self):
        model_str = ""
        with tempfile.NamedTemporaryFile(suffix='.hdf5', delete=True) as fd:
            keras.models.save_model(self, fd.name, overwrite=True)
            model_str = fd.read()
        d = {'model_str': model_str}
        return d

    def __setstate__(self, state):
        with tempfile.NamedTemporaryFile(suffix='.hdf5', delete=True) as fd:
            fd.write(state['model_str'])
            fd.flush()
            model = keras.models.load_model(fd.name)
        self.__dict__ = model.__dict__

    cls = keras.models.Model
    cls.__getstate__ = __getstate__
    cls.__setstate__ = __setstate__

make_keras_picklable()

path_to_model = "path/to/model.h5"
model = load_model(path_to_model)
model_bcast = sc.broadcast(model)

# Assume we already have an RDD called my_rdd with the desired information

def map_for_partition_predict(partition):
    # Patch pickling on the worker too, so the broadcast model deserializes.
    make_keras_picklable()
    for row in partition:
        prediction = model_bcast.value.predict(row)
        yield prediction

my_rdd = my_rdd.mapPartitions(map_for_partition_predict)
```
This piece of code should achieve what we are trying to do here: use a pre-trained model in a PySpark environment to predict values on an RDD.

However, in my case I end up with OOM errors, and I am not really sure why. My guess is that each executor tries to instantiate a separate TensorFlow backend, consuming all the resources available on my GPUs.
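On the OOM point: by default the TF 1.x backend reserves (nearly) all GPU memory per process, so several executors sharing one machine will exhaust the card even for small models. One possible mitigation, sketched against the 2017-era TensorFlow 1.x / Keras API and untested here, is to configure the session before the model is loaded on each worker. This is a configuration fragment, not a complete fix:

```python
import tensorflow as tf
import keras.backend as K

config = tf.ConfigProto()
config.gpu_options.allow_growth = True  # allocate GPU memory lazily, not all at once
# Alternatively, hard-cap each executor's share of the GPU:
# config.gpu_options.per_process_gpu_memory_fraction = 0.25
K.set_session(tf.Session(config=config))
```

This would need to run inside the partition function (before any `predict` call) so that every executor process applies it, not just the driver.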

@jose-goncabel @LiamHe make_keras_picklable() works well to solve the PicklingError, but I still have no idea how to use a Keras pre-trained model in Spark. I am trying to run the model in Spark Structured Streaming, but how do I broadcast the model in Structured Streaming? And even if I can do that, I guess I would encounter the same OOM errors. So what is the correct way to run a Keras pre-trained model in Spark Structured Streaming?

@ZedYeung I'm encountering the same problem as you. Have you managed to resolve it? If so, can you tell me how you got around it? Thank you in advance.

