Everybody knows that pooling for job result sucks :)
Currently when my jobs finish work I publish
a message on channel
to wake up any subscribers interested in job's result, this has one major drawback, all my jobs have to be aware of redis connection, and I must inject it from context. This is a really good example of code smell.
I thought that adding support for publish
/notifications within job context might be a good idea, my proposition:
notify
parameter to job with default value of None
notify
is set to True
, use job_id
as a channel
for notificationnotify
is set to str
use this as a notify channelsuccess
, exception
etc)Then a job creator can just use it's own redis connection to get a PubSub
object and wait for result to come without nasty pooling.
Now before I jump start to hacking, is my way of thinking correct, or am I missing some serious flaw with this idea?
I think this is a very interesting idea and something I'd like to have.
Rather than configuring this on a per job basis though, I think I'd like to see this configured on a per queue basis.
Workers would then broadcast start
, success
, fail
,job_id
along with other relevant information to this channel.
We need to discuss new part of API rather implementation first. I don't think using raw pub/sub redis object is great idea for user experience. We can add high level methods which will use this object under the hood. Maybe something like this
job = queue.enqueue(add, 1, 2) # first class world problem
result = job.wait_result(timeout=360) # listen on channel, use job.fetch when message received
It also can raise timeout error if we wait for too long.
@proofit404 you're absolutely right.
The example you gave works on a per job basis and that could already be achieved using async=False
option. While running job synchronously is useful for tests, it most likely won't be very useful in production.
The API I currently have in mind is:
worker = Worker(queues=[queue], broadcast_events=True) # This will cause the worker to publish job started, finished and other events
To observe the events (this is done on a per queue basis):
queue = Queue()
queue.wait_for_events(): # This is simply a thin wrapper around Redis' subscribe command
...
What do you think?
Seems like a good idea to let workers always send these events when jobs complete. Or at the very least, make it the default. (If they push an event out and there are no listeners on the pubsub channel it's effectively a no-op, right?)
Then on the queue side of things, would we benefit from having an API that allows you to register a callback function instead? So rather than the wait_for_events
, have a .on_job_finished(mycallback)
, which would then receive some details like what job finished, etc.
@nvie I'm using this PoC currently from gevent greenlets, and something on Job
object like get_result(timeout=10)
will work with gevent brilliantly. I'm not a fan of callback based concurrency :)
@nvie When we register callbacks in the queue what exactly would happen? On client side queue is just a library to schedule jobs and inspect its result. It doesn't define the flow of control in our program. It isn't a framework after all. When we introduce callbacks we need something to run them. Something similar to twisted reactor. Actually this may introduce new first class citizen into api. Maybe something like this:
listener = EventListener(queue=['foo', 'bar'], handlers={
'mymodule.myfunction': 'mymodule.myhandler',
myfunction2: myhandler2,
})
listener.listen()
Do we really want this? I'm thinking on how useful pub/sub events may be for synchronous version of rq. To me it seems not much usefull. If we want to run some logic after job is finished then this is perfect case for dependents jobs.
@selwin I absolutely agree with rule "never make a job (or anything else) waiting for another job."
With synchronous version of rq it will be totally disaster in the production environment under any sensible load. Currently I'm working on rq version build on top of asyncio library. Listening on pub/sub channel from coroutine is totally fine. I want to help with api design in rq so I can copy it in my library :)
@nvie I think the published events should not be limited to job finished events.
I like the EventListener
API suggested by @proofit404 , but am not a fan of the handlers
kwarg. For the first step, I think having a thin listener on top of Redis PUB/SUB is a great first step. Something like:
listener = EventListener(queue=queue)
with listener.listen():
# Code that handles events
# If we're only interested in job completed or started events (other event types will simply be discarded):
listener = EventListener(queue=queue, events=[Event.Started, Event.Finished])
with listener.listen():
# Handle events
Once this stabilizes, perhaps we could add a script rq listener --handler='path.to.handler'
. This would translate to calling:
# Similar to what @proofit404 suggested, but this is more consistent with
# how we setup `Worker`'s exception handlers
listener = EventListener(queue=queue, handlers=['path.to.handler'])
@selwin I don't understand how we can reenter context manager to run its body again and again on each event? Does for
loop statement suite semantically here?
listener = EventListener(queue='default', handlers=['path.to.handler'])
for event in listener.listen():
# process event
I my opinion listener is kinda worker which use pub/sub transport instead of list. This add some interesting consequences. We can't guarantee that event handler will run only once.
As an alternative we can introduce hooks into worker class. For example enqueue function when job is started, finished or failed. For example:
worker.on(JobStatus.started, 'path.to.handler', queue='events-queue')
worker.work()
This will enque 'path.to.handler'
into 'events-queue'
with job id and status as arguments. This way we trigger same handler but by execution another job. It guarantee that handler will be run once but can't do anything to speed up it processing. Original job can finish before we process its start event. This can be smoothed by properly set ttl values. For example for started events job ttl should be equal original job timeout.
But for failed jobs it is really interesting mechanics. In worker exception handlers we run failure handler something like notify sentry. And in scheduled event job process business logic to handle this failure on application level.
@proofit404 you're right, we'll have to use for even in listener.listen()
, similar to redis-py. Sorry about that.
You can already subclass Worker
and override perform_job
to achieve what your second example intend to achieve so I'm not too keen on that.
The queue events, however, will enable new use cases so if possible I'd like to have that as the highlight of the next release.
Just throwing in my separate use case. I'm interested in having job completion emails (only some jobs) or even slack notifications on success/failure of the jobs. Haven't found a good way (in Django RQ) to get this done yet. Should I just be subclassing Worker
and override perform_job
?
@selwin I'm sure there are a lot of use cases where events is proper solution. I just can't figure out those cases myself.
Pub/Sub commands have interesting option unlike lists. It is low delivery lag between published and subscriber. Regular lists can hold job id long enough. But published messages appears semi-immediate on client. What we should do if published commands appears on Listener
but it busy with processing of previous message? Should we store message in the local queue? Should we skip reading from socket unless we process previous message?
@crearc sending emails when job is done is application logic, and does not belong to RQ, you have 3 options:
@proofit404 you can achieve pubsub "like" functionality with lists and BLPOP
but for that you need some form of async I/O or gevent.
for me best advantage of pubsub is "fire and forget" behaviour, and if nobody is listening, don't bother, but this is just my use-case.
@crearc I do need async I/O thats why I'm developing aiorq :)
Maybe I was not clear in my previous post. I don't mean delivery lag is a bed thing. When we use lists to store job ids it may present in the queue until one worker will be able to handle it. Jobs we enqueue earlier make this "until" period bigger or smaller. That what I mean when point to delivery lag. You can offer better name for this.
Using pub/sub commands we deliver events to listeners at the same moment. I just want to find application problems where listeners will be proper solution. Then we can discus possible listeners api and implementation.
I'm also interesting to help to build this functionality into rq core.
For example we may implement job progress notifications with this technique:
def do_stuff():
# DB access for example
get_current_job().notify(progress='50%') # pack kwargs and publish message
# More DB access here
Then we can send progress to the user browser for example. If we use some kind of map/reduce then each job finish event maybe a sign for progress notification.
@proofit404 replying to your comment about this:
What we should do if published commands appears on Listener but it busy with processing of previous message? Should we store message in the local queue? Should we skip reading from socket unless we process previous message?
This is actually already handled by redis-py in that messages that arrive after subscribe()
is called will be buffered until get_message()
is called. For example:
r = Redis()
p = r.pubsub()
p.subscribe('foo')
r.publish('foo', 'bar')
r.publish('foo', 'baz')
# Wait 2 seconds
p.get_message() # Returns "bar"
p.get_message() # Returns "baz"
However, if the only thing you need is to monitor finished jobs and you're not interested in subscribing to event updates from the queue, I think the best course of action is to monitor changes to FinishedJobRegistry
. If it makes sense, we can APIs to this to make monitoring recently finished jobs easier.
@selwin @nvie I wander if it is possible to use proposed EventListener to stop running job. It maybe useful case for users and first real application for EventListener mechanism.
@proofit404 definitely, once we have the proposed mechanism where the worker is waiting for events, we'll have a mechanism to communicate with the outside world. This will open a bunch of other advanced used cases like graceful changing of worker configurations without needing to restart the worker.
Let me add an alternative opinion here: adding dedicated notification logic (e.g. forcing redis-based notifications) into RQ is not very reasonable and would bloat the code unnecessarily. Someone wants to be notified via Redis, another one wants emails, or slack, or HTTP requests, or uses another messagequeue, etc. Consequently, the choice of notification logic (as well as the moments when the notification should be sent) should be left to the implementer.
The only thing a worker cannot do at the moment in its code, is to _notify that the job was completed_ (because while the worker is dealing with that notification, the job is still formally running). This leads to annoying cases where you receive notification of a completed job, but cannot yet retrieve the job results.
One way to overcome this would be to make it possible to register "job status change" event processor when enqueuing, e.g.
q.enqueue(...., on_job_status_change=(callable or a Job object that will be called with job_id and new status))
The purpose of this notification infrastructure is not for user notification purposes but rather to publish queue/worker events to the outside world. This would be similar to Celery's events
http://docs.celeryproject.org/en/latest/reference/celery.events.html
If you want to receive worker events via Slack, email or other means, you can write a subscriber that listens to published events and forward those to Slack.
I understand it, and I do not see any principal difference between the kinds of notifications.
My remark was about the fact that adding functionality to RQ which can easily be implemented by the person who implements the worker according to their own preferences anyway, seems like unnecessary code bloat. The only events which RQ should be notifying about are those which the worker itself has no access to. "Before start" and "After finish" are examples of such events. Would there be other worthy examples?
If not, building an event infrastructure might not be worth it - adding one or two callbacks to the enqueue method would suffice.
The ones I can think of so far:
We can also use pub/sub to listen to outside world commands like stopping a worker/job or changing some other run time attributes like @proofit404 mentioned.
I think this will be a worthwhile change.
As mentioned above, all of the job status change events (of which the "finished" is most demanded anyway) would fit nicely within
q.enqueue(...., on_job_status_change=(callable or a Job object that will be called with job_id and new status))
The job registry events could also follow a similar pattern. Now, I don't want to impose my opinion, but I'd really ask you to make sure the notifications are "easy" to receive.
The pub-sub approach may lead to a situation where I will have to run a separate thread/process subscribed and monitoring the queue for the sole purpose of observing the notifications and acting on them - this may kill the whole benefit of those events as I can solve my current notification needs in a similar manner anyway.
I would much prefer the queue _actively_ invoking my callback, thus saving me the need to monitor anything. If I then decide that I do need to communicate the event via a pub-sub queue, I can always do it within my custom callback in the way I deem appropriate.
Of course, it's our goal to make everything as easy as possible :)
A simple thing to do would be to support the Python Future
interface http://pythonhosted.org/futures/#future-objects
It has cancel()
, running()
, done()
, result(timeout)
, and add_done_callback(cb)
.
I was able to implement notifications by implementing webhooks. I publish to custom blinker signals from my custom job class.
The listener then enqueues a task that posts the job to any "webhooks" defined in job.meta
.
@job_signals.job_finished.connect
@job_signals.job_failed.connect
def notify_webhooks(job):
if 'webhooks' not in job.meta:
return
get_queue(job.origin).enqueue_call(
func=post_job_webhooks,
kwargs={'job': to_json(job)},
at_front=True)
If it counts, I would vote for the suggestion of a callback function of the original author, rather than implementing extensive notification/eventing stuff, for the sake of simplicity.
I think rq already has internal status check functions i.e. look_for_suspension, look_for_stop type functions, those can to be extended with few empty functions of has_suspended, has_completed, has_started, etc status and inside the hooks just run the callback function or the user can override those.
I am actually overriding some of above and calling a callback function to send my own status update, I can do sync, async, http, teleport whatever.
@rizplate you can already do that using a custom Worker
, Queue
or Job
class. You can override Worker.perform_job
to add your own functions to be executed before/after job execution.
If I don't misread the original author's proposal, he wants to have a pub/sub mechanism so outside entities can easily subscribe to events.
It's also something that I'd like to have so PR welcome :)
Hi! Sorry to return to this, but was this feature ever implemented? Cant find anything in the docs about subscribing to job finished events? Would be useful to know when a certain job has finished
How to override the Worker.perform_job, is there any example?
It would be also nice to have a way to get a notification that the queue became empty.
Hi 馃憢 all! Has there been any progress on this? I echo what's already been said, would be super useful to execute callback functions on job status changes.
How to override the Worker.perform_job, is there any example?
from rq.connections import push_connection, pop_connection
from rq.utils import utcnow, make_colorizer
from rq.timeouts import JobTimeoutException
from rq.compat.__init__ import as_text, text_type
import sys
import traceback
green = make_colorizer('darkgreen')
yellow = make_colorizer('darkyellow')
blue = make_colorizer('darkblue')
class MyWorker(Worker):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def perform_job(self, job, queue, heartbeat_ttl=None):
(all line from original function)
if self.log_result_lifespan:
(more lines from original function)
elif result_ttl > 0:
do_stuff()
(even more)
Most helpful comment
Hi! Sorry to return to this, but was this feature ever implemented? Cant find anything in the docs about subscribing to job finished events? Would be useful to know when a certain job has finished