Programmatically add/remove executors to a Spark Session

I'm looking for a reliable way in Spark (v2+) to programmatically adjust the number of executors in a session.

I know about dynamic allocation and the ability to configure Spark executors when creating a session (e.g. with --num-executors), but neither of these options is very useful to me because of the nature of my Spark job.
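For reference, the session-creation route looks something like this (a minimal sketch with placeholder values; dynamic allocation would instead be enabled through the spark.dynamicAllocation.* settings):

import org.apache.spark.sql.SparkSession

// Sketch only: spark.executor.instances is the in-code equivalent of --num-executors;
// the app name and executor count are placeholders
val spark = SparkSession.builder()
  .appName("aggregate-then-load-to-es")
  .config("spark.executor.instances", "20")
  .getOrCreate()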

My Spark job

The job performs the following steps on a large amount of data:

  1. Perform some aggregations / checks on the data
  2. Load the data into Elasticsearch (the ES cluster is typically much smaller than the Spark cluster; a sketch of this write follows the list)
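For concreteness, step 2 looks roughly like this (a sketch assuming the elasticsearch-hadoop connector; the node address and index name are placeholders):

// Sketch of the Elasticsearch load, assuming the elasticsearch-hadoop (elasticsearch-spark) connector;
// `df` is the aggregated data from step 1, node address and index name are placeholders
df.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", "es-node-1:9200")
  .mode("append")
  .save("my-index")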

The problem

  • If I use the full set of available Spark resources, I will very quickly overload Elasticsearch and potentially even knock over the Elasticsearch nodes.
  • If I use a small enough number of Spark executors so as not to overwhelm Elasticsearch, step 1 takes a lot longer than it needs to (because it has only a small percentage of the available Spark resources).

I appreciate that I can split this job into two jobs which are executed separately with different Spark resource profiles, but what I really want is to programmatically set the number of executors to X at a particular point in my Spark script (before the Elasticsearch load begins). This seems like a useful thing to be able to do generally.

My initial attempt

I played around a bit with changing settings and found something which sort of works, but it feels like a hacky way of doing something which should be doable in a more standardised and supported way.

My attempt (this is just me playing around):

// `spark` is the active SparkSession; `logger` is whatever logger the job already uses
import scala.collection.mutable.ListBuffer
import scala.util.Random

// Block managers of all live executors, excluding the driver's own block manager
def getExecutors = spark.sparkContext.getExecutorStorageStatus.toSeq.map(_.blockManagerId).collect {
  case bm if !bm.isDriver => bm
}

def reduceExecutors(totalNumber: Int): Unit = {
  //TODO throw error if totalNumber is more than current
  logger.info(s"""Attempting to reduce number of executors to $totalNumber""")
  // Lower the target first so the cluster manager doesn't immediately replace the executors we kill
  spark.sparkContext.requestTotalExecutors(totalNumber, 0, Map.empty)
  val killedExecutors = ListBuffer[String]()
  while (getExecutors.size > totalNumber) {
    // Only consider executors we haven't already asked to kill
    val executorIds = getExecutors.map(_.executorId).filterNot(killedExecutors.contains(_))
    val executorsToKill = Random.shuffle(executorIds).take(executorIds.size - totalNumber)
    spark.sparkContext.killExecutors(executorsToKill)
    killedExecutors ++= executorsToKill
    Thread.sleep(1000)
  }
}

def increaseExecutors(totalNumber: Int): Unit = {
  //TODO throw error if totalNumber is less than current
  logger.info(s"""Attempting to increase number of executors to $totalNumber""")
  // Raise the target, then wait until the new executors have registered
  spark.sparkContext.requestTotalExecutors(totalNumber, 0, Map.empty)
  while (getExecutors.size < totalNumber) {
    Thread.sleep(1000)
  }
}
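With those helpers in scope, the intent is to use them around the Elasticsearch load, along these lines (a sketch; the executor counts, the step 1 logic and the write itself are placeholders):

// Step 1: aggregations / checks with the full cluster
val checked = runAggregationsAndChecks(rawDf)   // placeholder for step 1

// Scale down before the Elasticsearch load so the ES cluster isn't overwhelmed
reduceExecutors(4)
checked.write.format("org.elasticsearch.spark.sql").save("my-index")   // placeholder write

// Scale back up if more Spark-heavy work follows
increaseExecutors(20)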


Solution 1:[1]

One thing you can try is to call

val dfForES = df.coalesce(numberOfParallelElasticSearchUploads) 

before step #2. This reduces the number of partitions without incurring a shuffle and ensures that at most numberOfParallelElasticSearchUploads executors send data to ES in parallel, while the rest sit idle.

If you're running your job on a shared cluster, I'd still recommend enabling dynamic allocation to release these idle executors for better resource utilization.
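If you enable it, the settings involved would look roughly like this (a sketch with placeholder values; in Spark 2.x dynamic allocation also needs the external shuffle service so executors can be removed safely):

import org.apache.spark.sql.SparkSession

// Sketch only: timeout and bounds are placeholder values
val spark = SparkSession.builder()
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.executorIdleTimeout", "60s")   // release executors that sit idle
  .config("spark.dynamicAllocation.maxExecutors", "50")
  .config("spark.shuffle.service.enabled", "true")
  .getOrCreate()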

Solution 2:[2]

I was looking for a way to programmatically adjust the number of executors in pyspark and this was the top result. Here is what I've gathered from Will's question and from poking around with py4j:

# Create the spark session:
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(... your configs ...).getOrCreate()
sc = spark.sparkContext   # needed below for access to the py4j JVM gateway

# Increase cluster to 5 executors:
spark._jsparkSession.sparkContext().requestTotalExecutors(5, 0, sc._jvm.PythonUtils.toScalaMap({}))
# Note: requestTotalExecutors only sets a target; as in the Scala version above,
# you may need to poll getExecutorIds() until the count actually changes.

# Decrease cluster back to zero executors:
spark._jsparkSession.sparkContext().requestTotalExecutors(0, 0, sc._jvm.PythonUtils.toScalaMap({}))
javaExecutorIds = spark._jsparkSession.sparkContext().getExecutorIds()
executorIds = [javaExecutorIds.apply(i) for i in range(javaExecutorIds.length())]
print(f'Killing executors {executorIds}')
spark._jsparkSession.sparkContext().killExecutors(javaExecutorIds)

I hope that saves someone else from excessive googling.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Denis Makarenko
Solution 2: Lou Zell