How to efficiently remove duplicate rows in Spark Dataframe, keeping row with highest timestamp

I have a large data set which I am reading from Postgres. It has an ID column, a timestamp column and several other columns which may have been updated. For each ID, I wish to keep only the row most recently updated (highest timestamp value). I have come up with a solution that works but I worry that it is (1.) not efficient and (2.) might not deduplicate on the entire set but only on each partition on which it runs (since this will be running on a multi-node cluster).

Here is some sample data that essentially demonstrates my technique:

Original data in a dataFrame called dfTest:

+---+--------+-----------+                                                      
| id|    city|update_time|
+---+--------+-----------+
|456|   Miami|   01:15:30|
|456| Seattle|   11:15:43|
|457| Toronto|   01:15:00|
|457| Chicago|   01:17:30|
|457|New York|   02:15:37|
|458|  Dallas|   01:18:35|
|459| Houston|   01:12:41|
|460| Chicago|   03:25:31|
|460|Montreal|   02:12:07|
|461|  Boston|   01:15:30|
+---+--------+-----------+

I put this into a temp view:

dfTest.createOrReplaceTempView("test")

I then run this spark SQL query:

    val query =
      s"""
         |select 
         |  id, 
         |  city, 
         |  update_time 
         |from (
         |  select 
         |    id, 
         |    city, 
         |    update_time, 
         |    row_number() over(partition by (id) order by update_time desc) as row_num 
         |    from test
         |) 
         |where row_num = 1
         |""".stripMargin
    spark.sql(query).show()

This gives the correct result with only one row per ID:

+---+--------+-----------+
| id|    city|update_time|
+---+--------+-----------+
|456| Seattle|   11:15:43|
|457|New York|   02:15:37|
|458|  Dallas|   01:18:35|
|459| Houston|   01:12:41|
|460| Chicago|   03:25:31|
|461|  Boston|   01:15:30|
+---+--------+-----------+

My question is:

1.) Can I expect this to still work correctly when run on a large data set on a cluster with multiple nodes?

2.) Is this an efficient way to do this? Is there a way to do this more efficiently using Spark functions rather than a query?



Solution 1:[1]

  1. You can expect your query to work; Spark will take care of shuffling your data between the nodes to run your query correctly.

  2. Not sure if it's more efficient, but using Spark functions is more maintainable. Here's an example in spark-shell. I got the input by running

var dfTest = Seq(("456","Miami","01:15:30"),
  ("456","Seattle","11:15:43"),
  ("457","Toronto","01:15:00"),
  ("457","Chicago","01:17:30"),
  ("457","New York","02:15:37"),
  ("458","Dallas","01:18:35"),
  ("459","Houston","01:12:41"),
  ("460","Chicago","03:25:31"),
  ("460","Montreal","02:12:07"),
  ("461","Boston","01:15:30")).toDF("id", "city", "update_time")

The equivalent to your SQL would be

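import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, row_number}

// Number the rows of each id from newest to oldest, keep the newest, then drop the helper column
dfTest = dfTest.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy(desc("update_time"))))
  .filter(col("rn") === 1)
  .select("id", "city", "update_time")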

Running dfTest.show gives this output

+---+--------+-----------+
| id|    city|update_time|
+---+--------+-----------+
|459| Houston|   01:12:41|
|458|  Dallas|   01:18:35|
|456| Seattle|   11:15:43|
|461|  Boston|   01:15:30|
|457|New York|   02:15:37|
|460| Chicago|   03:25:31|
+---+--------+-----------+
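
Since the efficiency question is left open above, here is a sketch of one alternative that is sometimes used: a plain aggregation over a struct instead of a window function. It relies on Spark comparing structs field by field, so placing update_time first makes max() select the latest row per id. The local SparkSession, the app name and the shortened sample data are assumptions for illustration; whether this is actually faster than the window version has not been measured here and will depend on the data.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max, struct}

// Local session for illustration only; in spark-shell `spark` and its implicits already exist.
val spark = SparkSession.builder().master("local[*]").appName("dedup-agg-sketch").getOrCreate()
import spark.implicits._

// Shortened version of the sample data from the question
val dfTest = Seq(
  ("456", "Miami", "01:15:30"),
  ("456", "Seattle", "11:15:43"),
  ("457", "Toronto", "01:15:00"),
  ("457", "New York", "02:15:37"),
  ("458", "Dallas", "01:18:35")
).toDF("id", "city", "update_time")

// Structs are compared field by field, so update_time must come first in the struct
val latest = dfTest
  .groupBy("id")
  .agg(max(struct(col("update_time"), col("city"))).as("latest"))
  .select(col("id"), col("latest.city").as("city"), col("latest.update_time").as("update_time"))

latest.show()
```

One behavioural difference: when two rows of the same id share the same update_time, this version keeps the one with the greater city value, whereas row_number() picks one of the tied rows arbitrarily.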

Solution 2:[2]

Spark tries to process data within partitions, but it can shuffle the data around when doing joins, aggregates, window functions, etc. So it will run your SQL correctly.
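
One way to see the shuffle for yourself is to print the physical plan. The sketch below is a minimal example under assumed settings (a local SparkSession, a shortened copy of the sample data, and an arbitrary repartition(4) to spread the rows out first). The plan should contain an Exchange step that hash-partitions on id ahead of the Window step, which is what makes the deduplication apply to the whole data set rather than to each input partition separately.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, row_number}

// Local session for illustration only; in spark-shell `spark` and its implicits already exist.
val spark = SparkSession.builder().master("local[*]").appName("dedup-plan-sketch").getOrCreate()
import spark.implicits._

// Spread the rows over several partitions so rows with the same id start out apart
val dfTest = Seq(
  ("456", "Miami", "01:15:30"),
  ("456", "Seattle", "11:15:43"),
  ("457", "Toronto", "01:15:00"),
  ("457", "New York", "02:15:37"),
  ("458", "Dallas", "01:18:35")
).toDF("id", "city", "update_time").repartition(4)

val byIdLatestFirst = Window.partitionBy("id").orderBy(desc("update_time"))

val deduped = dfTest
  .withColumn("rn", row_number().over(byIdLatestFirst))
  .filter(col("rn") === 1)
  .drop("rn")

// Look for an Exchange with hashpartitioning on id below the Window operator
deduped.explain()
```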

And it should work efficiently with a large dataset, except when the data is skewed. As mentioned above, to calculate a window function Spark shuffles records so that rows with the same id land in the same partition. If you provide a dataset where 50% of the records have id = 0, all of those records end up in a single partition, processed by a single task on one executor, and that could cause trouble.
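
A quick way to check for that kind of skew before running the window query is to count rows per id and look at the largest groups. This is only a sketch: the local SparkSession and the made-up `events` data (five rows with id 0 plus a few others) are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.desc

// Local session for illustration only; in spark-shell `spark` and its implicits already exist.
val spark = SparkSession.builder().master("local[*]").appName("skew-check-sketch").getOrCreate()
import spark.implicits._

// Made-up data in which id "0" dominates
val events = (Seq.fill(5)("0") ++ Seq("456", "457", "457", "458")).zipWithIndex
  .map { case (id, i) => (id, s"city$i", f"01:00:$i%02d") }
  .toDF("id", "city", "update_time")

// Largest groups first; an id holding a large share of all rows signals skew
val rowsPerId = events.groupBy("id").count().orderBy(desc("count"))

rowsPerId.show(10)
```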

Finally, you could write the same using Spark functions, i.e. the Dataset API, but the result should be equivalent.
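
As a rough illustration of the typed Dataset API, the sketch below groups by id and reduces each group to its latest row. The case class `CityUpdate`, the local SparkSession and the shortened sample data are hypothetical names and values introduced for this example. It also assumes update_time is a zero-padded HH:mm:ss string, so that string comparison matches chronological order; with a real timestamp column you would compare the timestamps instead.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical case class mirroring the sample columns
case class CityUpdate(id: String, city: String, update_time: String)

// Local session for illustration only; in spark-shell `spark` and its implicits already exist.
val spark = SparkSession.builder().master("local[*]").appName("dedup-dataset-sketch").getOrCreate()
import spark.implicits._

val ds = Seq(
  CityUpdate("456", "Miami", "01:15:30"),
  CityUpdate("456", "Seattle", "11:15:43"),
  CityUpdate("457", "Toronto", "01:15:00"),
  CityUpdate("457", "New York", "02:15:37"),
  CityUpdate("458", "Dallas", "01:18:35")
).toDS()

// reduceGroups yields (key, value) pairs; keep only the surviving row of each group
val latestPerId = ds
  .groupByKey(_.id)
  .reduceGroups((a, b) => if (a.update_time >= b.update_time) a else b)
  .map(_._2)

latestPerId.show()
```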

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2 Kombajn zbożowy