Cloud Data Fusion ETL from Postgres to BigQuery - idempotent load
I'm trying to use Google's Cloud Data Fusion (CDF) to perform an ETL of some OLTP data from Postgres into BigQuery (BQ). Every night we will copy the contents of a few selected tables into equivalent tables in BQ, adding one column with the datestamp.
So imagine we have a table with two columns, A and B, and one row of data like this in Postgres:

| A   | B    |
|-----|------|
| egg | milk |
Then over two days, the BigQuery table would look like this:

| ds       | A   | B    |
|----------|-----|------|
| 22-01-01 | egg | milk |
| 22-01-02 | egg | milk |
However, I'm worried that the way I am doing this in CDF is not idempotent: if the pipeline runs twice, I'll have duplicate data for a given day in BQ (not desired).
One idea is to delete the rows for that day in BQ before doing the ETL (as part of the same pipeline). However, I'm not sure how to do this, or whether it is best practice. Any ideas?
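A minimal sketch of that delete-first idea, assuming the destination table is `dataset.new_table` and `ds` is stored as a DATE column (both are placeholders; substitute your own names):

-- Remove any rows already loaded for the target day, so a re-run
-- of the pipeline for that day cannot leave duplicates behind.
DELETE FROM `dataset.new_table`
WHERE ds = DATE '2022-01-01'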
Solution 1
You can use one of these two options, depending on what you want to do with the information:
Option 1
You can create a blank new_table with the same schema (ds, A, B). From Data Fusion, you insert the incoming data into old_table (effectively a staging table). With the MERGE statement, you then compare the data in old_table against new_table: rows that do not yet exist in new_table are inserted, and rows that already exist but hold different values are updated.
MERGE `dataset.new_table` T
USING `dataset.old_table` S
ON T.ds = S.ds
WHEN MATCHED THEN
  UPDATE SET T.A = S.A, T.B = S.B
WHEN NOT MATCHED THEN
  INSERT (ds, A, B) VALUES (S.ds, S.A, S.B)
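Because re-running the MERGE with the same staging data matches the already-loaded rows and updates them in place rather than inserting them again, running the pipeline twice for the same day does not produce duplicates, which is the idempotency the question asks for.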
Option 2
It is the same process as Option 1, but this query only inserts the data that does not already exist in the new_table.
INSERT INTO `dataset.new_table` (ds, A, B)
SELECT ds, A, B FROM `dataset.old_table`
WHERE ds NOT IN (SELECT ds FROM `dataset.new_table`)
The difference between Option 1 and Option 2 is that Option 1 will update existing rows whose values differ in the new_table and insert the new data, while Option 2 will only insert the new data.
You can execute these queries with a scheduled query once a day; see the BigQuery scheduled queries documentation for details.
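As a rough sketch, the scheduled version of Option 2 could use BigQuery's built-in @run_date parameter (available to scheduled queries) so each nightly run only considers its own date; the table names and the DATE-typed ds column are assumptions carried over from the examples above:

-- Scheduled to run once a day; BigQuery supplies @run_date.
INSERT INTO `dataset.new_table` (ds, A, B)
SELECT ds, A, B FROM `dataset.old_table`
WHERE ds = @run_date
  AND @run_date NOT IN (SELECT ds FROM `dataset.new_table`)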
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution   | Source       |
|------------|--------------|
| Solution 1 | Raul Saucedo |