Spark binary file and Delta Table
I receive binary files (~3 MB each) in batches of ~20,000 files at a time. These files are used downstream for further processing, but I also want to process them and store them in Delta tables.
I can do this easily:
from pyspark.sql.functions import expr
from delta.tables import DeltaTable

df = spark.read.format("binaryFile").load(<path-to-batch>)
df = df.withColumn("id", expr("uuid()"))

dt = DeltaTable.forName(spark, "myTable")
dt.alias("a").merge(
    df.alias("b"),
    "a.path = b.path"
).whenNotMatchedInsert(
    values={"id": "b.id", "content": "b.content"}
).execute()
This already makes the table quite slow, but later I need to query certain IDs, collect them, and write them individually back out as binary files.
Questions:
- Would my table benefit from a batch column and partition?
- Should I partition by id? I know this is not ideal, but it might make querying individual rows easier.
- Is there a better way to write the files out again, rather than .collect()? I have seen that when I select about 1,000 specific IDs and write them out, about 10 minutes is spent just on the collect and then less than a minute on the write. I do something like:
for row in df.collect():
    with open(row.id, "wb") as fw:
        fw.write(row.content)
Solution 1:
- As uuid() returns random values, I'm afraid we cannot use it to compare existing data with new records. (Sorry if I misunderstood the idea)
- I don't think partitioning by id will help, as the id column obviously has high cardinality.
- Instead of using collect(), which loads all records onto the driver, it would be better to write the records from the Spark DataFrame directly and in parallel from all the worker nodes into a temporary location on ADLS first, and then aggregate the data files from that location (see the sketch below).
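To make that concrete, here is a minimal sketch of the idea, not the answerer's exact code: it uses DataFrame.foreachPartition so each worker writes its own rows straight to storage instead of funnelling everything through the driver via collect(). The table name myTable, the id/content columns, and the output path /dbfs/tmp/binary_out (assumed to be a mounted ADLS location reachable from the executors) are placeholders to adapt to your setup:

import os
from pyspark.sql.functions import col

def write_partition(rows):
    # Runs on the executors: each worker writes its own slice of rows in parallel.
    out_dir = "/dbfs/tmp/binary_out"  # assumed mounted ADLS location
    os.makedirs(out_dir, exist_ok=True)
    for row in rows:
        with open(os.path.join(out_dir, row.id), "wb") as fw:
            fw.write(row.content)

wanted_ids = [...]  # placeholder: the ~1,000 ids you want to export

(spark.table("myTable")
      .where(col("id").isin(wanted_ids))
      .select("id", "content")
      .foreachPartition(write_partition))

Once the files land in that temporary location, you can move or aggregate them in a second step, which is what the answer suggests.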
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | PhuriChal |