How to filter files in a Databricks Auto Loader stream

I want to set up an S3 stream using Databricks Auto Loader. I have managed to set up the stream, but my S3 bucket contains different types of JSON files. I want to filter them, preferably in the stream itself rather than with a separate filter operation afterwards.

According to the docs, I should be able to filter using a glob pattern. However, I can't get this to work: it loads everything regardless.

This is what I have:

from pyspark.sql import functions as F

df = (
  spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.inferColumnTypes", "true")
  .option("cloudFiles.schemaInference.sampleSize.numFiles", 1000)
  .option("cloudFiles.schemaLocation", "dbfs:/auto-loader/schemas/")
  .option("cloudFiles.includeExistingFiles", "true")
  .option("multiLine", "true")
  .option("inferSchema", "true")
#   .option("cloudFiles.schemaHints", schemaHints)
#  .load("s3://<BUCKET>/qualifier/**/*_INPUT")
  .load("s3://<BUCKET>/qualifier")
  .withColumn("filePath", F.input_file_name())
  .withColumn("date_ingested", F.current_timestamp())
)

My files have keys structured as qualifier/version/YYYY-MM/DD/<NAME>_INPUT.json, so I want to select only the files whose name contains INPUT.

This loads everything: .load("s3://<BUCKET>/qualifier"). What I actually want is .load("s3://<BUCKET>/qualifier/**/*_INPUT"), but that doesn't work. I have also tried .load("s3://<BUCKET>/qualifier/**/*_INPUT.json"), with no luck.
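
As a sanity check on the file-name part of the pattern, plain Python agrees that *_INPUT.json should match the right names (the keys below are made up to mirror the layout above):

import posixpath
from fnmatch import fnmatch

sample_keys = [
    "qualifier/v1/2022-04/01/foo_INPUT.json",
    "qualifier/v1/2022-04/01/foo_OUTPUT.json",
]

for key in sample_keys:
    name = posixpath.basename(key)  # the glob should apply to the file name part
    print(name, fnmatch(name, "*_INPUT.json"))  # True for foo_INPUT.json, False otherwise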

Is my glob pattern incorrect, or is there something else I am missing?



Solution 1:[1]

From the documentation, it seems you can combine load with the pathGlobFilter option to achieve what you need. Note that pathGlobFilter is matched against the file name only, not the full path, so *_INPUT.json should pick up matching files at any depth under the prefix. Have you tried something like this?

from pyspark.sql import functions as F

df = (
  spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.inferColumnTypes", "true")
  .option("cloudFiles.schemaInference.sampleSize.numFiles", 1000)
  .option("cloudFiles.schemaLocation", "dbfs:/auto-loader/schemas/")
  .option("cloudFiles.includeExistingFiles", "true")
  .option("multiLine", "true")
  .option("inferSchema", "true")
  .option("pathGlobFilter", "*_INPUT.json")  # keep only files whose name matches this glob
  .load("s3://<BUCKET>/qualifier/")
  .withColumn("filePath", F.input_file_name())
  .withColumn("date_ingested", F.current_timestamp())
)
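
A minimal sketch of how you might start the stream and spot-check the filter afterwards. The checkpoint path and table name are made-up placeholders, and trigger(availableNow=True) assumes a recent runtime (use trigger(once=True) on older ones):

query = (
  df.writeStream
  .format("delta")
  .option("checkpointLocation", "dbfs:/auto-loader/checkpoints/qualifier/")  # made-up path
  .trigger(availableNow=True)  # process everything available, then stop
  .toTable("qualifier_input_raw")  # made-up table name
)
query.awaitTermination()

# Every ingested path should now end in _INPUT.json
spark.table("qualifier_input_raw").select("filePath").distinct().show(truncate=False)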

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Herivelton Andreassa