How to filter files in a Databricks Auto Loader stream
I want to set up an S3 stream using Databricks Auto Loader. I have managed to set up the stream, but my S3 bucket contains different types of JSON files. I want to filter them out, preferably in the stream itself rather than with a filter operation.
According to the docs I should be able to filter using a glob pattern. However, I can't seem to get this to work: it loads everything anyway.
This is what I have:
from pyspark.sql import functions as F

df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaInference.sampleSize.numFiles", 1000)
    .option("cloudFiles.schemaLocation", "dbfs:/auto-loader/schemas/")
    .option("cloudFiles.includeExistingFiles", "true")
    .option("multiLine", "true")
    .option("inferSchema", "true")
    # .option("cloudFiles.schemaHints", schemaHints)
    # .load("s3://<BUCKET>/qualifier/**/*_INPUT")
    .load("s3://<BUCKET>/qualifier")
    .withColumn("filePath", F.input_file_name())
    .withColumn("date_ingested", F.current_timestamp())
)
My files have keys structured as qualifier/version/YYYY-MM/DD/<NAME>_INPUT.json, so I want to pick up only the files whose names end in _INPUT. Loading the base path, .load("s3://<BUCKET>/qualifier"), seems to load everything, while .load("s3://<BUCKET>/qualifier/**/*_INPUT") is what I want to do but doesn't work. (I have also tried .load("s3://<BUCKET>/qualifier/**/*_INPUT.json").)
Is my glob pattern incorrect, or is there something else I am missing?
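One possible explanation, as far as I know: Spark's Hadoop-style globbing expands * within a single path segment and does not treat ** as a recursive wildcard, so the ** patterns above may simply match nothing. A minimal sketch of a workaround, assuming the keys are always exactly three directory levels deep (version/YYYY-MM/DD) under the qualifier prefix:

from pyspark.sql import functions as F

# A sketch, not a definitive fix: one * per directory level
# (version / YYYY-MM / DD), so the depth is spelled out explicitly
# instead of relying on ** being expanded recursively.
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "dbfs:/auto-loader/schemas/")
    .load("s3://<BUCKET>/qualifier/*/*/*/*_INPUT.json")
    .withColumn("filePath", F.input_file_name())
)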
Solution 1:[1]
From the documentation, it seems that you can combine load with the pathGlobFilter option to achieve what you need. Have you tried something like this?
from pyspark.sql import functions as F

df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaInference.sampleSize.numFiles", 1000)
    .option("cloudFiles.schemaLocation", "dbfs:/auto-loader/schemas/")
    .option("cloudFiles.includeExistingFiles", "true")
    .option("multiLine", "true")
    .option("inferSchema", "true")
    # keep only files whose names match the pattern
    .option("pathGlobFilter", "*_INPUT.json")
    .load("s3://<BUCKET>/qualifier/")
    .withColumn("filePath", F.input_file_name())
    .withColumn("date_ingested", F.current_timestamp())
)
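Note that pathGlobFilter is matched against the file name only, not the full key, so *_INPUT.json picks up matching files at any directory depth under the base path. As a quick sanity check (the table name and checkpoint path below are hypothetical), you can drain the stream once and inspect the recorded file paths:

query = (
    df.writeStream
    .option("checkpointLocation", "dbfs:/auto-loader/checkpoints/input_files/")  # hypothetical path
    .trigger(availableNow=True)  # process all currently available files, then stop
    .toTable("bronze.input_files")  # hypothetical table name
)
query.awaitTermination()

# Every distinct path should end in _INPUT.json.
spark.read.table("bronze.input_files").select("filePath").distinct().show(truncate=False)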
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source
---|---
Solution 1 | Herivelton Andreassa