PySpark - iterate on a big dataframe
I'm using the following code:
events_df = []
for i in df.collect():
    v = generate_event(i)
    events_df.append(v)
events_df = spark.createDataFrame(events_df, schema)
to go over each dataframe row and add an event header computed by the generate_event function:
from pyspark.sql import Row

def generate_event(delta_row):
    header = {
        "id": 1,
        ...
    }
    row = Row(Data=delta_row)
    return EntityEvent(header, row)
class EntityEvent:
    def __init__(self, _header, _payload):
        self.header = _header
        self.payload = _payload
It works fine locally for a df with few items (even with 1,000,000 items), but when there are more than 6 million items the AWS Glue job fails.
Note: the RDD approach seems to perform better, but I can't use it because I have a problem with dates earlier than 1900-01-01 (issue).
Is there a way to chunk the dataframe and consolidate at the end?
Solution 1:[1]
The best solution we can propose is to use Spark's built-in functions, adding the new columns with functions such as struct and create_map:
import pyspark.sql.functions as f

events_df = (
    df
    .withColumn(
        "header",
        f.create_map(
            f.lit("id"),
            f.lit(1)
        )
    )
    ...
)
This way we can create as many columns as we need and apply transformations to obtain the required header structure.
PS: this solution (adding new columns to the dataframe rather than iterating over it) avoids using the RDD API and brings a big advantage in terms of performance!
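For illustration, here is a minimal, self-contained sketch of that pattern, building both a header map and a payload struct entirely with dataframe columns. The name source_df and the sample columns (entity_id, action) are made up for this example and are not from the original post; only the create_map / struct pattern comes from the answer above.

# Hypothetical example of the column-based approach (names and data are illustrative).
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()

# Stand-in for the original delta dataframe.
source_df = spark.createDataFrame(
    [(42, "created"), (43, "updated")],
    ["entity_id", "action"],
)

events_df = (
    source_df
    # Header built as a map column, mirroring the dict returned by generate_event.
    .withColumn("header", f.create_map(f.lit("id"), f.lit(1)))
    # Original row wrapped into a single struct column, playing the role of Row(Data=delta_row).
    .withColumn("payload", f.struct("entity_id", "action"))
    .select("header", "payload")
)

events_df.show(truncate=False)

Because everything stays inside the dataframe API, Spark can distribute the work across executors instead of bringing all rows back to the driver, which is the usual reason a driver-side collect() loop falls over at this scale.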
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow