While this works in 0.4.6:

```python
from rq import get_current_job

def add(x, y):
    job = get_current_job()
    print 'Current job: %s' % (job.id,)
    return x + y
```
on 0.5.0 this throws an error:

```
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/rq/worker.py", line 543, in perform_job
    rv = job.perform()
  File "/usr/local/lib/python2.7/site-packages/rq/job.py", line 490, in perform
    self._result = self.func(*self.args, **self.kwargs)
  File "./test.py", line 4, in add
    job = get_current_job()
  File "/usr/local/lib/python2.7/site-packages/rq/job.py", line 81, in get_current_job
    return Job.fetch(job_id, connection=connection)
  File "/usr/local/lib/python2.7/site-packages/rq/job.py", line 293, in fetch
    job = cls(id, connection=connection)
  File "/usr/local/lib/python2.7/site-packages/rq/job.py", line 298, in __init__
    self.connection = resolve_connection(connection)
  File "/usr/local/lib/python2.7/site-packages/rq/connections.py", line 70, in resolve_connection
    raise NoRedisConnectionException('Could not resolve a Redis connection.')
NoRedisConnectionException: Could not resolve a Redis connection.
```
This works:

```python
from rq import get_current_job, Connection

def add(x, y):
    with Connection():
        job = get_current_job()
        print 'Current job: %s' % (job.id,)
    return x + y
```
But in https://github.com/nvie/rq/issues/164#issuecomment-12422939 @nvie wrote you shouldn't do that.
Whoops, that _is_ a regression. Your initial example _should_ work. We should fix this in 0.5.1.
If you're not too worried about consuming extra Redis connections, you can probably still use the latter example for now. But because it can exhaust your allowed number of connections to Redis, you shouldn't rely on it.
Hey @nvie, did this get fixed in 0.5.1?
I'm getting an error, but I'm not sure if it's a bug in RQ or if I'm doing something wrong. I want to use `get_current_job()` so that the function executing a task can read and edit the job's metadata. I'm trying something like this, but it's not working:
```python
from rq import get_current_job

def foo(bar):
    job = get_current_job()
    job.meta['baz'] = 'bat'
    job.save()
    return ''.join(reversed(bar))
```
```
Python 3.4.0 (default, Apr 11 2014, 13:05:11)
[GCC 4.8.2] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import worker.test
>>> import redis
>>> import rq
>>> q = rq.Queue(connection=redis.Redis())
>>> j = q.enqueue(worker.test.foo, "hello world")
```
The job is queued successfully, but my rqworker shows this stack trace:
```
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/dist-packages/rq/worker.py", line 557, in perform_job
    rv = job.perform()
  File "/usr/local/lib/python3.4/dist-packages/rq/job.py", line 490, in perform
    self._result = self.func(*self.args, **self.kwargs)
  File "/opt/avatar/lib/worker/test.py", line 5, in foo
    job = get_current_job()
  File "/usr/local/lib/python3.4/dist-packages/rq/job.py", line 81, in get_current_job
    return Job.fetch(job_id, connection=connection)
  File "/usr/local/lib/python3.4/dist-packages/rq/job.py", line 293, in fetch
    job = cls(id, connection=connection)
  File "/usr/local/lib/python3.4/dist-packages/rq/job.py", line 298, in __init__
    self.connection = resolve_connection(connection)
  File "/usr/local/lib/python3.4/dist-packages/rq/connections.py", line 71, in resolve_connection
    raise NoRedisConnectionException('Could not resolve a Redis connection.')
rq.connections.NoRedisConnectionException: Could not resolve a Redis connection.
```
If I change this line:

```python
job = get_current_job()
```

to

```python
job = get_current_job(connection=redis.Redis())
```

then it works fine. Am I doing something wrong, or is this the same bug that @cbenhagen reported? I'm using rq==0.5.1.
@mehaase did you use the latest master version? There is a test case that should cover this case, and you can find it here. It uses this fixture, which calls `get_current_job()` without arguments. Let me know if updating to the latest master version works for you. If it works without issues, I'll release 0.5.1 to PyPI.
I am using the 0.5.1 release that's currently on PyPI.
@mehaase I've just pushed RQ 0.5.2 out to PyPI. Please update and report if this still is broken for you. If it still poses problems, please send me a more detailed example, because perhaps I'm missing a case that isn't covered by our test suite.
Still seeing the same problem in 0.5.2. I'm running `rqworker` with no arguments. Other than that, I don't think there are any details I haven't already given above, and I don't think I'm doing anything unusual. It just seems like `get_current_job()` can't find the worker's connection, so it's erroring out.
It's not a big deal to me; specifying a connection explicitly works just fine.
I'm also experiencing this after just now upgrading to rq==0.5.3. I don't really mind adding `connection=foo` to my calls to `get_current_job`, but I thought I'd share so @mehaase doesn't think he's crazy.
If it helps debugging: I also had to add the `connection=foo` parameter to my `cancel_job` calls as well.
I am encountering this with RQ 0.5.3. When I get a moment, I'll take a look at the test case to see if it sheds any light.
As I understand it, the root of the problem is that with the stock `rqworker`, the Redis connection the worker uses isn't pushed onto the connection stack at all, so anything that calls `get_current_connection()` will fail. For example, this doesn't work for me:

```python
def mytask(...):
    q = Queue()  # No connection specified
    q.enqueue(myanothertask, ...)
```

I think most people would expect the worker to use the same connection it uses to fetch jobs — basically, to "just work". This seems like a reasonable assumption, given that it can still be overridden. Doing `push_connection(conn)` somewhere after https://github.com/nvie/rq/blob/master/rq/cli/cli.py#L166 should do the trick.
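The failure mode can be sketched in plain Python. This is a simplified stand-in for RQ's per-process connection stack, not the real `rq.connections` module:

```python
class NoRedisConnectionException(Exception):
    pass

# Simplified per-process stack of Redis clients (stand-in for rq.connections).
_connection_stack = []

def resolve_connection(connection=None):
    # An explicitly passed connection always wins.
    if connection is not None:
        return connection
    # Otherwise fall back to the top of the stack, which is empty in a
    # stock rqworker process -- hence the exception from get_current_job().
    if not _connection_stack:
        raise NoRedisConnectionException('Could not resolve a Redis connection')
    return _connection_stack[-1]

try:
    resolve_connection()  # what get_current_job() effectively does
except NoRedisConnectionException as exc:
    print(exc)  # -> Could not resolve a Redis connection
```

This also shows why passing `connection=redis.Redis()` explicitly sidesteps the bug: the stack is never consulted.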
I think I found the problem (as @iafilatov already worked out):
The current test in https://github.com/nvie/rq/blob/df4d4c8d5dca9c70a82f8f5b22ddb5eb876e3959/tests/test_job.py#L299 doesn't fail because the worker still runs within the same process, where the connection stack has at least one item, so `get_current_connection()` doesn't fail. The same applies when one ports these tests into test_cli.py: they still run the worker in the same process as the tests. A test reproducing this behavior would have to start `rqworker` in a real subprocess.
Anyway, the fix I think is simple:

```diff
diff --git a/rq/cli/cli.py b/rq/cli/cli.py
index 6b52bd7..97c5e4d 100755
--- a/rq/cli/cli.py
+++ b/rq/cli/cli.py
@@ -13,6 +13,7 @@ from redis import StrictRedis
 from redis.exceptions import ConnectionError
 
 from rq import Connection, get_failed_queue, Queue
+from rq.connections import push_connection
 from rq.contrib.legacy import cleanup_ghosts
 from rq.exceptions import InvalidJobOperationError
 from rq.utils import import_attribute
@@ -171,6 +172,7 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
     setup_loghandlers_from_args(verbose, quiet)
 
     conn = connect(url, config)
+    push_connection(conn)
     cleanup_ghosts(conn)
     worker_class = import_attribute(worker_class)
     queue_class = import_attribute(queue_class)
```
This pushes the worker's connection onto the `_connection_stack` in the process and makes `get_current_connection()` work when called inside jobs. Then you won't have to think about connection limits or about selecting the correct connection within your job, as it just uses the connection the `rqworker` already uses.
I think I will create a PR for this.
Bump!
I'm seeing this on 0.5.6 as well. As the OP mentioned, using the `Connection` context manager is a valid workaround.
Same here on 0.5.6. I'm using the OP's method for now.
Thanks! Any chance to get a release of this on PyPI?
bump?
I'm also running into this when trying to start a worker manually:

```python
from redis import Redis
from rq import Queue, Worker
from rq.connections import push_connection

conn = Redis.from_url(u'redis://localhost:6379/0')
queue = Queue(connection=conn)

# Commenting out the following line triggers the error below
push_connection(conn)

worker = Worker(queue)
worker.work()
```

With the `push_connection` line commented out, I get:
```
Traceback (most recent call last):
  File "rq_test.py", line 14, in <module>
    worker = Worker(queue)
  File "/home/brucker/projects/venv_ckan/local/lib/python2.7/site-packages/rq/worker.py", line 163, in __init__
    self.failed_queue = get_failed_queue(connection=self.connection)
  File "/home/brucker/projects/venv_ckan/local/lib/python2.7/site-packages/rq/queue.py", line 20, in get_failed_queue
    return FailedQueue(connection=connection)
  File "/home/brucker/projects/venv_ckan/local/lib/python2.7/site-packages/rq/queue.py", line 425, in __init__
    super(FailedQueue, self).__init__(JobStatus.FAILED, connection=connection)
  File "/home/brucker/projects/venv_ckan/local/lib/python2.7/site-packages/rq/queue.py", line 59, in __init__
    self.connection = resolve_connection(connection)
  File "/home/brucker/projects/venv_ckan/local/lib/python2.7/site-packages/rq/connections.py", line 70, in resolve_connection
    raise NoRedisConnectionException('Could not resolve a Redis connection')
rq.connections.NoRedisConnectionException: Could not resolve a Redis connection
```
I'm not really sure whether this is my fault (i.e. whether I should be calling `push_connection` in the first place), but I couldn't find anything in the docs, so I'm mentioning it here in case someone else runs into it. From the examples in the docs I would have expected that I don't need to call `push_connection`.
This is on version 0.6.0.
I too ran into this on version 0.6.0. Either the current behavior is desired and the docs are unclear, or this is a bug; I can't tell which.
Adding a `with` statement still helps:

```python
from rq import get_current_job, Connection

def do_job():
    foo = []
    with Connection():
        job = get_current_job()
    return foo
```
Hmm, one problem with the OP's version is that it only gets a localhost connection, not the one `rqworker` itself is using.
bump... this problem still exists in the latest master and is _not_ fixed.
This should now be fixed with the release of 0.7.0, which includes #641.