SPARK-8813 - combining small files in spark sql
Hi,
This JIRA https://issues.apache.org/jira/browse/SPARK-8813 is fixed in Spark 2.0, but the resolution is not mentioned there. In our use case, there are big as well as many small parquet files which are being queried using Spark SQL. Can someone please explain what the fix is and how I can use it in Spark 2.0? I did search the commits done in the 2.0 branch, and it looks like I need to use spark.sql.files.openCostInBytes, but I am not sure.

Regards,
Ajay
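For what it's worth, the 2.0-era settings that the commit history points at are spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes, which together control how many small files get packed into one read partition. A minimal sketch; the values shown are the documented defaults and the path is hypothetical:

```scala
// Sketch (Spark 2.0): tuning how file-based sources pack small files
// into partitions. Values are the documented defaults, for illustration.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("small-files-example")
  // Maximum bytes packed into a single partition when reading files.
  .config("spark.sql.files.maxPartitionBytes", 134217728L) // 128 MB
  // Estimated cost of opening a file, expressed in bytes; raising this
  // makes Spark pack more small files into a single partition.
  .config("spark.sql.files.openCostInBytes", 4194304L)     // 4 MB
  .getOrCreate()

val df = spark.read.parquet("/data/events") // hypothetical path
```

With a higher openCostInBytes, many small files are grouped into fewer, larger partitions, reducing per-task overhead.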
Re: Some tasks are taking long time
Thanks RK. I can turn on speculative execution, but I am trying to find out the actual reason for the delay, as it happens on any node. Any idea about the stack trace in my previous mail?

Regards,
Ajay

On Thursday, January 15, 2015 8:02 PM, RK prk...@yahoo.com.INVALID wrote:

If you don't want a few slow tasks to slow down the entire job, you can turn on speculation. Here are the speculation settings from the Spark Configuration - Spark 1.2.0 Documentation (spark.apache.org):

- spark.speculation (default: false) - If set to "true", performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched.
- spark.speculation.interval (default: 100) - How often Spark will check for tasks to speculate, in milliseconds.
- spark.speculation.quantile (default: 0.75) - Percentage of tasks which must be complete before speculation is enabled for a particular stage.
- spark.speculation.multiplier (default: 1.5) - How many times slower a task is than the median to be considered for speculation.

On Thursday, January 15, 2015 5:44 AM, Ajay Srivastava a_k_srivast...@yahoo.com.INVALID wrote:

Hi,
My spark job is taking a long time. I see that some tasks are taking longer for the same amount of data and shuffle read/write. What could be the possible reasons for it?
The thread dump sometimes shows that all the tasks in an executor are waiting with the following stack trace:

"Executor task launch worker-12" daemon prio=10 tid=0x7fcd44276000 nid=0x3f85 waiting on condition [0x7fcce3ddc000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for 0x7fd0aee82e00 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)
    at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
    at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:253)
    at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)

Any inputs/suggestions to improve the job time will be appreciated.

Regards,
Ajay
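The speculation settings quoted above can also be set programmatically on a SparkConf. A minimal sketch; the values shown are the documented 1.2 defaults except for spark.speculation itself, which is flipped on:

```scala
// Sketch: enabling speculative execution (Spark 1.2-era properties).
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true")           // re-launch stragglers
  .set("spark.speculation.interval", "100")   // check interval, in ms
  .set("spark.speculation.quantile", "0.75")  // fraction of tasks that must finish first
  .set("spark.speculation.multiplier", "1.5") // how much slower than the median
```

Note that speculation masks stragglers rather than explaining them, which is why it does not answer the original question about the stack trace.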
Re: Some tasks are taking long time
Thanks Nicos. GC does not contribute much to the execution time of the task. I will debug it further today.

Regards,
Ajay

On Thursday, January 15, 2015 11:55 PM, Nicos n...@hotmail.com wrote:

Ajay,
Unless we are dealing with some synchronization/conditional variable bug in Spark, try this per the tuning guide:

Cache Size Tuning
One important configuration parameter for GC is the amount of memory that should be used for caching RDDs. By default, Spark uses 60% of the configured executor memory (spark.executor.memory) to cache RDDs. This means that 40% of memory is available for any objects created during task execution. In case your tasks slow down and you find that your JVM is garbage-collecting frequently or running out of memory, lowering this value will help reduce the memory consumption. To change this to, say, 50%, you can call conf.set("spark.storage.memoryFraction", "0.5") on your SparkConf. Combined with the use of serialized caching, using a smaller cache should be sufficient to mitigate most of the garbage collection problems. In case you are interested in further tuning the Java GC, continue reading below.

Complete list of tips here:
https://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage

Cheers,
- Nicos

On Jan 15, 2015, at 6:49 AM, Ajay Srivastava a_k_srivast...@yahoo.com.INVALID wrote:

Thanks RK. I can turn on speculative execution, but I am trying to find out the actual reason for the delay, as it happens on any node. Any idea about the stack trace in my previous mail?

Regards,
Ajay
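The cache-size advice from the tuning guide quoted above can be sketched on a SparkConf as follows; the Kryo line goes one step beyond what was quoted (serialized caching) and is optional:

```scala
// Sketch: lowering the RDD cache fraction so more executor memory is
// left for objects created during task execution (Spark 1.x; the
// spark.storage.memoryFraction default is 0.6).
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.storage.memoryFraction", "0.5")
  // Optional: serialized caching with Kryo shrinks cached blocks and
  // reduces GC pressure further.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
```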
Some tasks are taking long time
Hi,
My spark job is taking a long time. I see that some tasks are taking longer for the same amount of data and shuffle read/write. What could be the possible reasons for it?

The thread dump sometimes shows that all the tasks in an executor are waiting with the following stack trace:

"Executor task launch worker-12" daemon prio=10 tid=0x7fcd44276000 nid=0x3f85 waiting on condition [0x7fcce3ddc000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for 0x7fd0aee82e00 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)
    at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
    at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:253)
    at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)

Any inputs/suggestions to improve the job time will be appreciated.

Regards,
Ajay
Re: Creating RDD from only few columns of a Parquet file
Setting spark.sql.hive.convertMetastoreParquet to true has fixed this.

Regards,
Ajay

On Tuesday, January 13, 2015 11:50 AM, Ajay Srivastava a_k_srivast...@yahoo.com.INVALID wrote:

Hi,
I am trying to read a parquet file using:

val parquetFile = sqlContext.parquetFile("people.parquet")

There is no way to specify that I am interested in reading only some columns from disk. For example, the parquet file has 10 columns and I want to read only 3 columns from disk. We have done an experiment:

Table1 - Parquet file containing 10 columns
Table2 - Parquet file containing only the 3 columns which were used in the query

The time taken by the query on table1 and table2 shows a huge difference. The query on Table1 takes more than double the time taken on table2, which makes me think that Spark is reading all the columns from disk in the case of table1 when it needs only 3. How should I make sure that it reads only 3 of the 10 columns from disk?

Regards,
Ajay
Creating RDD from only few columns of a Parquet file
Hi,
I am trying to read a parquet file using:

val parquetFile = sqlContext.parquetFile("people.parquet")

There is no way to specify that I am interested in reading only some columns from disk. For example, the parquet file has 10 columns and I want to read only 3 columns from disk. We have done an experiment:

Table1 - Parquet file containing 10 columns
Table2 - Parquet file containing only the 3 columns which were used in the query

The time taken by the query on table1 and table2 shows a huge difference. The query on Table1 takes more than double the time taken on table2, which makes me think that Spark is reading all the columns from disk in the case of table1 when it needs only 3. How should I make sure that it reads only 3 of the 10 columns from disk?

Regards,
Ajay
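One way to give Spark SQL a chance to prune columns is to project only the needed ones in the query itself, since Parquet is a columnar format and the reader can then skip the other columns on disk. A sketch against the 1.2-era API; the column names are hypothetical:

```scala
// Sketch: column pruning via an explicit projection (Spark 1.2-era API).
// Column names "name", "age", "zipcode" are hypothetical.
val parquetFile = sqlContext.parquetFile("people.parquet")
parquetFile.registerTempTable("people")

// Selecting 3 of the 10 columns lets the Parquet reader skip the rest,
// provided column pruning is actually pushed down to the scan.
val threeCols = sqlContext.sql("SELECT name, age, zipcode FROM people")
threeCols.count()
```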
Spark summit 2014 videos ?
Hi,
I did not find any videos on the Apache Spark channel on YouTube yet. Any idea when these will be made available?

Regards,
Ajay
OFF_HEAP storage level
Hi,
I was checking the different storage levels of an RDD and found OFF_HEAP. Has anybody used this level? If I use this level, where will the data be stored? If not in the heap, does it mean that we can avoid GC? How can I use this level? I did not find anything in the archive regarding this.

Can someone also explain the behavior of the storage level NONE?

Regards,
Ajay
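As a partial answer sketch: in Spark 1.x, OFF_HEAP stored cached blocks in an external store (Tachyon, configured via spark.tachyonStore.url), so the cached data lives outside the JVM heap and does not add GC pressure, although objects created during task execution still do. Usage looks like:

```scala
// Sketch: persisting an RDD off-heap (Spark 1.x semantics: blocks go to
// Tachyon rather than the JVM heap, so cached data is not GC-managed).
import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 1000000)
rdd.persist(StorageLevel.OFF_HEAP)
rdd.count() // a first action materializes the cache
```

StorageLevel.NONE simply means the RDD is not persisted at all; it is the level an RDD reports before persist()/cache() is called.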
Re: Join : Giving incorrect result
Thanks Matei. We have tested the fix and it's working perfectly.

Andrew, we set spark.shuffle.spill=false but the application goes out of memory. I think that is expected.

Regards,
Ajay

On Friday, June 6, 2014 3:49 AM, Andrew Ash and...@andrewash.com wrote:

Hi Ajay,
Can you please try running the same code with spark.shuffle.spill=false and see if the numbers turn out correctly? That parameter controls whether or not the buggy code that Matei fixed in ExternalAppendOnlyMap is used. FWIW I saw similar issues in 0.9.0 but no longer in 0.9.1 after I think some fixes in spilling landed.

Andrew

On Thu, Jun 5, 2014 at 3:05 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

Hey Ajay, thanks for reporting this. There was indeed a bug, specifically in the way join tasks spill to disk (which happened when you had more concurrent tasks competing for memory). I've posted a patch for it here: https://github.com/apache/spark/pull/986. Feel free to try that if you'd like; it will also be in 0.9.2 and 1.0.1.

Matei

On Jun 5, 2014, at 12:19 AM, Ajay Srivastava a_k_srivast...@yahoo.com wrote:

Sorry for replying late. It was night here.

Lian/Matei, here is the code snippet:

sparkConf.set("spark.executor.memory", "10g")
sparkConf.set("spark.cores.max", "5")

val sc = new SparkContext(sparkConf)

val accId2LocRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2location").map(getKeyValueFromString(_, 0, ',', true))
val accId2DemoRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2demographic_planType").map(getKeyValueFromString(_, 0, ',', true))

val joinedRDD = accId2LocRDD.join(accId2DemoRDD)

def getKeyValueFromString(line: String, keyIndex: Int, delimit: Char, retFullLine: Boolean): Tuple2[String, String] = {
  val splits = line.split(delimit)
  if (splits.length <= 1) {
    (null, null)
  } else if (retFullLine) {
    (splits(keyIndex), line)
  } else {
    (splits(keyIndex), splits(splits.length - keyIndex - 1))
  }
}

Both of these files have 10 M records with the same unique keys. The size of the file is nearly 280 MB and the block size in HDFS is 256 MB. The output of the join should contain 10 M records.

We have done some more experiments:
1) Running cogroup instead of join - it also gives an incorrect count.
2) Running union followed by groupByKey and then filtering records with two entries in the sequence - it also gives an incorrect count.
3) Increasing spark.executor.memory to 50g - everything works fine. The count comes to 10 M for the join, cogroup and union/groupByKey/filter transformations.

I thought that 10g is enough memory for the executors, but even if the memory is less, it should not result in an incorrect computation. Probably there is a problem in reconstructing RDDs when memory is not enough.

Thanks Chen for your observation. I get this problem on a single worker, so there will not be any mismatch of jars. On two workers, since the executor memory gets doubled, the code works fine.

Regards,
Ajay

On Thursday, June 5, 2014 1:35 AM, Matei Zaharia matei.zaha...@gmail.com wrote:

If this isn't the problem, it would be great if you can post the code for the program.

Matei

On Jun 4, 2014, at 12:58 PM, Xu (Simon) Chen xche...@gmail.com wrote:

Maybe your two workers have different assembly jar files? I just ran into a similar problem that my spark-shell is using a different jar file than my workers - got really confusing results.

On Jun 4, 2014 8:33 AM, Ajay Srivastava a_k_srivast...@yahoo.com wrote:

Hi,
I am doing a join of two RDDs which gives different results (count of records) each time I run this code on the same input. The input files are large enough to be divided into two splits. When the program runs on two workers with a single core assigned to each, the output is consistent and looks correct. But when a single worker is used with two or more cores, the result seems to be random; every time, the count of joined records is different.

Does this sound like a defect, or do I need to take care of something while using join? I am using spark-0.9.1.

Regards
Ajay
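Andrew's diagnostic suggestion above can be sketched as a one-line SparkConf change; note it is a diagnostic only, since with spilling disabled the job needs enough executor memory to hold the shuffle data (as Ajay's OOM confirms):

```scala
// Sketch: disabling shuffle spill (Spark 0.9-era property) to test whether
// the ExternalAppendOnlyMap spilling path is involved in the wrong counts.
// Diagnostic only: with spilling off, shuffles must fit in memory.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.spill", "false")
```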
Re: Join : Giving incorrect result
Sorry for replying late. It was night here.

Lian/Matei, here is the code snippet:

sparkConf.set("spark.executor.memory", "10g")
sparkConf.set("spark.cores.max", "5")

val sc = new SparkContext(sparkConf)

val accId2LocRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2location").map(getKeyValueFromString(_, 0, ',', true))
val accId2DemoRDD = sc.textFile("hdfs://bbr-dev178:9000/data/subDbSpark/account2demographic_planType").map(getKeyValueFromString(_, 0, ',', true))

val joinedRDD = accId2LocRDD.join(accId2DemoRDD)

def getKeyValueFromString(line: String, keyIndex: Int, delimit: Char, retFullLine: Boolean): Tuple2[String, String] = {
  val splits = line.split(delimit)
  if (splits.length <= 1) {
    (null, null)
  } else if (retFullLine) {
    (splits(keyIndex), line)
  } else {
    (splits(keyIndex), splits(splits.length - keyIndex - 1))
  }
}

Both of these files have 10 M records with the same unique keys. The size of the file is nearly 280 MB and the block size in HDFS is 256 MB. The output of the join should contain 10 M records.

We have done some more experiments:
1) Running cogroup instead of join - it also gives an incorrect count.
2) Running union followed by groupByKey and then filtering records with two entries in the sequence - it also gives an incorrect count.
3) Increasing spark.executor.memory to 50g - everything works fine. The count comes to 10 M for the join, cogroup and union/groupByKey/filter transformations.

I thought that 10g is enough memory for the executors, but even if the memory is less, it should not result in an incorrect computation. Probably there is a problem in reconstructing RDDs when memory is not enough.

Thanks Chen for your observation. I get this problem on a single worker, so there will not be any mismatch of jars. On two workers, since the executor memory gets doubled, the code works fine.

Regards,
Ajay

On Thursday, June 5, 2014 1:35 AM, Matei Zaharia matei.zaha...@gmail.com wrote:

If this isn't the problem, it would be great if you can post the code for the program.

Matei

On Jun 4, 2014, at 12:58 PM, Xu (Simon) Chen xche...@gmail.com wrote:

Maybe your two workers have different assembly jar files? I just ran into a similar problem that my spark-shell is using a different jar file than my workers - got really confusing results.

On Jun 4, 2014 8:33 AM, Ajay Srivastava a_k_srivast...@yahoo.com wrote:

Hi,
I am doing a join of two RDDs which gives different results (count of records) each time I run this code on the same input. The input files are large enough to be divided into two splits. When the program runs on two workers with a single core assigned to each, the output is consistent and looks correct. But when a single worker is used with two or more cores, the result seems to be random; every time, the count of joined records is different.

Does this sound like a defect, or do I need to take care of something while using join? I am using spark-0.9.1.

Regards
Ajay
Join : Giving incorrect result
Hi,
I am doing a join of two RDDs which gives different results (count of records) each time I run this code on the same input. The input files are large enough to be divided into two splits. When the program runs on two workers with a single core assigned to each, the output is consistent and looks correct. But when a single worker is used with two or more cores, the result seems to be random; every time, the count of joined records is different.

Does this sound like a defect, or do I need to take care of something while using join? I am using spark-0.9.1.

Regards
Ajay