I am trying to load a ResNet50 pre-trained Keras model in PySpark. I get the following error when I try to execute the model.predict() function:
`PicklingError: Could not serialize object: TypeError: can't pickle _thread.lock objects`
I tried to add serialization to the Keras model as per the following blog post:
http://zachmoshe.com/2017/04/03/pickling-keras-models.html
However, I then get the following error:
`AttributeError: 'Model' object has no attribute '_feed_input_names'`
Has anybody faced a similar issue and found a solution?
You have to run make_keras_picklable() on each worker as well. Otherwise, the __setstate__ method of the Model class is never patched on the worker nodes, and the broadcast model will not be deserialized as expected.
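For example (a minimal sketch, not code from this thread; `make_keras_picklable()` is the patch from the blog above and `model_bcast` is assumed to be a broadcast of the already-patched model), the patch has to be re-applied inside the function that runs on the executors, before the broadcast value is first touched:
```
def predict_partition(partition):
    # Re-apply the patch on the worker so Model.__setstate__ exists
    # when the broadcast model is unpickled.
    make_keras_picklable()
    model = model_bcast.value
    for row in partition:
        yield model.predict(row)
```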
@LiamHe Would you mind explaining in more detail? Thank you.
I am not really experienced on this topic, but let me try to reproduce what @mandareln is trying with a bit of code, and then I will explain my problem.
```
# Credit to Zach Moshe: http://zachmoshe.com/2017/04/03/pickling-keras-models.html
import tempfile

import keras.models
from keras.models import load_model


def make_keras_picklable():
    def __getstate__(self):
        # Serialize the model by saving it to a temporary HDF5 file
        # and returning the file's bytes.
        with tempfile.NamedTemporaryFile(suffix='.hdf5', delete=True) as fd:
            keras.models.save_model(self, fd.name, overwrite=True)
            model_str = fd.read()
        return {'model_str': model_str}

    def __setstate__(self, state):
        # Deserialize by writing the bytes back to a temporary HDF5 file
        # and loading a fresh model from it.
        with tempfile.NamedTemporaryFile(suffix='.hdf5', delete=True) as fd:
            fd.write(state['model_str'])
            fd.flush()
            model = keras.models.load_model(fd.name)
        self.__dict__ = model.__dict__

    cls = keras.models.Model
    cls.__getstate__ = __getstate__
    cls.__setstate__ = __setstate__


# Patch the Model class on the driver before loading and broadcasting.
make_keras_picklable()

path_to_model = "path/to/model.h5"
model = load_model(path_to_model)
model_bcast = sc.broadcast(model)


# Assume we already have an RDD called my_rdd with the desired information.
def map_for_partition_predict(partition):
    # Patch the Model class on the worker too, so __setstate__ is in place
    # when the broadcast model is unpickled.
    make_keras_picklable()
    for row in partition:
        prediction = model_bcast.value.predict(row)
        yield prediction


my_rdd = my_rdd.mapPartitions(map_for_partition_predict)
```
This piece of code should achieve what we are trying to do here: use a pre-trained model in a PySpark environment to predict values on an RDD.
However, in my case I end up with OOM errors. I am not really sure why this happens; my guess is that each of the executors tries to instantiate its own TensorFlow backend, consuming all the resources available on my GPUs.
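One mitigation for the GPU memory issue, sketched below under the assumption that the backend is TensorFlow 1.x (this is not from the thread itself), is to configure each executor's TensorFlow session to allocate GPU memory on demand instead of reserving the whole device:
```
import tensorflow as tf
import keras.backend as K

def configure_tf_session():
    # Let each executor process grow its GPU allocation lazily rather than
    # grabbing all available GPU memory at session creation time.
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    # Alternatively, cap each process at a fraction of the device:
    # config.gpu_options.per_process_gpu_memory_fraction = 0.25
    K.set_session(tf.Session(config=config))
```
Calling something like `configure_tf_session()` at the top of `map_for_partition_predict`, before the broadcast model is used, would keep several executor processes from each claiming the full GPU.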
@jose-goncabel @LiamHe make_keras_picklable() works well to solve the PicklingError, but I still have no idea how to use a pre-trained Keras model in Spark. I am trying to run the model in Spark Structured Streaming, but how do I broadcast the model in Structured Streaming? And even if I can do that, I guess I would encounter the same OOM errors. So what is the correct way to run a pre-trained Keras model in Spark Structured Streaming?
@ZedYeung I'm encountering the same problem as yours. Have you managed to resolve it? If so, can you tell me how you got around it? Thank you in advance.
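Regarding the Structured Streaming question above, one possible direction (a sketch only, not something confirmed in this thread; it assumes Spark 2.4+, where `foreachBatch` is available, and reuses `model_bcast` and `map_for_partition_predict` from the code earlier) is to run the same mapPartitions-based prediction on each micro-batch:
```
def predict_batch(batch_df, batch_id):
    # batch_df is an ordinary DataFrame for this micro-batch, so the
    # RDD-based prediction from above can be reused as-is.
    predictions = batch_df.rdd.mapPartitions(map_for_partition_predict)
    # ... write `predictions` to the sink of your choice ...

query = (streaming_df.writeStream        # streaming_df is a placeholder
         .foreachBatch(predict_batch)
         .start())
```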