Ray: Slowdown when using Ray with tqdm

Created on 24 Apr 2020 · 13 comments · Source: ray-project/ray

What is the problem?

This issue follows up on #5554.

Ray version and other system information (Python version, TensorFlow version, OS):
ray==0.8.4
line_profiler==3.0.2
tqdm==4.38.0

Issue
When running the suggestion from #5554 with a function that takes arguments, the computation time with the progress bar is much longer than without it. Also, as a curiosity, the tqdm values seem to reset some time after the computation finishes.

Reproduction (REQUIRED)

Please provide a script that can be run to reproduce the issue. The script should have no external library dependencies (i.e., use fake or mock data / environments):

The code can be found as a notebook in https://colab.research.google.com/drive/1mQsYVHochZPITwL5fSIv2IxVY1c0cdQ6

I run a mock function test in 3 ways: parallelized with and without tqdm, and unparallelized with tqdm

import os
import numpy as np
import ray
from tqdm import tqdm

def test(L):
    np.random.randint(0, 100, L)

test_remote = ray.remote(test)

def parallel_with_bar(N, L):
    ray.init(ignore_reinit_error=True, num_cpus=os.cpu_count())

    def to_iterator(obj_ids):
        # Yield results one at a time, in completion order, so tqdm can update per finished task.
        while obj_ids:
            done, obj_ids = ray.wait(obj_ids)
            yield ray.get(done[0])

    obj_ids = [test_remote.remote(L) for i in range(N)]
    ret = []
    for x in tqdm(to_iterator(obj_ids), total=len(obj_ids)):
        ret.append(x)

    ray.shutdown()


def parallel_without_bar(N, L):
    ray.init(ignore_reinit_error=True, num_cpus=os.cpu_count())
    obj_ids = [test_remote.remote(L) for i in range(N)]
    ret = ray.get(obj_ids)

    ray.shutdown()


def single_with_bar(N, L):
    ret = [test(L) for i in tqdm(range(N))]

To profile each option I used line_profiler.
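The exact profiling cells aren't shown here, so this is just an assumed invocation using line_profiler's IPython magic (N matches the 5000 hits in the output below; L is the value reported later in the thread):

%load_ext line_profiler

N, L = 5000, 1000000

%lprun -f parallel_with_bar parallel_with_bar(N, L)
%lprun -f parallel_without_bar parallel_without_bar(N, L)
%lprun -f single_with_bar single_with_bar(N, L)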

I can't get consistent results using Colab's notebook, so I recommend running it locally.

On my computer I got the following profiling results (with 56 CPUs):

Timer unit: 1e-06 s

Total time: 23.2731 s
File: <ipython-input-3-a3749fc5b867>
Function: parallel_with_bar at line 7

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
     7                                           def parallel_with_bar(N, L):
     8         1     941704.0 941704.0      4.0      ray.init(ignore_reinit_error=True, num_cpus=n_cpus)
     9                                           
    10         1         10.0     10.0      0.0      def to_iterator(obj_ids):
    11                                                   while obj_ids:
    12                                                       done, obj_ids = ray.wait(obj_ids)
    13                                                       yield ray.get(done[0])
    14                                           
    15         1    2229748.0 2229748.0      9.6      obj_ids = [test_remote.remote(L) for i in range(N)]
    16         1          3.0      3.0      0.0      ret = []
    17      5001   18927016.0   3784.6     81.3      for x in tqdm(to_iterator(obj_ids), total=len(obj_ids)):
    18      5000      12861.0      2.6      0.1          ret.append(x)
    19                                           
    20         1    1161728.0 1161728.0      5.0      ray.shutdown()
Timer unit: 1e-06 s

Total time: 6.6471 s
File: <ipython-input-3-a3749fc5b867>
Function: parallel_without_bar at line 23

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    23                                           def parallel_without_bar(N, L):
    24         1     707324.0 707324.0     10.6      ray.init(ignore_reinit_error=True, num_cpus=n_cpus)
    25         1    2472897.0 2472897.0     37.2      obj_ids = [test_remote.remote(L) for i in range(N)]
    26         1    2646773.0 2646773.0     39.8      ret = ray.get(obj_ids)
    27                                           
    28         1     820106.0 820106.0     12.3      ray.shutdown()
Timer unit: 1e-06 s

Total time: 70.925 s
File: <ipython-input-3-a3749fc5b867>
Function: single_with_bar at line 31

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    31                                           def single_with_bar(N, L):
    32         1   70924965.0 70924965.0    100.0      ret = [test(L) for i in tqdm(range(N))]

Thanks for your help! And amazing work!

If we cannot run your script, we cannot fix your issue.

  • [x] I have verified my script runs in a clean environment and reproduces the issue.
  • [x] I have verified the issue also occurs with the latest wheels.

All 13 comments

Hey @alexmascension, thanks for opening this issue and sorry for the slow reply.

Interesting -- are you running these in the exact order you specified?

If the tqdm run goes first, you might hit some warmup latency. Can you also try this?

I would suggest running it maybe 3 times to get some variance estimate:

def parallel_wait_without_tqdm(N, L):
    ray.init(ignore_reinit_error=True, num_cpus=os.cpu_count())

    def to_iterator(obj_ids):
        while obj_ids:
            done, obj_ids = ray.wait(obj_ids)
            yield ray.get(done[0])

    obj_ids = [test_remote.remote(L) for i in range(N)]
    ret = []
    for x in to_iterator(obj_ids):
        ret.append(x)

    ray.shutdown()
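For the repeats, a minimal timing harness could look like this (a sketch, not from the thread; N and L are the values reported in the reply below, and each function already does its own ray.init/ray.shutdown):

import time

N, L = 5000, 1000000

for fn in (parallel_with_bar, parallel_wait_without_tqdm, parallel_without_bar, single_with_bar):
    runs = []
    for _ in range(3):
        t0 = time.perf_counter()
        fn(N, L)
        runs.append(time.perf_counter() - t0)
    print(fn.__name__, ["%.2f s" % t for t in runs])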

Hi,

I've inserted the function into the Colab if you want to test it. The times I get now are (3 times):
N = 5000, L = 1000000, n_cpus = 56

parallel_wait_with_bar: 19.95 | 19.87 | 19.85

parallel_wait_without_bar: 18.94 | 19.69 | 18.51

parallel_without_bar: 5.37 | 4.35 | 4.27

single_with_bar: 50.24 | 50.36 | 48.47

It doesn't seem to change much. I guess there has to be something about the loop that makes it much slower.

Hi! Any news on this?

Sorry, I got caught up on a deadline - @rkooo567, can you take a crack at it?

@alexmascension I will take a look at it this weekend. Would you mind pinging me one more time if I don't get back to you by next Monday?

@alexmascension When you run parallel_without_bar, could you also try using ray.wait instead of ray.get? Does it produce the same result?

EDIT

JK... I saw Richard already asked you
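For concreteness, one reading of "ray.wait instead of ray.get" is to block on all the results with a single ray.wait call before fetching them; a sketch under that assumption (parallel_waitall_without_bar is a hypothetical name, not something actually run in this thread):

def parallel_waitall_without_bar(N, L):
    ray.init(ignore_reinit_error=True, num_cpus=os.cpu_count())
    obj_ids = [test_remote.remote(L) for i in range(N)]
    # Block until every task has finished, then fetch all results at once.
    done, _ = ray.wait(obj_ids, num_returns=len(obj_ids))
    ret = ray.get(done)
    ray.shutdown()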

I couldn't really reproduce the issue. What N and L values did you set? I tested with a modified test function.

def test(L):
    i = 0
    while i < L:
        np.random.randint(0, 100, L)
        i += 1

My N = 1000, L = 5000.

Hi!

The final results were run on a computer with 56 cores, so maybe a higher number of CPUs is what lets me see the difference?

I've created a Kaggle kernel with the notebook (https://www.kaggle.com/alexmartnezascensin/ray-issue-8164). I've run the functions in the reverse order (single_with_bar, parallel_without_bar and then parallel_wait_with_bar) and a few things happen:

  • I guess there is a communication error (it also happened in Colab) and I can't access the dashboard. I don't know whether this could be a source of problems for Ray during execution.
  • single_with_bar and parallel_without_bar have really similar times, ~70 s each (so it looks like there is no parallelization advantage?)
  • single_with_bar works at ~1300 it/s, and parallel_wait_with_bar at ~4 it/s [there's the error, at last!]. If I reset the notebook and run parallel_wait_with_bar first, I still reproduce the error.
  • parallel_wait_without_bar, similarly, takes a really long time to run (after 10 minutes I stopped the cell).

I've run the notebook on my personal computer and the same thing happens. (Only the last two points, though; the first one doesn't apply, but I guess that's because I'm running it in a local Jupyter notebook.)

(Sorry, the Kaggle kernel doesn't seem to be accessible to me...)

Hi!

Sorry, I forgot to commit the kernel. You should be able to access it now. When you load the page, you can modify the kernel by clicking on "Copy and Edit". It may require you to log in; I'm not sure about that.

Also, if you manage to get there, change the N and L parameters to the values that appear in the comments. I had to make them smaller for the kernel to be committed.

Thanks!

Hi!

Any new things over here?

@alexmascension thanks for following up. I suspect the issue that you're seeing is due to

  1. Interprocess communication overhead, since the work itself is very minimal (the np.random.randint call takes on the order of a millisecond); see the sketch after this list.
  2. Line profiling probably also incurs some overhead.
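To make point 1 concrete, here is a rough way to compare the cost of the work itself against a single remote round trip (my sketch, not part of the original reply; absolute numbers depend on the machine):

import time
import numpy as np
import ray

ray.init(ignore_reinit_error=True)
L = 1000000

# Cost of the work itself, run locally in the driver.
t0 = time.perf_counter()
np.random.randint(0, 100, L)
print("local call:", time.perf_counter() - t0, "s")

@ray.remote
def work(L):
    np.random.randint(0, 100, L)

# Cost of one task submission plus result retrieval.
# (Run it twice: the very first remote call also pays worker startup cost.)
ray.get(work.remote(L))
t0 = time.perf_counter()
ray.get(work.remote(L))
print("remote round trip:", time.perf_counter() - t0, "s")

ray.shutdown()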

Now, I've changed the function to a simple sleep call:

import ray
import numpy as np
from tqdm.notebook import tqdm
import os

n_cpus = os.cpu_count()
print(f'Number of CPUs: {n_cpus}')

# Note: I replaced the np.random call from the original test with this sleep.
import time
def test(L):
    time.sleep(0.01)

test_remote = ray.remote(test)

And then using the same exact functions as you have:

def parallel_wait_with_bar(N, L):
    ray.init(ignore_reinit_error=True, num_cpus=n_cpus)

    def to_iterator(obj_ids):
        while obj_ids:
            done, obj_ids = ray.wait(obj_ids)
            yield ray.get(done[0])

    obj_ids = [test_remote.remote(L) for i in range(N)]
    ret = []
    for x in tqdm(to_iterator(obj_ids), total=len(obj_ids)):
        ret.append(x)

    ray.shutdown()


def parallel_wait_without_bar(N, L):
    ray.init(ignore_reinit_error=True, num_cpus=n_cpus)

    def to_iterator(obj_ids):
        while obj_ids:
            done, obj_ids = ray.wait(obj_ids)
            yield ray.get(done[0])

    obj_ids = [test_remote.remote(L) for i in range(N)]
    ret = []
    for x in to_iterator(obj_ids): # Remove TQDM
        ret.append(x)

    ray.shutdown()


def parallel_without_bar(N, L):
    ray.init(ignore_reinit_error=True, num_cpus=n_cpus)
    obj_ids = [test_remote.remote(L) for i in range(N)]
    ret = ray.get(obj_ids)

    ray.shutdown()


def single_with_bar(N, L):
    ret = [test(L) for i in tqdm(range(N))]

# Change N to 100000 and L to 50000 to see the results!!!!!!

N, L = 5000, 5000

The behavior is as I expect: the bar shouldn't impact the results, and the speedup is what you'd expect from 16 cores:

%time parallel_wait_with_bar(N, L)
# CPU times: user 3.89 s, sys: 401 ms, total: 4.29 s
# Wall time: 7.03 s

%time parallel_wait_without_bar(N, L)
# CPU times: user 3.86 s, sys: 369 ms, total: 4.22 s
# Wall time: 7.15 s

%time parallel_without_bar(N, L)
# CPU times: user 1.18 s, sys: 419 ms, total: 1.6 s
# Wall time: 7.13 s


%time single_with_bar(N, L)
# CPU times: user 1.39 s, sys: 219 ms, total: 1.61 s
# Wall time: 1min
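As a quick sanity check on those numbers (my arithmetic, not from the original comment):

# 5000 tasks, each sleeping 0.01 s:
serial_sleep = 5000 * 0.01          # 50 s of sleep, matching the ~1 min single_with_bar run
ideal_parallel = serial_sleep / 16  # ~3.1 s on 16 cores; the ~7 s wall times are this plus per-task overhead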

Okay! Seems fair to me. Time to close the issue, I guess!

Thanks @richardliaw!
