Apache Flink: AWS S3 timeout exception when starting a job from a savepoint

I have a Flink job with large state in a Map operator. We take savepoints of around 80 GB, stored on AWS S3, and the operator runs with a parallelism of about 100. However, whenever we recover from the savepoint, we get an exception like:

Caused by: java.io.InterruptedIOException: Failed to open s3a://xxxx/UbfK/flink/savepoints/logjoin/savepoint-a9600e-39fd2cc07076/f9a31490-461a-42f3-88be-ec169145c35f at 0 on s3a://adshonor-data-cube-test-apse1/UbfK/flink/savepoints/logjoin/savepoint-a9600e-39fd2cc07076/f9a31490-461a-42f3-88be-ec169145c35f: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool.

Is there a configuration parameter to increase the timeout settings for AWS S3 in Flink, or another way to avoid this error?



Solution 1:[1]

Try setting fs.s3a.connection.maximum to something like 50 or 100 (see the configuration sketch at the end of Solution 2).

Solution 2:[2]

To elaborate a bit on what Steve said: the problem is likely that the HTTP client being used doesn't have a large enough connection pool.

Each S3A client interacting with a single bucket, as a single user, has its own dedicated pool of open HTTP 1.1 connections alongside a pool of threads used for upload and copy operations. The default pool sizes are intended to strike a balance between performance and memory/thread use.

For a good overview of the options you can tune (which include fs.s3a.connection.maximum), see the "Options to Tune" section of the Hadoop S3A documentation.
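As a rough sketch of what that tuning can look like for a job with around 100 parallel subtasks reading the same bucket (the values below are illustrative starting points, not recommendations), both the HTTP connection pool and the upload/copy thread pool can be raised in the Hadoop configuration (core-site.xml):

    <!-- core-site.xml: S3A client pool sizes (illustrative values) -->
    <property>
      <name>fs.s3a.connection.maximum</name>
      <value>200</value> <!-- maximum open HTTP connections per S3A client -->
    </property>
    <property>
      <name>fs.s3a.threads.max</name>
      <value>64</value> <!-- threads used for upload and copy operations -->
    </property>

If the S3 filesystem is provided by Flink's bundled S3 plugin rather than a plain Hadoop setup, these keys go into flink-conf.yaml instead, as Solution 4 below explains.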

Solution 3:[3]

Because Flink on EMR goes through AWS's own EMRFS connection code rather than the plain Hadoop S3A client, the setting to bump there is fs.s3.maxConnections, which is not the same key as the pure Hadoop configuration above.

When running on AWS EMR, you can refer to this document: https://aws.amazon.com/cn/premiumsupport/knowledge-center/emr-timeout-connection-wait/
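Following that article, the limit is typically raised with a configuration object for the emrfs-site classification supplied when the cluster is created; a minimal sketch (the value is just an example) looks like:

    [
      {
        "Classification": "emrfs-site",
        "Properties": {
          "fs.s3.maxConnections": "200"
        }
      }
    ]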

Solution 4:[4]

For anyone who finds this later: according to the Flink documentation,

For example, Hadoop has a fs.s3a.connection.maximum configuration key. If you want to change it, you need to put s3.connection.maximum: xyz to the flink-conf.yaml. Flink will internally translate this back to fs.s3a.connection.maximum.

So to raise the maximum number of connections, put s3.connection.maximum: xyz (or s3a.connection.maximum: xyz, which worked for me) in your Flink configuration.
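Assuming the S3 filesystem comes from Flink's bundled flink-s3-fs-hadoop plugin (the s3a:// scheme in the stack trace suggests it does), a minimal flink-conf.yaml sketch would be (the value is illustrative; size it to your parallelism):

    # flink-conf.yaml
    # Flink translates this key back to fs.s3a.connection.maximum for the S3A client
    s3.connection.maximum: 200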

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: stevel
Solution 2: kkrugler
Solution 3: Declan
Solution 4: Adriank