'loop over airflow variables issue question

I am having hard time looping over an airflow variable in my script

so I have a requirement to list all files prefixed by string in a bucket.

next loop through it and do some operations.

I tried making use of xcomm and subdags but i couldn't figure it out so i came up with a new approach.

it involves 2 scripts though

1 st scripts sets the airflow variable with a value i generate

below is the code.

#!/usr/bin/env python


with DAG('Test_variable', default_args=default_args,
    schedule_interval=None
) as dag:
    GCS_File_list = GoogleCloudStorageListOperator(
                    task_id= 'list_Files',
                    bucket= 'bucketname',
                    prefix='aaa',
                    delimiter='.csv',
                    google_cloud_storage_conn_id='google_cloud_default'
                    #provide_context = True,
                    #dag = dag
                )
    def update_variable(**context):
        files_list = Variable.get('files_list')
        print(type(Variable.get('files_list')))
        updated_file_list = context['ti'].xcom_pull(task_ids='list_Files')
        #updated_file_list = updated_file_list.strip('][').split(',')
        Variable.set("files_list", updated_file_list)
        #print(updated_file_list)
        #print(type(updated_file_list))

    python_task = PythonOperator(
                             task_id= 'pass_list',
                             provide_context=True,
                             python_callable=update_variable,
                             #op_kwargs={'input_file':file},
                             trigger_rule=TriggerRule.ALL_SUCCESS,
                             #provide_context = True,
                             #xcom_push=True,
                             dag=dag
                            )
    GCS_File_list >> python_task

as you see the above script lists files from a bucket and set the result to an airflow variable.

script 2: import the variable value set in script 1 and loop over it

this is where i can get the variable value but cant iterate over it

script 2:

#!/usr/bin/env python
from datetime import datetime, timedelta
from airflow import DAG
from airflowag.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators import PythonOperator
from datetime import datetime

YESTERDAY = datetime.combine(
    datetime.today() - timedelta(days=1), datetime.min.time())
BQ_DATASET_NAME = 'abc'
CURRENT_TIME = datetime

files_list_str = Variable.get("files_list")
files_list = files_list_str.strip('][').split(',')
bucket = 'def'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': YESTERDAY,
    'provide_context': True,
}

def gen_date(input_file,**kwargs):
# stepl1: check for file extension and remove it
idx_extension           = input_file.find(".")
input_file_name         = input_file[:idx_extension]
find_date_time_part     = re.findall("_(\d*?_\d*?_\d*)",input_file_name)
find_date_time_part     = str(find_date_time_part).split('_', 1)[-1].strip(']')
find_date_time_part     = str(find_date_time_part)
find_date_time_part     = re.sub("'",'', find_date_time_part)
find_date_time_part_len = len(find_date_time_part)

if find_date_time_part_len == 15:
    x = [a for a in find_date_time_part.split('_') if a]
    #get the date time part from the list 
    x = (' '.join(x[-2:]))
    #Using strptime to parse the string value as datetime object here our date format is YYYYMMDD hhmiss
    dt_obj = datetime.strptime(x, "%Y%m%d %H%M%S")
    # use strftime to format the date object into desired format in our case YYYY-MM-DD hh:mi:ss
    final_date_formatted = dt_obj.strftime("%Y-%m-%d %H:%M:%S")
    #print(type(find_date_time_part))
    return final_date_formatted
else:
    print("{}_{}".format(files_list,input_file))

with DAG('dag1', default_args=default_args,
schedule_interval=None
        ) as dag:
    for file in enumerate (files_list):
        Python_Task = PythonOperator(
                             task_id='pass_date',
                             provide_context=True,
                             python_callable=gen_date,
                             op_kwargs={'input_file':file},
                             trigger_rule=TriggerRule.ALL_SUCCESS,
                             provide_context = True,
                             #xcom_push=True,
                             dag=dag
                            )
        Python_Task

as you see in this function gen_date() i am printing out the variable name as well as the input file name in the else block

the output of the statement print("{}_{}".format(files_list,input_file) is ['abc.csv','def.csv']_[

I am not sure why a "[" is getting passed instead of input_file name any advise appreciated.


i see now that in for loop file_list is being treated as string and not a list

how can i make the files_list as a list and not string.



Solution 1:[1]

so when we get airflow variable (files_list) it is stored and returned as a string not a list so we would first need to convert the variable into a list and then loop on that variable.

Solution 2:[2]

I had the same problem. A simple solution is to use the regular expressions to remove the '[', ']', and the ' itself from the string representing the list. Then the split would work. And the list would be created.

import re

files_list_str = Variable.get("files_list")
files_list = re.sub('\'|\[|\]','', files_list_str).split(', ') 

it is very simple, but saves a lot of time.

I can use the list in other tasks in the DAG.

Solution 3:[3]

Since Airflow 2.1.0 use can pass to DAG

render_template_as_native_obj=True

Airflow documentation and example:
https://airflow.apache.org/docs/apache-airflow/2.1.0/concepts/operators.html?highlight=render_template_as_native_obj#rendering-fields-as-native-python-objects

Solution 4:[4]

This works for me but I'm using 2.2.3

file_list = Variable.get('files_list')
for i in  file_list:
   logging.info(i)

Solution 5:[5]

Set the variable with double-quotes and then read it back using the deserialize_json flag, to read it as a JSON, instead of as a string.

Variable.set("files_list", '["file1.txt", "file2.txt"]' )
files_list_str = Variable.get("files_list", deserialize_json=True)

for f in files_list_str:
    print(f)

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 kumarm
Solution 2 Tycho
Solution 3 0x40c1
Solution 4 codeBarer
Solution 5 Remis Haroon - ????