Hi, I have been stuck on this for 3 days now. I am using the spark-cassandra-connector with Spark, and I am able to create RDDs with the sc.cassandraTable function, which means Spark is able to communicate with Cassandra properly.
But somehow saveToCassandra is not working. Below are the steps I am taking. Does it have something to do with my spark-env.sh or spark-defaults.conf? Am I missing something critical? (A standalone sketch of the same flow is in the P.S. at the end of this mail.)

scala> import com.datastax.spark.connector._
scala> sc.addJar("/home/analytics/Installers/spark-1.1.1/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.jar")
scala> val myTable = sc.cassandraTable("test2", "words")
scala> myTable.collect()   --- this works perfectly fine
scala> val data = sc.parallelize(Seq((81, "XXX"), (82, "YYYY")))
scala> data.saveToCassandra("test2", "words", SomeColumns("word", "count"))   --- this fails

15/03/11 15:16:45 INFO Cluster: New Cassandra host /10.131.141.192:9042 added
15/03/11 15:16:45 INFO LocalNodeFirstLoadBalancingPolicy: Added host 10.131.141.192 (datacenter1)
15/03/11 15:16:45 INFO Cluster: New Cassandra host /10.131.141.193:9042 added
15/03/11 15:16:45 INFO LocalNodeFirstLoadBalancingPolicy: Added host 10.131.141.193 (datacenter1)
15/03/11 15:16:45 INFO Cluster: New Cassandra host /10.131.141.200:9042 added
15/03/11 15:16:45 INFO CassandraConnector: Connected to Cassandra cluster: wfan_cluster_DB
15/03/11 15:16:45 INFO SparkContext: Starting job: runJob at RDDFunctions.scala:29
15/03/11 15:16:45 INFO DAGScheduler: Got job 1 (runJob at RDDFunctions.scala:29) with 2 output partitions (allowLocal=false)
15/03/11 15:16:45 INFO DAGScheduler: Final stage: Stage 1(runJob at RDDFunctions.scala:29)
15/03/11 15:16:45 INFO DAGScheduler: Parents of final stage: List()
15/03/11 15:16:45 INFO DAGScheduler: Missing parents: List()
15/03/11 15:16:45 INFO DAGScheduler: Submitting Stage 1 (ParallelCollectionRDD[1] at parallelize at <console>:20), which has no missing parents
15/03/11 15:16:45 INFO MemoryStore: ensureFreeSpace(7400) called with curMem=1792, maxMem=2778778828
15/03/11 15:16:45 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 7.2 KB, free 2.6 GB)
15/03/11 15:16:45 INFO MemoryStore: ensureFreeSpace(3602) called with curMem=9192, maxMem=2778778828
15/03/11 15:16:45 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.5 KB, free 2.6 GB)
15/03/11 15:16:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.131.141.200:56502 (size: 3.5 KB, free: 2.6 GB)
15/03/11 15:16:45 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/03/11 15:16:45 INFO DAGScheduler: Submitting 2 missing tasks from Stage 1 (ParallelCollectionRDD[1] at parallelize at <console>:20)
15/03/11 15:16:45 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
15/03/11 15:16:45 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 10.131.141.192, PROCESS_LOCAL, 1216 bytes)
15/03/11 15:16:45 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 10.131.141.193, PROCESS_LOCAL, 1217 bytes)
15/03/11 15:16:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.131.141.193:51660 (size: 3.5 KB, free: 267.3 MB)
15/03/11 15:16:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.131.141.192:32875 (size: 3.5 KB, free: 267.3 MB)
15/03/11 15:16:45 INFO CassandraConnector: Disconnected from Cassandra cluster: wfan_cluster_DB
15/03/11 15:16:46 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, 10.131.141.192): java.lang.NoSuchMethodError: org.apache.spark.executor.TaskMetrics.outputMetrics()Lscala/Option;
        com.datastax.spark.connector.metrics.OutputMetricsUpdater$.apply(OutputMetricsUpdater.scala:70)
        com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:119)
        com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:29)
        com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:29)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
15/03/11 15:16:46 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 4, 10.131.141.192, PROCESS_LOCAL, 1216 bytes)
15/03/11 15:16:46 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@29ffe58e
15/03/11 15:16:46 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@29ffe58e
java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
15/03/11 15:16:46 INFO SparkDeploySchedulerBackend: Executor 1 disconnected, so removing it
15/03/11 15:16:46 INFO ConnectionManager: Handling connection error on connection to ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,32875)
15/03/11 15:16:46 ERROR TaskSchedulerImpl: Lost executor 1 on 10.131.141.192: remote Akka client disassociated
15/03/11 15:16:46 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,32875)
15/03/11 15:16:46 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,32875)
15/03/11 15:16:46 INFO TaskSetManager: Re-queueing tasks for 1 from TaskSet 1.0
15/03/11 15:16:46 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,32875)
15/03/11 15:16:46 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,32875)
15/03/11 15:16:46 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,32875)
15/03/11 15:16:46 INFO AppClient$ClientActor: Executor updated: app-20150311151059-0008/1 is now EXITED (Command exited with code 50)
15/03/11 15:16:46 INFO SparkDeploySchedulerBackend: Executor app-20150311151059-0008/1 removed: Command exited with code 50
15/03/11 15:16:46 WARN TaskSetManager: Lost task 0.1 in stage 1.0 (TID 4, 10.131.141.192): ExecutorLostFailure (executor lost)
15/03/11 15:16:46 INFO DAGScheduler: Executor lost: 1 (epoch 0)
15/03/11 15:16:46 INFO AppClient$ClientActor: Executor added: app-20150311151059-0008/2 on worker-20150310181709-10.131.141.192-58499 (10.131.141.192:58499) with 1 cores
15/03/11 15:16:46 INFO BlockManagerMasterActor: Trying to remove executor 1 from BlockManagerMaster.
15/03/11 15:16:46 INFO BlockManagerMasterActor: Removing block manager BlockManagerId(1, 10.131.141.192, 32875, 0)
15/03/11 15:16:46 INFO BlockManagerMaster: Removed 1 successfully in removeExecutor
15/03/11 15:16:46 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150311151059-0008/2 on hostPort 10.131.141.192:58499 with 1 cores, 512.0 MB RAM
15/03/11 15:16:46 INFO AppClient$ClientActor: Executor updated: app-20150311151059-0008/2 is now LOADING
15/03/11 15:16:46 INFO TaskSetManager: Starting task 0.2 in stage 1.0 (TID 5, 10.131.141.193, PROCESS_LOCAL, 1216 bytes)
15/03/11 15:16:46 INFO TaskSetManager: Lost task 1.0 in stage 1.0 (TID 3) on executor 10.131.141.193: java.lang.NoSuchMethodError (org.apache.spark.executor.TaskMetrics.outputMetrics()Lscala/Option;) [duplicate 1]
15/03/11 15:16:47 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(kvs-in-wfanub02.int.kronos.com,51660)
15/03/11 15:16:47 INFO SparkDeploySchedulerBackend: Executor 0 disconnected, so removing it
15/03/11 15:16:47 ERROR TaskSchedulerImpl: Lost executor 0 on 10.131.141.193: remote Akka client disassociated
15/03/11 15:16:47 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(kvs-in-wfanub02.int.kronos.com,51660)
15/03/11 15:16:47 INFO TaskSetManager: Re-queueing tasks for 0 from TaskSet 1.0
15/03/11 15:16:47 WARN TaskSetManager: Lost task 0.2 in stage 1.0 (TID 5, 10.131.141.193): ExecutorLostFailure (executor lost)
15/03/11 15:16:47 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(kvs-in-wfanub02.int.kronos.com,51660) not found
15/03/11 15:16:47 INFO DAGScheduler: Executor lost: 0 (epoch 1)
15/03/11 15:16:47 INFO BlockManagerMasterActor: Trying to remove executor 0 from BlockManagerMaster.
15/03/11 15:16:47 INFO BlockManagerMasterActor: Removing block manager BlockManagerId(0, 10.131.141.193, 51660, 0)
15/03/11 15:16:47 INFO BlockManagerMaster: Removed 0 successfully in removeExecutor
15/03/11 15:16:47 INFO AppClient$ClientActor: Executor updated: app-20150311151059-0008/0 is now EXITED (Command exited with code 50)
15/03/11 15:16:47 INFO SparkDeploySchedulerBackend: Executor app-20150311151059-0008/0 removed: Command exited with code 50
15/03/11 15:16:47 INFO AppClient$ClientActor: Executor added: app-20150311151059-0008/3 on worker-20150310181710-10.131.141.193-35791 (10.131.141.193:35791) with 1 cores
15/03/11 15:16:47 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150311151059-0008/3 on hostPort 10.131.141.193:35791 with 1 cores, 512.0 MB RAM
15/03/11 15:16:47 INFO AppClient$ClientActor: Executor updated: app-20150311151059-0008/3 is now LOADING
15/03/11 15:16:47 INFO AppClient$ClientActor: Executor updated: app-20150311151059-0008/2 is now RUNNING
15/03/11 15:16:49 INFO AppClient$ClientActor: Executor updated: app-20150311151059-0008/3 is now RUNNING
15/03/11 15:16:51 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@10.131.141.192:37230/user/Executor#544826218] with ID 2
15/03/11 15:16:51 INFO TaskSetManager: Starting task 0.3 in stage 1.0 (TID 6, 10.131.141.192, PROCESS_LOCAL, 1216 bytes)
15/03/11 15:16:52 INFO BlockManagerMasterActor: Registering block manager 10.131.141.192:40195 with 267.3 MB RAM, BlockManagerId(2, 10.131.141.192, 40195, 0)
15/03/11 15:16:52 INFO ConnectionManager: Accepted connection from [kvs-in-wfanub03.int.kronos.com/10.131.141.192:59385]
15/03/11 15:16:52 INFO SendingConnection: Initiating connection to [kvs-in-wfanub03.int.kronos.com/10.131.141.192:40195]
15/03/11 15:16:52 INFO SendingConnection: Connected to [kvs-in-wfanub03.int.kronos.com/10.131.141.192:40195], 1 messages pending
15/03/11 15:16:52 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.131.141.192:40195 (size: 3.5 KB, free: 267.3 MB)
15/03/11 15:16:53 INFO TaskSetManager: Starting task 1.1 in stage 1.0 (TID 7, 10.131.141.192, PROCESS_LOCAL, 1217 bytes)
15/03/11 15:16:53 INFO TaskSetManager: Lost task 0.3 in stage 1.0 (TID 6) on executor 10.131.141.192: java.lang.NoSuchMethodError (org.apache.spark.executor.TaskMetrics.outputMetrics()Lscala/Option;) [duplicate 2]
15/03/11 15:16:53 ERROR TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job
15/03/11 15:16:53 INFO TaskSchedulerImpl: Cancelling stage 1
15/03/11 15:16:53 INFO TaskSchedulerImpl: Stage 1 was cancelled
15/03/11 15:16:53 INFO DAGScheduler: Failed to run runJob at RDDFunctions.scala:29
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 6, 10.131.141.192): java.lang.NoSuchMethodError: org.apache.spark.executor.TaskMetrics.outputMetrics()Lscala/Option;
        com.datastax.spark.connector.metrics.OutputMetricsUpdater$.apply(OutputMetricsUpdater.scala:70)
        com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:119)
        com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:29)
        com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:29)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
scala> 15/03/11 15:16:53 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,40195)
15/03/11 15:16:53 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,40195)
15/03/11 15:16:53 ERROR ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,40195) not found
15/03/11 15:16:53 INFO SparkDeploySchedulerBackend: Executor 2 disconnected, so removing it
15/03/11 15:16:53 ERROR TaskSchedulerImpl: Lost executor 2 on 10.131.141.192: remote Akka client disassociated
15/03/11 15:16:53 INFO TaskSetManager: Re-queueing tasks for 2 from TaskSet 1.0
15/03/11 15:16:53 WARN TaskSetManager: Lost task 1.1 in stage 1.0 (TID 7, 10.131.141.192): ExecutorLostFailure (executor lost)
15/03/11 15:16:53 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/03/11 15:16:53 INFO DAGScheduler: Executor lost: 2 (epoch 2)
15/03/11 15:16:53 INFO BlockManagerMasterActor: Trying to remove executor 2 from BlockManagerMaster.
15/03/11 15:16:53 INFO BlockManagerMasterActor: Removing block manager BlockManagerId(2, 10.131.141.192, 40195, 0)
15/03/11 15:16:53 INFO BlockManagerMaster: Removed 2 successfully in removeExecutor
15/03/11 15:16:53 INFO AppClient$ClientActor: Executor updated: app-20150311151059-0008/2 is now EXITED (Command exited with code 50)
15/03/11 15:16:53 INFO SparkDeploySchedulerBackend: Executor app-20150311151059-0008/2 removed: Command exited with code 50
15/03/11 15:16:53 INFO AppClient$ClientActor: Executor added: app-20150311151059-0008/4 on worker-20150310181709-10.131.141.192-58499 (10.131.141.192:58499) with 1 cores
15/03/11 15:16:53 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150311151059-0008/4 on hostPort 10.131.141.192:58499 with 1 cores, 512.0 MB RAM
15/03/11 15:16:53 INFO AppClient$ClientActor: Executor updated: app-20150311151059-0008/4 is now LOADING
15/03/11 15:16:54 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@10.131.141.193:55631/user/Executor#-183896177] with ID 3
15/03/11 15:16:54 INFO BlockManagerMasterActor: Registering block manager 10.131.141.193:56916 with 267.3 MB RAM, BlockManagerId(3, 10.131.141.193, 56916, 0)
15/03/11 15:16:54 INFO AppClient$ClientActor: Executor updated: app-20150311151059-0008/4 is now RUNNING
15/03/11 15:16:58 INFO SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@10.131.141.192:37182/user/Executor#577095849] with ID 4
15/03/11 15:16:58 INFO BlockManagerMasterActor: Registering block manager 10.131.141.192:34398 with 267.3 MB RAM, BlockManagerId(4, 10.131.141.192, 34398, 0)

Regards,
Tarun Tiwari
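P.S. For reference, here is roughly the same flow written as a standalone job instead of the shell session, in case it helps narrow things down. This is only a sketch: the master URL and the dependency coordinate in the comments are assumptions on my part and not copied from my setup; the keyspace, table, and data are the same as above.

// Hypothetical build.sbt dependency; the connector version would need to match
// whichever Spark/connector pair is actually deployed on the cluster:
//   libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.1"

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

object SaveWordsJob {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SaveWordsJob")
      .setMaster("spark://10.131.141.200:7077")                  // assumed master URL
      .set("spark.cassandra.connection.host", "10.131.141.200")  // any Cassandra contact point

    val sc = new SparkContext(conf)

    // Same data as in the shell session above.
    val data = sc.parallelize(Seq((81, "XXX"), (82, "YYYY")))

    // SomeColumns maps the tuple fields positionally: the first element of each
    // tuple goes to "word" and the second to "count", exactly as in the shell call.
    data.saveToCassandra("test2", "words", SomeColumns("word", "count"))

    sc.stop()
  }
}

As in the shell (where I use sc.addJar), the connector jar still has to be visible to the executors when running a packaged job, e.g. via --jars on spark-submit.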