Objection.js: Stream implementation

Created on 17 Dec 2015 · 8 Comments · Source: Vincit/objection.js

Is there no pass-through for the stream implementation?

wontfix

All 8 comments

Unfortunately no. Many of the features, like eager loading, would need a completely separate implementation to be used with streaming. Consider this:

Movie
  .query()
  .where('foo', 'bar')
  .eager('[actors, reviews]')
  .stream();

We would need to fetch the related models each time we get a row out of the stream. That would actually be really inefficient compared to the way it is done without streams.
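To make the cost difference concrete, here is a minimal in-memory sketch. Everything in it is a hypothetical stand-in rather than an Objection API: the `movies` and `actors` arrays play the role of tables, and each call to `queryActors` represents one round trip to the database. The batched version only approximates how a non-streaming eager fetch can load relations for many parent rows at once.

```typescript
// Hypothetical in-memory tables standing in for a real database.
interface MovieRow { id: number; name: string }
interface ActorRow { id: number; movieId: number; name: string }

const movies: MovieRow[] = [
  { id: 1, name: 'A' },
  { id: 2, name: 'B' },
  { id: 3, name: 'C' },
];
const actors: ActorRow[] = [
  { id: 10, movieId: 1, name: 'x' },
  { id: 11, movieId: 2, name: 'y' },
  { id: 12, movieId: 3, name: 'z' },
];

let queryCount = 0;

// Hypothetical helper: one call represents one round trip to the database.
function queryActors(movieIds: number[]): ActorRow[] {
  queryCount += 1;
  return actors.filter((a) => movieIds.includes(a.movieId));
}

// Streaming style: relations are fetched as each row arrives.
function perRowQueries(): number {
  queryCount = 0;
  for (const movie of movies) {
    queryActors([movie.id]);
  }
  return queryCount;
}

// Non-streaming style: all parent rows are known, so one query covers them.
function batchedQueries(): number {
  queryCount = 0;
  queryActors(movies.map((m) => m.id));
  return queryCount;
}

const perRow = perRowQueries();   // one relation query per movie
const batched = batchedQueries(); // a single relation query for all movies
console.log({ perRow, batched });
```

With hundreds of thousands of rows, the per-row variant issues as many relation queries as there are rows, which is the inefficiency described above.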

What we could do is support streaming for basic cases, where there is no eager loading or other special ORM features.

You can always just bypass the objection query builder and do this:

Movie
  .knex()
  .where('foo', 'bar')
  .stream();

but you don't get the row --> model mapping when you do this.
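One way to get the mapping back on top of a raw row stream is to pipe it through an object-mode Transform. The sketch below is self-contained and makes some assumptions: `Readable.from` stands in for the stream a database driver would produce, and `MovieModel` with its static `fromRow` factory is a hypothetical stand-in for a model class, not Objection's own API.

```typescript
import { Readable, Transform } from 'node:stream';

// Hypothetical stand-in for an ORM model class.
class MovieModel {
  constructor(public id: number, public title: string) {}

  // Plays the role that a row-to-model factory plays in an ORM.
  static fromRow(row: { id: number; title: string }): MovieModel {
    return new MovieModel(row.id, row.title);
  }

  label(): string {
    return `#${this.id} ${this.title}`;
  }
}

// Object-mode Transform that turns each plain row into a model instance.
function toModels(): Transform {
  return new Transform({
    objectMode: true,
    transform(row, _encoding, callback) {
      try {
        callback(null, MovieModel.fromRow(row));
      } catch (err) {
        callback(err as Error);
      }
    },
  });
}

// Readable.from stands in for the row stream a database driver would produce.
async function collectLabels(): Promise<string[]> {
  const rows = Readable.from([
    { id: 1, title: 'Alien' },
    { id: 2, title: 'Brazil' },
  ]);
  const labels: string[] = [];
  for await (const model of rows.pipe(toModels())) {
    labels.push((model as MovieModel).label());
  }
  return labels;
}

collectLabels().then((labels) => console.log(labels));
```

Because the Transform emits real instances, instance methods such as `label()` are available downstream, which plain rows would not offer.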

How would you like to use the streaming interface? Would the knex interface with model mapping be enough?

Yea, in my specific case I don't need the eager loading and am pulling a lot of records. I had done what you suggested and just used the knex function to build the query.

To get a Model back, I added a method to the model on creation that I can use to create the stream. The stream passes a model instance to the write callback for each row:

Stream = require 'stream'

_model.createStream = _model.prototype.createStream = (write, end) ->
      stream = new Stream
      stream.writable = true
      stream.write = (buffer) ->
        model = new _self.model
        model.$setDatabaseJson buffer
        write model
      stream.end = (buffer)->
        if arguments.length
          stream.write buffer
        stream.writable = false
        end()
      stream

_model.knex()
    .select()
    .from _model.tableName
    .stream()
    .pipe _model.createStream (myModel) ->
        #doSomething with each model
    , -> #stream complete

I'll probably add a pass-through method for the stream at some point. I'll post here when I start working on it. The basic implementation is simple, but I need to make sure the method handles all the cases (throws if there is an eager or insert call, etc.)
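A guard of the kind described could look roughly like the sketch below. Note the assumptions: `QueryPlan` is a hypothetical, simplified description of what a query builder has been asked to do, not Objection's internal representation, and `assertStreamable` and `isStreamable` are illustrative names only.

```typescript
// Hypothetical description of what a query builder has been asked to do.
interface QueryPlan {
  operation: 'select' | 'insert' | 'update' | 'delete';
  eager?: string;
}

// Throws for query shapes that a simple pass-through stream could not honour.
function assertStreamable(plan: QueryPlan): void {
  if (plan.operation !== 'select') {
    throw new Error(`stream() only supports select queries, got ${plan.operation}`);
  }
  if (plan.eager !== undefined) {
    throw new Error('stream() cannot be combined with eager loading');
  }
}

// Small helper for demonstration: reports whether the guard accepts a plan.
function isStreamable(plan: QueryPlan): boolean {
  try {
    assertStreamable(plan);
    return true;
  } catch {
    return false;
  }
}

console.log(isStreamable({ operation: 'select' }));
console.log(isStreamable({ operation: 'select', eager: '[actors, reviews]' }));
console.log(isStreamable({ operation: 'insert' }));
```

Failing fast before the stream is opened keeps unsupported combinations from silently returning incomplete models.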

I don't think this is going to happen since no one else has wanted this. Closing this now.

I just googled for this. Grabbing batches of records is fine, but being able to leverage a stream for a where query would be great.

We too would profit from having the stream() method.

I've found myself needing this, so I worked through a rough implementation of a stream() method on a custom query builder. The implementation is below; hopefully it is useful to somebody.

Firstly, you need to implement a custom query builder. The best way to do this is to use a base model class, which is thoroughly documented on the Objection site.

Now, we can add the following method to the custom query builder (implementation is shown below in TypeScript: strip out the type annotations for plain JS):

/**
 * Stream the query results, calling the passed handler once for each result.
 *
 * @param resultHandler handler which is called for each streamed model instance.
 */
public stream(resultHandler: (result: M) => void | Promise<void>) {
  const modelClass = this.modelClass();
  const { parse: keysToCamelCase } = snakeCaseMappers();  // Make sure to import this from 'objection'

  // Create a custom writable stream to handle each result (import Writable from 'stream').
  const out = new Writable({
    // objectMode is needed as we are streaming knex pojo results.
    objectMode: true,
    async write(knexResult, _, next) {
      try {
        // Since we are using the knex API to stream, the results passed to the writable stream are knex-formatted objects.
        // We therefore need to convert these into the respective objection model.
        // Firstly, perform the snake_case -> camelCase conversion for the keys in the knexResult object.
        const objectionPojo = keysToCamelCase(knexResult);

        // Then, pass in this POJO to the respective Objection model.
        const model = modelClass.fromDatabaseJson(objectionPojo);

        // We now have our model which can be passed into the handler.
        await resultHandler(model);

        // Move on to the next result.
        next();
      } catch (e) {
        // There was an error above: abort the stream.
        next(e);
      }
    },
  });

  // Wrap up the stream events as a promise.
  return new Promise((resolve, reject) => {
    // Once the stream has finished, resolve the promise.
    out.on('finish', resolve);

    // If the stream encounters an error, reject the promise.
    out.on('error', reject);

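    // Finally, we need to drop down into knex-land to use streams.
    // pipe() does not forward source errors, so also reject if the query itself fails.
    const source = this.toKnexQuery().stream();
    source.on('error', reject);
    source.pipe(out);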
  });
}

We are now free to use the following in our calling code (a service, etc.):

await SomeModel.query().where('someCondition', 'foo').stream((modelInstance) => {
  // This handler will be called for every result that is streamed from the database.
  // Return a promise to run async code here: the stream will wait until the promise resolves
  // before streaming the next result from the database.
});

// At this point, the stream has finished, and we can move on.

There are some obvious caveats: only use this for basic select queries, and don't expect things like eager loading to work. But, for simpler use cases, this is working nicely for me.
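For readers who want to try the handler-per-result pattern without a database, here is a self-contained sketch of the same idea built on `pipeline` from `node:stream/promises`, which rejects if either end of the pipe fails (plain `pipe()` does not forward errors from the source). `forEachStreamed` is a hypothetical helper name, and `Readable.from` stands in for a database result stream.

```typescript
import { Readable, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Runs `handler` once per object from `source`, waiting for each call to settle.
async function forEachStreamed<T>(
  source: Readable,
  handler: (item: T) => void | Promise<void>,
): Promise<void> {
  const sink = new Writable({
    objectMode: true,
    write(item, _encoding, next) {
      // Calling next() only after the handler settles is what applies backpressure.
      Promise.resolve()
        .then(() => handler(item as T))
        .then(() => next(), (err) => next(err));
    },
  });
  // pipeline() rejects if either the source or the sink fails.
  await pipeline(source, sink);
}

// Readable.from stands in for a database result stream.
async function demo(): Promise<string[]> {
  const log: string[] = [];
  await forEachStreamed<number>(Readable.from([1, 2, 3]), async (n) => {
    log.push(`start ${n}`);
    await new Promise((resolve) => setTimeout(resolve, 5));
    log.push(`end ${n}`);
  });
  return log;
}

demo().then((log) => console.log(log.join(', ')));
```

The log shows each handler call finishing before the next one starts, which is the "stream will wait until the promise resolves" behaviour described in the usage example.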

We would greatly benefit from a stream() implementation as well.

Use Case

We have a web API with a route that must fetch a large number of records and deliver them as a CSV download. Because there are hundreds of thousands of records, we cannot load the whole query result into memory before sending it to the client.

We have built a CSV library that adds decorators to the model class to simplify CSV header mapping, e.g.:

@Sheet()
export class Snack {
  @SheetColumn({ header: 'Name' })
  name!: string;

  @SheetColumn({ header: 'Deliciousness Level' })
  deliciousnessLevel!: number;

  @SheetColumn({ header: 'Date Eaten', dateFormat: ['YYYY-MM-DD'] })
  dateEaten!: Date;
}

This fits perfectly with Objection since we can simply add these decorators to our existing Objection models. Our CSV library supports streaming, so the only missing piece is an Objection implementation of stream().
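To illustrate the shape of that pipeline, here is a minimal sketch of an object-to-CSV Transform. It does not reproduce our decorator library: the `Column` interface and `snackColumns` array are hypothetical stand-ins for the metadata the decorators would supply, the date column is left out for brevity, and `Readable.from` stands in for the model stream that a stream() implementation would provide.

```typescript
import { Readable, Transform } from 'node:stream';

// Hypothetical column description; decorator metadata would supply this in a real library.
interface Column<T> {
  header: string;
  value: (record: T) => string | number;
}

interface Snack { name: string; deliciousnessLevel: number }

const snackColumns: Column<Snack>[] = [
  { header: 'Name', value: (s) => s.name },
  { header: 'Deliciousness Level', value: (s) => s.deliciousnessLevel },
];

// Quote a field when it contains a comma, a double quote, or a newline.
function csvField(raw: string | number): string {
  const text = String(raw);
  return /[",\n]/.test(text) ? `"${text.replace(/"/g, '""')}"` : text;
}

// Objects in, text out: emits the header line before the first record.
function toCsv<T>(columns: Column<T>[]): Transform {
  let headerWritten = false;
  return new Transform({
    writableObjectMode: true,
    transform(record, _encoding, callback) {
      if (!headerWritten) {
        this.push(columns.map((c) => csvField(c.header)).join(',') + '\n');
        headerWritten = true;
      }
      const line = columns.map((c) => csvField(c.value(record as T))).join(',');
      callback(null, line + '\n');
    },
  });
}

// Readable.from stands in for a stream of model instances.
async function renderCsv(): Promise<string> {
  const records = Readable.from([
    { name: 'Pretzel', deliciousnessLevel: 7 },
    { name: 'Chips, salted', deliciousnessLevel: 9 },
  ]);
  let output = '';
  for await (const chunk of records.pipe(toCsv(snackColumns))) {
    output += chunk.toString();
  }
  return output;
}

renderCsv().then((csv) => console.log(csv));
```

In a real route the Transform would be piped into the HTTP response instead of being collected into a string, so only a few records are held in memory at a time. This sketch writes no header when the source is empty.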

As a workaround we tried the method suggested above by @fiznool, but ran into errors when using joins (joinEager(), since eager() is understandably not possible with streaming).

We could try to implement this entirely in Knex, but after manually mapping columns and performing joins with relations, we are basically re-implementing what Objection is doing.

I am interested in contributing, but first need to learn a lot more about Objection internals, hooks, and strategies for throwing on operations that are not possible with streams (like eager and insert, as you mention @koskimas).

We have found that streams are becoming a common requirement in this space and hope the above is a compelling use case.
