Table expiration in GCS to BQ Airflow task
I am copying a CSV into a new BQ table using the GCSToBigQueryOperator
task in Airflow. Is there a way to add a table expiration to this table within this task?
new_table_task = GCSToBigQueryOperator(
    task_id='insert_gcs_to_bq_tmp_table',
    bucket=BUCKET,
    source_objects=SOURCE_PATH,
    destination_project_dataset_table=f"{BQ_PROJECT}.{DATASET}.{tmp_table_name}",
    write_disposition='WRITE_TRUNCATE',
    skip_leading_rows=1,
    schema_object=SCHEMA_OBJECT
)
If this is not possible, is my best option to create the table first and define the expiration using DDL, and then load it with GCSToBigQueryOperator? Thanks!
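For reference, the DDL fallback mentioned at the end of the question could be sketched as follows, reusing the names from the snippet above. This is only a rough sketch: the placeholder column, the seven-day expiration window, and the choice of BigQueryInsertJobOperator are assumptions, not taken from the question itself.

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

# Create the table with an expiration via DDL, then load it with the existing
# GCSToBigQueryOperator task. The column list and the 7-day window are placeholders.
create_tmp_table = BigQueryInsertJobOperator(
    task_id='create_tmp_table_with_expiration',
    configuration={
        "query": {
            "query": f"""
                CREATE TABLE IF NOT EXISTS `{BQ_PROJECT}.{DATASET}.{tmp_table_name}`
                (
                    id INT64  -- placeholder column; replace with the real schema
                )
                OPTIONS (
                    expiration_timestamp = TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
                )
            """,
            "useLegacySql": False,
        }
    },
)

create_tmp_table >> new_table_task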
Solution 1:[1]
You can use BigQueryHook.patch_table() to update the expiration after you have created your table. It accepts expiration_time as a parameter.
NOTE: patch_table() only updates the parameters that are passed to it. From the documentation:
expiration_time (Optional[int]) -- [Optional] The time when this table expires, in milliseconds since the epoch.
See the code below:
import datetime

from airflow import models
from airflow.operators import python
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

DATASET_NAME = 'your-dataset'
TABLE_NAME = 'your-table'
PROJECT_ID = 'your-project'

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with models.DAG(
        'create_table_add_expiration',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Load the CSV from GCS into the destination table.
    load_csv = GCSToBigQueryOperator(
        task_id='gcs_to_bigquery',
        bucket='bucket-name',
        source_objects=['folder/file.csv'],
        source_format='CSV',
        skip_leading_rows=1,
        destination_project_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.{TABLE_NAME}",
        autodetect=True,
        write_disposition='WRITE_TRUNCATE',
    )

    def patch_bq_table(**kwargs):
        # Patch only the fields passed here; everything else on the table is left untouched.
        hook = BigQueryHook(bigquery_conn_id='bigquery_default', delegate_to=None, use_legacy_sql=False)
        hook.patch_table(
            dataset_id=DATASET_NAME,
            table_id=TABLE_NAME,
            project_id=PROJECT_ID,
            description="test",
            expiration_time=1646884330000  # March 10, 2022 in epoch time (milliseconds)
        )

    # Run the patch after the load has finished.
    update_table = python.PythonOperator(
        task_id='add_expiration',
        provide_context=True,
        python_callable=patch_bq_table,
    )

    load_csv >> update_table
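If you would rather not hard-code the expiration timestamp, expiration_time can be derived from a datetime. A minimal sketch (the seven-day window is just an example, not part of the original answer):

import datetime

# Expire the table seven days from now, expressed in milliseconds since the epoch,
# which is the unit expiration_time expects.
expiration_ms = int(
    (datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=7)).timestamp() * 1000
)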
Airflow test run and the updated table in BQ (screenshots omitted).
Solution 2:[2]
Executing BigQueryCreateEmptyTableOperator with a table expirationTime before GCSToBigQueryOperator might also solve the problem. See the Airflow documentation for BigQueryCreateEmptyTableOperator.
BigQueryCreateEmptyTableOperator(
    ...
    table_resource={
        "tableReference": {"tableId": ""},
        "expirationTime": ,
    }
)
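Filled in, that idea might look like the sketch below. It is not the answerer's exact code: it reuses PROJECT_ID, DATASET_NAME, TABLE_NAME and load_csv from Solution 1, the concrete timestamp is just an example, and it assumes a reasonably recent apache-airflow-providers-google release.

from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyTableOperator

# Create the destination table with an expiration before the load task runs.
# expirationTime is epoch time in milliseconds, passed as a string per the
# BigQuery table resource format.
create_table_with_expiration = BigQueryCreateEmptyTableOperator(
    task_id='create_table_with_expiration',
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    table_resource={
        "tableReference": {
            "projectId": PROJECT_ID,
            "datasetId": DATASET_NAME,
            "tableId": TABLE_NAME,
        },
        "expirationTime": "1646884330000",  # March 10, 2022 in epoch milliseconds
    },
)

create_table_with_expiration >> load_csv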
Solution 3:[3]
You could also use the BigQueryUpdateTableOperator. If you specify fields, it patches only those fields rather than replacing the whole table resource.
set_bq_table_expiration = BigQueryUpdateTableOperator(
    dag=dag,
    task_id="set_bq_table_expiration",
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    project_id=PROJECT_ID,
    fields=["expirationTime"],
    table_resource={
        "expirationTime": "1646884330000",
    },
)
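To wire this into a DAG, the operator comes from the google provider's bigquery module and can simply run after the load task. A minimal sketch reusing load_csv from Solution 1 (the import path assumes a recent apache-airflow-providers-google package):

from airflow.providers.google.cloud.operators.bigquery import BigQueryUpdateTableOperator

# Apply the new expirationTime once the CSV has been loaded into the table.
load_csv >> set_bq_table_expiration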
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | |
| Solution 2 | Jasmine |
| Solution 3 | Dominik |