'Camel SFTP with Quartz scheduler: prevent concurrent execution, reject task if busy
- Camel 3.11
- Spring Boot 2.5
- Java 11
I have a dynamic Camel routes that fetches files from an SFTP server. There is one route per SFTP user. The routes are scheduled by a cron job.
sftp://localhost:55443/reports?
binary=true
&bridgeErrorHandler=true
&connectTimeout=10000
&delete=true
&disconnect=true
&fastExistsCheck=true
&hash=200f9d2c
&jschLoggingLevel=WARN
&maximumReconnectAttempts=0
&passiveMode=true
&privateKey=[PRIVATEKEY]
&readLockLoggingLevel=WARN
&runLoggingLevel=WARN
&scheduler=quartz
&scheduler.cron=*+*+*+%3F+*+*+*
&scheduler.stateful=true
&soTimeout=10000
&sortBy=file%3Amodified
&throwExceptionOnConnectFailed=true
&timeout=10000
&username=sftpuser
(Here I have increased the cron-rate for test purposes to cause the error.)
Retrieved files are processed, filtered, enriched, and sent to messaging queues.
Sometimes these files take longer to process than the cron interval. I presume Camel is supposed to have some sort of queue to prevent concurrent SFTP runs, but this isn't working. It also seems like the queue is allowing for multiple jobs to queue up for the same task, and isn't clearing the queue if the state of the SFTP server changes.
When the jobs overlap then exceptions occur. Sometimes the files get loaded multiple times. Sometimes there's an error saying the file can't be deleted. The end result is duplicated data in the messaging queues.
Errors
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot change directory
...
Caused by: java.io.IOException: Pipe closed
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot delete file
...
Caused by: com.jcraft.jsch.SftpException: No such file
What I want
- Create 1 SFTP route per user (done)
- For each SFTP route,
- run it at least once per (done)
- only allow for one instance at a time (?)
- don't queue executions at all (?)
Attempted fixes
SFTP route properties
There's no change when these properties are set/enabled:
useFixedDelay
idempotent
readLock
fastExistsCheck
ScheduledExecutorService
If I create a single threaded executor service for all SFTP routes, it doesn't help.
private final ScheduledExecutorService sftpExecutor = Executors.newSingleThreadScheduledExecutor();
...
<create route>
.scheduledExecutorService(sftpExecutor)
Spring Boot
It doesn't look like there's any pertinent config here - though it's hard to tell with such terse documentation.
schedulerProperties
I thought maybe schedulerProperties
would help, but nothing I set works. There's no documentation or examples for this. for? Why doesn't setting anything with it work?
.schedulerProperties("stateful", true)
.schedulerProperties("scheduler.stateful", true
.schedulerProperties("threadCount", "1")
Result: Failed to create Consumer for endpoint: Reason: There are 1 scheduler parameters that couldn't be set on the endpoint. Check the uri if the parameters are spelt correctly and that they are properties of the endpoint. Unknown parameters=[{stateful=true}]
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|