Actix-web: How to stream a large JSON response to limit memory consumption?

Created on 6 Sep 2020  ·  5 comments  ·  Source: actix/actix-web

Hello,

Actix-web version: 2.0.0

I'm new to Rust and Actix-web, so sorry if my question sounds stupid.

I'm used to streaming large JSON responses with Go and Echo like this:

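package main

import (
    "encoding/json"
    "io"
    "net/http"
    "strconv"

    "github.com/labstack/echo/v4"
)

// User is the record written for each element of the JSON array.
type User struct {
    Username  string
    Password  string
    Lastname  string
    Firstname string
}

func main() {
    e := echo.New()
    e.GET("/", func(c echo.Context) error {
        resp := c.Response()
        resp.Header().Set(echo.HeaderContentType, echo.MIMEApplicationJSONCharsetUTF8)
        resp.WriteHeader(http.StatusOK)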

        if _, err := io.WriteString(resp, "["); err != nil {
            return err
        }
        enc := json.NewEncoder(resp)
        for i := 0; i < 100000; i++ {
            if i > 0 {
                if _, err := io.WriteString(resp, ","); err != nil {
                    return err
                }
            }
            user := User{
                Username:  "My username_" + strconv.Itoa(i),
                Password:  "gjgjghjggjhg7884erhkhjb;lljjkbhjvftxe!hjjèkhkljkbhft",
                Lastname:  "A looooong lastname",
                Firstname: "A looooong firstname",
            }
            if err := enc.Encode(user); err != nil {
                return err
            }
        }
        if _, err := io.WriteString(resp, "]"); err != nil {
            return err
        }

        return nil
    })
    e.Logger.Fatal(e.Start(":1323"))
}

Currently, I have this code:

use actix_web::{get, web, Error};
use serde::Serialize;

#[derive(Serialize, Debug)]
pub struct Task {
    pub id: u32,
    pub name: &'static str,
    pub message: String,
}

// Builds all 100_000 tasks in memory; web::Json then serializes the whole
// Vec into a single response body before anything is sent.
#[get("/big-json")]
async fn big_json() -> Result<web::Json<Vec<Task>>, Error> {
    let mut v: Vec<Task> = Vec::with_capacity(100_000);
    for i in 0..100_000 {
        v.push(Task {
            id: i,
            name: "Coucou ceci est mon nom",
            message: String::from("Mon message doit être un peu long pour augmenter la taille"),
        });
    }
    Ok(web::Json(v))
}

My machine specs:
Linux Mint 20
Intel© Core™ i7-6700 CPU @ 3.40GHz × 4
16 GB DDR3

Initial memory usage: 428 KiB
After running ab -n1000 -c100 http://127.0.0.1:8089/v1/big-json once: 1 GiB
After running it twice: 1.3 GiB
After running it three times: 1.6 GiB

To limit memory consumption, I'd like to stream the JSON response, but I don't see how to do it.

Thanks


All 5 comments

You can check the documentation of crates like tokio and futures to learn how to use streams.
For example, the streams section of the tutorial on tokio.rs is a pretty good one.

And here is an example of how to stream the response for your specific code in actix-web. In real-world code you would want to use a fixed slice as the buffer, plus a cursor, so that the buffer can be reused efficiently, but that's beyond the scope of this example.

use std::pin::Pin;
use std::task::{Context, Poll};

use actix_web::{get, web::Bytes, App, Error, HttpResponse, HttpServer};

#[derive(serde::Serialize, Debug)]
pub struct Task {
    pub id: u32,
    pub name: &'static str,
    pub message: String,
}

pub struct TaskStream {
    next: u32,
    buf: Vec<u8>,
}

impl futures::Stream for TaskStream {
    type Item = Result<Bytes, Error>;

    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

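        // Stop after 100_000 items. Each item is sent as its own chunk with no
        // "[", "," or "]" around it, so the body is not yet a valid JSON array.
        if this.next == 100_000 {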
            Poll::Ready(None)
        } else {
            let res = serde_json::to_writer(
                &mut this.buf,
                &Task {
                    id: this.next,
                    name: "Coucou ceci est mon nom",
                    message: String::from(
                        "Mon message doit être un peu long pour augmenter la taille",
                    ),
                },
            );

            if let Err(e) = res {
                return Poll::Ready(Some(Err(e.into())));
            }

            this.next += 1;

            let poll = Poll::Ready(Some(Ok(Bytes::copy_from_slice(&this.buf))));

            this.buf.clear();

            poll
        }
    }
}

#[get("/big-json")]
async fn big_json() -> HttpResponse {
    let stream = TaskStream {
        next: 0,
        buf: Default::default(),
    };

    HttpResponse::Ok().streaming(stream)
}

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().service(big_json))
        .bind("127.0.0.1:8080")?
        .run()
        .await
}
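One way to read the suggestion of reusing a fixed slice as the buffer together with a cursor is sketched below, using only the standard library. This is a minimal sketch under assumptions: BUF_SIZE and encode_into are hypothetical names, and the item is formatted with write! instead of serde_json to keep the example dependency-free, which only yields valid JSON because these particular strings contain no quotes, backslashes or control characters. Since serde_json::to_writer accepts any std::io::Write, the same Cursor could be handed to it instead.

```rust
use std::io::{self, Cursor, Write};

// Hypothetical record type mirroring the thread's Task.
struct Task {
    id: u32,
    name: &'static str,
    message: String,
}

// Assumed size of the reusable scratch buffer; one serialized item must fit.
const BUF_SIZE: usize = 256;

/// Writes one item into the fixed buffer through a cursor and returns the
/// written prefix. The same buffer is reused for every item, so no new
/// allocation happens per item.
fn encode_into<'a>(buf: &'a mut [u8; BUF_SIZE], task: &Task) -> io::Result<&'a [u8]> {
    let mut cursor = Cursor::new(&mut buf[..]);
    // Simplification: hand-formatted JSON with no string escaping.
    // With serde_json this would be serde_json::to_writer(&mut cursor, task).
    write!(
        cursor,
        r#"{{"id":{},"name":"{}","message":"{}"}}"#,
        task.id, task.name, task.message
    )?;
    // The cursor position is the number of bytes written so far.
    let len = cursor.position() as usize;
    Ok(&buf[..len])
}
```

In poll_next, each returned slice would become one chunk, for example via Bytes::copy_from_slice as in the code above. An item larger than the buffer makes encode_into return an error rather than a truncated slice.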

@fakeshadow, thanks a lot 🙏 It is exactly what I needed.

Here is the code adjusted to produce valid JSON:

use std::pin::Pin;
use std::task::{Context, Poll};

use actix_web::{get, web::Bytes, App, Error, HttpResponse, HttpServer};

#[derive(serde::Serialize, Debug)]
pub struct Task {
    pub id: u32,
    pub name: &'static str,
    pub message: String,
}

pub struct TaskStream {
    next: u32,
    buf: Vec<u8>,
}

impl futures::Stream for TaskStream {
    type Item = Result<Bytes, Error>;

    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        if this.next == 100_000 {
            Poll::Ready(None)
        } else {
            if this.next == 0 {
                // Open the JSON array before the first item.
                this.buf.push(b'[');
            }

            let res = serde_json::to_writer(
                &mut this.buf,
                &Task {
                    id: this.next,
                    name: "Coucou ceci est mon nom",
                    message: String::from(
                        "Mon message doit être un peu long pour augmenter la taille",
                    ),
                },
            );

            if let Err(e) = res {
                return Poll::Ready(Some(Err(e.into())));
            }

            this.next += 1;

            // Separate items with commas and close the array after the last one.
            if this.next < 100_000 {
                this.buf.push(b',');
            } else {
                this.buf.push(b']');
            }

            let poll = Poll::Ready(Some(Ok(Bytes::copy_from_slice(&this.buf))));

            this.buf.clear();

            poll
        }
    }
}

#[get("/big-json")]
async fn big_json() -> HttpResponse {
    let stream = TaskStream {
        next: 0,
        buf: Default::default(),
    };

    HttpResponse::Ok().streaming(stream)
}

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().service(big_json))
        .bind("127.0.0.1:8080")?
        .run()
        .await
}
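The comma and bracket handling above is tied to the fixed count of 100_000; if that limit were 0, the stream would end immediately and send an empty body instead of []. A more general framing step is sketched below as a plain standard-library iterator, assuming each item has already been serialized to bytes (for instance with serde_json::to_vec). JsonArrayChunks is a hypothetical name. Because a Stream is essentially an asynchronous iterator, the same state machine could be moved into poll_next.

```rust
/// Wraps an iterator of already-serialized items and yields the chunks of a
/// JSON array: "[" before the first item, "," before each later item, and a
/// closing "]" as a final chunk. An empty input yields the single chunk "[]".
struct JsonArrayChunks<I> {
    items: I,
    started: bool,
    finished: bool,
}

impl<I> JsonArrayChunks<I> {
    fn new(items: I) -> Self {
        JsonArrayChunks { items, started: false, finished: false }
    }
}

impl<I: Iterator<Item = Vec<u8>>> Iterator for JsonArrayChunks<I> {
    type Item = Vec<u8>;

    fn next(&mut self) -> Option<Vec<u8>> {
        if self.finished {
            return None;
        }
        match self.items.next() {
            Some(item) => {
                // Opening bracket before the first item, comma before the rest.
                let mut chunk = Vec::with_capacity(item.len() + 1);
                chunk.push(if self.started { b',' } else { b'[' });
                chunk.extend_from_slice(&item);
                self.started = true;
                Some(chunk)
            }
            None => {
                // Input exhausted: close the array (or emit "[]" if it was empty)
                // and never poll the inner iterator again.
                self.finished = true;
                Some(if self.started { b"]".to_vec() } else { b"[]".to_vec() })
            }
        }
    }
}
```

With the futures crate already used in the thread, such an iterator could presumably be turned into a response body with futures::stream::iter, mapping each chunk to Ok(Bytes::from(chunk)). The trade-off is one extra small chunk for the closing bracket, in exchange for not needing to know the item count in advance.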

I don't know if it's the most idiomatic way, but it works 😄

Thanks again!

This might be a good thing to have in the examples repo, @fakeshadow?

Yeah, this could be useful, but preferably the buffer should be handled with either BytesMut or [u8; n]. I failed to make that work in this example, so I can give it another try and then make a PR to the examples repo.

Hi, is there a documented way to make a streaming response? (I want to send a feed of the results of parsing an XLSX file.)
