Just to close the loop: it seems no issues come up when I submit the job with spark-submit, so that the driver process also runs in a container on the YARN cluster.
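For anyone who hits the same behaviour, here is a minimal sketch of the submission style that worked for me. The class name, jar, and resource sizes below are placeholders, not my actual job; on Spark 1.0.x the yarn-cluster master string is what puts the driver on the cluster:

    # Sketch only: the <...> values and sizes are placeholders.
    # With --master yarn-cluster the driver runs inside a YARN
    # ApplicationMaster container instead of on the gateway machine.
    spark-submit \
      --class <your job class> \
      --master yarn-cluster \
      --num-executors 50 \
      --executor-memory 4g \
      <your job jar> <job args>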
In the runs above, the driver was running on the gateway machine through which the job was submitted, which led to quite a few issues.

On Tue, Nov 18, 2014 at 5:01 PM, Pala M Muthaia <mchett...@rocketfuelinc.com> wrote:

> Sandy,
>
> Good point - I forgot about the NM logs.
>
> When I looked up the NM logs, I only see the following statements, which
> align with the driver-side log about the lost executor. Many executors show
> the same log statement at the same time, so it seems the decision to kill
> many if not all executors was made centrally, and all executors got
> notified somehow:
>
> 14/11/18 00:18:25 INFO Executor: Executor is trying to kill task 2013
> 14/11/18 00:18:25 INFO Executor: Executor killed task 2013
>
> In general, I also see quite a few instances of the following exception
> across many executors/nodes:
>
> 14/11/17 23:58:00 INFO HadoopRDD: Input split: <hdfs dir path>/sorted_keys-1020_3-r-00255.deflate:0+415841
>
> 14/11/17 23:58:00 WARN BlockReaderLocal: error creating DomainSocket
> java.net.ConnectException: connect(2) error: Connection refused when trying to connect to '/srv/var/hadoop/runs/hdfs/dn_socket'
>     at org.apache.hadoop.net.unix.DomainSocket.connect0(Native Method)
>     at org.apache.hadoop.net.unix.DomainSocket.connect(DomainSocket.java:250)
>     at org.apache.hadoop.hdfs.DomainSocketFactory.createSocket(DomainSocketFactory.java:158)
>     at org.apache.hadoop.hdfs.BlockReaderFactory.nextDomainPeer(BlockReaderFactory.java:721)
>     at org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:441)
>     at org.apache.hadoop.hdfs.client.ShortCircuitCache.create(ShortCircuitCache.java:780)
>     at org.apache.hadoop.hdfs.client.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:714)
>     at org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:395)
>     at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:303)
>     at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:567)
>     at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:790)
>     at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:837)
>     at java.io.DataInputStream.read(DataInputStream.java:149)
>     at org.apache.hadoop.io.compress.DecompressorStream.getCompressedData(DecompressorStream.java:159)
>     at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:143)
>     at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
>     at java.io.InputStream.read(InputStream.java:101)
>     at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
>     at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
>     at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
>     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
>     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
>     at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:201)
>     at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:184)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>     at scala.collection.Iterator$class.isEmpty(Iterator.scala:256)
>     at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1157)
>     at $line57.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:51)
>     at $line57.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:50)
>     at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
>     at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>     at org.apache.spark.scheduler.Task.run(Task.scala:51)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:724)
>
> 14/11/17 23:58:00 WARN ShortCircuitCache: ShortCircuitCache(0x71a8053d): failed to load 1276010498_BP-1416824317-172.22.48.2-1387241776581
>
> However, on some of the nodes it seems execution proceeded after the
> error, so the above could just be a transient error.
>
> Finally, in the driver logs, I was looking for a hint about the decision to
> kill many executors, around the 00:18:25 timestamp when many tasks were
> killed across many executors, but I didn't find anything different.
>
> On Tue, Nov 18, 2014 at 1:59 PM, Sandy Ryza <sandy.r...@cloudera.com> wrote:
>
>> Hi Pala,
>>
>> Do you have access to your YARN NodeManager logs? Are you able to check
>> whether they report killing any containers for exceeding memory limits?
>>
>> -Sandy
>>
>> On Tue, Nov 18, 2014 at 1:54 PM, Pala M Muthaia <mchett...@rocketfuelinc.com> wrote:
>>
>>> Hi,
>>>
>>> I am using Spark 1.0.1 on YARN 2.5, and doing everything through the
>>> Spark shell.
>>>
>>> I am running a job that essentially reads a bunch of HBase keys, looks
>>> up HBase data, and performs some filtering and aggregation. The job works
>>> fine on smaller datasets, but when I try to execute it on the full dataset,
>>> the job never completes. A few symptoms I notice are:
>>>
>>> a. The job shows progress for a while and then starts throwing lots of
>>> the following errors:
>>>
>>> 2014-11-18 00:18:20,020 [spark-akka.actor.default-dispatcher-67] INFO org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend - *Executor 906 disconnected, so removing it*
>>> 2014-11-18 00:18:20,020 [spark-akka.actor.default-dispatcher-67] ERROR org.apache.spark.scheduler.cluster.YarnClientClusterScheduler - *Lost executor 906 on <machine name>: remote Akka client disassociated*
>>>
>>> 2014-11-18 16:52:02,283 [spark-akka.actor.default-dispatcher-22] WARN org.apache.spark.storage.BlockManagerMasterActor - *Removing BlockManager BlockManagerId(9186, <machine name>, 54600, 0) with no recent heart beats: 82313ms exceeds 45000ms*
>>>
>>> Looking at the logs, the job never recovers from these errors; it keeps
>>> reporting lost executors and launching new ones, and this continues for a
>>> long time.
>>>
>>> Could this be because the executors are running out of memory?
>>>
>>> In terms of memory usage, the intermediate data could be large (after
>>> the HBase lookup), but the partially and fully aggregated data sets should
>>> be quite small - essentially a bunch of ids and counts (< 1 million in
>>> total).
>>>
>>> b. In the Spark UI, I am seeing the following errors (redacted for
>>> brevity); I am not sure whether they are transient or a real issue:
>>>
>>> java.net.SocketTimeoutException (java.net.SocketTimeoutException: Read timed out}
>>> ...
>>> org.apache.spark.util.Utils$.fetchFile(Utils.scala:349)
>>> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330)
>>> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:328)
>>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>> ...
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> java.lang.Thread.run(Thread.java:724)
>>>
>>> I was trying to get more data to investigate but haven't been able to
>>> figure out how to enable logging on the executors. The Spark UI appears
>>> stuck, and I only see driver-side logs in the jobhistory directory
>>> specified for the job.
>>>
>>> Thanks,
>>> pala
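A couple of follow-up notes for anyone who finds this thread later (suggestions only, not something verified in the runs above).

On the executor logs I couldn't locate earlier: if YARN log aggregation is enabled, the executors' stdout/stderr can be pulled after the application finishes via the yarn CLI, e.g.

    # <application id> comes from the ResourceManager UI or
    # 'yarn application -list'
    yarn logs -applicationId <application id> > app_logs.txt

On the memory question Sandy raised: the "remote Akka client disassociated" / lost-executor pattern is often YARN killing containers that exceed their memory limit. If your Spark version exposes it (later 1.x releases do), raising the off-heap headroom via spark.yarn.executor.memoryOverhead in spark-defaults.conf is a reasonable first thing to try; the values below are purely illustrative:

    # spark-defaults.conf - illustrative values, tune for your cluster
    spark.executor.memory               4g
    spark.yarn.executor.memoryOverhead  1024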