PySpark DataFrame - Join on multiple columns dynamically

Let's say I have two DataFrames in Spark:

firstdf = sqlContext.createDataFrame([{'firstdf-id':1,'firstdf-column1':2,'firstdf-column2':3,'firstdf-column3':4}, \
{'firstdf-id':2,'firstdf-column1':3,'firstdf-column2':4,'firstdf-column3':5}])

seconddf = sqlContext.createDataFrame([{'seconddf-id':1,'seconddf-column1':2,'seconddf-column2':4,'seconddf-column3':5}, \
{'seconddf-id':2,'seconddf-column1':6,'seconddf-column2':7,'seconddf-column3':8}])

Now I want to join them on multiple columns (any number greater than one).

What I have is an array of columns from the first DataFrame and an array of columns from the second DataFrame. These arrays have the same size, and I want to join on the columns specified in them. For example:

columnsFirstDf = ['firstdf-id', 'firstdf-column1']
columnsSecondDf = ['seconddf-id', 'seconddf-column1']

Since these arrays have variable sizes, I can't use this kind of approach:

from pyspark.sql.functions import *

firstdf.join(seconddf, \
    (col(columnsFirstDf[0]) == col(columnsSecondDf[0])) &
    (col(columnsFirstDf[1]) == col(columnsSecondDf[1])), \
    'inner'
)

Is there any way that I can join on multiple columns dynamically?



Solution 1:[1]

@Mohan sorry, I don't have the reputation to add a comment. If the join columns have the same name in both DataFrames, create a list with those columns and use it in the join:

col_list = ["id", "column1", "column2"]
firstdf.join(seconddf, col_list, "inner")
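
If the join columns have different names on each side, as in the question, the condition can also be built dynamically from the two lists. This is a minimal sketch (not taken from either answer here), assuming columnsFirstDf and columnsSecondDf have the same length:

from functools import reduce
from pyspark.sql.functions import col

columnsFirstDf = ['firstdf-id', 'firstdf-column1']
columnsSecondDf = ['seconddf-id', 'seconddf-column1']

# One equality expression per column pair, then AND them all together.
conditions = [col(f) == col(s) for f, s in zip(columnsFirstDf, columnsSecondDf)]
join_condition = reduce(lambda a, b: a & b, conditions)

joined = firstdf.join(seconddf, join_condition, 'inner')

Because the names differ, the result keeps both sets of columns; with the same-name list approach above, each shared join column appears only once in the output.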

Solution 2:[2]

from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("abc").getOrCreate()

lst1 = [[1, 2, 3], ["A", "B", "C"], ["aa", "bb", "cc"]]
lst2 = [[2, 3, 4], ["A", "B", "C"], ["aa", "bb", "cc"]]
lst3 = [[1, 2, 4], ["A", "B", "C"], ["aa", "bb", "cc"]]

R1 = Row("A1", "A2", "A3")
R2 = Row("B1", "B2", "B3")
R3 = Row("C1", "C2", "C3")
df1 = spark.sparkContext.parallelize([R1(*r) for r in zip(*lst1)]).toDF().alias("df1")
df2 = spark.sparkContext.parallelize([R2(*r) for r in zip(*lst2)]).toDF().alias("df2")
df3 = spark.sparkContext.parallelize([R3(*r) for r in zip(*lst3)]).toDF().alias("df3")

# Map DataFrame names to objects, and describe each join as
# (left name, right name, left join column, right join column).
list_df = {"df1": df1, "df2": df2, "df3": df3}
list_condition = [
    ("df1", "df2", "df1.A1", "df2.B1"),
    ("df2", "df3", "df2.B1", "df3.C1"),
    ("df1", "df3", "df1.A1", "df3.C1"),
]

# Resolve the names back to the DataFrame objects for each join.
updated_df_list = []
for item in list_condition:
    updated_df_list.append((list_df.get(item[0]), list_df.get(item[1]), item[2], item[3]))
print(updated_df_list)

# Each iteration performs one pairwise join and overwrites df_1, so after the
# loop it holds only the last join (df1 left-joined with df3 on A1 == C1).
for x in updated_df_list:
    df_1 = x[0].join(x[1], on=F.col(x[2]) == F.col(x[3]), how="left_outer")

df_1.show()



+---+---+---+----+----+----+
| A1| A2| A3|  C1|  C2|  C3|
+---+---+---+----+----+----+
|  1|  A| aa|   1|   A|  aa|
|  2|  B| bb|   2|   B|  bb|
|  3|  C| cc|null|null|null|
+---+---+---+----+----+----+
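
Since df_1 is overwritten on every pass, only the last pair survives, which is what the output above shows. If the goal were to keep every pairwise join instead, a small variation of the same loop (a sketch, reusing updated_df_list from the code above) could collect the results:

# Collect every pairwise join instead of keeping only the last one.
joined_results = []
for left_df, right_df, left_col, right_col in updated_df_list:
    joined_results.append(
        left_df.join(right_df, on=F.col(left_col) == F.col(right_col), how="left_outer")
    )

for res in joined_results:
    res.show()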

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1: Balaji SS
Solution 2: