Hi @robertnishihara @pcmoritz , we are planning to add a batch Garbage Collection to Ray.
We have a concept called batchId (int64_t) that is used to drive the Garbage Collection. For example, a job will use this batchId when generating all of its objectIds and taskIds, and all of these objectIds and taskIds will be recorded in a Garbage Collection table under that batchId in GCS. When the job is finished, we can simply pass the batchId to the garbage collector, which will look up the Garbage Collection table in GCS and garbage-collect all the related tasks and objects.
In the current id.h implementation, the lowest 32 bits of an ObjectId are used for the Object Index. We can use the 64 bits next to the Object Index as the batchId and add a new GC table to GCS.
This GC mechanism will help release the memory resources in GCS and plasma.
What do you think of this code change?
@eric-jj @raulchen @imzhenyu . FYI.
In Ant Financial's internal version, we used the task_id (part of the objectID) to GC the objects of a task, which gave very promising results for Plasma object management. I am thinking of migrating the feature to the open-source version, which is why we raised this issue. @pcmoritz @robertnishihara, do you have any comments on it?
Can you post a code example showing what this would look like from the application level? Or would this be completely under the hood?
I will post a code review as an example shortly. Thank you Robert.
Thanks! I have some reservations about this approach and how general it is. It clearly makes sense for at least one of your use cases, but we need to make sure we have a strategy that works in general.
cc @concretevitamin
Here is the user code change that our scenario needs. https://github.com/ant-tech-alliance/ray/pull/29
In this code change, there is no void gc(Long batchId); in RayApi.java yet, but our goal is to add it.
On the user side, here is a sample of the user code. The user wants to run GC to release object memory for finished jobs.
public class RayJobScheduler {

  public void init(JobSchedulerContext jobSchedulerContext) {
    // Set the job id. Do some init work...
  }

  public void scheduleJob(IRayaJobDriver driver) {
    // Set the job id. Do some init work.
    Ray.startBatch(jobId, ...);
  }

  public void finishJob(Long jobId, JobRes jobRes) {
    // Do some cleaning work...
    Ray.endBatch(...);
    if (jobRes.doGc) {
      Ray.gc(jobId);
    }
  }
}
Here, we use JobId and BatchId to refer to the same concept: a whole job/procedure/experiment that generates many Ray tasks and objects to achieve a goal. When the job finishes and we have the result, we know that the resources it used are no longer needed, so we can release them for later and continuous job requests.
For Python, the user application would look like the following code. The jobID is optional and does not affect the existing features/code. Only when users want to manage the task and object life cycle themselves do they use start_batch and end_batch to do GC.
@ray.remote
def f():
    time.sleep(1)
    return 1

ray.init()
jobID = ray.start_batch()
results = ray.get([f.remote() for i in range(4)])
ray.end_batch(jobID, need_gc=True)
Hey @guoyuhong @eric-jj - I think we have been conservative in two things
So I think it'd be great to have a deeper discussion on this proposed feature, perhaps in the form of a design doc? @robertnishihara @pcmoritz what do you guys think?
To clarify, we've been working on addressing the GCS memory issue as well. The difference is that our solution is more transparent to the user, while still being callable directly should the user choose to do so. Each GC batch is not tied to a job. When it's ready, I'd love to get you guys to play with this feature and see if it meets your needs!
@concretevitamin
The feature is not for GCS memory; it is for Plasma memory management. It is an important feature for our production system: without it, unused objects will not be recycled in Plasma, and useful objects may be evicted because of their lifetime. We implemented the feature because we hit this problem when using Ray in our system. Our scenario is graph computation, which creates many temporary variables during the calculation. So we implemented the feature in our internal version and got the expected results. The system is online now.
I am trying to merge the internal version of Ray with the GitHub version. The first step for me is to deploy the GitHub Ray to our scenario. This feature is a blocker for that; without it we can't apply the GitHub version of Ray to our online system.
@guoyuhong will write a design doc and we can follow up on the detailed design. Yes, the design will introduce a top-level API; that is expected. Some scenarios need RAII-like ability, and users can choose whether to use it or not. I don't think it exposes internal control to the user: how the GC logic is implemented is transparent to the user, and the backend is free to do nothing even if the user asks for GC.
What's your plan to resolve the GCS memory issue? Can you provide more details, such as a design doc? Then we can evaluate whether it resolves our problem or not.
We have graph computing scenario based on Ray. Ray can be used as the backend for the continuous graph computing requests. The requests have the following characteristics:
Currently, the Plasma Store evicts objects by creation time when memory runs low. However, this mechanism is not enough. For one thing, our scenario needs a large amount of memory, so incoming requests use up memory quickly. For another, if an object is evicted by creation time while it is still needed by later tasks, it has to be reconstructed, which hurts performance. Since we know which requests are finished and will never be used again, we can release their resources safely.
In further evolution, Ray will support stream computing. The life cycle of streaming objects is different, and even the best automatic garbage collection may not be enough. We need to provide elaborate memory management, like RAII, to let users manage object life cycles according to their needs.
We introduce the concept of a Batch and a BatchId (int64_t). It is an identifier for a set of Ray tasks and Ray objects related to one high-level request/job. After the high-level request/job finishes, a new Ray API lets users release all the resources in GCS and Plasma associated with one BatchId.
# APIs:
# This function can be called after ray.init().
# If this function is called, there is a batch concept in this Ray driver.
# If not, Ray will work as normal.
ray.start_batch(batch_id)

# This function can be called by the users
# if they think all their resources are not needed in the future.
ray.end_batch(batch_id, need_gc=True)
###########################################################
# Scenario 1: Normal Mode
@ray.remote
def f():
    time.sleep(1)
    return 1

ray.init()
results = ray.get([f.remote() for i in range(4)])
# Scenario 2: Batch Mode
@ray.remote
def f():
    time.sleep(1)
    return 1

ray.init()
batchId = ray.start_batch()
results = ray.get([f.remote() for i in range(4)])
# Some other work...
ray.end_batch(batchId, need_gc=True)
The Java API can be designed in a similar way.
Currently, the lowest 32 bits are used for the Object Index, and the higher 128 bits are used for the TaskId. We can further split those 128 bits into two parts and use the highest 64 bits for the BatchId.

There will be a GCS table that records the ObjectIds and TaskIds under one BatchId when Batch mode is enabled. The table name is "GC:${batchId}". Every objectId and taskId will be pushed into this table. When garbage collection is called with a batchId, GCS will release all the objectIds and taskIds in that table.
The node manager will subscribe to the delete message of "GC:${batchId}" from GCS. When GCS deletes the garbage collection table, the node manager will trigger a garbage collection request to Plasma. Plasma will go through the objectIds and delete the objects under the requested batchId.
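To make the bookkeeping concrete, here is a minimal in-memory sketch of what the proposed "GC:${batchId}" table would track and how releasing a batch would use it. GcTable, register and release_batch are illustrative names for this sketch, not actual Ray or GCS APIs.

class GcTable:
    def __init__(self):
        # batch_id -> set of object/task ids recorded under "GC:${batchId}"
        self._entries = {}

    def register(self, batch_id, ray_id):
        self._entries.setdefault(batch_id, set()).add(ray_id)

    def release_batch(self, batch_id):
        # Return all ids recorded under this batch and drop the table entry.
        # In the real design, dropping "GC:${batchId}" would also publish a
        # delete notification that node managers subscribe to, so they can ask
        # Plasma to evict the corresponding objects.
        return self._entries.pop(batch_id, set())

# Usage sketch:
table = GcTable()
table.register(batch_id=42, ray_id="object-abc")
table.register(batch_id=42, ray_id="task-def")
ids_to_collect = table.release_batch(42)  # node managers would evict these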
@robertnishihara @concretevitamin any comments?
@guoyuhong thanks for the code example! @eric-jj, thanks, I am still thinking through the implications.
Is there a clean way to implement this as a helper utility outside of the backend? E.g., in Python/Java you can track all of the object IDs that are generated between start_batch and end_batch (and use the task table to find tasks that were spawned by those tasks), and then generate new tasks that run on each node and use the plasma client API to evict the relevant IDs.
The advantage of doing it this way is that we can avoid adding functionality to the backend and also avoid adding extra information to the ObjectIDs.
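A rough sketch of the driver-side helper described above: record the ObjectIDs produced between the start and end of a batch, then evict them at the end. The `evict_objects` hook is a placeholder for whatever per-node mechanism is used (e.g., a task on each node calling the plasma client); none of these names are existing Ray APIs.

class BatchTracker:
    def __init__(self, evict_objects):
        self._object_ids = []
        self._evict_objects = evict_objects

    def track(self, object_id):
        # Call this on every ObjectID returned by f.remote(...) inside the batch.
        self._object_ids.append(object_id)
        return object_id

    def end(self):
        # Hand the recorded IDs to the eviction hook and reset the tracker.
        self._evict_objects(self._object_ids)
        self._object_ids = []

def evict_objects(object_ids):
    # Placeholder: in practice this would fan out one task per node and use the
    # local plasma client to evict `object_ids` from the object store.
    pass

# Usage sketch:
# tracker = BatchTracker(evict_objects)
# x = tracker.track(f.remote())
# ...
# tracker.end()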
Is it accurate to say that the core issue is that the object store eviction policy is not good enough for some scenarios, like your streaming use case, and that you want to provide application-level knowledge to improve the eviction strategy?
@robertnishihara your solution can solve some problems, but it can't meet all the requirements.
Is it accurate to say that the core issue is that the object store eviction policy is not good enough for some scenarios, like your streaming use case, and that you want to provide application-level knowledge to improve the eviction strategy?
@robertnishihara exactly
I think Ray currently still lacks good resource management ability.
For GCS, its memory is never recycled.
For object store, it simply adopts LRU-based eviction now, which definitely isn't the most effective approach.
Thus, I think we can consider introducing a Resource Manager component into the core. It takes responsibility for managing the life cycle of all resources in the system, including GCS, object store, etc.
I haven't thought about the detailed design of the Resource Manager. But I think having a component that unifies resource management will be great.
Also, I'd like to hear @concretevitamin's plan to address the GCS memory issue.
@robertnishihara This proposal is not only about GC management. It is a starting point for job-level management. When I talked with Ant Financial's AI team, they had strong requirements for job-level management, such as observing job status and cancelling or re-submitting a job.
The job-level concept/definition is a missing piece in the Ray stack. The GC algorithm is important, but job-level resource management is also required. Without it, it is hard to provide a Ray service where many AI engineers work together.
When a Java application exits, all the objects it created are recycled by the OS. We need a similar ability: when a job (analogous to an application in Java) completes its life cycle, its resources are returned.
@guoyuhong thanks for raising these issues. These are great points to discuss.
First, regarding a job API. What are the requirements? Something like Spark submit (https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-submit.html)? What else?
Second, regarding object eviction from the object store, it would be great to understand when LRU is not good enough. What workload (it looks to me that for the graph workload LRU should be fine, but maybe I'm missing something) ?
Of course, by LRU I mean that objects are evicted according to the time since they were last accessed (read/write). And if we have workloads for which LRU is bad, one question here is whether configuring the eviction policy per driver/job would be good enough.
Also, related to the above, we need to add the ability to spill objects over to a persistent store. If you guys have some thoughts here please let us know. Also, this would be a great task to collaborate on, or maybe for you to lead.
@istoica For the job API, Spark submit is a similar one, and Ray should provide an HTTP server so users can submit, view, and cancel jobs. Another important job-related feature is separate resource management for different users, to avoid one user occupying all the resources in a cluster. If that feature (resource isolation) is not a general one, we need an interface in the job management module that lets us apply our internal policy.
For LRU, different jobs have different life cycles: many small jobs create many temporary variables and exit, while a few large jobs run for a long time. So the objects created by a long-running job get deleted because their lifetime is longer than that of the objects created by the small jobs.
We did consider creating a new GC algorithm to resolve the issue; Ray provides the interface, so it would require fewer code changes. But we couldn't find a good solution for our scenarios. LRU is a GC policy, and it is no better than Java's GC algorithms: it can't provide scoped resource management like RAII in C++ or using in C#.
If you have concerns about adding a top-level API to Ray, then with job-level support we can add a flag to submit that lets the user decide whether to recycle the objects created by the job. That is more natural for the user and does not expose details of the job.
Spilling objects over to a persistent store is an option, but it can't resolve the issue, because many temporary variables are useless once a job exits in graph computation. Object inflation can't be avoided in graph computation: in our case, one node accesses its 300 neighbor nodes, and this is done 3 times, which means 300^3 temporary variables are created. After a job exits, we don't want to keep maintaining these objects.
Also, in our current thinking, spilling objects to a persistent store needs job-level support. Retired objects of a job would be put into the persistent store as a group, and we would not keep entries for these retired objects in the Plasma store. We are also trying to apply Ray graph computation to online calculation scenarios now; their performance is very sensitive, we only have a budget of less than 300ms, and heavy paging out to and in from a persistent store is not acceptable.
@istoica LRU is not enough because the storage unnecessarily caches a lot of objects, which brings significant memory/storage/computation overhead in our long-running service case with small jobs. We therefore introduced the above batch-based approach in our internal version, and it is now running in production.
However, we do find limitations with this approach: it does not allow interactions among different jobs (e.g., requests), and it does not support big jobs nicely (e.g., when the workload does not fit into total memory capacity). We are therefore considering new distributed garbage collection solutions, as mentioned at Tahoe to @robertnishihara etc. We will come back to you later when it is validated in production.
That said, we still think a batch-based approach is valuable, and it would be useful for Ray to gain this capability quickly in order to serve long-running jobs.
@eric-jj and @imzhenyu thanks for the clarifications. A few things:
1) Regarding Ray submit, if anyone has cycles to submit a github issue with a short product requirement document (PRD) that would be great. We definitely need help here, especially from people like you that understand well the use case.
2) Regarding resource management would per-driver resource management be good enough? As you know, in Ray you can have multiple drivers in the same cluster, so maybe this is enough; just associate a job or a user with a driver. The second question here is what kind of policies we would need to implement: fair sharing, priority? Finally, again it would be great for someone to create a github issue and a PRD.
3) Regarding * object eviction policy*, there is no question that there are workloads for which LRU won't work well. However, it would be great to get more detailed examples. For instance, I'm still confused about this statement "... objects created by the long run job ... will be deleted because its' life time is longer than the objects' created by the small jobs." LRU will only evict the objects of the long jobs if they were not accessed recently; eviction is not really related to the object's lifetime. If you keep accessing the long job's objects, small jobs won't cause the eviction of these objects. Maybe in your application an object is not accessed for a long time since it was created.
Also, one question here. How useful would an API like ray.unsafe_evict(object_list) be, which forces the eviction of all objects in object_list from the object store? Additionally, we could have ray.unsafe_evict(object_list, recursive=True), which will evict not only the objects in object_list, but also all objects on which these objects depend. For instance, if obj is in object_list, then we evict not only obj, but also all "orphan" objects, i.e., objects whose descendants in the computation graph have all been removed. For example, in the following code:
xid = f.remote(a, b)
# use xid...
ray.unsafe_evict(xid, recursive=True)
# above instruction will cause not only the xid object to be evicted from object store,
# but also "a" and "b"
(I know that the above API might not be as easy to use, but certainly is very flexible.)
An alternative would be to provide an API like ray.pin(object_list) which makes sure that none of the objects in obj_list will be removed by LRU.
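A small usage sketch of how the proposed ray.pin() could protect a long job's objects from LRU eviction caused by many short jobs; ray.pin is the API proposed just above and does not exist in Ray today, and the remote functions here are made up for illustration.

import ray

@ray.remote
def long_job_stage():
    # Placeholder for an expensive intermediate result the long job reuses.
    return list(range(10**6))

@ray.remote
def short_job(i):
    return i * i

@ray.remote
def consume(state):
    return len(state)

ray.init()
state_id = long_job_stage.remote()
ray.pin([state_id])                                    # proposed API: keep this object resident
ray.get([short_job.remote(i) for i in range(1000)])    # short jobs churn through the store
print(ray.get(consume.remote(state_id)))               # state_id was not evicted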
@istoica I will follow up the Ray Submit issue next week.
For per-driver resource management, I have not thought about it carefully, but it should be OK to resolve our problem. For the policy question, priority is required, but there are some other features, like security checks, that may not be common for users outside Ant Financial.
For the object eviction policy, a function like ray.unsafe_evict with recursive support should be enough to resolve our issue. I am fine with using that API.
Thanks, @eric-jj.
BTW, it would still be great to understand the workloads for which LRU doesn't work well. Maybe there is an issue with the current implementation, rather than an LRU-policy problem?
Also, in addition to evicting all of a job's objects when it finishes execution, would pinning an object in the object store be good enough? Arguably pinning would be easier to implement than ray.unsafe_evict(), and also more consistent with memory management patterns in OSes: https://lwn.net/Articles/600502/
@istoica
For where LRU doesn't work: there are two types of jobs running together. Job type A runs for a long time; job type B runs for a short time and is submitted continuously. While type A is still running its first job, type B may have run 100 times. Plasma is then fully occupied by objects created by type B jobs, but 99% of them are now useless. When new objects are created, the objects created by type A get evicted from memory.
Another reason we want to control GC is that we are trying to apply Ray to an online calculation scenario, as I mentioned. In that scenario, the total latency budget is less than 200ms; we can't afford to do GC during the job's running time.
I think manually evicting all objects seems more reasonable for our case, and it is similar to our current solution running in production.
Pinning objects means the user needs to un-pin them, and it is hard to predict which objects should be pinned at creation time.
@eric-jj, thanks for the clarification. So I assume that in this case, during B's executions, A won't access its data much.
Let's then design a job API, and free resources under the hood when the job ends. This solution would require no batch API.
Here is a possible API for Python:
# @ray.job assigns (1) a job id which is propagated to all tasks/actors/objects which
# are in the execution graph rooted at this task, (2) takes some scheduling
# arguments (e.g., priority), (3) cleans up the objects and task table when the
# job finishes.
@ray.job(name="name", sched_args=sched_arg_list, ...)
def f(...):
    # code of the job here

# if cleanup=True, clean up all state associated with the job when it finishes, including
# its objects in the object store, and the associated GCS state; otherwise don't clean
# up any state.
jobId = f.job(..., cleanup=True)

# if you want to wait for the job to finish, or the job returns a result,
# then just use ray.get()
jobRes = ray.get(jobId)
Alternatively, we can define a class/actor that also has methods to get the job progress. Maybe this is the way to go. Note that under the hood, we could use the batch implementations.
So it seems to me that the most important next steps are to define the job concept (e.g., what are its properties: name, unique id, scheduling parameters, ability to get a progress report, ...), and the API, including a submit API. Would anyone like to start a PRD on this?
@istoica Yes, job-level management makes more sense. We introduced the batch because it requires fewer code changes. As I mentioned before, I planned to do it step by step: implement the batch first, then build jobs on top of it.
I can ask someone on our side to write the design doc for jobs first; then we can finalize the design in the doc and move to the detailed implementation.
Sounds good @eric-jj. I'll help with the PRD document. It would be great to do it over the next couple of days. Thanks.
A different API I'd propose here is to use context managers for this:
Usage with the with statement:
@ray.remote
def f(something):
    # Do something
    pass

with ray.job(name="jobname"):
    x = f.remote(1)
    # Do other things

# The object "x" can now be cleaned up

with ray.job(name="jobname2"):
    y = f.remote(1)
    # Do more
Context managers can also be used without the with statement if needed, see the python documentation: https://docs.python.org/2/reference/datamodel.html#context-managers
The semantics here would be that there is one default job per driver and you can create custom ones that inherit all objects and function definitions from that default job. It is not allowed to pass objects between these inner jobs, so after the context manager is closed on the driver and all tasks from that job have finished, things can be cleaned up.
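As a small illustration of driving the same proposed ray.job() context manager without a with block (useful when the job's start and end do not sit in the same lexical scope); ray.job is the API proposed above, not an existing one, and f is a trivial stand-in remote function.

import ray

@ray.remote
def f(x):
    return x + 1

ray.init()

job = ray.job(name="jobname")   # proposed API
job.__enter__()
try:
    x = f.remote(1)
    # ... other work scoped to the job ...
finally:
    # Closing the context allows the objects created inside to be cleaned up.
    job.__exit__(None, None, None)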
@pcmoritz yes, the with statement seems more reasonable for this scenario. @guoyuhong please add it to the design doc.
@eric-jj , @guoyuhong , @pcmoritz , @robertnishihara, @ericl here is a draft of the product requirements document for jobs: https://docs.google.com/document/d/1XarOe4QKKcFbHRzy-OH5gGQz25dNBl3tCWDK-9oOyEY/edit#
We should make sure that the design of the "garbage collection" is consistent with jobs. They clearly have overlapping requirements, and we don't want to have multiple APIs or implementations.
Regarding the with API, we need to make sure that it provides the developer with the ability to (1) get a unique ID of the job's execution, (2) specify whether to perform garbage collection, or not, on the job's completion, and (3) get the status of the job's execution (while this is not a part of the "must" requirements for now, the question is not whether to do it but when).
@istoica These requirements can be easily implemented on top of a context manager. The code will be something like this:
# We may also want to limit the resources assigned to this job.
with ray.job(name="name", cleanup_on_completion=True, resources={...}) as job:
    # do something here
    print(job.id(), job.status())
Another thing to note is that this API requires a global variable to represent the current job id. And having mutable global variables is error-prone when the worker program becomes concurrent.
If using multi-threading, the fix can be as easy as storing the global state in thread locals.
But if using py3 asyncio, maintaining the state correctly will be very hard when multiple jobs run in multiple coroutines within one thread.
Thus, I'd recommend avoiding global states if possible.
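A minimal sketch of the thread-local fix mentioned above; the names here (_current, set_current_batch_id, get_current_batch_id) are illustrative, not Ray internals.

import threading

_current = threading.local()

def set_current_batch_id(batch_id):
    _current.batch_id = batch_id

def get_current_batch_id():
    return getattr(_current, "batch_id", None)   # None == default driver batch

# Each thread sees only the batch id it set itself; asyncio coroutines sharing
# one thread would still need something like contextvars instead.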
@raulchen I'm afraid I do not follow your comment.
The ID I was referring to is associated with each job execution and is immutable.
A job is a bunch of code (e.g., a jar) that you can execute multiple times with different inputs and different arguments. The job has a name. Each job execution has a unique ID.
Job vs job execution is similar to remote function vs task, and to remote class vs actor.
@istoica
In my above code example, by with ... as job, I actually meant a running job instance (or a job execution).
Actually, the term job sounds more like the dynamic running instance to me. Maybe we can use JobSpec to represent job's specification (code, priority, resources, etc) and Job to represent the running instance?
@eric-jj, I see. This naming could make sense. Maybe leave some comments in the PRD: https://docs.google.com/document/d/1XarOe4QKKcFbHRzy-OH5gGQz25dNBl3tCWDK-9oOyEY/edit# ?
@istoica We have added the required features to the doc. From our side there is one Must Have feature, "Ability to specify job timeout", and two Nice to Have features, "Ability to apply a custom scheduling policy via the cluster manager" and "Ability to query the state of a job and provide an HTTP server on top of the query API".
@eric-jj, thanks. Can you please add some clarifications about the state you would like to query? Is this state defined by the job (e.g., how much data was processed, accuracy for a training job) or some generic state (e.g., how many tasks have executed)? Or both?
@istoica We need some general information, not the job's internal status: for example, whether the job is in the queue or has been scheduled, the processing time, and the estimated remaining time.
@eric-jj sounds good. One thing though: how can you estimate the job's remaining time? I think we should have two types of status: (1) generic (i.e., waiting, running, completed), and (2) specific status that can be provided by the developer (e.g., estimated remaining time, progress, accuracy, etc.).
@istoica I haven't thought thoroughly about what data should be in the query API. But I think monitoring is certainly a must for a multi-user system. We can keep the data structure simple and expandable in the initial implementation.
all, today we had a review of the PRD at Berkeley. The latest document reflects the feedback. Also please note a few more comments from the group. I think we are converging. Please take a look and try to address the comments that refer to your part of the proposal. Also, for the Ant Financial people please make sure that the current proposal satisfies all your needs. Thanks.
@raulchen we agree about the status. The question is how to let the user add more job-specific status information. A solution that is quite ugly is to have a generic status() API, and another API, internal_status(), where the developer can provide job-specific status.
@istoica
I think this API should only focus on generic status (in system level, e.g., job state, number of tasks/objects, etc). This is used for monitoring job's healthiness and managing jobs.
It will be much more complex to also include application-level status (e.g., training accuracy), because 1) as you mentioned, we'll need to provide an API to let users define their metrics; 2) users need to tell us how to aggregate and display the metrics.
For application-level metrics, it makes sense to let users collect and process the metrics by themselves.
@istoica
We had an internal discussion at Ant Financial. We decided to separate the concepts of the batch API and the APP management API.
Users can submit a Ray APP (a .py file or a jar file) to the Ray cluster, from either the command line or an HTTP portal. An HTTP portal will be set up (running on GCS?) for users to submit APPs, view APP status, and cancel or re-submit an APP. Users can assign a priority to an APP (the max priority and max resource usage for a user are managed by the cluster master). The scheduling module will schedule the tasks created by the APP according to priority and resource constraints. Users can also use a custom scheduling policy. Almost all of the APP API is transparent to the user, except the submit command line.
The Batch API is used to manage the objects created in a batch scope. A batch scope can also set a timeout for better task management (to avoid deadlock; this feature is also important for the realtime scenario). The batch scope can be implemented with with (Python) and try (Java). Resources allocated in the batch scope will be evicted when it goes out of scope, unless the user explicitly makes them visible.
@eric-jj
Sounds good. Can you please create the PRDs for job submit and batch? To make it easy, I made two copies of the previous Jobs PRD document (please feel free to modify them so that each of them contains only the relevant information):
I'd be happy to take a pass once you are done. Please let me know.
Also, when we get to the implementation, it would be great for the resource management and scheduling for batches and jobs to share the same mechanisms/implementation or to build on top of each other.
Thanks.
@istoica
The Batch PRD is almost done.
For the Job Submit PRD, we are still working with the AI team; we expect to finalize it early next week.
@eric-jj the Batch PRD looks really good. Just left a few comments. (Sorry, what is your google e-mail address?)
@istoica I have replied to your comments, and my gmail address is eric.[email protected]
Thanks, @eric-jj. I added a few more comments.
I think the Batch PRD is converging nicely. Any more requirements from the AI team or any other team from Ant (or Alibaba)?
Also, note that in the future we can use the batch abstraction to implement (1) data-parallel operations, (2) a parallel SGD iteration, or maybe the full SGD, and (3) maybe support affinity or anti-affinity location-aware placement.
For the Batch PRD, most requirements are from us. For the job management PRD, we are still working with the AI team on it; we expect to have it ready today.
@istoica We completed the first version of the PRD for Job Submit. Please review and provide comments, thanks.
@eric-jj, it looks great! I left a few quick comments.
@eric-jj et al, there are a few relatively minor comments in the Batch PRD. Otherwise, we went over the two PRDs again in our weekly meeting today, and we are fine with the current PRDs.
@istoica @eric-jj we also need to take into consideration the impact of this batch approach on FO. Consider the case where an actor is invoked by several interleaved batches: how will we handle FO when one of the batches is garbage collected (including its lineage information)? This is challenging; we came up with a solution before but never got it into production due to its complication and high overhead. I guess it is best that we think it through before we really implement it.
@imzhenyu good point!
We discussed this a bit here, but didn't add it to the document. For actors declared outside the scope of a batch, we probably want to checkpoint their internal state as part of the cleanup. Also, if we whitelist some objects, we might want to push these objects to a persistent store, so we don't need to reconstruct them after the batch completes. Alternatively, we can checkpoint the out-of-scope actors when we enter the batch, and then replay the batch if we need some outputs that were created by that batch.
Anyway, I'll add some text about this in the document.
@istoica Checkpointing is indeed one way to mitigate this problem. The challenge is when there are so many interleaves that they cause a lot of checkpointing overhead. Another approach is to delay the garbage collection. @yuyiming can share more information about what we did before to address this problem, though that approach also has its own drawbacks. Hopefully, we may be able to combine the strengths of both approaches.
@imzhenyu @eric-jj I updated the document and added a more precise description of the batch behavior, including some fault tolerance considerations.
Regarding @ericl's request to be able to destroy all the state associated with a batch explicitly, here is a possible solution that keeps things simple for typical users, and makes things possible for sophisticated users, e.g., library developers:
The default API is unchanged:
batch = ray.batch(....)
with batch:
    result = f.remote()
    ray.do_not_free(result)
When the code in the batch's scope terminates, everything will be cleaned up except for result.
In addition, we can provide the library writers with an additional API:
batch.delete()
which will delete result as well.
If the user specifies no_cleanup=True in the batch declaration, then no state is cleaned up after with completes. In this case, batch.delete() will destroy everything.
Comments?
@yuyiming @imzhenyu can you please give an example "when there are too many interleaves that causes much checkpointing overhead"? Is this streaming? A pseudocode would be great, if possible. Thanks.
@istoica For example, consider such a task-execution pattern in an Actor: T1 (batch 1) -> S1 -> T2 (batch 2) -> S2 -> T3 (batch 1) -> S3 -> T4 (batch 2) -> S4 -> T5 (batch 1) -> S5 -> T6 (batch 2) -> S6 -> ..., where T stands for Actor tasks and S for the Actor states after executing the corresponding tasks.
When batch 1 ends and we want to delete T1, T3 and T5, we must checkpoint S1, S3 and S5, because it is possible to re-execute T2, T4 and T6 in the future.
Later, I will explain my scheme to address this issue. Give me some time to make the explanation neater. :)
@istoica @imzhenyu I will explain my scheme in this comment.
For task execution chain in an Actor:
T1 -> S1 -> T2 -> S2 -> ... -> T_m -> S_m -> ... -> T_n -> S_n (checkpoint) -> ...,
we use previous and next to describe the order of these tasks and states. For instance, T1 is previous to T_m, and S2 is the next state of T2.
My scheme is based on the fact that, given a task T_m and the first checkpoint S_n after it, if all the tasks between T_m (included) and S_n are marked as GCed (i.e. all of their batches have ended), then T_m will never be re-executed due to lineage reconstruction along this task execution chain, though it may still be re-executed due to lineage reconstruction along the task execution chain of another Actor.
My scheme is described below:
- When a batch ends, its tasks are marked as GCed instead of being deleted immediately.
- Starting from a checkpoint, we traverse the previous tasks; if we reach a task that is not marked as GCed, we stop traversing and mark this task as ACTING.
- Upon the ACTING task being marked as GCed in the future, the traversal continues from it.

All, regarding the API, I had a longer discussion with @ericl and here is his proposal, which I agree with.
User API (this is pretty much the one in the Batch API document):
with ray.batch() as b:
    # do stuff
    b.return(obj_result)

# All the state created by b in GCS and the object store (except obj_result)
# is cleaned when b completes.
# b.return(obj_result) transfers the ownership of obj_result to the parent batch.
Developer API; this is to provide more flexibility to hard core developers such as library developers:
static def new_batch_id()
static def set_current_batch_id(batch_id)
static def get_current_batch_id()
static def destroy_batch(batch_id)
static def transfer_ownership(obj, batch_id)
The user API is syntactic sugar for:
try:
    prev_id = get_current_batch_id()
    new_id = new_batch_id()
    set_current_batch_id(new_id)
    # do stuff
    transfer_ownership(obj_result, prev_id)
finally:
    set_current_batch_id(prev_id)
    destroy_batch(new_id)
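For concreteness, here is one way the user-facing batch could be built as a context manager on top of the developer API sketched above. The developer-API functions are stubbed with trivial stand-ins so the sketch runs on its own; the real ones would live in the Ray backend, and keep() stands in for the proposed b.return() (return is a Python keyword).

import contextlib
import itertools

# --- stand-ins for the proposed developer API (not real Ray functions) ---
_ids = itertools.count(1)
_current_batch = [0]          # 0 == default driver batch
_owner = {}                   # object id -> owning batch id

def new_batch_id():
    return next(_ids)

def get_current_batch_id():
    return _current_batch[0]

def set_current_batch_id(batch_id):
    _current_batch[0] = batch_id

def transfer_ownership(obj, batch_id):
    _owner[obj] = batch_id

def destroy_batch(batch_id):
    # A real implementation would free GCS/object-store state owned by batch_id.
    pass

class _BatchScope:
    def __init__(self, batch_id, parent_id):
        self.batch_id = batch_id
        self._parent_id = parent_id

    def keep(self, obj):
        # Equivalent of the proposed b.return(obj): move obj to the parent
        # batch so it survives destroy_batch().
        transfer_ownership(obj, self._parent_id)

# --- user API as syntactic sugar over the developer API ---
@contextlib.contextmanager
def batch():
    prev_id = get_current_batch_id()
    new_id = new_batch_id()
    set_current_batch_id(new_id)
    try:
        yield _BatchScope(new_id, prev_id)
    finally:
        set_current_batch_id(prev_id)
        destroy_batch(new_id)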
Please let us know if you have any comments. If not I propose we update the Batch PRD accordingly. Thanks.
Regarding the case where interleaving occurs a lot, consider building your own scheduler on top of Ray, e.g., for a large number of concurrent small graph jobs and online MR jobs.
The current API is mostly synchronous, which means we need quite a few threads for a given number of concurrent small jobs (each as a batch). This is particularly difficult without user-level threading (which is challenging due to the hybrid language runtime in the driver right now). One alternative is an asynchronous API, which is however less easy to adopt. This probably needs more discussion in the future.
@imzhenyu actually, I think the current API is asynchronous. The two batches below should execute in parallel. Or maybe you meant something else by synchronous?
with ray.batch() as b1:
    result1 = f.remote()
    b1.return(result1)

with ray.batch() as b2:
    result2 = f.remote()
    b2.return(result2)

results = ray.get([result1, result2])
@istoica @imzhenyu regarding the interleaved batches, maybe we could do this using the low-level API and by adding a new flag (e.g., checkpoint=False) to destroy_batch(), indicating that no checkpoint should be done when the batch's state is cleaned up.
Consider your example T1 -> S1 -> T2 -> S2 -> ... -> T_m -> S_m -> ... -> T_n -> S_n (checkpoint) -> ..., Then create a batch for each Ti and do the following:
@ray.remote
class foo():
    # class declaration
    ...

A = foo.remote()
batch_stack = []

t1 = ray.batch()
batch_stack.append(t1)
# do stuff on actor A

t2 = ray.batch()
batch_stack.append(t2)
# do stuff on actor A

# ...

tn = ray.batch()
batch_stack.append(tn)
# do stuff on actor A

# checkpoint and clean up
t = batch_stack.pop()
ray.destroy_batch(t)  # clean & checkpoint state of actor A
while len(batch_stack) > 0:
    t = batch_stack.pop()
    ray.destroy_batch(t, checkpoint=False)  # clean & don't checkpoint state of A
Would this satisfy your requirements?
@istoica my bad for not explaining the concern about the sync/async APIs clearly. Indeed, the API itself is asynchronous, and we can get the results later as futures (as illustrated in your example). The concern is about the synchronous thinking style this API suggests, because we need to get the symbolic results of the batch immediately in the with block. This is reasonable for machine learning jobs in most cases, I believe, as we use a controller to get things ready. However, considering the streaming scenarios, we may simply want to start the batch here and end the batch somewhere else (the so-called asynchronous style :-)). Our current implementation in production adopts the latter approach, where an endBatch API is provided and developers can call endBatch in a worker that is not the driver. I just had a second look at your low-level API, and the problem is probably already gone, as we can add another piece of syntactic sugar to cover the above case using the low-level API.
@istoica regarding your solution to the interleaved batches, I'll discuss with @yuyiming etc. at Ant tomorrow and get back to you asap. Thx.
@istoica Sorry for my obscure expression. Here is pseudocode for the scenario:
@ray.remote
class Counter(object):
    def __init__(self):
        self.counter = 0

    def incAndGet(self):
        self.counter += 1
        return self.counter

@ray.remote
def other_func(r):
    # do stuff. For example:
    print(r)

@ray.remote
def batch_func(counter):
    b = ray.batch()
    r1 = counter.incAndGet.remote()
    other_func.remote(r1)
    time.sleep(10)
    r2 = counter.incAndGet.remote()
    other_func.remote(r2)
    time.sleep(10)
    r3 = counter.incAndGet.remote()
    other_func.remote(r3)
    time.sleep(10)
    b.return(None)  # destroy_batch

g_counter = Counter.remote()
batch_func.remote(g_counter)  # batch 1
time.sleep(5)
batch_func.remote(g_counter)  # batch 2
This code might generate a task execution chain in Actor g_counter:
# T1.1 -> S1 -> T2.1 -> S2 -> T1.2 -> S3 -> T2.2 -> S4 -> T1.3 -> S5 -> T2.3 -> S6 -> ...
#  |             |             |             |             |             |
#  v             v             v             v             v             v
# R1.1          R2.1          R1.2          R2.2          R1.3          R2.3
#  |             |             |             |             |             |
#  v             v             v             v             v             v
#  ...           ...           ...           ...           ...           ...
where Ti.j refers to the jth task in batch i, R refers to the return value of the corresponding task, and S refers to some Actor states.
When batch 1 returns, if we want to delete the task info of this batch in GCS, we must checkpoint S1, S3 and S5, because T2.1, T2.2 or T2.3 might be re-executed to reconstruct R2.1, R2.2 or R2.3 in the future. This is what @imzhenyu meant: interleaves cause much checkpointing overhead.
In my scheme, the Ray framework need not checkpoint automatically when a batch ends. Checkpointing could be done:
- via ray.checkpoint(), which tells Ray to checkpoint the Actor state after the caller returns; checkpointing could be done in any Actor task.

So, in my example:
T1 -> S1 -> T2 -> S2 -> ... -> T_m -> S_m -> ... -> T_n -> S_n (checkpoint) -> ...
I mean the checkpointing of S_n is done by the checkpointing algorithm, such as the one in Ray 0.4.0. Besides, each T_i might belong to any batch.
Please consider my scheme with these explanations in mind. Thanks a lot.
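To illustrate the explicit-checkpoint idea above, here is a tiny sketch of an actor task deciding when to ask Ray to checkpoint its state; ray.checkpoint() is the call proposed in this scheme, not an existing API, and the every-100-calls policy is just an example.

import ray

@ray.remote
class Counter(object):
    def __init__(self):
        self.counter = 0

    def inc_and_get(self):
        self.counter += 1
        if self.counter % 100 == 0:
            # Proposed call: checkpoint this actor's state after this task
            # returns, so earlier tasks never need to be replayed.
            ray.checkpoint()
        return self.counter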
@yuyiming thanks for the example.
Quick question: when does a batch end or get destroyed in your case, i.e., when are a batch's resources cleaned up?
@istoica b.return(None) in my code implies that the batch ends.
@yuyiming thanks, got it. So the key problem here is having two batches that invoke methods on the same actor, with these invocations interleaving. I think I understand your solution now. Basically, this kind of interleaving introduces dependencies between batches. You cannot just clean the state of one batch, since the other batch might depend on the actor state updated by the first batch.
@istoica Yes, interleaving is a complicated problem.
My solution could be summarized as follows:
When a batch ends, we do not delete the task and object info in GCS immediately, instead we wait for some specific conditions indicating that the batch info could be deleted safely.
@yuyiming Hi, Yiming. In your case, you mentioned that we may need to checkpoint S1, S3 and S5, because T2.1, T2.2 or T2.3 might be re-executed to reconstruct R2.1, R2.2 or R2.3 in the future. With our API design, we could add R2.1, R2.2 and R2.3 to the GC whitelist so that they are not cleaned. Would this whitelist help avoid the reconstruction of R2.1, R2.2 or R2.3, and hence avoid checkpointing S1 and S3?
@guoyuhong Hi, Yuhong. Regarding highly interleaved batches, it is also too expensive to add so many return values to the GC whitelist and to store them persistently.
Furthermore, it might be better to combine these two schemes. For scenarios where interleaving rarely occurs, checkpointing at the boundary has the virtue of simplicity; otherwise, delayed GC (my solution) would save much checkpointing and data-persistence expenditure.
@yuyiming (@guoyuhong, @imzhenyu) just to make sure we are on the same page, here is one way to implement your scheme. Please let me know if this looks good to you.
First we say that batch b1 depends on batch b2 if a task of b1 uses a state or result produced by b2 before b2 completes.
We add two flags to each batch:
- status: this flag takes two values: active, which is set when the batch is created, and done, which is set when the batch completes.
- dependency: this flag also takes two values: dirty if another batch depends on this one (i.e., it uses a state/result produced by this batch before this batch has finished), and clean if no other batch depends on this batch.

In addition, each batch contains two lists:
- depends_on_list: the list of batches this batch depends on;
- dependent_list: the list of batches that depend on this batch; note that if this list has at least one entry, then dependency = dirty.

Then the cleanup algorithm is something like this:
# called when batch b is done or when a dependent batch is cleaned up
cleanup(b):
    if (status = done) && (dependency = clean):
        checkpoint every return result and the state of every actor that batch b had modified;
        clean up all GCS state and any other objects created by batch b;
        for every batch b1 in b.depends_on_list:
            b1.dependent_list.remove(b)  # we just cleaned b, so b1 no longer has b as a dependent
            cleanup(b1)
I'll add this pseudocode to the document later today so we can refine it.
Seems good to me. While this is more general, one potential problem is whether there can be cycles in this batch-level dependency graph, because batches are not atomic and the dependencies can interleave in the middle.
Looks good to me. The virtue is that it handles dependency at the batch level. I have some comments:
- For the dependency flag, clean should imply that no other active batch depends on this batch. Otherwise, if two batches mutually depend on each other, they would reach a deadlock.

@yuyiming and @imzhenyu, thanks for the comments. I added another cleanup proposal which should work for arbitrarily interleaved batches. See "Garbage collection for interleaving batches" here: https://docs.google.com/document/d/1sMgVMajomF5j-GDkzhuejrAwLcxXNKY9lTmlDo7GkqM/edit?ts=5b360317#.
In a nutshell here is the scheme:
I suggest that once we are comfortable with this or another scheme, we are starting the implementation. Thanks.
@istoica My concern is how to checkpoint the state of every Actor a finished batch has modified. Consider the following timeline:
# Actor 1 -----S1.1----------------S1.2------------ -> timeline
#               ^                   ^
#               | T1.1              | T1.3 (finish)
# Batch 1 ----------------------------------------- -> timeline
#                     | T1.2
#                     v
# Actor 2 ---------S2.1----S2.2-------------------- -> timeline
#                           ^          (Actor 2 is not on the boundary
#                           | T2.1      of Batch 1 when Batch 1 finishes)
# Batch 2 ----------------------------------------- -> timeline
When Batch 1 is done, it can NOT checkpoint the state of Actor 2 at its boundary (S2.1), because Actor 2 has already moved on to state S2.2.
@yuyiming Not sure I see the problem here. First, in this case we will do cleanup only after Batch 2 finishes as well. Before then we have all the GCS lineage, so we can reconstruct all the objects and actor states if needed.
Second, even if Batch 1 finishes and we clean up its state right away, things should still work in this case. Say Batch 1 finishes, its state is cleaned up (i.e., actor states S1.2 and S2.1 are checkpointed), and right after this we have a failure that kills Actor 2. In this case Batch 2 can reconstruct the state of Actor 2, S2.2, by starting from checkpoint S2.1 and then replaying T2.1.
Also, yesterday we discussed a simple solution during a Ray meeting that only cleans up the object store. In a nutshell:
- When the batch ends, the objects it created are cleaned from the object store, except those explicitly kept via ray.add_return().

All, we have had some internal discussions at Berkeley about whether the batch API should be asynchronous or synchronous:
In the async case
with ray.batch() as mybatch:
    a = f1.remote()
    b = f2.remote()
    c = f3.remote(a, b)
fun4.remote()
from the user's viewpoint the batch "finishes" as soon as f3.remote(a, b) is submitted, and hence fun4.remote() can finish before f3.remote(a, b). (From the system perspective, however, the batch always completes only after the last function in the batch finishes, i.e., after f3.remote(a, b) finishes.)
In the sync case, fun4.remote() is submitted only after f3.remote(a, b) finishes.
Also note that in the asynchronous case, one can still force synchronous execution by using ray.wait(), e.g.,
with ray.batch() as mybatch:
    a = f1.remote()
    b = f2.remote()
    c = ray.wait(f3.remote(a, b))
fun4.remote()
Any comments, given your use cases?
@istoica The problem with checkpointing actor state on batch completion is that you don't know beforehand that a given task is the last actor task in a batch, because other tasks may further emit more actor tasks, unless the API design is restricted (e.g., a static data flow where we can do the marking beforehand). That's why @yuyiming said it is difficult to checkpoint when a batch completes: the actors' states are already polluted by others.
Regarding the async/sync API design, I think both approaches you suggest are fine, and users should decide which one to use. However, my concern is about which task is considered the batch-completion task in the API invocations. In a _sync_ design, the last remote call in the with block is considered the one. In an _async_ design, the last remote call is merely a common task in the batch, and some further task triggered (directly or indirectly) by the tasks within the with block is the completion task. IMHO, the API design in the latter case fits better in the streaming case, while the design in the former case may work in many other scenarios where the driver commands almost everything.
@imzhenyu When you are saying "other tasks may further emit more actor tasks" are you referring to something like the following code, where parent() calls child() which invokes a method on actor a?
@ray.remote
def child(a, b):
    a.method.remote(b)
    ...

@ray.remote
def parent(a):
    b = 1  # some value produced here
    child.remote(a, b)

a = Actor.remote()
with ray.batch():
    ...
    parent.remote(a)
2) Regarding async vs sync, just to make sure: you are saying the async API works better for streaming, right? How hard would it be to provide the same functionality with a sync API?
@imzhenyu and @yuyiming another quick question. Would it be OK to have only one add_return() statement, and for that statement to be the last one in the batch? If not, can you give an example of functionality you cannot implement with this limitation?
@istoica For this pattern:
# Actor 1 -----S1.1----------------S1.2---------|--- -> timeline
#               ^                   ^           |
#               | T1.1              | T1.3      |
#                                      (finish) |
# Batch 1 --------------------------------------|--- -> timeline
#                   | T1.2                      |
#                   v                           |
# Actor 2 ---------S2.1----S2.2-----------------|--- -> timeline
#                           ^                   |
#                           | T2.1              |
# Batch 2 --------------------------------------|--- -> timeline
#                                                |
#                                  (cleanup moment of batch 1)
S2.1 can NOT be checkpointed when batch 1 finishes, because at that moment Actor 2 is in state S2.2. Although we can reconstruct the Actor state S2.1 by re-executing tasks, I think this is too expensive: it would need to re-run almost all the tasks in order to checkpoint all the boundary Actor states under highly interleaved batches.
In other words, when T1.2 finishes, we do not know it is the last task of batch 1 on Actor 2, so we do not have enough information to decide whether to checkpoint this Actor at that moment.
If I have misunderstood the scenario, please point it out. Thanks a lot.
@istoica regarding the previous two questions about sync/async: (1) "other tasks may further emit more actor tasks" — yes, that is correct. (2) Yes, an async API is better for streaming, IMHO. Implementing that with a sync API is doable, I guess, but it will introduce unnecessary delays and complications, because streaming is more similar to one-way RPC, where the upstream nodes do not care much about what the downstream nodes do. With an add_return statement within the batch, we can implement the sync API but not the async one. One possible way is to have a ray.end_batch API that can be invoked _anywhere_ within the batch.
@istoica My previous statement about unnecessary delay may be wrong, as it is possible to emit the result to the environment before we end a batch (though extra notifications are needed to notify the driver after emitting results). Then I guess we may limit the API design to this sync version, which is simpler to use (less flexible, but already good enough for many cases) and easier to implement.
@imzhenyu can you provide an example of why we need multiple return statements in the same batch? Can't we emulate the behavior by having one batch per return? If we allow multiple return statements and the batch is async, it could be difficult to figure out when we can clean up the object store. Also the semantics of a batch with multiple return statements seems more complex/confusing.
Basically, I'm thinking of something like this where batch.end takes as argument a list of objects to return:
x = fun1.remote()
batch = ray.batch()
y = fun2.remote(a)
z = fun3.remote(c)
batch.end([z]) # change ownership of `z` to parent batch
Upon finishing the batch, the above code will clean up y, but not z (as it was returned by the group) and not x (as it was declared outside the group's scope).
And, here is the with version which is virtually identical:
x = fun1.remote()
with ray.batch() as batch:
    y = fun2.remote(a)
    z = fun3.remote(c)
    batch.return([z])
@imzhenyu BTW, in the above example garbage collection happens only after the batch ends. Nothing will be garbage collected before then. So as long as you add an object to the return list, you can safely use that object elsewhere as soon as it is created, knowing it won't be garbage collected during or after the batch (no need to wait for the return; we probably need to change the name again to indicate that you do not need to wait for the return to use an object).
In the above example, say you do a lot of stuff after creating z. You can still use z safely before the batch ends, as none of the objects will be garbage collected before then.
Wouldn't this be enough for your cases?
@imzhenyu Maybe rename return to keep, i.e., batch.keep(object_list, actor_list) ?
@yuyiming and @zhijunfu Thanks for your example illustrating that it is hard to checkpoint actors on the batch frontier. However, the proposal I added to the document should still work correctly. Is the problem you have in mind an efficiency problem or a correctness problem?
Anyway, I was thinking about the following alternative that will make the above discussion moot: how about checkpointing the actors at the beginning of the batch instead of at the end? As long as the actors are checkpointed frequently enough it shouldn't matter whether we do it at the beginning or the end. In a streaming example, just think of the next batch checkpointing the state of the current batch. This would be much easier to implement and it will have the added benefit to enable rollback, e.g., in an SGD iteration you can rollback to the initial state of an iteration if an actor fails, which is what you want.
@istoica Agree that each batch should only have one return statement (when I said anywhere for ray.end_batch I should have said anywhere, but only once :-)). That's also one subtle issue with the async design, as developers must guarantee this invariant.
As for the return(z) design, I think it is great, though we still need to figure out a way to garbage collect z in the end, esp. for online computing services. Currently in our production scenario, we don't return z but send it to remote services directly in the end-batch task.
@imzhenyu can you please give an example showing this pattern, i.e., (1) needing to have a return statement that is not the last statement in the batch, and (2) having to garbage collect z at a later point? For (2), maybe we can use nested batches, so z is garbage collected in the outer batch. For instance, consider the following code snippet:
with ray.batch() as outer:
    x = fun1.remote()
    with ray.batch() as inner:
        y = fun2.remote(a)
        z = fun3.remote(c)
        inner.return([z])  # move z to outer's scope
    # inner done; z is not garbage collected since it is in outer's scope
    # do something more in inner's scope
# outer done; z is garbage collected
@istoica I don't have an immediate example for both. But for (1), consider a streaming example where the initial point only calls a function without a return value (as it is the ingest node), and the batch ends at a later stage deployed in another actor. For (2), consider caching the results of computation requests; then you need to collect z at a later point.
@imzhenyu Thanks. Correct me if I'm wrong, but for (1) I think you are saying there is no return statement, right?
Also, for (2) you are basically saying that you want to garbage collect an object outside the initial with statement where it was created, e.g., imagine a queue where z is created by an actor method (e.g., enqueue()), and you want to garbage collect after another method, dequeue(), removes it. Correct?
@istoica I think one solution is to checkpoint Actor states at all batch transition points (batch boundaries). For instance, when T2.1 is received by Actor 2, S2.1 should be checkpointed before the execution of T2.1, because S2.1 was generated by another batch (batch 1's T1.2). We say that S2.1 is a batch transition point from batch 1 (T1.2) to batch 2 (T2.1).
@istoica yes, there is no returned value, or the batch returns empty results. (2) is correct.
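A minimal sketch of the transition-point idea above: before executing a task, compare its batch id with the batch id of the previous task and checkpoint the actor's state if they differ. TransitionCheckpointer and the checkpoint callback are illustrative helpers for this sketch, not Ray internals.

class TransitionCheckpointer:
    def __init__(self, checkpoint_state):
        self._last_batch_id = None
        self._checkpoint_state = checkpoint_state

    def before_task(self, task_batch_id, actor_state):
        if self._last_batch_id is not None and task_batch_id != self._last_batch_id:
            # The incoming task belongs to a different batch than the previous
            # one, so the current state is a batch transition point (e.g. S2.1).
            self._checkpoint_state(actor_state)
        self._last_batch_id = task_batch_id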
@yuyiming and @imzhenyu, given that there are still questions about the use cases and hence about the high-level API, how about starting with a low-level API, using it, and building a higher-level API as we get more experience with the use cases? So how about the following low-level API:
- ray.free(object_list): delete the objects in object_list from the object store. We already have this functionality, so it's a matter of exposing it. This will help avoid pollution of the object store, as in your streaming app.
- ray.checkpoint(actor): checkpoint actor state. That is, instead of transparently checkpointing the actor state, give the developer the ability to do it explicitly. This should be enough for implementing fault-tolerant streaming apps.
- @ray.remote(fault-tolerance = false): disable actor fault tolerance and raise an exception when the actor fails. This gives control to the developer and allows her/him to ignore such failures, e.g., an ML algorithm can ignore some results.
- ray.kill(actor) and ray.kill(task): ability to kill actors and tasks. This is needed to implement expiration timers or straggler mitigation.

Please let me know if the above API could cover all your use cases. Again, the key question here is feasibility, not convenience. Thanks.
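To make the proposal concrete, here is a rough usage sketch of the low-level calls listed above. None of ray.free, ray.checkpoint, ray.kill, or the fault-tolerance flag exist with these signatures today; they are the proposed API spelled out, and the flag is written as valid Python (fault_tolerance=False) rather than the informal "fault-tolerance = false" above.

import ray

@ray.remote
def stage(x):
    return x * 2

@ray.remote(fault_tolerance=False)   # proposed: surface actor failures instead of reconstructing
class Window(object):
    def __init__(self):
        self.items = []

    def add(self, x):
        self.items.append(x)
        return len(self.items)

ray.init()
w = Window.remote()

ids = [stage.remote(i) for i in range(100)]
ray.get([w.add.remote(i) for i in ids])

ray.checkpoint(w)    # proposed: explicit actor checkpoint
ray.free(ids)        # proposed: evict finished intermediates from the object store

slow = stage.remote(10**9)
ray.kill(slow)       # proposed: cancel a straggler or an expired task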
@istoica Agree, we can definitely go with this first. I think @guoyuhong is already working on it :-)
@imzhenyu sounds good. So let's go with the above plan.
According to @stephanie-wang we already have functionality for (2) but we need to expose it. @robertnishihara is looking at (3).
Regarding (4), we need to think a bit about the semantics, e.g.:
Also, a better name for this API is ray.cancel() instead of ray.kill().
@imzhenyu, @eric-jj, @yuyiming, @robertnishihara, @pcmoritz I added the above proposal to the end of our design document: https://docs.google.com/document/d/1sMgVMajomF5j-GDkzhuejrAwLcxXNKY9lTmlDo7GkqM/edit?ts=5b360317#
Also wrote another small document about (4) here: https://docs.google.com/document/d/1kcl5SHXtQJQ5_F69rSRgbLrJ9LYDPEmY-totALi7BTc/edit#
Please take a look and add any comments you might have.
I really think we should start implementing this proposal and use it for various example apps to get experience with it. Since (4) is the most complex, I propose we do the first three in the first phase.
@imzhenyu, @eric-jj, @yuyiming, @robertnishihara, @pcmoritz Just added some examples and updated the two documents: https://docs.google.com/document/d/1sMgVMajomF5j-GDkzhuejrAwLcxXNKY9lTmlDo7GkqM/edit?ts=5b360317# and https://docs.google.com/document/d/1kcl5SHXtQJQ5_F69rSRgbLrJ9LYDPEmY-totALi7BTc/edit#
@istoica Sorry for the late response. Regarding the semantics of kill & delete, I wonder whether it is better that an FO system not have user-facing decreasing APIs, as they go against lineage-based FO (though we can have them internally). Instead, we may use an approach similar to how we handle common objects using GC.
@imzhenyu Thanks for the note. Can you please be more precise about GC? When would you GC? Also, what about cancelling tasks/methods on timeouts? A few notes:
- ray.free() doesn't remove the lineage, so you can still reconstruct the object. It just removes it from the object store. So for all practical purposes it is a hint. Maybe we want a different name?
- We renamed ray.kill() to ray.cancel() (see the documents). This is basically needed to implement expiration and is equivalent to it. The expiration concept itself is a decreasing API, so if we want this functionality I am not sure how to get around it.

@istoica Hi, Ion, I'm implementing ray.free(). In the current implementation, the object list passed to ray.free() will not be deleted immediately. Each worker holds a PlasmaClient, which holds objects and increases the object ref counts in the Object Store. And there is an L3 cache mechanism that delays the PlasmaClient release. The deletion will happen when the ref count is 0. Therefore, we could name the function ray.garbage_collect() or ray.object_store_gc(), which reminds the user that there is a delay in deletion.
Moreover, since these functions are low-level functions, can we put them into another namespace to mark that they should be used carefully? For example, we could use ray.internal.garbage_collect().
@guoyuhong Can you please write down the exact algorithm you are planning to implement? When is the reference counter increased, and when is it decreased? What happens in case of failure? (As a disclaimer, we did consider reference counting early on, but we gave up since maintaining the counter in a distributed system in the presence of failures and message losses is complex; it's equivalent to implementing transactions in a distributed system, as you need to provide exactly-once semantics.)
Again, I think that just deleting the objects, or marking them for deletion so they are evicted first when the object store is full, would be a fine solution: simple and effective. This is because we do not delete the lineage from GCS (we only flush it), so you can still reconstruct the objects after removing them from the object store, if needed.
Yes, using ray.internal sounds fine.
The current object store already implements a ref count. Here is the code. The ref count tracks the usage of an object by all the Plasma clients. I have sent a PR to cache the deleted object ids, and these objects will be deleted once their ref count is 0. However, the L3 cache in PlasmaClient holds the Release request until the cache condition is satisfied. The current condition is that the release cache has more than 64 items or the cache memory reaches 100MB. Otherwise, the client holds a reference to the target objects even after they are released.
My current object store free algorithm:
From my current experiments, this algorithm works. But we need to wait until more than 64 items have been pushed to the L3 cache of the Plasma clients.
@guoyuhong Thanks for the clarifications. I think there was a confusion. In my original post, I was referring to having global reference counters. The ones in Ray are per node.
At first sight, your solution makes sense.
Also, I think that waiting for cache condition is not a big deal as 100MB is relatively tiny for today's machines. But we might want to change this, especially if we want to run Ray on edge devices as some users do.
I don't think this is relevant anymore now that we're working on doing exact GC for ObjectIDs. Please feel free to re-open if you disagree.