Rocket: How to stream a large JSON response?

Created on 4 Sep 2020 · 9 Comments · Source: SergioBenitez/Rocket

Hello,

Rocket version: 0.4.5

I'm new to Rust and Rocket, so sorry if my question sounds naive.

I'm used to streaming large JSON responses with Go and Echo like this:

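package main

import (
    "encoding/json"
    "io"
    "net/http"
    "strconv"

    "github.com/labstack/echo/v4"
)

// User is the element type of the streamed JSON array.
type User struct {
    Username  string
    Password  string
    Lastname  string
    Firstname string
}

func main() {
    e := echo.New()
    e.GET("/", func(c echo.Context) error {
        resp := c.Response()
        // Headers must be set before WriteHeader sends them.
        resp.Header().Set(echo.HeaderContentType, echo.MIMEApplicationJSON)
        resp.WriteHeader(http.StatusOK)

        if _, err := io.WriteString(resp, "["); err != nil {
            return err
        }
        enc := json.NewEncoder(resp)
        for i := 0; i < 100000; i++ {
            // Separate elements with commas; Encode appends a newline,
            // which is valid whitespace inside a JSON array.
            if i > 0 {
                if _, err := io.WriteString(resp, ","); err != nil {
                    return err
                }
            }
            user := User{
                Username:  "My username_" + strconv.Itoa(i),
                Password:  "gjgjghjggjhg7884erhkhjb;lljjkbhjvftxe!hjjèkhkljkbhft",
                Lastname:  "A looooong lastname",
                Firstname: "A looooong firstname",
            }
            if err := enc.Encode(user); err != nil {
                return err
            }
        }
        if _, err := io.WriteString(resp, "]"); err != nil {
            return err
        }

        return nil
    })
    e.Logger.Fatal(e.Start(":1323"))
}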

I read the streaming section of the Rocket documentation, but I don't see how to apply it to JSON.

How can I do this with Rocket?

Thanks

Unfortunately there isn't an easy one-to-one translation of this code, because at the end of the day Rocket wants the response body to implement Read (see e.g. streamed_body). You'll have to implement Responder to set such a body, and there are a few ways to produce one:

  • Implement Read as a state machine manually. Self-contained, but usually more difficult to implement and maintain.
  • Create an in-memory pipe or channel, move the write end to a thread (or thread pool), write to it from that thread, then respond with the read end. One downside is needing another thread, but the code should be easier to understand than the state machine and more closely resembles the code you posted.
  • Use a crate that implements Read, possibly based on one of these approaches. I haven't used such a crate before, so I won't make any specific recommendations. If a good one is out there, this would be my preferred choice for an application.
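For the channel-based option, here is a minimal std-only sketch (untested with Rocket). `ChannelReader` and `users_json_reader` are hypothetical names, and the users are formatted by hand only to keep the example free of dependencies; a real app would encode them with serde_json. The read end implements Read, so it should be usable with Rocket 0.4's streamed_body the same way as a hand-written state machine.

```rust
use std::io::{self, Read};
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Read end of an in-memory channel: each message is a chunk of bytes
/// produced by a writer thread.
pub struct ChannelReader {
    rx: Receiver<Vec<u8>>,
    chunk: Vec<u8>,
    offset: usize,
}

impl Read for ChannelReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        // Refill when the current chunk is exhausted (skipping empty chunks).
        while self.offset == self.chunk.len() {
            match self.rx.recv() {
                Ok(next) => {
                    self.chunk = next;
                    self.offset = 0;
                }
                // All senders dropped: the writer thread finished, so this is EOF.
                Err(_) => return Ok(0),
            }
        }
        let n = (self.chunk.len() - self.offset).min(buf.len());
        buf[..n].copy_from_slice(&self.chunk[self.offset..self.offset + n]);
        self.offset += n;
        Ok(n)
    }
}

/// Hypothetical helper: spawns a thread that writes a JSON array of `count`
/// users, one channel message per element.
pub fn users_json_reader(count: u32) -> ChannelReader {
    // A bounded channel applies backpressure: the writer blocks when the
    // client reads slowly instead of buffering the whole response in memory.
    let (tx, rx) = sync_channel::<Vec<u8>>(16);
    thread::spawn(move || {
        if tx.send(b"[".to_vec()).is_err() {
            return;
        }
        for i in 0..count {
            let sep = if i > 0 { "," } else { "" };
            let item = format!("{}{{\"id\":{},\"lastname\":\"My lastname\"}}", sep, i);
            // A send error means the reader was dropped (e.g. the client went away).
            if tx.send(item.into_bytes()).is_err() {
                return;
            }
        }
        let _ = tx.send(b"]".to_vec());
    });
    ChannelReader { rx, chunk: Vec::new(), offset: 0 }
}

fn main() {
    let mut body = String::new();
    users_json_reader(3).read_to_string(&mut body).unwrap();
    println!("{}", body);
}
```

The writer loop reads almost exactly like the Go handler; the cost is one extra thread per in-flight response.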

Here is some (untested!) partial code as an example of the state machine approach.

use std::io::{Cursor, Read};

// `User` is defined elsewhere (e.g. deriving serde::Serialize).
struct UsersStream {
    state: State,
    users: Vec<User>,
    pos: usize,
    pending: Cursor<Vec<u8>>,
}

enum State { Header, Users, Trailer, Done }

impl Read for UsersStream {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // an empty buffer can't receive data; don't advance the state machine
        if buf.is_empty() {
            return Ok(0);
        }
        loop {
            // first, try to read any unfinished data from the buffer
            match self.pending.read(buf) {
                // end of buffer; need to get more data
                Ok(0) => (),
                Ok(n) => return Ok(n),
                Err(e) => return Err(e),
            };

            // determine the next data to read
            match self.state {
                State::Header => {
                    self.pending = Cursor::new(vec![b'[']);
                    self.state = State::Users;
                }
                State::Users => {
                    // encode the next user; only elements after the first get a leading comma
                    match self.users.get(self.pos) {
                        Some(user) => {
                            let mut bytes = if self.pos > 0 { vec![b','] } else { Vec::new() };
                            // placeholder: append the JSON encoding of `user` to `bytes`
                            encode_user_to_bytes(user, &mut bytes);
                            self.pos += 1;
                            self.pending = Cursor::new(bytes);
                        }
                        None => self.state = State::Trailer,
                    }
                }
                State::Trailer => {
                    self.pending = Cursor::new(vec![b']']);
                    self.state = State::Done;
                }
                State::Done => return Ok(0),
            }
        }
    }
}

Hi @jebrosen,

Thanks for your response.
I tried your solution, but I don't know how to implement the Responder trait for UsersStream.

use rocket::http::ContentType;
use rocket::request::Request;
use rocket::response::{self, Responder, Response};
use serde::Serialize;
use std::io::{Cursor, Read};

#[derive(Serialize, Debug)]
pub struct User {
    pub id: u32,
    pub lastname: String,
    pub firstname: String,
}

#[derive(Debug)]
pub struct UsersStream {
    pub state: State,
    pub users: Vec<User>,
    pub pos: usize,
    pub pending: Cursor<Vec<u8>>,
}

#[derive(Debug)]
pub enum State {
    Header,
    Users,
    Trailer,
    Done,
}

impl Read for UsersStream {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        loop {
            // first, try to read any unfinished data from the buffer
            match self.pending.read(buf) {
                // end of buffer; need to get more data
                Ok(0) => (),
                Ok(n) => return Ok(n),
                Err(e) => return Err(e),
            };

            // determine the next data to read
            match self.state {
                State::Header => {
                    self.pending = Cursor::new(vec![b'[']);
                    self.state = State::Users;
                }
                State::Users => {
                    // encode the next user
                    match self.users.get(self.pos) {
                        Some(user) => {
                            let mut bytes = vec![b','];
                            bytes.append(&mut serde_json::to_vec(user).unwrap());
                            self.pos += 1;
                            self.pending = Cursor::new(bytes);
                        }
                        None => self.state = State::Trailer,
                    }
                }
                State::Trailer => {
                    self.pending = Cursor::new(vec![b']']);
                    self.state = State::Done;
                }
                State::Done => return Ok(0),
            }
        }
    }
}

impl<'r> Responder<'r> for UsersStream {
    fn respond_to(self, _: &Request) -> response::Result<'r> {
        Response::build()
            .sized_body(self.pending)
            .header(ContentType::new("application", "json"))
            .ok()
    }
}

#[get("/big-json-stream")]
pub fn big_json_stream() -> Result<UsersStream, ()> {
    let mut v: Vec<User> = Vec::new();
    for i in 0..100_000 {
        v.push(User {
            id: i,
            lastname: "My lastname".to_owned(),
            firstname: String::from("My firstname"),
        });
    }
    Ok(UsersStream {
        state: State::Header,
        users: v,
        pos: 0,
        pending: Cursor::new(vec![]),
    })
}

Sorry, I'm still a newbie with Rust 😞

This looks almost right, except that you want .sized_body(self) -- self is a UsersStream, which is the type that implements Read.

Hi @jebrosen,

I tried this:

impl<S: Seek + Write> Seek for BufStream<S> {
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        self.inner.seek(pos)
    }
}

But i had this error:

.sized_body(self)
            ^^^^ the trait `std::io::Seek` is not implemented for `handlers::UsersStream`

So I implemented the std::io::Seek trait like this:

impl Seek for UsersStream {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        self.pending.seek(pos)
    }
}

It compiles, but curl http://localhost:8000/big-json-stream returns nothing :disappointed:

Here is all the code:

use serde::Serialize;
use std::io::{Cursor, Read, Seek, SeekFrom};
use serde_json;
use rocket::request::Request;
use rocket::response::{self, Response, Responder};
use rocket::http::ContentType;

#[derive(Serialize, Debug)]
pub struct User {
    pub id: u32,
    pub lastname: String,
    pub firstname: String,
}

#[derive(Debug)]
pub struct UsersStream {
    pub state: State,
    pub users: Vec<User>,
    pub pos: usize,
    pub pending: Cursor<Vec<u8>>,
}

#[derive(Debug)]
pub enum State { Header, Users, Trailer, Done }

impl Read for UsersStream {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        loop {
            // first, try to read any unfinished data from the buffer
            match self.pending.read(buf) {
                // end of buffer; need to get more data
                Ok(0) => (),
                Ok(n) => return Ok(n),
                Err(e) => return Err(e),
            };

            // determine the next data to read
            match self.state {
                State::Header => {
                    self.pending = Cursor::new(vec![b'[']);
                    self.state = State::Users;
                }
                State::Users => {
                    // encode the next user
                    match self.users.get(self.pos) {
                        Some(user) => {
                            let mut bytes = vec![b','];
                            bytes.append(&mut serde_json::to_vec(user).unwrap());
                            self.pos += 1;
                            self.pending = Cursor::new(bytes);
                        }
                        None => self.state = State::Trailer,
                    }
                }
                State::Trailer => {
                    self.pending = Cursor::new(vec![b']']);
                    self.state = State::Done;
                }
                State::Done => return Ok(0),
            }
        }
    }
}

impl<'r> Responder<'r> for UsersStream {
    fn respond_to(self, _: &Request) -> response::Result<'r> {
        Response::build()
            .sized_body(self)
            .header(ContentType::new("application", "json"))
            .ok()
    }
}

impl Seek for UsersStream {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        self.pending.seek(pos)
    }
}

#[get("/big-json-stream")]
pub fn big_json_stream() -> Result<UsersStream, ()> {
    let mut v: Vec<User> = Vec::new();
    for i in 0..100_000 {
        v.push(User {
            id: i,
            lastname: "My lastname".to_owned(),
            firstname: String::from("My firstname"),
        });
    }
    Ok(UsersStream{
        state: State::Header,
        users: v,
        pos: 0,
        pending: Cursor::new(vec![]),
    })
}

Whoops. You can't use sized_body for this, since you don't know the size of the body up front. I should have suggested .streamed_body(self), and you also don't need that Seek impl.
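As for why curl printed nothing: as far as I can tell, Rocket 0.4's sized_body determines the body length by seeking to the end, rewinds to the start, and then reads at most that many bytes. The Seek impl above delegates to `pending`, which is still an empty Cursor when the response is built, so the measured size is 0. The std-only sketch below reproduces that measurement; `LazyBody`, `read_like_sized_body` and `read_like_streamed_body` are hypothetical stand-ins, not Rocket APIs.

```rust
use std::io::{self, Cursor, Read, Seek, SeekFrom};

/// Cut-down stand-in for UsersStream: bytes are produced lazily by `read`,
/// while `pending` only ever holds the current chunk.
struct LazyBody {
    pending: Cursor<Vec<u8>>,
    done: bool,
}

impl Read for LazyBody {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        loop {
            let n = self.pending.read(buf)?;
            if n > 0 || buf.is_empty() {
                return Ok(n);
            }
            if self.done {
                return Ok(0);
            }
            // Produce the whole (tiny) body on the first real read.
            self.pending = Cursor::new(b"[1,2,3]".to_vec());
            self.done = true;
        }
    }
}

/// Same shape as the Seek impl in the thread: it only sees `pending`.
impl Seek for LazyBody {
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        self.pending.seek(pos)
    }
}

/// Mimics a size probe: seek to the end, rewind, then read at most `size` bytes.
fn read_like_sized_body<B: Read + Seek>(mut body: B) -> io::Result<Vec<u8>> {
    let size = body.seek(SeekFrom::End(0))?;
    body.seek(SeekFrom::Start(0))?;
    let mut out = Vec::new();
    body.take(size).read_to_end(&mut out)?;
    Ok(out)
}

/// What a streamed body does instead: read until EOF, no size needed.
fn read_like_streamed_body<B: Read>(mut body: B) -> io::Result<Vec<u8>> {
    let mut out = Vec::new();
    body.read_to_end(&mut out)?;
    Ok(out)
}

fn main() -> io::Result<()> {
    let fresh = || LazyBody { pending: Cursor::new(Vec::new()), done: false };
    println!("sized:    {:?}", String::from_utf8_lossy(&read_like_sized_body(fresh())?));
    println!("streamed: {:?}", String::from_utf8_lossy(&read_like_streamed_body(fresh())?));
    Ok(())
}
```

The sized variant yields an empty body because the length is taken before any data has been generated; the streamed variant drains the reader until it returns 0.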

@jebrosen, thank you so much for your help, it works perfectly :tada:

Here is the final code:

use serde::Serialize;
use std::io::{self, Cursor, Read};
use rocket::request::Request;
use rocket::response::{self, Response, Responder};
use rocket::http::ContentType;

#[derive(Serialize, Debug)]
pub struct User {
    pub id: u32,
    pub lastname: String,
    pub firstname: String,
}

#[derive(Debug)]
pub struct UsersStream {
    pub state: State,
    pub users: Vec<User>,
    pub pos: usize,
    pub pending: Cursor<Vec<u8>>,
}

#[derive(Debug)]
pub enum State { Header, Users, Trailer, Done }

impl Read for UsersStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // an empty buffer can't receive data; don't advance the state machine
        if buf.is_empty() {
            return Ok(0);
        }
        loop {
            // first, try to read any unfinished data from the buffer
            match self.pending.read(buf) {
                // end of buffer; need to get more data
                Ok(0) => (),
                Ok(n) => return Ok(n),
                Err(e) => return Err(e),
            };

            // determine the next data to read
            match self.state {
                State::Header => {
                    self.pending = Cursor::new(vec![b'[']);
                    self.state = State::Users;
                }
                State::Users => {
                    // encode the next user; only elements after the first get a leading comma
                    match self.users.get(self.pos) {
                        Some(user) => {
                            let mut bytes = if self.pos > 0 { vec![b','] } else { Vec::new() };
                            // report serialization failures as I/O errors instead of panicking
                            serde_json::to_writer(&mut bytes, user)
                                .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
                            self.pos += 1;
                            self.pending = Cursor::new(bytes);
                        }
                        None => self.state = State::Trailer,
                    }
                }
                State::Trailer => {
                    self.pending = Cursor::new(vec![b']']);
                    self.state = State::Done;
                }
                State::Done => return Ok(0),
            }
        }
    }
}

impl<'r> Responder<'r> for UsersStream {
    fn respond_to(self, _: &Request) -> response::Result<'r> {
        // the total size isn't known up front, so stream the body (chunked)
        Response::build()
            .streamed_body(self)
            .header(ContentType::new("application", "json"))
            .ok()
    }
}

#[get("/big-json-stream")]
pub fn big_json_stream() -> Result<UsersStream, ()> {
    let mut v: Vec<User> = Vec::new();
    for i in 0..100_000 {
        v.push(User {
            id: i,
            lastname: "My lastname".to_owned(),
            firstname: String::from("My firstname"),
        });
    }
    Ok(UsersStream {
        state: State::Header,
        users: v,
        pos: 0,
        pending: Cursor::new(vec![]),
    })
}
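A possible refinement, sketched here as an assumption rather than something tested with Rocket: generalize the same Header/Items/Trailer/Done state machine over any iterator of already-encoded elements. The users can then be produced lazily (for example from a database cursor) instead of first collecting 100,000 of them into a Vec. `JsonArrayReader` is a hypothetical name, and the demo encodes elements by hand only to stay std-only.

```rust
use std::io::{self, Cursor, Read};

/// Where the reader is in the `[`, elements, `]` sequence.
enum State { Header, Items, Trailer, Done }

/// Hypothetical generalization of UsersStream: streams a JSON array whose
/// elements come from any iterator of already-encoded JSON values.
pub struct JsonArrayReader<I> {
    items: I,
    state: State,
    written: usize, // elements emitted so far; decides whether a comma is needed
    pending: Cursor<Vec<u8>>,
}

impl<I: Iterator<Item = Vec<u8>>> JsonArrayReader<I> {
    pub fn new(items: I) -> Self {
        JsonArrayReader { items, state: State::Header, written: 0, pending: Cursor::new(Vec::new()) }
    }
}

impl<I: Iterator<Item = Vec<u8>>> Read for JsonArrayReader<I> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // An empty buffer can't receive data, so don't advance the state machine.
        if buf.is_empty() {
            return Ok(0);
        }
        loop {
            // Hand out leftover bytes of the current chunk first.
            let n = self.pending.read(buf)?;
            if n > 0 {
                return Ok(n);
            }
            // Chunk exhausted: produce the next one.
            match self.state {
                State::Header => {
                    self.pending = Cursor::new(b"[".to_vec());
                    self.state = State::Items;
                }
                State::Items => match self.items.next() {
                    Some(item) => {
                        let mut bytes = Vec::with_capacity(item.len() + 1);
                        if self.written > 0 {
                            bytes.push(b',');
                        }
                        bytes.extend_from_slice(&item);
                        self.written += 1;
                        self.pending = Cursor::new(bytes);
                    }
                    None => self.state = State::Trailer,
                },
                State::Trailer => {
                    self.pending = Cursor::new(b"]".to_vec());
                    self.state = State::Done;
                }
                State::Done => return Ok(0),
            }
        }
    }
}

fn main() {
    // Elements are encoded lazily, one per `next()` call.
    let items = (0..3).map(|i| format!("{{\"id\":{}}}", i).into_bytes());
    let mut reader = JsonArrayReader::new(items);
    // Drain through a 1-byte buffer so each element spans many read calls.
    let mut out = Vec::new();
    let mut byte = [0u8; 1];
    loop {
        let n = reader.read(&mut byte).unwrap();
        if n == 0 {
            break;
        }
        out.extend_from_slice(&byte[..n]);
    }
    println!("{}", String::from_utf8(out).unwrap()); // prints [{"id":0},{"id":1},{"id":2}]
}
```

With serde, the iterator could be something like `users.into_iter().map(|u| serde_json::to_vec(&u).unwrap())`, and the reader passed to streamed_body unchanged.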

Can I create a PR to add this case to the examples folder?

The code would need some changes to work with async, in particular implementing AsyncRead instead of Read, which might make it a bit trickier because of pinning and Poll. I have a long backlog, so it might be a while before I get back to this issue, but I did write something similar for Server-Sent Events that wraps futures::stream::Stream (https://git.jebrosen.com/jeb/rocket-rooms/src/commit/731bca81912176e96b6976eb7a8de579c7b99c51/src/sse.rs), which might be helpful as a reference.
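To make the pinning/Poll part less abstract, here is a minimal std-only sketch of the poll-based shape. It assumes the futures-style signature `poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>>` (as in futures::io::AsyncRead); tokio 1.x's AsyncRead takes a ReadBuf instead, and Rocket's exact requirements may differ. `Body` is a hypothetical stand-in for the state machine. Because producing the next chunk never waits on I/O, poll_read can always return Poll::Ready, and pinning is harmless because the type is Unpin.

```rust
use std::io::{self, Cursor, Read};
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Stand-in for a synchronous, never-blocking reader such as UsersStream.
struct Body {
    inner: Cursor<Vec<u8>>,
}

impl Body {
    /// Poll-style read with the futures::io::AsyncRead::poll_read shape.
    /// Generating data is pure CPU work, so the waker is never stored and
    /// the result is always Ready.
    fn poll_read(self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
        // Body is Unpin (it only holds a Cursor<Vec<u8>>), so the pin can be
        // unwrapped safely to get a plain &mut Self.
        let this = self.get_mut();
        Poll::Ready(this.inner.read(buf))
    }
}

/// A waker that does nothing; enough to drive an always-ready poll by hand.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // Safety: every vtable function ignores the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut body = Body { inner: Cursor::new(b"[1,2]".to_vec()) };
    let mut out = Vec::new();
    let mut buf = [0u8; 2];
    loop {
        match Pin::new(&mut body).poll_read(&mut cx, &mut buf) {
            Poll::Ready(Ok(0)) => break,
            Poll::Ready(Ok(n)) => out.extend_from_slice(&buf[..n]),
            Poll::Ready(Err(e)) => panic!("read failed: {}", e),
            Poll::Pending => unreachable!("this body is always ready"),
        }
    }
    assert_eq!(out, b"[1,2]");
}
```

A real executor supplies the Context; the hand-rolled waker here only exists so the sketch can run without one.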

OK :)
Will AsyncRead come with Rocket 0.5?
I will try to adapt this code to work with AsyncRead.
May I close the issue?

@fabienbellanger It's already in master. Let's keep this use-case in mind when resolving #33.
