SPARK-8813 - combining small files in spark sql

2016-07-07 Thread Ajay Srivastava
Hi,
This jira https://issues.apache.org/jira/browse/SPARK-8813 is fixed in Spark 2.0, but the resolution is not mentioned there.
In our use case, there are big as well as many small parquet files which are being queried using Spark SQL. Can someone please explain what the fix is and how I can use it in Spark 2.0? I searched the commits done in the 2.0 branch and it looks like I need to use spark.sql.files.openCostInBytes, but I am not sure.


Regards,
Ajay
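For later readers of this thread: a minimal sketch of trying the file-source options mentioned above in Spark 2.0 (the values shown are the illustrative defaults, not tuning advice, and the input path is hypothetical; spark is assumed to be a SparkSession):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("small-files-test")
      // Estimated cost (in bytes) of opening a file; raising it makes Spark pack
      // more small files into a single read partition.
      .config("spark.sql.files.openCostInBytes", "4194304")
      // Upper bound on the amount of data packed into one partition when reading files.
      .config("spark.sql.files.maxPartitionBytes", "134217728")
      .getOrCreate()

    // With these settings, many small parquet files are coalesced into fewer read partitions.
    val df = spark.read.parquet("/path/to/parquet/dir")
    println(df.rdd.getNumPartitions)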


Re: Some tasks are taking long time

2015-01-15 Thread Ajay Srivastava
Thanks RK. I can turn on speculative execution, but I am trying to find out the actual reason for the delay, as it happens on any node. Any idea about the stack trace in my previous mail?
Regards,
Ajay
 

 On Thursday, January 15, 2015 8:02 PM, RK prk...@yahoo.com.INVALID wrote:
   

 If you don't want a few slow tasks to slow down the entire job, you can turn 
on speculation. 
Here are the speculation settings from the Spark Configuration page of the Spark 1.2.0 documentation.



spark.speculation (default: false) - If set to "true", performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched.
spark.speculation.interval (default: 100) - How often Spark will check for tasks to speculate, in milliseconds.
spark.speculation.quantile (default: 0.75) - Percentage of tasks which must be complete before speculation is enabled for a particular stage.
spark.speculation.multiplier (default: 1.5) - How many times slower a task is than the median to be considered for speculation.
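For reference, a minimal sketch of setting these programmatically (Spark 1.2-era SparkConf API; the non-default choice of enabling speculation is just an illustration):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("speculation-example")
      .set("spark.speculation", "true")            // re-launch slow tasks speculatively
      .set("spark.speculation.interval", "100")    // check for stragglers every 100 ms
      .set("spark.speculation.quantile", "0.75")   // only after 75% of tasks in a stage finish
      .set("spark.speculation.multiplier", "1.5")  // a task 1.5x slower than the median qualifies
    val sc = new SparkContext(conf)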

  

 On Thursday, January 15, 2015 5:44 AM, Ajay Srivastava 
a_k_srivast...@yahoo.com.INVALID wrote:
   

 Hi,
My spark job is taking a long time. I see that some tasks are taking longer for the same amount of data and shuffle read/write. What could be the possible reasons for it?
The thread dump sometimes shows that all the tasks in an executor are waiting with the following stack trace -
Executor task launch worker-12 daemon prio=10 tid=0x7fcd44276000 nid=0x3f85 waiting on condition [0x7fcce3ddc000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for 0x7fd0aee82e00 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)
    at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
    at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:253)
    at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
    
Any inputs/suggestions to improve the job time will be appreciated.
Regards,
Ajay





   

Re: Some tasks are taking long time

2015-01-15 Thread Ajay Srivastava
Thanks Nicos. GC does not contribute much to the execution time of the task. I will debug it further today.
Regards,
Ajay
 

 On Thursday, January 15, 2015 11:55 PM, Nicos n...@hotmail.com wrote:
   

 Ajay,
Unless we are dealing with some synchronization/conditional variable bug in Spark, try this per the tuning guide:

Cache Size Tuning
One important configuration parameter for GC is the amount of memory that should be used for caching RDDs. By default, Spark uses 60% of the configured executor memory (spark.executor.memory) to cache RDDs. This means that 40% of memory is available for any objects created during task execution. In case your tasks slow down and you find that your JVM is garbage-collecting frequently or running out of memory, lowering this value will help reduce the memory consumption. To change this to, say, 50%, you can call conf.set("spark.storage.memoryFraction", "0.5") on your SparkConf. Combined with the use of serialized caching, using a smaller cache should be sufficient to mitigate most of the garbage collection problems. In case you are interested in further tuning the Java GC, continue reading below.

Complete list of tips here: https://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage

Cheers,
- Nicos
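For reference, a minimal sketch of the change the tuning guide describes, lowering the cache fraction and using serialized caching (Spark 1.x API; the 0.5 value and the input path are just placeholders):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    val conf = new SparkConf()
      .setAppName("gc-tuning-example")
      // Give RDD caching 50% of executor memory instead of the default 60%,
      // leaving more room for objects created during task execution.
      .set("spark.storage.memoryFraction", "0.5")
    val sc = new SparkContext(conf)

    // Serialized caching keeps one serialized buffer per partition, which is
    // far cheaper for the garbage collector than many small Java objects.
    val cached = sc.textFile("hdfs:///some/input").persist(StorageLevel.MEMORY_ONLY_SER)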


Some tasks are taking long time

2015-01-15 Thread Ajay Srivastava
Hi,
My spark job is taking a long time. I see that some tasks are taking longer for the same amount of data and shuffle read/write. What could be the possible reasons for it?
The thread dump sometimes shows that all the tasks in an executor are waiting with the following stack trace -
Executor task launch worker-12 daemon prio=10 tid=0x7fcd44276000 nid=0x3f85 waiting on condition [0x7fcce3ddc000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for 0x7fd0aee82e00 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)
    at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
    at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:253)
    at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
    
Any inputs/suggestions to improve the job time will be appreciated.
Regards,
Ajay



Re: Creating RDD from only few columns of a Parquet file

2015-01-13 Thread Ajay Srivastava
Setting spark.sql.hive.convertMetastoreParquet to true has fixed this.

Regards,
Ajay
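For later readers: a minimal sketch of combining this setting with an explicit column selection so only the needed columns are scanned (Spark 1.2-era HiveContext API; the table and column names below are made up for illustration):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    val sc = new SparkContext(new SparkConf().setAppName("parquet-pruning"))
    val hiveContext = new HiveContext(sc)

    // Use Spark SQL's native Parquet support for Hive metastore Parquet tables,
    // which performs column pruning during the scan.
    hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")

    // Selecting only the needed columns means only those columns are read from disk.
    val threeCols = hiveContext.sql("SELECT col1, col2, col3 FROM table1")
    threeCols.count()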

 On Tuesday, January 13, 2015 11:50 AM, Ajay Srivastava 
a_k_srivast...@yahoo.com.INVALID wrote:
   

 Hi,
I am trying to read a parquet file using -
val parquetFile = sqlContext.parquetFile("people.parquet")

There is no way to specify that I am interested in reading only some columns from disk. For example, if the parquet file has 10 columns and I want to read only 3 columns from disk.

We have done an experiment -
Table1 - Parquet file containing 10 columns
Table2 - Parquet file containing only the 3 columns which were used in the query

The time taken by the query on table1 and table2 shows a huge difference. The query on Table1 takes more than double the time taken on table2, which makes me think that Spark is reading all the columns from disk in the case of table1 when it needs only 3 columns.

How should I make sure that it reads only 3 of 10 columns from disk?


Regards,
Ajay




Creating RDD from only few columns of a Parquet file

2015-01-12 Thread Ajay Srivastava
Hi,
I am trying to read a parquet file using -
val parquetFile = sqlContext.parquetFile("people.parquet")

There is no way to specify that I am interested in reading only some columns from disk. For example, if the parquet file has 10 columns and I want to read only 3 columns from disk.

We have done an experiment -
Table1 - Parquet file containing 10 columns
Table2 - Parquet file containing only the 3 columns which were used in the query

The time taken by the query on table1 and table2 shows a huge difference. The query on Table1 takes more than double the time taken on table2, which makes me think that Spark is reading all the columns from disk in the case of table1 when it needs only 3 columns.

How should I make sure that it reads only 3 of 10 columns from disk?


Regards,
Ajay


Spark summit 2014 videos ?

2014-07-10 Thread Ajay Srivastava
Hi,
I did not find any videos on the Apache Spark channel on YouTube yet.
Any idea when these will be made available?


Regards,
Ajay


OFF_HEAP storage level

2014-07-04 Thread Ajay Srivastava
Hi,
I was checking the different storage levels of an RDD and found OFF_HEAP.
Has anybody used this level?
If I use this level, where will the data be stored? If it is not in the heap, does it mean that we can avoid GC?
How can I use this level? I did not find anything in the archive regarding this.
Can someone also explain the behavior of the storage level NONE?


Regards,
Ajay
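For later readers: in the Spark releases of that era, OFF_HEAP kept serialized RDD blocks in Tachyon rather than on the JVM heap, so cached blocks do not add to executor GC pressure (the data is still deserialized when read). StorageLevel.NONE simply means the RDD is not stored at all. A minimal sketch, assuming a Tachyon store is reachable at the hypothetical URL below and the input path is a placeholder:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    val conf = new SparkConf()
      .setAppName("off-heap-example")
      // Hypothetical Tachyon master; OFF_HEAP blocks are written to this store.
      .set("spark.tachyonStore.url", "tachyon://localhost:19998")
    val sc = new SparkContext(conf)

    // Cached partitions are serialized and stored in Tachyon instead of on the heap.
    val data = sc.textFile("hdfs:///some/input").persist(StorageLevel.OFF_HEAP)
    data.count()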


Re: Join : Giving incorrect result

2014-06-06 Thread Ajay Srivastava


Thanks Matei. We have tested the fix and it's working perfectly.

Andrew, we set spark.shuffle.spill=false but the application goes out of 
memory. I think that is expected.

Regards,
Ajay


On Friday, June 6, 2014 3:49 AM, Andrew Ash and...@andrewash.com wrote:
 


Hi Ajay,

Can you please try running the same code with spark.shuffle.spill=false and see 
if the numbers turn out correctly?  That parameter controls whether or not the 
buggy code that Matei fixed in ExternalAppendOnlyMap is used.

FWIW I saw similar issues in 0.9.0 but no longer in 0.9.1 after I think some 
fixes in spilling landed.

Andrew
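For reference, a minimal sketch of what Andrew is suggesting (0.9-era SparkConf API; note that with spilling disabled, shuffle aggregations must fit in memory, which matches what Ajay later observes):

    import org.apache.spark.{SparkConf, SparkContext}

    val sparkConf = new SparkConf()
      .setAppName("join-spill-check")
      // Bypass the ExternalAppendOnlyMap spill-to-disk path that contained the bug.
      .set("spark.shuffle.spill", "false")
    val sc = new SparkContext(sparkConf)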



On Thu, Jun 5, 2014 at 3:05 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

Hey Ajay, thanks for reporting this. There was indeed a bug, specifically in 
the way join tasks spill to disk (which happened when you had more concurrent 
tasks competing for memory). I’ve posted a patch for it here: 
https://github.com/apache/spark/pull/986. Feel free to try that if you’d like; 
it will also be in 0.9.2 and 1.0.1.


Matei





Re: Join : Giving incorrect result

2014-06-05 Thread Ajay Srivastava
Sorry for replying late. It was night here.

Lian/Matei,
Here is the code snippet -
    sparkConf.set("spark.executor.memory", "10g")
    sparkConf.set("spark.cores.max", "5")

    val sc = new SparkContext(sparkConf)

    val accId2LocRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2location")
      .map(getKeyValueFromString(_, 0, ',', true))

    val accId2DemoRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2demographic_planType")
      .map(getKeyValueFromString(_, 0, ',', true))

    val joinedRDD = accId2LocRDD.join(accId2DemoRDD)

  def getKeyValueFromString(line: String, keyIndex: Int, delimit: Char, retFullLine: Boolean): Tuple2[String, String] = {
    val splits = line.split(delimit)
    if (splits.length <= 1) {
      (null, null)
    } else if (retFullLine) {
      (splits(keyIndex), line)
    } else {
      (splits(keyIndex), splits(splits.length - keyIndex - 1))
    }
  }

    

Both of these files have 10 M records with the same unique keys. The size of the file is nearly 280 MB and the block size in HDFS is 256 MB. The output of the join should contain 10 M records.


We have done some more experiments -
1) Running cogroup instead of join - it also gives an incorrect count.
2) Running union followed by groupByKey and then filtering records with two entries in the grouped sequence - it also gives an incorrect count.
3) Increasing spark.executor.memory to 50g - everything works fine. The count comes to 10 M for the join, cogroup and union/groupByKey/filter transformations.


I thought that 10g is enough memory for the executors, but even if the memory is not enough it should not result in incorrect computation. Probably there is a problem in reconstructing RDDs when memory is not enough.


Thanks Chen for your observation. I get this problem on a single worker, so there will not be any mismatch of jars. On two workers, since the executor memory gets doubled, the code works fine.
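For the archive, a rough sketch of the union/groupByKey cross-check described in experiment 2 above (it reuses the RDD names from the snippet earlier in this mail and assumes both sides have unique keys):

    // Union both keyed RDDs, group by key, and keep only keys that appear on
    // both sides - this count should match the join output (10 M) when correct.
    val matched = accId2LocRDD.union(accId2DemoRDD)
      .groupByKey()
      .filter { case (_, values) => values.size == 2 }
    println(matched.count())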


Regards,
Ajay



On Thursday, June 5, 2014 1:35 AM, Matei Zaharia matei.zaha...@gmail.com 
wrote:
 


If this isn’t the problem, it would be great if you can post the code for the 
program.

Matei



On Jun 4, 2014, at 12:58 PM, Xu (Simon) Chen xche...@gmail.com wrote:

Maybe your two workers have different assembly jar files?
I just ran into a similar problem where my spark-shell was using a different jar file than my workers - I got really confusing results.
On Jun 4, 2014 8:33 AM, Ajay Srivastava a_k_srivast...@yahoo.com wrote:

Hi,


I am doing a join of two RDDs which gives different results (counting the number of records) each time I run this code on the same input.


The input files are large enough to be divided into two splits. When the program runs on two workers with a single core assigned to each, the output is consistent and looks correct. But when a single worker is used with two or more cores, the result seems to be random. Every time, the count of joined records is different.


Does this sound like a defect, or do I need to take care of something while using join? I am using spark-0.9.1.



Regards
Ajay

Join : Giving incorrect result

2014-06-04 Thread Ajay Srivastava
Hi,

I am doing a join of two RDDs which gives different results (counting the number of records) each time I run this code on the same input.

The input files are large enough to be divided into two splits. When the program runs on two workers with a single core assigned to each, the output is consistent and looks correct. But when a single worker is used with two or more cores, the result seems to be random. Every time, the count of joined records is different.

Does this sound like a defect, or do I need to take care of something while using join? I am using spark-0.9.1.


Regards
Ajay