I extended rq for our project with the following functionality:
Functions can be marked as reentrant (via a decorator). If a reentrant task is killed by a Heroku shutdown, it is not moved to the failed queue, but immediately re-enqueued at the beginning of the queue it came from.
I would like to get this functionality merged here, but wanted to ask whether you would want to merge such a feature before investing time into creating a PR.
Yes, I think job retries are a nice feature to have. I will gladly merge a PR for this if we can come up with an elegant API and implementation.
I propose this API:

- Add a `reentrant` decorator in `rq.decorators`. If the job function is decorated with this decorator, it is considered reentrant.
- Alternatively, add a `reentrant` parameter to `enqueue()`. That would be more in line with the rest of the API, but I don't like this solution as much. From experience, reentrancy is a property of the function, not of its enqueued instance. Maybe a good solution would be to just implement both?
- On `SIGTERM`, give the worker 10 seconds (configurable) to shut down. After 10 seconds, raise a `WorkerShutdown` exception (that does not inherit from `Exception`, but from `BaseException`).
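For concreteness, here is a rough sketch of how the proposed pieces could fit together. Nothing below exists in rq at this point; `reentrant`, `WorkerShutdown`, and the re-enqueue step are all assumptions about the design described above.

```python
# Hypothetical sketch of the proposed API; none of this is in rq yet.

class WorkerShutdown(BaseException):
    """Raised in the work horse when the worker has to shut down.

    Derived from BaseException so that job code catching Exception
    cannot swallow it.
    """


def reentrant(func):
    """Proposed rq.decorators helper marking a job function as safe to re-run."""
    func.reentrant = True
    return func


@reentrant
def import_data(path):
    # A long-running job that can safely be restarted from the beginning.
    ...

# Proposed worker behaviour on SIGTERM (described above, shown as pseudocode):
#   1. give the current job up to 10 seconds (configurable) to finish
#   2. after that, raise WorkerShutdown inside the work horse
#   3. if the interrupted job is marked reentrant, push it back onto the
#      front of its origin queue instead of moving it to the failed queue
```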
@Chronial, we're also interested in a similar feature. We had something similar implemented in an internal app, but we're facing some minor issues since 0.7.x, I guess due to changes in the job and queue management.
From your comments, I'd rather prefer the `reentrant` parameter for `enqueue()`, since it feels more like all the other features in this library.
About the re-enqueueing policy, I'd also suggest making it configurable. In some cases it would be preferable to re-enqueue the job at the end of its origin queue so that it does not interfere with the workflow of other jobs.
Is there any pull request we can look at to see your work? We can try to help there as well 😄
I'm still waiting on @selwin to give some basic input on the design before I proceed.
> About the re-enqueueing policy, I'd also suggest making it configurable. In some cases it would be preferable to re-enqueue the job at the end of its origin queue so that it does not interfere with the workflow of other jobs.
I don't understand this scenario – how could there be interference with other workflows? The job was at the beginning of the queue. Putting it back there cannot cause it to interfere with anything it wasn't already interfering with. Putting it at the end of the queue is what causes hard-to-predict behavior.
Chronial, we have some long jobs that might block resources for a while.
If something fails in one of the jobs, we prefer not to affect the order of already queued jobs, and to enqueue the reentrant jobs at the end of the queue.
Ah, but this is not meant to ever affect failed jobs. This is only for the worker being killed for external reasons, completely independent of the currently running job.
@Chronial sorry for the late response. We can add a few options to `Queue`'s init method:
queue = Queue(max_retries=3, requeue=Requeue.AT_FRONT)
# The retry behavior can also be overridden by specifying additional arguments to `enqueue`
queue.enqueue(foo, max_retries=2)
I think the default behavior for requeueing should be to put it back at the tail end of the queue. This default behavior is up for debate, but shouldn't be much of an issue as long as we provide a way to override it.
What do you think?
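For what it's worth, here is a purely hypothetical sketch of how those options might hang together; `Requeue`, `max_retries`, and `requeue` are only the names used in the proposal above, none of them exist in rq:

```python
from enum import Enum


class Requeue(Enum):
    # Hypothetical placement options from the proposal above.
    AT_FRONT = "at_front"
    AT_END = "at_end"


def resolve_retry_settings(queue_defaults, enqueue_kwargs):
    """Sketch of the override rule: per-enqueue arguments win over Queue defaults."""
    settings = dict(queue_defaults)
    for key in ("max_retries", "requeue"):
        if key in enqueue_kwargs:
            settings[key] = enqueue_kwargs[key]
    return settings


# resolve_retry_settings({"max_retries": 3, "requeue": Requeue.AT_FRONT},
#                        {"max_retries": 2})
# -> {"max_retries": 2, "requeue": <Requeue.AT_FRONT: 'at_front'>}
```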
Hey @selwin, thanks for the response.
Just to clarify: I would apply this feature only to controlled shutdowns of the worker, not to unexpected deaths or failures of any sort. In this context I also don't think a `max_retries` is needed, or at least the name is confusing. These are not retries, but continuations.
I strongly believe that jobs that got interrupted should be re-enqueued at the beginning of the queue, because:

- If I enqueue a job `B` after a job `A` and only have one worker, `A` should never run after `B`.
- This matches celery's `acks_late` behaviour: https://gist.github.com/mlavin/6671079

This feature is not meant as a way to stop jobs in the middle of their processing. If such a feature is required, I think it should be implemented in a different way (for example because you can't actually control which job you will stop with this mechanism – you can only stop the current job, which might not be the same job as the last time you looked). This is only about handling the need of workers to shut down in the middle of longer tasks for administrative reasons.
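To make the front-vs-end question above concrete: with rq's usual layout, a queue is a Redis list of job ids that the worker pops from the left, so the two policies boil down to which end the interrupted job id is pushed back onto. The key name and job id below are only placeholders:

```python
import redis

r = redis.Redis()
queue_key = "rq:queue:default"      # example key; rq derives it from the queue name
interrupted_job_id = "some-job-id"  # placeholder

# Re-enqueue at the front: the interrupted job is picked up next, so a single
# worker still finishes A before it ever starts B.
r.lpush(queue_key, interrupted_job_id)

# Re-enqueue at the end: the interrupted job now runs after everything that was
# enqueued behind it, which is the ordering problem described above.
r.rpush(queue_key, interrupted_job_id)
```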
Due to all of this I think adding options about retries or where to put interrupted jobs will only be confusing and lead people to make incorrect assumptions.
But I also care about getting support for our use-case merged upstream, so as long as I am able to handle that, I am happy to go along with whatever you as the maintainer decide on this issue :).
I assume I can interpret your lack of comment on the other aspects of my design as approval?
Since you seemed sufficiently interested to warrant the effort, I went ahead, ported the implementation from our project into the python-rq codebase, and created #837. As mentioned there, this is more of a draft at the moment, but feel free to give feedback on the aspects that we are not already discussing here.
@Chronial if we were to implement this feature, I think it makes sense for us to implement it in such a way that RQ can automatically requeue jobs that don't finish successfully - not just jobs that are forcefully killed by external factors like Heroku.
Hard shutdowns of servers may also cause jobs to not complete successfully. In this case, RQ should automatically requeue jobs instead of moving them to the failed queue so they can be retried.
Regarding hard shutdowns, it might make sense to handle them the same way. It is just a little bit dangerous, because if you have a C module that crashes the process, that will look exactly like a hard shutdown, but it might happen every time the task runs.
I think in most environments hard server deaths are not a frequent thing. And if they're not frequent, it's probably a better idea to handle the fallout manually than to do automatic handling.
But detecting that is very different from handling controlled worker shutdowns, so I would handle that as separate feature.
> I think it makes sense for us to implement it in such a way that RQ can automatically requeue jobs that don't finish successfully - not just jobs that are forcefully killed by external factors like Heroku.
I think that's a completely different feature. If you compare this to celery, this is `acks_late` vs `retry` (comparison in the celery docs), which also have nothing in common.
The `acks_late` equivalent is just a simple switch on a job that says `reentrant`.
For a retry feature, you need quite a bit more – see all the arguments celery has for this. In celery it's also not even a property of the job, but a function you can call inside a job – which probably also makes more sense, given how complex the retry logic might need to be.
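For readers who don't know celery, here is a rough illustration of the two concepts being contrasted, using celery's standard API (simplified):

```python
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0")


# acks_late: the message is only acknowledged after the task finishes, so a task
# interrupted by a worker shutdown gets redelivered. It is a property of the
# task, not something the task body calls.
@app.task(acks_late=True)
def import_data(path):
    ...


# retry: the task body decides, while it is running, to reschedule itself.
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def fetch(self, url):
    try:
        ...
    except IOError as exc:
        raise self.retry(exc=exc)
```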
I personally think that python-rq, which is supposed to be lightweight, should probably not implement retry logic at all. It's easy enough to do this yourself in the client application, by just putting your code in a try/except and enqueuing a new instance on error.
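As an example of what that client-side approach might look like (the exception type, the work function, and the retry count are placeholders, not anything rq provides):

```python
from rq import Queue, get_current_job


class TransientError(Exception):
    """Placeholder for whatever error should trigger a retry."""


def do_work(payload):
    ...  # placeholder for the actual work


def flaky_job(payload, attempts_left=3):
    try:
        do_work(payload)
    except TransientError:
        if attempts_left > 0:
            job = get_current_job()
            # Enqueue a fresh copy of this job on its origin queue.
            Queue(job.origin, connection=job.connection).enqueue(
                flaky_job, payload, attempts_left - 1
            )
        else:
            raise
```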
We do it the way @Chronial describes and it worked properly until 0.7.x.
From that version on, we found that a failed job is moved to the `failed_queue` and its status is set to `failed`. If we re-enqueue the same job, we see that it is not written back safely to Redis: the job is then in its origin queue AND in the `failed_queue` as well.
So, if you use rq-dashboard, the job is present in both queues and appears to be duplicated. That's why we dug a little into this; we went crazy looking for a reason to explain why our operations team complained about failed jobs being replicated.
@guillemcabrera Are you doing it inside the job, or after the job has failed?
If the job has already failed, you are probably looking for requeue_job.
After looking into this, I would say it's currently not quite clear how to handle a retry from inside the job. You could just enqueue a new copy of your job and swallow the exception in your handler, but then your original job will end up in the FinishedJobRegistry and dependent jobs will be enqueued.
If you requeue the job and reraise the exception, your job will be duplicated into the failed queue. I guess the best solution at the moment is:
from rq import Worker, requeue_job

# RetryException is a user-defined exception raised by job code (a minimal
# definition follows below); MyException stands for whatever error should
# trigger a retry. Redis connection setup is omitted here.

def retry_handler(job, exc_type, exception, traceback):
    if isinstance(exception, RetryException):
        # directly requeue the job, or do something smarter based on the
        # details carried by the RetryException
        requeue_job(job.id)
        return False

worker = Worker(['default'])  # the queues this worker should listen on
worker.push_exc_handler(retry_handler)
worker.work()

def myjob():
    try:
        ...
    except MyException:
        raise RetryException(retry_details)
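The snippet above assumes a user-defined `RetryException`; a minimal version could simply carry the retry details along:

```python
class RetryException(Exception):
    """Raised by job code to ask the exception handler to requeue the job."""

    def __init__(self, retry_details=None):
        super(RetryException, self).__init__(retry_details)
        self.retry_details = retry_details
```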
Any update on this?
@selwin You need to make a final decision on this. I have made my case as well as I can :). I have now also provided code that shows how retries can easily be handled with the current state of python-rq.
@Chronial, @guillemcabrera, @selwin It looks like what Guillem has described is still an issue. The exception handler does not seem to work as described: even if we return `False` in our exception handler, the job gets requeued, but a copy of the failed job is also added to the failed queue.
I cannot share the exact code, but what we are doing is:

- a `max_retries`, which is added in `job.meta` and saved;
- a `failure_count`; the handler returns False when the job is requeued using `queue.enqueue_job` (see the sketch after the worker class below).
class UnataRQWorker(Worker):
    """Unata RQ Worker.

    Notes
    -----
    - Pushes custom exception handlers to the worker.
    """

    def __init__(self, *args, **kwargs):
        super(UnataRQWorker, self).__init__(*args, **kwargs)
        self.push_exc_handler(retry_handler)
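The `retry_handler` referenced above isn't shown; based on the description (a `max_retries` stored in `job.meta`, a failure counter, requeueing via `queue.enqueue_job`), a sketch of it might look like this. The meta key names are assumptions:

```python
from rq import Queue


def retry_handler(job, exc_type, exc_value, traceback):
    """Requeue the job until the max_retries stored in job.meta is exhausted."""
    max_retries = job.meta.get('max_retries', 0)
    failures = job.meta.get('failure_count', 0) + 1
    job.meta['failure_count'] = failures
    job.save()

    if failures <= max_retries:
        queue = Queue(job.origin, connection=job.connection)
        queue.enqueue_job(job)
        return False  # stop further handlers, e.g. the default failed-queue one
    return True       # give up and let the default handler take over
```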
The exceptions are handled in the following order:

1. our custom `retry_handler`, which returns False;
2. rq's default handler, which moves the job to the failed queue.

Since #1 is returning False and, from the logs, #2 is never called, it seems like there is a bigger issue somewhere that is not being noticed.
I'd love to hear some feedback on this.
Just an update: it seems https://github.com/rq/rq/blob/master/rq/worker.py#L625 is the cause of this, as when we requeue, the job is not in the expected statuses.
> If you requeue the job and reraise the exception, your job will be duplicated into the failed queue. I guess the best solution at the moment is: [the `retry_handler` snippet above]
I have changed it to have `RetryNowException`, `RetrySlowException`, and `RetrySomethingException`, each with a `sleep_time` property and all inheriting from `RetryException`:
from time import sleep

def retry_handler(job, exc_type, exception, traceback):
    if isinstance(exception, RetryException):
        # use the raised instance so each subclass's own sleep_time is honored
        sleep(exception.sleep_time)
        job.requeue()
        return False
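The exception classes aren't shown in the comment; a minimal sketch of that hierarchy, with arbitrary example delays, might be:

```python
class RetryException(Exception):
    """Base class for job errors that should trigger a requeue."""
    sleep_time = 0  # seconds to wait before requeueing


class RetryNowException(RetryException):
    sleep_time = 0


class RetrySlowException(RetryException):
    sleep_time = 60  # example delay; tune to the workload
```

Note that sleeping inside the exception handler blocks the worker for the whole delay, so larger `sleep_time` values effectively pause job processing on that worker.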