How to fix org.apache.spark.SparkException: Job aborted due to stage failure (java.lang.ClassNotFoundException: com.datastax.spark.connector.rdd.partitioner.CassandraPartition)

In my project I am using the spark-cassandra-connector to read data from a Cassandra table and process it further into a JavaRDD, but I am running into an issue while converting the Cassandra rows to a JavaRDD.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 4 times, most recent failure: Lost task 2.3 in stage 0.0 (TID 52, 172.20.0.4, executor 1):
 java.lang.ClassNotFoundException: com.datastax.spark.connector.rdd.partitioner.CassandraPartition
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:370)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I have configured Spark to use a Spark cluster. When I set the master to local the code works fine, but as soon as I replace it with the cluster master URL I get the failure above. Here is my Spark configuration:


SparkConf sparkConf = new SparkConf().setAppName("Data Transformation")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .setMaster("spark://masterip:7077");

sparkConf.set("spark.cassandra.connection.host", cassandraContactPoints);
sparkConf.set("spark.cassandra.connection.port", cassandraPort);
sparkConf.set("spark.cassandra.connection.timeout_ms", "5000");
sparkConf.set("spark.cassandra.read.timeout_ms", "200000");
sparkConf.set("spark.driver.allowMultipleContexts", "true");

/*
 * sparkConf.set("spark.cassandra.auth.username", "centralrw");
 * sparkConf.set("spark.cassandra.auth.password", "t8b9HRWy");
 */
logger.info("creating spark context object");
sparkContext = new JavaSparkContext(sparkConf);
logger.info("returning sparkcontext object");
return sparkContext;

Spark version: 2.4.0, spark-cassandra-connector: 2.4.0

ReceiverConfig:

public List<Map<String, GenericTriggerEntity>> readDataFromGenericTriggerEntityUsingSpark(
            JavaSparkContext sparkContext) {

        List<Map<String, GenericTriggerEntity>> genericTriggerEntityList = new ArrayList<Map<String, GenericTriggerEntity>>();

        try {

            logger.info("Keyspace & table name to read data from cassandra");
            String tableName = "generictriggerentity";
            String keySpace = "centraldatalake";

            logger.info("establishing conection");
            CassandraJavaRDD<CassandraRow> cassandraRDD = CassandraJavaUtil.javaFunctions(sparkContext)
                    .cassandraTable(keySpace, tableName);
            int num = cassandraRDD.getNumPartitions();
            System.out.println("num- " + num);

            logger.info("Converting extracted rows to JavaRDD");
            JavaRDD<Map<String, GenericTriggerEntity>> rdd = cassandraRDD
                    .map(new Function<CassandraRow, Map<String, GenericTriggerEntity>>() {

                        private static final long serialVersionUID = -165799649937652815L;

                        @Override
                        public Map<String, GenericTriggerEntity> call(CassandraRow row) throws Exception {
                            Map<String, GenericTriggerEntity> genericTriggerEntityMap = new HashMap<String, GenericTriggerEntity>();
                            GenericTriggerEntity genericTriggerEntity = new GenericTriggerEntity();
                            if (row.getString("end") != null)
                                genericTriggerEntity.setEnd(row.getString("end"));
                            if (row.getString("key") != null)
                                genericTriggerEntity.setKey(row.getString("key"));
                            if (row.getString("keyspacename") != null)
                                genericTriggerEntity.setKeyspacename(row.getString("keyspacename"));
                            if (row.getString("partitiondeleted") != null)
                                genericTriggerEntity.setPartitiondeleted(row.getString("partitiondeleted"));
                            if (row.getString("rowdeleted") != null)
                                genericTriggerEntity.setRowdeleted(row.getString("rowdeleted"));
                            if (row.getString("rows") != null)
                                genericTriggerEntity.setRows(row.getString("rows"));
                            if (row.getString("start") != null)
                                genericTriggerEntity.setStart(row.getString("start"));
                            if (row.getString("tablename") != null) {
                                genericTriggerEntity.setTablename(row.getString("tablename"));
                                dataTableName = row.getString("tablename");
                            }
                            if (row.getString("triggerdate") != null)
                                genericTriggerEntity.setTriggerdate(row.getString("triggerdate"));
                            if (row.getString("triggertime") != null)
                                genericTriggerEntity.setTriggertime(row.getString("triggertime"));
                            if (row.getString("uuid") != null)
                                genericTriggerEntity.setUuid(row.getUUID("uuid"));

                            genericTriggerEntityMap.put(dataTableName, genericTriggerEntity);
                            return genericTriggerEntityMap;
                        }

                    });

            List<Partition> partition = rdd.partitions();
            System.out.println("partion - " + partition.size());

            logger.info("Collecting data into rdd");
            genericTriggerEntityList = rdd.collect();

        } catch (Exception e) {
            e.printStackTrace();
        }
        logger.info("returning generic trigger entity list");
        return genericTriggerEntityList;
    }

When I call rdd.collect() it throws the following exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 21, 10.22.3.55, executor 0): java.lang.ClassNotFoundException: in.dmart.central.data.transform.base.config.ReceiverConfig$1

I found a suggested solution of building a fat jar and deploying that, but I do not want to do this because every time I make a change I would have to repeat the whole build-and-deploy process, which is not practical.

Please suggest a solution that can be configured in the code or on the Spark cluster. Thanks in advance.



Solution 1:[1]

If you don't create a fat jar, then you need to submit the job with the correct package specified, like this:

spark-submit --packages datastax:spark-cassandra-connector:2.4.1-s_2.11 \
   ...rest of your options/arguments...

This will distribute the corresponding SCC (Spark Cassandra Connector) packages to all Spark nodes.
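Note that --packages only covers the connector itself; the second ClassNotFoundException (ReceiverConfig$1) refers to the anonymous Function defined in the application, so the application classes also have to reach the executors, normally as the application jar passed to spark-submit. If the JavaSparkContext is instead created directly from application code, as in the configuration shown in the question, the jars can also be registered programmatically with SparkConf.setJars. The sketch below is a minimal illustration, not the asker's actual setup: the class name, method signature, and jar paths are placeholders, and the connector's transitive dependencies would have to be listed alongside it as well (which is why --packages or a fat jar is usually simpler).

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkContextFactory {

    // Minimal sketch: ship the required jars to the executors explicitly.
    // The paths below are placeholders, not real locations from the question.
    public static JavaSparkContext create(String masterUrl, String cassandraHost) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("Data Transformation")
                .setMaster(masterUrl)
                .set("spark.cassandra.connection.host", cassandraHost)
                // SparkContext serves these jars to every executor, so both the
                // connector classes (CassandraPartition) and the application's own
                // classes (e.g. the anonymous Function compiled as ReceiverConfig$1)
                // become loadable on the workers.
                .setJars(new String[] {
                        "/path/to/spark-cassandra-connector_2.11-2.4.0.jar",
                        "/path/to/your-application.jar"
                });
        return new JavaSparkContext(sparkConf);
    }
}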

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
[1] Solution 1: Alex Ott