Spark Scala: Split dataframe into equal number of rows
I have a DataFrame and wish to divide it into chunks with an equal number of rows.
In other words, I want a list of dataframes where each one is a disjoint subset of the original dataframe.
Let's say the input dataframe is the following:
+------------------+-----------+-----+--------------------+
| eventName|original_dt|count| features|
+------------------+-----------+-----+--------------------+
|15.509775004326936| 0| 100|[15.5097750043269...|
|15.509775004326936| 0| 101|[15.5097750043269...|
|15.509775004326936| 0| 102|[15.5097750043269...|
|15.509775004326936| 0| 103|[15.5097750043269...|
|15.509775004326936| 0| 104|[15.5097750043269...|
|15.509775004326936| 0| 105|[15.5097750043269...|
|15.509775004326936| 0| 106|[15.5097750043269...|
|15.509775004326936| 0| 107|[15.5097750043269...|
|15.509775004326936| 0| 108|[15.5097750043269...|
|15.509775004326936| 0| 109|[15.5097750043269...|
|15.509775004326936| 0| 110|[15.5097750043269...|
|15.509775004326936| 0| 111|[15.5097750043269...|
|15.509775004326936| 0| 112|[15.5097750043269...|
|15.509775004326936| 0| 113|[15.5097750043269...|
|15.509775004326936| 0| 114|[15.5097750043269...|
|15.509775004326936| 0| 115|[15.5097750043269...|
| 43.01955000865387| 0| 116|[43.0195500086538...|
+------------------+-----------+-----+--------------------+
I wish to split it into K equal-sized dataframes. If K = 4, then a possible result would be:
+------------------+-----------+-----+--------------------+
| eventName|original_dt|count| features|
+------------------+-----------+-----+--------------------+
|15.509775004326936| 0| 106|[15.5097750043269...|
|15.509775004326936| 0| 107|[15.5097750043269...|
|15.509775004326936| 0| 110|[15.5097750043269...|
|15.509775004326936| 0| 111|[15.5097750043269...|
+------------------+-----------+-----+--------------------+
+------------------+-----------+-----+--------------------+
| eventName|original_dt|count| features|
+------------------+-----------+-----+--------------------+
|15.509775004326936| 0| 104|[15.5097750043269...|
|15.509775004326936| 0| 108|[15.5097750043269...|
|15.509775004326936| 0| 112|[15.5097750043269...|
|15.509775004326936| 0| 114|[15.5097750043269...|
+------------------+-----------+-----+--------------------+
+------------------+-----------+-----+--------------------+
| eventName|original_dt|count| features|
+------------------+-----------+-----+--------------------+
|15.509775004326936| 0| 100|[15.5097750043269...|
|15.509775004326936| 0| 105|[15.5097750043269...|
|15.509775004326936| 0| 109|[15.5097750043269...|
|15.509775004326936| 0| 115|[15.5097750043269...|
+------------------+-----------+-----+--------------------+
+------------------+-----------+-----+--------------------+
| eventName|original_dt|count| features|
+------------------+-----------+-----+--------------------+
|15.509775004326936| 0| 101|[15.5097750043269...|
|15.509775004326936| 0| 102|[15.5097750043269...|
|15.509775004326936| 0| 103|[15.5097750043269...|
|15.509775004326936| 0| 113|[15.5097750043269...|
| 43.01955000865387| 0| 116|[43.0195500086538...|
+------------------+-----------+-----+--------------------+
Solution 1:[1]
According to my understanding of your input and required output, you can create row numbers by grouping the dataframe under a single group id. Then you can filter the dataframe by comparing the row number against a range and store each slice wherever you need.
The following is a starting point; you can adapt it to your needs:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

val k = 4
// Put every row in the same group so a single window covers the whole dataframe
val windowSpec = Window.partitionBy("grouped").orderBy("original_dt")
val newDF = dataFrame.withColumn("grouped", lit("grouping"))
val latestDF = newDF.withColumn("row", row_number() over windowSpec)

val totalCount = latestDF.count()
var lowLimit = 0
var highLimit = lowLimit + k
// Walk through the row numbers k at a time and show each slice
while (lowLimit < totalCount) {
  latestDF.where(s"row <= ${highLimit} and row > ${lowLimit}").show(false)
  lowLimit = lowLimit + k
  highLimit = highLimit + k
}
I hope this will give you a good start.
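Since the question asks for a list of dataframes rather than printed output, a minimal sketch of the same row-number approach that collects each slice into a list instead of calling show could look like this (the names slices and listOfDataframes are illustrative, and latestDF, totalCount and k come from the block above):
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.DataFrame

// Collect each row-number slice into a list instead of just showing it
val slices = ListBuffer.empty[DataFrame]
var low = 0
while (low < totalCount) {
  slices += latestDF.where(s"row <= ${low + k} and row > ${low}").drop("grouped", "row")
  low += k
}
val listOfDataframes = slices.toList   // the requested list of disjoint dataframes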
Solution 2:[2]
Another solution is to use limit and except. The following program returns an array of DataFrames that all have an equal number of rows, except for the first one, which may contain fewer rows.
import spark.implicits._

var numberOfNew = 4
var input = List(1, 2, 3, 4, 5, 6, 7, 8, 9).toDF
// Note: this creates numberOfNew + 1 empty frames (indices 0 to numberOfNew)
var newFrames = (0 to numberOfNew).map(_ => Seq.empty[Int].toDF).toArray
var size = input.count()
val limit = (size / numberOfNew).toInt

while (size > 0) {
  // Take the next chunk, then remove those rows from the remaining input
  newFrames(numberOfNew) = input.limit(limit)
  input = input.except(newFrames(numberOfNew))
  size = size - limit
  numberOfNew = numberOfNew - 1
}

newFrames.foreach(_.show)
+-----+
|value|
+-----+
| 7|
+-----+
+-----+
|value|
+-----+
| 4|
| 8|
+-----+
+-----+
|value|
+-----+
| 5|
| 9|
+-----+
...
Solution 3:[3]
This is an improved version of Steffen Schmitz's answer, which is in fact incorrect. I have improved it for posterity and generalized it. I do wonder about the performance at scale, however.
import spark.implicits._

var numberOfNew = 4
var input = Seq((1, 2), (3, 4), (5, 6), (7, 8), (9, 10), (11, 12)).toDF
// Exactly numberOfNew empty frames this time (indices 0 to numberOfNew - 1)
var newFrames = (0 to numberOfNew - 1).map(_ => Seq.empty[(Int, Int)].toDF).toArray
var size = input.count()
val limit = (size / numberOfNew).toInt
val limit_fract = size / numberOfNew.toFloat
// Rows left over after the integer division
val residual = ((limit_fract.toDouble - limit.toDouble) * size).toInt
var limit_to_use = limit

while (numberOfNew > 0) {
  // The last chunk filled absorbs the residual rows so nothing is dropped
  if (numberOfNew == 1 && residual != 0) limit_to_use = residual
  newFrames(numberOfNew - 1) = input.limit(limit_to_use)
  input = input.except(newFrames(numberOfNew - 1))
  size = size - limit
  numberOfNew = numberOfNew - 1
}

newFrames.foreach(_.show)

val singleDF = newFrames.reduce(_ union _)
singleDF.show(false)
returns individual dataframes:
+---+---+
| _1| _2|
+---+---+
| 7| 8|
| 3| 4|
| 11| 12|
+---+---+
+---+---+
| _1| _2|
+---+---+
| 5| 6|
+---+---+
+---+---+
| _1| _2|
+---+---+
| 9| 10|
+---+---+
+---+---+
| _1| _2|
+---+---+
| 1| 2|
+---+---+
Solution 4:[4]
If you want to divide a dataset into n (approximately) equal datasets:
double[] arraySplit = {1, 1, 1..., n};  // you can also divide into a ratio if you change the numbers
List<Dataset<String>> datasetList = dataset.randomSplitAsList(arraySplit, 1);
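The same idea in the Scala API might look like the following sketch; the name dataset stands for whatever DataFrame you start from, and note that randomSplit only produces approximately equal parts:
// Sketch: n equal weights give n roughly equal-sized parts
val n = 4
val weights = Array.fill(n)(1.0)        // change the weights to split by ratio instead
val parts = dataset.randomSplit(weights, 1)   // Array of n DataFrames; second argument is the seed
parts.foreach(_.show)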
Solution 5:[5]
Dunno if this is performant compared to the other options, but I think it looks prettier at least:
import spark.implicits._
import org.apache.spark.sql.functions._
val df = Seq(1,2,3,4,5,6,7,8,9,0).toDF
val split_count = 4
val to_be_split = df.withColumn("split", monotonically_increasing_id % split_count)
val dfs = (0 until split_count).map(n => to_be_split.where('split === n).drop('split))
dfs.foreach(_.show)
+-----+
|value|
+-----+
| 1|
| 5|
| 9|
+-----+
+-----+
|value|
+-----+
| 2|
| 6|
| 0|
+-----+
+-----+
|value|
+-----+
| 3|
| 7|
+-----+
+-----+
|value|
+-----+
| 4|
| 8|
+-----+
Solution 6:[6]
You could use
val result = df.randomSplit(Array(0.25, 0.25, 0.25, 0.25), 1)
to split the dataframe into smaller chunks. The weights array can be expanded based on the required number of splits (the second argument, 1, is the seed and can be changed if required).
To read the pieces, use
result(0).count
or
result(1).count
and so on, depending on how many splits were made.
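To look at every chunk rather than indexing them one at a time, a small loop over the result array would work, for example:
// Print the size of each split produced by randomSplit
result.zipWithIndex.foreach { case (part, i) =>
  println(s"split $i contains ${part.count} rows")
}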
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source
---|---
Solution 1 | Ramesh Maharjan
Solution 2 | Steffen Schmitz
Solution 3 | thebluephantom
Solution 4 | mustaccio
Solution 5 | Jeremy
Solution 6 | TheArchitect