Connection between Kafka and Spark: Failed to find data source: kafka

I am trying to link Kafka and Spark by reading data from one topic and printing the content of that topic into a DataFrame, but when making the connection between Kafka and Spark I get this error:

Error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
    at readFromKafka$.main(readFromKafka.scala:17)
    at readFromKafka.main(readFromKafka.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Content of my code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object readFromKafka {
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder().master("spark://192.168.1.51:7077").appName("kafka_spark").getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092")
      .option("subscribe", "test")
      .option("startingOffsets", "earliest") // From starting
      .load()

    df.printSchema()
  }
}

build.sbt:

name := "readFromSpark"

version := "1.0"

scalaVersion := "2.12.8"

val sparkVersion = "2.4.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.4" % Test
)

I ran sbt clean package, and it produced the following output without any errors:

[info] welcome to sbt 1.4.7 (Private Build Java 1.8.0_282)
[info] loading project definition from /home/hamza/spark-project/spark-learning/project
[info] loading settings for project spark-learning from build.sbt ...
[info] set current project to spark-learning (in build file:/home/hamza/spark-project/spark-learning/)
[success] Total time: 0 s, completed Mar 12, 2021 1:13:50 PM
[warn] There may be incompatibilities among your library dependencies; run 'evicted' to see detailed eviction warnings.
[info] compiling 7 Scala sources to /home/hamza/spark-project/spark-learning/target/scala-2.12/classes ...
[warn] multiple main classes detected: run 'show discoveredMainClasses' to see the list
[success] Total time: 18 s, completed Mar 12, 2021 1:14:08 PM

Running the Spark job:

spark-submit --master spark://192.168.1.51:7077 --class readFromKafka target/scala-2.12/spark-learning_2.12-1.0.jar

Any help, please?



Solution 1:

You need to remove % Test from the Kafka dependency if you want to run the code via spark-submit. With the Test scope, spark-sql-kafka-0-10 is only on the classpath for sbt test, so the kafka data source is missing when your packaged jar runs on the cluster.

e.g.

val sparkVersion = "2.4.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)
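
Marking spark-core and spark-sql as provided keeps them out of your application jar, since the Spark installation on the cluster already supplies them at runtime; the Kafka connector, by contrast, is not part of the Spark distribution, so it must ship with your application.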

You'll also need to set up sbt-assembly to ensure that the Kafka dependencies are actually included in the jar you submit, since sbt package only bundles your own classes, not your dependencies.
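
A minimal sketch of the sbt-assembly setup (the plugin version here is an assumption; pick one compatible with your sbt release):

project/plugins.sbt:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.15.0")

build.sbt additions:

// Duplicate META-INF entries pulled in by the Kafka client jars
// would otherwise make the assembly step fail, so discard them
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", _*) => MergeStrategy.discard
  case _                        => MergeStrategy.first
}

Then run sbt clean assembly and pass the resulting *-assembly-1.0.jar to spark-submit instead of the plain package jar. Alternatively, you can skip sbt-assembly and let spark-submit resolve the connector for you at launch time:

spark-submit --master spark://192.168.1.51:7077 \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.4 \
  --class readFromKafka target/scala-2.12/spark-learning_2.12-1.0.jar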
