According to our offline discussion, the target table consists of 1M+ small Parquet files (~12 MB each on average). The OOM occurred on the driver side while listing input files.

My theory is that the FileStatus objects for all of the listed files are simply too large in aggregate to fit in the driver heap, which caused the OOM.
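For a rough sense of scale (a back-of-envelope estimate only, assuming something on the order of 1 KB of driver heap per cached FileStatus once the path string, metadata fields, and collection overhead are counted):

    1,000,000+ files x ~1 KB per FileStatus ≈ 1 GB+ of driver heap

and that is consumed just by the listing, before the query itself runs.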

Suggestions:

1. Merge those small Parquet files to reduce the file count. For efficiency, a Parquet file should typically be at least as large as an HDFS block (see the sketch after this list).
2. Try increasing the driver memory (also mentioned below).
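For 1, a minimal compaction sketch, assuming Spark 1.6 APIs, one example partition path taken from your logs, and a hypothetical "_compacted" output location; the coalesce factor is only a starting point and should be tuned so each output file ends up at least one HDFS block (128 MB here):

    // Rewrite one small-file partition as a handful of larger Parquet files.
    val input = "hdfs://mycluster-tj/user/omega/events/v4/h/2016/07/07/12/appid=6"
    val df = sqlContext.read.parquet(input)
    // With ~12 MB inputs, roughly 10 of them fit into one 128 MB output file;
    // the partition count is used here only as a rough proxy for the file count.
    val numOutputFiles = math.max(1, df.rdd.partitions.length * 12 / 128)
    df.coalesce(numOutputFiles)
      .write.mode("overwrite")
      .parquet(input + "_compacted")  // hypothetical output path; swap directories afterwards

For 2, the driver heap is set at submit time, e.g. by passing --driver-memory 8g to spark-submit (8g is only a placeholder; how much you actually need depends on the file count).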

One possible improvement here on the Spark side is that we probably shouldn't list all input files of a partitioned table when the query only touches a fraction of the partitions.
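In the meantime, a possible workaround on your side (a sketch, assuming the query really only needs that single partition) is to bypass the table and read the partition directory directly, so only that one directory gets listed:

    // Read just the one partition directory instead of the whole table.
    val part = sqlContext.read.parquet(
      "hdfs://mycluster-tj/user/omega/events/v4/h/2016/07/07/12/appid=6")
    part.registerTempTable("dwd_native_one_partition")  // hypothetical temp table name
    sqlContext.sql("select count(1) from dwd_native_one_partition").show()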

Cheng



On 7/8/16 8:44 PM, Sea wrote:
My Spark version is 1.6.1.

== Parsed Logical Plan ==
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#112L]
+- Limit 1
   +- Aggregate [(count(1),mode=Complete,isDistinct=false) AS events#0L]
      +- Filter ((concat(year#1,month#2,day#3,hour#4) = 2016070700) && (appid#5 = 6))
         +- Subquery dwd_native
            +- Relation[oid#6,eventid#7,clientid#8L,uid#9,passengerid#10L,phone_time#11L,server_time#12L,app_name#13,app_version#14,os_type#15,os_version#16,brand#17,model#18,idfa#19,udid#20,usid#21,ekey#22,mid#23,sdk_version#24,longitude#25,latitude#26,channel#27,tel#28,cip#29,country#30,province#31,city#32,cityid#33,label#34,attrs#35,attrs_n#36,fr#37,year#1,month#2,day#3,hour#4,appid#5] ParquetRelation: omega.dwd_native

== Analyzed Logical Plan ==
count: bigint
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#112L]
+- Limit 1
   +- Aggregate [(count(1),mode=Complete,isDistinct=false) AS events#0L]
      +- Filter ((concat(year#1,month#2,day#3,hour#4) = 2016070700) && (appid#5 = 6))
         +- Subquery dwd_native
            +- Relation[oid#6,eventid#7,clientid#8L,uid#9,passengerid#10L,phone_time#11L,server_time#12L,app_name#13,app_version#14,os_type#15,os_version#16,brand#17,model#18,idfa#19,udid#20,usid#21,ekey#22,mid#23,sdk_version#24,longitude#25,latitude#26,channel#27,tel#28,cip#29,country#30,province#31,city#32,cityid#33,label#34,attrs#35,attrs_n#36,fr#37,year#1,month#2,day#3,hour#4,appid#5] ParquetRelation: omega.dwd_native

== Optimized Logical Plan ==
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#112L]
+- Limit 1
   +- Aggregate
      +- Project
         +- Filter ((concat(year#1,month#2,day#3,hour#4) = 2016070700) && (appid#5 = 6))
            +- Relation[oid#6,eventid#7,clientid#8L,uid#9,passengerid#10L,phone_time#11L,server_time#12L,app_name#13,app_version#14,os_type#15,os_version#16,brand#17,model#18,idfa#19,udid#20,usid#21,ekey#22,mid#23,sdk_version#24,longitude#25,latitude#26,channel#27,tel#28,cip#29,country#30,province#31,city#32,cityid#33,label#34,attrs#35,attrs_n#36,fr#37,year#1,month#2,day#3,hour#4,appid#5] ParquetRelation: omega.dwd_native

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#112L])
+- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#115L])
   +- Limit 1
      +- ConvertToSafe
         +- TungstenAggregate(key=[], functions=[], output=[])
            +- TungstenExchange SinglePartition, None
               +- TungstenAggregate(key=[], functions=[], output=[])
+- Scan ParquetRelation: omega.dwd_native[] InputPaths: hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=0, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=1, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=2, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=3, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=4, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=5, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=6, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=0, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=1, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=2, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=3, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=4, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=5, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=6, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=0, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=1, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=2, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=3, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=4, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=5, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=6, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/03/appid=0, hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/03/


Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
  at scala.collection.mutable.HashTable$class.resize(HashTable.scala:247)
  at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$addEntry0(HashTable.scala:151)
  at scala.collection.mutable.HashTable$class.findOrAddEntry(HashTable.scala:163)
  at scala.collection.mutable.LinkedHashSet.findOrAddEntry(LinkedHashSet.scala:41)
  at scala.collection.mutable.LinkedHashSet.add(LinkedHashSet.scala:62)
  at scala.collection.mutable.LinkedHashSet.$plus$eq(LinkedHashSet.scala:59)
  at scala.collection.mutable.LinkedHashSet.$plus$eq(LinkedHashSet.scala:41)
  at scala.collection.mutable.GrowingBuilder.$plus$eq(GrowingBuilder.scala:26)
  at scala.collection.mutable.GrowingBuilder.$plus$eq(GrowingBuilder.scala:24)
  at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
  at scala.collection.mutable.GrowingBuilder.$plus$plus$eq(GrowingBuilder.scala:24)
  at scala.collection.generic.GenericCompanion.apply(GenericCompanion.scala:48)
  at org.apache.spark.sql.sources.HadoopFsRelation$.listLeafFilesInParallel(interfaces.scala:910)
  at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:445)
  at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:477)
  at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:489)
  at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:487)
  at org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:494)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:398)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:202)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:202)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:202)
  at org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:636)
  at org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:635)
  at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:37)


------------------ Original Message ------------------
*From:* "lian.cs.zju" <lian.cs....@gmail.com>;
*Sent:* Friday, Jul 8, 2016, 4:47 PM;
*To:* "Sea" <261810...@qq.com>;
*Cc:* "user" <user@spark.apache.org>;
*Subject:* Re: Bug about reading parquet files

What's the Spark version? Could you please also attach the result of explain(extended = true)?
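(For reference, one way to produce that output, reusing the query from your first message, is something like:

    sqlContext.sql(
      "select count(1) from omega.dwd_native where year='2016' and month='07' and day='05' and hour='12' and appid='6'"
    ).explain(true)

where explain(true) is shorthand for explain(extended = true).)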

On Fri, Jul 8, 2016 at 4:33 PM, Sea <261810...@qq.com <mailto:261810...@qq.com>> wrote:

    I have a problem reading parquet files.
    sql:
    select count(1) from   omega.dwd_native where year='2016' and
    month='07' and day='05' and hour='12' and appid='6';
    The Hive partition columns are (year, month, day, appid).

    Only two tasks are launched, and it lists all directories in my table,
    not only /user/omega/events/v4/h/2016/07/07/12/appid=6
    [Stage 1:>                  (0 + 0) / 2]

    16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing hdfs://mycluster-tj/user/omega/events/v4/h/2016/05/31/21/appid=1
    16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing hdfs://mycluster-tj/user/omega/events/v4/h/2016/06/28/20/appid=2

    16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing hdfs://mycluster-tj/user/omega/events/v4/h/2016/07/22/21/appid=65537
    16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing hdfs://mycluster-tj/user/omega/events/v4/h/2016/08/14/05/appid=65536


