My hunch is that the TaskCommitDenied is perhaps a red herring and the problem is groupByKey, but I've also just seen a lot of people be bitten by it, so that might not be the issue. If you just do a count at the point of the groupByKey, does the pipeline succeed?
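A minimal sketch of that check, assuming the Spark 1.5 RDD API and the pipeline shape described in the quoted thread (textFile().map().filter().groupByKey().saveAsTextFile()). The object name, input path, key extraction, and filter are hypothetical placeholders, not the poster's real code. As Josh notes in the quoted thread, count() does not go through output commit coordination, so this only tests whether the grouping itself survives; it says nothing about the TaskCommitDenied warnings.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical diagnostic job: same stages as the failing pipeline,
// but ending in count() instead of saveAsTextFile().
object GroupByKeyCountCheck {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("groupByKey-count-check")
    val sc = new SparkContext(conf)

    val grouped = sc.textFile("hdfs:///path/to/input")  // hypothetical input path
      .map(line => (line.takeWhile(_ != '\t'), line))    // placeholder for the real map()
      .filter { case (key, _) => key.nonEmpty }          // placeholder for the real filter()
      .groupByKey()

    // Size of each group; computing it forces the shuffle and the grouping
    // but writes no output, so no commit step is involved.
    val groupSizes = grouped.mapValues(_.size)
    println(s"number of keys: ${groupSizes.count()}")

    // Rough view of key skew: the ten largest groups as (size, key).
    groupSizes.map(_.swap).top(10).foreach(println)

    sc.stop()
  }
}
```

If the count succeeds and the largest groups are modest in size, key skew in groupByKey becomes a less likely culprit. If a handful of keys dominate, that would be worth looking at before anything else.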
On Thu, Jan 21, 2016 at 2:56 PM, Arun Luthra wrote:

> Usually the pipeline works, it just failed on this particular input data.
> The other data it has run on is of similar size.
>
> Speculation is enabled.
>
> I'm using Spark 1.5.0.
>
> Here is the config. Many of these may not be needed anymore, they are from
> trying to get things working in Spark 1.2 and 1.3.
>
>     .set("spark.storage.memoryFraction","0.2") // default 0.6
>     .set("spark.shuffle.memoryFraction","0.2") // default 0.2
>     .set("spark.shuffle.manager","SORT") // preferred setting for optimized joins
>     .set("spark.shuffle.consolidateFiles","true") // helpful for "too many files open"
>     .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker errors?
>     .set("spark.akka.frameSize","300") // helpful when using consolidateFiles=true
>     .set("spark.shuffle.compress","false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>     .set("spark.file.transferTo","false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>     .set("spark.core.connection.ack.wait.timeout","600") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>     .set("spark.speculation","true")
>     .set("spark.worker.timeout","600") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>     .set("spark.akka.timeout","300") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>     .set("spark.storage.blockManagerSlaveTimeoutMs","120000")
>     .set("spark.driver.maxResultSize","2048") // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>     .set("spark.kryo.registrator","------.MyRegistrator")
>     .set("spark.kryo.registrationRequired", "true")
>     .set("spark.yarn.executor.memoryOverhead","600")
>
> On Thu, Jan 21, 2016 at 2:50 PM, Josh Rosen wrote:
>
>> Is speculation enabled? This TaskCommitDenied by driver error is thrown
>> by writers who lost the race to commit an output partition. I don't think
>> this had anything to do with key skew etc. Replacing the groupByKey with a
>> count will mask this exception because the coordination does not get
>> triggered in non save/write operations.
>>
>> On Thu, Jan 21, 2016 at 2:46 PM Holden Karau wrote:
>>
>>> Before we dig too far into this, the thing which most quickly jumps out
>>> to me is groupByKey, which could be causing some problems. What's the
>>> distribution of keys like? Try replacing the groupByKey with a count() and
>>> see if the pipeline works up until that stage. Also, 1G of driver memory is
>>> a bit small for something with 90 executors...
>>>
>>> On Thu, Jan 21, 2016 at 2:40 PM, Arun Luthra wrote:
>>>
>>>> 16/01/21 21:52:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
>>>>
>>>> 16/01/21 21:52:14 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
>>>>
>>>> spark.yarn.driver.memoryOverhead is set but does not apply in client mode.
>>>>
>>>> 16/01/21 21:52:16 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
>>>>
>>>> 16/01/21 21:52:52 WARN MemoryStore: Not enough space to cache broadcast_4 in memory! (computed 60.2 MB so far)
>>>>
>>>> 16/01/21 21:52:52 WARN MemoryStore: Persisting block broadcast_4 to disk instead.
>>>>
>>>> [Stage 1:====================================================>(2260 + 7) / 2262]16/01/21 21:57:24 WARN TaskSetManager: Lost task 1440.1 in stage 1.0 (TID 4530, --): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 1440, attempt: 4530
>>>>
>>>> [Stage 1:====================================================>(2260 + 6) / 2262]16/01/21 21:57:27 WARN TaskSetManager: Lost task 1488.1 in stage 1.0 (TID 4531, --): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 1488, attempt: 4531
>>>>
>>>> [Stage 1:====================================================>(2261 + 4) / 2262]16/01/21 21:57:39 WARN TaskSetManager: Lost task 1982.1 in stage 1.0 (TID 4532, --): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 1982, attempt: 4532
>>>>
>>>> 16/01/21 21:57:57 WARN TaskSetManager: Lost task 2214.0 in stage 1.0 (TID 4482, --): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 2214, attempt: 4482
>>>>
>>>> 16/01/21 21:57:57 WARN TaskSetManager: Lost task 2168.0 in stage 1.0 (TID 4436, --): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 2168, attempt: 4436
>>>>
>>>> I am running with:
>>>>
>>>>     spark-submit --class "myclass" \
>>>>       --num-executors 90 \
>>>>       --driver-memory 1g \
>>>>       --executor-memory 60g \
>>>>       --executor-cores 8 \
>>>>       --master yarn-client \
>>>>       --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
>>>>       my.jar
>>>>
>>>> There are 2262 input files totaling just 98.6G. The DAG is basically
>>>> textFile().map().filter().groupByKey().saveAsTextFile().
>>>>
>>>> On Thu, Jan 21, 2016 at 2:14 PM, Holden Karau wrote:
>>>>
>>>>> Can you post more of your log? How big are the partitions? What is the
>>>>> action you are performing?
>>>>>
>>>>> On Thu, Jan 21, 2016 at 2:02 PM, Arun Luthra wrote:
>>>>>
>>>>>> Example warning:
>>>>>>
>>>>>> 16/01/21 21:57:57 WARN TaskSetManager: Lost task 2168.0 in stage 1.0 (TID 4436, XXXXXXX): TaskCommitDenied (Driver denied task commit) for job: 1, partition: 2168, attempt: 4436
>>>>>>
>>>>>> Is there a solution for this? Increase driver memory? I'm using just
>>>>>> 1G driver memory but ideally I won't have to increase it.
>>>>>>
>>>>>> The RDD being processed has 2262 partitions.
>>>>>>
>>>>>> Arun

--
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau
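Josh's explanation above, that TaskCommitDenied comes from writers who lost the race to commit an output partition, can be pictured with a toy model. This is a simplified sketch in plain Scala, not Spark's actual coordinator code: the class name CommitArbiter, its method, and the attempt id 4400 are made up for illustration. The only idea it carries over from the thread is that one attempt per partition is allowed to commit, and a speculative duplicate that asks later is turned away.

```scala
import scala.collection.mutable

// Toy model only: first attempt to ask for a partition wins the commit.
final class CommitArbiter {
  // partition id -> attempt id that was authorized to commit it
  private val winners = mutable.Map.empty[Int, Long]

  /** Returns true if this attempt may commit the given partition. */
  def canCommit(partition: Int, attempt: Long): Boolean = synchronized {
    winners.get(partition) match {
      case None =>
        winners(partition) = attempt // nobody has claimed it yet: this attempt wins
        true
      case Some(winner) =>
        winner == attempt            // any other attempt is denied
    }
  }
}

object CommitRaceDemo {
  def main(args: Array[String]): Unit = {
    val arbiter = new CommitArbiter
    // Two attempts at the same partition, as happens with speculation enabled.
    println(arbiter.canCommit(1440, 4400L)) // true: original attempt asks first
    println(arbiter.canCommit(1440, 4530L)) // false: the duplicate lost the race
  }
}
```

In this picture a denied attempt is not a lost partition: the output for partition 1440 was already claimed by the attempt that asked first, which is consistent with the messages in the log being warnings about individual task attempts.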
