How do you create merge_asof functionality in PySpark?

Table A has many columns, including a date column. Table B has a datetime column and a value. The data in both tables is generated sporadically, with no regular interval. Table A is small; table B is massive.

I need to join B to A under the condition that a given element a of A.datetime corresponds to

B[B['datetime'] <= a]['datetime'].max()

There are a couple ways to do this, but I would like the most efficient way.

Option 1

Broadcast the small dataset as a pandas DataFrame. Set up a Spark UDF that creates a pandas DataFrame for each row and merges it with the large dataset using merge_asof.

Option 2

Use the broadcast join functionality of Spark SQL: set up a theta join on the following condition

B['datetime'] <= A['datetime']

Then eliminate all the superfluous rows.
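
For reference, here is a minimal sketch of what Option 2 might look like, assuming A and B are loaded as Spark DataFrames with the columns shown in the sample below; the names B_renamed, candidates, w and result are illustrative, and the window dedup is just one way to drop the superfluous rows:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rename B's date column so the two sides stay distinguishable after the join
B_renamed = B.withColumnRenamed('Datetime', 'B_Datetime')

# Broadcast the small table A and theta-join on the inequality
candidates = B_renamed.join(F.broadcast(A), B_renamed['B_Datetime'] <= A['Datetime'])

# Keep only the latest B row per A row, i.e. eliminate the superfluous rows
w = Window.partitionBy('Column1', 'Datetime').orderBy(F.col('B_Datetime').desc())
result = (candidates
          .withColumn('rn', F.row_number().over(w))
          .filter(F.col('rn') == 1)
          .select('Column1', 'Key'))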

Option 2 seems pretty terrible... but please let me know if the first way is efficient or if there is another way.

EDIT: Here is the sample input and expected output:

A =
+---------+----------+
| Column1 | Datetime |
+---------+----------+
|    A    |2019-02-03|
|    B    |2019-03-14|
+---------+----------+

B =
+---------+----------+
|   Key   | Datetime |
+---------+----------+
|    0    |2019-01-01|
|    1    |2019-01-15|
|    2    |2019-02-01|
|    3    |2019-02-15|
|    4    |2019-03-01|
|    5    |2019-03-15|
+---------+----------+

custom_join(A,B) =
+---------+----------+
| Column1 |   Key    |
+---------+----------+
|    A    |     2    |
|    B    |     4    |
+---------+----------+
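
A sketch of how these sample frames could be created as Spark DataFrames (an assumption for the examples that follow: the date strings are parsed to timestamps):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master('local').getOrCreate()

A = spark.createDataFrame(
    [('A', '2019-02-03'), ('B', '2019-03-14')], ['Column1', 'Datetime']
).withColumn('Datetime', F.to_timestamp('Datetime'))

B = spark.createDataFrame(
    [(0, '2019-01-01'), (1, '2019-01-15'), (2, '2019-02-01'),
     (3, '2019-02-15'), (4, '2019-03-01'), (5, '2019-03-15')],
    ['Key', 'Datetime']
).withColumn('Datetime', F.to_timestamp('Datetime'))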


Solution 1:[1]

Anyone trying to do this in PySpark 3.x can use pyspark.sql.PandasCogroupedOps.applyInPandas.

For example:

  from pyspark.sql import SparkSession, Row, DataFrame
  import pandas as pd
  spark = SparkSession.builder.master("local").getOrCreate()

  df1 = spark.createDataFrame(
      [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
      ("time", "id", "v1"))
  df2 = spark.createDataFrame(
      [(20000101, 1, "x"), (20000101, 2, "y")],
      ("time", "id", "v2"))
  # For each id, merge the cogrouped pandas frames with merge_asof
  def asof_join(l, r):
      return pd.merge_asof(l, r, on="time", by="id")
  df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
      asof_join, schema="time int, id int, v1 double, v2 string"
  ).show()


This outputs:
  +--------+---+---+---+
  |    time| id| v1| v2|
  +--------+---+---+---+
  |20000101|  1|1.0|  x|
  |20000102|  1|3.0|  x|
  |20000101|  2|2.0|  y|
  |20000102|  2|4.0|  y|
  +--------+---+---+---+
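
For the A/B example in the question there is no natural key to cogroup on. One possible adaptation (a sketch, not part of the original answer; the names A_k, B_k and the constant grp key are illustrative, and it assumes both tables fit in a single cogroup and that every A row has an earlier B row) would be:

import pandas as pd
from pyspark.sql import functions as F

# Constant grouping key so both tables meet in a single cogroup
A_k = A.withColumn('grp', F.lit(1))
B_k = B.withColumn('grp', F.lit(1))

def asof_join(left, right):
    # merge_asof needs both frames sorted by the "on" column; the default
    # direction='backward' picks the latest B row with Datetime <= A.Datetime
    merged = pd.merge_asof(
        left.sort_values('Datetime'),
        right.sort_values('Datetime'),
        on='Datetime',
        by='grp')
    return merged[['Column1', 'Datetime', 'Key']]

(A_k.groupby('grp')
    .cogroup(B_k.groupby('grp'))
    .applyInPandas(asof_join, schema='Column1 string, Datetime timestamp, Key long')
    .select('Column1', 'Key')
    .show())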

Solution 2:[2]

You could solve it with Spark by using union and last together with a window function. Ideally you have something to partition your window by; without a partitionBy, the global ordering pulls all of the data into a single partition.
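
In terms of the question's sample data, df1 and df2 below stand in for A and B (a hypothetical mapping, assuming the frames sketched earlier):

df1 = A   # (Column1, Datetime)
df2 = B   # (Key, Datetime)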

from pyspark.sql import functions as f
from pyspark.sql.window import Window

df1 = df1.withColumn('Key', f.lit(None))
df2 = df2.withColumn('Column1', f.lit(None))

df3 = df1.unionByName(df2)

# Look only at preceding rows; last('Key', True) ignores nulls, so it picks the Key of the most recent B row before each A row
w = Window.orderBy('Datetime', 'Column1').rowsBetween(Window.unboundedPreceding, -1)
df3.withColumn('Key', f.last('Key', True).over(w)).filter(~f.isnull('Column1')).show()

Which gives

+-------+----------+---+
|Column1|  Datetime|Key|
+-------+----------+---+
|      A|2019-02-03|  2|
|      B|2019-03-14|  4|
+-------+----------+---+

Solution 3:[3]

Figured out a fast (but perhaps not the most efficient) method to complete this. I built a helper function:

def get_close_record(df, key_column, datetime_column, record_time):
    """
    Takes an ordered dataframe and returns the key of the first
    record whose datetime is at or after the given time.
    """
    filtered_df = df[df[datetime_column] >= record_time][0:1]
    [key] = filtered_df[key_column].values.tolist()
    return key

Instead of joining B to A, I set up a pandas_udf of the above code and ran it on the columns of table B, then ran a groupBy on B with primary key A_key and aggregated B_key by max.

The issue with this method is that it requires monotonically increasing keys in B.

Better solution:

I developed the following helper function, which should work:

import pandas as pd
from pyspark.sql import functions as F

# Give the broadcast lookup table a '_0' column so its name matches the
# column name of the Series the pandas UDF receives
other_df['_0'] = other_df['Datetime']
bdf = sc.broadcast(other_df)

# merge_asof udf; both sides must be sorted by '_0' for merge_asof to work
@F.pandas_udf('long')
def join_asof(v, other=bdf.value):
    f = pd.DataFrame(v)
    j = pd.merge_asof(f, other, on='_0', direction='forward')
    return j['Key']

joined = df.withColumn('Key', join_asof(F.col('Datetime')))
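
For context, a sketch of the setup this snippet assumes (hypothetical: df is the table being enriched, i.e. A in the question, and other_df is the lookup table B collected to the driver as pandas and sorted on the join key, since pd.merge_asof requires sorted inputs):

sc = spark.sparkContext
df = A
other_df = B.toPandas().sort_values('Datetime')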

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

+------------+------------+
|  Solution  |   Source   |
+------------+------------+
| Solution 1 | Arran Duff |
| Solution 2 |            |
| Solution 3 |            |
+------------+------------+