Convert a Python dictionary into a PySpark DataFrame

I have a json file which contains a dictionary in the following format:

{"a1":{"b1":["c1","c2"], "b2":["c4","c3"]}, "a2":{"b3":["c1","c4"]}}

Is it possible to convert this dictionary into a PySpark dataframe like the following?

+------+------+------+
| col1 | col2 | col3 |
+------+------+------+
| a1   | b1   | c1   |
| a1   | b1   | c2   |
| a1   | b2   | c4   |
| a1   | b2   | c3   |
| a2   | b3   | c1   |
| a2   | b3   | c4   |
+------+------+------+

I have seen the standard way of converting JSON to a PySpark dataframe (example in this link), but I was wondering about nested dictionaries that contain lists as well.
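One simple route worth noting (a sketch of an alternative, not taken from the answer below; the `spark` variable is assumed to be an existing SparkSession) is to flatten the nested dict in plain Python first and hand the resulting triples to `createDataFrame`:

```python
import json

raw = '{"a1":{"b1":["c1","c2"], "b2":["c4","c3"]}, "a2":{"b3":["c1","c4"]}}'
d = json.loads(raw)

# flatten {a: {b: [c, ...]}} into (a, b, c) triples
rows = [(a, b, c) for a, bs in d.items() for b, cs in bs.items() for c in cs]

# then, with an existing SparkSession named `spark` (assumed):
# df = spark.createDataFrame(rows, ['col1', 'col2', 'col3'])
```

This sidesteps the struct-schema issue entirely, but only works when the whole JSON fits in driver memory; the answer below stays within Spark.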



Solution 1:[1]

Interesting problem! The main difficulty I found is that when reading from JSON, your schema will likely be a struct type, which makes this harder to solve, because a1 effectively has a different type than a2.

My idea is to convert the struct type to a map type, stack the entries together, then apply a few explodes:

This is your df
+----------------------------------+
|data                              |
+----------------------------------+
|{{[c1, c2], [c4, c3]}, {[c1, c4]}}|
+----------------------------------+

root
 |-- data: struct (nullable = true)
 |    |-- a1: struct (nullable = true)
 |    |    |-- b1: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- b2: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |-- a2: struct (nullable = true)
 |    |    |-- b3: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
Create a temporary df to handle JSON's first level
first_level_df = df.select('data.*')
first_level_df.show()
first_level_cols = first_level_df.columns # ['a1', 'a2']

+--------------------+----------+
|                  a1|        a2|
+--------------------+----------+
|{[c1, c2], [c4, c3]}|{[c1, c4]}|
+--------------------+----------+
Some helper variables
from pyspark.sql import functions as F, types as T

map_cols = [F.from_json(F.to_json(c), T.MapType(T.StringType(), T.StringType())).alias(c) for c in first_level_cols]
# [Column<'entries AS a1'>, Column<'entries AS a2'>]

stack_cols = ', '.join([f"'{c}', {c}" for c in first_level_cols])
# 'a1', a1, 'a2', a2
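To see why a second from_json with ArrayType is needed in the final step, note what the struct-to-map cast does: to_json serializes the struct, and parsing it back as MapType(StringType(), StringType()) turns the field names into map keys but keeps each value as a still-JSON-encoded array string. A plain-Python analogue of that round trip (using the json module purely for illustration):

```python
import json

# what the struct value in column a1 looks like
struct_val = {"b1": ["c1", "c2"], "b2": ["c4", "c3"]}

# to_json: struct -> JSON string
as_json = json.dumps(struct_val)

# from_json with MapType(StringType(), StringType()): keys become map keys,
# but each value stays a string, i.e. a JSON-encoded array
as_map = {k: json.dumps(v) for k, v in json.loads(as_json).items()}

# hence the final select parses each value again with ArrayType(StringType())
inner = json.loads(as_map["b1"])
```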
Main transformation
(first_level_df
    .select(map_cols)
    .select(F.expr(f'stack(2, {stack_cols})').alias('AA', 'temp'))
    .select('AA', F.explode('temp').alias('BB', 'temp'))
    .select('AA', 'BB', F.explode(F.from_json('temp', T.ArrayType(T.StringType()))).alias('CC'))
    .show(10, False)
)

+---+---+---+
|AA |BB |CC |
+---+---+---+
|a1 |b1 |c1 |
|a1 |b1 |c2 |
|a1 |b2 |c4 |
|a1 |b2 |c3 |
|a2 |b3 |c1 |
|a2 |b3 |c4 |
+---+---+---+

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source

Solution 1: pltc