import ray
import torch

ray.init(object_store_memory=int(100e6))

@ray.remote
def identity(vectors):
    return [ray.put(ray.get(vec)) for vec in vectors]

obj_id = ray.put(torch.randn(int(1e5)))
vectors = [obj_id for _ in range(200)]

while True:
    vectors = ray.get(identity.remote(vectors))
This code throws the following error.
2019-09-03 13:46:22,598 WARNING worker.py:1797 -- The task with ID ffffffffffffffffffff01000000 is a driver task and so the object created by ray.put could not be reconstructed.
(pid=38222) 2019-09-03 13:46:23,308 INFO worker.py:432 -- The object with ID ObjectID(7d58f415c89effffffff0100000000c001000000) already exists in the object store.
2019-09-03 13:46:28,320 ERROR worker.py:1737 -- Possible unhandled error from worker: ray_worker (pid=38222, host=atlas)
ray.exceptions.UnreconstructableError: Object ffffffffffffffffffff01000000008002000000 is lost (either LRU evicted or deleted by user) and cannot be reconstructed. Try increasing the object store memory available with ray.init(object_store_memory=<bytes>) or setting object store limits with ray.remote(object_store_memory=<bytes>). See also: https://ray.readthedocs.io/en/latest/memory-management.html
However, if you replace the definition of obj_id with obj_id = ray.put(list(range(int(1e5)))), or with obj_id = torch.randn(int(1e5)), then we get the correct error that @ericl's recent PR added:
(pid=46751) 2019-09-03 13:56:21,919 INFO worker.py:2381 -- Put failed since the value was either too large or the store was full of pinned objects. If you are putting and holding references to a lot of object ids, consider ray.put(value, weakref=True) to allow object data to be evicted early.
However, neither error should be raised: we have only 80 MB of objects, and the object store has 100 MB of capacity.
This is a more minimal version of #5586 which still errors even after the weakref PR, so CCing @richardliaw @stephanie-wang @ericl
It works fine for me up to 400 iterations if I just increase the object store size to 500e6. The store isn't very tolerant when you get close to capacity, since it does bulk evictions of 30% at a time and suffers from memory fragmentation. I would leave at least 2x headroom, or 5-10x if you can.
@ericl what you're proposing is not an acceptable solution for us, since we can't afford to waste 5-10x more RAM than is actually needed. This is some combination of a bug and a design flaw in Ray, not a question, as you tagged it; please re-open the issue.
The point of this MWE is to expose the bug, not to serve as an excuse to close the issue because you got it to work for only 400 iterations by introducing 5x overhead in the OS. For small variants of @kiddyboots216's MWE that are closer to our actual code, I can't get it to work without making the OS 2GB = 25x overhead. But even 5x is (obviously) not acceptable. @ericl's weakref PR was supposed to provide a workaround for this issue, and the workaround does work sometimes, but not all the time. In particular, it doesn't work for this MWE or for our actual code.
There's an additional performance bug related to this in which our code gets slower and slower as we run more iterations. We think this is because Ray is evicting objects that still have references in the code, even though there are a bunch of objects in the OS that don't. We end up using the former (but obviously never the latter), and Ray has to reconstruct them over an increasingly long lineage. We haven't distilled this into an MWE, and we probably won't, given that it takes a while and doesn't seem to be productive.
That's surprising, then. @richardliaw do you think this is something to do with the plasma references not getting immediately GCed? 2x overhead is pretty fundamental due to the batch eviction strategy, but 25x is clearly a bug.
@ericl I can fix this issue by using ray_get_and_free in the MWE, but it's slightly harder to apply to our actual code since we start to run into race conditions.
This actually works fine at 1.5x overhead:
import time
import ray
import numpy as np

ray.init(object_store_memory=int(250e6))

@ray.remote
def identity(vectors):
    return [ray.put(ray.get(vec)) for vec in vectors]

vectors = [ray.put(np.zeros(int(1e5), dtype=np.int32)) for _ in range(200)]

i = 0
while True:
    vectors = ray.get(identity.remote(vectors))
    i += 1
    print(i)
    print(ray.worker.global_worker.plasma_client.debug_string())
Can you identify parameters that turn this into a 25x overhead? By the way, it's actually using 160MB of memory, not 80MB, since the arguments take 80MB and the returns copy them once more.
Thanks @ericl. I don't follow your argument for why it uses 160MB OS memory -- once the arguments are ray.get()ed in identity(), they can be evicted without causing an error later. Where does the factor of 2 come from?
Here's code that doesn't work:
import ray
import torch
import numpy as np

osSize = int(1000e6)
D = int(1e5)
nVecs = 200

ray.init(object_store_memory=osSize)

@ray.remote
def identity(vectors):
    return [ray.put(ray.get(vec)) for vec in vectors]

obj_id = ray.put(torch.randn(D), weakref=True)
vecs = np.array([obj_id for _ in range(nVecs)])

while True:
    idx = np.random.choice(nVecs, 4, replace=False)
    news = ray.get(identity.remote(vecs[idx]))
    vecs[idx] = news
We're storing 200 vectors of 400kB each = 80MB, so a 1GB OS is 12.5x what we actually need (and in this case your 2x argument wouldn't apply anyway, since we only choose 4 vectors at a time). It does work when I use a 2GB OS, hence my 25x above (though I'm sure it would start working somewhere between 12.5x and 25x).
Thanks @ericl. I don't follow your argument for why it uses 160MB OS memory -- once the arguments are ray.get()ed in identity(), they can be evicted without causing an error later. Where does the factor of 2 come from?
Yes, if we evicted in optimal order. In practice, we evict in approximate LRU order, so you need 160MB to guarantee an argument is not evicted before you get it.
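To make that accounting concrete, here is a rough back-of-the-envelope sketch for the original MWE (it assumes the tensors are float32, torch's default, so 4 bytes per element):
# Each vector: 1e5 float32 elements * 4 bytes ~= 400 kB
bytes_per_vec = int(1e5) * 4

# The 200 argument objects from the previous iteration have to stay in the
# store until identity() has ray.get() each of them...
args_bytes = 200 * bytes_per_vec      # ~80 MB of arguments

# ...while identity() simultaneously ray.put()s 200 fresh copies as returns.
returns_bytes = 200 * bytes_per_vec   # ~80 MB of new objects

# Peak footprint even with perfectly ordered eviction:
print((args_bytes + returns_bytes) / 1e6, "MB")  # ~160 MB

# Approximate LRU plus ~30% batch evictions then calls for extra headroom on
# top of this, which is where the 2x-and-up recommendation comes from.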
my 25x above
The reason your example doesn't work is that your workload does random reads of past object IDs. This means that after a while you just get unlucky: some object ID hasn't been read for long enough and it gets LRU evicted, hence the need for a huge buffer size to absorb this randomness.
There's a simple workaround, though. Stop storing object IDs (which aren't properly ref counted except for the initial driver put) and store the actual arrays instead. This runs forever for me:
import ray
import torch
import numpy as np

osSize = int(500e6)
D = int(1e5)
nVecs = 200

ray.init(object_store_memory=osSize)

@ray.remote
def identity(vectors):
    return [vec for vec in vectors]

vecs = [torch.randn(D) for _ in range(nVecs)]

i = 0
while True:
    idx = np.random.choice(nVecs, 4, replace=False)
    news = ray.get(identity.remote([vecs[j] for j in idx]))
    for ix, v in zip(idx, news):
        vecs[ix] = v
    print(i)
    print(ray.worker.global_worker.plasma_client.debug_string())
    i += 1
Here's another workaround that does explicit pinning (which is hackier but avoids the potential extra memcpy you might be thinking of complaining about next):
import ray
import torch
import numpy as np

osSize = int(500e6)
D = int(1e5)
nVecs = 200

ray.init(object_store_memory=osSize)

@ray.remote
def identity(vectors):
    return [ray.put(ray.get(vec)) for vec in vectors]

obj_id = ray.put(torch.randn(D))
vecs = np.array([obj_id for _ in range(nVecs)])

i = 0
while True:
    idx = np.random.choice(nVecs, 4, replace=False)
    news = ray.get(identity.remote(vecs[idx]))
    vecs[idx] = news
    for obj_id in news:
        obj_id.set_buffer_ref(ray.get(obj_id))
    print(i)
    print(ray.worker.global_worker.plasma_client.debug_string())
    i += 1
We could attempt to do this auto ref counting on Object IDs returned by ray.get, but this would actually incur significant overhead for multi-node workloads since it would force a transfer to the local node, so it is better to wait for the proper distributed solution (cc @stephanie-wang ).
@ericl ok so this would be the correct usage of these internal methods, then?
@ray.remote
def identity(vectors):
    vectors = ray_get_and_free(vectors)
    vectors += 1
    obj_ids = [ray.put(vec) for vec in vectors]
    [pin_object_data(obj_id) for obj_id in obj_ids]
    return obj_ids
Note that in our use case it's not feasible to keep all the tensors in main memory; for a larger-scale use case we'd like to have 800 * 1.5 GB = 1.2 TB of these tensors, and it's just not feasible to call ray.get() on that in the driver or even have them lying around in memory, so I don't think either of the workarounds you gave is feasible at that scale.
Well, if you can't keep them all in memory then it's natural that they are evicted. I would suggest writing them to disk then.
I think he just means that we want to make use of the fact that the OS is sharded across devices, which we can't do if the driver has to keep all the tensors locally. He doesn't mean that we want/need to write anything to disk.
I'm not sure what you mean by sharded across devices; do you mean multiple machines? It sounds like you'll need about 1.5TB of RAM. I'm not aware of any single GPU machine that has this much memory in total, though you could create a cluster with a large-memory head node and GPU workers.
@ray.remote
def identity(vectors):
    vectors = ray_get_and_free(vectors)
    vectors += 1
    obj_ids = [ray.put(vec) for vec in vectors]
    [pin_object_data(obj_id) for obj_id in obj_ids]
    return obj_ids
You misunderstand how Python ref counting works. This won't work. Once the method returns, obj_ids drops out of scope, and those objects can be evicted again. You need to pin those objects in a long-lived process, i.e., the driver.
The example I provided works.
Yes, I mean multiple machines. Another way to phrase what @kiddyboots216 is saying is that we'd like to have 800 workers with maybe 5% participation, so we'll be using 40 GPUs and need to store 1.2TB of parameter vectors. On p3 instances each GPU comes with 61GB of host memory, so 2.4TB in total. That's plenty to fit all the vectors we need, but no single machine can fit all of them.
The whole reason for using Ray is so we can actually do distributed training. @kiddyboots216 is just saying that we don't want to hamstring ourselves here by requiring the driver to hold every vector locally.
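For reference, the capacity arithmetic above works out roughly like this (a quick sketch; all numbers are the ones quoted in this thread):
n_workers = 800
vec_gb = 1.5                        # parameter vector per worker, in GB
need_tb = n_workers * vec_gb / 1000 # 1.2 TB of vectors to store

gpus = int(n_workers * 0.05)        # 5% participation -> 40 GPUs
host_tb = gpus * 61 / 1000          # 61 GB of host memory per p3 GPU -> ~2.4 TB

print(need_tb, "TB needed vs", host_tb, "TB available across the cluster")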
Ok great, in this case I would recommend you switch to using actors. Actors are long-lived, so if you pin an object in an actor it will stay there forever, as long as the actor is holding a reference to it.
@ericl it's not just drivers that I can pin objects in, right? I could also pin an object in an actor?
Another recommendation: avoid passing around raw object IDs. I would stick with numpy arrays; the overhead is not that much. If you need to refer to an array in some actor's memory, give it a random ID or something. This completely avoids the sort of issues you've been running into.
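A minimal sketch of what those two suggestions could look like together (the ParameterStore actor and its method names are made up for illustration, not an existing Ray API, and it assumes the vectors fit in the actor's heap):
import ray
import numpy as np

ray.init()

# Hypothetical long-lived actor that owns the arrays. Because the actor keeps
# ordinary Python references to them, the data cannot be LRU-evicted out from
# under the caller, and callers refer to vectors by plain integer IDs rather
# than by raw ObjectIDs.
@ray.remote
class ParameterStore:
    def __init__(self, n_vecs, dim):
        self.vecs = {i: np.zeros(dim, dtype=np.float32) for i in range(n_vecs)}

    def get(self, ids):
        # Hand the raw arrays back by value.
        return [self.vecs[i] for i in ids]

    def put(self, ids, arrays):
        for i, arr in zip(ids, arrays):
            self.vecs[i] = arr

@ray.remote
def identity(vectors):
    return [vec for vec in vectors]

store = ParameterStore.remote(200, int(1e5))

for _ in range(10):
    idx = [int(i) for i in np.random.choice(200, 4, replace=False)]
    batch = ray.get(store.get.remote(idx))
    news = ray.get(identity.remote(batch))
    ray.get(store.put.remote(idx, news))
Nothing long-lived ever holds a raw ObjectID here; the arrays only pass through the object store transiently, which is the property both recommendations are after.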