The first stage of the first query builds the hash table for the map join, and it took 7s to finish. Why do you think it's slow? That said, it looks like you have many small files: there were 100 mappers, so each file must be very small (about 1 MB each for partitions of roughly 100 MB). This is bad for performance. Also consider using a data format other than text.
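Both suggestions above (fewer, larger files and a non-text format) can be sketched in HiveQL. This is a minimal sketch, not a tested recipe. It assumes `ds` is a string partition column, and `t_sd_ucm_cominfo_finalresult_orc` is a hypothetical name for a new ORC copy of the source table. The merge thresholds are Hive's documented defaults, so tune them for your cluster.

```sql
-- Merge small output files at the end of a Hive on Spark job
-- (hive.merge.sparkfiles is off by default).
set hive.merge.sparkfiles=true;
-- Merge when the average output file is smaller than ~16 MB,
-- targeting ~256 MB per merged file (Hive's defaults).
set hive.merge.smallfiles.avgsize=16000000;
set hive.merge.size.per.task=256000000;

-- Hypothetical ORC copy of the source table. Columns are taken from the
-- EXPLAIN output; ds is assumed to be a string partition column.
CREATE TABLE IF NOT EXISTS t_sd_ucm_cominfo_finalresult_orc (
  uin      STRING,
  clientip STRING
)
PARTITIONED BY (ds STRING)
STORED AS ORC;

-- Rewrite one partition as ORC; repeat for each ds value you need.
INSERT OVERWRITE TABLE t_sd_ucm_cominfo_finalresult_orc PARTITION (ds='20151202')
SELECT uin, clientip
FROM t_sd_ucm_cominfo_FinalResult
WHERE ds='20151202';
```

`hive.merge.sparkfiles` only affects files written after it is set. Existing text partitions keep their small files until they are rewritten, so the sketch rewrites a partition rather than only changing the setting.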
If a given stage is unusually slow, check the task and executor statistics in the Spark web UI. You could have a bad node in your cluster, or your data could be skewed. Check whether any particular task takes much longer than the rest.

On Thu, Dec 3, 2015 at 6:51 PM, Jone Zhang <[email protected]> wrote:
> *Thanks for your warning.*
> *The first query uses a map join and the second uses a reduce join. All the
> tables use TextInputFormat.*
> *I'll go and learn more about map joins in Hive on Spark anyway, but why is
> stage 1 of the first query in the attachment so slow?*
>
> *EXPLAIN output for the first query:*
> hive (u_wsd)> explain insert overwrite table t_sd_ucm_cominfo_incremental partition (ds=20151202)
> > select t1.uin,t1.clientip from
> > (select uin,clientip from t_sd_ucm_cominfo_FinalResult where ds=20151202) t1
> > left outer join (select uin,clientip from t_sd_ucm_cominfo_FinalResult where ds=20151201) t2
> > on t1.uin=t2.uin
> > where t2.clientip is NULL;
> OK
> STAGE DEPENDENCIES:
>   Stage-3 is a root stage
>   Stage-1 depends on stages: Stage-3
>   Stage-0 depends on stages: Stage-1
>   Stage-2 depends on stages: Stage-0
>
> STAGE PLANS:
>   Stage: Stage-3
>     Spark
>       DagName: mqq_20151204103210_fb913c60-c5ed-438f-8efb-950ee5639dd5:2
>       Vertices:
>         Map 2
>             Map Operator Tree:
>                 TableScan
>                   alias: t_sd_ucm_cominfo_finalresult
>                   Statistics: Num rows: 108009 Data size: 2873665 Basic stats: COMPLETE Column stats: NONE
>                   Select Operator
>                     expressions: uin (type: string), clientip (type: string)
>                     outputColumnNames: _col0, _col1
>                     Statistics: Num rows: 108009 Data size: 2873665 Basic stats: COMPLETE Column stats: NONE
>                     Spark HashTable Sink Operator
>                       keys:
>                         0 _col0 (type: string)
>                         1 _col0 (type: string)
>             Local Work:
>               Map Reduce Local Work
>
>   Stage: Stage-1
>     Spark
>       DagName: mqq_20151204103210_fb913c60-c5ed-438f-8efb-950ee5639dd5:1
>       Vertices:
>         Map 1
>             Map Operator Tree:
>                 TableScan
>                   alias: t_sd_ucm_cominfo_finalresult
>                   Statistics: Num rows: 103779 Data size: 2746785 Basic stats: COMPLETE Column stats: NONE
>                   Select Operator
>                     expressions: uin (type: string), clientip (type: string)
>                     outputColumnNames: _col0, _col1
>                     Statistics: Num rows: 103779 Data size: 2746785 Basic stats: COMPLETE Column stats: NONE
>                     Map Join Operator
>                       condition map:
>                            Left Outer Join0 to 1
>                       keys:
>                         0 _col0 (type: string)
>                         1 _col0 (type: string)
>                       outputColumnNames: _col0, _col1, _col3
>                       input vertices:
>                         1 Map 2
>                       Statistics: Num rows: 118809 Data size: 3161031 Basic stats: COMPLETE Column stats: NONE
>                       Filter Operator
>                         predicate: _col3 is null (type: boolean)
>                         Statistics: Num rows: 59404 Data size: 1580502 Basic stats: COMPLETE Column stats: NONE
>                         Select Operator
>                           expressions: _col0 (type: string), _col1 (type: string)
>                           outputColumnNames: _col0, _col1
>                           Statistics: Num rows: 59404 Data size: 1580502 Basic stats: COMPLETE Column stats: NONE
>                           File Output Operator
>                             compressed: false
>                             Statistics: Num rows: 59404 Data size: 1580502 Basic stats: COMPLETE Column stats: NONE
>                             table:
>                                 input format: org.apache.hadoop.mapred.TextInputFormat
>                                 output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>                                 serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                                 name: u_wsd.t_sd_ucm_cominfo_incremental
>             Local Work:
>               Map Reduce Local Work
>
>   Stage: Stage-0
>     Move Operator
>       tables:
>           partition:
>             ds 20151202
>           replace: true
>           table:
>               input format: org.apache.hadoop.mapred.TextInputFormat
>               output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>               name: u_wsd.t_sd_ucm_cominfo_incremental
>
>   Stage: Stage-2
>     Stats-Aggr Operator
>
> *EXPLAIN output for the second query:*
> hive (u_wsd)> explain insert overwrite table t_sd_ucm_cominfo_incremental partition (ds=20151201)
> > select t1.uin,t1.clientip from
> > (select uin,clientip from t_sd_ucm_cominfo_FinalResult where ds=20151201) t1
> > left outer join (select uin,clientip from t_sd_ucm_cominfo_FinalResult where ds=20151130) t2
> > on t1.uin=t2.uin
> > where t2.clientip is NULL;
> OK
> STAGE DEPENDENCIES:
>   Stage-1 is a root stage
>   Stage-0 depends on stages: Stage-1
>   Stage-2 depends on stages: Stage-0
>
> STAGE PLANS:
>   Stage: Stage-1
>     Spark
>       Edges:
>         Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 100), Map 3 (PARTITION-LEVEL SORT, 100)
>       DagName: mqq_20151204103243_3eab6e6c-941e-476a-897f-cae97657063e:3
>       Vertices:
>         Map 1
>             Map Operator Tree:
>                 TableScan
>                   alias: t_sd_ucm_cominfo_finalresult
>                   Statistics: Num rows: 108009 Data size: 2873665 Basic stats: COMPLETE Column stats: NONE
>                   Select Operator
>                     expressions: uin (type: string), clientip (type: string)
>                     outputColumnNames: _col0, _col1
>                     Statistics: Num rows: 108009 Data size: 2873665 Basic stats: COMPLETE Column stats: NONE
>                     Reduce Output Operator
>                       key expressions: _col0 (type: string)
>                       sort order: +
>                       Map-reduce partition columns: _col0 (type: string)
>                       Statistics: Num rows: 108009 Data size: 2873665 Basic stats: COMPLETE Column stats: NONE
>                       value expressions: _col1 (type: string)
>         Map 3
>             Map Operator Tree:
>                 TableScan
>                   alias: t_sd_ucm_cominfo_finalresult
>                   Statistics: Num rows: 590130 Data size: 118026051 Basic stats: COMPLETE Column stats: NONE
>                   Select Operator
>                     expressions: uin (type: string), clientip (type: string)
>                     outputColumnNames: _col0, _col1
>                     Statistics: Num rows: 590130 Data size: 118026051 Basic stats: COMPLETE Column stats: NONE
>                     Reduce Output Operator
>                       key expressions: _col0 (type: string)
>                       sort order: +
>                       Map-reduce partition columns: _col0 (type: string)
>                       Statistics: Num rows: 590130 Data size: 118026051 Basic stats: COMPLETE Column stats: NONE
>                       value expressions: _col1 (type: string)
>         Reducer 2
>             Reduce Operator Tree:
>               Join Operator
>                 condition map:
>                      Left Outer Join0 to 1
>                 keys:
>                   0 _col0 (type: string)
>                   1 _col0 (type: string)
>                 outputColumnNames: _col0, _col1, _col3
>                 Statistics: Num rows: 649143 Data size: 129828658 Basic stats: COMPLETE Column stats: NONE
>                 Filter Operator
>                   predicate: _col3 is null (type: boolean)
>                   Statistics: Num rows: 324571 Data size: 64914228 Basic stats: COMPLETE Column stats: NONE
>                   Select Operator
>                     expressions: _col0 (type: string), _col1 (type: string)
>                     outputColumnNames: _col0, _col1
>                     Statistics: Num rows: 324571 Data size: 64914228 Basic stats: COMPLETE Column stats: NONE
>                     File Output Operator
>                       compressed: false
>                       Statistics: Num rows: 324571 Data size: 64914228 Basic stats: COMPLETE Column stats: NONE
>                       table:
>                           input format: org.apache.hadoop.mapred.TextInputFormat
>                           output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>                           serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>                           name: u_wsd.t_sd_ucm_cominfo_incremental
>
>   Stage: Stage-0
>     Move Operator
>       tables:
>           partition:
>             ds 20151201
>           replace: true
>           table:
>               input format: org.apache.hadoop.mapred.TextInputFormat
>               output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>               name: u_wsd.t_sd_ucm_cominfo_incremental
>
>   Stage: Stage-2
>     Stats-Aggr Operator
>
> *Thanks.*
>
> 2015-12-03 22:17 GMT+08:00 Xuefu Zhang <[email protected]>:
>
>> Can you also attach the EXPLAIN output for the queries? What's your data format?
>>
>> --Xuefu
>>
>> On Thu, Dec 3, 2015 at 12:09 AM, Jone Zhang <[email protected]> wrote:
>>> Hive 1.2.1 on Spark 1.4.1
>>>
>>> *The first query is:*
>>> set mapred.reduce.tasks=100;
>>> use u_wsd;
>>> insert overwrite table t_sd_ucm_cominfo_incremental partition (ds=20151202)
>>> select t1.uin,t1.clientip from
>>> (select uin,clientip from t_sd_ucm_cominfo_FinalResult where ds=20151202) t1
>>> left outer join (select uin,clientip from t_sd_ucm_cominfo_FinalResult where ds=20151201) t2
>>> on t1.uin=t2.uin
>>> where t2.clientip is NULL;
>>>
>>> *The second query is:*
>>> set mapred.reduce.tasks=100;
>>> use u_wsd;
>>> insert overwrite table t_sd_ucm_cominfo_incremental partition (ds=20151201)
>>> select t1.uin,t1.clientip from
>>> (select uin,clientip from t_sd_ucm_cominfo_FinalResult where ds=20151201) t1
>>> left outer join (select uin,clientip from t_sd_ucm_cominfo_FinalResult where ds=20151130) t2
>>> on t1.uin=t2.uin
>>> where t2.clientip is NULL;
>>>
>>> *The attachment shows the stages of the two queries.*
>>> *Here is the partition info:*
>>> 104.3 M  /user/hive/warehouse/u_wsd.db/t_sd_ucm_cominfo_finalresult/ds=20151202
>>> 110.0 M  /user/hive/warehouse/u_wsd.db/t_sd_ucm_cominfo_finalresult/ds=20151201
>>> 112.6 M  /user/hive/warehouse/u_wsd.db/t_sd_ucm_cominfo_finalresult/ds=20151130
>>>
>>> *Why are the stages of the two queries different?*
>>> *Stage 1 of the first query is very slow.*
>>>
>>> *Thanks.*
>>> *Best wishes.*
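On why the two plans differ: Hive converts a join to a map join when the side it would load into a hash table is estimated to be small enough, and for a left outer join only the right-hand side (t2) can be that side. Assuming the Hive 1.2 defaults shown below (a 10,000,000-byte threshold), the plan's estimate for ds=20151201 (2,873,665 bytes) fits under the threshold, while ds=20151130 (118,026,051 bytes) does not, so the second query falls back to a shuffle join. The estimate for ds=20151201 is also far below the 110.0 M that HDFS reports for that partition, so refreshing the statistics may be worthwhile. This is a hedged starting point rather than a definitive diagnosis. The final query is one assumed way to check the join key for the skew mentioned at the top of the thread, and `ds` is again assumed to be a string partition column.

```sql
-- Settings that govern automatic map-join conversion. The values shown
-- are the Hive 1.2 defaults; check what your cluster actually uses.
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=10000000;

-- Refresh basic statistics for the partitions involved, so that the
-- size estimates in EXPLAIN reflect the data actually on HDFS.
ANALYZE TABLE t_sd_ucm_cominfo_FinalResult PARTITION (ds='20151202') COMPUTE STATISTICS;
ANALYZE TABLE t_sd_ucm_cominfo_FinalResult PARTITION (ds='20151201') COMPUTE STATISTICS;
ANALYZE TABLE t_sd_ucm_cominfo_FinalResult PARTITION (ds='20151130') COMPUTE STATISTICS;

-- Look for skew on the join key: a few uin values with very large
-- counts would make one task run much longer than the rest.
SELECT uin, COUNT(*) AS cnt
FROM t_sd_ucm_cominfo_FinalResult
WHERE ds='20151202'
GROUP BY uin
ORDER BY cnt DESC
LIMIT 20;
```

After running ANALYZE, rerunning EXPLAIN on the first query shows whether the updated size estimate still keeps ds=20151201 under the map-join threshold.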
