Auto increment id in delta table while inserting
I have a problem merging CSV files into a Delta table using PySpark SQL. I managed to create an upsert function that updates a row if it matches and inserts it if it does not.
I want to add an ID column to the final Delta table and increment it each time we insert data. This column should identify each row in the Delta table. Is there any way to put that in place?
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit

# Note: get_nan_keys_values, get_mapping_dict and create_not_matched_insert_dict
# are defined elsewhere (not shown here).

# Combine two dicts; on duplicate keys, values from dict2 win.
def Merge(dict1, dict2):
    res = {**dict1, **dict2}
    return res

# Build default insert values ('null') for every field that has no mapping.
def create_default_values_dict(correspondance_df, marketplace):
    dict_output = {}
    for field in get_nan_keys_values(get_mapping_dict(correspondance_df, marketplace)):
        dict_output[field] = 'null'
        # We want to increment the id row each time we perform an insertion (TODO TODO TODO)
        # if field == 'id':
        #     dict_output['id'] = col('id')+1
        # else:
    return dict_output

# Build the update expressions: keep the existing value when the incoming one is null.
def create_matched_update_dict(mapping, products_table, updates_table):
    output = {}
    for k, v in mapping.items():
        if k == 'source_name':
            output['products.source_name'] = lit(v)
        else:
            output[products_table + '.' + k] = F.when(
                col(updates_table + '.' + v).isNull(), col(products_table + '.' + k)
            ).when(
                col(updates_table + '.' + v).isNotNull(), col(updates_table + '.' + v)
            )
    return output

insert_dict = create_not_matched_insert_dict(mapping, 'products', 'updates')
default_dict = create_default_values_dict(correspondance_df_products, 'Cdiscount')
insert_values = Merge(insert_dict, default_dict)
update_values = create_matched_update_dict(mapping, 'products', 'updates')

delta_table_products.alias('products').merge(
    updates_df_table.limit(20).alias('updates'),
    "products.barcode_ean == updates.ean") \
    .whenMatchedUpdate(set = update_values) \
    .whenNotMatchedInsert(values = insert_values) \
    .execute()
I tried to increment the id column in the function create_default_values_dict, but it does not seem to work: the value doesn't auto-increment by 1. Is there another way to solve this problem? Thanks in advance :)
Solution 1:[1]
Databricks (hosted Spark) supports IDENTITY columns. The column definition syntax is:
GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY
[ ( [ START WITH start ] [ INCREMENT BY step ] ) ]
This works on Delta tables.
Example:
create table gen1 (
  id long GENERATED ALWAYS AS IDENTITY,
  t string
)
Requires Databricks Runtime 10.4 or above.
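Applied to the merge in the question, one consequence is that the insert dictionary should leave `id` out entirely so the table can generate it; with GENERATED ALWAYS, explicitly supplied values are, as far as I know, rejected. The following is a minimal sketch in plain Python, assuming the insert mapping is keyed by column names as in the question. `without_identity_column` is a hypothetical helper, not part of Delta or Databricks, and the sample mapping is made up for illustration.

```python
def without_identity_column(insert_values, identity_col="id", table_alias="products"):
    """Return a copy of the insert mapping with the identity column removed.

    Hypothetical helper: assumes keys are either bare column names ('id')
    or alias-qualified names ('products.id'), as in the question's dicts.
    """
    blocked = {identity_col, f"{table_alias}.{identity_col}"}
    return {k: v for k, v in insert_values.items() if k not in blocked}


# Made-up mapping shaped like the question's insert_values
# (values are SQL expression strings here).
insert_values = {"id": "null", "barcode_ean": "updates.ean", "source_name": "'Cdiscount'"}
safe_insert_values = without_identity_column(insert_values)
print(safe_insert_values)
```

The filtered mapping would then be passed to `.whenNotMatchedInsert(values = safe_insert_values)` in place of the original one.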
Solution 2:[2]
Delta does not support auto-increment column types.
In general, Spark doesn't use auto-increment IDs, instead favoring monotonically increasing IDs. See functions.monotonically_increasing_id(). These IDs are guaranteed to be unique and increasing, but not consecutive.
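To see why such IDs have gaps, here is a plain-Python illustration of the layout described in the Spark documentation for `monotonically_increasing_id()`: the partition index goes in the upper 31 bits and the record position within the partition in the lower 33 bits. `simulated_monotonic_id` is a hypothetical stand-in written for this example, not the Spark function itself.

```python
def simulated_monotonic_id(partition_index, row_in_partition):
    """Mimic the documented bit layout: partition index in the upper 31 bits,
    row position within the partition in the lower 33 bits."""
    return (partition_index << 33) + row_in_partition


# Two partitions with three rows each.
ids = [simulated_monotonic_id(p, r) for p in range(2) for r in range(3)]
print(ids)  # [0, 1, 2, 8589934592, 8589934593, 8589934594]
```

The values are unique and increasing, but the jump between partitions makes them unsuitable as a 1, 2, 3, ... row counter.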
If you want to achieve auto-increment behavior you will have to use multiple Delta operations, e.g., query the max value, add it to a row_number() column computed via a window function, then write. This is problematic for two reasons:
1. Unless you introduce an external locking mechanism or some other way to ensure that no updates to the table happen between finding the max value and writing, you can end up with invalid data such as duplicate IDs.
2. Using row_number() over a window with no partitioning moves all the data into a single partition. This reduces parallelism to 1 and forces everything through a single core, which will be very slow with large data.
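The first problem can be shown without Spark. The sketch below simulates the "read the max, then number the new rows" logic in plain Python, with a list standing in for the table's id column. `next_ids` and the two writers are hypothetical names used only for this illustration.

```python
def next_ids(current_max, n_new_rows):
    """IDs for new rows: the current max plus a 1-based row number."""
    return [current_max + i for i in range(1, n_new_rows + 1)]


table_ids = [1, 2, 3]

# Both writers read the max before either one writes (no lock in between).
max_seen_by_a = max(table_ids)
max_seen_by_b = max(table_ids)

table_ids += next_ids(max_seen_by_a, 2)  # writer A appends ids 4 and 5
table_ids += next_ids(max_seen_by_b, 2)  # writer B also appends ids 4 and 5

duplicates = sorted({i for i in table_ids if table_ids.count(i) > 1})
print(duplicates)  # [4, 5]
```

Each writer's IDs are correct in isolation; the duplicates appear only because the two reads were not serialized against the writes.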
Bottom line, you really do not want to use auto-increment columns with Spark.
Hope this helps.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | |
Solution 2 | Sim |