So if I run my code directly from the spark-shell, it works as well. Luckily, I have a fairly small main function. I wonder if there is something funky going on with my spark context - that seems to be the main difference in launching the program. Anyway, I am unblocked now, so I will go off and experiment with various options and see whether it is an error with launching my script, or whether I can report something more reproducible.
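In case it helps with reproducing this: the part that spark-shell does for me automatically is constructing the SparkContext, so that is what I will be poking at first. Below is only a minimal sketch of how a standalone driver typically wires that up against 0.8.0-incubating - the master URL, jar path, HDFS path, and class names are placeholders, not my actual program:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._   // brings in reduceByKey etc.

    object ShuffleTest {
      def main(args: Array[String]) {
        // In 0.8 these are plain Java system properties; setting them here (before
        // the context is created) is equivalent to passing them via SPARK_JAVA_OPTS.
        System.setProperty("spark.storage.blockManagerHeartBeatMs", "90000")

        // spark-shell builds this context for you; a standalone driver has to pass
        // the cluster URL and ship its own jar to the workers explicitly.
        val sc = new SparkContext(
          "spark://<master-host-name>:7077",           // or "local" for the single-node run
          "ShuffleTest",
          "/root/spark",                               // SPARK_HOME on the EC2 AMI
          Seq("target/scala-2.9.3/shuffle-test.jar"))  // placeholder jar name

        // A tiny job with one shuffle stage, similar in shape to the failing one.
        val counts = sc.textFile("hdfs:///tiny-dataset")
          .flatMap(_.split(" "))
          .map(word => (word, 1))
          .reduceByKey(_ + _)
        println("distinct words: " + counts.count())
        sc.stop()
      }
    }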
Thanks,
Shankari


On Mon, Dec 2, 2013 at 8:32 PM, K. Shankari <[email protected]> wrote:

> I compiled against 0.8.0-incubating. And I am running against
> 0.8.0-incubating.
>
> Thanks,
> Shankari
>
>
> On Mon, Dec 2, 2013 at 8:18 PM, Patrick Wendell <[email protected]> wrote:
>
>> The traces just show that your cluster is trying to shut down and
>> getting hung somewhere while shutting down, so unfortunately that
>> doesn't tell us much. The key question is why everything wants to shut
>> down in the first place.
>>
>> One thing I was wondering was if maybe you'd compiled against an older
>> version of Spark, so there are communication issues on all of the
>> slaves and they are getting into a weird state.
>>
>> What version did you compile your program against?
>>
>> On Mon, Dec 2, 2013 at 8:03 PM, K. Shankari <[email protected]> wrote:
>> > After I kill -9 the worker processes and start them up again, I can connect
>> > using the spark shell and access my HDFS data using it. I haven't yet tried
>> > to shuffle from the spark-shell, though, which is the part that is giving me
>> > problems in my program.
>> >
>> > I also did some more debugging:
>> > The stacktrace where the worker is waiting for the unix process is this one:
>> >
>> > "Thread-9" daemon prio=10 tid=0x00007f596c063000 nid=0x3cb0 in Object.wait() [0x00007f599b35e000]
>> >    java.lang.Thread.State: WAITING (on object monitor)
>> >         at java.lang.Object.wait(Native Method)
>> >         - waiting on <0x00000007e025e150> (a java.lang.UNIXProcess)
>> >         at java.lang.Object.wait(Object.java:503)
>> >         at java.lang.UNIXProcess.waitFor(UNIXProcess.java:210)
>> >         - locked <0x00000007e025e150> (a java.lang.UNIXProcess)
>> >         at org.apache.spark.deploy.worker.ExecutorRunner$$anon$2.run(ExecutorRunner.scala:69)
>> >
>> > which corresponds to this code:
>> >
>> > 66      override def run() {
>> > 67        if (process != null) {
>> > 68          logInfo("Shutdown hook killing child process.")
>> > 69          process.destroy()
>> > 70          process.waitFor()
>> > 71        }
>> > 72      }
>> >
>> > Looking at the associated log file, I see that the launched process was:
>> >
>> > 13/12/02 06:47:20 INFO worker.ExecutorRunner: Launch command: "java" "-cp"
>> > ":/root/ephemeral-hdfs/conf:/root/ephemeral-hdfs/conf:/root/spark/conf:/root/spark/assembly/target/scala-2.9.3/spark-assembly_2.9.3-0.8.0-incubating-hadoop1.0.4.jar"
>> > "-Djava.library.path=/root/ephemeral-hdfs/lib/native/"
>> > "-Dspark.local.dir=/mnt/spark,/mnt2/spark" "-Dspark.worker.timeout=90000"
>> > "-Dspark.akka.timeout=90000" "-Dspark.storage.blockManagerHeartBeatMs=90000"
>> > "-Dspark.akka.retry.wait=90000" "-Dspark.akka.frameSize=30000"
>> > "-Dsun.rmi.dgc.server.gcInterval=3600000" "-XX:-UseGCOverheadLimit"
>> > "-Dspark.local.dir=/mnt/spark,/mnt2/spark" "-Dspark.worker.timeout=90000"
>> > "-Dspark.akka.timeout=90000" "-Dspark.storage.blockManagerHeartBeatMs=90000"
>> > "-Dspark.akka.retry.wait=90000" "-Dspark.akka.frameSize=30000"
>> > "-Dsun.rmi.dgc.server.gcInterval=3600000" "-XX:-UseGCOverheadLimit"
>> > "-Dspark.local.dir=/mnt/spark,/mnt2/spark" "-Dspark.worker.timeout=90000"
>> > "-Dspark.akka.timeout=90000" "-Dspark.storage.blockManagerHeartBeatMs=90000"
>> > "-Dspark.akka.retry.wait=90000" "-Dspark.akka.frameSize=30000"
>> > "-Dsun.rmi.dgc.server.gcInterval=3600000" "-XX:-UseGCOverheadLimit"
>> > "-Xms6154M" "-Xmx6154M" "org.apache.spark.executor.StandaloneExecutorBackend"
>> > "akka://spark@<master-host-name>:38832/user/StandaloneScheduler" "0"
>> > "<slave-host-name>" "2"
>> >
>> > And sure enough, the process is still running.
>> >
>> > logs]$ jps
>> > 1510 DataNode
>> > 16795 Jps
>> > 12446 StandaloneExecutorBackend
>> > 12382 Worker
>> >
>> > But I am not able to call jstack on it because it is not responding:
>> >
>> > logs]$ jstack 12446
>> > 12446: Unable to open socket file: target process not responding or HotSpot VM not loaded
>> > The -F option can be used when the target process is not responding
>> >
>> > Its logs don't have any errors, but seem to indicate issues while fetching
>> > blocks using the block manager.
>> >
>> > On the master, I see the following:
>> >
>> > 13/12/02 06:47:33 INFO spark.MapOutputTrackerActor: Asked to send map output
>> > locations for shuffle 0 to <slave-ip>:39992
>> > 13/12/02 06:49:19 WARN storage.BlockManagerMasterActor: Removing
>> > BlockManager BlockManagerId(0, <slave-ip>, 46556, 0) with no recent heart
>> > beats: 51656ms exceeds 45000ms
>> > 13/12/02 06:49:28 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
>> > Registering block manager <slave-ip>:46556 with 3.8 GB RAM
>> > 13/12/02 06:49:28 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
>> > Added rdd_9_0 in memory on <slave-ip>:46556 (size: 166.4 KB, free: 3.8 GB)
>> > 13/12/02 06:49:28 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
>> > Added rdd_9_1 in memory on <slave-ip>:46556 (size: 166.4 KB, free: 3.8 GB)
>> > 13/12/02 06:51:19 WARN storage.BlockManagerMasterActor: Removing
>> > BlockManager BlockManagerId(0, <slave-ip>, 46556, 0) with no recent heart
>> > beats: 81974ms exceeds 45000ms
>> >
>> > And on the slave, I see:
>> >
>> > 13/12/02 06:47:33 INFO spark.MapOutputTracker: Don't have map outputs for
>> > shuffle 0, fetching them
>> > 13/12/02 06:47:33 INFO spark.MapOutputTracker: Doing the fetch; tracker
>> > actor = Actor[akka://[email protected]:38832/user/MapOutputTracker]
>> > 13/12/02 06:47:33 INFO spark.MapOutputTracker: Got the output locations
>> > 13/12/02 06:47:33 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator:
>> > Getting 2 non-zero-bytes blocks out of 2 blocks
>> > 13/12/02 06:47:33 INFO storage.BlockFetcherIterator$BasicBlockFetcherIterator:
>> > Started 0 remote gets in 10 ms
>> > 13/12/02 06:47:37 INFO network.ConnectionManager: Accepted connection from
>> > [other-slave.ec2.internal/10.116.46.231]
>> > 13/12/02 06:47:44 INFO network.SendingConnection: Initiating connection to
>> > [other-slave.internal/10.116.46.231:32807]
>> > 13/12/02 06:47:44 INFO network.SendingConnection: Connected to
>> > [other-slave.internal/10.116.46.231:32807], 1 messages pending
>> >
>> > *** this is where we get the warning on the master, but it doesn't look like
>> > we have started any block fetches by that time ***
>> >
>> > 13/12/02 06:49:28 INFO storage.BlockManager: BlockManager reregistering with master
>> > 13/12/02 06:49:28 INFO storage.BlockManagerMaster: Trying to register BlockManager
>> > 13/12/02 06:49:28 INFO storage.BlockManagerMaster: Registered BlockManager
>> > 13/12/02 06:49:28 INFO storage.BlockManager: Reporting 7 blocks to the master.
>> > 13/12/02 06:49:28 INFO storage.BlockManagerMaster: Updated info of block rdd_9_0
>> > 13/12/02 06:49:28 INFO storage.BlockManagerMaster: Updated info of block rdd_9_1
>> >
>> > *** this is where we get the second warning on the master ***
>> >
>> > Thanks,
>> > Shankari
>> >
>> >
>> > On Mon, Dec 2, 2013 at 7:41 PM, Patrick Wendell <[email protected]> wrote:
>> >>
>> >> This is indeed a bit strange. One question - when you launch the ec2
>> >> cluster can you run the spark shell? Can you access your HDFS data
>> >> using the Spark shell?
>> >>
>> >> Also, what version of Spark is your application compiled against? Is
>> >> it exactly the same version that is on the ec2 cluster?
>> >>
>> >> On Mon, Dec 2, 2013 at 4:24 PM, K. Shankari <[email protected]> wrote:
>> >> > So this is pretty weird, and my debugging hasn't made much progress, so I
>> >> > thought I'd ask for help.
>> >> >
>> >> > I have a medium size dataset, but I am developing my code against a much
>> >> > smaller version since it is much faster to work with. I have deployed a
>> >> > cluster on EC2 using the default scripts and the AMI, and loaded my simple
>> >> > dataset into the ephemeral HDFS.
>> >> >
>> >> > When I ssh into the master node and run my application using "local",
>> >> > everything works. However, if I just change to using distributed spark
>> >> > (spark://...:7077) then everything starts up, but the shuffle operations
>> >> > start failing with the error:
>> >> >
>> >> > 13/12/02 06:47:33 INFO spark.MapOutputTrackerActor: Asked to send map output
>> >> > locations for shuffle 0 to ip-10-38-11-59.ec2.internal:39992
>> >> > ...
>> >> > 13/12/02 06:49:19 WARN storage.BlockManagerMasterActor: Removing
>> >> > BlockManager BlockManagerId(0, ip-10-38-11-59.ec2.internal, 46556, 0) with
>> >> > no recent heart beats: 51656ms exceeds 45000ms
>> >> >
>> >> > I looked around the prior documentation and put the extra options into
>> >> > SPARK_JAVA_OPTS, and increased them to 90000 from the 30000 in the example,
>> >> > but I get the same error.
>> >> >
>> >> > -Dspark.worker.timeout=90000 -Dspark.akka.timeout=90000
>> >> > -Dspark.storage.blockManagerHeartBeatMs=90000 -Dspark.akka.retry.wait=90000
>> >> > -Dspark.akka.frameSize=30000 -Dsun.rmi.dgc.server.gcInterval=3600000
>> >> >
>> >> > I don't think that this is a memory issue because the dataset fits on the
>> >> > master alone and I am successfully able to run my program with "local". I
>> >> > checked the stdout and stderr on the worker, and one of the few times that
>> >> > there was a stacktrace, it was from here:
>> >> >
>> >> > 148   private def askDriverWithReply[T](message: Any): T = {
>> >> > 149     // TODO: Consider removing multiple attempts
>> >> > 150     if (driverActor == null) {
>> >> > 151       throw new SparkException("Error sending message to BlockManager as driverActor is null " +
>> >> > 152         "[message = " + message + "]")
>> >> > 153     }
>> >> > 154     var attempts = 0
>> >> > 155     var lastException: Exception = null
>> >> > 156     while (attempts < AKKA_RETRY_ATTEMPTS) {
>> >> > 157       attempts += 1
>> >> > 158       try {
>> >> > 159         val future = driverActor.ask(message)(timeout)
>> >> > 160         val result = Await.result(future, timeout)
>> >> > 161         if (result == null) {
>> >> > 162           throw new SparkException("BlockManagerMaster returned null")
>> >> > 163         }
>> >> > 164         return result.asInstanceOf[T]
>> >> > 165       } catch {
>> >> > 166         case ie: InterruptedException => throw ie
>> >> > 167         case e: Exception =>
>> >> > 168           lastException = e
>> >> > 169           logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
>> >> > 170       }
>> >> > 171       Thread.sleep(AKKA_RETRY_INTERVAL_MS)
>> >> > 172     }
>> >> > 173
>> >> > 174     throw new SparkException(
>> >> > 175       "Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
>> >> > 176   }
>> >> >
>> >> > Most of the time, though, the worker just hangs. I can't run anything else
>> >> > against that master because there are no resources available. When I try to
>> >> > stop the workers using stop-slaves.sh, they don't stop. The only way to
>> >> > recover the cluster is to use "kill -9 <pid>" on the worker processes, which
>> >> > does work.
>> >> >
>> >> > So I took a quick look at one of the hung worker processes using jstack. The
>> >> > output is attached. As you can see, I have tried to shut down the process
>> >> > multiple times, and the SIGTERM handlers are stuck waiting. I think that
>> >> > this may be the reason that the workers are not responsive for block
>> >> > handling as well.
>> >> >
>> >> > If I read this correctly, the handler is waiting on 0x00000007e025d468, which
>> >> > is locked by spark.deploy.worker.ExecutorRunner$$anon$2. And that is waiting
>> >> > for some unixprocess?
>> >> >
>> >> > Thanks,
>> >> > Shankari
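
P.S. For anyone who wants to see the shutdown-hook behaviour from the jstack analysis above in isolation: a hook that calls destroy() and then waitFor() blocks forever whenever the child process does not exit on SIGTERM. This is only a toy sketch, not Spark code - the bash command is just a stand-in for an executor that ignores the signal:

    object ShutdownHookHang {
      def main(args: Array[String]) {
        // Child process that traps and ignores SIGTERM, standing in for a wedged executor.
        val process = new ProcessBuilder("bash", "-c", "trap '' TERM; sleep 3600").start()

        // Same shape as the ExecutorRunner shutdown hook quoted above:
        // destroy() sends SIGTERM, waitFor() then blocks until the child actually exits.
        Runtime.getRuntime.addShutdownHook(new Thread {
          override def run() {
            process.destroy()
            process.waitFor()   // hangs here because the child ignored the SIGTERM
          }
        })
        // main returns, the JVM starts shutting down, runs the hook, and never exits --
        // which matches the worker that only a kill -9 could clear.
      }
    }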
