Fixed full join query issue with aggregate

Fixed in spark-1.6

Fixed style


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e67003cf
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e67003cf
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e67003cf

Branch: refs/heads/branch-1.1
Commit: e67003cf657e743194cf449792b67f896b1adc74
Parents: 0c6f5f3
Author: ravipesala <ravi.pes...@gmail.com>
Authored: Tue May 23 10:32:21 2017 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Thu Jun 15 12:57:35 2017 +0530

----------------------------------------------------------------------
 .../joinquery/AllDataTypesTestCaseJoin.scala    |   9 +-
 .../spark/sql/optimizer/CarbonOptimizer.scala   | 101 ++++++++++++-------
 .../sql/optimizer/CarbonLateDecodeRule.scala    | 101 +++++++++++--------
 3 files changed, 131 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e67003cf/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/joinquery/AllDataTypesTestCaseJoin.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/joinquery/AllDataTypesTestCaseJoin.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/joinquery/AllDataTypesTestCaseJoin.scala
index be0f8e6..08fad0b 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/joinquery/AllDataTypesTestCaseJoin.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/joinquery/AllDataTypesTestCaseJoin.scala
@@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfterAll
 class AllDataTypesTestCaseJoin extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
-    sql("CREATE TABLE alldatatypestableJoin (empno int, empname String, 
designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname 
String, deptno int, deptname String, projectcode int, projectjoindate 
Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) 
STORED BY 'org.apache.carbondata.format'")
+    sql("CREATE TABLE alldatatypestableJoin (empno int, empname String, 
designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname 
String, deptno int, deptname String, projectcode int, projectjoindate 
Timestamp, projectenddate Timestamp,attendance int,utilization int,salary int) 
STORED BY 'org.apache.carbondata.format' 
TBLPROPERTIES('DICTIONARY_INCLUDE'='empno','TABLE_BLOCKSIZE'='4')")
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
alldatatypestableJoin OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')""");
 
     sql("CREATE TABLE alldatatypestableJoin_hive (empno int, empname String, 
designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname 
String, deptno int, deptname String, projectcode int, projectjoindate 
Timestamp, projectenddate Timestamp,attendance int,utilization int,salary 
int)row format delimited fields terminated by ','")
@@ -90,6 +90,13 @@ class AllDataTypesTestCaseJoin extends QueryTest with 
BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS carbon_table2")
   }
 
+  test("join with aggregate plan") {
+    checkAnswer(sql("SELECT c1.empno,c1.empname, c2.empno FROM (SELECT 
empno,empname FROM alldatatypestableJoin GROUP BY empno,empname) c1 FULL JOIN " 
+
+                    "(SELECT empno FROM alldatatypestableJoin GROUP BY empno) 
c2 ON c1.empno = c2.empno"),
+      sql("SELECT c1.empno,c1.empname, c2.empno FROM (SELECT empno,empname 
FROM alldatatypestableJoin_hive GROUP BY empno,empname) c1 FULL JOIN " +
+          "(SELECT empno FROM alldatatypestableJoin_hive GROUP BY empno) c2 ON 
c1.empno = c2.empno"))
+  }
+
   override def afterAll {
     sql("drop table alldatatypestableJoin")
     sql("drop table alldatatypestableJoin_hive")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e67003cf/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
index 9aa8158..02ac5f8 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
@@ -206,6 +206,47 @@ class ResolveCarbonFunctions(relations: 
Seq[CarbonDecoderRelation])
     relations.foreach(_.fillAttributeMap(attrMap))
 
     def addTempDecoder(currentPlan: LogicalPlan): LogicalPlan = {
+
+      def transformAggregateExpression(agg: Aggregate,
+          aggonGroups: util.HashSet[AttributeReferenceWrapper] = null): 
LogicalPlan = {
+        val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper]
+        if (aggonGroups != null) {
+          attrsOndimAggs.addAll(aggonGroups)
+        }
+        agg.aggregateExpressions.map {
+          case attr: AttributeReference =>
+          case a@Alias(attr: AttributeReference, name) =>
+          case aggExp: AggregateExpression =>
+            aggExp.transform {
+              case aggExp: AggregateExpression =>
+                collectDimensionAggregates(aggExp, attrsOndimAggs, aliasMap, 
attrMap)
+                aggExp
+            }
+          case others =>
+            others.collect {
+              case attr: AttributeReference
+                if isDictionaryEncoded(attr, attrMap, aliasMap) =>
+                
attrsOndimAggs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
+            }
+        }
+        var child = agg.child
+        // In case the child is also an aggregate, push the decoder down to the child
+        if (attrsOndimAggs.size() > 0 && !child.equals(agg)) {
+          child = CarbonDictionaryTempDecoder(attrsOndimAggs,
+            new util.HashSet[AttributeReferenceWrapper](),
+            agg.child)
+        }
+        if (!decoder && aggonGroups == null) {
+          decoder = true
+          CarbonDictionaryTempDecoder(new 
util.HashSet[AttributeReferenceWrapper](),
+            new util.HashSet[AttributeReferenceWrapper](),
+            Aggregate(agg.groupingExpressions, agg.aggregateExpressions, 
child),
+            isOuter = true)
+        } else {
+          Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child)
+        }
+      }
+
       currentPlan match {
         case limit@Limit(_, child: Sort) =>
           if (!decoder) {
@@ -288,39 +329,7 @@ class ResolveCarbonFunctions(relations: 
Seq[CarbonDecoderRelation])
           }
 
         case agg: Aggregate if 
!agg.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
-          val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper]
-          agg.aggregateExpressions.map {
-            case attr: AttributeReference =>
-            case a@Alias(attr: AttributeReference, name) =>
-            case aggExp: AggregateExpression =>
-              aggExp.transform {
-                case aggExp: AggregateExpression =>
-                  collectDimensionAggregates(aggExp, attrsOndimAggs, aliasMap, 
attrMap)
-                  aggExp
-              }
-            case others =>
-              others.collect {
-                case attr: AttributeReference
-                  if isDictionaryEncoded(attr, attrMap, aliasMap) =>
-                  
attrsOndimAggs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
-              }
-          }
-          var child = agg.child
-          // Incase if the child also aggregate then push down decoder to child
-          if (attrsOndimAggs.size() > 0 && !child.equals(agg)) {
-            child = CarbonDictionaryTempDecoder(attrsOndimAggs,
-              new util.HashSet[AttributeReferenceWrapper](),
-              agg.child)
-          }
-          if (!decoder) {
-            decoder = true
-            CarbonDictionaryTempDecoder(new 
util.HashSet[AttributeReferenceWrapper](),
-              new util.HashSet[AttributeReferenceWrapper](),
-              Aggregate(agg.groupingExpressions, agg.aggregateExpressions, 
child),
-              isOuter = true)
-          } else {
-            Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child)
-          }
+          transformAggregateExpression(agg)
         case expand: Expand if 
!expand.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
           val attrsOnExpand = new util.HashSet[AttributeReferenceWrapper]
           expand.projections.map {s =>
@@ -410,15 +419,29 @@ class ResolveCarbonFunctions(relations: 
Seq[CarbonDecoderRelation])
             var rightPlan = j.right
             if (leftCondAttrs.size() > 0 &&
                 !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
-              leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs,
-                new util.HashSet[AttributeReferenceWrapper](),
-                j.left)
+              leftPlan = leftPlan match {
+                case agg: Aggregate =>
+                  CarbonDictionaryTempDecoder(leftCondAttrs,
+                    new util.HashSet[AttributeReferenceWrapper](),
+                    transformAggregateExpression(agg, leftCondAttrs))
+                case _ =>
+                  CarbonDictionaryTempDecoder(leftCondAttrs,
+                    new util.HashSet[AttributeReferenceWrapper](),
+                    j.left)
+              }
             }
             if (rightCondAttrs.size() > 0 &&
                 !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
-              rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs,
-                new util.HashSet[AttributeReferenceWrapper](),
-                j.right)
+              rightPlan = rightPlan match {
+                case agg: Aggregate =>
+                  CarbonDictionaryTempDecoder(rightCondAttrs,
+                    new util.HashSet[AttributeReferenceWrapper](),
+                    transformAggregateExpression(agg, rightCondAttrs))
+                case _ =>
+                  CarbonDictionaryTempDecoder(rightCondAttrs,
+                    new util.HashSet[AttributeReferenceWrapper](),
+                    j.right)
+              }
             }
             if (!decoder) {
               decoder = true

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e67003cf/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index fd6f14e..d1a0c90 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -178,6 +178,46 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with 
PredicateHelper {
     relations.foreach(_.fillAttributeMap(attrMap))
 
     def addTempDecoder(currentPlan: LogicalPlan): LogicalPlan = {
+
+      def transformAggregateExpression(agg: Aggregate,
+          attrsOnGroup: util.HashSet[AttributeReferenceWrapper] = null): 
LogicalPlan = {
+        val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper]
+        if (attrsOnGroup != null) {
+          attrsOndimAggs.addAll(attrsOnGroup)
+        }
+        agg.aggregateExpressions.map {
+          case attr: AttributeReference =>
+          case a@Alias(attr: AttributeReference, name) =>
+          case aggExp: AggregateExpression =>
+            aggExp.transform {
+              case aggExp: AggregateExpression =>
+                collectDimensionAggregates(aggExp, attrsOndimAggs, aliasMap, 
attrMap)
+                aggExp
+            }
+          case others =>
+            others.collect {
+              case attr: AttributeReference
+                if isDictionaryEncoded(attr, attrMap, aliasMap) =>
+                
attrsOndimAggs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
+            }
+        }
+        var child = agg.child
+        // In case the child is also an aggregate, push the decoder down to the child
+        if (attrsOndimAggs.size() > 0 && !child.equals(agg)) {
+          child = CarbonDictionaryTempDecoder(attrsOndimAggs,
+            new util.HashSet[AttributeReferenceWrapper](),
+            agg.child)
+        }
+        if (!decoder && attrsOnGroup == null) {
+          decoder = true
+          CarbonDictionaryTempDecoder(new 
util.HashSet[AttributeReferenceWrapper](),
+            new util.HashSet[AttributeReferenceWrapper](),
+            Aggregate(agg.groupingExpressions, agg.aggregateExpressions, 
child),
+            isOuter = true)
+        } else {
+          Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child)
+        }
+      }
       currentPlan match {
         case limit@GlobalLimit(_, LocalLimit(_, child: Sort)) =>
           if (!decoder) {
@@ -259,39 +299,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with 
PredicateHelper {
             Union(children)
           }
         case agg: Aggregate if 
!agg.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
-          val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper]
-          agg.aggregateExpressions.map {
-            case attr: AttributeReference =>
-            case a@Alias(attr: AttributeReference, name) =>
-            case aggExp: AggregateExpression =>
-              aggExp.transform {
-                case aggExp: AggregateExpression =>
-                  collectDimensionAggregates(aggExp, attrsOndimAggs, aliasMap, 
attrMap)
-                  aggExp
-              }
-            case others =>
-              others.collect {
-                case attr: AttributeReference
-                  if isDictionaryEncoded(attr, attrMap, aliasMap) =>
-                  
attrsOndimAggs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
-              }
-          }
-          var child = agg.child
-          // Incase if the child also aggregate then push down decoder to child
-          if (attrsOndimAggs.size() > 0 && !child.equals(agg)) {
-            child = CarbonDictionaryTempDecoder(attrsOndimAggs,
-              new util.HashSet[AttributeReferenceWrapper](),
-              agg.child)
-          }
-          if (!decoder) {
-            decoder = true
-            CarbonDictionaryTempDecoder(new 
util.HashSet[AttributeReferenceWrapper](),
-              new util.HashSet[AttributeReferenceWrapper](),
-              Aggregate(agg.groupingExpressions, agg.aggregateExpressions, 
child),
-              isOuter = true)
-          } else {
-            Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child)
-          }
+          transformAggregateExpression(agg)
         case expand: Expand if 
!expand.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
           val attrsOnExpand = new util.HashSet[AttributeReferenceWrapper]
           expand.projections.map {s =>
@@ -381,15 +389,29 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with 
PredicateHelper {
             var rightPlan = j.right
             if (leftCondAttrs.size() > 0 &&
                 !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
-              leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs,
-                new util.HashSet[AttributeReferenceWrapper](),
-                j.left)
+              leftPlan = leftPlan match {
+                case agg: Aggregate =>
+                  CarbonDictionaryTempDecoder(leftCondAttrs,
+                    new util.HashSet[AttributeReferenceWrapper](),
+                    transformAggregateExpression(agg, leftCondAttrs))
+                case _ =>
+                  CarbonDictionaryTempDecoder(leftCondAttrs,
+                    new util.HashSet[AttributeReferenceWrapper](),
+                    j.left)
+              }
             }
             if (rightCondAttrs.size() > 0 &&
                 !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
-              rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs,
-                new util.HashSet[AttributeReferenceWrapper](),
-                j.right)
+              rightPlan = rightPlan match {
+                case agg: Aggregate =>
+                  CarbonDictionaryTempDecoder(rightCondAttrs,
+                    new util.HashSet[AttributeReferenceWrapper](),
+                    transformAggregateExpression(agg, rightCondAttrs))
+                case _ =>
+                  CarbonDictionaryTempDecoder(rightCondAttrs,
+                    new util.HashSet[AttributeReferenceWrapper](),
+                    j.right)
+              }
             }
             Join(leftPlan, rightPlan, j.joinType, j.condition)
           } else {
@@ -503,7 +525,6 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with 
PredicateHelper {
 
         case others => others
       }
-
     }
 
     val transFormedPlan =

Reply via email to