Moor: [Question] Stream with self updating DateTime in where clause

Created on 17 Jul 2020 · 8 Comments · Source: simolus3/moor

Hey everyone,

I am currently using a Stream to update my UI but have run into a problem with it. My code is based on the "Many to many relationships" example from the docs.

My app needs the results of this Stream to always reflect an up-to-date from and to DateTime pair, but the stream keeps using the values it was initialized with. The current query/stream looks like this:

final discoveredContactsStream = (select(discoveredContacts)
    ..where(
        (contact) => 
            contact.date.isBetweenValues(from, to)
    )).watch();

And the whole subscription is initialized like this:

Future<void> _initDatabaseStreams() async {
    _database.watchAllContacts(
        from: getTimeFifteenMinutesAgo(),
        to: DateTime.now().add(Duration(hours: 3))
    ).listen((contactsWithNotifications) {
        setState(() {
            _recentActivity = RenderActivity(context, contactsWithNotifications);
        });
    });
}

Is there a way to update the from and to DateTime pair without canceling and restarting the subscription?
I am completely new to Streams, so if there is an easy and straightforward solution I'd be really happy to hear about it.

All 8 comments

Is there a way to update the from and to DateTime pair without canceling and restarting the subscription?

There isn't. But dropping and re-creating the stream isn't very expensive, so you can do that regularly. If you want to update the DateTimes every now and then, there's a nice way to do that with the rxdart package:

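import 'package:rxdart/rxdart.dart'; // provides the switchMap extension

const updateDuration = Duration(minutes: 1);

Future<void> _initDatabaseStreams() async {
  Stream
    .periodic(updateDuration)
    // On every tick, drop the previous query stream and start a new one
    // with freshly computed bounds.
    .switchMap((_) {
      return _database.watchAllContacts(
        from: getTimeFifteenMinutesAgo(),
        to: DateTime.now().add(Duration(hours: 3))
      );
    })
    .listen((contactsWithNotifications) {
      // setState(...)
    });
}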

By the way - if you don't use a StreamBuilder widget, you should always store the result of listen and cancel the subscriptions later:

class MyState extends State<MyWidget> {
  StreamSubscription _contactsSubscription;

  void _initDatabaseStreams() {
    _contactsSubscription = Stream.periodic(updateDuration).switchMap(...).listen(...);
  }

  @override
  void dispose() {
    _contactsSubscription?.cancel();
    super.dispose();
  }
}

Otherwise you'll keep the computation running or get exceptions for calling setState on a disposed widget.
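
To see what cancel() does in isolation, here is a minimal sketch that uses only dart:async, with no moor or Flutter involved. The helper name stopsAfterCancel and the 50 ms period are made up for illustration.

```dart
import 'dart:async';

/// Hypothetical helper: listens to a periodic stream, cancels the
/// subscription, and reports whether events stopped arriving afterwards.
Future<bool> stopsAfterCancel(Duration period) async {
  var ticks = 0;
  final StreamSubscription<int> subscription =
      Stream<int>.periodic(period, (i) => i).listen((_) => ticks++);

  await Future<void>.delayed(period * 3); // let a few events through
  await subscription.cancel(); // stops the underlying timer
  final seenAtCancel = ticks;

  await Future<void>.delayed(period * 3); // wait again after cancelling
  return ticks == seenAtCancel;
}

Future<void> main() async {
  print(await stopsAfterCancel(const Duration(milliseconds: 50))); // true
}
```

Without the cancel() call the counter would keep increasing for as long as the program runs, which is the "computation keeps running" case described above.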

This is perfect, now everything updates as it should. Thank you for your help!

I was actually trying to use a StreamBuilder widget before but I decided to not go with it until I do understand Streams a bit better, and also because it didn't work :)

Hey @simolus3,

I do have to ask you one more thing about the Stream.periodic() you suggested: is there a way to start the Stream immediately after creation, without waiting for the first pass of the Duration?

I guess you could replace Stream.periodic with

Rx.concat([
  Stream.value(null), // emit the first value immediately
  Stream.periodic(updateDuration) // and then repeatedly, according to the duration
])

I just tried to change my subscription

        _uniqueContactsSubscription = Stream
            .periodic(_infoUpdateDuration)
            .switchMap((_) {
                return _database.watchUniqueContacts(
                    from: getLastMidnight(),
                    to: getCurrentDateTime()
                );
            })
            .listen((uniqueContacts) {
                setState(() {
                    // states to be set...
                });
            });

but I failed since I have no idea how to transform it with your suggestion :(

Does this not work?

import 'package:rxdart/rxdart.dart';

        _uniqueContactsSubscription = Rx
            .concat<Null>([
              Stream.value(null),
              Stream.periodic(_infoUpdateDuration),
            ])
            .switchMap((_) {
                return _database.watchUniqueContacts(
                    from: getLastMidnight(),
                    to: getCurrentDateTime()
                );
            })
            .listen((uniqueContacts) {
                setState(() {
                    // states to be set...
                });
            });

Oh, I totally misunderstood the

you could replace Stream.periodic

part.. I actually ended up with something like:

        _uniqueContactsSubscription = Rx
            .concat<Null>([
                Stream.value(null),
                Stream
                    .periodic(_infoUpdateDuration)
                    .switchMap((_) {
                        ...
                    })
                ...
            ])

I guess I have to study the whole Rx/Stream part a lot more.. Now it works, thanks a lot for your help!

Sure! In case you're wondering why that doesn't work, it's because it would

  • emit null
  • then emit the results after _infoUpdateDuration

since those are the two streams you're concatenating. If you call switchMap after concat, it will emit the results initially and then periodically.
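
The difference between the two orderings can be sketched side by side. This assumes the rxdart package is available; fakeWatch is a hypothetical stand-in for a moor watch query, and the function names switchAfterConcat and switchInsideConcat are made up for illustration.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

// Hypothetical stand-in for a moor watch() query: emits one snapshot
// stamped with the time at which the query was created.
Stream<String> fakeWatch() => Stream.value('results @ ${DateTime.now()}');

// switchMap applied after concat: the immediate null and every periodic
// tick each start a fresh query, so results arrive right away and then
// once per period.
Stream<String> switchAfterConcat(Duration period) {
  return Rx.concat<Null>([
    Stream.value(null),
    Stream.periodic(period),
  ]).switchMap((_) => fakeWatch());
}

// switchMap applied inside concat: the first event is the bare null, and
// query results only appear after the first period has elapsed.
Stream<dynamic> switchInsideConcat(Duration period) {
  return Rx.concat<dynamic>([
    Stream.value(null),
    Stream<Null>.periodic(period).switchMap<dynamic>((_) => fakeWatch()),
  ]);
}

Future<void> main() async {
  const period = Duration(milliseconds: 200);
  print(await switchAfterConcat(period).first); // a 'results @ ...' string
  print(await switchInsideConcat(period).first); // null
}
```

Taking only the first event of each stream is enough to show the point: in the first variant it is already a query result, in the second it is the placeholder null.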
