Read timeout in pd.read_parquet from S3, and understanding configs
I'm trying to simplify access to datasets in various file formats (csv, pickle, feather, partitioned parquet, ...) stored as S3 objects. Since some users I support have different environments with limited options for upgrading (big company, don't ask), I need to develop multiple solutions that achieve similar use cases.
One method which has worked is this:
import os
from typing import Callable

import pandas as pd
import s3fs

assert pd.__version__ > '1.3.0'

read_methods = [pd.read_csv, pd.read_parquet, pd.read_feather, pd.read_pickle]

def s3_pd_read(loc: str, read_fn: Callable, **kwargs) -> pd.DataFrame:
    df = read_fn(
        loc,  # e.g. 's3a://bucket/data/month=1'
        storage_options={
            "key": os.getenv("AWS_ACCESS_KEY_ID"),
            "secret": os.getenv("AWS_SECRET_ACCESS_KEY"),
            "client_kwargs": {
                "verify": os.getenv("AWS_CA_BUNDLE"),
                "endpoint_url": "https://endpoint.com/",
            },
        },
        # Pass keyword args through to the pandas read method
        **kwargs,
    )
    return df
But configuring this to improve performance and availability is challenging. For example, I'd like to set options like s3fs.S3FileSystem's config_kwargs: "connect_timeout", "read_timeout", and "max_pool_connections".
The pd.read_parquet docs say this about storage_options:
Extra options that make sense for a particular storage connection, e.g. host, port, username, password, etc. For HTTP(S) URLs the key-value pairs are forwarded to urllib as header options. For other URLs (e.g. starting with “s3://”, and “gcs://”) the key-value pairs are forwarded to fsspec. Please see fsspec and urllib for more details.
I checked the fsspec docs, but it's not obvious which key-value pairs are available to set, nor what they do. One link there pointed to HTTP client sessions (not S3).
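My working assumption is that storage_options keys are forwarded to the s3fs.S3FileSystem constructor, so any constructor keyword (including config_kwargs) could be set there. If that holds, the timeouts would look something like this (untested sketch; the bucket path and endpoint are placeholders):

```python
import os

# Assumption: fsspec passes these as S3FileSystem(**storage_options),
# so config_kwargs should reach botocore's Config object.
storage_options = {
    "key": os.getenv("AWS_ACCESS_KEY_ID"),
    "secret": os.getenv("AWS_SECRET_ACCESS_KEY"),
    "client_kwargs": {
        "verify": os.getenv("AWS_CA_BUNDLE"),
        "endpoint_url": "https://endpoint.com/",
    },
    "config_kwargs": {
        "connect_timeout": 600,
        "read_timeout": 600,
        "max_pool_connections": 50,
    },
}

# The call would then be (placeholder path, not run here):
# df = pd.read_parquet("s3://bucket/data/month=1", storage_options=storage_options)
```

But I haven't found documentation confirming that config_kwargs is actually honored on this path.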
Similarly, for S3FileSystem + pyarrow.parquet (which doesn't work in some user environments), I have this code, which works:

import os

from s3fs import S3FileSystem
import pyarrow.parquet as pq

s3 = S3FileSystem(
    s3_additional_kwargs={
        'access_key': os.environ['AWS_ACCESS_KEY_ID'],
        'secret_key': os.environ['AWS_SECRET_ACCESS_KEY'],
    },
    client_kwargs={
        'verify': os.getenv('AWS_CA_BUNDLE'),
        'endpoint_url': 'https://endpoint.com/',
    },
    config_kwargs={'connect_timeout': 600, 'read_timeout': 600, 'max_pool_connections': 50},
)

df = pq.read_table(
    "bucket/data/month=1",
    filesystem=s3,
).to_pandas()
The docs eventually lead to the botocore client docs for client_kwargs, but for s3_additional_kwargs it's not obvious which parameters are meant (e.g. for copy or get-object), which ones actually get used, and with what precedence when this S3FileSystem is passed to a pyarrow function:

s3_additional_kwargs: dict of parameters that are used when calling s3 api methods. Typically used for things like "ServerSideEncryption".

client_kwargs: dict of parameters for the botocore client
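Based on that quote, my tentative reading is that the three dicts target different layers: client_kwargs goes to the botocore client constructor, config_kwargs to botocore.config.Config, and s3_additional_kwargs is merged into individual S3 API calls (so credentials probably don't belong there, even though my code above "works"). A sketch of how I'd expect the settings to separate (all values are placeholders, and this reading is my assumption, not documented fact):

```python
import os

# My guess at where each kind of setting belongs:
client_kwargs = {            # passed to the botocore client constructor
    "endpoint_url": "https://endpoint.com/",
    "verify": os.getenv("AWS_CA_BUNDLE"),
}
config_kwargs = {            # passed to botocore.config.Config
    "connect_timeout": 600,
    "read_timeout": 600,
    "max_pool_connections": 50,
}
s3_additional_kwargs = {     # merged into individual S3 API calls, e.g. GetObject
    "ServerSideEncryption": "AES256",  # placeholder example from the docs
}
```

Is that separation correct, and which API calls actually receive s3_additional_kwargs?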
Can someone help elucidate this spaghetti of documentation? For example, in the case of using only pd.read_parquet, I'd like to pass some sort of connect and/or read timeout config to reduce timeouts. How can I do things like that?
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow