This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new b3786ca  [CARBONDATA-3624] Support creating MV datamap without giving filter columns in projection and bug fixes
b3786ca is described below

commit b3786ca0eee6a0b5e09cf58dd818dee1170288d1
Author: Indhumathi27 <indhumathi...@gmail.com>
AuthorDate: Wed Dec 18 21:34:10 2019 +0530

    [CARBONDATA-3624] Support creating MV datamap without giving filter columns in projection and bug fixes
    
    Problem:
    We need to maintain a fieldRelationMap between the MV and its parent tables, both to inherit properties from the parent table and to restrict alter operations on main table columns. Deriving the fieldRelationMap from the logical plan in a generalised way required adding filter columns to the projection check as well. But this check should not prevent a user from creating a datamap when the query does not include the filter columns in the projection.
    
    Solution:
    Use the Modular Plan instead of the Logical Plan to form the fieldRelationMap, making it generalised for all types of queries. This way, we avoid requiring the user to include filter columns in the projection when creating a datamap.
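    
    To illustrate the idea, here is a minimal, self-contained sketch (hypothetical stand-in types, not the actual CarbonData or Spark classes): columns referenced by filter predicates are collected from the plan's predicateList and unioned with the projected columns, so the fieldRelationMap also covers columns that appear only in the WHERE clause.
    
    // Hypothetical stand-ins for plan nodes; the real code walks the modular
    // plan's outputList and predicateList (see the MVUtil changes below).
    case class Attr(name: String)
    sealed trait Expr { def references: Seq[Attr] }
    case class Col(attr: Attr) extends Expr { def references = Seq(attr) }
    case class Lit(value: Any) extends Expr { def references = Seq.empty }
    case class EqualTo(left: Expr, right: Expr) extends Expr {
      def references = left.references ++ right.references
    }
    
    // Union of projected and filter-referenced columns, de-duplicated, so a
    // WHERE-only column still lands in the fieldRelationMap.
    def fieldsForDataMap(outputList: Seq[Attr], predicateList: Seq[Expr]): Seq[Attr] =
      (outputList ++ predicateList.flatMap(_.references)).distinct
    
    // e.g. select dob from maintable where dob='1975-06-11' or cust_id=2
    assert(fieldsForDataMap(
      Seq(Attr("dob")),
      Seq(EqualTo(Col(Attr("dob")), Lit("1975-06-11")),
          EqualTo(Col(Attr("cust_id")), Lit(2)))) == Seq(Attr("dob"), Attr("cust_id")))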
    
    This closes #3517
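    
    Among the bug fixes, the Utils.scala change below adds a fallback when no rollup derivative can be formed for an avg() in the user query: the matcher now also looks for an Average over a semantically equal expression in the subsumer's output and rewrites the query's avg on top of that output attribute. A hedged, standalone sketch of the matching rule (simplified types, not Spark's catalyst API):
    
    // Hypothetical model: an average identified by the expression it averages
    // and its DISTINCT flag.
    case class Avg(child: String, isDistinct: Boolean)
    
    // subsumerOutput maps an MV output column name to the aggregate it stores.
    def rewriteAvg(avgQ: Avg, subsumerOutput: Map[String, Avg]): Option[Avg] =
      subsumerOutput.collectFirst {
        // Match only when the averaged expression and the DISTINCT flag agree,
        // mirroring expr_a.semanticEquals(expr_q) and the isDistinct check.
        case (outName, avgA) if avgA.child == avgQ.child &&
                                avgA.isDistinct == avgQ.isDistinct =>
          // Re-express the query's avg over the MV's stored output column.
          Avg(outName, isDistinct = false)
      }
    
    assert(rewriteAvg(Avg("utilization", isDistinct = false),
        Map("avg_util" -> Avg("utilization", isDistinct = false)))
      .contains(Avg("avg_util", isDistinct = false)))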
---
 .../apache/carbondata/mv/datamap/MVHelper.scala    |  65 ++-----
 .../org/apache/carbondata/mv/datamap/MVUtil.scala  | 186 ++++++++-------------
 .../org/apache/carbondata/mv/rewrite/Utils.scala   |  42 ++++-
 .../carbondata/mv/rewrite/MVCreateTestCase.scala   |  43 +++--
 .../mv/rewrite/SelectAllColumnsSuite.scala         |   9 +-
 .../mv/rewrite/TestAllOperationsOnMV.scala         |   6 +-
 .../timeseries/TestMVTimeSeriesLoadAndQuery.scala  |   4 +-
 .../execution/command/mv/DataMapListeners.scala    |   4 +-
 8 files changed, 163 insertions(+), 196 deletions(-)

diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 29afec1..c28a7a5 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -46,6 +46,7 @@ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSch
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.datamap.DataMapManager
 import org.apache.carbondata.mv.plans.modular._
+import org.apache.carbondata.mv.plans.util.SQLBuilder
 import org.apache.carbondata.mv.rewrite.{MVPlanWrapper, QueryRewrite, SummaryDatasetCatalog}
 import org.apache.carbondata.spark.util.CommonUtil
 
@@ -82,7 +83,8 @@ object MVHelper {
       throw new MalformedCarbonCommandException(
         s"Non-Carbon table does not support creating MV datamap")
     }
-    val updatedQueryWithDb = validateMVQuery(sparkSession, logicalPlan)
+    val modularPlan = validateMVQuery(sparkSession, logicalPlan)
+    val updatedQueryWithDb = modularPlan.asCompactSQL
     val (timeSeriesColumn, granularity): (String, String) = validateMVTimeSeriesQuery(
       logicalPlan,
       dataMapSchema)
@@ -159,8 +161,9 @@ object MVHelper {
     tableProperties.put(CarbonCommonConstants.DATAMAP_NAME, dataMapSchema.getDataMapName)
     tableProperties.put(CarbonCommonConstants.PARENT_TABLES, parentTables.asScala.mkString(","))
 
-    val fieldRelationMap = mvUtil.getFieldsAndDataMapFieldsFromPlan(
-      logicalPlan, queryString, sparkSession)
+    val finalModularPlan = new SQLBuilder(modularPlan).SQLizer.execute(modularPlan)
+    val fieldRelationMap = mvUtil.getFieldsAndDataMapFieldsFromPlan(finalModularPlan,
+      getLogicalRelation(logicalPlan))
     // If dataMap is mapped to single main table, then inherit table properties from main table,
     // else, will use default table properties. If DMProperties contains table properties, then
     // table properties of datamap table will be updated
@@ -261,11 +264,11 @@ object MVHelper {
             mainTable =>
               if (null == mainTableToColumnsMap.get(mainTable.parentTableName)) {
                 val columns = new util.HashSet[String]()
-                columns.add(mainTable.parentColumnName)
+                columns.add(mainTable.parentColumnName.toLowerCase())
                 mainTableToColumnsMap.put(mainTable.parentTableName, columns)
               } else {
                 mainTableToColumnsMap.get(mainTable.parentTableName)
-                  .add(mainTable.parentColumnName)
+                  .add(mainTable.parentColumnName.toLowerCase())
               }
           }
       }
@@ -304,39 +307,8 @@ object MVHelper {
     }
   }
 
-  private def isValidSelect(isValidExp: Boolean,
-      filterPredicate: Seq[Expression], outputList: Seq[NamedExpression]): Boolean = {
-    // Make sure all predicates are present in projections.
-    var predicateList: Seq[AttributeReference] = Seq.empty
-    filterPredicate.map { f =>
-      f.children.collect {
-        case p: AttributeReference =>
-          predicateList = predicateList.+:(p)
-        case e: Expression =>
-          e.collect {
-            case attr: AttributeReference =>
-              predicateList = predicateList.+:(attr)
-          }
-      }
-    }
-    if (predicateList.nonEmpty) {
-      predicateList.forall { p =>
-        outputList.exists {
-          case a: Alias =>
-            a.semanticEquals(p) || a.child.semanticEquals(p) || a.collect {
-              case attr: AttributeReference =>
-                attr.semanticEquals(p)
-            }.exists(p => p.equals(true))
-          case other => other.semanticEquals(p)
-        }
-      }
-    } else {
-      isValidExp
-    }
-  }
-
   private def validateMVQuery(sparkSession: SparkSession,
-      logicalPlan: LogicalPlan): String = {
+      logicalPlan: LogicalPlan): ModularPlan = {
     val dataMapProvider = DataMapManager.get().getDataMapProvider(null,
       new DataMapSchema("", DataMapClassProvider.MV.getShortName), sparkSession)
     var catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
@@ -355,25 +327,18 @@ object MVHelper {
     val isValid = modularPlan match {
       case g: GroupBy =>
         // Make sure all predicates are present in projections.
-        val isValidExp = g.predicateList.forall{p =>
+        g.predicateList.forall{p =>
           g.outputList.exists{
             case a: Alias =>
               a.semanticEquals(p) || a.child.semanticEquals(p)
             case other => other.semanticEquals(p)
           }
         }
-        g.child match {
-          case s: Select =>
-            isValidSelect(isValidExp, s.predicateList, g.outputList)
-          case m: ModularRelation => isValidExp
-        }
-      case s: Select =>
-        isValidSelect(true, s.predicateList, s.outputList)
       case _ => true
     }
     if (!isValid) {
       throw new UnsupportedOperationException(
-        "Group by/Filter columns must be present in project columns")
+        "Group by columns must be present in project columns")
     }
     if (catalog.isMVWithSameQueryPresent(logicalPlan)) {
       throw new UnsupportedOperationException("MV with same query present")
@@ -390,7 +355,7 @@ object MVHelper {
     if (!expressionValid) {
       throw new UnsupportedOperationException("MV doesn't support Coalesce")
     }
-    modularPlan.asCompactSQL
+    modularPlan
   }
 
   def getUpdatedName(name: String, counter: Int): String = {
@@ -418,6 +383,12 @@ object MVHelper {
     }
   }
 
+  def getLogicalRelation(logicalPlan: LogicalPlan): Seq[LogicalRelation] = {
+    logicalPlan.collect {
+      case l: LogicalRelation => l
+    }
+  }
+
   def dropDummFuc(plan: LogicalPlan): LogicalPlan = {
     plan transform {
       case p@Project(exps, child) =>
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
index 74b0474..fe76cc3 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
@@ -20,16 +20,16 @@ package org.apache.carbondata.mv.datamap
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, SparkSession}
+import org.apache.spark.sql.CarbonDatasourceHadoopRelation
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count}
-import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types.DataType
 
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan, ModularRelation, Select}
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**
@@ -42,39 +42,46 @@ class MVUtil {
   /**
    * Below method will be used to validate and get the required fields from select plan
    */
-  def getFieldsAndDataMapFieldsFromPlan(plan: LogicalPlan,
-      selectStmt: String,
-      sparkSession: SparkSession): scala.collection.mutable.LinkedHashMap[Field, DataMapField] = {
+  def getFieldsAndDataMapFieldsFromPlan(plan: ModularPlan,
+      logicalRelation: Seq[LogicalRelation]): scala.collection.mutable.LinkedHashMap[Field,
+    DataMapField] = {
     plan match {
-      case Project(projectList, child: Sort) =>
-        getFieldsFromProject(projectList, plan, child)
-      case Project(projectList, _) =>
-        getFieldsFromProject(projectList, plan)
-      case Aggregate(groupByExp, aggExp, _) =>
-        getFieldsFromAggregate(groupByExp, aggExp, plan)
+      case select: Select =>
+        select.children.map {
+          case groupBy: GroupBy =>
+            getFieldsFromProject(groupBy.outputList, groupBy.predicateList, logicalRelation)
+          case _: ModularRelation =>
+            getFieldsFromProject(select.outputList, select.predicateList, logicalRelation)
+        }.head
+      case groupBy: GroupBy =>
+        groupBy.child match {
+          case select: Select =>
+            getFieldsFromProject(groupBy.outputList, select.predicateList, logicalRelation)
+          case _: ModularRelation =>
+            getFieldsFromProject(groupBy.outputList, groupBy.predicateList, logicalRelation)
+        }
     }
   }
 
-  def getFieldsFromProject(projectList: Seq[NamedExpression],
-      plan: LogicalPlan, sort: LogicalPlan): mutable.LinkedHashMap[Field, DataMapField] = {
+  def getFieldsFromProject(outputList: Seq[NamedExpression],
+      predicateList: Seq[Expression],
+      logicalRelation: Seq[LogicalRelation]): mutable.LinkedHashMap[Field, DataMapField] = {
     var fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
-    sort.transformDown {
-      case agg@Aggregate(groupByExp, aggExp, _) =>
-        fieldToDataMapFieldMap ++= getFieldsFromAggregate(groupByExp, aggExp, plan)
-        agg
+    fieldToDataMapFieldMap ++= getFieldsFromProject(outputList, logicalRelation)
+    var finalPredicateList: Seq[NamedExpression] = Seq.empty
+    predicateList.map { p =>
+      p.collect {
+        case attr: AttributeReference =>
+          finalPredicateList = finalPredicateList.:+(attr)
+      }
     }
-    fieldToDataMapFieldMap ++= getFieldsFromProject(projectList, plan)
+    fieldToDataMapFieldMap ++= getFieldsFromProject(finalPredicateList.distinct, logicalRelation)
     fieldToDataMapFieldMap
   }
 
   def getFieldsFromProject(projectList: Seq[NamedExpression],
-      plan: LogicalPlan): mutable.LinkedHashMap[Field, DataMapField] = {
+      logicalRelation: Seq[LogicalRelation]): mutable.LinkedHashMap[Field, DataMapField] = {
     var fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
-    val logicalRelation =
-      plan.collect {
-        case lr: LogicalRelation =>
-          lr
-      }
     projectList.map {
       case attr: AttributeReference =>
         val carbonTable = getCarbonTable(logicalRelation, attr)
@@ -88,10 +95,18 @@ class MVUtil {
           if (null != relation) {
             arrayBuffer += relation
           }
+          var qualifier: Option[String] = None
+          if (attr.qualifier.isDefined) {
+            qualifier = if (attr.qualifier.get.startsWith("gen_sub")) {
+              Some(carbonTable.getTableName)
+            } else {
+              attr.qualifier
+            }
+          }
           fieldToDataMapFieldMap +=
           getFieldToDataMapFields(attr.name,
             attr.dataType,
-            attr.qualifier,
+            qualifier,
             "",
             arrayBuffer,
             carbonTable.getTableName)
@@ -111,7 +126,8 @@ class MVUtil {
           fieldToDataMapFieldMap +=
          getFieldToDataMapFields(name, attr.dataType, None, "", arrayBuffer, "")
         }
-      case a@Alias(_, name) =>
+
+      case a@Alias(agg: AggregateExpression, _) =>
         checkIfComplexDataTypeExists(a)
        val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
         a.collect {
@@ -129,102 +145,32 @@ class MVUtil {
             }
         }
         fieldToDataMapFieldMap +=
-        getFieldToDataMapFields(a.name, a.dataType, None, "arithmetic", arrayBuffer, "")
-    }
-    fieldToDataMapFieldMap
-  }
-
-  def getFieldsFromAggregate(groupByExp: Seq[Expression],
-      aggExp: Seq[NamedExpression],
-      plan: LogicalPlan): mutable.LinkedHashMap[Field, DataMapField] = {
-    var fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
-    val logicalRelation =
-      plan.collect {
-        case lr: LogicalRelation =>
-          lr
-      }
-    aggExp.map { agg =>
-      var aggregateType: String = ""
-      val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
-      var isLiteralPresent = false
-      agg.collect {
-        case Alias(attr: AggregateExpression, name) =>
-          attr.aggregateFunction.collect {
-            case l@Literal(_, _) =>
-              isLiteralPresent = true
-          }
-          if (isLiteralPresent) {
-            fieldToDataMapFieldMap +=
-            getFieldToDataMapFields(name,
-              attr.aggregateFunction.dataType,
-              None,
-              attr.aggregateFunction.nodeName,
-              arrayBuffer,
-              "")
-            aggregateType = attr.aggregateFunction.nodeName
-          } else {
-            aggregateType = attr.aggregateFunction.nodeName
-          }
-        case a@Alias(_, name) =>
-          checkIfComplexDataTypeExists(a)
-          // In case of arithmetic expressions like sum(a)+sum(b)
-          aggregateType = "arithmetic"
-      }
-      agg.collect {
-        case attr: AttributeReference =>
-          val carbonTable: CarbonTable = getCarbonTable(logicalRelation, attr)
-          if (null != carbonTable) {
-            val relation = getColumnRelation(attr.name,
-              carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
-              carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
-              carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
-              carbonTable)
-            if (null != relation) {
-              arrayBuffer += relation
-            }
-            if (aggregateType.isEmpty && arrayBuffer.nonEmpty) {
-              val tableName = carbonTable.getTableName
-              fieldToDataMapFieldMap +=
-              getFieldToDataMapFields(agg.name,
-                agg.dataType,
-                attr.qualifier,
-                aggregateType,
-                arrayBuffer,
-                tableName)
-            }
-          }
-      }
-      if (!aggregateType.isEmpty && arrayBuffer.nonEmpty && !isLiteralPresent) {
-        fieldToDataMapFieldMap +=
-        getFieldToDataMapFields(agg.name,
-          agg.dataType,
-          agg.qualifier,
-          aggregateType,
+        getFieldToDataMapFields(a.name,
+          a.dataType,
+          None,
+          agg.aggregateFunction.nodeName,
           arrayBuffer,
           "")
-      }
-    }
-    groupByExp map {
-      case attr: AttributeReference =>
-        val carbonTable: CarbonTable = getCarbonTable(logicalRelation, attr)
-        if (null != carbonTable) {
-          val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new
-              ArrayBuffer[ColumnTableRelation]()
-          arrayBuffer += getColumnRelation(attr.name,
-            carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
-            carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
-            carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
-            carbonTable)
-          fieldToDataMapFieldMap +=
-          getFieldToDataMapFields(attr.name,
-            attr.dataType,
-            attr.qualifier,
-            "",
-            arrayBuffer,
-            carbonTable.getTableName)
+
+      case a@Alias(_, _) =>
+        checkIfComplexDataTypeExists(a)
+        val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
+        a.collect {
+          case attr: AttributeReference =>
+            val carbonTable = getCarbonTable(logicalRelation, attr)
+            if (null != carbonTable) {
+              val relation = getColumnRelation(attr.name,
+                carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+                carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+                carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
+                carbonTable)
+              if (null != relation) {
+                arrayBuffer += relation
+              }
+            }
         }
-        attr
-      case _ =>
+        fieldToDataMapFieldMap +=
+        getFieldToDataMapFields(a.name, a.dataType, None, "arithmetic", arrayBuffer, "")
     }
     fieldToDataMapFieldMap
   }
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
index 7a8b538..802be83 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
@@ -273,8 +273,46 @@ object Utils extends PredicateHelper {
           matchable = false
           None
         }
-
-        derivative.getOrElse { matchable = false; avg_q }
+        // If derivative is empty, check whether the subsumer contains an aggregateFunction that
+        // is an instance of Average, and form an Average expression from it
+        if (derivative.isEmpty) {
+          matchable = true
+          operator_a.outputList.find {
+            case alias: Alias if alias_m.contains(alias.toAttribute) &&
+                                 alias_m(alias.toAttribute).child
+                                   .isInstanceOf[AggregateExpression] &&
+                                 alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
+                                   .aggregateFunction.isInstanceOf[Average] => {
+              val avg_a = alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
+              val expr_a = avg_a.aggregateFunction.asInstanceOf[Average].child
+              if (avg_a.isDistinct != avg_q.isDistinct) {
+                false
+              } else {
+                expr_a.semanticEquals(expr_q)
+              }
+            }
+            case attr: Attribute if alias_m.contains(attr) &&
+                                    alias_m(attr).child.isInstanceOf[AggregateExpression] &&
+                                    alias_m(attr).child.asInstanceOf[AggregateExpression]
+                                      .aggregateFunction.isInstanceOf[Average] => {
+              val avg_a = alias_m(attr).child.asInstanceOf[AggregateExpression]
+              val expr_a = avg_a.aggregateFunction.asInstanceOf[Average].child
+              if (avg_a.isDistinct != avg_q.isDistinct) {
+                false
+              } else {
+                expr_a.semanticEquals(expr_q)
+              }
+            }
+            case _ => false
+          }.map { avg => AggregateExpression(
+            Average(avg.toAttribute),
+            avg_q.mode,
+            isDistinct = false,
+            avg_q.resultId)
+          }.getOrElse { matchable = false; avg_q }
+        } else {
+          derivative.getOrElse { matchable = false; avg_q }
+        }
 
       case other: AggregateExpression =>
         matchable = false
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index 31f22a7..16ca086 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
 
@@ -455,9 +456,9 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"drop datamap datamap29")
   }
 
-  ignore("test create datamap with join with group by with filter") {
+  test("test create datamap with join with group by with filter") {
     sql("drop datamap if exists datamap30")
-    sql("create datamap datamap30 using 'mv' as select t1.empname, 
t2.designation, sum(t1.utilization),sum(t2.empname) from fact_table1 t1 inner 
join fact_table2 t2 on (t1.empname = t2.empname) group by t1.empname, 
t2.designation")
+    sql("create datamap datamap30 using 'mv' as select t1.empname, 
t2.designation, sum(t1.utilization) from fact_table1 t1 inner join fact_table2 
t2 on (t1.empname = t2.empname) group by t1.empname, t2.designation")
     val frame = sql(
       "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 
t1,fact_table2 t2  " +
       "where t1.empname = t2.empname and t2.designation='SA' group by 
t1.empname, t2.designation")
@@ -489,7 +490,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"drop datamap datamap32")
   }
 
-  ignore("test create datamap with simple and sub group by query and avg agg") 
{
+  test("test create datamap with simple and sub group by query and avg agg") {
     sql(s"drop datamap if exists datamap33")
     sql("create datamap datamap33 using 'mv' as select empname, 
avg(utilization) from fact_table1 group by empname")
     val frame = sql("select empname,avg(utilization) from fact_table1 where 
empname='shivani' group by empname")
@@ -512,7 +513,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"drop datamap datamap34")
   }
 
-  ignore("test create datamap with simple and group by query with filter on 
datamap but not on projection") {
+  test("test create datamap with simple and group by query with filter on 
datamap but not on projection") {
     sql("create datamap datamap35 using 'mv' as select designation, 
sum(utilization) from fact_table1 where empname='shivani' group by designation")
     val frame = sql(
       "select designation, sum(utilization) from fact_table1 where 
empname='shivani' group by designation")
@@ -522,7 +523,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"drop datamap datamap35")
   }
 
-  ignore("test create datamap with simple and sub group by query with filter 
on datamap but not on projection") {
+  test("test create datamap with simple and sub group by query with filter on 
datamap but not on projection") {
     sql("create datamap datamap36 using 'mv' as select designation, 
sum(utilization) from fact_table1 where empname='shivani' group by designation")
     val frame = sql(
       "select sum(utilization) from fact_table1 where empname='shivani' group 
by designation")
@@ -558,7 +559,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"drop datamap datamap38")
   }
 
-  ignore("test create datamap with agg push join with group by with filter") {
+  test("test create datamap with agg push join with group by with filter") {
     sql("drop datamap if exists datamap39")
     sql("create datamap datamap39 using 'mv' as select empname, designation, 
sum(utilization) from fact_table1 group by empname, designation ")
     val frame = sql(
@@ -584,7 +585,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"drop datamap datamap40")
   }
 
-  ignore("test create datamap with left join with group by with filter") {
+  test("test create datamap with left join with group by with filter") {
     sql("drop datamap if exists datamap41")
     sql("create datamap datamap41 using 'mv' as select t1.empname, 
t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 
t2  on t1.empname = t2.empname group by t1.empname, t2.designation")
     val frame = sql(
@@ -597,7 +598,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"drop datamap datamap41")
   }
 
-  ignore("test create datamap with left join with sub group by") {
+  test("test create datamap with left join with sub group by") {
     sql("drop datamap if exists datamap42")
     sql("create datamap datamap42 using 'mv' as select t1.empname, 
t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 
t2  on t1.empname = t2.empname group by t1.empname, t2.designation")
     val frame = sql(
@@ -610,7 +611,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"drop datamap datamap42")
   }
 
-  ignore("test create datamap with left join with sub group by with filter") {
+  test("test create datamap with left join with sub group by with filter") {
     sql("drop datamap if exists datamap43")
     sql("create datamap datamap43 using 'mv' as select t1.empname, 
t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 
t2  on t1.empname = t2.empname group by t1.empname, t2.designation")
     val frame = sql(
@@ -623,7 +624,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"drop datamap datamap43")
   }
 
-  ignore("test create datamap with left join with sub group by with filter on 
mv") {
+  test("test create datamap with left join with sub group by with filter on 
mv") {
     sql("drop datamap if exists datamap44")
     sql("create datamap datamap44 using 'mv' as select t1.empname, 
t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 
t2  on t1.empname = t2.empname where t1.empname='shivani' group by t1.empname, 
t2.designation")
     val frame = sql(
@@ -1054,7 +1055,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql(
       "create table mv_like(name string, age int, address string, Country 
string, id int) stored by 'carbondata'")
     sql(
-      "create datamap mvlikedm1 using 'mv' as select name,address,sum(Country) 
from mv_like where Country NOT LIKE 'US' group by name,address")
+      "create datamap mvlikedm1 using 'mv' as select name,address from mv_like 
where Country NOT LIKE 'US' group by name,address")
     sql(
       "create datamap mvlikedm2 using 'mv' as select name,address,Country from 
mv_like where Country = 'US' or Country = 'China' group by 
name,address,Country")
     sql("insert into mv_like select 'chandler', 32, 'newYork', 'US', 5")
@@ -1345,7 +1346,25 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     FileFactory.deleteAllFilesOfDir(new File(newPath))
   }
 
-
+  test("test join query with & without filter columns in projection") {
+    sql("drop table if exists t1")
+    sql("drop table if exists t2")
+    sql("drop datamap if exists mv1")
+    sql("create table t1(userId string,score int) stored by 'carbondata'")
+    sql("create table t2(userId string,age int,sex string) stored by 
'carbondata'")
+    sql("insert into t1 values(1,100),(2,500)")
+    sql("insert into t2 values(1,20,'f'),(2,30,'m')")
+    val result  = sql("select avg(t1.score),t2.age,t2.sex from t1 join t2 on t1.userId=t2.userId group by t2.age,t2.sex")
+    sql("create datamap mv1 using 'mv' as select avg(t1.score),t2.age,t2.sex from t1 join t2 on t1.userId=t2.userId group by t2.age,t2.sex")
+    val df = sql("select avg(t1.score),t2.age,t2.sex from t1 join t2 on t1.userId=t2.userId group by t2.age,t2.sex")
+    TestUtil.verifyMVDataMap(df.queryExecution.analyzed, "mv1")
+    checkAnswer(df, result)
+    intercept[ProcessMetaDataException] {
+      sql("alter table t1 drop columns(userId)")
+    }.getMessage.contains("Column name cannot be dropped because it exists in mv datamap: mv1")
+    sql("drop table if exists t1")
+    sql("drop table if exists t2")
+  }
 
   def copy(oldLoc: String, newLoc: String): Unit = {
     val oldFolder = FileFactory.getCarbonFile(oldLoc)
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/SelectAllColumnsSuite.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/SelectAllColumnsSuite.scala
index 75e54b2..8120dbf 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/SelectAllColumnsSuite.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/SelectAllColumnsSuite.scala
@@ -36,15 +36,8 @@ class SelectAllColumnsSuite extends QueryTest {
       Seq(Row(26.0, 177.5, "tom")))
     val frame = sql("select avg(age),avg(height),name from all_table group by 
name")
     val analyzed = frame.queryExecution.analyzed
-    assert(verifyMVDataMap(analyzed, "all_table_mv"))
+    assert(TestUtil.verifyMVDataMap(analyzed, "all_table_mv"))
     sql("drop table if exists all_table")
   }
 
-  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
-    val tables = logicalPlan collect {
-      case l: LogicalRelation => l.catalogTable.get
-    }
-    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table"))
-  }
-
 }
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
index 28d5289..f140e5d 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
@@ -556,9 +556,9 @@ class TestAllOperationsOnMV extends QueryTest with BeforeAndAfterEach {
     sql("CREATE TABLE maintable (CUST_ID int,CUST_NAME 
String,ACTIVE_EMUI_VERSION string, DOB date, DOJ timestamp, BIGINT_COLUMN1 
bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10), DECIMAL_COLUMN2 
decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 
int) STORED BY 'org.apache.carbondata.format'")
     sql("insert into maintable values(1, 'abc', 'abc001', 
'1975-06-11','1975-06-11 02:00:03.0', 120, 1234,4.34,24.56,12345, 2464, 45)")
     sql("drop datamap if exists dm ")
-    intercept[UnsupportedOperationException] {
-      sql("create datamap dm using 'mv' as select dob from maintable where 
(dob='1975-06-11' or cust_id=2)")
-    }.getMessage.contains("Group by/Filter columns must be present in project 
columns")
+    sql("create datamap dm using 'mv' as select dob from maintable where 
(dob='1975-06-11' or cust_id=2)")
+    val df = sql("select dob from maintable where (dob='1975-06-11' or 
cust_id=2)")
+    TestUtil.verifyMVDataMap(df.queryExecution.analyzed, "dm")
     sql("drop table IF EXISTS maintable")
   }
 
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
index 3bd1e60..2b57dda 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
@@ -329,11 +329,11 @@ class TestMVTimeSeriesLoadAndQuery extends QueryTest with BeforeAndAfterAll {
     checkPlan("datamap1", df)
   }
 
-  test("test create datamap with group by & filter columns not present in 
projection") {
+  test("test create datamap with group by columns not present in projection") {
     sql("drop datamap if exists dm ")
     intercept[UnsupportedOperationException] {
       sql("create datamap dm using 'mv' as select 
timeseries(projectjoindate,'day') from maintable where empname='chandler' group 
by timeseries(projectjoindate,'day'),empname")
-    }.getMessage.contains("Group by/Filter columns must be present in project 
columns")
+    }.getMessage.contains("Group by columns must be present in project 
columns")
     sql("create datamap dm using 'mv' as select 
timeseries(projectjoindate,'day'),empname from maintable where 
empname='chandler' group by timeseries(projectjoindate,'day'),empname")
     var df = sql("select timeseries(projectjoindate,'day'),empname from 
maintable where empname='chandler' group by 
timeseries(projectjoindate,'day'),empname")
     checkPlan("dm", df)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala
index fcee878..d554b1b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala
@@ -221,7 +221,7 @@ object DataMapDropColumnPreListener extends OperationEventListener {
         if (null != dataMapSchema && !dataMapSchema.isIndexDataMap) {
           val listOfColumns = DataMapListeners.getDataMapTableColumns(dataMapSchema, carbonTable)
           val columnExistsInChild = listOfColumns.collectFirst {
-            case parentColumnName if columnsToBeDropped.contains(parentColumnName) =>
+            case parentColumnName if columnsToBeDropped.contains(parentColumnName.toLowerCase) =>
               parentColumnName
           }
           if (columnExistsInChild.isDefined) {
@@ -262,7 +262,7 @@ object DataMapChangeDataTypeorRenameColumnPreListener
       for (dataMapSchema <- dataMapSchemaList) {
         if (null != dataMapSchema && !dataMapSchema.isIndexDataMap) {
           val listOfColumns = DataMapListeners.getDataMapTableColumns(dataMapSchema, carbonTable)
-          if (listOfColumns.contains(columnToBeAltered)) {
+          if (listOfColumns.contains(columnToBeAltered.toLowerCase)) {
             throw new UnsupportedOperationException(
               s"Column $columnToBeAltered exists in a " + 
dataMapSchema.getProviderName +
               " datamap. Drop " + dataMapSchema.getProviderName + "  datamap 
to continue")
