Connection between Kafka and Spark: Failed to find data source: kafka
I am trying to connect Kafka and Spark by reading data from a topic and printing that topic's content as a DataFrame. When Spark tries to connect to Kafka, I get this error:
Error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
at readFromKafka$.main(readFromKafka.scala:17)
at readFromKafka.main(readFromKafka.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
My code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
object readFromKafka {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("spark://192.168.1.51:7077")
      .appName("kafka_spark")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092")
      .option("subscribe", "test")
      .option("startingOffsets", "earliest") // read the topic from the earliest available offset
      .load()
    df.printSchema()
  }
}
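Calling printSchema() on a streaming DataFrame prints only the schema (key, value, topic, partition, offset, timestamp, timestampType). Calling show() on it fails, because a streaming query has to be started with writeStream. Here is a sketch of how the messages themselves could be printed. It assumes the console sink is acceptable for debugging and that keys and values are UTF-8 text. The master URL is left to spark-submit's --master flag rather than hard-coded.

```scala
import org.apache.spark.sql.SparkSession

object readFromKafka {
  def main(args: Array[String]): Unit = {
    // master is supplied by spark-submit --master, so it is not set here
    val spark = SparkSession.builder().appName("kafka_spark").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.1.51:9092,192.168.1.52:9092,192.168.1.53:9092")
      .option("subscribe", "test")
      .option("startingOffsets", "earliest")
      .load()

    // Kafka delivers key and value as binary; cast them to strings for display
    val messages = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    // Print each micro-batch to stdout without truncating long values
    val query = messages.writeStream
      .format("console")
      .outputMode("append")
      .option("truncate", "false")
      .start()

    // Block until the streaming query is stopped or fails
    query.awaitTermination()
  }
}
```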
build.sbt:
name := "readFromSpark"
version := "1.0"
scalaVersion := "2.12.8"
val sparkVersion = "2.4.4"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.4" % Test
)
I ran sbt clean package, and it completed without any errors:
[info] welcome to sbt 1.4.7 (Private Build Java 1.8.0_282)
[info] loading project definition from /home/hamza/spark-project/spark-learning/project
[info] loading settings for project spark-learning from build.sbt ...
[info] set current project to spark-learning (in build file:/home/hamza/spark-project/spark-learning/)
[success] Total time: 0 s, completed Mar 12, 2021 1:13:50 PM
[warn] There may be incompatibilities among your library dependencies; run 'evicted' to see detailed eviction warnings.
[info] compiling 7 Scala sources to /home/hamza/spark-project/spark-learning/target/scala-2.12/classes ...
[warn] multiple main classes detected: run 'show discoveredMainClasses' to see the list
[success] Total time: 18 s, completed Mar 12, 2021 1:14:08 PM
Then I run the Spark job:
spark-submit --master spark://192.168.1.51:7077 --class readFromKafka target/scala-2.12/spark-learning_2.12-1.0.jar
Any help, please?
Solution 1 [1]
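You need to remove % Test from the Kafka dependency if you want to run the code via spark-submit. A dependency in the Test configuration is only on the classpath when compiling and running tests. As a result, the Kafka connector is missing when the application runs, and Spark cannot find the kafka data source. The Spark modules themselves can be marked "provided", because the cluster already supplies them.
For example: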
val sparkVersion = "2.4.4"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)
You'll also need to set up sbt-assembly so that the Kafka dependencies are actually included in the jar. sbt package only packages your project's own classes, not its library dependencies.
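A minimal sbt-assembly setup might look like the following. The plugin version 0.15.0 is an assumption; any sbt-assembly release compatible with sbt 1.x should work, so check the plugin's README for the current one. The merge strategy concatenates META-INF/services files instead of discarding them. That matters here because Spark discovers the kafka source through the META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file in the connector jar. Discarding all of META-INF would reproduce the same "Failed to find data source" error.

```scala
// project/plugins.sbt (version is an assumption; use one compatible with sbt 1.x)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.15.0")

// build.sbt, in addition to the libraryDependencies shown above
assembly / assemblyMergeStrategy := {
  // keep service registrations such as DataSourceRegister, which Spark uses to find "kafka"
  case PathList("META-INF", "services", _*) => MergeStrategy.concat
  // drop other META-INF entries (manifests, signatures) that clash between jars
  case PathList("META-INF", _*) => MergeStrategy.discard
  // for any other duplicated path, keep the first copy
  case _ => MergeStrategy.first
}
```

Build with sbt clean assembly and submit the resulting fat jar. By default, sbt-assembly names it <name>-assembly-<version>.jar. With the project name spark-learning from the sbt log above, the command would presumably be:

```shell
spark-submit --master spark://192.168.1.51:7077 --class readFromKafka target/scala-2.12/spark-learning-assembly-1.0.jar
```

Alternatively, you can skip the fat jar and let spark-submit download the connector at launch with --packages, as described in the deployment section of the Kafka integration guide. The _2.12 suffix must match the Scala version your Spark installation was built with:

```shell
spark-submit --master spark://192.168.1.51:7077 --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.4 --class readFromKafka target/scala-2.12/spark-learning_2.12-1.0.jar
```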
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 |