Actix-web: Stream difficulties when using Multipart with Rusoto S3

Created on 16 Mar 2020  路  7Comments  路  Source: actix/actix-web

I'm having issue with interoperability between actix-multipart 0.2.0 and rusoto_s3 0.43.0

Essentially it boils down to bounds on ByteStream::new() that look like this:

pub fn new<S>(stream: S) -> ByteStream where
    S: Stream<Item = Result<Bytes, Error>> + Send + Sync + 'static, 

However, Field has Rc in it:
https://github.com/actix/actix-web/blob/3dc859af58fa0b4676f86f7ba8a5cf5db546831a/actix-multipart/src/server.rs#L372-L378

So instead of being able to use it as a stream there I'm getting tons of cannot be shared between threads safely from the compiler.

I'd like to avoid creating temporary files, which most implementations seem to do.

Is there a better approach or maybe actix-multipart can become thread-safe?

Most helpful comment

Just in case it is useful to those finding this issue later, this is the kind of code you can use to transform body stream into byte stream acceptable by Rusoto S3:

let (mut body_sender, body_receiver) = futures::channel::mpsc::channel(UPLOADING_BUFFER_SIZE);
actix_rt::spawn(async move {
    while let Some(chunk) = body.next().await {
        match chunk {
            Ok(bytes) => {
                body_sender.send(Ok(bytes)).await.unwrap();
            }
            Err(error) => {
                body_sender
                    .send(Err(io::Error::new(
                        std::io::ErrorKind::Other,
                        "Error when receiving body",
                    )))
                    .await
                    .unwrap();
            }
        }
    }
});
let byte_stream = rusoto_core::ByteStream::new(body_receiver);

All 7 comments

Actually very similar thing happens when not using multipart and just uploading file in a body. Any ideas why is it not Sync and how to make it such?

Solved pretty easily by using futures::channel::mpsc::channel(), receiver of which was used to send data to Rusoto S3 and the other end was fed by data from request body in another spawned future.
Would be nice if it just worked though, too much boilerplate for such a simple task.

So does it mean you can close this issue?

Yes and no. I'm wondering if there is something that could be done in actix-web to avoid that bottleneck and be able to use stream as is.

"Yes and no" is too vague. actix-web is !Send, there is no any way to turn it to a Send

Closing then

Just in case it is useful to those finding this issue later, this is the kind of code you can use to transform body stream into byte stream acceptable by Rusoto S3:

let (mut body_sender, body_receiver) = futures::channel::mpsc::channel(UPLOADING_BUFFER_SIZE);
actix_rt::spawn(async move {
    while let Some(chunk) = body.next().await {
        match chunk {
            Ok(bytes) => {
                body_sender.send(Ok(bytes)).await.unwrap();
            }
            Err(error) => {
                body_sender
                    .send(Err(io::Error::new(
                        std::io::ErrorKind::Other,
                        "Error when receiving body",
                    )))
                    .await
                    .unwrap();
            }
        }
    }
});
let byte_stream = rusoto_core::ByteStream::new(body_receiver);
Was this page helpful?
0 / 5 - 0 ratings