Auto-increment ID in a Delta table while inserting

I have a problem merging CSV files into a Delta table using PySpark SQL. I managed to create an upsert function that updates if matched and inserts if not matched.

I want to add an ID column to the final Delta table and increment it each time we insert data. This column identifies each row in our Delta table. Is there any way to put that in place?

import pyspark.sql.functions as F
from pyspark.sql.functions import col, lit


def Merge(dict1, dict2):
    # Merge two dicts; keys from dict2 overwrite duplicates from dict1
    res = {**dict1, **dict2}
    return res

def create_default_values_dict(correspondance_df,marketplace):
    dict_output = {}
    for field in get_nan_keys_values(get_mapping_dict(correspondance_df, marketplace)):
        dict_output[field] = 'null'
        # We want to increment the id each time we perform an insertion (TODO)
        # if field == 'id':
        #     dict_output['id'] = col('id') + 1
        # else:
    return dict_output


def create_matched_update_dict(mapping, products_table, updates_table):
    output = {}
    for k,v in mapping.items():
        if k == 'source_name':
            output['products.source_name'] = lit(v)
        else:
            output[products_table + '.' + k] = (
                F.when(col(updates_table + '.' + v).isNull(), col(products_table + '.' + k))
                 .when(col(updates_table + '.' + v).isNotNull(), col(updates_table + '.' + v))
            )
    return output    

insert_dict = create_not_matched_insert_dict(mapping, 'products', 'updates')
default_dict = create_default_values_dict(correspondance_df_products, 'Cdiscount')

insert_values = Merge(insert_dict, default_dict)
update_values = create_matched_update_dict(mapping, 'products', 'updates')

delta_table_products.alias('products').merge(
    updates_df_table.limit(20).alias('updates'),
    "products.barcode_ean == updates.ean") \
    .whenMatchedUpdate(set = update_values) \
    .whenNotMatchedInsert(values = insert_values)\
    .execute()

I tried to increment the id column in the function create_default_values_dict, but it doesn't seem to work: the value doesn't auto-increment by 1. Is there another way to solve this problem? Thanks in advance :)



Solution 1:[1]

Databricks has IDENTITY columns for hosted Spark

https://docs.databricks.com/sql/language-manual/sql-ref-syntax-ddl-create-table-using.html#parameters

GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY
 [ ( [ START WITH start ] [ INCREMENT BY step ] ) ] 

This works on Delta tables.

Example:

CREATE TABLE gen1 (
    id BIGINT GENERATED ALWAYS AS IDENTITY,
    t STRING
)

Requires Databricks Runtime 10.4 or above.
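From PySpark you can create such a table with spark.sql and then run the question's merge unchanged, simply leaving id out of the insert dictionary; with GENERATED ALWAYS AS IDENTITY you must not supply a value yourself, Delta assigns it on insert. A minimal sketch, reusing the question's variable names (the column list here is illustrative, not the asker's full schema):

# Create the target Delta table with an identity column (Runtime 10.4+).
spark.sql("""
    CREATE TABLE IF NOT EXISTS products (
        id BIGINT GENERATED ALWAYS AS IDENTITY,
        barcode_ean STRING,
        source_name STRING
    ) USING DELTA
""")

# Do NOT set 'id' in the insert values; Delta generates it automatically.
insert_values.pop('id', None)

delta_table_products.alias('products').merge(
    updates_df_table.alias('updates'),
    "products.barcode_ean == updates.ean") \
    .whenMatchedUpdate(set=update_values) \
    .whenNotMatchedInsert(values=insert_values) \
    .execute()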

Solution 2:[2]

Delta does not support auto-increment column types.

In general, Spark doesn't use auto-increment IDs, instead favoring monotonically increasing IDs. See functions.monotonically_increasing_id().
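For example (a minimal, self-contained sketch; note that the generated IDs are unique and increasing but not consecutive, since the partition ID is encoded in the upper bits):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([("a",), ("b",), ("c",)], ["t"])

# Each row gets a unique 64-bit id; values increase but have gaps.
df_with_id = df.withColumn("id", F.monotonically_increasing_id())
df_with_id.show()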

If you want to achieve auto-increment behavior you will have to use multiple Delta operations, e.g., query the current max value, add it to a row_number() column computed via a window function, and then write (see the sketch after this list). This is problematic for two reasons:

  1. Unless you introduce an external locking mechanism or some other way to ensure that no updates to the table happen between finding the max value and writing, you can end up with invalid data.

  2. Using row_number() over an unpartitioned window reduces parallelism to 1, forcing all the data through a single core, which will be very slow with large data.
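For completeness, here is roughly what that pattern looks like (a minimal sketch reusing the question's products table; new_rows stands for a hypothetical DataFrame of rows to append, and the sketch carries exactly the concurrency and single-core caveats listed above):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 1. Read the current maximum id from the target table (0 if the table is empty).
max_id = (spark.table("products")
          .agg(F.coalesce(F.max("id"), F.lit(0)).alias("max_id"))
          .collect()[0]["max_id"])

# 2. Number the new rows with an unpartitioned window: all data goes through one partition.
w = Window.orderBy(F.monotonically_increasing_id())
new_rows_with_id = new_rows.withColumn("id", F.row_number().over(w) + F.lit(max_id))

# 3. Append. Any concurrent write between steps 1 and 3 can produce duplicate ids.
new_rows_with_id.write.format("delta").mode("append").saveAsTable("products")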

Bottom line, you really do not want to use auto-increment columns with Spark.

Hope this helps.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: (no author listed)
Solution 2: Sim