kafka-consumer-groups.sh does not show lag information for Spark Streaming + Kafka
I used Spark Streaming to consume from Kafka. Messages are consumed successfully; however, kafka-consumer-groups.sh shows no lag information for the consumer group.
The Spark code is:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val SPARK_STREAMING_CONTEXT = new StreamingContext(lvSparkSession.sparkContext,
  Seconds(BdPropTool.getLong(BdPropKey.TRAIL_STREAMING_KAFKA_INTERVAL_SECOND)))
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> BdKafkaUtil.KAFKA_BOOTSTRAP,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark_streaming_trail_monitor",
  "auto.offset.reset" -> "latest",
  "security.protocol" -> "SASL_PLAINTEXT",
  "sasl.kerberos.service.name" -> "kafka",
  "sasl.mechanism" -> "GSSAPI",
  "enable.auto.commit" -> (true: java.lang.Boolean)
)
val dfStream = KafkaUtils.createDirectStream[String, String](SPARK_STREAMING_CONTEXT,
  PreferConsistent, Subscribe[String, String](kafkaTopicArr, kafkaParams))
dfStream.map(event => event.value()).foreachRDD { dfRddCache => ......
The CLI command I ran and its output:
$ /usr/hdp/current/kafka-broker/bin/kafka-consumer-groups.sh --bootstrap-server sbd03:9092 --command-config /home/kafka/kfk/kfk.prop --describe --group spark_streaming_trail_monitor
Consumer group 'spark_streaming_trail_monitor' has no active members.
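For reference, the lag that kafka-consumer-groups.sh --describe reports per partition is the partition's log-end offset minus the offset the group has committed for it; if the group has never committed an offset for a partition, there is nothing to subtract from. The pure-Scala sketch below models only that arithmetic. The LagSketch object, its lag function, and the sample offsets are hypothetical illustrations, not part of any Kafka API.

```scala
object LagSketch {
  // Hypothetical model of per-partition lag: logEnd and committed map a
  // partition number to an offset. Lag = log-end offset - committed offset.
  // A partition with no committed offset yields None (no lag can be computed).
  def lag(logEnd: Map[Int, Long], committed: Map[Int, Long]): Map[Int, Option[Long]] =
    logEnd.map { case (partition, end) =>
      partition -> committed.get(partition).map(c => end - c)
    }

  def main(args: Array[String]): Unit = {
    // Illustrative offsets only.
    val logEnd = Map(0 -> 120L, 1 -> 95L)
    // Group has never committed: every partition's lag is None.
    println(lag(logEnd, Map.empty))
    // Group has committed offsets: lag is defined for each partition.
    println(lag(logEnd, Map(0 -> 100L, 1 -> 95L)))
  }
}
```

In other words, the CLI can only show lag once offsets have actually been committed under the group.id it is asked about.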
The Maven dependency used is:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.3.2</version>
</dependency>
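A sketch of the offset-commit pattern described in the Spark Streaming + Kafka 0.10 integration guide, assuming the goal is for the group to have committed offsets that kafka-consumer-groups.sh can read: auto-commit is turned off and each batch's offset ranges are committed explicitly with CanCommitOffsets.commitAsync after the batch is processed. The CommitAfterProcessing object, its wireCommits and kafkaParams helpers, the count() stand-in for real processing, and the command-line arguments (bootstrap servers, topic) are hypothetical; the SASL settings are copied from the question. It assumes the spark-streaming and spark-streaming-kafka-0-10 artifacts above are on the classpath.

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object CommitAfterProcessing {
  // Same group id as in the question; the CLI looks up committed offsets for it.
  val groupId = "spark_streaming_trail_monitor"

  // Hypothetical helper: the question's parameters, but with auto-commit disabled.
  def kafkaParams(bootstrap: String): Map[String, Object] = Map[String, Object](
    "bootstrap.servers" -> bootstrap,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> groupId,
    "auto.offset.reset" -> "latest",
    "security.protocol" -> "SASL_PLAINTEXT",
    "sasl.kerberos.service.name" -> "kafka",
    "sasl.mechanism" -> "GSSAPI",
    // Offsets are committed explicitly in wireCommits instead.
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  // Hypothetical helper: process each batch, then commit its offset ranges.
  def wireCommits(stream: InputDStream[ConsumerRecord[String, String]]): Unit =
    stream.foreachRDD { rdd =>
      // Offset ranges are only available on the RDD produced directly by the stream.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // Stand-in for the real per-batch processing.
      val processedCount = rdd.map(_.value()).count()
      // Commit this batch's offsets to Kafka under group.id once processing is done.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical arguments: args(0) = bootstrap servers, args(1) = topic.
    val ssc = new StreamingContext(new SparkConf().setAppName("trail-monitor-sketch"), Seconds(10))
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Array(args(1)), kafkaParams(args(0))))
    wireCommits(stream)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Because the commit is asynchronous and happens after the output, delivery is at-least-once, so the per-batch processing should tolerate replayed records.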
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow