Boto3: How to Use botocore.response.StreamingBody as stdin PIPE

Created on 2 Jan 2016 · 35 comments · Source: boto/boto3

I want to pipe large video files from AWS S3 into Popen's stdin. This code runs as an AWS Lambda function, so these files won't fit in memory or on the local file system. Also, I don't want to copy these huge files anywhere, I just want to stream the input, process on the fly, and stream the output. I've already got the processing and streaming output bits working. The problem is how to obtain an input stream as a Popen pipe.

I can access a file in an S3 bucket:

import boto3
s3 = boto3.resource('s3')
response = s3.Object(bucket_name=bucket, key=key).get()
body = response['Body']  

body is a botocore.response.StreamingBody. I intend to use body something like this:

from subprocess import Popen, PIPE
Popen(cmd, stdin=PIPE, stdout=PIPE).communicate(input=body)[0]

But of course body needs to be converted into a file-like object. The question is how?

Most helpful comment

Not sure if this helps, but I struggled with a similar issue streaming from boto3:s3 to flask output stream. Some sample code here, may help you:

s3_response = s3_client.get_object(Bucket=BUCKET, Key=FILENAME)

def generate(result):
    # self.CHUNK_SIZE is defined on the enclosing class
    for chunk in iter(lambda: result['Body'].read(self.CHUNK_SIZE), b''):
        yield chunk

return Response(generate(s3_response), mimetype='application/zip',
                headers={'Content-Disposition': 'attachment;filename=' + FILENAME})

All 35 comments

Can anyone help?

@mslinn Sorry for the delays!

StreamingBody is a file-like object, so any method that works against a file should work here as well. At the moment I'm not sure what the answer is, but it seems to be a question for the general Python community, so you'll probably have more luck on StackOverflow. This seems to be in the right direction.

I'm going to close this issue, but feel free to update here if you find a working solution.

I wish it were so simple. I must have dumped at least 30 hours into this problem, and I've tried a variety of approaches. I've tried the AWS & Python communities on StackOverflow but got no response. I think this issue requires too much setup for a generic Python programmer to address.

I'm sure this will be an often-repeated question, I'm just the first to hit it. I think someone from AWS should take it up.

For my part, I've got a broken product in production as a result of this issue. I am willing to pay cash money for a fix.

I was working with JSON files specifically, so far from your large-video-file requirement. However, I was able to access response['Body']._raw_stream.data to get at the data, in an I-should-not-access-this-member kind of way. I hope you can do the same; I don't know the specifics of how S3 works at the moment.

If communicate needs more methods than read, you can dump the information into a different class. This will necessitate buffering the whole file in memory, but it can be done, since the result of .read() is a bytes object.
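As a sketch of that buffer-everything approach (viable only when the object fits in memory), the body can be wrapped in an io.BytesIO, which is a full file-like object; `to_fileobj` is an invented name for illustration:

```python
import io

def to_fileobj(body):
    """Wrap an S3 StreamingBody (or anything with .read()) in a
    seekable in-memory file object.

    Caution: .read() with no argument consumes the entire stream,
    so this only works when the object fits in RAM.
    """
    return io.BytesIO(body.read())
```

io.BytesIO supports read, seek, and tell, so the result can be handed to communicate() or put_object() alike.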

Buffering the entire file in memory is not an option. I am putting together some sample code that almost works; I will share it later today or tomorrow.

A test project that works in a variety of environments is here.

If you can't fit the data in memory, you won't be able to use Popen.communicate because it buffers the whole file in memory anyway. Lambda does give you 512MB of ephemeral storage, so you could write to a temporary file and direct ffmpeg to use that.
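For reference, a minimal sketch of that temp-file approach (`process_via_tmpfile` and its arguments are invented for illustration; on Lambda the temporary file would live under the 512MB /tmp):

```python
import subprocess
import tempfile

def process_via_tmpfile(s3_client, bucket, key, cmd):
    """Stream an S3 object to a temp file, then feed it to `cmd` on stdin.

    `s3_client` is a boto3 S3 client created by the caller.
    """
    with tempfile.NamedTemporaryFile() as tmp:
        # download_fileobj writes the object to disk in chunks, so the
        # whole file is never held in memory at once.
        s3_client.download_fileobj(bucket, key, tmp)
        tmp.flush()
        tmp.seek(0)
        # stdin now gets a real file descriptor, which Popen requires.
        return subprocess.run(cmd, stdin=tmp, stdout=subprocess.PIPE).stdout
```

The output still has to fit in memory here (or be streamed onward instead of returned), but the input side no longer does.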

I did not use communicate in the sample code I posted a half hour ago; take a look.
BTW, videos can be larger than 512MB.

Not sure if this helps, but I struggled with a similar issue streaming from boto3:s3 to flask output stream. Some sample code here, may help you:

s3_response = s3_client.get_object(Bucket=BUCKET, Key=FILENAME)

def generate(result):
    # self.CHUNK_SIZE is defined on the enclosing class
    for chunk in iter(lambda: result['Body'].read(self.CHUNK_SIZE), b''):
        yield chunk

return Response(generate(s3_response), mimetype='application/zip',
                headers={'Content-Disposition': 'attachment;filename=' + FILENAME})

Hmm, looks interesting, thanks!

I tested that it does indeed iterate per chunk size, but I have not profiled it - meaning I'm hoping StreamingBody really is a stream and it's not all consumed in memory.

Performance is bad, though.

Does Python have something like Node.js's SrcStream.pipe(response) call, or Go's io.Copy(SrcStream, DstStream)?

Node supports multi-I/O concurrency natively, and Go's io.Copy is typically driven by goroutines; in the Python world I have only found werkzeug's IterIO as a wrapper to write a stream to, which internally uses greenlet as a lightweight process model to simulate multi-I/O concurrency:

  1. http://werkzeug.pocoo.org/docs/dev/contrib/iterio/
  2. https://github.com/marianoguerra/tubes/blob/master/werkzeug/contrib/iterio.py

Thanks! I'm looking at this same issue too. I'm trying to "stream" a StreamingBody S3 input file and copy it to an S3 output file. I want to do text-file processing on potentially large files. I'm a newbie to Python and AWS, but this information is exactly what I was looking for.

I spent a lot of time but never got this to work. If someone does, please show the juicy details.

I'm resorting to using buffered reads (4096 characters per read) at the moment, but I'm getting farther. I saw your GitHub code submission; if I find anything, I'll share.


What happens if you do

from contextlib import closing

with closing(response['Body']) as body_fileobj:
    p = Popen(cmd, stdin=body_fileobj, stdout=PIPE)
    (stdout, _) = p.communicate()
    ...

That should be equivalent to

stdout_var = `curl URL | somecmd` 

in bash

Hey, have you found any useful solution for this?

@JordonPhillips I'm confused by this sentence:

StreamingBody is a file-like object, so any method that works against a file should work here as well.

I looked at the code of StreamingBody, and it seems to me that it is really a wrapper around a class inheriting from io.IOBase, but only the read method from the raw stream is exposed, so it is not really a file-like object.

To me it would make a lot of sense to expose the io.IOBase interface in StreamingBody, as we could then wrap S3 objects in an io.BufferedReader or an io.TextIOWrapper. Is there any reason not to do so? Happy to make a pull request if it would help.

As pointed out above by @Scoots, the actual file-like object is found in the ._raw_stream attribute of the StreamingBody class. What are the ramifications of accessing it directly for downstream file-like consumers?

@mslinn: Were you ever able to solve this?

Does download_fileobj() with TransferConfig(max_concurrency=1, max_io_queue=1) solve this problem?

Thanks.

Gave up, as this turned into a research project.


Thank you for responding.

I did encounter this library which supports streaming large files to/from s3 and is written on top of boto - https://github.com/RaRe-Technologies/smart_open .

@bishtpradeep looks interesting and useful, thanks!

@mslinn I think I found a possible solution (at least for Python 3), looking at the objects in boto3. I had a similar problem using pickle with a particularly large object. Here's what I got working:

>>> obj
s3.Object(bucket_name='my-bucket', key='path/to/data/pickled-object.pkl')
>>> body = obj.get()["Body"]
>>> body
<botocore.response.StreamingBody object at 0x10e48d978>
>>> body._raw_stream
<botocore.vendored.requests.packages.urllib3.response.HTTPResponse object at 0x10e48d3c8>

So looking at the docs for this response here http://urllib3.readthedocs.io/en/latest/reference/#module-urllib3.response you can see:

class urllib3.response.HTTPResponse(body='', headers=None, status=0, version=0, reason=None, strict=0, preload_content=True, decode_content=True, original_response=None, pool=None, connection=None, retries=None, enforce_content_length=False, request_method=None)
Bases: io.IOBase

This is good news, because Python offers some nice IO classes for buffered reading, such as io.BufferedReader (https://docs.python.org/3/library/io.html#io.BufferedReader). So, continuing from the code above:

>>> import io
>>> buff_reader = io.BufferedReader(body._raw_stream)
>>> buff_reader
<_io.BufferedReader>
>>> import pickle
>>> data_dict = pickle.load(buff_reader)
>>> data_dict
{'my-data': 'VERY LONG DATA'}

The BufferedReader is a true file-like object. @JordonPhillips, is this dangerous?

I had to do this recently. In the end, the simplest solution I found was a loop that alternately read chunks of data from the source StreamingBody, wrote them to the stdin pipe, then read as much as possible from the stdout pipe and wrote the result to my output stream (which was an SSL connection). The problem with Popen.communicate() is that, as far as I can tell, it attempts to send all of the input to the process before reading back (and buffering) all of the output. This will lock up if the subprocess writes in a blocking manner and the output is large enough to fill the output pipe's buffers before you've fed in all the input, and it will also fail if the output is too large to fit in memory even if you did succeed in feeding in all of the input. You need to drain the output and pass it on down the line before feeding in more input.

@nickovs Got any code you could share? Feel inspired to write an article about this?

I'll see if I can dig it out. I seem to recall that the key was to call fileno() on the output and use fcntl to set the stdout pipe to non-blocking reads, so that I could reliably read it until it was empty without getting stuck.
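A sketch of what that might look like (not the commenter's actual code; `pump` and its arguments are invented for illustration, and the interleaving assumes a roughly size-preserving filter — a command that expands its input massively could still stall):

```python
import fcntl
import os
import subprocess

CHUNK = 16384

def pump(body, cmd, sink):
    """Alternately feed `body` (anything with .read(), e.g. a
    StreamingBody) into `cmd` and drain the child's stdout into
    `sink` (anything with .write())."""
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE, bufsize=0)
    # Make reads from the child's stdout non-blocking so the pipe can
    # be drained until empty without getting stuck waiting for more.
    fd = proc.stdout.fileno()
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    for chunk in iter(lambda: body.read(CHUNK), b''):
        view = memoryview(chunk)
        while view:                        # raw pipe writes can be partial
            view = view[proc.stdin.write(view):]
        while True:                        # drain whatever is ready
            out = proc.stdout.read(CHUNK)  # None means "would block"
            if not out:
                break
            sink.write(out)

    proc.stdin.close()                     # signal EOF to the child
    fcntl.fcntl(fd, fcntl.F_SETFL, flags)  # blocking again for the tail
    for out in iter(lambda: proc.stdout.read(CHUNK), b''):
        sink.write(out)
    return proc.wait()
```

The O_NONBLOCK flag makes the inner drain loop return None instead of hanging when the child has nothing ready yet, which is what keeps the feed/drain alternation from deadlocking.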

Note that an alternative option, if you have trouble getting non-blocking IO to work, is to start a thread that reads from the StreamingBody and writes to the stdin pipe (terminating when the StreamingBody runs dry or the far end of the pipe gets closed), while the main path of your code reads the output from stdout and passes it down the line. In this case you need to take care to clean up the thread if the subprocess crashes.
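A sketch of that thread variant (again illustrative rather than the commenter's code; `pump_threaded` and its arguments are invented names):

```python
import subprocess
import threading

CHUNK_SIZE = 16384

def pump_threaded(body, cmd, sink):
    """A feeder thread copies `body` (anything with .read()) into the
    child's stdin while the main thread passes stdout down the line
    to `sink` (anything with .write())."""
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE)

    def feed():
        try:
            for chunk in iter(lambda: body.read(CHUNK_SIZE), b''):
                proc.stdin.write(chunk)
        except BrokenPipeError:
            pass                   # the subprocess exited early
        finally:
            proc.stdin.close()     # EOF tells the child to finish up

    feeder = threading.Thread(target=feed, daemon=True)
    feeder.start()

    # Main thread: pass the output onward as it arrives.
    for out in iter(lambda: proc.stdout.read(CHUNK_SIZE), b''):
        sink.write(out)

    feeder.join()
    return proc.wait()
```

The daemon flag plus the BrokenPipeError handler covers the cleanup concern: if the subprocess dies, the feeder's next write raises BrokenPipeError and the thread exits instead of lingering.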

@mslinn Would streaming it through BytesIO be sufficient to emulate the file for piping? I think this would also work.

I'm currently using it like so:

    def from_bin_streaming(self, bucket_name, key_name):
        out_buffer = io.BytesIO()
        body = self.get_s3_client().get_object(Bucket=bucket_name, Key=key_name)['Body']
        # iter_chunks() preserves the raw bytes; iter_lines() strips
        # newlines, which would corrupt binary data
        for chunk in body.iter_chunks():
            out_buffer.write(chunk)
        out_buffer.seek(0)
        return out_buffer

Of course you could yield chunks instead, or use seek to slice bytes. I haven't tested it with Popen, but I feel like it could be useful.

This is still an issue you have to either read the boto3 source or go to Stack Overflow to resolve, which means the library needs work; this issue should not have been closed, and should instead have been tagged as a feature request.

https://stackoverflow.com/questions/7624900/how-can-i-use-boto-to-stream-a-file-out-of-amazon-s3-to-rackspace-cloudfiles/40661459#40661459

I just ran into this issue.
My use case is to copy potentially large files with boto3 from one S3 bucket to another S3 bucket with different credentials.
The following snippet fails with ValueError: read of closed file, most probably due to the underlying issue described in urllib3/urllib3#1305:

import io
source_obj = source_s3.Object(bucket_name='bucket1', key='source_key')
target_s3.Object('bucket2', 'target_key').put(Body=io.BufferedReader(source_obj.get()['Body']._raw_stream))

Does anybody have a better idea how to efficiently copy objects from one bucket to the other without having to worry about tuning CHUNK_SIZE?

This is especially frustrating since apparently put_object cannot take a StreamingBody as input.
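One thing that may help here (an untested sketch; `copy_object` and the bucket/key parameters are invented names): the higher-level upload_fileobj transfer method only calls .read() on its source and performs a managed, chunked (multipart) upload, so it should accept a StreamingBody even though put_object will not:

```python
def copy_object(src_client, tgt_client,
                src_bucket, src_key, tgt_bucket, tgt_key):
    """Stream an object between buckets using two boto3 S3 clients
    (e.g. created with different credentials)."""
    # get_object returns a StreamingBody; upload_fileobj only needs
    # .read(), so the object is never fully buffered in memory and
    # never needs to be seekable.
    body = src_client.get_object(Bucket=src_bucket, Key=src_key)['Body']
    tgt_client.upload_fileobj(body, tgt_bucket, tgt_key)
```

This sidesteps the seek()/tell() requirement entirely, since the transfer manager does its own chunking.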

I'm trying to copy file between S3-compatible services:

# both `src_client` and `tgt_client` are valid, AWS S3 clients from different accounts, `key` is a valid key
obj = src_client.get_object(Bucket="bucket", Key=key)
tgt_client.put_object(Bucket="tgt_bucket", Key=key, Body=obj['Body'])

fails with: AttributeError: 'StreamingBody' object has no attribute 'tell'

Using _raw_stream instead:

obj = src_client.get_object(Bucket="bucket", Key=key)
tgt_client.put_object(Bucket="tgt_bucket", Key=key, Body=obj['Body']._raw_stream)

fails with "UnsupportedOperation: seek"

@nicornk the problem with urllib3 seems to have a workaround, but it still doesn't work anyway.
FYI, you can set .auto_close = False on the stream to prevent it from being closed at the end of .read().

obj = src_client.get_object(Bucket="bucket", Key=key)

stream = obj['Body']._raw_stream
stream.auto_close = False

tgt_client.put_object(Bucket="sharethis_archive", Key=key, Body=io.BufferedReader(stream))

fails with "UnsupportedOperation: File or stream is not seekable."

I have a question about this...

If I pass around this StreamingBody object, does that mean the HTTP connection isn't closed? Is it only closed once the StreamingBody is garbage collected, or when the entire stream is read?

Hello there,
I'm trying to do exactly what @eprochasson and @nicornk described: copy a file from an S3 bucket in AWS account A to another S3 bucket in AWS account B.

I've read the whole discussion and the merged PRs, but I have not yet found a proper way to do it: put_object() does not directly accept a StreamingBody (nor its ._raw_stream), and I don't see any good way to turn it into a readable stream with seek() capability.

Did anyone find a good solution ?
