Node-mssql: NodeJs - Pause/Resume stream request

Created on 10 May 2017 · 8 Comments · Source: tediousjs/node-mssql

I fetch millions of records from SQL Server (streaming) and process every record. I want to call nested functions inside _request.on('row', row => { })_ and pause the stream until the last of those nested functions has finished.

NodeJs

SqlServer

I want this:

const sql = require('mssql')

sql.connect(config, err => {
    // ... error checks 

    const request = new sql.Request()
    request.stream = true // You can set streaming differently for each request 
    request.query('select * from verylargetable') // or request.execute(procedure) 

    request.on('recordset', columns => {
        // Emitted once for each recordset in a query 
    })

    request.on('row', row => {
        // Emitted for each row in a recordset
        // I want to call process_1(row) here and pause the stream
        // until process_2 has finished
    })

    request.on('error', err => {
        // May be emitted multiple times 
    })

    request.on('done', result => {
        // Always emitted as the last one 
    })
})

sql.on('error', err => {
    // ... error handler 
})

function process_1(data) {
    // takes n minutes to finish
    // when it has finished, call process_2(newData)
}
function process_2(newData) {
    // takes n minutes to finish
    // when it has finished, resume the streaming request
}

Note: I read https://github.com/tediousjs/tedious/issues/181 and other issues about _pausing/resuming stream requests_, but I couldn't work out how to do it.
Thanks!


All 8 comments

Use case: backpressure when consuming the results as an async iterable:

type Deferred<T> = {
  promise: Promise<T>,
  resolve: (result: T) => void,
  reject: (reason: any) => void,
};
function createDeferred<T>(): Deferred<T> {
  const result = {} as Deferred<T>;
  result.promise = new Promise<T>((resolve, reject) => {
    result.resolve = resolve;
    result.reject = reject;
  });
  return result;
}

function streamQuery<T>(query: TemplateStringsArray, ...interpolations: any[]): AsyncIterable<T> {
  return { [Symbol.asyncIterator]: asyncIterator };

  function asyncIterator(): AsyncIterator<any> {
    const request = pool.request();
    request.stream = true;

    // Reproduce pool.query but for request.
    // This is equivalent to request._template('query', query, interpolations);
    const command = [query[0]];
    for (let i = 0; i !== interpolations.length; i++) {
      request.input(`p${i}`, interpolations[i]);
      command.push(`@p${i}`, query[i + 1]);
    }
    request.query(command.join('')); // ignoring query result.
    request.on('row', row);
    request.on('error', error);
    request.on('done', done);

    // Note: this buffers without bound; at the time of writing, mssql doesn't let us apply backpressure (suspend streaming).

    // Results received, but not requested, in the order received
    const buffer: Array<Promise<IteratorResult<T>>> = [];
    let bufferDone = false;
    // Results requested, but not received, in the order requested
    const pending: Array<Deferred<IteratorResult<T>>> = [];
    // At least one of the above should always be empty.

    return { next };

    function next(): Promise<IteratorResult<T>> {
      // if (buffer.length === LOW) request.resume();
      if (buffer.length) {
        return buffer.shift()!;
      }
      if (bufferDone) {
        return Promise.resolve({ value: undefined as any as T, done: true });
      }
      const deferred = createDeferred<IteratorResult<T>>();
      pending.push(deferred);
      return deferred.promise;
    }

    function row(row: T) {
      const result = { value: row, done: false };
      if (!pending.length) {
        buffer.push(Promise.resolve(result));
        // if (buffer.length === FULL) request.pause();
      } else {
        pending.shift()!.resolve(result);
      }
    }

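    function error(reason: any) {
      if (!pending.length) {
        const rejected = Promise.reject(reason);
        // Mark as handled so Node doesn't report an unhandled rejection while the error waits in the buffer;
        // the consumer still sees the rejection when next() returns this promise.
        rejected.catch(() => {});
        buffer.push(rejected);
        // if (buffer.length === FULL) request.pause();
      } else {
        pending.shift()!.reject(reason);
      }
    }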

    function done() {
      bufferDone = true;
      while (pending.length) {
        pending.shift()!.resolve({ value: undefined as any as T, done: true });
      }
    }
  }
}

(note that this code is not terribly well tested, sorry!)

@simonbuchan, do you have an example which does not use TypeScript?

If you can't be bothered to strip the types yourself, you can paste it into the TypeScript playground.
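For readers who want plain JavaScript, here is a minimal sketch of the same async-iterable idea without types, with the commented-out pause()/resume() lines switched on. It assumes mssql v6.0.0 or later, where (according to the later comments in this thread) request.pause() and request.resume() work for streaming requests. The function name rowsOf and the watermark parameters highWater and lowWater (defaults 100 and 10) are hypothetical choices for illustration, not part of mssql: the sketch pauses when the buffer reaches highWater rows and resumes once it drains to lowWater.

```javascript
// Plain-JavaScript sketch: expose a streaming mssql-style request as an
// async iterable and apply backpressure with pause()/resume().
// Assumption: `request` is an EventEmitter that emits 'row', 'error' and
// 'done' and has pause()/resume() (and optionally cancel()), as mssql >= 6
// streaming requests are described to. rowsOf, highWater and lowWater are
// hypothetical names.
function rowsOf(request, highWater = 100, lowWater = 10) {
  const buffer = [];  // rows received but not yet requested, in arrival order
  const waiting = []; // pending next() calls, in request order
  let finished = false;
  let failure = null;
  let paused = false;

  request.on('row', (row) => {
    if (waiting.length > 0) {
      waiting.shift().resolve({ value: row, done: false });
      return;
    }
    buffer.push(row);
    if (!paused && buffer.length >= highWater) {
      paused = true;
      request.pause(); // buffer is "full": ask the driver to stop sending rows
    }
  });

  request.on('error', (err) => {
    if (failure === null) failure = err; // keep the first error
    while (waiting.length > 0) waiting.shift().reject(err);
  });

  request.on('done', () => {
    finished = true;
    while (waiting.length > 0) waiting.shift().resolve({ value: undefined, done: true });
  });

  return {
    [Symbol.asyncIterator]() {
      return this;
    },
    next() {
      if (buffer.length > 0) {
        const value = buffer.shift();
        if (paused && buffer.length <= lowWater) {
          paused = false;
          request.resume(); // buffer has drained enough: let rows flow again
        }
        return Promise.resolve({ value, done: false });
      }
      if (failure !== null) return Promise.reject(failure);
      if (finished) return Promise.resolve({ value: undefined, done: true });
      return new Promise((resolve, reject) => waiting.push({ resolve, reject }));
    },
    return() {
      // Called by `for await` on break/throw: stop the query rather than
      // leaving it paused forever.
      if (!finished && typeof request.cancel === 'function') request.cancel();
      finished = true;
      buffer.length = 0;
      return Promise.resolve({ value: undefined, done: true });
    },
  };
}
```

Usage could look like this inside an async function, assuming a connected ConnectionPool named pool: create a request with pool.request(), set request.stream = true, call request.query('select * from verylargetable'), then loop with for await (const row of rowsOf(request)) and await the per-row work inside the loop. Rows are pulled only as fast as the loop body finishes, so the buffer stays around highWater rows, plus whatever the driver may still deliver after pause().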

Hello!
I had the same problem and solved it with tedious's _pause/resume stream request_ support.
I've created a small repo with examples: tediousStreaming
There are 2 examples:

  1. raw tedious
  2. node-mssql
    When using node-mssql, make sure this commit is in node_modules.

I think the data flow (process_1, process_2) is easy to implement using a library such as https://highlandjs.org/

Pause/resume of streams has now been added.

OH MY GOD... after about 2 years, this issue is solved.

That's one small step for a man, one giant leap for mankind.
https://www.linkedin.com/feed/update/urn:li:activity:6531761519288418304/

Yep - This was added in #775 and released in v5.0.0 - however, due to a bug in the tedious driver, it didn't actually work properly and is now in v6.0.0 without any bugs (see #838 and #832) 🎉
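Applied to the original question, the callback style could look like the minimal sketch below. It assumes mssql v6.0.0 or later, where request.pause() and request.resume() are said above to work. The names processSequentially and handleRow are hypothetical; handleRow stands for an async function that runs process_1 and then process_2. Rows are queued instead of being processed directly in the 'row' handler, so the sketch still behaves correctly if the driver happens to deliver a few more rows after pause() is called.

```javascript
// Sketch: run slow async work for each row, one row at a time, keeping the
// stream paused while the work is in progress.
// Assumption: `request` behaves like an mssql (>= 6) streaming Request, i.e.
// an EventEmitter emitting 'row', 'error' and 'done' with pause()/resume().
// processSequentially and handleRow are hypothetical names; handleRow would
// wrap process_1 followed by process_2 from the question.
function processSequentially(request, handleRow) {
  return new Promise((resolve, reject) => {
    const queue = [];       // rows that arrived but have not been processed yet
    let working = false;    // true while the drain loop is running
    let streamDone = false; // 'done' has been emitted
    let failed = false;     // an error has already been reported

    function finishIfIdle() {
      if (streamDone && !working && queue.length === 0 && !failed) resolve();
    }

    async function drain() {
      working = true;
      request.pause(); // ask the driver to stop emitting 'row' while we work
      try {
        // Rows that still arrive after pause() are queued and handled here too.
        while (queue.length > 0 && !failed) {
          await handleRow(queue.shift());
        }
      } catch (err) {
        failed = true;
        reject(err); // the caller may want to call request.cancel() here
        return;
      }
      working = false;
      if (!streamDone && !failed) request.resume(); // ready for the next row
      finishIfIdle();
    }

    request.on('row', (row) => {
      queue.push(row);
      if (!working) drain();
    });

    request.on('error', (err) => {
      if (!failed) {
        failed = true;
        reject(err);
      }
    });

    request.on('done', () => {
      streamDone = true;
      finishIfIdle();
    });
  });
}
```

With mssql, this could be wired up by creating the request as in the question (new sql.Request(), request.stream = true, request.query(...)) and then awaiting processSequentially(request, async row => { const newData = await process_1(row); await process_2(newData); }), assuming process_1 and process_2 are rewritten to return promises.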

Thanks!!!!!!
