PySpark's df.writeStream generates no output
I'm trying to store tweets from my Kafka cluster in Elasticsearch. Initially, I set the output format to 'org.elasticsearch.spark.sql', but it created no index.
I then changed the format to 'console' to check whether the streaming works at all, but it doesn't print anything to the console either.
I'm guessing this is a problem with my streaming DataFrames, but I can't figure out what exactly the issue is.
This is my full code for the consumer (Spark Streaming):
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,org.elasticsearch:elasticsearch-hadoop:7.6.2 pyspark-shell'

from pyspark import SparkContext, SparkConf
# Spark Streaming
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
# Kafka
from pyspark.streaming.kafka import KafkaUtils
# JSON parsing
import json
import nltk
import logging
from datetime import datetime
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

def evaluate_sentiment(avg):
    try:
        if avg < 0:
            return 'Negative'
        elif avg > 0:
            return 'Positive'
        else:
            return 'Neutral'
    except TypeError:
        return 'Neutral'

eval_udf = udf(evaluate_sentiment, StringType())

def start_stream(df):
    df.writeStream.format('console').start()

conf = SparkConf().setAppName('twitter_analysis')
spark = SparkSession.builder.appName('twitter_analysis').getOrCreate()
conf.set("es.index.auto.create", "true")

schema = StructType([StructField("date", TimestampType(), True),
                     StructField("user", StringType(), True),
                     StructField("text", StringType(), True),
                     StructField("reply_count", IntegerType(), True),
                     StructField("retweet_count", IntegerType(), True),
                     StructField("favorite_count", IntegerType(), True),
                     StructField("sentiment_score", DecimalType(), True)])

kafkaStream = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "192.168.0.10:9092") \
    .option("subscribe", "twitter_analysis") \
    .option('failOnDataLoss', False) \
    .load()

parsed_df = kafkaStream.select(from_json(col('value').cast('string'), schema).alias('parsed_value')) \
    .withColumn('timestamp', lit(current_timestamp()))

mdf = parsed_df.select('parsed_value.*', 'timestamp')

evaluated_df = mdf.withColumn('status', eval_udf('sentiment_score')) \
    .withColumn('date', to_date(col('timestamp')))

start_stream(evaluated_df)
What could be causing this problem? Does it have anything to do with the schema I have defined?
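One possibility worth ruling out first: DataStreamWriter.start() returns immediately, so when code like this runs as a plain Python script (rather than in an interactive shell), the driver can exit before the first micro-batch is ever printed. A minimal sketch of a console sink that keeps the driver alive; the awaitTermination call is an addition, not something from the code above:

def start_stream(df):
    # start() only launches the query and returns a StreamingQuery handle.
    query = df.writeStream \
        .format('console') \
        .outputMode('append') \
        .start()
    # Block the driver so micro-batches actually get processed;
    # without this, a standalone script exits before printing anything.
    query.awaitTermination()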
An example of the JSON data sent from the Kafka cluster to Spark Streaming:
{"date": "2020-11-07 21:02:39", "user": "TalhianeM", "text": "RT @amin_goat: Non, des probl\u00e8mes de vote dans une d\u00e9mocratie occidentale ?\n\nOn m\u2019avait assur\u00e9 que cela n\u2019arrivait qu\u2019en Afrique pourtant.", "reply_count": 0, "retweet_count": 0, "favorite_count": 0, "sentiment_score": 0.0}
Could someone please help me resolve this problem? I have tried multiple methods, but nothing seems to get the data streams into Elasticsearch.
UPDATE: I resolved it. There seemed to be a problem with the host.
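The update doesn't include the corrected code, but with the host fixed, an Elasticsearch sink along these lines should work. The node address, index name, and checkpoint path below are placeholders, not values from the original post:

# es.nodes must point at a reachable Elasticsearch host; structured
# streaming writes to Elasticsearch also require a checkpoint location.
query = evaluated_df.writeStream \
    .format('org.elasticsearch.spark.sql') \
    .option('es.nodes', 'localhost') \
    .option('es.port', '9200') \
    .option('es.index.auto.create', 'true') \
    .option('checkpointLocation', '/tmp/es_checkpoint') \
    .start('twitter_analysis')
query.awaitTermination()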
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow