Spark Streaming "ERROR JobScheduler: error in job generator"

I built a Spark Streaming application that keeps receiving messages from Kafka and writes them into an HBase table.

The app runs fine for the first 25 minutes. When I enter key/value pairs like 1;name1 and 2;name2 in kafka-console-producer, they are saved in the HBase table:

ROW       COLUMN+CELL
 1        column=cf1:column-Name, timestamp=1471905340560, value=name1
 2        column=cf1:column-Name, timestamp=1471905348165, value=name2

But after about 25 minutes, my app stops with the error ERROR JobScheduler: Error in job generator. Details of this error are shown below:

16/08/29 18:01:10 ERROR JobScheduler: Error in job generator
java.lang.IllegalArgumentException: requirement failed
        at scala.Predef$.require(Predef.scala:221)
        at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.cleanupOldBatches(ReceivedBlockTracker.scala:166)
        at org.apache.spark.streaming.scheduler.ReceiverTracker.cleanupOldBlocksAndBatches(ReceiverTracker.scala:223)
        at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:272)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed
        at scala.Predef$.require(Predef.scala:221)
        at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.cleanupOldBatches(ReceivedBlockTracker.scala:166)
        at org.apache.spark.streaming.scheduler.ReceiverTracker.cleanupOldBlocksAndBatches(ReceiverTracker.scala:223)
        at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:272)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
16/08/29 18:01:10 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
16/08/29 18:01:10 INFO JobGenerator: Stopping JobGenerator immediately

It runs well for the first 25 minutes, but after that, for some reason I don't know, the job generator suddenly seems unable to keep running correctly.

My code is shown below:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder
import org.apache.hadoop.io.{LongWritable, Writable, IntWritable, Text}
import org.apache.hadoop.mapreduce.Job

object ReceiveKafkaAsDstream {
  case class SampleKafkaRecord(id: String, name: String)
  object SampleKafkaRecord extends Serializable {
    def parseToSampleRecord(line: String): SampleKafkaRecord = {
      val values = line.split(";")
      SampleKafkaRecord(values(0), values(1))
    }

    def SampleToHbasePut(CSVData: SampleKafkaRecord): (ImmutableBytesWritable, Put) = {
      // Build one HBase Put per record: row key = id, cell cf1:column-Name = name.
      val rowKey = CSVData.id
      val putOnce = new Put(rowKey.getBytes)

      putOnce.addColumn("cf1".getBytes, "column-Name".getBytes, CSVData.name.getBytes)
      (new ImmutableBytesWritable(rowKey.getBytes), putOnce)
    }
  }


  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ReceiveKafkaAsDstream")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    val topics = "test"
    val brokers = "10.0.2.15:6667"

    val topicSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
        "zookeeper.connection.timeout.ms" -> "1000")

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

    val tableName = "KafkaTable"
    val conf = HBaseConfiguration.create()
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    val job = Job.getInstance(conf)
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[Text])
    job.setOutputFormatClass(classOf[TableOutputFormat[Text]])

    val records = messages
      .map(_._2)
      .map(SampleKafkaRecord.parseToSampleRecord)

    records
      .foreachRDD { rdd =>
        rdd.map(SampleKafkaRecord.SampleToHbasePut).saveAsNewAPIHadoopDataset(job.getConfiguration)
      }
    records.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

I feel like this is a configuration problem. Any help is appreciated.



Solution 1:[1]

I added a property called zookeeper.session.timeout.ms to the Kafka parameters:

val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
    "zookeeper.connect" -> "xxxxxx:2181",
    "zookeeper.connection.timeout.ms" -> "10000",
    "zookeeper.session.timeout.ms" -> "10000")

and set the Spark Streaming batch interval to 10 seconds. With these changes, my Spark Streaming application can keep running for a long time.
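A minimal sketch of that second change, assuming the same StreamingContext setup as in the question; only the batch interval passed to the StreamingContext constructor changes:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Same setup as in the question, but with a 10-second batch interval
// instead of Seconds(1).
val sparkConf = new SparkConf().setAppName("ReceiveKafkaAsDstream")
val ssc = new StreamingContext(sparkConf, Seconds(10))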

But when I check memory usage, it still keeps decreasing, and I don't know how to solve that problem.

Solution 2:[2]

It is quite possibly a clock synchronization problem. Try enabling NTP to ensure all cluster nodes are synced to the same time.
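For context on why clock skew matters here: the requirement that fails in the stack trace comes from ReceivedBlockTracker.cleanupOldBatches, which (roughly, in Spark 1.x) checks that the batch-cleanup threshold time is strictly earlier than the driver's current clock time. The self-contained sketch below is a simplified model of that check, not verbatim Spark code; if a node's clock drifts or jumps backwards, the requirement can fail exactly as in the log above.

// Simplified, hypothetical model of the failing check in
// ReceivedBlockTracker.cleanupOldBatches (Spark 1.x); not verbatim Spark code.
object CleanupCheckSketch {
  def cleanupOldBatches(cleanupThreshTimeMs: Long, currentClockMs: Long): Unit = {
    // The "requirement failed" in the stack trace corresponds to a check of this
    // shape: the cleanup threshold must be strictly earlier than the driver's
    // current time. A clock that jumps backwards can violate it.
    require(cleanupThreshTimeMs < currentClockMs)
    // ... old batch metadata would be dropped here ...
  }

  def main(args: Array[String]): Unit = {
    cleanupOldBatches(cleanupThreshTimeMs = 1000L, currentClockMs = 2000L)    // passes
    // cleanupOldBatches(cleanupThreshTimeMs = 2000L, currentClockMs = 1000L) // would throw
  }
}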

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Frank Kong
Solution 2