Hello,
Rocket version: 0.4.5
I'm new to Rust and Rocket, so sorry if my question sounds stupid.
I'm used to streaming large JSON with Golang and Echo like this:
```go
package main

import (
	"encoding/json"
	"io"
	"net/http"
	"strconv"

	"github.com/labstack/echo/v4"
)

// User is assumed to be a plain struct with the fields used below.
type User struct {
	Username  string
	Password  string
	Lastname  string
	Firstname string
}

func main() {
	e := echo.New()
	e.GET("/", func(c echo.Context) error {
		resp := c.Response()
		resp.WriteHeader(http.StatusOK)
		// open the JSON array, then encode users one by one straight into the response
		if _, err := io.WriteString(resp, "["); err != nil {
			return err
		}
		enc := json.NewEncoder(resp)
		for i := 0; i < 100000; i++ {
			// separator only between elements
			if i > 0 {
				if _, err := io.WriteString(resp, ","); err != nil {
					return err
				}
			}
			user := User{
				Username:  "My username_" + strconv.Itoa(i),
				Password:  "gjgjghjggjhg7884erhkhjb;lljjkbhjvftxe!hjj猫khkljkbhft",
				Lastname:  "A looooong lastname",
				Firstname: "A looooong firstname",
			}
			if err := enc.Encode(user); err != nil {
				return err
			}
		}
		if _, err := io.WriteString(resp, "]"); err != nil {
			return err
		}
		return nil
	})
	e.Logger.Fatal(e.Start(":1323"))
}
```
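For contrast with what Rocket expects, here is a std-only Rust sketch of the same push-style loop the Echo handler uses: the producer writes `[`, the comma-separated elements, and `]` into any `std::io::Write` sink. `User` (reduced to a numeric `id` so the JSON can be written by hand without escaping) and `write_users` are hypothetical names for illustration, and a `Vec<u8>` stands in for the HTTP response.

```rust
use std::io::{self, Write};

/// Hypothetical stand-in for the real `User`; only a numeric `id` is kept
/// so the JSON can be written by hand without any string escaping.
struct User {
    id: u32,
}

/// Push-style streaming, mirroring the Echo handler: the producer drives the
/// loop and writes `[`, comma-separated objects, and `]` into any `Write` sink.
fn write_users<W: Write>(out: &mut W, users: &[User]) -> io::Result<()> {
    out.write_all(b"[")?;
    for (i, user) in users.iter().enumerate() {
        // separator only between elements, never before the first one
        if i > 0 {
            out.write_all(b",")?;
        }
        write!(out, "{{\"id\":{}}}", user.id)?;
    }
    out.write_all(b"]")?;
    Ok(())
}
```

Rocket 0.4 does not hand the handler a writer like this; it pulls the body from a `Read` implementation instead, which is why the loop has to be turned inside out.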
I read the streaming section in the Rocket documentation, but I don't see how to apply it to JSON.
How can I do this with Rocket?
Thanks
Unfortunately there isn't an easy one-to-one translation of this code, because at the end of the day Rocket wants the response body to implement `Read` (see e.g. `streamed_body`). You'll have to implement `Responder` to set one of these bodies, and there are a few different options:
- Implement `Read` as a state machine manually. Self-contained, but usually more difficult to implement and maintain.
- … `Read`, possibly based on one of these approaches. I haven't used such a crate before, so I won't make any specific recommendations. If a good one is out there, this would be my preferred choice for an application.

Here is some (untested!) partial code as an example of the state machine approach:
```rust
use std::io::{Cursor, Read};

struct UsersStream {
    state: State,
    users: Vec<User>,
    pos: usize,
    pending: Cursor<Vec<u8>>,
}

enum State { Header, Users, Trailer, Done }

impl Read for UsersStream {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        loop {
            // first, try to read any unfinished data from the buffer
            match self.pending.read(buf) {
                // end of buffer; need to get more data
                Ok(0) => (),
                Ok(n) => return Ok(n),
                Err(e) => return Err(e),
            };
            // determine the next data to read
            match self.state {
                State::Header => {
                    self.pending = Cursor::new(vec![b'[']);
                    self.state = State::Users;
                }
                State::Users => {
                    // encode the next user; a comma goes only between elements
                    match self.users.get(self.pos) {
                        Some(user) => {
                            let mut bytes = if self.pos > 0 { vec![b','] } else { Vec::new() };
                            // placeholder: serialize `user` and append it to `bytes`
                            encode_user_to_bytes(user, &mut bytes);
                            self.pos += 1;
                            self.pending = Cursor::new(bytes);
                        }
                        None => self.state = State::Trailer,
                    }
                }
                State::Trailer => {
                    self.pending = Cursor::new(vec![b']']);
                    self.state = State::Done;
                }
                State::Done => return Ok(0),
            }
        }
    }
}
```
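The partial code above leaves `User` and `encode_user_to_bytes` undefined. Below is a self-contained, std-only sketch that fills them in with hypothetical stand-ins (a `User` with only a numeric `id`, encoded by hand instead of with serde_json), so the state machine can be exercised without Rocket. It writes the `,` only between elements, so the output is a valid JSON array, and it returns `Ok(0)` for a zero-length buffer without advancing the state.

```rust
use std::io::{self, Cursor, Read};

/// Hypothetical, simplified user record.
struct User {
    id: u32,
}

enum State {
    Header,
    Users,
    Trailer,
    Done,
}

/// Pull-style JSON array: each `read` call hands out bytes from `pending`,
/// refilling it one element at a time as the state machine advances.
struct UsersStream {
    state: State,
    users: Vec<User>,
    pos: usize,
    pending: Cursor<Vec<u8>>,
}

impl UsersStream {
    fn new(users: Vec<User>) -> Self {
        UsersStream { state: State::Header, users, pos: 0, pending: Cursor::new(Vec::new()) }
    }
}

/// Hypothetical stand-in for `encode_user_to_bytes`; hand-written JSON is
/// safe here only because `id` is a number and needs no escaping.
fn encode_user_to_bytes(user: &User, out: &mut Vec<u8>) {
    out.extend_from_slice(format!("{{\"id\":{}}}", user.id).as_bytes());
}

impl Read for UsersStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // A zero-length read must return 0 without consuming the stream.
        if buf.is_empty() {
            return Ok(0);
        }
        loop {
            // Serve leftover bytes from the current chunk first.
            let n = self.pending.read(buf)?;
            if n > 0 {
                return Ok(n);
            }
            // Chunk exhausted: produce the next one.
            let next = match self.state {
                State::Header => {
                    self.state = State::Users;
                    vec![b'[']
                }
                State::Users => match self.users.get(self.pos) {
                    Some(user) => {
                        // Separator only between elements keeps the output valid JSON.
                        let mut bytes = if self.pos > 0 { vec![b','] } else { Vec::new() };
                        encode_user_to_bytes(user, &mut bytes);
                        self.pos += 1;
                        bytes
                    }
                    None => {
                        self.state = State::Trailer;
                        continue;
                    }
                },
                State::Trailer => {
                    self.state = State::Done;
                    vec![b']']
                }
                State::Done => return Ok(0),
            };
            self.pending = Cursor::new(next);
        }
    }
}
```

Whether the caller reads to the end with `read_to_string` or in small fixed-size chunks, the bytes come out the same; only one encoded element is ever held in `pending`.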
Hi @jebrosen,
Thanks for your response.
I tried your solution, but I don't know how to implement the `Responder` trait for `UsersStream`:
```rust
use rocket::http::ContentType;
use rocket::request::Request;
use rocket::response::{self, Responder, Response};
use serde::Serialize;
use std::io::{Cursor, Read};

#[derive(Serialize, Debug)]
pub struct User {
    pub id: u32,
    pub lastname: String,
    pub firstname: String,
}

#[derive(Debug)]
pub struct UsersStream {
    pub state: State,
    pub users: Vec<User>,
    pub pos: usize,
    pub pending: Cursor<Vec<u8>>,
}

#[derive(Debug)]
pub enum State {
    Header,
    Users,
    Trailer,
    Done,
}

impl Read for UsersStream {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        loop {
            // first, try to read any unfinished data from the buffer
            match self.pending.read(buf) {
                // end of buffer; need to get more data
                Ok(0) => (),
                Ok(n) => return Ok(n),
                Err(e) => return Err(e),
            };
            // determine the next data to read
            match self.state {
                State::Header => {
                    self.pending = Cursor::new(vec![b'[']);
                    self.state = State::Users;
                }
                State::Users => {
                    // encode the next user; a comma goes only between elements
                    match self.users.get(self.pos) {
                        Some(user) => {
                            let mut bytes = if self.pos > 0 { vec![b','] } else { Vec::new() };
                            bytes.append(&mut serde_json::to_vec(user).unwrap());
                            self.pos += 1;
                            self.pending = Cursor::new(bytes);
                        }
                        None => self.state = State::Trailer,
                    }
                }
                State::Trailer => {
                    self.pending = Cursor::new(vec![b']']);
                    self.state = State::Done;
                }
                State::Done => return Ok(0),
            }
        }
    }
}

impl<'r> Responder<'r> for UsersStream {
    fn respond_to(self, _: &Request) -> response::Result<'r> {
        Response::build()
            .sized_body(self.pending)
            .header(ContentType::new("application", "json"))
            .ok()
    }
}

#[get("/big-json-stream")]
pub fn big_json_stream() -> Result<UsersStream, ()> {
    let mut v: Vec<User> = Vec::new();
    for i in 0..100_000 {
        v.push(User {
            id: i,
            lastname: "My lastname".to_owned(),
            firstname: String::from("My firstname"),
        });
    }
    Ok(UsersStream {
        state: State::Header,
        users: v,
        pos: 0,
        pending: Cursor::new(vec![]),
    })
}
```
Sorry, I'm still a newbie with Rust 😞
This looks almost right, except that you want `.sized_body(self)`: `self` is a `UsersStream`, which is the type that implements `Read`.
Hi @jebrosen,
I tried this:
```rust
impl<S: Seek + Write> Seek for BufStream<S> {
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        self.inner.seek(pos)
    }
}
```
But I got this error:
```text
.sized_body(self)
            ^^^^ the trait `std::io::Seek` is not implemented for `handlers::UsersStream`
```
So I implemented the `std::io::Seek` trait like this:
```rust
impl Seek for UsersStream {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        self.pending.seek(pos)
    }
}
```
It compiles, but `curl http://localhost:8000/big-json-stream` returns nothing :disappointed:
Here is all the code:
```rust
use serde::Serialize;
use std::io::{Cursor, Read, Seek, SeekFrom};
use serde_json;
use rocket::request::Request;
use rocket::response::{self, Response, Responder};
use rocket::http::ContentType;

#[derive(Serialize, Debug)]
pub struct User {
    pub id: u32,
    pub lastname: String,
    pub firstname: String,
}

#[derive(Debug)]
pub struct UsersStream {
    pub state: State,
    pub users: Vec<User>,
    pub pos: usize,
    pub pending: Cursor<Vec<u8>>,
}

#[derive(Debug)]
pub enum State { Header, Users, Trailer, Done }

impl Read for UsersStream {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        loop {
            // first, try to read any unfinished data from the buffer
            match self.pending.read(buf) {
                // end of buffer; need to get more data
                Ok(0) => (),
                Ok(n) => return Ok(n),
                Err(e) => return Err(e),
            };
            // determine the next data to read
            match self.state {
                State::Header => {
                    self.pending = Cursor::new(vec![b'[']);
                    self.state = State::Users;
                }
                State::Users => {
                    // encode the next user; a comma goes only between elements
                    match self.users.get(self.pos) {
                        Some(user) => {
                            let mut bytes = if self.pos > 0 { vec![b','] } else { Vec::new() };
                            bytes.append(&mut serde_json::to_vec(user).unwrap());
                            self.pos += 1;
                            self.pending = Cursor::new(bytes);
                        }
                        None => self.state = State::Trailer,
                    }
                }
                State::Trailer => {
                    self.pending = Cursor::new(vec![b']']);
                    self.state = State::Done;
                }
                State::Done => return Ok(0),
            }
        }
    }
}

impl<'r> Responder<'r> for UsersStream {
    fn respond_to(self, _: &Request) -> response::Result<'r> {
        Response::build()
            .sized_body(self)
            .header(ContentType::new("application", "json"))
            .ok()
    }
}

impl Seek for UsersStream {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        self.pending.seek(pos)
    }
}

#[get("/big-json-stream")]
pub fn big_json_stream() -> Result<UsersStream, ()> {
    let mut v: Vec<User> = Vec::new();
    for i in 0..100_000 {
        v.push(User {
            id: i,
            lastname: "My lastname".to_owned(),
            firstname: String::from("My firstname"),
        });
    }
    Ok(UsersStream {
        state: State::Header,
        users: v,
        pos: 0,
        pending: Cursor::new(vec![]),
    })
}
```
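Whoops. You can't use `sized_body` for this, since you don't know the size of the body up front: `sized_body` measures the body by seeking to its end, and your `Seek` impl only sees the still-empty `pending` buffer, so the response ends up with length 0. I should have suggested `.streamed_body(self)`, and you also don't need that `Seek` impl.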
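A std-only sketch of why the `sized_body` attempt produced an empty response, assuming `sized_body` measures the body by seeking to its end, rewinds to the start, and then reads at most that many bytes. `read_sized` below only mimics that behavior (it is not Rocket's code), and `Lazy` is a hypothetical stand-in for `UsersStream` whose real bytes are generated on demand while its `Seek` only sees the `pending` buffer.

```rust
use std::io::{self, Cursor, Read, Seek, SeekFrom};

/// Hypothetical stand-in for `UsersStream`: data is produced lazily on the
/// first read, but `Seek` only sees the (initially empty) `pending` buffer.
struct Lazy {
    pending: Cursor<Vec<u8>>,
    produced: bool,
}

impl Read for Lazy {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.pending.read(buf)?;
        if n == 0 && !self.produced {
            // generate the body only when someone actually reads
            self.produced = true;
            self.pending = Cursor::new(b"[1,2,3]".to_vec());
            return self.pending.read(buf);
        }
        Ok(n)
    }
}

impl Seek for Lazy {
    // Same delegation as the `Seek` impl in the thread.
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        self.pending.seek(pos)
    }
}

/// Mimics a size-by-seeking body: measure by seeking to the end,
/// rewind, then read at most `size` bytes.
fn read_sized<B: Read + Seek>(mut body: B) -> io::Result<Vec<u8>> {
    let size = body.seek(SeekFrom::End(0))?;
    body.seek(SeekFrom::Start(0))?;
    let mut out = Vec::new();
    body.take(size).read_to_end(&mut out)?;
    Ok(out)
}
```

The measured size is 0, so `take(0)` never calls `read` and the generated bytes are never produced, whereas reading the same value to the end yields `[1,2,3]`. A streamed body avoids the problem because it never asks for a length up front; it keeps calling `read` until it returns `Ok(0)`.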
@jebrosen, thank you so much for your help, it works perfectly :tada:
Here is the final code:
```rust
use rocket::http::ContentType;
use rocket::request::Request;
use rocket::response::{self, Responder, Response};
use serde::Serialize;
use std::io::{self, Cursor, Read};

#[derive(Serialize, Debug)]
pub struct User {
    pub id: u32,
    pub lastname: String,
    pub firstname: String,
}

#[derive(Debug)]
pub struct UsersStream {
    pub state: State,
    pub users: Vec<User>,
    pub pos: usize,
    pub pending: Cursor<Vec<u8>>,
}

#[derive(Debug)]
pub enum State { Header, Users, Trailer, Done }

impl Read for UsersStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // a zero-length read must not advance the state machine
        if buf.is_empty() {
            return Ok(0);
        }
        loop {
            // first, try to read any unfinished data from the buffer
            match self.pending.read(buf) {
                // end of buffer; need to get more data
                Ok(0) => (),
                Ok(n) => return Ok(n),
                Err(e) => return Err(e),
            };
            // determine the next data to read
            match self.state {
                State::Header => {
                    self.pending = Cursor::new(vec![b'[']);
                    self.state = State::Users;
                }
                State::Users => {
                    // encode the next user; a comma goes only between elements
                    match self.users.get(self.pos) {
                        Some(user) => {
                            let mut bytes = if self.pos > 0 { vec![b','] } else { Vec::new() };
                            // serde_json::Error converts into io::Error through `?`
                            serde_json::to_writer(&mut bytes, user)?;
                            self.pos += 1;
                            self.pending = Cursor::new(bytes);
                        }
                        None => self.state = State::Trailer,
                    }
                }
                State::Trailer => {
                    self.pending = Cursor::new(vec![b']']);
                    self.state = State::Done;
                }
                State::Done => return Ok(0),
            }
        }
    }
}

impl<'r> Responder<'r> for UsersStream {
    fn respond_to(self, _: &Request) -> response::Result<'r> {
        // the size is unknown up front, so stream the body instead of sizing it
        Response::build()
            .streamed_body(self)
            .header(ContentType::new("application", "json"))
            .ok()
    }
}

#[get("/big-json-stream")]
pub fn big_json_stream() -> Result<UsersStream, ()> {
    let mut v: Vec<User> = Vec::new();
    for i in 0..100_000 {
        v.push(User {
            id: i,
            lastname: "My lastname".to_owned(),
            firstname: String::from("My firstname"),
        });
    }
    Ok(UsersStream {
        state: State::Header,
        users: v,
        pos: 0,
        pending: Cursor::new(vec![]),
    })
}
```
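The hand-written state machine can also be split into two generic pieces: an adapter that turns any iterator of byte chunks into a `Read`, and an iterator chain that adds the JSON array framing. This is a std-only sketch under that assumption; `IterReader` and `json_array` are hypothetical names, not an existing crate's API, and the values are passed in as pre-encoded bytes so that no serde dependency is needed here.

```rust
use std::io::{self, Cursor, Read};
use std::iter;

/// Turns any iterator of byte chunks into a `Read`er, pulling one chunk at a time.
struct IterReader<I> {
    chunks: I,
    current: Cursor<Vec<u8>>,
}

impl<I: Iterator<Item = Vec<u8>>> IterReader<I> {
    fn new(chunks: I) -> Self {
        IterReader { chunks, current: Cursor::new(Vec::new()) }
    }
}

impl<I: Iterator<Item = Vec<u8>>> Read for IterReader<I> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        loop {
            // drain the current chunk first
            let n = self.current.read(buf)?;
            if n > 0 {
                return Ok(n);
            }
            // current chunk exhausted (or empty): fetch the next one
            match self.chunks.next() {
                Some(chunk) => self.current = Cursor::new(chunk),
                None => return Ok(0), // iterator exhausted: end of stream
            }
        }
    }
}

/// Frames already-encoded JSON values as a JSON array:
/// `[`, the values separated by `,`, then `]`.
fn json_array<I>(values: I) -> impl Iterator<Item = Vec<u8>>
where
    I: Iterator<Item = Vec<u8>>,
{
    let body = values.enumerate().map(|(i, mut v)| {
        if i > 0 {
            v.insert(0, b',');
        }
        v
    });
    iter::once(b"[".to_vec()).chain(body).chain(iter::once(b"]".to_vec()))
}
```

In the Rocket 0.4 handler, the values could be produced with something like `users.into_iter().map(|u| serde_json::to_vec(&u).unwrap())`, and the resulting `IterReader` handed to `.streamed_body(...)`, which only requires `Read`. As with the state machine, only one chunk is held in memory at a time.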
Can I create a PR to add this case to the `examples` folder?
The code would need some changes to work with async, in particular `AsyncRead` instead of `Read`, which might make it a little more tricky because of pinning and `Poll`. I have a long backlog, so it might be a while before I get back to this issue, but I did write something similar for Server-Sent Events that wraps `futures::stream::Stream` (https://git.jebrosen.com/jeb/rocket-rooms/src/commit/731bca81912176e96b6976eb7a8de579c7b99c51/src/sse.rs) and might be helpful as a reference.
OK :)
Will `AsyncRead` come with Rocket 0.5?
I will try to adapt this code to work with `AsyncRead`.
May I close the issue?
@fabienbellanger It's already in master. Let's keep this use case in mind when resolving #33.