BUG: pandas not connecting to private S3 endpoint

Created on 16 Nov 2020 · 8 comments · Source: pandas-dev/pandas

Problem description

Connecting to https://bucket.s3.amazonaws.com rather than the private endpoint:

df = pd.read_parquet('s3://bucket/file.parquet', storage_options={'client_kwargs':{'endpoint_url': S3_ENDPOINT}})

As a workaround I do:

# ecs is an S3FileSystem object created with client_kwargs

with ecs.open('bucket/file.parquet', 'rb') as f:
    df = pd.read_parquet(f)
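
A minimal sketch of how such an ecs object could be constructed, assuming only the endpoint needs to be overridden (S3_ENDPOINT is the same placeholder used above):

import s3fs

# S3FileSystem pointed at the private endpoint via client_kwargs
ecs = s3fs.S3FileSystem(client_kwargs={'endpoint_url': S3_ENDPOINT})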

Expected Output

Connecting to the private S3 endpoint

Output of pd.show_versions()

INSTALLED VERSIONS

commit : None
python : 3.8.3.final.0
python-bits : 64
OS : Linux
OS-release : 3.10.0-1127.19.1.el7.x86_64
machine : x86_64
processor : x86_64
byteorder : little
LC_ALL : None
LANG : en_US.UTF-8
LOCALE : en_US.UTF-8

pandas : 1.0.5
numpy : 1.18.5
pytz : 2020.1
dateutil : 2.8.1
pip : 20.1.1
setuptools : 49.1.0.post20200704
Cython : None
pytest : None
hypothesis : None
sphinx : None
blosc : None
feather : None
xlsxwriter : None
lxml.etree : None
html5lib : None
pymysql : None
psycopg2 : None
jinja2 : 2.11.2
IPython : 7.16.1
pandas_datareader: None
bs4 : None
bottleneck : None
fastparquet : 0.4.0
gcsfs : None
lxml.etree : None
matplotlib : None
numexpr : None
odfpy : None
openpyxl : None
pandas_gbq : None
pyarrow : 0.17.1
pytables : None
pytest : None
pyxlsb : None
s3fs : 0.4.2
scipy : None
sqlalchemy : 1.3.18
tables : None
tabulate : None
xarray : None
xlrd : None
xlwt : None
xlsxwriter : None
numba : 0.50.1

Labels: Bug, IO Network, IO Parquet

All 8 comments

Thank you for your report @MarkiesFredje!

pandas internally uses fsspec to open S3 files. Do you mind trying fsspec to explicitly open a file? It should be something like:

import fsspec
file_obj = fsspec.open("s3://...", mode="rb", **storage_options).open()

That will help us see whether it is an issue with fsspec or not.

read_parquet actually uses a slightly different code path (to also work with directories):

fs, path = fsspec.core.url_to_fs(path, **storage_options)
result = pyarrow.parquet.read_table(path, columns=columns, filesystem=fs, use_pandas_metadata=True).to_pandas()

Thanks for your reply @twoertwein, I learned a few things from your post.

Good news: your test went just fine. My code is below.


import fsspec
import pyarrow.parquet as pq

dict_ecs = {'anon': False, 'key': 'my_key', 'secret': 'my_secret', 'use_ssl': False, 'client_kwargs': {'endpoint_url': 'http://....'}}

file_obj = fsspec.open("s3://bucket/file.parquet", mode="rb", **dict_ecs).open()

fs, path = fsspec.core.url_to_fs("s3://bucket/file.parquet", **dict_ecs)
result = pq.read_table(path, filesystem=fs, use_pandas_metadata=True).to_pandas()

result.head()

file_obj is fine, same for the result dataframe.

Thank you for testing! Can you please provide the error message you are getting (including which file throws the error) with:

df = pd.read_parquet('s3://bucket/file.parquet', storage_options={'client_kwargs':{'endpoint_url': S3_ENDPOINT}})

I added a trace at the bottom of the post, but I suspect it's not very helpful.
I suspect the dict_ecs parameters are ignored or lost in translation somewhere.
The code attempts to connect to AWS, which it shouldn't, since I pass a different endpoint.
I believe the trace reflects the firewall blocking traffic to AWS.

import pandas as pd

dict_ecs = {'anon': False, 'key': 'my_key', 'secret': 'my_secret', 'use_ssl': False, 'client_kwargs': {'endpoint_url': 'http://....'}}

df = pd.read_parquet('s3://bucket/file.parquet', storage_options=dict_ecs)

---------------------------------------------------------------------------
NoCredentialsError                        Traceback (most recent call last)
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/pandas/io/s3.py in get_file_and_filesystem(filepath_or_buffer, mode)
     28     try:
---> 29         file = fs.open(_strip_schema(filepath_or_buffer), mode)
     30     except (FileNotFoundError, NoCredentialsError):

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/fsspec/spec.py in open(self, path, mode, block_size, cache_options, **kwargs)
    768             ac = kwargs.pop("autocommit", not self._intrans)
--> 769             f = self._open(
    770                 path,

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/s3fs/core.py in _open(self, path, mode, block_size, acl, version_id, fill_cache, cache_type, autocommit, requester_pays, **kwargs)
    374 
--> 375         return S3File(self, path, mode, block_size=block_size, acl=acl,
    376                       version_id=version_id, fill_cache=fill_cache,

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/s3fs/core.py in __init__(self, s3, path, mode, block_size, acl, version_id, fill_cache, s3_additional_kwargs, autocommit, cache_type, requester_pays)
   1095         self.req_kw = {'RequestPayer': 'requester'} if requester_pays else {}
-> 1096         super().__init__(s3, path, mode, block_size, autocommit=autocommit,
   1097                          cache_type=cache_type)

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/fsspec/spec.py in __init__(self, fs, path, mode, block_size, autocommit, cache_type, cache_options, **kwargs)
   1064             if not hasattr(self, "details"):
-> 1065                 self.details = fs.info(path)
   1066             self.size = self.details["size"]

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/s3fs/core.py in info(self, path, version_id, refresh)
    528             try:
--> 529                 out = self._call_s3(self.s3.head_object, kwargs, Bucket=bucket,
    530                                     Key=key, **version_id_kw(version_id), **self.req_kw)

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/s3fs/core.py in _call_s3(self, method, *akwarglist, **kwargs)
    199                                                        **kwargs)
--> 200         return method(**additional_kwargs)
    201 

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
    315             # The "self" in this scope is referring to the BaseClient.
--> 316             return self._make_api_call(operation_name, kwargs)
    317 

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
    620         else:
--> 621             http, parsed_response = self._make_request(
    622                 operation_model, request_dict, request_context)

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/client.py in _make_request(self, operation_model, request_dict, request_context)
    640         try:
--> 641             return self._endpoint.make_request(operation_model, request_dict)
    642         except Exception as e:

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/endpoint.py in make_request(self, operation_model, request_dict)
    101                      operation_model, request_dict)
--> 102         return self._send_request(request_dict, operation_model)
    103 

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/endpoint.py in _send_request(self, request_dict, operation_model)
    131         attempts = 1
--> 132         request = self.create_request(request_dict, operation_model)
    133         context = request_dict['context']

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/endpoint.py in create_request(self, params, operation_model)
    114                 op_name=operation_model.name)
--> 115             self._event_emitter.emit(event_name, request=request,
    116                                      operation_name=operation_model.name)

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/hooks.py in emit(self, event_name, **kwargs)
    355         aliased_event_name = self._alias_event_name(event_name)
--> 356         return self._emitter.emit(aliased_event_name, **kwargs)
    357 

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/hooks.py in emit(self, event_name, **kwargs)
    227         """
--> 228         return self._emit(event_name, kwargs)
    229 

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/hooks.py in _emit(self, event_name, kwargs, stop_on_response)
    210             logger.debug('Event %s: calling handler %s', event_name, handler)
--> 211             response = handler(**kwargs)
    212             responses.append((handler, response))

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/signers.py in handler(self, operation_name, request, **kwargs)
     89         # Don't call this method directly.
---> 90         return self.sign(operation_name, request)
     91 

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/signers.py in sign(self, operation_name, request, region_name, signing_type, expires_in, signing_name)
    159 
--> 160             auth.add_auth(request)
    161 

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/auth.py in add_auth(self, request)
    356         if self.credentials is None:
--> 357             raise NoCredentialsError
    358         datetime_now = datetime.datetime.utcnow()

NoCredentialsError: Unable to locate credentials

During handling of the above exception, another exception occurred:

ClientError                               Traceback (most recent call last)
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/s3fs/core.py in info(self, path, version_id, refresh)
    528             try:
--> 529                 out = self._call_s3(self.s3.head_object, kwargs, Bucket=bucket,
    530                                     Key=key, **version_id_kw(version_id), **self.req_kw)

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/s3fs/core.py in _call_s3(self, method, *akwarglist, **kwargs)
    199                                                        **kwargs)
--> 200         return method(**additional_kwargs)
    201 

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
    315             # The "self" in this scope is referring to the BaseClient.
--> 316             return self._make_api_call(operation_name, kwargs)
    317 

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
    634             error_class = self.exceptions.from_code(error_code)
--> 635             raise error_class(parsed_response, operation_name)
    636         else:

ClientError: An error occurred (503) when calling the HeadObject operation (reached max retries: 4): Service Unavailable

During handling of the above exception, another exception occurred:

OSError                                   Traceback (most recent call last)
<ipython-input-11-e75bb74df3d5> in <module>
----> 1 df = pd.read_parquet('s3://bai/transformed/omzet-gissen/2020/drivetimes_own.parquet', storage_options=dict_ecs)

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/pandas/io/parquet.py in read_parquet(path, engine, columns, **kwargs)
    310 
    311     impl = get_engine(engine)
--> 312     return impl.read(path, columns=columns, **kwargs)

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/pandas/io/parquet.py in read(self, path, columns, **kwargs)
    121 
    122     def read(self, path, columns=None, **kwargs):
--> 123         path, _, _, should_close = get_filepath_or_buffer(path)
    124 
    125         kwargs["use_pandas_metadata"] = True

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/pandas/io/common.py in get_filepath_or_buffer(filepath_or_buffer, encoding, compression, mode)
    182         from pandas.io import s3
    183 
--> 184         return s3.get_filepath_or_buffer(
    185             filepath_or_buffer, encoding=encoding, compression=compression, mode=mode
    186         )

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/pandas/io/s3.py in get_filepath_or_buffer(filepath_or_buffer, encoding, compression, mode)
     46     mode: Optional[str] = None,
     47 ) -> Tuple[IO, Optional[str], Optional[str], bool]:
---> 48     file, _fs = get_file_and_filesystem(filepath_or_buffer, mode=mode)
     49     return file, None, compression, True

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/pandas/io/s3.py in get_file_and_filesystem(filepath_or_buffer, mode)
     36         # for that bucket.
     37         fs = s3fs.S3FileSystem(anon=True)
---> 38         file = fs.open(_strip_schema(filepath_or_buffer), mode)
     39     return file, fs
     40 

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/fsspec/spec.py in open(self, path, mode, block_size, cache_options, **kwargs)
    767         else:
    768             ac = kwargs.pop("autocommit", not self._intrans)
--> 769             f = self._open(
    770                 path,
    771                 mode=mode,

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/s3fs/core.py in _open(self, path, mode, block_size, acl, version_id, fill_cache, cache_type, autocommit, requester_pays, **kwargs)
    373             cache_type = self.default_cache_type
    374 
--> 375         return S3File(self, path, mode, block_size=block_size, acl=acl,
    376                       version_id=version_id, fill_cache=fill_cache,
    377                       s3_additional_kwargs=kw, cache_type=cache_type,

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/s3fs/core.py in __init__(self, s3, path, mode, block_size, acl, version_id, fill_cache, s3_additional_kwargs, autocommit, cache_type, requester_pays)
   1094         self.s3_additional_kwargs = s3_additional_kwargs or {}
   1095         self.req_kw = {'RequestPayer': 'requester'} if requester_pays else {}
-> 1096         super().__init__(s3, path, mode, block_size, autocommit=autocommit,
   1097                          cache_type=cache_type)
   1098         self.s3 = self.fs  # compatibility

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/fsspec/spec.py in __init__(self, fs, path, mode, block_size, autocommit, cache_type, cache_options, **kwargs)
   1063         if mode == "rb":
   1064             if not hasattr(self, "details"):
-> 1065                 self.details = fs.info(path)
   1066             self.size = self.details["size"]
   1067             self.cache = caches[cache_type](

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/s3fs/core.py in info(self, path, version_id, refresh)
    546                     return super(S3FileSystem, self).info(path)
    547                 else:
--> 548                     raise ee
    549             except ParamValidationError as e:
    550                 raise ValueError('Failed to head path %r: %s' % (path, e))

OSError: [Errno 16] Service Unavailable

Thank you! The relevant code seems to have changed quite a bit since 1.0.5. Do you mind updating to 1.1.4 to see whether you still have the issue (or, if possible, checking whether it works on master)?

I have updated my environment to pandas 1.1.4 and tested again.
The situation changed:

import pandas as pd

dict_ecs = {'anon': False, 'key': 'my_key', 'secret': 'my_secret', 'use_ssl': False, 'client_kwargs': {'endpoint_url': 'http://....'}}

df = pd.read_parquet('s3://bucket/file.parquet', storage_options=dict_ecs)

This results in a new error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-11-e75bb74df3d5> in <module>
----> 1 df = pd.read_parquet('s3://bai/transformed/omzet-gissen/2020/drivetimes_own.parquet', storage_options=dict_ecs)

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/pandas/io/parquet.py in read_parquet(path, engine, columns, **kwargs)
    315     """
    316     impl = get_engine(engine)
--> 317     return impl.read(path, columns=columns, **kwargs)

~/.conda/envs/drill-parquet/lib/python3.8/site-packages/pandas/io/parquet.py in read(self, path, columns, **kwargs)
    139 
    140         kwargs["use_pandas_metadata"] = True
--> 141         result = self.api.parquet.read_table(
    142             path, columns=columns, filesystem=fs, **kwargs
    143         ).to_pandas()

TypeError: read_table() got an unexpected keyword argument 'storage_options'

Thank you! Looking at the code and the documentation, it seems that 1.1.4 does not support storage_options for read_parquet; 1.2 will support it. :)

I hope you can open the S3 file with fsspec until 1.2 is released.
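
For reference, a minimal sketch of that interim workaround might look like the following (the endpoint and credential values are the same placeholders used earlier in this thread):

import fsspec
import pandas as pd

# Placeholder connection settings for the private S3-compatible endpoint
dict_ecs = {'anon': False, 'key': 'my_key', 'secret': 'my_secret', 'use_ssl': False, 'client_kwargs': {'endpoint_url': 'http://....'}}

# Open the object through fsspec/s3fs and hand the file object to pandas
with fsspec.open('s3://bucket/file.parquet', mode='rb', **dict_ecs) as f:
    df = pd.read_parquet(f)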

Not a problem, we have a workaround; looking forward to v1.2 and testing again.
I appreciate your time and support.
