Reliable alternative to Java Watch Service
I am using Java NIO's WatchService, but I find it quite unreliable in the following use cases:
When a very big file (>500 MB) is written to the watched directory and I rely on the ENTRY_CREATE event, the file is often not yet ready to read because another thread is still writing it. So I normally resort to listening for ENTRY_MODIFY instead.
But when thousands of small (~2 KB) files are copied to the watched directory, neither ENTRY_CREATE nor ENTRY_MODIFY fires for about 80% of them!
Has anyone else faced this? Is there a better, more reliable library, or should I simply switch to a blocking queue implementation where the file copier adds the file name to the queue and a consumer thread handles processing of the files?
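For reference, the blocking-queue alternative floated in the question could look roughly like the sketch below. It assumes you control the copier, so it can announce each file only after the write has finished. The class name `FileHandoffQueue`, the `STOP` sentinel, and the `file-N.dat` names are made up for illustration and do not come from the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class FileHandoffQueue {
    // Hypothetical sentinel that tells the consumer to stop.
    private static final String STOP = "__STOP__";

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /** Called by the copier only after a file has been completely written. */
    void fileReady(String fileName) throws InterruptedException {
        queue.put(fileName);
    }

    /** Signals that no more files will arrive. */
    void finish() throws InterruptedException {
        queue.put(STOP);
    }

    /** Consumer loop: blocks on take() until the sentinel arrives. */
    List<String> consumeAll() throws InterruptedException {
        List<String> processed = new ArrayList<>();
        while (true) {
            String name = queue.take();
            if (STOP.equals(name)) {
                return processed;
            }
            processed.add(name); // real code would process the file here
        }
    }

    /** Runs one copier thread and consumes on the calling thread. */
    static List<String> demo(int fileCount) {
        FileHandoffQueue handoff = new FileHandoffQueue();
        Thread copier = new Thread(() -> {
            try {
                for (int i = 0; i < fileCount; i++) {
                    handoff.fileReady("file-" + i + ".dat");
                }
                handoff.finish();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        copier.start();
        try {
            List<String> result = handoff.consumeAll();
            copier.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(6416).size());
    }
}
```

Because the copier itself says when a file is complete, this design does not depend on file system notifications at all. It only works when the copier and the consumer live in the same process (or share some other queue).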
Code around WatchService implementation:
WatchService watchService = FileSystems.getDefault().newWatchService();
Path path = Paths.get( coreProperties.getStagingLocation() );
path.register( watchService,
        new WatchEvent.Kind[] { ENTRY_MODIFY, ENTRY_CREATE },
        SensitivityWatchEventModifier.MEDIUM
);
WatchKey key;
while ( ( key = watchService.take() ) != null ) {
    for ( WatchEvent<?> event : key.pollEvents() ) {
        log.info( "Event kind: {} . File affected: {}.", event.kind(), event.context() );
        // Processing the file..
    }
    key.reset();
}
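For the large-file case described in the question, one common heuristic is to treat a file as ready only once its size has stopped changing. The sketch below is an assumption-laden illustration, not part of the original code: `FileStabilityCheck`, `looksComplete`, and the 50 ms pause are hypothetical, and a writer that stalls for longer than the pause will still fool the check.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

class FileStabilityCheck {
    /**
     * Heuristic only: returns true if the file exists and its size is
     * the same before and after the pause.
     */
    static boolean looksComplete(Path file, long pauseMillis) {
        try {
            if (!Files.isRegularFile(file)) {
                return false;
            }
            long before = Files.size(file);
            Thread.sleep(pauseMillis);
            long after = Files.size(file);
            return before == after;
        } catch (IOException e) {
            return false; // file vanished or is unreadable: treat as not ready
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Checks a finished temp file, then the same path after deletion. */
    static boolean demo() {
        try {
            Path tmp = Files.createTempFile("stability", ".dat");
            Files.write(tmp, new byte[2048]);
            boolean readyWhileIdle = looksComplete(tmp, 50);
            Files.delete(tmp);
            boolean readyWhenMissing = looksComplete(tmp, 50);
            return readyWhileIdle && !readyWhenMissing;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

If you control the writer, having it write under a temporary name and rename the file once it is done avoids the guesswork entirely.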
Solution 1:[1]
Based on the helpful comments by DuncG and Jim Garrison, I realised that WatchService is sensitive to how long each notification takes to process. I was copying 6,416 files to the watched folder, and if I did anything more than log the ENTRY_XX event, it missed many of the updates.
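A related safeguard, independent of the approach described here: `StandardWatchEventKinds.OVERFLOW` is the JDK's signal that events may have been lost or discarded, and a key can receive it even when it was not registered for it. Whether OVERFLOW was actually delivered in the scenario above is not known. The sketch below assumes you track the names already reported and reconcile them against a directory listing when an overflow shows up. `OverflowRescan`, `missed`, and the sample file names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class OverflowRescan {
    /** True when the event says notifications may have been lost. */
    static boolean isOverflow(WatchEvent<?> event) {
        return event.kind() == StandardWatchEventKinds.OVERFLOW;
    }

    /** Names of the regular files currently in the directory. */
    static Set<String> listFileNames(Path dir) {
        Set<String> names = new TreeSet<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path p : stream) {
                if (Files.isRegularFile(p)) {
                    names.add(p.getFileName().toString());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return names;
    }

    /** Files that are on disk but were never reported by the watcher. */
    static Set<String> missed(Path dir, Set<String> seen) {
        Set<String> onDisk = listFileNames(dir);
        onDisk.removeAll(seen);
        return onDisk;
    }

    /** Creates three files, pretends only one was reported, returns the rest. */
    static Set<String> demo() {
        try {
            Path dir = Files.createTempDirectory("staging");
            List<String> created = List.of("a.txt", "b.txt", "c.txt");
            for (String name : created) {
                Files.createFile(dir.resolve(name));
            }
            Set<String> result = missed(dir, Set.of("a.txt"));
            for (String name : created) {
                Files.delete(dir.resolve(name));
            }
            Files.delete(dir);
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In a watcher loop, a call to `missed(...)` would go in the branch where `isOverflow(event)` is true, so that files skipped by the notifications are still picked up.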
Here is what worked for me:
- While handling ENTRY_XX events, I write each one to an LMAX Disruptor whose ring buffer is larger than the maximum number of files expected in a batch (I set it to 2^19, i.e. 524,288 slots, which is enough to handle 50k or more file updates without blocking, assuming each file produces 10 watch service notifications).
[PS: writing to a simple ExecutorService queue didn't help due to latency around thread synchronisation. I only got 1,273 file names out of the 6,416!]
- I also had to warm up the ring buffer, or else I was still missing some watch service updates. I fill all its slots with test messages and then send an async event to the function that copies the 6,416 files to the watched folder.
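The sizing arithmetic above can be checked with a few lines: 50,000 files at 10 notifications each is 500,000 events, and the next power of two at or above that is 2^19 = 524,288 (the Disruptor requires its ring buffer size to be a power of two). The helper below is a hypothetical sketch for working out such a size, not something from the original code.

```java
class RingBufferSizing {
    /** Smallest power of two that is greater than or equal to n. */
    static int nextPowerOfTwo(int n) {
        if (n < 1 || n > (1 << 30)) {
            throw new IllegalArgumentException("n out of range: " + n);
        }
        int highest = Integer.highestOneBit(n);
        return highest == n ? n : highest << 1;
    }

    /** Ring buffer slots needed so a whole batch fits without wrapping. */
    static int slotsFor(int maxFiles, int eventsPerFile) {
        return nextPowerOfTwo(Math.multiplyExact(maxFiles, eventsPerFile));
    }

    public static void main(String[] args) {
        System.out.println(slotsFor(50_000, 10)); // prints 524288
    }
}
```

The figure of 10 notifications per file is the estimate given above; measure it on your own platform before relying on it.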
Sample code:
// publishing file names from watch service events to the Disruptor ring buffer
private void watchStagingFolder() {
    try {
        WatchService watchService = FileSystems.getDefault().newWatchService();
        Path path = Paths.get( coreProperties.getStagingLocation() );
        path.register( watchService,
                new WatchEvent.Kind[] { ENTRY_MODIFY, ENTRY_CREATE },
                SensitivityWatchEventModifier.HIGH
        );
        WatchKey key;
        while ( ( key = watchService.take() ) != null ) {
            log.info( "key found: {}", key );
            for ( WatchEvent<?> event : key.pollEvents() ) {
                String eventKindStr = event.kind().name();
                log.info( "Event kind: {} . File affected: {}", eventKindStr, event.context() );
                if ( event.kind().equals( ENTRY_CREATE ) || event.kind().equals( ENTRY_MODIFY ) ) {
                    String fileName = event.context().toString();
                    log.info( "File to be processed: {}", fileName );
                    fileProcessorDisruptorEventProducer.send( fileName );
                } else {
                    log.info( "Ignoring event kind {}", event.kind() );
                }
            }
            key.reset();
        }
    } catch ( Exception e ) {
        log.error( "Found error while watching the staging directory.", e );
    }
}
// ensuring Disruptor ring buffer is warmed up
@Component
@RequiredArgsConstructor
@Slf4j
public class DisruptorWarmer {
    public static final String TEST_FILE_NAME = "TEST_FILE_NAME";
    private final CoreProperties coreProperties;
    private final FileProcessorDisruptorEventProducer fileProcessorDisruptorEventProducer;

    @PostConstruct
    public void init() {
        int bufferSize = coreProperties.getDisruptor().getBufferSize();
        for ( int i = 0; i < bufferSize; i++ ) {
            fileProcessorDisruptorEventProducer.send( TEST_FILE_NAME );
        }
        log.info( "Warmed up disruptor with {} test messages.", bufferSize );
    }
}
// processing files in the Disruptor consumer/handler
@Override
public void onEvent( Msg msg, long l, boolean b ) {
    try {
        if ( count < bufferSize ) {
            log.debug( "Disruptor warming up. Count: {}. Ignoring msg: {}", count, msg.getPayload() );
            count++;
        } else if ( count == bufferSize ) {
            log.info( "Disruptor warmed up now with {} test messages.", count + 1 );
            newSingleThreadExecutor.submit( () ->
                    applicationEventPublisher.publishEvent( new FileProcessorDisruptorReadyEvent( this, "Disruptor warmed up." ) )
            );
            count++;
        } else {
            log.debug( "File: {}", msg.getPayload() );
            // no longer worried about slow processing impacting watch service
            processFile( ( String ) msg.getPayload() );
        }
    } catch ( RuntimeException rte ) {
        log.error( "Found error while processing msg: [{}]. Skipping to next message.", msg, rte );
    }
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 |