Re: Some tasks are taking long time

2015-01-15 Thread Ajay Srivastava
Thanks RK. I can turn on speculative execution, but I am trying to find out the
actual reason for the delay, as it happens on any node. Any idea about the stack
trace in my previous mail?
Regards,
Ajay


Re: Some tasks are taking long time

2015-01-15 Thread Nicos
Ajay,
Unless we are dealing with some synchronization/condition variable bug in
Spark, try this from the tuning guide:
Cache Size Tuning

One important configuration parameter for GC is the amount of memory that
should be used for caching RDDs. By default, Spark uses 60% of the configured
executor memory (spark.executor.memory) to cache RDDs. This means that 40% of
memory is available for any objects created during task execution.

In case your tasks slow down and you find that your JVM is garbage-collecting
frequently or running out of memory, lowering this value will help reduce the
memory consumption. To change this to, say, 50%, you can call
conf.set("spark.storage.memoryFraction", "0.5") on your SparkConf. Combined
with the use of serialized caching, using a smaller cache should be sufficient
to mitigate most of the garbage collection problems. In case you are interested
in further tuning the Java GC, continue reading the guide.
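As a rough sketch of the two suggestions above, assuming the Spark 1.x Java API (where spark.storage.memoryFraction still applies), the cache fraction can be lowered on the SparkConf and an RDD persisted in serialized form. The class name, app name, local master and sample data are placeholders for illustration only.

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class CacheFractionExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setAppName("cache-fraction-example")  // placeholder app name
            .setMaster("local[2]")                 // local mode, only for trying the snippet out
            // Spark 1.x: fraction of executor memory used for cached RDDs (default 0.6)
            .set("spark.storage.memoryFraction", "0.5");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Integer> data = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); // sample data
        // Serialized caching stores each partition as serialized bytes instead of
        // many small Java objects, which leaves the GC fewer objects to scan.
        data.persist(StorageLevel.MEMORY_ONLY_SER());

        System.out.println("count = " + data.count());
        sc.stop();
    }
}
```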


Complete list of tips here:
https://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage 

Cheers,
- Nicos


Re: Some tasks are taking long time

2015-01-15 Thread Ajay Srivastava
Thanks Nicos. GC does not contribute much to the execution time of the task. I
will debug it further today.
Regards,
Ajay


Some tasks are taking long time

2015-01-15 Thread Ajay Srivastava
Hi,
My Spark job is taking a long time. I see that some tasks take much longer than
others for the same amount of data and shuffle read/write. What could be the
possible reasons for this?
The thread dump sometimes shows that all the tasks in an executor are waiting
with the following stack trace:
Executor task launch worker-12 daemon prio=10 tid=0x7fcd44276000 nid=0x3f85 waiting on condition [0x7fcce3ddc000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  0x7fd0aee82e00 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)
    at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
    at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:253)
    at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
Any inputs or suggestions to improve the job time would be appreciated.
Regards,
Ajay
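The top frames of the stack trace above show the task thread parked in LinkedBlockingQueue.take(), called from BasicBlockFetcherIterator.next() under CoGroupedRDD.compute, which suggests the task is waiting for shuffle blocks to arrive rather than computing. The following is a simplified, hypothetical model of that producer/consumer pattern, not Spark's code: FetchQueueModel, FetchResult, BlockingResultIterator and the block ids are made up for illustration. It shows that a thread blocked in take() on an empty queue reports the same WAITING state seen in the dump, and resumes as soon as a "fetcher" puts results into the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class FetchQueueModel {

    /** Hypothetical stand-in for one fetched shuffle block. */
    static final class FetchResult {
        final String blockId;
        FetchResult(String blockId) { this.blockId = blockId; }
    }

    /** Iterator-like consumer: next() blocks in take() until a fetch result arrives. */
    static final class BlockingResultIterator {
        private final LinkedBlockingQueue<FetchResult> results;
        private int remaining;

        BlockingResultIterator(LinkedBlockingQueue<FetchResult> results, int expected) {
            this.results = results;
            this.remaining = expected;
        }

        boolean hasNext() { return remaining > 0; }

        FetchResult next() throws InterruptedException {
            remaining--;
            // Parks the calling thread (Thread.State WAITING) while the queue is empty.
            return results.take();
        }
    }

    /** Starts a "task" thread that drains the iterator, recording block ids in order. */
    static Thread startTask(BlockingResultIterator it, List<String> seen) {
        Thread t = new Thread(() -> {
            try {
                while (it.hasNext()) {
                    seen.add(it.next().blockId);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "model-task");
        t.start();
        return t;
    }

    /** Polls until the thread reaches the given state or the timeout expires. */
    static boolean awaitState(Thread t, Thread.State state, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (t.getState() == state) return true;
            Thread.sleep(5);
        }
        return t.getState() == state;
    }

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<FetchResult> queue = new LinkedBlockingQueue<>();
        List<String> seen = new ArrayList<>(); // read only after join(), which orders the writes
        Thread task = startTask(new BlockingResultIterator(queue, 2), seen);

        // Nothing has been "fetched" yet, so the task thread parks inside take().
        awaitState(task, Thread.State.WAITING, 2000);
        System.out.println("task state while queue is empty: " + task.getState());

        // A "fetcher" delivers the blocks; the task thread wakes up and finishes.
        queue.put(new FetchResult("shuffle_0_0_0"));
        queue.put(new FetchResult("shuffle_0_1_0"));
        task.join(2000);
        System.out.println("blocks consumed: " + seen);
    }
}
```

In this model the time a task spends parked is set entirely by how quickly results are put into the queue, which is why slow or skewed remote fetches would show up as idle task threads rather than as CPU or GC time.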



Re: Some tasks are taking long time

2015-01-15 Thread RK
If you don't want a few slow tasks to slow down the entire job, you can turn on 
speculation. 
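For example, speculation could be switched on through SparkConf. This is a minimal sketch assuming the Spark 1.x Java API and an application launched with spark-submit (which supplies the master URL); the class and app names are placeholders, and the last three values simply restate the 1.2.0 defaults.

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SpeculationConfExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setAppName("speculation-example")           // placeholder app name
            .set("spark.speculation", "true")            // off (false) by default
            // The next three lines restate the Spark 1.2.0 defaults; adjust as needed.
            .set("spark.speculation.interval", "100")    // check every 100 ms
            .set("spark.speculation.quantile", "0.75")   // wait until 75% of a stage's tasks finish
            .set("spark.speculation.multiplier", "1.5"); // flag tasks 1.5x slower than the median
        JavaSparkContext sc = new JavaSparkContext(conf); // master URL comes from spark-submit
        // ... define and run the job here ...
        sc.stop();
    }
}
```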
Here are the speculation settings from the Spark 1.2.0 configuration
documentation (http://spark.apache.org/docs/1.2.0/configuration.html):



| Property Name | Default | Meaning |
| spark.speculation | false | If set to true, performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched. |
| spark.speculation.interval | 100 | How often Spark will check for tasks to speculate, in milliseconds. |
| spark.speculation.quantile | 0.75 | Percentage of tasks which must be complete before speculation is enabled for a particular stage. |
| spark.speculation.multiplier | 1.5 | How many times slower a task is than the median to be considered for speculation. |
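To make the quantile and multiplier settings concrete, here is a simplified model of the rule the table describes, written as plain Java with no Spark dependency. It is an illustration, not Spark's scheduler code: the class and method names are hypothetical, it ignores spark.speculation.interval (how often the check runs), and the real implementation's median calculation and any minimum threshold may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SpeculationModel {

    /**
     * Hypothetical model: returns the indexes (into runningElapsedMs) of running
     * tasks that would be candidates for a speculative copy.
     *
     * @param totalTasks       number of tasks in the stage
     * @param finishedMs       durations of the tasks that have already finished
     * @param runningElapsedMs elapsed time so far of each task still running
     * @param quantile         plays the role of spark.speculation.quantile
     * @param multiplier       plays the role of spark.speculation.multiplier
     */
    static List<Integer> speculatable(int totalTasks, long[] finishedMs, long[] runningElapsedMs,
                                      double quantile, double multiplier) {
        List<Integer> candidates = new ArrayList<>();
        // Only consider speculation once `quantile` of the stage's tasks have finished.
        int minFinished = (int) Math.floor(quantile * totalTasks);
        if (finishedMs.length == 0 || finishedMs.length < minFinished) {
            return candidates;
        }
        long[] sorted = finishedMs.clone();
        Arrays.sort(sorted);
        // A running task is "slow" once it has run longer than `multiplier` times the median.
        double threshold = multiplier * median(sorted);
        for (int i = 0; i < runningElapsedMs.length; i++) {
            if (runningElapsedMs[i] > threshold) {
                candidates.add(i);
            }
        }
        return candidates;
    }

    /** Median of an already sorted, non-empty array. */
    static double median(long[] sorted) {
        int n = sorted.length;
        return n % 2 == 1 ? sorted[n / 2] : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
    }

    public static void main(String[] args) {
        // 8 tasks in the stage: 6 finished (75% of 8), 2 still running.
        long[] finished = {1000, 1100, 1200, 1300, 1400, 1500};
        long[] running = {1600, 4000};
        System.out.println(speculatable(8, finished, running, 0.75, 1.5));
    }
}
```

With 8 tasks, 6 finished (the 0.75 quantile) and a median finished duration of 1250 ms, the threshold is 1.5 * 1250 = 1875 ms, so only the running task at 4000 ms is flagged and main prints [1]. Note that this only decides when to launch a second copy of a slow task; it does not explain why the task was slow.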

  
