'Pyspark - iterate on a big dataframe

I'm using the following code

    events_df = []
    for i in df.collect():
        v = generate_event(i)
        events_df.append(v)
  
    events_df = spark.createDataFrame(events_df, schema)
  

to go over each dataframe item and add an event header calculated in the generate_event function

def generate_event(delta_row):
    
    header = {
        "id": 1,
        ...
    }

    row = Row(Data=delta_row)
    return EntityEvent(header, row)


class EntityEvent:
    def __init__(self, _header, _payload):
        self.header = _header
        self.payload = _payload


It works fine locally for df with few items (even with 1 000 000 items) but when we have more than 6 millions the aws glue job fail

Note: with rdd seems to be better but I can't use it because I've a problem with dates < 1900-01-01 (issue)

is there a way to chunk the dataframe and consolidate at the end ?



Solution 1:[1]

The best solution that we can preview is to use spark promise features, like adding new columns using struct and create_map functions...

events_df = (
        df
        .withColumn(
            "header",
            f.create_map(
                f.lit("id"),
                f.lit(1)
            )
        )
...

So we can create columns as much as we need and make transformations to get the required header structure

PS: this solution (add new columns to the dataframe rather than iterate on it) avoid using rdd and brings a big advantage in terms of performance !

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1