(this is similar to an issue I opened some time ago and then closed because I couldn't reproduce it)
If I call the clean function periodically, sooner or later I get the following error:
Error: Could not get lock for job: 2738. Cannot remove job.
This is not unlike #282 which also appeared to be triggered by cleaning, but that's ancient history at this point.
I finally found some time to write a smaller test program that triggers this error, but it probably isn't small enough. I'll trim it down, remove Ramda etc. Until the next version, this is the best I have. It takes some time to trigger the error. I made sure the process terminates when the error occurs.
Edit: updated to make it happen quicker
var Promise = require('bluebird'),
    Queue = require('bull');

const
  RHOST = 'localhost',
  RPORT = 6379,
  procTime = 40,
  concurrency = 4,
  jobCreationDelay = 10,
  cleanDelay = 40;

var trace = (x) => {
      console.log(x);
      return x;
    },
    printJobMessage = (j) => {
      trace('Processing job; here\'s a random number: ' + j.data.message);
      return Promise.delay(procTime);
    },
    printErrorAndQuit = (e) => {
      console.log(e);
      process.exit(1);
    };

var q = Queue('testq', RPORT, RHOST);
q.on('error', printErrorAndQuit);
q.process(concurrency, printJobMessage);

// clean every cleanDelay milliseconds
var cleanLoop = () => Promise
  .all([
    Promise.resolve('CLEANING STARTED').then(trace),
    q.clean(0, 'completed'),
    q.clean(0, 'failed')
  ])
  .then(() => trace('CLEANING FINISHED'))
  .delay(cleanDelay)
  .then(() => process.nextTick(cleanLoop));

var addJob = (x) => {
  q.add(x);
  return x;
};

var jobLoop = () => Promise
  .resolve({message: Math.random()})
  .then(addJob)
  .delay(jobCreationDelay)
  .then(() => process.nextTick(jobLoop));

cleanLoop();
jobLoop();
Here's the extremely boring log. Error at the very bottom.
I have identified a likely cause of this scenario:
Queue#processStalledJobs is racing for the lock on jobs that have just moved to completed:
1. Queue#processStalledJobs picks all jobs in the active set, including a job that is about to finish.
2. That job finishes and moves to the completed set.
3. Queue#clean runs and picks up the job that just finished (which was also picked up by Queue#processStalledJobs).
4. Queue#processStalledJobs gets to that job and takes the lock to attempt to process it.
5. Queue#clean attempts to remove the completed job, but the job is currently locked by Queue#processStalledJobs.
The problem is aggravated by the fact that Queue#processStalledJobs does not release the lock when it skips a job that just moved from active to completed, so the job remains locked until the lock expires after Queue#LOCK_RENEW_TIME.
Got it, so maybe by tuning the timing I can test it more precisely
...no I can't, but shortening the delays to something unreasonable makes it happen quicker, so there's that at least. I updated the code. I don't think I can offer anything more than this, sorry.
The higher the volume and the more queues running in parallel, the more likely this issue is to happen. It boils down to race conditions for the job lock. Releasing the lock in Queue#processStalledJobs will make this much better, but it won't fix it entirely.
@chuym so if I understand this correctly, the reason for the problem is that moveToCompleted and moveToFailed do not release the lock atomically?
I see how moveToCompleted not being atomic could also induce this condition: the job is moved to completed and immediately a Queue#clean picks the job for removal before moveToCompleted has the chance to release the lock first.
But this condition is also caused by the way Queue#processStalledJobs works: it picks all jobs in the active set, then goes through each one, locks it first, then checks whether it is in completed, and processes it only if it is not. If a job taken from the active set has since transitioned to completed, Queue#processStalledJobs will still lock it and currently won't release the lock until it expires with the TTL setting. This is a bug that can be fixed relatively easily, but the condition will still be there.
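To make the sequence concrete, here is a minimal in-memory sketch of the race. It does not use Redis or bull; makeStore, checkStalled, cleanCompleted and runScenario are hypothetical names invented for illustration, and the lock is modelled as a plain Map entry rather than a key with a TTL.

```javascript
// In-memory model of the race; no Redis and no bull involved.
// All names here are hypothetical and exist only for this illustration.
function makeStore() {
  return { active: new Set(), completed: new Set(), locks: new Map() };
}

// Stalled-job check: works from a snapshot of the active set taken earlier.
function checkStalled(store, snapshot, options) {
  for (const id of snapshot) {
    if (store.locks.has(id)) continue;        // someone else holds the lock
    store.locks.set(id, 'stalled-checker');   // lock first...
    if (store.completed.has(id)) {            // ...then notice it already finished
      if (options.releaseOnSkip) store.locks.delete(id);
      continue;                               // without the release, the lock lingers
    }
    // a real implementation would reprocess the stalled job here
  }
}

// Cleaner: refuses to remove a job that is still locked.
function cleanCompleted(store) {
  for (const id of [...store.completed]) {
    if (store.locks.has(id)) {
      throw new Error('Could not get lock for job: ' + id + '. Cannot remove job.');
    }
    store.completed.delete(id);
  }
}

function runScenario(releaseOnSkip) {
  const store = makeStore();
  store.active.add('2738');
  const snapshot = [...store.active];  // the checker reads the active set
  store.active.delete('2738');         // the job finishes in the meantime
  store.completed.add('2738');
  checkStalled(store, snapshot, { releaseOnSkip: releaseOnSkip });
  try {
    cleanCompleted(store);
    return 'cleaned';
  } catch (e) {
    return e.message;
  }
}
```

Calling runScenario(false) models the current behaviour and returns the error message, while runScenario(true) models the easy fix. Because this model runs strictly in sequence, it cannot show the remaining window between taking and releasing the lock, which is the part that releasing alone does not close.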
OK, so what is also needed is an atomic Lua script that gets all the jobs in the active set that are not already locked and locks them before returning. By doing that we would be safe. What do you think?
Seriously, you guys offer better support than a few corporations I could name :D
These fixes are not completely trivial so expect a few days before we make a new release...
That would be ideal @manast, but that was already attempted before, wasn't it? See: https://github.com/OptimalBits/bull/issues/258
@chuym In this case we can. We cannot use blocking operations in Lua scripts, but here we are just reading from a set at a given point in time, so we can write it as a Lua script.
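A rough sketch of what such a script could look like follows. It is not taken from bull's source: the key layout (an "active" list plus one "<prefix><jobId>:lock" key per job) is an assumption made for illustration, lockUnlockedActiveJobs is a hypothetical helper, and `client` is assumed to be any Redis client that exposes eval(script, numKeys, ...keysAndArgs), as ioredis does. The script builds lock key names dynamically, so it is only meant for a single Redis instance.

```javascript
// Sketch only: the key layout and names below are assumptions for illustration.
const LOCK_UNLOCKED_ACTIVE_JOBS = `
local ids = redis.call('LRANGE', KEYS[1], 0, -1)
local locked = {}
for _, id in ipairs(ids) do
  -- SET ... NX only succeeds when no lock key exists yet
  if redis.call('SET', ARGV[1] .. id .. ':lock', ARGV[2], 'PX', ARGV[3], 'NX') then
    locked[#locked + 1] = id
  end
end
return locked
`;

// Returns whatever client.eval returns (with ioredis, a promise of the ids
// that this caller managed to lock).
function lockUnlockedActiveJobs(client, activeKey, keyPrefix, token, ttlMs) {
  return client.eval(LOCK_UNLOCKED_ACTIVE_JOBS, 1, activeKey, keyPrefix, token, ttlMs);
}
```

Since Redis runs a script as a single atomic unit, no other client can take or release a lock between the read of the active list and the SET calls, which is the property the plain "read, then lock one by one" approach lacks.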
First 5 minutes of using this library (500 requests a second, 150k total requests) already experienced this issue.
Like you guys said, it's a 'lock' issue / race condition.
This is a major issue since none of the consumers can operate once this happens.
In the meantime, will stick to kafka in production :)
@abacaj which version are you using? version 1.0.0-rc2 should already have a fix where this should never happen. If it still happens, can you post code that reproduces the issue?
Looks like it is 1.0.0-rc2, from my package json. All I did was start two instances that would "process" the queue. I had another instance producing jobs "add" them to the queue. I mean I followed the guide and from my assumptions, it is "at most once" messaging so once one of my consumers starts processing jobs it should be fine. However somewhere down the line the queue got into a bad state.
What is actually considered locking for too long as mentioned here: "it will keep the job locked until the work is done. However, it is important that the worker does not lock the event loop too long"
What happens when a consumer stalls and keeps the lock, i.e. never releases it or calls done? Does this put the queue into a bad state? This could be the case for my situation then.
UPDATE: Hmm, strange. Why is the queue count 0 when there is actually a job there (it picks up the job)? Seems inconsistent.
The only way to resolve this is to restart my Redis server; nothing that I do inside my client code works, e.g. queue.empty() / clean.
@abacaj Could you share a bit more about your environment?
Are you calling Queue#clean, and how are you calling it (inside a setInterval)?
Queue#count is 0?
Thanks for following up @abacaj! It doesn't have to be the real thing; a test case that simulates your real scenario will be fine and will greatly help to debug and fix this.
Queue#empty takes all jobs that are delayed, paused or waiting and removes them. It doesn't touch active, completed or failed jobs. Ideally you should stop receiving jobs before calling it, otherwise you may end up with new jobs after calling Queue#empty.
Queue#clean clears all jobs in a given state that are older than a grace period: queue.clean(60 * 1000, 'completed') would clean all completed jobs that completed at least 1 minute ago.
Queue#count returns the number of jobs waiting to be processed, so only jobs in paused, wait and delayed are counted.
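As a toy illustration of the three descriptions above, the following in-memory model treats a queue as a plain array of jobs. It is not bull's implementation; the job shape ({ id, state, finishedAt }) and the explicit `now` argument are assumptions made so the example stays self-contained.

```javascript
// Toy in-memory model; the job shape and function signatures are hypothetical.
const WAITING_STATES = ['wait', 'paused', 'delayed'];

// Counts only jobs that are still waiting to be processed.
function count(jobs) {
  return jobs.filter((j) => WAITING_STATES.includes(j.state)).length;
}

// Removes waiting jobs; active, completed and failed jobs are left alone.
function empty(jobs) {
  return jobs.filter((j) => !WAITING_STATES.includes(j.state));
}

// Removes jobs in `state` that finished at least `graceMs` before `now`.
function clean(jobs, graceMs, state, now) {
  return jobs.filter((j) => !(j.state === state && j.finishedAt <= now - graceMs));
}
```

Under this model a queue holding only an active job has a count of 0 even though a worker is busy with it, which may be what the "count is 0 but there is a job" observation was showing.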
I will take a pass on the documentation, I think the current docs are not 100% precise about what they do
@MySidesTheyAreGone @abacaj Could you please confirm that this issue is still present in 1.0.0-rc3 ?
By the way, just to clarify: a job stays locked as long as a worker is working on it and has not kept the event loop busy for more than 5 seconds (not configurable at this time). This means that if some kind of hazard condition produces the error above when calling clean, calling clean again after 5 seconds should normally make the error go away. So in this code:
// clean every cleanDelay milliseconds
var cleanLoop = () => Promise
  .all([
    Promise.resolve('CLEANING STARTED').then(trace),
    q.clean(0, 'completed'),
    q.clean(0, 'failed')
  ])
  .then(() => trace('CLEANING FINISHED'))
  .delay(cleanDelay)
  .then(() => process.nextTick(cleanLoop));
Change the last line to:
.then(() => process.nextTick(cleanLoop), () => process.nextTick(cleanLoop));
(You can also log the error so that you do not swallow potentially important errors; you get the idea.)
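A possible shape for such a loop that logs failures and keeps going is sketched below with native promises and async/await instead of bluebird. cleanLoopWithLogging, cleanOnce and shouldStop are hypothetical names; shouldStop exists only so the loop can be ended.

```javascript
// Runs `cleanOnce` repeatedly, waiting `delayMs` after every run. A rejected
// run is passed to `log` and the loop carries on instead of stopping.
async function cleanLoopWithLogging(cleanOnce, delayMs, shouldStop, log = console.error) {
  while (!shouldStop()) {
    try {
      await cleanOnce();
    } catch (err) {
      log(err); // log rather than swallow, so real problems stay visible
    }
    await new Promise((resolve) => setTimeout(resolve, delayMs));
  }
}
```

With the test program from this thread it could be started as cleanLoopWithLogging(() => Promise.all([q.clean(0, 'completed'), q.clean(0, 'failed')]), cleanDelay, () => false).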
@chuym I am also considering the following case: let's say that a job stalls the event loop for more than 5 seconds. This will lead to another worker starting the same job as well. After a while the first worker continues working and eventually completes the work. The second worker would also continue working, effectively leading to two (or in practice more) completed jobs... A way to mitigate this is to check in the completedHandler whether the job has already been completed and, if so, just ignore the result.
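The mitigation could be sketched as a wrapper that lets only the first completion per job id through. makeOnceCompleted is a hypothetical helper, and the in-process Set is an assumption that only holds for a single process; with several worker processes the "already completed" check would have to live in Redis.

```javascript
// Wraps a completion handler so that only the first result per job id is used.
// Hypothetical helper; state is kept in memory for illustration only.
function makeOnceCompleted(handler) {
  const seen = new Set();
  return (jobId, result) => {
    if (seen.has(jobId)) return false; // duplicate completion: ignore the result
    seen.add(jobId);
    handler(jobId, result);
    return true;
  };
}
```

The return value tells the caller whether the result was accepted, which makes duplicate completions easy to log while debugging.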
Happily, I can't confirm anything since I'm on a beach in the middle of the ocean :D
Will test, and update here. :)
First Update:
timeout: 2 * 60 * 1000.
Below is a shot of the log with timestamps, with only a single publish.
My package.json:
@abacaj if you are kind enough to test the latest version rc4, the duplication issue should be gone and hopefully even the initial issue you reported as well.
@manast looking good my friend. No duplication - haven't noticed the locking issue as of yet.
Will keep you posted if I see anything else.
side note: Still a bit confused why the job keeps being re-attempted if I gave the job attempts: 1
good. We are very close to release 1.0.
I will check the attempt issue you are mentioning.
can you post a minimal example code that reproduces the attempts issue? you can check here for inspiration: https://github.com/OptimalBits/bull/blob/master/test/test_queue.js#L1196
Must have been me, it seems to be operating correctly, i.e. once the job fails it is not re-attempted.
I also see more consistent behavior when running multiple instances of the consumer (processor) instance compared to previous RC versions.