What is the best way to check if a file exists on Azure Data Lake using Apache Airflow?

I have a DAG that should check whether a file has been uploaded to Azure Data Lake in a specific directory. If so, it should allow other DAGs to run.

I thought about using a FileSensor, but I assume its fs_conn_id parameter is not enough to authenticate against a Data Lake.



Solution 1:[1]

I had severe issues using the proposed approach, so I wrapped Microsoft's Data Lake API in a custom Airflow sensor instead. This worked fine. All you need to do then is use this operator and pass it an account_url and an access_token.

from azure.storage.filedatalake import DataLakeServiceClient
from airflow.sensors.base import BaseSensorOperator


class AzureDataLakeSensor(BaseSensorOperator):

    def __init__(self, path, filename, account_url, access_token, **kwargs):
        super().__init__(**kwargs)
        # Authenticate against the storage account directly via the Azure SDK
        self._client = DataLakeServiceClient(
            account_url=account_url,
            credential=access_token,
        )
        self.path = path
        self.filename = filename

    def poke(self, context):
        # The file system (container) name is hard-coded to "raw" here;
        # adjust or parameterize it for your own account
        container = self._client.get_file_system_client(file_system="raw")
        dir_client = container.get_directory_client(self.path)
        file = dir_client.get_file_client(self.filename)
        return file.exists()
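
For completeness, here is a possible way to use this sensor in a DAG; the account URL, token, directory, and file name below are placeholders rather than values from the original answer:

check_upload = AzureDataLakeSensor(
    task_id="wait_for_adls_file",
    path="incoming/2023-01-01",                                     # directory inside the "raw" file system
    filename="report.csv",
    account_url="https://<storage-account>.dfs.core.windows.net",   # placeholder account URL
    access_token="<sas-or-oauth-token>",                            # placeholder credential
    poke_interval=300,
    mode="reschedule",
)

Note that the client is created in __init__, i.e. at DAG parse time; if that is a concern, the DataLakeServiceClient could be created inside poke() instead.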

Solution 2:[2]

There is no AzureDataLakeSensor in the Azure provider, but you can easily implement one, since the AzureDataLakeHook has a check_for_file function. All that is needed is to wrap this function in a sensor class implementing the poke() method of BaseSensorOperator. By doing so you can use the Microsoft Azure Data Lake connection directly.

I didn't test it but this should work:

from typing import TYPE_CHECKING, Sequence

from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
from airflow.sensors.base import BaseSensorOperator

if TYPE_CHECKING:
    from airflow.utils.context import Context

class MyAzureDataLakeSensor(BaseSensorOperator):
    """
    Sense for files in Azure Data Lake

    :param path: The Azure Data Lake path to find the objects. Supports glob
        strings (templated)
    :param azure_data_lake_conn_id: The Azure Data Lake conn
    """

    template_fields: Sequence[str] = ('path',)
    ui_color = '#901dd2'

    def __init__(
        self, *, path: str, azure_data_lake_conn_id: str = 'azure_data_lake_default', **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.path = path
        self.azure_data_lake_conn_id = azure_data_lake_conn_id

    def poke(self, context: "Context") -> bool:
        hook = AzureDataLakeHook(azure_data_lake_conn_id=self.azure_data_lake_conn_id)
        self.log.info('Poking for file in path: %s', self.path)
        try:
            # check_for_file returns True only if the file exists
            return hook.check_for_file(file_path=self.path)
        except FileNotFoundError:
            return False

Usage example:

MyAzureDataLakeSensor(
    task_id='adls_sense',
    path='folder/file.csv',
    azure_data_lake_conn_id='azure_data_lake_default',
    mode='reschedule'
)
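
Since the original question was about letting other DAGs run once the file arrives, here is a hedged sketch of how the sensor could gate a downstream DAG with TriggerDagRunOperator; the DAG ids, schedule, and start date are made-up placeholders:

from datetime import datetime

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id="wait_for_adls_file",          # hypothetical DAG id
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # MyAzureDataLakeSensor is the sensor class defined above
    sense = MyAzureDataLakeSensor(
        task_id="adls_sense",
        path="folder/file.csv",
        azure_data_lake_conn_id="azure_data_lake_default",
        mode="reschedule",
    )

    # Runs only after poke() has returned True, i.e. the file exists
    trigger = TriggerDagRunOperator(
        task_id="trigger_downstream_dag",
        trigger_dag_id="downstream_dag",  # hypothetical downstream DAG id
    )

    sense >> trigger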

Solution 3:[3]

First of all, have a look at official Microsoft Operators for Airflow.

We can see that there are dedicated operators for Azure Data Lake Storage; unfortunately, only the ADLSDeleteOperator seems to be available at the moment.

This ADLSDeleteOperator uses an AzureDataLakeHook, which you can reuse in your own custom operator to check for file presence.

My advice is to create a child class of CheckOperator that uses the ADLS hook to check whether the file provided as input exists, via the hook's check_for_file function.

UPDATE: as pointed out in the comments, CheckOperator is tied to SQL queries and is deprecated. Using your own custom sensor or custom operator is the way to go (see the sketch below).
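
If you prefer the operator route over a sensor, one minimal sketch (not from the original answer; it assumes Airflow 2 and the same Azure Data Lake connection as in Solution 2) is to use a ShortCircuitOperator whose callable calls AzureDataLakeHook.check_for_file, so that downstream tasks are skipped while the file is missing:

from airflow.operators.python import ShortCircuitOperator
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook


def _adls_file_exists(path: str, conn_id: str = "azure_data_lake_default") -> bool:
    # Returns True if the file exists; ShortCircuitOperator skips downstream tasks on False
    hook = AzureDataLakeHook(azure_data_lake_conn_id=conn_id)
    return hook.check_for_file(file_path=path)


check_file = ShortCircuitOperator(
    task_id="check_adls_file",
    python_callable=_adls_file_exists,
    op_kwargs={"path": "folder/file.csv"},  # hypothetical path
)

Unlike a sensor, this checks only once per run and simply skips the rest of the DAG if the file is not there yet.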

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Stefan Papp
Solution 2:
Solution 3: