'kerberos error while authenticating on Confluent Kafka
I´ve been trying to understand apache beam, confluent kafka and dataflow integration with python 3.8 and beam sdk 2.7 the desire result is to build a pipeline (which is going to be ran on dataflow) which consumes from confluent kafka and and just logs the messages on gcp.(I'm using JDK 17 btw)
This is the code I´m using:
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
import os
import json
import logging
os.environ['GOOGLE_APPLICATION_CREDENTIALS']='credentialsOld.json'
with open('cluster.configuration.json') as cluster:
data=json.load(cluster)
cluster.close()
def logger(element):
logging.INFO('Something was found')
config={
"bootstrap.servers":data["bootstrap.servers"],
"security.protocol":"SASL_SSL",
"sasl.mechanisms":"PLAIN",
"session.timeout.ms":data["session.timeout.ms"],
"group.id":"tto",
"sasl.jaas.config":f'org.apache.kafka.common.security.plain.PlainLoginModule required serviceName="Kafka" username=\"{data["sasl.username"]}\" password=\"{data["sasl.password"]}\";',
"auto.offset.reset":"earliest"
}
def main():
print('======================================================')
beam_options = PipelineOptions(runner='DataflowRunner',project='project',experiments=['use_runner_v2'],streaming=True,save_main_session=True,job_name='kafka-stream-test')
with beam.Pipeline(options=beam_options) as p:
msgs = p | 'ReadKafka' >> ReadFromKafka(consumer_config=config,topics=['users'])
msgs | beam.FlatMap(logger)
if __name__ == '__main__':
main()
I have tested this pipeline with dataflow but also with direct runner and on both runners I get a this error: "Timeout while fetching topic metadata".
This error seems to be caused because of the consumer being unable to authenticate to confluent kafka since I get these warnings:
WARNING:root:severity: WARN
timestamp {
seconds: 1650473787
nanos: 331000000
}
message: "[Consumer clientId=consumer-tto-1, groupId=tto] Bootstrap broker "Here is suposed to be mi broker ip but I wont show it" (id: -1 rack: null) disconnected"
instruction_id: "bundle_1"
transform_id: "ReadFromKafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/ParDo(GenerateKafkaSourceDescriptor)/ParMultiDo(GenerateKafkaSourceDescriptor)"
log_location: "org.apache.kafka.clients.NetworkClient"
thread: "21"
And after this warning I get this other warning:
message: "[Consumer clientId=consumer-tto-1, groupId=tto] Error connecting to node "Here is suposed to be mi broker ip but I wont show it" (id: -1 rack: null)"
trace: "java.io.IOException: Channel could not be created for socket java.nio.channels.SocketChannel[closed]\n\tat
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
Caused by: org.apache.kafka.common.KafkaException: Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login
Two important things are that I already build a consumer on python but without the ReadFromKafka apache beam io and it connects and consumes perfectly to the topic so the credentials I'm using are the same and I have the same protocol "SASL_SSL""PLAIN" (related to this also I don't have any idea why is a kerberos error popping since I'm not using kerberos authentication)... the other thing is that the transform 'ReadFromKafka' is used through a expansion service since this transform is only supported by java but with apache beam I can use it on Python.
Solution 1:[1]
Ok the mistake was really simple to fix, I had a typo in 'sasl.mechanisms' so the property wasn't getting recognized.
Instead of sasl.mechanisms use sasl.mechanism.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Christian |