How to prevent a Flink job from getting cancelled by itself

Environment

My Flink job runs on a standalone cluster in session mode. The version is 1.13 (https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-a-standalone-cluster-session-mode).

The issue

The job reads messages from Kafka and sinks them to MySQL and HBase. I noticed that the job gets cancelled by Flink after running for about two days, while the Flink cluster itself is still fine.
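
For context, here is a minimal sketch of the topology (the operator names come from the JobManager log further below; the Kafka topic, connection properties, and the processing/sink logic are placeholders, and the real job fans out to several MySQL and HBase sinks):

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class VDataStreamingJobSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000); // checkpointing is enabled (see the "Completed checkpoint" log lines below)

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
            props.setProperty("group.id", "vdata");                   // placeholder

            env.addSource(new FlinkKafkaConsumer<>("ingress", new SimpleStringSchema(), props))
               .name("ingress_kafka")
               // the real job runs a ProcessFunction here and fans out to
               // StagingDataSliceSink (MySQL) plus HBase and error-table sinks
               .print();

            env.execute("vData streaming");
        }
    }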

My observations

In the Flink log, I can see that nearly 100,000 SQL exceptions were thrown from a MySQL sink before the job finally got cancelled.

2021-07-15 09:26:17,455 WARN  com.exceeddata.vcloud.mysql.sink.StagingDataSliceSink        [] - insert slice already exists with packet id: 370913 and slice_num: 8
java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '370913-8' for key 'packet_id_slice_num'
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:117) ~[blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97) ~[blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122) ~[blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953) ~[blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
    at com.mysql.cj.jdbc.ClientPreparedStatement.execute(ClientPreparedStatement.java:370) ~[blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
    at com.exceeddata.vcloud.mysql.sink.StagingDataSliceSink.invoke(StagingDataSliceSink.java:153) [blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
    at com.exceeddata.vcloud.mysql.sink.StagingDataSliceSink.invoke(StagingDataSliceSink.java:16) [blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:82) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:103) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at com.exceeddata.vcloud.streaming.vdata.VDataStreamingJob$6.processElement(VDataStreamingJob.java:112) [blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
    at com.exceeddata.vcloud.streaming.vdata.VDataStreamingJob$6.processElement(VDataStreamingJob.java:86) [blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365) [blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183) [blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) [blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) [blob_p-b8b2fbfc0c701f705087cf10f8a531c9fb37a047-ca1fe017724a4f2404cbfaa614a5d62f:?]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) [flink-dist_2.11-1.13.1.jar:1.13.1]

However, this exception is supposed to be caught in our code, and the warning message proves that it was indeed caught (the stack trace above is printed by LOG.warn itself):

    if (ex.getMessage().toLowerCase().contains("unique key")
            || ex.getMessage().toLowerCase().contains("duplicate entry")) {
        // the flink job does print this warning message
        LOG.warn("insert slice already exists with packet id: " + packetId + " and slice_num: " + dataSlice.getSliceNum(), ex);
    } else {
        LOG.error("insert staging_data_slice error, packet id: " + packetId + " and slice_num: " + dataSlice.getSliceNum(), ex);
        throw ex;
    }
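
If avoiding the duplicates altogether is an option, the insert itself could be made idempotent so that no exception is raised in the first place. A sketch, assuming the unique key packet_id_slice_num covers (packet_id, slice_num); the table and column names are my guesses from the exception message, and insertSliceIdempotent is a hypothetical helper:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    // INSERT IGNORE turns a duplicate-key violation into an update count of 0
    // instead of raising SQLIntegrityConstraintViolationException.
    static void insertSliceIdempotent(Connection conn, long packetId, int sliceNum, byte[] payload)
            throws SQLException {
        String sql = "INSERT IGNORE INTO staging_data_slice (packet_id, slice_num, payload) "
                + "VALUES (?, ?, ?)";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setLong(1, packetId);
            ps.setInt(2, sliceNum);
            ps.setBytes(3, payload);
            if (ps.executeUpdate() == 0) {
                // the row already existed; previously this path threw and was caught above
            }
        }
    }

Note that INSERT IGNORE downgrades other errors (e.g. data truncation) to warnings as well, so INSERT ... ON DUPLICATE KEY UPDATE slice_num = slice_num would be the narrower alternative.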

My other observations

I fetched the messages from Kafka and tried to reproduce the issue in my local environment; there, the Flink cluster and the job were both fine.

My preferred approach

I want to keep my job running despite these (already handled) duplicate-entry exceptions.
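
For reference, a fixed-delay restart strategy can be set on the StreamExecutionEnvironment (a sketch against the Flink 1.13 API; the attempt count and delay are arbitrary values). As far as I understand, though, restart strategies only apply to failed jobs, not to jobs that get cancelled:

    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;

    // env is the StreamExecutionEnvironment; restart up to 3 times, 10 seconds apart
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));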

[Updated on July 20] JobManager log

// before this section there are many lines of "Completed checkpoint xxxx for job xxxx"

2021-07-16 17:01:43,878 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 19094 for job ad2b1cc19dbc5fc782e67bcea8a483ed (1061 bytes in 3 ms).
2021-07-16 17:01:44,498 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job vData streaming (ad2b1cc19dbc5fc782e67bcea8a483ed) switched from state RUNNING to CANCELLING.
2021-07-16 17:01:44,498 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: ingress_kafka -> Process -> (Sink: HBase: sink to staging_data_slice, Sink: MySQL: sink to staging_data_slice, Sink: MySQL: sink to staging_data_slice_error, <Mapping>: StagingDataSliceModel to StagingDataSliceError -> Sink: HBase: sink to staging_data_slice_error, Sink: MySQL: sink to staging_message_error, Sink: MySQL: sink to staging_data_slice_out_of_scope_error, Sink: MySQL: sink to staging_temp_vcompute) (1/1) (776d7910e418d672ffaedda4d237dbb5) switched from RUNNING to CANCELING.
2021-07-16 17:01:44,513 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: ingress_kafka -> Process -> (Sink: HBase: sink to staging_data_slice, Sink: MySQL: sink to staging_data_slice, Sink: MySQL: sink to staging_data_slice_error, <Mapping>: StagingDataSliceModel to StagingDataSliceError -> Sink: HBase: sink to staging_data_slice_error, Sink: MySQL: sink to staging_message_error, Sink: MySQL: sink to staging_data_slice_out_of_scope_error, Sink: MySQL: sink to staging_temp_vcompute) (1/1) (776d7910e418d672ffaedda4d237dbb5) switched from CANCELING to CANCELED.
2021-07-16 17:01:44,514 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job vData streaming (ad2b1cc19dbc5fc782e67bcea8a483ed) switched from state CANCELLING to CANCELED.
2021-07-16 17:01:44,514 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job ad2b1cc19dbc5fc782e67bcea8a483ed
2021-07-16 17:01:44,514 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Stopping checkpoint coordinator for job ad2b1cc19dbc5fc782e67bcea8a483ed.
2021-07-16 17:01:44,514 INFO  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - Shutting down
2021-07-16 17:01:44,514 INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint      [] - Checkpoint with ID 19094 at 'file:/home/edgeuser/flinkData/checkpoints/ad2b1cc19dbc5fc782e67bcea8a483ed/chk-19094' not discarded.
2021-07-16 17:01:44,514 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job ad2b1cc19dbc5fc782e67bcea8a483ed reached terminal state CANCELED.
2021-07-16 17:01:44,523 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Stopping the JobMaster for job vData streaming(ad2b1cc19dbc5fc782e67bcea8a483ed).
2021-07-16 17:01:44,523 INFO  org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - Releasing slot [ef791c2d1fd5461b6a15306f4dcd682f].
2021-07-16 17:01:44,525 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job ad2b1cc19dbc5fc782e67bcea8a483ed
2021-07-16 17:01:44,525 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Close ResourceManager connection efe52ea6c6b1862bf96eab2b63616f43: Stopping JobMaster for job vData streaming(ad2b1cc19dbc5fc782e67bcea8a483ed)..
2021-07-16 17:01:44,525 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Disconnect job manager xxxx@akka.tcp://flink@localhost:6123/user/rpc/jobmanager_8 for job ad2b1cc19dbc5fc782e67bcea8a483ed from the resource manager.
2021-07-16 17:04:56,790 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [akka.tcp://flink@xxxx:41366] has failed, address is now gated for [50] ms. Reason: [Disassociated] 
2021-07-16 17:04:56,790 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [akka.tcp://flink@xxxx:37250] has failed, address is now gated for [50] ms. Reason: [Disassociated] 
2021-07-16 17:04:57,456 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
2021-07-16 17:04:57,457 INFO  org.apache.flink.runtime.blob.BlobServer                     [] - Stopped BLOB server at 0.0.0.0:34942
2021-07-16 17:04:57,457 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting StandaloneSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2021-07-16 17:04:57,458 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint   [] - Shutting down rest endpoint.
2021-07-16 17:04:57,488 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Shutting down remote daemon.
2021-07-16 17:04:57,488 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Shutting down remote daemon.
2021-07-16 17:04:57,502 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remote daemon shut down; proceeding with flushing remote transports.
2021-07-16 17:04:57,502 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remote daemon shut down; proceeding with flushing remote transports.
2021-07-16 17:04:57,520 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remoting shut down.
2021-07-16 17:04:57,520 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remoting shut down.

