Why does a Spark streaming query fail with java.util.concurrent.TimeoutException: Futures timed out after [5 minutes]?
I have a streaming query that streams data from Azure Event Hubs to ADLS every 5 seconds, and the same stream is also aggregated over a 1-hour window with a 5-minute watermark delay.
Code:
// Write the `messages` stream in Delta format to the table RAW_SCHEMA_NAME.RAW_TASK_TABLE,
// using a processing-time trigger of RAW_STREAM_TRIGGER_INTERVAL (the question text says
// every 5 seconds; the constant's value is not shown here).
// The checkpoint location is BASE_LOC + "checkpoint/" + <schema> + "/" + <table>.
val rawStreamQuery = messages.writeStream.format("delta")
.option("checkpointLocation", BASE_LOC + "checkpoint/" + RAW_SCHEMA_NAME + "/" + RAW_TASK_TABLE)
.trigger(Trigger.ProcessingTime(RAW_STREAM_TRIGGER_INTERVAL))
.table(RAW_SCHEMA_NAME + "." + RAW_TASK_TABLE)
// Count records per time window: set a watermark on the `watermarkTimeStamp` column, group by
// a window of STREAM_WINDOW over that column, count, and project the count together with the
// feed name, stage name and the window's start/end bounds.
// NOTE(review): rawStreamQuery is whatever `.table(...)` returned above; if that is a
// StreamingQuery rather than a DataFrame, withWatermark is not available on it — confirm
// whether this chain was meant to start from `messages` instead.
// NOTE(review): the result of this expression is not assigned or written anywhere in the
// visible snippet — confirm where the aggregated stream is consumed.
rawStreamQuery.withWatermark(watermarkTimeStamp, STREAM_WATERMARK) //5 minutes
.groupBy(window(col(watermarkTimeStamp), STREAM_WINDOW).as("window")) //1 hour
.count()
.select(
lit(commonDataObj.getFeedName).as("feed_name")
, lit(commonDataObj.getStage).as("stage_name")
, col("count").as("record_count")
, col("window").getField("start").as("start_ts")
, col("window").getField("end").as("end_ts")
)
I am getting the error below.
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [5 minutes]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
at scala.concurrent.Await$.$anonfun$result$1(package.scala:220)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:57)
at scala.concurrent.Await$.result(package.scala:146)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.createReceiver(CachedEventHubsReceiver.scala:99)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.recreateReceiver(CachedEventHubsReceiver.scala:151)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.checkCursor(CachedEventHubsReceiver.scala:169)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$receive(CachedEventHubsReceiver.scala:231)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver$.receive(CachedEventHubsReceiver.scala:356)
at org.apache.spark.eventhubs.rdd.EventHubsRDD.compute(EventHubsRDD.scala:123)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:640)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:643)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Solution 1:[1]
I'm not completely sure, since you don't show your full code: you mention reading a stream from Event Hubs, but I only see the write stream to Delta. That said, I used to have this problem when the input stream from Event Hubs didn't send events for some time. It seems the underlying library has a timeout exception configured for when the stream doesn't receive data, so you could try sending continuous data and check whether the error persists.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | frammnm |