Ray: [core] importing ray slows down pandas, xgboost, possibly other libraries

Created on 22 Oct 2020 · 11 comments · Source: ray-project/ray

What is the problem?

pandas.read_parquet() is very fast even for larger files. For this file from the NYC taxi dataset, loading takes around 0.5 seconds. However, if ray is imported first, even when it is not used, the time to read jumps to about 1.5 seconds.

Something seems to interfere with pandas specifically - I couldn't reproduce the same behavior with any other library. I tried dask, tensorflow (which takes ages to import, but does not interfere with pandas), and torch.

Follow-up (see comments below): This also impacts training speed for xgboost (~14 seconds without ray vs. ~36 seconds with ray), and possibly other libraries.

Reproduction (REQUIRED)

import time

import pandas as pd
import numpy as np


columns = ['passenger_count', 'trip_distance', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'pickup_location_id', 'dropoff_location_id', 'congestion_surcharge', 'year', 'month']

# Use https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com/2009/01/data.parquet
start = time.time()
df = pd.read_parquet("/Users/kai/nyc-taxi/2009/01/data.parquet", columns=columns)
print("mean", np.mean(df["passenger_count"]))
print("time", time.time() - start)
mean 1.7001096263642004
time 0.489408016204834
With import ray:

import time

import pandas as pd
import numpy as np

import ray

columns = ['passenger_count', 'trip_distance', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'pickup_location_id', 'dropoff_location_id', 'congestion_surcharge', 'year', 'month']

# Use https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com/2009/01/data.parquet
start = time.time()
df = pd.read_parquet("/Users/kai/nyc-taxi/2009/01/data.parquet", columns=columns)
print("mean", np.mean(df["passenger_count"]))
print("time", time.time() - start)
mean 1.7001096263642004
time 1.529021978378296
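
For completeness, a minimal A/B harness that runs each timing in a separate interpreter process, so the `import ray` side effect cannot leak between runs; a sketch assuming the same example file as above:

import subprocess
import sys

SNIPPET = """
import time{ray_import}
import pandas as pd
start = time.time()
pd.read_parquet("/Users/kai/nyc-taxi/2009/01/data.parquet")
print(time.time() - start)
"""

for label, ray_import in [("without ray:", ""), ("with ray:", "\nimport ray")]:
    result = subprocess.run(
        [sys.executable, "-c", SNIPPET.format(ray_import=ray_import)],
        capture_output=True,
        text=True,
    )
    print(label, result.stdout.strip())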
  • [x] I have verified my script runs in a clean environment and reproduces the issue.
  • [x] I have verified the issue also occurs with the latest wheels.
Labels: bug, triage

All 11 comments

This seems to be a one-off overhead, however. Loading the data 10 times:

start = time.time()
df = pd.concat([
    pd.read_parquet("/Users/kai/nyc-taxi/2009/01/data.parquet", columns=columns)
    for i in range(10)
])
print("mean", np.mean(df["passenger_count"]))
print("time", time.time() - start)

Without ray:

mean 1.7001096263642004
time 24.716262102127075

With ray:

mean 1.7001096263642004
time 26.9325749874115

Follow-up: It also impacts training using XGBoost:

import time

import pandas as pd
import numpy as np

import xgboost as xgb


columns = ['passenger_count', 'trip_distance', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'pickup_location_id', 'dropoff_location_id', 'congestion_surcharge', 'year', 'month']

# Use https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com/2009/01/data.parquet
start = time.time()
df = pd.read_parquet("/Users/kai/nyc-taxi/2009/01/data.parquet", columns=columns)
print("mean", np.mean(df["passenger_count"]))
print("time", time.time() - start)

x = df[df.columns.difference(["passenger_count"])]
y = df["passenger_count"]

dtrain = xgb.DMatrix(x, y)

train_start = time.time()
bst = xgb.train(
    params={
        "tree_method": "hist",
        "eval_metric": ["logloss", "error"],
    },
    dtrain=dtrain,
    num_boost_round=20,
    evals=[(dtrain, "train")])
train_taken = time.time() - train_start
print(f"TRAIN TIME TAKEN: {train_taken:.2f}")
TRAIN TIME TAKEN: 14.33

With import ray:

import time

import pandas as pd
import numpy as np

import xgboost as xgb

import ray

columns = ['passenger_count', 'trip_distance', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'pickup_location_id', 'dropoff_location_id', 'congestion_surcharge', 'year', 'month']

# Use https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com/2009/01/data.parquet
start = time.time()
df = pd.read_parquet("/Users/kai/nyc-taxi/2009/01/data.parquet", columns=columns)
print("mean", np.mean(df["passenger_count"]))
print("time", time.time() - start)

x = df[df.columns.difference(["passenger_count"])]
y = df["passenger_count"]

dtrain = xgb.DMatrix(x, y)

train_start = time.time()
bst = xgb.train(
    params={
        "tree_method": "hist",
        "eval_metric": ["logloss", "error"],
    },
    dtrain=dtrain,
    num_boost_round=20,
    evals=[(dtrain, "train")])
train_taken = time.time() - train_start
print(f"TRAIN TIME TAKEN: {train_taken:.2f}")
TRAIN TIME TAKEN: 36.24

Note that ray is not initialized. Only the package is imported. No ray code is used at all.

Here are some profiling results for the parquet loading.

Without ray:

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    0.487    0.487 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pandas/io/parquet.py:268(read_parquet)
        1    0.038    0.038    0.440    0.440 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pandas/io/parquet.py:120(read)
        1    0.000    0.000    0.332    0.332 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/parquet.py:1516(read_table)
        1    0.001    0.001    0.325    0.325 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/parquet.py:1254(read)
        1    0.000    0.000    0.324    0.324 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/parquet.py:688(read)
        1    0.000    0.000    0.321    0.321 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/parquet.py:313(read)
        1    0.321    0.321    0.321    0.321 {method 'read_all' of 'pyarrow._parquet.ParquetReader' objects}
        1    0.000    0.000    0.070    0.070 {method 'to_pandas' of 'pyarrow.lib._PandasConvertible' objects}
        1    0.000    0.000    0.070    0.070 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/pandas_compat.py:743(table_to_blockmanager)
        1    0.000    0.000    0.066    0.066 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/pandas_compat.py:1096(_table_to_blocks)
        1    0.065    0.065    0.065    0.065 {pyarrow.lib.table_to_blocks}
     37/4    0.000    0.000    0.047    0.012 <frozen importlib._bootstrap>:978(_find_and_load)
     37/4    0.000    0.000    0.047    0.012 <frozen importlib._bootstrap>:948(_find_and_load_unlocked)
     36/4    0.000    0.000    0.047    0.012 <frozen importlib._bootstrap>:663(_load_unlocked)
        1    0.000    0.000    0.046    0.046 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pandas/io/parquet.py:14(get_engine)
        1    0.000    0.000    0.046    0.046 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pandas/io/parquet.py:73(__init__)

With ray:

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    1.588    1.588 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pandas/io/parquet.py:268(read_parquet)
        1    0.031    0.031    1.574    1.574 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pandas/io/parquet.py:120(read)
        1    0.000    0.000    1.179    1.179 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/parquet.py:1516(read_table)
        1    0.001    0.001    1.167    1.167 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/parquet.py:1254(read)
        1    0.000    0.000    1.167    1.167 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/parquet.py:688(read)
        1    0.000    0.000    1.162    1.162 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/parquet.py:313(read)
        1    1.162    1.162    1.162    1.162 {method 'read_all' of 'pyarrow._parquet.ParquetReader' objects}
        1    0.000    0.000    0.365    0.365 {method 'to_pandas' of 'pyarrow.lib._PandasConvertible' objects}
        1    0.000    0.000    0.364    0.364 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/pandas_compat.py:743(table_to_blockmanager)
        1    0.000    0.000    0.361    0.361 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/pandas_compat.py:1096(_table_to_blocks)
        1    0.360    0.360    0.360    0.360 {pyarrow.lib.table_to_blocks}
        3    0.000    0.000    0.015    0.005 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/parquet.py:679(open)
        3    0.000    0.000    0.015    0.005 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/parquet.py:1045(_open_dataset_file)
        3    0.000    0.000    0.015    0.005 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/parquet.py:205(__init__)
        3    0.015    0.005    0.015    0.005 {method 'open' of 'pyarrow._parquet.ParquetReader' objects}
      5/4    0.000    0.000    0.014    0.004 <frozen importlib._bootstrap>:978(_find_and_load)
      4/3    0.000    0.000    0.014    0.005 <frozen importlib._bootstrap>:948(_find_and_load_unlocked)
        1    0.000    0.000    0.014    0.014 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pandas/io/parquet.py:14(get_engine)
        1    0.000    0.000    0.014    0.014 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pandas/io/parquet.py:73(__init__)
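
For reference, a minimal sketch of how profiles like the ones above can be collected with cProfile (the output file name and the number of rows printed are arbitrary choices):

import cProfile
import pstats

import pandas as pd

cProfile.run(
    'pd.read_parquet("/Users/kai/nyc-taxi/2009/01/data.parquet")',
    "read_parquet.prof",
)
pstats.Stats("read_parquet.prof").sort_stats("cumulative").print_stats(15)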

Cc @ericl, we should probably handle this ASAP? Looks like it's pyarrow-related.

Maybe because we set the OMP_NUM_THREADS environment variable to 1?

Yeah, I just checked that, and it seems to be the case.

import os
# Remove the OMP_NUM_THREADS=1 that `import ray` set.
del os.environ["OMP_NUM_THREADS"]

solves the problem.
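
This is easy to confirm in a fresh interpreter; a minimal sketch:

import os

print(os.environ.get("OMP_NUM_THREADS"))  # None in a fresh interpreter
import ray
print(os.environ.get("OMP_NUM_THREADS"))  # "1", set as a side effect of the import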

And it seems to be documented, too. It would be nice to add a note about this to the docs, specifically with regard to numerical libraries, but I'm closing the issue for now.

We used to log a warning, but removed it due to verbosity concerns :/ Not sure of a great solution here, since not setting it inevitably leads to crashes and poor performance in the distributed setting.

Closed #11554.

What if we set OMP_NUM_THREADS only on the actor? I.e., ray.remote(num_cpus=10) => OMP_NUM_THREADS on the actor is 10. (A sketch of this idea follows below.)
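
A hypothetical sketch of what that proposal would look like from the user's side; this is not current Ray behavior, so the assignment inside the actor is done by hand (num_cpus=2 is used instead of 10 so the example schedules on a small machine):

import os
import ray

ray.init()

@ray.remote(num_cpus=2)
class Trainer:
    def __init__(self):
        # Under the proposal, Ray would derive this from num_cpus
        # automatically; today it has to be set manually:
        os.environ["OMP_NUM_THREADS"] = "2"

    def omp_threads(self):
        return os.environ["OMP_NUM_THREADS"]

trainer = Trainer.remote()
print(ray.get(trainer.omp_threads.remote()))  # "2"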

One issue is that you have to set OMP_NUM_THREADS quite early (before importing other libraries) for it to take effect. It might be possible to delay it until a concrete task is run with resources. That still leaves the question of what to do for the driver, though (which doesn't have resource specs). Users have complained before about the driver env being different from actor/task envs, which results in odd performance differences for remote code.

Btw, there is a way to override it: set OMP_NUM_THREADS manually outside of Ray. We won't override it if it is already set.
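
Concretely, a sketch of that override (the value 8 is an arbitrary example):

import os

# Must run before `import ray`: Ray only sets OMP_NUM_THREADS when it is
# not already present, so a user-provided value survives the import.
os.environ["OMP_NUM_THREADS"] = "8"

import ray

print(os.environ["OMP_NUM_THREADS"])  # still "8"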

So the problem I have is that I need to (un)set it within a library and then also in the remote workers (I suppose; at least I had to do that to make it work). This is because the actual training via xgboost takes place in a remote ray actor. It's fine with me for the time being, but it feels a bit hacky.
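
A minimal sketch of that two-sided workaround, assuming (as described above) the unset has to happen both on the driver and inside each remote worker:

import os
import ray

ray.init()

# Driver side: undo the OMP_NUM_THREADS=1 that `import ray` set.
os.environ.pop("OMP_NUM_THREADS", None)

@ray.remote
def worker_omp_setting():
    # As observed above, the workaround has to be repeated inside each
    # remote task/actor as well.
    os.environ.pop("OMP_NUM_THREADS", None)
    return os.environ.get("OMP_NUM_THREADS")

print(ray.get(worker_omp_setting.remote()))  # None once unset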
