DAG of Spark Sort application spanning two jobs

I've written a very simple Sort scala program with Spark.

import org.apache.spark.{SparkConf, SparkContext}

object Sort {

    def main(args: Array[String]): Unit = {
        if (args.length < 2) {
            System.err.println("Usage: Sort <data_file> <save_file>" +
                " [<slices>]")
            System.exit(1)
        }

        val conf = new SparkConf().setAppName("BigDataBench Sort")
        val spark = new SparkContext(conf)
        val logger = new JobPropertiesLogger(spark, "/home/abrandon/log.csv") // custom timing/logging helper
        val filename = args(0)
        val save_file = args(1)
        var splits = spark.defaultMinPartitions
        if (args.length > 2) {
            splits = args(2).toInt
        }
        val lines = spark.textFile(filename, splits)
        logger.start_timer()
        // Pair each line with a dummy value so the lines can be sorted by key.
        val data_map = lines.map(line => (line, 1))

        val result = data_map.sortByKey().map { line => line._1 }
        logger.stop_timer()
        logger.write_log("Sort By Key: Sort App")
        result.saveAsTextFile(save_file) // the only action: triggers the actual execution

        println("Result has been saved to: " + save_file)
    }
}

Now, I was thinking that since there is only one wide transformation (sortByKey), two stages would be created. However, I see two jobs, with one stage in Job 0 and two stages in Job 1. Am I missing something? What I don't get is the first stage of the second job: it seems to do the same work as the stage of Job 0.

[Spark UI screenshots: the two jobs, Job 0 and Job 1]
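For reference, the expectation spelled out against the program above would look like this (a sketch only; the stage numbers reflect the naive one-job picture, not what the UI actually shows):

val lines    = spark.textFile(filename, splits)    // stage 0: read input
val data_map = lines.map(line => (line, 1))        // stage 0: narrow map + shuffle write
val result   = data_map.sortByKey()                // ---- shuffle boundary ----
                       .map(line => line._1)       // stage 1: shuffle read + map
result.saveAsTextFile(save_file)                   // stage 1: write output (the action)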



Solution 1:[1]

The sortByKey occurs in the first job only. Here are the operations in the first job:

  • spark.textFile(filename, splits)
  • data_map.sortByKey()

The second job covers two operations:

  • map: .map { line => line._1}
  • saveAsTextFile

Notice that the first stage in the second job picks up where the first job (sortByKey) leaves off. You can verify this by looking at the execution time: just 4.4 minutes for that stage.

Now, why does the visualization of the second job show all of the work done in the first job as well? That I do not know, but the sortByKey work does not appear to be executed twice.
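One way to check that the shuffle is not executed twice (a minimal sketch, assuming a spark-shell session where sc is the SparkContext): run two actions on the same sorted lineage and compare the jobs in the Spark UI. The upstream stages still appear in the second job's DAG, but they are marked as skipped, because the shuffle files written for the first action are reused.

val pairs  = sc.parallelize(Seq("b", "c", "a")).map(w => (w, 1))
val sorted = pairs.sortByKey()
sorted.count()           // first action: runs the sampling job plus the shuffle
sorted.map(_._1).count() // second action: upstream stages appear in the DAG but are skipped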

Solution 2:[2]

When an RDD goes through a sort transformation, it uses a RangePartitioner. To decide which keys go into which partition, a good approach is to sample the source data first. So the first job is most likely building the partitioner. I guess it works like this, but I have not strictly confirmed it in the source code. Actually, building a range partitioner via sampling was already common back in the MapReduce framework, where it is easy to see thanks to the transparent programming model.
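To make this concrete, here is a simplified sketch of what sortByKey does internally (an approximation reusing splits and data_map from the question's program, not the verbatim Spark source): constructing the RangePartitioner samples the keys to choose partition boundaries, and that sampling pass runs eagerly as its own job, which would account for the extra Job 0.

import org.apache.spark.RangePartitioner

// Building the partitioner samples data_map's keys to pick range
// boundaries; this sampling runs eagerly as a separate job (Job 0).
val partitioner = new RangePartitioner(splits, data_map)

// The sort itself is then just a shuffle with that partitioner, which
// becomes the two-stage Job 1 once the action runs.
val sorted = data_map.repartitionAndSortWithinPartitions(partitioner)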

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution      Source
Solution 1    WestCoastProjects
Solution 2