Hi all,

I am developing a spark application where I am loading the data into
Cassandra and I am using the Spark Cassandra connector for the same. I have
created a FAT jar with all the dependencies and submitted that using
spark-submit.

I am able to load the data successfully to cassandra, but I am not able to
get the metrics from the spark cassandra connector. I checked the executor
logs and saw that the following properties failed to initialize because of
the mentioned error.

Properties:

"spark.metrics.conf.driver.source.cassandra-connector.class":
"org.apache.spark.metrics.CassandraConnectorSource"
"spark.metrics.conf.executor.source.cassandra-connector.class":
"org.apache.spark.metrics.CassandraConnectorSource"

Error:


22/01/28 15:30:55 ERROR MetricsSystem: Source class
org.apache.spark.metrics.CassandraConnectorSource cannot be
instantiated
java.lang.ClassNotFoundException:
org.apache.spark.metrics.CassandraConnectorSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:235)
at 
org.apache.spark.metrics.MetricsSystem$$anonfun$registerSources$1.apply(MetricsSystem.scala:182)
at 
org.apache.spark.metrics.MetricsSystem$$anonfun$registerSources$1.apply(MetricsSystem.scala:179)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at 
org.apache.spark.metrics.MetricsSystem.registerSources(MetricsSystem.scala:179)
at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:101)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:364)
at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:200)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:228)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at 
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:293)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)


I am loading the data to cassandra using the below code:

cassandraTableDataset.toDF(cassandraTable.getRenamedColumns()).
        write().format(sparkCassandraFormat).
        options(ImmutableMap.of(cassandraKeyspaceString, getKeyspace(config),
                cassandraTableString, getTable(config))).
        mode(SaveMode.Append).
        save();


I cannot copy the spark cassandra connector jar to all the nodes in the
cluster because of some restrictions.

*Solutions tried:*

*Solution 1: *

Used spark.jars and spark.executor.extraClassPath options, but it did not
work. As the executor's spark session is getting created before these jars
or FAT application jar is fetched/copied to the executor node.

*Solution 2:*

I tried to manually initialize the
org.apache.spark.metrics.CassandraConnectorSource class and registered with
SparkEnv Metric system just before the cassandra loading, and again it did
not work. I am assuming these changes are happening on the driver only.

*Solution 3:*

I also tried to set the same Properties using  sparkEnvironment
.getSparkSession().conf().set(), and it did not work as well. I do not know
if the above mentioned properties can be added at runtime or not, so I
tried this as well. I was hoping that this would help me change the
executor config at runtime.


Spark Version: 2.3.0
Scala Version: 2.11
Spark Cassandra Connector:
com.datastax.spark:spark-cassandra-connector_2.11:2.3.0

Please help with this issue, as these metrics are important. Thanks in
advance.

Thank You,
Yogesh

Reply via email to