I am computing the pairwise dot products of a set of numpy vectors using Ray. For a small number of vectors (e.g. N=4) this works fine with the implementation below. However, when N=3000 the program runs up to the line `references = [...]`. On this line the program spends a lot of time, memory use slowly increases, and eventually the program crashes with an out-of-memory error.
If I understand correctly, the memory use of this program should be minimal after the vectors have been put into the common object store. Also, the total memory used by all the numpy vectors is only ~24 MB when N=3000 (3000 vectors × 1000 float64 entries × 8 bytes), and the machine has 64 GB of memory.
Could it be that the vectors are deserialized from the common storage for each function call, and that this memory is somehow not released?
Any ideas how to approach this, or are there any workarounds?
Thank you!
import numpy as np
import itertools, ray, sys
ray.init()
N = 4 # works fine
# N = 3000 # crashes
vectors = [np.random.rand(1000) for _ in range(N)]
print("Total memory used by numpy vectors", sum([v.nbytes for v in vectors]))
print("Putting vectors in common memory")
remote_lookup = [ray.put(v) for v in vectors]
idx_pairs = list(itertools.combinations(range(len(remote_lookup)), 2))
print("Number of index pairs", len(idx_pairs)) #= 4498500 when N=3000
@ray.remote
def inner_product(u, v):
    return np.dot(u, v)
print("Compute inner products in parallel..")
references = [inner_product.remote(remote_lookup[i], remote_lookup[j]) for i, j in idx_pairs]
print("Getting results..")
results = [ray.get(r) for r in references]
for entry in zip(idx_pairs, results):
    print(entry)
For N=4 the output is of the form:
((0, 1), 252.13786148167642)
((0, 2), 248.4986942689117)
((0, 3), 251.32252423180103)
((1, 2), 248.3331002589962)
((1, 3), 248.72755759077816)
((2, 3), 253.47336096240747)
Versions:
The documentation does mention that there is a memory leak in certain Python versions, see here. For this reason I tested the problem with Python 3.7.6.
Batching the computation does help with memory. For example, as follows:
import numpy as np
import itertools, ray, sys
ray.init()
N = 3000
vectors = [np.random.rand(1000) for _ in range(N)]
remote_lookup = [ray.put(v) for v in vectors]
idx_pairs = list(itertools.combinations(range(len(remote_lookup)), 2))
@ray.remote
def inner_product(u, v):
    return np.dot(u, v)
def group_iterator(iterator, size):
    # Split an iterator into batches of size `size`.
    assert size > 0
    buffer = []
    for x in iterator:
        if len(buffer) >= size:
            yield tuple(buffer)
            buffer = []
        buffer += [x]
    if len(buffer) > 0:
        yield tuple(buffer)
results = []
for batch in group_iterator(idx_pairs, 10000):
    batch_results = [inner_product.remote(remote_lookup[i], remote_lookup[j]) for i, j in batch]
    results += [ray.get(r) for r in batch_results]
However, this seems slow. Even if the number of input vectors is reduced to N=1000, this takes around a minute. For comparison, the multiprocessing code below takes just a few seconds when N=3000 (on 24 cores).
import numpy as np
import itertools
import multiprocessing as mp
N = 3000
vectors = [np.random.rand(1000) for _ in range(N)]
idx_pairs = list(itertools.combinations(range(len(vectors)), 2))
def inner_product(ij):
    i, j = ij
    return np.dot(vectors[i], vectors[j])
def parallel_map(f, array):
    pool = mp.Pool(mp.cpu_count())
    try:
        result = list(pool.map(f, array))
    finally:
        pool.close()
    return result
results = parallel_map(inner_product, idx_pairs)
This doesn't directly answer your question, but you should get much better performance by storing the vectors in a single 2D array, e.g. `vectors = np.random.rand(N, 1000)`, and computing all pairwise dot products with a single matrix multiplication. Note that if you want to use exactly the multiprocessing syntax (but run on a cluster with Ray), you can use our drop-in replacement: https://docs.ray.io/en/master/multiprocessing.html
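For example, a minimal sketch of that refactor (variable names here are illustrative; entry (i, j) of `vectors @ vectors.T` equals `np.dot(vectors[i], vectors[j])`):
import numpy as np
N = 3000
# One 2D array instead of a list of 1D vectors.
vectors = np.random.rand(N, 1000)
# A single matrix multiplication computes all pairwise dot products at once.
gram = vectors @ vectors.T
# If only the i < j pairs are needed, take the strict upper triangle.
i_idx, j_idx = np.triu_indices(N, k=1)
pairwise = gram[i_idx, j_idx]  # length N * (N - 1) // 2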
I am interested in your memory problem. Refactoring as @robertnishihara suggests sounds like a good way forward for your case, but I'd like to know why you are having memory problems in this specific case. Is the problem with actual memory or with object store memory? I think understanding the specific issue in your case can help others as well.
Thank you both for your input.
@robertnishihara Yes, what you suggest is good. Computing the pairwise dot products is much faster with numpy's built-in matrix multiplication. Thus, for my part the issue is resolved.
Out of interest, I did some benchmarking of Ray's multiprocessing interface, plain multiprocessing, and numpy matrix multiplication:
| Method             | N=500    | N=1000   | N=1500   | N=2000    |
|--------------------|----------|----------|----------|-----------|
| numpy              | 0.05 sec | 0.06 sec | 0.13 sec | 0.21 sec  |
| multiprocessing    | 0.9 sec  | 2.9 sec  | 6.4 sec  | 10.7 sec  |
| Ray (mp interface) | 12.8 sec | 30.3 sec | 58.9 sec | 103.0 sec |
The code for these benchmarks is attached below.
@virtualluke
> Is the problem with actual memory or with object store memory? I think understanding the specific issue in your case can help others as well.

I am not sure. Can I check that in some way? The code does seem to work for small inputs, just very slowly. For larger N, the code in the ticket crashes with an out-of-memory error.
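One crude check I can think of is watching the driver process's resident memory with psutil (a sketch; this measures the process RSS only and does not break out Ray's shared-memory object store):
import os
import psutil  # third-party: pip install psutil
def print_memory_usage(tag=""):
    # Resident set size of the current process, in GB.
    rss_gb = psutil.Process(os.getpid()).memory_info().rss / 1e9
    print(f"[{tag}] RSS: {rss_gb:.2f} GB")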
import numpy as np
import timeit
N = 2000 # 500, 1000, 1500, 2000
vectors = np.random.rand(N, 1000)
def do_multiplication():
    result = vectors @ vectors.transpose()
    assert result.shape == (N, N)
print(
    timeit.timeit(
        "do_multiplication()", number=10, globals=globals()
    )
)
To reproduce the Ray (mp interface) row, change `import multiprocessing as mp` into `from ray.util import multiprocessing as mp` in the multiprocessing code below.
import numpy as np
import itertools, timeit
import multiprocessing as mp
N = 2000 # 500, 1000, 1500, 2000
vectors = np.random.rand(N, 1000)
idx_pairs = list(itertools.combinations(range(N), 2))
def inner_product(ij):
    i, j = ij
    return np.dot(vectors[i, :], vectors[j, :])
def parallel_map(f, array):
    pool = mp.Pool(24)
    try:
        result = list(pool.map(f, array))
    finally:
        pool.close()
    return result
print(
    timeit.timeit(
        "parallel_map(inner_product, idx_pairs)", number=10, globals=globals()
    )
)
# Note: it seems the `number=10` parameter did not actually repeat the experiment multiple times.