http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
----------------------------------------------------------------------
diff --git 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
new file mode 100644
index 0000000..074d369
--- /dev/null
+++ 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeMap, Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+
+import org.apache.carbondata.mv.plans.modular
+import org.apache.carbondata.mv.plans.modular.ModularPlan
+
/**
 * Utility functions used by the mqo matcher to convert our plan to the new
 * aggregation code path.
 */
private[rewrite] object Utils extends PredicateHelper {

  /**
   * Rewrites the aggregate expressions of the query operator (operator_q) in terms of
   * the subsumer operator's (operator_a) output; used to match qb_2a/qb_2q and
   * sel_3a/sel_3q.
   *
   * Each non-distinct Count/Sum/Max/Min aggregate in the query is looked up (through
   * alias_m) in the subsumer's output list; when an equivalent pre-computed aggregate
   * is found it is replaced by the roll-up aggregate over the subsumer's output
   * attribute: Count rolls up as Sum of the partial counts, while Sum/Max/Min roll up
   * as themselves.
   *
   * @param operator_a the subsumer (MV) operator providing pre-computed aggregates
   * @param operator_q the query operator whose expressions are rewritten
   * @param alias_m    map from subsumer output attributes to their defining aliases
   * @return Some(rewritten operator) when every expression could be matched, else None
   */
  private def doMatch(
      operator_a: modular.Matchable,
      operator_q: modular.Matchable,
      alias_m: AttributeMap[Alias]): Option[modular.Matchable] = {
    var matchable = true
    val matched = operator_q.transformExpressions {
      case cnt_q@AggregateExpression(Count(exprs_q), _, false, _) =>
        operator_a.outputList.find {
          case alias: Alias if alias_m.contains(alias.toAttribute) &&
                               alias_m(alias.toAttribute).child.isInstanceOf[AggregateExpression] &&
                               alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
                                 .aggregateFunction.isInstanceOf[Count] =>
            // case for groupby
            val cnt_a = alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
            val exprs_a = cnt_a.aggregateFunction.asInstanceOf[Count].children
            if (cnt_a.isDistinct != cnt_q.isDistinct || exprs_q.length != exprs_a.length) {
              false
            } else {
              // compare the child expression lists order-insensitively
              exprs_a.sortBy(_.hashCode()).zip(exprs_q.sortBy(_.hashCode()))
                .forall(p => p._1.semanticEquals(p._2))
            }

          case attr: Attribute if alias_m.contains(attr) &&
                                  alias_m(attr).child.isInstanceOf[AggregateExpression] &&
                                  alias_m(attr).child.asInstanceOf[AggregateExpression]
                                    .aggregateFunction.isInstanceOf[Count] =>
            val cnt_a = alias_m(attr).child.asInstanceOf[AggregateExpression]
            val exprs_a = cnt_a.aggregateFunction.asInstanceOf[Count].children
            if (cnt_a.isDistinct != cnt_q.isDistinct || exprs_q.length != exprs_a.length) {
              false
            } else {
              exprs_a.sortBy(_.hashCode()).zip(exprs_q.sortBy(_.hashCode()))
                .forall(p => p._1.semanticEquals(p._2))
            }

          case _ => false
        }.map { cnt =>
          // COUNT over segments rolls up as SUM of the pre-computed counts
          AggregateExpression(
            Sum(cnt.toAttribute),
            cnt_q.mode,
            isDistinct = false,
            cnt_q.resultId)
        }.getOrElse { matchable = false; cnt_q }

      case sum_q@AggregateExpression(Sum(expr_q), _, false, _) =>
        operator_a.outputList.find {
          case alias: Alias if alias_m.contains(alias.toAttribute) &&
                               alias_m(alias.toAttribute).child.isInstanceOf[AggregateExpression] &&
                               alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
                                 .aggregateFunction.isInstanceOf[Sum] =>
            val sum_a = alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
            val expr_a = sum_a.aggregateFunction.asInstanceOf[Sum].child
            if (sum_a.isDistinct != sum_q.isDistinct) {
              false
            } else {
              expr_a.semanticEquals(expr_q)
            }

          case attr: Attribute if alias_m.contains(attr) &&
                                  alias_m(attr).child.isInstanceOf[AggregateExpression] &&
                                  alias_m(attr).child.asInstanceOf[AggregateExpression]
                                    .aggregateFunction.isInstanceOf[Sum] =>
            val sum_a = alias_m(attr).child.asInstanceOf[AggregateExpression]
            val expr_a = sum_a.aggregateFunction.asInstanceOf[Sum].child
            if (sum_a.isDistinct != sum_q.isDistinct) {
              false
            } else {
              expr_a.semanticEquals(expr_q)
            }

          case _ => false
        }.map { sum =>
          // SUM rolls up as SUM of the pre-computed partial sums
          AggregateExpression(
            Sum(sum.toAttribute),
            sum_q.mode,
            isDistinct = false,
            sum_q.resultId)
        }.getOrElse { matchable = false; sum_q }

      case max_q@AggregateExpression(Max(expr_q), _, false, _) =>
        operator_a.outputList.find {
          case alias: Alias if alias_m.contains(alias.toAttribute) &&
                               alias_m(alias.toAttribute).child.isInstanceOf[AggregateExpression] &&
                               alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
                                 .aggregateFunction.isInstanceOf[Max] =>
            val max_a = alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
            val expr_a = max_a.aggregateFunction.asInstanceOf[Max].child
            if (max_a.isDistinct != max_q.isDistinct) {
              false
            } else {
              expr_a.semanticEquals(expr_q)
            }

          case attr: Attribute if alias_m.contains(attr) &&
                                  alias_m(attr).child.isInstanceOf[AggregateExpression] &&
                                  alias_m(attr).child.asInstanceOf[AggregateExpression]
                                    .aggregateFunction.isInstanceOf[Max] =>
            val max_a = alias_m(attr).child.asInstanceOf[AggregateExpression]
            val expr_a = max_a.aggregateFunction.asInstanceOf[Max].child
            if (max_a.isDistinct != max_q.isDistinct) {
              false
            } else {
              expr_a.semanticEquals(expr_q)
            }

          case _ => false
        }.map { max =>
          // MAX rolls up as MAX of the pre-computed partial maxima
          AggregateExpression(
            Max(max.toAttribute),
            max_q.mode,
            isDistinct = false,
            max_q.resultId)
        }.getOrElse { matchable = false; max_q }

      case min_q@AggregateExpression(Min(expr_q), _, false, _) =>
        operator_a.outputList.find {
          case alias: Alias if alias_m.contains(alias.toAttribute) &&
                               alias_m(alias.toAttribute).child.isInstanceOf[AggregateExpression] &&
                               alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
                                 .aggregateFunction.isInstanceOf[Min] =>
            val min_a = alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
            // BUG FIX: was asInstanceOf[Max], which throws ClassCastException because the
            // guard above guarantees the aggregate function is a Min, not a Max.
            val expr_a = min_a.aggregateFunction.asInstanceOf[Min].child
            if (min_a.isDistinct != min_q.isDistinct) {
              false
            } else {
              expr_a.semanticEquals(expr_q)
            }

          case attr: Attribute if alias_m.contains(attr) &&
                                  alias_m(attr).child.isInstanceOf[AggregateExpression] &&
                                  alias_m(attr).child.asInstanceOf[AggregateExpression]
                                    .aggregateFunction.isInstanceOf[Min] =>
            val min_a = alias_m(attr).child.asInstanceOf[AggregateExpression]
            // BUG FIX: was asInstanceOf[Max] (same copy-paste defect as above)
            val expr_a = min_a.aggregateFunction.asInstanceOf[Min].child
            if (min_a.isDistinct != min_q.isDistinct) {
              false
            } else {
              expr_a.semanticEquals(expr_q)
            }

          case _ => false
        }.map { min =>
          // MIN rolls up as MIN of the pre-computed partial minima
          AggregateExpression(
            Min(min.toAttribute),
            min_q.mode,
            isDistinct = false,
            min_q.resultId)
        }.getOrElse { matchable = false; min_q }

      // any other aggregate (avg, distinct variants, ...) cannot be rolled up here
      case other: AggregateExpression =>
        matchable = false
        other

      // non-aggregate expression: must be produced verbatim (non-aggregated) by the subsumer
      case expr: Expression if !expr.isInstanceOf[AggregateFunction] =>
        operator_a.outputList.find {
          case alias: Alias if alias_m.contains(alias.toAttribute) &&
                               alias_m(alias.toAttribute).child.semanticEquals(expr) &&
                               !alias_m(alias.toAttribute).child
                                 .isInstanceOf[AggregateExpression] => true
          case attr: Attribute if alias_m.contains(attr) &&
                                  alias_m(attr).child.semanticEquals(expr) &&
                                  !alias_m(attr).child.isInstanceOf[AggregateExpression] => true
          case _ => false
        }.map(_.toAttribute)
         .getOrElse { expr }
    }

    if (matchable) {
      Some(matched)
    } else {
      None
    }
  }

  /**
   * Attempts to rewrite query operator q against subsumer operator a; only operators of
   * the same concrete Matchable type are compared.
   */
  def tryMatch(a: modular.Matchable,
      q: modular.Matchable,
      m: AttributeMap[Alias]): Option[modular.Matchable] = {
    if (a.getClass == q.getClass) {
      doMatch(a, q, m)
    } else {
      None
    }
  }

  /**
   * (Subsumee) expression translation:
   *
   * The translation begins by creating a copy of the whole expression (step 1).  Then each input
   * column is translated in turn.
   * To translate an input column, we first find the child block that produces the input column
   * and replace the input column with the
   * associated output column expression (step 2).  The next step is to replace the translated
   * expression with its equivalent output
   * expression at the top of the child compensation (step 3).  Then, We recursively translate
   * each new input column(except input
   * columns produced by rejoin children) until we reach the bottom of the child compensation
   * (step 4).  Finally, we find an
   * equivalent output expression in subsumer (step 5).
   *
   * So given a subsumee expr, the translation follows the following path:
   *
   * top of subsumee --> child of subsumee --> top of compensation --> bottom of compensation -->
   * top of subsumer
   *
   * To simplify this we assume in subsumer outputList of top select 1-1 corresponds to the
   * outputList of groupby
   * note that subsumer outputList is list of attributes and that of groupby is list of aliases
   */
  private def doTopSelectTranslation(exprE: Expression,
      exprListR: Seq[Expression],
      subsumee: ModularPlan,
      subsumer: ModularPlan,
      compensation: Option[ModularPlan]): Option[Expression] = {
    (subsumer, subsumee, compensation) match {
      // top selects whose children do not match exactly
      // for simplicity, we assume outputList of subsumer is 1-1 corresponding to that of its
      // immediately groupby child
      case (
        sel_3a@modular.Select(
          _, _, _, _, _,
          Seq(gb_2a@modular.GroupBy(
            _, _, _, _, sel_2a@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)),
          _, _, _, _),
        sel_3q@modular.Select(
          _, _, _, _, _, Seq(gb_2q@modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _),
        Some(gb_2c@modular.GroupBy(
          _, _, _, _, sel_2c@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _))
        ) =>
        if (sel_3q.predicateList.contains(exprE)) {
          // predicate of the subsumee's top select: rewrite its attributes in terms of the
          // compensation groupby's output (positional 1-1 correspondence with gb_2q's output)
          val expr1E = exprE.transform {
            case attr: Attribute =>
              gb_2c.outputList.lift(
                gb_2q.outputList.indexWhere {
                  case alias: Alias if alias.toAttribute.semanticEquals(attr) => true;
                  case other => false
                  }).getOrElse { attr }
          }
          if (expr1E.eq(exprE)) {
            None
          } else {
            Some(expr1E)
          }
        }
        else if (sel_3q.outputList.contains(exprE)) {
          exprE match {
            case attr: Attribute => // this subexpression must in subsumee select output list
              gb_2c.outputList.lift(
                gb_2q.outputList.indexWhere {
                  case a if a.toAttribute.semanticEquals(attr) => true;
                  case other => false
                  })

            case alias: Alias =>
              gb_2c.outputList.lift(
                gb_2q.outputList.indexWhere {
                  case a if a.toAttribute.semanticEquals(alias.toAttribute) => true;
                  case other => false
                  })

            case _ => None
          }
        } else if (sel_2c.predicateList.contains(exprE)) {
          // predicate inside the compensation: derivable if subsumer filters identically
          // or can evaluate the expression itself
          if (sel_2a.predicateList.exists(_.semanticEquals(exprE)) ||
              canEvaluate(exprE, subsumer)) {
            Some(exprE)
          } else {
            None
          }
        } else if (gb_2c.predicateList.contains(exprE)) {
          if (gb_2a.outputList.exists {
                case a: Alias if a.child.semanticEquals(exprE) => true;
                case _ => false
              } || canEvaluate(exprE, subsumer)) {
            Some(exprE)
          } else {
            None
          }
        } else if (sel_2a.predicateList.exists(_.semanticEquals(exprE)) ||
                   canEvaluate(exprE, subsumer)) {
          Some(exprE)
        } else {
          None
        }

      case _ => None // TODO: implement this
    }
  }

  /**
   * A translated expression is only usable when the subsumer computes the same rows;
   * a subsumer with a WHERE clause on top of its groupby may filter differently.
   */
  private def isSemanticEquivalent(translatedExpr: Expression, subsumer: ModularPlan) = {
    subsumer match {
      // if subsumer has where clause, even if expr can be translated into new expr based on
      // subsumer, the two may not be semantic equivalent
      // TODO: refine this
      case modular.Select(
        _, _, predicateList, _, _,
        Seq(modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _)
        if predicateList.nonEmpty => false
      case _ => true
    }
  }

  /**
   * derivable = translatable + semantic equivalent
   *
   * The translation method described above is also the first step in deriving a subsumee
   * expression Eexp from the subsumer's output columns.  After translating
   * Eexp to E'exp, derivability can be established by making sure that the subsumer
   * computes at its output certain necessary subexpressions of E'exp (or even the entire
   * E'exp).  The problem that arises, however, is to determine the parts of E'exp that
   * can/should be computed by the subsumer.
   *
   * In general, translation causes an expression to expand by replacing individual input
   * columns with equivalent subexpressions.  Derivation is the reverse operation, where
   * pieces of the translated expression are collapsed as they are computed along the
   * derivation path.
   */
  def isDerivable(exprE: Expression,
      exprListR: Seq[Expression],
      subsumee: ModularPlan,
      subsumer: ModularPlan,
      compensation: Option[ModularPlan]): Boolean = {
    val exprE1 = doTopSelectTranslation(exprE, exprListR, subsumee, subsumer, compensation)
    exprE1 match {
      case Some(e) => isSemanticEquivalent(e, subsumer)
      case _ => false
    }
  }

}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
----------------------------------------------------------------------
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
new file mode 100644
index 0000000..184fdc1
--- /dev/null
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -0,0 +1,676 @@
+package org.apache.carbondata.mv.rewrite
+
+import java.io.File
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    drop()
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+    val projectPath = new File(this.getClass.getResource("/").getPath + 
"../../../../../")
+      .getCanonicalPath.replaceAll("\\\\", "/")
+    val integrationPath = s"$projectPath/integration"
+    val resourcesPath = 
s"$integrationPath/spark-common-test/src/test/resources"
+    sql(
+      """
+        | CREATE TABLE fact_table1 (empname String, designation String, doj 
Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE 
fact_table1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE 
fact_table1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(
+      """
+        | CREATE TABLE fact_table2 (empname String, designation String, doj 
Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE 
fact_table2 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE 
fact_table2 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql(
+      """
+        | CREATE TABLE fact_table3 (empname String, designation String, doj 
Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE 
fact_table3 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE 
fact_table3 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql(
+      """
+        | CREATE TABLE fact_table4 (empname String, designation String, doj 
Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE 
fact_table4 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE 
fact_table4 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(
+      """
+        | CREATE TABLE fact_table5 (empname String, designation String, doj 
Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE 
fact_table5 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE 
fact_table5 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql(
+      """
+        | CREATE TABLE fact_table6 (empname String, designation String, doj 
Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, 
deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate 
Timestamp,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE 
fact_table6 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE 
fact_table6 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+  }
+
  // MV projection has exactly the columns the query asks for: rewrite expected.
  test("test create datamap with simple and same projection") {
    sql("drop datamap if exists datamap1")
    sql("create datamap datamap1 using 'mv' as select empname, designation from fact_table1")
    sql(s"rebuild datamap datamap1")
    val df = sql("select empname,designation from fact_table1")
    val analyzed = df.queryExecution.analyzed
    assert(verifyMVDataMap(analyzed, "datamap1"))
    // fact_table2 holds the same data as fact_table1, so it serves as the expected result
    checkAnswer(df, sql("select empname,designation from fact_table2"))
    sql(s"drop datamap datamap1")
  }

  // Query projects a subset of the MV's columns: rewrite expected.
  test("test create datamap with simple and sub projection") {
    sql("drop datamap if exists datamap2")
    sql("create datamap datamap2 using 'mv' as select empname, designation from fact_table1")
    sql(s"rebuild datamap datamap2")
    val df = sql("select empname from fact_table1")
    val analyzed = df.queryExecution.analyzed
    assert(verifyMVDataMap(analyzed, "datamap2"))
    checkAnswer(df, sql("select empname from fact_table2"))
    sql(s"drop datamap datamap2")
  }

  // Query filters on a column that the MV projects: rewrite expected.
  test("test create datamap with simple and same projection with projection filter") {
    sql("drop datamap if exists datamap3")
    sql("create datamap datamap3 using 'mv' as select empname, designation from fact_table1")
    sql(s"rebuild datamap datamap3")
    val frame = sql("select empname, designation from fact_table1 where empname='shivani'")
    val analyzed = frame.queryExecution.analyzed
    assert(verifyMVDataMap(analyzed, "datamap3"))

    checkAnswer(frame, sql("select empname, designation from fact_table2 where empname='shivani'"))
    sql(s"drop datamap datamap3")
  }

  // Query projects a subset and filters on another projected column: rewrite expected.
  test("test create datamap with simple and sub projection with non projection filter") {
    sql("create datamap datamap4 using 'mv' as select empname, designation from fact_table1")
    sql(s"rebuild datamap datamap4")
    val frame = sql("select designation from fact_table1 where empname='shivani'")
    val analyzed = frame.queryExecution.analyzed
    assert(verifyMVDataMap(analyzed, "datamap4"))
    checkAnswer(frame, sql("select designation from fact_table2 where empname='shivani'"))
    sql(s"drop datamap datamap4")
  }
+
  // MV itself is filtered; query uses the identical filter: rewrite expected.
  test("test create datamap with simple and sub projection with datamap filter") {
    sql("create datamap datamap5 using 'mv' as select empname, designation from fact_table1 where empname='shivani'")
    sql(s"rebuild datamap datamap5")
    val frame = sql("select designation from fact_table1 where empname='shivani'")
    val analyzed = frame.queryExecution.analyzed
    assert(verifyMVDataMap(analyzed, "datamap5"))
    checkAnswer(frame, sql("select designation from fact_table2 where empname='shivani'"))
    sql(s"drop datamap datamap5")
  }

  // Filtered MV, same projection and same filter in the query: rewrite expected.
  test("test create datamap with simple and same projection with datamap filter ") {
    sql("create datamap datamap6 using 'mv' as select empname, designation from fact_table1 where empname='shivani'")
    sql(s"rebuild datamap datamap6")
    val frame = sql("select empname,designation from fact_table1 where empname='shivani'")
    val analyzed = frame.queryExecution.analyzed
    assert(verifyMVDataMap(analyzed, "datamap6"))
    checkAnswer(frame, sql("select empname,designation from fact_table2 where empname='shivani'"))
    sql(s"drop datamap datamap6")
  }

  // Query filter subsumes the MV filter and adds an extra projected-column predicate:
  // rewrite expected (extra predicate is compensated on top of the MV).
  test("test create datamap with simple and same projection with datamap filter and extra query column filter") {
    sql("create datamap datamap7 using 'mv' as select empname, designation from fact_table1 where empname='shivani'")
    sql(s"rebuild datamap datamap7")
    val frame = sql(
      "select empname,designation from fact_table1 where empname='shivani' and designation='SA'")
    val analyzed = frame.queryExecution.analyzed
    assert(verifyMVDataMap(analyzed, "datamap7"))
    checkAnswer(frame, sql("select empname,designation from fact_table2 where empname='shivani' and designation='SA'"))
    sql(s"drop datamap datamap7")
  }

  // Query filter does not imply the MV's filter: the MV must NOT be used.
  test("test create datamap with simple and same projection with datamap filter and different column filter") {
    sql("create datamap datamap8 using 'mv' as select empname, designation from fact_table1 where empname='shivani'")
    sql(s"rebuild datamap datamap8")
    val frame = sql("select empname,designation from fact_table1 where designation='SA'")
    val analyzed = frame.queryExecution.analyzed
    assert(!verifyMVDataMap(analyzed, "datamap8"))
    checkAnswer(frame, sql("select empname,designation from fact_table2 where designation='SA'"))
    sql(s"drop datamap datamap8")
  }
+
  // MV filtered on a non-projected column; query repeats the same filter: rewrite expected.
  test("test create datamap with simple and same projection with datamap filter on non projection column and extra column filter") {
    sql("create datamap datamap9 using 'mv' as select empname, designation from fact_table1 where deptname='cloud'")
    sql(s"rebuild datamap datamap9")
    val frame = sql("select empname,designation from fact_table1 where deptname='cloud'")
    val analyzed = frame.queryExecution.analyzed
    assert(verifyMVDataMap(analyzed, "datamap9"))
    checkAnswer(frame, sql("select empname,designation from fact_table2 where deptname='cloud'"))
    sql(s"drop datamap datamap9")
  }

  // MV filtered on a non-projected column; unfiltered query cannot use it.
  test("test create datamap with simple and same projection with datamap filter on non projection column and no column filter") {
    sql("create datamap datamap10 using 'mv' as select empname, designation from fact_table1 where deptname='cloud'")
    sql(s"rebuild datamap datamap10")
    val frame = sql("select empname,designation from fact_table1")
    val analyzed = frame.queryExecution.analyzed
    assert(!verifyMVDataMap(analyzed, "datamap10"))
    checkAnswer(frame, sql("select empname,designation from fact_table2"))
    sql(s"drop datamap datamap10")
  }

  // MV filtered on a non-projected column; query filters on an unrelated column: no rewrite.
  test("test create datamap with simple and same projection with datamap filter on non projection column and different column filter") {
    sql("create datamap datamap11 using 'mv' as select empname, designation from fact_table1 where deptname='cloud'")
    sql(s"rebuild datamap datamap11")
    val frame = sql("select empname,designation from fact_table1 where designation='SA'")
    val analyzed = frame.queryExecution.analyzed
    assert(!verifyMVDataMap(analyzed, "datamap11"))
    checkAnswer(frame, sql("select empname,designation from fact_table2 where designation='SA'"))
    sql(s"drop datamap datamap11")
  }
+
  // Aggregate MV; query matches the MV's group-by exactly: rewrite expected.
  test("test create datamap with simple and same group by query") {
    sql("drop datamap if exists datamap12")
    sql("create datamap datamap12 using 'mv' as select empname, sum(utilization) from fact_table1 group by empname")
    sql(s"rebuild datamap datamap12")
    val frame = sql("select empname, sum(utilization) from fact_table1 group by empname")
    val analyzed = frame.queryExecution.analyzed
    assert(verifyMVDataMap(analyzed, "datamap12"))
    checkAnswer(frame, sql("select empname, sum(utilization) from fact_table2 group by empname"))
    sql(s"drop datamap datamap12")
  }

  // Aggregate MV; query projects only the aggregate (still grouped the same): rewrite expected.
  test("test create datamap with simple and sub group by query") {
    sql("drop datamap if exists datamap13")
    sql("create datamap datamap13 using 'mv' as select empname, sum(utilization) from fact_table1 group by empname")
    sql(s"rebuild datamap datamap13")
    val frame = sql("select sum(utilization) from fact_table1 group by empname")
    val analyzed = frame.queryExecution.analyzed
    assert(verifyMVDataMap(analyzed, "datamap13"))
    checkAnswer(frame, sql("select sum(utilization) from fact_table2 group by empname"))
    sql(s"drop datamap datamap13")
  }

  // Aggregate MV; query adds a HAVING filter on the group-by column: rewrite expected.
  test("test create datamap with simple and sub group by query with filter on query") {
    sql("drop datamap if exists datamap14")
    sql("create datamap datamap14 using 'mv' as select empname, sum(utilization) from fact_table1 group by empname")
    sql(s"rebuild datamap datamap14")
    val frame = sql(
      "select empname,sum(utilization) from fact_table1 group by empname having empname='shivani'")
    val analyzed = frame.queryExecution.analyzed
    assert(verifyMVDataMap(analyzed, "datamap14"))
    checkAnswer(frame, sql("select empname,sum(utilization) from fact_table2 where empname='shivani' group by empname"))
    sql(s"drop datamap datamap14")
  }
+
  // Aggregate MV; query keeps the full projection plus a HAVING filter: rewrite expected.
  test("test create datamap with simple and sub group and sub projection by query with filter on query") {
    sql("drop datamap if exists datamap32")
    sql("create datamap datamap32 using 'mv' as select empname, sum(utilization) from fact_table1 group by empname")
    sql(s"rebuild datamap datamap32")
    val frame = sql(
      "select empname, sum(utilization) from fact_table1 group by empname having empname='shivani'")
    val analyzed = frame.queryExecution.analyzed
    assert(verifyMVDataMap(analyzed, "datamap32"))
    checkAnswer(frame, sql( "select empname, sum(utilization) from fact_table2 group by empname having empname='shivani'"))
    sql(s"drop datamap datamap32")
  }

  // Filtered aggregate MV; query uses the identical filter and grouping: rewrite expected.
  test("test create datamap with simple and sub group by query with filter on datamap") {
    sql("create datamap datamap15 using 'mv' as select empname, sum(utilization) from fact_table1 where empname='shivani' group by empname")
    sql(s"rebuild datamap datamap15")
    val frame = sql(
      "select empname,sum(utilization) from fact_table1 where empname='shivani' group by empname")
    val analyzed = frame.queryExecution.analyzed
    assert(verifyMVDataMap(analyzed, "datamap15"))
    checkAnswer(frame, sql("select empname,sum(utilization) from fact_table2 where empname='shivani' group by empname"))
    sql(s"drop datamap datamap15")
  }

  // Filtered aggregate MV; unfiltered query cannot be answered from it: no rewrite.
  test("test create datamap with simple and sub group by query with filter on datamap and no filter on query") {
    sql("create datamap datamap16 using 'mv' as select empname, sum(utilization) from fact_table1 where empname='shivani' group by empname")
    sql(s"rebuild datamap datamap16")
    val frame = sql("select empname,sum(utilization) from fact_table1 group by empname")
    val analyzed = frame.queryExecution.analyzed
    assert(!verifyMVDataMap(analyzed, "datamap16"))
    checkAnswer(frame, sql("select empname,sum(utilization) from fact_table2 group by empname"))
    sql(s"drop datamap datamap16")
  }
+
  // Aggregate MV over a CASE expression; query matches it exactly: rewrite expected.
  test("test create datamap with simple and same group by with expression") {
    sql("create datamap datamap17 using 'mv' as select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname")
    sql(s"rebuild datamap datamap17")
    val frame = sql(
      "select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group" +
      " by empname")
    val analyzed = frame.queryExecution.analyzed
    assert(verifyMVDataMap(analyzed, "datamap17"))
    checkAnswer(frame, sql("select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table2 group" +
                           " by empname"))
    sql(s"drop datamap datamap17")
  }

  // Aggregate MV over a CASE expression; query projects only the aggregate: rewrite expected.
  test("test create datamap with simple and sub group by with expression") {
    sql("drop datamap if exists datamap18")
    sql("create datamap datamap18 using 'mv' as select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname")
    sql(s"rebuild datamap datamap18")
    val frame = sql(
      "select sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname")
    val analyzed = frame.queryExecution.analyzed
    assert(verifyMVDataMap(analyzed, "datamap18"))
    checkAnswer(frame, sql("select sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table2 group by empname"))
    sql(s"drop datamap datamap18")
  }

  // COUNT over a CASE expression in the MV; sub-projected query: rewrite expected
  // (count rolls up as sum of partial counts in the rewrite logic).
  test("test create datamap with simple and sub count group by with expression") {
    sql("drop datamap if exists datamap19")
    sql("create datamap datamap19 using 'mv' as select empname, count(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname")
    sql(s"rebuild datamap datamap19")
    val frame = sql(
      "select count(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname")
    val analyzed = frame.queryExecution.analyzed
    assert(verifyMVDataMap(analyzed, "datamap19"))
    checkAnswer(frame, sql("select count(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table2 group by empname"))
    sql(s"drop datamap datamap19")
  }
+
+  test("test create datamap with simple and sub group by with expression and 
filter on query") {
+    sql("drop datamap if exists datamap20")
+    sql("create datamap datamap20 using 'mv' as select empname, sum(CASE WHEN 
utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname")
+    sql(s"rebuild datamap datamap20")
+    val frame = sql(
+      "select sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from 
fact_table1 where " +
+      "empname='shivani' group by empname")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap20"))
+    checkAnswer(frame, sql("select sum(CASE WHEN utilization=27 THEN deptno 
ELSE 0 END) from fact_table2 where " +
+                           "empname='shivani' group by empname"))
+    sql(s"drop datamap datamap20")
+  }
+
+  test("test create datamap with simple join") {
+    sql("drop datamap if exists datamap21")
+    sql("create datamap datamap21 using 'mv' as select t1.empname as c1, 
t2.designation, t2.empname as c2 from fact_table1 t1,fact_table2 t2 where 
t1.empname = t2.empname")
+    sql(s"rebuild datamap datamap21")
+    val frame = sql(
+      "select t1.empname as c1, t2.designation from fact_table1 t1,fact_table2 
t2 where t1.empname = t2.empname")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap21"))
+    checkAnswer(frame, sql("select t1.empname, t2.designation from fact_table4 
t1,fact_table5 t2 where t1.empname = t2.empname"))
+    sql(s"drop datamap datamap21")
+  }
+
+  test("test create datamap with simple join and filter on query") {
+    sql("drop datamap if exists datamap22")
+    sql("create datamap datamap22 using 'mv' as select t1.empname, 
t2.designation, t2.empname from fact_table1 t1,fact_table2 t2 where t1.empname 
= t2.empname")
+    sql(s"rebuild datamap datamap22")
+    val frame = sql(
+      "select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 
where t1.empname = " +
+      "t2.empname and t1.empname='shivani'")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap22"))
+    checkAnswer(frame, sql("select t1.empname, t2.designation from fact_table4 
t1,fact_table5 t2 where t1.empname = " +
+                           "t2.empname and t1.empname='shivani'"))
+    sql(s"drop datamap datamap22")
+  }
+
+
+  test("test create datamap with simple join and filter on query and datamap") 
{
+    sql("drop datamap if exists datamap23")
+    sql("create datamap datamap23 using 'mv' as select t1.empname, 
t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname 
and t1.empname='shivani'")
+    sql(s"rebuild datamap datamap23")
+    val frame = sql(
+      "select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 
where t1.empname = " +
+      "t2.empname and t1.empname='shivani'")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap23"))
+    checkAnswer(frame, sql("select t1.empname, t2.designation from fact_table4 
t1,fact_table5 t2 where t1.empname = " +
+                           "t2.empname and t1.empname='shivani'"))
+    sql(s"drop datamap datamap23")
+  }
+
+  test("test create datamap with simple join and filter on datamap and no 
filter on query") {
+    sql("drop datamap if exists datamap24")
+    sql("create datamap datamap24 using 'mv' as select t1.empname, 
t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname 
and t1.empname='shivani'")
+    sql(s"rebuild datamap datamap24")
+    val frame = sql(
+      "select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 
where t1.empname = t2.empname")
+    val analyzed = frame.queryExecution.analyzed
+    assert(!verifyMVDataMap(analyzed, "datamap24"))
+    checkAnswer(frame, sql("select t1.empname, t2.designation from fact_table4 
t1,fact_table5 t2 where t1.empname = t2.empname"))
+    sql(s"drop datamap datamap24")
+  }
+
+  test("test create datamap with multiple join") {
+    sql("drop datamap if exists datamap25")
+    sql("create datamap datamap25 using 'mv' as select t1.empname as c1, 
t2.designation from fact_table1 t1,fact_table2 t2,fact_table3 t3  where 
t1.empname = t2.empname and t1.empname=t3.empname")
+    sql(s"rebuild datamap datamap25")
+    val frame = sql(
+      "select t1.empname as c1, t2.designation from fact_table1 t1,fact_table2 
t2 where t1.empname = t2.empname")
+    val analyzed = frame.queryExecution.analyzed
+    assert(!verifyMVDataMap(analyzed, "datamap25"))
+    checkAnswer(frame, sql("select t1.empname, t2.designation from fact_table4 
t1,fact_table5 t2 where t1.empname = t2.empname"))
+    sql(s"drop datamap datamap25")
+  }
+
+  ignore("test create datamap with simple join on datamap and multi join on 
query") {
+    sql("create datamap datamap26 using 'mv' as select t1.empname, 
t2.designation, t2.empname from fact_table1 t1,fact_table2 t2 where t1.empname 
= t2.empname")
+    sql(s"rebuild datamap datamap26")
+    val frame = sql(
+      "select t1.empname, t2.designation, t2.empname from fact_table1 
t1,fact_table2 t2,fact_table3 " +
+      "t3  where t1.empname = t2.empname and t1.empname=t3.empname")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap26"))
+    checkAnswer(frame, sql("select t1.empname, t2.designation, t2.empname from 
fact_table4 t1,fact_table5 t2,fact_table6 " +
+                           "t3  where t1.empname = t2.empname and 
t1.empname=t3.empname"))
+    sql(s"drop datamap datamap26")
+  }
+
+  test("test create datamap with join with group by") {
+    sql("create datamap datamap27 using 'mv' as select t1.empname, 
t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2  where 
t1.empname = t2.empname group by t1.empname, t2.designation")
+    sql(s"rebuild datamap datamap27")
+    val frame = sql(
+      "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 
t1,fact_table2 t2  " +
+      "where t1.empname = t2.empname group by t1.empname, t2.designation")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap27"))
+    checkAnswer(frame, sql("select t1.empname, t2.designation, 
sum(t1.utilization) from fact_table4 t1,fact_table5 t2  " +
+                           "where t1.empname = t2.empname group by t1.empname, 
t2.designation"))
+    sql(s"drop datamap datamap27")
+  }
+
+  test("test create datamap with join with group by and sub projection") {
+    sql("drop datamap if exists datamap28")
+    sql("create datamap datamap28 using 'mv' as select t1.empname, 
t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2  where 
t1.empname = t2.empname group by t1.empname, t2.designation")
+    sql(s"rebuild datamap datamap28")
+    val frame = sql(
+      "select t2.designation, sum(t1.utilization) from fact_table1 
t1,fact_table2 t2  where " +
+      "t1.empname = t2.empname group by t2.designation")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap28"))
+    checkAnswer(frame, sql("select t2.designation, sum(t1.utilization) from 
fact_table4 t1,fact_table5 t2  where " +
+                           "t1.empname = t2.empname group by t2.designation"))
+    sql(s"drop datamap datamap28")
+  }
+
+  test("test create datamap with join with group by and sub projection with 
filter") {
+    sql("drop datamap if exists datamap29")
+    sql("create datamap datamap29 using 'mv' as select t1.empname, 
t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2  where 
t1.empname = t2.empname group by t1.empname, t2.designation")
+    sql(s"rebuild datamap datamap29")
+    val frame = sql(
+      "select t2.designation, sum(t1.utilization) from fact_table1 
t1,fact_table2 t2  where " +
+      "t1.empname = t2.empname and t1.empname='shivani' group by 
t2.designation")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap29"))
+    checkAnswer(frame, sql("select t2.designation, sum(t1.utilization) from 
fact_table4 t1,fact_table5 t2  where " +
+                           "t1.empname = t2.empname and t1.empname='shivani' 
group by t2.designation"))
+    sql(s"drop datamap datamap29")
+  }
+
+  test("test create datamap with join with group by with filter") {
+    sql("drop datamap if exists datamap30")
+    sql("create datamap datamap30 using 'mv' as select t1.empname, 
t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2  where 
t1.empname = t2.empname group by t1.empname, t2.designation")
+    sql(s"rebuild datamap datamap30")
+    val frame = sql(
+      "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 
t1,fact_table2 t2  " +
+      "where t1.empname = t2.empname and t2.designation='SA' group by 
t1.empname, t2.designation")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap30"))
+    checkAnswer(frame, sql("select t1.empname, t2.designation, 
sum(t1.utilization) from fact_table4 t1,fact_table5 t2  " +
+                           "where t1.empname = t2.empname and 
t2.designation='SA' group by t1.empname, t2.designation"))
+    sql(s"drop datamap datamap30")
+  }
+
+  test("test create datamap with expression on projection") {
+    sql(s"drop datamap if exists datamap31")
+    sql("create datamap datamap31 using 'mv' as select empname, designation, 
utilization, projectcode from fact_table1 ")
+    sql(s"rebuild datamap datamap31")
+    val frame = sql(
+      "select empname, designation, utilization+projectcode from fact_table1")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap31"))
+    checkAnswer(frame, sql("select empname, designation, 
utilization+projectcode from fact_table2"))
+    sql(s"drop datamap datamap31")
+  }
+
+  test("test create datamap with simple and sub group by query and count agg") 
{
+    sql(s"drop datamap if exists datamap32")
+    sql("create datamap datamap32 using 'mv' as select empname, 
count(utilization) from fact_table1 group by empname")
+    sql(s"rebuild datamap datamap32")
+    val frame = sql("select empname,count(utilization) from fact_table1 where 
empname='shivani' group by empname")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap32"))
+    checkAnswer(frame, sql("select empname,count(utilization) from fact_table2 
where empname='shivani' group by empname"))
+    sql(s"drop datamap datamap32")
+  }
+
+  ignore("test create datamap with simple and sub group by query and avg agg") 
{
+    sql(s"drop datamap if exists datamap33")
+    sql("create datamap datamap33 using 'mv' as select empname, 
avg(utilization) from fact_table1 group by empname")
+    sql(s"rebuild datamap datamap33")
+    val frame = sql("select empname,avg(utilization) from fact_table1 where 
empname='shivani' group by empname")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap33"))
+    checkAnswer(frame, sql("select empname,avg(utilization) from fact_table2 
where empname='shivani' group by empname"))
+    sql(s"drop datamap datamap33")
+  }
+
+  test("test create datamap with left join with group by") {
+    sql("drop datamap if exists datamap34")
+    sql("create datamap datamap34 using 'mv' as select t1.empname, 
t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 
t2  on t1.empname = t2.empname group by t1.empname, t2.designation")
+    sql(s"rebuild datamap datamap34")
+    val frame = sql(
+      "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 
t1 left join fact_table2 t2  " +
+      "on t1.empname = t2.empname group by t1.empname, t2.designation")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap34"))
+    checkAnswer(frame, sql("select t1.empname, t2.designation, 
sum(t1.utilization) from fact_table4 t1 left join fact_table5 t2  " +
+                           "on t1.empname = t2.empname group by t1.empname, 
t2.designation"))
+    sql(s"drop datamap datamap34")
+  }
+
+  test("test create datamap with simple and group by query with filter on 
datamap but not on projection") {
+    sql("create datamap datamap35 using 'mv' as select designation, 
sum(utilization) from fact_table1 where empname='shivani' group by designation")
+    sql(s"rebuild datamap datamap35")
+    val frame = sql(
+      "select designation, sum(utilization) from fact_table1 where 
empname='shivani' group by designation")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap35"))
+    checkAnswer(frame, sql("select designation, sum(utilization) from 
fact_table2 where empname='shivani' group by designation"))
+    sql(s"drop datamap datamap35")
+  }
+
+  test("test create datamap with simple and sub group by query with filter on 
datamap but not on projection") {
+    sql("create datamap datamap36 using 'mv' as select designation, 
sum(utilization) from fact_table1 where empname='shivani' group by designation")
+    sql(s"rebuild datamap datamap36")
+    val frame = sql(
+      "select sum(utilization) from fact_table1 where empname='shivani' group 
by designation")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap36"))
+    checkAnswer(frame, sql("select sum(utilization) from fact_table2 where 
empname='shivani' group by designation"))
+    sql(s"drop datamap datamap36")
+  }
+
+  test("test create datamap with agg push join with sub group by ") {
+    sql("drop datamap if exists datamap37")
+    sql("create datamap datamap37 using 'mv' as select empname, designation, 
sum(utilization) from fact_table1 group by empname, designation")
+    sql(s"rebuild datamap datamap37")
+    val frame = sql(
+      "select t1.empname, sum(t1.utilization) from fact_table1 t1,fact_table2 
t2  " +
+      "where t1.empname = t2.empname group by t1.empname")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap37"))
+    checkAnswer(frame, sql("select t1.empname, sum(t1.utilization) from 
fact_table3 t1,fact_table4 t2  " +
+                           "where t1.empname = t2.empname group by t1.empname, 
t1.designation"))
+    sql(s"drop datamap datamap37")
+  }
+
+  test("test create datamap with agg push join with group by ") {
+    sql("drop datamap if exists datamap38")
+    sql("create datamap datamap38 using 'mv' as select empname, designation, 
sum(utilization) from fact_table1 group by empname, designation")
+    sql(s"rebuild datamap datamap38")
+    val frame = sql(
+      "select t1.empname, t1.designation, sum(t1.utilization) from fact_table1 
t1,fact_table2 t2  " +
+      "where t1.empname = t2.empname group by t1.empname,t1.designation")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap38"))
+    checkAnswer(frame, sql("select t1.empname,t1.designation, 
sum(t1.utilization) from fact_table3 t1,fact_table4 t2  " +
+                           "where t1.empname = t2.empname group by t1.empname, 
t1.designation"))
+    sql(s"drop datamap datamap38")
+  }
+
+  test("test create datamap with agg push join with group by with filter") {
+    sql("drop datamap if exists datamap39")
+    sql("create datamap datamap39 using 'mv' as select empname, designation, 
sum(utilization) from fact_table1 group by empname, designation ")
+    sql(s"rebuild datamap datamap39")
+    val frame = sql(
+      "select t1.empname, t1.designation, sum(t1.utilization) from fact_table1 
t1,fact_table2 t2  " +
+      "where t1.empname = t2.empname and t1.empname='shivani' group by 
t1.empname,t1.designation")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap39"))
+    checkAnswer(frame, sql("select t1.empname,t1.designation, 
sum(t1.utilization) from fact_table3 t1,fact_table4 t2  " +
+                           "where t1.empname = t2.empname and 
t1.empname='shivani' group by t1.empname, t1.designation"))
+    sql(s"drop datamap datamap39")
+  }
+
+  test("test create datamap with more agg push join with group by with 
filter") {
+    sql("drop datamap if exists datamap40")
+    sql("create datamap datamap40 using 'mv' as select empname, designation, 
sum(utilization), count(utilization) from fact_table1 group by empname, 
designation ")
+    sql(s"rebuild datamap datamap40")
+    val frame = sql(
+      "select t1.empname, t1.designation, 
sum(t1.utilization),count(t1.utilization) from fact_table1 t1,fact_table2 t2  " 
+
+      "where t1.empname = t2.empname and t1.empname='shivani' group by 
t1.empname,t1.designation")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap40"))
+    checkAnswer(frame, sql("select t1.empname, t1.designation, 
sum(t1.utilization),count(t1.utilization) from fact_table3 t1,fact_table4 t2  " 
+
+                           "where t1.empname = t2.empname and 
t1.empname='shivani' group by t1.empname,t1.designation"))
+    sql(s"drop datamap datamap40")
+  }
+
+  test("test create datamap with left join with group by with filter") {
+    sql("drop datamap if exists datamap41")
+    sql("create datamap datamap41 using 'mv' as select t1.empname, 
t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 
t2  on t1.empname = t2.empname group by t1.empname, t2.designation")
+    sql(s"rebuild datamap datamap41")
+    val frame = sql(
+      "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 
t1 left join fact_table2 t2  " +
+      "on t1.empname = t2.empname where t1.empname='shivani' group by 
t1.empname, t2.designation")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap41"))
+    checkAnswer(frame, sql("select t1.empname, t2.designation, 
sum(t1.utilization) from fact_table4 t1 left join fact_table5 t2  " +
+                           "on t1.empname = t2.empname where 
t1.empname='shivani' group by t1.empname, t2.designation"))
+    sql(s"drop datamap datamap41")
+  }
+
+  test("test create datamap with left join with sub group by") {
+    sql("drop datamap if exists datamap42")
+    sql("create datamap datamap42 using 'mv' as select t1.empname, 
t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 
t2  on t1.empname = t2.empname group by t1.empname, t2.designation")
+    sql(s"rebuild datamap datamap42")
+    val frame = sql(
+      "select t1.empname, sum(t1.utilization) from fact_table1 t1 left join 
fact_table2 t2  " +
+      "on t1.empname = t2.empname group by t1.empname")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap42"))
+    checkAnswer(frame, sql("select t1.empname, sum(t1.utilization) from 
fact_table4 t1 left join fact_table5 t2  " +
+                           "on t1.empname = t2.empname group by t1.empname"))
+    sql(s"drop datamap datamap42")
+  }
+
+  test("test create datamap with left join with sub group by with filter") {
+    sql("drop datamap if exists datamap43")
+    sql("create datamap datamap43 using 'mv' as select t1.empname, 
t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 
t2  on t1.empname = t2.empname group by t1.empname, t2.designation")
+    sql(s"rebuild datamap datamap43")
+    val frame = sql(
+      "select t1.empname, sum(t1.utilization) from fact_table1 t1 left join 
fact_table2 t2  " +
+      "on t1.empname = t2.empname where t1.empname='shivani' group by 
t1.empname")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap43"))
+    checkAnswer(frame, sql("select t1.empname, sum(t1.utilization) from 
fact_table4 t1 left join fact_table5 t2  " +
+                           "on t1.empname = t2.empname where 
t1.empname='shivani' group by t1.empname"))
+    sql(s"drop datamap datamap43")
+  }
+
+  test("test create datamap with left join with sub group by with filter on 
mv") {
+    sql("drop datamap if exists datamap44")
+    sql("create datamap datamap44 using 'mv' as select t1.empname, 
t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 
t2  on t1.empname = t2.empname where t1.empname='shivani' group by t1.empname, 
t2.designation")
+    sql(s"rebuild datamap datamap44")
+    val frame = sql(
+      "select t1.empname, sum(t1.utilization) from fact_table1 t1 left join 
fact_table2 t2  " +
+      "on t1.empname = t2.empname where t1.empname='shivani' group by 
t1.empname")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap44"))
+    checkAnswer(frame, sql("select t1.empname, sum(t1.utilization) from 
fact_table4 t1 left join fact_table5 t2  " +
+                           "on t1.empname = t2.empname where 
t1.empname='shivani' group by t1.empname"))
+    sql(s"drop datamap datamap44")
+  }
+
+  test("test create datamap with left join on query and equi join on mv with 
group by with filter") {
+    sql("drop datamap if exists datamap45")
+
+    sql("create datamap datamap45 using 'mv' as select t1.empname, 
t2.designation, sum(t1.utilization) from fact_table1 t1 join fact_table2 t2  on 
t1.empname = t2.empname group by t1.empname, t2.designation")
+    sql(s"rebuild datamap datamap45")
+    // During spark optimizer it converts the left outer join queries with 
equi join if any filter present on right side table
+    val frame = sql(
+      "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 
t1 left join fact_table2 t2  " +
+      "on t1.empname = t2.empname where t2.designation='SA' group by 
t1.empname, t2.designation")
+    val analyzed = frame.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap45"))
+    checkAnswer(frame, sql("select t1.empname, t2.designation, 
sum(t1.utilization) from fact_table4 t1 left join fact_table5 t2  " +
+                           "on t1.empname = t2.empname where 
t2.designation='SA' group by t1.empname, t2.designation"))
+    sql(s"drop datamap datamap45")
+  }
+
+
+  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean 
= {
+    val tables = logicalPlan collect {
+      case l: LogicalRelation => l.catalogTable.get
+    }
+    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table"))
+  }
+
+
+  def drop(): Unit = {
+    sql("drop table IF EXISTS fact_table1")
+    sql("drop table IF EXISTS fact_table2")
+    sql("drop table IF EXISTS fact_table3")
+    sql("drop table IF EXISTS fact_table4")
+    sql("drop table IF EXISTS fact_table5")
+    sql("drop table IF EXISTS fact_table6")
+  }
+
+  override def afterAll {
+    drop()
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVSampleTestCase.scala
----------------------------------------------------------------------
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVSampleTestCase.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVSampleTestCase.scala
new file mode 100644
index 0000000..f8eb11f
--- /dev/null
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVSampleTestCase.scala
@@ -0,0 +1,167 @@
+package org.apache.carbondata.mv.rewrite
+
+import java.io.File
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.mv.rewrite.matching.TestSQLBatch._
+
class MVSampleTestCase extends QueryTest with BeforeAndAfterAll {

  /** Creates the `sample` database and its tables before any test runs. */
  override def beforeAll {
    drop()
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
    // Removed unused locals (projectPath/integrationPath/resourcesPath) — nothing read them.
    sql("drop database if exists sample cascade")
    sql("create database sample")
    sql("use sample")
    // foreach, not map: the statements are executed purely for their side effects.
    createTables.foreach(sql)
  }

  /** DDL for the Fact, Dim and Item tables referenced by sampleTestCases. */
  def createTables: Seq[String] = {
    Seq[String](
      s"""
         |CREATE TABLE Fact (
         |  `tid`     int,
         |  `fpgid`   int,
         |  `flid`    int,
         |  `date`    timestamp,
         |  `faid`    int,
         |  `price`   double,
         |  `qty`     int,
         |  `disc`    string
         |)
         |STORED BY 'org.apache.carbondata.format'
        """.stripMargin.trim,
      s"""
         |CREATE TABLE Dim (
         |  `lid`     int,
         |  `city`    string,
         |  `state`   string,
         |  `country` string
         |)
         |STORED BY 'org.apache.carbondata.format'
        """.stripMargin.trim,
      s"""
         |CREATE TABLE Item (
         |  `i_item_id`     int,
         |  `i_item_sk`     int
         |)
         |STORED BY 'org.apache.carbondata.format'
        """.stripMargin.trim
    )
  }

  /**
   * Creates an MV datamap from sampleTestCases(caseIndex)._2, runs the matching
   * query (._3) and asserts whether the plan was (or was not) rewritten to the MV.
   */
  private def runSampleCase(dataMapName: String, caseIndex: Int, shouldUseMV: Boolean): Unit = {
    sql(s"drop datamap if exists $dataMapName")
    sql(s"create datamap $dataMapName using 'mv' as ${sampleTestCases(caseIndex)._2}")
    sql(s"rebuild datamap $dataMapName")
    val analyzed = sql(sampleTestCases(caseIndex)._3).queryExecution.analyzed
    assert(verifyMVDataMap(analyzed, dataMapName) == shouldUseMV)
    sql(s"drop datamap $dataMapName")
  }

  test("test create datamap with sampleTestCases case_1") {
    runSampleCase("datamap_sm1", 0, shouldUseMV = false)
  }

  test("test create datamap with sampleTestCases case_3") {
    runSampleCase("datamap_sm2", 2, shouldUseMV = true)
  }

  test("test create datamap with sampleTestCases case_4") {
    runSampleCase("datamap_sm3", 3, shouldUseMV = true)
  }

  test("test create datamap with sampleTestCases case_5") {
    runSampleCase("datamap_sm4", 4, shouldUseMV = true)
  }

  test("test create datamap with sampleTestCases case_6") {
    runSampleCase("datamap_sm5", 5, shouldUseMV = false)
  }

  test("test create datamap with sampleTestCases case_7") {
    runSampleCase("datamap_sm6", 6, shouldUseMV = true)
  }

  test("test create datamap with sampleTestCases case_8") {
    runSampleCase("datamap_sm7", 7, shouldUseMV = true)
  }

  test("test create datamap with sampleTestCases case_9") {
    runSampleCase("datamap_sm8", 8, shouldUseMV = true)
  }

  /**
   * Returns true when the analyzed plan reads the table backing the given MV datamap
   * (named `<dataMapName>_table`).
   */
  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
    // catalogTable is an Option; flatten instead of .get to avoid NoSuchElementException.
    val tables = logicalPlan.collect {
      case l: LogicalRelation => l.catalogTable
    }.flatten
    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName + "_table"))
  }

  /** Removes the whole sample database created by beforeAll. */
  def drop(): Unit = {
    sql("use default")
    sql("drop database if exists sample cascade")
  }

  override def afterAll {
    drop()
    // Restore the timestamp format that beforeAll overrode.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
  }
}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTPCDSTestCase.scala
----------------------------------------------------------------------
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTPCDSTestCase.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTPCDSTestCase.scala
new file mode 100644
index 0000000..473b338
--- /dev/null
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTPCDSTestCase.scala
@@ -0,0 +1,146 @@
+package org.apache.carbondata.mv.rewrite
+
+import java.io.File
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.mv.rewrite.matching.TestTPCDS_1_4_Batch._
+import org.apache.carbondata.mv.testutil.Tpcds_1_4_Tables.tpcds1_4Tables
+
/**
 * Tests that 'mv' datamaps built from TPC-DS 1.4 matching-test queries cause
 * the corresponding user queries to be rewritten to the datamap's backing
 * table. Most cases are ignored pending matcher support; only case_4 runs.
 *
 * Fixes relative to the original:
 *  - verifyMVDataMap no longer calls `.get` on `catalogTable` (could throw
 *    NoSuchElementException for relations without a catalog table).
 *  - beforeAll no longer computes projectPath/integrationPath/resourcesPath,
 *    which were never used.
 *  - the nine structurally identical test bodies are deduplicated into the
 *    private helper checkDataMapRewrite.
 */
class MVTPCDSTestCase extends QueryTest with BeforeAndAfterAll {

  override def beforeAll {
    drop()
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
    sql("drop database if exists tpcds cascade")
    sql("create database tpcds")
    sql("use tpcds")
    // Create every TPC-DS 1.4 source table used by the test queries.
    tpcds1_4Tables.foreach(sql(_))
  }

  /**
   * Creates an MV datamap from tpcds_1_4_testCases(caseIndex)._2, rebuilds it,
   * runs the user query (._3), asserts the analyzed plan was rewritten to the
   * datamap's backing table, and drops the datamap again.
   */
  private def checkDataMapRewrite(caseIndex: Int, dataMapName: String): Unit = {
    sql(s"drop datamap if exists $dataMapName")
    sql(s"create datamap $dataMapName using 'mv' as ${tpcds_1_4_testCases(caseIndex)._2}")
    sql(s"rebuild datamap $dataMapName")
    val analyzed = sql(tpcds_1_4_testCases(caseIndex)._3).queryExecution.analyzed
    assert(verifyMVDataMap(analyzed, dataMapName))
    sql(s"drop datamap $dataMapName")
  }

  ignore("test create datamap with tpcds_1_4_testCases case_1") {
    checkDataMapRewrite(0, "datamap_tpcds1")
  }

  ignore("test create datamap with tpcds_1_4_testCases case_3") {
    checkDataMapRewrite(2, "datamap_tpcds3")
  }

  test("test create datamap with tpcds_1_4_testCases case_4") {
    checkDataMapRewrite(3, "datamap_tpcds4")
  }

  ignore("test create datamap with tpcds_1_4_testCases case_5") {
    checkDataMapRewrite(4, "datamap_tpcds5")
  }

  ignore("test create datamap with tpcds_1_4_testCases case_6") {
    checkDataMapRewrite(5, "datamap_tpcds6")
  }

  ignore("test create datamap with tpcds_1_4_testCases case_8") {
    checkDataMapRewrite(7, "datamap_tpcds8")
  }

  ignore("test create datamap with tpcds_1_4_testCases case_11") {
    checkDataMapRewrite(10, "datamap_tpcds11")
  }

  ignore("test create datamap with tpcds_1_4_testCases case_15") {
    checkDataMapRewrite(14, "datamap_tpcds15")
  }

  ignore("test create datamap with tpcds_1_4_testCases case_16") {
    checkDataMapRewrite(15, "datamap_tpcds16")
  }

  /**
   * Returns true if the analyzed plan reads from the MV datamap's backing
   * table (named "&lt;dataMapName&gt;_table"). Skips relations without a catalog
   * table instead of throwing via `.get`.
   */
  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
    val tables = logicalPlan.collect {
      case l: LogicalRelation => l.catalogTable
    }.flatten
    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName + "_table"))
  }

  /** Switches back to the default database and removes the tpcds database. */
  def drop(): Unit = {
    sql("use default")
    sql("drop database if exists tpcds cascade")
  }

  // Tear down test data, then restore the default timestamp format that
  // beforeAll overrode for these tests.
  override def afterAll {
    drop()
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
  }
}

Reply via email to