Table expiration in GCS to BQ Airflow task

I am copying a CSV into a new BQ table using the GCSToBigQueryOperator task in Airflow. Is there a way to add a table expiration to this table within this task?

new_table_task = GCSToBigQueryOperator(
    task_id='insert_gcs_to_bq_tmp_table',
    bucket=BUCKET,
    source_objects=SOURCE_PATH,
    destination_project_dataset_table=f"{BQ_PROJECT}.{DATASET}.{tmp_table_name}", 
    write_disposition='WRITE_TRUNCATE',
    skip_leading_rows=1,
    schema_object=SCHEMA_OBJECT
)

If this is not possible, is my best option to create the table first and define the expiration using DDL, and then load it with GCSToBigQueryOperator? Thanks!



Solution 1:[1]

You can use BigQueryHook.patch_table() to update the expiration after the table has been created. It accepts expiration_time as a parameter.

NOTE: patch_table() only updates the parameters you pass; other table properties are left unchanged.

expiration_time (Optional[int]) -- [Optional] The time when this table expires, in milliseconds since the epoch.

See code below:

import datetime

from airflow import models
from airflow.operators import python

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
DATASET_NAME = 'your-dataset'
TABLE_NAME = 'your-table'
PROJECT_ID = 'your-project'

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with models.DAG(
        'create_table_add_expiration',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    load_csv = GCSToBigQueryOperator(
        task_id='gcs_to_bigquery',
        bucket='bucket-name',
        source_objects=['folder/file.csv'],
        source_format='CSV',
        skip_leading_rows=1,
        destination_project_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}",
        autodetect=True,
        write_disposition='WRITE_TRUNCATE',
    )

    def patch_bq_table(**kwargs):
        hook = BigQueryHook(bigquery_conn_id='bigquery_default', delegate_to=None, use_legacy_sql=False)
        hook.patch_table(
                dataset_id=DATASET_NAME,
                table_id=TABLE_NAME,
                project_id=PROJECT_ID,
                description="test",
                expiration_time=1646884330000 # March 10, 2022 in epoch time (milliseconds)
                )

    update_table = python.PythonOperator(
        task_id='add_expiration',
        provide_context=True,
        python_callable=patch_bq_table,
    )

    load_csv >> update_table

Airflow test: (screenshot omitted)

Updated table in BQ: (screenshot omitted)
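
The expiration_time above is hard-coded to a fixed date. If you would rather have the table expire a fixed interval after the load, you can compute the value at run time. A minimal sketch of the callable, assuming a 7-day retention (the interval is arbitrary, and the names are reused from the DAG above):

def patch_bq_table(**kwargs):
    # Expire 7 days from now, expressed as milliseconds since the epoch,
    # which is the unit expiration_time expects.
    expires_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)
    hook = BigQueryHook(bigquery_conn_id='bigquery_default', use_legacy_sql=False)
    hook.patch_table(
        dataset_id=DATASET_NAME,
        table_id=TABLE_NAME,
        project_id=PROJECT_ID,
        expiration_time=int(expires_at.timestamp() * 1000),
    )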

Solution 2:[2]

Executing BigQueryCreateEmptyTableOperator with an expirationTime in its table_resource before running GCSToBigQueryOperator might also solve the problem.

See the Airflow documentation for BigQueryCreateEmptyTableOperator.

BigQueryCreateEmptyTableOperator(
    ...,
    table_resource={
        "tableReference": {"tableId": ""},  # your table id
        "expirationTime": "",               # expiration as epoch milliseconds
    },
)
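
A slightly fuller sketch of this approach, reusing PROJECT_ID, DATASET_NAME, TABLE_NAME, and the load_csv task from Solution 1 (the expiration value is the same illustrative epoch-millisecond timestamp used there):

from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyTableOperator

create_table = BigQueryCreateEmptyTableOperator(
    task_id='create_table_with_expiration',
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_resource={
        "tableReference": {"tableId": TABLE_NAME},
        "expirationTime": "1646884330000",  # March 10, 2022 in epoch milliseconds
    },
)

# Create the (empty, expiring) table first, then load the CSV into it.
create_table >> load_csv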

Solution 3:[3]

You could also use the BigQueryUpdateTableOperator. If you specify fields, only the listed fields are patched instead of the whole table resource being replaced.

set_bq_table_expiration = BigQueryUpdateTableOperator(
    dag=dag,
    task_id="set_bq_table_expiration",
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    project_id=PROJECT_ID,
    fields=["expirationTime"],
    table_resource={
        "expirationTime": "1646884330000",
    },
)
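
For completeness, BigQueryUpdateTableOperator is imported from the google provider's bigquery operators module and can be chained after the load task from Solution 1:

from airflow.providers.google.cloud.operators.bigquery import BigQueryUpdateTableOperator

load_csv >> set_bq_table_expiration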

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1:
Solution 2: Jasmine
Solution 3: Dominik