I see in the old deno repo there was a PR in which @ry suggested SSE could instead be implemented outside of std:
https://github.com/denoland/deno_std/pull/91
However, I don't think that is doable today, because there is no way to flush (short of not using respond()/writeResponse() at all and writing the response out to req.w manually).
Is there another way I'm missing?
I guess one way is to monkey patch req.w to always flush:
const origWrite = req.w.write
req.w.write = async function (p: Uint8Array): Promise<number> {
const n = await origWrite.call(this, p)
await this.flush()
return n
}
Writing to req.w is the way to do this. You shouldn't need to monkey patch - if so we should fix that.
You cannot combine req.respond()/writeResponse() with writing to req.w manually. It works for std/ws because, after the response is written, the protocol has effectively changed and you take over from there. In the case of SSE, you want an HTTP Transfer-Encoding: chunked response, but one without buffering. However, writeResponse() buffers its output, and you cannot use req.w afterwards, because by then the HTTP response is already finished.
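To make the "chunked, but unbuffered" requirement concrete, here is a minimal sketch of how a single chunk is framed on the wire in HTTP/1.1. The helper name encodeChunk is hypothetical and is not part of std/http; it only illustrates the bytes that would have to reach the socket, and be flushed, for each event.

```typescript
const encoder = new TextEncoder();

// Hypothetical helper: frames one payload as a single HTTP/1.1 chunk,
// i.e. <payload length in hex>\r\n<payload>\r\n
function encodeChunk(payload: Uint8Array): Uint8Array {
  const head = encoder.encode(`${payload.byteLength.toString(16)}\r\n`);
  const tail = encoder.encode("\r\n");
  const out = new Uint8Array(head.length + payload.length + tail.length);
  out.set(head, 0);
  out.set(payload, head.length);
  out.set(tail, head.length + payload.length);
  return out;
}

// A zero-length chunk terminates the body (no trailers in this sketch).
const lastChunk = encoder.encode("0\r\n\r\n");
```

With buffering, several such chunks may sit in the writer until the buffer fills, which is why an SSE client sees nothing until a flush happens.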
I think the most appropriate solution would be to extend Response, e.g.:
export interface Response {
status?: number;
headers?: Headers;
body?: Uint8Array | Reader | string;
bodyBuffering?: boolean // default false
trailers?: () => Promise<Headers> | Headers;
}
or
export interface Response {
status?: number;
headers?: Headers;
body?: Uint8Array | Reader | string;
trailers?: () => Promise<Headers> | Headers;
options?: { buffering?: boolean }
}
or
export interface Response {
status?: number;
headers?: Headers;
body?: Uint8Array | Reader | RealtimeReader | string;
trailers?: () => Promise<Headers> | Headers;
}
or some other way to indicate that writeResponse() should flush data as soon as it becomes available on the Reader, even if byte by byte.
I don't like extending Response really, but see no other way.
Happy to prepare a PR if you let me know how you want to go about it.
Specifically, a flush is needed at the end of each iteration here:
https://github.com/denoland/deno/blob/4e3532fe7b61a1050b00611081cc83af8b02de70/std/http/io.ts#L167-L174
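As a rough illustration of that change, the sketch below copies a reader to a writer in chunked encoding and flushes after every chunk. It is not the code from std/http/io.ts: ByteReader, FlushableWriter and writeChunkedUnbuffered are hypothetical names, and the reader is assumed to signal end of input with null.

```typescript
// Hypothetical minimal interfaces standing in for the reader and buffered writer.
interface ByteReader {
  read(p: Uint8Array): Promise<number | null>; // null means end of input (assumption)
}
interface FlushableWriter {
  write(p: Uint8Array): Promise<number>;
  flush(): Promise<void>;
}

async function writeChunkedUnbuffered(
  w: FlushableWriter,
  r: ByteReader,
): Promise<void> {
  const enc = new TextEncoder();
  const buf = new Uint8Array(4096);
  while (true) {
    const n = await r.read(buf);
    if (n === null) break; // reader is exhausted
    if (n === 0) continue; // nothing read this time, try again
    await w.write(enc.encode(`${n.toString(16)}\r\n`));
    await w.write(buf.subarray(0, n));
    await w.write(enc.encode("\r\n"));
    await w.flush(); // the extra step: push each chunk out immediately
  }
  await w.write(enc.encode("0\r\n\r\n")); // terminating zero-length chunk
  await w.flush();
}
```

Flushing per iteration trades throughput for latency, which is presumably why it would need to be opt-in rather than the default.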
Created a PR for the second option, let me know your thoughts.
@keroxp @nayeemrmn Any thoughts about the above extension? (since you seem to have contributed a lot to std/http)
Is there currently an easy way to write an sse response? :-D
I am currently using the monkey patch version :-P
Thanks a lot
@auryn31 How are you performing the monkey patch? When I try the snippet above I get an error:
error: Uncaught BadResource: Bad resource ID
at unwrapResponse ($deno$/ops/dispatch_minimal.ts:63:11)
at Object.sendAsyncMinimal ($deno$/ops/dispatch_minimal.ts:106:10)
at async Object.write ($deno$/ops/io.ts:65:18)
at async BufWriter.flush (https://deno.land/std/io/bufio.ts:475:25)
at async BufWriter.req.w.write (file:///Users/joakimunge/dev/_labs/deno-server/mod.ts:82:9)
Code:
  const origWrite = req.w.write;
  req.w.write = async function (p: Uint8Array): Promise<number> {
    const n = await origWrite.call(this, p);
    await this.flush();
    return n;
  };
  const encoder = new TextEncoder();
  await req.w.write(encoder.encode(`event: "message"\nid: 0\ndata: "hello"\n`));
  await req.w.write(encoder.encode("\n\n"));
@joakimunge That's interesting, I basically did the same thing, but without any issues:
function str2ab(str: string) {
var buf = new ArrayBuffer(str.length * 2);
var bufView = new Uint8Array(buf);
for (var i = 0, strLen = str.length; i < strLen; i++) {
bufView[i] = str.charCodeAt(i);
}
return bufView;
}
for await (const req of s) {
const origWrite = req.w.write
req.w.write = async function (p: Uint8Array): Promise<number> {
const n = await origWrite.call(this, p)
await this.flush()
return n
}
req.w.write(str2ab("Test\n"))
req.w.write(str2ab("Test\n"))
await req.w.write(str2ab("Test\n"))
req.conn.close()
}
Maybe this will help?
Thanks. I was able to solve my error, but my client (the browser in this case) is not receiving the events. I'm trying to open a keep-alive connection between the browser and the server, without much luck. I'm trying to write the response header without closing the connection:
const res = {
status: 200,
headers: new Headers({
Connection: "keep-alive",
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Access-Control-Allow-Origin": "*",
}),
};
const origWrite = req.w.write;
req.w.write = async function (p: Uint8Array): Promise<number> {
const n = await origWrite.call(this, p);
await this.flush();
return n;
};
const msg = str2ab(`${res}`);
await req.w.write(msg);
await req.w.write(str2ab(`\n\n`));
Any idea?
How do you read the stream from your browser? I am not sure you can read it that easily.
In the terminal I have to read the stream like this:
curl http://localhost:8000 --output -
I have done some tutorials with spring, maybe this will help for your browser part?
@auryn31 Thanks for the tips. I will check it out. For now I decided to use WebSockets instead and it turned out pretty good! :)
Yeah, WebSockets are pretty nice, I use them often too, but sometimes SSE has some advantages and a little less overhead :-)
Yeah, I would prefer to use SSE since it's a unidirectional pattern. But I will have to make do with WS for now :)
Thanks for all your help
For whoever is struggling with this, wrote a few functions that should make it easier
// Imports this snippet relies on (std/http as it existed at the time of this thread)
import { ServerRequest } from "https://deno.land/std/http/server.ts";
import { STATUS_TEXT } from "https://deno.land/std/http/http_status.ts";

const encoder = new TextEncoder();

// This is the header part taken from writeResponse
function encodeHeader(res: { status: number; headers: Headers }) {
  const protoMajor = 1;
  const protoMinor = 1;
  const statusCode = res.status || 200;
  const statusText = STATUS_TEXT.get(statusCode);
  let out = `HTTP/${protoMajor}.${protoMinor} ${statusCode} ${statusText}\r\n`;
  const headers = res.headers ?? new Headers();
  for (const [key, value] of headers) {
    out += `${key}: ${value}\r\n`;
  }
  out += `\r\n`;
  return encoder.encode(out);
}

// Writes and flushes the header
export async function setSSE(req: ServerRequest) {
  const res = {
    status: 200,
    headers: new Headers({
      Connection: "keep-alive",
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      "Access-Control-Allow-Origin": "*",
    }),
  };
  await req.w.write(encodeHeader(res));
  return req.w.flush();
}

// Writes and flushes an event
export async function writeSSE(event: string, data: string | object | any[], req: ServerRequest) {
  // Use the event name passed by the caller (it was previously hardcoded to "change")
  let result = `event: ${event}\n`;
  if (typeof data === "string") {
    result += `data: ${data}\n\n`;
  } else {
    result += `data: ${JSON.stringify(data)}\n\n`;
  }
  await req.w.write(encoder.encode(result));
  return req.w.flush();
}
These can then be used to send SSE just like this:
await setSSE(req);
await writeSSE('example_event', 'some data1', req);
await writeSSE('example_event', 'some data2', req);
This should keep the connection open after writing the headers
Just as you were trying to do @joakimunge
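One detail the helpers above do not cover is payloads that contain line breaks: in the event stream format every line of the data needs its own data: prefix, and an event ends with a blank line. The following is only a sketch under that assumption; SSEMessage and formatSSE are hypothetical names, not part of std or oak.

```typescript
// Hypothetical message shape; only `data` is required.
interface SSEMessage {
  event?: string;
  id?: string;
  data: string;
}

// Serializes one message in text/event-stream format.
function formatSSE(msg: SSEMessage): string {
  let out = "";
  if (msg.event) out += `event: ${msg.event}\n`;
  if (msg.id) out += `id: ${msg.id}\n`;
  // Each line of the payload gets its own "data:" field.
  for (const line of msg.data.split(/\r\n|\r|\n/)) {
    out += `data: ${line}\n`;
  }
  return out + "\n"; // blank line dispatches the event on the client
}
```

The returned string could then be encoded with a TextEncoder and written to req.w, followed by a flush, in the same way writeSSE does.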
For reference, SSE is possible to implement outside of std. Here is an example I built in oak: https://github.com/oakserver/oak/blob/main/server_sent_event.ts. The user needs to not use request.respond() and manage writing to the .w directly. It doesn't require any changes to std/http to accomplish.
@kitsonk
Thank you for posting this here!
The implementation I sent was originally made for use with oak, but it's cool to see that it is now built in :smile:
I think this issue can be closed though, right? Or maybe changed, but it is definitely possible to do SSE.
It might be nice if this were doable without having to encode and write the headers manually...
@GitHubJasper thanks, your solution works quite well. But how do you safely handle a client disconnect?
Without any handling of this case, Deno throws error: Uncaught (in promise) BrokenPipe: Broken pipe (os error 32) the next time the flush method gets called after the client has disconnected.
What is more, I did not succeed in catching the error when calling flush inside try...catch, which seems strange to me and might be an issue in Deno itself.
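For readers hitting the same error, here is a hedged sketch of a write wrapper that reports a failed write instead of letting the rejection escape. One possible cause of an "uncatchable" rejection (an assumption, not something confirmed in this thread) is a write started from a timer or callback without being awaited, so the surrounding try...catch has already exited by the time it fails. FlushableWriter and tryWriteEvent are hypothetical names.

```typescript
// Hypothetical minimal interface standing in for req.w.
interface FlushableWriter {
  write(p: Uint8Array): Promise<number>;
  flush(): Promise<void>;
}

// Returns true if the payload was written and flushed, false if the
// connection appears to be gone (for example after a client disconnect).
async function tryWriteEvent(
  w: FlushableWriter,
  payload: Uint8Array,
): Promise<boolean> {
  try {
    // Both calls are awaited, so a rejection lands in the catch below.
    await w.write(payload);
    await w.flush();
    return true;
  } catch (_err) {
    return false;
  }
}
```

A caller could stop its event loop or clear its interval as soon as this returns false, rather than attempting further writes on the dead connection.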
Closing as the original question has been answered.
@HappyStriker
I am not sure how I handled that error, though it does sound familiar...
Maybe check the implementation of oak as that does work fine for me now
I tried replicating oak's solution, which is exactly the approach I had already tried. Unfortunately I run into the situation mentioned above, where the error is not catchable and Deno crashes when the client disconnects 😬
@HappyStriker
Hm that is unfortunate, I sadly don't have the code I used for this (as I now just use the one provided in oak)
So I don't remember how I solved it...
However, I do think it was related to flushing/writing.
Think it could happen that the totalBytesWritten were different from the numBytesWritten.
Though I am not sure how exactly, it might be useful to print these values out and see if that is the cause.
If you are using vs code, you can just go to the definition of the write and flush functions and edit them.
Sorry that I can't help you much further, hopefully you can get to a fix!