Receive a Kafka message through Spark Streaming and delete Phoenix/HBase data

In my project, I have the following workflow: Kafka message => Spark Streaming processing => insert/update into HBase and/or Phoenix
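
(For context, the streaming part reads from Kafka with the standard Structured Streaming source, roughly like the sketch below; the broker address and topic name are placeholders, not our real configuration.)

// spark is the existing SparkSession
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092") // placeholder brokers
  .option("subscribe", "topic_name")                 // placeholder topic
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")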

Both the insert and update operations work, either against HBase directly or through Phoenix (I tested both cases).
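
For example, the upsert path through Phoenix JDBC works fine; here is a minimal sketch of it (the quorum, table and column names are the same placeholders used later in this question):

import java.sql.DriverManager

val conn = DriverManager.getConnection("jdbc:phoenix:phoenix_quorum")
val ps = conn.prepareStatement("UPSERT INTO TABLE_NAME (key_1, key_2) VALUES (?, ?)")
ps.setString(1, "val1")
ps.setString(2, "val2")
ps.executeUpdate()
conn.commit()
conn.close()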

Now I would like to delete data in HBase/Phoenix when I receive a specific message on Kafka. I did not find any clue or documentation on how to do that while streaming.

I have found a way to delete data in "static"/"batch" mode with both HBase and Phoenix, but the same code does not work when streaming (there is no error, the data is simply not deleted).
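
For example, on the HBase side a plain batch-style delete like the sketch below works when run outside of a stream (quorum, port, table name, column family and the NUL-concatenated row key are the same placeholders used in the code further down):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Delete}
import org.apache.hadoop.hbase.util.Bytes

val hbConf = HBaseConfiguration.create()
hbConf.set("hbase.zookeeper.quorum", "hbase_quorum")
hbConf.set("hbase.zookeeper.property.clientPort", "portNumber")
val conn = ConnectionFactory.createConnection(hbConf)
val table = conn.getTable(TableName.valueOf("TABLE_NAME"))

val delete = new Delete(Bytes.toBytes("FOO\u0000FII")) // row key = col_1 + NUL separator + col_2 (sample data)
delete.addColumns(Bytes.toBytes("0"), Bytes.toBytes("DATA"))
table.delete(delete)
table.close()
conn.close()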

Here is how we tried the delete part (we first write a Parquet file and then run a readStream on it to get a "fake" stream):

Main Object:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, concat_ws}
import org.apache.spark.sql.streaming.DataStreamWriter

object AppMain {
  def main(args: Array[String]): Unit = {
    val spark : SparkSession = SparkSession.builder().getOrCreate()


    import spark.implicits._
    //algo_name, partner_code, site_code, indicator
    val df = Seq(("FOO","FII"),
      ("FOO","FUU")
    ).toDF("col_1","col_2")

    df.write.mode("overwrite").parquet("testParquetStream")
    df.printSchema()
    df.show(false)

    val dfStreaming = spark.readStream.schema(df.schema).parquet("testParquetStream")
    dfStreaming.printSchema()
    val dfWithConcat = dfStreaming.withColumn("row", concat_ws("\u0000" , col("col_1"),col("col_2"))).select("row")

    // using delete class
    val withDeleteClass : WithDeleteClass = new WithDeleteClass(spark, dfWithConcat)
    withDeleteClass.delete_data()
    // using JDBC/Phoenix
    //val withJDBCSink : WithJDBCSink = new WithJDBCSink(spark, dfStreaming)
    //withJDBCSink.delete_data()
  }
}

WithJDBCSink class

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.streaming.DataStreamWriter

class WithJDBCSink (spark : SparkSession,dfToDelete : DataFrame){

  // Writer for the streaming DataFrame whose rows should trigger deletes
  val df_writer = dfToDelete.writeStream

  def delete_data(): Unit = {
    // Attach the custom ForeachWriter sink and block until the query terminates
    val writer = new JDBCSink()
    df_writer.foreach(writer).outputMode("append").start()
    spark.streams.awaitAnyTermination()
  }

}

JDBCSink class

import java.sql.{DriverManager, PreparedStatement, Statement}

class JDBCSink() extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {

  val quorum = "phoenix_quorum"
  var connection: java.sql.Connection = null
  var statement: Statement = null
  var ps : PreparedStatement= null
  // Called once per partition and epoch: open the Phoenix JDBC connection
  def open(partitionId: Long, version: Long): Boolean = {
    connection = DriverManager.getConnection(s"jdbc:phoenix:$quorum")
    statement = connection.createStatement()
    true
  }

  // Called for every row of the micro-batch
  def process(row: org.apache.spark.sql.Row): Unit = {
    //-----------------------Method 1
    connection = DriverManager.getConnection(s"jdbc:phoenix:$quorum")
    val query = s"DELETE from TABLE_NAME WHERE key_1 = 'val1' and key_2 = 'val2'"
    statement = connection.createStatement()
    statement.executeUpdate(query)
    connection.commit()

    //-----------------------Method 2
    //val query2 = s"DELETE from TABLE_NAME WHERE key_1 = ? and key_2 = ?"
    //connection = DriverManager.getConnection(s"jdbc:phoenix:$quorum")
    //ps = connection.prepareStatement(query2)
    //ps.setString(1, "val1")
    //ps.setString(2, "val2")
    //ps.executeUpdate()
    //connection.commit()
  }

  // Called when the partition is done (errorOrNull is null on success)
  def close(errorOrNull: Throwable): Unit = {
    connection.commit()
    connection.close()
  }
}

WithDeleteClass class

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Delete, HTable, RetriesExhaustedWithDetailsException, Row, Table}
import org.apache.hadoop.hbase.ipc.CallTimeoutException
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import java.util
import org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.storage.StorageLevel

import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import java.sql.SQLException
import java.sql.Statement

class WithDeleteClass (spark : SparkSession, dfToDelete : DataFrame){
  val finalData = dfToDelete
  var df_writer = dfToDelete.writeStream
  var dfWriter = finalData.writeStream.outputMode("append").format("console").start()

  def delete_data(): Unit = {
    deleteDataObj.open()
    df_writer.foreachBatch((output : DataFrame, batchId : Long) =>
      deleteDataObj.process(output)
    )
    df_writer.start()

  }

  object deleteDataObj{
    var quorum = "hbase_quorum"
    var zkPort = "portNumber"
    var hbConf = HBaseConfiguration.create()
    hbConf.set("hbase.zookeeper.quorum", quorum)
    hbConf.set("hbase.zookeeper.property.clientPort", zkPort)
    var tableName: TableName = TableName.valueOf("TABLE_NAME")
    var conn = ConnectionFactory.createConnection(hbConf)
    var table: Table = conn.getTable(tableName)
    var hTable: HTable = table.asInstanceOf[HTable]
    def open() : Unit = {
    }
    def process(df : DataFrame) : Unit = {
      val rdd : RDD[Array[Byte]] = df.rdd.map(row => Bytes.toBytes(row(0).toString))
      val deletions : util.ArrayList[Delete] = new util.ArrayList()
      //List all rows to delete
      rdd.foreach(row => {
        val delete: Delete = new Delete(row)
        delete.addColumns(Bytes.toBytes("0"), Bytes.toBytes("DATA"))
        deletions.add(delete)
      })
      hTable.delete(deletions)
    }
    def close(): Unit = {}
  }
}

Any help/pointers would be greatly appreciated.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source