Spark: how to convert a JSON string to a struct column without a schema

Spark: 3.0.0 Scala: 2.12.8

My data frame has a column containing a JSON string, and I want to create a new StructType column from it.


+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|temp_json_string                                                                                                                                                                                      |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"name":"test","id":"12","category":[{"products":["A","B"],"displayName":"test_1","displayLabel":"test1"},{"products":["C"],"displayName":"test_2","displayLabel":"test2"}],"createdAt":"","createdBy":""}|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

root
 |-- temp_json_string: string (nullable = true)

The JSON string looks like:

{
  "name":"test",
  "id":"12",
  "category":[
    {
      "products":[
        "A",
        "B"
      ],
      "displayName":"test_1",
      "displayLabel":"test1"
    },
    {
      "products":[
        "C"
      ],
      "displayName":"test_2",
      "displayLabel":"test2"
    }
  ],
  "createdAt":"",
  "createdBy":""
}

I want to create a new column of struct type, so I tried:

dataFrame
  .withColumn("temp_json_struct", struct(col("temp_json_string")))
  .select("temp_json_struct")

Now, I get the schema as:

root
 |-- temp_json_struct: struct (nullable = false)
 |    |-- temp_json_string: string (nullable = true)

What I am looking for is something like:

root
 |-- temp_json_struct: struct (nullable = false)
 |    |-- name: string (nullable = true)
 |    |-- category: array (nullable = true)
 |    |    |-- products: array (nullable = true)
 |    |    |-- displayName: string (nullable = true)
 |    |    |-- displayLabel: string (nullable = true)
 |    |-- createdAt: timestamp (nullable = true)
 |    |-- updatedAt: timestamp (nullable = true)

Also, I do not know in advance what schema the JSON string may have.

I have looked at other options but was not able to figure out a solution.



Solution 1:[1]

I had the same issue with data from MongoDB; _doc is the column that holds the JSON string. Mine had multiple files, which is why the first line iterates through each row to infer the schema. If you know your schema up front, just replace json_schema with it.

from pyspark.sql.functions import col, from_json

json_schema = spark.read.json(df.rdd.map(lambda row: row._doc)).schema
df = df.withColumn('new_json_column', from_json(col('_doc'), json_schema))

Solution 2:[2]

There are at least two different ways to retrieve/discover the schema for a given JSON string.

For the illustration, let's create some data first:

import org.apache.spark.sql.types.StructType

val jsData = Seq(
  ("""{
    "name":"test","id":"12","category":[
    {
      "products":[
        "A",
        "B"
      ],
      "displayName":"test_1",
      "displayLabel":"test1"
    },
    {
      "products":[
        "C"
      ],
      "displayName":"test_2",
      "displayLabel":"test2"
    }
  ],
  "createdAt":"",
  "createdBy":""}""")
)

Option 1: schema_of_json

The first option is to use the built-in function schema_of_json. The function will return the schema for the given JSON in DDL format:

val json = jsData.toDF("js").collect()(0).getString(0)

val ddlSchema: String = spark.sql(s"select schema_of_json('${json}')")
                            .collect()(0) //get 1st row
                            .getString(0) //get 1st col of the row as string
                            .replace("null", "string") //replace type with string, this occurs since you have "createdAt":"" 

// struct<category:array<struct<displayLabel:string,displayName:string,products:array<string>>>,createdAt:null,createdBy:null,id:string,name:string>

val schema: StructType = StructType.fromDDL(s"js_schema $ddlSchema")

Note that you might expect schema_of_json to work at the column level as well, i.e. schema_of_json(js_col); unfortunately, this does not work as expected, so we are forced to pass a string instead.
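As a side note (my addition, not part of the original answer): the org.apache.spark.sql.functions API also exposes a schema_of_json overload that takes a literal String, so the DDL schema can be obtained without going through spark.sql. A sketch, reusing the json value extracted above:

import org.apache.spark.sql.functions.schema_of_json

// schema_of_json(json: String) builds the schema from a literal JSON
// string; it cannot consume an arbitrary (non-foldable) column.
val ddlFromApi: String = jsData.toDF("js")
  .select(schema_of_json(json))
  .collect()(0)   // get 1st row
  .getString(0)   // get 1st col of the row as string

The same "null"-to-"string" replacement would still be needed here, since "createdAt":"" is inferred as null type either way.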

Option 2: use Spark JSON reader (recommended)

import org.apache.spark.sql.functions.from_json

val schema: StructType = spark.read.json(jsData.toDS).schema

// schema.printTreeString

// root
//  |-- category: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- displayLabel: string (nullable = true)
//  |    |    |-- displayName: string (nullable = true)
//  |    |    |-- products: array (nullable = true)
//  |    |    |    |-- element: string (containsNull = true)
//  |-- createdAt: string (nullable = true)
//  |-- createdBy: string (nullable = true)
//  |-- id: string (nullable = true)
//  |-- name: string (nullable = true)

As you can see, here we produce the schema directly as a StructType rather than as a DDL string as in the previous case.

After discovering the schema, we can move on to the next step: converting the JSON data into a struct. To achieve that, we use the from_json built-in function:

jsData.toDF("js")
      .withColumn("temp_json_struct", from_json($"js", schema))
      .printSchema()

// root
//  |-- js: string (nullable = true)
//  |-- temp_json_struct: struct (nullable = true)
//  |    |-- category: array (nullable = true)
//  |    |    |-- element: struct (containsNull = true)
//  |    |    |    |-- displayLabel: string (nullable = true)
//  |    |    |    |-- displayName: string (nullable = true)
//  |    |    |    |-- products: array (nullable = true)
//  |    |    |    |    |-- element: string (containsNull = true)
//  |    |-- createdAt: string (nullable = true)
//  |    |-- createdBy: string (nullable = true)
//  |    |-- id: string (nullable = true)
//  |    |-- name: string (nullable = true)

Solution 3:[3]

// import spark implicits for conversion to dataset (.as[String])
import spark.implicits._

val df = ??? //create your dataframe having the 'temp_json_string' column

//convert Dataset[Row] aka Dataframe to Dataset[String]
val ds = df.select("temp_json_string").as[String]

//read as json
spark.read.json(ds)
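One caveat: spark.read.json(ds) returns a new DataFrame whose top-level columns are the JSON fields, so the other columns of df are lost. A sketch (my assumption about the intended usage, not part of the original answer) that feeds the inferred schema back into from_json, keeping the original frame and adding a struct column:

import org.apache.spark.sql.functions.{col, from_json}

// Infer the schema from the JSON strings, then parse the column in place.
val inferredSchema = spark.read.json(ds).schema
val withStruct = df.withColumn(
  "temp_json_struct",
  from_json(col("temp_json_string"), inferredSchema))

This combines the schema discovery of this solution with the from_json step shown in Solution 2.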


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution | Source
Solution 1 | jayrythium
Solution 2 |
Solution 3 |