Reliable alternative to Java WatchService

I am using Java NIO's WatchService, but I find it quite unreliable for the following use cases:

  1. When a very big file (>500 MB) is written to the watched directory, the file is often not yet ready to read when the ENTRY_CREATE event arrives: another thread is still writing it. So I normally listen for ENTRY_MODIFY instead.

  2. But when thousands of small (~2 KB) files are copied to the watched directory, no ENTRY_CREATE or ENTRY_MODIFY event arrives for 80% of them!
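For the first problem, one widely used pattern (not from the original post) is to have the writer create the file under a temporary name and rename it into place only once the copy has finished. With an atomic rename inside the watched directory, the final name appears only after all bytes have been written, and the watcher typically reports it as ENTRY_CREATE. A minimal sketch, assuming the writer can be changed; the `.part` suffix and the `AtomicPublish` class are hypothetical:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch: publish a file into a watched directory only once it is complete.
// The ".part" suffix is a hypothetical convention that the watcher skips.
final class AtomicPublish {
    static final String TEMP_SUFFIX = ".part";

    // Copies source into watchedDir under a temporary name, then renames it
    // to its final name in one atomic step. Returns the final path.
    static Path publish(Path source, Path watchedDir) throws IOException {
        Path fileName = source.getFileName();
        Path temp = watchedDir.resolve(fileName + TEMP_SUFFIX);
        Path target = watchedDir.resolve(fileName);
        Files.copy(source, temp, StandardCopyOption.REPLACE_EXISTING);
        // Throws AtomicMoveNotSupportedException if the file system
        // cannot perform the rename atomically.
        return Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
    }

    // Watcher-side filter: ignore names that are still being written.
    static boolean isReady(Path fileName) {
        return !fileName.toString().endsWith(TEMP_SUFFIX);
    }
}
```

The watcher then ignores any event whose name fails `isReady`, so a half-written 500 MB file is never picked up.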

Has anyone else run into this? Is there a better, more reliable library, or should I simply switch to a blocking-queue implementation where the file copier adds each file name to the queue and a consumer thread processes the files?
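The blocking-queue alternative can be sketched with only the JDK, assuming the copier runs in the same JVM as the consumer. `FileHandoff` and the `__END__` sentinel are hypothetical names; the copier calls `fileReady` only after a file is completely written, which also sidesteps the partial-file problem:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the queue-based alternative: the copier announces each finished
// file on a queue instead of relying on file system events.
// FileHandoff and POISON_PILL are hypothetical names.
final class FileHandoff {
    // Sentinel telling the consumer that the batch is complete.
    static final String POISON_PILL = "__END__";

    // Unbounded, so put() never blocks the copier.
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Copier thread: call only after the file has been completely written.
    void fileReady(String fileName) throws InterruptedException {
        queue.put(fileName);
    }

    // Copier thread: call once the whole batch has been copied.
    void finish() throws InterruptedException {
        queue.put(POISON_PILL);
    }

    // Consumer thread: blocks until a name arrives and returns the names it
    // handled once the sentinel is seen.
    List<String> consumeAll() throws InterruptedException {
        List<String> processed = new ArrayList<>();
        while (true) {
            String name = queue.take();
            if (POISON_PILL.equals(name)) {
                return processed;
            }
            processed.add(name); // real code would process the file here
        }
    }
}
```

Because the queue holds every entry until it is taken, a slow consumer delays processing but drops nothing. The trade-off is that this only works when the copier and the consumer share a process; files dropped in by external tools still need a watcher.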

Code around the WatchService implementation:


            WatchService watchService = FileSystems.getDefault().newWatchService();
            Path path = Paths.get( coreProperties.getStagingLocation() );
            path.register( watchService,
                    new WatchEvent.Kind[] { ENTRY_MODIFY, ENTRY_CREATE },
                    SensitivityWatchEventModifier.MEDIUM
            );

            WatchKey key;
            while ( ( key = watchService.take() ) != null ) {
                for ( WatchEvent<?> event : key.pollEvents() ) {
                    log.info( "Event kind: {} . File affected: {}.", event.kind(), event.context() );
                    // Processing the file..
                }
                key.reset();
            }

 


Solution 1:[1]

Based on the helpful comments by DuncG and Jim Garrison, I realised that WatchService is sensitive to how long each notification takes to handle. I was copying 6,416 files into the watched folder, and if I did anything more than log the ENTRY_XX event, it missed many of the updates. (When events are not drained fast enough, the watch service may discard them and report this with an OVERFLOW event.)
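The JDK does make lost events visible: `StandardWatchEventKinds.OVERFLOW` can be delivered, even without being registered, when events may have been lost or discarded. Below is a minimal sketch of a watch loop that does nothing but hand names to a queue and falls back to listing the directory on OVERFLOW. It is an alternative to the Disruptor approach, not the author's code; `DrainingWatcher` and its methods are hypothetical, and it assumes an unbounded queue (`add` throws on a full bounded one):

```java
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Sketch: keep the watch thread cheap and recover from lost events.
final class DrainingWatcher {

    // Forwards file names from one batch of events. On OVERFLOW, events were
    // lost, so every file currently in the directory is re-queued instead;
    // consumers must therefore tolerate duplicate names.
    static void dispatch(Path dir, List<WatchEvent<?>> events, BlockingQueue<Path> out)
            throws IOException {
        for (WatchEvent<?> event : events) {
            WatchEvent.Kind<?> kind = event.kind();
            if (kind == OVERFLOW) {
                try (DirectoryStream<Path> files = Files.newDirectoryStream(dir)) {
                    for (Path file : files) {
                        out.add(file.getFileName());
                    }
                }
            } else if (kind == ENTRY_CREATE || kind == ENTRY_MODIFY) {
                out.add((Path) event.context()); // name relative to dir
            }
        }
    }

    // Watch loop: no file processing happens here, only hand-off.
    static void watch(Path dir, BlockingQueue<Path> out) throws IOException, InterruptedException {
        try (WatchService watchService = dir.getFileSystem().newWatchService()) {
            dir.register(watchService, ENTRY_CREATE, ENTRY_MODIFY); // OVERFLOW needs no registration
            while (true) {
                WatchKey key = watchService.take();
                dispatch(dir, key.pollEvents(), out);
                if (!key.reset()) {
                    return; // directory deleted or otherwise inaccessible
                }
            }
        }
    }
}
```

The rescan trades precision for completeness: after an overflow the consumer may see names it has already handled, so it should skip files it has already processed.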

Here is what worked for me:

  1. While handling the ENTRY_XX events, I write each file name to an LMAX Disruptor whose ring buffer is larger than the maximum number of notifications expected in a batch. I set it to 2^19, i.e. 524,288 slots, which is enough to handle 50k or more file updates without blocking, assuming each file generates about 10 WatchService notifications.

[PS: writing to a plain ExecutorService queue didn't help because of the thread-synchronisation latency. I only got 1,273 file names out of the 6,416!]
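The sizing arithmetic behind the 2^19 choice can be checked in a few lines. The Disruptor rejects ring buffer sizes that are not a power of two, so the expected number of notifications (files times notifications per file, the latter being the author's estimate of about 10) has to be rounded up. `RingBufferSizing` is a hypothetical helper:

```java
// Sketch: smallest power-of-two ring buffer that holds every expected
// notification of a batch without the producer blocking.
final class RingBufferSizing {

    // Smallest power of two >= files * notificationsPerFile.
    static int ringBufferSize(int files, int notificationsPerFile) {
        long needed = (long) files * notificationsPerFile;
        if (needed <= 1) {
            return 1;
        }
        if (needed > (1 << 30)) {
            throw new IllegalArgumentException("ring buffer too large: " + needed);
        }
        // highestOneBit(needed - 1) is the largest power of two below needed;
        // doubling it gives the smallest power of two >= needed.
        return Integer.highestOneBit((int) needed - 1) << 1;
    }
}
```

`ringBufferSize(50_000, 10)` returns 524,288 (2^19), the value used above; the 6,416-file batch alone would fit in `ringBufferSize(6_416, 10)`, i.e. 65,536 slots.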

  2. I also had to warm up the ring buffer, or I was still missing some WatchService updates. I fill all of its slots with test messages and then send an async event to the function that copies the 6,416 files into the watched folder.
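The warm-up-then-signal sequence can be sketched without the Disruptor or Spring. A `LinkedBlockingQueue` stands in for the ring buffer and a `CountDownLatch` for the ready event; all names are hypothetical. This reproduces only the ordering, not the latency effect the author observed. The signal fires as soon as the last warm-up message has been consumed, so the first real file name is processed rather than absorbed by the warm-up logic:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the warm-up sequencing only (stand-ins for Disruptor and Spring).
final class WarmUpThenSignal {
    static final String TEST_FILE_NAME = "TEST_FILE_NAME";

    final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    final CountDownLatch ready = new CountDownLatch(1);
    final List<String> processed = new ArrayList<>(); // only touched by the consumer thread
    private final int warmUpCount;
    private int seen; // warm-up messages consumed so far

    WarmUpThenSignal(int warmUpCount) {
        this.warmUpCount = warmUpCount;
        if (warmUpCount == 0) {
            ready.countDown(); // nothing to warm up
        }
    }

    // Producer side: one test message per slot, like DisruptorWarmer.
    void warmUp() throws InterruptedException {
        for (int i = 0; i < warmUpCount; i++) {
            queue.put(TEST_FILE_NAME);
        }
    }

    // Consumer side: skip the warm-up messages, signal readiness right after
    // the last one, then handle real names until `expected` have been seen.
    void consume(int expected) throws InterruptedException {
        while (processed.size() < expected) {
            String msg = queue.take();
            if (seen < warmUpCount) {
                seen++;
                if (seen == warmUpCount) {
                    ready.countDown(); // the copier may start now
                }
            } else {
                processed.add(msg); // real code would process the file here
            }
        }
    }
}
```

The copier thread calls `ready.await()` before it starts copying, which plays the role of the async ready event.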

Sample code:

// publishing file names from WatchService events to the Disruptor ring buffer
 
    private void watchStagingFolder() {
        // try-with-resources closes the WatchService when the loop exits
        try ( WatchService watchService = FileSystems.getDefault().newWatchService() ) {
            Path path = Paths.get( coreProperties.getStagingLocation() );
            path.register( watchService,
                    new WatchEvent.Kind[] { ENTRY_MODIFY, ENTRY_CREATE },
                    SensitivityWatchEventModifier.HIGH
            );

            while ( true ) {
                WatchKey key = watchService.take(); // blocks; never returns null
                log.info( "key found: {}", key );
                for ( WatchEvent<?> event : key.pollEvents() ) {
                    WatchEvent.Kind<?> kind = event.kind();
                    log.info( "Event kind: {} . File affected: {}", kind.name(), event.context() );
                    if ( kind == ENTRY_CREATE || kind == ENTRY_MODIFY ) {
                        String fileName = event.context().toString();
                        log.info( "File to be processed: {}", fileName );
                        // hand off immediately; the slow work happens in the Disruptor consumer
                        fileProcessorDisruptorEventProducer.send( fileName );
                    } else {
                        // OVERFLOW ends up here: it means some events were lost
                        log.warn( "Ignoring event kind {}", kind );
                    }
                }
                if ( !key.reset() ) {
                    log.warn( "Watch key no longer valid; stopping." );
                    break;
                }
            }
        } catch ( InterruptedException e ) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } catch ( Exception e ) {
            log.error( "Found error while watching the staging directory.", e );
        }
    }

// ensuring Disruptor ring buffer is warmed up
@Component
@RequiredArgsConstructor
@Slf4j
public class DisruptorWarmer {
    public static final String TEST_FILE_NAME = "TEST_FILE_NAME";
    private final CoreProperties coreProperties;
    private final FileProcessorDisruptorEventProducer fileProcessorDisruptorEventProducer;

    @PostConstruct
    public void init() {
        int bufferSize = coreProperties.getDisruptor().getBufferSize();
        for ( int i = 0; i < bufferSize; i++ ) {
            fileProcessorDisruptorEventProducer.send( TEST_FILE_NAME );
        }
        log.info( "Warmed up disruptor with {} test messages.", bufferSize );
    }
}

// processing files in the Disruptor consumer/handler
    @Override
    public void onEvent( Msg msg, long sequence, boolean endOfBatch ) {
        try {
            if ( count < bufferSize ) {
                // still consuming the test messages sent by DisruptorWarmer
                count++;
                log.debug( "Disruptor warming up. Count: {}. Ignoring msg: {}", count, msg.getPayload() );
                if ( count == bufferSize ) {
                    // signal on the last warm-up message, so no real file name is swallowed
                    log.info( "Disruptor warmed up now with {} test messages.", count );
                    newSingleThreadExecutor.submit( () ->
                            applicationEventPublisher.publishEvent( new FileProcessorDisruptorReadyEvent( this, "Disruptor warmed up." ) )
                    );
                }
            } else {
                log.debug( "File: {}", msg.getPayload() );
                // no longer worried about slow processing impacting the watch service
                processFile( ( String ) msg.getPayload() );
            }
        } catch ( RuntimeException rte ) {
            log.error( "Found error while processing msg: [{}]. Skipping to next message.", msg, rte );
        }
    }

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1