Hi all,

I'm using Spark (1.4.1) + Hive (0.13.1), and I found that a large amount of
network IO occurs when querying a Parquet table *with only one part file*
from SparkSQL.

The SQL is:

SELECT concat(year(fkbb5855f0), "-", month(fkbb5855f0), "-", day(fkbb5855f0), " 00:00:00"),
       COUNT(fk919b1d80)
FROM test
WHERE fkbb5855f0 >= '2015-08-02 00:00:00'
  AND fkbb5855f0 < '2015-09-01 00:00:00'
  AND fk418c5509 IN ('add_summary')
  AND (fkbb5855f0 != '' AND fkbb5855f0 IS NOT NULL)
GROUP BY year(fkbb5855f0), month(fkbb5855f0), day(fkbb5855f0)

The query's EXPLAIN output is:

== Parsed Logical Plan ==
'Limit 10000
 'Aggregate ['year('fkbb5855f0),'month('fkbb5855f0),'day('fkbb5855f0)],
['concat('year('fkbb5855f0),-,'month('fkbb5855f0),-,'day('fkbb5855f0),
00:00:00) AS _c0#14,COUNT('fk919b1d80) AS _c1#15]
  'Filter (((('fkbb5855f0 >= 2015-08-02 00:00:00) &&
('fkbb5855f0 < 2015-09-01 00:00:00)) && 'fk418c5509 IN
(add_summary)) && (NOT ('fkbb5855f0 = ) && IS NOT NULL
'fkbb5855f0))
   'UnresolvedRelation [test], None

== Analyzed Logical Plan ==
_c0: string, _c1: bigint
Limit 10000
 Aggregate
[HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36)],
[HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcat(HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36),-,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36),-,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36),
00:00:00) AS _c0#14,COUNT(fk919b1d80#34) AS _c1#15L]
  Filter ((((fkbb5855f0#36 >= 2015-08-02 00:00:00) &&
(fkbb5855f0#36 < 2015-09-01 00:00:00)) && fk418c5509#35 IN
(add_summary)) && (NOT (fkbb5855f0#36 = ) && IS NOT NULL
fkbb5855f0#36))
   Subquery test
   
Relation[fkb80bb774#33,fk919b1d80#34,fk418c5509#35,fkbb5855f0#36]
org.apache.spark.sql.parquet.ParquetRelation2@5a271032

== Optimized Logical Plan ==
Limit 10000
 Aggregate
[HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36)],
[HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcat(HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36),-,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36),-,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36),
00:00:00) AS _c0#14,COUNT(fk919b1d80#34) AS _c1#15L]
  Project [fkbb5855f0#36,fk919b1d80#34]
   Filter ((((fkbb5855f0#36 >= 2015-08-02 00:00:00) &&
(fkbb5855f0#36 < 2015-09-01 00:00:00)) && fk418c5509#35 INSET
(add_summary)) && (NOT (fkbb5855f0#36 = ) && IS NOT NULL
fkbb5855f0#36))
   
Relation[fkb80bb774#33,fk919b1d80#34,fk418c5509#35,fkbb5855f0#36]
org.apache.spark.sql.parquet.ParquetRelation2@5a271032

== Physical Plan ==
Limit 10000
 Aggregate false, [PartialGroup#42,PartialGroup#43,PartialGroup#44],
[HiveGenericUdf#org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcat(PartialGroup#42,-,PartialGroup#43,-,PartialGroup#44,
00:00:00) AS _c0#14,Coalesce(SUM(PartialCount#41L),0) AS _c1#15L]
  Exchange (HashPartitioning 200)
   Aggregate true,
[HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36),HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36)],
[HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFYear(fkbb5855f0#36) AS
PartialGroup#42,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFMonth(fkbb5855f0#36)
AS
PartialGroup#43,HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFDayOfMonth(fkbb5855f0#36)
AS PartialGroup#44,COUNT(fk919b1d80#34) AS PartialCount#41L]
    Project [fkbb5855f0#36,fk919b1d80#34]
     Filter (((((fkbb5855f0#36 >= 2015-08-02 00:00:00)
&& (fkbb5855f0#36 < 2015-09-01 00:00:00)) &&
fk418c5509#35 INSET (add_summary)) && NOT (fkbb5855f0#36 = ))
&& IS NOT NULL fkbb5855f0#36)
      PhysicalRDD
[fkbb5855f0#36,fk919b1d80#34,fk418c5509#35], MapPartitionsRDD[5] at

Code Generation: false
== RDD ==

The `test` table is 3.3 GB. I have 5 nodes in the Hadoop cluster, and Spark
runs on the same cluster. The `test` table has 3 replicas, and the HDFS
block size is 64 MB.
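
For reference, the 54 tasks in the first stage (mentioned below) are exactly
what I'd expect from block-sized splits. A minimal Python sketch, assuming one
input split per HDFS block (my simplification, not Hadoop's actual
FileInputFormat code):

```python
import math

# One input split per block-sized chunk of the single part file;
# the last chunk may be smaller than a full block.
def num_splits(file_size_bytes, block_size_bytes):
    return math.ceil(file_size_bytes / block_size_bytes)

file_size = int(3.34 * 1024**3)   # the single 3.34 GB part file
block_size = 64 * 1024**2         # 64 MB HDFS block size

print(num_splits(file_size, block_size))  # prints 54
```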

When SparkSQL executes the query, the first stage has 54 tasks, and the
Locality Level of every task is NODE_LOCAL. I used dstat to monitor one node
of the cluster and saw a large amount of network IO:

----total-cpu-usage---- -dsk/total- -net/total- ---paging-- ---system--
usr sys idl wai hiq siq| read  writ| recv  send|  in   out | int   csw
  1   0  99   0   0   0| 107k  389k|   0     0 | 193B 1097B|2408  5443
  0   0 100   0   0   0|   0     0 |5709B 3285B|   0     0 |1921  4282
  0   0 100   0   0   0|1936k    0 |3761B 1251B|   0     0 |1907  4197
  0   0 100   0   0   0|   0   584k|3399B 1539B|   0     0 |1903  4338
  0   0  99   0   0   0|1936k    0 |4332B 1447B|   0     0 |2070  4448
  4   1  93   3   0   0|  16M    0 |5117B 1439B|   0     0 |9177   11k
  1   1  77  21   0   0| 215M    0 | 921k   68M|   0     0 | 21k   10k
  6   1  79  14   0   0| 175M    0 | 327k   19M|   0     0 | 10k  8207
 32   1  68   0   0   0|3308k    0 | 237k   14M|   0     0 | 16k  6536
 22   0  78   0   0   0|2048k    0 |  98k 5733k|   0     0 |9190  5823
 30   0  69   0   0   0|8080k 8192B| 175k   11M|   0     0 | 18k  6950
 23   0  77   0   0   0|  18M    0 | 727k   52M|   0     0 | 25k  8648
 22   0  78   0   0   0|  28M    0 | 920k   96M|   0     0 | 26k   11k
 22   0  78   0   0   0|  31M    0 |1003k  114M|   0     0 | 25k   10k
 22   0  78   0   0   0|9372k    0 | 487k   49M|   0     0 |9935  5599
 18   1  81   0   0   0|   0   125k|  47k 2027k|   0     0 |8820  5358
  4   1  95   0   0   0|  28M  450k| 289k   23M|   0     0 | 16k  7992
  0   0  99   0   0   0|  10M    0 | 446k   42M|   0     0 |3765  5262
  0   0 100   0   0   0|1944k    0 |8540B 1364k|   0     0 |1943  4378
  0   0  99   0   0   0|   0   426k|  11k 2469B|   0     0 |2008  4476
  1   0  99   0   0   0|1852k  368k|  16k 1687B|   0     0 |2111  4509

But the query's result is only 3 KB, so the network IO confuses me. Another
thing puzzles me too: I found that *half of the tasks' `Input Size/Records`
is `64.0 MB (hadoop) / 0`*. I suspect Spark is dispatching tasks in a wrong
way.
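
Here is my guess at why exactly half of the tasks show
`64.0 MB (hadoop) / 0`. If each row group is handled by the task whose split
contains the row group's midpoint (my assumption about the Parquet reader,
not verified), and the row groups are larger than a block, then many
block-sized splits contain no midpoint and emit 0 records. A toy model with
a hypothetical 128 MB row group size:

```python
BLOCK_MB = 64  # HDFS block size

def row_groups_per_split(file_size_mb, row_group_mb):
    """Assign each row group to the split holding its midpoint."""
    n_splits = -(-file_size_mb // BLOCK_MB)   # ceiling division
    counts = [0] * n_splits
    offset = 0
    while offset < file_size_mb:
        size = min(row_group_mb, file_size_mb - offset)
        counts[int((offset + size / 2) // BLOCK_MB)] += 1
        offset += size
    return counts

# One 3420 MB part file with hypothetical 128 MB row groups:
counts = row_groups_per_split(3420, 128)
empty = sum(1 for c in counts if c == 0)
print(f"{empty} of {len(counts)} splits read their block but emit 0 records")
```

Under these assumptions, 27 of the 54 splits emit 0 records -- exactly half,
like what I see in the UI.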

To check this idea, I split the `test` table into different numbers of part
files. The results are:

part files | single file size | query elapsed | network IO
         1 | 3.34 GB          | 12 s          | ~200 MB/node
         5 | 800 MB           | 10 s          | ~100 MB/node
        64 | 74 MB            |  8 s          | ~50 MB/node
        74 | 64 MB            |  7 s          | only a little
       150 | 32 MB            |  6 s          | only a little
       297 | 20 MB            |  5 s          | only a little
I also set the replication factor of the `test` table to 5, and then no
network IO appeared, but the query took 11 s! So I think the network IO comes
from Spark fetching data from remote nodes when a Parquet part file is larger
than the HDFS block size (64 MB).
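
A back-of-envelope check of this theory, assuming replicas are spread evenly
so the chance that a needed non-local block has a replica on the task's node
is roughly replication/nodes (a crude model, not a measurement):

```python
nodes = 5
file_mb = 3420   # the single 3.34 GB part file

for replication in (3, 5):
    p_remote = 1 - replication / nodes   # chance a given block is not local
    per_node = file_mb * p_remote / nodes
    print(f"replication {replication}: ~{per_node:.0f} MB remote reads per node")
```

This gives roughly 274 MB/node at replication 3 (the same ballpark as the
~200 MB/node I measured) and 0 at replication 5, which matches the network IO
disappearing.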

I want to know:
1. How does Spark dispatch tasks when a Parquet table has only one part file
that is larger than the HDFS block size?
2. Under which conditions is a task's `Input Size/Records` `64.0 MB (hadoop) / 0`,
`5.2 KB (hadoop) / 0`, or `0.0 B (hadoop) / 50`?

Any ideas? 
Thanks




--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/SparkSQL-How-does-spark-handle-a-parquet-file-in-parallel-tp14210.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.