Flatten Nested JSON String Columns into Tabular Format

I am currently trying to flatten data in a Databricks table. Since some of the columns are deeply nested and are of string type, I couldn't use the explode function.

My current dataframe looks like this:

display(df)

| account | applied | applylist | aracct | Internal Id |
| --- | --- | --- | --- | --- |
| {"id":"1","name":"ABC","type":null} | 2500.00 | {"apply":[{"total":20.00,"applyDate":"2021-07-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":200.0},{"total":25.00,"applyDate":"2021-07-15T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":25.0}],"replaceAll":false} | {"internalId":"121","name":"CMS","type":null} | 101 |
| {"id":"2","name":"DEF","type":null} | 1500.00 | {"apply":[{"total":30.00,"applyDate":"2021-08-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":250.0},{"total":35.00,"applyDate":"2021-09-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":350.0}],"replaceAll":false} | {"internalId":"121","name":"BMS","type":null} | 102 |

My dataframe schema looks like this:

df.printSchema()

root
 |-- account: string (nullable = true)
 |-- applied: decimal(38,6) (nullable = true)
 |-- applylist: string (nullable = true)
 |-- aracct: string (nullable = true)
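Note that account, applylist, and aracct are plain strings. explode only accepts array or map columns, so calling it directly on one of these string columns fails; the JSON text has to be parsed first, e.g. with from_json. A minimal illustration in PySpark:

from pyspark.sql.functions import explode, from_json
from pyspark.sql.types import MapType, StringType

# Fails: explode expects an array or map column, but applylist is a string.
# df.select(explode("applylist"))   # AnalysisException: data type mismatch

# Works: parse the JSON text first; the parsed value can then be exploded.
parsed = df.select(
    from_json("applylist", MapType(StringType(), StringType())).alias("applylist")
)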

How can I flatten the above table so that each record is stored in tabular format rather than nested?

Expected Output:

| account.id | account.name | account.type | applied | applylist.apply.total | applylist.apply.applyDate | applylist.apply.currency | applylist.apply.apply | applylist.apply.discAmt | applylist.apply.line | applylist.apply.type | applylist.apply.amount | applylist.replaceAll |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 1 | ABC | null | 2500.00 | 20.00 | 2021-07-13T07:00:00Z | USA | true | null | 0 | Invoice | 200.0 | false |
| 2 | DEF | null | 1500.00 | 30.00 | 2021-08-13T07:00:00Z | USA | true | null | 0 | Invoice | 250.0 | false |

This is my Scala code:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = spark.sql("select * from ns_db_integration.transaction")

display(df.select($"applied" as "Applied", $"applylist", explode($"account"))
        .withColumn("Account.Id" ,$"col.id")
        .withColumn("Account.Name",$"col.name")
        .withColumn("Account.Type",$"col.type").drop($"col")
        .select($"*",$"applylist.*")
        .drop($"applylist")
        .select($"*",explode($"apply"))
        .drop($"apply")
        .withColumn("Total",$"col.total")
        .withColumn("ApplyDate",$"col.applyDate")
        .drop($"col")
       )

Error in Scala code: explode cannot be applied to the string column account (data type mismatch).

I also tried the json_tuple function in PySpark, which didn't work as I expected: every applylist value came back null.

from pyspark.sql.functions import json_tuple, from_json, get_json_object, explode, col

# json_tuple only extracts top-level fields, so the nested paths below all resolve to null.
df.select(col("applied"),
          json_tuple(col("applylist"), "apply.total", "apply.applyDate", "apply.currency", "apply.apply")) \
    .toDF("applied", "applylist.apply.total", "applylist.apply.applyDate", "applylist.apply.currency", "applylist.apply.apply") \
    .show(truncate=False)

Output of the PySpark code: all extracted values are null.
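For what it's worth, get_json_object does handle nested paths via JSONPath expressions, so a sketch like the following reaches into the nesting (though $.apply[0] only reads the first element of the array, so it is no substitute for a real explode):

from pyspark.sql.functions import col, get_json_object

df.select(
    col("applied"),
    # "$" is the JSON root; [0] selects only the first element of the apply array.
    get_json_object(col("applylist"), "$.apply[0].total").alias("total"),
    get_json_object(col("applylist"), "$.apply[0].applyDate").alias("applyDate"),
    get_json_object(col("applylist"), "$.replaceAll").alias("replaceAll"),
).show(truncate=False)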



Solution 1:[1]

Using PySpark, see the logic below.

Input Data

str1 = """account   applied applylist   aracct
{"id":"1","name":"ABC","type":null} 2500.00 {"apply":[{"total":20.00,"applyDate":"2021-07-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":200.0}],"replaceAll":false} {"internalId":"121","name":"CMS","type":null}
{"id":"2","name":"DEF","type":null} 1500.00 {"apply":[{"total":30.00,"applyDate":"2021-08-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":250.0}],"replaceAll":false} {"internalId":"121","name":"BMS","type":null}"""

import pandas as pd
from io import StringIO

# Read the tab-separated sample into pandas, then convert to a Spark DataFrame.
pdf = pd.read_csv(StringIO(str1), sep='\t')
df = spark.createDataFrame(pdf)
df.show(truncate=False)

+-----------------------------------+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------+
|account                            |applied|applylist                                                                                                                                                              |aracct                                       |
+-----------------------------------+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------+
|{"id":"1","name":"ABC","type":null}|2500.0 |{"apply":[{"total":20.00,"applyDate":"2021-07-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":200.0}],"replaceAll":false}|{"internalId":"121","name":"CMS","type":null}|
|{"id":"2","name":"DEF","type":null}|1500.0 |{"apply":[{"total":30.00,"applyDate":"2021-08-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":250.0}],"replaceAll":false}|{"internalId":"121","name":"BMS","type":null}|
+-----------------------------------+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------+
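Side note: if pandas is not available, the same sample frame can be built directly from a list of tuples -

rows = [
    ('{"id":"1","name":"ABC","type":null}', 2500.00,
     '{"apply":[{"total":20.00,"applyDate":"2021-07-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":200.0}],"replaceAll":false}',
     '{"internalId":"121","name":"CMS","type":null}'),
    ('{"id":"2","name":"DEF","type":null}', 1500.00,
     '{"apply":[{"total":30.00,"applyDate":"2021-08-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":250.0}],"replaceAll":false}',
     '{"internalId":"121","name":"BMS","type":null}'),
]
df = spark.createDataFrame(rows, ["account", "applied", "applylist", "aracct"])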

Required Output

from pyspark.sql.functions import *
from pyspark.sql.types import *

schema_account = StructType([StructField("id", StringType(), True),
                             StructField("name", StringType(), True),
                             StructField("type", StringType(), True)
                            ])
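Instead of hand-writing the struct, the schema could also be inferred from a sample value with schema_of_json (available since Spark 2.4) - a sketch, assuming the first row is representative of every row:

from pyspark.sql.functions import lit, schema_of_json

sample = df.select("account").first()[0]                 # one sample JSON string
ddl = df.select(schema_of_json(lit(sample))).first()[0]
print(ddl)   # e.g. STRUCT<id: STRING, name: STRING, type: STRING>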

df1 = (
  # A map-typed from_json column left unaliased is auto-named "entries",
  # which the second select relies on.
  df.select(from_json(col("account"), schema_account).alias("account"),
            "applied",
            from_json(col("applylist"), MapType(StringType(), StringType())))
    # Flatten the account struct; pull apply and replaceAll out of the map.
    .select("account.*", "applied", "entries.apply", "entries.replaceAll")
    # apply is still a JSON array string; parse it as an array of maps.
    .select("id", "name", "type", "applied",
            from_json(col("apply"), ArrayType(MapType(StringType(), StringType()))).alias("apply"),
            "replaceAll")
    # One row per element of the apply array.
    .select("id", "name", "type", "applied", explode("apply").alias("apply"), "replaceAll")
    # Explode each map into key/value rows; rename the account-level "type"
    # so it does not collide with the "type" key inside the map.
    .select("id", "name", col("type").alias("type1"), "applied", explode("apply"), "replaceAll")
    # Pivot the key/value pairs back into one column per key.
    .groupBy("id", "name", "type1", "applied", "replaceAll").pivot("key").agg(first("value"))
    .withColumnRenamed("id", "account.id")
    .withColumnRenamed("name", "account.name")
    .withColumnRenamed("type1", "account.type")
    .withColumnRenamed("total", "applylist.apply.total")
    .withColumnRenamed("applyDate", "applylist.apply.applyDate")
    .withColumnRenamed("currency", "applylist.apply.currency")
    .withColumnRenamed("apply", "applylist.apply.apply")
    .withColumnRenamed("discAmt", "applylist.apply.discAmt")
    .withColumnRenamed("line", "applylist.apply.line")
    .withColumnRenamed("type", "applylist.apply.type")
    .withColumnRenamed("amount", "applylist.apply.amount")
)

df1.select("`account.id`" ,"`account.name`" ,"`account.type`" ,"applied" ,"`applylist.apply.total`" ,"`applylist.apply.applyDate`" ,"`applylist.apply.currency`" ,"`applylist.apply.apply`" ,"`applylist.apply.discAmt`" ,"`applylist.apply.line`" ,"`applylist.apply.type`" ,"`applylist.apply.amount`" ,"`replaceAll`").show(truncate=False)

+----------+------------+------------+-------+---------------------+-------------------------+------------------------+---------------------+-----------------------+--------------------+--------------------+----------------------+----------+
|account.id|account.name|account.type|applied|applylist.apply.total|applylist.apply.applyDate|applylist.apply.currency|applylist.apply.apply|applylist.apply.discAmt|applylist.apply.line|applylist.apply.type|applylist.apply.amount|replaceAll|
+----------+------------+------------+-------+---------------------+-------------------------+------------------------+---------------------+-----------------------+--------------------+--------------------+----------------------+----------+
|1         |ABC         |null        |2500.0 |20.0                 |2021-07-13T07:00:00Z     |USA                     |true                 |null                   |0                   |Invoice             |200.0                 |false     |
|2         |DEF         |null        |1500.0 |30.0                 |2021-08-13T07:00:00Z     |USA                     |true                 |null                   |0                   |Invoice             |250.0                 |false     |
+----------+------------+------------+-------+---------------------+-------------------------+------------------------+---------------------+-----------------------+--------------------+--------------------+----------------------+----------+
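An alternative that skips the map-and-pivot round trip is to declare the full applylist schema up front and explode the typed array directly. A sketch, reusing schema_account from above; the field types are assumptions read off the sample data:

from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import (ArrayType, BooleanType, DoubleType,
                               IntegerType, StringType, StructField, StructType)

apply_schema = StructType([
    StructField("total", DoubleType(), True),
    StructField("applyDate", StringType(), True),
    StructField("currency", StringType(), True),
    StructField("apply", BooleanType(), True),
    StructField("discAmt", DoubleType(), True),
    StructField("line", IntegerType(), True),
    StructField("type", StringType(), True),
    StructField("amount", DoubleType(), True),
])
applylist_schema = StructType([
    StructField("apply", ArrayType(apply_schema), True),
    StructField("replaceAll", BooleanType(), True),
])

flat = (
    df.select(from_json("account", schema_account).alias("account"),
              "applied",
              from_json("applylist", applylist_schema).alias("applylist"))
      # One row per element of the typed apply array.
      .withColumn("a", explode("applylist.apply"))
      .select(col("account.id").alias("account.id"),
              col("account.name").alias("account.name"),
              col("account.type").alias("account.type"),
              "applied",
              col("a.total").alias("applylist.apply.total"),
              col("a.applyDate").alias("applylist.apply.applyDate"),
              col("a.currency").alias("applylist.apply.currency"),
              col("a.apply").alias("applylist.apply.apply"),
              col("a.discAmt").alias("applylist.apply.discAmt"),
              col("a.line").alias("applylist.apply.line"),
              col("a.type").alias("applylist.apply.type"),
              col("a.amount").alias("applylist.apply.amount"),
              col("applylist.replaceAll").alias("applylist.replaceAll"))
)
flat.show(truncate=False)

As before, the dotted output names need backticks when referenced later, e.g. col("`account.id`"), exactly as in the backticked select shown earlier.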

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source

Solution 1: DKNY