Connecting to https://bucket.s3.amazonaws.com rather than the private endpoint
df = pd.read_parquet('s3://bucket/file.parquet', storage_options={'client_kwargs':{'endpoint_url': S3_ENDPOINT}})
As a workaround I do:
# ecs is an S3FileSystem object with client_kwargs
with ecs.open('bucket/file.parquet', 'rb') as f:
    df = pd.read_parquet(f)
Expected output: connecting to the private S3 bucket
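For context, a minimal sketch of how the ecs filesystem object used in the workaround above could be constructed; the endpoint URL and credentials are placeholders, not values from this report:

import s3fs

# Hypothetical endpoint and credentials for an S3-compatible private store;
# these mirror the client_kwargs otherwise passed via storage_options.
ecs = s3fs.S3FileSystem(
    key="my_key",
    secret="my_secret",
    use_ssl=False,
    client_kwargs={"endpoint_url": "http://my-private-endpoint"},
)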
pd.show_versions()
commit : None
python : 3.8.3.final.0
python-bits : 64
OS : Linux
OS-release : 3.10.0-1127.19.1.el7.x86_64
machine : x86_64
processor : x86_64
byteorder : little
LC_ALL : None
LANG : en_US.UTF-8
LOCALE : en_US.UTF-8
pandas : 1.0.5
numpy : 1.18.5
pytz : 2020.1
dateutil : 2.8.1
pip : 20.1.1
setuptools : 49.1.0.post20200704
Cython : None
pytest : None
hypothesis : None
sphinx : None
blosc : None
feather : None
xlsxwriter : None
lxml.etree : None
html5lib : None
pymysql : None
psycopg2 : None
jinja2 : 2.11.2
IPython : 7.16.1
pandas_datareader: None
bs4 : None
bottleneck : None
fastparquet : 0.4.0
gcsfs : None
lxml.etree : None
matplotlib : None
numexpr : None
odfpy : None
openpyxl : None
pandas_gbq : None
pyarrow : 0.17.1
pytables : None
pytest : None
pyxlsb : None
s3fs : 0.4.2
scipy : None
sqlalchemy : 1.3.18
tables : None
tabulate : None
xarray : None
xlrd : None
xlwt : None
xlsxwriter : None
numba : 0.50.1
Thank you for your report @MarkiesFredje!
Pandas internally uses fsspec to open s3 files. Do you mind trying to use fsspec to explicitly open a file? It should be something like:
import fsspec
file_obj = fsspec.open("s3://...", mode="rb", **storage_options).open()
That will help us to see whether it is an issue with fsspec or not.
read_parquet actually uses a slightly different code path (to also work with directories):
fs, path = fsspec.core.url_to_fs(path, **storage_options)
result = pyarrow.api.parquet.read_table(path, columns=columns, filesystem=fs, use_pandas_metadata=True).to_pandas()
Thanks for your reply @twoertwein, I learned a few things from your post.
The good news is that your test went just fine. I share my code below.
import fsspec
import pyarrow.parquet as pq
dict_ecs = {'anon': False, 'key': 'my_key', 'secret': 'my_secret', 'use_ssl': False, 'client_kwargs': {'endpoint_url': 'http://....'}}
file_obj = fsspec.open("s3://bucket/file.parquet", mode="rb", **dict_ecs).open()
fs, path = fsspec.core.url_to_fs("s3://bucket/file.parquet", **dict_ecs)
result = pq.read_table(path, filesystem=fs, use_pandas_metadata=True).to_pandas()
result.head()
file_obj is fine, same for the result dataframe.
Thank you for testing! Can you please provide the error message you are getting (including which file throws the error) with:
df = pd.read_parquet('s3://bucket/file.parquet', storage_options={'client_kwargs':{'endpoint_url': S3_ENDPOINT}})
I added a trace at the bottom of the post, but I suspect it's not very helpful.
I suspect the dict_ecs parameters are ignored or lost in translation somewhere.
The code attempts to connect to AWS, which it shouldn't, since I pass a different endpoint.
I believe the trace reflects the firewall blocking traffic to AWS.
import pandas as pd
dict_ecs = {'anon': False, 'key': 'my_key', 'secret': 'my_secret', 'use_ssl': False, 'client_kwargs': {'endpoint_url': 'http://....'}}
df = pd.read_parquet('s3://bucket/file.parquet', storage_options=dict_ecs)
---------------------------------------------------------------------------
NoCredentialsError Traceback (most recent call last)
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/pandas/io/s3.py in get_file_and_filesystem(filepath_or_buffer, mode)
28 try:
---> 29 file = fs.open(_strip_schema(filepath_or_buffer), mode)
30 except (FileNotFoundError, NoCredentialsError):
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/fsspec/spec.py in open(self, path, mode, block_size, cache_options, **kwargs)
768 ac = kwargs.pop("autocommit", not self._intrans)
--> 769 f = self._open(
770 path,
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/s3fs/core.py in _open(self, path, mode, block_size, acl, version_id, fill_cache, cache_type, autocommit, requester_pays, **kwargs)
374
--> 375 return S3File(self, path, mode, block_size=block_size, acl=acl,
376 version_id=version_id, fill_cache=fill_cache,
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/s3fs/core.py in __init__(self, s3, path, mode, block_size, acl, version_id, fill_cache, s3_additional_kwargs, autocommit, cache_type, requester_pays)
1095 self.req_kw = {'RequestPayer': 'requester'} if requester_pays else {}
-> 1096 super().__init__(s3, path, mode, block_size, autocommit=autocommit,
1097 cache_type=cache_type)
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/fsspec/spec.py in __init__(self, fs, path, mode, block_size, autocommit, cache_type, cache_options, **kwargs)
1064 if not hasattr(self, "details"):
-> 1065 self.details = fs.info(path)
1066 self.size = self.details["size"]
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/s3fs/core.py in info(self, path, version_id, refresh)
528 try:
--> 529 out = self._call_s3(self.s3.head_object, kwargs, Bucket=bucket,
530 Key=key, **version_id_kw(version_id), **self.req_kw)
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/s3fs/core.py in _call_s3(self, method, *akwarglist, **kwargs)
199 **kwargs)
--> 200 return method(**additional_kwargs)
201
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
315 # The "self" in this scope is referring to the BaseClient.
--> 316 return self._make_api_call(operation_name, kwargs)
317
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
620 else:
--> 621 http, parsed_response = self._make_request(
622 operation_model, request_dict, request_context)
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/client.py in _make_request(self, operation_model, request_dict, request_context)
640 try:
--> 641 return self._endpoint.make_request(operation_model, request_dict)
642 except Exception as e:
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/endpoint.py in make_request(self, operation_model, request_dict)
101 operation_model, request_dict)
--> 102 return self._send_request(request_dict, operation_model)
103
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/endpoint.py in _send_request(self, request_dict, operation_model)
131 attempts = 1
--> 132 request = self.create_request(request_dict, operation_model)
133 context = request_dict['context']
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/endpoint.py in create_request(self, params, operation_model)
114 op_name=operation_model.name)
--> 115 self._event_emitter.emit(event_name, request=request,
116 operation_name=operation_model.name)
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/hooks.py in emit(self, event_name, **kwargs)
355 aliased_event_name = self._alias_event_name(event_name)
--> 356 return self._emitter.emit(aliased_event_name, **kwargs)
357
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/hooks.py in emit(self, event_name, **kwargs)
227 """
--> 228 return self._emit(event_name, kwargs)
229
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/hooks.py in _emit(self, event_name, kwargs, stop_on_response)
210 logger.debug('Event %s: calling handler %s', event_name, handler)
--> 211 response = handler(**kwargs)
212 responses.append((handler, response))
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/signers.py in handler(self, operation_name, request, **kwargs)
89 # Don't call this method directly.
---> 90 return self.sign(operation_name, request)
91
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/signers.py in sign(self, operation_name, request, region_name, signing_type, expires_in, signing_name)
159
--> 160 auth.add_auth(request)
161
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/auth.py in add_auth(self, request)
356 if self.credentials is None:
--> 357 raise NoCredentialsError
358 datetime_now = datetime.datetime.utcnow()
NoCredentialsError: Unable to locate credentials
During handling of the above exception, another exception occurred:
ClientError Traceback (most recent call last)
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/s3fs/core.py in info(self, path, version_id, refresh)
528 try:
--> 529 out = self._call_s3(self.s3.head_object, kwargs, Bucket=bucket,
530 Key=key, **version_id_kw(version_id), **self.req_kw)
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/s3fs/core.py in _call_s3(self, method, *akwarglist, **kwargs)
199 **kwargs)
--> 200 return method(**additional_kwargs)
201
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
315 # The "self" in this scope is referring to the BaseClient.
--> 316 return self._make_api_call(operation_name, kwargs)
317
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
634 error_class = self.exceptions.from_code(error_code)
--> 635 raise error_class(parsed_response, operation_name)
636 else:
ClientError: An error occurred (503) when calling the HeadObject operation (reached max retries: 4): Service Unavailable
During handling of the above exception, another exception occurred:
OSError Traceback (most recent call last)
<ipython-input-11-e75bb74df3d5> in <module>
----> 1 df = pd.read_parquet('s3://bai/transformed/omzet-gissen/2020/drivetimes_own.parquet', storage_options=dict_ecs)
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/pandas/io/parquet.py in read_parquet(path, engine, columns, **kwargs)
310
311 impl = get_engine(engine)
--> 312 return impl.read(path, columns=columns, **kwargs)
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/pandas/io/parquet.py in read(self, path, columns, **kwargs)
121
122 def read(self, path, columns=None, **kwargs):
--> 123 path, _, _, should_close = get_filepath_or_buffer(path)
124
125 kwargs["use_pandas_metadata"] = True
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/pandas/io/common.py in get_filepath_or_buffer(filepath_or_buffer, encoding, compression, mode)
182 from pandas.io import s3
183
--> 184 return s3.get_filepath_or_buffer(
185 filepath_or_buffer, encoding=encoding, compression=compression, mode=mode
186 )
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/pandas/io/s3.py in get_filepath_or_buffer(filepath_or_buffer, encoding, compression, mode)
46 mode: Optional[str] = None,
47 ) -> Tuple[IO, Optional[str], Optional[str], bool]:
---> 48 file, _fs = get_file_and_filesystem(filepath_or_buffer, mode=mode)
49 return file, None, compression, True
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/pandas/io/s3.py in get_file_and_filesystem(filepath_or_buffer, mode)
36 # for that bucket.
37 fs = s3fs.S3FileSystem(anon=True)
---> 38 file = fs.open(_strip_schema(filepath_or_buffer), mode)
39 return file, fs
40
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/fsspec/spec.py in open(self, path, mode, block_size, cache_options, **kwargs)
767 else:
768 ac = kwargs.pop("autocommit", not self._intrans)
--> 769 f = self._open(
770 path,
771 mode=mode,
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/s3fs/core.py in _open(self, path, mode, block_size, acl, version_id, fill_cache, cache_type, autocommit, requester_pays, **kwargs)
373 cache_type = self.default_cache_type
374
--> 375 return S3File(self, path, mode, block_size=block_size, acl=acl,
376 version_id=version_id, fill_cache=fill_cache,
377 s3_additional_kwargs=kw, cache_type=cache_type,
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/s3fs/core.py in __init__(self, s3, path, mode, block_size, acl, version_id, fill_cache, s3_additional_kwargs, autocommit, cache_type, requester_pays)
1094 self.s3_additional_kwargs = s3_additional_kwargs or {}
1095 self.req_kw = {'RequestPayer': 'requester'} if requester_pays else {}
-> 1096 super().__init__(s3, path, mode, block_size, autocommit=autocommit,
1097 cache_type=cache_type)
1098 self.s3 = self.fs # compatibility
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/fsspec/spec.py in __init__(self, fs, path, mode, block_size, autocommit, cache_type, cache_options, **kwargs)
1063 if mode == "rb":
1064 if not hasattr(self, "details"):
-> 1065 self.details = fs.info(path)
1066 self.size = self.details["size"]
1067 self.cache = caches[cache_type](
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/s3fs/core.py in info(self, path, version_id, refresh)
546 return super(S3FileSystem, self).info(path)
547 else:
--> 548 raise ee
549 except ParamValidationError as e:
550 raise ValueError('Failed to head path %r: %s' % (path, e))
OSError: [Errno 16] Service Unavailable
Thank you! The relevant code seems to have changed quite a bit since 1.0.5. Do you mind updating to 1.1.4 to see whether you still have the issue (or, if possible, checking whether it works on master)?
I have updated my environment to pandas 1.1.4 and tested again.
The situation changed:
import pandas as pd
dict_ecs = {'anon': False, 'key': 'my_key', 'secret': 'my_secret', 'use_ssl': False, 'client_kwargs': {'endpoint_url': 'http://....'}}
df = pd.read_parquet('s3://bucket/file.parquet', storage_options=dict_ecs)
This results in a new error:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-11-e75bb74df3d5> in <module>
----> 1 df = pd.read_parquet('s3://bai/transformed/omzet-gissen/2020/drivetimes_own.parquet', storage_options=dict_ecs)
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/pandas/io/parquet.py in read_parquet(path, engine, columns, **kwargs)
315 """
316 impl = get_engine(engine)
--> 317 return impl.read(path, columns=columns, **kwargs)
~/.conda/envs/drill-parquet/lib/python3.8/site-packages/pandas/io/parquet.py in read(self, path, columns, **kwargs)
139
140 kwargs["use_pandas_metadata"] = True
--> 141 result = self.api.parquet.read_table(
142 path, columns=columns, filesystem=fs, **kwargs
143 ).to_pandas()
TypeError: read_table() got an unexpected keyword argument 'storage_options'
Thank you! Looking at the code and the documentation, it seems that 1.1.4 does not support storage_options for read_parquet. 1.2 will support storage_options for read_parquet :)
I hope you can open the s3 file with fsspec until 1.2 is released.
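Until then, a minimal sketch of that interim approach; the bucket, path, credentials, and endpoint below are placeholders, not values from this thread:

import fsspec
import pandas as pd

# Hypothetical connection options for an S3-compatible private endpoint.
storage_options = {
    "key": "my_key",
    "secret": "my_secret",
    "client_kwargs": {"endpoint_url": "http://my-private-endpoint"},
}

# Open the file explicitly via fsspec and hand the file object to pandas.
with fsspec.open("s3://bucket/file.parquet", mode="rb", **storage_options) as f:
    df = pd.read_parquet(f)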
Not a problem, we have a workaround; looking forward to v1.2 and testing again.
Appreciate your time and support.