Sanic: Large file upload (streaming support)

Created on 13 Mar 2017 · 23 Comments · Source: sanic-org/sanic

There is currently no support for uploading very large files greater than the amount of RAM available to the machine.

enhancement

Most helpful comment

@r0fls sure. I can also add a compact unit test soon too.

All 23 comments

@nszceta did you checkout the stream response type? https://github.com/channelcat/sanic/blob/master/docs/sanic/response.md

@r0fls input, not output. Request, not response streaming to file.

ah yes, that's why you said _upload_ :)

I have successfully implemented a method for this in Flask, using a server side session due to the insane way browsers stream files to the server.

Here is a snippet of the Flask code I used in one of my projects:

from flask import Flask, request, session, jsonify

app = Flask(__name__)

@app.route('/image', methods=['POST'])
def image_post():
    rf = request.files['files[]']
    ext = rf.mimetype.split('/')[1]
    dst = session['image'] = 'tmp/{}.{}'.format(session['key'], ext)

    try:
        # extract the starting byte from the Content-Range header,
        # e.g. "bytes 1000000-1999999/5000000"
        range_str = request.headers['Content-Range']
        start_bytes = int(range_str.split(' ')[1].split('-')[0])
        # 'r+b' rather than 'ab': in append mode every write lands at the
        # end of the file, so seek() would be silently ignored
        with open(dst, 'r+b') as f:
            f.seek(start_bytes)
            f.write(rf.stream.read())
        return jsonify({})
    except KeyError:
        # no Content-Range header: first (or only) chunk, create the file
        with open(dst, 'wb') as f:
            f.write(rf.stream.read())
        return jsonify({})

A few issues:

  • Returning jsonify({}) is sub-optimal
  • There has to be a better way to identify start_bytes
  • Is there any other way to keep track of who is uploading besides a session?
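On the second bullet: a regular expression over the Content-Range value is less brittle than chained split() calls. A minimal sketch (the helper name parse_start_byte is mine, not part of Flask):

```python
import re

# Content-Range for chunked uploads looks like: "bytes 0-999999/12345678"
_CONTENT_RANGE = re.compile(r"bytes (\d+)-(\d+)/(\d+|\*)")

def parse_start_byte(header_value):
    """Return the starting byte offset from a Content-Range header,
    or None if the header is absent or malformed."""
    if not header_value:
        return None
    match = _CONTENT_RANGE.match(header_value)
    if match is None:
        return None
    return int(match.group(1))
```

A malformed header then degrades to None instead of raising ValueError or IndexError mid-request.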

I'd love to see support for this in Sanic, and I may have time to implement it as well, but I would like more feedback first.

I would like to see this too. I miss the "request.stream" feature a lot (similar to request.body, but a file-like object rather than a body already read into memory).
I'm willing to help implement it as well :)

@FrnkVieira what would be the best way to get started?

@nszceta I really don't know. I have to check Sanic internals, but I guess Sanic is already reading the entire stream, which means we would need a pretty big refactoring with a lot of lazy loaders added... @r0fls Do you have any opinions about this?

@nszceta Thanks for linking #568 to this issue. This is exactly what I am looking for.

I am happy to contribute some changes to Sanic to address this. It looks like some rework is needed on sanic.request.Request.form and sanic.request.Request.files properties and parse_multipart_form?

not quite a "pretty big refactoring", am I missing something?

This could be covered by https://github.com/channelcat/sanic/pull/697 but I'm not sure. @nszceta do you care to test with your example that was larger than the machine's RAM? cc @38elements

@r0fls sure. I can also add a compact unit test soon too.

What's the current state on this?

I use aiofiles to write out the bytes pulled from the request.stream queue as they arrive, and it works fine. Memory usage stays OK.

You have to override the default REQUEST_MAX_SIZE and REQUEST_TIMEOUT, though.
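The consuming side of that pattern can be sketched without Sanic itself: a coroutine that drains the request's chunk queue and persists each chunk. Here write_chunk stands in for whatever persists the data (an aiofiles handle's write in a real async handler), and the None sentinel marking end-of-body follows Sanic's streaming convention of the time:

```python
import asyncio

async def drain_stream(stream, write_chunk):
    """Pull chunks off a request stream queue and persist each one.
    Sanic's streaming API puts None on the queue when the body ends.
    Returns the total number of bytes written."""
    total = 0
    while True:
        chunk = await stream.get()
        if chunk is None:
            break
        write_chunk(chunk)  # e.g. an open file's write; aiofiles in async code
        total += len(chunk)
    return total
```

Because each chunk is written out before the next get(), memory use stays bounded by the queue, not by the upload size.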

Sanic stores request body in memory even for streamed requests: https://github.com/channelcat/sanic/blob/master/sanic/server.py#L278

The framework has no flow control at all unfortunately.

@asvetlov request.stream is an asyncio.Queue object. So while the protocol keeps putting bytes into it, my stream handler keeps taking them out. Thus no memory problem. Or am I missing something?

The line mentioned above pushes data into the queue, and that data is already in memory at that point.
If your code consumes the data more slowly than the peer sends it, an out-of-memory error is unavoidable.
To solve this, sanic should pause reading from the socket once some memory limit is reached.
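The point can be demonstrated with a bounded queue: when the queue is full, put() suspends the producer, which is exactly the backpressure being asked for here. A standalone stdlib sketch, not Sanic code:

```python
import asyncio

async def bounded_transfer(chunks, maxsize=2):
    """Push chunks through a bounded queue. put() suspends whenever the
    queue is full, so the producer can never outrun the consumer by more
    than maxsize items -- the flow control sanic lacked at the time."""
    queue = asyncio.Queue(maxsize=maxsize)
    received = []

    async def producer():
        for chunk in chunks:
            await queue.put(chunk)  # blocks while the queue is full
        await queue.put(None)       # end-of-body sentinel

    async def consumer():
        while True:
            item = await queue.get()
            if item is None:
                return
            received.append(item)

    await asyncio.gather(producer(), consumer())
    return received
```

With an unbounded queue the producer never suspends, and every chunk the peer sends accumulates in memory until the consumer catches up.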

@Jeffwhen I agree with @asvetlov. You need to await the put coroutine before reading more of the body, which means you cannot use a javascript-like callback style. Currently, while the newly received body is being pushed to the queue (as a task), the server keeps receiving more body. Since earlier chunks may not yet have been processed (freed), memory consumption keeps growing as new chunks are pushed, which will surely lead to out-of-memory. You could expose the socket to the stream handler (async def) so one could await the reading coroutine on demand.

Is there any way to do flow control using the asyncio.Protocol abstraction?

Sure, there is but somebody should make a Pull Request :)
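A sketch of that answer: an asyncio.Protocol can call transport.pause_reading() once buffered data passes a high-water mark and resume_reading() once the handler drains it below a low one. The class name, thresholds, and consumed() hook below are illustrative, not the change that eventually landed:

```python
import asyncio

class BackpressureProtocol(asyncio.Protocol):
    """Pause the transport above a high-water mark; resume below a low one."""

    def __init__(self, high=64 * 1024, low=16 * 1024):
        self.high, self.low = high, low
        self.buffered = 0        # bytes received but not yet consumed
        self.paused = False
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.buffered += len(data)
        if not self.paused and self.buffered >= self.high:
            self.transport.pause_reading()  # stop reading from the socket
            self.paused = True

    def consumed(self, n):
        """The request handler reports that it processed n bytes."""
        self.buffered -= n
        if self.paused and self.buffered <= self.low:
            self.transport.resume_reading()
            self.paused = False
```

pause_reading()/resume_reading() are part of asyncio's ReadTransport interface; while paused, the kernel's TCP receive window fills and the peer is throttled, so memory stays bounded end to end.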

one temporary solution for nginx users: nginx-upload-module

It's a pity, given that aiohttp already supports this.

a temporary suggestion for flow control in sanic.server.HttpProtocol:

    # line 247 nearby
    def on_headers_complete(self):
        from multidict import CIMultiDict

        self.request = self.request_class(
            url_bytes=self.url,
            headers=CIMultiDict(self.headers),
            version=self.parser.get_http_version(),
            method=self.parser.get_method().decode(),
            transport=self.transport
        )
        # Remove any existing KeepAlive handler here,
        # It will be recreated if required on the new request.
        if self._keep_alive_timeout_handler:
            self._keep_alive_timeout_handler.cancel()
            self._keep_alive_timeout_handler = None
        if self.is_request_stream:
            self._is_stream_handler = self.router.is_stream_handler(
                self.request)
            if self._is_stream_handler:
                ################################## fix-1 start
                self.request.stream = asyncio.Queue(10)  # whatever size you like
                ################################## fix-1 end
                self.execute_request_handler()

    # line 267 nearby
    def on_body(self, body):
        if self.is_request_stream and self._is_stream_handler:

            #################################### fix-2 start
            async def put_body(body):
                if self.request.stream.full():
                    self.transport.pause_reading()
                    await self.request.stream.put(body)
                    self.transport.resume_reading()
                else:
                    await self.request.stream.put(body)

            self._request_stream_task = self.loop.create_task(put_body(body))
            #################################### fix-2 end
            return
        self.request.body.append(body)

Closed per #1423
