I fetch n millions records from SqlServer (streaming) and process over every record. I want to call nested functions into _request.on('row', row => { })_ and pause streaming until end of latest nested functions.
const sql = require('mssql')
sql.connect(config, err => {
// ... error checks
const request = new sql.Request()
request.stream = true // You can set streaming differently for each request
request.query('select * from verylargetable') // or request.execute(procedure)
request.on('recordset', columns => {
// Emitted once for each recordset in a query
})
request.on('row', row => {
// Emitted for each row in a recordset
//call process_1(row)
})
request.on('error', err => {
// May be emitted multiple times
})
request.on('done', result => {
// Always emitted as the last one
})
})
sql.on('error', err => {
// ... error handler
})
function process_1(data){
//process done after n minutes
//if end process, then call process_2(newData)
}
function process_2(newData){
//process done after n minutes
//if end process, then resume streaming request
}
Notice: I road https://github.com/tediousjs/tedious/issues/181 and other issue about _pause/pesume stream request_ but I didn't get any idea!
thanks
Use-case: backpressure from async-iterable usage:
type Deferred<T> = {
promise: Promise<T>,
resolve: (result: T) => void,
reject: (reason: any) => void,
};
function createDeferred<T>(): Deferred<T> {
const result = {} as Deferred<T>;
result.promise = new Promise<T>((resolve, reject) => {
result.resolve = resolve;
result.reject = reject;
});
return result;
}
function streamQuery<T>(query: TemplateStringsArray, ...interpolations: any[]): AsyncIterable<T> {
return { [Symbol.asyncIterator]: asyncIterator };
function asyncIterator(): AsyncIterator<any> {
const request = pool.request();
request.stream = true;
// Reproduce pool.query but for request.
// This is equivalent to request._template('query', query, interpolations);
const command = [query[0]];
for (let i = 0; i !== interpolations.length; i++) {
request.input(`p${i}`, interpolations[i]);
command.push(`@p${i}`, query[i + 1]);
}
request.query(command.join('')); // ignoring query result.
request.on('row', row);
request.on('error', error);
request.on('done', done);
// Note this will buffer without bound, mssql doesn't let us apply backpressure (suspend streaming).
// Results received, but not requested, in the order received
const buffer: Array<Promise<IteratorResult<T>>> = [];
let bufferDone = false;
// Results requested, but not received, in the order requested
const pending: Array<Deferred<IteratorResult<T>>> = [];
// At least one of the above should always be empty.
return { next };
function next(): Promise<IteratorResult<T>> {
// if (buffer.length === LOW) request.resume();
if (buffer.length) {
return buffer.shift()!;
}
if (bufferDone) {
return Promise.resolve({ value: undefined as any as T, done: true });
}
const deferred = createDeferred<IteratorResult<T>>();
pending.push(deferred);
return deferred.promise;
}
function row(row: T) {
const result = { value: row, done: false };
if (!pending.length) {
buffer.push(Promise.resolve(result));
// if (buffer.length === FULL) request.pause();
} else {
pending.shift()!.resolve(result);
}
}
function error(reason: any) {
if (!pending.length) {
buffer.push(Promise.reject(reason));
// if (buffer.length === FULL) request.pause();
} else {
pending.shift()!.reject(reason);
}
}
function done() {
bufferDone = true;
while (pending.length) {
pending.shift()!.resolve({ value: undefined as any as T, done: true });
}
}
}
}
(note that this code is not terribly well tested, sorry!)
@simonbuchan, do you have an example which does not use TypeScript?
If you can't be bothered to strip the types yourself, you can paste it in the typescript playground
Hello!
I have the same problem and solve this with tedious _pause/pesume stream request_.
I've create small repo with example: tediousStreaming
There are 2 examples:
pause/resume of streams is now added
OH MY GOD...After about 2 years solved this issue.
Tha's one small step for a man, one giant leap for mankind
https://www.linkedin.com/feed/update/urn:li:activity:6531761519288418304/
Thanks!!!!!!
Most helpful comment
Yep - This was added in #775 and released in v5.0.0 - however, due to a bug in the tedious driver, it didn't actually work properly and is now in v6.0.0 without any bugs (see #838 and #832) 馃帀