How to sequentially iterate rows in a PySpark DataFrame

I have a Spark DataFrame like this:

 +-------+------+-----+---------------+
 |Account|nature|value|           time|
 +-------+------+-----+---------------+
 |      a|     1|   50|10:05:37:293084|
 |      a|     1|   50|10:06:46:806510|
 |      a|     0|   50|11:19:42:951479|
 |      a|     1|   40|19:14:50:479055|
 |      a|     0|   50|16:56:17:251624|
 |      a|     1|   40|16:33:12:133861|
 |      a|     1|   20|17:33:01:385710|
 |      b|     0|   30|12:54:49:483725|
 |      b|     0|   40|19:23:25:845489|
 |      b|     1|   30|10:58:02:276576|
 |      b|     1|   40|12:18:27:161290|
 |      b|     0|   50|12:01:50:698592|
 |      b|     0|   50|08:45:53:894441|
 |      b|     0|   40|17:36:55:827330|
 |      b|     1|   50|17:18:41:728486|
 +-------+------+-----+---------------+

I want to compare the nature column of each row with later rows (looking forward in time) that have the same Account and value, and add a new column named Repeated. Repeated should be true for both rows of a pair if nature changes between them, from 1 to 0 or vice versa. For example, the DataFrame above should become:

 +-------+------+-----+---------------+--------+
 |Account|nature|value|           time|Repeated|
 +-------+------+-----+---------------+--------+
 |      a|     1|   50|10:05:37:293084|    true|
 |      a|     1|   50|10:06:46:806510|    true|
 |      a|     0|   50|11:19:42:951479|    true|
 |      a|     0|   50|16:56:17:251624|    true|
 |      b|     0|   50|08:45:53:894441|    true|
 |      b|     0|   50|12:01:50:698592|   false|
 |      b|     1|   50|17:18:41:728486|    true|
 |      a|     1|   40|16:33:12:133861|   false|
 |      a|     1|   40|19:14:50:479055|   false|
 |      b|     1|   40|12:18:27:161290|    true|
 |      b|     0|   40|17:36:55:827330|    true|
 |      b|     0|   40|19:23:25:845489|   false|
 |      b|     1|   30|10:58:02:276576|    true|
 |      b|     0|   30|12:54:49:483725|    true|
 |      a|     1|   20|17:33:01:385710|   false|
 +-------+------+-----+---------------+--------+

My idea is to group by, or define a window over, the Account and value columns; then, within each group, compare the nature of each row with the nature of the other rows and fill the Repeated column based on that comparison. I tried this with Spark window functions:

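from pyspark.sql import Window, functions as f
from pyspark.sql.functions import coalesce, lead, lit

windowSpec = Window.partitionBy("Account", "value").orderBy("time")

df.withColumn("Repeated", coalesce(f.when(lead(df['nature']).over(windowSpec) != df['nature'], lit(True)).otherwise(False))).show()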

The result was the following, which is not what I wanted:

 +-------+------+-----+---------------+--------+
 |Account|nature|value|           time|Repeated|
 +-------+------+-----+---------------+--------+
 |      a|     1|   50|10:05:37:293084|   false|
 |      a|     1|   50|10:06:46:806510|    true|
 |      a|     0|   50|11:19:42:951479|   false|
 |      a|     0|   50|16:56:17:251624|   false|
 |      b|     0|   50|08:45:53:894441|   false|
 |      b|     0|   50|12:01:50:698592|    true|
 |      b|     1|   50|17:18:41:728486|   false|
 |      a|     1|   40|16:33:12:133861|   false|
 |      a|     1|   40|19:14:50:479055|   false|
 |      b|     1|   40|12:18:27:161290|    true|
 |      b|     0|   40|17:36:55:827330|   false|
 |      b|     0|   40|19:23:25:845489|   false|
 |      b|     1|   30|10:58:02:276576|    true|
 |      b|     0|   30|12:54:49:483725|   false|
 |      a|     1|   20|17:33:01:385710|   false|
 +-------+------+-----+---------------+--------+

UPDATE: To explain further, suppose the first Spark DataFrame is named "df". Below is what exactly I want to do within each group of "Account" and "value":

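# Pseudocode (pandas-style) for one (Account, value) group, sorted by time
a = group.reset_index(drop=True)
a['repeated'] = False
for i in range(len(a)):
    if a.loc[i, 'repeated']:
        continue  # row i is already paired
    for j in range(i + 1, len(a)):
        if a.loc[i, 'nature'] != a.loc[j, 'nature'] and not a.loc[j, 'repeated']:
            a.loc[i, 'repeated'] = True
            a.loc[j, 'repeated'] = True
            break  # pair row i with only one later row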

Could you please guide me on how to do this using a PySpark Window?

Any help is really appreciated.



Solution 1:[1]

Problem solved. This approach is costly, but that's OK.

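from pyspark.sql.functions import lit

def check(part):
    # Sort by time and reset the index so df.loc[i, ...] addresses rows 0..size-1
    df = part.sort_values('time').reset_index(drop=True)
    size = len(df)
    for i in range(size):
        if df.loc[i, 'repeated']:
            continue  # row i is already paired
        for j in range(i + 1, size):
            # Pair row i with the first later unpaired row of the opposite nature
            if df.loc[i, 'nature'] != df.loc[j, 'nature'] and not df.loc[j, 'repeated']:
                df.loc[j, 'repeated'] = True
                df.loc[i, 'repeated'] = True
                break
    return df

# check() expects a 'repeated' column initialised to False
df.withColumn("repeated", lit(False)).groupby("Account", "value").applyInPandas(check, schema="Account string, nature int, value long, time string, repeated boolean").show()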
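
The double loop in check is quadratic in the size of each group. As a rough sketch only, the same forward pairing can be done in a single pass with a queue, assuming nature takes just two values (0 and 1) and the rows are already sorted by time. The name flag_repeated is hypothetical and not part of the code above; it works on a plain Python list so it can be tried without Spark.

```python
from collections import deque

def flag_repeated(natures):
    """Return one boolean per row: True if the row was paired with a row of
    the opposite nature. `natures` must already be in time order.
    Assumption: only two distinct nature values occur (0 and 1)."""
    flags = [False] * len(natures)
    waiting = deque()  # indices of unpaired rows; they all share the same nature
    for k, nature in enumerate(natures):
        if waiting and natures[waiting[0]] != nature:
            # Pair the earliest waiting row with this opposite-nature row.
            i = waiting.popleft()
            flags[i] = True
            flags[k] = True
        else:
            waiting.append(k)
    return flags

# Groups from the question, in time order.
print(flag_repeated([1, 1, 0, 0]))  # (a, 50) -> [True, True, True, True]
print(flag_repeated([0, 0, 1]))     # (b, 50) -> [True, False, True]
print(flag_repeated([1, 1]))        # (a, 40) -> [False, False]
```

If this matches what you need, the loop inside check could presumably be replaced by df['repeated'] = flag_repeated(df['nature'].tolist()) after sorting by time.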

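Update 1: Another solution without any explicit iteration.

# Note: this version uses different column names from the question
# (verified_time, account_id, amount; presumably time, Account, value).
def check(df):
    df = df.sort_values('verified_time')
    df['index'] = df.index  # remember the incoming row labels
    df['IS_REPEATED'] = 0
    # Line up the rows sorted by nature ascending against the rows sorted descending
    df1 = df.sort_values(['nature'], ascending=[True]).reset_index(drop=True)
    df2 = df.sort_values(['nature'], ascending=[False]).reset_index(drop=True)
    # XOR is 1 wherever a 0 in df1 sits opposite a 1 in df2, or vice versa
    df1['IS_REPEATED'] = df1['nature'] ^ df2['nature']
    df3 = df1.sort_values(['index'], ascending=[True])
    df = df3.drop(['index'], axis=1)
    return df

# gf.check2 and gf.get_schema('trx') are not shown here
df = df.groupby("account_id", "amount").applyInPandas(gf.check2,schema=gf.get_schema('trx'))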
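
To see what the sort-and-XOR trick does, here is a plain-Python sketch of the same idea on a list of 0/1 values in time order. xor_flags is a hypothetical name, and the sketch relies on Python's sorted() being stable; pandas sort_values uses quicksort by default, which is not guaranteed to be stable, so ties may resolve differently there.

```python
def xor_flags(natures):
    """Sort-and-XOR idea on a plain list of 0/1 values given in time order."""
    n = len(natures)
    # Row positions ordered by nature ascending and descending. sorted() is
    # stable (also with reverse=True), so equal natures keep their time order.
    asc = sorted(range(n), key=lambda i: natures[i])
    desc = sorted(range(n), key=lambda i: natures[i], reverse=True)
    flags = [False] * n
    for a, d in zip(asc, desc):
        # The flag lands on the row taken from the ascending ordering.
        flags[a] = bool(natures[a] ^ natures[d])
    return flags

print(xor_flags([1, 1, 0, 0]))  # [True, True, True, True]
print(xor_flags([0, 0, 1]))     # [True, False, True]
print(xor_flags([0, 1, 1]))     # [True, False, True]
```

In this sketch the flags go to the earliest rows of nature 0 and the latest rows of nature 1. That agrees with the loop version on the groups in the question, but it can mark different rows elsewhere: for [0, 1, 1] the loop pairs the first two rows, while the XOR version flags the first and the last.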

Solution 2:[2]

You need to guarantee that the order you see in your DataFrame is the actual order. Can you do that? You need a column that sequences the rows, so that what happened is recorded in the order it happened. Inserting new data into a DataFrame doesn't guarantee its order.

A window with lag (or lead) lets you look at the previous (or next) row's value and make the required adjustment.
FYI: I use coalesce here because the row at the edge of the window (the first row with lag, the last row with lead) has no value to compare with. Choose the second parameter to coalesce according to what should happen for that row in each account.

If you need one, look at the monotonically_increasing_id function. It may help you create the ordering column required to look at this data deterministically.

from pyspark.sql.functions import lead
from pyspark.sql.functions import lit
from pyspark.sql.functions import coalesce
from pyspark.sql.window import Window

spark.sql("create table nature (Account string,nature int, value int, order int)")
spark.sql("insert into nature values ('a', 1, 50,1), ('a', 1, 40,2),('a',0,50,3),('b',0,30,4),('b',0,40,5),('b',1,30,6),('b',1,40,7)")
windowSpec = Window.partitionBy("Account").orderBy("order")
nature = spark.table("nature")
# lead() reads the next row; the last row of each account has no next row,
# so the comparison is null and coalesce falls back to True
nature.withColumn("Repeated", coalesce( lead(nature['nature']).over(windowSpec) != nature['nature'], lit(True)) ).show()
+-------+------+-----+-----+--------+
|Account|nature|value|order|Repeated|
+-------+------+-----+-----+--------+
|      b|     0|   30|    4|   false|
|      b|     0|   40|    5|    true|
|      b|     1|   30|    6|   false|
|      b|     1|   40|    7|    true|
|      a|     1|   50|    1|   false|
|      a|     1|   40|    2|    true|
|      a|     0|   50|    3|    true|
+-------+------+-----+-----+--------+

EDIT: It's not clear from your description whether I should look forward or backward. I have changed my code to look forward one row, as this is consistent with account 'b' in your output. However, the logic for account 'a' does not seem identical to the logic for 'b' in your sample output (or I don't understand a subtlety of starting on '1' instead of starting on '0'). If you want to look forward a row use lead; if you want to look back a row use lag.
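
To make the lead/lag difference concrete, here is a small plain-Python sketch that mimics both over one ordered partition. The lead and lag functions below are hypothetical stand-ins written for illustration, not the PySpark ones; None plays the role of the null that Spark returns at the edge of the window, and True is the coalesce default used above.

```python
def lead(values, default=None):
    """Value of the next row in the partition, or `default` for the last row."""
    return values[1:] + [default] if values else []

def lag(values, default=None):
    """Value of the previous row in the partition, or `default` for the first row."""
    return [default] + values[:-1] if values else []

nature = [1, 1, 0]  # account 'a' from the table above, ordered by `order`

# Looking forward: the last row has no next row, so it falls back to True.
forward = [True if nxt is None else nxt != cur
           for cur, nxt in zip(nature, lead(nature))]

# Looking back: the first row has no previous row, so it falls back to True.
backward = [True if prev is None else prev != cur
            for cur, prev in zip(nature, lag(nature))]

print(forward)   # [False, True, True], matching account 'a' in the output above
print(backward)  # [True, False, True]
```

Either way, each row is compared with exactly one neighbour, which is why a single lead or lag cannot by itself pair a row with a non-adjacent later row.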

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2