Write data from broadcast variable (Databricks) to Azure blob

I have a URL from which I download the data (which is in JSON format) using Databricks:

import gzip
import json
import urllib.request

# download the gzipped file, then decompress and parse the JSON
url = "https://tortuga-prod-eu.s3-eu-west-1.amazonaws.com/%2FNinetyDays/amzf277698d77514b44"
testfile = urllib.request.URLopener()
testfile.retrieve(url, "file.gz")
with gzip.GzipFile("file.gz", 'r') as fin:
    json_bytes = fin.read()
json_str = json_bytes.decode('utf-8')
data = json.loads(json_str)

Now I want to save this data to an Azure container as a blob .json file. I have tried saving the data in a dataframe and writing the df to the mounted location, but the data is huge (GBs) and I get a spark.rpc.message.maxSize (268435456 bytes) error. I have tried saving the data in a broadcast variable (it saves successfully), but I am not sure how to write the data from the broadcast variable to the mounted location.

Here is how I save the data in a broadcast variable:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
broadcastStates = spark.sparkContext.broadcast(data)
print(broadcastStates.value)

My question is:

  • Is there any way I can write the data from a broadcast variable to the Azure mounted location?
  • If not, please guide me on the right/best way to get this done.


Solution 1:[1]

It is not possible to write a broadcast variable directly to mounted Azure blob storage. However, there is a way to write the value of a broadcast variable to a file.

pyspark.broadcast.Broadcast provides two methods, dump() and load_from_path(), which let you write and read the value of a broadcast variable. Since you have created the broadcast variable with:

broadcastStates = spark.sparkContext.broadcast(data)

use the following syntax to write its value to a file:

<broadcast_variable>.dump(<broadcast_variable>.value, file_object)

Note: the second argument must be a file-like object with a write attribute, opened for binary writing; dump() pickles the value into it and closes the file when it is done.
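
For example, a minimal sketch of writing the broadcast value to the mounted container, assuming the container is mounted at the hypothetical path /mnt/mycontainer (visible to the local file API under /dbfs on Databricks):

# open a file on the mounted storage for binary writing; dump() pickles the
# broadcast value into it and closes the file (the result is a pickled
# representation of the value, not a plain .json file)
out_file = open("/dbfs/mnt/mycontainer/broadcast_data.pkl", "wb")
broadcastStates.dump(broadcastStates.value, out_file)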

To read this data from the file, you can use load_from_path() as shown below:

<broadcast_variable>.load_from_path(path)

Note: load_from_path() takes the path of the file and returns the deserialized value of the broadcast variable.
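
Continuing the sketch above with the same hypothetical mount path, reading the value back would look like this:

# load_from_path() opens the file at the given path and returns the
# deserialized (unpickled) value
restored = broadcastStates.load_from_path("/dbfs/mnt/mycontainer/broadcast_data.pkl")
print(type(restored))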

There may also be a way to avoid the spark.rpc.message.maxSize (268435456 bytes) error altogether. The default value of spark.rpc.message.maxSize is 128 (MiB). Refer to the following documentation for more details about this setting:

https://spark.apache.org/docs/latest/configuration.html#networking

While creating a cluster in Databricks, you can configure and increase this value to avoid the error. The steps to configure the cluster are:

  • While creating the cluster, open the advanced options (present at the bottom).

  • Under the Spark tab, add the configuration key spark.rpc.message.maxSize with the desired value.

  • Click create cluster.

Increasing this value might help in writing the dataframe directly to mounted blob storage without using broadcast variables.
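
The same property can also be set when the Spark session is created. A rough sketch follows (in a Databricks notebook the session already exists, so the cluster-level Spark config above is what actually takes effect; 512 is only an illustrative value in MiB, not a recommendation from this answer):

from pyspark.sql import SparkSession

# spark.rpc.message.maxSize is specified in MiB; 512 is an illustrative value
spark = (SparkSession.builder
         .appName('SparkByExamples.com')
         .config("spark.rpc.message.maxSize", "512")
         .getOrCreate())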

You can also try increasing the number of partitions so that the dataframe is saved as multiple smaller files, which helps avoid the maxSize error. Refer to the following article about Spark configuration and partitioning:

https://kb.databricks.com/execution/spark-serialized-task-is-too-large.html
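
As a rough sketch of that approach, assuming the JSON has already been loaded into a dataframe df and the container is mounted at the hypothetical path /mnt/mycontainer (64 is an arbitrary partition count):

# repartition so the output is written as many smaller JSON part files,
# one file per partition
(df.repartition(64)
   .write
   .mode("overwrite")
   .json("/mnt/mycontainer/output_json"))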

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1: SaideepArikontham-MT