Mysql: Using streams to Insert Large File

Created on 23 Jan 2018 · 13 Comments · Source: mysqljs/mysql

I am trying to insert a huge CSV file into the database but keep running into memory issues. I am using a Node stream to read the file and want to insert each row individually. Is there a way to write to the DB, wait for the connection to become available, and reuse it? I know the mysql package for Node has a pool option, but I am afraid I will just keep adding new connections to the pool and hit the memory limit.

stream.on('line', line => {
    if (count > 0) {
        this.election.insertData(line);
    }
    count++;
    //stream.output.write(line + '\n');
});




insertData(args) {
    const sql = `Insert into ${this.table} (
        county,
        voter_reg_num,
        race,
        sex,
        age,
        party,
        precinct,
        cong_dist,
        house_dist,
        sen_dist,
        voting_method,
        requested,
        ballot_req_dt,
        election_lbl,
        site_name
    ) VALUES (${args})`;
    super
        .query(sql)
        .then()
        .catch(e => {
            console.log('My Error', e);
        });
}


All 13 comments

I'd probably implement a writable stream whose write side is the mysql connection. One thing to keep in mind: to prevent the command queue from growing (you probably read lines much faster than you insert data into the DB), you need to report back pressure to the writer. AFAIK there is no official API to monitor the queue size, but you can read connection._protocol._queue.length to get the queue length, and listen to connection._protocol.on('enqueue') for queue +1 and connection._protocol.on('drain') for queue empty.
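The writable-stream idea above could be sketched roughly as follows. This is a minimal sketch, not the commenter's implementation: createInsertStream, table and columns are hypothetical names, and connection is assumed to be a mysqljs connection (or pool) whose query(sql, values, callback) method expands ?? into escaped identifiers and ? into escaped values. It relies only on the public Writable contract instead of the private _protocol fields.

```javascript
const { Writable } = require('stream');

// Hypothetical helper: wraps a mysqljs-style connection in an object-mode
// Writable. Each chunk is expected to be one row, given as an array of values.
function createInsertStream(connection, table, columns) {
  return new Writable({
    objectMode: true,
    // Node does not hand over the next row until `callback` has been called,
    // so at most one INSERT is in flight and further rows wait in the
    // stream's own bounded buffer instead of the driver's command queue.
    write(row, _encoding, callback) {
      connection.query(
        'INSERT INTO ?? (??) VALUES (?)',
        [table, columns, row],
        (err) => callback(err)
      );
    },
  });
}

module.exports = { createInsertStream };
```

A producer would then stop reading whenever write() returns false and continue on the 'drain' event. Piping a Readable into this stream does that automatically; with a readline interface, iterating with for await (const line of rl) and awaiting 'drain' when needed is one way to get the same effect.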

Sorry, I am a bit confused by your answer. I used a pool and tried to pause the stream when there was no connection and resume it when a connection became available, but it slowed down the writes significantly and I ended up with this error: Error: Handshake inactivity timeout

this.mysql.on('enqueue', function() {
    stream.pause();
});
this.mysql.on('acquire', function() {
    stream.resume();
});

Try this:

this.mysql.on('enqueue', function() {
    stream.pause();
});
this.mysql.on('release', function() {
    stream.resume();
});

Thanks. This somewhat works, but it is really slow. I also got the following warnings. Is there a better way to do this? Maybe something like connection.query().pipe(transformFunc).query(insert);

(node:21645) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 enqueue listeners added. Use emitter.setMaxListeners() to increase limit
(node:21645) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 release listeners added. Use emitter.setMaxListeners() to increase limit

You need to organise your code so that you wire up the events only once. It looks like you are calling on('enqueue') / on('release') each time you write a record?

Another good solution for inserting a large CSV is to use LOAD DATA LOCAL INFILE.
Two benefits: 1) you don't need to wait for each record to complete before the next one is sent; 2) the server does not need to parse the SQL of each query.

See the example here: https://github.com/mysqljs/mysql/blob/eeca1326c5f4d67725a486ddf1ef12c3f12d2187/test/integration/connection/test-load-data-infile.js
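For orientation, such a call might look roughly like the sketch below. The helper names buildLoadDataQuery and loadCsv are hypothetical, and the CSV options (comma-separated, optionally quoted fields, one header line to skip) are assumptions about the input file rather than anything stated in this thread. It also assumes the server permits LOCAL INFILE and that the connection has not disabled the corresponding client flag.

```javascript
// Hypothetical helper: builds the statement and its placeholder values.
// `?` is filled with an escaped value, `??` with escaped identifiers.
function buildLoadDataQuery(filePath, table, columns) {
  const sql =
    'LOAD DATA LOCAL INFILE ? INTO TABLE ?? ' +
    "FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' " +
    'IGNORE 1 LINES (??)';
  return { sql, values: [filePath, table, columns] };
}

// Hypothetical wrapper: `connection` is assumed to be a mysqljs-style
// connection exposing query(sql, values, callback).
function loadCsv(connection, filePath, table, columns, done) {
  const { sql, values } = buildLoadDataQuery(filePath, table, columns);
  connection.query(sql, values, (err, result) => {
    if (err) return done(err);
    // The driver reports the number of loaded rows as affectedRows.
    done(null, result.affectedRows);
  });
}

module.exports = { buildLoadDataQuery, loadCsv };
```

The whole file goes to the server in a single statement, so the client keeps no per-row callbacks or queue entries in memory.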

Yeah, it seems like it. I am checking for the connection inside the stream handler. Do you mean something like adding an iterator inside the following function? I know about LOAD DATA INFILE, and that's how my program was designed previously, but it is too error-prone and hard to debug: every time a row has wrong or missing data, the whole insert fails.

this.mysql.on('release', function() {
    stream.next();  // write one line
});

stream.on('line', line => {
    this.mysql.on('enqueue', function() {
        stream.pause();
    });
    this.mysql.on('release', function() {
        stream.resume();
    });
    if (count > 0) {
        this.election.insertData(line);
    }
    count++;
    //stream.output.write(line + '\n');
});

Yes, just move the pause/resume handlers outside of the 'line' handler so they are not added again on each line:

// "this.mysql" is set in the line reader, right?
// You need to store the reference to the pool some other way,
// depending on where it comes from.
this.mysql.on('enqueue', function() {
    stream.pause();
});
this.mysql.on('release', function() {
    stream.resume();
});

stream.on('line', line => {
    if (count > 0) {
        this.election.insertData(line);
    }
    count++;
    //stream.output.write(line + '\n');
});

This eventually runs out of memory too. I wish there were something like mysql.on('maxEnqueue') so we didn't sacrifice so much speed. I wonder why there is no pipe method for write operations in this package. It would be great if I could hand it a stream and have it write to the DB.

IMO, you should try a different angle to solve this problem.

Neither a pool nor a stream solves the underlying back-pressure problem.
Your database host has to write millions of rows to disk, opening the table, updating indexes, etc. for EACH single row. That means SLOW disk I/O. Even a fast disk is SLOW compared to in-memory operations such as reading content from a single file pointer into a JavaScript object.

My suggestion is either "transactions and multiple-line statements" or a slightly different MySQL query.
For the first, have a look at a Twitter reader I wrote a few years ago.
https://github.com/kai-koch/wut
It is in ES5, possibly doesn't work any more, and should have been updated some time ago, but you get the idea of the pattern I suggest. (Or you could work something out yourself with the "new" API function connection.beginTransaction, which has been added to mysqljs since then. :-D)
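A rough sketch of the transaction variant, using connection.beginTransaction, commit and rollback from mysqljs. This is not the code from the linked repository: insertInTransaction is a hypothetical helper, and statements is assumed to be an array of { sql, values } objects prepared by the caller.

```javascript
// Hypothetical helper: runs all statements inside one transaction, so the
// server commits once per group instead of once per row.
function insertInTransaction(connection, statements, done) {
  connection.beginTransaction((err) => {
    if (err) return done(err);
    let i = 0;
    // Sends the statements one after another; stops at the first error.
    const next = (queryErr) => {
      if (queryErr) return connection.rollback(() => done(queryErr));
      if (i === statements.length) {
        return connection.commit((commitErr) => {
          if (commitErr) return connection.rollback(() => done(commitErr));
          done(null);
        });
      }
      const { sql, values } = statements[i++];
      connection.query(sql, values, next);
    };
    next(null);
  });
}

module.exports = { insertInTransaction };
```

If any statement fails, the whole group is rolled back, so a failed group can be logged and retried as a unit.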

The second way would be faster to implement, doesn't need transactions or multiple-line statements, and you would not need to change your connection setup.

Instead of building queries with values for only a single row, you build queries with values for multiple rows.

Values for a single row:

INSERT INTO tbl_name (a,b,c) VALUES (1,2,3);

Values for multiple rows:

INSERT INTO tbl_name (a,b,c) VALUES (1,2,3),(4,5,6),(7,8,9);

You add a layer that accumulates the values for maybe a hundred lines in a two-dimensional array. (Sanity checks of the input lines also go there.)
Once the accumulator is full, you call a function that builds a multiple-row SQL string out of the array, sends it to your connection, and resets the accumulator.

You should use mysql.escape on each value in each row when you store a line in the accumulator.

Use Array.join to construct (huge) strings out of an array. (This was faster than concatenating strings in other ways, last time I checked.)

Once the input file has been read, you make a final call to the function that builds and sends the SQL string, to empty the accumulator of the remaining rows. (Make sure you handle the edge case where the accumulator is empty when the input file has been read completely.)

Also make sure your script waits for the database to have written all rows before exiting; we are in an asynchronous environment after all and do not want to lose rows to a race condition.

This way you can cut the database's disk I/O by a factor of 100 and reduce the memory stress from handling callbacks and back pressure from the DB.
You can tweak the "100 lines" per database call as you see fit for your configuration. Do not set it too high; 100 - 5000 lines per query should be safe, depending on the total length of your input lines.
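The accumulator pattern described above could be sketched like this. BatchInserter and its method names are hypothetical, and connection is assumed to be a mysqljs-style connection or pool. One deliberate swap: instead of calling mysql.escape on every value and joining strings by hand, the sketch passes the two-dimensional array to a single ? placeholder, which mysqljs expands into escaped (..),(..) groups.

```javascript
// Hypothetical helper implementing the accumulate-and-flush pattern.
class BatchInserter {
  constructor(connection, table, columns, batchSize = 100) {
    this.connection = connection;
    this.table = table;
    this.columns = columns;
    this.batchSize = batchSize;
    this.rows = [];     // two-dimensional accumulator: one array per row
    this.pending = 0;   // batches sent but not yet acknowledged by the DB
    this.onIdle = null; // set by end(); called when pending drops to zero
  }

  // Store one row (an array of values); send a batch once it is full.
  add(row) {
    this.rows.push(row);
    if (this.rows.length >= this.batchSize) this.flush();
  }

  // Send the accumulated rows as one multi-row INSERT and reset.
  flush() {
    if (this.rows.length === 0) return; // edge case: nothing left to send
    const batch = this.rows;
    this.rows = [];
    this.pending++;
    this.connection.query(
      'INSERT INTO ?? (??) VALUES ?',
      [this.table, this.columns, batch],
      (err) => {
        if (err) console.error('Batch insert failed', err);
        this.pending--;
        if (this.pending === 0 && this.onIdle) this.onIdle();
      }
    );
  }

  // Call after the input file has been read: flushes the remainder and
  // invokes `done` only after every batch has been acknowledged.
  end(done) {
    this.flush();
    if (this.pending === 0) return done();
    this.onIdle = done;
  }
}

module.exports = { BatchInserter };
```

In the 'line' handler one would call add() with the parsed and sanity-checked values, and call end() from the 'close' handler before releasing the connection. The sketch does not apply back pressure by itself; a caller could pause the reader while pending is above some threshold.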

In addition to all of @kai-koch's suggestions: you can also mini-batch LOAD DATA INFILE into groups of 100-500 records. I'd expect that to have the best performance (and if a mini-batch fails, you can restart it).

I'm going to close this, as it looks answered and the OP never replied.

Many thanks for the suggestions, @sidorares and @kai-koch. In the end I decided to use LOAD DATA INFILE because of its speed. I realized that with better error handling, LOAD DATA INFILE can be pretty reliable. Thank you, @kai-koch, for the detailed explanation. It definitely helped me understand the problem better. I tested your solution with a 1 GB CSV and it worked.
