Just to close the loop: it seems no issues pop up when I submit the job
with 'spark-submit' so that the driver process also runs in a container on
the YARN cluster.

In the runs discussed above, the driver was running on the gateway machine
through which the job was submitted, which led to quite a few issues.
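
For anyone who hits the same thing, the working submission was roughly of
this shape (the class name, jar and resource sizes below are placeholders,
not the actual values from my job):

  spark-submit \
    --master yarn-cluster \
    --num-executors 200 \
    --executor-memory 4g \
    --executor-cores 2 \
    --class com.example.MyHBaseAggregation \
    my-job.jar <args>

With yarn-cluster as the master, the driver itself is launched inside a
YARN container instead of on the gateway host.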

On Tue, Nov 18, 2014 at 5:01 PM, Pala M Muthaia <mchett...@rocketfuelinc.com
> wrote:

> Sandy,
>
> Good point - I forgot about the NM logs.
>
> When I looked at the NM logs, I only saw the following statements, which
> align with the driver-side log about the lost executors. Many executors show
> the same log statement at the same time, so it seems the decision to kill
> many, if not all, executors was made centrally, and all executors got
> notified somehow:
>
> 14/11/18 00:18:25 INFO Executor: Executor is trying to kill task 2013
> 14/11/18 00:18:25 INFO Executor: Executor killed task 2013
>
>
> In general, I also see quite a few instances of the following exception
> across many executors/nodes:
>
> 14/11/17 23:58:00 INFO HadoopRDD: Input split: <hdfs dir path>/sorted_keys-1020_3-r-00255.deflate:0+415841
>
> 14/11/17 23:58:00 WARN BlockReaderLocal: error creating DomainSocket
> java.net.ConnectException: connect(2) error: Connection refused when trying to connect to '/srv/var/hadoop/runs/hdfs/dn_socket'
>       at org.apache.hadoop.net.unix.DomainSocket.connect0(Native Method)
>       at org.apache.hadoop.net.unix.DomainSocket.connect(DomainSocket.java:250)
>       at org.apache.hadoop.hdfs.DomainSocketFactory.createSocket(DomainSocketFactory.java:158)
>       at org.apache.hadoop.hdfs.BlockReaderFactory.nextDomainPeer(BlockReaderFactory.java:721)
>       at org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:441)
>       at org.apache.hadoop.hdfs.client.ShortCircuitCache.create(ShortCircuitCache.java:780)
>       at org.apache.hadoop.hdfs.client.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:714)
>       at org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:395)
>       at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:303)
>       at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:567)
>       at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:790)
>       at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:837)
>       at java.io.DataInputStream.read(DataInputStream.java:149)
>       at org.apache.hadoop.io.compress.DecompressorStream.getCompressedData(DecompressorStream.java:159)
>       at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:143)
>       at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
>       at java.io.InputStream.read(InputStream.java:101)
>       at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
>       at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
>       at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
>       at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
>       at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
>       at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:201)
>       at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:184)
>       at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>       at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at scala.collection.Iterator$class.isEmpty(Iterator.scala:256)
>       at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1157)
>       at $line57.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:51)
>       at $line57.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:50)
>       at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
>       at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:559)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>       at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>       at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>       at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>       at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>       at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>       at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>       at org.apache.spark.scheduler.Task.run(Task.scala:51)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:724)
>
> 14/11/17 23:58:00 WARN ShortCircuitCache: ShortCircuitCache(0x71a8053d): failed to load 1276010498_BP-1416824317-172.22.48.2-1387241776581
>
>
> However, on some of the nodes execution seems to have proceeded after the
> error, so the above could just be a transient issue.
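>
> (Side note: the DomainSocket error looks like an HDFS short-circuit-read
> misconfiguration rather than a Spark problem. If I understand it correctly,
> the client's socket path has to match what the DataNodes expose, i.e.
> something like the following in hdfs-site.xml - the path below is just the
> one from the error message above, so treat this as a sketch:
>
>   <property>
>     <name>dfs.client.read.shortcircuit</name>
>     <value>true</value>
>   </property>
>   <property>
>     <name>dfs.domain.socket.path</name>
>     <value>/srv/var/hadoop/runs/hdfs/dn_socket</value>
>   </property>
>
> If the socket path is wrong or nothing is listening on it, reads should
> still fall back to the normal TCP path, which would explain why execution
> proceeds anyway.)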
>
> Finally, in the driver logs I looked for a hint about the decision to kill
> so many executors around the 00:18:25 timestamp, when many tasks were killed
> across many executors, but I didn't find anything out of the ordinary.
>
>
>
> On Tue, Nov 18, 2014 at 1:59 PM, Sandy Ryza <sandy.r...@cloudera.com>
> wrote:
>
>> Hi Pala,
>>
>> Do you have access to your YARN NodeManager logs?  Are you able to check
>> whether they report killing any containers for exceeding memory limits?
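>>
>> If log aggregation is enabled, 'yarn logs -applicationId <app id>' should
>> pull down all the container logs (including executor stderr) once the
>> application finishes. For the memory-limit case, the NodeManager log line
>> usually contains something like "is running beyond physical memory limits
>> ... Killing container", so a rough check would be (the log path is just an
>> example, adjust for your setup):
>>
>>   grep -i "beyond physical memory" /var/log/hadoop-yarn/*nodemanager*.log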
>>
>> -Sandy
>>
>> On Tue, Nov 18, 2014 at 1:54 PM, Pala M Muthaia <
>> mchett...@rocketfuelinc.com> wrote:
>>
>>> Hi,
>>>
>>> I am using Spark 1.0.1 on YARN 2.5, and doing everything through the
>>> Spark shell.
>>>
>>> I am running a job that essentially reads a bunch of HBase keys, looks
>>> up HBase data, and performs some filtering and aggregation. The job works
>>> fine on smaller datasets, but when I try to execute it on the full dataset,
>>> the job never completes. The symptoms I notice are:
>>>
>>> a. The job shows progress for a while and then starts throwing many
>>> errors like the following:
>>>
>>> 2014-11-18 00:18:20,020 [spark-akka.actor.default-dispatcher-67] INFO
>>>  org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend - *Executor
>>> 906 disconnected, so removing it*
>>> 2014-11-18 00:18:20,020 [spark-akka.actor.default-dispatcher-67] ERROR
>>> org.apache.spark.scheduler.cluster.YarnClientClusterScheduler - *Lost
>>> executor 906 on <machine name>: remote Akka client disassociated*
>>>
>>> 2014-11-18 16:52:02,283 [spark-akka.actor.default-dispatcher-22] WARN
>>>  org.apache.spark.storage.BlockManagerMasterActor - *Removing
>>> BlockManager BlockManagerId(9186, <machine name>, 54600, 0) with no recent
>>> heart beats: 82313ms exceeds 45000ms*
>>>
>>> Looking at the logs, the job never recovers from these errors; it keeps
>>> reporting lost executors and launching new ones, and this goes on for a
>>> long time.
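>>>
>>> (The 45000ms above appears to be the default BlockManager slave timeout.
>>> If the property name is right for 1.0.1, I could presumably raise it in
>>> conf/spark-defaults.conf as a stopgap while I dig into the root cause -
>>> the value here is just an example:
>>>
>>>   spark.storage.blockManagerSlaveTimeoutMs   120000
>>>
>>> though if executors are actually dying, a longer timeout obviously won't
>>> fix that.)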
>>>
>>> Could this be because the executors are running out of memory?
>>>
>>> In terms of memory usage, the intermediate data could be large (after
>>> the HBase lookup), but the partially and fully aggregated data sets should
>>> be quite small - essentially a bunch of ids and counts (< 1 million in
>>> total).
>>>
>>>
>>>
>>> b. In the Spark UI, I am seeing the following errors (redacted for
>>> brevity); I am not sure whether they are transient or a real issue:
>>>
>>> java.net.SocketTimeoutException (java.net.SocketTimeoutException: Read 
>>> timed out}
>>> ...
>>> org.apache.spark.util.Utils$.fetchFile(Utils.scala:349)
>>> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330)
>>> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:328)
>>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>> ...
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> java.lang.Thread.run(Thread.java:724)
>>>
>>>
>>>
>>>
>>> I was trying to get more data to investigate, but I haven't been able to
>>> figure out how to enable logging on the executors. The Spark UI appears
>>> stuck, and I only see driver-side logs in the job history directory
>>> specified in the job.
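>>>
>>> (My guess is that the way to do this is to ship a custom log4j.properties
>>> to the executors when launching the shell, roughly along these lines -
>>> this is just an assumption on my part, I haven't confirmed it works:
>>>
>>>   spark-shell --master yarn-client --files /path/to/log4j.properties
>>>
>>> with the file setting the executor log level, e.g.
>>> log4j.rootCategory=INFO, console.)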
>>>
>>>
>>> Thanks,
>>> pala
>>>
>>>
>>>
>>
>
