Wasm-bindgen: Using streams as event listener handlers

Created on 8 Jan 2019  路  6Comments  路  Source: rustwasm/wasm-bindgen

Hi,

i want to use streams for event handling.

let future = element.create_event_stream("click").for_each(|_| ...);
spawn_local(future); // https://github.com/rustwasm/wasm-bindgen/pull/1148
  • the goal is get something like http://reactivex.io/
  • but i also want compatibility to the rest of the ecosystem, thus i think it should be based on the futures lib
  • i would extend Stream by adding additional adapters for all kinds of functionality (eg debounce as shown on the ReactiveX website)
  • what is the best way to cancel such a stream? say i want to remove the event listeners from the element(s). Maybe by storing some sort of guard on the rust side by dropping it it would drop the future and all event stream sources causing the removal of event listeners from the element?

All 6 comments

@chpio Try this:

use wasm_bindgen::{JsValue, JsCast};
use wasm_bindgen::closure::Closure;
use web_sys::EventTarget;
use futures::Poll;
use futures::stream::Stream;
use futures::sync::mpsc::{unbounded, UnboundedReceiver};

pub struct EventStream<'a, A> {
    node: EventTarget,
    kind: &'a str,
    callback: Closure<Fn(JsValue)>,
    receiver: UnboundedReceiver<A>,
}

impl<'a, A> EventStream<'a, A> where A: JsCast + 'static {
    pub fn new(node: &EventTarget, kind: &'a str) -> Self {
        let (sender, receiver) = unbounded();

        let callback = Closure::wrap(Box::new(move |event: JsValue| {
            sender.unbounded_send(event.dyn_into().unwrap()).unwrap();
        }) as Box<Fn(JsValue)>);

        node.add_event_listener_with_callback(kind, callback.as_ref().unchecked_ref()).unwrap();

        Self {
            node: node.clone(),
            kind,
            callback,
            receiver,
        }
    }
}

impl<'a, A> Stream for EventStream<'a, A> {
    type Item = A;
    type Error = ();

    #[inline]
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        self.receiver.poll()
    }
}

impl<'a, A> Drop for EventStream<'a, A> {
    #[inline]
    fn drop(&mut self) {
        self.node.remove_event_listener_with_callback(self.kind, self.callback.as_ref().unchecked_ref()).unwrap();
    }
}

This lets you easily create Streams from events:

EventStream::new(&some_element, "click")
    .map(|event: MouseEvent| {
        ...
    })

Also, you should check out my Futures Signals crate. As its name suggests, it is based on top of Futures. It doesn't work for events (you should use Stream for that), but it's useful for other things, like dominator.

To answer your questions:

i would extend Stream by adding additional adapters for all kinds of functionality (eg debounce as shown on the ReactiveX website)

You would generally do that by creating a StreamExt trait, like this:

pub trait StreamExt: Stream {
    // define default methods in here
}

impl<T: ?Sized> StreamExt for T where T: Stream {}

Here's a more detailed example. It's for Futures 0.3, but the same principles apply to Futures 0.1

Of course the best option is to contribute directly to the Futures crate, rather than creating a separate StreamExt trait. That way every user of Futures can benefit.

what is the best way to cancel such a stream? say i want to remove the event listeners from the element(s). Maybe by storing some sort of guard on the rust side by dropping it it would drop the future and all event stream sources causing the removal of event listeners from the element?

You just impl Drop, as shown in my code above. It only needs to handle the event listener cleanup, since the UnboundedReceiver and Closure are automatically dropped.

You just impl Drop, as shown in my code above. It only needs to handle the event listener cleanup, since the UnboundedReceiver and Closure are automatically dropped.

I haven't tested it, so i might be wrong, but i would pass the future to the spawn_local fn, and for this reason lose the control to drop it on the rust side, so it would live for-ever on the js side?

I haven't tested it, so i might be wrong, but i would pass the future to the spawn_local fn, and for this reason lose the control to drop it on the rust side, so it would live for-ever on the js side?

Oh, that's what you meant. You can use take_while to stop the Stream early (this will drop everything, which then causes the event listener to be cleaned up).

You could also use zip to end a Stream early.

You can also use abortable in Futures 0.3 to abort a Future. This also works for aborting a Stream, since StreamExt::for_each returns a Future.

However, wasm-bindgen currently uses Futures 0.1, so you'll have to wait for wasm-bindgen to upgrade to 0.3 so you can use abortable. In the meantime you can use take_while instead.

(Or you could write your own system for aborting Futures, which is what I did for futures-signals)

I think @Pauan seems to have answered this question here (thanks!), so I'm gonna close. If there's more though please let me know and I'll reopen!

Was this page helpful?
0 / 5 - 0 ratings

Related issues

pauldorehill picture pauldorehill  路  3Comments

poccariswet picture poccariswet  路  3Comments

gitmko0 picture gitmko0  路  3Comments

arilotter picture arilotter  路  3Comments

alexcrichton picture alexcrichton  路  3Comments