PySpark DataFrame - Join on multiple columns dynamically
Let's say I have two DataFrames in Spark:
firstdf = sqlContext.createDataFrame([
    {'firstdf-id': 1, 'firstdf-column1': 2, 'firstdf-column2': 3, 'firstdf-column3': 4},
    {'firstdf-id': 2, 'firstdf-column1': 3, 'firstdf-column2': 4, 'firstdf-column3': 5}])

seconddf = sqlContext.createDataFrame([
    {'seconddf-id': 1, 'seconddf-column1': 2, 'seconddf-column2': 4, 'seconddf-column3': 5},
    {'seconddf-id': 2, 'seconddf-column1': 6, 'seconddf-column2': 7, 'seconddf-column3': 8}])
Now I want to join them on multiple columns (any number greater than one).
What I have is an array of columns from the first DataFrame and an array of columns from the second DataFrame. These arrays have the same size, and I want to join on the columns specified in them. For example:
columnsFirstDf = ['firstdf-id', 'firstdf-column1']
columnsSecondDf = ['seconddf-id', 'seconddf-column1']
Since these arrays have variable sizes, I can't hard-code the condition like this:
from pyspark.sql.functions import *

firstdf.join(seconddf,
             (col(columnsFirstDf[0]) == col(columnsSecondDf[0])) &
             (col(columnsFirstDf[1]) == col(columnsSecondDf[1])),
             'inner')
Is there any way that I can join on multiple columns dynamically?
Solution 1:[1]
@Mohan, sorry, I don't have the reputation to "add a comment". If the join columns have the same names in both DataFrames, create a list of those columns and pass it to the join:
col_list = ["id", "column1", "column2"]
firstdf.join(seconddf, col_list, "inner")
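If, as in the question, the join columns have different names in the two DataFrames, the condition itself can be built dynamically from the two lists. A minimal sketch, assuming columnsFirstDf and columnsSecondDf from the question have the same length (this is not part of either posted solution):

from functools import reduce
from pyspark.sql.functions import col

# One equality predicate per pair of columns, then AND them all together.
conditions = [col(a) == col(b) for a, b in zip(columnsFirstDf, columnsSecondDf)]
join_condition = reduce(lambda left, right: left & right, conditions)

result = firstdf.join(seconddf, join_condition, 'inner')

This works for any number of column pairs because & on two Column objects returns another Column, so the list of predicates can be folded into a single one.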
Solution 2:[2]
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F
spark = SparkSession.builder.appName("abc").getOrCreate()
lst1 = [[1, 2, 3], ["A", "B", "C"], ["aa", "bb", "cc"]]
lst2 = [[2, 3, 4], ["A", "B", "C"], ["aa", "bb", "cc"]]
lst3 = [[1, 2, 4], ["A", "B", "C"], ["aa", "bb", "cc"]]
R1 = Row("A1", "A2", "A3")
R2 = Row("B1", "B2", "B3")
R3 = Row("C1", "C2", "C3")
df1 = spark.sparkContext.parallelize([R1(*r) for r in zip(*lst1)]).toDF().alias("df1")
df2 = spark.sparkContext.parallelize([R2(*r) for r in zip(*lst2)]).toDF().alias("df2")
df3 = spark.sparkContext.parallelize([R3(*r) for r in zip(*lst3)]).toDF().alias("df3")
# (left DataFrame, right DataFrame, left join column, right join column) pairings
list_tup = [
    (df1, df2, "df1.A1", "df2.B1"),
    (df2, df3, "df2.B1", "df3.C1"),
    (df1, df3, "df1.A1", "df3.C1"),
]
# Lookup table from name to DataFrame, used to resolve the name-based pairings below
list_df = {"df1": df1, "df2": df2, "df3": df3}
# The same pairings expressed with DataFrame names instead of objects
list_condition = [
    ("df1", "df2", "df1.A1", "df2.B1"),
    ("df2", "df3", "df2.B1", "df3.C1"),
    ("df1", "df3", "df1.A1", "df3.C1"),
]
updated_df_list = []
for item in list_condition:
    updated_df_list.append((list_df.get(item[0]), list_df.get(item[1]), item[2], item[3]))
print(updated_df_list)
df_1 = updated_df_list[0][0]
for x in updated_df_list:
    df_1 = x[0].join(x[1], on=F.col(x[2]) == F.col(x[3]), how="left_outer")

# Each pass reassigns df_1, so after the loop it holds the result of the last pairing:
# df1 left-joined with df3 on A1 == C1, which is the table shown below.
df_1.show()
+---+---+---+----+----+----+
| A1| A2| A3| C1| C2| C3|
+---+---+---+----+----+----+
| 1| A| aa| 1| A| aa|
| 2| B| bb| 2| B| bb|
| 3| C| cc|null|null|null|
+---+---+---+----+----+----+
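The loop above joins a fresh pair of DataFrames on every pass, so the table is the result of the last pairing only. If a single DataFrame accumulating every join were wanted instead, the pairings could be folded with functools.reduce. A minimal sketch, not part of the original answer, reusing df1, df2, df3 and F from above:

from functools import reduce

# Hypothetical pairings: fold df2 and df3 onto df1, one left outer join per step.
pairings = [(df2, "df1.A1", "df2.B1"), (df3, "df1.A1", "df3.C1")]
joined = reduce(
    lambda acc, p: acc.join(p[0], on=F.col(p[1]) == F.col(p[2]), how="left_outer"),
    pairings,
    df1,
)
joined.show()

The .alias("df1"), .alias("df2"), and .alias("df3") calls used when the DataFrames were created are what let qualified names like "df1.A1" keep resolving after each join.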
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Balaji SS |
Solution 2 | |