OnHeapColumnVector putLong NullPointerException

I am using spark-sql-v4.5.0.

I have to read an S3 file; its data looks something like the sample below:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.DataTypes
    import spark.implicits._

    val df = sc.parallelize(Array(
        ("2019-01-01", 546209022, "2018-10-12", "2020-05-19 02:17:59", "2.0", "31"),
        ("2019-01-01", 24807628,  "2018-10-12", "2020-05-21 20:59:30", "2.0", "31"),
        ("2019-01-01", 261726311, "2018-10-12", "2020-05-13 22:23:43", "2.0", "31"),
        ("2019-01-01", 682098,    "2018-10-12", "2020-05-19 12:11:23", "2.0", "31"),
        ("2019-01-01", 6464256,   "2018-10-12", "2020-05-15 07:40:51", "2.0", "31"),
        ("2019-01-01", 12704564,  "2018-10-12", "2020-09-16 06:38:15", "2.0", "31"),
        ("2019-01-01", 20332976,  "2018-10-12", "2020-11-22 12:45:23", "2.0", "31"),
        ("2019-01-01", 20332976,  "2018-10-12", "2020-11-22 12:45:23", "2.0", "31")
      ), 3)
      .toDF("cc_date", "id", "gen_date", "create_date", "v_id", "mf_id")
      .withColumn("cc_date", to_date(col("cc_date"), "yyyy-MM-dd").cast(DataTypes.DateType))
      .withColumn("gen_date", to_date(col("gen_date"), "yyyy-MM-dd").cast(DataTypes.DateType))
      .withColumn("id", col("id").cast(DataTypes.StringType))
      //.withColumn("create_date", to_timestamp(col("create_date"), "yyyy-MM-dd HH:mm:ss").cast(DataTypes.TimestampType))
      .withColumn("v_id", col("v_id").cast(DataTypes.StringType))
      .withColumn("mf_id", col("mf_id").cast(DataTypes.StringType))


+----------+---------+----------+-------------------+----+-----+
|cc_date   |id       |gen_date  |create_date        |v_id|mf_id|
+----------+---------+----------+-------------------+----+-----+
|2019-01-01|546209022|2018-10-12|2020-05-19 02:17:59|2.0 |31   |
|2019-01-01|24807628 |2018-10-12|2020-05-21 20:59:30|2.0 |31   |
|2019-01-01|261726311|2018-10-12|2020-05-13 22:23:43|2.0 |31   |
|2019-01-01|682098   |2018-10-12|2020-05-19 12:11:23|2.0 |31   |
|2019-01-01|6464256  |2018-10-12|2020-05-15 07:40:51|2.0 |31   |
|2019-01-01|12704564 |2018-10-12|2020-09-16 06:38:15|2.0 |31   |
|2019-01-01|20332976 |2018-10-12|2020-11-22 12:45:23|2.0 |31   |
|2019-01-01|20332976 |2018-10-12|2020-11-22 12:45:23|2.0 |31   |
+----------+---------+----------+-------------------+----+-----+
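
For context: in the actual job the DataFrame is not built with parallelize; it is read from Parquet files on S3 (the stack trace at the end is raised inside Spark's vectorized Parquet reader). A minimal sketch of that read, with a hypothetical bucket and path:

    // Hypothetical location; the real bucket/path is not shown here.
    // The failing stack trace goes through VectorizedParquetRecordReader,
    // i.e. a Parquet scan along these lines.
    val df = spark.read.parquet("s3://my-bucket/path/to/input/")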

I am trying to get the top record per group, based on "create_date", using a window function:

    import org.apache.spark.sql.expressions.Window

    // Rank rows within each group by create_date, newest first.
    val ws = Window
      .partitionBy("cc_date", "mf_id", "v_id", "id", "gen_date")
      .orderBy(desc("create_date"))

    // Keep only the top-ranked row per group.
    val outDs = df
      .dropDuplicates()
      .withColumn("rank", row_number().over(ws))
      .where(col("rank") === lit(1))

When I execute the show call below,

    outDs.show(10, false)

I get the error below. How can I resolve it?

    Caused by: java.lang.NullPointerException
        at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putLong(OnHeapColumnVector.java:327)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readLongs(VectorizedRleValuesReader.java:332)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readLongBatch(VectorizedColumnReader.java:429)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:208)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:261)
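
One workaround I have seen suggested, though it is not a root-cause fix (the NPE comes from the vectorized Parquet reader, which usually points at a mismatch between the schema stored in the Parquet files and the schema Spark reads them with), is to disable vectorized reading so Spark falls back to the row-based parquet-mr path:

    // Workaround sketch: turn off the vectorized Parquet reader. This bypasses
    // OnHeapColumnVector entirely, at the cost of slower scans, and is worth
    // trying to confirm the problem is in the vectorized read path.
    spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

Comparing the schema the files actually carry (e.g. via printSchema on the raw read) against the schema the job expects may also reveal the offending column. Is there a better way to resolve this?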


