The goal of "named actors" is to make it possible for one job to get a handle to an actor that was created by a different job (both jobs would have to be part of the same Ray cluster).
We have to decide on the semantics as well as the API. @pcmoritz proposed some possible APIs.
Assuming we have created an actor with
a = ActorClass.remote()
# Create named actor.
ray.actors["actor1"] = a
# Get named actor handle (done by a different driver).
a = ray.actors["actor1"]
# Create named actor.
ray.register_actor("actor1", a)
# Get named actor handle (done by a different driver).
a = ray.get_actor("actor1")
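The two spellings above differ only in surface syntax. Here is a minimal sketch, in plain Python with no Ray dependency, of how both could sit on top of one name-to-handle mapping. `ActorRegistry` and the string standing in for an actor handle are hypothetical names for illustration, not part of Ray. Rejecting a duplicate name is an assumed policy, and a real implementation would keep the mapping in cluster-wide state rather than in process memory.

```python
class ActorRegistry:
    """Hypothetical name -> handle mapping (in-process stand-in for cluster state)."""

    def __init__(self):
        self._handles = {}

    # Function-style API, mirroring ray.register_actor / ray.get_actor above.
    def register_actor(self, name, handle):
        if name in self._handles:
            # Assumed policy: a name can only be bound once.
            raise ValueError("an actor named %r is already registered" % name)
        self._handles[name] = handle

    def get_actor(self, name):
        try:
            return self._handles[name]
        except KeyError:
            raise KeyError("no actor registered under %r" % name) from None

    # Dict-style API, mirroring ray.actors["actor1"] = a.
    __setitem__ = register_actor
    __getitem__ = get_actor


actors = ActorRegistry()
actors["actor1"] = "handle-for-actor1"  # dict-style publish
same = actors.get_actor("actor1")       # function-style lookup
```

Either spelling ends up in the same two methods, so the choice between them is mostly about how discoverable and explicit the API should be.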
Another approach, suggested by @stephanie-wang, is to create a single global actor (global to the cluster) that every driver has a handle to and that users can use to implement named actors themselves if they want to.
@ray.remote(name="actor1")
class ActorClass(object):
    pass
# Create named actor.
a = ActorClass.remote()
# The second instantiation would raise an Exception because
# only one actor can be created with a given name.
a_new = ActorClass.remote() # ERROR!
# Get named actor handle (done by a different driver).
a = ray.get_actor("actor1")
One use case that would be useful to support is having separate actors for different workers in a parameter server architecture. I.e., in the diagram below, the 3 actors have different recovery procedures. More specifically, actor1 may be restoring its dataset queue checkpoint from /efs/checkpoints/actor1.

So when, say, the machine hosting worker1 fails and the actor needs to be recreated on a new machine, it would be the "same" actor in a sense, i.e., it would load the dataset checkpoint from /efs/checkpoints/actor1 and continue the work from that point.
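A small sketch of that recovery pattern, in plain Python without Ray: the actor's name determines where its checkpoint lives, so a recreated actor with the same name picks up where the old one stopped. `DatasetQueueWorker`, the JSON checkpoint format, and the `position` field are all assumptions for illustration, and a temporary directory stands in for /efs/checkpoints.

```python
import json
import os
import tempfile


class DatasetQueueWorker:
    """Hypothetical worker whose checkpoint location is derived from its name."""

    def __init__(self, name, checkpoint_root):
        self.name = name
        self.checkpoint_path = os.path.join(checkpoint_root, name)
        self.position = 0
        # A recreated actor with the same name resumes from the saved position.
        if os.path.exists(self.checkpoint_path):
            with open(self.checkpoint_path) as f:
                self.position = json.load(f)["position"]

    def process(self, num_items):
        # Advance through the dataset queue and checkpoint the new position.
        self.position += num_items
        with open(self.checkpoint_path, "w") as f:
            json.dump({"position": self.position}, f)
        return self.position


# A temporary directory stands in for /efs/checkpoints.
checkpoint_root = tempfile.mkdtemp()
worker = DatasetQueueWorker("actor1", checkpoint_root)
worker.process(5)
del worker  # simulate the machine failing

recreated = DatasetQueueWorker("actor1", checkpoint_root)
resumed_position = recreated.position
```

With named actors, the name passed to the constructor here would be the same name other jobs use to look the actor up.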
For completeness, a few other API suggestions that were made.
Some of these could be more general and include non-actor objects.
# Publishing (one of the following).
ray.set_name(a, "actor1") # Could work for object IDs also.
# Retrieving (one of the following).
a = ray.get_by_name("actor1")
The main difference here is that it anticipates potentially adding properties other than "names" (do we even want that?).
Also, properties imply that an actor could have multiple properties and that multiple actors could have the same property.
# Publishing (one of the following).
ray.set_property(a, name="actor1") # Could work for object IDs also.
# Retrieving (one of the following).
list_of_handles = ray.get_by_property(name="actor1")
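To illustrate why the property-based lookup returns a list, here is a sketch in plain Python (no Ray). `PropertyIndex` is a hypothetical name, strings stand in for actor handles, and matching on all requested properties at once is an assumed semantics that the proposal does not spell out.

```python
from collections import defaultdict


class PropertyIndex:
    """Hypothetical index from (key, value) properties to handles."""

    def __init__(self):
        self._index = defaultdict(list)

    def set_property(self, handle, **properties):
        # One handle may carry several properties.
        for key, value in properties.items():
            self._index[(key, value)].append(handle)

    def get_by_property(self, **properties):
        # Return the handles that match every requested property.
        matches = None
        for key, value in properties.items():
            found = self._index.get((key, value), [])
            if matches is None:
                matches = found
            else:
                matches = [h for h in matches if h in found]
        return list(matches or [])


index = PropertyIndex()
index.set_property("handle-a", name="actor1", role="worker")
index.set_property("handle-b", role="worker")

by_name = index.get_by_property(name="actor1")
by_role = index.get_by_property(role="worker")
```

Since several handles can share a property, callers that expect a unique name would have to check the length of the result themselves.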
# Publishing.
ray.put(a, name="actor1") # Could work for object IDs also.
# Retrieving
a = ray.get(name="actor1")
Another (most minimal) approach is to allow actor handles to be serializable. The user could then save the handle ID and communicate it out of band to other jobs.
This would have advantages and pitfalls similar to those of allowing object IDs to be serializable.
The main downside I can see is that you still can't allocate actors with a specific ID, just like you can't put objects to a specific ID. This restriction could be relaxed, however.
Edit: this is similar to Option 7, but does not necessarily require ray.put.
I like @ericl's approach if it can be implemented without too many downsides (i.e., if creating an actor from the serialized handle is cheap, if it doesn't introduce overhead in the common case of unnamed actors, and if we don't need a list of "exported actors" for garbage collection reasons). In that case users can manage the actors in their own way and we don't impose an API on them.
I think that should work. The main thing we need to ensure is that deserializing an actor handle yields a new actor_handle_id. cc @stephanie-wang
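As a rough illustration of that requirement, the sketch below uses Python's pickle hooks so that every deserialized copy refers to the same actor but carries a fresh handle ID. `ActorHandleSketch` is a hypothetical stand-in and not Ray's `ActorHandle`. Using `uuid` for IDs and recording the parent handle as `previous_actor_handle_id` are assumptions made for the example.

```python
import pickle
import uuid


class ActorHandleSketch:
    """Hypothetical stand-in for an actor handle; not Ray's ActorHandle."""

    def __init__(self, actor_id):
        self.actor_id = actor_id
        self.actor_handle_id = uuid.uuid4().hex

    def __getstate__(self):
        # Ship the actor's identity, plus the handle that produced this copy.
        return {"actor_id": self.actor_id,
                "previous_actor_handle_id": self.actor_handle_id}

    def __setstate__(self, state):
        self.actor_id = state["actor_id"]
        self.previous_actor_handle_id = state["previous_actor_handle_id"]
        # Each deserialized copy gets its own handle ID.
        self.actor_handle_id = uuid.uuid4().hex


original = ActorHandleSketch(actor_id="actor-123")
payload = pickle.dumps(original)
copy_1 = pickle.loads(payload)
copy_2 = pickle.loads(payload)
```

Loading the same bytes twice gives two handles with distinct IDs, which is the case that matters when the serialized handle is passed out of band to several jobs.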
One thought. Instead of creating an actor first and then turning it into a "named actor", it probably makes more sense to "create it as a named actor", e.g.,
a = ActorClass.create_named_actor(args=[], kwargs={}, name='actor1')
Looking forward to this feature!
Once https://github.com/ray-project/ray/pull/2007 is merged (probably later tonight), the following will work (not the "cleanest" API, but it contains the bulk of the functionality).
In interpreter 1:
import cloudpickle
import numpy as np
import ray
ray.init()
@ray.remote
class ParameterServer:
    def __init__(self):
        self.params = np.zeros(10)

    def get(self):
        return self.params

    def update(self, update):
        self.params += update

ps = ParameterServer.remote()

@ray.remote
def f(ps):
    ps.update.remote(np.ones(10))
ray.get(ps.get.remote()) # array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
f.remote(ps)
ray.get(ps.get.remote()) # array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1.])
cloudpickle.dumps(ps) # b'\x80\x04\x95>\x03\x00\x00\x00\x00\x00\x00\x8c\tray.actor\x94\x8c\x0bActorHandle\x94\x93\x94)\x81\x94}\x94(\x8c\x08actor_id\x94C\x14\xed\x83\xdb\xf6\xb6\x8d\xca\xcc\xf1W1k\x86\xd72\xfe\x19\x10\x12S\x94\x8c\nclass_name\x94C\x0fParameterServer\x94\x8c\x0bactor_forks\x94K\x01\x8c\x0cactor_cursor\x94C\x14ba_\x8c\x8d\xbd\x90\x14b\x810\x80\xb9\xb2\x82\x1c\xe0o\x9c\xaa\x94\x8c\ractor_counter\x94K\x00\x8c\x12actor_method_names\x94]\x94(\x8c\x08__init__\x94\x8c\x12__ray_checkpoint__\x94\x8c\x1a__ray_checkpoint_restore__\x94\x8c\x1f__ray_restore_from_checkpoint__\x94\x8c\x17__ray_save_checkpoint__\x94\x8c\x11__ray_terminate__\x94\x8c\x03get\x94\x8c\x06update\x94e\x8c\x1cactor_method_num_return_vals\x94]\x94(K\x01K\x01K\x01K\x01K\x01K\x01K\x01K\x01e\x8c\x11method_signatures\x94}\x94(h\x0f\x8c\rray.signature\x94\x8c\x11FunctionSignature\x94\x93\x94(]\x94]\x94]\x94\x8f\x94h\x0ft\x94\x81\x94h\x10h\x1d(]\x94]\x94]\x94\x8f\x94h\x10t\x94\x81\x94h\x11h\x1d(]\x94]\x94]\x94\x8f\x94h\x11t\x94\x81\x94h\x12h\x1d(]\x94]\x94]\x94\x8f\x94h\x12t\x94\x81\x94h\x13h\x1d(]\x94]\x94]\x94\x8f\x94h\x13t\x94\x81\x94h\x14h\x1d(]\x94]\x94]\x94\x8f\x94h\x14t\x94\x81\x94h\x15h\x1d(]\x94]\x94]\x94\x8f\x94h\x15t\x94\x81\x94h\x16h\x1d(]\x94h\x16a]\x94\x8c\x08funcsigs\x94\x8c\x06_empty\x94\x93\x94a]\x94\x89a\x8f\x94h\x16t\x94\x81\x94u\x8c\x1eactor_creation_dummy_object_id\x94C\x14hc\x8e\xadpM\xe8\xdc\x1f7Miy\xbf\xaf\xeb\xc2\xc0Y\xa3\x94\x8c\x11actor_method_cpus\x94K\x01\x8c\x0factor_driver_id\x94C\x14DY\xc3\x1e\nm\xcfJ\xc7:\xf0m\xc0\x9d_\x8b\x90\xe1\xca~\x94\x8c\x18previous_actor_handle_id\x94C\x14\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x94\x8c\x0bray_forking\x94\x89ub.'
In interpreter 2:
import cloudpickle
import ray
ray.init(redis_address=...) # Fill this in with the correct address.
ps = cloudpickle.loads(...) # Fill in the output of cloudpickle.dumps from above
ray.get(ps.get.remote()) # array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1.])
It seems that the above usage works when two interpreters are on the same machine, but doesn't work when they sit on different machines within the same network (the second interpreter simply hangs after calling ray.get). I already launched ray on the second machine via ray start --redis-address=.... Any idea why?
It seems to work for me when the two interpreters are on different machines.
Can you double check the following:
- The Redis port (--redis-port on the head machine)
- The Redis shard ports (--redis-shard-ports on the head machine)
- The object manager port (--object-manager-port on each machine)
- If you call ray.global_state.client_table(), is it aware of both machines?
- Can you start Ray with --num-gpus=1 and then verify that you can create a GPU task and get the result?

EDIT:
Also, can you check that
pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-0.4.0-cp36-cp36m-manylinux1_x86_64.whl
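For the port items above, one quick way to test reachability from the second machine is a plain TCP connect. This is a generic sketch using only the standard library, not a Ray utility. `port_is_reachable` is a hypothetical helper, and the demonstration connects to a listener on the local machine because the real host and port values depend on your cluster.

```python
import socket


def port_is_reachable(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Demonstration against a listener on this machine; on a real cluster, pass
# the head node's address and the port given to --redis-port, and so on.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
listener.listen(1)
open_port = listener.getsockname()[1]

reachable_while_open = port_is_reachable("127.0.0.1", open_port)
listener.close()
```

A `False` result from the second machine for any of those ports would point to a firewall or binding problem rather than to the actor handle itself.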
@dementrock any updates about this?
One thought. We may also want to unregister an actor, for example 1) to unregister all named actors a driver registered when that driver exits, or 2) to switch a name over to another actor instance.
Considering this point, I think API 2 looks most natural.
ray.register("actor1", actor) # or maybe name it 'register_actor' to be more specific
actor = ray.get_actor("actor1")
ray.unregister("actor1")
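The two unregister cases can be sketched in plain Python (no Ray) with a table that remembers which driver registered each name. `NamedActorTable`, the `driver_id` argument, and `on_driver_exit` are hypothetical and only meant to show what extra bookkeeping the cleanup-on-exit case would need.

```python
class NamedActorTable:
    """Hypothetical table that remembers which driver registered each name."""

    def __init__(self):
        self._entries = {}  # name -> (driver_id, handle)

    def register(self, driver_id, name, handle):
        if name in self._entries:
            raise ValueError("name %r is already registered" % name)
        self._entries[name] = (driver_id, handle)

    def get_actor(self, name):
        return self._entries[name][1]

    def unregister(self, name):
        del self._entries[name]

    def on_driver_exit(self, driver_id):
        # Case 1: drop every name the exiting driver registered.
        owned = [n for n, (d, _) in self._entries.items() if d == driver_id]
        for name in owned:
            self.unregister(name)
        return owned


table = NamedActorTable()
table.register("driver-1", "actor1", "handle-v1")

# Case 2: point an existing name at another instance.
table.unregister("actor1")
table.register("driver-1", "actor1", "handle-v2")
current = table.get_actor("actor1")

table.register("driver-2", "actor2", "handle-x")
removed = table.on_driver_exit("driver-1")
```

Note that case 1 needs the owner of each name to be stored, which the bare `ray.register(name, actor)` signature would have to obtain implicitly from the calling driver.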
Implemented in an experimental form.