Apache Arrow in Scala: AbstractMethodError on loadBatch
I'm trying to load an Arrow file in Scala, but every time I call either arrowStreamReader.loadNextBatch() or arrowFileReader.loadRecordBatch(arrowBlock), the JVM prints the error below:
An exception or error caused a run to abort: org.apache.arrow.memory.NettyAllocationManager$1.create(Lorg/apache/arrow/memory/BufferAllocator;J)Lorg/apache/arrow/memory/AllocationManager;
java.lang.AbstractMethodError: org.apache.arrow.memory.NettyAllocationManager$1.create(Lorg/apache/arrow/memory/BufferAllocator;J)Lorg/apache/arrow/memory/AllocationManager;
I can't figure out what's going on, so I need your help. Thanks!
Here is my repo: https://github.com/oliverdding/hpient
The failing test is src/test/scala/ArrowStreamTest.scala.
Below is the test code:
test("arrow from file") {
Using(getClass.getResourceAsStream("/table_engines.arrow")) {
arrowFileStream =>
Using(
new SeekableInMemoryByteChannel(IOUtils.toByteArray(arrowFileStream))
) { channel =>
val seekableReadChannel = new SeekableReadChannel(channel)
Using(
new ArrowFileReader(
seekableReadChannel,
new RootAllocator(Integer.MAX_VALUE)
)
) { arrowFileReader =>
val root = arrowFileReader.getVectorSchemaRoot
println(s"schema is ${root.getSchema}")
val arrowBlocks = arrowFileReader.getRecordBlocks
println(s"num of arrow blocks is ${arrowBlocks.size()}")
arrowBlocks.asScala.foreach { arrowBlock =>
if(!arrowFileReader.loadRecordBatch(arrowBlock)) {
throw new IOException("Expected to read record batch")
}
val fieldVectorItr = root.getFieldVectors.iterator()
val sparkVectors = fieldVectorItr.asScala
.map[ColumnVector] { fieldVector =>
println(s"parsing the vector $fieldVector")
new ArrowColumnVector(fieldVector)
}
.toArray
Using(new ColumnarBatch(sparkVectors, root.getRowCount)) {
columnarBatch =>
println("Got it --->")
println(
s"rows: ${columnarBatch.numRows()}; cols: ${columnarBatch.numCols()}"
)
}
}
}
}
}
}
And my sbt file:
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.13.7"
ThisBuild / organization := "com.github"

val arrowVersion = "6.0.1"

lazy val root = (project in file("."))
  .settings(
    name := "hpient",
    idePackagePrefix := Some("com.github.oliverdding.hpient"),
    libraryDependencies ++= Seq(
      // Apache Spark
      "org.apache.spark" %% "spark-core" % "3.2.0",
      "org.apache.spark" %% "spark-sql" % "3.2.0" % "provided",
      // Apache Arrow
      "org.apache.arrow" % "arrow-compression" % arrowVersion,
      "org.apache.arrow" % "arrow-format" % arrowVersion,
      "org.apache.arrow" % "arrow-vector" % arrowVersion,
      "org.apache.arrow" % "arrow-memory" % arrowVersion pomOnly(),
      // STTP
      "com.softwaremill.sttp.client3" %% "core" % "3.3.18",
      // Scala Test
      "org.scalatest" %% "scalatest" % "3.2.10" % Test
    )
  )
Solution 1:[1]
Not sure I can help, but your example code helped me a lot for my own purposes; I didn't know about ColumnarBatch before. It's interesting to go back to rows, and from there to a DataFrame (I get an Arrow buffer from somewhere); see the sketch after the code below.
We use something like:
while (reader.loadNextBatch()) {
  // each loadNextBatch() call fills the reader's VectorSchemaRoot with
  // the next record batch, so the field vectors can be wrapped directly
  val fieldVectorItr = root.getFieldVectors.iterator()
  val sparkVectors = fieldVectorItr.asScala
    .map[ColumnVector] { fieldVector =>
      new ArrowColumnVector(fieldVector)
    }
    .toArray
  val cBatch = new ColumnarBatch(sparkVectors, root.getRowCount)
  ...
}
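Going from the batch back to rows (the step mentioned above) can be sketched with ColumnarBatch.rowIterator(), which yields Spark InternalRows. The column types used here are assumptions for illustration, not something from the original post:

// Hypothetical continuation inside the loop above: iterate the batch
// row by row. rowIterator() returns a java.util.Iterator[InternalRow].
cBatch.rowIterator().asScala.foreach { row =>
  // Accessors are positional and typed; a long in column 0 and a
  // UTF-8 string in column 1 are assumed purely for illustration.
  println(s"${row.getLong(0)} -> ${row.getUTF8String(1)}")
}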
We're still on Spark 3.1.2 and Arrow 5 here. The only Arrow libraries in our project are arrow-vector and arrow-memory-netty.
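In sbt terms, that dependency set would look something like this (a sketch, assuming Arrow 5.0.0):

libraryDependencies ++= Seq(
  // arrow-memory-netty supplies the concrete allocator implementation;
  // arrow-vector brings in arrow-format and arrow-memory-core transitively
  "org.apache.arrow" % "arrow-vector" % "5.0.0",
  "org.apache.arrow" % "arrow-memory-netty" % "5.0.0"
)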
Solution 2:[2]
I was having the same error, then I realized the problem is a clash with the Arrow classes that Spark itself ships: Spark 3.2 depends on an older Arrow (2.x), and mixing its arrow-memory classes with Arrow 6.x on the classpath is exactly the kind of binary incompatibility that surfaces as an AbstractMethodError. If you remove the Spark artifacts from your sbt file, it should solve your problem. Also, according to https://arrow.apache.org/docs/java/install.html you should use arrow-memory-netty:
https://mvnrepository.com/artifact/org.apache.arrow/arrow-memory-netty
libraryDependencies += "org.apache.arrow" % "arrow-memory-netty" % arrow_version
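Put together, a minimal sketch of the question's dependency block with both changes applied (Spark removed, and arrow-memory-netty in place of the pom-only arrow-memory) might look like:

val arrowVersion = "6.0.1"

libraryDependencies ++= Seq(
  // all Arrow artifacts pinned to one version, with the Netty-backed
  // allocator on the runtime classpath
  "org.apache.arrow" % "arrow-format" % arrowVersion,
  "org.apache.arrow" % "arrow-vector" % arrowVersion,
  "org.apache.arrow" % "arrow-memory-netty" % arrowVersion
)

Note that the test itself uses Spark's ColumnarBatch and ArrowColumnVector, so dropping Spark entirely means reworking that part; with sbt 1.4+ the built-in dependencyTree task can show which Arrow versions Spark drags in, if you'd rather exclude those instead.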
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source
---|---
Solution 1 | Matthias
Solution 2 | Nicolás Suescún