How to set config for SparkSession using Python

I have a huge Oracle table and want to process it with a PySpark DataFrame. I use ora_hash so that each node reads its own slice of the data based on the ora_hash output. I also configure Spark the way I want. My code is as follows:

from pyspark import SparkConf
from pyspark.sql import SparkSession

num_partitions = num_partitions - 1
hash_function = lambda x: 'ora_hash({}, {})'.format(x, num_partitions)
conf = SparkConf().setAll([("spark.app.master", "local[4]"), ("spark.appName", "ETL"),
                           ('spark.executor.memory', '8g'), ('spark.executor.cores', '4'), ('spark.cores.max', '4'),
                           ('spark.driver.memory', '8g'), ("spark.driver.bindAddress", "localhost"),
                           ("spark.ui.port", "4050")])
spark = SparkSession.builder.config(conf=conf).getOrCreate()

spark.sparkContext.setLogLevel('ERROR')
dates_df = connection.read_sql('SELECT distinct time_stamp d FROM (select substr(time_stamp, 1, 8) time_stamp '
                               'from {})'.format(source_table_name)) 

dates = list(dates_df.loc[:, 'D'])
cst_hash_df = connection.read_sql('SELECT distinct {} hash FROM {}'.format(
    hash_function('cst_id'), source_table_name))  # todo: remove rownum<10000

cst_hash_values = list(cst_hash_df.loc[:, 'HASH'])
predicates = [
    "to_date(substr(transtamp, 1, 8),'YYYYMMDD','nls_calendar=persian')= to_date({} ,'YYYYMMDD','nls_calendar=persian') and ora_hash(cst_id,{}) = {}"
        .format(d, num_partitions, cst) for d in dates for cst in
    cst_hash_values]

dataframe = spark.read \
    .option('driver', 'oracle.jdbc.driver.OracleDriver') \
    .jdbc(url='jdbc:oracle:thin:user/[email protected]:1521:orcl',
          table=source_table_name,
          predicates=predicates)
dataframe = dataframe.distinct()    
  
... do some processing on the data

dataframe.write.format('jdbc').options(
    url='jdbc:oracle:thin:@x.x.x.x:1521:orcl',
    driver='oracle.jdbc.driver.OracleDriver',
    dbtable=dist_table_name,
    user='user',
    password='pass').mode('append').save()

Even though I set the config for the SparkSession, it prints the default config when I add this line to the code:

 print('spark : ', spark.sparkContext.getConf().getAll())

I also tried this approach, but nothing changes:

 spark = SparkSession.builder.appName("ETL")\
     .master("local[4]").config('spark.executor.memory', '8g')\
     .config('spark.driver.memory', '8g')\
     .config('spark.executor.cores', '4')\
     .config("spark.driver.bindAddress", "localhost")\
     .config("spark.ui.port", "4050")\
     .getOrCreate()

Would you please guide me on how to set the config for the SparkSession so that it overrides the default config?

Any help is really appreciated.



Solution 1:[1]

Do you use Jupyter?

By the time your notebook kernel has started, the SparkSession is already created with parameters defined in a kernel configuration file. To change this, you will need to update or replace the kernel configuration file, which I believe is usually somewhere like /kernels/<kernel_name>/kernel.json.
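
As an illustration only (not part of the original answer), here is a minimal sketch of working around an already-created session from inside the notebook: getOrCreate() returns the existing session and ignores most new settings, so stop it first and then build a new one with the desired options. Note that JVM-level settings such as spark.driver.memory cannot be changed this way once the driver JVM is running, which is exactly why the kernel configuration file matters.

 from pyspark.sql import SparkSession

 # getOrCreate() hands back the session the kernel already built and
 # ignores most new settings, so stop that session before rebuilding.
 SparkSession.builder.getOrCreate().stop()

 spark = SparkSession.builder \
     .master("local[4]") \
     .appName("ETL") \
     .config("spark.ui.port", "4050") \
     .getOrCreate()

 # Verify which configuration is now in effect
 print(spark.sparkContext.getConf().getAll())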

See details: How to start sparksession in pyspark

Spark related posts: https://bigdata-etl.com/tag/apache-spark/

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1: Paweł Cieśla