We're in the process of moving from S3 to GCS and we've hit a snag: with boto S3 one can stream a download using the read method on a key, and stream an upload using multiple 'multipart' calls: initiate_multipart_upload, upload_part_from* and complete_upload.
It's imperfect, but this allows simple streaming uploads and downloads of very large files. Furthermore, stick some buffering in there and you can easily wrap S3 reads and writes into file-like objects (without seeking), like smart_open does.
I think that the API supports all the pieces necessary to implement this in the client (which would be greatly appreciated!), but in the meantime can anyone suggest a workaround which does not require a temporary file?
Thanks!
At least for uploads, there is support: https://github.com/GoogleCloudPlatform/google-cloud-python/pull/1914
Could be documented better...
Simple example for a streaming upload of a large dataframe
import os
import threading
r, w = os.pipe()
r, w = os.fdopen(r, 'rb'), os.fdopen(w, 'wb')
df = _large_pandas_df
path = 'gs://test_bucket/testfile.pickle'
# Important to set a chunk size when creating the blob `bucket.blob(blob_path, chunk_size=10 << 20)`
b = get_blob(path)
# Perform the upload in a separate thread
t = threading.Thread(target=lambda b, r: b.upload_from_file(r), name='upload-blob:%s' % path, args=(b, r))
# Since the blob upload is reading from a pipe, the operation is blocking until the pipe closes
t.start()
# This starts writing to the pipe
df.to_pickle(w)
# Make sure to close the pipe so the upload completes
w.close()
# Wait for the upload to complete
t.join()
@linar-jether that's a great snippet. I have a question though: what does your get_blob() function do exactly? Is it the same as the .blob() method of the bucket class?
Just a util function that gets a Blob object from a path ('gs://bucket_name/obj.name')
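For completeness, a minimal sketch of what such a util might look like. The helper names (`parse_gs_path`, `get_blob`) and the default chunk size are assumptions for illustration, not part of the original snippet:

```python
from urllib.parse import urlparse

def parse_gs_path(path):
    """Split 'gs://bucket_name/obj.name' into (bucket_name, 'obj.name')."""
    parsed = urlparse(path)
    if parsed.scheme != 'gs':
        raise ValueError('expected a gs:// path, got %r' % path)
    return parsed.netloc, parsed.path.lstrip('/')

def get_blob(path, chunk_size=10 << 20):
    """Return a Blob for a gs:// path; requires google-cloud-storage."""
    from google.cloud import storage  # imported lazily; needs credentials
    bucket_name, blob_name = parse_gs_path(path)
    client = storage.Client()
    return client.bucket(bucket_name).blob(blob_name, chunk_size=chunk_size)
```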
Thanks! Do you have suggestions for the recommended chunk size?
Whatever works best for your memory/throughput requirements: a smaller chunk size requires less memory but may reduce throughput, and vice versa
Thanks a lot, and I'm sorry, I am quite new to this. Could I use the same procedure to stream up JSON strings directly instead of bytes?
Yes sure, will work for any kind of stream
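For text such as JSON strings, one option is wrapping the pipe's binary write end in an `io.TextIOWrapper`. A self-contained sketch of the idea, with the blob upload replaced by a stand-in reader thread (in the real case the thread would call `b.upload_from_file(reader)` instead):

```python
import io
import json
import os
import threading

r_fd, w_fd = os.pipe()
reader = os.fdopen(r_fd, 'rb')
# Wrap the binary write end so text (e.g. JSON strings) can be written directly
writer = io.TextIOWrapper(os.fdopen(w_fd, 'wb'), encoding='utf-8')

received = []
# Stand-in for b.upload_from_file(reader): just drain the pipe
t = threading.Thread(target=lambda: received.append(reader.read()))
t.start()

for record in [{'a': 1}, {'b': 2}]:
    writer.write(json.dumps(record) + '\n')
writer.close()  # closing the write end lets the reader reach EOF and finish
t.join()
# received[0] == b'{"a": 1}\n{"b": 2}\n'
```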
This seems to be resolved, as best as I can tell. Closing.
@cristicbz / @teoguso, poke if I got it wrong, and I will reopen.
Can I request that this issue is reopened? We use GCS extensively at Verily and we frequently deal with large indexed files since we use a variety of industry-standard genomics file formats (VCF, BAM, SAM, etc...). Being able to read a file header then seek to a specific position is a MAJOR speedup for some processes and we've had to hack around the lack of this by manually modifying the Content-Range headers a few times.
It seems to me that implementing Python's io.IOBase interface would be an ideal solution to this problem.
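As a sketch of that idea: a read-only, seekable `io.RawIOBase` built on ranged downloads. It assumes the blob exposes a `size` attribute and a `download_as_bytes(start=..., end=...)` method with an inclusive byte range, as in recent google-cloud-storage releases; treat the rest as an illustration rather than the library's API:

```python
import io

class BlobReader(io.RawIOBase):
    """Read-only, seekable file-like view over ranged blob downloads.

    Sketch only: assumes `blob` has a `size` attribute and a
    `download_as_bytes(start=..., end=...)` method with an inclusive
    byte range, as in recent google-cloud-storage releases.
    """

    def __init__(self, blob):
        self._blob = blob
        self._pos = 0

    def readable(self):
        return True

    def seekable(self):
        return True

    def tell(self):
        return self._pos

    def seek(self, offset, whence=io.SEEK_SET):
        if whence == io.SEEK_SET:
            self._pos = offset
        elif whence == io.SEEK_CUR:
            self._pos += offset
        elif whence == io.SEEK_END:
            self._pos = self._blob.size + offset
        else:
            raise ValueError('invalid whence: %r' % whence)
        return self._pos

    def read(self, size=-1):
        if self._pos >= self._blob.size:
            return b''
        if size is None or size < 0:
            end = self._blob.size - 1
        else:
            end = min(self._pos + size, self._blob.size) - 1
        data = self._blob.download_as_bytes(start=self._pos, end=end)
        self._pos += len(data)
        return data

    def readinto(self, b):
        # Lets io.BufferedReader wrap this for cheap small reads
        data = self.read(len(b))
        b[:len(data)] = data
        return len(data)
```

Wrapping it as `io.BufferedReader(BlobReader(blob))` would then give buffered small reads, which is the shape parsers expect: read the header, `seek` to an index offset, read a slice.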
There is prior work on this task (see gcs-client) and it's even under a compatible license, but I'm guessing that some amount of work would be needed to clean this code up and adapt it to follow the conventions and style here.
@e4p This functionality exists right now in google-resumable-media.
I'm fairly certain the thing you're asking for (i.e. partial reads) are not the same as what this issue was originally about.
Just in case I misunderstand you, could you provide a code snippet (real or "fictional") of what you'd like to do and how it fails with the current code?
First, I want to thank you for pointing me to google-resumable-media; it will certainly cover some of our uses. That said, it's not a replacement for a standard file object. One example: Biopython can accept file handles as arguments. The following code can stream a FASTQ (genomic read quality) file if you have access to a file object.
from Bio import SeqIO
with open("example.fastq", "rU") as fileobj:
    for rec in SeqIO.parse(fileobj, "fastq"):
        print(rec.seq)
        print(' name=%s\n annotations=%r' % (rec.name, rec.annotations))
Should this be opened as a new issue? I was looking for an existing issue covering this request and couldn't find one.
@linar-jether do you know if there is any other way nowadays to achieve what you did in your code snippet? It's not working anymore because of changes in google-resumable-media, which now expects to invoke seek on the stream object
@gastlich Yes, the snippet seems to be broken with the current version...
@dhermes, looks like a regression on recent versions
Haven't had the chance to use the google-resumable-media library, but it might provide the functionality to do streaming uploads.
I don't know the reason, but our use case for streaming uploads still works (uploading from a streaming HTTP request); it might be a different code path for different stream objects
import threading
import requests
# Fetch
res = requests.get('http://speedtest.ftp.otenet.gr/files/test10Mb.db', stream=True)
r = res.raw
path = 'gs://test_bucket/testfile.pickle'
# Important to set a chunk size when creating the blob `bucket.blob(blob_path, chunk_size=10 << 20)`
b = get_blob(path, 1 << 20)
# Perform the upload in a separate thread
t = threading.Thread(target=lambda b, r: b.upload_from_file(r), name='upload-blob:%s' % path, args=(b, r))
# Since the blob upload is reading from a stream, the call blocks until the stream is exhausted
t.start()
# Wait for the upload to complete
t.join()
@linar-jether thanks for quick reply
I checked your code and it's working fine with the requests response.raw object. I think what's happening here is that writing to response.raw doesn't move the cursor position, so upload_from_file can start from the beginning; otherwise it complains with:
if stream.tell() != 0:
    raise ValueError(u'Stream must be at beginning.')
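A tiny demonstration of the cursor behavior behind that check:

```python
import io

buf = io.BytesIO()
buf.write(b'csv,data\n')   # writing advances the cursor...
assert buf.tell() == 9     # ...so the upload's tell() check would fail here
buf.seek(0)                # rewinding satisfies `stream.tell() == 0`
assert buf.tell() == 0
```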
I still haven't solved my issue, because I'm writing into an in-memory buffer (io.BytesIO), and every write moves the cursor to the end of the written bytes...
with BytesIO() as file:
    destination = f'postgres/{table_name}/{org_id}.csv'
    blob = bucket.blob(destination, chunk_size=10 << 20)
    thread = threading.Thread(
        target=lambda b, r: blob.upload_from_file(r),
        name='upload-blob:%s' % destination,
        args=(blob, file))
    query = f'SELECT * FROM {table_name} WHERE organisation_id={org_id} LIMIT 10000'
    cursor.execute(query)
    # Writes into memory, but moves the cursor every time
    export_query = f'COPY ({query}) TO STDOUT WITH CSV HEADER'
    cursor.copy_expert(export_query, file)
    # Seek back to the beginning
    file.seek(0)
    # I cannot start the thread before `cursor.copy_expert` because resumable-media expects the stream to start at the beginning
    thread.start()
    # Wait for the upload to complete
    thread.join()
As you can see, in my example I'm able to get rid of threading and do everything in a blocking/synchronous way. But my problem is that the data set might be very big, and Python will consume all of the available RAM. I partially solved this by using a temp file on disk; at least it doesn't consume memory, but it's slower, and I'm still hoping to do this properly with an in-memory approach.
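One way to keep the in-memory approach without buffering the whole export is the pipe pattern from earlier in the thread: let `copy_expert` write into a pipe while a consumer drains the other end, so at most the OS pipe buffer is held in memory at once. A generic sketch (the helper name `stream_between` is made up; in the Postgres case `produce` would be `lambda w: cursor.copy_expert(export_query, w)` and `consume` would be `blob.upload_from_file`; and as noted elsewhere in the thread, newer library versions may still reject non-seekable streams in `upload_from_file`):

```python
import os
import threading

def stream_between(produce, consume):
    """Run `produce(write_end)` and `consume(read_end)` across an os.pipe.

    `consume` runs in a background thread (like the upload thread in the
    snippets above); `produce` runs in the caller's thread. Only the pipe
    buffer is ever held in memory, regardless of total data size.
    """
    r_fd, w_fd = os.pipe()
    reader = os.fdopen(r_fd, 'rb')
    writer = os.fdopen(w_fd, 'wb')
    result = []
    t = threading.Thread(target=lambda: result.append(consume(reader)))
    t.start()
    try:
        produce(writer)
    finally:
        writer.close()  # closing the write end lets the consumer finish
    t.join()
    return result[0]
```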
I don't get why `stream.tell() != 0` should raise an exception instead of a warning.
I was using the example from https://github.com/googleapis/google-cloud-python/issues/2871#issuecomment-311291059; we were able to continue by mocking the tell method, since calling tell on an open pipe raises an OS exception.
However, when the program continues we get a strange error from the API:
google.api_core.exceptions.BadRequest: 400 PUT https://storage.googleapis.com/upload/storage/v1/b/proteus-dev-eov-delete_me/o?uploadType=resumable&upload_id=ADPycdvFahNsxwUYj0gixPSamAP8N07ru_yTNRef49xdXbAt89IKNiAyY1tp5CDxXzt3U0M0Jo4KRd-awc8tyHYmu48: Client sent query request with a non-empty body.: ('Request failed with status code', 400, 'Expected one of', <HTTPStatus.OK: 200>, 308)
This error is caused in this line:
venv/lib/python3.8/site-packages/google/cloud/storage/blob.py", line 2348, in upload_from_file
created_json = self._do_upload(