Hi,
     The issue turned out to be a memory issue. Thanks for the guidance.

周千昊 <qhz...@apache.org> wrote on Thu, Sep 17, 2015 at 12:39 PM:

> Indeed, the operation in this stage is quite memory-consuming.
> We are trying to enable the PrintGCDetails option to see what is going on.
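>
> A minimal sketch of how we plan to turn this on (assuming we set it from our
> own driver code; the flags themselves are standard HotSpot options, nothing
> specific to this job):
>
>     import org.apache.spark.SparkConf
>
>     val conf = new SparkConf()
>       .setAppName("gc-logging-example")  // hypothetical app name
>       // Ask each executor JVM to print GC details into its stdout log on YARN.
>       .set("spark.executor.extraJavaOptions",
>            "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
>
> The same options could equally be passed with --conf on spark-submit.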
>
> java8964 <java8...@hotmail.com> wrote on Wed, Sep 16, 2015 at 11:47 PM:
>
>> This sounds like a memory issue.
>>
>> Do you have GC output enabled? When this happens, are your executors
>> doing full GCs? How long do the full GCs take?
>>
>> Yong
>>
>> ------------------------------
>> From: qhz...@apache.org
>> Date: Wed, 16 Sep 2015 13:52:25 +0000
>>
>> Subject: Re: application failed on large dataset
>> To: java8...@hotmail.com; user@spark.apache.org
>>
>> Hi,
>>      I have switched 'spark.shuffle.blockTransferService' to 'nio', but the
>> problem still exists. However, the stack trace is a little different:
>> PART one:
>> 15/09/16 06:20:32 ERROR executor.Executor: Exception in task 1.2 in stage
>> 15.0 (TID 5341)
>> java.io.IOException: Failed without being ACK'd
>>         at
>> org.apache.spark.network.nio.ConnectionManager$MessageStatus.failWithoutAck(ConnectionManager.scala:72)
>>         at
>> org.apache.spark.network.nio.ConnectionManager$$anonfun$removeConnection$3.apply(ConnectionManager.scala:533)
>>         at
>> org.apache.spark.network.nio.ConnectionManager$$anonfun$removeConnection$3.apply(ConnectionManager.scala:531)
>>         at scala.collection.immutable.List.foreach(List.scala:318)
>>         at
>> org.apache.spark.network.nio.ConnectionManager.removeConnection(ConnectionManager.scala:531)
>>         at
>> org.apache.spark.network.nio.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:510)
>>         at
>> org.apache.spark.network.nio.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:510)
>>         at
>> org.apache.spark.network.nio.Connection.callOnCloseCallback(Connection.scala:162)
>>         at
>> org.apache.spark.network.nio.Connection.close(Connection.scala:130)
>>         at
>> org.apache.spark.network.nio.ConnectionManager$$anonfun$stop$1.apply(ConnectionManager.scala:1000)
>>         at
>> org.apache.spark.network.nio.ConnectionManager$$anonfun$stop$1.apply(ConnectionManager.scala:1000)
>>         at
>> scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
>>         at
>> scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
>>         at
>> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>>         at
>> scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:107)
>>         at
>> org.apache.spark.network.nio.ConnectionManager.stop(ConnectionManager.scala:1000)
>>         at
>> org.apache.spark.network.nio.NioBlockTransferService.close(NioBlockTransferService.scala:78)
>>         at
>> org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1228)
>>         at org.apache.spark.SparkEnv.stop(SparkEnv.scala:100)
>>         at org.apache.spark.executor.Executor.stop(Executor.scala:144)
>>         at
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:113)
>>         at org.apache.spark.rpc.akka.AkkaRpcEnv.org
>> $apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
>>         at
>> org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126)
>>         at org.apache.spark.rpc.akka.AkkaRpcEnv.org
>> $apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197)
>>         at
>> org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125)
>>         at
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>         at
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>         at
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>         at
>> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
>>         at
>> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
>>         at
>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>         at
>> org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>         at
>> org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:92)
>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>         at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>         at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>         at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>         at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>         at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>         at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> PART two:
>> 15/09/16 06:14:36 INFO nio.ConnectionManager: Removing SendingConnection
>> to ConnectionManagerId()
>> 15/09/16 06:14:36 INFO nio.ConnectionManager: Removing
>> ReceivingConnection to ConnectionManagerId()
>> 15/09/16 06:14:36 ERROR nio.ConnectionManager: Corresponding
>> SendingConnection to ConnectionManagerId() not found
>> 15/09/16 06:14:36 INFO nio.ConnectionManager: Key not valid ?
>> sun.nio.ch.SelectionKeyImpl@3011c7c9
>> 15/09/16 06:14:36 INFO nio.ConnectionManager: key already cancelled ?
>> sun.nio.ch.SelectionKeyImpl@3011c7c9
>> java.nio.channels.CancelledKeyException
>>         at
>> org.apache.spark.network.nio.ConnectionManager.run(ConnectionManager.scala:461)
>>         at
>> org.apache.spark.network.nio.ConnectionManager$$anon$7.run(ConnectionManager.scala:193)
>>
>> java8964 <java8...@hotmail.com> wrote on Wed, Sep 16, 2015 at 8:17 PM:
>>
>> Can you try "nio" instead of "netty"?
>>
>> set "spark.shuffle.blockTransferService", to "nio" and give it a try.
>>
>> Yong
>>
>> ------------------------------
>> From: z.qian...@gmail.com
>> Date: Wed, 16 Sep 2015 03:21:02 +0000
>>
>> Subject: Re: application failed on large dataset
>> To: java8...@hotmail.com; user@spark.apache.org
>>
>>
>> Hi,
>>       After checking the YARN logs, all of the error stacks look like the one
>> below:
>>
>> 15/09/15 19:58:23 ERROR shuffle.OneForOneBlockFetcher: Failed while
>> starting block fetches
>> java.io.IOException: Connection reset by peer
>>         at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>>         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>>         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>>         at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>>         at
>> io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
>>         at
>> io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
>>         at
>> io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
>>         at
>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
>>         at
>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>         at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>         at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>         at
>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>>         It seems that some error occurs when trying to fetch the block, and
>> after several retries the executor just dies with that error.
>>         As for your question, I did not see any executor restart during
>> the job.
>>         PS: the operation I am using during that stage is
>> rdd.glom().mapPartitions().
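>>
>>         As I understand it, glom() collects each partition into a single
>> in-memory Array before mapPartitions ever sees it, so every partition has
>> to fit in memory at once, which may explain the memory pressure. A rough
>> sketch of the difference (hypothetical value names, just for illustration):
>>
>>     // glom(): each element handed to mapPartitions is an entire partition
>>     // already materialized as one array.
>>     val viaGlom = rdd.glom().mapPartitions { arrays =>
>>       arrays.map(arr => arr.length)
>>     }
>>
>>     // Plain mapPartitions keeps the partition as a lazy iterator and can
>>     // stream through it without building the array first.
>>     val viaIterator = rdd.mapPartitions { iter =>
>>       Iterator(iter.size)
>>     }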
>>
>>
>> java8964 <java8...@hotmail.com> wrote on Tue, Sep 15, 2015 at 11:44 PM:
>>
>> When you see this error, does any executor die, for whatever reason?
>>
>> Have you checked whether any executor restarts during your job?
>>
>> It is hard to help with just the stack trace. You need to tell us the
>> whole picture of what happens while your job is running.
>>
>> Yong
>>
>> ------------------------------
>> From: qhz...@apache.org
>> Date: Tue, 15 Sep 2015 15:02:28 +0000
>> Subject: Re: application failed on large dataset
>> To: user@spark.apache.org
>>
>>
>> Has anyone met the same problem?
>> 周千昊 <qhz...@apache.org> wrote on Mon, Sep 14, 2015 at 9:07 PM:
>>
>> Hi community,
>>       I am facing a strange problem:
>>       all executors stop responding, and then all of them fail with
>> ExecutorLostFailure.
>>       When I look into the YARN logs, they are full of exceptions like this:
>>
>> 15/09/14 04:35:33 ERROR shuffle.RetryingBlockFetcher: Exception while
>> beginning fetch of 1 outstanding blocks (after 3 retries)
>> java.io.IOException: Failed to connect to host/ip:port
>>         at
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
>>         at
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>>         at
>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
>>         at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>>         at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
>>         at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
>>         at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.net.ConnectException: Connection refused: host/ip:port
>>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>         at
>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>>         at
>> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
>>         at
>> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
>>         at
>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>>         at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>         at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>         at
>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>>         ... 1 more
>>
>>
>>       The strange thing is that if I reduce the input size, the problem
>> just disappears. I have found a similar issue in the mail archive (
>> http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3cCAOHP_tHRtuxDfWF0qmYDauPDhZ1=MAm5thdTfgAhXDN=7KQM8A%40mail.gmail.com%3e
>> ), but I didn't see a solution there. So I am wondering if anyone could
>> help with that?
>>
>>       My env is:
>>       hdp 2.2.6
>>       spark(1.4.1)
>>       mode: yarn-client
>>       spark-conf:
>>       spark.driver.extraJavaOptions -Dhdp.version=2.2.6.0-2800
>>       spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.6.0-2800
>>       spark.executor.memory 6g
>>       spark.storage.memoryFraction 0.3
>>       spark.dynamicAllocation.enabled true
>>       spark.shuffle.service.enabled true
>>
>> --
>> Best Regards
>> ZhouQianhao
>>
>> --
Best Regards
ZhouQianhao
