Actix-web: HttpResponse should be `Send` (3.0.0-beta.3)

Created on 29 Aug 2020  路  6Comments  路  Source: actix/actix-web

I have requests that are computationally costly and also do blocking IO. Therefor, I want to make them run on their own threads so that it doesn't cause the actix server to be unable to initiate new requests.

So, I wrap my "to" in actix_web::web::block:

        App::new()
          .service(
            web::resource("/")
              .to(
                |req| async {
                  web::block(
                    move || -> HttpResponse {
                      // Do work and return HttpResponse
                    }
                  ).unwrap().await
                }
              )
          )

Expected Behavior

This should compile and work

Current Behavior

You get an error like so:

error[E0277]: `(dyn actix_web::dev::MessageBody + std::marker::Unpin + 'static)` cannot be sent between threads safely
    --> eva-services/src/bin/eva-api-search-es.rs:1469:19
     |
1469 |                   web::block(
     |                   ^^^^^^^^^^ `(dyn actix_web::dev::MessageBody + std::marker::Unpin + 'static)` cannot be sent between threads safely
     | 
    ::: /home/charles/.cargo/registry/src/github.com-1ecc6299db9ec823/actix-web-3.0.0-beta.3/src/web.rs:280:8
     |
280  |     I: Send + 'static,
     |        ---- required by this bound in `actix_web::web::block`
     |
     = help: the trait `std::marker::Send` is not implemented for `(dyn actix_web::dev::MessageBody + std::marker::Unpin + 'static)`
     = note: required because of the requirements on the impl of `std::marker::Send` for `std::ptr::Unique<(dyn actix_web::dev::MessageBody + std::marker::Unpin + 'static)>`
     = note: required because it appears within the type `std::boxed::Box<(dyn actix_web::dev::MessageBody + std::marker::Unpin + 'static)>`
     = note: required because it appears within the type `actix_web::dev::Body`
     = note: required because it appears within the type `actix_web::dev::ResponseBody<actix_web::dev::Body>`
     = note: required because it appears within the type `actix_web::HttpResponse`

Context

Along with the having to process a bunch of data in order to service a request, I also have code that uses postgres::blocking, which is out of my control (it needs to be the blocking api), which means I must have it in its own task.

  • Rust Version: 1.46.0
  • Actix Web Version: 3.0.0-beta.3

Most helpful comment

The key here is to work with the actix-web model and not against it.
If you are gonna block just block. send Sendable data to the blocking pool and use the result it returns. If you can stream it then just use the web::block in your stream, passing chunck of your data and poll it.

actix-file is a perfect example on how to use the blocking pool in streaming manner.source

All 6 comments

actix-web is mainly built upon non Send futures and utilize single thread as much as possible. If your code compiles we would have race conditions and even UB.

You should do your blocking work in web::block and use the returned result building response in your worker thread.

A Body of variant Body::Bytes _is_ Send, so it seems reasonable that a HttpResponse<Body> with that variant of Body should also be Send-able.

I guess Rust doesn't have a way to express that my thread Sends a HttpResponse<Body=Body::Bytes>, because that seems safe and correct. Could there be a way for Actix-web to be able to differentiate those types so I can avoid the boilerplate?

As is, I could do

                  web::block(
                    move || -> MyOwnHttpResponseType<Bytes> {
                      // Do work and return MyOwnHttpResponseType<Bytes>
                    }
                  ).unwrap().await.convert_my_own_http_response_type_into_actix_http_request()

(being stupidly verbose just to be clear), but this is just... not fun: I'd have to replicate all of actix's HttpResponse.

Notice Body is an enum with the Message variant that is a non Send trait object. We surely can add Send bound to it but to be honest I don't know how much actix-web and usercode it would break. It would also limit the type impl MessageBody to be Send and push the problem further down the line.

Not sure if this will be suitable for your needs, but if you have to send data between multiple threads, you may just add a MPSC channel to produce requests and receive responses.

Here's a code snippet to show my idea:

use actix_web::{get, web, App, HttpResponse, HttpServer, Responder};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, RwLock};

pub enum Request {
    ComputeA(u8, u8, oneshot::Sender<u16>),
}

impl Request {
    pub fn compute_a(val_1: u8, val_2: u8) -> (oneshot::Receiver<u16>, Self) {
        let (sender, receiver) = oneshot::channel();

        (receiver, Request::ComputeA(val_1, val_2, sender))
    }
}

#[derive(Debug, Clone)]
struct AppData {
    sender: Arc<RwLock<mpsc::Sender<Request>>>,
}

#[get("/{val_1}/{val_2}/")]
async fn index(data: web::Data<AppData>, input: web::Path<(u8, u8)>) -> impl Responder {
    let (val_1, val_2) = *input;
    let (response, request) = Request::compute_a(val_1, val_2);

    let mut sender = data.sender.write().await;
    if let Err(_err) = sender.send(request).await {
        panic!("Oh no");
    }

    let response = response.await.unwrap();

    HttpResponse::Ok().json(response)
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let (sender, mut receiver) = mpsc::channel(128);

    let app_state = AppData {
        sender: Arc::new(RwLock::new(sender)),
    };

    std::thread::spawn(move || {
        let mut runtime = tokio::runtime::Builder::new()
            .basic_scheduler()
            .build()
            .unwrap();

        runtime.block_on(async move {
            while let Some(request) = receiver.recv().await {
                match request {
                    Request::ComputeA(val_1, val_2, resp) => {
                        let result = val_1 + val_2;
                        resp.send(result as u16).unwrap();
                    }
                }
            }
        });
    });

    HttpServer::new(move || {
        let app_state = app_state.clone();
        App::new().data(app_state).service(index)
    })
    .bind("127.0.0.1:8080")?
    .run()
    .await
}

web::block already uses channel under to the hood. What he want is a way to send HttpResponse to a blocking thread pool(Maybe for some heavy compuation) so your example should change to use tokio::spawn_blocking or tokio::block_in_place or your are just blocking your tokio executor.

The key here is to work with the actix-web model and not against it.
If you are gonna block just block. send Sendable data to the blocking pool and use the result it returns. If you can stream it then just use the web::block in your stream, passing chunck of your data and poll it.

actix-file is a perfect example on how to use the blocking pool in streaming manner.source

Was this page helpful?
0 / 5 - 0 ratings