Category "airflow"

Docker Compose file for Airflow with DaskExecutor

Can someone provide a YAML file for the setup mentioned above? I need it for a project. I am trying to execute my tasks in parallel on each core of the workers, as

Is it possible to use an XCom in an operator parameter without using Jinja?

Is it possible to pass an XCom to an operator parameter without using a Jinja template? I have a dict stored in an XCom and I need to pass it to an Operator tha

Defining complex workflow dependencies in the Airflow 2.0 TaskFlow API

Let's say I have the following dummy DAG defined as below: @dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2)) def airflow_ta

What is the best way to check if a file exists on an Azure Datalake using Apache Airflow?

I have a DAG that shall check if a file has been uploaded to Azure DataLake in a specific directory. If so, it allows other DAGs to run. I thought about using a

Airflow SimpleHttpOperator is not pushing to xcom

I have the following SimpleHttpOperator inside my dag: extracting_user = SimpleHttpOperator( task_id='extracting_user', http_conn_id='user_api',

Airflow Scheduler fails to execute Windows EXE via WSL

My Windows 10 machine has Airflow 1.10.11 installed within WSL 2 (Ubuntu-20.04). I have a BashOperator task which calls an .EXE on Windows (via /mnt/c/... or vi

Error in Airflow 2 even though my tasks have actually completed? ERROR - Could not serialize the XCom value into JSON

Hi all, so my DAG actually runs fine and all the outputs are working, but Airflow's UI does not change to success and the run fails due to the following issue. Reading online
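In Airflow 2, XCom pickling is disabled by default, so a value returned from a task must be JSON-serializable. The push happens after the task body finishes, which is why the outputs exist while the task is still marked failed. A minimal stdlib sketch, assuming the offending value is a plain Python object such as a datetime or a set: check it with json.dumps and convert it before returning it. The helpers `is_json_serializable` and `to_xcom_safe` are hypothetical names, not Airflow APIs.

```python
import json
from datetime import date, datetime
from typing import Any


def is_json_serializable(value: Any) -> bool:
    """Return True if json.dumps can encode the value as-is."""
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        return False
    return True


def to_xcom_safe(value: Any) -> Any:
    """Recursively convert common non-JSON types into JSON-friendly ones.

    Hypothetical helper: dates become ISO strings, sets and tuples become
    lists, dict keys become strings, and unknown objects fall back to str().
    """
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    if isinstance(value, dict):
        return {str(k): to_xcom_safe(v) for k, v in value.items()}
    if isinstance(value, (list, tuple, set)):
        return [to_xcom_safe(v) for v in value]
    if is_json_serializable(value):
        return value
    return str(value)


if __name__ == "__main__":
    raw = {"run_at": datetime(2022, 1, 1, 12, 0), "ids": {3}}
    print(is_json_serializable(raw))  # False: datetime and set are not JSON types
    print(json.dumps(to_xcom_safe(raw)))
```

Inside a task you would return `to_xcom_safe(result)` instead of `result`. The alternative hinted at by the error message, setting `enable_xcom_pickling = True` in the `[core]` section, also works, but keeping XCom values small and JSON-friendly is usually the safer choice.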

How does Airflow decide to render template values?

I am working with Airflow 2.2.3 in GCP (Composer) and I am seeing inconsistent behavior which I can't explain when trying to use template values. When I referen

Set priority for multiple DAG runs

I have a DAG that I want to run multiple times, say 30. But Airflow can execute only 16 DAG runs in parallel. Suppose one DAG run takes a longer time to execut

Airflow 2 XCom in Task Groups

I have two tasks inside a TaskGroup that need to pull xcom values to supply the job_flow_id and step_id. Here's the code: with TaskGroup('execute_my_steps') a
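A common cause of this problem is that, in Airflow 2, a task created inside a TaskGroup has the group ID prepended to its task_id by default (`prefix_group_id=True`). Any `xcom_pull` must therefore use the fully qualified ID, such as `execute_my_steps.add_steps`, not `add_steps`. The sketch below only builds those strings. The inner task name `add_steps` is hypothetical because the original code is cut off.

```python
def qualified_task_id(group_id: str, task_id: str, prefix_group_id: bool = True) -> str:
    """Return the task_id Airflow 2 assigns to a task created inside a TaskGroup.

    With prefix_group_id=True (the TaskGroup default) the ID is "<group_id>.<task_id>".
    For nested groups, group_id is itself the dotted path of the enclosing groups.
    """
    return f"{group_id}.{task_id}" if prefix_group_id else task_id


def xcom_pull_template(task_id: str, key: str = "return_value") -> str:
    """Build a Jinja expression that pulls an XCom pushed by the given task."""
    return "{{ task_instance.xcom_pull(task_ids='%s', key='%s') }}" % (task_id, key)


if __name__ == "__main__":
    # Hypothetical task that adds EMR steps inside the 'execute_my_steps' group.
    add_steps_id = qualified_task_id("execute_my_steps", "add_steps")
    print(add_steps_id)  # execute_my_steps.add_steps
    print(xcom_pull_template(add_steps_id))
```

The resulting template string can be passed to a templated field such as `job_flow_id` or `step_id`.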

psycopg2.OperationalError: could not translate host name "<address>" to address: Temporary failure in name resolution

I have looked through similar posts on SO, but they seem to be specific to Docker environments and haven't been very helpful. Ours is a little different, w
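This error is raised by the PostgreSQL client library when its own host-name lookup fails. "Temporary failure in name resolution" is the resolver's text for a transient DNS failure. A quick first check is whether the host in the connection resolves at all from the machine or container that runs the failing Airflow component. A minimal stdlib sketch: `can_resolve` is a hypothetical helper, and `<address>` stays a placeholder for the real host.

```python
import socket


def can_resolve(host: str, port: int = 5432) -> bool:
    """Return True if the host name resolves to at least one TCP address.

    This performs the same kind of getaddrinfo() lookup that fails behind
    psycopg2's "could not translate host name" error.
    """
    try:
        return len(socket.getaddrinfo(host, port, proto=socket.IPPROTO_TCP)) > 0
    except socket.gaierror:
        return False


if __name__ == "__main__":
    # Replace with the host from the Airflow connection or sql_alchemy_conn.
    for candidate in ("127.0.0.1", "localhost"):
        print(candidate, can_resolve(candidate))
```

If the check fails only intermittently, the problem is more likely the DNS setup of that host than the Airflow configuration.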

How do I run SQL queries on the airflow meta db from inside an airflow DAG?

I'm looking for a way to extract execution_date, start_date, end_date etc. of the last successful run instance of a task in a DAG and then decide to raise an er
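Task-instance history is stored in the metadata database's `task_instance` table, which has columns such as `dag_id`, `task_id`, `state`, `start_date` and `end_date`. In recent Airflow 2 releases, that table is keyed by `run_id`, and `execution_date` is stored on `dag_run`. The query below joins the two tables to fetch the latest successful instance of one task. To keep the sketch self-contained, it runs against an in-memory SQLite database that mimics only a few of those columns. This is an assumption for illustration, and the real schema varies by Airflow version.

```python
import sqlite3

# Toy stand-in for two Airflow metadata tables. Assumption: only a handful of
# the real columns are mimicked, all stored as TEXT for brevity.
SCHEMA = """
CREATE TABLE dag_run (dag_id TEXT, run_id TEXT, execution_date TEXT);
CREATE TABLE task_instance (
    dag_id TEXT, task_id TEXT, run_id TEXT,
    state TEXT, start_date TEXT, end_date TEXT
);
"""

# Latest successful instance of one task, together with its run's execution_date.
LAST_SUCCESS_SQL = """
SELECT dr.execution_date, ti.start_date, ti.end_date
FROM task_instance AS ti
JOIN dag_run AS dr ON dr.dag_id = ti.dag_id AND dr.run_id = ti.run_id
WHERE ti.dag_id = ? AND ti.task_id = ? AND ti.state = 'success'
ORDER BY dr.execution_date DESC
LIMIT 1
"""


def last_successful_run(conn, dag_id, task_id):
    """Return (execution_date, start_date, end_date) of the latest success, or None."""
    return conn.execute(LAST_SUCCESS_SQL, (dag_id, task_id)).fetchone()


def build_demo_db():
    """Create an in-memory database filled with hypothetical sample rows."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.executemany("INSERT INTO dag_run VALUES (?, ?, ?)", [
        ("my_dag", "run_1", "2022-01-01T00:00:00"),
        ("my_dag", "run_2", "2022-01-02T00:00:00"),
    ])
    conn.executemany("INSERT INTO task_instance VALUES (?, ?, ?, ?, ?, ?)", [
        ("my_dag", "load", "run_1", "success", "2022-01-01T00:05:00", "2022-01-01T00:10:00"),
        ("my_dag", "load", "run_2", "failed", "2022-01-02T00:05:00", "2022-01-02T00:07:00"),
    ])
    return conn


if __name__ == "__main__":
    print(last_successful_run(build_demo_db(), "my_dag", "load"))
```

Against a real PostgreSQL metadata database, the same SQL would run through a hook or a psycopg2 connection, using `%s` placeholders instead of `?`. Alternatively, the ORM route (`airflow.utils.session.create_session` with the `TaskInstance` model) avoids hand-written SQL. Which option fits depends on the Airflow version.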

Snowflake connection to Airflow

I have created a DAG which calls the Snowflake warehouse and runs a query on it. Code for the DAG: import logging from datetime import datetime, timedelta i

Airflow - What do I do when I have a variable amount of Work that needs to be handled by a DAG?

I have a sensor task that listens to files being created in S3. After a poke I may have 3 files, after another poke I might have another 5 files. I want to crea

Listing packages in MWAA with a broken scheduler

So, I'm currently working with an Airflow installation via MWAA. I'm having this issue with a broken dependency, specifically: ERROR: pip's dependency resolve
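If some Python code can still run in that environment, it can list the installed distributions itself without shell access to the workers. This is an assumption, because a fully broken scheduler may prevent tasks from running at all. A minimal sketch using the standard library's `importlib.metadata` (Python 3.8+). `installed_packages` is a hypothetical helper name.

```python
from importlib.metadata import distributions


def installed_packages():
    """Return sorted (name, version) pairs for every distribution visible to this interpreter."""
    pairs = set()
    for dist in distributions():
        name = dist.metadata.get("Name")
        if name:  # skip distributions with broken or missing metadata
            pairs.add((name, dist.version))
    return sorted(pairs, key=lambda pair: pair[0].lower())


if __name__ == "__main__":
    # Output resembles `pip freeze`, which makes it easy to diff against requirements.txt.
    for name, version in installed_packages():
        print(f"{name}=={version}")
```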

Airflow task still running even after the endpoint process is completed

I have an airflow task using SimpleHTTPOperator hitting a service in Google Cloud Run. I checked in Cloud Run Logs, the service itself is completed in 12 minute

How to pause/unpause multiple DAGs in Airflow

We have 100 DAGs with the prefix "dag_EDW_HC_*". We use the command below to pause a DAG: Command: airflow pause dag_id Is there any way we can pause
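The CLI pauses one DAG per call, so one approach is to filter the DAG IDs against the prefix pattern and issue one command per match. The sketch below assumes the DAG IDs are already available as a list, for example copied from the DAG-listing command. It defaults to the Airflow 2 form `airflow dags pause <dag_id>`, and setting `airflow2=False` produces the Airflow 1.10 form `airflow pause <dag_id>` used in the question. The `dry_run` switch and the sample IDs are hypothetical. They let you review the commands before anything is executed.

```python
import fnmatch
import subprocess
from typing import Iterable, List


def pause_commands(dag_ids: Iterable[str], pattern: str = "dag_EDW_HC_*",
                   airflow2: bool = True, unpause: bool = False) -> List[List[str]]:
    """Build one CLI command per DAG ID matching a shell-style pattern."""
    action = "unpause" if unpause else "pause"
    base = ["airflow", "dags", action] if airflow2 else ["airflow", action]
    return [base + [dag_id] for dag_id in dag_ids if fnmatch.fnmatchcase(dag_id, pattern)]


def run(commands: List[List[str]], dry_run: bool = True) -> None:
    """Print the commands, or execute them when dry_run is False."""
    for cmd in commands:
        if dry_run:
            print(" ".join(cmd))
        else:
            subprocess.run(cmd, check=True)  # requires the airflow CLI on PATH


if __name__ == "__main__":
    # Hypothetical DAG IDs for illustration.
    ids = ["dag_EDW_HC_load", "dag_EDW_HC_report", "dag_other"]
    run(pause_commands(ids))
```

Passing `unpause=True` builds the matching unpause commands.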

Airflow with Okta integration

I'm attempting to integrate Airflow with Okta, however there is little documentation available online. I'm referring to a blog article, but I can't seem to get

BigQueryInsertJobOperator dryRun is returning success instead of failure on Composer (Airflow)

When using BigQueryInsertJobOperator and setting the configuration to perform a dry run on a faulty .sql file or a hardcoded query, the task succeeds even though

Airflow without sudo access?

In my virtual env on an Azure VM, I ran pip3 install apache-airflow. When I ran airflow db init, I received File "/home/shivamanand/myenv/lib/python3.7/site-pack