Hello,
Actix-web version : 2.0.0
I'm new to Rust and Actix-web, so sorry if my question sounds stupid.
I'm used to streaming large JSON responses with Golang and Echo like this:
package main
import (
	"encoding/json"
	"io"
	"net/http"
	"strconv"

	"github.com/labstack/echo/v4"
)
// main starts an Echo server on :1323. Its root handler writes a large JSON
// array to the response one element at a time instead of building the whole
// array in memory first.
func main() {
	e := echo.New()
	e.GET("/", func(c echo.Context) error {
		resp := c.Response()
		// Headers have to be set before WriteHeader; declare the body as JSON
		// so clients do not have to guess the content type.
		resp.Header().Set(echo.HeaderContentType, echo.MIMEApplicationJSON)
		resp.WriteHeader(http.StatusOK)

		// Open the array by hand; the elements are encoded individually below.
		if _, err := io.WriteString(resp, "["); err != nil {
			return err
		}

		enc := json.NewEncoder(resp)
		for i := 0; i < 100000; i++ {
			// Every element except the first is preceded by a comma.
			if i > 0 {
				if _, err := io.WriteString(resp, ","); err != nil {
					return err
				}
			}
			user := User{
				Username:  "My username_" + strconv.Itoa(i),
				Password:  "gjgjghjggjhg7884erhkhjb;lljjkbhjvftxe!hjjèkhkljkbhft",
				Lastname:  "A looooong lastname",
				Firstname: "A looooong firstname",
			}
			// Encode writes the value followed by a newline, which is legal
			// whitespace between JSON array elements.
			if err := enc.Encode(user); err != nil {
				return err
			}
		}

		// Close the array.
		if _, err := io.WriteString(resp, "]"); err != nil {
			return err
		}
		return nil
	})
	e.Logger.Fatal(e.Start(":1323"))
}
Currently, I have this code:
/// A task record; derives `Serialize` so it can be encoded in a response body.
#[derive(Serialize, Debug)]
pub struct Task {
/// Numeric identifier of the task.
pub id: u32,
/// Task name, borrowed from static data.
pub name: &'static str,
/// Owned message text.
pub message: String,
}
/// Handler for `GET /big-json`.
///
/// Builds all 100 000 tasks in memory and returns them as a single JSON array,
/// so the whole collection is alive at once while the response is produced.
#[get("/big-json")]
async fn big_json() -> Result<web::Json<Vec<models::Task>>, AppError> {
    let tasks: Vec<models::Task> = (0..100_000)
        .map(|id| models::Task {
            id,
            name: "Coucou ceci est mon nom",
            message: "Mon message doit être un peu long pour augmenter la taille".to_string(),
        })
        .collect();

    Ok(web::Json(tasks))
}
My machine specs :
Linux Mint 20
Intel© Core™ i7-6700 CPU @ 3.40GHz × 4
16 GB DDR3
Initial memory size: 428 KiB
After running ab -n1000 -c100 http://127.0.0.1:8089/v1/big-json: 1 GiB
Twice : 1.3GiB
Three times: 1.6GiB
To limit memory consumption, I'd like to stream the JSON response, but I don't see how to do it.
Thanks
You can check the docs of crates like tokio and futures to learn how to use streams.
For example, the section on streams in the tokio.rs tutorial is a pretty good one.
And here is an example of how to handle streaming for your specific code in actix-web. In the real world you would want to use a fixed-size slice as the buffer and a cursor to help you reuse it efficiently, but that is beyond the scope of this example.
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_web::{get, web::Bytes, App, Error, HttpResponse, HttpServer};
/// One element of the streamed response; serialized with serde.
#[derive(serde::Serialize, Debug)]
pub struct Task {
/// Numeric identifier of the task.
pub id: u32,
/// Task name, borrowed from static data.
pub name: &'static str,
/// Owned message text.
pub message: String,
}
/// State of a stream that yields serialized `Task`s one at a time.
pub struct TaskStream {
// Id of the next task to emit; also counts how many have been emitted.
next: u32,
// Scratch buffer the serializer writes into; cleared after each chunk.
buf: Vec<u8>,
}
/// Yields 100 000 serialized [`Task`]s, one per chunk.
///
/// Chunks are emitted back to back with no brackets or separators, so the
/// concatenated body is a run of JSON objects rather than one JSON array.
impl futures::Stream for TaskStream {
    type Item = Result<Bytes, Error>;

    /// Serializes the next task into the scratch buffer and hands out a copy
    /// of it. Always returns `Poll::Ready`; the stream ends once `next`
    /// reaches 100 000.
    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let state = self.get_mut();

        // All tasks have been sent: signal end of stream.
        if state.next == 100_000 {
            return Poll::Ready(None);
        }

        let task = Task {
            id: state.next,
            name: "Coucou ceci est mon nom",
            message: "Mon message doit être un peu long pour augmenter la taille".to_owned(),
        };

        match serde_json::to_writer(&mut state.buf, &task) {
            // Surface the serialization failure as a stream item.
            Err(err) => Poll::Ready(Some(Err(err.into()))),
            Ok(()) => {
                state.next += 1;
                // Copy the bytes out, then reset the buffer (keeping its
                // capacity) for the next poll.
                let chunk = Bytes::copy_from_slice(&state.buf);
                state.buf.clear();
                Poll::Ready(Some(Ok(chunk)))
            }
        }
    }
}
/// Handler for `GET /big-json`: answers 200 with a body produced lazily by a
/// fresh `TaskStream` that starts at task 0 with an empty buffer.
#[get("/big-json")]
async fn big_json() -> HttpResponse {
    HttpResponse::Ok().streaming(TaskStream {
        next: 0,
        buf: Vec::new(),
    })
}
/// Entry point: binds an HTTP server exposing `big_json` to 127.0.0.1:8080
/// and runs it until it stops. A bind failure is returned to the caller.
#[actix_rt::main]
async fn main() -> std::io::Result<()> {
    let app_factory = || App::new().service(big_json);
    let server = HttpServer::new(app_factory).bind("127.0.0.1:8080")?;
    server.run().await
}
@fakeshadow, thanks a lot 🙏 It is exactly what I needed.
Here is the code that produces valid JSON:
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_web::{get, web::Bytes, App, Error, HttpResponse, HttpServer};
/// One element of the streamed JSON array; serialized with serde.
#[derive(serde::Serialize, Debug)]
pub struct Task {
/// Numeric identifier of the task.
pub id: u32,
/// Task name, borrowed from static data.
pub name: &'static str,
/// Owned message text.
pub message: String,
}
/// State of a stream that yields a JSON array of `Task`s chunk by chunk.
pub struct TaskStream {
// Id of the next task to emit; 0 means the array has not been opened yet.
next: u32,
// Scratch buffer holding the bytes of the chunk being built; cleared after each chunk.
buf: Vec<u8>,
}
/// Streams 100 000 [`Task`]s as one valid JSON array, one element per chunk.
///
/// The first chunk is prefixed with `[`, every chunk but the last ends with
/// `,`, and the last chunk ends with `]`.
impl futures::Stream for TaskStream {
    type Item = Result<Bytes, Error>;

    /// Builds the next chunk in the scratch buffer and returns a copy of it.
    ///
    /// Always returns `Poll::Ready`: `None` once every task has been emitted,
    /// `Some(Err(_))` if serialization fails, `Some(Ok(chunk))` otherwise.
    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        /// Number of array elements the stream produces.
        const TOTAL: u32 = 100_000;

        let this = self.get_mut();

        // `>=` rather than `==` so a counter that starts past the end still
        // terminates the stream instead of running on.
        if this.next >= TOTAL {
            return Poll::Ready(None);
        }

        // The very first chunk opens the array.
        if this.next == 0 {
            this.buf.push(b'[');
        }

        let task = Task {
            id: this.next,
            name: "Coucou ceci est mon nom",
            message: String::from("Mon message doit être un peu long pour augmenter la taille"),
        };
        if let Err(e) = serde_json::to_writer(&mut this.buf, &task) {
            // Drop partially written bytes so a later poll cannot resend them.
            this.buf.clear();
            return Poll::Ready(Some(Err(e.into())));
        }

        this.next += 1;
        // Separate elements with a comma; the last element closes the array.
        this.buf.push(if this.next < TOTAL { b',' } else { b']' });

        // Copy the bytes out, then reset the buffer (keeping its capacity)
        // for the next poll.
        let chunk = Bytes::copy_from_slice(&this.buf);
        this.buf.clear();
        Poll::Ready(Some(Ok(chunk)))
    }
}
/// Handler for `GET /big-json`: streams the task list as a JSON array.
///
/// The content type is set explicitly on the builder so clients are told the
/// streamed body is JSON.
#[get("/big-json")]
async fn big_json() -> HttpResponse {
    let stream = TaskStream {
        next: 0,
        buf: Vec::new(),
    };
    HttpResponse::Ok()
        .content_type("application/json")
        .streaming(stream)
}
/// Entry point: serves the `big_json` route on 127.0.0.1:8080 and waits for
/// the server to finish. Fails early if the address cannot be bound.
#[actix_rt::main]
async fn main() -> std::io::Result<()> {
    let bound = HttpServer::new(|| {
        let app = App::new();
        app.service(big_json)
    })
    .bind("127.0.0.1:8080")?;

    bound.run().await
}
I don't know if it is the most idiomatic way, but it works 😄
Thanks again!
This might be a good thing to have in the examples repo @fakeshadow ?
yea this could be useful but preferably the buffer should be either handled with BytesMut or [u8; n]. I failed to make it work on this example so I can give it another try and make a PR to examples then.
Hi, is there a documented way to make a streaming response? (I want to send a feed of the results of parsing an xlsx file.)
Most helpful comment
yea this could be useful but preferably the buffer should be either handled with
BytesMut or [u8; n]. I failed to make it work on this example so I can give it another try and make a PR to examples then.