Rq: Job is executed before "depends_on" job is finished when using Job.create and q.enqueue_job

Created on 27 Jan 2021 · 5 comments · Source: rq/rq

Consider the following snippet below and two running workers:

from uuid import uuid4

from redis import Redis
from rq import Queue

r = Redis(
    host="localhost",
    port=6379,
    db=0
)

q = Queue(connection=r)

id_1 = str(uuid4())

q.enqueue(func_1, job_id=id_1)
q.enqueue(func_2, depends_on=id_1)

In that snippet, everything works as expected; both jobs are enqueued (one on each worker), and func_2 does not run before func_1 is finished.

Now consider the following snippet, still with two workers:

from uuid import uuid4

from redis import Redis
from rq import Queue
from rq.job import Job

r = Redis(
    host="localhost",
    port=6379,
    db=0
)

q = Queue(connection=r)

id_1 = str(uuid4())
id_2 = str(uuid4())

job_1 = Job.create(func_1, id=id_1, connection=r) 
job_2 = Job.create(func_2, id=id_2, depends_on=id_1, connection=r)
q.enqueue_job(job_1)
q.enqueue_job(job_2)

When I run that code, both jobs start immediately, despite the depends_on argument being set.
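The difference between the two snippets can be illustrated with a toy in-memory model (this is not rq's actual code; ToyQueue and all names below are illustrative): an enqueue that checks dependency state defers the dependent job, while an enqueue_job that skips the check makes both jobs runnable immediately.

```python
# Toy in-memory model of the reported behaviour (NOT rq internals).
class ToyQueue:
    def __init__(self):
        self.queue = []          # jobs workers may pick up immediately
        self.deferred = []       # jobs waiting on an unfinished dependency
        self.finished = set()    # ids of finished jobs

    def enqueue(self, job_id, depends_on=None):
        # enqueue() inspects the dependency before queueing the job
        if depends_on is not None and depends_on not in self.finished:
            self.deferred.append(job_id)
        else:
            self.queue.append(job_id)

    def enqueue_job(self, job_id, depends_on=None):
        # enqueue_job() pushes the job straight onto the queue,
        # ignoring depends_on entirely
        self.queue.append(job_id)

q1 = ToyQueue()
q1.enqueue("job_1")
q1.enqueue("job_2", depends_on="job_1")
print(q1.queue, q1.deferred)     # job_2 is deferred, as expected

q2 = ToyQueue()
q2.enqueue_job("job_1")
q2.enqueue_job("job_2", depends_on="job_1")
print(q2.queue, q2.deferred)     # both runnable immediately: the bug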




Mini bonus question

Why is it necessary to set connection= on the jobs when I use Job.create? Doesn't the Queue instance already know which Redis connection to use? The code fails unless I set connection on every job as well.


All 5 comments

This PR adds a multi-dependency feature to RQ proper. You should try this instead.

Thanks for answering!

I've looked through the PR, but I can't see how those changes would fix this problem. The only way I managed to fix it was to create a custom Queue class and implement the following function as a replacement for the initial enqueue_job call:

def enqueue_custom(
        self, job: CustomJob, pipeline=None, at_front: bool = False
    ) -> CustomJob:

        # If a _dependent_ job depends on any unfinished job, register all the
        # _dependent_ job's dependencies instead of enqueueing it.
        #
        # `Job#fetch_dependencies` sets WATCH on all dependencies. If
        # WatchError is raised when the pipeline is executed, it means
        # something else has modified either the set of dependencies or the
        # status of one of them. In this case, we simply retry.
        job.set_status(FlowStatus.QUEUED)

        if job._dependency_id:
            with self.connection.pipeline() as pipe:
                while True:
                    try:

                        pipe.watch(job.dependencies_key)

                        dependencies = job.fetch_dependencies(watch=True, pipeline=pipe)

                        pipe.multi()

                        for dependency in dependencies:
                            if (
                                dependency.get_status(refresh=False)
                                != FlowStatus.FINISHED
                            ):
                                job.set_status(FlowStatus.DEFERRED, pipeline=pipe)
                                job.register_dependency(pipeline=pipe)
                                job.save(pipeline=pipe)
                                job.cleanup(ttl=job.ttl, pipeline=pipe)
                                pipe.execute()
                                return job

                        break
                    except WatchError:
                        continue
        return super().enqueue_job(job, pipeline, at_front=at_front)

You have this if-block inside enqueue_call as well, and I couldn't get dependencies to work without it.
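The WATCH / WatchError retry loop in the snippet above is an instance of optimistic concurrency control. Stripped of Redis, the pattern can be sketched in plain Python like this (WatchedStore, ConflictError, and retry_transact are illustrative names, not rq or redis-py API):

```python
# Generic optimistic-locking sketch: read a snapshot, decide what to write,
# and retry if anyone else modified the store in the meantime.
class ConflictError(Exception):
    pass

class WatchedStore:
    def __init__(self):
        self.data = {}
        self.version = 0

    def set(self, key, value):
        self.data[key] = value
        self.version += 1    # any write invalidates concurrent watchers

    def transact(self, fn):
        """Run fn against a snapshot; raise ConflictError if the store
        changed while fn was deciding what to write."""
        seen = self.version
        writes = fn(dict(self.data))
        if self.version != seen:
            raise ConflictError
        for key, value in writes.items():
            self.set(key, value)

def retry_transact(store, fn, max_attempts=10):
    # Same shape as the `while True: ... except WatchError: continue` loop.
    for _ in range(max_attempts):
        try:
            store.transact(fn)
            return True
        except ConflictError:
            continue
    return False

store = WatchedStore()
store.set("status", "queued")
ok = retry_transact(store, lambda snapshot: {"status": "deferred"})
print(ok, store.data["status"])   # True deferred
```

The retry is what makes the dependency check safe under concurrency: if a worker finishes a dependency between the read and the write, the transaction is re-run against the fresh state instead of acting on stale data.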

With the PR merged, you can do queue.enqueue(my_func, depends_on=[job_1, job_2])

Yes, I understand (and really like this feature!).
But in the docs you give this example:

from rq.job import Job

job = Job.create(count_words_at_url, 'http://nvie.com')
print('Job id: %s' % job.id)
q.enqueue_job(job)

# create a job with a predetermined id
job = Job.create(count_words_at_url, 'http://nvie.com', id='my_job_id')

This way of doing it fits my needs _very_ well, but if I do this, job dependencies do not work. If this is by design then of course I will have to use q.enqueue( ... ).

Edit:
My functions (e.g. count_words_at_url) are loaded dynamically from a configuration file and are therefore attached to my Job class at runtime.
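Loading task functions dynamically from a configuration file is commonly done by resolving dotted import paths with the standard library. This is a general sketch, not rq-specific; load_callable is an illustrative name:

```python
# Resolve a dotted path from a config entry to the callable it names.
from importlib import import_module

def load_callable(dotted_path):
    """Resolve e.g. 'package.module.func' to the function object."""
    module_path, _, attr = dotted_path.rpartition(".")
    return getattr(import_module(module_path), attr)

# Resolving a stdlib function the same way a config entry would be resolved:
sqrt = load_callable("math.sqrt")
print(sqrt(9))   # 3.0
```

Because the callable is resolved at runtime, the same mechanism works whether the function is passed to q.enqueue or attached to a custom Job class.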

I think this issue should be reopened, as it appears to still occur (new failing test):
https://github.com/rq/rq/blob/9bef2aa5a49d894bd03bd772b670e207f8edd0ef/tests/test_queue.py#L530

I agree with @jtfidje that this is not related to multiple dependencies, though a few folks documented it in #1170.

The crux of the issue is that enqueue_job does not check dependencies at all.
