Socket.io: Target a specific client on a clustered socket.io server using an acknowledgment callback

Created on 2 Oct 2014  ·  20 Comments  ·  Source: socketio/socket.io

I've clustered my socket.io server and I'm looking for a way to emit to a specific client and receive an acknowledgment callback. The problem is that the server doesn't have a reference to an instance of the target client, because this client can be connected to another socket.io server instance (because it's clustered).
I know I can emit to a room named after the socket.id or a custom room that I put the client in. But this is considered broadcasting, and then I can't use an acknowledgment callback.
I don't want to add complexity to the target client. The target client should just be able to call the callback.

I see two possible solutions:

A fake socket instance wouldn't actually have a connection to a client, but I hope to use its emit function, have it talk to its adapter (socket.io-redis) and have it receive the acknowledgment callback.

Any tips on these solutions, or anything else I should consider?

All 20 comments

@FREEZX has been implementing the functionality to retrieve the clients in a room (even across multiple Node instances); see:
https://github.com/Automattic/socket.io/pull/1630
https://github.com/Automattic/socket.io-adapter/pull/5
https://github.com/Automattic/socket.io-redis/pull/15
But even then I need a way to emit to a specific client and get my callback called.

Guille: Update: we're making a minor release 1.1.1 with bugfixes, and .clients(fn) should be in the next one 1.2.0

Every client joins a room with its socket id as the key. You could emit to that room if you know the id of the client's socket (you do if you're using my redis adapter with the clients function).
However be careful with it because it doesn't clean up the data at exit.

True, I understand this, but my main issue is that the callback mechanism won't work when you emit in rooms.

One quite complex workaround:

  1. Server B, which has the target socket, is also a client itself and has joined the target room
  2. Server A is also a client
  3. Server A broadcasts event over target room and defines callback function
  4. Server A starts listening to {event}-ack
  5. Server B receives event as client
  6. Server B emits event to target socket
  7. Server B receives callback
  8. Server B broadcasts {event}-ack
  9. Server A receives {event}-ack as client
  10. Server A stops listening to {event}-ack
  11. Server A calls callback

One thing I should clarify: I don't want to add complexity to the target client. The target client should just be able to call the callback.

You could implement a redis pub/sub channel to do IPC between nodes, so whichever node receives the acknowledgement can publish a message on the IPC channel and the correct node can handle it as an acknowledgement callback.

This avoids cluttering the sockets, and IPC will probably also be needed in the future.

True, but I'm not sure if that's easier. That would work something like this I think:

  1. All servers subscribe to a “targetedevent” pub/sub message.
  2. Server A publishes a “targetedevent” with the event type, data, channel and a unique event id
  3. Server A temporarily also subscribes to the “targetedevent-ack” pub/sub message.
  4. All servers receive the “targetedevent”
  5. If one has the target client, it emits the event to that client and waits for a callback
  6. When the callback is called, it publishes a “targetedevent-ack” with the response data and the unique id
  7. Server A receives the “targetedevent-ack”
  8. Server A checks the unique id
  9. It calls the original callback
  10. It unsubscribes from “targetedevent-ack”

If subscribing and unsubscribing for every message is inefficient, I could also store the callbacks in an object under the unique event id.

You don't have to temporarily subscribe/unsubscribe. Just handle the events if the client is in an array or something.
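
A minimal sketch of the "store the callbacks under a unique id" idea, using only Node's standard library. The names `PendingCallbacks`, `register` and `resolve` are hypothetical and not part of socket.io. Two things are added assumptions rather than something proposed in this thread: the timeout, so that an acknowledgment that never arrives does not stay in memory forever, and the error-first callback signature needed to report that timeout.

```javascript
'use strict';
var crypto = require('crypto');

// Hypothetical helper: keeps acknowledgment callbacks keyed by a unique id.
function PendingCallbacks(timeoutMs) {
  this.timeoutMs = timeoutMs || 5000;
  this.entries = {};
}

// Stores the callback and returns the id to send along with the message.
PendingCallbacks.prototype.register = function (callback) {
  var self = this;
  // Random ids avoid collisions between server instances.
  var id = crypto.randomBytes(8).toString('hex');
  var timer = setTimeout(function () {
    // No ack arrived in time: drop the entry and report an error.
    delete self.entries[id];
    callback(new Error('ack timeout'));
  }, this.timeoutMs);
  if (timer.unref) timer.unref(); // do not keep the process alive
  this.entries[id] = { callback: callback, timer: timer };
  return id;
};

// Calls and removes the callback for id. Returns false for unknown ids,
// which is the normal case on servers that did not send the message.
PendingCallbacks.prototype.resolve = function (id, args) {
  var entry = this.entries[id];
  if (!entry) return false;
  clearTimeout(entry.timer);
  delete this.entries[id];
  entry.callback.apply(null, [null].concat(args || []));
  return true;
};
```

Every server would call `resolve` for each ack it sees on the shared channel; only the server that registered the id has a matching entry, so the others ignore it.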

Alright, that would work something like:

  1. All servers subscribe to “targetedevent” and “targetedevent-ack” pub/sub messages.
  2. Server A publishes a targetedevent with the event type, data, channel and a unique message id
  3. Server A stores a callback in an object under the unique message id (this could be done per channel)
  4. All servers receive the “targetedevent”
  5. If one server has the target client they emit it to this client and they wait for a callback
  6. When a callback is called they publish “targetedevent-ack” with the response data and the unique message id
  7. All servers receive the “targetedevent-ack”
  8. Server A finds the callback under the unique id
  9. It calls the original callback
  10. It removes the callback from memory
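
The steps above can be sketched in a single process. This is only an illustration: an in-memory `EventEmitter` stands in for Redis pub/sub, and `createServer`, `emitTo` and the `localClients` map are hypothetical names, not socket.io APIs. A real setup would publish JSON over Redis and call `socket.emit(event, data, ack)` on the actual socket.

```javascript
'use strict';
var EventEmitter = require('events').EventEmitter;

// Stand-in for Redis pub/sub: every server sees every published message.
var bus = new EventEmitter();

// Hypothetical server: localClients maps client id -> handler(event, data, ack).
function createServer(name, localClients) {
  var callbacks = {}; // message id -> original callback (step 3)
  var counter = 0;

  // Steps 4-6: deliver to the target client if it is connected here.
  bus.on('targetedevent', function (msg) {
    var client = localClients[msg.clientId];
    if (!client) return; // another server owns this client
    client(msg.event, msg.data, function (response) {
      bus.emit('targetedevent-ack', { id: msg.id, response: response });
    });
  });

  // Steps 7-10: only the server holding the callback reacts.
  bus.on('targetedevent-ack', function (ack) {
    var callback = callbacks[ack.id];
    if (!callback) return;
    delete callbacks[ack.id];
    callback(ack.response);
  });

  return {
    // Steps 2-3: remember the callback, then publish the event.
    emitTo: function (clientId, event, data, callback) {
      var id = name + ':' + (counter++); // prefixed so ids differ per server
      callbacks[id] = callback;
      bus.emit('targetedevent', { id: id, clientId: clientId, event: event, data: data });
    },
    pendingCount: function () { return Object.keys(callbacks).length; }
  };
}

// Server B holds the target client; server A only knows its id.
var serverA = createServer('A', {});
var serverB = createServer('B', {
  'client-1': function (event, data, ack) { ack(event + ' handled by client-1'); }
});

var result = null;
serverA.emitTo('client-1', 'hello', {}, function (response) { result = response; });
console.log(result);
```

Note that the message id is prefixed with the server name. A bare counter would start at 0 on every server, so an ack could trigger the wrong server's callback.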

Of course, one day I hope to simply be able to request the clients from a room and emit to a specific client (possibly on another node.js instance) (using a callback). @FREEZX has already made step one possible.

You can avoid targetedevent IPC calls, because you could emit that from any process (to the room with the socket's ID as a name). Socket.io automatically decides which server has the socket. So that's one less IPC event to listen for.

I was curious how difficult it would be to emit to a specific client (possibly on another node.js instance) using a callback.
Looking through the socket.io code, when you call emit on a specific client it's not passed to the adapter; it's passed straight to the engine.io instance. The adapter also has no way to receive an acknowledgment event/packet. Basically, the adapter only gets input through its pub/sub channel and when its broadcast method is called.

So for this to happen, adapters should also handle general packets (not just broadcasts). The socket module's packet method would need to call something like an adapter's packet method, the adapter should pub/sub this to all instances, and the adapters in the other instances would need to call the onpacket of the socket module. This might allow multiple node.js instances to create socket objects that communicate with the same client. Servers also need another way to create socket objects, not only by receiving them through a new connection, but also manually with a socket id.

@FREEZX, but then there is no way to add a callback? Or am I forgetting something?

Add the callback to an array of your choice, where the key is the client id and then broadcast to the client's room. The client responds back to a process of your cluster that emits the ack message via redis IPC. The node that originally sent the message will have the callback stored in the callbacks array and will call it.

Your help is greatly appreciated but I don't follow the "The client responds back to a process of your cluster" part.

I think I got the basic idea working, using the following utility. It's a pub/sub channel wrapper with a callback feature.

// Redis pub/sub channel wrapper
// Messages are serialized to JSON by default, so you can send regular objects across the wire. 
var redis = require('redis');
var util = require("util");
var EventEmitter = require("events").EventEmitter;
var debug = require('debug')('redis-channel');
var EVENT_MSG = "event";
var ACK_MSG = "ack";
var callbackIDCounter = 0;
var callbacks = {};
module.exports = Channel;

function Channel(name,port,host) {
  if (!(this instanceof Channel)) return new Channel(name,port,host);

  var _self = this;
  var pub = redis.createClient(port,host);
  var sub = redis.createClient(port,host);
  pub.on('error',onRedisError);
  sub.on('error',onRedisError);
  function onRedisError(err){
    _self.emit("error",err);
  }

  sub.subscribe(name,function(err){
    debug("subscribed to channel");
    if(err !== null) _self.emit("error",err);
  });

  sub.on("message", function (channel, packet) {
    if(channel !== name) return;
    packet = JSON.parse(packet);
    debug("received packet: ",packet);
    var data = packet.data;
    var callbackID = packet.id;
    switch(packet.type){
      case EVENT_MSG:
        // Build the ack callback before it is used. It republishes the
        // handler's arguments as an ACK_MSG packet carrying the same id.
        var eventCallback = function () {
          if(callbackID === undefined) {
            return _self.emit("error",new Error("No callback defined"));
          }
          var args = Array.prototype.slice.call(arguments);
          var ackPacket = {type:ACK_MSG,
                          data:args,
                          id: callbackID};
          debug("publishing ack packet: ",ackPacket);
          pub.publish(name,JSON.stringify(ackPacket),function(err){
            if(err) _self.emit("error",err);
          });
        };
        data.unshift("message"); // add event type in front
        data.push(eventCallback); // add callback to end
        _self.emit.apply(_self,data);
        break;
      case ACK_MSG:
        if(typeof callbacks[callbackID] == 'function'){
          //callbacks[callbackID](message);
          callbacks[callbackID].apply(this, packet.data);
          delete callbacks[callbackID];
        }
        break;
    }
  });

  this.publish = function() {
    var args = Array.prototype.slice.call(arguments);
    var packet = {type:EVENT_MSG,
                  data:args};
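    // is there a callback function?
    if(typeof args[args.length - 1] == 'function') {
      // A bare counter starts at 0 in every process, so ids from different
      // server instances would collide; a random prefix keeps them apart.
      packet.id = require('crypto').randomBytes(4).toString('hex') + ':' + (callbackIDCounter++);
      callbacks[packet.id] = packet.data.pop();
    }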
    debug("publishing packet: ",packet);
    pub.publish(name,JSON.stringify(packet),function(err){
      if(err !== null) _self.emit("error",err);
    });
  };
}
util.inherits(Channel, EventEmitter);

Experiment server:

'use strict';

var debug = require('debug')('clustering:server');

var express = require('express');
var app = express();
var http = require('http').Server(app);
var io = require('socket.io')(http);
var socketIORedis = require('socket.io-redis');

var redisHost = 'localhost';
var redisPort = 6379;

var redisChannel = require('./util/redis-channel')('pub/sub-ack experiment',redisPort,redisHost);

var PORT = process.env.PORT ? process.env.PORT : 7000;

io.adapter(socketIORedis({ key: 'socket.io-redis-experiment'}));

http.listen(PORT, function(){
  debug('server listening on *:' + PORT);
});

var nsp = io.of("/");
var targetSocket = null;
nsp.on('connection', function(socket){
  var query = socket.handshake.query;
  debug('new connection: '+socket.id,query.type||'');

  if(query.type === "target") { 
    targetSocket = socket;
    redisChannel.on("message", function (message,callback) {
      debug("received hello pub/sub message: ",message,callback);
      debug("emitting hello to target");
      targetSocket.emit(message,callback);
    });
  } else {
    socket.on('hello', function(data,callback){
      debug('received hello event: ',data,callback);
      // can't emit to targetSocket directly so 
      // we use a redis pub/sub channel
      debug("publishing hello pub/sub message");
      redisChannel.publish("hello",callback);
    });
  }

  socket.on('disconnect', function(){
    debug('disconnect: '+socket.id);
  });
});
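
One thing to watch in the experiment server: every target connection adds a new "message" listener on `redisChannel`, and nothing removes it when that socket disconnects. A possible cleanup pattern is sketched below with plain `EventEmitter` objects standing in for the channel and the socket. Both stand-ins and the `onTargetConnection` name are assumptions, chosen so the snippet runs without Redis or socket.io.

```javascript
'use strict';
var EventEmitter = require('events').EventEmitter;

var channel = new EventEmitter(); // stand-in for redisChannel
var delivered = [];

// Mirrors the "target" branch of the connection handler.
function onTargetConnection(socket) {
  function forward(message, callback) {
    socket.emit(message, callback);
  }
  channel.on('message', forward);
  // Remove the listener again so disconnected sockets are not kept alive.
  socket.once('disconnect', function () {
    channel.removeListener('message', forward);
  });
}

var fakeSocket = new EventEmitter(); // stand-in for a socket.io socket
fakeSocket.on('hello', function (callback) {
  delivered.push('hello');
  callback('ok');
});

onTargetConnection(fakeSocket);
channel.emit('message', 'hello', function () {});
fakeSocket.emit('disconnect');
channel.emit('message', 'hello', function () {}); // no longer forwarded
console.log(delivered.length, channel.listenerCount('message'));
```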

Experiment client:

'use strict';

var debug = require('debug')('clustering:client');
var socketClient = require('socket.io-client');

var PORT = process.env.PORT ? process.env.PORT : 7000;
var TYPE = process.env.TYPE ? process.env.TYPE : '';

var nspName = "/";
var nspURL = "http://localhost:"+PORT+nspName+"?type="+TYPE;
debug("connecting to: "+nspName);
var nsp = socketClient(nspURL);
nsp.once('connect', function(){
  debug("connected to: "+nspName);

  if(TYPE !== "target") {
    nsp.emit("hello",{},function(data) {
      debug("received hello response: ",data);
    });
  }

  nsp.on("hello",function(callback) {
    debug("received hello! ",callback);
    callback("client on port "+PORT);
  });
});
nsp.on('error', function (err){
  debug("error connecting to: "+nspName+": "+err);
  //callback(err,nsp);
});

I published my pub/sub wrapper as an npm package.
https://www.npmjs.org/package/redis-pubsubber

What I meant by "The client responds back to a process of your cluster":
The client can be connected to any cluster worker, and only sends messages to that one worker.

@freezx your solution with the array makes a ton of sense! I'm wondering if this is the mechanism you were going to use behind the scenes in 1.2? I'm only asking because I'm literally migrating my site as we speak to a more scalable solution and need to be able to implement the callback. Would this also work with the socket.io-emitter package if the server registered to the same redis channel?

Do you have an idea of timeframe? I want to determine if I should wait for your release or move forward with the above solution.

Thanks!

IPC has to be done manually at the moment, with arrays as I said. I am not an official maintainer, so I cannot say whether or not IPC will get added and, if so, when.
The given solution is not too complicated to implement, so you should probably do it.
As for the socket.io-emitter question, I do not know; you should try it out and see if it works as you think.

Has this scenario been solved using socket.io, socket.io-redis, and socket.io-emitter?
