Read timeout in pd.read_parquet from S3, and understanding configs

I'm trying to simplify access to datasets in various file formats (csv, pickle, feather, partitioned parquet, ...) stored as S3 objects. Since some users I support have different environments with limited options for upgrading (big company, don't ask), I need to develop multiple solutions that achieve similar use cases.

One method which has worked is this:

import os
from typing import Callable

import pandas as pd
import s3fs  # pandas needs s3fs installed to handle s3:// URLs

# storage_options support requires a recent pandas (naive string comparison)
assert pd.__version__ > '1.3.0'

read_methods = [pd.read_csv, pd.read_parquet, pd.read_feather, pd.read_pickle]

def s3_pd_read(loc: str, read_fn: Callable, **kwargs) -> pd.DataFrame:
    df = read_fn(
        loc,  # e.g. 's3a://bucket/data/month=1'
        storage_options={
            "key"          : os.getenv("AWS_ACCESS_KEY_ID"),
            "secret"       : os.getenv("AWS_SECRET_ACCESS_KEY"),
            "client_kwargs": {
                'verify'      : os.getenv('AWS_CA_BUNDLE'),
                'endpoint_url': 'https://endpoint.com/'
            }
        },
        # Pass remaining keyword args through to the pandas read method
        **kwargs
    )
    return df
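A call then looks like this (bucket and path are placeholders):

df = s3_pd_read('s3a://bucket/data/month=1', pd.read_parquet)
csv_df = s3_pd_read('s3a://bucket/other.csv', pd.read_csv, sep=';')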

But configuring this to improve performance and availability is challenging. For example, I'd like to set options equivalent to s3fs.S3FileSystem's config_kwargs, such as "connect_timeout", "read_timeout", and "max_pool_connections", but it isn't clear how to pass them through pandas; my best guess is sketched below.
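From reading the fsspec source, my guess is that every key in storage_options is forwarded as a constructor argument to s3fs.S3FileSystem, so config_kwargs should be able to ride along like this (untested across all target environments):

import os
import pandas as pd

df = pd.read_parquet(
    's3a://bucket/data/month=1',  # placeholder path
    storage_options={
        "key"          : os.getenv("AWS_ACCESS_KEY_ID"),
        "secret"       : os.getenv("AWS_SECRET_ACCESS_KEY"),
        "client_kwargs": {
            'verify'      : os.getenv('AWS_CA_BUNDLE'),
            'endpoint_url': 'https://endpoint.com/'
        },
        # Presumably forwarded to botocore.config.Config by s3fs
        "config_kwargs": {'connect_timeout': 600, 'read_timeout': 600,
                          'max_pool_connections': 50}
    }
)

If that is in fact the supported mechanism, it would answer the timeout part of my question, but I can't find it documented anywhere.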

pd.read_parquet docs mention this about storage_options config:

Extra options that make sense for a particular storage connection, e.g. host, port, username, password, etc. For HTTP(S) URLs the key-value pairs are forwarded to urllib as header options. For other URLs (e.g. starting with “s3://”, and “gcs://”) the key-value pairs are forwarded to fsspec. Please see fsspec and urllib for more details.

I checked the fsspec docs, but it's not obvious which key-value pairs are available to set or what they do; one of the links there led to HTTP client sessions, not S3.
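The closest I've gotten to an authoritative list of valid keys is introspecting the S3FileSystem constructor itself, on the assumption that its parameters are exactly what storage_options accepts for s3:// URLs:

import inspect
import s3fs

# Parameters of S3FileSystem.__init__ -- presumably the valid
# storage_options keys for s3:// URLs
print(inspect.signature(s3fs.S3FileSystem.__init__))
print(s3fs.S3FileSystem.__doc__)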

Similarly, for the S3FileSystem + pyarrow.parquet route (which isn't available in some user environments), I have this code, which works:

import os

from s3fs import S3FileSystem
import pyarrow.parquet as pq

s3 = S3FileSystem(
    # Not sure these keys are ever used here; see my question below
    s3_additional_kwargs={'access_key': os.environ['AWS_ACCESS_KEY_ID'],
                          'secret_key': os.environ['AWS_SECRET_ACCESS_KEY']},
    # Passed to the botocore client constructor
    client_kwargs={
        'verify': os.getenv('AWS_CA_BUNDLE'), 'endpoint_url': 'https://endpoint.com/'
    },
    # Passed to botocore.config.Config: these are the timeouts I want
    config_kwargs={'connect_timeout': 600, 'read_timeout': 600, 'max_pool_connections': 50}
)

df = pq.read_table(
    "bucket/data/month=1",
    filesystem=s3
).to_pandas()

The docs do eventually lead to the botocore client documentation for client_kwargs, but for s3_additional_kwargs it's not obvious which API methods' parameters are meant (e.g. copy-object or get-object), which of them actually get used, and with what precedence when this S3FileSystem is passed to some pyarrow function:

s3_additional_kwargs: dict of parameters that are used when calling s3 api methods. Typically used for things like "ServerSideEncryption".

client_kwargs: dict of parameters for the botocore client
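For what it's worth, my working mental model (possibly wrong, hence this question) is that the three dicts target three different layers; the ServerSideEncryption value below is just the docs' own example, not something I use:

import os
from s3fs import S3FileSystem

s3 = S3FileSystem(
    # 1. client_kwargs -> arguments to the botocore client constructor
    #    (endpoint, region, TLS verification, ...)
    client_kwargs={'endpoint_url': 'https://endpoint.com/',
                   'verify': os.getenv('AWS_CA_BUNDLE')},
    # 2. config_kwargs -> botocore.config.Config (timeouts, pools, retries)
    config_kwargs={'connect_timeout': 600, 'read_timeout': 600,
                   'max_pool_connections': 50},
    # 3. s3_additional_kwargs -> extra parameters merged into individual
    #    S3 API calls (put_object, copy_object, ...), e.g.:
    s3_additional_kwargs={'ServerSideEncryption': 'AES256'},
)

If that model is right, then the access_key/secret_key entries I put in s3_additional_kwargs above probably do nothing, and my credentials are really coming from the environment variables.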

Can someone help untangle this spaghetti of documentation? For example, when using only pd.read_parquet, I'd like to pass some sort of connection and/or read timeout config to reduce timeouts. How can I do things like that?


