(This isn't really an issue but a question; I didn't see a mention of a mailing list, so I'm posting here.)
I really want to switch from Celery to rq in Re:dash, but we need the ability to cancel an already running job -- actually kill the work horse process, not just remove the task from the registries.
I was thinking of implementing this by having my own worker executor with another thread running alongside it, waiting for cancel commands (by monitoring some Redis key). If it gets a cancel command, it will call the worker's stop command.
Has anyone implemented something like this already? Would something like this be interesting as a contribution?
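Roughly what I have in mind, as an untested sketch (the `rq:worker:<name>:cancel` key name and the polling interval are just examples I made up):

```python
import threading
import time

from rq import Worker


class CancellableWorker(Worker):
    """Worker whose main process watches a Redis key and kills the horse on demand."""

    def execute_job(self, job, queue):
        cancel_key = 'rq:worker:{}:cancel'.format(self.name)  # hypothetical key
        stop_event = threading.Event()

        def watch_for_cancel():
            while not stop_event.is_set():
                if self.connection.get(cancel_key) == job.id.encode():
                    self.connection.delete(cancel_key)
                    self.kill_horse()  # signal the forked work horse
                    break
                time.sleep(1)

        watcher = threading.Thread(target=watch_for_cancel, daemon=True)
        watcher.start()
        try:
            super().execute_job(job, queue)
        finally:
            stop_event.set()
```

Cancelling from anywhere would then just be setting that key to the job id.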
Personally I don't think killing the work horse is a great idea. You have job meta stored inside the job hash in Redis. Split the job into logical steps and check for updates in the meta attribute. If someone sets a special flag in it, just finish the job gracefully.
At least that's how I'd do it.
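Something like this sketch (the `cancel` flag name and `process()` are placeholders; `save_meta()` may be plain `save()` on older rq versions):

```python
from rq import get_current_job


def long_task(chunks):
    job = get_current_job()
    for chunk in chunks:            # each chunk is one "logical step"
        job.refresh()               # reload job data (including meta) from Redis
        if job.meta.get('cancel'):
            return 'cancelled'      # finish gracefully
        process(chunk)              # placeholder for the real work


# From anywhere else, to request cancellation:
# job = Job.fetch(job_id, connection=redis_conn)
# job.meta['cancel'] = True
# job.save_meta()
```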
The use case: running (ad-hoc) queries. You can't split that into logical steps, and some queries might take a very long time. Those are the ones you usually would like to kill -- imagine finding out you just ran "SELECT *" on a huge table without a LIMIT clause.
Maybe a thread polling the job metadata will do the trick.
Did you consider switching the re:dash architecture to ZeroMQ?
Not really. ZeroMQ doesn't solve task management, persistence, etc.
What about doing those long queries in a different process/thread that can be killed from the horse?
I really wanted to have something similar to this but I've never put effort on it.
I'm also interested in a feature that allows RQ to communicate with the outside world.
I think this can be achieved by using Redis' pub/sub commands. After the master process creates the horse, it subscribes to a channel that can be used to communicate with the worker. When the horse is done, it publishes an "I'm done" message to the master process so it can fetch a new job.
I probably won't have time to implement this in the near future though, just an idea I've been tinkering with.
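Just to illustrate the mechanism with redis-py (the channel name is made up; this is a sketch of the idea, not actual rq code):

```python
import redis

conn = redis.Redis()

# Master process: subscribe to a per-horse channel after forking the horse.
pubsub = conn.pubsub(ignore_subscribe_messages=True)
pubsub.subscribe('rq:horse:<job_id>')          # hypothetical channel name

# Horse (or anyone else): publish a message on that channel.
conn.publish('rq:horse:<job_id>', 'done')

# Master: poll for messages while the horse runs.
message = pubsub.get_message(timeout=1.0)
if message and message['data'] == b'done':
    print('horse finished, fetch the next job')
```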
I'm not sure PUB/SUB is a good choice for this, because if for some reason you miss a message, it's lost.
Also, why change the way the horse works now? I think it's ok that the horse just exits, no?
No change is required for the horse's behavior. While the horse works, the master process listens to commands from the horse or the outside world.
The master process should also check whether the horse is still alive once every X seconds. This would also prevent the master process from becoming a zombie if the horse quits unexpectedly (e.g. encountering a segfault). See https://github.com/nvie/rq/issues/473
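Since the master is the horse's parent, a non-blocking `waitpid` is enough; a rough sketch (names are illustrative):

```python
import os
import time


def monitor_horse(horse_pid, interval=5):
    """Poll every `interval` seconds until the forked horse exits."""
    while True:
        pid, status = os.waitpid(horse_pid, os.WNOHANG)  # non-blocking reap
        if pid == horse_pid:
            return status      # horse exited, normally or e.g. via segfault
        time.sleep(interval)   # horse still running; also a spot to check for commands
```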
I changed my mind about using PUB/SUB. I think we can listen for events/commands using a Redis list (each worker has its own list), similar to how workers listen for new jobs now.
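For example, something along these lines (the per-worker list name is hypothetical; a sketch only):

```python
import redis

conn = redis.Redis()


def listen_for_commands(worker_name):
    """Block on the worker's own command list until something is pushed."""
    command_list = 'rq:worker:{}:commands'.format(worker_name)  # hypothetical key
    item = conn.blpop(command_list, timeout=5)                  # returns (key, value) or None
    if item is not None:
        _, command = item
        return command                                          # e.g. b'kill'


def send_command(worker_name, command):
    """Push a command onto that worker's list from anywhere."""
    conn.rpush('rq:worker:{}:commands'.format(worker_name), command)
```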
Probably a stupid question, but does this mean that there is no way to cancel a running job from outside the context of that job? As this is what I need to do...
Well, let me describe our use case for this.
We're using RQ jobs to run some heavily expensive computational tasks (each taking some hours maybe).
I would like to give the ability to the user of the system, to kill one of their jobs.
As of now, we keep the Job ID that is returned from `job = q.enqueue` and I would like to `job.kill()` it.
That way we do not wait for something that takes hours to complete, and the worker can pick up the next item from the queue to continue working.
I guess this is more like a +1 from us, with more words!
I've used `rq` with `django-rq` and `rq-scheduler` for about a year now. I really like it, though I'm hardly an expert on its architecture.
In my setup, the actual work is done in a subprocess of what I assume is called the work horse. So when a job is actually running (work is being done), there are three processes: the worker, the work horse, and the subprocess (created with `Popen`).
I now need to add some kind of "stop" or "kill" capability. As far as I can tell, this will be as simple as:
I store a bunch of stuff persistently, outside of Redis, in an RDBMS (here, PostgreSQL).
I'm writing here, partly because I thought it could help others, but also because I'm unsure whether there's some kind of design flaw in my architecture, or other things I might have missed given that others here seem to be stymied in their attempt to add such functionality.
I still have the requirement to kill a job from an outside process based on some user action. I couldn't find a graceful way to do it so I ended up storing the JobId of the thing that needs killing, and then periodically checking from the code running inside the worker to see if it has been put on the killed list. It's really not a great solution.
I'd be interested in details of how you implement killing processes with the PID. I'm not that familiar with Linux but I presume you're talking about killing the process at the OS level. It sounds a bit brutal to be honest, and would that not also potentially kill the worker?
Either way I'd be interested in the exact details so I can try it for myself, specifically how you get the right PID given there can be multiple workers. Our setup is a bit more complex, in that we have workers potentially in other docker containers, which might make the PID option not viable. But still...
Actually, now that I read it again, you're using Popen to create new processes. Are you sure this is the right approach? Isn't that the whole point of queues: you stack as much work as you want in the queues and then work through it at a rate that doesn't overwhelm the server?
@safekidda : Interesting points.
I only use one subprocess. In my code, `Popen` is called within the function that's decorated with `@job`. (I think django-rq wraps the rq decorator, but it's pretty much the same thing.) But there's only one such subprocess.
So I'm still of course using the queue part of the architecture; for each worker, if the worker is not idle, it has forked a horse. The horse runs a single command via subprocess.
I did it that way because I wanted the system to be fairly agnostic about what it was running. That is, roughly speaking it should be able to run things that can be run from the command line. But, importantly, not necessarily python. (It's not a Wild West, though; only things that are suitably registered with my infrastructure can run. In particular, they have to live in a particular place, and they have to accept an argument that tells them where they can dump the output.) So that's the design that led me to run a subprocess.
I only recently decided to try to implement "cancel" ability. Canceling in the sense of removing from various queues/stores is super easy. (Yesterday I just wrote a couple of lines of code, and now I can remove a job from `rq-scheduler`, akin to turning off cron.) But as everyone here understands, one might also want to kill the actual process of an actually running job, not just remove it from a queue.
Looking at it late yesterday, I came up with the solution above (save the pid and then later provide a view to send a signal, I guess `SIGTERM`, to that process). I tried it within the `pdb` debugger, and it seemed to work. I haven't coded up the actual handing off of the pid to the infrastructure yet (which will require a Django migration... blech).
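For anyone curious, the core of it looks roughly like this. It's a sketch under my setup's assumptions: here the pid is stashed in `job.meta` rather than in the database, and `run_registered_command` plus the `--output-dir` flag are placeholders for my registered commands:

```python
import os
import signal
import subprocess

from rq import get_current_job
from rq.job import Job


def run_registered_command(cmd, output_dir):
    """Job function: run the external command in a subprocess and record its pid."""
    job = get_current_job()
    proc = subprocess.Popen(cmd + ['--output-dir', output_dir])
    job.meta['subprocess_pid'] = proc.pid
    job.save_meta()
    return proc.wait()


def cancel_job_subprocess(job_id, connection):
    """Called from a view: terminate the job's subprocess by pid."""
    job = Job.fetch(job_id, connection=connection)
    pid = job.meta.get('subprocess_pid')
    if pid:
        os.kill(pid, signal.SIGTERM)  # only valid on the machine running the worker
```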
Just following up on this thread. So it seems there are only two options for canceling a job that has the `JobStatus.STARTED` status?
- Do some OS magic to force kill processes
- Implement some sort of 'polling' system in the code for the job to gracefully end itself?
Yes. Not great TBH.
FWIW, for my use case of canceling a job, simply using `os.kill(pid, signal.SIGTERM)` seems to work fine.
Which process ID are you using? Worker or your own subprocess?
I tried something like this but I can't get back a horse PID. I'd rather not shut down the actual worker...
```python
workers = Worker.all(connection=queue.connection)
for worker in workers:
    if worker.get_current_job_id() == id:
        worker.kill_horse()           # Doesn't work, horse PID is always 0
        worker.request_force_stop()   # Raises SystemExit in wrong process
```
I'm trying to wrap my head around the internals to see if I can maybe work around this by forcing the TTL to 0 or something. Setting `job.ttl = 0` doesn't do anything.
I'm killing my own subprocess, not the horse or worker (consistent with my description above). So that's probably special to my use case.
I assume there's a way to get the PID of the horse, but I haven't thought about this code for a few months now. ... It looks like all the necessary stuff is available in class `Worker` (`kill_horse()`, `horse_pid`), but I haven't played with it, so I can't claim that just attacking the problem that way should work.
Hi all, I've found a new solution that works well for me. The basic gist is that I override `Worker`, `Queue`, and `Job` so that when I submit a `kill()` command, the job id is stored in a Redis set. My custom `Job` will start a thread before executing the job function which polls that Redis set for the current job's id. If that job is in the kill set, then the job will kill itself (basically forcing a fail).
```python
class KillWorker(Worker):
    queue_class = KillQueue
    job_class = KillJob
```

```python
class KillQueue(Queue):
    job_class = KillJob

# Or, when going through django-rq:
class KillQueue(DjangoRQ):
    job_class = KillJob
```

```python
import os
import signal
import time
from threading import Thread

kill_key = "rq:jobs:kill"


class KillJob(Job):

    def kill(self):
        """Force kills the current job causing it to fail"""
        if self.is_started:
            self.connection.sadd(kill_key, self.get_id())

    def _execute(self):
        def check_kill(conn, id, interval=1):
            while True:
                res = conn.srem(kill_key, id)
                if res > 0:
                    os.kill(os.getpid(), signal.SIGKILL)
                time.sleep(interval)

        t = Thread(target=check_kill, args=(self.connection, self.get_id()))
        t.start()

        return super()._execute()
```
Usage:
```python
job = queue.fetch_job(id)
job.kill()
```
Since I'm using RQ through django, I also changed up my worker command to reference my new `Worker` and `Queue`, and I can now kill my jobs ad-hoc.
```
python manage.py rqworker default --worker-class=jobs.worker.KillWorker --queue-class=jobs.queue.KillQueue
```
Hope this is helpful for somebody.
Hi @rminderhoud, is it possible that your code only works when the worker is running on the same machine as django (or the process sending the jobs)?
I tried it but I get no errors and the task keeps running. In my case, the rq worker is running on a different container than the web API that enqueues the job.
Thanks!
@anibalrivero It could be. I can't say concretely without doing a deep-dive into the `rq` source again, but I could see how this line, `os.kill(os.getpid(), signal.SIGKILL)`, could potentially be the source of the issue. That's where I would look first.
For what it's worth, this works for me on Heroku with a separate "dyno" for the worker that's independent of the web "dyno". I believe these are isolated and it seems to work.
rminderhoud's comment pointed me in a good direction and his approach has been working so far. However, I would suggest some changes (a rough sketch follows the list):
- Using `Job.meta` (documented here) to signal the intention to kill a running job.
- Using `threading.Event` (docs) to signal the "killer" thread to stop when the job is executed normally. This could be used for the polling strategy.
- Or... using a Redis list to make the "killer" thread block waiting for an event (just any new pushed value in the list) and then check for a kill intention in the job's metadata. This could be used to avoid polling, and the unblocking event could be sent from the `kill` method or from `_execute` (in case of a normal job execution).
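Here's a minimal sketch of the first two points combined (the meta key name and polling interval are just examples, and error handling is omitted):

```python
import os
import signal
from threading import Event, Thread

from rq.job import Job


class KillableJob(Job):

    def kill(self):
        """Signal the intention to kill the running job via its meta."""
        self.meta['kill'] = True
        self.save_meta()

    def _execute(self):
        done = Event()  # set when the job finishes normally

        def killer(interval=1):
            while not done.wait(interval):        # poll until the job is done
                self.refresh()                    # reload meta from Redis
                if self.meta.get('kill'):
                    os.kill(os.getpid(), signal.SIGKILL)  # kill the work horse

        Thread(target=killer, daemon=True).start()
        try:
            return super()._execute()
        finally:
            done.set()  # stop the killer thread on normal completion
```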
Isn't this kind of approach generic enough to be included in rq?
actually killing a heavy rq job process instead of just removing the info from redis is definitely a huge plus.
to properly solve it, we could have the requester publish a kill command over a PUB/SUB channel that the rq worker listens on.
since the message might get lost, it's kind of a best-effort approach. to address this, we could have a reverse PUB/SUB channel that communicates whether the rq worker actually carries out the os.kill.
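something like this sketch (channel names are made up; the forward channel carries the kill request, the reverse channel the acknowledgement):

```python
import os
import signal

import redis

conn = redis.Redis()


# Worker side (runs inside the rq worker's pubsub listener):
def on_kill_request(message, horse_pid, job_id):
    if message['data'] == b'kill':
        os.kill(horse_pid, signal.SIGKILL)
        conn.publish('rq:job:{}:kill-ack'.format(job_id), 'killed')  # reverse channel


# Requester side: send the kill and wait (best effort) for the acknowledgement.
def request_kill(job_id, timeout=5.0):
    ack = conn.pubsub(ignore_subscribe_messages=True)
    ack.subscribe('rq:job:{}:kill-ack'.format(job_id))
    conn.publish('rq:job:{}:kill'.format(job_id), 'kill')            # forward channel
    message = ack.get_message(timeout=timeout)
    return message is not None and message['data'] == b'killed'
```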
@cyruszhang's idea of using Redis' PUB/SUB mechanism is a good idea.
I'd be happy to receive pull requests implementing:
I ended up taking a different approach; instead of changing the default worker behavior, I used a custom worker that does the pubsub stuff and kills the horse if directed.
For anyone interested:
```python
import logging
from signal import SIGKILL

import psutil
from rq.defaults import (DEFAULT_JOB_MONITORING_INTERVAL, DEFAULT_RESULT_TTL,
                         DEFAULT_WORKER_TTL)
from rq.worker import Worker

logger = logging.getLogger(__name__)


class CustomWorker(Worker):

    def __init__(self, queues, name=None, default_result_ttl=DEFAULT_RESULT_TTL,
                 connection=None, exc_handler=None, exception_handlers=None,
                 default_worker_ttl=DEFAULT_WORKER_TTL, job_class=None,
                 queue_class=None, log_job_description=True,
                 job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL,
                 disable_default_exception_handler=False,
                 prepare_for_work=True):  # noqa
        logger.debug("CustomWorker being initiated")
        super().__init__(queues, name, default_result_ttl, connection, exc_handler, exception_handlers,
                         default_worker_ttl, job_class, queue_class, log_job_description, job_monitoring_interval,
                         disable_default_exception_handler, prepare_for_work)
        self.pubsub_thread = None

    def handle_kill_message(self, message):
        logger.info("Got message: {}".format(message))
        if message.get('type') == 'message' and message.get('data') == b'kill' and self.horse_pid:
            try:
                logger.info('Killing the horse process: {} [{}]'
                            .format(self.horse_pid, ' '.join(psutil.Process(self.horse_pid).cmdline())))
                super().kill_horse(SIGKILL)
                logger.info('Killed: {}'.format(self.horse_pid))
            except Exception:
                logger.exception("Failed to kill the horse process")

    def execute_job(self, job, queue):
        # set up a separate thread to listen on pubsub channel rq:job:pubsub:<job_id>
        if self.pubsub_thread is not None:
            self.pubsub_thread.stop()
            self.pubsub_thread = None
        p = self.connection.pubsub()
        channel = 'rq:job:pubsub:' + job.id
        logger.info("Worker subscribing to pubsub channel: {}".format(channel))
        p.subscribe(**{channel: self.handle_kill_message})
        self.pubsub_thread = p.run_in_thread(sleep_time=0.1)
        super().execute_job(job, queue)
```
I made another implementation of an RQ Job that can be killed.
It works by saving the RQ work-horse pid in the job metadata every time a new job is performed (this gets saved in the key-value storage). Then the work-horse pid can be used to stop (kill) the work-horse process.
I also made a demo project using Django-RQ: https://github.com/jmg/rq_stop_job
The StopJob class in `app/jobs/stop_job.py`:
```python
import os
import signal

from rq.job import Job


class StopJob(Job):
    """
    A job that can be stopped (killed) using the work-horse PID.
    It saves the work-horse PID in storage every time a work horse is forked by RQ.
    """

    def perform(self):
        self.meta["workhorse_pid"] = os.getpid()
        # save the work-horse PID in the job metadata
        self.save_meta()
        return super(StopJob, self).perform()

    def stop(self, delete=True):
        if self.is_started:
            try:
                os.kill(self.meta["workhorse_pid"], signal.SIGKILL)
            except Exception as e:
                print(e)
        if delete:
            self.delete()
```
And in `settings.py`:
```python
RQ = {
    'JOB_CLASS': 'app.jobs.stop_job.StopJob',
}
```
Or run the RQ worker with `--job-class=app.jobs.stop_job.StopJob`:
```
python manage.py rqworker job_queue --job-class=app.jobs.stop_job.StopJob
```
Then when you want to `stop()` a job, just call:
```python
import redis

from app.jobs.stop_job import StopJob

conn = redis.StrictRedis(...)
job = StopJob.fetch(job_id, connection=conn)
job.stop()
```
I'm using this code in a Django-RQ production app and it works fine so far.
Would it be useful to add such a job as a core RQ feature? If so I can make a PR.
@jmg if I have `StopJob` on machine A and I need to call `stop()` from machine B, how do I make that call?
@jmg this is sounding legit. I think I didn't realize that calling a custom job class would cause immediate actions between separate machines. We carry a not-as-great job interruption and cancel implementation, so +1 to including this in `rq` itself.
Also to confirm: `job.stop()` is expected to take effect "right away", and since it's killing the work horse via `os.kill` on its pid, that would fully interrupt the work horse. This is what I think it's going to do, but I wanted to ask to confirm.
@bmbouter sorry about my previous response. I missed the problem that if you are running the worker on machine A and call `os.kill` on machine B, it will try to kill a process that doesn't exist or, even worse, another process that's not related at all to the desired process.
So maybe this is the reason there isn't a simple solution for this problem in the RQ core?
About:
> Also to confirm: `job.stop()` is expected to take effect "right away", and since it's killing the work horse via `os.kill` on its pid, that would fully interrupt the work horse. This is what I think it's going to do, but I wanted to ask to confirm.
Yes, it's going to interrupt the work horse immediately, but the work horse is only the process that you are running and wanted to kill. It won't affect the actual RQ worker, since that's another process.
> I missed the problem that if you are running the worker on machine A and call os.kill on machine B, it will try to kill a process that doesn't exist or, even worse, another process that's not related at all to the desired process.
I'm working on this. The plan is to have the main worker listen for outside commands through Redis' pub/sub mechanism; this design means you'll be able to relay commands to workers running on different boxes. One of the commands that I want to implement to start with is canceling a currently running job.
Thanks @selwin! This would be a great feature.
Using @cyruszhang's idea, I updated the StopJob class to use the pub/sub model as follows:
The PubSubWorker class creates a pubsub channel to receive stop messages from jobs.
When calling `job.stop()` on a StopJob object, a message is published on the pub/sub channel for the job id.
The message is received by the worker and the work-horse is killed.
This model will definitely work in a distributed environment and I guess it's a better candidate for a PR.
I'm not quite sure if creating a pubsub thread for each worker is the best way to solve this though, since it will add some overhead to each worker spawned. As @selwin stated, it's probably better to have it only on the main worker.
I also updated my demo project at https://github.com/jmg/rq_stop_job to use this implementation.
```python
from rq.job import Job
from rq.worker import Worker, SIGKILL


class PubSubWorker(Worker):

    def handle_stop_message(self, message):
        if message.get('type') == 'message' and message.get('data') == b'stop' and self.horse_pid:
            try:
                self.kill_horse(SIGKILL)
            except Exception as e:
                self.log.exception("Failed to kill the horse process")

    def execute_job(self, job, queue):
        pubsub = self.connection.pubsub()
        channel = 'rq:job:pubsub:{}'.format(job.id)
        pubsub.subscribe(**{channel: self.handle_stop_message})
        # listen for stop events in a thread
        pubsub_thread = pubsub.run_in_thread(sleep_time=0.1)

        val = super(PubSubWorker, self).execute_job(job, queue)

        # after the job is done, stop the pubsub thread
        pubsub_thread.stop()
        return val


class StopJob(Job):

    def stop(self, delete=True):
        if self.is_started:
            self.connection.publish(channel='rq:job:pubsub:{}'.format(self.id), message="stop")
        if delete:
            self.delete()
```
Then in `settings.py`:
```python
RQ = {
    'JOB_CLASS': 'rq_stop_job_app.jobs.stop_job.StopJob',
    'WORKER_CLASS': 'rq_stop_job_app.jobs.stop_job.PubSubWorker',
}
```
Or run the RQ worker with `--job-class=rq_stop_job_app.jobs.stop_job.StopJob --worker-class=rq_stop_job_app.jobs.stop_job.PubSubWorker`:
```
python manage.py rqworker job_queue --job-class=rq_stop_job_app.jobs.stop_job.StopJob --worker-class=rq_stop_job_app.jobs.stop_job.PubSubWorker
```
The stop job interface will remain unchanged:
```python
import redis

from app.jobs.stop_job import StopJob

conn = redis.StrictRedis()
job = StopJob.fetch(job_id, connection=conn)
job.stop()
```
This is sounding great. We would use this if it was brought into RQ. We can use the custom Job and Worker classes, no problem.
This would replace how we do it today, which, just to share, was originally inspired by an RQ PR discussion from long ago.
There are probably 3 issues with this design:
1) we have to carry it and it's not in RQ itself
2) the polling model is inefficient and it causes cancellation to be delayed by up to 1 second
3) I'm not sure the kill keys added via `sadd` are ever cleaned up.
Just posting as an FYI of an alternative implementation. I think the pub/sub model is likely stronger since it occurs immediately, right? Also, are those things cleaned up?
@bmbouter Also the solution you are using has the same problem I had on my first implementation. It won't work across different machines.
With the pub/sub mechanism the process must stop immediately, since it's a dedicated channel to pass messages that runs in its own thread. This works even if you call `stop()` from a remote machine.
About cleaning up, do you mean removing the job from the queue? If so, yes, since the `stop()` method can call `self.delete()` internally to remove the job from the queue. Also, messages exchanged using pub/sub are cleaned up.
I'm not quite sure if my last implementation is the most effective way to solve it, so I think I'll wait for @selwin's work to confirm. Anyway, please let me know if I can help with anything. I've been using RQ for many years now and have this wonderful library running on almost all my production applications.
> @bmbouter Also the solution you are using has the same problem I had on my first implementation. It won't work across different machines.
Why won't this work across machines? The job id is what is sent via `sadd`, and the pid is stored locally on the worker doing the killing.
Ah, sorry! Then it would work across different machines.
Anyway, I hope we can have a definitive implementation in the RQ core soon.
I wouldn't say this has actually been implemented just yet; it's a slightly different feature. If you have the worker name, it's easy to cancel (whatever task is running), but if you just have a task ID and want to cancel it if it's running, it's not so easy to do that.
This issue (recently filed) should cover that use case: https://github.com/rq/rq/issues/1371
I merged a PR that adds `job.worker_name` just yesterday; this allows you to find the specific worker that's executing the job. https://github.com/rq/rq/pull/1375