I've clustered my socket.io server and I'm looking for a way to emit to a specific client and receive an acknowledgment callback. The problem is that the server doesn't have a reference to an instance of the target client, because this client may be connected to another socket.io server instance (because it's clustered).
I know I can emit to a room named after the socket.id, or to a custom room that I put the client in. But this is considered broadcasting, and then I can't use an acknowledgment callback.
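To make the limitation concrete, here is a minimal sketch of the room-based emit. The `emitToClient` helper and the `fakeIo` stand-in are hypothetical and only there so the snippet runs without a server; `io.to(room).emit(...)` is the regular socket.io call. Note that no callback is passed: as far as I can tell, socket.io 1.x rejects a function as the last argument of a broadcast emit.

```javascript
// Sketch: emit to the room named after a socket id.
// `io` is assumed to be a socket.io server (possibly using socket.io-redis).
function emitToClient(io, socketId, event, payload) {
  // io.to(room) targets a room; this is a broadcast, so no ack callback is passed
  io.to(socketId).emit(event, payload);
}

// Hypothetical stand-in for a real server, recording what would be sent.
const sent = [];
const fakeIo = {
  to(room) {
    return {
      emit(event, payload) {
        sent.push({ room: room, event: event, payload: payload });
      }
    };
  }
};

emitToClient(fakeIo, 'abc123', 'hello', { text: 'hi' });
```

The message reaches the client on whichever instance holds it, but the sender has no way to hear back through this call alone.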
I don't want to add complexity to the target client. The target client should just be able to call the callback.
I see two possible solutions:
A fake socket instance wouldn't actually have a connection to a client, but I hope to use its emit function, have it talk to its adapter (socket.io-redis), and have it receive the acknowledgment callback.
Any tips on these solutions? Is there maybe something else I should consider?
@FREEZX has been implementing the functionality to retrieve the clients in rooms (even over multiple node instances), see:
https://github.com/Automattic/socket.io/pull/1630
https://github.com/Automattic/socket.io-adapter/pull/5
https://github.com/Automattic/socket.io-redis/pull/15
But even then I need a way to emit to a specific client and get my callback called.
Guille: Update: we're making a minor release 1.1.1 with bugfixes, and .clients(fn) should be in the next one 1.2.0
Every client joins a room with its socket id as the key. You could emit to that room if you know the id of the client's socket (you do if you're using my redis adapter with the clients function).
However, be careful with it, because it doesn't clean up the data at exit.
True, I understand this, but my main issue is that the callback mechanism won't work when you emit in rooms.
One, quite complex workaround:
One thing I should clarify, I don't want to add complexity to the target client. The target client should just be able to call the callback.
You could implement a redis pub/sub to do IPC between nodes, so whichever node receives the acknowledgement can publish a message on the IPC channel, and the correct node can handle it as an acknowledgement callback.
This avoids cluttering the sockets, and IPC will probably also be needed in the future.
True, but I'm not sure if that's easier. That would work something like this I think:
If subscribing and unsubscribing each time is inefficient, I could also store the callbacks in an object, keyed by the unique event id.
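A callback store like that could look roughly like the sketch below. `createCallbackStore` and its methods are hypothetical names, and prefixing ids with a node id is my own assumption, to keep ids from different instances from colliding.

```javascript
// Hypothetical helper: pending callbacks keyed by a unique event id.
function createCallbackStore(nodeId) {
  const callbacks = {};
  let counter = 0;
  return {
    // Store a callback and return the id to send along with the event.
    add(callback) {
      const id = nodeId + '-' + counter++;
      callbacks[id] = callback;
      return id;
    },
    // Call and forget the callback for an incoming ack; returns false if unknown.
    resolve(id, args) {
      const callback = callbacks[id];
      if (typeof callback !== 'function') return false;
      delete callbacks[id];
      callback.apply(null, args);
      return true;
    },
    // Number of callbacks still waiting for an ack.
    size() {
      return Object.keys(callbacks).length;
    }
  };
}

const store = createCallbackStore('node-1');
const received = [];
const eventId = store.add(function (data) { received.push(data); });
const firstAck = store.resolve(eventId, ['pong']);
const secondAck = store.resolve(eventId, ['pong']); // duplicate ack is ignored
```

Deleting the entry before calling it keeps a repeated ack from firing the callback twice. Entries whose ack never arrives would stay in the object, so a real version probably wants some kind of expiry.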
You don't have to temporarily subscribe/unsubscribe. Just handle the events if the client is in an array or something
Alright, that would work something like:
Of course, one day I hope to simply be able to request the clients from a room and emit to a specific client (possibly on another node.js instance) (using a callback). @FREEZX has already made step one possible.
You can avoid targetedevent IPC calls, because you could emit that from any process (to the room with the socket's ID as a name). Socket.io automatically decides which server has the socket. So that's one less IPC event to listen for.
I was curious how difficult it would be to emit to a specific client (possibly on another node.js instance) using a callback.
Looking through the socket.io code, when you call emit on a specific client it's not passed to the adapter, it's passed straight into the engine.io instance. The adapter also has no way to receive an acknowledgment event/packet; basically the adapter only gets input through its pub/sub channel and when its broadcast method is called.
So for this to happen, adapters should also handle general packets (not just broadcasts). The socket module's packet method would need to call something like an adapter's packet method, the adapter should pub/sub this to all instances, and the adapters in the other instances would need to call the onpacket of the socket module. This might allow multiple node.js instances to create socket objects that communicate with the same client. Servers also need another way to create socket objects, not only by receiving them through a new connection, but also manually with a socket id.
@FREEZX, but then there is no way to add a callback? Or am I forgetting something?
Add the callback to an array of your choice, where the key is the client id and then broadcast to the client's room. The client responds back to a process of your cluster that emits the ack message via redis IPC. The node that originally sent the message will have the callback stored in the callbacks array and will call it.
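The flow described above can be sketched without any real infrastructure. In this sketch an in-memory EventEmitter stands in for both the room broadcast and the redis IPC channel, and `createNode`, `connect`, and `emitTo` are hypothetical names; it only illustrates the bookkeeping, not socket.io itself.

```javascript
const { EventEmitter } = require('events');

// In-memory stand-in for the shared channel between nodes (assumption:
// in a real setup this would be the room broadcast plus redis pub/sub).
const bus = new EventEmitter();

function createNode(name) {
  const pending = {};      // ack id -> callback, for requests this node started
  const localClients = {}; // socket id -> handler, for clients connected here
  let counter = 0;

  // Deliver a request only if the target client is connected to this node.
  bus.on('request', function (msg) {
    const client = localClients[msg.target];
    if (!client) return;
    client(msg.event, function () {
      const args = Array.prototype.slice.call(arguments);
      bus.emit('ack', { id: msg.id, args: args }); // publish the ack over IPC
    });
  });

  // Run the stored callback if this node started the request.
  bus.on('ack', function (msg) {
    const callback = pending[msg.id];
    if (!callback) return;
    delete pending[msg.id];
    callback.apply(null, msg.args);
  });

  return {
    connect: function (socketId, handler) { localClients[socketId] = handler; },
    emitTo: function (target, event, callback) {
      const id = name + ':' + counter++; // unique across nodes
      pending[id] = callback;
      bus.emit('request', { id: id, target: target, event: event });
    }
  };
}

const nodeA = createNode('A');
const nodeB = createNode('B');
// The target client is connected to node B only.
nodeB.connect('socket-1', function (event, reply) { reply('got ' + event); });

let result = null;
nodeA.emitTo('socket-1', 'hello', function (answer) { result = answer; });
```

Node A never holds a reference to the client. It only keeps the callback under an id, and the ack finds its way back because every node listens on the same channel and ignores ids it doesn't know.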
Your help is greatly appreciated but I don't follow the "The client responds back to a process of your cluster" part.
I think I got the basic idea working, using the following utility. It's a pub/sub channel wrapper with a callback feature.
// Redis pub/sub channel wrapper
// Messages are serialized to JSON by default, so you can send regular objects across the wire.
var redis = require('redis');
var util = require("util");
var EventEmitter = require("events").EventEmitter;
var debug = require('debug')('redis-channel');

var EVENT_MSG = "event";
var ACK_MSG = "ack";
// Random per-process prefix so callback ids from different nodes can't collide
var instanceID = Math.random().toString(36).slice(2);
var callbackIDCounter = 0;
var callbacks = {};

module.exports = Channel;

function Channel(name,port,host) {
  if (!(this instanceof Channel)) return new Channel(name,port,host);
  EventEmitter.call(this);
  var _self = this;
  // A subscribed redis client can't publish, so use two connections
  var pub = redis.createClient(port,host);
  var sub = redis.createClient(port,host);
  pub.on('error',onRedisError);
  sub.on('error',onRedisError);
  function onRedisError(err){
    _self.emit("error",err);
  }
  sub.subscribe(name,function(err){
    if(err) return _self.emit("error",err);
    debug("subscribed to channel");
  });
  sub.on("message", function (channel, packet) {
    if(channel !== name) return;
    packet = JSON.parse(packet);
    debug("received packet: ",packet);
    var data = packet.data;
    var callbackID = packet.id;
    switch(packet.type){
      case EVENT_MSG:
        // Emit "message" with the published arguments, plus a callback
        // that publishes an ack packet back over the channel
        data.unshift("message"); // add event type in front
        data.push(eventCallback); // add callback to end
        _self.emit.apply(_self,data);
        break;
      case ACK_MSG:
        // Only the process that stored the callback will find it here
        if(typeof callbacks[callbackID] == 'function'){
          callbacks[callbackID].apply(_self, packet.data);
          delete callbacks[callbackID];
        }
        break;
    }
    function eventCallback() {
      if(callbackID === undefined) {
        return _self.emit("error","No callback defined");
      }
      var args = Array.prototype.slice.call(arguments);
      var ackPacket = {type:ACK_MSG,
                       data:args,
                       id: callbackID};
      debug("publishing ack packet: ",ackPacket);
      pub.publish(name,JSON.stringify(ackPacket),function(err){
        if(err) _self.emit("error",err);
      });
    }
  });
  this.publish = function() {
    var args = Array.prototype.slice.call(arguments);
    var packet = {type:EVENT_MSG,
                  data:args};
    // is there a callback function?
    if(typeof args[args.length - 1] == 'function') {
      packet.id = instanceID + ':' + (callbackIDCounter++);
      callbacks[packet.id] = packet.data.pop();
    }
    debug("publishing packet: ",packet);
    pub.publish(name,JSON.stringify(packet),function(err){
      if(err) _self.emit("error",err);
    });
  };
}
util.inherits(Channel, EventEmitter);
Experiment server:
'use strict';
var debug = require('debug')('clustering:server');
var express = require('express');
var app = express();
var http = require('http').Server(app);
var io = require('socket.io')(http);
var socketIORedis = require('socket.io-redis');

var redisHost = 'localhost';
var redisPort = 6379;
var redisChannel = require('./util/redis-channel')('pub/sub-ack experiment',redisPort,redisHost);
var PORT = process.env.PORT ? process.env.PORT : 7000;

io.adapter(socketIORedis({ key: 'socket.io-redis-experiment'}));
http.listen(PORT, function(){
  debug('server listening on *:' + PORT);
});

var nsp = io.of("/");
nsp.on('connection', function(socket){
  var query = socket.handshake.query;
  debug('new connection: '+socket.id,query.type||'');
  if(query.type === "target") {
    // This instance holds the target client, so it forwards pub/sub
    // messages to that socket and passes the ack callback along
    var onChannelMessage = function (message,callback) {
      debug("received hello pub/sub message: ",message,callback);
      debug("emitting hello to target");
      socket.emit(message,callback);
    };
    redisChannel.on("message", onChannelMessage);
    // Stop forwarding once the target is gone
    socket.on('disconnect', function(){
      redisChannel.removeListener("message", onChannelMessage);
    });
  } else {
    socket.on('hello', function(data,callback){
      debug('received hello event: ',data,callback);
      // can't emit to the target socket directly so
      // we use a redis pub/sub channel
      debug("publishing hello pub/sub message");
      redisChannel.publish("hello",callback);
    });
  }
  socket.on('disconnect', function(){
    debug('disconnect: '+socket.id);
  });
});
Experiment client:
'use strict';
var debug = require('debug')('clustering:client');
var socketClient = require('socket.io-client');

var PORT = process.env.PORT ? process.env.PORT : 7000;
var TYPE = process.env.TYPE ? process.env.TYPE : '';
var nspName = "/";
var nspURL = "http://localhost:"+PORT+nspName+"?type="+TYPE;

debug("connecting to: "+nspName);
var nsp = socketClient(nspURL);
nsp.once('connect', function(){
  debug("connected to: "+nspName);
  if(TYPE !== "target") {
    nsp.emit("hello",{},function(data) {
      debug("received hello response: ",data);
    });
  }
  nsp.on("hello",function(callback) {
    debug("received hello! ",callback);
    callback("client on port "+PORT);
  });
});
nsp.on('error', function (err){
  debug("error connecting to: "+nspName+": "+err);
  //callback(err,nsp);
});
I published my pub/sub wrapper as an npm package.
https://www.npmjs.org/package/redis-pubsubber
What I meant by "The client responds back to a process of your cluster":
The client can be connected to any cluster worker, and only sends messages to that one worker.
@freezx your solution with the array makes a ton of sense! I'm wondering if this is the mechanism you were going to use behind the scenes in 1.2? I'm only asking because I'm literally migrating my site as we speak to a more scalable solution and need to be able to implement the callback. Would this also work with the socket.io-emitter package if the server registered to the same redis channel?
Do you have an idea of timeframe? I want to determine if I should wait for your release or move forward with the above solution.
Thanks!
IPC has to be done manually at the moment, with arrays as I said. I am not an official maintainer, so I cannot say whether or not IPC will get added and, if so, when.
The given solution is not too complicated to implement, so you should probably do it.
As for the socket.io-emitter question, I do not know; you should try it out and see if it works as you think.
Has this scenario been solved using socket.io, socket.io-redis, and socket.io-emitter?