Convert df.apply to Spark to run in parallel using all the cores
We have a pandas DataFrame of retail data and a function that runs on it daily, row by row, to calculate the item-to-item difference like below:
for itemiter in range(len(RetDf)):
    column = RetDf.loc[itemiter, "username"]
    RetDf[column] = RetDf.apply(lambda row: ItemDiff(RetDf.loc[itemiter, 'Val'], row['Val']), axis=1)
Is there a way to convert this to SparkContext RDD parallelize so that it uses all the cores? (A minimal RDD-based sketch follows the ItemDiff definition below.)
Sample data with dummy values for RetDf:
username   UserId  Val
abc75757   1234    [0.0, 0.0, 1.0, 2.0]
abcraju    4567    [0.0, 0.0, 1.0, 2.0]
xyzuser    4343    [0.0, 0.0, 1.0, 2.0]
user4abc   2323    [0.0, 0.0, 1.0, 2.0]
Final output:
username   UserId  Val                   abc75757  abcraju  xyzuser  user4abc
abc75757   1234    [0.0, 0.0, 1.0, 2.0]  2.0       0.0      0.0      1.0
abcraju    4567    [0.0, 0.0, 1.0, 2.0]  2.0       0.0      0.0      1.0
xyzuser    4343    [0.0, 0.0, 1.0, 2.0]  2.0       0.0      0.0      1.0
user4abc   2323    [0.0, 0.0, 1.0, 2.0]  2.0       0.0      4.0      1.0
ItemDiff:
from fastdtw import fastdtw

def ItemDiff(z1, z2):
    distance_t = 0.0
    path_t = [(0, 0)]
    distance_t, path_t = fastdtw(z1, z2)
    return distance_t
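For reference, a minimal sketch of the SparkContext / RDD route the question asks about (not taken from the answers below; it reuses the RetDf and ItemDiff defined above, and the app name is arbitrary) would parallelize the rows, take the Cartesian product, and evaluate ItemDiff on every pair across all cores:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("ItemDiffRDD").getOrCreate()  # app name is arbitrary
sc = spark.sparkContext

# (username, Val) tuples taken from the pandas DataFrame shown above
rows = list(RetDf[["username", "Val"]].itertuples(index=False, name=None))
rdd = sc.parallelize(rows)

# cartesian() pairs every row with every other row; ItemDiff then runs in parallel across the cores
pairwise = (rdd.cartesian(rdd)
               .map(lambda p: (p[0][0], p[1][0], ItemDiff(p[0][1], p[1][1])))
               .collect())
# pairwise is a list of (username_a, username_b, distance) triples that can be pivoted back in pandas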
Solution 1:[1]
You've turned a combinations problem into a product problem, more than doubling the necessary calculations. I'm not sure of a good pure-pandas way of parallelizing this, but the following should still be much faster even without parallelization (a parallel variant is sketched after the output below).
a = [["abc75757", 1234, [4.0, 0.0, 1.0, 4.0]],
["abcraju", 4567, [0.0, 0.0, 3.0, 2.0]],
["xyzuser", 4343, [0.0, 1.0, 1.0, 2.0]],
["user4abc", 2323, [0.0, 0.0, 1.0, 3.0]]]
RetDf = pd.DataFrame(a, columns=['username', 'UserId', 'Val'])
from itertools import combinations
combos = combinations(RetDf[['username', 'Val']].to_numpy(), r=2)
combos = [(x[0][0], x[1][0], fastdtw(x[0][1], x[1][1])[0]) for x in combos]
permuts = [(x[0], x[1], x[2]) for x in combos] + [(x[1], x[0], x[2]) for x in combos]
df = pd.DataFrame(permuts, columns=['username', 'pair', 'value']).pivot(index='username', columns='pair').droplevel(0, axis=1).reset_index()
output = RetDf.merge(df).fillna(0)
print(output)
Output:
username UserId Val abc75757 abcraju user4abc xyzuser
0 abc75757 1234 [4.0, 0.0, 1.0, 4.0] 0.0 8.0 5.0 6.0
1 abcraju 4567 [0.0, 0.0, 3.0, 2.0] 8.0 0.0 2.0 3.0
2 xyzuser 4343 [0.0, 1.0, 1.0, 2.0] 6.0 3.0 1.0 0.0
3 user4abc 2323 [0.0, 0.0, 1.0, 3.0] 5.0 2.0 0.0 1.0
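The combinations approach above is fast even without parallelization; if you do want to use every core, one option (a sketch of my own, not part of the original answer) is to spread the fastdtw calls over a multiprocessing pool and then reuse the permuts/pivot/merge steps unchanged:
from itertools import combinations
from multiprocessing import Pool

from fastdtw import fastdtw

def pair_distance(pair):
    # pair is ((username_a, Val_a), (username_b, Val_b)) from the combinations above
    (name_a, val_a), (name_b, val_b) = pair
    return (name_a, name_b, fastdtw(val_a, val_b)[0])

if __name__ == "__main__":
    pairs = list(combinations(RetDf[['username', 'Val']].to_numpy(), r=2))
    with Pool() as pool:  # one worker process per core by default
        combos = pool.map(pair_distance, pairs)
    # combos now matches the shape produced above, so permuts, the pivot and the merge can be reused as-is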
Solution 2:[2]
There is Pandas on Spark, which you should look into, but it won't help here as there is no direct translation for pd.loc.
It seems you're doing a Cartesian join, which is expensive, but this is what I suggest you do:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import udf
import pyspark.sql.functions as f
from pyspark.sql.types import *
from fastdtw import fastdtw

# Create PySpark SparkSession; local[*] uses all available cores
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()
# Create PySpark DataFrame from pandas
raw_data = {'username': ['abc75757', 'abcraju', 'xyzuser', 'user4abc'],
            'UserId': [1234, 4567, 4343, 2323],
            'Val': [[0.0, 0.0, 1.0, 2.0], [0.0, 0.0, 1.0, 2.0], [0.0, 0.0, 1.0, 2.0], [0.0, 0.0, 1.0, 2.0]]}
RetDf = pd.DataFrame(raw_data)

def ItemDiff(z1, z2):
    distance_t = 0.0
    path_t = [(0, 0)]
    distance_t, path_t = fastdtw(z1, z2)
    return float(distance_t)

itemDiff = udf(ItemDiff, FloatType())  # create UDF to do the work
sparkDF = spark.createDataFrame(RetDf)
# Cross join is expensive but necessary to compare every row with every other row
cartesianJoin = (
    sparkDF.crossJoin(sparkDF)
    .toDF("username", "UserId", "Val", "my_username", "myUserId", "myVal")  # rename the duplicated columns for convenience
    .select(itemDiff("Val", "myVal").alias("dist"), f.col("*"))             # run the UDF on each pair of Val arrays
    .groupBy("Val", "UserId", "username")                                   # this enables us to pivot
    .pivot("my_username")                                                   # be careful to pivot on the renamed column
    .max("dist")                                                            # handy trick: there is only one value per cell, so max just returns it
)
cartesianJoin.show()
+--------------------+------+--------+--------+-------+--------+-------+
| Val|UserId|username|abc75757|abcraju|user4abc|xyzuser|
+--------------------+------+--------+--------+-------+--------+-------+
|[0.0, 0.0, 1.0, 2.0]| 1234|abc75757| 0.0| 0.0| 0.0| 0.0|
|[0.0, 0.0, 1.0, 2.0]| 4343| xyzuser| 0.0| 0.0| 0.0| 0.0|
|[0.0, 0.0, 1.0, 2.0]| 4567| abcraju| 0.0| 0.0| 0.0| 0.0|
|[0.0, 0.0, 1.0, 2.0]| 2323|user4abc| 0.0| 0.0| 0.0| 0.0|
+--------------------+------+--------+--------+-------+--------+-------+
See the PySpark UDF documentation for details. If you can rework your logic to avoid using a UDF, it would run faster, but you'd need to learn which Spark SQL functions you could use to do the same thing. This might also be a good time to review whether you really need all the data to calculate the columns, or whether you can simplify your logic.
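As an illustration of that point only: DTW itself cannot be expressed with built-in Spark SQL functions, but if a simpler metric such as squared Euclidean distance were acceptable (an assumption, purely for demonstration, requiring Spark 2.4+), the UDF could be dropped in favour of higher-order array functions:
# Sketch only: this swaps fastdtw for squared Euclidean distance and is NOT equivalent to DTW
pairsDF = sparkDF.crossJoin(
    sparkDF.select(f.col("username").alias("my_username"), f.col("Val").alias("myVal"))
)

noUdf = (
    pairsDF.withColumn(
        "dist",
        f.expr("aggregate(zip_with(Val, myVal, (a, b) -> (a - b) * (a - b)), 0D, (acc, x) -> acc + x)")
    )
    .groupBy("username", "UserId", "Val")  # same group/pivot pattern as the UDF version above
    .pivot("my_username")
    .max("dist")
)
noUdf.show()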
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source
---|---
Solution 1 |
Solution 2 |