I had an idea; I searched around and didn't see anyone else asking about it.
I want to support the test case below.
'use strict';
//deps
var streamArray = require('stream-array')
var Sequelize = require('sequelize')
var through2 = require('through2')
var sequelize = new Sequelize(/* connection options */)
//define model
var People = sequelize.define('People', {
  name: {
    type: Sequelize.STRING,
    allowNull: false
  }
})
//setup readable object stream
var stream = streamArray([
  {name: 'Bob'},
  {name: 'Bill'},
  {name: 'Susan'}
])
//setup a response handler
var transform = through2.obj(function (inst, enc, next) {
  console.log('Saved person: ' + inst.name)
  next(null, inst)
})
stream.pipe(People.createStream()).pipe(transform)
I implemented this manually in a migrator, but I think it would be an awesome feature to just be able to stream objects to a Model and have it create them and pass the resulting instances to a readable stream. Then this could be used with packages like promisepipe to make truly useful streams.
Also, I was thinking of making it so that options could be passed to use a findOrCreate, find, or even build, so the records can be saved in the response handler. I envision the options being defined in the createStream method that the model would expose.
It would make it much easier to deal with large amounts of data, since streams implement all the backpressure automatically and there is no need for queuing. Extremely useful for doing transform update queries. (I am aware these are reasonably easy to implement manually, but a macro would make it more useful for the average user.)
If anyone else is interested in this I could work on a PR for it.
Streaming on findAll is something I've thought about, since it would be very handy for large result sets where loading everything into memory could have its downsides - I've not considered the use cases you are presenting here, but it's certainly interesting.
If you could start out with an API proposal for the most common use cases, we could go from there.
Ya, I was actually really surprised this wasn't implemented yet.
I'll mess around in my repo and see what makes sense as a tentative API change. I think I would need to write several test cases in order to make the API stable early.
I really think it would be advantageous for Sequelize to be fully stream compatible. I have an example of a really nice migration script that uses Sequelize and Mongoose which utilizes streams to handle backpressure and buffering. It also uses transform streams to translate the data prior to insertion.
I just know that for lots of people streams seem sort of like voodoo in JS, but if possible Sequelize should really promote them. They can be extremely useful in scripting situations (they are not so much on browser calls, since the page output has to be buffered anyway).
@nullivex It will require quite a few changes to the core though, since we use promises all the way down from the dialect connector libraries, and we need to make sure those connector libraries support streaming as well. It's something I really want to do but have to be smart about it.
Why not make it explicit?
For example
Model.findAllStream().pipe(wherever)
Then even use the promisepipe tool to do something cool like
var promisePipe = require('promisepipe')
var through2 = require('through2')
var transform = through2.obj(function (item, enc, next) {
  console.log(item.toJSON())
  next(null, item)
})
var stream = Model.findAllStream({where: {active: true}, include: [Stuff]})
promisePipe(stream, transform)
  .then(function () {
    console.log('stream complete')
  })
  .catch(function (err) {
    console.error('pipe blew up', err)
  })
Well, I would want it to be implicit.
But we still need to support streams end to end, and there's tricky stuff like row deduplication that can't support streams.
Ya, I think that would be great too, but it would at least need some kind of option to know not to return a promise.
So like
Model.findAll({where: {foo: 'bar'}},{stream: true, raw: true}).pipe(wherever)
Or is there some way to make it work without that?
And what about creating writable streams?
myStream.pipe(Model.createWriteStream()).pipe(wherever)
Would that be an acceptable method that would take whichever options we come up with to prime the writable stream (maybe default options, findOrCreate support)?
Just brainstorming here.
Also, just some food for thought: this is how I did it in my migration script. It works well, since the stream handles the backpressure just by not stepping the stream until the creation has returned.
/**
 * Run a migration in its own queue and return a promise
 * @param {string} name
 * @param {stream.Readable} stream
 * @param {sequelize.Model} Model
 * @param {function} iterator
 * @return {Promise}
 */
var runMigration = function (name, stream, Model, iterator) {
  var processed = 0
  var statusInterval
  if (name) {
    logger.info('Migrating ' + name)
    statusInterval = setInterval(function () {
      reportStatus(name, processed)
    }, 1000)
  }
  var transform = through2.obj(function (req, enc, next) {
    processed++
    Model.create(req)
      .then(function (res) {
        if ('function' === typeof iterator)
          return iterator(req, res)
      })
      .then(function () {
        next()
      })
      .catch(sequelize.UniqueConstraintError, function () {
        //silently ignore duplicate rows
        next()
      })
      .catch(sequelize.ValidationError, function (err) {
        logger.warning('Skipped record due to a validation error', err, Model.name, req)
        //silently continue
        next()
      })
      .catch(function (err) {
        next(err)
      })
  })
  return promisePipe(stream, transform).then(function () {
    //stop the status reporting
    if (statusInterval && name) {
      clearInterval(statusInterval)
      //one final report for completion proof
      reportStatus(name, processed)
      logger.info(name + ' have finished migrating')
    }
  })
}
Sorry i meant explicit, i agree with your findAllStream (stream suffix in general) suggestion.
Ah okay, that makes more sense haha.
It wouldn't hurt to start with a single stream method at first and see how intuitive everyone finds it, in case the API needs re-engineering.
Or maybe it could be added as a plugin (does sequelize have plugin support?)
We are working on plugin support. However something like streaming needs to be implemented from the bottom up so it's probably not a great case for a plugin.
@mickhansen Agreed.
If I get some spare time I'll mess around with a PR. I'm still trying to get the hang of how everything in sequelize is put together. And I don't have anything waiting on this feature, so for the moment I probably won't have time to do it any time soon.
Besides this would be something that happens after 2.0.0 is finalized.
I had an unrelated question: there should be some kind of note in the docs on the main website, because I have run into quite a few situations where I was using 1.7.x docs with 2.0.0 code (I thought the 'latest' docs were 2.0.0, but I see they aren't). Just pointing out that as a new user I found it very confusing navigating between the two. So is the API reference the best place to be looking right now?
@nullivex API reference is best place right now - We're working on converting the docs to Read The Docs and updating them to 2.0 at the moment.
+1 I will need this feature also.
Great discussion so far! Perhaps I would vote for stream query option instead of Stream appended method names.
+1
This is kind of a double-edged sword.
Input streaming is best handled with through2; then we don't have to try to re-implement all the options to make the stream compatible, which feels like mirroring a lot of code, prone to bugs, and a hassle to maintain as two sets of code.
Output streaming, on the other hand, is something the library should offer. However, this requires properly implementing stream support in the MySQL handler itself. @mickhansen I'm not very familiar with the underlying drivers; will they even accommodate this? If we are buffering the results to publish the stream, that is pointless.
@nullivex most of the drivers support streaming, but we're using promises from the bottom up; not sure how we'd architect it to support both.
Also, it would never work with *:M includes, since we need to know all rows before we can combine (so in that case we would have to buffer).
But I do agree, this would likely be a performance bump in some cases with findAll and streaming JSON.stringify for API responses.
+1 on using stream as option.
+1
Since this doesn't seem like it will happen soon, is there a way to tell Sequelize to return an SQL query so that we can then manually use it with something like node-mysql to stream the results?
@aaronshaf It's not the topmost priority currently no, but hopefully we will get to it.
As for retrieving the SQL, unfortunately not.
We had a problem with thousands of rows filling up memory.
I just created a solution for reducing a large number of rows to a single value in a memory-efficient way.
This solution does not use streaming, but could perhaps be a workaround for those with similar needs.
module.exports = function setupReduceAll(Sequelize) {
  'use strict';
  // findAll in chunks and reduce to a single value.
  // This is done in a memory-efficient way by taking x rows each time,
  // where x is defined by options.limit.
  //
  // options      : options passed to findAll. options.limit defines how many
  //                rows should be reduced per iteration.
  // action       : callback that is called for each iteration.
  //                Gets parameters data and the previous reduce value.
  //                The callback should return the new reduce value
  //                or a Promise that resolves to the reduced value.
  // initialValue : initial value for the reduction.
  //
  // Returns a Promise with the reduced value.
  //
  // Example:
  //   var initialSum = 0;
  //   Model
  //     .reduceAll({
  //       limit: 100
  //     }, function (data, sum) {
  //       return data.reduce(function (currentSum, row) {
  //         return currentSum + row.number;
  //       }, sum);
  //     }, initialSum)
  //     .then(function (totalSum) {
  //       console.info('Total sum is: ', totalSum);
  //     });
  Sequelize.Model.prototype.reduceAll = function (options, action, initialValue) {
    var self = this,
      foundRecords = -1,
      reducedValue = initialValue;
    options = options || {};
    options.limit = options.limit || 1000;
    options.offset = 0;
    function reduceNextChunk() {
      if (foundRecords === 0) {
        // No more records found in the table;
        // return the final reduced value.
        return reducedValue;
      } else {
        return self
          .findAll(options)
          .tap(function (data) {
            foundRecords = data.length;
            // Increase the offset for the next iteration.
            options.offset += options.limit;
          })
          .then(function (data) {
            // Call the callback with the data rows and the last reduced value.
            return action(data, reducedValue);
          })
          .then(function (newReducedValue) {
            // Update reduced value
            reducedValue = newReducedValue;
          })
          // Find next chunk
          .then(reduceNextChunk);
      }
    }
    return reduceNextChunk();
  };
};
@hacker112 very interesting and clean solution :)
Do other dialect engines have support for stream output the way MySQL does?
I think I can get on this.
@cusspvz most of them do, I believe.
@hacker112 your solution is super interesting, but I think we can get better performance with the following.
This will keep MySQL from having to filter/sort on each query.
My approach, based on @hacker112's approach:
Model.prototype.reduceAll = function ( options, queryOptions ) {
  var self = this;
  // ... stream defining
  var stream; // ...
  var chunkLimit = options.chunkLimit || 100;
  var primaryKeysOptions = { __proto__: options,
    attributes: Object.keys( Model.primaryKeys ),
  };
  var reducedOptions = { __proto__: options,
    where: undefined,
  };
  var streamNextChunk = function () {
    if (
      stream.primaryKeys.length === 0 ||
      options.limit && stream.primaryKeys.length < options.limit
    ) {
      stream.end();
      return;
    }
    var primaryKey;
    var where = reducedOptions.where = [];
    while ( where.length < chunkLimit && ( primaryKey = stream.primaryKeys.shift() ) ) {
      where.push( primaryKey );
    }
    return self.findAll( reducedOptions, queryOptions )
      .each(function ( instance ) {
        stream.write( instance );
      })
      .then( streamNextChunk );
  };
  return self.findAll( primaryKeysOptions, { __proto__: queryOptions, raw: true })
    .then(function ( rows ) {
      stream.primaryKeys = rows;
    })
    .then(function () {
      // Fire errors from the promise on the stream
      streamNextChunk()
        .error(function ( err ) {
          stream.emit( 'error', err );
        });
    })
    .return( stream );
};
EDIT: the stream must be returned before processing the next chunk.
Of course, we will need to watch for the drain event before running streamNextChunk again.
In case @mickhansen, @janmeier or @sdepold don't want to return a stream directly, and since sequelize extends bluebird, we could add a method to allow piping directly from a promise:
SequelizePromise.prototype.pipe = function $PromisePipe ( destStream ) {
  // detect if destStream is writable
  // ...
  return this._then(function ( origStream ) {
    // Detect if it is a Stream instance
    // and also if it is readable
    if ( /* not a stream or not readable */ ) {
      throw new Error( "..." );
    }
    return origStream.pipe( destStream );
  });
};
This will allow:
return this.findAll( { /* ... */ }, { stream: true }).pipe( destStream );
I suggest that observables are almost always better to use than promises, since they can be cancelled/aborted, and since they are future-proof for use-cases like this (i.e. streaming).
@aaronshaf I haven't read up much on observables yet (but it's on my todo list). However, I can say that it is very unlikely sequelize is going to move away from promises any time soon.
We are only just removing the last remnants of event emitters from the test base, and there are plenty of other issues to consider before we can start ripping up the internals again :).
Furthermore, I don't really see the merits of being able to cancel something in a sequelize context. Once a query is sent to the DB, it can't be canceled (to my knowledge). And if you find out in the meantime that you don't need the result, just don't attach a listener to it. Of course there will be some unnecessary work done parsing the results etc., but I'd say that's negligible compared to the work done by the DB, which we can't abort.
@cusspvz Interesting idea regarding Promise.pipe
@mickhansen @janmeier I've liked @hacker112's approach so much that I think we should use something similar (like the approach I suggested after) instead of the dialects' stream support.
I could implement this, but I need your final decision on whether we are going to close this with the current solution.
@cusspvz I'll have to give this one a bit more thought. Yours and @hacker112's proposed solutions are not really streaming: they execute multiple queries and process the result of each (pagination, basically), as opposed to executing a single query and processing its result in chunks.
It might be a reasonable solution though, and one that would not require as much change as true stream support. I don't think we should advertise it as streaming, however, since that might give the wrong implications about what it can be used for.
@janmeier As far as I know, in synchronous client libraries (which MySQL's is), a pointer is used to retrieve a result set.
So the pipe would need to use the pointer and read a result set on each read. If the dialect doesn't support pointers, then we would need to buffer the result into an array and shift that array on each read.
Then of course it would be an object stream. The writable side would work the same way.
I had been meaning to dig into this for a while, but I have been really busy with some storage clusters, and I haven't run into another scenario where I really needed streams, so I haven't tried my hand at implementing it.
The node MySQL driver has stream support built in, so it would be easiest to buffer the other dialects and then just wire this in:
connection.query('SELECT * FROM posts')
  .stream({highWaterMark: 5})
  .pipe(...);
All that would really need to happen in this scenario is for Sequelize to return the query object when, for example, I call:
MyModel.findAll({stream: 5}).pipe(MyWritableStream)
That feels like the cleanest method to me.
I was looking into how to go about implementing such a thing, and here are the concerns I have.
First, it's a bit troublesome to pass a new option all the way down the abstraction path, but I'm sure I can figure it out.
In the MySQL dialect's query.js file, though, I see that is where the streams need to be implemented, since that is where the driver is interacted with.
So I would have to look for the stream flag in the instance options, and then diverge the logic to either execute the query with a callback or execute the query in the first RTC and return it.
However, what I am worried about is how to hand the query handle all the way back up the chain in the same RTC. Otherwise the query will start processing and buffering until someone hooks up the pipe. I guess this won't be an issue with the MySQL driver, since I am sure it will handle backpressure just fine using the pointers internally.
I am willing to give implementing this a shot, although I really think it belongs in its own branch.
Has anyone made any progress on this? I started using sequelize in a project thinking it had streaming, but came here because I couldn't find the API in the docs. I guess I will have to go directly to the db driver for now.
@garvincasimir No progress. It's not exactly trivial to implement since it's a bottom-up refactor.
@mickhansen I understand. I will keep watching the project for any help wanted tags on streams. It might be worth it to take a look at https://www.npmjs.com/package/mssql as they support both promises and streams. Might be the inspiration needed to begin planning for stream support.
+1
It's not exactly trivial to implement since it's a bottom-up refactor.
We could support streaming based on @hacker112's initial idea; this should allow us to support streaming without relying on the dialects' capabilities. It is not a perfect solution, but it is a valid workaround for those who need it now.
:+1: This would be awesome!
+1
+1
I am going to take a stab at implementing this this week as I see no one has submitted a pull request yet and I finally have some time to work on open source projects.
I will make an update when I get a branch going.
Just an idea - if making Model classes streamable is a lot of work, maybe it would be easier to create an intermediate solution where Sequelize.query (http://sequelize.readthedocs.org/en/latest/api/sequelize/#querysql-options-promise) supported streaming for raw queries?
@tiblu Not really sure that would make things easier - it still requires the internals to be rewritten to handle streams :)
+1
+1
+1
+1 Would greatly appreciate this.
Any progress on this issue?
+1 this would be a great feature
+1 Mandatory if you want sequelize to be respectable product.
This gist is what I'm using at the moment to convert a select into a stream. If that looks useful to other people, I'm happy to publish it as a module with some docs.
Example:
// Insert code to create your sequelize object
var through = require('through');
function ModelService(model) {
  this._model = model;
}
// Limit is the size of the batches of rows you want Sequelize to get.
ModelService.prototype.streamAll = function (limit) {
  return createSequelizeStream(
    this._model.findAll.bind(this._model),
    {
      limit: limit || 10,
      offset: 0
    }
  )
}
var service = new ModelService(sequelize.models.theModel);
service.streamAll()
  .on('error', function () { /* handle error */ })
  .pipe(through(function (data) {
    console.log('Got:', data.get());
  }))
  .on('error', function () { /* handle error */ })
  .pipe(through(null, function () {
    console.log('All done');
    // Usually I would resolve a Promise here that wraps the stream.
  }));
@fcagalj +1 Mandatory if you want sequelize to be respectable product.
I guess we'll have to suffer the shame for a while longer.
@eddieajau That's sort of a pseudo-stream, isn't it? It will still wait for the entire result set (and parse it) before it starts streaming it to the end-user.
If we were to add streaming (which I do find to be a nice feature, but not a critical one for most of the work being done with Sequelize), it would have to be bottom-up, so everything needs to be refactored to support streaming.
Yeah, the goal is not just to get a streaming interface, but also to get the potential for the improved client memory performance that end-to-end streaming could offer.
For example, this feature would enable a machine with 1GB of RAM to process a 10GB result set. You find this use case when looking at things like data warehousing and analytics processing, where large volumes of data are read and aggregated into summary statistics. Being efficient can be really important to keeping the hardware costs of such systems in check.
It may be that an ORM is not the right tool for this job to begin with.
@eddieajau That's sort of a pseudo-stream, isn't it? It will still wait for the entire result set (and parse it) before it starts streaming it to the end-user.
Yes and no. You pass options.limit, which is the "batch" size you want to load at any one time. At work I set it at 100, so the function grabs 100, streams it, grabs another 100, streams it, and so on. You certainly get a true stream out the other end, and it's a fairly useful technique for moulding any API that doesn't support streaming natively.
If I knew how to tap into Sequelize internals more, I could do something more creative, but for now it fills a gap, and because I use a service layer, I can replace the actual mechanism with a native Sequelize one at a later date and everything will still "just work".
Mike, perhaps you can consider doing a couple of "training" hangouts in the future to help those of us heavily invested in Sequelize be in a better position to contribute to/help with things like this? I use this particular feature for overnight batch updates to tens of thousands of user profiles, so I would be in a position to commit some work time to it.
For example, this feature would enable a machine with 1GB of RAM to process a 10GB result set.
@ericrini that is possible with the code I posted. Just tune the query limit to suit your memory requirements.
I understand the appeal of using the sequelize syntax, but it seems like it defeats the purpose of streaming if you're not using a cursor on the Postgres end of the query.
I've attached pg-query-stream to my models' classMethods and have to write raw queries, but the large queries required typically contain only a couple of joins.
https://github.com/brianc/node-pg-query-stream
This lib uses a cursor so for my use case it looks something like:
classMethods: {
  queryStream() {
    return new Promise((res, rej) => {
      pg.connect(function (err, client, done) {
        if (err) return rej(err);
        var query = new QueryStream('SELECT * FROM generate_series(0, $1) num', [1000000])
        var stream = client.query(query)
        // release the pooled connection once the cursor is exhausted
        stream.on('end', done)
        res(stream)
      })
    })
  }
}
@thecaddy Good question, I'm not sure if the basic streaming is good enough - I haven't done thorough testing of it, but obviously a true stream from the postgres server would work best.
Well, there's definitely two ways to implement this.
Either way effectively breaks the result set into finite-sized "chunks" for a ReadableStream.
Because database cursors require state in the database connection (which must subsequently be cleaned up), it might be problematic for a setup where a few database connections are shared by many logical processes, as you usually see with a connection pool.
I'm starting to agree that the proof of concept @eddieajau suggested could be a viable answer.
Perhaps, instead of having a separate ModelService, the Sequelize Models could add a findAllStream() method that would work like findAll() except it ignores any limit/skip clause and returns a ReadableStream instead of a Promise. The ReadableStream object returned encapsulates the limit/skip state and retrieves "chunks" until it retrieves an empty result set (which ends the stream) or hits the high-water mark for the stream (which pauses the stream).
+1 over here, specifically for "data too large to fit in RAM". I was looking for something like Rails/ActiveRecord's find_in_batches, which would suggest a JS API like:
User.findAllInBatches({
  where: {a: 'b'}
}, function (batch) {
  batch.forEach(function (user) {
    // do stuff with the user
  });
}, function () {
  // second callback is the 'done' event
}).catch(function (error) {
  // deal with error.
});

// Or, if you prefer the event route:
var batcher = new User.findAllInBatches({where: {a: 'b'}});
batcher.on('error', function (error) {
  // deal with error.
});
batcher.on('done', function () {
  // move on
});
batcher.on('batch', function (batch) {
  batch.forEach(function (user) {
    // do stuff with the user
  });
});
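For what it's worth, the batch-callback shape above can be approximated today on top of plain findAll with limit/offset pagination. A hedged sketch, where `findAll` is a stand-in for Model.findAll and the helper name is hypothetical:

```javascript
// Hypothetical findAllInBatches: calls onBatch with each page of rows,
// then onDone when a short/empty page signals the end of the result set.
// `findAll` is any (options) -> Promise<Array> function (e.g. Model.findAll).
function findAllInBatches (findAll, options, onBatch, onDone, batchSize) {
  batchSize = batchSize || 1000
  function nextPage (offset) {
    return findAll(Object.assign({}, options, { limit: batchSize, offset: offset }))
      .then(function (batch) {
        if (batch.length > 0) onBatch(batch)
        if (batch.length < batchSize) return onDone && onDone()
        return nextPage(offset + batchSize)
      })
  }
  return nextPage(0) // resolves when all batches are processed; rejects on error
}
```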
We've made some ETL tools in node (which deal with large datasets explicitly), and perhaps seeing a similar pattern in use might help:
+1 here, it's pretty useful to have this feature.
I have also used this feature in another PHP framework.
Since we are now moving on to version 4 (which includes rewrites of many parts of the codebase), can't this be part of that roadmap as well?
@expl0r3r If someone takes the time to actually write the code, then yes - I doubt someone from the core team has the time, so this would have to be a user-contribution from one of the many people who say they need this :)
In terms of the batch solution, I just needed to do that.
I created a gist of the solution; I may turn it into an npm module if I get the chance.
https://gist.github.com/crisward/7c541376d7a9f4aa40b89d07d64558e2
See it being used (with a mock model) here:
http://codepen.io/crisward/pen/wWXvNj?editors=0012
@evantahler, re https://github.com/sequelize/sequelize/issues/2454#issuecomment-215916059
I was looking for something like Rails/Activerecord's find_in_batches
There is an old issue for this: #686
I have a Sequelize port of find_each below. Perhaps this is what you are looking for?
https://gist.github.com/ippeiukai/1d9f717711414118b360eff0718c07c0
+1
I wrote a little package to work around this using hooks. Due to the way sequelize deals with bulk hooks, bulk delete and bulk update are limited, but aside from that it works well. We've been using it in production for a few months and it does the job: https://github.com/joeybaker/sequelize-stream
// setup sequelize
import Sequelize from 'sequelize'
const sequelize = new Sequelize({ dialect: 'sqlite' })
const Cat = sequelize.define('cat', {
  name: Sequelize.STRING
  , spots: Sequelize.INTEGER
})
sequelize.sync({force: true})

// install sequelizeStream
import sequelizeStream from 'sequelize-stream'
const stream = sequelizeStream(sequelize)

// when the stream receives data, log
stream.on('data', ({instance, event}) => console.log(event, instance.toJSON()))

// examples
Cat.bulkCreate([{name: 'fluffy'}, {name: 'spot'}])
// => 'create', {name: 'fluffy', id: 1}
// => 'create', {name: 'spot', id: 2}

Cat.create({name: 'sparky'})
  // => 'create', {name: 'sparky', id: 3}
  .then((sparky) => {
    return sparky.update({spots: 2})
  })
  // => 'update', {name: 'sparky', spots: 2, id: 3}
  .tap((sparky) => {
    return Cat.update({spots: 1}, {where: {name: 'sparky'}})
  })
  // => 'update', {name: 'sparky', spots: 1, id: 3}
  .then((sparky) => {
    sparky.destroy()
  })
// => 'destroy', {name: 'sparky', spots: 1, id: 3}
I think a possible solution here that wouldn't require as much work, once it's done, is to complete this other ticket first: https://github.com/sequelize/sequelize/issues/2325
If that ticket were done, I _think_ the part that generates the raw SQL string could remove the limit and offset before generating the query, then feed the query to a streaming driver and return a ReadStream.
My particular use case is getting CSVs from the DB to our users based on searches they are doing. Some users like to download _all_ the data they can and do more work in Excel than just the UI we offer.
Is it possible to stream now using sequelize?
I'm also interested in this; what's the status?
Stale
Sequelize already uses the pg module, right? How hard would it be to integrate node-pg-query-stream to enable this feature?
Hey @janmeier any guidelines as to how we could implement this feature? I can try to help, with some guidance.
If anyone is looking for a workaround, you can do something like this:
import { connectionManager, dialect } from 'your-sequelize-instance'
import { Query } from 'pg'
import through2 from 'through2'

// this wraps a sql query in a stream via a cursor, so as each row is found
// it gets transformed and emitted from the stream
// this is how you want to return millions of rows with 0 memory overhead
const streamable = async (sql, transform) => {
  const conn = await connectionManager.getConnection()
  const stream = through2.obj()
  const query = conn.query(new Query(sql))
  const end = () => {
    connectionManager.releaseConnection(conn)
      .then(() => null)
      .catch(() => null)
    stream.end()
  }
  query.on('row', (r) =>
    stream.push(transform ? transform(r) : r)
  )
  query.once('error', (err) => {
    stream.emit('error', err)
    end()
  })
  query.once('end', end)
  return stream
}

export default async ({ Model, tableName, query, transform, scope = 'public' }) => {
  // get the query we need to tail
  const model = Model.scope(scope)
  model._expandAttributes(query) // make the attribute option valid for selectQuery
  const sql = dialect.QueryGenerator.selectQuery(tableName, query, model)
  const src = await streamable(sql, transform)
  return src
}
And FYI to some people - you can use QueryGenerator to stringify different sequelize types as SQL. The API is internal (and thus the options are not very descriptive) but with sufficient wrapping it isn't too hard to work with.
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. If this is still an issue, just leave a comment 🙂
I will close this in favor of #10347.