'Efficient way to read specific columns from parquet file in spark

What is the most efficient way to read only a subset of columns in spark from a parquet file that has many columns? Is using spark.read.format("parquet").load(<parquet>).select(...col1, col2) the best way to do that? I would also prefer to use typesafe dataset with case classes to pre-define my schema but not sure.



Solution 1:[1]

val df = spark.read.parquet("fs://path/file.parquet").select(...)

This will only read the corresponding columns. Indeed, parquet is a columnar storage and it is exactly meant for this type of use case. Try running df.explain and spark will tell you that only the corresponding columns are read (it prints the execution plan). explain would also tell you what filters are pushed down to the physical plan of execution in case you also use a where condition. Finally use the following code to convert the dataframe (dataset of rows) to a dataset of your case class.

case class MyData...
val ds = df.as[MyData]

Solution 2:[2]

Spark supports pushdowns with Parquet so

load(<parquet>).select(...col1, col2)

is fine.

I would also prefer to use typesafe dataset with case classes to pre-define my schema but not sure.

This could be an issue, as it looks like some optimizations don't work in this context Spark 2.0 Dataset vs DataFrame

Solution 3:[3]

Parquet is a columnar file format. It is exactly designed for these kind of use cases.

val df = spark.read.parquet("<PATH_TO_FILE>").select(...)

should do the job for you.

Solution 4:[4]

At least in some cases getting dataframe with all columns + selecting a subset won't work. E.g. the following will fail if parquet contains at least one field with type that is not supported by Spark:

spark.read.format("parquet").load("<path_to_file>").select("col1", "col2")

One solution is to provide schema that contains only requested columns to load:

spark.read.format("parquet").load("<path_to_file>",
                                   schema="col1 bigint, col2 float")

Using this you will be able to load a subset of Spark-supported parquet columns even if loading the full file is not possible. I'm using pyspark here, but would expect Scala version to have something similar.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2 Alper t. Turker
Solution 3
Solution 4 Oli