'Why my Spark mapPartition function is being slowed?
My algorithm is simple: I am using Spark to distribute the processing of a process that runs a cross-validation in Python. I have 3 workers and all I do is assign a binary array to each one to run the Python function, the Driver then takes the results and stores them in a JSON. Below is a simplification of the code:
stars_subsets = []
for i in range(15):
star_subset = (i + 1, generate_random_binary_array())
stars_subsets.append(star_subset)
stars_parallelized = sc.parallelize(stars_subsets)
result = stars_parallelized \
.partitionBy(NUMBER_OF_WORKERS, partitionFunc=lambda key: key * NUMBER_OF_WORKERS // len(stars_subsets)) \
.mapPartitions(lambda records: cross_validation(records), preservesPartitioning=True) \
.collect()
Note that I generate the partitions in such a way that all nodes receive the same amount of arrays to test, and the mapPartition()
ensures that it is taken care of one at a time.
My problem is that, if I run the cross-validations only on the Driver, as the size of the binary array to be used in the cross-validation increases, the times, as expected, increase linearly (please ignore the first peaks in the pictures, they are caused by an issue with the rules of the algorithm we are testing).
Sequential code (only driver):
for records in stars_subsets:
cross_validation(records)
Sequential times result:
But when distributing the computation in Spark, there are some exceptional peaks that I don't know what is causing them. I have already logged the Garbage Collector behavior as indicated in the Tuning section of the official documentation, but I can't find any GC action that takes more than 1 second. Also, I tried changing the GC to G1GC which is recommended when Java GC is causing a bottleneck, but the performance was worse.
Distributed cross-validation times:
I am starting to believe that there is more than one GC and we are logging the logs from the wrong one, or that there is another internal Spark problem that we are not considering.
Any help would be more than welcome!
Solution 1:[1]
I found the problem. It was not due to the Garbage Collector, nor to some network phenomenon. What is happening is that Spark assigns several partitions to the same worker node, even if there is an idle worker.
When this happened, the processing done by each worker hogged all the cores, so if two partitions were assigned, they were two processes that required all the cores of the machine, thus degrading the performance.
I have opened another more specific question about this problem.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Genarito |