According to our offline discussion, the target table consists of 1M+
small Parquet files (~12 MB each on average). The OOM occurred on the
driver side while listing input files.
My theory is that the FileStatus objects for all those listed files
collectively occupy too much driver heap, which caused the OOM.
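A rough back-of-envelope check supports this. Note that the ~1 KB-per-entry heap cost below is an assumed figure for illustration (path string, block locations, hash-table overhead), not a measured number:

```python
# Rough estimate of driver heap consumed by caching a FileStatus object
# for every file in the table. BYTES_PER_STATUS is an assumption made
# for illustration, not a measured value.

FILES = 1_000_000          # 1M+ small Parquet files
BYTES_PER_STATUS = 1024    # assumed heap cost per cached FileStatus

total_bytes = FILES * BYTES_PER_STATUS
total_gib = total_bytes / (1024 ** 3)
print(f"~{total_gib:.2f} GiB of driver heap just for file metadata")
```

Even under this conservative guess, file metadata alone approaches a gigabyte of heap, before any other driver-side state is accounted for.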
Suggestions:
1. Merge the small Parquet files to reduce the file count. For scan
efficiency, a Parquet file should typically be at least as large as an
HDFS block.
2. Increase the driver memory.
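For example (an illustrative spark-submit setting; the 8g value is arbitrary and should be sized to your cluster):

```shell
# Give the driver a larger heap so the listed file metadata fits.
# 8g is an example value, not a recommendation.
spark-submit --driver-memory 8g ...
# or equivalently:
spark-submit --conf spark.driver.memory=8g ...
```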
One possible improvement on the Spark side: we probably shouldn't list
all input files of a partitioned table when the query only touches a
fraction of its partitions.
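The idea can be sketched as pruning partition directories against the query's partition predicates before any file listing happens. This is an illustrative sketch with hypothetical helper names, not actual Spark internals:

```python
# Sketch of partition pruning before file listing: keep only the
# partition directories matched by the query's partition predicates,
# then list files in just those directories. Hive-style "key=value"
# path segments are assumed; names here are illustrative, not Spark APIs.

def prune_partitions(partition_dirs, predicates):
    """Keep directories whose key=value path segments satisfy every predicate."""
    def matches(path):
        # Parse the "key=value" segments of the partition path.
        parts = dict(seg.split("=", 1)
                     for seg in path.strip("/").split("/")
                     if "=" in seg)
        return all(parts.get(k) == v for k, v in predicates.items())
    return [p for p in partition_dirs if matches(p)]

dirs = [
    "/warehouse/events/year=2016/month=07/day=05/hour=12/appid=6",
    "/warehouse/events/year=2016/month=07/day=05/hour=12/appid=0",
    "/warehouse/events/year=2016/month=04/day=01/hour=00/appid=6",
]
print(prune_partitions(dirs, {"year": "2016", "month": "07",
                              "day": "05", "hour": "12", "appid": "6"}))
# → only the first directory survives; the other partitions are never listed
```

With pruning like this, a query touching one hourly partition would trigger listing of a handful of directories rather than the entire table.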
Cheng
On 7/8/16 8:44 PM, Sea wrote:
My spark version is 1.6.1.
== Parsed Logical Plan ==
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#112L]
+- Limit 1
+- Aggregate [(count(1),mode=Complete,isDistinct=false) AS events#0L]
+- Filter ((concat(year#1,month#2,day#3,hour#4) = 2016070700) &&
(appid#5 = 6))
+- Subquery dwd_native
+-
Relation[oid#6,eventid#7,clientid#8L,uid#9,passengerid#10L,phone_time#11L,server_time#12L,app_name#13,app_version#14,os_type#15,os_version#16,brand#17,model#18,idfa#19,udid#20,usid#21,ekey#22,mid#23,sdk_version#24,longitude#25,latitude#26,channel#27,tel#28,cip#29,country#30,province#31,city#32,cityid#33,label#34,attrs#35,attrs_n#36,fr#37,year#1,month#2,day#3,hour#4,appid#5]
ParquetRelation: omega.dwd_native
== Analyzed Logical Plan ==
count: bigint
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#112L]
+- Limit 1
+- Aggregate [(count(1),mode=Complete,isDistinct=false) AS events#0L]
+- Filter ((concat(year#1,month#2,day#3,hour#4) = 2016070700) &&
(appid#5 = 6))
+- Subquery dwd_native
+-
Relation[oid#6,eventid#7,clientid#8L,uid#9,passengerid#10L,phone_time#11L,server_time#12L,app_name#13,app_version#14,os_type#15,os_version#16,brand#17,model#18,idfa#19,udid#20,usid#21,ekey#22,mid#23,sdk_version#24,longitude#25,latitude#26,channel#27,tel#28,cip#29,country#30,province#31,city#32,cityid#33,label#34,attrs#35,attrs_n#36,fr#37,year#1,month#2,day#3,hour#4,appid#5]
ParquetRelation: omega.dwd_native
== Optimized Logical Plan ==
Aggregate [(count(1),mode=Complete,isDistinct=false) AS count#112L]
+- Limit 1
+- Aggregate
+- Project
+- Filter ((concat(year#1,month#2,day#3,hour#4) = 2016070700)
&& (appid#5 = 6))
+-
Relation[oid#6,eventid#7,clientid#8L,uid#9,passengerid#10L,phone_time#11L,server_time#12L,app_name#13,app_version#14,os_type#15,os_version#16,brand#17,model#18,idfa#19,udid#20,usid#21,ekey#22,mid#23,sdk_version#24,longitude#25,latitude#26,channel#27,tel#28,cip#29,country#30,province#31,city#32,cityid#33,label#34,attrs#35,attrs_n#36,fr#37,year#1,month#2,day#3,hour#4,appid#5]
ParquetRelation: omega.dwd_native
== Physical Plan ==
TungstenAggregate(key=[],
functions=[(count(1),mode=Final,isDistinct=false)], output=[count#112L])
+- TungstenAggregate(key=[],
functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#115L])
+- Limit 1
+- ConvertToSafe
+- TungstenAggregate(key=[], functions=[], output=[])
+- TungstenExchange SinglePartition, None
+- TungstenAggregate(key=[], functions=[], output=[])
+- Scan ParquetRelation: omega.dwd_native[]
InputPaths:
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=0,
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=1,
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=2,
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=3,
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=4,
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=5,
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/00/appid=6,
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=0,
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=1,
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=2,
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=3,
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=4,
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=5,
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/01/appid=6,
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=0,
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=1,
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=2,
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=3,
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=4,
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=5,
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/02/appid=6,
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/03/appid=0,
hdfs://mycluster-tj/user/omega/events/v4/h/2016/04/01/03/
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at scala.collection.mutable.HashTable$class.resize(HashTable.scala:247)
at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$addEntry0(HashTable.scala:151)
at scala.collection.mutable.HashTable$class.findOrAddEntry(HashTable.scala:163)
at scala.collection.mutable.LinkedHashSet.findOrAddEntry(LinkedHashSet.scala:41)
at scala.collection.mutable.LinkedHashSet.add(LinkedHashSet.scala:62)
at scala.collection.mutable.LinkedHashSet.$plus$eq(LinkedHashSet.scala:59)
at scala.collection.mutable.LinkedHashSet.$plus$eq(LinkedHashSet.scala:41)
at scala.collection.mutable.GrowingBuilder.$plus$eq(GrowingBuilder.scala:26)
at scala.collection.mutable.GrowingBuilder.$plus$eq(GrowingBuilder.scala:24)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.GrowingBuilder.$plus$plus$eq(GrowingBuilder.scala:24)
at scala.collection.generic.GenericCompanion.apply(GenericCompanion.scala:48)
at org.apache.spark.sql.sources.HadoopFsRelation$.listLeafFilesInParallel(interfaces.scala:910)
at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:445)
at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:477)
at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:489)
at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:487)
at org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:494)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:398)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:202)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:202)
at org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:636)
at org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:635)
at org.apache.spark.sql.execution.datasources.LogicalRelation.&lt;init&gt;(LogicalRelation.scala:37)
------------------ Original Message ------------------
*From:* "lian.cs.zju" <lian.cs....@gmail.com>;
*Date:* Friday, July 8, 2016, 4:47 PM
*To:* "Sea" <261810...@qq.com>;
*Cc:* "user" <user@spark.apache.org>;
*Subject:* Re: Bug about reading parquet files
What's the Spark version? Could you please also attach the result of
explain(extended = true)?
On Fri, Jul 8, 2016 at 4:33 PM, Sea <261810...@qq.com
<mailto:261810...@qq.com>> wrote:
I have a problem reading Parquet files.
SQL:
select count(1) from omega.dwd_native where year='2016' and
month='07' and day='05' and hour='12' and appid='6';
The Hive partition columns are (year, month, day, hour, appid)
Only two tasks are launched, and it lists all directories in my table,
not only /user/omega/events/v4/h/2016/07/07/12/appid=6
[Stage 1:> (0 + 0) / 2]
16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing
hdfs://mycluster-tj/user/omega/events/v4/h/2016/05/31/21/appid=1
16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing
hdfs://mycluster-tj/user/omega/events/v4/h/2016/06/28/20/appid=2
16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing
hdfs://mycluster-tj/user/omega/events/v4/h/2016/07/22/21/appid=65537
16/07/08 16:16:51 INFO sources.HadoopFsRelation: Listing
hdfs://mycluster-tj/user/omega/events/v4/h/2016/08/14/05/appid=65536