'Caused by: com.jcraft.jsch.JSchException: connection is closed by foreign host while using DefaultSftpSessionFactory and SftpInboundFileSynchronizer
I am using spring batch remote partitioning to read input files and process them. To make input files available on all servers, I have added step listener which checks if the exist or not and downloads them from master.
<step id="importExchangesStep">
<tasklet transaction-manager="transactionManager">
<chunk reader="importExchangesFileItemReader" writer="importExchangesItemWriter"
commit-interval="${import.exchanges.commit.interval}" />
<listeners>
<listener ref="ftpGetRemoteExchangesFilesListener" />
</listeners>
</tasklet>
</step>
<job id="importExchangesJob" restartable="true">
<step id="importExchangesStep.master">
<partition partitioner="importExchangesPartitioner"
handler="importExchangesPartitionHandler" />
</step>
</job>
I am using DefaultSftpSessionFactory to download files from master servers to slave servers. There are 4 servers and consumer concurrency is 7 on each so total 28 partitions (step executions) running in parallel.
Below is sftp configuration ,
<beans:bean id="sftpSessionFactory"
class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
<beans:property name="host" value="${master.host}" />
<beans:property name="user" value="${master.user}" />
<beans:property name="password" value="${master.password}" />
<beans:property name="port" value="22"/>
</beans:bean>
It works if it run it on only one server. But if I distributed it to 4 some partition gets completed successfully and some gets failed with this exception
Caused by: com.jcraft.jsch.JSchException: connection is closed by foreign host
Listener config:
<beans:bean id="ftpGetRemoteExchangesFilesListener"
class="com.st.batch.listeners.FtpGetRemoteFilesListener"
p:sessionFactory-ref="sftpSessionFactory" p:downloadFileAttempts="3"
p:fileNamePattern="*.txt" p:deleteLocalFiles="false"
p:localDirectory="/tmp/spring/batch/#{jobParameters[batch_id]}/exchanges/"
p:remoteDirectory="/tmp/spring/batch/#{jobParameters[batch_id]}/exchanges/"
scope="step" />
Listener Class without getters and setters referring to this
http://coreyreil.wordpress.com/2012/12/21/spring-batch-creating-an-ftp-tasklet-to-get-remote-files/
public class FtpGetRemoteFilesListener extends StepExecutionListenerSupport implements InitializingBean
{
//private Logger logger = LoggerFactory.getLogger(FtpGetRemoteFilesTasklet.class);
private static Log logger = LogFactory.getLog(FtpGetRemoteFilesListener.class);
private File localDirectory;
private AbstractInboundFileSynchronizer<?> ftpInboundFileSynchronizer;
private SessionFactory sessionFactory;
private boolean autoCreateLocalDirectory = true;
private boolean deleteLocalFiles = true;
private String fileNamePattern;
private String remoteDirectory;
private int downloadFileAttempts = 12;
private long retryIntervalMilliseconds = 300000;
private boolean retryIfNotFound = false;
/* (non-Javadoc)
* @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
*/
public void afterPropertiesSet() throws Exception
{
Assert.notNull(sessionFactory, "sessionFactory attribute cannot be null");
Assert.notNull(localDirectory, "localDirectory attribute cannot be null");
Assert.notNull(remoteDirectory, "remoteDirectory attribute cannot be null");
Assert.notNull(fileNamePattern, "fileNamePattern attribute cannot be null");
setupFileSynchronizer();
if (!this.localDirectory.exists())
{
if (this.autoCreateLocalDirectory)
{
if (logger.isDebugEnabled())
{
logger.debug("The '" + this.localDirectory + "' directory doesn't exist; Will create.");
}
this.localDirectory.mkdirs();
}
else
{
throw new FileNotFoundException(this.localDirectory.getName());
}
}
}
private void setupFileSynchronizer()
{
if (isSftp())
{
ftpInboundFileSynchronizer = new SftpInboundFileSynchronizer(sessionFactory);
((SftpInboundFileSynchronizer) ftpInboundFileSynchronizer).setFilter(new SftpSimplePatternFileListFilter(fileNamePattern));
}
else
{
ftpInboundFileSynchronizer = new FtpInboundFileSynchronizer(sessionFactory);
((FtpInboundFileSynchronizer) ftpInboundFileSynchronizer).setFilter(new FtpSimplePatternFileListFilter(fileNamePattern));
}
ftpInboundFileSynchronizer.setRemoteDirectory(remoteDirectory);
}
private void deleteLocalFiles()
{
if (deleteLocalFiles)
{
SimplePatternFileListFilter filter = new SimplePatternFileListFilter(fileNamePattern);
List<File> matchingFiles = filter.filterFiles(localDirectory.listFiles());
if (CollectionUtils.isNotEmpty(matchingFiles))
{
for (File file : matchingFiles)
{
FileUtils.deleteQuietly(file);
}
}
}
}
@Override
public void beforeStep(StepExecution stepExecution) {
deleteLocalFiles();
ftpInboundFileSynchronizer.synchronizeToLocalDirectory(localDirectory);
if (retryIfNotFound)
{
SimplePatternFileListFilter filter = new SimplePatternFileListFilter(fileNamePattern);
int attemptCount = 1;
while (filter.filterFiles(localDirectory.listFiles()).size() == 0 && attemptCount <= downloadFileAttempts)
{
logger.info("File(s) matching " + fileNamePattern + " not found on remote site. Attempt " + attemptCount + " out of " + downloadFileAttempts);
try {
Thread.sleep(retryIntervalMilliseconds);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
ftpInboundFileSynchronizer.synchronizeToLocalDirectory(localDirectory);
attemptCount++;
}
if (attemptCount >= downloadFileAttempts && filter.filterFiles(localDirectory.listFiles()).size() == 0)
{
try {
throw new FileNotFoundException("Could not find remote file(s) matching " + fileNamePattern + " after " + downloadFileAttempts + " attempts.");
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
Log:
12:28:47,430 ERROR SimpleAsyncTaskExecutor-3 step.AbstractStep:225 - Encountered an error executing step importExchangesStep in job importExchangesJob
java.lang.IllegalStateException: failed to create SFTP Session
at org.springframework.integration.sftp.session.DefaultSftpSessionFactory.getSession(DefaultSftpSessionFactory.java:266)
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:143)
at com.st.batch.listeners.FtpGetRemoteFilesListener.beforeStep(FtpGetRemoteFilesListener.java:121)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:132)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:120)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at $Proxy29.beforeStep(Unknown Source)
at org.springframework.batch.core.listener.CompositeStepExecutionListener.beforeStep(CompositeStepExecutionListener.java:77)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:194)
at org.springframework.batch.integration.partition.StepExecutionRequestHandler.handle(StepExecutionRequestHandler.java:64)
at sun.reflect.GeneratedMethodAccessor121.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:69)
at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:97)
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:81)
at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:102)
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:103)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:126)
at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:227)
at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:127)
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:73)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:67)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
at org.springframework.integration.core.MessagingTemplate.doSendAndReceive(MessagingTemplate.java:318)
at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:239)
at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:233)
at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:207)
at org.springframework.integration.amqp.inbound.AmqpInboundGateway.access$200(AmqpInboundGateway.java:47)
at org.springframework.integration.amqp.inbound.AmqpInboundGateway$1.onMessage(AmqpInboundGateway.java:87)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:693)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:586)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:75)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:154)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1113)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:559)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:904)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:888)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$500(SimpleMessageListenerContainer.java:75)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:989)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalStateException: failed to connect
at org.springframework.integration.sftp.session.SftpSession.connect(SftpSession.java:204)
at org.springframework.integration.sftp.session.DefaultSftpSessionFactory.getSession(DefaultSftpSessionFactory.java:262)
... 54 more
Caused by: com.jcraft.jsch.JSchException: connection is closed by foreign host
at com.jcraft.jsch.Session.connect(Session.java:244)
at com.jcraft.jsch.Session.connect(Session.java:158)
at org.springframework.integration.sftp.session.SftpSession.connect(SftpSession.java:196)
... 55 more
Is there any limit to number of simultaneous connections as 28 partitions must be trying to connect some of which succeeds and some fails on all servers or some thing else?
I am able to login through command line from all servers to master using sftp user@host.
Solution 1:[1]
It sounds like your SSH server has some limit for the number of concurrent connections from a server. It also looks like you are using Spring Integration 2.2.x (indicated by the stack trace).
Spring Integration 2.2.x uses a connection for each session.
3.0 introduced the concept of shared sessions where each "session" is a channel multiplexed over a single shared connection/session.
Add <constructor-arg value="true"/>
to enable this feature.
If you can't move to 3.0, you'll have to look at configuring the server to allow more connections.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Gary Russell |