Distribute group tasks evenly using pandas_udf in PySpark

I have a Spark DataFrame which contains groups of training data. Each group is identified by the "group" column.

group | feature_1 | feature_2 | label
--------------------------------------
1     | 123       | 456       | 0
1     | 553       | 346       | 1
...   | ...       | ...       | ...
2     | 623       | 498       | 0
2     | 533       | 124       | 1
...   | ...       | ...       | ...

I want to train a Python ML model (LightGBM in my case) for each group in parallel.

Therefore I have the following working code:

import pickle

import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T

schema = T.StructType([T.StructField("group_id", T.IntegerType(), True),
                       T.StructField("model", T.BinaryType(), True)])

@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def _fit(pdf):
    group_id = pdf.loc[0, "group"]
    X = pdf.loc[:, X_col]
    y = pdf.loc[:, y_col].values

    # train model
    model = ...

    out_df = pd.DataFrame(
        [[group_id, pickle.dumps(model)]],
        columns=["group_id", "model"]
    )

    return out_df

df.groupby("group").apply(_fit)
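The pandas side of `_fit` can be exercised outside Spark on a single group's data. A minimal sketch, assuming the two feature columns from the table above and substituting a placeholder dict for the real LightGBM training step:

```python
import pickle

import pandas as pd

def fit_group(pdf: pd.DataFrame) -> pd.DataFrame:
    """Mirror of the grouped-map UDF body, runnable on plain pandas."""
    group_id = pdf.loc[0, "group"]
    X = pdf.loc[:, ["feature_1", "feature_2"]]
    y = pdf.loc[:, "label"].values

    # Placeholder for the real LightGBM training step.
    model = {"group": int(group_id), "n_rows": len(X), "mean_label": float(y.mean())}

    # One output row per group: the group id and the pickled model.
    return pd.DataFrame(
        [[group_id, pickle.dumps(model)]],
        columns=["group_id", "model"],
    )

# One group's worth of data, matching the example table.
pdf = pd.DataFrame({"group": [1, 1],
                    "feature_1": [123, 553],
                    "feature_2": [456, 346],
                    "label": [0, 1]})
out = fit_group(pdf)
```

This is useful for debugging the UDF body locally before handing it to `groupby(...).apply(...)`, since errors inside a pandas UDF surface as opaque executor failures.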

I have 10 groups in the dataset and 10 worker nodes.

Most of the time, each group is assigned to its own executor and the processing is very quick.

However, sometimes more than one group is assigned to the same executor while other executors sit idle.

This makes processing very slow, as that executor has to train multiple models at the same time.

Question: how do I schedule each group to train on a separate executor to avoid this problem?



Solution 1:

You will likely want to experiment with the following two Spark configuration settings:

spark.task.cpus (the number of CPUs allocated to each task)

spark.executor.cores (the number of CPUs per executor)

I believe setting spark.executor.cores = spark.task.cpus = (cores per worker - 1) might solve your problem: when the two are equal, each executor can run only one task at a time, so each group's training job gets a whole executor to itself.
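For example, on workers with 8 cores each, that could be submitted as follows (the core count and script name here are placeholders for your own setup):

```shell
# Assumes 8-core workers; adjust 7 = (cores per worker - 1) to your hardware.
spark-submit \
  --conf spark.executor.cores=7 \
  --conf spark.task.cpus=7 \
  train_per_group.py
```

The same values can be set via `SparkSession.builder.config(...)` before the session is created.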

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: monty