Spark Scala: Split dataframe into equal number of rows

I have a dataframe and wish to divide it into an equal number of rows.

In other words, I want a list of dataframes where each one is a disjoint subset of the original dataframe.

Let's say the input dataframe is the following:

  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  100|[15.5097750043269...|
  |15.509775004326936|          0|  101|[15.5097750043269...|
  |15.509775004326936|          0|  102|[15.5097750043269...|
  |15.509775004326936|          0|  103|[15.5097750043269...|
  |15.509775004326936|          0|  104|[15.5097750043269...|
  |15.509775004326936|          0|  105|[15.5097750043269...|
  |15.509775004326936|          0|  106|[15.5097750043269...|
  |15.509775004326936|          0|  107|[15.5097750043269...|
  |15.509775004326936|          0|  108|[15.5097750043269...|
  |15.509775004326936|          0|  109|[15.5097750043269...|
  |15.509775004326936|          0|  110|[15.5097750043269...|
  |15.509775004326936|          0|  111|[15.5097750043269...|
  |15.509775004326936|          0|  112|[15.5097750043269...|
  |15.509775004326936|          0|  113|[15.5097750043269...|
  |15.509775004326936|          0|  114|[15.5097750043269...|
  |15.509775004326936|          0|  115|[15.5097750043269...|
  | 43.01955000865387|          0|  116|[43.0195500086538...|
  +------------------+-----------+-----+--------------------+

I wish to split it into K equal-sized dataframes. If K = 4, then a possible result would be:

  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  106|[15.5097750043269...|
  |15.509775004326936|          0|  107|[15.5097750043269...|
  |15.509775004326936|          0|  110|[15.5097750043269...|
  |15.509775004326936|          0|  111|[15.5097750043269...|
  +------------------+-----------+-----+--------------------+

  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  104|[15.5097750043269...|
  |15.509775004326936|          0|  108|[15.5097750043269...|
  |15.509775004326936|          0|  112|[15.5097750043269...|
  |15.509775004326936|          0|  114|[15.5097750043269...|
  +------------------+-----------+-----+--------------------+


  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  100|[15.5097750043269...|
  |15.509775004326936|          0|  105|[15.5097750043269...|
  |15.509775004326936|          0|  109|[15.5097750043269...|
  |15.509775004326936|          0|  115|[15.5097750043269...|
  +------------------+-----------+-----+--------------------+


  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  101|[15.5097750043269...|
  |15.509775004326936|          0|  102|[15.5097750043269...|
  |15.509775004326936|          0|  103|[15.5097750043269...|
  |15.509775004326936|          0|  113|[15.5097750043269...|
  | 43.01955000865387|          0|  116|[43.0195500086538...|
  +------------------+-----------+-----+--------------------+


Solution 1:[1]

According to my understanding of your input and required output, you can create row numbers by putting the whole dataframe into a single group.

Then you can filter the dataframe by comparing the row numbers and store each slice wherever you need.

The following is a starting point; adjust it to your needs. Note that it splits the dataframe into chunks of k rows each, rather than into k chunks.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

val k = 4

// A single constant "grouped" value puts every row in the same window partition
val windowSpec = Window.partitionBy("grouped").orderBy("original_dt")

val newDF = dataFrame.withColumn("grouped", lit("grouping"))
val latestDF = newDF.withColumn("row", row_number() over windowSpec)

val totalCount = latestDF.count()
var lowLimit = 0
var highLimit = lowLimit + k

// Walk over the numbered rows, k at a time
while (lowLimit < totalCount) {
  latestDF.where(s"row <= ${highLimit} and row > ${lowLimit}").show(false)
  lowLimit = lowLimit + k
  highLimit = highLimit + k
}

I hope this will give you a good start.
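If you need the slices as actual dataframes rather than printed output, the same row numbers can be used to build a sequence. This is a sketch under the same setup as the snippet above (`latestDF` and `k` are assumed from it):

```scala
// Collect the k-row slices into a sequence of dataframes instead of showing them
val totalCount = latestDF.count()
val slices: Seq[org.apache.spark.sql.DataFrame] =
  (0L until totalCount by k).map { low =>
    latestDF.where(s"row > $low and row <= ${low + k}").drop("row", "grouped")
  }
```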

Solution 2:[2]

Another solution is to use limit and except. The following program will return an array of dataframes that have an equal number of rows, except the first one, which may contain fewer rows.

import spark.implicits._

var numberOfNew = 4
var input = List(1,2,3,4,5,6,7,8,9).toDF
// Note: 0 to numberOfNew allocates numberOfNew + 1 frames (see Solution 3 for a corrected version)
var newFrames = 0 to numberOfNew map (_ => Seq.empty[Int].toDF) toArray
var size = input.count();
val limit = (size / numberOfNew).toInt

while (size > 0) {
    // Take the next chunk, then remove it from the remaining input.
    // Caveat: except also removes duplicates, so input rows must be distinct.
    newFrames(numberOfNew) = input.limit(limit)
    input = input.except(newFrames(numberOfNew))
    size = size - limit
    numberOfNew = numberOfNew - 1
}

newFrames.foreach(_.show)

+-----+
|value|
+-----+
|    7|
+-----+

+-----+
|value|
+-----+
|    4|
|    8|
+-----+

+-----+
|value|
+-----+
|    5|
|    9|
+-----+

...

Solution 3:[3]

This is an improved answer on that of Steffen Schmitz (Solution 2), which is in fact incorrect. I have improved and generalized it for posterity. I do wonder about the performance at scale, however.

import spark.implicits._

var numberOfNew = 4
var input = Seq((1,2),(3,4),(5,6),(7,8),(9,10),(11,12)).toDF
var newFrames = 0 to numberOfNew - 1 map (_ => Seq.empty[(Int, Int)].toDF) toArray
val size = input.count()
val limit = (size / numberOfNew).toInt
// Rows left over for the final chunk when size is not evenly divisible
val residual = (size - limit * (numberOfNew - 1)).toInt
var limit_to_use = limit

while (numberOfNew > 0) {

    // The last chunk absorbs the remainder
    if (numberOfNew == 1) limit_to_use = residual

    newFrames(numberOfNew - 1) = input.limit(limit_to_use)
    input = input.except(newFrames(numberOfNew - 1))
    numberOfNew = numberOfNew - 1
}

newFrames.foreach(_.show)

val singleDF = newFrames.reduce(_ union _)
singleDF.show(false)

returns individual dataframes:

+---+---+
| _1| _2|
+---+---+
|  7|  8|
|  3|  4|
| 11| 12|
+---+---+

+---+---+
| _1| _2|
+---+---+
|  5|  6|
+---+---+

+---+---+
| _1| _2|
+---+---+
|  9| 10|
+---+---+

+---+---+
| _1| _2|
+---+---+
|  1|  2|
+---+---+

Solution 4:[4]

If you want to divide a dataset into n equal datasets:

double[] arraySplit = {1,1,1...,n}; // equal weights give equal parts; change the numbers to split by ratio

List<Dataset<String>> datasetList = dataset.randomSplitAsList(arraySplit, 1);
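The same idea in Spark's Scala API, with the weights array built programmatically instead of written out (a sketch; `dataset` and the chunk count `n` are assumed from context):

```scala
val n = 4
// Equal weights give approximately equal splits; any positive weights define a ratio
val parts: Array[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] =
  dataset.randomSplit(Array.fill(n)(1.0), seed = 1)
```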

Solution 5:[5]

Dunno if this is performant compared to the other options, but I think it looks prettier at least:

import spark.implicits._
import org.apache.spark.sql.functions._

val df = Seq(1,2,3,4,5,6,7,8,9,0).toDF
val split_count = 4
// Tag each row with a bucket id, then pull out one dataframe per bucket
val to_be_split = df.withColumn("split", monotonically_increasing_id % split_count)
val dfs = (0 until split_count).map(n => to_be_split.where('split === n).drop('split))

dfs.foreach(_.show)
+-----+
|value|
+-----+
|    1|
|    5|
|    9|
+-----+

+-----+
|value|
+-----+
|    2|
|    6|
|    0|
+-----+

+-----+
|value|
+-----+
|    3|
|    7|
+-----+

+-----+
|value|
+-----+
|    4|
|    8|
+-----+
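One caveat: `monotonically_increasing_id` is only guaranteed to be increasing, not consecutive, so with multiple partitions the buckets can end up uneven. A variant using `row_number` (a sketch, assuming an ordering column; `value` is used here to match the example) assigns rows round-robin and keeps chunk sizes within one row of each other:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Single global window: rows are numbered consecutively (fine for small data)
val w = Window.orderBy("value")
val numbered = df.withColumn("split", (row_number().over(w) - 1) % split_count)
val evenDfs = (0 until split_count).map(n => numbered.where($"split" === n).drop("split"))
```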

Solution 6:[6]

You could use

val result = df.randomSplit(Array(0.25, 0.25, 0.25, 0.25), 1)

to split the dataframe into smaller chunks. The array can be expanded based on the required number of splits; note that the resulting sizes are only approximately equal. (The second argument, 1, is the seed and can be changed if required.)

To inspect the splits, use

result(0).count

or

result(1).count

and so on, depending on how many splits were made.
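For an arbitrary number of chunks, the weights array can be built programmatically rather than written out by hand (a sketch; `df` and `k` are assumptions):

```scala
val k = 4
// k equal weights; randomSplit normalizes them, so Array.fill(k)(1.0) works too
val result = df.randomSplit(Array.fill(k)(1.0 / k), seed = 1)
result.foreach(part => println(part.count()))
```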

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Ramesh Maharjan
Solution 2
Solution 3 thebluephantom
Solution 4 mustaccio
Solution 5 Jeremy
Solution 6 TheArchitect