Spark 3.2.1 cache throws NullPointerException
After I upgraded Spark to 3.2.1, a job throws the exception below after running for about a day. It runs with one driver and two executors, each executor allocated 2 GB of memory, and old-generation usage sits around 50% (in the CMS snapshot below: 451940K used of 811300K total, roughly 56%), so the heap looks healthy to me.
Heap
par new generation total 307840K, used 239453K [0x0000000080000000, 0x0000000094e00000, 0x00000000aaaa0000)
eden space 273664K, 81% used [0x0000000080000000, 0x000000008da4bdd0, 0x0000000090b40000)
from space 34176K, 46% used [0x0000000092ca0000, 0x0000000093c2b6b8, 0x0000000094e00000)
to space 34176K, 0% used [0x0000000090b40000, 0x0000000090b40000, 0x0000000092ca0000)
concurrent mark-sweep generation total 811300K, used 451940K [0x00000000aaaa0000, 0x00000000dc2e9000, 0x0000000100000000)
Metaspace used 102593K, capacity 110232K, committed 121000K, reserved 1155072K
class space used 12473K, capacity 13482K, committed 15584K, reserved 1048576K
I don't understand why the exception occurs. My code looks roughly like this; this is one of the queries, and there are four more like it:
import org.apache.spark.sql.functions.{collect_set, size}

sparkSession
  .readStream
  .format("kafka")
  .load()
  .repartition(4)
  // ...projection...
  // ...
  .withWatermark(...)  // event-time column and delay omitted here
  .groupBy("k1", "k2")
  .agg(size(collect_set("xxx")))
  .writeStream
  .foreachBatch(test)
  .start()
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.storage.StorageLevel

val test: (Dataset[Row], Long) => Unit = (ds: Dataset[Row], _: Long) => {
  ds.persist(StorageLevel.MEMORY_AND_DISK_SER)
  ds.write
    .option("collection", "col_1")
    .option("maxBatchSize", "2048")
    .mode("append")
    .mongo()
  ds.write
    .option("collection", "col_2")
    .option("maxBatchSize", "2048")
    .mode("append")
    .mongo()
  ds.unpersist()  // xxx.scala:117 in the trace below
}
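The stack trace below shows the failure comes from that final ds.unpersist(). As a sketch only (testGuarded is a name I made up, and this suppresses the symptom rather than fixing whatever is null inside CacheManager), a guarded variant of the batch function that keeps the query alive if only the uncache path fails could look like this:

import com.mongodb.spark.sql._  // MongoDB connector implicits, as in the original code
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.storage.StorageLevel

// Hypothetical defensive variant: identical writes, but unpersist() is
// isolated so an exception thrown while uncaching cannot terminate the
// whole streaming query. A workaround sketch, not a root-cause fix.
val testGuarded: (Dataset[Row], Long) => Unit = (ds: Dataset[Row], _: Long) => {
  ds.persist(StorageLevel.MEMORY_AND_DISK_SER)
  try {
    ds.write
      .option("collection", "col_1")
      .option("maxBatchSize", "2048")
      .mode("append")
      .mongo()
    ds.write
      .option("collection", "col_2")
      .option("maxBatchSize", "2048")
      .mode("append")
      .mongo()
  } finally {
    try ds.unpersist()
    catch {
      // Swallow only the failure seen in the trace; blocks cached for a
      // finished micro-batch are eventually dropped by the block manager.
      case _: NullPointerException => ()
    }
  }
}

Dataset also has unpersist(blocking: Boolean); I have not verified whether ds.unpersist(blocking = true) behaves any differently here.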
22/05/09 21:11:28 ERROR streaming.MicroBatchExecution: Query rydts_regist_gp [id = 669c2031-71b2-422b-859d-336722d289e9, runId = 049de32c-e6ff-48f1-8742-bb95122a36ea] terminated with error
java.lang.NullPointerException
at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.$anonfun$isCachedRDDLoaded$1(InMemoryRelation.scala:248)
at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.$anonfun$isCachedRDDLoaded$1$adapted(InMemoryRelation.scala:247)
at scala.collection.IndexedSeqOptimized.prefixLengthImpl(IndexedSeqOptimized.scala:41)
at scala.collection.IndexedSeqOptimized.forall(IndexedSeqOptimized.scala:46)
at scala.collection.IndexedSeqOptimized.forall$(IndexedSeqOptimized.scala:46)
at scala.collection.mutable.ArrayOps$ofRef.forall(ArrayOps.scala:198)
at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedRDDLoaded(InMemoryRelation.scala:247)
at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedColumnBuffersLoaded(InMemoryRelation.scala:241)
at org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8(CacheManager.scala:189)
at org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8$adapted(CacheManager.scala:176)
at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:304)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
at scala.collection.TraversableLike.filter(TraversableLike.scala:395)
at scala.collection.TraversableLike.filter$(TraversableLike.scala:395)
at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
at org.apache.spark.sql.execution.CacheManager.recacheByCondition(CacheManager.scala:219)
at org.apache.spark.sql.execution.CacheManager.uncacheQuery(CacheManager.scala:176)
at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3220)
at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3231)
at common.job.xxx$.$anonfun$main$3(xxx.scala:117)
at common.job.xxx$.$anonfun$main$3$adapted(xxx.scala:103)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
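My reading of the trace: unpersist on one dataset makes the session-wide CacheManager filter over every cached entry (recacheByCondition), calling isCachedColumnBuffersLoaded on each one. Since all five queries share one SparkSession and each persists/unpersists per micro-batch, one query can be inspecting another query's cache entry while that entry is being torn down. The top frames also show the NullPointerException originates inside a forall predicate, meaning something the predicate dereferences has become null. A trivial standalone illustration of just that last mechanism (invented types, not Spark's code):

// FakeBlockStatus is an invented stand-in type; this only demonstrates how
// a forall predicate over a sequence containing null produces the
// IndexedSeqOptimized.forall frames seen in the trace above.
final case class FakeBlockStatus(isCached: Boolean)

val statuses: Array[FakeBlockStatus] =
  Array(FakeBlockStatus(true), null, FakeBlockStatus(true))

// forall applies the predicate to each element in order; dereferencing
// the null element throws NullPointerException with forall on the stack.
val allCached = statuses.forall(s => s.isCached)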