Scala Spark UDF function that takes input and puts it in an Array

I am trying to create a Scala UDF for Spark that can be used from Spark SQL. The objective is to accept a column of any type as input and wrap its value in an ArrayType, unless the input is already an ArrayType, in which case it should pass through unchanged.
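Concretely, the desired behaviour (with made-up values) would be:

myFuncUDF("string 3")                      -- returns ["string 3"]
myFuncUDF(array("string 1", "string 2"))   -- returns ["string 1", "string 2"] unchanged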

Here's the code I have so far:

package com.latitudefinancial.spark.udf

import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.functions.udf

class GetDatatype extends UDF1[Object, scala.collection.Seq[_]] {
    override def call(inputObject: Object): scala.collection.Seq[_] =
        inputObject match {
            // already a sequence (Spark passes ArrayType values as Seq): return as-is
            case seq: scala.collection.Seq[_] => seq
            // anything else: wrap the single value in an array
            case other => Array(other)
        }
}

val myFunc = new GetDatatype().call _
val myFuncUDF = udf(myFunc)
spark.udf.register("myFuncUDF", myFuncUDF)

The data may look like this:

+-----------+-----------+--------------------------------------------------------------+--------+-------------------------------+
|create_date|item       |datatype_of_item                                              |item2   |datatype_of_item2              |
+-----------+-----------+--------------------------------------------------------------+--------+-------------------------------+
|2021-06-01 |[item 3, 3]|org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema|string 3|java.lang.String               |
+-----------+-----------+--------------------------------------------------------------+--------+-------------------------------+

or this:

+-----------+--------------------------+-------------------------------------------+--------------------+-------------------------------------------+
|create_date|item                      |datatype_of_item                           |item2               |datatype_of_item2                          |
+-----------+--------------------------+-------------------------------------------+--------------------+-------------------------------------------+
|2021-05-01 |[[item 1, 1], [item 2, 2]]|scala.collection.mutable.WrappedArray$ofRef|[string 1, string 2]|scala.collection.mutable.WrappedArray$ofRef|
|2021-06-01 |[[item 3, 3]]             |scala.collection.mutable.WrappedArray$ofRef|[string 3]          |scala.collection.mutable.WrappedArray$ofRef|
+-----------+--------------------------+-------------------------------------------+--------------------+-------------------------------------------+

The UDF may be passed the contents of either the item or item2 column.
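For context, the intended usage from Spark SQL (against a hypothetical temporary view named events) would be something like:

df.createOrReplaceTempView("events")
spark.sql("SELECT create_date, myFuncUDF(item) AS item_arr, myFuncUDF(item2) AS item2_arr FROM events")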

However, when executing this line:

val myFuncUDF = udf(myFunc)

I get the following error:

scala> val myFuncUDF = udf(myFunc)
java.lang.UnsupportedOperationException: Schema for type Any is not supported
  at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:743)


Solution 1:[1]

Spark cannot create UDFs with this return type (Any, or Object): ScalaReflection derives the UDF's Catalyst schema from the function's static types, and the existential Seq[_] is treated as Seq[Any], for which no schema exists. You could do it without a UDF, I think:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{array, col}
import org.apache.spark.sql.types.ArrayType
import spark.implicits._

val df = Seq(
  (Seq((1, "a"), (2, "b")), (1, "a"))
).toDF("item", "item 2")

// Wrap a column in array() only if it is not already an ArrayType.
// The check runs at query-construction time against the DataFrame schema,
// so no UDF (and no runtime type inspection) is needed.
def wrapInArray(df: DataFrame, c: String): Column =
  if (df.schema(c).dataType.isInstanceOf[ArrayType]) col(c) else array(col(c))

df
  .withColumn("test", wrapInArray(df, "item"))
  .withColumn("test 2", wrapInArray(df, "item 2"))

gives the following schema:

root
 |-- item: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: string (nullable = true)
 |-- item 2: struct (nullable = true)
 |    |-- _1: integer (nullable = false)
 |    |-- _2: string (nullable = true)
 |-- test: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: string (nullable = true)
 |-- test 2: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: string (nullable = true)
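
Since wrapInArray is an ordinary function returning a Column, it can also be applied to every column of a DataFrame in one pass; a minimal sketch building on the definition above:

// Wrap each column of df in an array unless it already is one,
// keeping the original column names
val allWrapped = df.select(df.columns.map(c => wrapInArray(df, c).as(c)): _*)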

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1: Raphael Roth