koa streaming with eventsource... how to?

Created on 23 Oct 2015  路  4Comments  路  Source: koajs/koa

I'm going to rewrite my ExpressJS app in Koa.

The app streams data (used for graphs and keeping changes between multiple clients in sync) the simplest way I could come up with.

Server-side:

app.get('/json/data/:source', function(req, res) {
    var reader = db.createReader(req.params.source);

    res.writeHead(200, { 'Content-Type': 'text/event-stream; charset=utf-8' });
    res.write(": begin streaming\n");

    var send = function(row) {
        res.write("data: " + JSON.stringify(row) + "\n\n");
    };

    db.on('data', send);
});

Client-side:

var stream = new EventSource('/json/data/' + name, { withCredentials: true });
stream.onmessage = function(line) {
    var json = JSON.parse(line.data);
    process_data(name, json);
};

Works absolutely amazing, and very simple to implement.

Documentation says I'm not allowed to use response.write() anymore, so my question is, how do I rewrite code that do JSON streaming towards the browser's EventSource API into the new Koa syntax? Is there an easy (or a possible) way?

Apologies beforehand if this is a stupid question!

(I'm being a little lazy here and considering giving up before starting if it's not possible yet...)

documentation question

Most helpful comment

We have something along these lines in our live debugger (kudos to @juliangruber and @tj):

const PassThrough = require('stream').PassThrough;
const router = require('koa-66')();
const app = require('koa')();

// ...

router.get(`/json/data/:source`, ctx => {
  const stream = new PassThrough();
  const { source } = ctx.params;
  const { db } = app.context; // or whatever this lives :)
  const reader = db.createReader(source); 
  const send = (data) => {
    let { type, msg } = data; // or whatever this obj looks like :)
    stream.write(sse(type, msg));
  }

  reader.on('data', send);
  ctx.req.on('close', ctx.res.end());
  ctx.req.on('finish', ctx.res.end());
  ctx.req.on('error', ctx.res.end());
  ctx.type = 'text/event-stream';
  ctx.body = stream;
});

const sse = (event, data) => {
  return `event:${ event }\ndata: ${ data }\n\n`
}

app.use(router.routes());
app.listen(3000);

I'm sure i made some dumb mistakes in generalizing and porting over to the koa 2 api / idioms, so feel free to call them out :)

All 4 comments

Hey! You can assign this.body to an SSE stream and then send data to that, which has the nice side-effect or reducing the amount of non-routing logic into a route since it has to be a stream otherwise it becomes awkward

Hmm okay, so I need to create a new object that util.inherit()s from stream.Readable and every time read() is called on that, just pull a row array from the database, convert it to a newline-free JSON string, convert that into a Buffer and return it..

Then assign the this.body property with the new stream object.

Sound right?

We have something along these lines in our live debugger (kudos to @juliangruber and @tj):

const PassThrough = require('stream').PassThrough;
const router = require('koa-66')();
const app = require('koa')();

// ...

router.get(`/json/data/:source`, ctx => {
  const stream = new PassThrough();
  const { source } = ctx.params;
  const { db } = app.context; // or whatever this lives :)
  const reader = db.createReader(source); 
  const send = (data) => {
    let { type, msg } = data; // or whatever this obj looks like :)
    stream.write(sse(type, msg));
  }

  reader.on('data', send);
  ctx.req.on('close', ctx.res.end());
  ctx.req.on('finish', ctx.res.end());
  ctx.req.on('error', ctx.res.end());
  ctx.type = 'text/event-stream';
  ctx.body = stream;
});

const sse = (event, data) => {
  return `event:${ event }\ndata: ${ data }\n\n`
}

app.use(router.routes());
app.listen(3000);

I'm sure i made some dumb mistakes in generalizing and porting over to the koa 2 api / idioms, so feel free to call them out :)

Perfect, thanks!

Was this page helpful?
0 / 5 - 0 ratings