HIVE-13300 : Hive on spark throws exception for multi-insert with join (Szehon, reviewed by Xuefu and Chao Sun)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/219d3527 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/219d3527 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/219d3527 Branch: refs/heads/llap Commit: 219d3527cfac09045f0ac247821746e7c95dcb8c Parents: e665f02 Author: Szehon Ho <sze...@cloudera.com> Authored: Thu Mar 24 11:08:04 2016 -0700 Committer: Szehon Ho <sze...@cloudera.com> Committed: Thu Mar 24 11:09:10 2016 -0700 ---------------------------------------------------------------------- .../test/resources/testconfiguration.properties | 1 + .../ql/exec/spark/SparkReduceRecordHandler.java | 2 + .../clientpositive/multi_insert_with_join.q | 29 +++++ .../clientpositive/multi_insert_with_join.q.out | 128 +++++++++++++++++++ .../spark/multi_insert_with_join.q.out | 128 +++++++++++++++++++ 5 files changed, 288 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/219d3527/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 232e262..f8e8bda 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -1006,6 +1006,7 @@ spark.query.files=add_part_multiple.q, \ multi_insert_lateral_view.q, \ multi_insert_mixed.q, \ multi_insert_move_tasks_share_dependencies.q, \ + multi_insert_with_join.q, \ multi_join_union.q, \ multi_join_union_src.q, \ multigroupby_singlemr.q, \ http://git-wip-us.apache.org/repos/asf/hive/blob/219d3527/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java index 439e0df..0d31e5f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java @@ -230,8 +230,10 @@ public class SparkReduceRecordHandler extends SparkRecordHandler { if (isTagged) { // remove the tag from key coming out of reducer // and store it in separate variable. + // use a new BytesWritable (wrapping the same byte array, not a copy) so setSize() below does not truncate the input key that Spark re-uses from the same parent in the multi-insert with join case int size = keyWritable.getSize() - 1; tag = keyWritable.get()[size]; + keyWritable = new BytesWritable(keyWritable.getBytes(), size); keyWritable.setSize(size); } http://git-wip-us.apache.org/repos/asf/hive/blob/219d3527/ql/src/test/queries/clientpositive/multi_insert_with_join.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/multi_insert_with_join.q b/ql/src/test/queries/clientpositive/multi_insert_with_join.q new file mode 100644 index 0000000..862dd9f --- /dev/null +++ b/ql/src/test/queries/clientpositive/multi_insert_with_join.q @@ -0,0 +1,29 @@ +set hive.auto.convert.join=false; + +drop table if exists status_updates; +drop table if exists profiles; +drop table if exists school_summary; +drop table if exists gender_summary; + +create table status_updates(userid int,status string,ds string); +create table profiles(userid int,school string,gender int); +create table school_summary(school string,cnt int) partitioned by (ds string); +create table gender_summary(gender int, cnt int) partitioned by (ds string); + +insert into status_updates values (1, "status_1", "2009-03-20"); +insert into profiles values (1, "school_1", 0); + +FROM (SELECT a.status, b.school, b.gender +FROM status_updates a JOIN profiles b +ON (a.userid = b.userid and +a.ds='2009-03-20' ) +) 
subq1 +INSERT OVERWRITE TABLE gender_summary +PARTITION(ds='2009-03-20') +SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender +INSERT OVERWRITE TABLE school_summary +PARTITION(ds='2009-03-20') +SELECT subq1.school, COUNT(1) GROUP BY subq1.school; + +select * from school_summary; +select * from gender_summary; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/219d3527/ql/src/test/results/clientpositive/multi_insert_with_join.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/multi_insert_with_join.q.out b/ql/src/test/results/clientpositive/multi_insert_with_join.q.out new file mode 100644 index 0000000..28bce84 --- /dev/null +++ b/ql/src/test/results/clientpositive/multi_insert_with_join.q.out @@ -0,0 +1,128 @@ +PREHOOK: query: drop table if exists status_updates +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists status_updates +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table if exists profiles +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists profiles +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table if exists school_summary +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists school_summary +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table if exists gender_summary +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists gender_summary +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table status_updates(userid int,status string,ds string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@status_updates +POSTHOOK: query: create table status_updates(userid int,status string,ds string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@status_updates +PREHOOK: query: create table profiles(userid int,school string,gender int) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@profiles 
+POSTHOOK: query: create table profiles(userid int,school string,gender int) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@profiles +PREHOOK: query: create table school_summary(school string,cnt int) partitioned by (ds string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@school_summary +POSTHOOK: query: create table school_summary(school string,cnt int) partitioned by (ds string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@school_summary +PREHOOK: query: create table gender_summary(gender int, cnt int) partitioned by (ds string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@gender_summary +POSTHOOK: query: create table gender_summary(gender int, cnt int) partitioned by (ds string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@gender_summary +PREHOOK: query: insert into status_updates values (1, "status_1", "2009-03-20") +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__1 +PREHOOK: Output: default@status_updates +POSTHOOK: query: insert into status_updates values (1, "status_1", "2009-03-20") +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__1 +POSTHOOK: Output: default@status_updates +POSTHOOK: Lineage: status_updates.ds SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col3, type:string, comment:), ] +POSTHOOK: Lineage: status_updates.status SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +POSTHOOK: Lineage: status_updates.userid EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +PREHOOK: query: insert into profiles values (1, "school_1", 0) +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__2 +PREHOOK: Output: default@profiles 
+POSTHOOK: query: insert into profiles values (1, "school_1", 0) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__2 +POSTHOOK: Output: default@profiles +POSTHOOK: Lineage: profiles.gender EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col3, type:string, comment:), ] +POSTHOOK: Lineage: profiles.school SIMPLE [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +POSTHOOK: Lineage: profiles.userid EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +PREHOOK: query: FROM (SELECT a.status, b.school, b.gender +FROM status_updates a JOIN profiles b +ON (a.userid = b.userid and +a.ds='2009-03-20' ) +) subq1 +INSERT OVERWRITE TABLE gender_summary +PARTITION(ds='2009-03-20') +SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender +INSERT OVERWRITE TABLE school_summary +PARTITION(ds='2009-03-20') +SELECT subq1.school, COUNT(1) GROUP BY subq1.school +PREHOOK: type: QUERY +PREHOOK: Input: default@profiles +PREHOOK: Input: default@status_updates +PREHOOK: Output: default@gender_summary@ds=2009-03-20 +PREHOOK: Output: default@school_summary@ds=2009-03-20 +POSTHOOK: query: FROM (SELECT a.status, b.school, b.gender +FROM status_updates a JOIN profiles b +ON (a.userid = b.userid and +a.ds='2009-03-20' ) +) subq1 +INSERT OVERWRITE TABLE gender_summary +PARTITION(ds='2009-03-20') +SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender +INSERT OVERWRITE TABLE school_summary +PARTITION(ds='2009-03-20') +SELECT subq1.school, COUNT(1) GROUP BY subq1.school +POSTHOOK: type: QUERY +POSTHOOK: Input: default@profiles +POSTHOOK: Input: default@status_updates +POSTHOOK: Output: default@gender_summary@ds=2009-03-20 +POSTHOOK: Output: default@school_summary@ds=2009-03-20 +POSTHOOK: Lineage: gender_summary PARTITION(ds=2009-03-20).cnt EXPRESSION [(status_updates)a.null, (profiles)b.null, ] +POSTHOOK: Lineage: gender_summary 
PARTITION(ds=2009-03-20).gender SIMPLE [(profiles)b.FieldSchema(name:gender, type:int, comment:null), ] +POSTHOOK: Lineage: school_summary PARTITION(ds=2009-03-20).cnt EXPRESSION [(status_updates)a.null, (profiles)b.null, ] +POSTHOOK: Lineage: school_summary PARTITION(ds=2009-03-20).school SIMPLE [(profiles)b.FieldSchema(name:school, type:string, comment:null), ] +PREHOOK: query: select * from school_summary +PREHOOK: type: QUERY +PREHOOK: Input: default@school_summary +PREHOOK: Input: default@school_summary@ds=2009-03-20 +#### A masked pattern was here #### +POSTHOOK: query: select * from school_summary +POSTHOOK: type: QUERY +POSTHOOK: Input: default@school_summary +POSTHOOK: Input: default@school_summary@ds=2009-03-20 +#### A masked pattern was here #### +school_1 1 2009-03-20 +PREHOOK: query: select * from gender_summary +PREHOOK: type: QUERY +PREHOOK: Input: default@gender_summary +PREHOOK: Input: default@gender_summary@ds=2009-03-20 +#### A masked pattern was here #### +POSTHOOK: query: select * from gender_summary +POSTHOOK: type: QUERY +POSTHOOK: Input: default@gender_summary +POSTHOOK: Input: default@gender_summary@ds=2009-03-20 +#### A masked pattern was here #### +0 1 2009-03-20 http://git-wip-us.apache.org/repos/asf/hive/blob/219d3527/ql/src/test/results/clientpositive/spark/multi_insert_with_join.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/spark/multi_insert_with_join.q.out b/ql/src/test/results/clientpositive/spark/multi_insert_with_join.q.out new file mode 100644 index 0000000..28bce84 --- /dev/null +++ b/ql/src/test/results/clientpositive/spark/multi_insert_with_join.q.out @@ -0,0 +1,128 @@ +PREHOOK: query: drop table if exists status_updates +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists status_updates +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table if exists profiles +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists profiles 
+POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table if exists school_summary +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists school_summary +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table if exists gender_summary +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table if exists gender_summary +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table status_updates(userid int,status string,ds string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@status_updates +POSTHOOK: query: create table status_updates(userid int,status string,ds string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@status_updates +PREHOOK: query: create table profiles(userid int,school string,gender int) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@profiles +POSTHOOK: query: create table profiles(userid int,school string,gender int) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@profiles +PREHOOK: query: create table school_summary(school string,cnt int) partitioned by (ds string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@school_summary +POSTHOOK: query: create table school_summary(school string,cnt int) partitioned by (ds string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@school_summary +PREHOOK: query: create table gender_summary(gender int, cnt int) partitioned by (ds string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@gender_summary +POSTHOOK: query: create table gender_summary(gender int, cnt int) partitioned by (ds string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@gender_summary +PREHOOK: query: insert into status_updates values (1, "status_1", "2009-03-20") +PREHOOK: type: QUERY +PREHOOK: Input: 
default@values__tmp__table__1 +PREHOOK: Output: default@status_updates +POSTHOOK: query: insert into status_updates values (1, "status_1", "2009-03-20") +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__1 +POSTHOOK: Output: default@status_updates +POSTHOOK: Lineage: status_updates.ds SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col3, type:string, comment:), ] +POSTHOOK: Lineage: status_updates.status SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +POSTHOOK: Lineage: status_updates.userid EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +PREHOOK: query: insert into profiles values (1, "school_1", 0) +PREHOOK: type: QUERY +PREHOOK: Input: default@values__tmp__table__2 +PREHOOK: Output: default@profiles +POSTHOOK: query: insert into profiles values (1, "school_1", 0) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@values__tmp__table__2 +POSTHOOK: Output: default@profiles +POSTHOOK: Lineage: profiles.gender EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col3, type:string, comment:), ] +POSTHOOK: Lineage: profiles.school SIMPLE [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +POSTHOOK: Lineage: profiles.userid EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +PREHOOK: query: FROM (SELECT a.status, b.school, b.gender +FROM status_updates a JOIN profiles b +ON (a.userid = b.userid and +a.ds='2009-03-20' ) +) subq1 +INSERT OVERWRITE TABLE gender_summary +PARTITION(ds='2009-03-20') +SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender +INSERT OVERWRITE TABLE school_summary +PARTITION(ds='2009-03-20') +SELECT subq1.school, COUNT(1) GROUP BY subq1.school +PREHOOK: type: QUERY +PREHOOK: Input: default@profiles +PREHOOK: Input: 
default@status_updates +PREHOOK: Output: default@gender_summary@ds=2009-03-20 +PREHOOK: Output: default@school_summary@ds=2009-03-20 +POSTHOOK: query: FROM (SELECT a.status, b.school, b.gender +FROM status_updates a JOIN profiles b +ON (a.userid = b.userid and +a.ds='2009-03-20' ) +) subq1 +INSERT OVERWRITE TABLE gender_summary +PARTITION(ds='2009-03-20') +SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender +INSERT OVERWRITE TABLE school_summary +PARTITION(ds='2009-03-20') +SELECT subq1.school, COUNT(1) GROUP BY subq1.school +POSTHOOK: type: QUERY +POSTHOOK: Input: default@profiles +POSTHOOK: Input: default@status_updates +POSTHOOK: Output: default@gender_summary@ds=2009-03-20 +POSTHOOK: Output: default@school_summary@ds=2009-03-20 +POSTHOOK: Lineage: gender_summary PARTITION(ds=2009-03-20).cnt EXPRESSION [(status_updates)a.null, (profiles)b.null, ] +POSTHOOK: Lineage: gender_summary PARTITION(ds=2009-03-20).gender SIMPLE [(profiles)b.FieldSchema(name:gender, type:int, comment:null), ] +POSTHOOK: Lineage: school_summary PARTITION(ds=2009-03-20).cnt EXPRESSION [(status_updates)a.null, (profiles)b.null, ] +POSTHOOK: Lineage: school_summary PARTITION(ds=2009-03-20).school SIMPLE [(profiles)b.FieldSchema(name:school, type:string, comment:null), ] +PREHOOK: query: select * from school_summary +PREHOOK: type: QUERY +PREHOOK: Input: default@school_summary +PREHOOK: Input: default@school_summary@ds=2009-03-20 +#### A masked pattern was here #### +POSTHOOK: query: select * from school_summary +POSTHOOK: type: QUERY +POSTHOOK: Input: default@school_summary +POSTHOOK: Input: default@school_summary@ds=2009-03-20 +#### A masked pattern was here #### +school_1 1 2009-03-20 +PREHOOK: query: select * from gender_summary +PREHOOK: type: QUERY +PREHOOK: Input: default@gender_summary +PREHOOK: Input: default@gender_summary@ds=2009-03-20 +#### A masked pattern was here #### +POSTHOOK: query: select * from gender_summary +POSTHOOK: type: QUERY +POSTHOOK: Input: default@gender_summary 
+POSTHOOK: Input: default@gender_summary@ds=2009-03-20 +#### A masked pattern was here #### +0 1 2009-03-20