Apache Arrow in Scala: AbstractMethodError on loadBatch

I'm trying to load an Arrow file in Scala, but every time I call either arrowStreamReader.loadNextBatch() or arrowFileReader.loadRecordBatch(arrowBlock), the JVM prints the error below:

An exception or error caused a run to abort: org.apache.arrow.memory.NettyAllocationManager$1.create(Lorg/apache/arrow/memory/BufferAllocator;J)Lorg/apache/arrow/memory/AllocationManager; 
java.lang.AbstractMethodError: org.apache.arrow.memory.NettyAllocationManager$1.create(Lorg/apache/arrow/memory/BufferAllocator;J)Lorg/apache/arrow/memory/AllocationManager;

I can't figure out what's going on, so I need your help. Thanks!

Here is my repo: https://github.com/oliverdding/hpient

The failing test is in src/test/scala/ArrowStreamTest.scala.

Below is the test code:

  test("arrow from file") {
    Using(getClass.getResourceAsStream("/table_engines.arrow")) {
      arrowFileStream =>
        Using(
          new SeekableInMemoryByteChannel(IOUtils.toByteArray(arrowFileStream))
        ) { channel =>
          val seekableReadChannel = new SeekableReadChannel(channel)
          Using(
            new ArrowFileReader(
              seekableReadChannel,
              new RootAllocator(Integer.MAX_VALUE)
            )
          ) { arrowFileReader =>
            val root = arrowFileReader.getVectorSchemaRoot
            println(s"schema is ${root.getSchema}")

            val arrowBlocks = arrowFileReader.getRecordBlocks
            println(s"num of arrow blocks is ${arrowBlocks.size()}")

            arrowBlocks.asScala.foreach { arrowBlock =>
              if(!arrowFileReader.loadRecordBatch(arrowBlock)) {
                throw new IOException("Expected to read record batch")
              }
              val fieldVectorItr = root.getFieldVectors.iterator()
              val sparkVectors = fieldVectorItr.asScala
                .map[ColumnVector] { fieldVector =>
                  println(s"parsing the vector $fieldVector")
                  new ArrowColumnVector(fieldVector)
                }
                .toArray
              Using(new ColumnarBatch(sparkVectors, root.getRowCount)) {
                columnarBatch =>
                  println("Got it --->")
                  println(
                    s"rows: ${columnarBatch.numRows()}; cols: ${columnarBatch.numCols()}"
                  )
              }
            }
          }
        }
    }
  }

And my sbt file:

ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.13.7"
ThisBuild / organization := "com.github"

val arrowVersion = "6.0.1"

lazy val root = (project in file("."))
  .settings(
    name := "hpient",
    idePackagePrefix := Some("com.github.oliverdding.hpient"),
    libraryDependencies ++= Seq(
      // Apache Spark
      "org.apache.spark" %% "spark-core" % "3.2.0",
      "org.apache.spark" %% "spark-sql" % "3.2.0" % "provided",
      // Apache Arrow
      "org.apache.arrow" % "arrow-compression" % arrowVersion,
      "org.apache.arrow" % "arrow-format" % arrowVersion,
      "org.apache.arrow" % "arrow-vector" % arrowVersion,
      "org.apache.arrow" % "arrow-memory" % arrowVersion pomOnly(),
      // STTP
      "com.softwaremill.sttp.client3" %% "core" % "3.3.18",
      // Scala Test
      "org.scalatest" %% "scalatest" % "3.2.10" % Test
    )
  )


Solution 1:[1]

Not sure I can help, but your example code helped me a lot for my own purposes; I didn't know about ColumnarBatch before. It's interesting to go back to rows and a DataFrame from there (I get an Arrow buffer from somewhere).

We use something like:

    while (reader.loadNextBatch()) {
      reader.getRecordBlocks.asScala.map { row =>
        val fieldVectorItr = root.getFieldVectors.iterator()
        val sparkVectors = fieldVectorItr.asScala
          .map[ColumnVector] { fieldVector =>
            new ArrowColumnVector(fieldVector)
          }
          .toArray
        val cBatch = new ColumnarBatch(sparkVectors, root.getRowCount)
        ...

Still on Spark 3.1.2 and Arrow 5 here. The only Arrow libraries in our project are arrow-vector and arrow-memory-netty.
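
For the "go back to rows" part, here is a minimal sketch (not from the original answer) of how the resulting ColumnarBatch can be read row by row; the function name, the column index, and the getter are assumptions you would adjust to your own schema:

    import org.apache.spark.sql.vectorized.ColumnarBatch
    import scala.jdk.CollectionConverters._

    // Walks a ColumnarBatch row by row; rowIterator() exposes the Arrow-backed
    // columns as InternalRow values without copying the data.
    def dumpBatch(batch: ColumnarBatch): Unit =
      batch.rowIterator().asScala.foreach { row =>
        // Assumption: column 0 is an Int column; use the getter matching your schema.
        if (!row.isNullAt(0)) println(row.getInt(0))
      }

Getting all the way to a DataFrame would additionally need the Spark schema for those columns, which is beyond this sketch.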

Solution 2:[2]

I was having the same error, and then I realized there is a conflict with the Arrow version that Spark itself brings in. Removing the Spark dependencies from your sbt file should solve your problem. Also, according to https://arrow.apache.org/docs/java/install.html, you should use arrow-memory-netty:

https://mvnrepository.com/artifact/org.apache.arrow/arrow-memory-netty

libraryDependencies += "org.apache.arrow" % "arrow-memory-netty" % arrow_version
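
Putting that together, a sketch of what the Arrow-related part of the question's build.sbt could then look like, with the Spark artifacts dropped, the pomOnly() arrow-memory entry removed, and arrow-memory-netty added (the version value just mirrors the question):

    val arrowVersion = "6.0.1"

    libraryDependencies ++= Seq(
      // arrow-vector pulls in arrow-format and arrow-memory-core transitively
      "org.apache.arrow" % "arrow-vector"       % arrowVersion,
      // concrete allocator backend; replaces the pomOnly() arrow-memory entry
      "org.apache.arrow" % "arrow-memory-netty" % arrowVersion
    )

An AbstractMethodError like the one in the question usually points at exactly this kind of mismatch: arrow-memory-netty from one Arrow release calling into allocator classes from another.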

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1: Matthias
[2] Solution 2: Nicolás Suescún