My hunch is that the TaskCommitDenied is perhaps a red herring and the
problem is groupByKey - but I've also just seen a lot of people be bitten
by it, so that might not be the issue. If you just do a count at the point
of the groupByKey, does the pipeline succeed?
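
For concreteness, here is a rough sketch of that check, assuming a Scala job
shaped like the DAG you described. The input path, the tab-split key
extraction, and the filter are placeholders I made up, not your actual code.
As Josh points out, a count() does not go through the output commit path, so
this only tells you whether the groupByKey shuffle itself completes.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object GroupByKeyCountCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("groupByKey-count-check"))

    // Hypothetical input path and parsing; substitute the real map/filter logic.
    val grouped = sc.textFile("hdfs:///path/to/input")
      .map { line =>
        val fields = line.split("\t", 2)
        (fields(0), line)
      }
      .filter { case (key, _) => key.nonEmpty }
      .groupByKey()

    // count() forces the shuffle but writes no output, so no task commit
    // coordination happens here.
    println(s"Number of distinct keys: ${grouped.count()}")

    sc.stop()
  }
}
```

If this finishes cleanly and only the saveAsTextFile() version produces the
warnings, that points away from groupByKey and toward the commit race.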

On Thu, Jan 21, 2016 at 2:56 PM, Arun Luthra <[email protected]> wrote:

> Usually the pipeline works, it just failed on this particular input data.
> The other data it has run on is of similar size.
>
> Speculation is enabled.
>
> I'm using Spark 1.5.0.
>
> Here is the config. Many of these may not be needed anymore, they are from
> trying to get things working in Spark 1.2 and 1.3.
>
>         .set("spark.storage.memoryFraction","0.2") // default 0.6
>         .set("spark.shuffle.memoryFraction","0.2") // default 0.2
>         .set("spark.shuffle.manager","SORT") // preferred setting for optimized joins
>         .set("spark.shuffle.consolidateFiles","true") // helpful for "too many files open"
>         .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker errors?
>         .set("spark.akka.frameSize","300") // helpful when using consolidateFiles=true
>         .set("spark.shuffle.compress","false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>         .set("spark.file.transferTo","false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>         .set("spark.core.connection.ack.wait.timeout","600") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
>         .set("spark.speculation","true")
>         .set("spark.worker.timeout","600") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>         .set("spark.akka.timeout","300") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
>         .set("spark.storage.blockManagerSlaveTimeoutMs","120000")
>         .set("spark.driver.maxResultSize","2048") // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>         .set("spark.kryo.registrator","------.MyRegistrator")
>         .set("spark.kryo.registrationRequired", "true")
>         .set("spark.yarn.executor.memoryOverhead","600")
>
> On Thu, Jan 21, 2016 at 2:50 PM, Josh Rosen <[email protected]>
> wrote:
>
>> Is speculation enabled? This TaskCommitDenied by driver error is thrown
>> by writers who lost the race to commit an output partition. I don't think
>> this has anything to do with key skew etc. Replacing the groupByKey with a
>> count will mask this exception because the commit coordination does not
>> get triggered in non-save/write operations.
>>
>> On Thu, Jan 21, 2016 at 2:46 PM Holden Karau <[email protected]>
>> wrote:
>>
>>> Before we dig too far into this, the thing which most quickly jumps out
>>> to me is groupByKey, which could be causing some problems - what's the
>>> distribution of keys like? Try replacing the groupByKey with a count() and
>>> see if the pipeline works up until that stage. Also, 1G of driver memory
>>> is a bit small for something with 90 executors...
>>>
>>> On Thu, Jan 21, 2016 at 2:40 PM, Arun Luthra <[email protected]>
>>> wrote:
>>>
>>>>
>>>>
>>>> 16/01/21 21:52:11 WARN NativeCodeLoader: Unable to load native-hadoop
>>>> library for your platform... using builtin-java classes where applicable
>>>>
>>>> 16/01/21 21:52:14 WARN MetricsSystem: Using default name DAGScheduler
>>>> for source because spark.app.id is not set.
>>>>
>>>> spark.yarn.driver.memoryOverhead is set but does not apply in client
>>>> mode.
>>>>
>>>> 16/01/21 21:52:16 WARN DomainSocketFactory: The short-circuit local
>>>> reads feature cannot be used because libhadoop cannot be loaded.
>>>>
>>>> 16/01/21 21:52:52 WARN MemoryStore: Not enough space to cache
>>>> broadcast_4 in memory! (computed 60.2 MB so far)
>>>>
>>>> 16/01/21 21:52:52 WARN MemoryStore: Persisting block broadcast_4 to
>>>> disk instead.
>>>>
>>>> [Stage 1:====================================================>(2260 +
>>>> 7) / 2262]16/01/21 21:57:24 WARN TaskSetManager: Lost task 1440.1 in stage
>>>> 1.0 (TID 4530, --): TaskCommitDenied (Driver denied task commit) for job:
>>>> 1, partition: 1440, attempt: 4530
>>>>
>>>> [Stage 1:====================================================>(2260 +
>>>> 6) / 2262]16/01/21 21:57:27 WARN TaskSetManager: Lost task 1488.1 in stage
>>>> 1.0 (TID 4531, --): TaskCommitDenied (Driver denied task commit) for job:
>>>> 1, partition: 1488, attempt: 4531
>>>>
>>>> [Stage 1:====================================================>(2261 +
>>>> 4) / 2262]16/01/21 21:57:39 WARN TaskSetManager: Lost task 1982.1 in stage
>>>> 1.0 (TID 4532, --): TaskCommitDenied (Driver denied task commit) for job:
>>>> 1, partition: 1982, attempt: 4532
>>>>
>>>> 16/01/21 21:57:57 WARN TaskSetManager: Lost task 2214.0 in stage 1.0
>>>> (TID 4482, --): TaskCommitDenied (Driver denied task commit) for job: 1,
>>>> partition: 2214, attempt: 4482
>>>>
>>>> 16/01/21 21:57:57 WARN TaskSetManager: Lost task 2168.0 in stage 1.0
>>>> (TID 4436, --): TaskCommitDenied (Driver denied task commit) for job: 1,
>>>> partition: 2168, attempt: 4436
>>>>
>>>>
>>>> I am running with:
>>>>
>>>>     spark-submit --class "myclass" \
>>>>       --num-executors 90 \
>>>>       --driver-memory 1g \
>>>>       --executor-memory 60g \
>>>>       --executor-cores 8 \
>>>>       --master yarn-client \
>>>>       --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
>>>>       my.jar
>>>>
>>>>
>>>> There are 2262 input files totaling just 98.6G. The DAG is basically
>>>> textFile().map().filter().groupByKey().saveAsTextFile().
>>>>
>>>> On Thu, Jan 21, 2016 at 2:14 PM, Holden Karau <[email protected]>
>>>> wrote:
>>>>
>>>>> Can you post more of your log? How big are the partitions? What is the
>>>>> action you are performing?
>>>>>
>>>>> On Thu, Jan 21, 2016 at 2:02 PM, Arun Luthra <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Example warning:
>>>>>>
>>>>>> 16/01/21 21:57:57 WARN TaskSetManager: Lost task 2168.0 in stage 1.0
>>>>>> (TID 4436, XXXXXXX): TaskCommitDenied (Driver denied task commit) for
>>>>>> job: 1, partition: 2168, attempt: 4436
>>>>>>
>>>>>>
>>>>>> Is there a solution for this? Increase driver memory? I'm using just
>>>>>> 1G driver memory but ideally I won't have to increase it.
>>>>>>
>>>>>> The RDD being processed has 2262 partitions.
>>>>>>
>>>>>> Arun
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Cell : 425-233-8271
>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Cell : 425-233-8271
>>> Twitter: https://twitter.com/holdenkarau
>>>
>>
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau
