This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b222d4  [CARBONDATA-3629] Fix Select query failure on aggregation of 
same column on MV
2b222d4 is described below

commit 2b222d4d279e7f0c53c062b0740d4aed3904a960
Author: Indhumathi27 <indhumathi...@gmail.com>
AuthorDate: Tue Dec 24 13:15:02 2019 +0530

    [CARBONDATA-3629] Fix Select query failure on aggregation of same column on 
MV
    
    Problem:
    If an MV datamap is created with "SELECT a, sum(a) FROM maintable GROUP BY a"
    and later queried with "SELECT sum(a) FROM maintable", the query fails because
    the rewritten plan's output list does not match the datamap table.
    
    Solution:
    Check if Aggregation exists in GroupBy Node and copy select node with 
aliasMap
    
    This closes #3530
---
 .../org/apache/carbondata/mv/datamap/MVUtil.scala  | 39 ++++++++++++++---
 .../carbondata/mv/rewrite/DefaultMatchMaker.scala  | 51 ++++++++++++++++------
 .../mv/rewrite/TestAllOperationsOnMV.scala         | 29 ++++++++++++
 .../TestMVTimeSeriesCreateDataMapCommand.scala     | 13 +++++-
 docs/datamap/mv-datamap-guide.md                   | 11 ++++-
 .../command/timeseries/TimeSeriesUtil.scala        |  4 +-
 6 files changed, 122 insertions(+), 25 deletions(-)

diff --git 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
index fe76cc3..f3e8091 100644
--- 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
+++ 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
@@ -49,23 +49,37 @@ class MVUtil {
       case select: Select =>
         select.children.map {
           case groupBy: GroupBy =>
-            getFieldsFromProject(groupBy.outputList, groupBy.predicateList, 
logicalRelation)
+            getFieldsFromProject(groupBy.outputList, groupBy.predicateList,
+              logicalRelation, groupBy.flagSpec)
           case _: ModularRelation =>
-            getFieldsFromProject(select.outputList, select.predicateList, 
logicalRelation)
+            getFieldsFromProject(select.outputList, select.predicateList,
+              logicalRelation, select.flagSpec)
         }.head
       case groupBy: GroupBy =>
         groupBy.child match {
           case select: Select =>
-            getFieldsFromProject(groupBy.outputList, select.predicateList, 
logicalRelation)
+            getFieldsFromProject(groupBy.outputList, select.predicateList,
+              logicalRelation, select.flagSpec)
           case _: ModularRelation =>
-            getFieldsFromProject(groupBy.outputList, groupBy.predicateList, 
logicalRelation)
+            getFieldsFromProject(groupBy.outputList, groupBy.predicateList,
+              logicalRelation, groupBy.flagSpec)
         }
     }
   }
 
+  /**
+   * Creates the main table to datamap table field relation map by using the
+   * modular plan generated from the user query
+   * @param outputList of the modular plan
+   * @param predicateList of the modular plan
+   * @param logicalRelation list of main table from query
+   * @param flagSpec to get SortOrder attribute if exists
+   * @return fieldRelationMap
+   */
   def getFieldsFromProject(outputList: Seq[NamedExpression],
       predicateList: Seq[Expression],
-      logicalRelation: Seq[LogicalRelation]): mutable.LinkedHashMap[Field, 
DataMapField] = {
+      logicalRelation: Seq[LogicalRelation],
+      flagSpec: Seq[Seq[Any]]): mutable.LinkedHashMap[Field, DataMapField] = {
     var fieldToDataMapFieldMap = 
scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
     fieldToDataMapFieldMap ++= getFieldsFromProject(outputList, logicalRelation)
     var finalPredicateList: Seq[NamedExpression] = Seq.empty
@@ -75,6 +89,21 @@ class MVUtil {
           finalPredicateList = finalPredicateList.:+(attr)
       }
     }
+    // collect sort by columns
+    if (flagSpec.nonEmpty) {
+      flagSpec.map { f =>
+        f.map {
+          case list: ArrayBuffer[_] =>
+            list.map {
+              case s: SortOrder =>
+                s.collect {
+                  case attr: AttributeReference =>
+                    finalPredicateList = finalPredicateList.:+(attr)
+                }
+            }
+        }
+      }
+    }
     fieldToDataMapFieldMap ++= getFieldsFromProject(finalPredicateList.distinct, logicalRelation)
     fieldToDataMapFieldMap
   }
diff --git 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
index 616d0bd..7e8eb96 100644
--- 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
+++ 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
@@ -385,16 +385,15 @@ object GroupbyGroupbyNoChildDelta extends 
DefaultMatchPattern {
           gb_2a.predicateList.exists(_.semanticEquals(expr)))
         val isGroupingRmE = gb_2a.predicateList.forall(expr =>
           gb_2q.predicateList.exists(_.semanticEquals(expr)))
+        val isOutputEmR = gb_2q.outputList.forall {
+          case a @ Alias(_, _) =>
+            gb_2a.outputList.exists{
+              case a1: Alias => a1.child.semanticEquals(a.child)
+              case exp => exp.semanticEquals(a.child)
+            }
+          case exp => gb_2a.outputList.exists(_.semanticEquals(exp))
+        }
         if (isGroupingEmR && isGroupingRmE) {
-          val isOutputEmR = gb_2q.outputList.forall {
-            case a @ Alias(_, _) =>
-              gb_2a.outputList.exists{
-                case a1: Alias => a1.child.semanticEquals(a.child)
-                case exp => exp.semanticEquals(a.child)
-              }
-            case exp => gb_2a.outputList.exists(_.semanticEquals(exp))
-          }
-
           if (isOutputEmR) {
             // Mappings of output of two plans by checking semantic equals.
             val mappings = gb_2a.outputList.zipWithIndex.map { case(exp, 
index) =>
@@ -424,11 +423,35 @@ object GroupbyGroupbyNoChildDelta extends 
DefaultMatchPattern {
             Utils.tryMatch(
               gb_2a, gb_2q, aliasMap).flatMap {
               case g: GroupBy =>
-                Some(g.copy(child = g.child.withNewChildren(
-                  g.child.children.map {
-                    case modular.Select(_, _, _, _, _, _, _, _, _, _) => gb_2a;
-                    case other => other
-                  })));
+                // Check whether any aggregate function (e.g. sum(a)) exists in
+                // the output list; if so, create a new alias and copy it to the
+                // group by node
+                val aggFunExists = g.outputList.exists { f =>
+                  f.find {
+                    case _: AggregateExpression => true
+                    case _ => false
+                  }.isDefined
+                }
+                if (aggFunExists && !isGroupingRmE && isOutputEmR) {
+                  val tChildren = new 
collection.mutable.ArrayBuffer[ModularPlan]()
+                  val sel_1a = g.child.asInstanceOf[Select]
+
+                  val usel_1a = sel_1a.copy(outputList = sel_1a.outputList)
+                  tChildren += gb_2a
+                  val sel_1q_temp = sel_1a.copy(
+                    predicateList = sel_1a.predicateList,
+                    children = tChildren,
+                    joinEdges = sel_1a.joinEdges,
+                    aliasMap = Seq(0 -> rewrite.newSubsumerName()).toMap)
+
+                  val res = factorOutSubsumer(sel_1q_temp, usel_1a, 
sel_1q_temp.aliasMap)
+                  Some(g.copy(child = res))
+                } else {
+                  Some(g.copy(child = g.child.withNewChildren(
+                    g.child.children.map {
+                      case modular.Select(_, _, _, _, _, _, _, _, _, _) => 
gb_2a;
+                      case other => other
+                    })));
+                }
               case _ => None}.map(Seq(_)).getOrElse(Nil)
           } else {
             Nil
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
index a81cd2f..1750ce7 100644
--- 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
@@ -541,6 +541,35 @@ class TestAllOperationsOnMV extends QueryTest with 
BeforeAndAfterEach {
     sql("drop table IF EXISTS maintable")
   }
 
+  test("test query aggregation on mv datamap ") {
+    sql("drop table if exists maintable")
+    sql("create table maintable(name string, age int, add string) stored by 
'carbondata'")
+    sql("insert into maintable 
values('abc',1,'a'),('def',2,'b'),('ghi',3,'c')")
+    val res = sql("select sum(age) from maintable")
+    sql("drop datamap if exists mv3")
+    sql("create datamap mv3 on table maintable using 'mv' as select 
age,sum(age) from maintable group by age")
+    val df = sql("select sum(age) from maintable")
+    TestUtil.verifyMVDataMap(df.queryExecution.analyzed, "mv3")
+    checkAnswer(res, df)
+    sql("drop table if exists maintable")
+  }
+
+  test("test order by columns not given in projection") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 
'carbondata'")
+    sql("insert into table maintable select 'abc',21,2000")
+    val res = sql("select name from maintable order by c_code")
+    sql("drop datamap if exists dm1")
+    sql("create datamap dm1 using 'mv' as select name from maintable order by 
c_code")
+    val df = sql("select name from maintable order by c_code")
+    TestUtil.verifyMVDataMap(df.queryExecution.analyzed, "dm1")
+    checkAnswer(res, df)
+    intercept[Exception] {
+      sql("alter table maintable drop columns(c_code)")
+    }.getMessage.contains("Column name cannot be dropped because it exists in 
mv datamap: dm1")
+   sql("drop table if exists maintable")
+  }
+
   test("drop meta cache on mv datamap table") {
     sql("drop table IF EXISTS maintable")
     sql("create table maintable(name string, c_code int, price int) stored by 
'carbondata'")
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesCreateDataMapCommand.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesCreateDataMapCommand.scala
index 4feab2f..699b189 100644
--- 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesCreateDataMapCommand.scala
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesCreateDataMapCommand.scala
@@ -101,8 +101,17 @@ class TestMVTimeSeriesCreateDataMapCommand extends 
QueryTest with BeforeAndAfter
       sql(
         "create datamap datamap1 on table maintable_new using 'mv' as " +
         "select timeseries(projectjoindate,'second') from maintable_new")
-    }.getMessage
-      .contains("Granularity should be DAY,MONTH or YEAR, for timeseries 
column of Date type")
+    }.getMessage.contains("Granularity should be of DAY/WEEK/MONTH/YEAR, for 
timeseries column of Date type")
+    intercept[MalformedCarbonCommandException] {
+      sql(
+        "create datamap datamap1 on table maintable_new using 'mv' as " +
+        "select timeseries(projectjoindate,'five_minute') from maintable_new")
+    }.getMessage.contains("Granularity should be of DAY/WEEK/MONTH/YEAR, for 
timeseries column of Date type")
+    intercept[MalformedCarbonCommandException] {
+      sql(
+        "create datamap datamap1 on table maintable_new using 'mv' as " +
+        "select timeseries(projectjoindate,'hour') from maintable_new")
+      }.getMessage.contains("Granularity should be of DAY/WEEK/MONTH/YEAR, for 
timeseries column of Date type")
     sql("drop table IF EXISTS maintable_new")
   }
 
diff --git a/docs/datamap/mv-datamap-guide.md b/docs/datamap/mv-datamap-guide.md
index a0c3f1a..b75243f 100644
--- a/docs/datamap/mv-datamap-guide.md
+++ b/docs/datamap/mv-datamap-guide.md
@@ -73,7 +73,7 @@ EXPLAIN SELECT a, sum(b) from maintable group by a;
     GROUP BY country, sex
   ```
  **NOTE**:
- * Group by/Filter columns has to be provided in projection list while 
creating mv datamap
+ * Group by columns have to be provided in the projection list while creating an mv datamap
  * If only single parent table is involved in mv datamap creation, then 
TableProperties of Parent table
    (if not present in a aggregate function like sum(col)) listed below will be
    inherited to datamap table
@@ -91,7 +91,14 @@ EXPLAIN SELECT a, sum(b) from maintable group by a;
    12. NO_INVERTED_INDEX
    13. COLUMN_COMPRESSOR
 
- * All columns of main table at once cannot participate in mv datamap table 
creation
+ * Creating an MV datamap with a select query that only projects all columns of the main table is unsupported
+      
+   **Example:**
+   If table 'x' contains columns 'a,b,c',
+   then creating MV datamap with below queries is not supported.
+   
+   1. ```select a,b,c from x```
+   2. ```select * from x```
  * TableProperties can be provided in DMProperties excluding 
LOCAL_DICTIONARY_INCLUDE,
    LOCAL_DICTIONARY_EXCLUDE, DICTIONARY_INCLUDE, DICTIONARY_EXCLUDE, 
INVERTED_INDEX,
    NO_INVERTED_INDEX, SORT_COLUMNS, LONG_STRING_COLUMNS, RANGE_COLUMN & 
COLUMN_META_CACHE
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
index e72cf9b..f79e7d8 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
@@ -154,10 +154,10 @@ object TimeSeriesUtil {
       timeSeriesFunction: String): Unit = {
     for (granularity <- Granularity.values()) {
       if (timeSeriesFunction.equalsIgnoreCase(granularity.getName
-        .substring(0, 
granularity.getName.indexOf(CarbonCommonConstants.UNDERSCORE)))) {
+        .substring(0, 
granularity.getName.lastIndexOf(CarbonCommonConstants.UNDERSCORE)))) {
         if (!supportedGranularitiesForDate.contains(granularity.getName)) {
           throw new MalformedCarbonCommandException(
-            "Granularity should be DAY,MONTH or YEAR, for timeseries column of 
Date type")
+            "Granularity should be of DAY/WEEK/MONTH/YEAR, for timeseries 
column of Date type")
         }
       }
     }

Reply via email to