Hello,
I am using Spark 1.2 and Cassandra 2.0.12, and the table I am about to read
has about 20 million rows.
When I use 10 Spark threads to read from Cassandra, it works fine:
val sc = new SparkContext("local[10]", "tungsten", conf)
When I use 40 Spark threads to read from Cassandra, it crashes with the
following error:
val sc = new SparkContext("local[40]", "tungsten", conf)
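One thing worth trying before touching the server side is throttling the connector itself. This is only a sketch: the property names below are the ones I believe the Spark Cassandra Connector of that era (1.x) accepted, and the host address and values are illustrative, so please check them against your connector version's documentation.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative tuning sketch: make each read task less aggressive so that
// 40+ local threads do not overwhelm a small Cassandra cluster.
val conf = new SparkConf()
  .setAppName("tungsten")
  .set("spark.cassandra.connection.host", "169.254.100.3") // example host
  // Larger split size => fewer token-range queries in flight at once.
  .set("spark.cassandra.input.split.size", "1000000")
  // Smaller page size => shorter individual requests, less chance of a
  // server-side read timeout on each fetch.
  .set("spark.cassandra.input.page.row.size", "500")

val sc = new SparkContext("local[40]", "tungsten", conf)
```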
Then I changed the concurrent_reads parameter in cassandra.yaml from 32 to
320.
With that change, 40 Spark threads work. But when I increase to 80 threads,
the exception happens again.
It seems like a configuration problem, but how can I get past it?
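For reference, these are the cassandra.yaml settings I have been adjusting on the server side. The values are illustrative, not recommendations; the usual guideline in the Cassandra docs is to size concurrent_reads relative to the number of data disks rather than to the number of client threads, and to give long token-range scans more timeout headroom instead:

```yaml
# cassandra.yaml -- illustrative values only; tune for your hardware.
# Rule of thumb: concurrent_reads ~ 16 * number of data disks.
concurrent_reads: 64

# Server-side timeouts (Cassandra 2.0 defaults are 5000 / 10000 ms).
read_request_timeout_in_ms: 10000
range_request_timeout_in_ms: 20000
```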
Thanks
15/03/23 20:25:43 WARN TaskSetManager: Lost task 222.0 in stage 0.0 (TID
222, localhost): TaskKilled (killed intentionally)
15/03/23 20:25:43 WARN TaskSetManager: Lost task 240.0 in stage 0.0 (TID
240, localhost): TaskKilled (killed intentionally)
15/03/23 20:25:43 WARN TaskSetManager: Lost task 241.0 in stage 0.0 (TID
241, localhost): TaskKilled (killed intentionally)
15/03/23 20:25:43 WARN TaskSetManager: Lost task 228.0 in stage 0.0 (TID
228, localhost): TaskKilled (killed intentionally)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 157
in stage 0.0 failed 1 times, most recent failure: Lost task 157.0 in stage
0.0 (TID 157, localhost): java.io.IOException: Exception during
execution of SELECT count(*) FROM "tungsten"."cudb_dn" WHERE
token("entry_key") > ? AND token("entry_key") <= ? ALLOW
FILTERING: All host(s) tried for query failed (tried: /169.254.100.4:9042
(com.datastax.driver.core.exceptions.DriverException: Timed out waiting for
server response), /169.254.100.3:9042
(com.datastax.driver.core.TransportException: [/169.254.100.3:9042]
Connection has been closed))
at com.datastax.spark.connector.rdd.CassandraRDD.com
$datastax$spark$connector$rdd$CassandraRDD$$fetchTokenRange(CassandraRDD.scala:433)
at
com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$20.apply(CassandraRDD.scala:447)
at
com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$20.apply(CassandraRDD.scala:447)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:10)
at org.apache.spark.rdd.RDD$$anonfun$19.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$19.apply(RDD.scala:868)
at
org.apache.spark.SparkContext$$anonfun$30.apply(SparkContext.scala:1389)
at
org.apache.spark.SparkContext$$anonfun$30.apply(SparkContext.scala:1389)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException:
All host(s) tried for query failed (tried: /169.254.100.4:9042
(com.datastax.driver.core.exceptions.DriverException: Timed out
waiting for server response), /169.254.100.3:9042
(com.datastax.driver.core.TransportException: [/169.254.100.3:9042]
Connection has been closed))
at
com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
at
com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
at
com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:205)
at
com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
at sun.reflect.GeneratedMethodAccessor67.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at
com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
at com.sun.proxy.$Proxy11.execute(Unknown Source)
at com.datastax.spark.connector.rdd.CassandraRDD.com
$datastax$spark$connector$rdd$CassandraRDD$$fetchTokenRange(CassandraRDD.scala:424)
... 14 more
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException:
All host(s) tried for query failed (tried: /169.254.100.4:9042
(com.datastax.driver.core.exceptions.DriverException: Timed out
waiting for server response), /169.254.100.3:9042
(com.datastax.driver.core.TransportException: [/169.254.100.3:9042]
Connection has been closed))
at
com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:107)
at
com.datastax.driver.core.RequestHandler$1.run(RequestHandler.java:210)
... 3 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)