@stephanie-wang and I discovered this problem while debugging a tutorial on Serve.
Ray version and other system information (Python version, TensorFlow version, OS):
Please provide a script that can be run to reproduce the issue. The script should have no external library dependencies (i.e., use fake or mock data / environments):
Start the head node with debug logging:

RAY_BACKEND_LOG_LEVEL=debug ray start --head

First driver script:

import ray

ray.init(address="auto")

@ray.remote
class Controller:
    def __init__(self):
        self.workers = []

    def list(self):
        return self.workers

    def start(self, actor_class):
        worker = actor_class.options(lifetime="detached").remote()
        ray.get(worker.ping.remote())
        self.workers.append(worker)

ctl = Controller.options(lifetime="detached", name="controller").remote()
print(f"Worker handles {ray.get(ctl.list.remote())}")
Second driver script (run after the first driver exits):

import ray

ray.init(address="auto")

ctl = ray.get_actor("controller")

@ray.remote
class Worker:
    def __init__(self):
        print("Worker started!")

    def ping(self):
        return "pong"

ray.get(ctl.start.remote(Worker))
print("worker", ray.get(ctl.list.remote()))
grep --ignore-case job raylet.out

I1016 08:54:55.621362 70924 193117632 service_based_accessor.cc:92] Reestablishing subscription for job info.
I1016 08:54:55.638834 70924 193117632 service_based_accessor.cc:110] Getting all job info.
I1016 08:54:55.639565 70924 193117632 service_based_accessor.cc:117] Finished getting all job info.
I1016 08:55:01.539261 70924 193117632 worker.cc:123] Assigned worker 01000000ffffffffffffffffffffffffffffffff to job 01000000
I1016 08:55:01.539319 70924 193117632 service_based_accessor.cc:29] Adding job, job id = 01000000, driver pid = 71004
I1016 08:55:01.539764 70924 193117632 service_based_accessor.cc:39] Finished adding job, status = OK, job id = 01000000, driver pid = 71004
I1016 08:55:01.539840 70924 193117632 node_manager.cc:336] HandleJobStarted 01000000
I1016 08:55:01.546902 70924 193117632 service_based_accessor.cc:1150] Adding object location, object id = 4ea01d51bed991a789781a50e0e8dbfd00000000, node id = 16895b921f968d2a16359ac3e9de9711c51a20a1, job id = e0e8dbfd
I1016 08:55:01.547286 70924 193117632 service_based_accessor.cc:1205] Removing object location, object id = 4ea01d51bed991a789781a50e0e8dbfd00000000, node id = 16895b921f968d2a16359ac3e9de9711c51a20a1, job id = e0e8dbfd
I1016 08:55:01.547652 70924 193117632 service_based_accessor.cc:1166] Finished adding object location, status = OK, object id = 4ea01d51bed991a789781a50e0e8dbfd00000000, node id = 16895b921f968d2a16359ac3e9de9711c51a20a1, job id = e0e8dbfd
I1016 08:55:01.548303 70924 193117632 service_based_accessor.cc:1220] Finished removing object location, status = OK, object id = 4ea01d51bed991a789781a50e0e8dbfd00000000, node id = 16895b921f968d2a16359ac3e9de9711c51a20a1, job id = e0e8dbfd
I1016 08:55:01.556632 70924 193117632 service_based_accessor.cc:850] Adding task, task id = ffffffffffffffffdf5a1a8201000000, job id = 01000000
I1016 08:55:01.556875 70924 193117632 node_manager.cc:2256] Submitting task: task_spec={Type=ACTOR_CREATION_TASK, Language=PYTHON, Resources: {}, function_descriptor={type=PythonFunctionDescriptor, module_name=__main__, class_name=Controller, function_name=__init__, function_hash=8ba8d4fb-0588-4001-a7fe-2772faf99d8c}, task_id=ffffffffffffffffdf5a1a8201000000, task_name=controller:Controller.__init__, job_id=01000000, num_args=0, num_returns=1, actor_creation_task_spec={actor_id=df5a1a8201000000, max_restarts=0, max_concurrency=1, is_asyncio_actor=0, is_detached=1}}, task_execution_spec={num_forwards=0}
I1016 08:55:01.558889 70924 193117632 worker_pool.cc:325] Worker process 71007 is bound to job 01000000
I1016 08:55:01.559113 70924 193117632 service_based_accessor.cc:859] Finished adding task, status = OK, task id = ffffffffffffffffdf5a1a8201000000, job id = 01000000
I1016 08:55:01.974562 70924 193117632 worker.cc:123] Assigned worker e32a2b267032d7c9485c534d7f32e1dce8eacb4d to job 01000000
I1016 08:55:01.983006 70924 193117632 service_based_accessor.cc:1150] Adding object location, object id = eb75d8974441f778201a0155bf38150000000000, node id = 16895b921f968d2a16359ac3e9de9711c51a20a1, job id = bf381500
I1016 08:55:01.983310 70924 193117632 service_based_accessor.cc:1205] Removing object location, object id = eb75d8974441f778201a0155bf38150000000000, node id = 16895b921f968d2a16359ac3e9de9711c51a20a1, job id = bf381500
I1016 08:55:01.983757 70924 193117632 service_based_accessor.cc:1166] Finished adding object location, status = OK, object id = eb75d8974441f778201a0155bf38150000000000, node id = 16895b921f968d2a16359ac3e9de9711c51a20a1, job id = bf381500
I1016 08:55:01.984107 70924 193117632 service_based_accessor.cc:1220] Finished removing object location, status = OK, object id = eb75d8974441f778201a0155bf38150000000000, node id = 16895b921f968d2a16359ac3e9de9711c51a20a1, job id = bf381500
I1016 08:55:02.493301 70924 193117632 service_based_accessor.cc:48] Marking job state, job id = 01000000
I1016 08:55:02.493429 70924 193117632 node_manager.cc:1474] Driver (pid=71004) is disconnected. job_id: 01000000
I1016 08:55:02.493961 70924 193117632 service_based_accessor.cc:57] Finished marking job state, status = OK, job id = 01000000
I1016 08:55:02.494050 70924 193117632 node_manager.cc:353] HandleJobFinished 01000000
I1016 08:55:07.481276 70924 193117632 worker.cc:123] Assigned worker 02000000ffffffffffffffffffffffffffffffff to job 02000000
I1016 08:55:07.481319 70924 193117632 service_based_accessor.cc:29] Adding job, job id = 02000000, driver pid = 71081
I1016 08:55:07.481721 70924 193117632 service_based_accessor.cc:39] Finished adding job, status = OK, job id = 02000000, driver pid = 71081
I1016 08:55:07.481789 70924 193117632 node_manager.cc:336] HandleJobStarted 02000000
I1016 08:55:07.489056 70924 193117632 service_based_accessor.cc:1150] Adding object location, object id = 6b427e06817f752e052c96de5df7830e00000000, node id = 16895b921f968d2a16359ac3e9de9711c51a20a1, job id = 5df7830e
I1016 08:55:07.489295 70924 193117632 service_based_accessor.cc:1205] Removing object location, object id = 6b427e06817f752e052c96de5df7830e00000000, node id = 16895b921f968d2a16359ac3e9de9711c51a20a1, job id = 5df7830e
I1016 08:55:07.489627 70924 193117632 service_based_accessor.cc:1166] Finished adding object location, status = OK, object id = 6b427e06817f752e052c96de5df7830e00000000, node id = 16895b921f968d2a16359ac3e9de9711c51a20a1, job id = 5df7830e
I1016 08:55:07.490015 70924 193117632 service_based_accessor.cc:1220] Finished removing object location, status = OK, object id = 6b427e06817f752e052c96de5df7830e00000000, node id = 16895b921f968d2a16359ac3e9de9711c51a20a1, job id = 5df7830e
I1016 08:55:07.500048 70924 193117632 service_based_accessor.cc:850] Adding task, task id = ffffffffffffffff7fffa98101000000, job id = 01000000
I1016 08:55:07.500170 70924 193117632 node_manager.cc:2256] Submitting task: task_spec={Type=ACTOR_CREATION_TASK, Language=PYTHON, Resources: {}, function_descriptor={type=PythonFunctionDescriptor, module_name=__main__, class_name=Worker, function_name=__init__, function_hash=c66af86d-070e-4c79-a178-29e8ac074eb6}, task_id=ffffffffffffffff7fffa98101000000, task_name=Worker.__init__(), job_id=01000000, num_args=0, num_returns=1, actor_creation_task_spec={actor_id=7fffa98101000000, max_restarts=0, max_concurrency=1, is_asyncio_actor=0, is_detached=1}}, task_execution_spec={num_forwards=0}
I1016 08:55:07.500252 70924 193117632 worker_pool.cc:180] Job config of job 01000000 are not local yet.
I1016 08:55:07.500522 70924 193117632 service_based_accessor.cc:859] Finished adding task, status = OK, task id = ffffffffffffffff7fffa98101000000, job id = 01000000
I1016 08:55:15.822947 70924 193117632 service_based_accessor.cc:1378] Publishing job error, job id = 01000000
I1016 08:55:15.822988 70924 193117632 service_based_accessor.cc:1381] Finished publishing job error, job id = 01000000
I1016 08:59:19.770591 70924 193117632 service_based_accessor.cc:48] Marking job state, job id = 02000000
I1016 08:59:19.770685 70924 193117632 node_manager.cc:1474] Driver (pid=71081) is disconnected. job_id: 02000000
I1016 08:59:19.771344 70924 193117632 service_based_accessor.cc:57] Finished marking job state, status = OK, job id = 02000000
I1016 08:59:19.771391 70924 193117632 node_manager.cc:353] HandleJobFinished 02000000
Notice the line:
I1016 08:55:07.500252 70924 193117632 worker_pool.cc:180] Job config of job 01000000 are not local yet.
This Ray core issue is a P0 blocker for Ray Serve because it prevents users from calling serve.start(detached=True) in a first driver and then starting new backends from subsequent drivers.
Note that this works if RAY_ENABLE_MULTI_TENANCY=0 is set for ray start.
@kfstorm, this seems to be caused by a multi-tenancy bug.
I see. Under multi-tenancy mode, once a driver exits, the job ID will be removed from unfinished_jobs_, and further creation of tasks and actors will be blocked because the worker pool doesn't know the job config of this job.
We can fix this issue by introducing an all_jobs_ map, from which jobs are never erased when a driver exits.
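For illustration, here is a minimal Python sketch of the behavior being described. It mirrors the raylet worker-pool logic only conceptually; apart from unfinished_jobs_ and all_jobs_, the names are invented for this example and are not the actual C++ code.

# Conceptual sketch, not the real raylet implementation.
class WorkerPoolSketch:
    def __init__(self):
        self.unfinished_jobs_ = {}  # job_id -> job config; entry erased when the driver exits
        self.all_jobs_ = {}         # proposed: job_id -> job config; entries never erased

    def handle_job_started(self, job_id, job_config):
        self.unfinished_jobs_[job_id] = job_config
        self.all_jobs_[job_id] = job_config

    def handle_job_finished(self, job_id):
        # Current behavior: the config is dropped here, so starting a worker for a
        # detached actor created later on behalf of this job fails with
        # "Job config of job ... are not local yet".
        self.unfinished_jobs_.pop(job_id, None)
        # Proposed fix: keep the entry in all_jobs_ so the config stays available.

    def start_worker_for_job(self, job_id):
        config = self.unfinished_jobs_.get(job_id)   # buggy lookup
        # config = self.all_jobs_.get(job_id)        # lookup with the proposed fix
        if config is None:
            raise RuntimeError(f"Job config of job {job_id} is not local yet")
        return config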
It seems that we allow creating new detached actors within a detached actor, right? But I'd like to throw out some questions:
In our internal codebase, we have the concept of long-running jobs. If a driver starts with the long-running flag and then exits, all actors and tasks of this job will be kept alive and the job state will remain ALIVE and won't be changed to FINISHED, so the job won't be erased from unfinished_jobs_. That's why we didn't notice this issue before when testing internally.
cc @raulchen
PS: as we use long-running jobs, we don't actually use the detached actor feature.
In Serve's use case, one ideal workflow could be that when the user runs serve.start(detached=True) in their first driver script, the job turns into a long-running job. In that case we can make sure the job never needs to be cleaned up.
btw, should we throw a warning if the job id is not local yet?
On those questions, @stephanie-wang can probably add more but my mental model is that:
Why don't we assign a new job ID for each detached actor? Do you think that is feasible, @kfstorm?
Assigning a new job ID for each detached actor seems like the right approach to me, too. Conceptually, it fits my mental model that a detached actor is the root of a new job. I remember @raulchen mentioned a need to be able to clean up all detached actors associated with a previous job, but that seems easy enough to support by keeping track of the job ID that originally created the actor.
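A sketch of what that bookkeeping could look like, assuming each detached actor were given its own job ID. The mapping and function names here are hypothetical, not an existing Ray structure:

# Hypothetical bookkeeping: remember which job originally created each detached
# actor's job, so "clean up all detached actors created by job X" stays possible.
detached_job_origin = {}  # detached actor's job_id -> job_id of the creating driver

def on_detached_actor_created(detached_job_id, creating_job_id):
    detached_job_origin[detached_job_id] = creating_job_id

def detached_jobs_created_by(job_id):
    return [d for d, origin in detached_job_origin.items() if origin == job_id]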
@kfstorm, could you take on this issue? We should not turn on multitenancy in the next release unless this is fixed since it's currently a silent failure.
Assigning a new job ID for every detached actor makes sense to me. But it may require a lot of changes since creating a new job without a driver is not a one-line code change.
I took a look at the code in worker_pool.cc and found that there are two places that block the creation of actors: StartWorkerProcess and RegisterWorker. If I just use all_jobs_ instead, a new issue will be created: new workers are allowed to be registered even if the job has finished (when the driver just exited and new non-detached actors or tasks are being scheduled). So the quick fix I mentioned above doesn't work unless we allow this type of registration. Since there's worker capping to take care of recycling idle workers, this shouldn't be a big problem except for a waste of memory.
What do you guys think about this quick fix?
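To make the trade-off concrete, here is a hedged Python sketch of the registration path. It stands in for StartWorkerProcess / RegisterWorker only conceptually; the function and parameter names are invented for this example.

# Sketch of the side effect: with all_jobs_, a worker belonging to an already
# finished job can still register, because the config lookup always succeeds.
def try_register_worker(all_jobs, finished_jobs, job_id):
    config = all_jobs.get(job_id)
    if config is None:
        return False  # job never seen on this node
    if job_id in finished_jobs:
        # Allowed under the quick fix; the worker sits idle until worker capping
        # recycles it, so the cost is only some wasted memory.
        pass
    return True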
As for the new job ID idea, I'm sorry that I don't have much time to work on this recently since it requires a lot of coding and we are preparing for the 11.11 event.
It looks weird to me to create a new job for every detached actor, because conceptually the detached actors belong to the original job. E.g., on the dashboard, users would want to see all actors (including detached ones) under the same job page.
IIRC, per our previous discussion, we were on the same page that we should have a new API that "really kill a job" (in order to clean up all the detached actors, leftover states in GCS/raylet, external dependencies, etc). So the ideal case in my mind is that when the driver exits, the job state becomes "driver_exited" and nothing is cleaned up. And when users "really kill the job", we clean up the states and set the job state to "finished".
BTW, another thing about cleaning up job states: we should only clean up states in raylets when the job finishes. GCS states can be cleaned up after an expiration time, because users want to see dead jobs on the dashboard in order to debug failed jobs.
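A minimal sketch of that proposed lifecycle; the state names and transition hooks are illustrative only, not an existing Ray API:

from enum import Enum

class JobState(Enum):
    RUNNING = 1
    DRIVER_EXITED = 2  # driver gone; detached actors and job state are kept
    FINISHED = 3       # user explicitly "really killed" the job; everything cleaned up

def on_driver_exit(state):
    # Under the proposal, nothing is cleaned up when the driver disconnects.
    return JobState.DRIVER_EXITED if state == JobState.RUNNING else state

def on_really_kill_job(state):
    # Clean up detached actors, leftover GCS/raylet state, external dependencies, then:
    return JobState.FINISHED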
I don't 100% agree that detached actors belong to the job that created them (a detached actor means the lifetime of the actor is no longer tied to the job that created it). But I agree
But anyway, Serve currently depends heavily on the detached actor API, and we don't have time to remove that or turn off the multi-tenancy feature (it might be too late to do so now?) before the next release, which will likely happen in 2-3 weeks. So we SHOULD fix this right now.
@kfstorm What if we just add a new field like detached_actor_ and use it as one kind of job ID? It is not really a good "fundamental fix", but I think it should work?
@rkooo567 I'm not sure where you want to add detached_actor_ to. Could you give me more details?
Let's not discuss the definition of a "job" on this thread - that discussion is much larger and likely requires some user testing. The priority right now is that we fix this bug before the release.
@kfstorm, how about as a quick fix, we just never erase from unfinished_jobs_? We can take care of the memory leak in the future.
@stephanie-wang Isn't it the same idea as all_jobs_ implementation? Maybe I am missing something though
Btw, if we cannot fix this in time, we should turn off this feature asap.
@kfstorm What I said is to have a separate field dedicated to detached actors, and to treat tasks/actors coming from detached actors as if the job still exists (we can still filter tasks/actors by job because unfinished_jobs_ keeps working the same way as now).
@stephanie-wang Isn't it the same idea as the all_jobs_ implementation? Maybe I am missing something though
Yes, I think it's the same. Just an implementation detail.
@rkooo567 I see your point. In your solution, I think we also need some additional fields to track the relationship between a task/actor created by a detached actor and the worker process being registered, and we also need to handle the situation that a task arrives before the parent actor's state notification is received by Raylet. To me, it's not a quick fix.
I'll go ahead with Stephanie's suggestion. Feel free to reply to this thread for other suggestions.
I see. But that means this issue:

a new issue will be created: new workers are allowed to be registered even if the job has finished (when the driver just exited and new non-detached actors or tasks are being scheduled)

is not covered in this case, right? I might be missing something, but does Stephanie's solution cover this case?
I would rather turn off the feature flag than make it slightly buggy just to fix the issue quickly.
@rkooo567 You're not missing anything. Let's discuss the drawback of this solution in #11493.