Databricks - ConcurrentAppendException

I'm running about 20 notebooks concurrently, and they all update the same Delta table (different rows, though). I get the exception below whenever two notebooks try to update the table at the same time:

Does setting 'delta.isolationLevel' = 'Serializable' for the Delta table fix the issue? Is there a better option?

ConcurrentAppendException: Files were added to the root of the table by a concurrent update. Please try the operation again. Conflicting commit:



Solution 1:[1]

Does setting ('delta.isolationLevel' = 'Serializable') for the Delta table fix the issue?

No. Under WriteSerializable, the default isolation level, files added by blind INSERT operations (appends that do not read the table first) do not conflict with any operation, even if they touch the same partition. If the isolation level is set to Serializable, blind appends may also conflict. Serializable is the stricter of the two levels, so switching to it cannot remove this exception; it can only add conflicts.
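To make the comparison concrete, here is a toy Scala summary of the pairwise conflict table in the isolation-level documentation referenced at the end. It is a simplification for illustration, not Delta's conflict checker; the names IsolationToy, BlindInsert, ReadThenWrite and mayConflict are hypothetical, and "may conflict" only becomes an actual failure when the two operations touch overlapping data.

```scala
// Toy summary of the documented conflict table; NOT Delta's real conflict checker.
object IsolationToy {
  // The two values accepted by the 'delta.isolationLevel' table property.
  sealed abstract class IsolationLevel(val propertyValue: String)
  case object WriteSerializableLevel extends IsolationLevel("WriteSerializable") // default
  case object SerializableLevel extends IsolationLevel("Serializable")

  sealed trait WriteOp
  case object BlindInsert extends WriteOp   // appends without reading the table first
  case object ReadThenWrite extends WriteOp // UPDATE, DELETE or MERGE INTO

  /** true = the pair *may* conflict (e.g. when both touch the same partition);
    * false = the pair can never conflict at this level. The order of a and b is irrelevant. */
  def mayConflict(level: IsolationLevel, a: WriteOp, b: WriteOp): Boolean = (a, b) match {
    case (BlindInsert, BlindInsert)     => false // blind appends never conflict with each other
    case (ReadThenWrite, ReadThenWrite) => true  // may conflict at either level
    case _                              => level == SerializableLevel // insert vs. update
  }

  def main(args: Array[String]): Unit = {
    // The question's scenario: many notebooks running updates at the same time.
    for (level <- Seq(WriteSerializableLevel, SerializableLevel)) {
      val c = mayConflict(level, ReadThenWrite, ReadThenWrite)
      println(s"${level.propertyValue}: update vs. update may conflict = $c")
    }
  }
}
```

For the question's workload, where every notebook runs an update, mayConflict returns true at both levels, which is why changing delta.isolationLevel does not help here.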

Is there a better option?

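A ConcurrentAppendException occurs when a concurrent operation adds files to a partition that your operation reads (or anywhere in the table, if it is unpartitioned), so touching different rows is not enough on its own. It typically shows up with concurrent DELETE, UPDATE, or MERGE operations: even if they physically update different partition directories, one of them may read the same partition that another is concurrently updating, which causes a conflict. You can avoid this by making the separation explicit in the operation condition. Consider the following code example: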

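// Target 'deltaTable' (an io.delta.tables.DeltaTable) is partitioned by date and country;
// 'source' is the DataFrame of incoming rows.
// The condition only joins on the partition columns, so it does not pin this job's
// partition and the merge may scan the whole table.
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()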

Suppose this code runs concurrently for different dates or countries. Because each job works on an independent partition of the target Delta table, you might expect no conflicts. However, the condition is not explicit enough: the merge can scan the entire table and therefore conflict with concurrent operations that update any other partition. To avoid this, add the specific date and country to the merge condition, as shown in the following example.

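// Target 'deltaTable' is partitioned by date and country.
// 'date' and 'country' are placeholder variables holding the partition values this
// job owns; pinning them in the condition restricts the merge to that one partition.
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country " +
      s"AND t.date = '$date' AND t.country = '$country'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()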
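A toy Scala model of why the pinned condition helps, under the simplifying assumption that a join-only condition reads every partition while a pinned one reads exactly its own. This is not how Delta implements conflict detection; the object, its methods, and the partition values are made up for illustration.

```scala
// Toy model of the ConcurrentAppendException rule, NOT Delta's real checker:
// an operation fails if a concurrent commit added files to any partition it read.
object PartitionConflictToy {
  final case class Partition(date: String, country: String)

  /** Partitions a MERGE reads: only its own partition when the condition pins
    * t.date and t.country, otherwise (join-only condition) potentially all of them. */
  def partitionsRead(table: Set[Partition], pinned: Option[Partition]): Set[Partition] =
    pinned match {
      case Some(p) => Set(p)
      case None    => table
    }

  /** True if another job's commit added files to a partition this job read. */
  def conflicts(read: Set[Partition], addedByOther: Set[Partition]): Boolean =
    read.exists(addedByOther.contains)

  // Made-up partition values: job A merges into `us`, job B commits into `de`.
  val us = Partition("2022-01-01", "US")
  val de = Partition("2022-01-01", "DE")
  val table = Set(us, de)

  /** Does job A fail, depending on whether its merge condition pins its partition? */
  def jobAConflicts(scoped: Boolean): Boolean = {
    val pinned = if (scoped) Some(us) else None
    conflicts(partitionsRead(table, pinned), addedByOther = Set(de))
  }

  def main(args: Array[String]): Unit = {
    println(s"join-only condition conflicts: ${jobAConflicts(scoped = false)}")
    println(s"pinned condition conflicts:    ${jobAConflicts(scoped = true)}")
  }
}
```

Running main prints true for the join-only condition and false for the pinned one: job B's new files land in a partition that job A, once pinned, never reads.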

References:
https://docs.microsoft.com/en-us/azure/databricks/delta/concurrency-control#concurrentappendexception
https://docs.microsoft.com/en-us/azure/databricks/delta/optimizations/isolation-level

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 PratikLad-MT