Graphql: How to clean up resources when subscriptions end?

Created on 30 Mar 2019 · 6 Comments · Source: nestjs/graphql

I'm submitting a...


[ ] Regression 
[ ] Bug report
[ ] Feature request
[x] Documentation issue or request
[ ] Support request => Please do not submit support request here, instead post your question on Stack Overflow.

Current behavior

Suppose there is a Subscription defined, which sets up an interval that polls a remote service every X seconds. I'm not sure how this interval could be cleared when the subscription is done, either via forced unsubscription from a client, or a disconnect of the underlying websocket.

I have looked through upstream graphql subscriptions for an answer but it seems unclear whether this is possible at all.

Expected behavior

Have a way to do cleanup when a subscription ends.

Minimal reproduction of the problem with instructions

@Subscription(returns => Something)
someSubscription() {
  // The handle returned by setInterval is discarded, so nothing can ever clear it.
  setInterval(async () => {
    pubSub.publish('someSubscription', {
      someSubscription: await fetchDataFromRemoteService(),
    });
  }, 5000);
  return pubSub.asyncIterator('someSubscription');
}

If the client disappears, this interval keeps firing forever. With multiple clients, this quickly turns into a memory leak and a potential DoS on the remote service, since every subscription starts another interval that is never cleared.

How would one clean up the interval (or an RxJS subscription, or any other resource)?

What is the motivation / use case for changing the behavior?

Environment


Nest version: 6.0.1

All 6 comments

Seems like this works (taken from here):

export function withCancel<T>(
  asyncIterator: AsyncIterator<T | undefined>,
  onCancel: () => void,
): AsyncIterator<T | undefined> {
  return {
    ...asyncIterator,
    return() {
      onCancel();
      return asyncIterator.return
        ? asyncIterator.return()
        : Promise.resolve({ value: undefined, done: true });
    },
  };
}

@Subscription(returns => Something)
someSubscription() {
  const interval = setInterval(async () => {
    pubSub.publish('someSubscription', {
      someSubscription: await fetchDataFromRemoteService(),
    });
  }, 5000);
  return withCancel(pubSub.asyncIterator('someSubscription'), () => {
    clearInterval(interval);
  });
}
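
For anyone wondering why this works: the wrapper relies on the consumer calling return() on the iterator once it stops listening. Below is a minimal sketch of that mechanism, not Nest or graphql-subscriptions code. The async generator `ticks` is a stand-in for pubSub.asyncIterator, `withCancelDemo` and `demo` are hypothetical names, and leaving a `for await` loop with `break` stands in for the server dropping a client (all of these are assumptions for illustration). It forwards next/return explicitly instead of spreading the iterator.

```typescript
// Stand-in for pubSub.asyncIterator(...): an endless stream of numbers.
async function* ticks(): AsyncGenerator<number> {
  let n = 0;
  while (true) {
    yield n++;
  }
}

// Same idea as withCancel above, but forwarding next/return explicitly.
function withCancelDemo<T>(
  inner: AsyncIterator<T>,
  onCancel: () => void,
): AsyncIterableIterator<T> {
  return {
    next: () => inner.next(),
    return: async (): Promise<IteratorResult<T>> => {
      onCancel();
      return inner.return ? inner.return() : { value: undefined, done: true };
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}

async function demo(): Promise<string[]> {
  const log: string[] = [];
  // Simulated poller; if it were never cleared it would keep running.
  const interval = setInterval(() => log.push('poll'), 60000);

  const stream = withCancelDemo(ticks(), () => {
    clearInterval(interval);
    log.push('cancelled');
  });

  for await (const n of stream) {
    log.push(`value ${n}`);
    if (n === 1) break; // leaving the loop early calls stream.return()
  }
  return log;
}
```

Here demo() resolves to ['value 0', 'value 1', 'cancelled']: the cleanup callback runs as soon as the consumer stops iterating, which is the hook that withCancel uses to clear the interval.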

Should this be in the documentation? It seems like something people might overlook if not warned of, and end up with apps that leak memory.

Is there an alternative already built in?

"Should this be in the documentation? It seems like something people might overlook if not warned of, and end up with apps that leak memory."

Honestly, this looks more or less like a custom implementation of a custom feature. We cannot cover everything in the docs.

I'm glad you found a solution, and thanks for sharing it with other people.

Yup, it's custom, but I was expecting something built-in.

Nest already abstracts most things in various ways with decorators. So perhaps it could allow returning an RxJS Observable from the Subscription handler and, if an Observable is returned, wrap it in an AsyncIterable itself.

Basically turning this:

  @Subscription(returns => Something)
  public someSubscription() {
    const subscription = this.changes$.subscribe(async data => {
      pubSub.publish('someSubscription', {
        someSubscription: data,
      });
    });

    const iterator = withCancel(pubSub.asyncIterator('someSubscription'), () => {
      subscription.unsubscribe();
    });

    return iterator;
  }

into this:

  @Subscription(returns => Something)
  public someSubscription() {
    return this.changes$;
  }

I feel it would enforce better practices and help with discoverability, because one would know that standard RxJS Observable teardown logic would be available.
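
To make the proposal concrete, here is a rough sketch of the kind of adapter Nest would have to apply internally. This is not an existing Nest API: `toAsyncIterator`, `Subscribable` and `Unsubscribable` are hypothetical names, and the two interfaces are minimal structural stand-ins for the RxJS types so the snippet has no dependencies. It only forwards values; error and completion handling are left out.

```typescript
// Minimal structural stand-ins for the RxJS types (assumption, for illustration).
interface Unsubscribable {
  unsubscribe(): void;
}
interface Subscribable<T> {
  subscribe(next: (value: T) => void): Unsubscribable;
}

function toAsyncIterator<T>(source: Subscribable<T>): AsyncIterableIterator<T> {
  const buffered: T[] = []; // values that arrived before anyone asked for them
  const waiting: Array<(result: IteratorResult<T>) => void> = []; // pending next() calls
  let done = false;

  const subscription = source.subscribe((value) => {
    const resolve = waiting.shift();
    if (resolve) {
      resolve({ value, done: false });
    } else {
      buffered.push(value);
    }
  });

  // Runs the teardown once and releases any consumers still waiting.
  const finish = (): IteratorResult<T> => {
    if (!done) {
      done = true;
      subscription.unsubscribe();
      for (const resolve of waiting.splice(0)) {
        resolve({ value: undefined, done: true });
      }
    }
    return { value: undefined, done: true };
  };

  return {
    next() {
      if (done) {
        return Promise.resolve<IteratorResult<T>>({ value: undefined, done: true });
      }
      if (buffered.length > 0) {
        return Promise.resolve<IteratorResult<T>>({
          value: buffered.shift() as T,
          done: false,
        });
      }
      return new Promise<IteratorResult<T>>((resolve) => waiting.push(resolve));
    },
    return() {
      return Promise.resolve(finish());
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}
```

With something like this in place, the handler could simply return this.changes$, and the unsubscribe call in return() would trigger the Observable's own teardown logic.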

Leaving this here as it may help someone:

// with-observable.ts
import { Observable } from 'rxjs';

export function withCancel<T>(
  asyncIterator: AsyncIterator<T | undefined>,
  onCancel: () => void,
): AsyncIterator<T | undefined> {
  return {
    ...asyncIterator,
    return() {
      onCancel();
      return asyncIterator.return
        ? asyncIterator.return()
        : Promise.resolve({ value: undefined, done: true });
    },
  };
}

interface PubSub {
  publish: (trigger: string, data: any) => void;
  asyncIterator: (trigger: string) => AsyncIterator<{}>;
}

export function withObservable<T>(
  observable: Observable<T>,
  pubSub: PubSub,
  trigger: string,
) {
  const subscription = observable.subscribe(data => {
    pubSub.publish(trigger, {
      [trigger]: data,
    });
  });

  return withCancel(pubSub.asyncIterator(trigger), () => {
    subscription.unsubscribe();
  });
}

// usage
@Subscription(returns => Something)
public someSubscription() {
  return withObservable(this.changes$, pubSub, 'someSubscription');
}

Your observable's teardown logic will be called when subscriptions end.

@andreialecu nice solution! However, the withCancel function throws an error when used with the graphql-redis-subscriptions package. For anyone running into the same issue, I modified withCancel as follows to get it to work:

export function withCancel<T>(
  asyncIterator: AsyncIterator<T | undefined>,
  onCancel: () => void
): AsyncIterator<T | undefined> {
  if (!asyncIterator.return) {
    asyncIterator.return = () => Promise.resolve({ value: undefined, done: true });
  }

  const savedReturn = asyncIterator.return.bind(asyncIterator);
  asyncIterator.return = () => {
    onCancel();
    return savedReturn();
  };

  return asyncIterator;
}

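So basically the AsyncIterator needs its this context to be preserved in order to work properly: this version patches return() on the original iterator and hands that same object back, instead of spreading it into a new one.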
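
A small sketch of what likely goes wrong with the spread version, assuming the iterator is a class instance whose methods live on the prototype. `CounterIterator` is a hypothetical stand-in, not the actual class from graphql-redis-subscriptions.

```typescript
// Hypothetical class-based iterator, standing in for what a PubSub library might return.
class CounterIterator implements AsyncIterator<number> {
  count = 0;

  async next(): Promise<IteratorResult<number>> {
    return { value: this.count++, done: false };
  }

  async return(): Promise<IteratorResult<number>> {
    return { value: undefined, done: true };
  }
}

const original = new CounterIterator();

// Object spread copies own enumerable properties only (here: count).
// next() and return() are defined on the prototype, so they are not copied.
const spreadCopy: { next?: unknown; count?: number } = { ...original };
const nextSurvivesSpread = typeof spreadCopy.next === 'function'; // false

// Binding keeps `this` pointed at the instance even when the method is passed around,
// which is what savedReturn does in the version above.
const boundNext = original.next.bind(original);
```

An iterator built from plain closures would survive the spread, which may be why the original withCancel works with some PubSub implementations and not others.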

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.
