No module named 'pyspark.streaming.kafka' even with older Spark version
In another similar question, the hint is to install an older Spark, 2.4.5.
EDIT: the solution from the linked answer says to install Spark 2.4.5, which does have KafkaUtils. But the problem is I can't download Spark 2.4.5 - it is not available even in the archive.
I followed the advice and installed an older version of Spark - 2.4.6, the only old one still available - and I also have Python 3.7 and the kafka-python and pyspark libraries.
I have my spark_job.py file that needs to use Kafka:
from pyspark.streaming.kafka import KafkaUtils
When I run python spark_job.py, the error still persists:
ModuleNotFoundError: No module named 'pyspark.streaming.kafka'
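One thing worth checking first: installing a Spark 2.4.6 distribution does not by itself change which pyspark Python imports, and pyspark.streaming.kafka only exists in 2.x builds (it was removed in 3.0). A quick diagnostic sketch to see what is actually being imported:

import pyspark
print(pyspark.__version__)  # anything 3.x here would explain the missing module
print(pyspark.__file__)     # shows which pyspark installation gets imported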
spark_job.py:
from __future__ import print_function
import sys
import os
import shutil
import json

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
from pyspark.streaming.kafka import KafkaUtils  # this is the problem

outputPath = 'C:/Users/Admin/Desktop/kafka_project/checkpoint_01'


def getSparkSessionInstance(sparkConf):
    # Lazily create (or fetch) a single SparkSession shared across batches
    if 'sparkSessionSingletonInstance' not in globals():
        globals()['sparkSessionSingletonInstance'] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']


# -------------------------------------------------
# What I want to do per each RDD...
# -------------------------------------------------
def process(time, rdd):
    print("===========-----> %s <-----===========" % str(time))
    try:
        spark = getSparkSessionInstance(rdd.context.getConf())
        rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
        testDataFrame = spark.createDataFrame(rowRdd)
        testDataFrame.createOrReplaceTempView("treasury_stream")

        sql_query = get_sql_query()  # helper defined elsewhere in the project
        testResultDataFrame = spark.sql(sql_query)
        testResultDataFrame.show(n=5)

        # Insert into DB
        try:
            testResultDataFrame.write \
                .format("jdbc") \
                .mode("append") \
                .option("driver", 'org.postgresql.Driver') \
                .option("url", "jdbc:postgresql://myhabrtest.cuyficqfa1h0.ap-south-1.rds.amazonaws.com:5432/habrDB") \
                .option("dbtable", "transaction_flow") \
                .option("user", "habr") \
                .option("password", "habr12345") \
                .save()
        except Exception as e:
            print("--> Oops! It seems there is an error with the DB!", e)
    except Exception as e:
        print("--> Oops! It seems there is an error!", e)


# -------------------------------------------------
# General function
# -------------------------------------------------
def createContext():
    sc = SparkContext(appName="PythonStreamingKafkaTransaction")
    sc.setLogLevel("ERROR")
    ssc = StreamingContext(sc, 2)

    broker_list, topic = sys.argv[1:]

    try:
        directKafkaStream = KafkaUtils.createDirectStream(
            ssc,
            [topic],
            {"metadata.broker.list": broker_list})
    except Exception:
        raise ConnectionError("Kafka error: Connection refused: "
                              "broker_list={} topic={}".format(broker_list, topic))

    parsed_lines = directKafkaStream.map(lambda v: json.loads(v[1]))

    # RDD handling
    parsed_lines.foreachRDD(process)

    return ssc


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: spark_job.py <zk> <topic>", file=sys.stderr)
        exit(-1)

    print("--> Creating new context")
    if os.path.exists(outputPath):
        # was shutil.rmtree('outputPath'), which removes a literal 'outputPath'
        # directory rather than the checkpoint folder the variable points to
        shutil.rmtree(outputPath)

    ssc = StreamingContext.getOrCreate(outputPath, lambda: createContext())
    ssc.start()
    ssc.awaitTermination()
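Note that even once the import resolves, KafkaUtils.createDirectStream needs the matching Kafka connector on the JVM classpath, so the job is usually launched through spark-submit rather than plain python. A sketch, assuming Spark 2.4.6 built against Scala 2.11, with placeholder broker and topic arguments:

spark-submit \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.6 \
  spark_job.py localhost:9092 treasury_topic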
Solution 1:[1]
I just downgraded it using pip:
pip install --force-reinstall pyspark==2.4.6
I did not use poetry. After reinstalling, the KafkaUtils module was recognized.
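A quick way to confirm the downgrade took effect before re-running the job:

python -c "from pyspark.streaming.kafka import KafkaUtils; print(KafkaUtils)"

If this prints the class instead of raising ModuleNotFoundError, the 2.4.6 install is the one being picked up.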
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | ERJAN |