Rq: get_current_job() throws NoRedisConnectionException in 0.5.0

Created on 2 Feb 2015  路  21Comments  路  Source: rq/rq

While this works in 0.4.6:

from rq import get_current_job

def add(x, y):
    job = get_current_job()
    print 'Current job: %s' % (job.id,)
    return x + y

on 0.5.0 this throws an error:

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/rq/worker.py", line 543, in perform_job
    rv = job.perform()
  File "/usr/local/lib/python2.7/site-packages/rq/job.py", line 490, in perform
    self._result = self.func(*self.args, **self.kwargs)
  File "./test.py", line 4, in add
    job = get_current_job()
  File "/usr/local/lib/python2.7/site-packages/rq/job.py", line 81, in get_current_job
    return Job.fetch(job_id, connection=connection)
  File "/usr/local/lib/python2.7/site-packages/rq/job.py", line 293, in fetch
    job = cls(id, connection=connection)
  File "/usr/local/lib/python2.7/site-packages/rq/job.py", line 298, in __init__
    self.connection = resolve_connection(connection)
  File "/usr/local/lib/python2.7/site-packages/rq/connections.py", line 70, in resolve_connection
    raise NoRedisConnectionException('Could not resolve a Redis connection.')
NoRedisConnectionException: Could not resolve a Redis connection.

This works:

from rq import get_current_job, Connection

def add(x, y):
    with Connection():
        job = get_current_job()
        print 'Current job: %s' % (job.id,)
        return x + y

But in https://github.com/nvie/rq/issues/164#issuecomment-12422939 @nvie wrote you shouldn't do that.

Most helpful comment

Thanks! Any chance to get a release of this on PyPI?

All 21 comments

Whoops, that _is_ a regression. You initial example _should_ work. We should fix this in 0.5.1.

If you're not too worried about consuming more Redis connections, you can probably still use the latter example for now, but because you can potentially exhaust your number of allowed connections to Redis this way, you should not.

Hey @nvie, did this get fixed in 0.5.1?

I'm getting an error, but I'm not sure if its a bug in RQ or if I'm doing something wrong. I want to use get_current_job() so that the function executing a task can read/edit the job's metadata. I'm trying something like this, but it's not working:

from rq import get_current_job

def foo(bar):
    job = get_current_job()
    job.meta['baz'] = 'bat'
    job.save()

    return ''.join(reversed(bar))
Python 3.4.0 (default, Apr 11 2014, 13:05:11) 
[GCC 4.8.2] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import worker.test
>>> import redis
>>> import rq
>>> q = rq.Queue(connection=redis.Redis())
>>> j = q.enqueue(worker.test.foo, "hello world")

The job is queued succesfully, but my rqworker shows this stack trace:

Traceback (most recent call last):
  File "/usr/local/lib/python3.4/dist-packages/rq/worker.py", line 557, in perform_job
    rv = job.perform()
  File "/usr/local/lib/python3.4/dist-packages/rq/job.py", line 490, in perform
    self._result = self.func(*self.args, **self.kwargs)
  File "/opt/avatar/lib/worker/test.py", line 5, in foo
    job = get_current_job()
  File "/usr/local/lib/python3.4/dist-packages/rq/job.py", line 81, in get_current_job
    return Job.fetch(job_id, connection=connection)
  File "/usr/local/lib/python3.4/dist-packages/rq/job.py", line 293, in fetch
    job = cls(id, connection=connection)
  File "/usr/local/lib/python3.4/dist-packages/rq/job.py", line 298, in __init__
    self.connection = resolve_connection(connection)
  File "/usr/local/lib/python3.4/dist-packages/rq/connections.py", line 71, in resolve_connection
    raise NoRedisConnectionException('Could not resolve a Redis connection.')
rq.connections.NoRedisConnectionException: Could not resolve a Redis connection.

If I change this line:

    job = get_current_job()

to

    job = get_current_job(connection=redis.Redis())

then it works fine. Am I doing something wrong or is this the same bug that @cbenhagen reported? I'm using rq==0.5.1.

@mehaase did you use the latest master version? There is a test case that should cover this case, and you can find it here. It uses this fixture that calls the get_current_job() function without arguments. Let me know if updating to the latest master version works for you. If it works for you without issues, I'll release 0.5.1 to PyPI.

I am using the 0.5.1 release that's currently on PyPI.

@mehaase I've just pushed RQ 0.5.2 out to PyPI. Please update and report if this still is broken for you. If it still poses problems, please send me a more detailed example, because perhaps I'm missing a case that isn't covered by our test suite.

Still seeing the same problem in 0.5.2. I'm running rqworker with no arguments. Other than that, I don't think there are any other details that I haven't already specified above. I don't think I'm doing anything unusual. It just seems like get_current_job() can't find the worker's connection, so its erroring out.

It's not a big deal to me; specifying a connection explicitly works just fine.

I'm also experiencing this when just now upgrading to rq==0.5.3. I don't really mind adding the connection=foo to my calls to get_current_job, but I thought I'd share so @mehaase doesn't think he's crazy.

If it helps debugging, I also had to add the connection=foo parameter to my cancel_job calls as well.

I am encountering this with RQ 0.5.3. When I get a moment, I'll take a look at the test case to see if it sheds any light.

As I understand, the root of the problem is that with the stock rq worker the Redis connection which the worker uses isn't pushed on the connection stack at all. Everything which does get_current_connection() should fail. For example, this doesn't work for me:

def mytask(...):
    q = Queue() # No connection specified
    q.enqueue(myanothertask, ...)

I think most people would expect the worker to use the same connection it uses to get the jobs. Basically, to "just work". This seems like a reasonable assumption, given that it can be overridden. Doing push_connection(conn) somewhere after https://github.com/nvie/rq/blob/master/rq/cli/cli.py#L166 should do the trick.

I think I found the problem (as @iafilatov already worked out):

The current test in https://github.com/nvie/rq/blob/df4d4c8d5dca9c70a82f8f5b22ddb5eb876e3959/tests/test_job.py#L299 doesn't fail because the worker is still running within the same process where the connection stack has at least one item. So get_current_connection doesn't fail.
The same when one ports these tests into the test_cli.py: These tests stll run the worker in the same process as the tests.

A test reproducing this behavior would have to start rqworker in a real subprocess.

Anyway, the fix I think is simple:

diff --git a/rq/cli/cli.py b/rq/cli/cli.py
index 6b52bd7..97c5e4d 100755
--- a/rq/cli/cli.py
+++ b/rq/cli/cli.py
@@ -13,6 +13,7 @@ from redis import StrictRedis
 from redis.exceptions import ConnectionError

 from rq import Connection, get_failed_queue, Queue
+from rq.connections import push_connection
 from rq.contrib.legacy import cleanup_ghosts
 from rq.exceptions import InvalidJobOperationError
 from rq.utils import import_attribute
@@ -171,6 +172,7 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
     setup_loghandlers_from_args(verbose, quiet)

     conn = connect(url, config)
+    push_connection(conn)
     cleanup_ghosts(conn)
     worker_class = import_attribute(worker_class)
     queue_class = import_attribute(queue_class)

This pushes the workers connection onto the _connection_stack in the process and makes get_current_connection() work when called inside jobs. And then you won't have to think about connection limits or selecting the correct connection within your job as it just uses the connection the rqworker already uses.

I think I will create a PR for this.

Bump!

I'm seeing this on 0.5.6 as well. As OP mentioned, using Connection context manager is a valid workaround.

Same here on 0.5.6. I'm using the OP's method for now

Thanks! Any chance to get a release of this on PyPI?

bump?

I'm also running into this when I'm trying to manually start a worker:

from redis import Redis
from rq import Queue, Worker
from rq.connections import push_connection

conn = Redis.from_url(u'redis://localhost:6379/0')
queue = Queue(connection=conn)

# Uncomment the following line to trigger error
push_connection(conn)

worker = Worker(queue)
worker.work()

If I uncomment the push_connection line then I get

Traceback (most recent call last):
  File "rq_test.py", line 14, in <module>
    worker = Worker(queue)
  File "/home/brucker/projects/venv_ckan/local/lib/python2.7/site-packages/rq/worker.py", line 163, in __init__
    self.failed_queue = get_failed_queue(connection=self.connection)
  File "/home/brucker/projects/venv_ckan/local/lib/python2.7/site-packages/rq/queue.py", line 20, in get_failed_queue
    return FailedQueue(connection=connection)
  File "/home/brucker/projects/venv_ckan/local/lib/python2.7/site-packages/rq/queue.py", line 425, in __init__
    super(FailedQueue, self).__init__(JobStatus.FAILED, connection=connection)
  File "/home/brucker/projects/venv_ckan/local/lib/python2.7/site-packages/rq/queue.py", line 59, in __init__
    self.connection = resolve_connection(connection)
  File "/home/brucker/projects/venv_ckan/local/lib/python2.7/site-packages/rq/connections.py", line 70, in resolve_connection
    raise NoRedisConnectionException('Could not resolve a Redis connection')
rq.connections.NoRedisConnectionException: Could not resolve a Redis connection

I'm not really sure whether this is my fault (i.e. if I should be calling push_connection in the first place) but I couldn't find anything in the docs, so I'm mentioning this here in case someone else runs into it.

From the examples in the docs I would have expected that I don't need to call push_connection.

This is on version 0.6.0.

I too ran into this on version 0.6.0. Either the current behavior is desired and the docs are unclear or this is a bug. I can't tell which though.

Adding with statement still helps...

from rq import get_current_job, Connection
def do_job():
    foo = []
    with Connection():
        job = get_current_job()
    return foo

Hmm, I think a problem with the OP's version seems to be that it only get a localhost connection not the one rqworker itself is using..

bump... this problem still exists in the latest master and is _not_ fixed.

This should now be fixed with the release of 0.7.0, which includes #641.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

adamchainz picture adamchainz  路  10Comments

sborpo picture sborpo  路  9Comments

untitaker picture untitaker  路  42Comments

mark-99 picture mark-99  路  29Comments

anandsaha picture anandsaha  路  9Comments