HIVE-13300 : Hive on spark throws exception for multi-insert with join (Szehon, reviewed by Xuefu and Chao Sun)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/219d3527
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/219d3527
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/219d3527

Branch: refs/heads/llap
Commit: 219d3527cfac09045f0ac247821746e7c95dcb8c
Parents: e665f02
Author: Szehon Ho <sze...@cloudera.com>
Authored: Thu Mar 24 11:08:04 2016 -0700
Committer: Szehon Ho <sze...@cloudera.com>
Committed: Thu Mar 24 11:09:10 2016 -0700

----------------------------------------------------------------------
 .../test/resources/testconfiguration.properties |   1 +
 .../ql/exec/spark/SparkReduceRecordHandler.java |   2 +
 .../clientpositive/multi_insert_with_join.q     |  29 +++++
 .../clientpositive/multi_insert_with_join.q.out | 128 +++++++++++++++++++
 .../spark/multi_insert_with_join.q.out          | 128 +++++++++++++++++++
 5 files changed, 288 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/219d3527/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 232e262..f8e8bda 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1006,6 +1006,7 @@ spark.query.files=add_part_multiple.q, \
   multi_insert_lateral_view.q, \
   multi_insert_mixed.q, \
   multi_insert_move_tasks_share_dependencies.q, \
+  multi_insert_with_join.q, \
   multi_join_union.q, \
   multi_join_union_src.q, \
   multigroupby_singlemr.q, \

http://git-wip-us.apache.org/repos/asf/hive/blob/219d3527/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 439e0df..0d31e5f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -230,8 +230,10 @@ public class SparkReduceRecordHandler extends SparkRecordHandler {
       if (isTagged) {
         // remove the tag from key coming out of reducer
         // and store it in separate variable.
+        // make a copy for multi-insert with join case as Spark re-uses input key from same parent
         int size = keyWritable.getSize() - 1;
         tag = keyWritable.get()[size];
+        keyWritable = new BytesWritable(keyWritable.getBytes(), size);
         keyWritable.setSize(size);
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/219d3527/ql/src/test/queries/clientpositive/multi_insert_with_join.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/multi_insert_with_join.q b/ql/src/test/queries/clientpositive/multi_insert_with_join.q
new file mode 100644
index 0000000..862dd9f
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/multi_insert_with_join.q
@@ -0,0 +1,29 @@
+set hive.auto.convert.join=false;
+
+drop table if exists status_updates;
+drop table if exists profiles;
+drop table if exists school_summary;
+drop table if exists gender_summary;
+
+create table status_updates(userid int,status string,ds string);
+create table profiles(userid int,school string,gender int);
+create table school_summary(school string,cnt int) partitioned by (ds string);
+create table gender_summary(gender int, cnt int) partitioned by (ds string);
+
+insert into status_updates values (1, "status_1", "2009-03-20");
+insert into profiles values (1, "school_1", 0);
+
+FROM (SELECT a.status, b.school, b.gender
+FROM status_updates a JOIN profiles b
+ON (a.userid = b.userid and
+a.ds='2009-03-20' )
+) subq1
+INSERT OVERWRITE TABLE gender_summary
+PARTITION(ds='2009-03-20')
+SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender
+INSERT OVERWRITE TABLE school_summary
+PARTITION(ds='2009-03-20')
+SELECT subq1.school, COUNT(1) GROUP BY subq1.school;
+
+select * from school_summary;
+select * from gender_summary;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/219d3527/ql/src/test/results/clientpositive/multi_insert_with_join.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/multi_insert_with_join.q.out b/ql/src/test/results/clientpositive/multi_insert_with_join.q.out
new file mode 100644
index 0000000..28bce84
--- /dev/null
+++ b/ql/src/test/results/clientpositive/multi_insert_with_join.q.out
@@ -0,0 +1,128 @@
+PREHOOK: query: drop table if exists status_updates
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists status_updates
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists profiles
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists profiles
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists school_summary
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists school_summary
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists gender_summary
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists gender_summary
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table status_updates(userid int,status string,ds string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@status_updates
+POSTHOOK: query: create table status_updates(userid int,status string,ds 
string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@status_updates
+PREHOOK: query: create table profiles(userid int,school string,gender int)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@profiles
+POSTHOOK: query: create table profiles(userid int,school string,gender int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@profiles
+PREHOOK: query: create table school_summary(school string,cnt int) partitioned 
by (ds string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@school_summary
+POSTHOOK: query: create table school_summary(school string,cnt int) 
partitioned by (ds string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@school_summary
+PREHOOK: query: create table gender_summary(gender int, cnt int) partitioned 
by (ds string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@gender_summary
+POSTHOOK: query: create table gender_summary(gender int, cnt int) partitioned 
by (ds string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@gender_summary
+PREHOOK: query: insert into status_updates values (1, "status_1", "2009-03-20")
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__1
+PREHOOK: Output: default@status_updates
+POSTHOOK: query: insert into status_updates values (1, "status_1", 
"2009-03-20")
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__1
+POSTHOOK: Output: default@status_updates
+POSTHOOK: Lineage: status_updates.ds SIMPLE 
[(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col3, 
type:string, comment:), ]
+POSTHOOK: Lineage: status_updates.status SIMPLE 
[(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, 
type:string, comment:), ]
+POSTHOOK: Lineage: status_updates.userid EXPRESSION 
[(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, 
type:string, comment:), ]
+PREHOOK: query: insert into profiles values (1, "school_1", 0)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__2
+PREHOOK: Output: default@profiles
+POSTHOOK: query: insert into profiles values (1, "school_1", 0)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__2
+POSTHOOK: Output: default@profiles
+POSTHOOK: Lineage: profiles.gender EXPRESSION 
[(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col3, 
type:string, comment:), ]
+POSTHOOK: Lineage: profiles.school SIMPLE 
[(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, 
type:string, comment:), ]
+POSTHOOK: Lineage: profiles.userid EXPRESSION 
[(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, 
type:string, comment:), ]
+PREHOOK: query: FROM (SELECT a.status, b.school, b.gender
+FROM status_updates a JOIN profiles b
+ON (a.userid = b.userid and
+a.ds='2009-03-20' )
+) subq1
+INSERT OVERWRITE TABLE gender_summary
+PARTITION(ds='2009-03-20')
+SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender
+INSERT OVERWRITE TABLE school_summary
+PARTITION(ds='2009-03-20')
+SELECT subq1.school, COUNT(1) GROUP BY subq1.school
+PREHOOK: type: QUERY
+PREHOOK: Input: default@profiles
+PREHOOK: Input: default@status_updates
+PREHOOK: Output: default@gender_summary@ds=2009-03-20
+PREHOOK: Output: default@school_summary@ds=2009-03-20
+POSTHOOK: query: FROM (SELECT a.status, b.school, b.gender
+FROM status_updates a JOIN profiles b
+ON (a.userid = b.userid and
+a.ds='2009-03-20' )
+) subq1
+INSERT OVERWRITE TABLE gender_summary
+PARTITION(ds='2009-03-20')
+SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender
+INSERT OVERWRITE TABLE school_summary
+PARTITION(ds='2009-03-20')
+SELECT subq1.school, COUNT(1) GROUP BY subq1.school
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@profiles
+POSTHOOK: Input: default@status_updates
+POSTHOOK: Output: default@gender_summary@ds=2009-03-20
+POSTHOOK: Output: default@school_summary@ds=2009-03-20
+POSTHOOK: Lineage: gender_summary PARTITION(ds=2009-03-20).cnt EXPRESSION 
[(status_updates)a.null, (profiles)b.null, ]
+POSTHOOK: Lineage: gender_summary PARTITION(ds=2009-03-20).gender SIMPLE 
[(profiles)b.FieldSchema(name:gender, type:int, comment:null), ]
+POSTHOOK: Lineage: school_summary PARTITION(ds=2009-03-20).cnt EXPRESSION 
[(status_updates)a.null, (profiles)b.null, ]
+POSTHOOK: Lineage: school_summary PARTITION(ds=2009-03-20).school SIMPLE 
[(profiles)b.FieldSchema(name:school, type:string, comment:null), ]
+PREHOOK: query: select * from school_summary
+PREHOOK: type: QUERY
+PREHOOK: Input: default@school_summary
+PREHOOK: Input: default@school_summary@ds=2009-03-20
+#### A masked pattern was here ####
+POSTHOOK: query: select * from school_summary
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@school_summary
+POSTHOOK: Input: default@school_summary@ds=2009-03-20
+#### A masked pattern was here ####
+school_1       1       2009-03-20
+PREHOOK: query: select * from gender_summary
+PREHOOK: type: QUERY
+PREHOOK: Input: default@gender_summary
+PREHOOK: Input: default@gender_summary@ds=2009-03-20
+#### A masked pattern was here ####
+POSTHOOK: query: select * from gender_summary
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@gender_summary
+POSTHOOK: Input: default@gender_summary@ds=2009-03-20
+#### A masked pattern was here ####
+0      1       2009-03-20

http://git-wip-us.apache.org/repos/asf/hive/blob/219d3527/ql/src/test/results/clientpositive/spark/multi_insert_with_join.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/multi_insert_with_join.q.out b/ql/src/test/results/clientpositive/spark/multi_insert_with_join.q.out
new file mode 100644
index 0000000..28bce84
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/multi_insert_with_join.q.out
@@ -0,0 +1,128 @@
+PREHOOK: query: drop table if exists status_updates
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists status_updates
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists profiles
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists profiles
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists school_summary
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists school_summary
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists gender_summary
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists gender_summary
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table status_updates(userid int,status string,ds string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@status_updates
+POSTHOOK: query: create table status_updates(userid int,status string,ds 
string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@status_updates
+PREHOOK: query: create table profiles(userid int,school string,gender int)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@profiles
+POSTHOOK: query: create table profiles(userid int,school string,gender int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@profiles
+PREHOOK: query: create table school_summary(school string,cnt int) partitioned 
by (ds string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@school_summary
+POSTHOOK: query: create table school_summary(school string,cnt int) 
partitioned by (ds string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@school_summary
+PREHOOK: query: create table gender_summary(gender int, cnt int) partitioned 
by (ds string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@gender_summary
+POSTHOOK: query: create table gender_summary(gender int, cnt int) partitioned 
by (ds string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@gender_summary
+PREHOOK: query: insert into status_updates values (1, "status_1", "2009-03-20")
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__1
+PREHOOK: Output: default@status_updates
+POSTHOOK: query: insert into status_updates values (1, "status_1", 
"2009-03-20")
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__1
+POSTHOOK: Output: default@status_updates
+POSTHOOK: Lineage: status_updates.ds SIMPLE 
[(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col3, 
type:string, comment:), ]
+POSTHOOK: Lineage: status_updates.status SIMPLE 
[(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, 
type:string, comment:), ]
+POSTHOOK: Lineage: status_updates.userid EXPRESSION 
[(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, 
type:string, comment:), ]
+PREHOOK: query: insert into profiles values (1, "school_1", 0)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__2
+PREHOOK: Output: default@profiles
+POSTHOOK: query: insert into profiles values (1, "school_1", 0)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__2
+POSTHOOK: Output: default@profiles
+POSTHOOK: Lineage: profiles.gender EXPRESSION 
[(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col3, 
type:string, comment:), ]
+POSTHOOK: Lineage: profiles.school SIMPLE 
[(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col2, 
type:string, comment:), ]
+POSTHOOK: Lineage: profiles.userid EXPRESSION 
[(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, 
type:string, comment:), ]
+PREHOOK: query: FROM (SELECT a.status, b.school, b.gender
+FROM status_updates a JOIN profiles b
+ON (a.userid = b.userid and
+a.ds='2009-03-20' )
+) subq1
+INSERT OVERWRITE TABLE gender_summary
+PARTITION(ds='2009-03-20')
+SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender
+INSERT OVERWRITE TABLE school_summary
+PARTITION(ds='2009-03-20')
+SELECT subq1.school, COUNT(1) GROUP BY subq1.school
+PREHOOK: type: QUERY
+PREHOOK: Input: default@profiles
+PREHOOK: Input: default@status_updates
+PREHOOK: Output: default@gender_summary@ds=2009-03-20
+PREHOOK: Output: default@school_summary@ds=2009-03-20
+POSTHOOK: query: FROM (SELECT a.status, b.school, b.gender
+FROM status_updates a JOIN profiles b
+ON (a.userid = b.userid and
+a.ds='2009-03-20' )
+) subq1
+INSERT OVERWRITE TABLE gender_summary
+PARTITION(ds='2009-03-20')
+SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender
+INSERT OVERWRITE TABLE school_summary
+PARTITION(ds='2009-03-20')
+SELECT subq1.school, COUNT(1) GROUP BY subq1.school
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@profiles
+POSTHOOK: Input: default@status_updates
+POSTHOOK: Output: default@gender_summary@ds=2009-03-20
+POSTHOOK: Output: default@school_summary@ds=2009-03-20
+POSTHOOK: Lineage: gender_summary PARTITION(ds=2009-03-20).cnt EXPRESSION 
[(status_updates)a.null, (profiles)b.null, ]
+POSTHOOK: Lineage: gender_summary PARTITION(ds=2009-03-20).gender SIMPLE 
[(profiles)b.FieldSchema(name:gender, type:int, comment:null), ]
+POSTHOOK: Lineage: school_summary PARTITION(ds=2009-03-20).cnt EXPRESSION 
[(status_updates)a.null, (profiles)b.null, ]
+POSTHOOK: Lineage: school_summary PARTITION(ds=2009-03-20).school SIMPLE 
[(profiles)b.FieldSchema(name:school, type:string, comment:null), ]
+PREHOOK: query: select * from school_summary
+PREHOOK: type: QUERY
+PREHOOK: Input: default@school_summary
+PREHOOK: Input: default@school_summary@ds=2009-03-20
+#### A masked pattern was here ####
+POSTHOOK: query: select * from school_summary
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@school_summary
+POSTHOOK: Input: default@school_summary@ds=2009-03-20
+#### A masked pattern was here ####
+school_1       1       2009-03-20
+PREHOOK: query: select * from gender_summary
+PREHOOK: type: QUERY
+PREHOOK: Input: default@gender_summary
+PREHOOK: Input: default@gender_summary@ds=2009-03-20
+#### A masked pattern was here ####
+POSTHOOK: query: select * from gender_summary
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@gender_summary
+POSTHOOK: Input: default@gender_summary@ds=2009-03-20
+#### A masked pattern was here ####
+0      1       2009-03-20

Reply via email to