Postgres 9.5 upsert command in pandas or psycopg2?
Most of the examples I see are people inserting a single row into a database with the ON CONFLICT DO UPDATE syntax.
Does anyone have any examples using SQLAlchemy or pandas.to_sql?
99% of my inserts use the psycopg2 COPY command (I save a CSV or StringIO and then bulk insert), and the other 1% use pd.to_sql. All of my logic to check for new rows or dimensions is done in Python:
def find_new_rows(existing, current, id_col):
    current[id_col] = current[id_col].astype(int)
    x = existing[['datetime', id_col, 'key1']]
    y = current[['datetime', id_col, 'key2']]
    final = pd.merge(y, x, how='left', on=['datetime', id_col])
    final = final[~(final['key2'] == final['key1'])]
    final = final.drop(['key1'], axis=1)
    current = pd.merge(current, final, how='left', on=['datetime', id_col])
    current = current.loc[current['key2_y'] == 1]
    current = current.drop(['key2_x', 'key2_y'], axis=1)
    return current
Can someone show me an example of using the new PostgreSQL upsert syntax with psycopg2? A common use case is checking for dimension changes (between 50k and 100k rows daily, which I compare to existing values); that would be ON CONFLICT DO NOTHING, so that only new rows are added.
Another use case is that I have fact data which changes over time. I only take the most recent value (I currently use a view to select distinct), but it would be better to UPSERT, if possible.
Solution 1:[1]
FYI, this is the solution I am using currently.
It seems to work fine for my purposes. I had to add a line to replace null (NaT) timestamps with None though, because I was getting an error when I was loading each row into the database.
import logging

import pandas as pd


def create_update_query(table):
    """Create an upsert query that replaces existing data on primary key conflicts."""
    # DATABASE_COLUMNS and PRIMARY_KEY are expected to be defined elsewhere
    # as lists of column names.
    columns = ', '.join(DATABASE_COLUMNS)
    constraint = ', '.join(PRIMARY_KEY)
    placeholder = ', '.join(f'%({col})s' for col in DATABASE_COLUMNS)
    updates = ', '.join(f'{col} = EXCLUDED.{col}' for col in DATABASE_COLUMNS)
    query = f"""INSERT INTO {table} ({columns})
                VALUES ({placeholder})
                ON CONFLICT ({constraint})
                DO UPDATE SET {updates};"""
    # Collapse the multi-line string into a single line.
    query = ' '.join(query.split())
    return query
def load_updates(df, table, connection):
    conn = connection.get_conn()
    cursor = conn.cursor()
    # Replace NaN/NaT with None so they are loaded as NULL.
    df1 = df.astype(object).where(pd.notnull(df), None)
    insert_values = df1.to_dict(orient='records')
    # Build the query once rather than once per row.
    query = create_update_query(table=table)
    for row in insert_values:
        cursor.execute(query, row)
    conn.commit()
    row_count = len(insert_values)
    logging.info(f'Inserted {row_count} rows.')
    cursor.close()
    conn.close()
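The loop above sends one statement per row. Below is a minimal sketch of a batched variant, assuming the `execute_values` helper from `psycopg2.extras` is available. `build_upsert_sql` and `load_updates_batched` are hypothetical names, and the column lists are passed in as arguments instead of being read from module-level constants. Passing `update=False` gives the ON CONFLICT DO NOTHING behaviour asked about for dimension rows.

```python
def build_upsert_sql(table, columns, key_columns, update=True):
    """Build an INSERT ... ON CONFLICT statement with a single VALUES %s
    placeholder, which is the form execute_values expects.

    Identifiers are interpolated as-is, so they must come from trusted code,
    never from user input.
    """
    cols = ', '.join(columns)
    keys = ', '.join(key_columns)
    non_key = [c for c in columns if c not in key_columns]
    if update and non_key:
        sets = ', '.join(f'{c} = EXCLUDED.{c}' for c in non_key)
        action = f'DO UPDATE SET {sets}'
    else:
        action = 'DO NOTHING'
    return f'INSERT INTO {table} ({cols}) VALUES %s ON CONFLICT ({keys}) {action}'


def load_updates_batched(conn, table, columns, key_columns, records, update=True):
    """Upsert a list of dicts in batches; conn is an open psycopg2 connection."""
    # Imported here so build_upsert_sql can be used without the driver installed.
    from psycopg2.extras import execute_values

    sql = build_upsert_sql(table, columns, key_columns, update)
    rows = [tuple(rec[c] for c in columns) for rec in records]
    with conn.cursor() as cursor:
        # page_size controls how many rows go into each generated statement.
        execute_values(cursor, sql, rows, page_size=1000)
    conn.commit()
    return len(rows)
```

With DO UPDATE, the same key must not appear twice in one batch, because PostgreSQL refuses to update the same row twice within a single INSERT; deduplicate the records first if that can happen.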
Solution 2:[2]
Here is my code for a bulk insert with an ON CONFLICT ... DO UPDATE query for PostgreSQL from a pandas DataFrame:
Let's say id is a unique key in both the PostgreSQL table and the pandas df, and you want to insert and update based on this id.
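import pandas as pd
from sqlalchemy import create_engine, text

# The connection URL must be passed as a string.
engine = create_engine('postgresql://username:pass@host:port/dbname')

# Note: the rows are rendered into the SQL text with str(), so values that
# contain single quotes may break the statement.
query = text(f"""
    INSERT INTO schema.table(name, title, id)
    VALUES {','.join([str(i) for i in list(df.to_records(index=False))])}
    ON CONFLICT (id)
    DO UPDATE SET name= excluded.name,
                  title= excluded.title
""")
# Engine.execute() was removed in SQLAlchemy 2.0; engine.begin() commits on success.
with engine.begin() as conn:
    conn.execute(query)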
Make sure your df columns are in the same order as the columns listed in the INSERT statement.
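Because the query above renders the row values straight into the SQL text, a variant that binds them as parameters may be more robust. The following is a sketch only, assuming SQLAlchemy 1.4 or later; `build_bound_upsert` and `upsert_dataframe` are hypothetical helper names, and the table and column names are still interpolated directly, so they must come from trusted code.

```python
def build_bound_upsert(table, columns, key_column):
    """Return an upsert statement that uses named bind parameters (:col)."""
    cols = ', '.join(columns)
    params = ', '.join(f':{c}' for c in columns)
    sets = ', '.join(f'{c} = EXCLUDED.{c}' for c in columns if c != key_column)
    return (
        f'INSERT INTO {table} ({cols}) VALUES ({params}) '
        f'ON CONFLICT ({key_column}) DO UPDATE SET {sets}'
    )


def upsert_dataframe(df, engine, table, key_column):
    """Upsert every row of df; engine is a SQLAlchemy Engine."""
    # Imported here so build_bound_upsert can be used without SQLAlchemy installed.
    from sqlalchemy import text

    records = df.to_dict(orient='records')
    if not records:
        return 0
    stmt = text(build_bound_upsert(table, list(df.columns), key_column))
    with engine.begin() as conn:
        # A list of dicts makes SQLAlchemy run the statement once per record,
        # with the values passed to the driver as parameters.
        conn.execute(stmt, records)
    return len(records)
```

Since the parameters are matched by name, the order of the df columns no longer has to follow the table definition.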
Solution 3:[3]
In my case, I wrote to a temporary table first, then merged the temp table into the table I actually wanted to upsert to. Performing the upsert this way avoids problems with strings that contain single quotes.
def upsert_dataframe_to_table(self, table_name: str, df: pd.DataFrame, schema: str, id_col: str):
    """
    Takes the given dataframe and inserts it into the table given. The data is inserted unless the key for that
    data already exists in the table. If the key already exists, the data for that key is overwritten.

    :param table_name: The name of the table to send the data
    :param df: The dataframe with the data to send to the table
    :param schema: the name of the schema where the table exists
    :param id_col: The name of the primary key column
    :return: None
    """
    # postgres_configs is expected to be a dict of connection settings defined elsewhere.
    engine = create_engine(
        f'postgresql://{postgres_configs["username"]}:{postgres_configs["password"]}@{postgres_configs["host"]}'
        f':{postgres_configs["port"]}/{postgres_configs["db"]}'
    )
    # Stage the dataframe in a scratch table, replacing any previous copy.
    df.to_sql('temp_table', engine, if_exists='replace')

    updates = ', '.join([f'{col} = EXCLUDED.{col}' for col in df.columns if col != id_col])
    columns = ', '.join([f'{col}' for col in df.columns])
    query = f'INSERT INTO "{schema}".{table_name} ({columns}) ' \
            f'SELECT {columns} FROM temp_table ' \
            f'ON CONFLICT ({id_col}) DO ' \
            f'UPDATE SET {updates} '

    self.cursor.execute(query)
    self.cursor.execute('DROP TABLE temp_table')
    self.conn.commit()
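The same staging idea also fits a COPY-based load like the one described in the question. Here is a minimal sketch with psycopg2 alone, under several assumptions: the connection is not in autocommit mode (otherwise ON COMMIT DROP would remove the temp table immediately), the df holds every column the target table requires, and `build_staging_statements`, `copy_new_rows`, and the `staging_upsert` table name are hypothetical. It uses ON CONFLICT DO NOTHING, so only rows with new keys are added.

```python
import io


def build_staging_statements(table, columns, key_columns, staging='staging_upsert'):
    """Return the (create, copy, merge) SQL for a COPY-then-merge load.

    Identifiers are interpolated as-is and must come from trusted code.
    """
    cols = ', '.join(columns)
    keys = ', '.join(key_columns)
    create = (
        f'CREATE TEMP TABLE {staging} (LIKE {table} INCLUDING DEFAULTS) '
        f'ON COMMIT DROP'
    )
    copy = f'COPY {staging} ({cols}) FROM STDIN WITH (FORMAT csv)'
    merge = (
        f'INSERT INTO {table} ({cols}) SELECT {cols} FROM {staging} '
        f'ON CONFLICT ({keys}) DO NOTHING'
    )
    return create, copy, merge


def copy_new_rows(conn, df, table, key_columns):
    """Load df through a temp table and keep only rows whose key is new."""
    create, copy, merge = build_staging_statements(table, list(df.columns), key_columns)
    # Serialize the dataframe to an in-memory CSV for COPY.
    buffer = io.StringIO()
    df.to_csv(buffer, index=False, header=False)
    buffer.seek(0)
    with conn.cursor() as cursor:
        cursor.execute(create)
        cursor.copy_expert(copy, buffer)
        cursor.execute(merge)
        inserted = cursor.rowcount  # rows actually inserted by the merge
    conn.commit()
    return inserted
```

Swapping the final clause for DO UPDATE SET ... would turn this into the upsert needed for the fact data, at the cost of having to deduplicate keys in the staged rows first.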
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | trench |
Solution 2 | |
Solution 3 | |