What is the best way to check if a file exists on an Azure Data Lake using Apache Airflow?
I have a DAG that should check whether a file has been uploaded to Azure Data Lake in a specific directory. If so, it should allow other DAGs to run.
I thought about using a FileSensor, but I assume the fs_conn_id parameter is not enough to authenticate against a Data Lake.
Solution 1:[1]
I had severe issues using the proposed API, so I embedded the Microsoft SDK directly into an Airflow sensor, which worked fine. All you then need to do is use this operator and pass in account_url and access_token.
from azure.storage.filedatalake import DataLakeServiceClient
from airflow.sensors.base import BaseSensorOperator


class AzureDataLakeSensor(BaseSensorOperator):
    def __init__(self, path, filename, account_url, access_token, **kwargs):
        super().__init__(**kwargs)
        # Authenticate against the storage account with the Microsoft SDK
        self._client = DataLakeServiceClient(
            account_url=account_url,
            credential=access_token
        )
        self.path = path
        self.filename = filename

    def poke(self, context):
        # "raw" is the file system (container) being checked; adjust it to your setup
        container = self._client.get_file_system_client(file_system="raw")
        dir_client = container.get_directory_client(self.path)
        file = dir_client.get_file_client(self.filename)
        return file.exists()
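For reference, here is a minimal usage sketch of this sensor inside a DAG. The DAG id, path, filename, account URL, and token below are placeholder assumptions, not values from the original answer; in practice you would likely pull the credential from an Airflow connection or a secrets backend rather than hard-coding it.

from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="adls_file_check",          # hypothetical DAG id
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_for_file = AzureDataLakeSensor(
        task_id="wait_for_file",
        path="landing/2022",                                    # directory inside the "raw" file system
        filename="data.csv",                                    # file to wait for
        account_url="https://myaccount.dfs.core.windows.net",   # placeholder account URL
        access_token="<access-token>",                          # placeholder credential
        poke_interval=300,
        mode="reschedule",
    )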
Solution 2:[2]
There is no AzureDataLakeSensor in the Azure provider, but you can easily implement one, since the AzureDataLakeHook has a check_for_file function; all that is needed is to wrap this function in a Sensor class implementing the poke() function of BaseSensorOperator. By doing so you can use the Microsoft Azure Data Lake Connection directly.
I didn't test it but this should work:
from typing import TYPE_CHECKING, Sequence

from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook
from airflow.sensors.base import BaseSensorOperator

if TYPE_CHECKING:
    from airflow.utils.context import Context


class MyAzureDataLakeSensor(BaseSensorOperator):
    """
    Sense for files in Azure Data Lake.

    :param path: The Azure Data Lake path to find the objects. Supports glob
        strings (templated)
    :param azure_data_lake_conn_id: The Azure Data Lake connection to use
    """

    template_fields: Sequence[str] = ('path',)
    ui_color = '#901dd2'

    def __init__(
        self, *, path: str, azure_data_lake_conn_id: str = 'azure_data_lake_default', **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.path = path
        self.azure_data_lake_conn_id = azure_data_lake_conn_id

    def poke(self, context: "Context") -> bool:
        # The hook reads credentials from the Airflow connection,
        # so no secrets need to be handled in the DAG itself.
        hook = AzureDataLakeHook(azure_data_lake_conn_id=self.azure_data_lake_conn_id)
        self.log.info('Poking for file in path: %s', self.path)
        try:
            hook.check_for_file(file_path=self.path)
            return True
        except FileNotFoundError:
            pass
        return False
Usage example:
MyAzureDataLakeSensor(
    task_id='adls_sense',
    path='folder/file.csv',
    azure_data_lake_conn_id='azure_data_lake_default',
    mode='reschedule'
)
Solution 3:[3]
First of all, have a look at the official Microsoft operators for Airflow.
We can see that there are dedicated operators for Azure Data Lake Storage; unfortunately, only the ADLSDeleteOperator seems to be available at the moment.
This ADLSDeleteOperator uses an AzureDataLakeHook, which you should reuse in your own custom operator to check for file presence.
My advice is to create a child class of CheckOperator that uses the ADLS hook to check whether the file provided as input exists, via the hook's check_for_file function.
UPDATE: as pointed out in the comments, CheckOperator seems to be tied to SQL queries and is deprecated. Using your own custom Sensor or custom Operator is the way to go.
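If you prefer a one-shot operator over a sensor, a minimal sketch along those lines could look like the following. MyAzureDataLakeFileCheckOperator is a hypothetical name, not part of the original answer, and the error handling covers both a boolean return and a FileNotFoundError from check_for_file, since behaviour may differ between provider versions; treat it as a sketch, not a definitive implementation.

from airflow.exceptions import AirflowFailException
from airflow.models.baseoperator import BaseOperator
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeHook


class MyAzureDataLakeFileCheckOperator(BaseOperator):
    """Fail the task immediately if the given file is not present in Azure Data Lake."""

    template_fields = ('path',)

    def __init__(
        self, *, path: str, azure_data_lake_conn_id: str = 'azure_data_lake_default', **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.path = path
        self.azure_data_lake_conn_id = azure_data_lake_conn_id

    def execute(self, context):
        hook = AzureDataLakeHook(azure_data_lake_conn_id=self.azure_data_lake_conn_id)
        try:
            found = hook.check_for_file(file_path=self.path)
        except FileNotFoundError:
            # Some versions may raise instead of returning False for a missing path
            found = False
        if not found:
            raise AirflowFailException(f"File not found in Azure Data Lake: {self.path}")
        self.log.info("Found file in path: %s", self.path)

Unlike a sensor in reschedule mode, this operator fails immediately when the file is absent, so it suits cases where the upstream upload is expected to have already happened.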
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Stefan Papp |
| Solution 2 | |
| Solution 3 | |