Category "pyspark"

pyspark - getting error 'list' object has no attribute 'write' when attempting to write to a delta table

I am attempting to read the first X rows of a delta table into a dataframe, and then write (overwrite) that back to the delta table. Here is the code: # r
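
This error usually means the write was attempted on the result of head() or take(), which return a Python list of Rows rather than a DataFrame. A minimal sketch of the limit-then-overwrite pattern, with a hypothetical table path and row count:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # limit() returns a DataFrame; take()/head() return a list, which has no .write
    df = spark.read.format("delta").load("/mnt/delta/my_table").limit(100)

    # overwrite the table with just those rows
    df.write.format("delta").mode("overwrite").save("/mnt/delta/my_table")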

At what point should you force a cache in Spark when performing heavy transformations?

Say you have something like this: big_table1 = spark.table('db.big_table1').cache() big_table2 = spark.table('db.big_table2').cache() big_table2 = spark.table('
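
A common rule of thumb is to cache only a DataFrame that is reused by more than one downstream branch after the expensive work, and to force materialization with an action. A sketch under that assumption (the join key and columns are hypothetical):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    big_table1 = spark.table("db.big_table1")
    big_table2 = spark.table("db.big_table2")

    # cache the result of the heavy join, not the raw source tables
    joined = big_table1.join(big_table2, "id").cache()
    joined.count()  # action to materialize the cache before the branches below

    agg = joined.groupBy("id").agg(F.sum("amount").alias("total"))
    filtered = joined.filter(F.col("amount") > 0)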

Spark binary file and Delta Table

I have binary files (~3 MB each) that I receive in batches of ~20000 files at a time. These files are used downstream for further processing, but I wa
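
One hedged option is Spark's binaryFile source feeding a Delta table, assuming the goal is to store the raw bytes plus file metadata; the paths and glob below are placeholders:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # each row carries path, modificationTime, length and the file content as binary
    df = (spark.read.format("binaryFile")
          .option("pathGlobFilter", "*.bin")      # hypothetical extension
          .load("/mnt/landing/batch_001/"))       # hypothetical landing path

    # append the batch to a Delta table for downstream processing
    df.write.format("delta").mode("append").save("/mnt/delta/binary_files")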

How to make a new pyspark df column that's the average of the last n values by day of week?

What I'm trying to do is make a pyspark dataframe with item and date and another column "3_avg" that's the average of the last three same day-of-week from the g
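
A window partitioned by item and day of week, ordered by date and restricted to the previous three rows, is one way to express this; the column names are assumptions:

    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.window import Window

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [("a", "2023-01-02", 10.0), ("a", "2023-01-09", 20.0),
         ("a", "2023-01-16", 30.0), ("a", "2023-01-23", 40.0)],
        ["item", "date", "qty"])                  # hypothetical columns
    df = df.withColumn("date", F.to_date("date"))

    # same item, same day of week, the three previous occurrences only
    w = (Window.partitionBy("item", F.dayofweek("date"))
         .orderBy("date")
         .rowsBetween(-3, -1))

    df = df.withColumn("3_avg", F.avg("qty").over(w))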

(py)spark weighted average taking missing values into account

Is there a canonical way to compute the weighted average in pyspark ignoring missing values in the denominator sum? Take the following example: # create data da
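
The usual trick is to zero out the weight wherever the value is null so it drops out of the denominator as well; the column names here are assumptions:

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(1.0, 2.0), (None, 3.0), (4.0, 5.0)], ["value", "weight"])

    # count a weight only when its value is present
    eff_w = F.when(F.col("value").isNotNull(), F.col("weight")).otherwise(F.lit(0.0))

    df.agg((F.sum(F.col("value") * eff_w) / F.sum(eff_w))
           .alias("weighted_avg")).show()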

How do I Insert Overwrite with parquet format?

I have two parquet files in Azure Data Lake Gen2 and I want to Insert Overwrite one with the other. I was trying the same in Azure Databricks by doing the below. Reading
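
With plain parquet (no Delta or Hive table), the overwrite is typically expressed at write time; the abfss paths below are placeholders:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # read the source parquet from ADLS Gen2
    src = spark.read.parquet("abfss://container@account.dfs.core.windows.net/source/")

    # replace the contents of the target location with the source data
    (src.write
        .mode("overwrite")
        .parquet("abfss://container@account.dfs.core.windows.net/target/"))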

How to select all columns except 2 of them from a large table on pyspark sql?

In joining two tables, I would like to select all columns except 2 of them from a large table with many columns on pyspark sql on databricks. My pyspark sql: %
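
In the DataFrame API the simplest route is usually drop() after the join, or building the column list explicitly; the table, key and column names below are assumptions:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    big = spark.table("db.big_table")
    small = spark.table("db.small_table")

    joined = big.join(small, "id")

    # option 1: drop the two unwanted columns
    result = joined.drop("col_a", "col_b")

    # option 2: select everything except the two columns
    keep = [c for c in joined.columns if c not in ("col_a", "col_b")]
    result = joined.select(*keep)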

Errors when running spark-submit on a local machine with Apache Spark (stand alone, single node)

I've installed apache spark on my mac with 16 GB of RAM to test my pyspark code locally with small data sets before I test it on a real cluster. I've installed
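
For local testing it can be easier to drive the code through a plain local-mode session instead of a standalone master; the master string and memory setting below are assumptions, not the asker's configuration:

    from pyspark.sql import SparkSession

    # single-machine session: 4 local cores, driver memory sized for a 16 GB laptop
    spark = (SparkSession.builder
             .master("local[4]")
             .config("spark.driver.memory", "8g")
             .appName("local-test")
             .getOrCreate())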

Spark writing extra rows when saving to CSV

I wrote a file to parquet containing 1,000,000 rows. When I read the parquet file back, the result is 1,000,000 rows. df = spark.read.parquet(parquet_path) df.
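
A frequent cause of the row-count mismatch is embedded newlines in string columns, which a plain CSV read splits into extra records; quoting on write and multiLine on read usually keeps the counts consistent. A sketch with a placeholder path:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet("/tmp/data.parquet")   # hypothetical path

    # quote every field and escape embedded quotes when writing
    (df.write.mode("overwrite")
       .option("quoteAll", True)
       .option("escape", '"')
       .csv("/tmp/data_csv"))

    # read back with multiLine so quoted newlines stay inside one record
    df2 = (spark.read
           .option("multiLine", True)
           .option("escape", '"')
           .csv("/tmp/data_csv"))
    print(df2.count())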

Py4JJavaError: An error occurred while calling o143.parquet

So I have parquet files in an S3 bucket and I want to load them using pyspark in Python, but I'm getting some errors; here's what I have tried so far. I'm using Juput
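
Reading from S3 outside EMR or Databricks generally needs the hadoop-aws package and s3a credentials wired into the session; the package version, keys and bucket below are placeholders, and the version must match the local Hadoop build:

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")
             .config("spark.hadoop.fs.s3a.access.key", "YOUR_ACCESS_KEY")
             .config("spark.hadoop.fs.s3a.secret.key", "YOUR_SECRET_KEY")
             .getOrCreate())

    # use the s3a:// scheme rather than s3:// or s3n://
    df = spark.read.parquet("s3a://my-bucket/path/to/files/")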

Pyspark: join and union in for loop

I have some really simple logic that I would like to understand how to make work in pyspark. for data in df1: spark_data_row = spark.createDataFrame(data
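
Rather than re-creating and re-assigning a DataFrame on every loop iteration, one common pattern is to collect the pieces and union them once at the end; the sample rows and column names are assumptions:

    from functools import reduce
    from pyspark.sql import SparkSession, DataFrame

    spark = SparkSession.builder.getOrCreate()

    # per-iteration DataFrames built inside the loop
    parts = []
    for rows in [[(1, "a")], [(2, "b")], [(3, "c")]]:
        parts.append(spark.createDataFrame(rows, ["id", "val"]))

    # union them all in one go
    combined = reduce(DataFrame.unionByName, parts)
    combined.show()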

How to close the spark instance

I want to stop my spark instance here once I complete my job running on Jupyter notebook. I did execute spark.stop() at the end, but when I open my terminal, I'
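
Stopping the session from the notebook only ends that Spark application; anything started separately (a standalone master or worker, the Jupyter kernel itself) keeps running. The call itself is just:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # ... notebook job runs here ...

    # stops the underlying SparkContext and releases the application's resources
    spark.stop()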

Pyspark create schema for maptype with different value types

I need to give the correct schema to an rdd I have, but I am struggling with a maptype that has different value types. I guess the problem is that one specific Key ha
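
MapType only allows a single value type, so mixed-type values usually end up either as a map of strings or as a struct with one typed field per known key; the field names below are assumptions:

    from pyspark.sql.types import (StructType, StructField, StringType,
                                   IntegerType, DoubleType, MapType)

    # option 1: keep the map but cast every value to string
    schema_map = StructType([
        StructField("id", StringType()),
        StructField("attrs", MapType(StringType(), StringType()))])

    # option 2: promote the known keys to a struct so each keeps its own type
    schema_struct = StructType([
        StructField("id", StringType()),
        StructField("attrs", StructType([
            StructField("name", StringType()),
            StructField("count", IntegerType()),
            StructField("score", DoubleType())]))])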

Processing data from a kafka stream using Pyspark

What the console of the kafka consumer looks like: ["2017-12-31 16:06:01", 12472391, 1] ["2017-12-31 16:06:01", 12472097, 1] ["2017-12-31 16:05:59", 12471979,
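
A hedged sketch of consuming the topic with Structured Streaming and parsing the JSON-array payload; the broker, topic and field names are assumptions:

    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.types import ArrayType, StringType

    spark = SparkSession.builder.getOrCreate()

    # requires the spark-sql-kafka package on the classpath
    raw = (spark.readStream
           .format("kafka")
           .option("kafka.bootstrap.servers", "localhost:9092")
           .option("subscribe", "events")
           .load())

    # each message is a JSON array like ["2017-12-31 16:06:01", 12472391, 1]
    parsed = (raw.selectExpr("CAST(value AS STRING) AS value")
              .withColumn("arr", F.from_json("value", ArrayType(StringType())))
              .select(F.col("arr")[0].alias("ts"),
                      F.col("arr")[1].cast("long").alias("user_id"),
                      F.col("arr")[2].cast("int").alias("flag")))

    query = parsed.writeStream.format("console").start()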

How to apply a pandas geocode function to a Pyspark column

Table is like this id ADDRESS 0 6101 SUMMITVIEW AVE STE 200 YAKIMA 1 527 CEDAR WAY SUITE 105 OAKMONT 2 1700 N ROSE AVE SUITE 460 OXNARD 3 1275 YORK AVE NEW YOR
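
A pandas function can be applied per batch with a pandas_udf; the geocoder below is a placeholder for the real pandas-based call:

    import pandas as pd
    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.functions import pandas_udf

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(
        [(0, "6101 SUMMITVIEW AVE STE 200 YAKIMA")], ["id", "ADDRESS"])

    @pandas_udf("string")
    def geocode_udf(addresses: pd.Series) -> pd.Series:
        # placeholder: swap in the actual pandas geocode function here
        return addresses.apply(lambda a: f"geocoded({a})")

    df = df.withColumn("geocoded", geocode_udf(F.col("ADDRESS")))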

Pyspark: Return next week's Saturday

I'm trying to return next week's Saturday date from the date-type column rel_d. Normally, in python, I'd subtract the number of days till next Saturday and add it to the
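
F.next_day gives the first Saturday strictly after a date; if "next week's Saturday" means the Saturday of the following week, shifting the date by seven days first is one interpretation (an assumption, not the only reading):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    df = (spark.createDataFrame([("2023-05-10",)], ["rel_d"])
          .withColumn("rel_d", F.to_date("rel_d")))

    # the upcoming Saturday after rel_d
    df = df.withColumn("next_sat", F.next_day("rel_d", "Sat"))

    # the Saturday of the following week
    df = df.withColumn("next_week_sat", F.next_day(F.date_add("rel_d", 7), "Sat"))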

How to handle memory issues in Databricks with pyspark when writing data in which a particular column contains very large data in each record

I have a set of records with 10 columns. There is a column 'x' which contains an array of float values, and the length of the array can be very large (e.g., the len
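
Two knobs that often help are spreading the wide rows over more tasks and capping rows per output file, so no single task materializes too many of the huge arrays at once; the partition count, cap and paths are assumptions:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.table("db.records_with_big_arrays")  # hypothetical table

    (df.repartition(400)                            # more, smaller partitions
       .write
       .option("maxRecordsPerFile", 10000)          # cap rows per output file
       .mode("overwrite")
       .parquet("/mnt/output/records"))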

How to manually checkpoint a delta table using PySpark?

I have a delta table, and am trying to append data to it and then checkpoint that table. By default I believe it checkpoints every 10 commits, but I would like
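
One indirect route, assuming the goal is more frequent checkpoints rather than a strictly on-demand one: the interval is a Delta table property, so setting it to 1 makes Delta write a checkpoint after every commit. The table path below is a placeholder:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    spark.sql("""
        ALTER TABLE delta.`/mnt/delta/my_table`
        SET TBLPROPERTIES ('delta.checkpointInterval' = '1')
    """)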

Spark partition size greater than the executor memory

I have four questions. Suppose in Spark I have 3 worker nodes. Each worker node has 3 executors and each executor has 3 cores. Each executor has 5 GB of memory. (T

How to divide two aggregate sums in a dataframe

I want to divide the sum of two columns in pyspark. For example, I have a dataset like below: A B C 1 1 2 3 2 1 2 3 3 1 2 3 What I want is t
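
One way is a single agg that computes both sums and divides them; the column names follow the example:

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1, 2, 3), (1, 2, 3), (1, 2, 3)], ["A", "B", "C"])

    # sum both columns in one pass, then divide
    df.agg((F.sum("B") / F.sum("C")).alias("b_over_c")).show()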