Parallel Processing Spring Batch StaxEventItemReader
I have a Spring Batch job defined as follows.
<batch:step id="convert">
    <batch:tasklet>
        <batch:chunk reader="contentItemReader" writer="contentItemWriter"
                     processor="processor" commit-interval="10000">
        </batch:chunk>
    </batch:tasklet>
</batch:step>
The contentItemReader is as follows.
@Bean
public StaxEventItemReader contentItemReader() {
    StaxEventItemReader reader = new StaxEventItemReader();
    reader.setFragmentRootElementName("ContentItem");
    reader.setResource(new FileSystemResource(baseDirectory.concat(inputFile)));
    reader.setUnmarshaller(contentItemUnmarshaller());
    return reader;
}
Everything works great, except that it's a bit slower than I would like. I know that this reader is not thread-safe, so I don't think I can add a taskExecutor to the tasklet. ContentItems are not dependent on each other, so I want to feed data into the processors in parallel. The item processing may be fairly time-consuming. So although I know I can't have a multithreaded reader, I should be able to have multithreaded item processing.
The ItemWriters will also need to be single-threaded, since I am using a flat-file ItemWriter.
What is the best way to accomplish this?
Solution 1:[1]
Wrap your reader in a synchronized delegate like the one below, so that a task executor can safely be added to the tasklet:
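import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamReader;

public class SynchronizedWrapperReader<T> implements ItemStreamReader<T> {

    private ItemReader<T> itemReader;
    // True when the delegate also implements ItemStream and needs open/close calls
    private boolean isStream = false;

    public void setItemReader(ItemReader<T> itemReader) {
        this.itemReader = itemReader;
        if (itemReader instanceof ItemStream) {
            isStream = true;
        }
    }

    @Override
    public void close() {
        if (isStream) {
            ((ItemStream) itemReader).close();
        }
    }

    @Override
    public void open(ExecutionContext executionContext) {
        if (isStream) {
            // The delegate is opened with an empty context, so it always starts from the beginning
            ((ItemStream) itemReader).open(new ExecutionContext());
        }
    }

    @Override
    public void update(ExecutionContext executionContext) {
        // No state is saved, so a restart does not resume where the reader left off
    }

    // Only one thread at a time may call the delegate's read()
    @Override
    public synchronized T read() throws Exception {
        return itemReader.read();
    }
}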
Do the same for your writer.
Please note that the order of the items is no longer guaranteed.
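To see why a `synchronized` `read()` is enough to hand out each item exactly once, and why the order is lost, here is a minimal sketch in plain Java. It does not use Spring Batch: `SimpleReader` is a hypothetical stand-in for `ItemReader`, the iterator plays the role of a delegate that is not thread-safe, and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SynchronizedReadDemo {

    /** Hypothetical stand-in for Spring Batch's ItemReader: returns null when exhausted. */
    interface SimpleReader<T> {
        T read() throws Exception;
    }

    /** Serializes access to a delegate that is not thread-safe. */
    static final class SynchronizedReader<T> implements SimpleReader<T> {
        private final SimpleReader<T> delegate;

        SynchronizedReader(SimpleReader<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized T read() throws Exception {
            return delegate.read();
        }
    }

    /** Reads all items with several worker threads; the result is in arrival order, not source order. */
    public static List<Integer> readAll(int itemCount, int threads) throws InterruptedException {
        List<Integer> source = new ArrayList<>();
        for (int i = 0; i < itemCount; i++) {
            source.add(i);
        }
        Iterator<Integer> it = source.iterator(); // not thread-safe on its own
        SimpleReader<Integer> reader =
                new SynchronizedReader<>(() -> it.hasNext() ? it.next() : null);

        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                Integer item;
                while ((item = reader.read()) != null) {
                    seen.add(item); // stands in for "process the item"
                }
                return null; // makes this a Callable, so read() may throw checked exceptions
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) throws Exception {
        List<Integer> items = readAll(1000, 4);
        System.out.println("items read: " + items.size());
    }
}
```

Every item shows up exactly once in the result, but which thread picks up which item depends on scheduling, so the sequence can differ from run to run.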
Edited:
A comment asked how to use this in a config.xml, so here is a simple example of using the wrapper with a FlatFileItemReader:
<batch:step id="convert">
    <batch:tasklet>
        <batch:chunk reader="wrappedReader" writer="..."
                     processor="..." commit-interval="10000">
        </batch:chunk>
    </batch:tasklet>
</batch:step>
<bean id="wrappedReader" class="[package].SynchronizedWrapperReader">
    <property name="itemReader">
        <bean class="org.springframework.batch.item.file.FlatFileItemReader">
            <property .../>
            <property .../>
        </bean>
    </property>
</bean>
Solution 2:[2]
Since version 3.0.4, Spring Batch offers a wrapper class (like Hansjoerg Wingeier's above) out of the box: SynchronizedItemStreamReader<T>
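As a rough sketch of how the built-in class might be wired up in Java configuration: `SynchronizedItemStreamReader` and its `setDelegate` method are part of Spring Batch, while the `ReaderSupport` class and the `synchronize` helper are hypothetical names used only for this example. It needs Spring Batch 3.0.4 or later on the classpath.

```java
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;

// Hypothetical helper class; only SynchronizedItemStreamReader is Spring Batch API.
public final class ReaderSupport {

    private ReaderSupport() {
    }

    /** Wraps a reader that is not thread-safe so that read() calls are serialized. */
    public static <T> SynchronizedItemStreamReader<T> synchronize(ItemStreamReader<T> delegate) {
        SynchronizedItemStreamReader<T> reader = new SynchronizedItemStreamReader<>();
        reader.setDelegate(delegate);
        return reader;
    }
}
```

With a helper like this, the `contentItemReader()` bean method from the question could return `ReaderSupport.synchronize(reader)` instead of `reader`, with its return type changed accordingly.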
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | |
Solution 2 | thunderhook |