Flatten Nested JSON String Columns into Tabular Format
I am currently trying to flatten data in a Databricks table. Some of the columns are deeply nested JSON stored as String type, so I couldn't use the explode function directly.
My current dataframe looks like this:
display(df)
account | applied | applylist | aracct | Internal Id |
---|---|---|---|---|
{"id":"1","name":"ABC","type":null} | 2500.00 | {"apply":[{"total":20.00,"applyDate":"2021-07-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":200.0},{"total":25.00,"applyDate":"2021-07-15T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":25.0}],"replaceAll":false} | {"internalId":"121","name":"CMS","type":null} | 101 |
{"id":"2","name":"DEF","type":null} | 1500.00 | {"apply":[{"total":30.00,"applyDate":"2021-08-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":250.0},{"total":35.00,"applyDate":"2021-09-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":350.0}],"replaceAll":false} | {"internalId":"121","name":"BMS","type":null} | 102 |
My dataframe schema looks like this:
df.printSchema()
|-- account: string (nullable = true)
|-- applied: decimal(38,6) (nullable = true)
|-- applylist: string (nullable = true)
|-- aracct: string (nullable = true)
How can I flatten the above table and store each record in a tabular format rather than as nested structures?
Expected Output:
account.id | account.name | account.type | applied | applylist.apply.total | applylist.apply.applyDate | applylist.apply.currency | applylist.apply.apply | applylist.apply.discAmt | applylist.apply.line | applylist.apply.type | applylist.apply.amount | applylist.replaceAll |
---|---|---|---|---|---|---|---|---|---|---|---|---|
1 | ABC | null | 2500.00 | 20.00 | 2021-07-13T07:00:00Z | USA | true | null | 0 | Invoice | 200.0 | false |
2 | DEF | null | 1500.00 | 30.00 | 2021-08-13T07:00:00Z | USA | true | null | 0 | Invoice | 250.0 | false |
This is my Scala code:
import org.apache.spark.sql.functions._
import spark.implicits._
val df = spark.sql("select * from ns_db_integration.transaction")
display(df.select($"applied" as "Applied", $"applylist", explode($"account"))
.withColumn("Account.Id" ,$"col.id")
.withColumn("Account.Name",$"col.name")
.withColumn("Account.Type",$"col.type").drop($"col")
.select($"*",$"applylist.*")
.drop($"applylist")
.select($"*",explode($"apply"))
.drop($"apply")
.withColumn("Total",$"col.total")
.withColumn("ApplyDate",$"col.applyDate")
.drop($"col")
)
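A likely reason the Scala attempt fails is that explode only accepts array or map columns, while account and applylist here are raw JSON strings; they have to be parsed with from_json before dotted access or explode can work. A minimal PySpark sketch of just that parsing step (the struct schema is an assumption based on the sample data):
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType
# Assumed schema for the JSON held in the account string column
account_schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("type", StringType(), True),
])
# Parse the string into a struct; only then does account.id-style access work
parsed = df.withColumn("account", from_json(col("account"), account_schema))
parsed.select("account.id", "account.name", "account.type").show()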
I also tried the json_tuple function in PySpark, which didn't work as I expected: all the applylist column values became null.
from pyspark.sql.functions import json_tuple,from_json,get_json_object, explode,col
df.select(col("applied"),json_tuple(col("applylist"),"apply.total","apply.applyDate","apply.currency","apply.apply")) \
.toDF("applied","applylist.apply.total","applylist.apply.applyDate","applylist.apply.currency","applylist.apply.apply") \
.show(truncate=False)
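The nulls are expected here: json_tuple only extracts top-level keys, so nested paths such as apply.total come back as null. get_json_object, by contrast, takes a JSONPath expression and can reach into the nested array. A small sketch (paths and aliases are illustrative):
from pyspark.sql.functions import get_json_object, col
# json_tuple(col, "apply.total") yields null because "apply.total" is not a
# top-level key; a JSONPath via get_json_object reaches the nested field.
df.select(
    col("applied"),
    get_json_object(col("applylist"), "$.apply[0].total").alias("first_total"),
    get_json_object(col("applylist"), "$.replaceAll").alias("replaceAll"),
).show(truncate=False)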
Solution 1:[1]
Using PySpark, see the logic below.
Input Data
str1 = """account applied applylist aracct
{"id":"1","name":"ABC","type":null} 2500.00 {"apply":[{"total":20.00,"applyDate":"2021-07-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":200.0}],"replaceAll":false} {"internalId":"121","name":"CMS","type":null}
{"id":"2","name":"DEF","type":null} 1500.00 {"apply":[{"total":30.00,"applyDate":"2021-08-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":250.0}],"replaceAll":false} {"internalId":"121","name":"BMS","type":null}"""
import pandas as pd
from io import StringIO
# Read the tab-separated sample above into pandas, then convert to a Spark DataFrame
pdf = pd.read_csv(StringIO(str1), sep='\t')
df = spark.createDataFrame(pdf)
df.show(truncate=False)
+-----------------------------------+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------+
|account |applied|applylist |aracct |
+-----------------------------------+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------+
|{"id":"1","name":"ABC","type":null}|2500.0 |{"apply":[{"total":20.00,"applyDate":"2021-07-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":200.0}],"replaceAll":false}|{"internalId":"121","name":"CMS","type":null}|
|{"id":"2","name":"DEF","type":null}|1500.0 |{"apply":[{"total":30.00,"applyDate":"2021-08-13T07:00:00Z","currency":"USA","apply":true,"discAmt":null,"line":0,"type":"Invoice","amount":250.0}],"replaceAll":false}|{"internalId":"121","name":"BMS","type":null}|
+-----------------------------------+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------+
Required Output
from pyspark.sql.functions import *
from pyspark.sql.types import *
schema_account = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("type", StringType(), True),
])
df1 = (
    df.select(
        from_json(col("account"), schema_account).alias("account"),
        "applied",
        # Parse applylist into a map<string,string>; alias it explicitly
        # (Spark would otherwise name the parsed column "entries")
        from_json(col("applylist"), MapType(StringType(), StringType())).alias("entries"),
    )
    # Expand the account struct and pull the two map entries out
    .select("account.*", "applied", "entries.apply", "entries.replaceAll")
    # "apply" is still a JSON string holding an array; parse it as an array of maps
    .select("id", "name", "type", "applied",
            from_json(col("apply"), ArrayType(MapType(StringType(), StringType()))).alias("apply"),
            "replaceAll")
    # One row per element of the apply array
    .select("id", "name", "type", "applied", explode("apply").alias("apply"), "replaceAll")
    # Rename account's "type" so it doesn't clash with the apply-level "type" key,
    # then explode each map into (key, value) rows
    .select("id", "name", col("type").alias("type1"), "applied", explode("apply"), "replaceAll")
    # Pivot the (key, value) rows back into one column per key
    .groupBy("id", "name", "type1", "applied", "replaceAll").pivot("key").agg(first("value"))
    .withColumnRenamed("id", "account.id")
    .withColumnRenamed("name", "account.name")
    .withColumnRenamed("type1", "account.type")
    .withColumnRenamed("total", "applylist.apply.total")
    .withColumnRenamed("applyDate", "applylist.apply.applyDate")
    .withColumnRenamed("currency", "applylist.apply.currency")
    .withColumnRenamed("apply", "applylist.apply.apply")
    .withColumnRenamed("discAmt", "applylist.apply.discAmt")
    .withColumnRenamed("line", "applylist.apply.line")
    .withColumnRenamed("type", "applylist.apply.type")
    .withColumnRenamed("amount", "applylist.apply.amount")
)
df1.select(
    "`account.id`", "`account.name`", "`account.type`", "applied",
    "`applylist.apply.total`", "`applylist.apply.applyDate`", "`applylist.apply.currency`",
    "`applylist.apply.apply`", "`applylist.apply.discAmt`", "`applylist.apply.line`",
    "`applylist.apply.type`", "`applylist.apply.amount`", "replaceAll",
).show(truncate=False)
+----------+------------+------------+-------+---------------------+-------------------------+------------------------+---------------------+-----------------------+--------------------+--------------------+----------------------+----------+
|account.id|account.name|account.type|applied|applylist.apply.total|applylist.apply.applyDate|applylist.apply.currency|applylist.apply.apply|applylist.apply.discAmt|applylist.apply.line|applylist.apply.type|applylist.apply.amount|replaceAll|
+----------+------------+------------+-------+---------------------+-------------------------+------------------------+---------------------+-----------------------+--------------------+--------------------+----------------------+----------+
|1 |ABC |null |2500.0 |20.0 |2021-07-13T07:00:00Z |USA |true |null |0 |Invoice |200.0 |false |
|2 |DEF |null |1500.0 |30.0 |2021-08-13T07:00:00Z |USA |true |null |0 |Invoice |250.0 |false |
+----------+------------+------------+-------+---------------------+-------------------------+------------------------+---------------------+-----------------------+--------------------+--------------------+----------------------+----------+
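If the structure of applylist is stable, an alternative sketch (not part of the answer above; the full schema and names are assumptions, and schema_account is reused from the code above) is to declare the schema up front, so from_json yields typed structs and the map-and-pivot step disappears:
from pyspark.sql.functions import from_json, explode, col
from pyspark.sql.types import (StructType, StructField, ArrayType,
                               StringType, BooleanType, DoubleType, IntegerType)
# Assumed element schema for the applylist.apply array
apply_item = StructType([
    StructField("total", DoubleType(), True),
    StructField("applyDate", StringType(), True),
    StructField("currency", StringType(), True),
    StructField("apply", BooleanType(), True),
    StructField("discAmt", DoubleType(), True),
    StructField("line", IntegerType(), True),
    StructField("type", StringType(), True),
    StructField("amount", DoubleType(), True),
])
schema_applylist = StructType([
    StructField("apply", ArrayType(apply_item), True),
    StructField("replaceAll", BooleanType(), True),
])
flat = (
    df.select(
        from_json(col("account"), schema_account).alias("account"),
        "applied",
        from_json(col("applylist"), schema_applylist).alias("applylist"),
    )
    # One output row per element of applylist.apply
    .select("account.*", "applied",
            explode(col("applylist.apply")).alias("a"),
            col("applylist.replaceAll"))
    # Alias account's type so it does not collide with the apply-level type
    .select("id", "name", col("type").alias("account_type"), "applied",
            "a.total", "a.applyDate", "a.currency", "a.apply",
            "a.discAmt", "a.line", "a.type", "a.amount", "replaceAll")
)
flat.show(truncate=False)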
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | DKNY |