Standard deviation returns NaN in PySpark rolling window

I have a dataset with four sensor values: 'volt', 'pressure', 'rotate' and 'vibration'. For each of them I calculate a rolling mean and a rolling standard deviation over a fixed window.

The code that calculates the rolling mean and standard deviation is:

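from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col

def preprocessing_df(final_rotate):
    # assumes an active SparkSession named `spark`
    telemetry_spark = spark.createDataFrame(final_rotate, verifySchema=False)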
    lags = [12, 24, 36]
    rolling_features = ['volt_forecast','rotate_forecast', 'pressure_forecast', 'vibration_forecast']
    # rolling mean and standard deviation for each window size
    for lag_n in lags:
        wSpec = Window.partitionBy('MachinedID').orderBy('Datetime').rowsBetween(1-lag_n, 0)
        for col_name in rolling_features:
            telemetry_spark = telemetry_spark.withColumn(col_name+'_rollingmean_'+str(lag_n), 
                                               F.avg(col(col_name)).over(wSpec))
            telemetry_spark = telemetry_spark.withColumn(col_name+'_rollingstd_'+str(lag_n), 
                                               F.stddev(col(col_name)).over(wSpec))
            
    # Calculate lag values...
    telemetry_feat = (telemetry_spark.withColumn("dt_truncated", dt_truncated)
                        .drop('volt_forecast', 'rotate_forecast', 'pressure_forecast', 'vibration_forecast')
                        .fillna(0)
                        .groupBy("MachinedID","dt_truncated")
                        .agg(F.mean('volt_forecast_rollingmean_12').alias('volt_forecast_rollingmean_12'),
                               F.mean('rotate_forecast_rollingmean_12').alias('rotate_forecast_rollingmean_12'), 
                               F.mean('pressure_forecast_rollingmean_12').alias('pressure_forecast_rollingmean_12'), 
                               F.mean('vibration_forecast_rollingmean_12').alias('vibration_forecast_rollingmean_12'), 
                               F.mean('volt_forecast_rollingmean_24').alias('volt_forecast_rollingmean_24'),
                               F.mean('rotate_forecast_rollingmean_24').alias('rotate_forecast_rollingmean_24'), 
                               F.mean('pressure_forecast_rollingmean_24').alias('pressure_forecast_rollingmean_24'), 
                               F.mean('vibration_forecast_rollingmean_24').alias('vibration_forecast_rollingmean_24'),
                               F.mean('volt_forecast_rollingmean_36').alias('volt_forecast_rollingmean_36'),
                               F.mean('vibration_forecast_rollingmean_36').alias('vibration_forecast_rollingmean_36'),
                               F.mean('rotate_forecast_rollingmean_36').alias('rotate_forecast_rollingmean_36'), 
                               F.mean('pressure_forecast_rollingmean_36').alias('pressure_forecast_rollingmean_36'), 
                               F.stddev('volt_forecast_rollingstd_12').alias('volt_forecast_rollingstd_12'),
                               F.stddev('rotate_forecast_rollingstd_12').alias('rotate_forecast_rollingstd_12'), 
                               F.stddev('pressure_forecast_rollingstd_12').alias('pressure_forecast_rollingstd_12'), 
                               F.stddev('vibration_forecast_rollingstd_12').alias('vibration_forecast_rollingstd_12'), 
                               F.stddev('volt_forecast_rollingstd_24').alias('volt_forecast_rollingstd_24'),
                               F.stddev('rotate_forecast_rollingstd_24').alias('rotate_forecast_rollingstd_24'), 
                               F.stddev('pressure_forecast_rollingstd_24').alias('pressure_forecast_rollingstd_24'), 
                               F.stddev('vibration_forecast_rollingstd_24').alias('vibration_forecast_rollingstd_24'),
                               F.stddev('volt_forecast_rollingstd_36').alias('volt_forecast_rollingstd_36'),
                               F.stddev('rotate_forecast_rollingstd_36').alias('rotate_forecast_rollingstd_36'), 
                               F.stddev('pressure_forecast_rollingstd_36').alias('pressure_forecast_rollingstd_36'), 
                               F.stddev('vibration_forecast_rollingstd_36').alias('vibration_forecast_rollingstd_36'), ))
    return telemetry_feat   

where 'final_rotate' is a dataframe having all sensor values.

When I run the above function, I get values for the rolling mean, but the rolling standard deviation is NaN for the whole dataframe and for every time window.

I cannot work out the cause. Please help.



Solution 1:[1]

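F.stddev returns NaN whenever the column contains a NaN value. Spark treats NaN as an ordinary double rather than as a missing value, so it propagates through the calculation: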

import pandas as pd
from pyspark.sql import functions as F

pdf = pd.DataFrame({'col1':  [1,"x",4], 'col2':  [1,2,4]})
# "x" is not numeric, so errors='coerce' turns it into NaN
pdf['col1'] = pd.to_numeric(pdf['col1'], errors='coerce')
print(pdf)
#    col1  col2
# 0   1.0     1
# 1   NaN     2
# 2   4.0     4

df = spark.createDataFrame(pdf)
df.groupBy().agg(F.stddev('col1'), F.stddev('col2')).show()
# +-----------------+------------------+
# |stddev_samp(col1)| stddev_samp(col2)|
# +-----------------+------------------+
# |              NaN|1.5275252316519465|
# +-----------------+------------------+
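
To see why one bad reading affects several rows of a rolling window, here is a small plain-Python model of the calculation. It is only a sketch and not Spark's implementation. The helper names (sample_std, rolling_std), the skip_nan flag and the sample readings are all made up for illustration. The window is a trailing one ending at the current row, like rowsBetween(1-lag_n, 0) in the question.

```python
import math

def sample_std(values):
    """Sample standard deviation (n - 1 denominator)."""
    n = len(values)
    if n < 2:
        return None  # undefined for fewer than two values
    mean = sum(values) / n
    return math.sqrt(sum((v - mean) ** 2 for v in values) / (n - 1))

def rolling_std(values, size, skip_nan=False):
    """Standard deviation over a trailing window of `size` rows."""
    out = []
    for i in range(len(values)):
        window = values[max(0, i - size + 1): i + 1]
        if skip_nan:
            # hypothetical cleaning step: treat NaN as a missing value
            window = [v for v in window if not math.isnan(v)]
        out.append(sample_std(window))
    return out

readings = [1.0, 2.0, float("nan"), 4.0, 5.0, 6.0]  # made-up sample data

raw = rolling_std(readings, size=3)                      # NaN poisons each window that covers it
cleaned = rolling_std(readings, size=3, skip_nan=True)   # NaN dropped before aggregating

print(raw)
print(cleaned)
```

With a window of 3, the single NaN at index 2 makes the result NaN for rows 2, 3 and 4, while the cleaned version stays finite. In PySpark, a comparable step would be to convert NaN to null before the window aggregate, for example with F.when(F.isnan(c), None).otherwise(c), since aggregate functions skip nulls but not NaN. Whether dropping those readings is appropriate depends on the data.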

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 ZygD