Rq: Custom gevent worker class

Created on 23 Jan 2014  ·  12 Comments  ·  Source: rq/rq

Hello!

Seeing new_workers inactive, I started hacking on a gevent worker class a while ago. Now that custom worker classes have landed, maybe that could be a first simple way of bringing better concurrency to rq workers?

Here is what I currently have working:

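import signal

import gevent
import gevent.pool
from rq import Queue, Worker
from rq.exceptions import DequeueTimeout
from rq.worker import StopRequested


class WorkerGevent(Worker):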

    def get_ident(self):
        return id(gevent.getcurrent())

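    def __init__(self, *nargs, **kwargs):

        # `args` is not defined in this snippet; it presumably comes from the
        # command-line arguments of the script this class was copied from.
        self.gevent_pool = gevent.pool.Pool(int(args.processes))

        Worker.__init__(self, *nargs, **kwargs)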

    def _install_signal_handlers(self):
        """Installs signal handlers for handling SIGINT and SIGTERM
        gracefully.
        """

        def request_force_stop():
            """Terminates the application (cold shutdown).
            """
            self.log.warning('Cold shut down.')

            self.gevent_pool.kill()

            raise SystemExit()

        def request_stop():
            """Stops the current worker loop but waits for child processes to
            end gracefully (warm shutdown).
            """
            gevent.signal(signal.SIGINT, request_force_stop)
            gevent.signal(signal.SIGTERM, request_force_stop)

            msg = 'Warm shut down requested.'
            self.log.warning(msg)

            # If shutdown is requested in the middle of a job, wait until
            # it finishes before shutting down
            self.log.debug('Stopping after all greenlets are finished. '
                           'Press Ctrl+C again for a cold shutdown.')
            self._stopped = True
            self.gevent_pool.join()

            raise StopRequested()

        gevent.signal(signal.SIGINT, request_stop)
        gevent.signal(signal.SIGTERM, request_stop)

    def fork_and_perform_job(self, job):
        """Spawns a gevent greenlet to perform the actual work.
        """
        self.gevent_pool.spawn(self.perform_job, job)

    def dequeue_job_and_maintain_ttl(self, timeout):
        while True:

            while not self.gevent_pool.free_count() > 0:
                gevent.sleep(0.1)

            try:
                return Queue.dequeue_any(self.queues, timeout,
                                         connection=self.connection)
            except DequeueTimeout:
                pass

            self.log.debug('Sending heartbeat to prevent worker timeout.')
            self.connection.expire(self.key, self.default_worker_ttl)

All 12 comments

I forgot: something I also had trouble with was death_penalty_after(). It's not easy to override, and I had to override the whole perform_job() function just to replace it with the gevent implementation. It would be great to make it an instance method of the worker.

Here's my gevent version:

    import gevent
    import rq.timeouts


    class gevent_death_penalty_after(object):
        def __init__(self, timeout):
            self._timeout = timeout

        def __enter__(self):
            self.setup_death_penalty()

        def __exit__(self, type, value, traceback):
            # Always cancel immediately, since we're done
            try:
                self.cancel_death_penalty()
            except rq.timeouts.JobTimeoutException:
                # Weird case: we're done with the with body, but now the alarm is
                # fired.  We may safely ignore this situation and consider the
                # body done.
                pass

            # __exit__ may return True to suppress further exception handling.  We
            # don't want to suppress any exceptions here, since all errors should
            # just pass through, JobTimeoutException being handled normally to the
            # invoking context.
            return False

        def setup_death_penalty(self):
            """Starts a gevent.Timeout that raises a JobTimeoutException
            in the current greenlet after the timeout amount (expressed in
            seconds).
            """
            exception = rq.timeouts.JobTimeoutException(
                'Gevent Job exceeded maximum timeout '
                'value (%d seconds).' % self._timeout)
            self.gevent_timeout = gevent.Timeout(self._timeout, exception)
            self.gevent_timeout.start()

        def cancel_death_penalty(self):
            """Cancels the pending gevent timeout so it can no longer fire."""
            self.gevent_timeout.cancel()

Adding the necessary hooks to make implementing custom worker classes easier is definitely welcome.

May I suggest renaming the methods to setup_job_timeout and cancel_job_timeout though?

We also need to find a more generic name for Worker.fork_and_perform_job.
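
As an illustration of the hook being discussed, here is a minimal sketch of a worker that looks up its timeout context manager through a class attribute, so a subclass can swap it without overriding perform_job(). All names here (NoopTimeout, SketchWorker, job_timeout_class, RecordingTimeout) are hypothetical and are not rq's API; only setup_job_timeout and cancel_job_timeout come from the suggestion above.

```python
class NoopTimeout(object):
    """Hypothetical base timeout: a context manager with two overridable hooks."""

    def __init__(self, seconds):
        self.seconds = seconds

    def __enter__(self):
        self.setup_job_timeout()
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.cancel_job_timeout()
        return False  # never swallow exceptions raised by the job

    def setup_job_timeout(self):
        pass

    def cancel_job_timeout(self):
        pass


class SketchWorker(object):
    """Hypothetical worker; only the timeout hook is modelled."""

    job_timeout_class = NoopTimeout  # subclasses swap this attribute

    def perform_job(self, func, timeout):
        with self.job_timeout_class(timeout):
            return func()


class RecordingTimeout(NoopTimeout):
    """Stand-in for a gevent-based timeout; it only records the hook calls."""

    calls = []

    def setup_job_timeout(self):
        self.calls.append(('setup', self.seconds))

    def cancel_job_timeout(self):
        self.calls.append(('cancel', self.seconds))


class RecordingWorker(SketchWorker):
    job_timeout_class = RecordingTimeout


result = RecordingWorker().perform_job(lambda: 21 * 2, timeout=5)
```

With this shape, a gevent variant would only need to subclass the timeout class and set one attribute on the worker, leaving perform_job() untouched.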

@sylvinus, thanks a ton for putting this wrapper together! While using it I had a problem running it in burst mode under py.test: the work() method would exit before all the child greenlet jobs had finished. To handle this case, the following modification to your dequeue_job_and_maintain_ttl() worked for me:

def dequeue_job_and_maintain_ttl(self, timeout):
    while True:

        while not self.gevent_pool.free_count() > 0:
            gevent.sleep(0.1)

        try:
            job = Queue.dequeue_any(self.queues, timeout,
                                     connection=self.connection)
            # make sure all child jobs finish if queue is empty in burst
            # mode
            if job is None and timeout is None:
                self.gevent_pool.join()
            return job
        except DequeueTimeout:
            pass

        self.log.debug('Sending heartbeat to prevent worker timeout.')
        self.connection.expire(self.key, self.default_worker_ttl)

Hope someone finds it useful, and it's a case to keep in mind before merging the gevent worker.
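
The same drain-before-exit idea can be sketched without rq or gevent. The example below is only an analogy: it swaps the gevent pool for a stdlib ThreadPoolExecutor and the Redis queue for queue.Queue, and run_burst and slow_square are hypothetical names. It shows the loop waiting for in-flight jobs once the queue reports empty, instead of returning immediately.

```python
import queue
import time
from concurrent.futures import ThreadPoolExecutor, wait


def run_burst(jobs, pool_size=2):
    """Run every queued callable, then wait for in-flight ones before returning."""
    pending = queue.Queue()
    for job in jobs:
        pending.put(job)

    futures = []
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        while True:
            try:
                job = pending.get_nowait()
            except queue.Empty:
                # Queue drained: plays the role of gevent_pool.join() above.
                wait(futures)
                break
            futures.append(pool.submit(job))
    return [f.result() for f in futures]


def slow_square(n):
    """Build a job that sleeps briefly so it is still running when the queue empties."""
    def job():
        time.sleep(0.01)
        return n * n
    return job


results = run_burst([slow_square(n) for n in range(5)])
```

Without the wait() call (and the executor's own shutdown), the loop could return while the last jobs were still running, which is the symptom described above.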

Here is an updated version of the worker above (written against rq==0.4.6):

from __future__ import absolute_import
import signal

import gevent
import gevent.pool
from rq import Worker
from rq.timeouts import BaseDeathPenalty, JobTimeoutException
from rq.worker import StopRequested
from rq.exceptions import DequeueTimeout


class GeventDeathPenalty(BaseDeathPenalty):
    def setup_death_penalty(self):
        exception = JobTimeoutException('Gevent Job exceeded maximum timeout value (%d seconds).' % self._timeout)
        self.gevent_timeout = gevent.Timeout(self._timeout, exception)
        self.gevent_timeout.start()

    def cancel_death_penalty(self):
        self.gevent_timeout.cancel()


class GeventWorker(Worker):
    death_penalty_class = GeventDeathPenalty

    def __init__(self, queues, default_result_ttl=None, pool_size=20):
        self.gevent_pool = gevent.pool.Pool(pool_size)
        super(GeventWorker, self).__init__(queues, default_result_ttl=default_result_ttl)

    def get_ident(self):
        return id(gevent.getcurrent())

    def _install_signal_handlers(self):
        def request_force_stop():
            self.log.warning('Cold shut down.')
            self.gevent_pool.kill()
            raise SystemExit()

        def request_stop():
            gevent.signal(signal.SIGINT, request_force_stop)
            gevent.signal(signal.SIGTERM, request_force_stop)

            self.log.warning('Warm shut down requested.')
            self.log.debug('Stopping after all greenlets are finished. '
                           'Press Ctrl+C again for a cold shutdown.')

            self._stopped = True
            self.gevent_pool.join()
            raise StopRequested()

        gevent.signal(signal.SIGINT, request_stop)
        gevent.signal(signal.SIGTERM, request_stop)

    def execute_job(self, job):
        self.gevent_pool.spawn(self.perform_job, job)

    def dequeue_job_and_maintain_ttl(self, timeout):
        result = None

        while True:
            self.heartbeat()
            while not self.gevent_pool.free_count() > 0:
                gevent.sleep(0.1)

            try:
                result = self.queue_class.dequeue_any(self.queues, timeout, connection=self.connection)
                if result is None and timeout is None:
                    self.gevent_pool.join()
                break
            except DequeueTimeout:
                pass

        self.heartbeat()
        return result

@jhorman I've created gists here. You can fork them to create your own PR, and we can discuss the code and its improvements there.

@lechup @jhorman Thanks for hacking around. The gist worked for me as expected, but it doesn't seem to be quitting properly using CTRL+C...

^C09:52:02 Warm shut down requested.
09:52:02 Stopping after all greenlets are finished. Press Ctrl+C again for a cold shutdown.
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/gevent/hub.py", line 133, in handle
    self.handler(*self.args, **self.kwargs)
  File "Worker.py", line 79, in request_stop
    raise StopRequested()
StopRequested

Thanks in advance. Any help on this would be appreciated!

I suggest commenting on the gist page rather than here. There are two versions of it (my fork and jhorman's version). He also explained here exactly why CTRL+C doesn't work ...
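
For readers without the gist at hand: the traceback above shows StopRequested being raised inside gevent's hub callback (hub.py, handle) rather than in the worker loop, which would be consistent with the exception being reported there and never reaching work(). One common workaround, given here as an assumption and not as the fix the gists adopted, is to have the handler set a flag that the loop checks. StopFlag and work_loop are hypothetical names, and the sketch uses no gevent at all.

```python
class StopFlag(object):
    """Hypothetical helper: records a stop request instead of raising."""

    def __init__(self):
        self.stop_requested = False

    def request_stop(self, signum=None, frame=None):
        # Only flip a flag here; the loop below decides when to exit.
        self.stop_requested = True


def work_loop(flag, jobs):
    """Process jobs until they run out or a stop has been requested."""
    done = []
    for job in jobs:
        if flag.stop_requested:
            break
        done.append(job())
    return done


flag = StopFlag()
# With the stdlib this handler would be registered as:
#     signal.signal(signal.SIGINT, flag.request_stop)
# Here the second job calls it directly to simulate Ctrl+C arriving mid-run.
jobs = [lambda: 'a', lambda: (flag.request_stop(), 'b')[1], lambda: 'c']
processed = work_loop(flag, jobs)
```

The job that was running when the stop arrived still completes, and the loop exits before picking up the next one, which matches the warm-shutdown behaviour the worker is aiming for.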

OK, will keep that in mind, @lechup.

I made some modifications based on @lechup's and @jhorman's versions and packed them into a package. This is the repo: https://github.com/zhangliyong/rq-gevent-worker

It's great to see all this. Perhaps we should document these alternative worker packages on http://python-rq.org. What do you think, @nvie?

I looked briefly at @zhangliyong's package and what stood out to me is that worker.set_current_job_id is overridden to do nothing. I created a ticket to address this here https://github.com/nvie/rq/issues/392

Thanks @selwin. I'm still working on it and currently test it by hand, since it's not easy to write unit tests with gevent. It works well on my project now, except that the worker process sometimes crashes because of the jobs; I use supervisor to restart it automatically.

Hi, could this GeventWorker become an official feature in rq?
