Node-oracledb: QueryStream - NJS-030: Connection cannot be released because Lob operations are in progress (CLOB)

Created on 6 Jan 2017  ·  7 Comments  ·  Source: oracle/node-oracledb

Hi,
Thanks for the great library! I was wondering if you could help me as I'm getting the above error when trying to process clobs using queryStream.

const await = require('asyncawait/await');
const async = require('asyncawait/async');
var oracledb = require('oracledb');

// Connection settings handed to oracledb.getConnection(). Every value is a
// placeholder that must be replaced with real connection details.
const oracleParams = {
    connectString: '<connection-url>',
    user: '<blah>',
    // Quoted like the other placeholders: a bare <blah> is a syntax error.
    password: '<blah>'
};

// Running total of rows delivered to the query stream's 'data' handler; it is
// logged when the stream emits 'end'.
var count = 0;
// Module-level oracledb settings, assigned once when this module is loaded.
// NOTE(review): presumably these tune row prefetching and cap the rows
// returned by non-streaming queries -- confirm against the node-oracledb docs.
oracledb.prefetchRows = 4000;
oracledb.maxRows = 10000;

/**
 * Opens a connection, streams the rows of the query and starts reading the
 * CLOB in column index 2 (seq_data) of each row via processClob(). The
 * connection is closed from the query stream's 'end' handler.
 *
 * @param {Function} callback - Called with the error when the per-row
 *     processing promise rejects. NOTE(review): it is never called on success
 *     or when getConnection fails -- confirm that this is intended.
 * @returns whatever oracledb.getConnection() returns when given a callback.
 */
module.exports = (callback) => {
    return oracledb.getConnection(
        oracleParams,
        function(err, connection)
        {
            // Connection failed: log the message and stop (callback is not notified).
            if (err) {
                console.error(err.message);
                return;
            }

            var stream = connection.queryStream(
                "select qs.seq_id, qs.name, qs.seq_data from <sometable> qs where datasource_id=45"
            );

            // Stream-level errors are only logged; nothing is propagated to callback.
            stream.on('error', function (error) {
                console.log("stream 'error' event");
                console.error(error);
                return;
            });

            // Metadata is received but deliberately ignored.
            stream.on('metadata', function (metadata) {
                // console.log("stream 'metadata' event");
            });

            // One call per row; data[2] is the third selected column (seq_data).
            stream.on('data', function (data) {
                ++count;
                // stream.pause();
                // NOTE(review): processClob() as defined in this file returns
                // undefined, so this await presumably does not wait for the CLOB
                // to be fully read -- confirm against the asyncawait docs.
                const process = async(function (){
                    const seqClob = await (processClob(data[2], stream, (err,sequence) => {
                         //doSomething with Result
                        // stream.resume();
                    }));
                });
                process().catch(function(err) {
                    console.log("Error in the process Rows:\n", err);
                    callback(err);
                });
            });

            // NOTE(review): 'end' is handled here without checking whether the
            // CLOB reads started in the 'data' handler have finished; closing
            // the connection at this point presumably triggers the NJS-030
            // error from the title -- confirm.
            stream.on('end', function () {
                // console.log("stream 'end' event");
                console.log("ending count", count);
                // NOTE(review): `end` and `start` are not defined anywhere in the
                // visible code; this line presumably throws a ReferenceError
                // before connection.close() is reached -- confirm.
                console.log(end-start);
                connection.close(
                    function(err) {
                        if (err) {
                            console.log("Error in stream.end listener");
                            console.error(err.message);
                        }
                    });
            });
        });
}

/**
 * Reads a CLOB stream to completion and reports the accumulated text.
 *
 * @param {EventEmitter} row - The CLOB stream to read; must emit 'data',
 *     'close' and 'error' events.
 * @param {*} stream - Unused; kept so existing callers keep working.
 * @param {Function} callback - Called exactly once, as callback(null, text)
 *     when the CLOB stream closes, or callback(err, null) on the first error.
 */
const processClob = (row, stream, callback) => {
    let sequenceData = '';
    let settled = false;

    // An 'error' may be followed by 'close'. Report only the first outcome so
    // a failed read is never followed by a "success" carrying partial data.
    const finish = (err, result) => {
        if (settled) {
            return;
        }
        settled = true;
        callback(err, result);
    };

    row
        .on('data', (chunk) => {
            sequenceData += chunk;
        })
        .on('close', () => {
            finish(null, sequenceData);
        })
        .on('error', (err) => {
            console.log("Error in extracting sequence");
            console.log(err);
            finish(err, null);
        });
};

The behavior that I'm seeing is that the stream's 'end' event is being emitted before all the CLOBs have been received. I've used the async/await pattern above for querying with resultSets and it worked as expected. Furthermore, I've tried pausing the stream (see the commented-out lines) to wait for the CLOB to finish processing. When I do this, however, I get "Uncaught TypeError: Data must be a string or a buffer" in the processClob function. Any insights on where I'm going wrong would be greatly appreciated!

question

All 7 comments

stream.end has no relation to the CLOB reading that you are doing on the side; it can't see it.
So you need to close the connection only after you finish reading all the CLOBs.
You can't pause the stream as it is an object stream.

you could ask for strings instead of clobs when doing the fetching to resolve it quickly.

@pmmarq Yeah, I agree with @sagiegurari. The easiest way to fix this would be to use the new ability to fetch CLOBs as a string. Are your CLOBs relatively small (as in a few MB or less)? Unless you're working with really large CLOBs (relative to the amount of memory your Node.js instance has)...

@pmmarq Here's an example... But make sure you're on the latest version of the driver (1.12.2):

Given the following:

-- Demo table for the streaming examples: a numeric id, a short name and a
-- CLOB column.
create table jsao_streaming_clobs (
  id           number,
  name         varchar2(20),
  clob_content clob
);

-- Three sample rows, each with a short CLOB value.
insert into jsao_streaming_clobs (id, name, clob_content) values (1, 'Clob 1', 'Clob content for clob 1');
insert into jsao_streaming_clobs (id, name, clob_content) values (2, 'Clob 2', 'Clob content for clob 2');
insert into jsao_streaming_clobs (id, name, clob_content) values (3, 'Clob 3', 'Clob content for clob 3');

You can now do this:

var oracledb = require('oracledb');
var config = require('./dbconfig');

// Streams every row of jsao_streaming_clobs, asking the driver (via fetchInfo)
// to return CLOB_CONTENT as a plain string so that no nested LOB stream has to
// be consumed. The connection is closed once the query stream ends.
oracledb.getConnection(config, function(err, connection) {
  if (err) {
    throw err;
  }

  const queryStream = connection.queryStream(
    'select * from jsao_streaming_clobs',
    [], // no binds
    {
      outFormat: oracledb.OBJECT,
      fetchInfo: {
        'CLOB_CONTENT': {
          type: oracledb.STRING
        }
      }
    }
  );

  queryStream.on('data', function(row) {
    console.log('Got row:', row);
  });

  queryStream.on('end', function() {
    console.log('Finished processing stream.');

    // All rows have been delivered and this example opens no LOB streams, so
    // the connection can be released here instead of being leaked.
    connection.close(function(closeErr) {
      if (closeErr) {
        throw closeErr;
      }
    });
  });

  queryStream.on('error', function(streamErr) {
    throw streamErr;
  });
});

Note that the CLOB_CONTENT property of the row is now just a string, no stream needed. :)

Grabbing the CLOB as a STRING works beautifully using your code as example. Thanks very much!

@sagiegurari

you can't pause the stream as it is an object stream.

Is that documented somewhere? It seems to be working in this test:

var oracledb = require('oracledb');
var config = require('./dbconfig');

// Same query as the fetch-as-string example, but pauses the query stream for
// three seconds after every row to show that pause()/resume() work on it.
// The connection is closed once the query stream ends.
oracledb.getConnection(config, function(err, connection) {
  if (err) {
    throw err;
  }

  const queryStream = connection.queryStream(
    'select * from jsao_streaming_clobs',
    [], // no binds
    {
      outFormat: oracledb.OBJECT,
      fetchInfo: {
        'CLOB_CONTENT': {
          type: oracledb.STRING
        }
      }
    }
  );

  queryStream.on('data', function(row) {
    console.log('Got row:', row);

    // Hold back further 'data' events until the timer fires.
    queryStream.pause();

    setTimeout(function() {
      queryStream.resume();
    }, 3000);
  });

  queryStream.on('end', function() {
    console.log('Finished processing stream.');

    // All rows have been delivered and this example opens no LOB streams, so
    // the connection can be released here instead of being leaked.
    connection.close(function(closeErr) {
      if (closeErr) {
        throw closeErr;
      }
    });
  });

  queryStream.on('error', function(streamErr) {
    throw streamErr;
  });
});

Hmmm, I might be wrong here. I remember some limitation with object mode; maybe it's not the pause.

@sagiegurari Okay, cool...

@pmmarq In that case, and really just for completeness, here's an example that uses streams in streams with pause. This would only be needed for really large LOBs (and the content wouldn't be buffered, but likely streamed somewhere else or written to a file.)

var oracledb = require('oracledb');
var config = require('./dbconfig');

// Streams rows whose CLOB_CONTENT column arrives as a LOB stream. The query
// stream is paused while each CLOB is read, and the connection is closed only
// after the query stream has ended AND every CLOB stream has closed.
oracledb.getConnection(config, function(err, connection) {
  if (err) {
    throw err;
  }

  let streamingComplete = false; // the query stream has emitted 'end'
  let openLobs = 0;              // CLOB streams received but not yet closed
  let closeRequested = false;    // guards against closing the connection twice

  // Called from both the query stream's 'end' handler and each CLOB's 'close'
  // handler, so the connection is closed no matter which event comes last.
  function closeWhenDone() {
    if (closeRequested || !streamingComplete || openLobs > 0) {
      return;
    }

    closeRequested = true;

    connection.close(function(closeErr) {
      // Test the close callback's own error, not the outer getConnection one.
      if (closeErr) {
        throw closeErr;
      }

      console.log('Closed connection after processing queryStream and all the CLOBs.');
    });
  }

  const queryStream = connection.queryStream(
    'select * from jsao_streaming_clobs',
    [], // no binds
    {
      outFormat: oracledb.OBJECT
    }
  );

  queryStream.on('data', function(row) {
    let clobContent = '';

    openLobs += 1;

    // Stop further rows from arriving until this row's CLOB has been read.
    queryStream.pause();

    row.CLOB_CONTENT.on('data', function(chunk) {
      clobContent += chunk;
    });

    row.CLOB_CONTENT.on('end', function() {
      console.log(`Got the clob content for row ${row.ID}:`, clobContent);

      queryStream.resume();
    });

    row.CLOB_CONTENT.on('close', function() {
      openLobs -= 1;
      closeWhenDone();
    });

    row.CLOB_CONTENT.on('error', function(lobErr) {
      throw lobErr;
    });
  });

  queryStream.on('end', function() {
    streamingComplete = true;

    console.log('Finished processing queryStream.');

    closeWhenDone();
  });

  queryStream.on('error', function(streamErr) {
    throw streamErr;
  });
});
Was this page helpful?
0 / 5 - 0 ratings