Structured Streaming to Save JSON to HDFS
My Spark Structured Streaming program reads JSON data from Kafka and writes it to HDFS in JSON format. The JSON is saved to HDFS, but each record is wrapped under the key
"jsontostructs(CAST(value AS STRING))"
as shown here: {"jsontostructs(CAST(value AS STRING))":{"age":42,"name":"John"}}.
How can I save only
{"age":42,"name":"John"}?
// Schema used to parse the JSON payload
StructType schema = kafkaPrimerRow.schema();
// Read JSON from Kafka. JSON is: {"age":42,"name":"John"}
Dataset<Row> df = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", input_bootstrap_server)
    .option("subscribe", topics[0])
    .load();
// Save the stream to HDFS
StreamingQuery ds = df
    .select(functions.from_json(col("value").cast(DataTypes.StringType), schema))
    .writeStream()
    .format("json")
    .outputMode(OutputMode.Append())
    .option("path", destPath)
    .option("checkpointLocation", checkpoint)
    .start();
Solution 1:[1]
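Aliasing the parsed struct as "data" and then adding .select("data.*") did the trick. The second select promotes the struct's fields to top-level columns, so the JSON writer no longer wraps them under the generated column name.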
StreamingQuery ds = df
    .select(functions.from_json(col("value").cast(DataTypes.StringType), schema).as("data"))
    .select("data.*")
    .writeStream()
    .format("json")
    .outputMode(OutputMode.Append())
    .option("path", destPath)
    .option("checkpointLocation", checkpoint)
    .start();
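To see why the alias plus "data.*" removes the wrapper key, here is a minimal plain-Java model of the two selects. It does not call Spark at all: rows are modeled as nested maps, and StructStarModel, selectAs, selectStar, and toJson are hypothetical names introduced only for this illustration. The assumption is that the JSON writer emits one key per top-level column, which matches the output shown in the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Plain-Java model of a row as nested maps. Illustration only; not Spark's API. */
final class StructStarModel {

    /** Models select(expr.as(alias)): one top-level column that holds a struct. */
    static Map<String, Object> selectAs(String alias, Map<String, Object> struct) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put(alias, struct);
        return row;
    }

    /** Models select("alias.*"): the struct's fields become top-level columns. */
    @SuppressWarnings("unchecked")
    static Map<String, Object> selectStar(Map<String, Object> row, String alias) {
        Object value = row.get(alias);
        if (!(value instanceof Map)) {
            throw new IllegalArgumentException("not a struct column: " + alias);
        }
        return new LinkedHashMap<>((Map<String, Object>) value);
    }

    /** Minimal JSON rendering, one key per top-level column. No string escaping. */
    @SuppressWarnings("unchecked")
    static String toJson(Map<String, Object> row) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> entry : row.entrySet()) {
            Object v = entry.getValue();
            String rendered;
            if (v instanceof Map) {
                rendered = toJson((Map<String, Object>) v);
            } else if (v instanceof String) {
                rendered = "\"" + v + "\"";
            } else {
                rendered = String.valueOf(v);
            }
            json.add("\"" + entry.getKey() + "\":" + rendered);
        }
        return json.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> person = new LinkedHashMap<>();
        person.put("age", 42);
        person.put("name", "John");

        // Without an alias, the single column is named after the expression.
        Map<String, Object> unnamed =
            selectAs("jsontostructs(CAST(value AS STRING))", person);
        System.out.println(toJson(unnamed));
        // prints {"jsontostructs(CAST(value AS STRING))":{"age":42,"name":"John"}}

        // With an alias followed by the star expansion, the fields are top level.
        Map<String, Object> aliased = selectAs("data", person);
        System.out.println(toJson(selectStar(aliased, "data")));
        // prints {"age":42,"name":"John"}
    }
}
```

The first printed line mirrors the unwanted output from the question, and the second mirrors the output after the fix. In the real query the same effect comes from .as("data") followed by .select("data.*").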
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Sam |