Ray: Discuss on supporting long running job

Created on 6 Jan 2020 · 16 comments · Source: ray-project/ray

Current Approach

In our experience, most jobs in production environments are long-running. Because this is so common, it's important to add an appropriate, easy-to-use API for it.

I have noticed that @simon-mo implemented a relevant feature called detached actors in PR #6036. But there might be some issues with detached actors:
1) It doesn't work for normal tasks.
2) The cost for users to rewrite a normal job into a long-running job is too high: they would have to add the detached flag to every actor creation. (Please correct me if I'm wrong.)
3) Because users have to make a dummy call and a ray.get on the actor to make sure it has been created, they need to know more about Ray internals, as sketched below. That's not in line with the principle of simplicity.
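To make point 3 concrete, here is a minimal sketch of the boilerplate being criticized. The detached-actor creation call is left generic because the exact flag/signature from #6036 may differ, and ready() is a dummy method invented only for this example:

import ray

@ray.remote
class Worker:
    def ready(self):
        # Dummy method whose only purpose is to confirm the actor started.
        return True

ray.init(address="auto")

# Hypothetical detached creation; the real flag/signature comes from PR #6036.
worker = Worker.remote()

# The extra boilerplate: a dummy call plus ray.get just to make sure the
# actor has actually been created before the driver exits.
ray.get(worker.ready.remote())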

Another Proposal

Based on what we have been using internally for a long time, we'd like to support this with another approach.

Add a clean_up flag to the ray.shutdown() method to indicate whether everything belonging to this job should be cleaned up.

# The resources of this job will not be cleaned up even after this driver exits.
ray.shutdown(clean_up=False)

Then there are two ways to drop the job from the cluster if we want:

# execute a drop command
ray drop address="redis_address" job-id=xxxxxx

or drop it from another job with the ray.drop API:

ray.init(address="xxxx")
ray.drop(job_id=xxxxx)

P.S. It would be more natural if we also supported a job name for each job.
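Putting the proposal together, a hypothetical end-to-end flow could look like the sketch below; clean_up, the ray drop command, and ray.drop are all proposed APIs that do not exist yet, and StreamingWorker is just an example actor:

import ray

ray.init(address="redis_address")

@ray.remote
class StreamingWorker:
    def run(self):
        # Long-running work loop.
        ...

workers = [StreamingWorker.remote() for _ in range(10)]
for w in workers:
    w.run.remote()

# Proposed API: leave the actors running after this driver exits.
ray.shutdown(clean_up=False)

# Later, from the command line or from another driver:
#   ray drop address="redis_address" job-id=xxxxxx
#   ray.drop(job_id=xxxxx)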

If you think the ray.shutdown(clean_up=False) API is a bit weird, it might make more sense to put the flag in ray.init instead:

ray.init(long_running=True)

Any other proposal is welcome.

cc @simon-mo @stephanie-wang @robertnishihara

P2 enhancement

All 16 comments

Thanks @jovany-wang, can you provide some more details about the use case? For example

  • Why not just keep the driver alive? Is the concern that the driver is a possible point of failure for the job? Are there any other concerns?
  • Can you give a concrete use case?

Because users have to make a dummy call and a ray.get on the actor to make sure it has been created, they need to know more about Ray internals. That's not in line with the principle of simplicity.

That's a good point.

If you think the ray.shutdown(clean_up=False) API is a bit weird, it might make more sense to put the flag in ray.init instead.

I like doing this on shutdown; it ensures that the job will be cleaned up properly if it crashes at any point prior to the shutdown() call.

I agree with having a name for the job, though. Maybe there could be a call like ray.detach_job(job_name="my_streaming_job")?

  • Can you give a concrete use case?

Yes, there are actually several concrete cases. One is our streaming jobs: a driver is used to create some actors with specific responsibilities, and those actors all run for a long time doing their work.

Other online cases like serving also have this requirement: a driver is used to create some long-running serving actors that receive requests from users.

  • Why not just keep the driver alive? Is the concern that the driver is a possible point of failure for the job? Are there any other concerns?

The major reason for not keeping the driver alive is the driver's lack of fail-over ability, as you said. Another point is that there's no graceful way to keep the driver alive; both a dummy get and a while True: sleep loop are too awkward.
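For reference, the keep-alive hack mentioned above typically looks something like this sketch:

import time
import ray

ray.init(address="auto")

# ... create the job's long-running actors here ...

# Awkward keep-alive: the driver has nothing left to do, but must stay
# alive so the cluster does not clean up the job's actors.
while True:
    time.sleep(60)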

@ericl
Users shouldn't call ray.detach_job(job_name="a") in job b, because we can't guarantee that job a is done.
So if we use ray.detach_job, it should take no job_name parameter, since we can only call it from within the same job.

How would we define the behavior of the following code?

# driver
ray.init(address="xxx")

ray.shutdown()    # cleans up the job
ray.detach_job()  # but the job has already been cleaned up?

I meant the name parameter to be for assigning a name to the job. How about this:

ray.init(address="..", job_name="my job name")
ray.shutdown(detach=True)
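As a rough sketch of how a driver would use this proposed API (both job_name and detach are proposed parameters, not existing ones):

import ray

ray.init(address="redis_address", job_name="my_streaming_job")  # proposed job_name

# ... create the job's long-running actors ...

# Proposed: detach the job so it outlives this driver.
ray.shutdown(detach=True)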

I meant the name parameter to be for assigning a name to the job. How about this:

ray.init(address="..", job_name="my job name")
ray.shutdown(detach=True)

This looks good to me as well.

@jovany-wang what is the intended way to clean up a detached job?

What about just ray.detach_job() and nothing else? This has the benefit that @ericl mentioned about not detaching the job if it crashes before that line.
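The placement matters here: calling the proposed ray.detach_job() only as the last statement means any crash earlier in the driver still triggers normal cleanup (again, a hypothetical API):

import ray

ray.init(address="auto")

# ... set up the long-running actors; a crash anywhere before the next
# line leaves the job attached, so it gets cleaned up as usual ...

ray.detach_job()  # proposed API: only now does the job outlive the driver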

My mental model of a detached actor is a little bit different from everybody else's, I think. Instead of making all the actors in an application detached, there should be a single detached actor that then launches all the other (non-named) actors. In that view, detached drivers and detached actors are roughly similar. The advantage of the detached actor is that we already need an API to kill actors, so that would be a natural way to terminate detached actors too. Normal tasks launched from a detached actor should also only be cleaned up when the detached actor is cleaned up.

Regarding point 3 in the original post, I'm not sure I understand. It should be enough to just call the constructor of the detached actor; that will make sure it gets executed. No get should be needed (if one is, that sounds like a bug).

It shouldn't make too much difference from the application's perspective which of the two solutions we end up going with. It should be quite easy to take all the code from one of your detached drivers and call it in the constructor of a detached actor, for example, and vice versa: having a detached driver that starts a single actor is equivalent to having a detached actor.
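A sketch of the single-detached-root-actor pattern described here; the detached creation step is left as a comment because the final detached-actor API may differ, and ChildWorker/Root are invented names:

import ray

@ray.remote
class ChildWorker:
    def work(self):
        # Regular, non-named worker actor.
        ...

@ray.remote
class Root:
    def __init__(self):
        # The single detached root actor launches and owns all other actors.
        self.children = [ChildWorker.remote() for _ in range(10)]

ray.init(address="auto")

# Hypothetical detached creation of the root actor; killing this actor later
# would be the natural way to tear down the whole application.
root = Root.remote()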

My mental model of a detached actor is a little bit different from everybody else's, I think. Instead of making all the actors in an application detached, there should be a single detached actor that then launches all the other (non-named) actors. In that view, detached drivers and detached actors are roughly similar. The advantage of the detached actor is that we already need an API to kill actors, so that would be a natural way to terminate detached actors too. Normal tasks launched from a detached actor should also only be cleaned up when the detached actor is cleaned up.

Regarding point 3 in the original post, I'm not sure I understand. It should be enough to just call the constructor of the detached actor; that will make sure it gets executed. No get should be needed (if one is, that sounds like a bug).

Maybe it isn't such a good solution to manage all the actors through a detached driver actor.
Let's imagine a streaming case:
a driver launches a streaming job consisting of a job master (actor) and workers (many stateful actors).
The detached driver actor shouldn't have to handle all the recovery operations when anything is killed or lost; that's an unwanted side effect of this design.
@jovany-wang

@pcmoritz

Let's imagine that we have about 100 actors in our app, and we create those ~100 actors from another detached actor (call it my-detached-actor):

@ray.remote
class MyDetachedActor:
    def __init__(self):
        # Create my ~100 worker actors here.
        pass

# driver code (the exact detached-actor creation syntax is illustrative):
my_detached_actor = MyDetachedActor.remote(detached=True)

Some issues are introduced now:

  1. In this app, there is a my-detached-actor actor that does nothing except submit the 100 worker actors. I think that looks too weird to users.
  2. Users must consider the failure cases of my-detached-actor, which adds mental burden.

@pcmoritz

It should be quite easy to take all the code from one of your detached drivers and call it in the constructor of a detached actor, for example, and vice versa: having a detached driver that starts a single actor is equivalent to having a detached actor.

I get your point that users could implement this with a detached actor, so we wouldn't need to provide a separate drop-job API. But I'd like to address the root issue rather than work around it reluctantly.

Thanks :)

Normal tasks launched from a detached actor should also only be cleaned up when the detached actor is cleaned up.

@pcmoritz Currently, if we kill the detached root actor, those normal tasks won't be cleaned up, right? However, if we terminate a job, Ray will clean up all related resources. Considering this, having detached drivers + a kill job API sounds better than having detached actors + a kill actor API.

@ All,

If we all agree on the ray.shutdown(detach=True) API proposed by @ericl, I'm going to implement it.

As I indicated above, I'm ok with the ray.shutdown(detach=True) solution, it is a good API I think :)

Me too.

A couple of things:

  • For now, there will not be any way to clean up a detached job?
  • We will need to clarify the semantics around detached actors. For example, if an actor or task is created by a detached actor, what happens to that actor or task when the original job finishes?

For now, there will not be any way to clean up a detached job?

There is no way to clean it up now. We should support it via:
a. a ray drop job-name/job-id command, or
b. a ray.drop(job_name/job_id) API called from another driver.

We will need to clarify the semantics around detached actors. For example, if an actor or task is created by a detached actor, what happens to that actor or task when the original job finishes?

Agree with you on this point.
