Ray: [ray] out of memory problem when using numpy vectors in common object store

Created on 20 Jun 2020  路  5Comments  路  Source: ray-project/ray

What is your question?

I am computing the pairwise dot products from a set of numpy vectors using Ray. For small number of vectors (eg for N=4) this works fine with the below implementation. However, when N=3000 the program runs up to the line references = [...]. On this line the program spends a lot of time and memory use slowly increases until the program crashes with an out of memory error.

If I understand correctly, the memory use in the above program should be minimal after the vectors have been put into the common object store. Also the total memory used by all numpy vectors is ~24M when N=3000, and the machine has 64GB of memory.

Could it be that the vectors are deserialized from the common storage for each function call, and that memory is somehow not released?

Any ideas how to approach this/or are there any workaround?

Thank you!

import numpy as np
import itertools, ray, sys
ray.init()

N = 4 # works fine
#N=3000 # crashes

vectors = [np.random.rand(1000) for _ in range(N)]
print("Total memory used by numpy vectors", sum([v.nbytes for v in vectors]))

print("Putting vectors in common memory")
remote_lookup = [ray.put(v) for v in vectors]

idx_pairs = list(itertools.combinations(range(len(remote_lookup)), 2))
print("Number of index pairs", len(idx_pairs))      #= 4498500 when N=3000

@ray.remote
def inner_product(u, v):
    return np.dot(u, v)

print("Compute inner products in parallel..") 
references = [inner_product.remote(remote_lookup[i], remote_lookup[j]) for i, j in idx_pairs]

print("Getting results..")
results = [ray.get(r) for r in references]

for entry in zip(idx_pairs, results):
    print(entry)

For N=4 the output is of the form:

((0, 1), 252.13786148167642)
((0, 2), 248.4986942689117)
((0, 3), 251.32252423180103)
((1, 2), 248.3331002589962)
((1, 3), 248.72755759077816)
((2, 3), 253.47336096240747)

Versions:

  • Python 3.7.6
  • Ray 0.8.5
  • Numpy 1.18.2
  • Linux/Ubuntu

The documentation does mention that there is a memory leak in Python versions, see here. For this reason I tested the problem with 3.7.6.

question stale

All 5 comments

Batching the computation does help with memory. For example, as follows:

import numpy as np
import itertools, ray, sys
ray.init()

N = 3000

vectors = [np.random.rand(1000) for _ in range(N)]
remote_lookup = [ray.put(v) for v in vectors]

idx_pairs = list(itertools.combinations(range(len(remote_lookup)), 2))

@ray.remote
def inner_product(u, v):
    return np.dot(u, v)

def group_iterator(iterator, size):
    # split iterator into batches of size `size`
    assert size > 0
    buffer = []
    for x in iterator:
        if len(buffer) >= size:
            yield tuple(buffer)
            buffer = []
        buffer += [x]
    if len(buffer) > 0:
        yield tuple(buffer)

results = []
for batch in group_iterator(idx_pairs, 10000):
    batch_results = [inner_product.remote(remote_lookup[i], remote_lookup[j]) for i, j in batch]
    results += [ray.get(r) for r in batch_results]

However, this seems slow. Even if the number of input vectors is reduced to N=1000 this takes around a minute. For comparison, the below multiprocessor code takes just a few seconds when N=3000 (on 24 cores).

import numpy as np
import itertools
import multiprocessing as mp 

N = 3000

vectors = [np.random.rand(1000) for _ in range(N)]
idx_pairs = list(itertools.combinations(range(len(vectors)), 2))

def inner_product(ij):
    i, j = ij
    return np.dot(vectors[i], vectors[j])

def parallel_map(f, array):
    pool = mp.Pool(mp.cpu_count())

    try:
        result = list(pool.map(f, array))
    finally:
        pool.close()

    return result

results = parallel_map(inner_product, idx_pairs)

This doesn't directly answer your question, but

  1. You're calling one remote function per inner product, instead, you'll want to have each remote function handle a batch (e.g., you could just submit K tasks in total, where K is the number of CPU cores).
  2. I'd suggest making the vectors one big numpy array, e.g., vectors = np.random.rand(N, 1000).
  3. The vector computations can be vectorized to speed it up.

Note that if you want to exactly use the multiprocessing syntax (but run on a cluster with Ray), you can use our drop in replacement. https://docs.ray.io/en/master/multiprocessing.html

I am interested in your memory problem. Refactoring as @robertnishihara mentions sounds like a good way forward for your case but I'd like to know why you are having memory problems with the specific case. Is it problems with actual memory or object store memory? I think understanding the specific issue in your case can help others as well.

Thank you both for your input.

@robertnishihara Yes, what you suggest is good. Computing pairwise dot products is much faster with numpy's internal method. Thus, for my part the issue is resolved.

Out of interest I did some benchmarking between Ray's multiprocessing-interface, multiprocessing as-is and numpy matrix multiplication:

| | N=500 | N=1000 | N=1500 | N=2000 |
|---------------------------|---------------|---------------|-----------------|---------------|
| numpy | 0.05 sec | 0.06 sec | 0.13 sec | 0.21 sec |
| multiprocessing | 0.9 sec | 2.9 sec | 6.4 sec | 10.7 sec |
| Ray (mp interface) | 12.8 sec | 30.3 sec| 58.9 sec | 103.0 sec |

The codes for this attached below.

@virtualluke

Is it problems with actual memory or object store memory? I think understanding the specific issue in your case can help others as well.

I am not sure. Can I check that in some way? The code does seem to work for small input, but just very slowly. For larger N the code in the ticket crashes with out of memory.


Benchmark codes (for reference)

Numpy

import numpy as np
import timeit

N = 2000 # 500, 1000, 1500, 2000

vectors = np.random.rand(N, 1000)

def do_multiplication():
    result = vectors @ vectors.transpose()
    assert result.shape == (N, N)

print(
    timeit.timeit(
        "do_multiplication()", number=10, globals=globals()
    )
)

Ray (mp interface)

Change import multiprocessing as mp into from ray.util import multiprocessing as mp in the multiprocessing code

multiprocessing

import numpy as np
import itertools, timeit
import multiprocessing as mp

N = 2000 # 500, 1000, 1500, 2000

vectors = np.random.rand(N, 1000)

idx_pairs = list(itertools.combinations(range(N), 2))

def inner_product(ij):
    i, j = ij
    return np.dot(vectors[i, :], vectors[j, :])

def parallel_map(f, array):
    pool = mp.Pool(24)

    try:
        result = list(pool.map(f, array))
    finally:
        pool.close()

    return result

print(
    timeit.timeit(
        "parallel_map(inner_product, idx_pairs)", number=10, globals=globals()
    )
)

# Note seems the `number=10` parameter did not repeat the experiments multiple times. 

Hi, I'm a bot from the Ray team :)

To help human contributors to focus on more relevant issues, I will automatically add the stale label to issues that have had no activity for more than 4 months.

If there is no further activity in the 14 days, the issue will be closed!

  • If you'd like to keep the issue open, just leave any comment, and the stale label will be removed!
  • If you'd like to get more attention to the issue, please tag one of Ray's contributors.

You can always ask for help on our discussion forum or Ray's public slack channel.

Was this page helpful?
0 / 5 - 0 ratings