(Comes from https://github.com/dask/distributed/issues/1978#issuecomment-645869748)
What happened:
$ ipython
Python 3.8.3 (default, May 20 2020, 12:50:54)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.15.0 -- An enhanced Interactive Python. Type '?' for help.
In [1]: # coding: utf-8
...: from dask import dataframe as dd
...: import pandas as pd
...: from distributed import Client
...: client = Client()
...: df = dd.read_csv("../data/yellow_tripdata_2019-*.csv", parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"])
...: payment_types = {
...: 1: "Credit Card",
...: 2: "Cash",
...: 3: "No Charge",
...: 4: "Dispute",
...: 5: "Unknown",
...: 6: "Voided trip"
...: }
...: payment_names = pd.Series(
...: payment_types, name="payment_name"
...: ).to_frame()
...: df2 = df.merge(
...: payment_names, left_on="payment_type", right_index=True
...: )
...: op = df2.groupby("payment_name")["tip_amount"].mean()
...: client.compute(op)
...:
Out[1]: <Future: pending, key: finalize-85edcc1f23785545f628c932abd19768>
In [2]: distributed.worker - WARNING - Compute Failed
Function: _apply_chunk
args: ( VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count trip_distance RatecodeID store_and_fwd_flag ... mta_tax tip_amount tolls_amount improvement_surcharge total_amount congestion_surcharge payment_name
0 1 2019-01-04 14:08:46 2019-01-04 14:18:10 1 1.70 1 N ... 0.5 0.0 0.00 0.3 9.30 NaN Cash
1 1 2019-01-04 14:20:33 2019-01-04 14:25:10 1 0.90 1 N ... 0.5 0.0 0.00 0.3 6.30 NaN Cash
13 2 2019-01-04 14:14:45 2019-01-04 14:26:00 5 1.63 1 N ... 0.5 0.0 0.00 0.3 9.80 NaN Cash
15 2 2019-01-04 14:49:45 2019-01-04 15:0
kwargs: {'chunk': <methodcaller: sum>, 'columns': 'tip_amount'}
Exception: ValueError('buffer source array is read-only')
In [2]:
In [2]: client
Out[2]: <Client: 'tcp://127.0.0.1:33689' processes=4 threads=4, memory=16.70 GB>
In [3]: _1
Out[3]: <Future: error, key: finalize-85edcc1f23785545f628c932abd19768>
What you expected to happen: The operation finishes without error.
Minimal Complete Verifiable Example:
# coding: utf-8
from dask import dataframe as dd
import pandas as pd
from distributed import Client
client = Client()
df = dd.read_csv("../data/yellow_tripdata_2019-*.csv", parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"])
payment_types = {
    1: "Credit Card",
    2: "Cash",
    3: "No Charge",
    4: "Dispute",
    5: "Unknown",
    6: "Voided trip"
}
payment_names = pd.Series(
    payment_types, name="payment_name"
).to_frame()
df2 = df.merge(
    payment_names, left_on="payment_type", right_index=True
)
op = df2.groupby("payment_name")["tip_amount"].mean()
# .result() blocks until the computation finishes, so the error also
# surfaces when this is run as a script rather than in IPython
client.compute(op).result()
Data:
https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-01.csv
https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-02.csv
https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-03.csv
https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-04.csv
Anything else we need to know?: I managed to avoid this error by reducing the number of files, but it hit me again at a later point. I suspect this behavior depends on the available RAM.
Environment:
Is it possible to reproduce this without the Taxi data? Would some randomly generated data work as well?
Perhaps, although it will take me more time to narrow down the example. I managed to reproduce the problem with these files (I stripped out some files with mixed dtypes) and without the intermediate merge:
yellow_tripdata_2019-01.csv
yellow_tripdata_2019-02.csv
yellow_tripdata_2019-03.csv
yellow_tripdata_2019-04.csv
yellow_tripdata_2019-05.csv
yellow_tripdata_2019-06.csv
yellow_tripdata_2019-12.csv
# coding: utf-8
from dask import dataframe as dd
from distributed import Client
client = Client()
df = dd.read_csv("../data/yellow_tripdata_2019-*.csv", parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"])
op = df.groupby("payment_type")["tip_amount"].mean()
client.compute(op).result()  # block until done so the error surfaces in a script
However, removing one file resulted in the operation executing successfully. I'll try to see if this can be simplified further.
Thanks Juan! 😀
When I ran this I got the mismatched dtype error:
In [7]: /Users/bzaitlen/Documents/GitHub/distributed/distributed/worker.py:3312: DtypeWarning: Columns (6) have mixed types.Specify dtype option on import or set low_memory=False.
return func(*map(execute_task, args))
distributed.worker - WARNING - Compute Failed
Function: execute_task
args: ((<function check_meta at 0x1132a88c0>, (<function apply at 0x100dd8b00>, <function pandas_read_text at 0x113380dd0>, [<function _make_parser_function.<locals>.parser_f at 0x114baf710>, (<function read_block_from_file at 0x114b2c830>, <OpenFile '/Users/bzaitlen/Downloads/yellow_tripdata_2019-12.csv'>, 576000000, 64000000, b'\n'), b'VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge\r\n', (<class 'dict'>, [['parse_dates', ['tpep_pickup_datetime', 'tpep_dropoff_datetime']]]), (<class 'dict'>, [['VendorID', dtype('int64')], ['tpep_pickup_datetime', dtype('<M8[ns]')], ['tpep_dropoff_datetime', dtype('<M8[ns]')], ['passenger_count', dtype('int64')], ['trip_distance', dtype('float64')], ['RatecodeID', dtype('int64')], ['store_and_fwd_flag', dtype('O')], ['PULocationID', dtype('int64')], ['
kwargs: {}
Exception: ValueError("Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.\n\n+-----------------+---------+----------+\n| Column | Found | Expected |\n+-----------------+---------+----------+\n| RatecodeID | float64 | int64 |\n| VendorID | float64 | int64 |\n| passenger_count | float64 | int64 |\n| payment_type | float64 | int64 |\n+-----------------+---------+----------+\n\nUsually this is due to dask's dtype inference failing, and\n*may* be fixed by specifying dtypes manually by adding:\n\ndtype={'RatecodeID': 'float64',\n 'VendorID': 'float64',\n 'passenger_count': 'float64',\n 'payment_type': 'float64'}\n\nto the call to `read_csv`/`read_table`.\n\nAlternatively, provide `assume_missing=True` to interpret\nall unspecified integer columns as floats.")
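This kind of mismatch typically appears when dask infers dtypes from a sample taken from the start of the data: an integer column with no missing values in that sample is inferred as int64, while a later block containing empty cells is parsed as float64, because int64 cannot hold NaN. The following is a minimal pandas sketch of that inference, using small made-up inline CSV snippets rather than the actual taxi files; the column names are borrowed from the header above purely for illustration.

```python
import io

import pandas as pd

# Two hypothetical CSV blocks with the same header. In the first, every
# VendorID/payment_type value is present; in the second, some are missing.
block_without_gaps = "VendorID,payment_type,tip_amount\n1,1,2.5\n2,2,0.0\n"
block_with_gaps = "VendorID,payment_type,tip_amount\n1,1,2.5\n,,0.0\n"

sample = pd.read_csv(io.StringIO(block_without_gaps))
later = pd.read_csv(io.StringIO(block_with_gaps))

# pandas infers int64 when all values are present, but falls back to
# float64 when a column contains empty cells (parsed as NaN).
print(sample.dtypes["VendorID"], later.dtypes["VendorID"])  # int64 float64

# Declaring the columns as float64 up front (the fix the error message
# suggests passing to dd.read_csv via dtype=) makes both blocks agree.
explicit = {"VendorID": "float64", "payment_type": "float64"}
sample_fixed = pd.read_csv(io.StringIO(block_without_gaps), dtype=explicit)
later_fixed = pd.read_csv(io.StringIO(block_with_gaps), dtype=explicit)
print(list(sample_fixed.dtypes) == list(later_fixed.dtypes))  # True
```

For the dask reproducer, the same dictionary would go into the `dtype=` argument of `dd.read_csv`, or `assume_missing=True` could be passed instead, as the error message itself suggests.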
FWIW I tried to ensure that we always create bytearrays in merge_frames (part of the serialization/deserialization pipeline) with PR ( https://github.com/dask/distributed/pull/3918 ). That would ensure all the buffers backing the data are writeable. It seems to work on a trivial NumPy test included in that PR. Maybe that helps? Would you be able to try it @astrojuanlu?
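The message "buffer source array is read-only" is raised by Cython when code expecting a writable typed memoryview receives an array backed by immutable memory. For example, `np.frombuffer` over a `bytes` object yields an array with `flags.writeable == False`, whereas over a `bytearray` it is writeable. The stdlib-only sketch below illustrates the idea behind the fix described above. `join_frames_writeable` is a hypothetical helper written for illustration, not the actual `merge_frames` code from the PR.

```python
from typing import List, Union

Frame = Union[bytes, bytearray, memoryview]


def join_frames_writeable(frames: List[Frame]) -> memoryview:
    """Hypothetical helper: concatenate received frames into one writable buffer.

    Copying into a bytearray (rather than joining into bytes) means that
    anything later built on top of the buffer, e.g. a NumPy array via
    np.frombuffer, can be written to.
    """
    out = bytearray()
    for frame in frames:
        out += frame  # bytearray += accepts any bytes-like object
    return memoryview(out)


# Frames as they might arrive off the network: immutable bytes objects.
received = [b"\x01\x00\x00\x00", b"\x02\x00\x00\x00"]

readonly_view = memoryview(b"".join(received))
writable_view = join_frames_writeable(received)

print(readonly_view.readonly)  # True
print(writable_view.readonly)  # False
writable_view[0] = 7  # in-place write succeeds on the bytearray-backed buffer
```

The trade-off is an extra copy when frames arrive as `bytes`, in exchange for never handing read-only memory to downstream code that may mutate it.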
I confirm that #3918 fixed the issue with the same data:
In [1]: from dask import dataframe as dd
...: from distributed import Client
...:
...: client = Client()
...:
...: df = dd.read_csv("../data/yellow_tripdata_2019-*.csv", parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"])
...:
...: op = df.groupby("payment_type")["tip_amount"].mean()
...: client.compute(op)
Out[1]: <Future: pending, key: finalize-d2d79ddf9a418b1c0ed76bfa1c20daf6>
In [2]: _1
Out[2]: <Future: finished, type: pandas.Series, key: finalize-d2d79ddf9a418b1c0ed76bfa1c20daf6>
In [3]: _1.result()
Out[3]:
payment_type
1 2.976392
2 0.000326
3 0.625695
4 -0.010395
5 0.000000
Name: tip_amount, dtype: float64
Thanks Juan! 😀
Will look at cleaning it up and adding some more tests.
Adding a test for a Pandas Series as well in PR ( https://github.com/dask/distributed/pull/3995 ).