Follow up: I have some more details about this problem. The problem can be replicated with a toy dataset:

val lookup = (1 to 10000000).toSet
val lB = sc.broadcast(lookup)
val data = sc.parallelize(1 to 10000000).map(i => (1 to 500).toArray)
val dataSel = data.map(vv => vv.filter(lB.value.contains(_)))
dataSel.count
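For reference, here is roughly how I have been checking the size of the lookup before broadcasting it. This is just a sketch using plain Java serialization in the shell, so the byte count is only an approximation of what Spark actually ships for the broadcast:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
// Serialize the same `lookup` set defined above and count the bytes.
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(lookup)
oos.close()
println("serialized lookup: " + bos.size() + " bytes")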
This is designed to emulate feature selection on a 10,000,000-example dataset. If the lookup is small, this runs successfully; when the lookup becomes sufficiently large, the problem arises, and "sufficiently large" isn't all that big (< 1 GB). I've included a snippet from the log messages below. We've tried increasing spark.akka.askTimeout to 20 (set as shown in the P.S. at the end of this message), but this only seems to lengthen the delay in the logging stream between the INFO message for the executor failure and the stack trace. The problem always appears near a GC on the driver. As I said before, there is nothing out of the ordinary in stdout/stderr on the workers, and the node logs show task restarts but nothing of use. I would appreciate any suggestions for debugging this further. Thank you in advance.

Here is the log output:

13/10/23 14:16:39 INFO cluster.ClusterTaskSetManager: Serialized task 0.0:263 as 39625334 bytes in 55 ms
13/10/23 14:16:39 INFO client.Client$ClientActor: Executor updated: app-20131023141253-0011/9 is now FAILED (Command exited with code 1)
13/10/23 14:16:39 INFO cluster.SparkDeploySchedulerBackend: Executor app-20131023141253-0011/9 removed: Command exited with code 1
[GC Desired survivor size 5596774400 bytes, new threshold 4 (max 15) [PSYoungGen: 19318548K->4461016K(20650496K)] 25045771K->11103784K(70982144K), 0.6651370 secs] [Times: user=8.74 sys=3.09, real=0.66 secs]
[GC Desired survivor size 5615648768 bytes, new threshold 3 (max 15) [PSYoungGen: 20640075K->3083947K(18713088K)] 27282843K->11130718K(69044736K), 0.9221720 secs] [Times: user=8.82 sys=7.61, real=0.92 secs]
[GC Desired survivor size 5275910144 bytes, new threshold 2 (max 15) [PSYoungGen: 17246485K->2101931K(19681792K)] 25293256K->11155006K(70013440K), 0.3125910 secs] [Times: user=4.82 sys=0.73, real=0.31 secs]
[GC Desired survivor size 5356126208 bytes, new threshold 1 (max 15) [PSYoungGen: 16270913K->41254K(19856896K)] 25323988K->11148757K(70188544K), 0.2097400 secs] [Times: user=2.45 sys=1.26, real=0.21 secs]
13/10/23 14:16:59 INFO client.Client$ClientActor: Connecting to master spark://sanji-03:7077
13/10/23 14:16:59 ERROR client.Client$ClientActor: Error notifying standalone scheduler's driver actor
org.apache.spark.SparkException: Error notifying standalone scheduler's driver actor
        at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.removeExecutor(StandaloneSchedulerBackend.scala:192)
        at org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend.executorRemoved(SparkDeploySchedulerBackend.scala:90)
        at org.apache.spark.deploy.client.Client$ClientActor$$anonfun$receive$1.apply(Client.scala:92)
        at org.apache.spark.deploy.client.Client$ClientActor$$anonfun$receive$1.apply(Client.scala:72)
        at akka.actor.Actor$class.apply(Actor.scala:318)
        at org.apache.spark.deploy.client.Client$ClientActor.apply(Client.scala:51)
        at akka.actor.ActorCell.invoke(ActorCell.scala:626)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:197)
        at akka.dispatch.Mailbox.run(Mailbox.scala:179)
        at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:516)
        at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
        at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
        at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1479)
        at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [20000] milliseconds
        at akka.dispatch.DefaultPromise.ready(Future.scala:870)
        at akka.dispatch.DefaultPromise.result(Future.scala:874)
        at akka.dispatch.Await$.result(Future.scala:74)
        at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.removeExecutor(StandaloneSchedulerBackend.scala:189)
        ... 13 more

On Tue, Oct 22, 2013 at 3:17 PM, Tom Vacek <[email protected]> wrote:
> I have a simple code snippet for the shell. I'm running 0.8.0, and this
> happens with both the Spark master and Mesos. Basically, I'm just reading
> a local file from the login node, broadcasting the contents as a set, and
> then filtering a list embedded in an RDD. However, the stage fails every
> time I have run it. I'm looking for advice about the problem. I suspect
> there is a misconfiguration on the cluster, but I have no idea where to
> start. Any suggestions are appreciated. Code snippet and log messages
> follow.
>
> val wordLookup = scala.io.Source.fromFile("/data/share/rnaTokLookup", "latin1").getLines().toList
>
> val rnaToks = wordLookup.map(ss => { val chunks = ss.split("\\t"); chunks(0) }).toSet
>
> val rnaToksB = sc.broadcast(rnaToks)
>
> val text = sc.textFile("hdfs://sanji-03/user/tom/rnaHuge/docVecs")
>
> val ngrams = text.map(tt => { val blobs = tt.split("\\t"); (blobs(0), blobs(1).split(" ")) })
> // ngrams: [(String, Array[String])]
> val ngramsLight = ngrams.map(tt => (tt._1, tt._2.filter(rnaToksB.value.contains(_))))
>
> ngramsLight.map(tt => tt._1 + "\t" + tt._2.mkString(" ")).saveAsTextFile("hdfs://sanji-03/user/tom/rnaHuge/docVecsLight")
>
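
P.S. For completeness, this is roughly how we are setting the timeout. Since we're on 0.8.0, it goes in as a Java system property before the SparkContext is created (for the shell we pass the equivalent -D flag through SPARK_JAVA_OPTS in spark-env.sh). The app name below is just a placeholder; please correct me if there is a preferred way to configure this:

// Driver-side sketch of our setup (app name is a placeholder).
// spark.akka.askTimeout is interpreted in seconds; the stack trace above
// shows the corresponding 20000 ms future timeout.
import org.apache.spark.SparkContext
System.setProperty("spark.akka.askTimeout", "20")
val sc = new SparkContext("spark://sanji-03:7077", "broadcastFilterRepro")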
