Spark 3.2.1 cache throws NullPointerException

After upgrading Spark to 3.2.1, a job that has been running for about a day throws the exception below.

The job runs with one driver and two executors, each executor allocated 2 GB of memory. Old-generation usage is around 50%, which looks healthy:

Heap
 par new generation   total 307840K, used 239453K [0x0000000080000000, 0x0000000094e00000, 0x00000000aaaa0000)
  eden space 273664K,  81% used [0x0000000080000000, 0x000000008da4bdd0, 0x0000000090b40000)
  from space 34176K,  46% used [0x0000000092ca0000, 0x0000000093c2b6b8, 0x0000000094e00000)
  to   space 34176K,   0% used [0x0000000090b40000, 0x0000000090b40000, 0x0000000092ca0000)
 concurrent mark-sweep generation total 811300K, used 451940K [0x00000000aaaa0000, 0x00000000dc2e9000, 0x0000000100000000)
 Metaspace       used 102593K, capacity 110232K, committed 121000K, reserved 1155072K
  class space    used 12473K, capacity 13482K, committed 15584K, reserved 1048576K
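For reference, a minimal sketch of how that allocation might be expressed when building the session; the application name and the choice of setting these values in code rather than on the submit command are assumptions, not my actual setup:

import org.apache.spark.sql.SparkSession

// Hypothetical sketch: two executors with 2 GB each, matching the allocation described above.
val sparkSession = SparkSession.builder()
  .appName("streaming-job")                 // placeholder name
  .config("spark.executor.instances", "2")
  .config("spark.executor.memory", "2g")
  .getOrCreate()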

I don't know what is causing this exception.

My code looks roughly like the following. This is one of the queries; there are four more like it.

sparkSession
  .readStream
  .format("kafka")
  .load()
  .repartition(4)
  // ...projection...
  // ...
  .withWatermark(/* event-time column, delay */)
  .groupBy("k1", "k2")
  .agg(size(collect_set("xxx")))
  .writeStream
  .foreachBatch(test)
  .start()

val test: (Dataset[Row], Long) => Unit = (ds: Dataset[Row], _: Long) => {
  ds.persist(StorageLevel.MEMORY_AND_DISK_SER)
  ds.write
    .option("collection", "col_1")
    .option("maxBatchSize", "2048")
    .mode("append")
    .mongo()
  ds.write
    .option("collection", "col_2")
    .option("maxBatchSize", "2048")
    .mode("append")
    .mongo()
  ds.unpersist()
}
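For completeness, here is a self-contained sketch of the pipeline. The Kafka options, the projection, the watermark column and delay, the output mode, and the mongo-spark-connector import are all assumptions added only to make the sketch compile and run; they are not my production values:

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.functions.{col, collect_set, size}
import org.apache.spark.storage.StorageLevel
import com.mongodb.spark.sql._   // assumed: provides the implicit .mongo() writer

object StreamingJobSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("streaming-job-sketch").getOrCreate()

    // One micro-batch sink: cache the batch, write it to two Mongo collections, release it.
    val test: (Dataset[Row], Long) => Unit = (ds: Dataset[Row], _: Long) => {
      ds.persist(StorageLevel.MEMORY_AND_DISK_SER)
      ds.write.option("collection", "col_1").option("maxBatchSize", "2048").mode("append").mongo()
      ds.write.option("collection", "col_2").option("maxBatchSize", "2048").mode("append").mongo()
      ds.unpersist()   // the call that triggers the NullPointerException on Spark 3.2.1
    }

    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host:9092")   // placeholder
      .option("subscribe", "some_topic")                // placeholder
      .load()
      .repartition(4)
      .selectExpr(                                      // placeholder projection
        "CAST(key AS STRING)   AS k1",
        "topic                 AS k2",
        "CAST(value AS STRING) AS xxx",
        "timestamp             AS event_time")
      .withWatermark("event_time", "10 minutes")        // assumed watermark
      .groupBy(col("k1"), col("k2"))
      .agg(size(collect_set(col("xxx"))))
      .writeStream
      .outputMode("update")                             // assumed output mode
      .foreachBatch(test)
      .start()
      .awaitTermination()
  }
}

After running for about a day, the query terminates with the following error: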

22/05/09 21:11:28 ERROR streaming.MicroBatchExecution: Query rydts_regist_gp [id = 669c2031-71b2-422b-859d-336722d289e9, runId = 049de32c-e6ff-48f1-8742-bb95122a36ea] terminated with error
java.lang.NullPointerException
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.$anonfun$isCachedRDDLoaded$1(InMemoryRelation.scala:248)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.$anonfun$isCachedRDDLoaded$1$adapted(InMemoryRelation.scala:247)
    at scala.collection.IndexedSeqOptimized.prefixLengthImpl(IndexedSeqOptimized.scala:41)
    at scala.collection.IndexedSeqOptimized.forall(IndexedSeqOptimized.scala:46)
    at scala.collection.IndexedSeqOptimized.forall$(IndexedSeqOptimized.scala:46)
    at scala.collection.mutable.ArrayOps$ofRef.forall(ArrayOps.scala:198)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedRDDLoaded(InMemoryRelation.scala:247)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedColumnBuffersLoaded(InMemoryRelation.scala:241)
    at org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8(CacheManager.scala:189)
    at org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8$adapted(CacheManager.scala:176)
    at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:304)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
    at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
    at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
    at scala.collection.TraversableLike.filter(TraversableLike.scala:395)
    at scala.collection.TraversableLike.filter$(TraversableLike.scala:395)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
    at org.apache.spark.sql.execution.CacheManager.recacheByCondition(CacheManager.scala:219)
    at org.apache.spark.sql.execution.CacheManager.uncacheQuery(CacheManager.scala:176)
    at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3220)
    at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3231)
    at common.job.xxx$.$anonfun$main$3(xxx.scala:117)
    at common.job.xxx$.$anonfun$main$3$adapted(xxx.scala:103)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)


Solution 1:[1]

This is a known issue that will be fixed in a future Spark version; see the pull request:

https://github.com/apache/spark/pull/36496
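
Until an upgrade to a release containing that fix is possible, one possible stopgap (my own suggestion, not part of the linked pull request) is to stop a failed unpersist() from terminating the streaming query, for example:

import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.storage.StorageLevel
import scala.util.control.NonFatal

// Hypothetical variant of the batch function from the question: the two Mongo writes
// stay unchanged, only unpersist() is guarded so a sporadic NPE does not kill the query.
// Note: if unpersist() fails part-way, some cache metadata may linger until later.
val guardedTest: (Dataset[Row], Long) => Unit = (ds: Dataset[Row], _: Long) => {
  ds.persist(StorageLevel.MEMORY_AND_DISK_SER)
  try {
    // ... same two writes to col_1 and col_2 as in the question ...
  } finally {
    try ds.unpersist() catch { case NonFatal(_) => () }
  }
}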

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 cxb