DataFrame partitionBy to a single Parquet file (per partition)

I would like to repartition / coalesce my data so that it is saved into one Parquet file per partition. I would also like to use the Spark SQL partitionBy API. So I could do that like this:

df.coalesce(1)
    .write
    .partitionBy("entity", "year", "month", "day", "status")
    .mode(SaveMode.Append)
    .parquet(s"$location")

I've tested this and it doesn't seem to perform well. This is because the whole dataset is collapsed into a single partition, so all of the partitioning, compression and writing of files has to be done by one CPU core.

I could rewrite this to do the partitioning manually (for example, by filtering on each distinct combination of partition values) before calling coalesce.
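
A rough sketch of that manual approach might look like the following (purely illustrative: the column names come from the question, everything else is an assumption, and it presumes the set of distinct partition-value combinations is small enough to collect and loop over):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

// Hypothetical sketch: one filtered, single-partition write per distinct
// combination of partition values. Assumes the combinations fit on the driver.
val partitionCols = Seq("entity", "year", "month", "day", "status")

df.select(partitionCols.map(col): _*).distinct().collect().foreach { row =>
  val predicate = partitionCols
    .map(c => col(c) === row.getAs[Any](c))
    .reduce(_ && _)

  df.filter(predicate)
    .coalesce(1)
    .write
    .partitionBy(partitionCols: _*)
    .mode(SaveMode.Append)
    .parquet(s"$location")
}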

But is there a better way to do this using the standard Spark SQL API?



Solution 1:[1]

I had the exact same problem and I found a way to do this using DataFrame.repartition(). The problem with using coalesce(1) is that your parallelism drops to 1, and it can be slow at best and error out at worst. Increasing that number doesn't help either -- if you do coalesce(10) you get more parallelism, but end up with 10 files per partition.

To get one file per partition without using coalesce(), use repartition() with the same columns you want the output to be partitioned by. So in your case, do this:

import spark.implicits._
df.repartition($"entity", $"year", $"month", $"day", $"status")
    .write
    .partitionBy("entity", "year", "month", "day", "status")
    .mode(SaveMode.Append)
    .parquet(s"$location")

Once I do that I get one parquet file per output partition, instead of multiple files.

I tested this in Python, but I assume in Scala it should be the same.

Solution 2:[2]

By definition:

coalesce(numPartitions: Int): DataFrame
Returns a new DataFrame that has exactly numPartitions partitions.

You can use it to decrease the number of partitions in the RDD/DataFrame with the numPartitions parameter. It's useful for running operations more efficiently after filtering down a large dataset.
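
As a small, hedged illustration of that use case (the status column is from the question, but the filter value, partition count and output sub-path are made up):

import org.apache.spark.sql.SaveMode
import spark.implicits._

// After a selective filter, the surviving rows are spread thinly across many
// partitions; coalescing packs them into fewer, fuller partitions without a
// full shuffle before any further work or writing is done.
val filtered = df.filter($"status" === "CLOSED")   // hypothetical filter value
val compacted = filtered.coalesce(8)               // hypothetical partition count
compacted.write.mode(SaveMode.Append).parquet(s"$location/closed")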

Concerning your code, it doesn't perform well because what you are actually doing is:

  1. putting everything into 1 partition, which overloads a single executor that has to pull all of the data into that one partition (and is not good practice in any case)

  2. coalescing still has to move all of the data across the network to that single executor, which, much like a shuffle, may also result in performance loss.

The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.

The shuffle concept is very important to manage and understand. It's always preferable to shuffle the minimum possible because it is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, Spark generates sets of tasks - map tasks to organize the data, and a set of reduce tasks to aggregate it. This nomenclature comes from MapReduce and does not directly relate to Spark’s map and reduce operations.

Internally, results from individual map tasks are kept in memory until they can’t fit. Then, these are sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks.
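
A quick way to see the difference from the Spark shell (an illustration only; the partition counts are just examples):

// repartition() always performs a shuffle, while coalesce() only narrows the
// existing partitions onto fewer tasks.
println(df.rdd.getNumPartitions)                  // e.g. 200
println(df.coalesce(10).rdd.getNumPartitions)     // 10, without a full shuffle
println(df.repartition(10).rdd.getNumPartitions)  // 10, via a shuffle
df.repartition(10).explain()  // physical plan contains an Exchange (shuffle) node
df.coalesce(10).explain()     // physical plan contains a Coalesce node instead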

Concerning Parquet partitioning, I suggest reading the answer about Spark DataFrames with Parquet partitioning, as well as the Performance Tuning section of the Spark Programming Guide.

I hope this helps!

Solution 3:[3]

It isn't much on top of @mortada's solution, but here's a little abstraction that ensures you are using the same partitioning to repartition and write, and demonstrates sorting as well:

from datetime import datetime

def one_file_per_partition(df, path, partitions, sort_within_partitions=None, verbose=False):
    start = datetime.now()
    # Repartition by the same columns that the output will be partitioned by,
    # so each output partition is produced by exactly one task (one file).
    df = df.repartition(*partitions)
    if sort_within_partitions:
        df = df.sortWithinPartitions(*sort_within_partitions)
    (df.write.partitionBy(*partitions)
       # TODO: Format of your choosing here
       .mode("append").parquet(path)
       # or, e.g.:
       # .option("compression", "gzip").option("header", "true").mode("overwrite").csv(path)
    )
    print(f"Wrote data partitioned by {partitions} and sorted by {sort_within_partitions} to:"
          f"\n  {path}\n  Time taken: {(datetime.now() - start).total_seconds():,.2f} seconds")

Usage:

one_file_per_partition(df, location, ["entity", "year", "month", "day", "status"])

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Patrick McGloin
Solution 2: Community
Solution 3: Alain