How to set config for SparkSession using Python
I have a huge Oracle table that I want to process with a PySpark DataFrame. I use ora_hash so that each node reads only the rows matching its ora_hash bucket, and I also configure Spark the way I want. My code is the following:
from pyspark import SparkConf
from pyspark.sql import SparkSession

# ora_hash(expr, N) returns buckets 0..N inclusive, so use num_partitions - 1
num_partitions = num_partitions - 1
hash_function = lambda x: 'ora_hash({}, {})'.format(x, num_partitions)
conf = SparkConf().setAll([("spark.app.master", "local[4]"), ("spark.appName", "ETL"),
                           ('spark.executor.memory', '8g'), ('spark.executor.cores', '4'),
                           ('spark.cores.max', '4'), ('spark.driver.memory', '8g'),
                           ("spark.driver.bindAddress", "localhost"), ("spark.ui.port", "4050")])
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel('ERROR')
dates_df = connection.read_sql('SELECT distinct time_stamp d FROM '
                               '(select substr(time_stamp, 1, 8) time_stamp from {})'.format(source_table_name))
dates = list(dates_df.loc[:, 'D'])
cst_hash_df = connection.read_sql('SELECT distinct {} hash FROM {}'.format(
    hash_function('cst_id'), source_table_name))  # todo: remove rownum<10000
cst_hash_values = list(cst_hash_df.loc[:, 'HASH'])
# Each predicate pins one (day, hash bucket) pair, so each becomes one JDBC partition.
predicates = [
    "to_date(substr(transtamp, 1, 8), 'YYYYMMDD', 'nls_calendar=persian') = "
    "to_date({}, 'YYYYMMDD', 'nls_calendar=persian') and ora_hash(cst_id, {}) = {}"
    .format(d, num_partitions, cst)
    for d in dates for cst in cst_hash_values]
dataframe = spark.read \
    .option('driver', 'oracle.jdbc.driver.OracleDriver') \
    .jdbc(url='jdbc:oracle:thin:user/pass@x.x.x.x:1521:orcl',
          table=source_table_name,
          predicates=predicates)
dataframe = dataframe.distinct()
# ... do some processing on the data
dataframe.write.format('jdbc').options(
    url='jdbc:oracle:thin:@x.x.x.x:1521:orcl',
    driver='oracle.jdbc.driver.OracleDriver',
    dbtable=dist_table_name,
    user='user',
    password='pass').mode('append').save()
Even though I set the config on the SparkSession, it prints the default config when I add this line to the code:
print('spark : ', spark.sparkContext.getConf().getAll())
I also tried building the session this way, but nothing changed:
spark = SparkSession.builder.appName("ETL") \
    .master("local[4]") \
    .config('spark.executor.memory', '8g') \
    .config('spark.driver.memory', '8g') \
    .config('spark.executor.cores', '4') \
    .config("spark.driver.bindAddress", "localhost") \
    .config("spark.ui.port", "4050") \
    .getOrCreate()
Would you please guide me on how to set the SparkSession config so that it overrides the defaults?
Any help is really appreciated.
Solution 1:[1]
Do you use Jupyter?
By the time your notebook kernel has started, the SparkSession has already been created with the parameters defined in a kernel configuration file. To change this, you will need to update or replace that kernel configuration file, which I believe usually lives somewhere like /kernels/<kernel name>/kernel.json.
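For illustration, a hypothetical kernel.json is sketched below; the kernel name, SPARK_HOME path, and option values are assumptions, not details from the question or answer. Kernels like this typically hand Spark its options through the PYSPARK_SUBMIT_ARGS environment variable, which PySpark reads at startup:

{
  "display_name": "PySpark",
  "language": "python",
  "argv": ["python", "-m", "ipykernel_launcher", "-f", "{connection_file}"],
  "env": {
    "SPARK_HOME": "/opt/spark",
    "PYSPARK_SUBMIT_ARGS": "--master local[4] --driver-memory 8g --conf spark.ui.port=4050 pyspark-shell"
  }
}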
See details: How to start sparksession in pyspark
Spark related posts: https://bigdata-etl.com/tag/apache-spark/
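An alternative the answer does not spell out: because getOrCreate() returns any already-running session and silently ignores new config values, you can stop the pre-created session and build a fresh one. The sketch below assumes a plain local-mode session; note that JVM-level settings such as spark.driver.memory only take effect if set before the driver JVM starts, so those may still require the kernel-file route.

from pyspark.sql import SparkSession

# getOrCreate() hands back the session the kernel already started,
# ignoring any new config, so stop that session first.
SparkSession.builder.getOrCreate().stop()

# Build a fresh session; the values mirror the question's intent.
spark = SparkSession.builder \
    .master("local[4]") \
    .appName("ETL") \
    .config("spark.executor.memory", "8g") \
    .config("spark.ui.port", "4050") \
    .getOrCreate()

# Verify which settings actually took effect.
print(spark.sparkContext.getConf().get("spark.ui.port"))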
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source
---|---
Solution 1 | Paweł Cieśla