Follow-up: I have some more details about this problem. It can be
replicated with a toy dataset:
val lookup = (1 to 10000000).toSet
val lB = sc.broadcast(lookup)
val data = sc.parallelize(1 to 10000000).map(i => (1 to 500).toArray)
val dataSel = data.map(vv => vv.filter(lB.value.contains(_)))
dataSel.count
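
To put a rough number on the lookup's size, here is a quick check using plain
JDK serialization (only an approximation of what Spark's broadcast and task
serialization will actually produce, but enough to see the order of magnitude):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Serialize the lookup set in memory and report the byte count.
// Note: this materializes the whole serialized form on the driver heap.
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(lookup)
oos.close()
println("serialized lookup: " + bos.size + " bytes")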

The toy job above is designed to emulate feature selection on a
10,000,000-example dataset. If the lookup is small, it runs successfully;
once the lookup becomes sufficiently large, the problem appears, and
"sufficiently large" isn't all that big (under 1 GB). I've included a
snippet of the log messages below. We've tried increasing
spark.akka.askTimeout to 20 seconds (how it was set is sketched after the
log), but that only seems to lengthen the delay in the logging stream
between the INFO message for the executor failure and the stack trace. The
problem always appears near a GC on the driver. As I said before, there is
nothing out of the ordinary in stdout/stderr on the workers, and the node
logs show task restarts but nothing of use. I would appreciate any
suggestions for debugging this further. Thank you in advance. Here is the
log output:



13/10/23 14:16:39 INFO cluster.ClusterTaskSetManager: Serialized task 0.0:263 as 39625334 bytes in 55 ms
13/10/23 14:16:39 INFO client.Client$ClientActor: Executor updated: app-20131023141253-0011/9 is now FAILED (Command exited with code 1)
13/10/23 14:16:39 INFO cluster.SparkDeploySchedulerBackend: Executor app-20131023141253-0011/9 removed: Command exited with code 1
[GC
Desired survivor size 5596774400 bytes, new threshold 4 (max 15)
 [PSYoungGen: 19318548K->4461016K(20650496K)] 25045771K->11103784K(70982144K), 0.6651370 secs] [Times: user=8.74 sys=3.09, real=0.66 secs]
[GC
Desired survivor size 5615648768 bytes, new threshold 3 (max 15)
 [PSYoungGen: 20640075K->3083947K(18713088K)] 27282843K->11130718K(69044736K), 0.9221720 secs] [Times: user=8.82 sys=7.61, real=0.92 secs]
[GC
Desired survivor size 5275910144 bytes, new threshold 2 (max 15)
 [PSYoungGen: 17246485K->2101931K(19681792K)] 25293256K->11155006K(70013440K), 0.3125910 secs] [Times: user=4.82 sys=0.73, real=0.31 secs]
[GC
Desired survivor size 5356126208 bytes, new threshold 1 (max 15)
 [PSYoungGen: 16270913K->41254K(19856896K)] 25323988K->11148757K(70188544K), 0.2097400 secs] [Times: user=2.45 sys=1.26, real=0.21 secs]
13/10/23 14:16:59 INFO client.Client$ClientActor: Connecting to master spark://sanji-03:7077
13/10/23 14:16:59 ERROR client.Client$ClientActor: Error notifying standalone scheduler's driver actor
org.apache.spark.SparkException: Error notifying standalone scheduler's driver actor
at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.removeExecutor(StandaloneSchedulerBackend.scala:192)
at org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend.executorRemoved(SparkDeploySchedulerBackend.scala:90)
at org.apache.spark.deploy.client.Client$ClientActor$$anonfun$receive$1.apply(Client.scala:92)
at org.apache.spark.deploy.client.Client$ClientActor$$anonfun$receive$1.apply(Client.scala:72)
at akka.actor.Actor$class.apply(Actor.scala:318)
at org.apache.spark.deploy.client.Client$ClientActor.apply(Client.scala:51)
at akka.actor.ActorCell.invoke(ActorCell.scala:626)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:197)
at akka.dispatch.Mailbox.run(Mailbox.scala:179)
at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:516)
at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1479)
at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [20000] milliseconds
at akka.dispatch.DefaultPromise.ready(Future.scala:870)
at akka.dispatch.DefaultPromise.result(Future.scala:874)
at akka.dispatch.Await$.result(Future.scala:74)
at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.removeExecutor(StandaloneSchedulerBackend.scala:189)
... 13 more
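
In case the mechanism matters: the timeout is just the spark.akka.askTimeout
property. A minimal sketch of setting it, assuming Spark 0.8's
system-property-based configuration (the app name below is arbitrary; for the
spark-shell the equivalent should be passing -Dspark.akka.askTimeout=20
through SPARK_JAVA_OPTS in conf/spark-env.sh, since the shell creates the
SparkContext itself):

import org.apache.spark.SparkContext

// Akka settings are read when the SparkContext's actor system is created,
// so the property must be in place before the context is constructed.
System.setProperty("spark.akka.askTimeout", "20") // seconds
val sc = new SparkContext("spark://sanji-03:7077", "broadcast-repro") // app name is a placeholder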



On Tue, Oct 22, 2013 at 3:17 PM, Tom Vacek <[email protected]> wrote:

> I have a simple code snippet for the shell. I'm running 0.8.0, and this
> happens with both the standalone Spark master and Mesos. Basically, I'm
> reading a local file from the login node, broadcasting its contents as a
> set, and then filtering a list embedded in an RDD. However, the stage
> fails every time I run it. I'm looking for advice on the problem. I
> suspect a misconfiguration on the cluster, but I have no idea where to
> start. Any suggestions are appreciated. Code snippet and log messages
> follow.
>
> val wordLookup = scala.io.Source.fromFile("/data/share/rnaTokLookup", "latin1").getLines().toList
>
> val rnaToks = wordLookup.map(ss => { val chunks = ss.split("\\t"); chunks(0) }).toSet
>
> val rnaToksB = sc.broadcast(rnaToks)
>
> val text = sc.textFile("hdfs://sanji-03/user/tom/rnaHuge/docVecs")
>
> val ngrams = text.map(tt => { val blobs = tt.split("\\t"); (blobs(0), blobs(1).split(" ")) })
> // ngrams: RDD[(String, Array[String])]
> val ngramsLight = ngrams.map(tt => (tt._1, tt._2.filter(rnaToksB.value.contains(_))))
>
> ngramsLight.map(tt => tt._1 + "\t" + tt._2.mkString(" ")).saveAsTextFile("hdfs://sanji-03/user/tom/rnaHuge/docVecsLight")
>
