Why does calling cache take a long time on a Spark Dataset?

I'm loading large datasets and then caching them for reference throughout my code. The code looks something like this:

val conversations = sqlContext.read
  .format("com.databricks.spark.redshift")
  .option("url", jdbcUrl)
  .option("tempdir", tempDir)
  .option("forward_spark_s3_credentials", "true")
  .option("query", "SELECT * FROM my_table "+
                   "WHERE date <= '2017-06-03' "+
                   "AND date >= '2017-03-06' ")
  .load()
  .cache()

If I leave off the cache() call, the code executes quickly because Datasets are evaluated lazily. But when I add cache(), the block takes a long time to run.

From the Spark UI's Event Timeline, it appears that the SQL table is being transmitted to the worker nodes and then cached there.

Why is cache executing immediately? The source code appears to only mark the Dataset for caching, deferring the actual work until the data is computed:

The source code for Dataset calls through to this code in CacheManager.scala when cache or persist is called:

  /**
   * Caches the data produced by the logical representation of the given [[Dataset]].
   * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
   * recomputing the in-memory columnar representation of the underlying table is expensive.
   */
  def cacheQuery(
      query: Dataset[_],
      tableName: Option[String] = None,
      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
    val planToCache = query.logicalPlan
    if (lookupCachedData(planToCache).nonEmpty) {
      logWarning("Asked to cache already cached data.")
    } else {
      val sparkSession = query.sparkSession
      cachedData.add(CachedData(
        planToCache,
        InMemoryRelation(
          sparkSession.sessionState.conf.useCompression,
          sparkSession.sessionState.conf.columnBatchSize,
          storageLevel,
          sparkSession.sessionState.executePlan(planToCache).executedPlan,
          tableName)))
    }
  }

This appears to only mark the Dataset for caching rather than actually caching the data. And based on other answers on Stack Overflow, I would expect caching to return immediately as well.
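For reference, the behavior I expected is the usual lazy pattern, in which cache() only marks the plan and the first subsequent action pays the materialization cost. A minimal sketch of that expectation, assuming the conversations Dataset from the code above:

```scala
// Expected (lazy) behavior: cache() marks the plan for caching, and the first
// action is what triggers the read and populates the cache.
// `conversations` is assumed to be the Dataset built in the question above.
val cached = conversations.cache() // expected to return immediately
val rowCount = cached.count()      // expected to trigger the Redshift read and fill the cache
val again = cached.count()         // expected to be served from the cache
```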

Has anyone else seen caching happen immediately, before an action is performed on the Dataset? Why does this happen?



Solution 1:[1]

I now believe that, as Erik van Oosten's answer says, the cache() command causes the query to execute.

A close look at the code in my OP does indeed appear to show that the data is being cached eagerly. There are two key lines where I think the caching occurs:

cachedData.add(CachedData(...))

This line creates a new CachedData object and adds it to a cachedData collection. While the CachedData object could be a placeholder that receives the cached data later on, it seems more likely that it truly holds the cached data.

And more importantly, this line:

sparkSession.sessionState.executePlan(planToCache).executedPlan

appears to actually execute the plan. So based on my experience, Erik van Oosten's gut feeling about what is going on here, and the source code, I believe that calling cache() causes a Spark Dataset's plan to be executed.

Solution 2:[2]

cache is one of those operators that causes execution of a Dataset. Spark will materialize that entire Dataset to memory. If you invoke cache on an intermediate Dataset that is quite big, this can take a long time.

What might be problematic is that the cached dataset is only stored in memory. When it no longer fits, partitions of the dataset get evicted and are re-calculated as needed (see https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence). With too little memory available, your program could spend a lot of time on re-calculation.

To speed things up with caching, you could give the application more memory, or you can use persist(StorageLevel.MEMORY_AND_DISK) instead of cache so that evicted partitions spill to disk rather than being recomputed. (Note that for Datasets this is already the default, as the CacheManager docstring quoted above mentions; RDD.cache() defaults to MEMORY_ONLY.)
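As a sketch of that suggestion, assuming the conversations Dataset loaded in the question (StorageLevel is the standard Spark storage-level class):

```scala
import org.apache.spark.storage.StorageLevel

// Persist with an explicit storage level: partitions that do not fit in memory
// spill to local disk and are re-read from there instead of being recomputed
// from the source. `conversations` is assumed to be the Dataset from the question.
val persisted = conversations.persist(StorageLevel.MEMORY_AND_DISK)
```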

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Josiah Yoder
Solution 2: Erik van Oosten