Different sort results after coalesce(1) vs repartition(1)
I have the following script, which returns a properly sorted result:
from transforms.api import transform, Output
from pyspark.sql import functions as F


@transform(
    out=Output("ri.foundry.main.dataset.29fdbff7-168a-457d-bb79-8f7508cede9d"),
)
def compute(out, ctx):
    data = [("1", "2022-02-01", "older"),
            ("1", "2022-02-12", "older"),
            ("1", "2022-02-09", "new")]
    df_inp = (
        ctx.spark_session.createDataFrame(data, ["c1", "date", "record_status"])
        .withColumn("date", F.to_date("date"))
        .withColumn("record_status", F.lit("older"))
    )
    df_upd = (
        ctx.spark_session.createDataFrame([('1',)], ['c1'])
        .withColumn('date', F.to_date(F.lit('2022-02-17')))
        .withColumn('record_status', F.lit('new'))
    )
    df = df_inp.unionByName(df_upd)
    df = df.coalesce(1)
    df = df.sort(F.desc('date'))
    out.write_dataframe(df)
Notice df = df.coalesce(1) before the sort.
Question. Since both df.coalesce(1) and df.repartition(1) should result in one partition, I tried replacing df = df.coalesce(1) with df = df.repartition(1). But then the result appeared unsorted. Why?
Additional details
If I don't interfere with partitioning at all, the result also appears unsorted.
Physical plan using coalesce(1):
+- *(3) Sort [date#6 DESC NULLS LAST], true, 0
   +- Coalesce 1
      +- Union
         :- *(1) Project [c1#0, cast(date#1 as date) AS date#6, older AS record_status#10]
         :  +- *(1) Scan ExistingRDD[c1#0,date#1,record_status#2]
         +- *(2) Project [c1#14, 19040 AS date#16, new AS record_status#19]
            +- *(2) Scan ExistingRDD[c1#14]
Physical plan using repartition(1):
+- *(3) Sort [date#6 DESC NULLS LAST], true, 0
   +- CustomShuffleReader coalesced
      +- ShuffleQueryStage 1
         +- Exchange rangepartitioning(date#6 DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#85]
            +- ShuffleQueryStage 0
               +- Exchange RoundRobinPartitioning(1), REPARTITION_WITH_NUM, [id=#83]
                  +- Union
                     :- *(1) Project [c1#0, cast(date#1 as date) AS date#6, older AS record_status#10]
                     :  +- *(1) Scan ExistingRDD[c1#0,date#1,record_status#2]
                     +- *(2) Project [c1#14, 19040 AS date#16, new AS record_status#19]
                        +- *(2) Scan ExistingRDD[c1#14]
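The visible difference between the two plans is the Exchange rangepartitioning(date#6 DESC NULLS LAST, 200) step under the Sort in the repartition(1) plan; the coalesce(1) plan sorts directly on top of Coalesce 1. Below is a minimal sketch in plain Python (not Spark code) of what these two plan shapes could imply. The helper names and the boundary date are hypothetical. The idea that the written partitions may be read back in a different order is an assumption for illustration, not something the plans themselves show.

```python
from datetime import date

# The four dates produced by the script in the question.
ROWS = [date(2022, 2, 1), date(2022, 2, 12), date(2022, 2, 9), date(2022, 2, 17)]


def sort_single_partition(rows):
    """Shape of the coalesce(1) plan: one partition, sorted as a whole."""
    return [sorted(rows, reverse=True)]


def sort_range_partitioned(rows, boundaries):
    """Shape of the repartition(1) plan: range-partition, then sort each partition.

    `boundaries` is a hypothetical list of descending cut points; partition 0
    receives the latest dates.
    """
    partitions = [[] for _ in range(len(boundaries) + 1)]
    for row in rows:
        # Count the boundaries at or above this row to pick its partition.
        index = sum(row <= b for b in boundaries)
        partitions[index].append(row)
    return [sorted(p, reverse=True) for p in partitions]


def read_back(partitions, order):
    """Concatenate partitions in the given order, as a reader might."""
    return [row for i in order for row in partitions[i]]


expected = sorted(ROWS, reverse=True)
single = sort_single_partition(ROWS)
ranged = sort_range_partitioned(ROWS, [date(2022, 2, 10)])

print(read_back(single, [0]) == expected)     # True: only one partition exists
print(read_back(ranged, [0, 1]) == expected)  # True: partitions read in range order
print(read_back(ranged, [1, 0]) == expected)  # False: same partitions, other order
```

In this toy model every partition is sorted on its own in both cases. Only the multi-partition output depends on the order in which the partitions are read.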
I am aware of the question "Difference between repartition(1) and coalesce(1)", where the guy says he cannot use coalesce(1) for some reason. In my case it's the opposite.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow