StreamsException: Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory
This concerns upgrading an existing production code base that uses windowing, from kafka-clients, kafka-streams, and spring-kafka 2.4.0 to 2.6.x, and also upgrading spring-boot-starter-parent from 2.2.2.RELEASE to 2.3.x, since Spring Boot 2.2 is incompatible with kafka-streams 2.6.
The existing code base had the beans below under the old versions (Kafka 2.4.0, Spring Boot 2.2):
@Bean("DataCompressionCustomTopology")
public Topology customTopology(@Qualifier("CustomFactoryBean") StreamsBuilder streamsBuilder) {
    // topology code
    return streamsBuilder.build();
}

@Bean("GenericKafkaStreams")
public KafkaStreams kStream() {
    // Kafka Streams code
    return kafkaStreams;
}
After upgrading kafka-streams and kafka-clients to 2.6.2 and spring-kafka to 2.6.x, the following exception was observed:
2021-05-13 12:33:51.954 [Persistence-Realtime-Transformation] [main] WARN o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'CustomFactoryBean'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.StreamsException: Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory
Solution 1:[1]
A similar error can happen when you run multiple instances of the same application (same name/id) on the same machine.
See the state.dir configuration property to get the idea: you can add it to your Kafka configuration and make it unique per instance.
If you are using Spring Cloud Stream (where two instances cannot share the same port on one machine):
spring.cloud.stream.kafka.streams.binder.configuration.state.dir: ${spring.application.name}${server.port}
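The same idea can be sketched in plain Java: compose a state directory that is unique per instance from the application name and port, mirroring the property above. This is a minimal illustration; the method and class names (`uniqueStateDir`, `StateDirExample`) are hypothetical, not part of any library.

```java
import java.nio.file.Paths;

public class StateDirExample {

    // Build a per-instance state directory under the system temp folder,
    // e.g. /tmp/kafka-streams/my-app8081, so two instances on the same
    // machine never contend for the same lock file.
    static String uniqueStateDir(String appName, int serverPort) {
        return Paths.get(System.getProperty("java.io.tmpdir"),
                "kafka-streams", appName + serverPort).toString();
    }

    public static void main(String[] args) {
        System.out.println(uniqueStateDir("my-app", 8081));
    }
}
```

The resulting string would then be supplied as the `state.dir` value in the streams configuration.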
Solution 2:[2]
If you have a sophisticated Kafka Streams topology in your Spring Cloud Streams Kafka Streams Binder 3.0 style application, you might need to specify different application ids for different functions like the following:
spring.cloud.stream.function.definition: myFirstStream;mySecondStream
...
spring.cloud.stream.kafka.streams:
  binder:
    functions:
      myFirstStream:
        applicationId: app-id-1
      mySecondStream:
        applicationId: app-id-2
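If you prefer a flat properties file over YAML, the same configuration would, I believe, look like this (a sketch of the standard relaxed-binding flattening of the YAML above):

```properties
spring.cloud.stream.function.definition=myFirstStream;mySecondStream
spring.cloud.stream.kafka.streams.binder.functions.myFirstStream.applicationId=app-id-1
spring.cloud.stream.kafka.streams.binder.functions.mySecondStream.applicationId=app-id-2
```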
Solution 3:[3]
The problem here is that newer versions of spring-kafka automatically initialize one instance of Kafka Streams based on the Topology bean, while another instance is initialized from the GenericKafkaStreams bean in the existing code base. The result is multiple threads trying to take the lock on the same state directory, hence the error.
Even disabling KafkaAutoConfiguration at the Spring Boot level does not disable this behavior. This was painful to identify and cost a lot of time.
The fix is to get rid of the Topology bean and have our own custom Kafka Streams bean, as in the code below:
protected Topology customTopology() {
    // topology code (streamsBuilder is assumed to be created locally or injected as a field)
    return streamsBuilder.build();
}

/**
 * Starts the Kafka Streams application and sets the state listener and
 * state store listener.
 *
 * @return KafkaStreams
 */
@Bean("GenericKafkaStreams")
public KafkaStreams kStream() {
    // kstreamsconfigs holds the streams configuration (Properties/StreamsConfig)
    KafkaStreams kafkaStreams = new KafkaStreams(customTopology(), kstreamsconfigs);
    return kafkaStreams;
}
Solution 4:[4]
I ran into this problem on these versions:
- org.springframework.boot 2.5.3
- org.springframework.kafka:spring-kafka:2.7.5
- org.apache.kafka:kafka-clients:2.8.0
- org.apache.kafka:kafka-streams:2.8.0
Check the state directory configuration.
By default it is created in the temp folder with the Kafka Streams app id, like: /var/folders/xw/xgslnvzj1zj6wp86wpd8hqjr0000gn/T/kafka-streams/${spring.kafka.streams.application-id}/.lock
If two or more Kafka Streams apps use the same spring.kafka.streams.application-id, you get this exception. So just change your Kafka Streams app ids.
Alternatively, set the directory manually via StreamsConfig.STATE_DIR_CONFIG in the streams config.
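As a minimal sketch of that last option, the state directory can be set as a plain property ("state.dir" is the literal value behind StreamsConfig.STATE_DIR_CONFIG); the base path and the instance-id suffix here are illustrative assumptions:

```java
import java.util.Properties;

public class UniqueStateDir {

    // Build streams properties with a state directory unique per instance,
    // so each process acquires its own lock file instead of fighting over one.
    static Properties streamsProps(String appId, String instanceId) {
        Properties props = new Properties();
        props.put("application.id", appId);
        // "state.dir" == StreamsConfig.STATE_DIR_CONFIG; suffix keeps it unique
        props.put("state.dir", "/var/lib/kafka-streams/" + appId + "-" + instanceId);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps("my-app", "1").getProperty("state.dir"));
    }
}
```

These properties would then be passed to the KafkaStreams constructor in place of the shared defaults.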
Solution 5:[5]
The answers above about setting the state dir worked perfectly for me; thanks. One observation that might help anyone working with Spring Boot: when bringing up multiple Kafka Streams application instances on the same machine with spring.devtools.restart.enabled turned on (which is usually the case in the dev profile), you may want to disable it, because when the same application instance restarts automatically it may fail to acquire the store lock. This is what I was facing, and I resolved it by disabling the restart behavior.
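Concretely, disabling the devtools restart described above is a one-line change in application.properties (or the dev-profile equivalent):

```properties
# Prevent devtools auto-restart from racing for the state-directory lock
spring.devtools.restart.enabled=false
```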
Solution 6:[6]
In my case, what worked was specifying a separate @TestConfiguration class in which I keep a counter that changes the application name for each Spring Boot test context.
@TestConfiguration
public class TestKafkaStreamsConfig {

    private static final AtomicInteger COUNTER = new AtomicInteger();

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    KafkaStreamsConfiguration kStreamsConfig() {
        final var props = new HashMap<String, Object>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application-id-" + COUNTER.getAndIncrement());
        // rest of configuration
        return new KafkaStreamsConfiguration(props);
    }
}
Of course, I had to enable Spring bean overriding to replace the primary configuration.
Edit: I'm using Spring Boot 2.5.10, so in my case, to make use of the @TestConfiguration, I have to pass it to the @SpringBootTest(classes = ...) annotation.
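Wiring that up might look like the following sketch (MyApplication and MyStreamsTest are hypothetical placeholder names, not from the original answer):

```java
// Pass the @TestConfiguration class explicitly on Spring Boot 2.5.x
@SpringBootTest(classes = { MyApplication.class, TestKafkaStreamsConfig.class })
class MyStreamsTest {
    // test methods run against the overridden Kafka Streams configuration,
    // each test context getting its own application id from the counter
}
```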
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Jay |
| Solution 2 | Sergey Shcherbakov |
| Solution 3 | Dharman |
| Solution 4 | Yuriy Kiselev |
| Solution 5 | Vikas Joshi |
| Solution 6 | Jacek Sawko |