Google-cloud-python: Can't use/pickle datastore Clients on google dataflow (apache beam)

Created on 23 Mar 2017  Â·  29Comments  Â·  Source: googleapis/google-cloud-python

I am attempting to use datastore.Client() from within a google cloud dataflow (apache beam) pipeline.

It attempts to pickle objects being passed around (lexically or arguments) to processing stages, but unfortunately the Client is not pickleable:

  File "lib/apache_beam/transforms/ptransform.py", line 474, in __init__
    self.args = pickler.loads(pickler.dumps(self.args))
  File "lib/apache_beam/internal/pickler.py", line 212, in loads
    return dill.loads(s)
  File "/Users/me/Library/Python/2.7/lib/python/site-packages/dill/dill.py", line 277, in loads
    return load(file)
  File "/Users/me/Library/Python/2.7/lib/python/site-packages/dill/dill.py", line 266, in load
    obj = pik.load()
  File "/usr/local/Cellar/python/2.7.12_1/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 864, in load
    dispatch[key](self)
  File "/usr/local/Cellar/python/2.7.12_1/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 1089, in load_newobj
    obj = cls.__new__(cls, *args)
  File "src/python/grpcio/grpc/_cython/_cygrpc/channel.pyx.pxi", line 35, in grpc._cython.cygrpc.Channel.__cinit__ (src/python/grpcio/grpc/_cython/cygrpc.c:4022)
TypeError: __cinit__() takes at least 2 positional arguments (0 given)

I believe the correct fix is to discard the Connection when serializing, and rebuild it when deserialized.

I could attempt to recreate the Client within each processing pipeline, but that can cause O(Records) Client creations...and since in my testing I see:

DEBUG:google_auth_httplib2:Making request: POST https://accounts.google.com/o/oauth2/token

printed on each creation, then I imagine Google SRE would really prefer we not do this O(N) times.

This is a tricky cross-team interaction issue (only occurs for those pickling Clients, in my case: google-cloud-datastore and apache-beam google-dataflow), so not sure the proper place to file this. I've cross-posted it to the apache beam JIRA as well https://issues.apache.org/jira/browse/BEAM-1788, though the issue is in the google cloud datastore code.

Mac 10.12.3, Python 2.7.12, google-cloud-dataflow 0.23.0

will not fix datastore

Most helpful comment

@mikelambert The correct and idiomatic solution is to build client creation into your DoFn:

class MyDoFn(beam.DoFn):

  def start_bundle(self, process_context):
     self._dsclient = datastore.Client()

  def process(self, context, *args, **kwargs):
     # do stuff with self._dsclient

No change is needed here from the client library team. This this is a very standard pattern in beam. It will not result in an O(records) cost, only an O(shards) cost, which the beam runner will likely factor in when deciding how large to make bundles.

I do however have a proposal to allow creation of beam.DoFns from python generators:see: https://github.com/elibixby/incubator-beam/pull/1 which gets you a little bit better of a syntax maybe.

Still the team is very justified in not reopening.

EDIT: Also @mikelambert RE serialization, Beam allows you to write your own "coders" to control serialization method. This would still be a better solution than changing the client to be pickleable, as you could serialize ONLY the information that was unique to your clients, rather than the entire client.

All 29 comments

Ahh seems as a temporary workaround, I can disable GRPC altogether:

os.environ['GOOGLE_CLOUD_DISABLE_GRPC'] = 'true'

(Though having it serialize/deserialize correctly, and reconnect using the saved oauth credentials, would be a much preferable solution.)

@mikelambert The issue with pickling is not the credentials, it's the gRPC transport object.

A simple hack for you would be just to do

client._datastore_api_internal = None
pickle.dumps(client)

This way you don't have to go with the "inferior" HTTP transport.

I'm not sure how much we want to prioritize being pickle-able. Though maybe serialization is a worthwhile discussion. @lukesneeringer @tseaver WDYT?

I'm not sure how much we want to prioritize being pickle-able. Though maybe serialization is a worthwhile discussion. @lukesneeringer @tseaver WDYT?

I am pretty uninterested in putting any priority on being pickle-able.

I am going to close this. @dhermes has posted a workaround, and I think that is the extent that I want to make pickle an authorized way to do something. :-)

Would you be interested in making that workaround happen automatically in __getstate__?

Or will users be forced to investigate and discover this (generally unindexed-by-Google) github issue in order to get these two Google products working with each other?

Anecdotally, these are the things that frustrate me about "buying into" the Google cloud infrastructure (and I say this as a former Google SWE who is more-committed-emotionally-than-most). I'm already jumping through hoops to switch off one deprecated product (GAE Managed VMs and appengine mapreduce library and DB/NDB) onto another beta product (Google Cloud Dataflow Python SDK, which necessitates the gcloud python SDK), and stumbling across issues like these.

I guess the question I would ask is:

Pickling is an anti-pattern. It is not a particularly good (or safe!) serialization protocol. Do you _have_ to pickle this thing? Is that something that is forced by the Dataflow pipeline?

If it is something that we really cannot get around for reasons outside our control, then the case for making sure these objects are pickleable is better. But if you have the option to serialize this in some more controlled way, you should.

I'm relatively new to Dataflow, but AFAIK pickling is the serialization method used by Dataflow. They run multiple worker instances, all on the same GCE image, with the same python and package code versions, so I'm guaranteed identical code running on both ends of the pickling. In Dataflow, I yield/return python objects from one function in the pipeline, and they are magically sent as an argument to another function (possibly running on a different worker instance).

It's possible I could manually serialize everything myself, into protobufs or strings or another such serialization format. But those strings would then still be pickled anyway...so... :)

(I debugged and fixed YT pickling bwcompat issues when migrating from python2.2->2.4 at Google, so yes I'm aware of many reasons to consider pickling an anti-pattern :P, but I'm not sure how many apply of those reasons apply here...)

CC-ing Google Dataflow engineers (@aaltay and @robertwb) who worked on pickling, that can probably speak to the decisions and justifications better than myself as an ex-Google just-a-Dataflow-newbie.

Yeah, it does not help much to say that pickling is an anti-pattern if you have no choice. :-) That is a valid argument.

@dhermes Does this change your calculus?

@lukesneeringer It does not change my stance on pickle. Users of dataflow could always just create a PickleClient subclass of our Client with __setstate__ / __getstate__ defined on it. We could even document this in some deep dark place, but I don't want to encourage use of pickle by explicitly supporting it.

Most arguments against pickling (java serialization, etc.) center around the fact that it's a bad format for shipping or persisting data, but Dataflow uses it to ship (references to) "code" around (e.g. a users particular subclass of DoFn) to be executed on remote machines.

The difficulty here is that a user's DoFn may hold a datastore client object preventing us from serializing their Fn. One could of course force authors of such DoFns to implement __setstate__/__getstate__ to serialize and re-instanciate the client object, but that would place the burden on every user rather than the library.

Aside from the inanity of two google products intentionally not working well together, and asking users to lookup and write extra workaround code, because of a philosophical belief about built-in language features you want to impose on other devs...

...this also means you will be teaching these users (who may have never used pickle before in their life) to learn more about pickle and how to write getstate/setstate code. So it's quite possible your decision will backfire and educate more people about how to use pickle, than if you implemented and hid the getstate/setstate yourself and kept them unaware of the details.

For example: I wouldn't even be having this conversation about pickle (posting here or stackoverflow or learning about getstate), and wouldn't even have known I was using pickle, if the client had been pickleable... :)

For example: I wouldn't even be having this conversation about pickle (posting here or stackoverflow or learning about getstate), and wouldn't even have known I was using pickle, if the client had been pickleable... :)

It's also worth pointing out that you wouldn't have even known your were using pickle if you were never using pickle in the first place.

because of a philosophical belief about built-in language features you want to impose on other devs

It's not a philosophical belief. Pickle is objectively bad and insecure. While I agree that it's frustrating that these two Google products don't work together in every case, the alternative of encouraging users to put their heads in the sand while handing them the gun to shoot themselves in the foot with isn't acceptable either.

Client are not serializable, nor do I think they should be. Several of the resources a client use (sockets, credentials, thread-locals, etc.) are inextricably tied to the current process and implementing pickle support will just lead to sneaky, unexpected bugs.

I'd recommend your DoFn construct a new client. If you're worried about the auth server being overloaded, don't, it'll give you a rate limit exceeded if you manage to surprise it. If you do, come back, I have two workarounds.

Note that many DoFns are simply lambdas that capture necessary state by closure. Forcing one to create a full class and implement __getstate__/__setstate__ (or adding our own "serialize this class" API to avoid Python's pickle) greatly adds to the verbosity, once they even discover that's what they need to do. Suggesting client._datastore_api_internal = None seem seven more fragile.

Seems better for the user to put this reconnecting capability into the library rather than having every user re-implement it. Unless somehow clients are deeply stateful which would be unfortunate.

Unless somehow clients are deeply stateful which would be unfortunate.

There are some very stateful clients, especially those that deal with transactions and batches.

I'm not overly familiar with dataflow, but is it not unusual to construct a client locally and try to use that remotely? That seems like a recipe for pain regardless of if this library supported serialization.

That all depends on the properties of a client. Suppose I have a library that interacts with Datastore. When I invoke it, I could either give it a Client object, or I could give it the full set of parameters used to construct a client object and let it create the Client object itself. The latter seems a lot less clean (testable, etc.), but is what you're imposing if the library possibly does any remote processing.

So I think the final answer here is this (please let me know if I'm misrepresenting @lukesneeringer @dhermes):

  1. It's extremely unlikely we'll support pickling of clients in this library.
  2. @dhermes and I discussed adding a helpful error message if you try to pickle one of these clients.
  3. Our recommendation for dataflow is to create the client inside of your DoFn instead of outside of it. If there's any samples/tutorial that show otherwise, let us know so we can fix it.
  4. We've yet to hear anyone run into rate limits on the auth server for service accounts, so we'd be surprised if that happened. However, if it does happen we have several ways to mitigate that.

/cc @elibixby for some strong dataflow knowledge and perspective.

@mikelambert The correct and idiomatic solution is to build client creation into your DoFn:

class MyDoFn(beam.DoFn):

  def start_bundle(self, process_context):
     self._dsclient = datastore.Client()

  def process(self, context, *args, **kwargs):
     # do stuff with self._dsclient

No change is needed here from the client library team. This this is a very standard pattern in beam. It will not result in an O(records) cost, only an O(shards) cost, which the beam runner will likely factor in when deciding how large to make bundles.

I do however have a proposal to allow creation of beam.DoFns from python generators:see: https://github.com/elibixby/incubator-beam/pull/1 which gets you a little bit better of a syntax maybe.

Still the team is very justified in not reopening.

EDIT: Also @mikelambert RE serialization, Beam allows you to write your own "coders" to control serialization method. This would still be a better solution than changing the client to be pickleable, as you could serialize ONLY the information that was unique to your clients, rather than the entire client.

At least a more helpful message would be nice.

On Fri, Mar 24, 2017 at 3:11 PM, Eli Bixby notifications@github.com wrote:

@mikelambert https://github.com/mikelambert The correct and idiomatic
solution is to build client creation into your DoFn:

class MyDoFn(beam.DoFn):

def start_bundle(self, process_context):
self._dsclient = datastore.Client()

def process(self, context, args, *kwargs):
# do stuff with self._dsclient

No change is needed here from the client library team. This this is a very
standard pattern in beam. It will not result in an O(records) cost, only an
O(shards) cost, which the beam runner will likely factor in when deciding
how large to make bundles.

I do however have a proposal to allow creation of beam.DoFns from python
generators:see: elibixby/incubator-beam#1
https://github.com/elibixby/incubator-beam/pull/1 which gets you a
little bit better of a syntax maybe.

Still the team is very justified in not reopening.

—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3191#issuecomment-289155346,
or mute the thread
https://github.com/notifications/unsubscribe-auth/AAdqgZxzFDruxTz-qlElABna5en3-jTjks5rpD-pgaJpZM4MmP-_
.

@robertwb I think requesting a more helpful message in Beam is reasonable (Something along the lines of "Serialization failed: consider creating this property within a beam.DoFn.start_bundle call or implementing a custom beam.Coder" ) although it might be hard to implement, and would probably be pretty low on the team's priority list (but PRs accepted!) You're a Beam engineer so you know that =P

But I don't think we should be encouraging devs to assume everything is picklable, and I don't think every object that isn't picklable needs to throw a readable message when pickling is tried and fails.

So I think the final answer here is this (please let me know if I'm misrepresenting @lukesneeringer @dhermes):

  1. It's extremely unlikely we'll support pickling of clients in this library.

This is still my current position. I could be talked out of it, and I would like for the interop to be easier, but not enough to make clients pickleable.

Thanks for the discussion and good ideas, appreciate your ideas..

I wasn't aware of the start_bundle approach, definitely sounds cleanest for
my purposes here to avoid excessive reauthentication!

And yeah, a message when pickling about how one should recreate clients
instead of pickling them would be great, including assurances about
authentication being cheap. :) Should I file a bug on this, or are you
tracking it yourselves?

2017年3月26日(日) 3:49 Luke Sneeringer notifications@github.com:

So I think the final answer here is this (please let me know if I'm
misrepresenting @lukesneeringer https://github.com/lukesneeringer
@dhermes https://github.com/dhermes):

  1. It's extremely unlikely we'll support pickling of clients in this
    library.

This is still my current position. I could be talked out of it, and I
would like for the interop to be easier, but not enough to make clients
pickleable.

—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3191#issuecomment-289231558,
or mute the thread
https://github.com/notifications/unsubscribe-auth/AHlba_OS2Qe0jVugbDcDdazBAdEFw9TFks5rpWHLgaJpZM4MmP-_
.

I just added #3211 to track it.

In case somebody ends up here with a problem similar to mine, I believe this happens when using multiple cpu-cores in threading. For example, using multiprocessing.ThreadPool works but multiprocessing.Pool throws this error (in my case, for BigQuery).

So make sure it's a single core execution? (I'm shooting in the blind here, but using Pool caused this exception for me and ThreadPool resolved it.)

multiprocessing.Pool is probably pickling objects to pass them to the other processes in the worker pool, while ThreadPool is using regular shared memory to pass in-memory python objects. This would explain why you saw this problem, and why your change fixes it.

@elibixby Thank you for the start_bundle example -- that is exactly what I needed!

Hi,
I just wanted to post here because this answer was hugely useful for me as well, and I did not find this in any of the regular apache beam tutos.
In fact, I first saw the "start_bundle" method mentioned here, and then had to look into the source code to see how it works. Since it is seems to be a quite important method, it would maybe be nice to have it mentioned more explicitly in the doc.

I'm not sure this is high enough priority to start a new issue (let me know if you'd like me to), but Jobs are also not picklable:

In [16]: job = client.copy_table(source_table,source_table)

In [17]: job
Out[17]: <google.cloud.bigquery.job.CopyJob at 0x1163a3320>

In [18]: pickle.dumps(job)
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-18-0c1255e2d696> in <module>()
----> 1 pickle.dumps(job)

AttributeError: Can't pickle local object 'if_exception_type.<locals>.if_exception_type_predicate'

I'm guessing that's because it represents some sort of future? IMO it would be fine if __getstate__ deleted the polling and let a reference that could be checked.

@max-sixty You likely do want to raise this as a new issue, given that this one is closed as "won't fix" and is about datastore, rather than bigquery.

As a workaround, you could pickle the result of job._to_api_repr, and then reconstitute it using client.job_from_resource.

Was this page helpful?
0 / 5 - 0 ratings