[jira] [Updated] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rui Li updated HIVE-16600:
       Resolution: Fixed
    Fix Version/s: 3.0.0
           Status: Resolved  (was: Patch Available)

Pushed to master. Thanks [~kellyzly] for the contribution!

> Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
>
>                 Key: HIVE-16600
>                 URL: https://issues.apache.org/jira/browse/HIVE-16600
>             Project: Hive
>          Issue Type: Sub-task
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: 3.0.0
>
>         Attachments: HIVE-16600.10.patch, HIVE-16600.11.patch, HIVE-16600.12.patch, HIVE-16600.13.patch, HIVE-16600.1.patch, HIVE-16600.2.patch, HIVE-16600.3.patch, HIVE-16600.4.patch, HIVE-16600.5.patch, HIVE-16600.6.patch, HIVE-16600.7.patch, HIVE-16600.8.patch, HIVE-16600.9.patch, mr.explain, mr.explain.log.HIVE-16600, Node.java, TestSetSparkReduceParallelism_MultiInsertCase.java
>
> multi_insert_gby.case.q
> {code}
> set hive.exec.reducers.bytes.per.reducer=256;
> set hive.optimize.sampling.orderby=true;
> drop table if exists e1;
> drop table if exists e2;
> create table e1 (key string, value string);
> create table e2 (key string);
> FROM (select key, cast(key as double) as keyD, value from src order by key) a
> INSERT OVERWRITE TABLE e1
> SELECT key, value
> INSERT OVERWRITE TABLE e2
> SELECT key;
> select * from e1;
> select * from e2;
> {code}
> The parallelism of Sort is 1 even when parallel order by is enabled ("hive.optimize.sampling.orderby" is set to "true"). This is not reasonable, because the parallelism should be calculated by [Utilities.estimateReducers|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java#L170].
> This happens because SetSparkReducerParallelism#needSetParallelism returns false when the [children size of RS|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java#L207] is greater than 1.
> In this case, the children size of {{RS[2]}} is two.
> The logical plan of the case:
> {code}
>    TS[0]-SEL[1]-RS[2]-SEL[3]-SEL[4]-FS[5]
>                      -SEL[6]-FS[7]
> {code}

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
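The behaviour described in the issue can be illustrated with a minimal sketch. It assumes a toy operator type (`ToyOp`, hypothetical, not Hive's `Operator`) and a simplified walk (`oldNeedSetParallelism`, also hypothetical) that gives up as soon as an operator below the RS has more than one child or a LIMIT shows up. It is not the actual Hive implementation; it only reproduces why `RS[2]` with two children is rejected.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: ToyOp is a hypothetical stand-in, NOT org.apache.hadoop.hive.ql.exec.Operator.
class SingleChildCheckDemo {
  static class ToyOp {
    final String name;                              // e.g. "RS[2]"
    final List<ToyOp> children = new ArrayList<>();

    ToyOp(String name) { this.name = name; }

    // Appends a child and returns this operator (the parent) for chaining.
    ToyOp add(ToyOp child) { children.add(child); return this; }

    boolean is(String kind) { return name.startsWith(kind + "["); }
  }

  // Assumed, simplified form of the pre-patch check: follow the single-child chain
  // below the RS; fail on any fan-out or LIMIT; stop at the next RS or FS.
  static boolean oldNeedSetParallelism(ToyOp rs) {
    List<ToyOp> children = rs.children;
    while (!children.isEmpty()) {
      if (children.size() != 1 || children.get(0).is("LIM")) {
        return false;
      }
      ToyOp only = children.get(0);
      if (only.is("RS") || only.is("FS")) {
        break;
      }
      children = only.children;
    }
    return true;
  }

  // RS[2]-SEL[3]-SEL[4]-FS[5]
  //      -SEL[6]-FS[7]          (the logical plan from the issue, starting at RS[2])
  static ToyOp multiInsertPlan() {
    return new ToyOp("RS[2]")
        .add(new ToyOp("SEL[3]").add(new ToyOp("SEL[4]").add(new ToyOp("FS[5]"))))
        .add(new ToyOp("SEL[6]").add(new ToyOp("FS[7]")));
  }

  // RS[2]-SEL[3]-FS[4]: a plain single-insert order by, for comparison.
  static ToyOp singleInsertPlan() {
    return new ToyOp("RS[2]").add(new ToyOp("SEL[3]").add(new ToyOp("FS[4]")));
  }

  public static void main(String[] args) {
    System.out.println("multi-insert:  " + oldNeedSetParallelism(multiInsertPlan()));
    System.out.println("single-insert: " + oldNeedSetParallelism(singleInsertPlan()));
  }
}
```

With the multi-insert plan the walk stops at the first step because `RS[2]` has two children, so the reducer count stays at 1 regardless of input size.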
[jira] [Updated] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated HIVE-16600:
    Attachment: HIVE-16600.13.patch

[~lirui]: Thanks for the review. I have updated HIVE-16600.13.patch here and on RB according to the last round of review.
[jira] [Updated] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated HIVE-16600:
    Attachment: HIVE-16600.12.patch

[~lirui]: HIVE-16600.12.patch addresses the comments from RB; please help review.

One thing needs to be confirmed: the following case is currently treated as an order by case, although it actually is not. Do you want me to add it, or will you modify its output after HIVE-6348 is committed?
{code}
from (select key,value from src order by key) a
insert overwrite table e1
select key,value limit 10
insert overwrite table e2
select key;
{code}
[jira] [Updated] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated HIVE-16600:
    Attachment: HIVE-16600.11.patch

[~lirui]: Changes in HIVE-16600.11.patch:
1. Check all branches to verify whether there is a LIMIT after the RS. Once the condition matches, return true (this is an order by limit case). If a TerminalOperator is encountered, do not search its children any more.
2. Add a new unit test like "select * from A order by columnB limit N".
3. Remove the case below, because after HIVE-6348 the physical plan of the query changes.

Query:
{code}
FROM (select key,value from src order by key) a
INSERT OVERWRITE TABLE e1
SELECT key
INSERT OVERWRITE TABLE e2
SELECT key limit xx;
{code}
Physical plan:
{code}
TS[0]-SEL[1]-FS[3]
           -LIM[5]-RS[6]-SEL[7]-LIM[8]-FS[9]
{code}
We judge the above case in the same way as "select * from A order by columnB limit N".
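Point 1 of the HIVE-16600.11 comment (search every branch below the RS for a LIMIT, but do not descend past a terminal operator) can be sketched as follows. This is a toy model, not the patch itself: `ToyOp`, `isTerminal` and `isOrderByLimit` are hypothetical names, and treating RS and FS as the terminal operators is an assumption made for the illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustration only: ToyOp is a hypothetical stand-in, NOT a Hive class.
class OrderByLimitDemo {
  static class ToyOp {
    final String name;                              // e.g. "LIM[8]"
    final List<ToyOp> children = new ArrayList<>();

    ToyOp(String name) { this.name = name; }

    // Appends a child and returns this operator (the parent) for chaining.
    ToyOp add(ToyOp child) { children.add(child); return this; }

    boolean is(String kind) { return name.startsWith(kind + "["); }
  }

  // Assumption for this sketch: RS and FS play the role of terminal operators.
  static boolean isTerminal(ToyOp op) {
    return op.is("RS") || op.is("FS");
  }

  // Returns true if any branch below rs reaches a LIMIT before a terminal operator.
  static boolean isOrderByLimit(ToyOp rs) {
    Deque<ToyOp> pending = new ArrayDeque<>(rs.children);
    while (!pending.isEmpty()) {
      ToyOp op = pending.pop();
      if (op.is("LIM")) {
        return true;                    // order by + limit: first match is enough
      }
      if (!isTerminal(op)) {
        pending.addAll(op.children);    // keep searching this branch
      }
    }
    return false;
  }

  // RS[6]-SEL[7]-LIM[8]-FS[9], taken from the physical plan in the comment.
  static ToyOp orderByLimitPlan() {
    return new ToyOp("RS[6]")
        .add(new ToyOp("SEL[7]").add(new ToyOp("LIM[8]").add(new ToyOp("FS[9]"))));
  }

  // RS[2] with two LIMIT-free branches, as in the issue description.
  static ToyOp multiInsertPlan() {
    return new ToyOp("RS[2]")
        .add(new ToyOp("SEL[3]").add(new ToyOp("SEL[4]").add(new ToyOp("FS[5]"))))
        .add(new ToyOp("SEL[6]").add(new ToyOp("FS[7]")));
  }

  // Hypothetical plan: the LIMIT sits behind a second RS, so the search must not see it.
  static ToyOp limitBehindShufflePlan() {
    return new ToyOp("RS[0]")
        .add(new ToyOp("SEL[1]").add(new ToyOp("RS[2]").add(new ToyOp("LIM[3]").add(new ToyOp("FS[4]")))));
  }

  public static void main(String[] args) {
    System.out.println("order by + limit:     " + isOrderByLimit(orderByLimitPlan()));
    System.out.println("multi-insert:         " + isOrderByLimit(multiInsertPlan()));
    System.out.println("limit behind 2nd RS:  " + isOrderByLimit(limitBehindShufflePlan()));
  }
}
```

Stopping at terminal operators keeps a LIMIT that belongs to a later shuffle from influencing the decision for the current RS.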
[jira] [Updated] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated HIVE-16600:
    Attachment: HIVE-16600.10.patch
                TestSetSparkReduceParallelism_MultiInsertCase.java
                Node.java
[jira] [Updated] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated HIVE-16600:
    Attachment: HIVE-16600.9.patch
[jira] [Updated] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated HIVE-16600:
    Attachment: HIVE-16600.8.patch

[~lirui] and [~xuefuz]: Thanks for your suggestions about multi-insert. In HIVE-16600.8.patch I updated the code that detects the multi-insert case as shown below: if there is more than one path from the current RS to an FS in the operator tree, it is considered a multi-insert case. Do you think this is OK?
{code}
// The multi-insert case looks like:
// TS[0]-SEL[1]-RS[2]-SEL[3]-SEL[4]-FS[5]
//                   -SEL[6]-LIM[7]-RS[8]-SEL[9]-LIM[10]-FS[11]
// If there is more than one path from the RS (RS[2]) to an FS in the operator tree,
// it is a multi-insert case.
private boolean isMultiInsert(ReduceSinkOperator rs) {
  int pathToFSNum = 0;
  Deque<Operator<? extends OperatorDesc>> childQueue = new LinkedList<>();
  childQueue.addAll(rs.getChildOperators());
  while (!childQueue.isEmpty()) {
    Operator<? extends OperatorDesc> child = childQueue.pop();
    if (child instanceof FileSinkOperator) {
      // Reached a sink: one more path from the RS to an FS.
      pathToFSNum++;
    } else {
      childQueue.addAll(child.getChildOperators());
    }
  }
  boolean isMultiInsert = pathToFSNum > 1;
  LOG.debug("reducesink:" + rs + " isMultiInsert:" + isMultiInsert);
  return isMultiInsert;
}
{code}
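To try the path-counting idea from the HIVE-16600.8 comment outside of Hive, here is a self-contained sketch. `ToyOp`, `countFileSinkPaths` and the plan builders are hypothetical names introduced for the illustration; only the traversal idea (breadth-first from the RS, count each FS reached, do not descend below an FS) comes from the comment.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;

// Illustration only: ToyOp is a hypothetical stand-in, NOT a Hive class.
class MultiInsertDemo {
  static class ToyOp {
    final String name;                              // e.g. "FS[5]"
    final List<ToyOp> children = new ArrayList<>();

    ToyOp(String name) { this.name = name; }

    // Appends a child and returns this operator (the parent) for chaining.
    ToyOp add(ToyOp child) { children.add(child); return this; }

    boolean is(String kind) { return name.startsWith(kind + "["); }
  }

  // Counts the paths from rs down to file sinks; the walk does not stop at other RS nodes.
  static int countFileSinkPaths(ToyOp rs) {
    int paths = 0;
    Deque<ToyOp> queue = new LinkedList<>(rs.children);
    while (!queue.isEmpty()) {
      ToyOp child = queue.pop();
      if (child.is("FS")) {
        paths++;
      } else {
        queue.addAll(child.children);
      }
    }
    return paths;
  }

  static boolean isMultiInsert(ToyOp rs) {
    return countFileSinkPaths(rs) > 1;
  }

  // RS[2]-SEL[3]-SEL[4]-FS[5]
  //      -SEL[6]-LIM[7]-RS[8]-SEL[9]-LIM[10]-FS[11]   (plan from the code comment)
  static ToyOp multiInsertPlan() {
    ToyOp secondBranch = new ToyOp("SEL[6]").add(new ToyOp("LIM[7]").add(new ToyOp("RS[8]")
        .add(new ToyOp("SEL[9]").add(new ToyOp("LIM[10]").add(new ToyOp("FS[11]"))))));
    return new ToyOp("RS[2]")
        .add(new ToyOp("SEL[3]").add(new ToyOp("SEL[4]").add(new ToyOp("FS[5]"))))
        .add(secondBranch);
  }

  // RS[2]-SEL[3]-FS[4]: single insert, for comparison.
  static ToyOp singleInsertPlan() {
    return new ToyOp("RS[2]").add(new ToyOp("SEL[3]").add(new ToyOp("FS[4]")));
  }

  public static void main(String[] args) {
    System.out.println("multi-insert paths:  " + countFileSinkPaths(multiInsertPlan()));
    System.out.println("single-insert paths: " + countFileSinkPaths(singleInsertPlan()));
  }
}
```

Note that the walk passes through `RS[8]`, so `FS[11]` is counted even though it sits behind a second shuffle; that is what makes the example plan register two paths.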
[jira] [Updated] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated HIVE-16600:
    Attachment: HIVE-16600.7.patch

[~lirui]: Thanks for your review. I have updated HIVE-16600.7.patch.
Changes: added two test cases.
1. order by limit + multi-insert
{code}
FROM (select key,value from src order by key limit 10) a
INSERT OVERWRITE TABLE e1
SELECT key, value
INSERT OVERWRITE TABLE e2
SELECT key;
{code}
Parallel order by should be disabled in this case, and the plan matches that expectation:
{code}
  Stage-2 is a root stage [MAPRED]
  Stage-0 depends on stages: Stage-2 [MOVE]
  Stage-3 depends on stages: Stage-0 [STATS]
  Stage-1 depends on stages: Stage-2 [MOVE]
  Stage-4 depends on stages: Stage-1 [STATS]

STAGE PLANS:
  Stage: Stage-2
    Spark
      Edges:
        Reducer 2 <- Map 1 (SORT, 1)
      DagName: root_20170519161033_283bdbba-0f73-4db1-afba-4da456e3cc55:1
      Vertices:
        Map 1
            Map Operator Tree:
                TableScan
                  alias: src
                  Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
                  GatherStats: false
                  Select Operator
                    expressions: key (type: string), value (type: string)
                    outputColumnNames: _col0, _col1
                    Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
                    Reduce Output Operator
                      key expressions: _col0 (type: string)
                      null sort order: a
                      sort order: +
                      Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
                      tag: -1
                      TopN: 10
                      TopN Hash Memory Usage: 0.1
                      value expressions: _col1 (type: string)
                      auto parallelism: false
            Path -> Alias:
              hdfs://bdpe41:8020/user/hive/warehouse/src [src]
            Path -> Partition:
              hdfs://bdpe41:8020/user/hive/warehouse/src
                Partition
                  base file name: src
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  properties:
                    bucket_count -1
                    column.name.delimiter ,
                    columns key,value
                    columns.comments 'default','default'
                    columns.types string:string
                    file.inputformat org.apache.hadoop.mapred.TextInputFormat
                    file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    location hdfs://bdpe41:8020/user/hive/warehouse/src
                    name default.src
                    numFiles 1
                    numRows 0
                    rawDataSize 0
                    serialization.ddl struct src { string key, string value}
                    serialization.format 1
                    serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    totalSize 5812
                    transient_lastDdlTime 1493960133
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    properties:
                      bucket_count -1
                      column.name.delimiter ,
                      columns key,value
                      columns.comments 'default','default'
                      columns.types string:string
                      file.inputformat org.apache.hadoop.mapred.TextInputFormat
                      file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                      location hdfs://bdpe41:8020/user/hive/warehouse/src
                      name default.src
                      numFiles 1
                      numRows 0
                      rawDataSize 0
                      serialization.ddl struct src { string key, string value}
                      serialization.format 1
                      serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                      totalSize 5812
                      transient_lastDdlTime 1493960133
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: default.src
                  name: default.src
            Truncated Path -> Alias:
              /src [src]
        Reducer 2
            Needs Tagging: false
            Reduce Operator Tree:
              Select Operator
                expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string)
[jira] [Updated] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated HIVE-16600:
    Attachment: HIVE-16600.6.patch

[~lirui]: Please help review HIVE-16600.5.patch.
bq. Please add more tests. E.g. add limit to the sub-query, add order by to inserts, etc.
I added the following test. Is this what you mean by "add order by to inserts"?
{code}
FROM (select key,value from src order by key) a
INSERT OVERWRITE TABLE e1
SELECT key, value
INSERT OVERWRITE TABLE e2
SELECT key order by key limit 10;
{code}
If yes, it is interesting that the explain output looks like the following: the e1 part seems to be dropped from the plan. When you execute the query, you will find that e1 contains no result.
{code}
STAGE DEPENDENCIES:
  Stage-2 is a root stage
  Stage-1 depends on stages: Stage-2
  Stage-3 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-2
    Spark
      Edges:
        Reducer 2 <- Map 1 (SORT, 1)
#### A masked pattern was here ####
      Vertices:
        Map 1
            Map Operator Tree:
                TableScan
                  alias: src
                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    expressions: key (type: string), value (type: string)
                    outputColumnNames: _col0, _col1
                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                    Reduce Output Operator
                      key expressions: _col0 (type: string)
                      sort order: +
                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                      TopN Hash Memory Usage: 0.1
        Reducer 2
            Reduce Operator Tree:
              Select Operator
                expressions: KEY.reducesinkkey0 (type: string)
                outputColumnNames: _col0
                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                Limit
                  Number of rows: 10
                  Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
                  File Output Operator
                    compressed: false
                    Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
                    table:
                        input format: org.apache.hadoop.mapred.TextInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                        name: default.e2

  Stage: Stage-1
    Move Operator
      tables:
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: default.e2

  Stage: Stage-3
    Stats-Aggr Operator

PREHOOK: query: FROM (select key,value from src order by key) a
INSERT OVERWRITE TABLE e1
SELECT key, value
INSERT OVERWRITE TABLE e2
                    compressed: false
                    Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
                    table:
                        input format: org.apache.hadoop.mapred.TextInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                        name: default.e2

  Stage: Stage-1
    Move Operator
      tables:
          replace: true
          table:
              input format: org.apache.hadoop.mapred.TextInputFormat
              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              name: default.e2

  Stage: Stage-3
    Stats-Aggr Operator
{code}
[jira] [Updated] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated HIVE-16600:
    Attachment: (was: HIVE-16600.5.patch)
[jira] [Updated] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated HIVE-16600:
    Attachment: HIVE-16600.5.patch
[jira] [Updated] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated HIVE-16600:
    Attachment: HIVE-16600.5.patch

[~lirui]: Modifications in HIVE-16600.5.patch:
1. Change the logic of SetSparkReducerParallelism#needSetParallelism: return true if it is a multi_insert (containing limit) case.
2. Remove the test case "orderby+multi_insert case (contains limit)" from multi_insert_parallel_orderby.q, because its result may differ from run to run.
{code}
FROM (select key,value from src order by key) a
INSERT OVERWRITE TABLE e1
SELECT key, value
INSERT OVERWRITE TABLE e2
SELECT key limit 10;
{code}
The result in table e2 may differ from run to run. This is because the query returns 10 keys from "src order by key", *not the top 10 keys*. The behavior is the same as with MR.
[jira] [Updated] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated HIVE-16600:
    Attachment: HIVE-16600.4.patch

[~lirui]: updated HIVE-16600.4.patch.
Changes:
1. Add {{-- SORT_QUERY_RESULTS}} to the test.

About the question you mentioned:
bq.But why we disable parallel order by when there's a limit is to avoid an extra stage (see this [comment|https://issues.apache.org/jira/browse/HIVE-10458?focusedCommentId=14539299&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14539299]). If the extra stage is needed anyway, then it makes no sense to disable parallel order by.

I guess [~xuefuz] means the common order by + limit case, like the following:
{code}
set hive.mapred.mode=nonstrict;
set hive.exec.reducers.bytes.per.reducer=256;
set hive.optimize.sampling.orderby=true;
select key, value from src order by key limit 10;
{code}
The plan is:
{code}
STAGE PLANS:
  Stage: Stage-1
    Spark
      Edges:
        Reducer 2 <- Map 1 (SORT, 1)
      DagName: root_20170515101633_2093e4cb-a060-452c-aa21-201132ea0819:1
      Vertices:
        Map 1
            Map Operator Tree:
                TableScan
                  alias: src
                  Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
                  GatherStats: false
                  Select Operator
                    expressions: key (type: string), value (type: string)
                    outputColumnNames: _col0, _col1
                    Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
                    Reduce Output Operator
                      key expressions: _col0 (type: string)
                      null sort order: a
                      sort order: +
                      Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
                      tag: -1
                      TopN: 10
                      TopN Hash Memory Usage: 0.1
                      value expressions: _col1 (type: string)
                      auto parallelism: false
            Path -> Alias:
              hdfs://bdpe41:8020/user/hive/warehouse/src [src]
            Path -> Partition:
              hdfs://bdpe41:8020/user/hive/warehouse/src
                Partition
                  base file name: src
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                  properties:
                    bucket_count -1
                    column.name.delimiter ,
                    columns key,value
                    columns.comments 'default','default'
                    columns.types string:string
                    file.inputformat org.apache.hadoop.mapred.TextInputFormat
                    file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    location hdfs://bdpe41:8020/user/hive/warehouse/src
                    name default.src
                    numFiles 1
                    numRows 0
                    rawDataSize 0
                    serialization.ddl struct src { string key, string value}
                    serialization.format 1
                    serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    totalSize 5812
                    transient_lastDdlTime 1493960133
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

                    input format: org.apache.hadoop.mapred.TextInputFormat
                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                    properties:
                      bucket_count -1
                      column.name.delimiter ,
                      columns key,value
                      columns.comments 'default','default'
                      columns.types string:string
                      file.inputformat org.apache.hadoop.mapred.TextInputFormat
                      file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                      location hdfs://bdpe41:8020/user/hive/warehouse/src
                      name default.src
                      numFiles 1
                      numRows 0
                      rawDataSize 0
                      serialization.ddl struct src { string key, string value}
                      serialization.format 1
                      serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                      totalSize 5812
                      transient_lastDdlTime 1493960133
                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                    name: default.src
                  name: default.src
            Truncated Path -> Alias:
              /src [src]
        Reducer 2
[jira] [Updated] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated HIVE-16600:
    Attachment: mr.explain

[~lirui]: the attached mr.explain is the full plan of
{code}
set hive.mapred.mode=nonstrict;
set hive.exec.reducers.bytes.per.reducer=256;
set hive.optimize.sampling.orderby=true;

drop table if exists e1;
drop table if exists e2;
create table e1 (key string, value string);
create table e2 (key string);
FROM (select key, cast(key as double) as keyD, value from src order by key) a
INSERT OVERWRITE TABLE e1
    SELECT key, value
INSERT OVERWRITE TABLE e2
    SELECT key limit 10;
{code}
It seems that MR also has an extra shuffle stage (Stage-4 in mr.explain) for the multi-insert case. If I am wrong, please tell me.
[jira] [Updated] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated HIVE-16600:
    Attachment: HIVE-16600.3.patch
[jira] [Updated] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated HIVE-16600:
    Attachment: mr.explain.log.HIVE-16600

{code}
create table e1 (key string, value string);
create table e2 (key string);
FROM (select key, cast(key as double) as keyD, value from src order by key) a
INSERT OVERWRITE TABLE e1
    SELECT key, value
INSERT OVERWRITE TABLE e2
    SELECT key limit 10;
{code}
The attached mr.explain.log.HIVE-16600 shows that MR uses only 1 reducer to execute the multi-insert case. This is different from what Spark shows.
[jira] [Updated] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated HIVE-16600:
    Attachment: HIVE-16600.2.patch

[~lirui]: added a new test file, "multi_insert_parallel_order.q", to test the parallel order by + multi-insert case. Please help review.
[jira] [Updated] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated HIVE-16600:
    Attachment: HIVE-16600.1.patch

[~lirui]: I have updated HIVE-16600.1.patch. Please help review.
{noformat}
In the case I provided in the description, the parallelism of RS[2] (Sort) is 46 with HIVE-16600.1.patch and 1 without it.

Before HIVE-16600.1.patch the parallelism of RS[2] is 1:
#grep SetSparkReducerParallelism logs/hive.log
2017-05-08T10:31:10,820  INFO [63ddd225-f012-4b14-9141-38597f94c85b main] spark.SetSparkReducerParallelism: Number of reducers determined to be: 1

After HIVE-16600.1.patch the parallelism of RS[2] is 46:
#grep SetSparkReducerParallelism logs/hive.log
2017-05-08T10:22:49,432 DEBUG [42c701ac-380e-43e3-a3ab-f5aa7c2b55ee main] spark.SetSparkReducerParallelism: Sibling RS[2] has stats: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
2017-05-08T10:23:10,403  INFO [42c701ac-380e-43e3-a3ab-f5aa7c2b55ee main] spark.SetSparkReducerParallelism: Set parallelism for reduce sink RS[2] to: 46 (calculated)
{noformat}
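To relate the logged numbers in the HIVE-16600.1.patch comment to hive.exec.reducers.bytes.per.reducer=256, here is a rough arithmetic sketch. It only models the general idea of dividing the input size by the bytes-per-reducer setting and clamping the result; it is not Hive's Utilities.estimateReducers, the class and method names are hypothetical, and the cap of 1009 reducers is an assumed value. The Spark path may also take cluster resources into account, which this sketch ignores.

```java
// Hypothetical helper, not Hive's implementation.
class ReducerEstimate {
    // Ceiling of totalBytes / bytesPerReducer, clamped to the range [1, maxReducers].
    static int estimate(long totalBytes, long bytesPerReducer, int maxReducers) {
        long bytes = Math.max(totalBytes, bytesPerReducer);
        int reducers = (int) ((bytes + bytesPerReducer - 1) / bytesPerReducer);
        return Math.max(1, Math.min(maxReducers, reducers));
    }

    public static void main(String[] args) {
        int maxReducers = 1009; // assumed cap, not taken from the thread
        // Data size 5812 from the logged stats, 256 bytes per reducer.
        System.out.println(estimate(5812L, 256L, maxReducers));     // prints 23
        // The same size counted twice gives the logged value.
        System.out.println(estimate(2 * 5812L, 256L, maxReducers)); // prints 46
    }
}
```

The logged value of 46 equals the estimate for twice the reported data size. The log does not say why the input would be counted twice, so that reading is a guess rather than something the thread confirms.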
[jira] [Updated] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated HIVE-16600:
    Attachment:     (was: HIVE-16600.patch)
[jira] [Updated] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated HIVE-16600:
    Description:
multi_insert_gby.case.q
{code}
set hive.exec.reducers.bytes.per.reducer=256;
set hive.optimize.sampling.orderby=true;

drop table if exists e1;
drop table if exists e2;
create table e1 (key string, value string);
create table e2 (key string);
FROM (select key, cast(key as double) as keyD, value from src order by key) a
INSERT OVERWRITE TABLE e1
    SELECT key, value
INSERT OVERWRITE TABLE e2
    SELECT key;

select * from e1;
select * from e2;
{code}
The parallelism of Sort is 1 even when parallel order by is enabled ("hive.optimize.sampling.orderby" is set to "true"). This is not reasonable, because the parallelism should be calculated by [Utilities.estimateReducers|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java#L170].
This happens because SetSparkReducerParallelism#needSetParallelism returns false when the [children size of RS|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java#L207] is greater than 1. In this case, the children size of {{RS[2]}} is two.

The logical plan of the case:
{code}
   TS[0]-SEL[1]-RS[2]-SEL[3]-SEL[4]-FS[5]
                     -SEL[6]-FS[7]
{code}

  was:in multi_insert cases multi_insert_gby2.q, the parallelism of SORT operator is 1 even we set "hive.optimize.sampling.orderby" = true. This is because the logic of SetSparkReducerParallelism#needSetParallelism does not support this case.
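The check described in the updated description can be sketched on a toy operator tree. This is a simplified model under stated assumptions, not the code in SetSparkReducerParallelism: `Op`, `SingleChildCheck` and the plan builders are hypothetical names, operators are identified only by their label, and the real method also looks at configuration and the reducer count already set. The multi-insert tree follows the description's statement that RS[2] has two children.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Hive operator; only a label and child links.
class Op {
    final String name;
    final List<Op> children = new ArrayList<>();

    Op(String name) {
        this.name = name;
    }

    // Adds a child and returns this operator so calls can be chained.
    Op add(Op child) {
        children.add(child);
        return this;
    }
}

class SingleChildCheck {
    // Walks down from the reduce sink and gives up as soon as an operator
    // has more than one child, which is the behaviour the description reports.
    static boolean needSetParallelism(Op reduceSink) {
        List<Op> children = reduceSink.children;
        while (!children.isEmpty()) {
            if (children.size() > 1) {
                return false;
            }
            Op only = children.get(0);
            if (only.name.startsWith("FS")) {
                break; // reached a file sink on a single-child path
            }
            children = only.children;
        }
        return true;
    }

    // RS[2] with the two insert branches from the logical plan.
    static Op multiInsertPlan() {
        Op rs2 = new Op("RS[2]");
        rs2.add(new Op("SEL[3]").add(new Op("SEL[4]").add(new Op("FS[5]"))));
        rs2.add(new Op("SEL[6]").add(new Op("FS[7]")));
        return rs2;
    }

    // A made-up single-insert plan for comparison: RS[2]-SEL[3]-FS[4].
    static Op singleInsertPlan() {
        Op rs2 = new Op("RS[2]");
        rs2.add(new Op("SEL[3]").add(new Op("FS[4]")));
        return rs2;
    }

    public static void main(String[] args) {
        System.out.println(needSetParallelism(multiInsertPlan()));  // prints false
        System.out.println(needSetParallelism(singleInsertPlan())); // prints true
    }
}
```

In this model the multi-insert plan is rejected at the first step because RS[2] has two children, so the sort keeps a parallelism of 1, which matches the symptom reported in the issue.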
[jira] [Updated] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated HIVE-16600:
    Status: Patch Available  (was: Open)
[jira] [Updated] (HIVE-16600) Refactor SetSparkReducerParallelism#needSetParallelism to enable parallel order by in multi_insert cases
[ https://issues.apache.org/jira/browse/HIVE-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated HIVE-16600:
    Attachment: HIVE-16600.patch