pandas.read_parquet() is very fast even for larger files. For this file from the NYC taxi dataset, loading takes around 0.5 seconds. However, if ray is imported beforehand, even when it is not used, the read time goes up to about 1.5 seconds.
Something seems to interfere with pandas, and I couldn't reproduce the behavior with any other library. I tried dask, tensorflow (which takes ages to import, but does not interfere with pandas), and torch.
Follow-up (see comments below): This also impacts training speed for xgboost (about 14 secs without ray vs. about 36 secs with ray), and possibly other libraries.
import time
import pandas as pd
import numpy as np
columns = ['passenger_count', 'trip_distance', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'pickup_location_id', 'dropoff_location_id', 'congestion_surcharge', 'year', 'month']
# Use https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com/2009/01/data.parquet
ststart = time.time(); df = pd.read_parquet("/Users/kai/nyc-taxi/2009/01/data.parquet", columns=columns); print("mean", np.mean(df["passenger_count"])); print("time", time.time() - ststart)
mean 1.7001096263642004
time 0.489408016204834
import time
import pandas as pd
import numpy as np
import ray
columns = ['passenger_count', 'trip_distance', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'pickup_location_id', 'dropoff_location_id', 'congestion_surcharge', 'year', 'month']
# Use https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com/2009/01/data.parquet
ststart = time.time(); df = pd.read_parquet("/Users/kai/nyc-taxi/2009/01/data.parquet", columns=columns); print("mean", np.mean(df["passenger_count"])); print("time", time.time() - ststart)
mean 1.7001096263642004
time 1.529021978378296
This seems to be a one-off overhead, however. Loading the data 10 times:
ststart = time.time(); df = pd.concat([pd.read_parquet("/Users/kai/nyc-taxi/2009/01/data.parquet", columns=columns) for i in range(10)]); print("mean", np.mean(df["passenger_count"])); print("time", time.time() - ststart)
Without ray:
mean 1.7001096263642004
time 24.716262102127075
With ray:
mean 1.7001096263642004
time 26.9325749874115
Follow-up: It also impacts training using XGBoost:
import time
import pandas as pd
import numpy as np
import xgboost as xgb
columns = ['passenger_count', 'trip_distance', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'pickup_location_id', 'dropoff_location_id', 'congestion_surcharge', 'year', 'month']
# Use https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com/2009/01/data.parquet
ststart = time.time(); df = pd.read_parquet("/Users/kai/nyc-taxi/2009/01/data.parquet", columns=columns); print("mean", np.mean(df["passenger_count"])); print("time", time.time() - ststart)
x = df[df.columns.difference(["passenger_count"])]
y = df["passenger_count"]
dtrain = xgb.DMatrix(x, y)
train_start = time.time()
bst = xgb.train(
    params={
        "tree_method": "hist",
        "eval_metric": ["logloss", "error"],
    },
    dtrain=dtrain,
    num_boost_round=20,
    evals=[(dtrain, "train")])
train_taken = time.time() - train_start
print(f"TRAIN TIME TAKEN: {train_taken:.2f}")
TRAIN TIME TAKEN: 14.33
With import ray:
import time
import pandas as pd
import numpy as np
import xgboost as xgb
import ray
columns = ['passenger_count', 'trip_distance', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'pickup_location_id', 'dropoff_location_id', 'congestion_surcharge', 'year', 'month']
# Use https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com/2009/01/data.parquet
ststart = time.time(); df = pd.read_parquet("/Users/kai/nyc-taxi/2009/01/data.parquet", columns=columns); print("mean", np.mean(df["passenger_count"])); print("time", time.time() - ststart)
x = df[df.columns.difference(["passenger_count"])]
y = df["passenger_count"]
dtrain = xgb.DMatrix(x, y)
train_start = time.time()
bst = xgb.train(
    params={
        "tree_method": "hist",
        "eval_metric": ["logloss", "error"],
    },
    dtrain=dtrain,
    num_boost_round=20,
    evals=[(dtrain, "train")])
train_taken = time.time() - train_start
print(f"TRAIN TIME TAKEN: {train_taken:.2f}")
TRAIN TIME TAKEN: 36.24
Note that ray is not initialized. Only the package is imported. No ray code is used at all.
Here are some profiling results for the parquet loading.
Without ray:
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 0.487 0.487 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pandas/io/parquet.py:268(read_parquet)
1 0.038 0.038 0.440 0.440 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pandas/io/parquet.py:120(read)
1 0.000 0.000 0.332 0.332 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/parquet.py:1516(read_table)
1 0.001 0.001 0.325 0.325 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/parquet.py:1254(read)
1 0.000 0.000 0.324 0.324 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/parquet.py:688(read)
1 0.000 0.000 0.321 0.321 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/parquet.py:313(read)
1 0.321 0.321 0.321 0.321 {method 'read_all' of 'pyarrow._parquet.ParquetReader' objects}
1 0.000 0.000 0.070 0.070 {method 'to_pandas' of 'pyarrow.lib._PandasConvertible' objects}
1 0.000 0.000 0.070 0.070 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/pandas_compat.py:743(table_to_blockmanager)
1 0.000 0.000 0.066 0.066 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/pandas_compat.py:1096(_table_to_blocks)
1 0.065 0.065 0.065 0.065 {pyarrow.lib.table_to_blocks}
37/4 0.000 0.000 0.047 0.012 <frozen importlib._bootstrap>:978(_find_and_load)
37/4 0.000 0.000 0.047 0.012 <frozen importlib._bootstrap>:948(_find_and_load_unlocked)
36/4 0.000 0.000 0.047 0.012 <frozen importlib._bootstrap>:663(_load_unlocked)
1 0.000 0.000 0.046 0.046 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pandas/io/parquet.py:14(get_engine)
1 0.000 0.000 0.046 0.046 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pandas/io/parquet.py:73(__init__)
With ray:
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 1.588 1.588 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pandas/io/parquet.py:268(read_parquet)
1 0.031 0.031 1.574 1.574 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pandas/io/parquet.py:120(read)
1 0.000 0.000 1.179 1.179 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/parquet.py:1516(read_table)
1 0.001 0.001 1.167 1.167 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/parquet.py:1254(read)
1 0.000 0.000 1.167 1.167 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/parquet.py:688(read)
1 0.000 0.000 1.162 1.162 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/parquet.py:313(read)
1 1.162 1.162 1.162 1.162 {method 'read_all' of 'pyarrow._parquet.ParquetReader' objects}
1 0.000 0.000 0.365 0.365 {method 'to_pandas' of 'pyarrow.lib._PandasConvertible' objects}
1 0.000 0.000 0.364 0.364 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/pandas_compat.py:743(table_to_blockmanager)
1 0.000 0.000 0.361 0.361 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/pandas_compat.py:1096(_table_to_blocks)
1 0.360 0.360 0.360 0.360 {pyarrow.lib.table_to_blocks}
3 0.000 0.000 0.015 0.005 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/parquet.py:679(open)
3 0.000 0.000 0.015 0.005 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/parquet.py:1045(_open_dataset_file)
3 0.000 0.000 0.015 0.005 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pyarrow/parquet.py:205(__init__)
3 0.015 0.005 0.015 0.005 {method 'open' of 'pyarrow._parquet.ParquetReader' objects}
5/4 0.000 0.000 0.014 0.004 <frozen importlib._bootstrap>:978(_find_and_load)
4/3 0.000 0.000 0.014 0.005 <frozen importlib._bootstrap>:948(_find_and_load_unlocked)
1 0.000 0.000 0.014 0.014 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pandas/io/parquet.py:14(get_engine)
1 0.000 0.000 0.014 0.014 /Users/kai/.pyenv/versions/3.7.7/lib/python3.7/site-packages/pandas/io/parquet.py:73(__init__)
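For anyone who wants to produce comparable listings: the columns above match what Python's built-in cProfile prints, although the thread does not say exactly how the profiles were collected. Below is a minimal sketch under that assumption. The names profile_call and busy_sum are hypothetical, and busy_sum is only a stand-in for the pd.read_parquet call.

```python
import cProfile
import io
import pstats


def profile_call(func, *args, top=15, **kwargs):
    """Run func under cProfile; return (result, report sorted by cumulative time)."""
    profiler = cProfile.Profile()
    result = profiler.runcall(func, *args, **kwargs)
    stream = io.StringIO()
    stats = pstats.Stats(profiler, stream=stream)
    # "cumulative" sorts by the cumtime column, as in the listings above.
    stats.sort_stats("cumulative").print_stats(top)
    return result, stream.getvalue()


def busy_sum(n):
    # Stand-in workload (assumption); replace with the call you want to profile.
    return sum(i * i for i in range(n))


result, report = profile_call(busy_sum, 100_000)
print(report)
```

Running this once in a fresh interpreter with `import ray` and once without should give two reports that can be compared line by line.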
Cc @ericl we should probably handle this asap? Looks like pyarrow related
Maybe because we set the OMP_NUM_THREADS environment variable to 1?
Yeah, I just checked that, and it seems to be the case.
import os
del os.environ["OMP_NUM_THREADS"]
solves the problem.
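A slightly more defensive variant of the same workaround, as a sketch only: `del` raises a KeyError if the variable is not set, while `os.environ.pop` with a default does not. The helper name clear_omp_thread_limit is hypothetical. Whether clearing the variable has any effect presumably depends on it happening before the numerical library in question has read it, which is an assumption and not something verified here.

```python
import os


def clear_omp_thread_limit():
    """Remove OMP_NUM_THREADS from this process's environment.

    Returns the previous value, or None if the variable was not set.
    """
    return os.environ.pop("OMP_NUM_THREADS", None)


previous = clear_omp_thread_limit()
print("OMP_NUM_THREADS was:", previous)
```

Printing the previous value also makes it easy to confirm that something set the variable in the first place.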
And it seems to be documented, too. It would be nice to include a note for this specifically with regard to numerical libraries in the docs, but I'm closing the issue for now.
We used to log a warning, but removed it due to verbosity concerns :/ Not sure of a great solution here since not setting it inevitably leads to crashes and poor performance in the distributed setting.
On Thu, Oct 22, 2020, 11:04 AM Kai Fricke notifications@github.com wrote:
Closed #11554 https://github.com/ray-project/ray/issues/11554.
What if we set OMP_NUM_THREADS only on the actor? I.e., ray.remote(num_cpus=10) => OMP_NUM_THREADS on the actor is 10.
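To make the proposal concrete, here is a rough sketch of the mapping from a CPU allotment to a thread count. This is not how Ray is implemented; omp_threads_for and apply_cpu_allotment are hypothetical helpers, and rounding fractional allotments down to at least one thread is an assumption.

```python
import math
import os


def omp_threads_for(num_cpus):
    """Map a (possibly fractional) CPU allotment to a thread count of at least 1."""
    return max(1, math.floor(num_cpus))


def apply_cpu_allotment(num_cpus, environ=os.environ):
    """Set OMP_NUM_THREADS in the given environment mapping from a CPU allotment."""
    environ["OMP_NUM_THREADS"] = str(omp_threads_for(num_cpus))
    return environ["OMP_NUM_THREADS"]


# Demonstrate on a plain dict so the current process environment is left untouched.
env = {}
apply_cpu_allotment(10, env)
print(env)
```

In a worker, something like this would have to run before the numerical libraries are imported for it to have any effect.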
One issue is that you have to set OMP_NUM_THREADS quite early (before other libraries are imported) for it to take effect. It might be possible to delay it until a concrete task is run with resources. That still leaves the question of what to do for the driver, though (which doesn't have resource specs). Users have complained before about the driver env being different from actor/task envs, which results in odd performance differences for remote code.
Btw there is a way to override it: set OMP_NUM_THREADS manually outside of Ray. We won't override it if it is already set.
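Exporting the variable in the shell before starting Python is the most direct way to do this. For doing it from within a script, a sketch could look like the following. It assumes the variable only needs to be in place before `import ray` runs, and ensure_omp_threads is a hypothetical helper name. `setdefault` keeps any value the user has already exported.

```python
import os


def ensure_omp_threads(num_threads=None):
    """Set OMP_NUM_THREADS unless it is already set; return the effective value."""
    if num_threads is None:
        # Assumption: default to the number of CPUs visible to this process.
        num_threads = os.cpu_count() or 1
    return os.environ.setdefault("OMP_NUM_THREADS", str(num_threads))


chosen = ensure_omp_threads()
print("OMP_NUM_THREADS =", chosen)
# import ray  # assumed placement: only after the variable has been set
```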
So the problem that I have is that I need to (un)set it within a library and then also in the remote workers (I suppose, at least I had to do that to make it work). This is because actual training via xgboost takes place in a remote ray actor. It's fine with me for the time being but feels a bit hacky.