Spark 3.2.1 fetch HBase data not working with NewAPIHadoopRDD

Below is the sample code snippet used to fetch data from HBase. It worked fine with Spark 3.1.2, but after upgrading to Spark 3.2.1 the returned RDD is empty, i.e. it does not contain any values. No exception is thrown either.

import java.util.Base64

import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.rdd.{NewHadoopRDD, RDD}
// SparkHBaseContext and SparkLoggerParams are application-specific helper classes.

def getInfo(sc: SparkContext, startDate: String, cachingValue: Int, sparkLoggerParams: SparkLoggerParams, zkIP: String, zkPort: String): RDD[String] = {
    val scan = new Scan
    scan.addFamily(Bytes.toBytes("family"))
    scan.addColumn(Bytes.toBytes("family"), Bytes.toBytes("time"))
    val rdd = getHbaseConfiguredRDDFromScan(sc, zkIP, zkPort, "myTable", scan, cachingValue, sparkLoggerParams)
    val output: RDD[String] = rdd.map { row =>
      Bytes.toString(row._2.getRow)
    }
    output
  }
 
def getHbaseConfiguredRDDFromScan(sc: SparkContext, zkIP: String, zkPort: String, tableName: String,
                                  scan: Scan, cachingValue: Int, sparkLoggerParams: SparkLoggerParams): NewHadoopRDD[ImmutableBytesWritable, Result] = {
    scan.setCaching(cachingValue)
    // TableInputFormat expects the Scan serialized as a Base64-encoded protobuf string.
    val scanString = Base64.getEncoder.encodeToString(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan(scan).toByteArray)
    val hbaseContext = new SparkHBaseContext(zkIP, zkPort)
    val hbaseConfig = hbaseContext.getConfiguration()
    hbaseConfig.set(TableInputFormat.INPUT_TABLE, tableName)
    hbaseConfig.set(TableInputFormat.SCAN, scanString)
    sc.newAPIHadoopRDD(
      hbaseConfig,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result]
    ).asInstanceOf[NewHadoopRDD[ImmutableBytesWritable, Result]]
  }

Also, if we fetch using a Scan directly, without going through NewAPIHadoopRDD, it works (see the sketch below).
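For reference, "fetching with a Scan directly" means going through the plain HBase client API rather than a Spark input format. A minimal sketch is shown below; the ZooKeeper quorum, client port and table name are placeholder assumptions:

  import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
  import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
  import org.apache.hadoop.hbase.util.Bytes
  import scala.collection.JavaConverters._

  // Plain HBase client scan (no Spark input format involved); connection details are placeholders.
  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "localhost")
  conf.set("hbase.zookeeper.property.clientPort", "2181")

  val connection = ConnectionFactory.createConnection(conf)
  try {
    val table = connection.getTable(TableName.valueOf("myTable"))
    val scan = new Scan
    scan.addColumn(Bytes.toBytes("family"), Bytes.toBytes("time"))
    val scanner = table.getScanner(scan)
    try {
      // Row keys come back as bytes; convert them for readability.
      scanner.asScala.foreach(result => println(Bytes.toString(result.getRow)))
    } finally {
      scanner.close()
    }
  } finally {
    connection.close()
  }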

Software versions:

  • Spark: 3.2.1, pre-built with user-provided Apache Hadoop
  • Scala: 2.12.10
  • HBase: 2.4.9
  • Hadoop: 2.10.1


Solution 1:[1]

I found the solution to this one. See the Spark core migration guide covering the upgrade from Spark 3.1.x to Spark 3.2.x: https://spark.apache.org/docs/latest/core-migration-guide.html

Since Spark 3.2, spark.hadoopRDD.ignoreEmptySplits is set to true by default which means Spark will not create empty partitions for empty input splits. To restore the behavior before Spark 3.2, you can set spark.hadoopRDD.ignoreEmptySplits to false.
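If you create the SparkContext yourself, the same property can also be set programmatically on the SparkConf before the context is built. A minimal sketch, with a placeholder application name:

  import org.apache.spark.{SparkConf, SparkContext}

  // Restore the pre-3.2 behaviour for HadoopRDD / NewAPIHadoopRDD partition planning.
  val conf = new SparkConf()
    .setAppName("hbase-scan-example") // placeholder name
    .set("spark.hadoopRDD.ignoreEmptySplits", "false")
  val sc = new SparkContext(conf)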

It can be set like this on spark-submit:

  ./spark-submit \
  --class org.apache.hadoop.hbase.spark.example.hbasecontext.HBaseDistributedScanExample \
  --master spark://localhost:7077 \
  --conf "spark.hadoopRDD.ignoreEmptySplits=false" \
  --jars ... \
  /tmp/hbase-spark-1.0.1-SNAPSHOT.jar YourHBaseTable

Alternatively, you can set this globally in $SPARK_HOME/conf/spark-defaults.conf so that it applies to every Spark application:

spark.hadoopRDD.ignoreEmptySplits false
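To confirm which value the running application actually picked up, the setting can be read back from the SparkContext, for example:

  // Prints "false" if the override took effect; Spark 3.2+ defaults to "true" when unset.
  println(sc.getConf.get("spark.hadoopRDD.ignoreEmptySplits", "true"))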

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: nardqueue