Spark Scala DataFrame UDF returning rows

Say I have a DataFrame which contains a column (called colA) that is a Seq of Row. I want to append a new field to each record of colA. (The new field is derived from the existing record, so I have to write a UDF.) How should I write this UDF?

I have tried to write a UDF that takes colA as input and outputs a Seq[Row] where each record contains the new field. But the problem is that the UDF cannot return Seq[Row]. The exception is 'Schema for type org.apache.spark.sql.Row is not supported'. What should I do?

The UDF that I wrote is val convert = udf[Seq[Row], Seq[Row]](blablabla...), and the exception is java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Row is not supported



Solution 1:[1]

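Since Spark 2.0 you can create UDFs that return Row / Seq[Row], but you must provide the schema for the return type, e.g. if each Row holds a single Double:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._

// The UDF returns an array of structs, so the schema must describe the struct (field name "value" is illustrative)
val schema = ArrayType(StructType(Seq(StructField("value", DoubleType))))

val myUDF = udf((s: Seq[Row]) => {
  s // just pass data without modification
}, schema)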
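Applied to the question, a Row-returning UDF can append a field by rebuilding each Row and declaring the widened struct in the return schema. This is a minimal sketch: the input layout (colA as an array of structs with a single string field name), the new field nameLength and the sample data are all hypothetical. Because it relies on udf(AnyRef, DataType), on Spark 3.x it also has to enable spark.sql.legacy.allowUntypedScalaUDF; on Spark 2.x that setting should simply go unused.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[1]").appName("row-udf").getOrCreate()
// Only needed on Spark 3.x, where udf(AnyRef, DataType) is rejected by default
spark.conf.set("spark.sql.legacy.allowUntypedScalaUDF", "true")

// Hypothetical input: colA is an array of structs with one string field "name"
val elemType = StructType(Seq(StructField("name", StringType)))
val inputSchema = StructType(Seq(StructField("colA", ArrayType(elemType))))
val df = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row(Seq(Row("a"), Row("bb"))))),
  inputSchema)

// Return schema: the old element fields plus the hypothetical new field "nameLength"
val outElemType = StructType(elemType.fields :+ StructField("nameLength", IntegerType))

// Rebuild each Row from its old values plus the new one
val appendLength: Seq[Row] => Seq[Row] =
  s => s.map(r => Row.fromSeq(r.toSeq :+ r.getString(0).length))

// Function1 value passed to the untyped udf(AnyRef, DataType) overload
val appendField = udf(appendLength, ArrayType(outElemType))

val result = df.withColumn("colA", appendField(col("colA")))
result.show(false)
```

The Row values and the declared schema must stay in the same order; Spark does not check them against each other until the UDF runs.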

But I can't really imagine where this is useful; I would rather return tuples or case classes (or Seqs of them) from UDFs.

EDIT: It could be useful if your row contains more than 22 fields (the limit for tuples, and for case classes before Scala 2.11).
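Returning tuples, as suggested above, avoids both the explicit schema and the legacy flag, because the typed udf overload derives the return schema from the Scala type. This is a minimal sketch; the input layout, the sample data and the appended length value are hypothetical. Note that the resulting struct fields are named _1 and _2.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[1]").appName("tuple-udf").getOrCreate()

// Hypothetical input: colA is an array of structs with one string field "name"
val elemType = StructType(Seq(StructField("name", StringType)))
val df = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row(Seq(Row("a"), Row("bb"))))),
  StructType(Seq(StructField("colA", ArrayType(elemType)))))

// Typed UDF: struct input arrives as Row; the return schema
// array<struct<_1:string,_2:int>> is derived from the tuple type
val appendLength = udf((s: Seq[Row]) => s.map(r => (r.getString(0), r.getString(0).length)))

val result = df.withColumn("colA", appendLength(col("colA")))
result.printSchema()
```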

Solution 2:[2]

This is an old question; I just wanted to update the answer for newer versions of Spark.

Since Spark 3.0.0, the method that @Raphael Roth mentioned (udf(f: AnyRef, dataType: DataType)) is deprecated, and by default calling it throws an AnalysisException. The reason is that a closure passed to this method carries no input type information, so Spark may pass null to a closure with a primitive-type argument, and the closure then sees the Java default value (e.g. 0 for Int), which differs from the null semantics we expect in SQL.

If you really know what you're doing, you can set the spark.sql.legacy.allowUntypedScalaUDF configuration to true.
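The effect of the flag can be checked directly. This minimal sketch assumes Spark 3.x (on 2.x the first call would succeed as well); the schema and the pass-through function are illustrative only.

```scala
import scala.util.Try
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("legacy-untyped-udf")
  .getOrCreate()

val schema = ArrayType(StructType(Seq(StructField("name", StringType))))
// Function1 value, which matches the untyped udf(AnyRef, DataType) overload
val passThrough: Seq[Row] => Seq[Row] = s => s

// Flag off (the Spark 3.x default): the untyped overload is rejected
spark.conf.set("spark.sql.legacy.allowUntypedScalaUDF", "false")
val rejected = Try(udf(passThrough, schema))

// Flag on: the same call succeeds
spark.conf.set("spark.sql.legacy.allowUntypedScalaUDF", "true")
val accepted = Try(udf(passThrough, schema))
```

The flag can also be supplied when the session is created, for example with .config("spark.sql.legacy.allowUntypedScalaUDF", "true") on the builder or --conf spark.sql.legacy.allowUntypedScalaUDF=true on spark-submit.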

Another solution is to use a case class instead of an explicit schema. For example:

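import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

case class Foo(field1: String, field2: String)

val convertFunction: Seq[Row] => Seq[Foo] = input => {
    input.map { x =>
        // do something with x and convert to Foo; as a hypothetical example,
        // keep the first field and derive the new field from it
        Foo(x.getString(0), x.getString(0).toUpperCase)
    }
}

// Spark derives the return schema from Foo, so no DataType argument is needed
val myUdf = udf(convertFunction)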
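For completeness, the case-class UDF from this answer can be applied end to end as sketched below. The sample DataFrame (colA as an array of structs with a single string field name) and the conversion that fills field2 with the upper-cased name are hypothetical; Foo keeps the answer's two String fields. Like the answer's snippet, it is written as top-level script or REPL code.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._

// Same shape as the answer's Foo: field1 keeps the old value, field2 is the new field
case class Foo(field1: String, field2: String)

val spark = SparkSession.builder().master("local[1]").appName("case-class-udf").getOrCreate()

// Hypothetical input: colA is an array of structs with one string field "name"
val elemType = StructType(Seq(StructField("name", StringType)))
val df = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row(Seq(Row("a"), Row("bb"))))),
  StructType(Seq(StructField("colA", ArrayType(elemType)))))

// Hypothetical conversion: derive field2 from the existing value
val convertFunction: Seq[Row] => Seq[Foo] = input =>
  input.map(x => Foo(x.getString(0), x.getString(0).toUpperCase))

// Typed udf: the return schema array<struct<field1:string,field2:string>> comes from Foo
val myUdf = udf(convertFunction)

val result = df.withColumn("colA", myUdf(col("colA")))
result.printSchema()
```

Unlike the tuple version, the case class gives the new struct fields meaningful names without any extra renaming step.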

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2 Iraj Hedayati