Kafka producer property enable.idempotence=true is causing error
When I set the Kafka producer property enable.idempotence to true:
kafkaProps.put("enable.idempotence", "true");
I get the error below:
2021-04-18 16:43:53.584 ERROR 15524 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender : [Producer clientId=producer-1] Aborting producer batches due to fatal error
org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
2021-04-18 16:43:53.585 ERROR 15524 --- [ restartedMain] c.a.c.g.kafkaclient.PricerProducer : sending above record failed. java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
Does the cluster have to support or enable this feature? If so, what is the minimum Kafka version the cluster should be on?
From the Kafka docs:
enable.idempotence
When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. Note that enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5, retries to be greater than 0 and acks must be 'all'. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, a ConfigException will be thrown.
Type: boolean
Default: false
Valid Values:
Importance: low
I have set max.in.flight.requests.per.connection=1 and left acks unset, so it is automatically set to -1 (all). So my configuration looks fine, and even if it were not, that should result in a ConfigException, not a ClusterAuthorizationException.
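To make the reasoning above concrete, here is a minimal sketch of the three documented constraints as a pre-flight check, using only java.util.Properties. The class name IdempotenceConfigCheck and the method violations are hypothetical and not part of the Kafka client. The code only mirrors the rules quoted from the docs and is not how the producer validates its configuration internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Illustrative pre-flight check; hypothetical helper, not part of the Kafka client library.
final class IdempotenceConfigCheck {

    // Returns a description of every explicitly set value that conflicts with the
    // documented requirements for enable.idempotence=true. Unset keys are skipped,
    // because the docs say suitable values are chosen for them.
    static List<String> violations(Properties props) {
        List<String> problems = new ArrayList<>();
        if (!Boolean.parseBoolean(props.getProperty("enable.idempotence", "false"))) {
            return problems; // idempotence disabled: nothing to check
        }
        String inFlight = props.getProperty("max.in.flight.requests.per.connection");
        if (inFlight != null && Integer.parseInt(inFlight.trim()) > 5) {
            problems.add("max.in.flight.requests.per.connection must be <= 5");
        }
        String retries = props.getProperty("retries");
        if (retries != null && Integer.parseInt(retries.trim()) <= 0) {
            problems.add("retries must be > 0");
        }
        String acks = props.getProperty("acks");
        if (acks != null && !acks.trim().equals("all") && !acks.trim().equals("-1")) {
            problems.add("acks must be 'all' (-1)");
        }
        return problems;
    }

    public static void main(String[] args) {
        Properties kafkaProps = new Properties();
        kafkaProps.put("enable.idempotence", "true");
        kafkaProps.put("max.in.flight.requests.per.connection", "1");
        // acks and retries are left unset, as in the question.
        System.out.println(violations(kafkaProps)); // prints []
    }
}
```

With the settings from the question the list is empty, which is consistent with the error not being a ConfigException: the failure comes from the broker side rather than from client-side validation.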
Solution 1:[1]
According to the Kafka docs, this is an authorization issue. Under "Authorization and ACLs" you'll find operations such as "IdempotentWrite". It looks like you must be authorized to perform IdempotentWrite, so you need to add this privilege via an ACL, for example: operation "IdempotentWrite" on resource "Cluster".
An idempotent produce action requires this privilege, as described here: https://kafka.apache.org/documentation/#operations_resources_and_protocols
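As a rough illustration of granting that privilege, the sketch below assembles a kafka-acls.sh invocation that allows IdempotentWrite on the cluster resource. The helper class IdempotentWriteAclCommand, the broker address localhost:9092 and the principal User:pricer-producer are assumptions made for the example. The code only builds the command line and does not run it. On a secured cluster the tool will most likely also need admin credentials passed with --command-config, and older installations may have to use --authorizer-properties instead of --bootstrap-server.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative helper (hypothetical name); it only assembles a command line and never executes it.
final class IdempotentWriteAclCommand {

    // bootstrapServer and principal are placeholders supplied by the caller.
    static List<String> build(String bootstrapServer, String principal) {
        return Arrays.asList(
            "kafka-acls.sh",
            "--bootstrap-server", bootstrapServer,
            "--add",
            "--allow-principal", principal,
            "--operation", "IdempotentWrite",
            "--cluster");
    }

    public static void main(String[] args) {
        // Assumed values; replace with the real broker address and producer principal.
        List<String> command = build("localhost:9092", "User:pricer-producer");
        System.out.println(String.join(" ", command));
    }
}
```

Once the ACL is in place for the principal the producer authenticates as, the idempotent producer should be able to initialize without the ClusterAuthorizationException.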
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 |