Pandas: Add filters parameter to pandas.read_parquet() to enable PyArrow/Parquet partition filtering

Created on 29 May 2019 · 22 Comments · Source: pandas-dev/pandas

Code Sample, a copy-pastable example if possible

import pandas as pd

ticker = 'AA'

stocks_close_df = pd.read_parquet(
    'data/v4.parquet',
    columns=['DateTime', 'Close', 'Ticker'],
    engine='pyarrow',
    filters=[('Ticker','=',ticker)]
)

# The filters argument above should have the same effect as this post-hoc filter:
stocks_close_df = stocks_close_df[stocks_close_df.Ticker == ticker]

This results in the following exception:

TypeError                                 Traceback (most recent call last)
<ipython-input-7-450ddb513430> in <module>
      6     columns=['DateTime', 'Close', 'Ticker'],
      7     engine='pyarrow',
      8     filters=[('Ticker','=',ticker)]
      9 )
     10 stocks_close_df.index = stocks_close_df['DateTime']

~/anaconda3/lib/python3.6/site-packages/pandas/io/parquet.py in read_parquet(path, engine, columns, **kwargs)
    280 
    281     impl = get_engine(engine)
    282     return impl.read(path, columns=columns, **kwargs)

~/anaconda3/lib/python3.6/site-packages/pandas/io/parquet.py in read(self, path, columns, **kwargs)
    127         kwargs['use_pandas_metadata'] = True
    128         result = self.api.parquet.read_table(path, columns=columns,
    129                                              **kwargs).to_pandas()
    130         if should_close:
    131             try:

TypeError: read_table() got an unexpected keyword argument 'filters'

Problem description

I would like to pass a filters argument from pandas.read_parquet through to the pyarrow engine, to do filtering on partitions in Parquet files. The pyarrow engine already has this capability; it is just a matter of passing the filters argument through.

From a discussion on [email protected]:

But, filtering could also be done when reading the parquet file(s), to actually prevent reading everything into memory. However, this is only partly implemented in pyarrow at this moment. If you have a dataset consisting of partitioned files in nested directories (Hive like), pyarrow can filter on which files to read. See the "filters" keyword of ParquetDataset (https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html). I am only not fully sure you can already use this through the pandas interface, it might be you need to use the pyarrow interface directly (in which case, feel free to open an issue on the pandas issue tracker).

Note that https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html takes a filters (List[Tuple] or List[List[Tuple]] or None (default)) argument.
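For context, pyarrow interprets filters in disjunctive normal form: a plain list of (column, op, value) tuples is a single AND group, while a list of lists ORs the inner AND groups together. A minimal sketch of both forms (the dataset path and the Year partition column here are hypothetical):

import pyarrow.parquet as pq

# Single conjunction: every tuple must hold (implicit AND).
and_filter = [('Ticker', '=', 'AA'), ('Year', '>=', 2018)]

# Disjunctive normal form: the outer list ORs the inner AND groups.
dnf_filter = [
    [('Ticker', '=', 'AA')],
    [('Ticker', '=', 'AAPL'), ('Year', '>=', 2018)],
]

# Hypothetical directory partitioned on Ticker/Year (Hive-style layout).
dataset = pq.ParquetDataset('data/stocks_partitioned/', filters=dnf_filter)
stocks_df = dataset.read().to_pandas()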

Expected Output

A filtered pandas.DataFrame.

Output of pd.show_versions()


INSTALLED VERSIONS

commit: None
python: 3.6.8.final.0
python-bits: 64
OS: Linux
OS-release: 4.15.0-50-generic
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: en_US.UTF-8
LOCALE: en_US.UTF-8

pandas: 0.24.2
pytest: 4.5.0
pip: 19.1.1
setuptools: 41.0.1
Cython: 0.29.7
numpy: 1.16.3
scipy: 1.2.1
pyarrow: 0.13.0
xarray: None
IPython: 7.5.0
sphinx: 2.0.1
patsy: 0.5.1
dateutil: 2.8.0
pytz: 2019.1
blosc: None
bottleneck: 1.2.1
tables: 3.5.1
numexpr: 2.6.9
feather: None
matplotlib: 3.0.3
openpyxl: 2.6.2
xlrd: 1.2.0
xlwt: 1.3.0
xlsxwriter: 1.1.8
lxml.etree: 4.3.3
bs4: 4.7.1
html5lib: 1.0.1
sqlalchemy: 1.3.3
pymysql: None
psycopg2: None
jinja2: 2.10.1
s3fs: None
fastparquet: 0.3.1
pandas_gbq: None
pandas_datareader: None
gcsfs: None

Labels: IO Parquet

All 22 comments

https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html#pyarrow.parquet.read_table

is what is being called; pyarrow would need to expose this; pandas already passes through kwargs.

There is another interface that does expose this

But in principle we can also call ParquetDataset if a directory is passed, so this would be supported then.
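A rough sketch of what that dispatch inside the pyarrow engine could look like (hypothetical helper, not the actual pandas implementation):

import os
import pyarrow.parquet as pq

def _read_with_optional_filters(path, columns=None, filters=None, **kwargs):
    # Hypothetical dispatch: route directories through ParquetDataset, which
    # supports partition-level filters, and single files through read_table.
    if filters is not None and os.path.isdir(path):
        table = pq.ParquetDataset(path, filters=filters).read(
            columns=columns, use_pandas_metadata=True
        )
    else:
        table = pq.read_table(
            path, columns=columns, use_pandas_metadata=True, **kwargs
        )
    return table.to_pandas()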

and we already have https://github.com/pandas-dev/pandas/issues/17102 for this

Please close one or the other.

The item mentioned there is something different (row-group filtering rather than file filtering, which is not yet implemented on the Arrow side). But I will go through the items in the other issue to see whether it can actually be closed.

I am also looking at the pyarrow side to see if it can be exposed in the same function, so pandas does not have to take a different path depending on whether a directory or a single file is passed.

Will update / close this issue depending on that.

I opened https://issues.apache.org/jira/browse/ARROW-5436. Let's see if that change would be accepted by pyarrow. If so, we can close this issue, otherwise we can implement it in pandas.

I have the code that motivated this issue working with the following workaround. This seems like it would also work internally for pandas, but I understand wanting the read_table method to work properly.

from pyarrow.parquet import ParquetDataset

# Test ticker
ticker = 'AAPL'

stocks_close_ds = ParquetDataset(
    'data/v4.parquet',
    filters=[('Ticker','=',ticker)]
)
table = stocks_close_ds.read()
stocks_close_df = table.to_pandas()

stocks_close_df
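For completeness, the same workaround can also carry over the column selection from the original report; this is a sketch under the same assumptions as the snippet above, with use_pandas_metadata mirroring the flag pandas passes internally:

from pyarrow.parquet import ParquetDataset

ticker = 'AAPL'

dataset = ParquetDataset('data/v4.parquet', filters=[('Ticker', '=', ticker)])
# ParquetDataset.read accepts a column subset and can keep the pandas metadata.
table = dataset.read(columns=['DateTime', 'Close', 'Ticker'], use_pandas_metadata=True)
stocks_close_df = table.to_pandas()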

@rjurney that's indeed a good approach. I opened a PR on the arrow project to add filters to read_table as well. But if that does not go through, we can do the above in pandas.

@jorisvandenbossche Cool, thanks for the PR

@rjurney The PR in arrow is merged (https://github.com/apache/arrow/pull/4409), so once the next release of pyarrow is out, you will be able to pass the filters argument directly in pd.read_parquet.

Since no further action is required for pandas, closing this.

Thanks!

@jorisvandenbossche the docs haven't been updated to add filters. Should they be, or will that have to wait for a release?

https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_parquet.html

Hello,

I'm trying to reproduce the workaround mentioned by @rjurney with a parquet file that uses multiple row groups:

ParquetDataset('data.parquet',
               filters=[('ticker', '=', 'AAPL')]).read().to_pandas()

And I got an exception:

AttributeError: 'NoneType' object has no attribute 'filter_accepts_partition'

If I use a dataset directory instead of a single parquet file, there is no exception, but the filter is not pushed down to the row groups within a file (as mentioned in the docs, I think) and nothing is filtered.

So I wonder whether @rjurney was dealing with a single parquet file rather than a dataset directory, or whether behaviour changed between releases?

Row-group filtering within a file was not yet supported by pyarrow (and thus not in pandas.read_parquet). The upcoming version of pyarrow (0.17.0) will add this, and then it will also become available in pandas.

Row-group filtering within a file was not yet supported by pyarrow (and thus not in pandas.read_parquet). The upcoming version of pyarrow (0.17.0) will add this, and then it will also become available in pandas.

Do you happen to know what version of pandas will support this? I just tried this on pyarrow 0.17.1 and pandas 1.0.3 and received the exact same error as cclienti above:

Traceback (most recent call last):
  File "filter_parquet_from_aws.py", line 6, in <module>
    filters=[[('iso_country_code', '=', 'US')]]
  File "/home/jeremyakers/.local/lib/python3.6/site-packages/awswrangler/s3.py", line 1726, in read_parquet
    s3_additional_kwargs=s3_additional_kwargs,
  File "/home/jeremyakers/.local/lib/python3.6/site-packages/awswrangler/s3.py", line 1603, in _read_parquet_init
    split_row_groups=False,
  File "/home/jeremyakers/.local/lib/python3.6/site-packages/pyarrow/parquet.py", line 1200, in __init__
    self._filter(filters)
  File "/home/jeremyakers/.local/lib/python3.6/site-packages/pyarrow/parquet.py", line 1314, in _filter
    accepts_filter = self.partitions.filter_accepts_partition
AttributeError: 'NoneType' object has no attribute 'filter_accepts_partition'

@jeremyakers because the additional keywords are passed through, it should actually already work with the released pandas 1.0.3.
But to activate row-group filtering, you need to opt in to the "new" pyarrow reader (it will become the default in the future, but for now it is still opt-in). As a small example, with a dummy dataframe:

In [1]: pd.read_parquet("test.parquet")
Out[1]: 
          a         b         c         d         e
0  0.833006  1.638854 -0.250418 -0.141500 -0.075873
1 -0.117031  0.370930  0.776447 -0.569128 -0.954441
2 -1.737407  0.073124  2.550558 -1.632866 -0.518660
3 -1.519883  0.299574  1.401251 -0.731901  0.419746
4 -1.535166  0.228494 -0.617373  0.143101  0.900306

In [2]: pd.read_parquet("test.parquet", filters=[('a', '>', 0)]) 
...
AttributeError: 'NoneType' object has no attribute 'filter_accepts_partition'

In [3]: pd.read_parquet("test.parquet", use_legacy_dataset=False, filters=[('a', '>', 0)]) 
Out[3]: 
          a         b         c         d         e
0  0.833006  1.638854 -0.250418 -0.141500 -0.075873

So the trick here is to specify use_legacy_dataset=False to enable the new functionality.

See the note in the pyarrow docs here: https://arrow.apache.org/docs/python/parquet.html#reading-from-partitioned-datasets (it is in the partitioned datasets section, but it also applies to reading single parquet files).

Thank you for the detailed response.

I had actually tried adding use_legacy_dataset=False, but I received the exception below. I'm also using awswrangler (which is built on top of pandas), so perhaps it's not passing the option through?

Traceback (most recent call last):
  File "filter_parquet_from_aws.py", line 10, in <module>
    filters=[('iso_country_code', '==', 'US')]
TypeError: read_parquet() got an unexpected keyword argument 'use_legacy_dataset'

Probably; I am not familiar with awswrangler. But from a quick look at their code (https://github.com/awslabs/aws-data-wrangler/blob/57b613346f2aaecd1e05ae39647fa1dc69b0711b/awswrangler/s3.py#L1684), it unfortunately does not seem compatible with using use_legacy_dataset.

Weirdly, I'm having the same problem with pandas 1.0.4 and pyarrow 0.17.1. At first it looks like the same case as in @jorisvandenbossche's last post.

In [4]: pyarrow.__version__                                                                                           
Out[4]: '0.17.1'

In [5]: pd.__version__                                                                                                
Out[5]: '1.0.4'

In [6]: pd.read_parquet('test.parquet')                                                                               
Out[6]: 
          a         b         c         d         e
0  0.833006  1.638854 -0.250418 -0.141500 -0.075873
1 -0.117031  0.370930  0.776447 -0.569128 -0.954441
2 -1.737407  0.073124  2.550558 -1.632866 -0.518660
3 -1.519883  0.299574  1.401251 -0.731901  0.419746
4 -1.535166  0.228494 -0.617373  0.143101  0.900306

In [7]: pd.read_parquet('test.parquet',filters=[('a','>',0)])                                                         
...
AttributeError: 'NoneType' object has no attribute 'filter_accepts_partition'

But then:

In [8]: pd.read_parquet('test.parquet',use_legacy_dataset=False,filters=[('a','>',0)])                                
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-8-80474d4e52cf> in <module>
----> 1 pd.read_parquet('test.parquet',use_legacy_dataset=False,filters=[('a','>',0)])

~/pd3/lib/python3.8/site-packages/pandas/io/parquet.py in read_parquet(path, engine, columns, **kwargs)
    313 
    314     impl = get_engine(engine)
--> 315     return impl.read(path, columns=columns, **kwargs)

~/pd3/lib/python3.8/site-packages/pandas/io/parquet.py in read(self, path, columns, **kwargs)
    132         )
    133         kwargs["columns"] = columns
--> 134         result = parquet_ds.read_pandas(**kwargs).to_pandas()
    135         return result
    136 

~/pd3/lib/python3.8/site-packages/pyarrow/parquet.py in read_pandas(self, **kwargs)
   1471         through to ParquetDataset.read, see docstring for further details.
   1472         """
-> 1473         return self.read(use_pandas_metadata=True, **kwargs)
   1474 
   1475     @property

TypeError: read() got an unexpected keyword argument 'use_legacy_dataset'

It seems it's not calling read_table but another method. When I directly use pyarrow.parquet.read_table(), it works, but then I lose the metadata about IntDType columns.
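If you do call pyarrow directly, it may help to also pass use_pandas_metadata=True, which is the flag pandas sets internally (visible in the traceback earlier in this thread); whether that restores the IntDType columns depends on the pyarrow version, so treat this as a guess rather than a confirmed fix:

import pyarrow.parquet as pq

# Possible direct-pyarrow workaround (untested guess): use_pandas_metadata=True
# keeps the pandas schema information (index and dtype metadata) around for
# the conversion back to a DataFrame.
table = pq.read_table(
    'test.parquet',
    filters=[('a', '>', 0)],
    use_legacy_dataset=False,
    use_pandas_metadata=True,
)
df = table.to_pandas()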

@Fbrufino would you be able to check with pandas 1.0.3? We had some parquet-related regressions in 1.0.4, which will be fixed shortly in 1.0.5.

I opened a PR to add a test for this: https://github.com/pandas-dev/pandas/pull/34804, so we can ensure we don't break it again in the future (and we can also backport that test to the 1.0.x branch, to ensure it is fixed for the upcoming 1.0.5 release).
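For reference, a regression test along these lines could look roughly as follows (a sketch only; the actual test added in the linked PR may differ):

import pandas as pd
import pytest

pytest.importorskip("pyarrow", minversion="0.17.0")

def test_read_parquet_pyarrow_filters(tmp_path):
    # Sketch of the kind of regression test described above, not the
    # actual test from the PR.
    path = tmp_path / "test.parquet"
    df = pd.DataFrame({"a": [0, 1, 2], "b": ["x", "y", "z"]})
    df.to_parquet(path, engine="pyarrow")

    result = pd.read_parquet(
        path, engine="pyarrow", use_legacy_dataset=False, filters=[("a", ">", 0)]
    )
    expected = df[df["a"] > 0].reset_index(drop=True)
    pd.testing.assert_frame_equal(result, expected)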

@Fbrufino would you be able to check with pandas 1.0.3? We had some parquet-related regressions in 1.0.4, which will be fixed shortly in 1.0.5.

Yes, I assume you might already know by now, but it works on 1.0.3, thanks!

There's another issue where large datasets saved along with their indexes fail to load when both use_legacy_dataset=False and a non-empty columns argument are used, but as far as I know that warrants a new issue report, which I'll file later in more detail.
