Pg-promise: Compatible with node-pg-copy-streams?

Created on 23 Apr 2016  Â·  24Comments  Â·  Source: vitaly-t/pg-promise

Hello!

I may be trying to use this incorrectly, but I wanted to check. I've been able to get node-pg-query-stream to work with pg-promise based on the documentation example, but I'm having no luck with node-pg-copy-streams.

I've been trying to use it in the same way as pg-query-stream, but that may be incorrect. #34 says it was compatible as of v2, but that was a couple major versions ago. :D

// a prepped pg-promise connection, does *not* use pg-native
var db = require('./db');
var copyTo = require('pg-copy-streams').to;

var ct = copyTo('COPY (SELECT * FROM users) TO STDOUT');

db.stream(ct, function(s) {
  s.pipe(process.stdout);
}).then(function (data) {
  console.log('Rows:', data.processed, 'Duration:', data.duration); 
}).catch(function (err) {
  throw err;
});

But it throws this error:

TypeError: self.activeQuery.handleCopyData is not a function
    at null.<anonymous> (/Users/ryan/src/app/node_modules/pg/lib/client.js:140:24)
    at emitOne (events.js:90:13)
    at emit (events.js:182:7)
    at Socket.<anonymous> (/Users/ryan/src/app/node_modules/pg/lib/connection.js:109:12)
    at emitOne (events.js:90:13)
    at Socket.emit (events.js:182:7)
    at readableAddChunk (_stream_readable.js:153:18)
    at Socket.Readable.push (_stream_readable.js:111:10)
    at TCP.onread (net.js:531:20)

I know there are a few moving parts in here, so I understand if the issue isn't with pg-promise, but I figured I'd start here. (I was able to get it to work with just pg.)

Thanks!

question

Most helpful comment

FYI, you can use pg-copy-stream, with pg-promise, but you have to use the connect() method to get at a client instance (I'm using Q promises in this example):

var db = require('./db');
var copyTo = require('pg-copy-streams').to;

var ct = copyTo('COPY (SELECT * FROM users) TO STDOUT');

db.connect()
    .then(function (con) {
        var client = con.client;
        var dfd = Q.defer();

        var stream = client.query(ct, done);
        stream.pipe(process.stdout);
        stream.on('end', done);
        stream.on('error', done);

        function done(err) {
            con.done();
            if (err) {
                dfd.reject(err);
            } else {
                dfd.resolve();
            }
        }
        return dfd.promise();
    });

FWIW, I use streams when doing bulk inserts - COPY FROM STDIN is way faster than doing inserts. Even when compared to the helpers.insert approach.

All 24 comments

pg-copy-streams was never supported, only pg-query-stream is supported.

Primarily because pg-copy-streams support is really bad, while having many issues.

Aw, bummer. I'll probably just defer to using pg to do COPY calls for now, although I am weary of those issues. Unless there's any other suggestion you or others may have! I really like the promise based workflow afforded here. I actually left out a part of that query — the end goal is tack on a WITH CSV HEADER too and save the CSV dump to a file. The potential size of the output though makes the ability to stream appealing.

Thanks for the speedy reply!

If you are streaming from the database, pg-query-stream is sufficient.

For other cases see:

Thanks for the links! (I'm not trying to insert in this scenario, only need to pull from the database.)

It's not so much that streaming from the database isn't sufficient — it's the _format_ of the data in that stream that's not ideal. My end goal is to produce a CSV. That's the benefit of being able to send a query using COPY TO ... WITH CSV HEADER — Postgres sends it over ready to go. I could try to do something via pg-query-stream to a Node.js CSV writer, but it'd be nice to skip that step and let Postgres handle it, which was the original appeal of pg-copy-streams.

FYI, you can use pg-copy-stream, with pg-promise, but you have to use the connect() method to get at a client instance (I'm using Q promises in this example):

var db = require('./db');
var copyTo = require('pg-copy-streams').to;

var ct = copyTo('COPY (SELECT * FROM users) TO STDOUT');

db.connect()
    .then(function (con) {
        var client = con.client;
        var dfd = Q.defer();

        var stream = client.query(ct, done);
        stream.pipe(process.stdout);
        stream.on('end', done);
        stream.on('error', done);

        function done(err) {
            con.done();
            if (err) {
                dfd.reject(err);
            } else {
                dfd.resolve();
            }
        }
        return dfd.promise();
    });

FWIW, I use streams when doing bulk inserts - COPY FROM STDIN is way faster than doing inserts. Even when compared to the helpers.insert approach.

@nonplus you forget to release the connection there ;)

Fixed it. ;-)

@vitaly-t, would you be open for a feature request/PR for adding support for the COPY FROM STDIN and COPY TO STDOUT commands? Basically node-pg-copy-streams functionality but integrated into pg-promise (hence it could be used in a pg-promise task or transaction). The commands would return a promise that resolves when the stream is done. I would envision the API like this:

copyTo(query: string, values: any, stream: NodeJS.WritableStream): XPromise<any>;
copyFrom(query: string, values: any, stream: NodeJS.ReadableStream): XPromise<any>;

FWIW, I'm not sure when you'd need bound parameters for "copyFrom", but it probably makes sense for the APIs to be consistent.

The following is an example of usage:

var db = pgp(connection);

// piping from db to a stream
db.copyTo("COPY (SELECT * FROM user WHERE last_name = $name) TO STDOUT",
    { name: "Smith" }, process.stdout
)
    .then(...);

// piping stream into db
var fileStream = fs.createReadStream('users.csv')
db.copyFrom("COPY users FROM STDIN WITH CSV HEADERS", null, fileStream)
    .then(...);

@nonplus why not just use event extend and add whatever API you need for your project? :wink:

Can I get my hands on the client in the extension method? If so, how?

Yeah, I don't think you can, unless you allocate the connection right in the method, not sure it will work with tasks/transactions though.

What you did there in your example, it will only fit into the existing architecture exactly as is, but it cannot fit in integrated, because we cannot both allocate a connection manually and propagate it through tasks/transactions at the same time.

Probably best is to leave it as an example.

No chance of always exposing the client as a ctx property the way database.connect() does?

No, that would mean exposing client at all times, which would constitute a disaster eventually, because it is not supposed to be used normally, only in exceptional cases, like yours. It would be a breaking change in the concept of how one should use the library ;)

I figured as much and can't really argue with that. Hence my original suggestion to have copyFrom and copyTo be part of IBaseProtocol.

Anyway, in my project I don't need my bulk uploads to be inside a transaction, so the workaround with db.connect() and node-pg-copy-streams works for me.

I think best is to leave it as is. It would be too much work, for something of infrequent use, and to further bloat the existing protocol. The generic one is good as is ;)

Now that we finally have branch 6.x available, I might revisit supporting node-pg-copy-streams out of the box.

@nonplus please check out this new article Data Imports, and let us know how the performance figures given there compare to the ones you get by using the COPY query.

@vitaly-t, I couldn't compare performance of 'pgp.helpers.insert' and 'COPY' yet but I see an issue with the first method as it is necesary to write the name of the column for each value to be inserted, which increases the size of the object. I.E:

const data = [
    { title: 'red apples', cost: 2.35, units: 1000 },
    { title: 'large oranges', cost: 4.50 },
];

Is there any way to avoid writing the name of the columns?

Thanks!

@goyoGit You are not writing names of the object anywhere, you are just declaring the object. I do not understand your concern.

Hi @vitaly-t and thanks for your response,
My concern is the size of the object to be inserted, in the previous sample if we want to insert a million rows, the object called 'data' will have a million attributes called 'title'. I was wondering if it si possible to avoid this, maybe using an array for each row or a 'csv' format string.

if we want to insert a million rows, the object called 'data' will have a million attributes called 'title'

The number of attributes is unrelated to the number of properties on the object. Something you are doing wrong. See Data Imports.

@vitaly-t ,

I think I didn't explain myself. Considering the example in Data Imports, let say that I want to insert 1 million rows, then the data object will be:

const data = [
{ title: 'red apples', cost: 2.35, units: 1000 },
{ title: 'large oranges', cost: 4.50 },
...
{ title: 'bla', cost: 25, units: 22 },
...
{ title: 'blabla', cost: 5, units: 22 }
...
];
// Aray of 1 million rows, each row is an object with 3 attributes

My question is if there is a chance to use a smaller object for the data, for example
const dataArray = [
['red apples', 2.35, 1000 ],
['large oranges', 4.50 ],
...
['bla', 25, 22 ],
...
['blabla', 5, 2999 ]
...
];
// Aray of 1 million rows, each row is an array

I think dataArray needs less memory than data, and my question is if 'pgp.helpers.insert' can handle data in this format or if it is planned.

Thanks a lot for you time.

@goyoGit If you read through Data Import, you will see that you would never end up with 1 million rows. The article shows you how to page through inserts.

@vitaly-t I went through your docs, but I find it hard to stream JSON data into the function for massive inserts. I created a post to describe the issue in the below link.
https://stackoverflow.com/questions/50206942/massive-insert-from-a-json-file-using-pg-promise

Was this page helpful?
0 / 5 - 0 ratings

Related issues

ForbesLindesay picture ForbesLindesay  Â·  3Comments

vitaly-t picture vitaly-t  Â·  3Comments

dzaman picture dzaman  Â·  3Comments

hawkeye64 picture hawkeye64  Â·  4Comments

cortopy picture cortopy  Â·  5Comments