Spark binary file and Delta Table

I receive binary files (~3 MB each) in batches of ~20,000 files at a time. These files are used downstream for further processing, but I want to process them and store them in Delta tables.

I can do this easily:

from pyspark.sql.functions import expr
from delta.tables import DeltaTable

df = spark.read.format("binaryFile").load(<path-to-batch>)
df = df.withColumn("id", expr("uuid()"))

dt = DeltaTable.forName(spark, "myTable")
dt.alias("a").merge(
    df.alias("b"),
    "a.path = b.path"
).whenNotMatchedInsert(
    values={"id": "b.id", "content": "b.content"}
).execute()

This already makes the table quite slow, but later I also need to query certain IDs, collect them, and write them back out as individual binary files.

Questions:

  1. Would my table benefit from a batch column and partition?
  2. Should I partition by id? I know this is not ideal, but it might make querying individual rows easier?
  3. Is there a better way to write the files out again, rather than .collect()? When I select about 1000 specific ids and write them out, roughly 10 minutes is spent just on the collect and then less than a minute on the actual write. I do something like:
for row in df.collect():
    with open(row.id, "wb") as fw:
        fw.write(row.content)


Solution 1:[1]

  1. As uuid() returns a random value on every run, I'm afraid we cannot use it to compare existing data with new records (see the first sketch after this list for a deterministic alternative). (Sorry if I misunderstood the idea.)
  2. I don't think partitioning by id will help, as the id column obviously has very high cardinality.
  3. Instead of using collect(), which loads all records onto the driver, it would be better to write the records from the Spark DataFrame directly and simultaneously from all the worker nodes into a temporary location on ADLS first, and then aggregate the few data files from that location (see the second sketch after this list).
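
A minimal sketch of point 1, assuming the file path uniquely identifies a file: derive the id deterministically (for example with sha2 over path) instead of uuid(), so re-processing the same batch yields the same id and the merge can match existing rows. batch_path is a hypothetical placeholder for the input directory.

from pyspark.sql.functions import col, sha2

df = spark.read.format("binaryFile").load(batch_path)
# Hash the file path instead of generating a random uuid, so the same file
# always maps to the same id across reruns.
df = df.withColumn("id", sha2(col("path"), 256))

And a sketch of point 3, assuming the output location is reachable from the worker nodes (for example an ADLS container mounted at a hypothetical /dbfs/mnt/output): foreachPartition runs on the executors, so the binary files are written in parallel instead of being collected onto the driver first.

from pyspark.sql.functions import col

def write_rows(rows):
    # Runs on each executor, so partitions are written out in parallel.
    # /dbfs/mnt/output is a hypothetical mount point; replace with your own path.
    for row in rows:
        with open(f"/dbfs/mnt/output/{row.id}", "wb") as fw:
            fw.write(row.content)

ids_to_export = [...]  # the specific ids you want to write back out

(df.filter(col("id").isin(ids_to_export))
   .select("id", "content")
   .foreachPartition(write_rows))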

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 PhuriChal