Hi all, I'm using Spark (1.4.1) + Hive (0.13.1). I found that a large amount of network IO appears when querying a parquet table *with only one part file* using SparkSQL.
The SQL is:

    SELECT concat(year(fkbb5855f0), "-", month(fkbb5855f0), "-", day(fkbb5855f0), " 00:00:00"),
           COUNT(fk919b1d80)
    FROM test
    WHERE fkbb5855f0 >= '2015-08-02 00:00:00'
      AND fkbb5855f0 < '2015-09-01 00:00:00'
      AND fk418c5509 IN ('add_summary')
      AND (fkbb5855f0 != '' AND fkbb5855f0 is not NULL)
    GROUP BY year(fkbb5855f0), month(fkbb5855f0), day(fkbb5855f0)

The SQL's query explain is:

== Parsed Logical Plan ==
'Limit 10000
 'Aggregate ['year('fkbb5855f0),'month('fkbb5855f0),'day('fkbb5855f0)], ['concat('year('fkbb5855f0),-,'month('fkbb5855f0),-,'day('fkbb5855f0), 00:00:00) AS _c0#14,COUNT('fk919b1d80) AS _c1#15]
  'Filter (((('fkbb5855f0 >= 2015-08-02 00:00:00) && ('fkbb5855f0 < 2015-09-01 00:00:00)) && 'fk418c5509 IN (add_summary)) && (NOT ('fkbb5855f0 = ) && IS NOT NULL 'fkbb5855f0))
   'UnresolvedRelation [test], None

== Analyzed Logical Plan ==
_c0: string, _c1: bigint
Limit 10000
 Aggregate [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36)], [HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcat(HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36),-,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36),-,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36), 00:00:00) AS _c0#14,COUNT(fk919b1d80#34) AS _c1#15L]
  Filter ((((fkbb5855f0#36 >= 2015-08-02 00:00:00) && (fkbb5855f0#36 < 2015-09-01 00:00:00)) && fk418c5509#35 IN (add_summary)) && (NOT (fkbb5855f0#36 = ) && IS NOT NULL fkbb5855f0#36))
   Subquery test
    Relation[fkb80bb774#33,fk919b1d80#34,fk418c5509#35,fkbb5855f0#36] org.apache.spark.sql.parquet.ParquetRelation2@5a271032

== Optimized Logical Plan ==
Limit 10000
 Aggregate [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36)], [HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcat(HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36),-,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36),-,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36), 00:00:00) AS _c0#14,COUNT(fk919b1d80#34) AS _c1#15L]
  Project [fkbb5855f0#36,fk919b1d80#34]
   Filter ((((fkbb5855f0#36 >= 2015-08-02 00:00:00) && (fkbb5855f0#36 < 2015-09-01 00:00:00)) && fk418c5509#35 INSET (add_summary)) && (NOT (fkbb5855f0#36 = ) && IS NOT NULL fkbb5855f0#36))
    Relation[fkb80bb774#33,fk919b1d80#34,fk418c5509#35,fkbb5855f0#36] org.apache.spark.sql.parquet.ParquetRelation2@5a271032

== Physical Plan ==
Limit 10000
 Aggregate false, [PartialGroup#42,PartialGroup#43,PartialGroup#44], [HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcat(PartialGroup#42,-,PartialGroup#43,-,PartialGroup#44, 00:00:00) AS _c0#14,Coalesce(SUM(PartialCount#41L),0) AS _c1#15L]
  Exchange (HashPartitioning 200)
   Aggregate true, [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36)], [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36) AS PartialGroup#42,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36) AS PartialGroup#43,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36) AS PartialGroup#44,COUNT(fk919b1d80#34) AS PartialCount#41L]
    Project [fkbb5855f0#36,fk919b1d80#34]
     Filter (((((fkbb5855f0#36 >= 2015-08-02 00:00:00) && (fkbb5855f0#36 < 2015-09-01 00:00:00)) && fk418c5509#35 INSET (add_summary)) && NOT (fkbb5855f0#36 = )) && IS NOT NULL fkbb5855f0#36)
      PhysicalRDD
[fkbb5855f0#36,fk919b1d80#34,fk418c5509#35], MapPartitionsRDD[5] at
Code Generation: false
== RDD ==

The size of the `test` table is 3.3GB. I have 5 nodes in the Hadoop cluster, and Spark runs on the same cluster. The `test` table has 3 replications and the block size is 64MB. When SparkSQL executes the SQL, the task count of the first stage is 54, and the Locality Level of every task is NODE_LOCAL. I used dstat to monitor one node of the cluster, and there is a large amount of network IO:

----total-cpu-usage---- -dsk/total- -net/total- ---paging-- ---system--
usr sys idl wai hiq siq| read  writ| recv  send|  in   out | int   csw
  1   0  99   0   0   0| 107k  389k|   0     0 | 193B 1097B|2408  5443
  0   0 100   0   0   0|   0     0 |5709B 3285B|   0     0 |1921  4282
  0   0 100   0   0   0|1936k    0 |3761B 1251B|   0     0 |1907  4197
  0   0 100   0   0   0|   0   584k|3399B 1539B|   0     0 |1903  4338
  0   0  99   0   0   0|1936k    0 |4332B 1447B|   0     0 |2070  4448
  4   1  93   3   0   0|  16M    0 |5117B 1439B|   0     0 |9177   11k
  1   1  77  21   0   0| 215M    0 | 921k   68M|   0     0 | 21k   10k
  6   1  79  14   0   0| 175M    0 | 327k   19M|   0     0 | 10k  8207
 32   1  68   0   0   0|3308k    0 | 237k   14M|   0     0 | 16k  6536
 22   0  78   0   0   0|2048k    0 |  98k 5733k|   0     0 |9190  5823
 30   0  69   0   0   0|8080k 8192B| 175k   11M|   0     0 | 18k  6950
 23   0  77   0   0   0|  18M    0 | 727k   52M|   0     0 | 25k  8648
 22   0  78   0   0   0|  28M    0 | 920k   96M|   0     0 | 26k   11k
 22   0  78   0   0   0|  31M    0 |1003k  114M|   0     0 | 25k   10k
 22   0  78   0   0   0|9372k    0 | 487k   49M|   0     0 |9935  5599
 18   1  81   0   0   0|   0   125k|  47k 2027k|   0     0 |8820  5358
  4   1  95   0   0   0|  28M  450k| 289k   23M|   0     0 | 16k  7992
  0   0  99   0   0   0|  10M    0 | 446k   42M|   0     0 |3765  5262
  0   0 100   0   0   0|1944k    0 |8540B 1364k|   0     0 |1943  4378
  0   0  99   0   0   0|   0   426k|  11k 2469B|   0     0 |2008  4476
  1   0  99   0   0   0|1852k  368k|  16k 1687B|   0     0 |2111  4509

But the SQL's result size is only 3KB, so the network IO confuses me. Something else confuses me too: I found that *half of the tasks' `Input Size/Records` is `64.0 MB (hadoop) / 0`*. I think Spark dispatches the jobs in a wrong way. To validate my guess, I split the `test` table into different numbers of part files.
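As a sanity check on the 54-task figure, the default split count for a single large file is the number of 64MB HDFS blocks it occupies. A rough back-of-the-envelope sketch (the exact split logic in Hadoop/Spark may differ slightly at boundaries):

```python
import math

def hdfs_blocks(file_size_bytes, block_size_bytes=64 * 1024 * 1024):
    """Number of HDFS blocks (and hence default input splits) a file occupies."""
    return math.ceil(file_size_bytes / block_size_bytes)

# A single 3.34GB parquet part file with a 64MB block size:
print(hdfs_blocks(3.34 * 1024 ** 3))  # -> 54, matching the first stage's task count
```

Under this reading each 64MB block gets its own NODE_LOCAL task, but a parquet row group spanning several blocks is consumed by only one of those tasks, which could explain both the `64.0 MB (hadoop) / 0` tasks and the remote reads; this is an assumption on my part, not verified against Spark 1.4 internals.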
The data is:

part files | single file length | query elapsed | network IO
        1  | 3.34G              | 12s           | ~200MB/node
        5  | 800MB              | 10s           | ~100MB/node
       64  | 74MB               | 8s            | ~50MB/node
       74  | 64MB               | 7s            | only a little
      150  | 32MB               | 6s            | only a little
      297  | 20MB               | 5s            | only a little

I also set the replication of the `test` table to 5, and found that no network IO appeared, but the query took 11s! So I think the network IO comes from Spark fetching data from a remote node when a parquet file is larger than the Hadoop block size (64MB). I want to know:

1. How does Spark dispatch jobs when a parquet table has only one part file that is larger than the Hadoop block size?
2. Under which conditions is a task's `Input Size/Records` shown as `64.0 MB (hadoop) / 0`, `5.2 KB (hadoop) / 0`, or `0.0 B (hadoop) / 50`?

Any ideas? Thanks

--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/SparkSQL-How-does-spark-handle-a-parquet-file-in-parallel-tp14210.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.