OnHeapColumnVector putLong NullPointerException
I am using spark-sql-v4.5.0.
I have to read a file from S3; the data looks something like the sample below:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DataTypes
import spark.implicits._ // required for .toDF on an RDD of tuples

val df = sc.parallelize(Array(
    ("2019-01-01", 546209022, "2018-10-12", "2020-05-19 02:17:59", "2.0", "31"),
    ("2019-01-01", 24807628,  "2018-10-12", "2020-05-21 20:59:30", "2.0", "31"),
    ("2019-01-01", 261726311, "2018-10-12", "2020-05-13 22:23:43", "2.0", "31"),
    ("2019-01-01", 682098,    "2018-10-12", "2020-05-19 12:11:23", "2.0", "31"),
    ("2019-01-01", 6464256,   "2018-10-12", "2020-05-15 07:40:51", "2.0", "31"),
    ("2019-01-01", 12704564,  "2018-10-12", "2020-09-16 06:38:15", "2.0", "31"),
    ("2019-01-01", 20332976,  "2018-10-12", "2020-11-22 12:45:23", "2.0", "31"),
    ("2019-01-01", 20332976,  "2018-10-12", "2020-11-22 12:45:23", "2.0", "31")
  ), 3)
  .toDF("cc_date", "id", "gen_date", "create_date", "v_id", "mf_id")
  .withColumn("cc_date", to_date(col("cc_date"), "yyyy-MM-dd").cast(DataTypes.DateType))
  .withColumn("gen_date", to_date(col("gen_date"), "yyyy-MM-dd").cast(DataTypes.DateType))
  .withColumn("id", col("id").cast(DataTypes.StringType))
  //.withColumn("create_date", to_timestamp(col("create_date"), "yyyy-MM-dd HH:mm:ss").cast(DataTypes.TimestampType))
  .withColumn("v_id", col("v_id").cast(DataTypes.StringType))
  .withColumn("mf_id", col("mf_id").cast(DataTypes.StringType))
+----------+---------+----------+-------------------+----+-----+
|cc_date |id |gen_date |create_date |v_id|mf_id|
+----------+---------+----------+-------------------+----+-----+
|2019-01-01|546209022|2018-10-12|2020-05-19 02:17:59|2.0 |31 |
|2019-01-01|24807628 |2018-10-12|2020-05-21 20:59:30|2.0 |31 |
|2019-01-01|261726311|2018-10-12|2020-05-13 22:23:43|2.0 |31 |
|2019-01-01|682098 |2018-10-12|2020-05-19 12:11:23|2.0 |31 |
|2019-01-01|6464256 |2018-10-12|2020-05-15 07:40:51|2.0 |31 |
|2019-01-01|12704564 |2018-10-12|2020-09-16 06:38:15|2.0 |31 |
|2019-01-01|20332976 |2018-10-12|2020-11-22 12:45:23|2.0 |31 |
|2019-01-01|20332976 |2018-10-12|2020-11-22 12:45:23|2.0 |31 |
+----------+---------+----------+-------------------+----+-----+
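For context, the DataFrame above is only a reproduction built with parallelize; in the real job the same columns are read from Parquet files on S3, which is where the vectorized reader in the stack trace below comes in. A minimal sketch of that read, with a hypothetical bucket path (the real path comes from my job config):

val s3Df = spark.read.parquet("s3://my-bucket/path/to/data/") // hypothetical path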
I am trying to get the top record per group, ordered by "create_date" descending, using a window function:
import org.apache.spark.sql.expressions.Window

val ws = Window
  .partitionBy("cc_date", "mf_id", "v_id", "id", "gen_date")
  .orderBy(desc("create_date"))

val outDs = df
  .dropDuplicates()
  .withColumn("rank", row_number().over(ws))
  .where(col("rank") === lit(1)) // keep only the newest row in each group
When I execute the show below:

outDs.show(10, false)

I get the following error. How can I resolve it?
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putLong(OnHeapColumnVector.java:327)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readLongs(VectorizedRleValuesReader.java:332)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readLongBatch(VectorizedColumnReader.java:429)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:208)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:261)
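From the stack trace, the NPE is thrown inside the vectorized Parquet reader while it decodes long values, which likely corresponds to a timestamp column. One workaround I have seen suggested, not a confirmed fix, is to disable the vectorized reader so Spark falls back to the row-based Parquet code path (spark.sql.parquet.enableVectorizedReader is a standard Spark SQL config):

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
// or pass it at submit time:
//   --conf spark.sql.parquet.enableVectorizedReader=false

This trades the batched-read performance for the non-vectorized path; whether it actually resolves the NPE presumably depends on the underlying cause (e.g. a corrupt file or a schema/type mismatch on the timestamp column).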
Source: Stack Overflow, licensed under CC BY-SA 3.0.