Hi,

I have been stuck on this for three days now. I am using the spark-cassandra-connector 
with Spark, and I am able to create RDDs with the sc.cassandraTable function, 
which means Spark is able to communicate with Cassandra properly.

But somehow saveToCassandra is not working. The steps I am running are below.
Does it have something to do with my spark-env or spark-defaults? Am I missing 
something critical?

scala> import com.datastax.spark.connector._
scala> 
sc.addJar("/home/analytics/Installers/spark-1.1.1/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.2.0-SNAPSHOT.jar")
scala> val myTable = sc.cassandraTable("test2", "words")
scala> myTable.collect()
--- this works perfectly fine.
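
For context, the only columns I care about in that table are word and count; a 
version of the read with the columns selected explicitly would look roughly like 
this (a sketch for reference, not something from the failing run):

import com.datastax.spark.connector._

// Sketch only: the same read with the two columns selected explicitly
// (these are the same column names I pass to SomeColumns in the write below).
val rows = sc.cassandraTable("test2", "words").select("word", "count")
rows.collect().foreach(println)   // prints each CassandraRow with its column values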

scala> val data = sc.parallelize(Seq((81, "XXX"), (82, "YYYY")))
scala> data.saveToCassandra("test2", "words", SomeColumns("word", "count"))
--- this fails
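
Since I am wondering whether my spark-env or spark-defaults are to blame, this is 
roughly how I would expect the same write to be wired up as a standalone job. It 
is only a sketch: the app name is made up and the connection host is just one of 
my Cassandra nodes, not necessarily my exact configuration.

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

object SaveWordsSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder settings: point the connector at one of the Cassandra nodes.
    val conf = new SparkConf()
      .setAppName("save-words-sketch")
      .set("spark.cassandra.connection.host", "10.131.141.192")
    val sc = new SparkContext(conf)

    // Same data as in the shell; tuple element order must line up with the
    // column order given in SomeColumns.
    val data = sc.parallelize(Seq((81, "XXX"), (82, "YYYY")))
    data.saveToCassandra("test2", "words", SomeColumns("word", "count"))

    sc.stop()
  }
}

Here is the full log from the failing saveToCassandra call in the spark-shell: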

15/03/11 15:16:45 INFO Cluster: New Cassandra host /10.131.141.192:9042 added
15/03/11 15:16:45 INFO LocalNodeFirstLoadBalancingPolicy: Added host 
10.131.141.192 (datacenter1)
15/03/11 15:16:45 INFO Cluster: New Cassandra host /10.131.141.193:9042 added
15/03/11 15:16:45 INFO LocalNodeFirstLoadBalancingPolicy: Added host 
10.131.141.193 (datacenter1)
15/03/11 15:16:45 INFO Cluster: New Cassandra host /10.131.141.200:9042 added
15/03/11 15:16:45 INFO CassandraConnector: Connected to Cassandra cluster: 
wfan_cluster_DB
15/03/11 15:16:45 INFO SparkContext: Starting job: runJob at 
RDDFunctions.scala:29
15/03/11 15:16:45 INFO DAGScheduler: Got job 1 (runJob at 
RDDFunctions.scala:29) with 2 output partitions (allowLocal=false)
15/03/11 15:16:45 INFO DAGScheduler: Final stage: Stage 1(runJob at 
RDDFunctions.scala:29)
15/03/11 15:16:45 INFO DAGScheduler: Parents of final stage: List()
15/03/11 15:16:45 INFO DAGScheduler: Missing parents: List()
15/03/11 15:16:45 INFO DAGScheduler: Submitting Stage 1 
(ParallelCollectionRDD[1] at parallelize at <console>:20), which has no missing 
parents
15/03/11 15:16:45 INFO MemoryStore: ensureFreeSpace(7400) called with 
curMem=1792, maxMem=2778778828
15/03/11 15:16:45 INFO MemoryStore: Block broadcast_1 stored as values in 
memory (estimated size 7.2 KB, free 2.6 GB)
15/03/11 15:16:45 INFO MemoryStore: ensureFreeSpace(3602) called with 
curMem=9192, maxMem=2778778828
15/03/11 15:16:45 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in 
memory (estimated size 3.5 KB, free 2.6 GB)
15/03/11 15:16:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 
10.131.141.200:56502 (size: 3.5 KB, free: 2.6 GB)
15/03/11 15:16:45 INFO BlockManagerMaster: Updated info of block 
broadcast_1_piece0
15/03/11 15:16:45 INFO DAGScheduler: Submitting 2 missing tasks from Stage 1 
(ParallelCollectionRDD[1] at parallelize at <console>:20)
15/03/11 15:16:45 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
15/03/11 15:16:45 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 
10.131.141.192, PROCESS_LOCAL, 1216 bytes)
15/03/11 15:16:45 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 
10.131.141.193, PROCESS_LOCAL, 1217 bytes)
15/03/11 15:16:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 
10.131.141.193:51660 (size: 3.5 KB, free: 267.3 MB)
15/03/11 15:16:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 
10.131.141.192:32875 (size: 3.5 KB, free: 267.3 MB)
15/03/11 15:16:45 INFO CassandraConnector: Disconnected from Cassandra cluster: 
wfan_cluster_DB
15/03/11 15:16:46 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, 
10.131.141.192): java.lang.NoSuchMethodError: 
org.apache.spark.executor.TaskMetrics.outputMetrics()Lscala/Option;
        
com.datastax.spark.connector.metrics.OutputMetricsUpdater$.apply(OutputMetricsUpdater.scala:70)
        
com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:119)
        
com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:29)
        
com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:29)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
15/03/11 15:16:46 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 4, 
10.131.141.192, PROCESS_LOCAL, 1216 bytes)
15/03/11 15:16:46 INFO ConnectionManager: Key not valid ? 
sun.nio.ch.SelectionKeyImpl@29ffe58e
15/03/11 15:16:46 INFO ConnectionManager: key already cancelled ? 
sun.nio.ch.SelectionKeyImpl@29ffe58e
java.nio.channels.CancelledKeyException
        at 
org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:392)
        at 
org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:145)
15/03/11 15:16:46 INFO SparkDeploySchedulerBackend: Executor 1 disconnected, so 
removing it
15/03/11 15:16:46 INFO ConnectionManager: Handling connection error on 
connection to ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,32875)
15/03/11 15:16:46 ERROR TaskSchedulerImpl: Lost executor 1 on 10.131.141.192: 
remote Akka client disassociated
15/03/11 15:16:46 INFO ConnectionManager: Removing ReceivingConnection to 
ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,32875)
15/03/11 15:16:46 INFO ConnectionManager: Removing SendingConnection to 
ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,32875)
15/03/11 15:16:46 INFO TaskSetManager: Re-queueing tasks for 1 from TaskSet 1.0
15/03/11 15:16:46 INFO ConnectionManager: Removing SendingConnection to 
ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,32875)
15/03/11 15:16:46 INFO ConnectionManager: Removing SendingConnection to 
ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,32875)
15/03/11 15:16:46 INFO ConnectionManager: Removing SendingConnection to 
ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,32875)
15/03/11 15:16:46 INFO AppClient$ClientActor: Executor updated: 
app-20150311151059-0008/1 is now EXITED (Command exited with code 50)
15/03/11 15:16:46 INFO SparkDeploySchedulerBackend: Executor 
app-20150311151059-0008/1 removed: Command exited with code 50
15/03/11 15:16:46 WARN TaskSetManager: Lost task 0.1 in stage 1.0 (TID 4, 
10.131.141.192): ExecutorLostFailure (executor lost)
15/03/11 15:16:46 INFO DAGScheduler: Executor lost: 1 (epoch 0)
15/03/11 15:16:46 INFO AppClient$ClientActor: Executor added: 
app-20150311151059-0008/2 on worker-20150310181709-10.131.141.192-58499 
(10.131.141.192:58499) with 1 cores
15/03/11 15:16:46 INFO BlockManagerMasterActor: Trying to remove executor 1 
from BlockManagerMaster.
15/03/11 15:16:46 INFO BlockManagerMasterActor: Removing block manager 
BlockManagerId(1, 10.131.141.192, 32875, 0)
15/03/11 15:16:46 INFO BlockManagerMaster: Removed 1 successfully in 
removeExecutor
15/03/11 15:16:46 INFO SparkDeploySchedulerBackend: Granted executor ID 
app-20150311151059-0008/2 on hostPort 10.131.141.192:58499 with 1 cores, 512.0 
MB RAM
15/03/11 15:16:46 INFO AppClient$ClientActor: Executor updated: 
app-20150311151059-0008/2 is now LOADING
15/03/11 15:16:46 INFO TaskSetManager: Starting task 0.2 in stage 1.0 (TID 5, 
10.131.141.193, PROCESS_LOCAL, 1216 bytes)
15/03/11 15:16:46 INFO TaskSetManager: Lost task 1.0 in stage 1.0 (TID 3) on 
executor 10.131.141.193: java.lang.NoSuchMethodError 
(org.apache.spark.executor.TaskMetrics.outputMetrics()Lscala/Option;) 
[duplicate 1]
15/03/11 15:16:47 INFO ConnectionManager: Removing SendingConnection to 
ConnectionManagerId(kvs-in-wfanub02.int.kronos.com,51660)
15/03/11 15:16:47 INFO SparkDeploySchedulerBackend: Executor 0 disconnected, so 
removing it
15/03/11 15:16:47 ERROR TaskSchedulerImpl: Lost executor 0 on 10.131.141.193: 
remote Akka client disassociated
15/03/11 15:16:47 INFO ConnectionManager: Removing ReceivingConnection to 
ConnectionManagerId(kvs-in-wfanub02.int.kronos.com,51660)
15/03/11 15:16:47 INFO TaskSetManager: Re-queueing tasks for 0 from TaskSet 1.0
15/03/11 15:16:47 WARN TaskSetManager: Lost task 0.2 in stage 1.0 (TID 5, 
10.131.141.193): ExecutorLostFailure (executor lost)
15/03/11 15:16:47 ERROR ConnectionManager: Corresponding SendingConnection to 
ConnectionManagerId(kvs-in-wfanub02.int.kronos.com,51660) not found
15/03/11 15:16:47 INFO DAGScheduler: Executor lost: 0 (epoch 1)
15/03/11 15:16:47 INFO BlockManagerMasterActor: Trying to remove executor 0 
from BlockManagerMaster.
15/03/11 15:16:47 INFO BlockManagerMasterActor: Removing block manager 
BlockManagerId(0, 10.131.141.193, 51660, 0)
15/03/11 15:16:47 INFO BlockManagerMaster: Removed 0 successfully in 
removeExecutor
15/03/11 15:16:47 INFO AppClient$ClientActor: Executor updated: 
app-20150311151059-0008/0 is now EXITED (Command exited with code 50)
15/03/11 15:16:47 INFO SparkDeploySchedulerBackend: Executor 
app-20150311151059-0008/0 removed: Command exited with code 50
15/03/11 15:16:47 INFO AppClient$ClientActor: Executor added: 
app-20150311151059-0008/3 on worker-20150310181710-10.131.141.193-35791 
(10.131.141.193:35791) with 1 cores
15/03/11 15:16:47 INFO SparkDeploySchedulerBackend: Granted executor ID 
app-20150311151059-0008/3 on hostPort 10.131.141.193:35791 with 1 cores, 512.0 
MB RAM
15/03/11 15:16:47 INFO AppClient$ClientActor: Executor updated: 
app-20150311151059-0008/3 is now LOADING
15/03/11 15:16:47 INFO AppClient$ClientActor: Executor updated: 
app-20150311151059-0008/2 is now RUNNING
15/03/11 15:16:49 INFO AppClient$ClientActor: Executor updated: 
app-20150311151059-0008/3 is now RUNNING
15/03/11 15:16:51 INFO SparkDeploySchedulerBackend: Registered executor: 
Actor[akka.tcp://sparkExecutor@10.131.141.192:37230/user/Executor#544826218] 
with ID 2
15/03/11 15:16:51 INFO TaskSetManager: Starting task 0.3 in stage 1.0 (TID 6, 
10.131.141.192, PROCESS_LOCAL, 1216 bytes)
15/03/11 15:16:52 INFO BlockManagerMasterActor: Registering block manager 
10.131.141.192:40195 with 267.3 MB RAM, BlockManagerId(2, 10.131.141.192, 
40195, 0)
15/03/11 15:16:52 INFO ConnectionManager: Accepted connection from 
[kvs-in-wfanub03.int.kronos.com/10.131.141.192:59385]
15/03/11 15:16:52 INFO SendingConnection: Initiating connection to 
[kvs-in-wfanub03.int.kronos.com/10.131.141.192:40195]
15/03/11 15:16:52 INFO SendingConnection: Connected to 
[kvs-in-wfanub03.int.kronos.com/10.131.141.192:40195], 1 messages pending
15/03/11 15:16:52 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 
10.131.141.192:40195 (size: 3.5 KB, free: 267.3 MB)
15/03/11 15:16:53 INFO TaskSetManager: Starting task 1.1 in stage 1.0 (TID 7, 
10.131.141.192, PROCESS_LOCAL, 1217 bytes)
15/03/11 15:16:53 INFO TaskSetManager: Lost task 0.3 in stage 1.0 (TID 6) on 
executor 10.131.141.192: java.lang.NoSuchMethodError 
(org.apache.spark.executor.TaskMetrics.outputMetrics()Lscala/Option;) 
[duplicate 2]
15/03/11 15:16:53 ERROR TaskSetManager: Task 0 in stage 1.0 failed 4 times; 
aborting job
15/03/11 15:16:53 INFO TaskSchedulerImpl: Cancelling stage 1
15/03/11 15:16:53 INFO TaskSchedulerImpl: Stage 1 was cancelled
15/03/11 15:16:53 INFO DAGScheduler: Failed to run runJob at 
RDDFunctions.scala:29
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 
6, 10.131.141.192): java.lang.NoSuchMethodError: 
org.apache.spark.executor.TaskMetrics.outputMetrics()Lscala/Option;
        
com.datastax.spark.connector.metrics.OutputMetricsUpdater$.apply(OutputMetricsUpdater.scala:70)
        
com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:119)
        
com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:29)
        
com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:29)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at scala.Option.foreach(Option.scala:236)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


scala> 15/03/11 15:16:53 INFO ConnectionManager: Removing SendingConnection to 
ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,40195)
15/03/11 15:16:53 INFO ConnectionManager: Removing ReceivingConnection to 
ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,40195)
15/03/11 15:16:53 ERROR ConnectionManager: Corresponding SendingConnection to 
ConnectionManagerId(kvs-in-wfanub03.int.kronos.com,40195) not found
15/03/11 15:16:53 INFO SparkDeploySchedulerBackend: Executor 2 disconnected, so 
removing it
15/03/11 15:16:53 ERROR TaskSchedulerImpl: Lost executor 2 on 10.131.141.192: 
remote Akka client disassociated
15/03/11 15:16:53 INFO TaskSetManager: Re-queueing tasks for 2 from TaskSet 1.0
15/03/11 15:16:53 WARN TaskSetManager: Lost task 1.1 in stage 1.0 (TID 7, 
10.131.141.192): ExecutorLostFailure (executor lost)
15/03/11 15:16:53 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have 
all completed, from pool
15/03/11 15:16:53 INFO DAGScheduler: Executor lost: 2 (epoch 2)
15/03/11 15:16:53 INFO BlockManagerMasterActor: Trying to remove executor 2 
from BlockManagerMaster.
15/03/11 15:16:53 INFO BlockManagerMasterActor: Removing block manager 
BlockManagerId(2, 10.131.141.192, 40195, 0)
15/03/11 15:16:53 INFO BlockManagerMaster: Removed 2 successfully in 
removeExecutor
15/03/11 15:16:53 INFO AppClient$ClientActor: Executor updated: 
app-20150311151059-0008/2 is now EXITED (Command exited with code 50)
15/03/11 15:16:53 INFO SparkDeploySchedulerBackend: Executor 
app-20150311151059-0008/2 removed: Command exited with code 50
15/03/11 15:16:53 INFO AppClient$ClientActor: Executor added: 
app-20150311151059-0008/4 on worker-20150310181709-10.131.141.192-58499 
(10.131.141.192:58499) with 1 cores
15/03/11 15:16:53 INFO SparkDeploySchedulerBackend: Granted executor ID 
app-20150311151059-0008/4 on hostPort 10.131.141.192:58499 with 1 cores, 512.0 
MB RAM
15/03/11 15:16:53 INFO AppClient$ClientActor: Executor updated: 
app-20150311151059-0008/4 is now LOADING
15/03/11 15:16:54 INFO SparkDeploySchedulerBackend: Registered executor: 
Actor[akka.tcp://sparkExecutor@10.131.141.193:55631/user/Executor#-183896177] 
with ID 3
15/03/11 15:16:54 INFO BlockManagerMasterActor: Registering block manager 
10.131.141.193:56916 with 267.3 MB RAM, BlockManagerId(3, 10.131.141.193, 
56916, 0)
15/03/11 15:16:54 INFO AppClient$ClientActor: Executor updated: 
app-20150311151059-0008/4 is now RUNNING
15/03/11 15:16:58 INFO SparkDeploySchedulerBackend: Registered executor: 
Actor[akka.tcp://sparkExecutor@10.131.141.192:37182/user/Executor#577095849] 
with ID 4
15/03/11 15:16:58 INFO BlockManagerMasterActor: Registering block manager 
10.131.141.192:34398 with 267.3 MB RAM, BlockManagerId(4, 10.131.141.192, 
34398, 0)


Regards,
Tarun Tiwari | Workforce Analytics-ETL | Kronos India
M: +91 9540 28 27 77 | Tel: +91 120 4015200
