Rxjs: Proposal: attack and release times for refCount

Created on 14 Aug 2015  路  29Comments  路  Source: ReactiveX/rxjs

This is a non-breaking change proposal to extend refCount() with two optional parameters, attackTime and releaseTime.

refCount(attack?: number, release?: number)

The motivation: I have discovered a use case where I need a connectable observable to get connected, but not get disconnected/reconnected if the number of observers went from 1 to 0 then from 0 to 1 synchronously (in the same event loop). In the link above, I just used connect() without disconnecting it, which is bad for memory management. I can imagine other use cases where I'd like to delay the auto-disconnect implicit in the refCount.

For symmetry with release, I also included attack times in case that makes sense in some case.

From the Rx.NET gitter chat Lee Campbell also mentioned the existence of a LazyConnect operator they have which would be refCount(0, Infinity) ("connect when first observer subscribes, but never disconnect").

What do you think? This is definitely an advanced feature that most people don't need to use, but as a silent addition I think it's neat to avoid the imperative connect() whenever naive refCount() isn't enough.

discussion

Most helpful comment

@NMinhNguyen

source
  .multicast(() => new Subject())
  .let((source) => {
    let connection, subscriptions = new Subscription();
    return Observable.using(
      () => {
        const localSubscription = new Subscription(() => {
          subscriptions.remove(localSubscription);
          if (connection && subscriptions.length === 3 /* <-- disconnect when the subscriber count drops to 2 (+1 for the connection subscription) */) {
            subscriptions.unsubscribe();
            subscriptions = new Subscription();
            connection = null;
          }
        });
        subscriptions.add(localSubscription);
        if (!connection && subscriptions.length === 5 /*<- connect on the 5th subscription */) {
          subscriptions.add(connection = source.connect());
        }
        return localSubscription;
      },
      () => source
    );
  })

All 29 comments

I actually have a use case for this as well, involving WebSockets.

It seems like it should almost be a different operator/method. If I understand your proposal correctly, you want to add an optional temporal nature to refCount, which by name, only implies reference counting is done. Further bikeshedding, I'm not sure what "attack" means. Since it's just a parameter name, you can make it as descriptive as you like. Maybe this is a persistantRefCount, or a refCountWithDelay or the like? I don't mind it being an overload of refCount, it just seems that an overload shouldn't completely change the nature of the operator. But that's one person's opinion.

A totally different angle is maybe there is a use case for a DelayedCompletionSubject that embodies this behavior. Again, I think this would be useful for WebSockets, which I've been wrapping in Subjects.

refCount(200, 0) would mean "when the number of observers goes from 0 to 1, wait for 200ms then connect this ConnectableObservable, if by that time it still has a positive number of observers".

it just seems that an overload shouldn't completely change the nature of the operator.

It wouldn't change the nature of the operator, because refCount(0, 0) is precisely what typical refCount() does, but giving a second thought to it, maybe refCountWithDelay would be better to help people discover this functionality (as opposed to having it hidden as a shady parameter).

Yet another thing to consider is: by having a temporal nature, we are implicitly assuming the setTimeout scheduler. Maybe would make sense to let this scheduler independent by doing something like pausable: another Observable (the "pause") of boolean dictates when to connect/disconnect. Something like refCountWhen(gate$), where the ConnectableObservable connects/disconnects like this (ignore the means of achieving this, look at the semantics of connection only):

Let numObservers$ be the (theoretical) Observable emitting the number of observers of the ConnectableObservable.

The ConnectableObservable connects when

combineLatest(numObservers$, gate$, (num, gate) => ({num, gate}))
  .filter(({num, gate}) => num > 0 && gate); 

emits next, and disconnects when

combineLatest(numObservers$, gate$, (num, gate) => ({num, gate}))
  .filter(({num, gate}) => num === 0 && !gate); 

emits next.

Not sure if this is too generic to be practical. Just tossed ideas.

I like the idea of using another Observable (which can obviously be scheduled) to trigger:

someObservable.publish().refCountCompleteWhen(completions => completions.delay(200))

Where completions is an observable that emits synchronously when refCount hits zero. and the value returned by the notifier function is an Observable that when it nexts or completes tells the ConnectableObservable to disconnect if the refCount is still 0. errors should still propagate down the error path.

Functionality like this would be enough for me to use inside of my AnonymousSubject I use to create the WebSocket subject. So a new type of Subject (as I was suggesting before) probably isn't necessary.

Was talking with some folks around here about it, and this would be possible to do with current operators if RxJS had a doOnUnsubscribe. The idea being that you'd maintain the state in that way, rather than introduce another method or override to the API.

doOnUnsubscribe :+1: we need that. I think I needed it this week and was disappointed to discover it doesn't exist (while it does in RxJava). I used finally as a replacement for doOnUnsubscribe.

@blesh @staltz how is doOnUnsubscribe different from finally?

@staltz I've never liked refCount, as it seems like a specialized case of a broader host of what I've been calling _connection strategies_. Just like there's many possible flattening strategies (merge, concat, switch), buffering strategies (zip, combine, toArray, join), I suspect there's a host of unexplored connection strategies that control when to go from cold to hot.

I've thought about overloading the ConnectableObservable.prototype.connect with a connectionSelector argument. Maybe something like this?

const numbers = Observable
  .range(0, Number.POSITIVE_INFINITY)
  .map((x) => Math.random())
  .publish();

numbers
  .zip(numbers.skip(1), (x, y) => x * y)
  .subscribe(::console.log);

numbers.connect(function connectionStrategy(source, subject) {
  var connection = null;
  var hasConnected = false;
  return Observable.create(function(observer) {
    var subscription = subject.subscribe(observer);
    if(subject.observers.length === 2 && !hasConnected) {
      connection = source.subscribe(subject);
    }
    return function() {
      subscription.unsubscribe();
      if(subject.observers.length === 0) {
        hasConnected = false;
        connection.unsubscribe();
        connection = null;
      }
    }
  })
});

@blesh @staltz how is doOnUnsubscribe different from finally?

finally happens _after_ the unsubscription call, where onOnUnsubscribe would happen just before. (I believe, it's an RxJava thing)

@trxcllnt ... I like your proposal. I'd be interested in what RxJava folks like @benjchristensen would think about it. Just to get a different perspective.

I've thought about overloading the ConnectableObservable.prototype.connect with a connectionSelector argument. Maybe something like this?

numbers.connect(function connectionStrategy(source, subject) {
  var connection = null;
  var hasConnected = false;
  return Observable.create(function(observer) {
    var subscription = subject.subscribe(observer);
    if(subject.observers.length === 2 && !hasConnected) {
      connection = source.subscribe(subject);
    }
    return function() {
      subscription.unsubscribe();
      if(subject.observers.length === 0) {
        hasConnected = false;
        connection.unsubscribe();
        connection = null;
      }
    }
  })
});

Good idea.
(Bike-shed: probably missing hasConnected = true; there when the source is subscribed)

@staltz @blesh @benjchristensen note, connect with a selector function would behave like refCount does, returning an Observable not a Subscription. This is possibly a large enough reason to name it something other than connect.

Yes, got it.

I feel like there's a reason why this hasn't been done and we're not seeing it... cc/@jhusain and @mattpodwysocki for input.

Yeah, maybe we need to question the underlying assumption ("why do you need a hot behaving like that? do you even need a hot?").

Summoning @headinthebox also.

Related to this is a new ConnectableObservable.autoConnect feature added to RxJava in 1.0.13: https://github.com/ReactiveX/RxJava/blob/1877fa7bbc176029bcb5af00d8a7715dfbb6d373/src/main/java/rx/observables/ConnectableObservable.java#L96

I'm less convinced of involving time in a 1st class overload of refCount. Once time or other conditions for disconnecting get involved, that feels like the application should just directly use the building blocks to achieve the needed behavior, and not directly change ConnectableObservable. The connect/disconnect semantics already support any custom behavior an app might need.

@benjchristensen ... I'm more interested in what you think of @trxcllnt's proposal. I find it really interesting. The ability to specify a strategy for subscribing and unsubscribing from the source and the observable in it's entirety is pretty nice.

Yeah, I'm looking into how to make variants of refCount, not necessarily put time conditions in it. Paul Taylor's connect would be ideal, because I think we can't emulate refCount in a lift (or, can we?).

I would really like to see Paul's new connect(). It seems like Ben Lesh agrees too, so I'm guessing we have enough quorum to approve it? Or do we need more feedback?

@staltz @trxcllnt I really like it... we probably need to bike-shed the name a little. The other thing that is up for debate with this is whether or not to leave the current connect behavior in there if we can make something like this that returns a "cold until hot" observable. I suspect the answer is "yes", but I'd like more feedback from @mattpodwysocki and @headinthebox here, since it could be a change to a critical piece of Rx.

@benjchristensen @blesh @staltz If we're bike-shedding, I think control sounds like an appropriate name for what I proposed.

Sounds good to me

@trxcllnt, can you create an issue for your connect operator? I'm going to close this issue as it's gotten stale.

Hi guys, I need that but it's unclear to me how your discussion ended up. What is the resulting operator with Paul's connection stretegies?

@marinho I didn't get around to implementing a generic controlling operator, but you can definitely achieve any behavior you like with Observable.using and some local selector state.

@trxcllnt could you please elaborate on what you meant by

local selector state.

perhaps by providing a small example?

@NMinhNguyen

source
  .multicast(() => new Subject())
  .let((source) => {
    let connection, subscriptions = new Subscription();
    return Observable.using(
      () => {
        const localSubscription = new Subscription(() => {
          subscriptions.remove(localSubscription);
          if (connection && subscriptions.length === 3 /* <-- disconnect when the subscriber count drops to 2 (+1 for the connection subscription) */) {
            subscriptions.unsubscribe();
            subscriptions = new Subscription();
            connection = null;
          }
        });
        subscriptions.add(localSubscription);
        if (!connection && subscriptions.length === 5 /*<- connect on the 5th subscription */) {
          subscriptions.add(connection = source.connect());
        }
        return localSubscription;
      },
      () => source
    );
  })

@trxcllnt Thanks a lot for that! Really insightful. So I finally got round to trying it out, and it seems like there is no length property on Subscription objects. There is however a private array of subscriptions - subscriptions._subscriptions, but accessing it seems like bad practice? Would you say it's better to manually track the number of subscriptions?

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

peterbakonyi05 picture peterbakonyi05  路  4Comments

OliverJAsh picture OliverJAsh  路  3Comments

matthewwithanm picture matthewwithanm  路  4Comments

haf picture haf  路  3Comments

giovannicandido picture giovannicandido  路  4Comments