How to use a Scala class inside PySpark

I've been searching for a while for a way to use a Scala class in PySpark, and I haven't found any documentation or guide on the subject.

Let's say I create a simple class in Scala that uses some Apache Spark libraries, something like:

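import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.col

class SimpleClass(sqlContext: SQLContext, df: DataFrame, column: String) {
  // Returns df restricted to the single named column.
  def exe(): DataFrame = {
    import sqlContext.implicits._

    df.select(col(column))
  }
}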
  • Is there any way to use this class in PySpark?
  • Is it very difficult?
  • Do I have to create a .py file?
  • Is there any guide that shows how to do that?

By the way, I also looked at the Spark source code, but I felt a bit lost and was unable to replicate its functionality for my own purpose.



Solution 1:[1]

Yes, it is possible, although it can be far from trivial. Typically you want a Java-friendly wrapper so you don't have to deal with Scala features that cannot be easily expressed in plain Java and, as a result, don't play well with the Py4J gateway.

Assuming your class is in the package com.example and you have a Python DataFrame called df:

df = ... # Python DataFrame

you'll have to:

  1. Build a jar using your favorite build tool.

  2. Include it in the driver classpath, for example using the --driver-class-path argument for the PySpark shell / spark-submit. Depending on the exact code, you may have to pass it using --jars as well.

  3. Extract JVM instance from a Python SparkContext instance:

    jvm = sc._jvm
    
  4. Extract the Scala SQLContext from a Python SQLContext instance:

    ssqlContext = sqlContext._ssql_ctx
    
  5. Extract Java DataFrame from the df:

    jdf = df._jdf
    
  6. Create new instance of SimpleClass:

    simpleObject = jvm.com.example.SimpleClass(ssqlContext, jdf, "v")
    
  7. Call the exe method and wrap the result in a Python DataFrame:

    from pyspark.sql import DataFrame
    
    # Pass the Python SQLContext here, not the Scala one extracted in step 4.
    DataFrame(simpleObject.exe(), sqlContext)
    

The result should be a valid PySpark DataFrame. You can of course combine all the steps into a single call.
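The steps above can be combined into one helper. The following is a minimal sketch, not a definitive implementation: the function name run_simple_class and its wrap parameter are hypothetical and exist only for illustration, and it assumes the JAR containing com.example.SimpleClass is already on the driver classpath. The DataFrame wrapper receives the Python SQLContext, which is what PySpark itself passes when it builds DataFrames.

```python
def run_simple_class(sc, sqlContext, df, column, wrap=None):
    """Instantiate com.example.SimpleClass on the JVM and wrap the result of exe().

    sc          -- Python SparkContext
    sqlContext  -- Python SQLContext
    df          -- Python DataFrame
    column      -- name of the column to select
    wrap        -- hypothetical hook: callable(jdf, sqlContext) used to wrap the
                   Java DataFrame; defaults to pyspark.sql.DataFrame
    """
    if wrap is None:
        # Imported lazily so this module can be loaded without PySpark installed.
        from pyspark.sql import DataFrame
        wrap = DataFrame

    jvm = sc._jvm                      # step 3: JVM view from the SparkContext
    ssql_ctx = sqlContext._ssql_ctx    # step 4: Scala SQLContext
    jdf = df._jdf                      # step 5: Java DataFrame

    # step 6: construct the Scala object through the Py4J gateway
    simple_object = jvm.com.example.SimpleClass(ssql_ctx, jdf, column)

    # step 7: wrap the returned Java DataFrame as a Python DataFrame
    return wrap(simple_object.exe(), sqlContext)
```

With a running PySpark shell, calling run_simple_class(sc, sqlContext, df, "v") should return the same DataFrame as the step-by-step version. Note that _jvm, _ssql_ctx and _jdf are private attributes, so they may change between Spark versions.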

Important: This approach works only if the Python code is executed solely on the driver. It cannot be used inside a Python action or transformation. See "How to use Java/Scala function from an action or a transformation?" for details.

Solution 2:[2]

As an update to @zero323's answer, given that Spark's APIs have evolved over the last six years, a recipe that works in Spark 3.2 is as follows:

  1. Compile your Scala code into a JAR file (e.g. using sbt assembly).

  2. Include the JAR file in the --jars argument to spark-submit, together with any --py-files arguments needed for local package definitions.

  3. Extract the JVM instance within Python:

    jvm = spark._jvm

  4. Extract a Java representation of the SparkSession:

    jSess = spark._jsparkSession

  5. Extract the Java representation of the PySpark DataFrame:

    jdf = df._jdf

  6. Create a new instance of SimpleClass (this assumes its constructor takes a SparkSession rather than the SQLContext used in the question):

    simpleObject = jvm.com.example.SimpleClass(jSess, jdf, "v")

  7. Call the exe method and convert its output into a PySpark DataFrame:

    from pyspark.sql import DataFrame

    result = DataFrame(simpleObject.exe(), spark)
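
As a rough end-to-end sketch of this recipe: the function names build_session and run_simple_class_with_session, the wrap parameter, and the default application name are all hypothetical. It also assumes SimpleClass has been adapted to take a SparkSession, and it attaches the JAR through the spark.jars setting rather than the --jars flag, which only takes effect if no session is already running.

```python
def build_session(jar_path, app_name="scala-from-pyspark"):
    """Create a SparkSession with the JAR attached via the spark.jars setting."""
    # Imported lazily so this module can be loaded without PySpark installed.
    from pyspark.sql import SparkSession
    return (
        SparkSession.builder
        .appName(app_name)
        .config("spark.jars", jar_path)  # comma-separated list of JAR paths
        .getOrCreate()
    )


def run_simple_class_with_session(spark, df, column, wrap=None):
    """Call com.example.SimpleClass.exe() and wrap the result for Python.

    wrap is a hypothetical hook: callable(jdf, spark) used to wrap the Java
    DataFrame; it defaults to pyspark.sql.DataFrame.
    """
    if wrap is None:
        from pyspark.sql import DataFrame
        wrap = DataFrame

    j_session = spark._jsparkSession   # Java representation of the SparkSession
    jdf = df._jdf                      # Java representation of the DataFrame

    # Construct the Scala object through the Py4J gateway.
    simple_object = spark._jvm.com.example.SimpleClass(j_session, jdf, column)

    # Convert the returned Java DataFrame into a PySpark DataFrame.
    return wrap(simple_object.exe(), spark)
```

A typical call would be spark = build_session(path_to_your_jar) followed by result = run_simple_class_with_session(spark, df, "v"), where path_to_your_jar points at the assembly JAR from step 1.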

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution      Source
Solution 1    Community
Solution 2    rwp