Hi, how can I create a list of clients connected to an actix websocket for message broadcasting?
Currently I'm using the ws-rs library, but I would like to switch to actix instead. This is my current code. I have no idea how to replace ws-rs with actix, because the actix chat server example is really complicated and I couldn't decipher it even after a couple of hours of staring at it, and the guides at actix.rs don't help either.
    use std::sync::{Arc, Mutex};
    use actix_web::{App, HttpRequest, server, fs};
    use ws::Sender;

    struct ChatState {
        pub clients: Mutex<Vec<Sender>>,
    }

    fn index(_req: &HttpRequest) -> actix_web::Result<fs::NamedFile> {
        Ok(fs::NamedFile::open("index.html")?)
    }

    fn main() {
        // The ws-rs websocket server runs on its own thread, on port 8001.
        std::thread::spawn(|| {
            let state = Arc::new(ChatState { clients: Mutex::new(vec![]) });
            ws::listen("localhost:8001", |out| {
                let state = state.clone();
                {
                    // Remember every new connection so it can be broadcast to.
                    let mut clients = state.clients.lock().unwrap();
                    clients.push(out.clone());
                }
                move |msg: ws::Message| {
                    // Relay each incoming message to all connected clients.
                    let clients = state.clients.lock().unwrap();
                    for client in clients.iter() {
                        client.send(msg.clone()).unwrap();
                    }
                    Ok(())
                }
            }).unwrap();
        });

        // actix-web only serves the static page, on port 8000.
        server::new(|| {
            App::new()
                .resource("/", |r| r.f(index))
        })
        .bind("localhost:8000").unwrap()
        .run();
    }
Hi @Eilie - I don't quite have the time to write an example, but maybe I can provide some hints. I'm working on something similar (but slightly more elaborate, since I have to use redis to share state across multiple instances of my app).
If I were doing a single-node system, I might try it like this...
Websocket handlers in actix are based on the StreamHandler trait.
The started() and finished() methods could be used to push and pop an Addr of your actor in some global structure, perhaps a Mutex<Vec<Addr<T>>> defined inside of a lazy_static! block.
You can get an Addr of an actor using the address() method on the context.
I'm still figuring things out, but maybe this is a place to start.
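To make the registry idea above a bit more concrete, here is a rough std-only sketch, not actix code. `ClientRegistry`, `register` and `unregister` are hypothetical names, and `Sender<String>` is only a stand-in for `Addr<T>`. One assumption worth calling out: a plain pop would remove whichever client connected last rather than the one that disconnected, so this sketch hands out an id on registration and removes by that id.

```rust
use std::sync::mpsc::Sender;
use std::sync::Mutex;

/// Hypothetical registry of connected clients.
/// `Sender<String>` is a stand-in for `Addr<T>`, not an actix type.
pub struct ClientRegistry {
    inner: Mutex<Inner>,
}

struct Inner {
    next_id: usize,
    clients: Vec<(usize, Sender<String>)>,
}

impl ClientRegistry {
    pub fn new() -> Self {
        ClientRegistry {
            inner: Mutex::new(Inner { next_id: 0, clients: Vec::new() }),
        }
    }

    /// Intended to be called from `started()`: stores the handle and
    /// returns an id the actor can keep for later removal.
    pub fn register(&self, addr: Sender<String>) -> usize {
        let mut inner = self.inner.lock().unwrap();
        let id = inner.next_id;
        inner.next_id += 1;
        inner.clients.push((id, addr));
        id
    }

    /// Intended to be called from `finished()`: removes the handle with
    /// the given id. Returns `true` if a handle was actually removed.
    pub fn unregister(&self, id: usize) -> bool {
        let mut inner = self.inner.lock().unwrap();
        let before = inner.clients.len();
        inner.clients.retain(|(client_id, _)| *client_id != id);
        inner.clients.len() != before
    }

    /// Number of currently registered clients.
    pub fn len(&self) -> usize {
        self.inner.lock().unwrap().clients.len()
    }
}
```

In an actix app the same structure could presumably live inside a `lazy_static!` block or in the application state, with the returned id stored as a field on the websocket actor.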
@onelson Thank you for the hint! I have arrived at the following code and got stuck again: how do I use Addr<Ws> to send a message to a connected client?
    use actix::*;
    use std::sync::Mutex;
    use actix_web::{App, HttpRequest, server, fs, ws};

    struct ChatState {
        pub clients: Mutex<Vec<Addr<Ws>>>,
    }

    struct Ws;

    impl Actor for Ws {
        type Context = ws::WebsocketContext<Self, ChatState>;
    }

    impl StreamHandler<ws::Message, ws::ProtocolError> for Ws {
        fn started(&mut self, ctx: &mut Self::Context) {
            // Register this connection's address in the shared state.
            let mut clients = ctx.state().clients.lock().unwrap();
            clients.push(ctx.address());
        }

        fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
            match msg {
                ws::Message::Ping(msg) => ctx.pong(&msg),
                ws::Message::Text(text) => {
                    for client in ctx.state().clients.lock().unwrap().iter() {
                        // ???
                    }
                },
                ws::Message::Binary(bin) => ctx.binary(bin),
                _ => (),
            }
        }
    }

    fn index(_req: &HttpRequest<ChatState>) -> actix_web::Result<fs::NamedFile> {
        Ok(fs::NamedFile::open("index.html")?)
    }

    fn main() {
        server::new(|| {
            let state = ChatState { clients: Mutex::new(vec![]) };
            App::with_state(state)
                .resource("/", |r| r.f(index))
                .resource("/ws/", |r| r.f(|req| ws::start(req, Ws)))
        })
        .bind("localhost:8000").unwrap()
        .run();
    }
In my case, I have a thread running elsewhere listening for redis pubsub messages, then I pipe them to the various connected clients if there's a topic match...
I'll try to cut this down to the bits relevant to you.
Here's the loop I run in a separate thread. It needs access to the Addr so it can pass a message to the actor via try_send():
    /// Subscribes to a list of pubsub topics and pipes messages to a
    /// web socket handler when received.
    pub fn subscribe(topics: &[String], ws: Addr<Ws>, rx: mpsc::Receiver<()>) -> RedisResult<()> {
        // ... snip ...
        loop {
            // more snips...
            if let Ok(msg) = pipe.get_message() {
                trace!("{}", msg.get_payload::<String>()?);
                if let Err(e) = ws.try_send(RedisMsg(msg)) {
                    error!("Error pushing message to Ws Actor: `{}`", e);
                }
            }
        }
        Ok(())
    }
RedisMsg is a newtype to wrap the struct I get back from the redis client, allowing me to implement Handler/Message related traits:
    impl Message for RedisMsg {
        type Result = Result<(), failure::Error>;
    }

    impl Handler<RedisMsg> for Ws {
        type Result = Result<(), failure::Error>;

        fn handle(&mut self, msg: RedisMsg, ctx: &mut Self::Context) -> Self::Result {
            // ... snipping (building payload from `RedisMsg`)
            ctx.text(serde_json::to_string(&payload)?);
            Ok(())
        }
    }
Once we're inside the Handler implementation, we can use ctx.text() to send to the client.
The result type for the message and handler must be aligned.
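For the broadcast case asked about earlier (the loop marked `// ???`), the same idea would presumably apply per client: loop over the stored addresses and call try_send() on each one, letting every actor's Handler write to its own socket. Below is a minimal std-only sketch of that fan-out, not actix code. `broadcast` is a hypothetical helper, and `Sender<String>` stands in for `Addr<Ws>`, with `send` playing the role of try_send(). Dropping handles whose send fails is an extra assumption that keeps disconnected clients from piling up.

```rust
use std::sync::mpsc::Sender;
use std::sync::Mutex;

/// Stand-in for the `ChatState` from the thread, with `Sender<String>`
/// playing the role of `Addr<Ws>` (an assumption for illustration).
pub struct ChatState {
    pub clients: Mutex<Vec<Sender<String>>>,
}

/// Hypothetical helper: sends `text` to every registered client and drops
/// handles whose receiving side is gone. Returns the number of clients
/// that are still registered after the broadcast.
pub fn broadcast(state: &ChatState, text: &str) -> usize {
    let mut clients = state.clients.lock().unwrap();
    // `send` fails only when the receiver has been dropped, so a failed
    // send doubles as a disconnect check.
    clients.retain(|client| client.send(text.to_owned()).is_ok());
    clients.len()
}
```

Each client gets its own copy of the text, which mirrors sending one message per actor address.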
@onelson That did it! Thank you a lot! :100: :+1:
Hi Ellie, I'm trying to use your actix websocket code to make a chatting system. Could I take a look at your code as of now? It seems I can't add ChatState to ws::WebSocket<> as it'll only take one argument. Also, it doesn't seem as though a with_state() method exists. I'm assuming this is because you were using old dependencies, do you have an updated one?