Parallel Processing Spring Batch StaxEventItemReader

I have a spring batch job defined as follows.

<batch:step id="convert">
    <batch:tasklet >
        <batch:chunk reader="contentItemReader" writer="contentItemWriter"
                             processor="processor" commit-interval="10000" >
        </batch:chunk>
     </batch:tasklet>
</batch:step>

The contentItemReader is as follows.

 @Bean
 public StaxEventItemReader contentItemReader() {
        StaxEventItemReader reader = new StaxEventItemReader();
        reader.setFragmentRootElementName("ContentItem");
        reader.setResource(new FileSystemResource(baseDirectory.concat(inputFile)));
        reader.setUnmarshaller(contentItemUnmarshaller());
        return reader;
 }

Everything works great except that it's a bit slower than I would like. I know that this reader is not thread-safe, so I don't think I can simply add a taskExecutor to the tasklet. ContentItems are not dependent on each other, so I want to feed data into the processors in parallel. The item processing may be fairly time-consuming. So although I can't have a multithreaded reader, I should be able to have multithreaded item processing.

The ItemWriters will also need to be single-threaded, since I am using a FlatFileItemWriter.

What is the best way to accomplish this?



Solution 1:[1]

Just wrap your reader in something like this:

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamReader;

public class SynchronizedWrapperReader<T> implements ItemStreamReader<T> {

  private ItemReader<T> itemReader;
  private boolean isStream = false;

  public void setItemReader(ItemReader<T> itemReader) {
    this.itemReader = itemReader;
    if (itemReader instanceof ItemStream) {
      isStream = true;
    }
  }

  @Override
  public void close() {
    if (isStream) {
      ((ItemStream) itemReader).close();
    }
  }

  @Override
  public void open(ExecutionContext executionContext) {
    if (isStream) {
      // A fresh ExecutionContext is passed on purpose: restart state is not
      // propagated, since restartability is lost in a multi-threaded step anyway.
      ((ItemStream) itemReader).open(new ExecutionContext());
    }
  }

  @Override
  public void update(ExecutionContext executionContext) {
  }

  // Serialize access so the non-thread-safe delegate is only ever
  // called by one thread at a time.
  @Override
  public synchronized T read() throws Exception {
    return itemReader.read();
  }
}

And the same for your writer.
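A minimal sketch of such a writer wrapper, assuming Spring Batch 4.x or earlier (where ItemWriter.write takes a List); the class name SynchronizedWrapperWriter is not from the answer, it simply mirrors the reader above:

import java.util.List;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamWriter;
import org.springframework.batch.item.ItemWriter;

public class SynchronizedWrapperWriter<T> implements ItemStreamWriter<T> {

  private ItemWriter<T> itemWriter;
  private boolean isStream = false;

  public void setItemWriter(ItemWriter<T> itemWriter) {
    this.itemWriter = itemWriter;
    if (itemWriter instanceof ItemStream) {
      isStream = true;
    }
  }

  @Override
  public void open(ExecutionContext executionContext) {
    if (isStream) {
      // Mirrors the reader wrapper: restart state is deliberately not propagated.
      ((ItemStream) itemWriter).open(new ExecutionContext());
    }
  }

  @Override
  public void update(ExecutionContext executionContext) {
  }

  @Override
  public void close() {
    if (isStream) {
      ((ItemStream) itemWriter).close();
    }
  }

  // Serialize writes so a non-thread-safe writer such as FlatFileItemWriter
  // is only ever called by one thread at a time.
  @Override
  public synchronized void write(List<? extends T> items) throws Exception {
    itemWriter.write(items);
  }
}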

Please note that the order is no longer guaranteed.

Edited:

There was a comment asking how to use it in a config.xml, so here is a simple example of how to use the wrapper with a FlatFileItemReader:

<batch:step id="convert">
    <batch:tasklet >
        <batch:chunk reader="wrappedReader" writer="..."
                             processor="..." commit-interval="10000" >
        </batch:chunk>
     </batch:tasklet>
</batch:step>

<bean id="wrappedReader" class=[package].SynchronizedWrapperReader">
   <property name="itemReader">
      <bean class="org.springframework.batch.item.file.FlatFileItemReader">
          <property .../>
          <property .../>
      </bean>
   </property>
</bean>
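With the reader (and, if needed, the writer) wrapped, the processing itself can run in parallel by giving the tasklet a task executor. A sketch of what that could look like; the taskExecutor bean, the wrappedWriter bean, and the throttle-limit value of 4 are assumptions, not part of the original answer:

<batch:step id="convert">
    <batch:tasklet task-executor="taskExecutor" throttle-limit="4">
        <batch:chunk reader="wrappedReader" writer="wrappedWriter"
                             processor="processor" commit-interval="10000" >
        </batch:chunk>
     </batch:tasklet>
</batch:step>

<bean id="taskExecutor"
      class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>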

Solution 2:[2]

Since version 3.0.4, Spring Batch offers a wrapper class (like Hansjoerg Wingeier's in Solution 1) out of the box: SynchronizedItemStreamReader<T>.
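A minimal sketch of wiring it up in the Java config from the question, assuming contentItemReader() is typed as StaxEventItemReader<ContentItem>; the bean name synchronizedContentItemReader is just an illustration:

import org.springframework.batch.item.support.SynchronizedItemStreamReader;

@Bean
public SynchronizedItemStreamReader<ContentItem> synchronizedContentItemReader() {
    SynchronizedItemStreamReader<ContentItem> reader = new SynchronizedItemStreamReader<>();
    // Delegate all reads to the StAX reader; read() is synchronized by the wrapper.
    reader.setDelegate(contentItemReader());
    return reader;
}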

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution     Source
Solution 1   Hansjoerg Wingeier
Solution 2   thunderhook