Consider the following snippet, with two running workers:
from uuid import uuid4

from redis import Redis
from rq import Queue

r = Redis(
    host="localhost",
    port=6379,
    db=0
)
q = Queue(connection=r)

# func_1 and func_2 are my own task functions, defined elsewhere
id_1 = str(uuid4())
q.enqueue(func_1, job_id=id_1)
q.enqueue(func_2, depends_on=id_1)
In that snippet, everything works as expected: both jobs are enqueued (one picked up by each worker), but func_2 does not run before func_1 has finished.
Now consider the following snippet, still with two workers:
from uuid import uuid4

from redis import Redis
from rq import Queue
from rq.job import Job

r = Redis(
    host="localhost",
    port=6379,
    db=0
)
q = Queue(connection=r)

id_1 = str(uuid4())
id_2 = str(uuid4())
job_1 = Job.create(func_1, id=id_1, connection=r)
job_2 = Job.create(func_2, id=id_2, depends_on=id_1, connection=r)
q.enqueue_job(job_1)
q.enqueue_job(job_2)
When I run that code, both jobs start immediately, despite the depends_on argument being set.
Why is it necessary to set connection= on the jobs when I use Job.create? Doesn't the Queue instance already know which Redis connection to use? The code fails unless I set connection on all the jobs as well.
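(The only way I've found to avoid repeating connection= on every job is RQ's connection context manager — a minimal sketch, assuming that API is available in the version I'm running:)

from redis import Redis
from rq import Connection, Queue
from rq.job import Job

r = Redis(host="localhost", port=6379, db=0)

# Assumption: rq.Connection pushes a default connection that Queue and
# Job.create resolve implicitly, so connection= can be dropped.
with Connection(r):
    q = Queue()
    job = Job.create(func_1)  # no explicit connection= needed here
    q.enqueue_job(job)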
This PR adds a multi-dependency feature to RQ proper. You should try this instead.
Thanks for answering!
I've looked through the PR, but I can't see how those changes would fix this problem. The only way I managed to fix it was to create a custom Queue class and implement the following method as a replacement for the initial enqueue_job call:
def enqueue_custom(
    self, job: CustomJob, pipeline=None, at_front: bool = False
) -> CustomJob:
    # If a _dependent_ job depends on any unfinished job, register all the
    # _dependent_ job's dependencies instead of enqueueing it.
    #
    # `Job#fetch_dependencies` sets WATCH on all dependencies. If
    # WatchError is raised when the pipeline is executed, that means
    # something else has modified either the set of dependencies or the
    # status of one of them. In this case, we simply retry.
    job.set_status(FlowStatus.QUEUED)
    if job._dependency_id:
        with self.connection.pipeline() as pipe:
            while True:
                try:
                    pipe.watch(job.dependencies_key)
                    dependencies = job.fetch_dependencies(watch=True, pipeline=pipe)
                    pipe.multi()
                    for dependency in dependencies:
                        if (
                            dependency.get_status(refresh=False)
                            != FlowStatus.FINISHED
                        ):
                            job.set_status(FlowStatus.DEFERRED, pipeline=pipe)
                            job.register_dependency(pipeline=pipe)
                            job.save(pipeline=pipe)
                            job.cleanup(ttl=job.ttl, pipeline=pipe)
                            pipe.execute()
                            return job
                    break
                except WatchError:
                    continue
    return super().enqueue_job(job, pipeline, at_front=at_front)
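For context, here is roughly how I wire that in; CustomQueue, CustomJob, and FlowStatus are my own classes, not part of RQ:

# Sketch only: CustomQueue/CustomJob/FlowStatus are my own names.
class CustomQueue(Queue):
    job_class = CustomJob  # make the queue work with my Job subclass

    def enqueue_custom(self, job, pipeline=None, at_front=False):
        ...  # the method shown above

q = CustomQueue(connection=r)
job_1 = CustomJob.create(func_1, id=id_1, connection=r)
job_2 = CustomJob.create(func_2, id=id_2, depends_on=id_1, connection=r)
q.enqueue_custom(job_1)
q.enqueue_custom(job_2)  # now correctly deferred until job_1 finishes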
You have this if-block inside enqueue_call as well, and I couldn't get dependencies to work without it.
With the PR merged, you can do queue.enqueue(my_func, depends_on=[job_1, job_2])
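For example (a sketch, assuming the PR is merged and func_1/func_2/my_func are your own tasks):

from redis import Redis
from rq import Queue

r = Redis(host="localhost", port=6379, db=0)
q = Queue(connection=r)

job_1 = q.enqueue(func_1)
job_2 = q.enqueue(func_2)
# my_func stays deferred until BOTH job_1 and job_2 have finished
q.enqueue(my_func, depends_on=[job_1, job_2])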
Yes, I understand (and really like this feature!).
But in the docs you give this example:
from rq.job import Job

job = Job.create(count_words_at_url, args=('http://nvie.com',))
print('Job id: %s' % job.id)
q.enqueue_job(job)

# create a job with a predetermined id
job = Job.create(count_words_at_url, args=('http://nvie.com',), id='my_job_id')
This way of doing it fits my needs _very_ well, but when I do this, job dependencies do not work. If this is by design, then of course I will have to use q.enqueue( ... ).
Edit:
My functions (i.e. count_words_at_url) are loaded dynamically from a configuration file, and are therefore attached to my Job class at runtime.
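Roughly like this, if it helps (the dotted path and the load_task helper are just illustrative):

# Hypothetical sketch of the dynamic loading: task paths come from my
# configuration file and are resolved with importlib at runtime.
import importlib

def load_task(dotted_path: str):
    module_name, func_name = dotted_path.rsplit(".", 1)
    return getattr(importlib.import_module(module_name), func_name)

count_words_at_url = load_task("myapp.tasks.count_words_at_url")
job = Job.create(count_words_at_url, args=("http://nvie.com",), connection=r)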
I think this issue should be reopened, as it appears to still occur (new failing test):
https://github.com/rq/rq/blob/9bef2aa5a49d894bd03bd772b670e207f8edd0ef/tests/test_queue.py#L530
I agree with @jtfidje that this is not related to multiple dependencies, though a few folks documented it in #1170.
The crux of the issue is that enqueue_job does not check dependencies at all.
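Until that is fixed, a workaround is to go through enqueue (which runs the dependency check in enqueue_call) while still pinning the job id — a sketch, mirroring the first snippet in this thread:

# Workaround sketch: enqueue() goes through the dependency-checking path,
# and job_id still lets you predetermine the id, unlike raw enqueue_job().
id_1 = str(uuid4())
q.enqueue(func_1, job_id=id_1)
q.enqueue(func_2, depends_on=id_1)  # correctly deferred until func_1 finishes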