How do you create merge_asof functionality in PySpark?

Table A has many columns with a date column, Table B has a datetime and a value. The data in both tables are generated sporadically with no regular interval. Table A is small, table B is massive.

I need to join B to A under the condition that a given element a of A.datetime corresponds to

B[B['datetime'] <= a]['datetime'].max()
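For reference, this condition is exactly what pandas computes with merge_asof in its default backward direction. A minimal pandas-only sketch, using the sample data given further below:

import pandas as pd

A = pd.DataFrame({'Column1': ['A', 'B'],
                  'Datetime': pd.to_datetime(['2019-02-03', '2019-03-14'])})
B = pd.DataFrame({'Key': [0, 1, 2, 3, 4, 5],
                  'Datetime': pd.to_datetime(['2019-01-01', '2019-01-15',
                                              '2019-02-01', '2019-02-15',
                                              '2019-03-01', '2019-03-15'])})

# For each A row, take the B row with the largest Datetime <= A.Datetime
pd.merge_asof(A, B, on='Datetime')  # direction='backward' is the default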
There are a couple of ways to do this, but I would like the most efficient way.
Option 1
Broadcast the small dataset as a pandas DataFrame. Set up a Spark UDF that creates a pandas DataFrame for each row and merges it with the large dataset using merge_asof.
Option 2
Use the broadcast join functionality of Spark SQL: set up a theta join on the following condition

B['datetime'] <= A['datetime']

Then eliminate all the superfluous rows (a sketch of this appears after the sample data below).
Option 2 seems pretty terrible... but please let me know if the first way is efficient or if there is another way.
EDIT: Here is the sample input and expected output:
A =
+---------+----------+
| Column1 | Datetime |
+---------+----------+
| A |2019-02-03|
| B |2019-03-14|
+---------+----------+
B =
+---------+----------+
| Key | Datetime |
+---------+----------+
| 0 |2019-01-01|
| 1 |2019-01-15|
| 2 |2019-02-01|
| 3 |2019-02-15|
| 4 |2019-03-01|
| 5 |2019-03-15|
+---------+----------+
custom_join(A,B) =
+---------+----------+
| Column1 | Key |
+---------+----------+
| A | 2 |
| B | 4 |
+---------+----------+
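A minimal sketch of Option 2 against these sample tables, assuming A and B are Spark DataFrames with the columns shown, Column1 uniquely identifies A rows, and Datetime is an orderable type. The row_number window is one way to eliminate the superfluous rows; custom_join is just the placeholder name from above:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

def custom_join(A, B):
    # Theta join: pair each A row with every B row at or before it,
    # broadcasting the small table A
    joined = B.join(F.broadcast(A), B['Datetime'] <= A['Datetime'])
    # Keep only the latest matching B row for each A row
    w = Window.partitionBy('Column1').orderBy(B['Datetime'].desc())
    return (joined
            .withColumn('rn', F.row_number().over(w))
            .filter('rn = 1')
            .select('Column1', 'Key'))

custom_join(A, B).show()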
Solution 1:[1]
Anyone trying to do this in PySpark 3.x can use pyspark.sql.PandasCogroupedOps.applyInPandas.
For example:
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").getOrCreate()

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))
df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    # Runs once per cogroup (here: per id), each side as a pandas DataFrame
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string"
).show()
Which outputs:
+--------+---+---+---+
| time| id| v1| v2|
+--------+---+---+---+
|20000101| 1|1.0| x|
|20000102| 1|3.0| x|
|20000101| 2|2.0| y|
|20000102| 2|4.0| y|
+--------+---+---+---+
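The example above cogroups on id, but the question's A and B share no such key. A hedged adaptation for that case: the constant grouping column g is an assumption (it routes all of A and B through a single pandas cogroup, so it only scales while one group fits in executor memory), and Datetime is assumed to be a real timestamp column, since pd.merge_asof cannot merge on plain strings.

import pandas as pd
from pyspark.sql import functions as F

# Constant key so that all of A and all of B land in one cogroup
a = A.withColumn('g', F.lit(1))
b = B.withColumn('g', F.lit(1))

def asof_join(l, r):
    # merge_asof requires both frames sorted on the key
    return pd.merge_asof(l.sort_values('Datetime'), r.sort_values('Datetime'),
                         on='Datetime')[['Column1', 'Key']]

a.groupby('g').cogroup(b.groupby('g')).applyInPandas(
    asof_join, schema='Column1 string, Key long').show()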
Solution 2:[2]
You could solve it with Spark by using union and last together with a window function. Ideally you have something to partition your window by.
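Assuming df1 and df2 hold the sample A and B tables; plain ISO date strings are enough for this sketch because they sort correctly:

df1 = spark.createDataFrame(
    [('A', '2019-02-03'), ('B', '2019-03-14')], ['Column1', 'Datetime'])
df2 = spark.createDataFrame(
    [(0, '2019-01-01'), (1, '2019-01-15'), (2, '2019-02-01'),
     (3, '2019-02-15'), (4, '2019-03-01'), (5, '2019-03-15')],
    ['Key', 'Datetime'])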
from pyspark.sql import functions as f
from pyspark.sql.window import Window

# Give both frames the same schema so they can be unioned
df1 = df1.withColumn('Key', f.lit(None))
df2 = df2.withColumn('Column1', f.lit(None))
df3 = df1.unionByName(df2)

# For each A row (non-null Column1), take the last non-null Key among the
# rows before it in Datetime order; nulls sort first, so a B row with the
# same Datetime is still picked up
w = Window.orderBy('Datetime', 'Column1').rowsBetween(Window.unboundedPreceding, -1)
df3.withColumn('Key', f.last('Key', True).over(w)).filter(~f.isnull('Column1')).show()
Which gives
+-------+----------+---+
|Column1| Datetime|Key|
+-------+----------+---+
| A|2019-02-03| 2|
| B|2019-03-14| 4|
+-------+----------+---+
Solution 3:[3]
Figured out a fast (but perhaps not the most efficient) method to complete this. I built a helper function:
def get_close_record(df, key_column, datetime_column, record_time):
    """
    Takes an ordered dataframe and returns the key of the first
    record whose datetime is at or after the given time.
    """
    filtered_df = df[df[datetime_column] >= record_time][0:1]
    [key] = filtered_df[key_column].values.tolist()
    return key
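For instance, with the sample B above as a pandas DataFrame sorted by Datetime, get_close_record(B, 'Key', 'Datetime', '2019-02-03') would return 3, the key of the first record on or after that date.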
Instead of joining B to A, I set up a pandas_udf of the above code and ran it on the columns of table B, then ran groupBy on B with primary key A_key and aggregated B_key by max.
The issue with this method is that it requires monotonically increasing keys in B.
Better solution:
I developed the following helper function that should work:

import pandas as pd
from pyspark.sql import functions as F

# '_0' matches the column name that pd.DataFrame(v) will give the
# UDF's input series, so the broadcast frame can be merged on it
other_df['_0'] = other_df['Datetime']
bdf = sc.broadcast(other_df)

# merge-asof UDF: each batch of Datetime values is merged against the
# broadcast frame, taking the nearest record at or after each value
@F.pandas_udf('long')
def join_asof(v, other=bdf.value):
    f = pd.DataFrame(v)
    j = pd.merge_asof(f, other, on='_0', direction='forward')
    return j['Key']

joined = df.withColumn('Key', join_asof(F.col('Datetime')))
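A possible setup for the snippet above, using the sample tables (which table plays which role is an assumption here: other_df must be the one carrying Key, it must fit in driver memory, and pd.merge_asof wants both sides sorted on a timestamp key):

sc = spark.sparkContext
other_df = B.toPandas().sort_values('Datetime').reset_index(drop=True)
df = A.orderBy('Datetime')  # merge_asof also expects the left side sorted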
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow

Solution | Source
---|---
Solution 1 | Arran Duff
Solution 2 |
Solution 3 |