With reference to issues #125 and #1045, I would like to ask whether anyone has already tested the current alpha version with Spark.
As far as I can tell, calling nlp() inside a map job still fails with:
AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
Is this still an issue, or am I doing something wrong?
Below is my example code:
import en_core_web_sm
from pyspark import SparkConf, SparkContext

nlp = en_core_web_sm.load()
conf = SparkConf().setAppName("PythonSpacy").setMaster("local[*]")
sc = SparkContext(conf=conf)

lines = sc.textFile("input.txt")  # path to the input text file
print(lines.count())

# The lambda captures nlp, which Spark then tries to pickle
lines.map(lambda x: nlp(x, parse=True)).saveAsTextFile("out")
sc.stop()
I am also getting that error.
import spacy
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

# nlp (a loaded Language object) and punct_space (a token filter) are defined earlier
def prepare_doc(doc):
    doc = nlp(unicode(doc))
    l = [[t.lemma_ for t in s if not punct_space(t)] for s in doc.sents]
    flat_list = [item for sublist in l for item in sublist if item not in spacy.lang.en.STOP_WORDS]
    return u' '.join(flat_list)

sqlContext.udf.register("prepare_doc", prepare_doc, StringType())
udf_prepare_doc = udf(prepare_doc, StringType())
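For completeness, applying the wrapped UDF to a DataFrame would look something like the line below (assuming a DataFrame df with a column named text; both names are hypothetical). The serialization error shown next is raised as soon as Spark tries to pickle prepare_doc, i.e. when the UDF is registered or applied:

result = df.withColumn("prepared", udf_prepare_doc(df["text"]))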
Exception:
AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
I am using AWS Databricks 3.1 with Python 2.7.12.
Thanks for testing this.
spaCy uses dill to pickle. Is there a way to replace the serializer PySpark uses in this context, so that it calls dill.dump() and dill.load() instead of cloudpickle.dump() and cloudpickle.load()?
Edit: Looking at the code here: https://people.eecs.berkeley.edu/~jegonzal/pyspark/_modules/pyspark/serializers.html
It looks like we need something like:
class DillSerializer(PickleSerializer):
    def dumps(self, obj):
        return dill.dumps(obj)
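For what it's worth, here is a minimal sketch of how such a serializer could be completed and handed to the SparkContext. One caveat: the serializer argument controls how RDD data is serialized, while the task closures that raise the error above are pickled by cloudpickle internally, so swapping the data serializer alone may not fix the AttributeError.

import dill
from pyspark import SparkConf, SparkContext
from pyspark.serializers import PickleSerializer

class DillSerializer(PickleSerializer):
    # Serialize RDD data with dill instead of the stdlib pickle
    def dumps(self, obj):
        return dill.dumps(obj)

    def loads(self, obj):
        return dill.loads(obj)

conf = SparkConf().setAppName("DillSerializerSketch").setMaster("local[*]")
sc = SparkContext(conf=conf, serializer=DillSerializer())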
One workaround to avoid serializing spacy.Language or spacy.Doc objects:

import spacy

def spacy_map(data, f):
    def g(items):
        # Load the model once per partition, inside the worker process
        # (newer spaCy versions spell this spacy.load('en_core_web_sm', disable=['parser']))
        nlp = spacy.load('en_core_web_sm', parser=False)
        for item in items:
            yield f(nlp, item)
    return data.mapPartitions(g)

def do_stuff(nlp, d):
    doc = nlp(d['text'])
    # etc ...

docs = sc.textFile('/documents')
results = spacy_map(docs, do_stuff)
While we do have to run spacy.load once per partition, that's not so bad with recent versions of spaCy, and the cost amortizes away over large partitions.
Pickle is now fixed on develop, for both the Language class and the Doc. :tada:
Pickling should be quite fast, with no redundant copies of the Vocab, so you can pickle a list of Doc objects and it will do the right thing, keeping only one copy of all the shared data.
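A minimal sketch of what that enables, assuming a spaCy build from the develop branch (where Language and Doc pickling is supported) and a locally installed en_core_web_sm model:

import pickle
import spacy

nlp = spacy.load('en_core_web_sm')
docs = [nlp(text) for text in (u"First document.", u"Second document.")]

payload = pickle.dumps(docs)   # the shared Vocab should only be stored once
restored = pickle.loads(payload)
print([token.text for token in restored[0]])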
I built the latest version of spaCy from the develop branch. I then downloaded the tar for the en_core_web_sm-2.0.0 model and removed the version requirement from requires.txt, since loading it the regular way was installing spaCy from PyPI; I then installed the model with python3 setup.py install as well.
I then attempted a simple map within my Jupyter notebook while running Spark 2.2.0:
nlp = spacy.load('en_core_web_sm')
# df is an existing DataFrame; the lambda captures nlp, so Spark tries to pickle it
rdd = df.rdd.map(lambda x: nlp(x)).collect()
I still got the same error as the OP on this issue: PicklingError: Could not serialize object: AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/pyspark/cloudpickle.py", line 148, in dump
return Pickler.dump(self, obj)
File "/usr/lib/python3.5/pickle.py", line 408, in dump
self.save(obj)
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.5/pickle.py", line 740, in save_tuple
save(element)
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/lib/python3.5/dist-packages/pyspark/cloudpickle.py", line 255, in save_function
self.save_function_tuple(obj)
File "/usr/local/lib/python3.5/dist-packages/pyspark/cloudpickle.py", line 292, in save_function_tuple
save((code, closure, base_globals))
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.5/pickle.py", line 725, in save_tuple
save(element)
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.5/pickle.py", line 770, in save_list
self._batch_appends(obj)
File "/usr/lib/python3.5/pickle.py", line 797, in _batch_appends
save(tmp[0])
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/lib/python3.5/dist-packages/pyspark/cloudpickle.py", line 249, in save_function
self.save_function_tuple(obj)
File "/usr/local/lib/python3.5/dist-packages/pyspark/cloudpickle.py", line 297, in save_function_tuple
save(f_globals)
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.5/pickle.py", line 810, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.5/pickle.py", line 841, in _batch_setitems
save(v)
File "/usr/lib/python3.5/pickle.py", line 520, in save
self.save_reduce(obj=obj, *rv)
File "/usr/local/lib/python3.5/dist-packages/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.5/pickle.py", line 810, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.5/pickle.py", line 836, in _batch_setitems
save(v)
File "/usr/lib/python3.5/pickle.py", line 520, in save
self.save_reduce(obj=obj, *rv)
File "/usr/local/lib/python3.5/dist-packages/pyspark/cloudpickle.py", line 582, in save_reduce
save(args)
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.5/pickle.py", line 740, in save_tuple
save(element)
File "/usr/lib/python3.5/pickle.py", line 475, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/lib/python3.5/dist-packages/pyspark/cloudpickle.py", line 368, in save_builtin_function
return self.save_function(obj)
File "/usr/local/lib/python3.5/dist-packages/pyspark/cloudpickle.py", line 247, in save_function
if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
I am also facing a similar issue, where I get the following error:
Traceback (most recent call last):
File "/var/folders/fd/3pl667f517s9wk70z83rkm51smftcr/T/kernel-PySpark-56cb2d2e-f3e9-4951-9b6a-0b0e9596d4d7/pyspark_runner.py", line 196, in <module>
state.markSuccess(code_info.codeId(), str(the_last_expression_to_assign_temp_value))
File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/pyspark/rdd.py", line 199, in __repr__
return self._jrdd.toString()
File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/pyspark/rdd.py", line 2455, in _jrdd
self._jrdd_deserializer, profiler)
File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/pyspark/rdd.py", line 2388, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/pyspark/serializers.py", line 460, in dumps
return cloudpickle.dumps(obj, 2)
File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/pyspark/cloudpickle.py", line 704, in dumps
cp.dump(obj)
File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/pyspark/cloudpickle.py", line 162, in dump
raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: AttributeError: 'builtin_function_or_method' object has no attribute '__code__'
spaCy version: 2.0.5
Language model: en_core_web_lg
Spark: 2.2.1
Python: 3.6
Has anyone found a solution to this issue? Thanks.
@eiso @bosco-raju I'm not aware of a direct solution to this, but some workarounds exist.
Here is my code, inspired by this solution:
import pyspark

sc = pyspark.SparkContext(appName="Spacy")

# spaCy isn't serializable, but loading it is semi-expensive
class SpacyMagic(object):
    """
    Simple Spacy Magic to minimize loading time.
    >>> SpacyMagic.get("en")
    <spacy.en.English ...
    """
    _spacys = {}

    @classmethod
    def get(cls, lang):
        if lang not in cls._spacys:
            import spacy
            cls._spacys[lang] = spacy.load(lang, disable=['parser', 'tagger', 'ner'])
        return cls._spacys[lang]

def run_spacy(sent):
    nlp = SpacyMagic.get('en')
    return [wrd.text for wrd in nlp(sent)]

data = ["Hello world %d!" % i for i in range(10000)]
res = sc.parallelize(data).map(run_spacy)
print(res.take(100))
sc.stop()
Hope this helps.
@gsoul If that works, I'll have to try it right away tomorrow. But how "semi-expensive" is it?
def run_spacy(sent):
    nlp = SpacyMagic.get('en')
    return [wrd.text for wrd in nlp(sent)]
Wouldn't this basically load the entire model on each call?
And how would that differ from this slight change:
def run_spacy(sent):
    nlp = spacy.load('en', disable=['parser', 'tagger', 'ner'])
    return [wrd.text for wrd in nlp(sent)]
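A minimal sketch of the difference, assuming the SpacyMagic class above and a linked 'en' model: SpacyMagic.get caches the loaded pipeline in the class-level _spacys dict, so the load happens once per Python worker process and later calls are plain dictionary lookups, whereas a spacy.load inside run_spacy re-reads the model from disk for every record.

import time
import spacy

sentences = ["Hello world %d!" % i for i in range(20)]

start = time.time()
cached = [[t.text for t in SpacyMagic.get('en')(s)] for s in sentences]
print("cached load:     %.2fs" % (time.time() - start))

start = time.time()
reloaded = [[t.text for t in spacy.load('en', disable=['parser', 'tagger', 'ner'])(s)]
            for s in sentences]
print("reload per call: %.2fs" % (time.time() - start))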
This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.