Two changes I made appear to be keeping the various errors at bay; both are sketched below.

1) I bumped spark.yarn.executor.memoryOverhead up to 2000, in the spirit of
https://mail-archives.apache.org/mod_mbox/spark-user/201511.mbox/%3ccacbyxkld8qasymj2ghk__vttzv4gejczcqfaw++s1d5te1d...@mail.gmail.com%3E ,
even though I couldn't find that exact error in my YARN logs.

2) Very important: I ran coalesce(1000) on the RDD at the start of the DAG. I know from past
experience with groupByKey that keeping the number of partitions lower is helpful; I just hadn't
run this pipeline in a while, so that rule of thumb wasn't at the front of my mind.
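A minimal sketch of the two changes, assuming a Spark 1.5-style SparkConf setup; the app name,
input path, and the 2000 MB / 1000-partition values are simply what this run used (the path and
app name are hypothetical placeholders), not general recommendations:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("pipeline")                              // hypothetical app name
      .set("spark.yarn.executor.memoryOverhead", "2000")   // extra off-heap headroom per YARN executor container, in MB
    val sc = new SparkContext(conf)

    // Coalesce right after reading, so the rest of the DAG (including the
    // groupByKey shuffle) runs with ~1000 partitions instead of the default
    // of one partition per input split (2262 here).
    val records = sc.textFile("hdfs:///path/to/input")     // hypothetical path
      .coalesce(1000)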
On Thu, Jan 21, 2016 at 5:35 PM, Arun Luthra <arun.lut...@gmail.com> wrote:

> Looking into the YARN logs for a similar job where an executor was
> associated with the same error, I find:
>
> ...
> 16/01/22 01:17:18 INFO client.TransportClientFactory: Found inactive connection to (SERVER), creating a new one.
> 16/01/22 01:17:18 *ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 46 outstanding blocks*
> *java.io.IOException: Failed to connect to (SERVER)*
>         at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
>         at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>         at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
>         at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>         at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>         at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:97)
>         at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:152)
>         at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:265)
>         at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:112)
>         at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:43)
>         at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>         at org.apache.spark.scheduler.Task.run(Task.scala:88)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> *Caused by: java.net.ConnectException: Connection refused:* (SERVER)
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>         at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>         at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
>         at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
>         at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>         at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>         at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>         at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>         ... 1 more
>
> ...
>
> Not sure if this reveals anything at all.
>
> On Thu, Jan 21, 2016 at 2:58 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
>
>> My hunch is that the TaskCommitDenied is perhaps a red herring and the
>> problem is groupByKey - but I've also just seen a lot of people be bitten
>> by it, so that might not be the issue. If you just do a count at the point
>> of the groupByKey, does the pipeline succeed?
>>
>> On Thu, Jan 21, 2016 at 2:56 PM, Arun Luthra <arun.lut...@gmail.com> wrote:
>>
>>> Usually the pipeline works; it just failed on this particular input
>>> data. The other data it has run on is of similar size.
>>>
>>> Speculation is enabled.
>>>
>>> I'm using Spark 1.5.0.
>>>
>>> Here is the config. Many of these settings may not be needed anymore;
>>> they are left over from trying to get things working in Spark 1.2 and 1.3.
>>>
>>>     .set("spark.storage.memoryFraction","0.2")            // default 0.6
>>>     .set("spark.shuffle.memoryFraction","0.2")            // default 0.2
>>>     .set("spark.shuffle.manager","SORT")                  // preferred setting for optimized joins
>>>     .set("spark.shuffle.consolidateFiles","true")         // helpful for "too many files open"
>>>     .set("spark.mesos.coarse", "true")                    // helpful for MapOutputTracker errors?
>>>     .set("spark.akka.frameSize","300")                    // helpful when using consolidateFiles=true
>>>     .set("spark.shuffle.compress","false")                // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>     .set("spark.file.transferTo","false")                 // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>     .set("spark.core.connection.ack.wait.timeout","600")  // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>>>     .set("spark.speculation","true")
>>>     .set("spark.worker.timeout","600")                    // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>>     .set("spark.akka.timeout","300")                      // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>>>     .set("spark.storage.blockManagerSlaveTimeoutMs","120000")
>>>     .set("spark.driver.maxResultSize","2048")             // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>>>     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>     .set("spark.kryo.registrator","------.MyRegistrator")
>>>     .set("spark.kryo.registrationRequired", "true")
>>>     .set("spark.yarn.executor.memoryOverhead","600")
>>>
>>> On Thu, Jan 21, 2016 at 2:50 PM, Josh Rosen <joshro...@databricks.com> wrote:
>>>
>>>> Is speculation enabled? This TaskCommitDenied by driver error is thrown
>>>> by writers who lost the race to commit an output partition. I don't think
>>>> this had anything to do with key skew etc. Replacing the groupByKey with a
>>>> count will mask this exception because the coordination does not get
>>>> triggered in non save/write operations.
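To make that last point concrete, here is a minimal sketch; the keyed RDD, paths, and key parsing
are hypothetical stand-ins for the real pipeline. The driver only arbitrates commits when tasks
actually write output, so a losing duplicate attempt can surface as TaskCommitDenied on the save
but never on the count:

    // Hypothetical stand-in for the pipeline's keyed RDD.
    val keyed = sc.textFile("hdfs:///path/to/input")
      .map(line => (line.split(",")(0), line))
    val grouped = keyed.groupByKey()

    // No output is committed by count(), so the commit race never appears here.
    grouped.count()

    // Each task commits an output partition during the save; an attempt that
    // loses the commit race to a duplicate (e.g. speculative) attempt is
    // denied by the driver and reported as TaskCommitDenied.
    grouped.saveAsTextFile("hdfs:///path/to/output")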
>>>>
>>>> On Thu, Jan 21, 2016 at 2:46 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>
>>>>> Before we dig too far into this, the thing which most quickly jumps
>>>>> out at me is the groupByKey, which could be causing some problems -
>>>>> what's the distribution of keys like? Try replacing the groupByKey with
>>>>> a count() and see if the pipeline works up until that stage. Also, 1G of
>>>>> driver memory is a bit small for something with 90 executors...
>>>>>
>>>>> On Thu, Jan 21, 2016 at 2:40 PM, Arun Luthra <arun.lut...@gmail.com> wrote:
>>>>>
>>>>>> 16/01/21 21:52:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>>>>>> 16/01/21 21:52:14 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
>>>>>> spark.yarn.driver.memoryOverhead is set but does not apply in client mode.
>>>>>> 16/01/21 21:52:16 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
>>>>>> 16/01/21 21:52:52 WARN MemoryStore: Not enough space to cache broadcast_4 in memory! (computed 60.2 MB so far)
>>>>>> 16/01/21 21:52:52 WARN MemoryStore: Persisting block broadcast_4 to disk instead.
>>>>>> [Stage 1:====================================================>(2260 + 7) / 2262]16/01/21 21:57:24 WARN TaskSetManager: Lost task 1440.1 in stage 1.0 (TID 4530, --): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 1440, attempt: 4530
>>>>>> [Stage 1:====================================================>(2260 + 6) / 2262]16/01/21 21:57:27 WARN TaskSetManager: Lost task 1488.1 in stage 1.0 (TID 4531, --): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 1488, attempt: 4531
>>>>>> [Stage 1:====================================================>(2261 + 4) / 2262]16/01/21 21:57:39 WARN TaskSetManager: Lost task 1982.1 in stage 1.0 (TID 4532, --): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 1982, attempt: 4532
>>>>>> 16/01/21 21:57:57 WARN TaskSetManager: Lost task 2214.0 in stage 1.0 (TID 4482, --): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 2214, attempt: 4482
>>>>>> 16/01/21 21:57:57 WARN TaskSetManager: Lost task 2168.0 in stage 1.0 (TID 4436, --): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 2168, attempt: 4436
>>>>>>
>>>>>> I am running with:
>>>>>>
>>>>>>     spark-submit --class "myclass" \
>>>>>>       --num-executors 90 \
>>>>>>       --driver-memory 1g \
>>>>>>       --executor-memory 60g \
>>>>>>       --executor-cores 8 \
>>>>>>       --master yarn-client \
>>>>>>       --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
>>>>>>       my.jar
>>>>>>
>>>>>> There are 2262 input files totaling just 98.6G. The DAG is basically
>>>>>> textFile().map().filter().groupByKey().saveAsTextFile().
>>>>>>
>>>>>> On Thu, Jan 21, 2016 at 2:14 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>
>>>>>>> Can you post more of your log? How big are the partitions? What is
>>>>>>> the action you are performing?
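As a hedged aside on the key-distribution question raised above: one quick way to look at it is
to count records per key with a map-side-combining aggregation and inspect the heaviest keys.
This sketch assumes sc: SparkContext, the same hypothetical input path as above, and a
placeholder keying (first comma-separated field); the real pipeline's map() would differ.

    val lines = sc.textFile("hdfs:///path/to/input")

    // Count records per key; reduceByKey combines map-side, unlike groupByKey.
    val perKeyCounts = lines
      .map(line => (line.split(",")(0), 1L))
      .reduceByKey(_ + _)

    // The 20 largest groups; a handful of huge keys here would point to skew.
    perKeyCounts.top(20)(Ordering.by(_._2)).foreach(println)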
>>>>>>>
>>>>>>> On Thu, Jan 21, 2016 at 2:02 PM, Arun Luthra <arun.lut...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Example warning:
>>>>>>>>
>>>>>>>> 16/01/21 21:57:57 WARN TaskSetManager: Lost task 2168.0 in stage 1.0 (TID 4436, XXXXXXX): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 2168, attempt: 4436
>>>>>>>>
>>>>>>>> Is there a solution for this? Increase driver memory? I'm using just
>>>>>>>> 1G driver memory but ideally I won't have to increase it.
>>>>>>>>
>>>>>>>> The RDD being processed has 2262 partitions.
>>>>>>>>
>>>>>>>> Arun
>>>>>>>
>>>>>>> --
>>>>>>> Cell : 425-233-8271
>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>
>>>>> --
>>>>> Cell : 425-233-8271
>>>>> Twitter: https://twitter.com/holdenkarau
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau