GoogleCloudStorageToBigQueryOperator source_objects to receive list via XCom
I would like to pass a list of strings containing the names of files in Google Cloud Storage to XCom, to be picked up later by a GoogleCloudStorageToBigQueryOperator task. The source_objects field is templated, so Jinja templating can be used. Unfortunately, Jinja can only return a string, so I cannot pass the list via XCom.
How can I use an XCom list in GoogleCloudStorageToBigQueryOperator?
Reference to a similar question, solved by using provide_context: Pass a list of strings as parameter of a dependant task in Airflow
The closest solution I've found, which works, is to create a wrapper class and send it the id of the task that posted the XCom, like so:
@apply_defaults
def __init__(self, source_objects_task_id,
             ....

def execute(self, context):
    source_objects = context['ti'].xcom_pull(
        task_ids=self.source_objects_task_id)
    operator = GoogleCloudStorageToBigQueryOperator(
        source_objects=source_objects,
        dag=self.dag,
        ....
    )
    operator.execute(context)
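For illustration, a complete version of such a wrapper might look like the following minimal sketch (the class name and constructor parameters are hypothetical, and the imports use the contrib-era paths this question is based on):

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator


class GCSToBQXComWrapperOperator(BaseOperator):
    """Hypothetical wrapper: pulls source_objects from another task's XCom."""

    @apply_defaults
    def __init__(self, source_objects_task_id, bucket,
                 destination_project_dataset_table, *args, **kwargs):
        super(GCSToBQXComWrapperOperator, self).__init__(*args, **kwargs)
        self.source_objects_task_id = source_objects_task_id
        self.bucket = bucket
        self.destination_project_dataset_table = destination_project_dataset_table

    def execute(self, context):
        # Pull the list of object names pushed by the upstream task.
        source_objects = context['ti'].xcom_pull(
            task_ids=self.source_objects_task_id)
        operator = GoogleCloudStorageToBigQueryOperator(
            task_id='{}_inner'.format(self.task_id),
            bucket=self.bucket,
            source_objects=source_objects,
            destination_project_dataset_table=self.destination_project_dataset_table,
        )
        operator.execute(context)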
Solution 1:[1]
Not sure how you get the list of Google Cloud Storage objects, but if you are doing it using GoogleCloudStorageListOperator, then you can instead pass wildcards to the source_objects param in GoogleCloudStorageToBigQueryOperator, in the same way that you do in the BigQuery Web UI:
GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='test_bucket',
    source_objects=['folder1/*.csv', 'folder2/*.csv'],
    destination_project_dataset_table='project.dataset.dest_table',
    schema_object='schema.json',  # object path within the bucket, not a gs:// URI
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    bigquery_conn_id='bq-conn',
    google_cloud_storage_conn_id='gcp-conn',
    dag=dag
)
If you want to get the list from another task using XCom, you can create a new operator or an Airflow plugin for GoogleCloudStorageToBigQueryOperator, adding a new param source_objects_task_id, removing the source_objects param, and replacing the following code (lines 203-204: https://github.com/apache/incubator-airflow/blob/ac9033db0981ae1f770a8bdb5597055751ab15bd/airflow/contrib/operators/gcs_to_bq.py#L203-L204):
source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
               for source_object in self.source_objects]
with
source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
               for source_object in context['ti'].xcom_pull(
                   task_ids=self.source_objects_task_id)]
and use it as follows:
GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='test_bucket',
    source_objects_task_id='task-id-of-previous-task',
    destination_project_dataset_table='project.dataset.dest_table',
    schema_object='schema.json',
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    bigquery_conn_id='bq-conn',
    google_cloud_storage_conn_id='gcp-conn',
    dag=dag
)
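Equivalently, instead of patching those two lines in the operator's source, a small subclass can overwrite self.source_objects from XCom before delegating to the parent's execute(), which then builds the source_uris exactly as above. A sketch, not tested (the class name is mine):

from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.utils.decorators import apply_defaults


class GCSToBQFromXComOperator(GoogleCloudStorageToBigQueryOperator):
    """Hypothetical variant that reads source_objects from an upstream XCom."""

    @apply_defaults
    def __init__(self, source_objects_task_id, *args, **kwargs):
        # Placeholder list; the real value is pulled from XCom at run time.
        super(GCSToBQFromXComOperator, self).__init__(
            *args, source_objects=[], **kwargs)
        self.source_objects_task_id = source_objects_task_id

    def execute(self, context):
        # Swap in the list pushed by the upstream task, then let the parent
        # build the gs://bucket/object URIs and run the load job as usual.
        self.source_objects = context['ti'].xcom_pull(
            task_ids=self.source_objects_task_id)
        super(GCSToBQFromXComOperator, self).execute(context)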
Solution 2:[2]
Starting from 2.1.0, Airflow added the ability to render XCom output as native Python objects. Set render_template_as_native_obj=True in your DAG constructor:
dag = DAG(
    ...
    render_template_as_native_obj=True,
)
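For context, native rendering is what would, in principle, let a templated field such as source_objects receive a real Python list straight from Jinja, e.g. (the task id and table names are placeholders):

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

GCS_to_BQ = GCSToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='test_bucket',
    # With render_template_as_native_obj=True this should render to a
    # Python list rather than its string representation.
    source_objects="{{ ti.xcom_pull(task_ids='list_files') }}",
    destination_project_dataset_table='project.dataset.dest_table',
    source_format='CSV',
    dag=dag,
)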
Because render_template_as_native_obj works for the PythonOperator only (let me know if I am wrong; I tested other operators and nothing worked), we need to wrap our operator in a PythonOperator:
PythonOperator(dag=dag, task_id='any', python_callable=_import)
where the Python callable extracts the source objects from XCom and executes the GCS operator:
def _import(**kwargs):
    ti = kwargs["ti"]
    op = GCSToBigQueryOperator(
        ...
        source_objects=ti.xcom_pull(task_ids="task-id-of-previous-task"),
        ...
    )
    op.execute(kwargs)
Because GoogleCloudStorageToBigQueryOperator is deprecated, I used GCSToBigQueryOperator.
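Put together, the whole pattern might look like this sketch (bucket, table, and task ids are placeholders):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator


def _import(**kwargs):
    ti = kwargs["ti"]
    op = GCSToBigQueryOperator(
        task_id='gcs_to_bq_inner',
        bucket='test_bucket',
        # Placeholder id of the upstream task that pushed the object list.
        source_objects=ti.xcom_pull(task_ids='list_files'),
        destination_project_dataset_table='project.dataset.dest_table',
        source_format='CSV',
    )
    op.execute(kwargs)


with DAG(
    dag_id='gcs_to_bq_xcom',
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    render_template_as_native_obj=True,  # per this solution's setup
) as dag:
    PythonOperator(task_id='import', python_callable=_import)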
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | kaxil |
| Solution 2 | Dmytro Maslenko |