diff --git 
new file mode 100644
index 0000000..89813b5
--- /dev/null
@@ -0,0 +1,247 @@
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {
+  override def beforeAll {
+    drop()
+    val projectPath = new File(this.getClass.getResource("/").getPath + 
+      .getCanonicalPath.replaceAll("\\\\", "/")
+    val integrationPath = s"$projectPath/integration"
+    val resourcesPath = 
+    sql(s"""create table if not exists LINEITEM(  L_SHIPDATE date,  L_SHIPMODE 
string,  L_SHIPINSTRUCT string,  L_RETURNFLAG string,  L_RECEIPTDATE date,  
L_QUANTITY double,  L_EXTENDEDPRICE double,  L_DISCOUNT double,  L_TAX double,  
+    sql(s"""create table if not exists ORDERS(  O_ORDERDATE date,  
string,  O_TOTALPRICE double,  O_CLERK string,  O_SHIPPRIORITY int,  O_COMMENT 
string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""create table if not exists CUSTOMER(  C_MKTSEGMENT string,  
C_NATIONKEY string,  C_CUSTKEY string,  C_NAME string,  C_ADDRESS string,  
C_PHONE string,  C_ACCTBAL double,  C_COMMENT string) STORED BY 
+    sql(s"""create table if not exists REGION(  R_NAME string,  R_REGIONKEY 
string,  R_COMMENT string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""create table if not exists NATION (  N_NAME string,  N_NATIONKEY 
string,  N_REGIONKEY string,  N_COMMENT  string) STORED BY 
+    sql(s"""create table if not exists SUPPLIER(S_COMMENT string,S_SUPPKEY 
string,S_NAME string, S_ADDRESS string, S_NATIONKEY string, S_PHONE string, 
S_ACCTBAL double) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""load data inpath "$resourcesPath/tpch/lineitem.csv" into table 
+    sql(s"""load data inpath "$resourcesPath/tpch/orders.csv" into table 
+    sql(s"""load data inpath "$resourcesPath/tpch/customers.csv" into  table 
+    sql(s"""load data inpath "$resourcesPath/tpch/region.csv" into table 
+    sql(s"""load data inpath "$resourcesPath/tpch/nation.csv" into table 
+    sql(s"""load data inpath "$resourcesPath/tpch/supplier.csv" into table 
+    sql(s"""create table if not exists LINEITEM1(  L_SHIPDATE date,  
int,  L_QUANTITY double,  L_EXTENDEDPRICE double,  L_DISCOUNT double,  L_TAX 
double,  L_LINESTATUS string,  L_COMMITDATE date,  L_COMMENT  string) STORED BY 
+    sql(s"""create table if not exists ORDERS1(  O_ORDERDATE date,  
string,  O_TOTALPRICE double,  O_CLERK string,  O_SHIPPRIORITY int,  O_COMMENT 
string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""create table if not exists CUSTOMER1(  C_MKTSEGMENT string,  
C_NATIONKEY string,  C_CUSTKEY string,  C_NAME string,  C_ADDRESS string,  
C_PHONE string,  C_ACCTBAL double,  C_COMMENT string) STORED BY 
+    sql(s"""create table if not exists REGION1(  R_NAME string,  R_REGIONKEY 
string,  R_COMMENT string) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""create table if not exists NATION1 (  N_NAME string,  N_NATIONKEY 
string,  N_REGIONKEY string,  N_COMMENT  string) STORED BY 
+    sql(s"""create table if not exists SUPPLIER1(S_COMMENT string,S_SUPPKEY 
string,S_NAME string, S_ADDRESS string, S_NATIONKEY string, S_PHONE string, 
S_ACCTBAL double) STORED BY 'org.apache.carbondata.format'""")
+    sql(s"""load data inpath "$resourcesPath/tpch/lineitem.csv" into table 
+    sql(s"""load data inpath "$resourcesPath/tpch/orders.csv" into table 
+    sql(s"""load data inpath "$resourcesPath/tpch/customers.csv" into  table 
+    sql(s"""load data inpath "$resourcesPath/tpch/region.csv" into table 
+    sql(s"""load data inpath "$resourcesPath/tpch/nation.csv" into table 
+    sql(s"""load data inpath "$resourcesPath/tpch/supplier.csv" into table 
+  }
+  test("test create datamap with tpch1") {
+    sql(s"drop datamap if exists datamap1")
+    sql("create datamap datamap1 using 'mv' as select l_returnflag, 
l_linestatus,l_shipdate, sum(l_quantity) as sum_qty, sum(l_extendedprice) as 
sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, 
sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge,count(*) as 
count_order from lineitem group by l_returnflag, l_linestatus,l_shipdate")
+    sql(s"rebuild datamap datamap1")
+    val df = sql("select l_returnflag, l_linestatus, sum(l_quantity) as 
sum_qty, sum(l_extendedprice) as sum_base_price, 
sum(l_extendedprice*(1-l_discount)) as sum_disc_price, 
sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge,count(*) as 
count_order from lineitem where l_shipdate <= date('1998-09-02') group by 
l_returnflag, l_linestatus order by l_returnflag, l_linestatus")
+    val analyzed = df.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap1"))
+//    checkAnswer(df, sql("select l_returnflag, l_linestatus, sum(l_quantity) 
as sum_qty, sum(l_extendedprice) as sum_base_price, 
sum(l_extendedprice*(1-l_discount)) as sum_disc_price, 
sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge,count(*) as 
count_order from lineitem1 where l_shipdate <= date('1998-09-02') group by 
l_returnflag, l_linestatus order by l_returnflag, l_linestatus"))
+    sql(s"drop datamap datamap1")
+  }
+  test("test create datamap with tpch1 with order") {
+    sql(s"drop datamap if exists datamap2")
+    sql("create datamap datamap2 using 'mv' as select l_returnflag, 
l_linestatus,l_shipdate, sum(l_quantity) as sum_qty, sum(l_extendedprice) as 
sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, 
sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge from lineitem group 
by l_returnflag, l_linestatus,l_shipdate order by l_returnflag, l_linestatus")
+    sql(s"rebuild datamap datamap2")
+    val df = sql("select l_returnflag, l_linestatus, sum(l_quantity) as 
sum_qty, sum(l_extendedprice) as sum_base_price, 
sum(l_extendedprice*(1-l_discount)) as sum_disc_price, 
sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge from lineitem where 
l_shipdate <= date('1998-09-02') group by l_returnflag, l_linestatus order by 
l_returnflag, l_linestatus")
+    val analyzed = df.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap2"))
+//    checkAnswer(df, sql("select l_returnflag, l_linestatus, sum(l_quantity) 
as sum_qty, sum(l_extendedprice) as sum_base_price, 
sum(l_extendedprice*(1-l_discount)) as sum_disc_price, 
sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge from lineitem1 
where l_shipdate <= date('1998-09-02') group by l_returnflag, l_linestatus 
order by l_returnflag, l_linestatus"))
+    sql(s"drop datamap datamap2")
+  }
+  test("test create datamap with tpch1 with sub group by") {
+    sql(s"drop datamap if exists datamap3")
+    sql("create datamap datamap3 using 'mv' as select l_returnflag, 
l_linestatus,l_shipdate, sum(l_quantity) as sum_qty, sum(l_extendedprice) as 
sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, 
sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge from lineitem group 
by l_returnflag, l_linestatus,l_shipdate")
+    sql(s"rebuild datamap datamap3")
+    val df = sql("select l_returnflag, l_linestatus, sum(l_quantity) as 
sum_qty, sum(l_extendedprice) as sum_base_price, 
sum(l_extendedprice*(1-l_discount)) as sum_disc_price from lineitem where 
l_shipdate <= date('1998-09-02') group by l_returnflag, l_linestatus order by 
l_returnflag, l_linestatus")
+    val analyzed = df.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap3"))
+//    checkAnswer(df, sql("select l_returnflag, l_linestatus, sum(l_quantity) 
as sum_qty, sum(l_extendedprice) as sum_base_price, 
sum(l_extendedprice*(1-l_discount)) as sum_disc_price from lineitem1 where 
l_shipdate <= date('1998-09-02') group by l_returnflag, l_linestatus order by 
l_returnflag, l_linestatus"))
+    sql(s"drop datamap datamap3")
+  }
+  ignore("test create datamap with tpch3") {
+    sql(s"drop datamap if exists datamap4")
+    sql("create datamap datamap4 using 'mv' as select l_orderkey, 
sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority 
from customer, orders, lineitem where c_mktsegment = 'BUILDING' and c_custkey = 
o_custkey and l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and 
l_shipdate > date('1995-03-15') group by l_orderkey, o_orderdate, 
+    sql(s"rebuild datamap datamap4")
+    val df = sql("select l_orderkey, sum(l_extendedprice * (1 - l_discount)) 
as revenue, o_orderdate, o_shippriority from customer, orders, lineitem where 
c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey 
and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group 
by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate 
limit 10")
+    val analyzed = df.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap4"))
+//    checkAnswer(df, sql("select l_orderkey, sum(l_extendedprice * (1 - 
l_discount)) as revenue, o_orderdate, o_shippriority from customer1, orders1, 
lineitem1 where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and 
l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > 
date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority order by 
revenue desc, o_orderdate limit 10"))
+    sql(s"drop datamap datamap4")
+  }
+  test("test create datamap with tpch3 with no filters on mv") {
+    sql(s"drop datamap if exists datamap5")
+    sql("create datamap datamap5 using 'mv' as select l_orderkey, 
sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, 
o_shippriority,c_mktsegment,l_shipdate from customer, orders, lineitem where 
c_custkey = o_custkey and l_orderkey = o_orderkey group by l_orderkey, 
o_orderdate, o_shippriority,c_mktsegment,l_shipdate")
+    sql(s"rebuild datamap datamap5")
+    val df = sql("select l_orderkey, sum(l_extendedprice * (1 - l_discount)) 
as revenue, o_orderdate, o_shippriority from customer, orders, lineitem where 
c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey 
and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group 
by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate 
limit 10")
+    val analyzed = df.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap5"))
+//    checkAnswer(df, sql("select l_orderkey, sum(l_extendedprice * (1 - 
l_discount)) as revenue, o_orderdate, o_shippriority from customer1, orders1, 
lineitem1 where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and 
l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > 
date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority order by 
revenue desc, o_orderdate limit 10"))
+    sql(s"drop datamap datamap5")
+  }
+  ignore("test create datamap with tpch3 with filters on mv and all filter 
columns on projection") {
+    sql(s"drop datamap if exists datamap5")
+    sql("create datamap datamap5 using 'mv' as select l_orderkey, 
sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, 
o_shippriority,c_mktsegment,l_shipdate from customer, orders, lineitem where 
c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey 
and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group 
by l_orderkey, o_orderdate, o_shippriority,c_mktsegment,l_shipdate")
+    sql(s"rebuild datamap datamap5")
+    val df = sql("select l_orderkey, sum(l_extendedprice * (1 - l_discount)) 
as revenue, o_orderdate, o_shippriority from customer, orders, lineitem where 
c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey 
and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group 
by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate 
limit 10")
+    val analyzed = df.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap5"))
+//    checkAnswer(df, sql("select l_orderkey, sum(l_extendedprice * (1 - 
l_discount)) as revenue, o_orderdate, o_shippriority from customer1, orders1, 
lineitem1 where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and 
l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > 
date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority order by 
revenue desc, o_orderdate limit 10"))
+    sql(s"drop datamap datamap5")
+  }
+  ignore("test create datamap with tpch4 (core issue)") {
+    sql(s"drop datamap if exists datamap6")
+    sql("create datamap datamap6 using 'mv' as select o_orderpriority, 
count(*) as order_count from orders where o_orderdate >= date('1993-07-01') and 
o_orderdate < date('1993-10-01') and exists ( select * from lineitem where 
l_orderkey = o_orderkey and l_commitdate < l_receiptdate ) group by 
o_orderpriority order by o_orderpriority")
+    sql(s"rebuild datamap datamap6")
+    val df = sql("select o_orderpriority, count(*) as order_count from orders 
where o_orderdate >= date('1993-07-01') and o_orderdate < date('1993-10-01') 
and exists ( select * from lineitem where l_orderkey = o_orderkey and 
l_commitdate < l_receiptdate ) group by o_orderpriority order by 
+    val analyzed = df.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap6"))
+//    checkAnswer(df, sql("select o_orderpriority, count(*) as order_count 
from orders1 where o_orderdate >= date('1993-07-01') and o_orderdate < 
date('1993-10-01') and exists ( select * from lineitem1 where l_orderkey = 
o_orderkey and l_commitdate < l_receiptdate ) group by o_orderpriority order by 
+    sql(s"drop datamap datamap6")
+  }
+  ignore("test create datamap with tpch5") {
+    sql(s"drop datamap if exists datamap7")
+    sql("create datamap datamap7 using 'mv' as select n_name, 
sum(l_extendedprice * (1 - l_discount)) as revenue from customer, orders, 
lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = 
o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey and 
s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'ASIA' and 
o_orderdate >= date('1994-01-01') and o_orderdate < date('1995-01-01') group by 
+    sql(s"rebuild datamap datamap7")
+    val df = sql("select n_name, sum(l_extendedprice * (1 - l_discount)) as 
revenue from customer, orders, lineitem, supplier, nation, region where 
c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and 
c_nationkey = s_nationkey and s_nationkey = n_nationkey and n_regionkey = 
r_regionkey and r_name = 'ASIA' and o_orderdate >= date('1994-01-01') and 
o_orderdate < date('1995-01-01') group by n_name order by revenue desc")
+    val analyzed = df.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap7"))
+//    checkAnswer(df, sql("select n_name, sum(l_extendedprice * (1 - 
l_discount)) as revenue from customer1, orders1, lineitem1, supplier1, nation1, 
region1 where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = 
s_suppkey and c_nationkey = s_nationkey and s_nationkey = n_nationkey and 
n_regionkey = r_regionkey and r_name = 'ASIA' and o_orderdate >= 
date('1994-01-01') and o_orderdate < date('1995-01-01') group by n_name order 
by revenue desc"))
+    sql(s"drop datamap datamap7")
+  }
+  test("test create datamap with tpch5 with no filters on mv") {
+    sql(s"drop datamap if exists datamap8")
+    sql("create datamap datamap8 using 'mv' as select 
n_name,o_orderdate,r_name, sum(l_extendedprice * (1 - l_discount)) as revenue 
from customer, orders, lineitem, supplier, nation, region where c_custkey = 
o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey 
= s_nationkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey  
group by n_name,o_orderdate,r_name")
+    sql(s"rebuild datamap datamap8")
+    val df = sql("select n_name, sum(l_extendedprice * (1 - l_discount)) as 
revenue from customer, orders, lineitem, supplier, nation, region where 
c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and 
c_nationkey = s_nationkey and s_nationkey = n_nationkey and n_regionkey = 
r_regionkey and r_name = 'ASIA' and o_orderdate >= date('1994-01-01') and 
o_orderdate < date('1995-01-01') group by n_name order by revenue desc")
+    val analyzed = df.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap8"))
+//    checkAnswer(df, sql("select n_name, sum(l_extendedprice * (1 - 
l_discount)) as revenue from customer1, orders1, lineitem1, supplier1, nation1, 
region1 where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = 
s_suppkey and c_nationkey = s_nationkey and s_nationkey = n_nationkey and 
n_regionkey = r_regionkey and r_name = 'ASIA' and o_orderdate >= 
date('1994-01-01') and o_orderdate < date('1995-01-01') group by n_name order 
by revenue desc"))
+    sql(s"drop datamap datamap8")
+  }
+  test("test create datamap with tpch6") {
+    sql(s"drop datamap if exists datamap9")
+    sql("create datamap datamap9 using 'mv' as select sum(l_extendedprice * 
l_discount) as revenue from lineitem where l_shipdate >= date('1994-01-01') and 
l_shipdate < date('1995-01-01') and l_discount between 0.05 and 0.07 and 
l_quantity < 24")
+    sql(s"rebuild datamap datamap9")
+    val df = sql("select sum(l_extendedprice * l_discount) as revenue from 
lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < 
date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24")
+    val analyzed = df.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap9"))
+    assert(verifyAgg(analyzed))
+//    checkAnswer(df, sql("select sum(l_extendedprice * l_discount) as revenue 
from lineitem1 where l_shipdate >= date('1994-01-01') and l_shipdate < 
date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24"))
+    sql(s"drop datamap datamap9")
+  }
+  test("test create datamap with tpch6 with no filters on mv") {
+    sql(s"drop datamap if exists datamap10")
+    sql("create datamap datamap10 using 'mv' as select sum(l_extendedprice * 
l_discount) as revenue,l_shipdate,l_discount,l_quantity from lineitem group by 
+    sql(s"rebuild datamap datamap10")
+    val df = sql("select sum(l_extendedprice * l_discount) as revenue from 
lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < 
date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24")
+    val analyzed = df.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap10"))
+    assert(verifyAgg(analyzed))
+//    checkAnswer(df, sql("select sum(l_extendedprice * l_discount) as revenue 
from lineitem1 where l_shipdate >= date('1994-01-01') and l_shipdate < 
date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24"))
+    sql(s"drop datamap datamap10")
+  }
+  test("test create datamap with tpch7 part of query1") {
+    sql(s"drop datamap if exists datamap11")
+    sql("create datamap datamap11 using 'mv' as select l_shipdate,n_name , 
l_extendedprice , l_discount from supplier,lineitem,orders,customer,nation n1 
where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = 
o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey")
+    sql(s"rebuild datamap datamap11")
+    val df = sql("select year(l_shipdate) as l_year, l_extendedprice * (1 - 
l_discount) as volume from supplier,lineitem,orders,customer,nation n1 where 
s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and 
s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey and ( (n1.n_name 
= 'FRANCE') or (n1.n_name = 'GERMANY') ) and l_shipdate between 
date('1995-01-01') and date('1996-12-31')")
+    val analyzed = df.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap11"))
+//    checkAnswer(df, sql("select year(l_shipdate) as l_year, l_extendedprice 
* (1 - l_discount) as volume from supplier1,lineitem1,orders1,customer1,nation1 
n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = 
o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey and 
( (n1.n_name = 'FRANCE') or (n1.n_name = 'GERMANY') ) and l_shipdate between 
date('1995-01-01') and date('1996-12-31')"))
+    sql(s"drop datamap datamap11")
+  }
+  test("test create datamap with tpch7 part of query2 (core issue)") {
+    sql(s"drop datamap if exists datamap12")
+    sql("create datamap datamap12 using 'mv' as select n1.n_name, l_shipdate, 
l_extendedprice ,l_discount from supplier,lineitem,orders,customer,nation n1 
where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = 
o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey")
+    sql(s"rebuild datamap datamap12")
+    val df = sql("select supp_nation, l_year, sum(volume) as revenue from ( 
select n1.n_name as supp_nation, year(l_shipdate) as l_year, l_extendedprice * 
(1 - l_discount) as volume from supplier,lineitem,orders,customer,nation n1 
where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = 
o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey and 
( (n1.n_name = 'FRANCE' ) or (n1.n_name = 'GERMANY') ) and l_shipdate between 
date('1995-01-01') and date('1996-12-31') ) as shipping group by supp_nation, 
l_year order by supp_nation, l_year")
+    val analyzed = df.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap12"))
+//    checkAnswer(df, sql("select supp_nation, l_year, sum(volume) as revenue 
from ( select n1.n_name as supp_nation, year(l_shipdate) as l_year, 
l_extendedprice * (1 - l_discount) as volume from 
supplier1,lineitem1,orders1,customer1,nation1 n1 where s_suppkey = l_suppkey 
and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = 
n1.n_nationkey and c_nationkey = n1.n_nationkey and ( (n1.n_name = 'FRANCE' ) 
or (n1.n_name = 'GERMANY') ) and l_shipdate between date('1995-01-01') and 
date('1996-12-31') ) as shipping group by supp_nation, l_year order by 
supp_nation, l_year"))
+    sql(s"drop datamap datamap12")
+  }
+  ignore("test create datamap with tpch7 part of query3 (self join issue)") {
+    sql(s"drop datamap if exists datamap13")
+    sql("create datamap datamap13 using 'mv' as select n1.n_name as 
supp_nation, n2.n_name as cust_nation, l_shipdate, l_extendedprice * (1 - 
l_discount) as volume from supplier,lineitem,orders,customer,nation n1,nation 
n2 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = 
o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n2.n_nationkey")
+    sql(s"rebuild datamap datamap13")
+    val df = sql("select supp_nation, cust_nation, l_year, sum(volume) as 
revenue from ( select n1.n_name as supp_nation, n2.n_name as cust_nation, 
year(l_shipdate) as l_year, l_extendedprice * (1 - l_discount) as volume from 
supplier,lineitem,orders,customer,nation n1,nation n2 where s_suppkey = 
l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey 
= n1.n_nationkey and c_nationkey = n2.n_nationkey and ( (n1.n_name = 'FRANCE' 
and n2.n_name = 'GERMANY') or (n1.n_name = 'GERMANY' and n2.n_name = 'FRANCE') 
) and l_shipdate between date('1995-01-01') and date('1996-12-31') ) as 
shipping group by supp_nation, cust_nation, l_year order by supp_nation, 
cust_nation, l_year")
+    val analyzed = df.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed, "datamap13"))
+//    checkAnswer(df, sql("select supp_nation, cust_nation, l_year, 
sum(volume) as revenue from ( select n1.n_name as supp_nation, n2.n_name as 
cust_nation, year(l_shipdate) as l_year, l_extendedprice * (1 - l_discount) as 
volume from supplier,lineitem1,orders1,customer1,nation1 n1,nation1 n2 where 
s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and 
s_nationkey = n1.n_nationkey and c_nationkey = n2.n_nationkey and ( (n1.n_name 
= 'FRANCE' and n2.n_name = 'GERMANY') or (n1.n_name = 'GERMANY' and n2.n_name = 
'FRANCE') ) and l_shipdate between date('1995-01-01') and date('1996-12-31') ) 
as shipping group by supp_nation, cust_nation, l_year order by supp_nation, 
cust_nation, l_year"))
+    sql(s"drop datamap datamap13")
+  }
+  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean 
= {
+    val tables = logicalPlan collect {
+      case l: LogicalRelation => l.catalogTable.get
+    }
+    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table"))
+  }
+  def verifyAgg(logicalPlan: LogicalPlan): Boolean = {
+    var aggExpExists = false
+    logicalPlan transformExpressions {
+      case a:AggregateExpression =>
+        aggExpExists = true
+        a
+    }
+    aggExpExists
+  }
+  def drop(): Unit = {
+    sql("drop table IF EXISTS LINEITEM")
+    sql("drop table IF EXISTS ORDERS")
+    sql("drop table IF EXISTS CUSTOMER")
+    sql("drop table IF EXISTS REGION")
+    sql("drop table IF EXISTS NATION")
+    sql("drop table IF EXISTS SUPPLIER")
+    sql("drop table IF EXISTS LINEITEM1")
+    sql("drop table IF EXISTS ORDERS1")
+    sql("drop table IF EXISTS CUSTOMER1")
+    sql("drop table IF EXISTS REGION1")
+    sql("drop table IF EXISTS NATION1")
+    sql("drop table IF EXISTS SUPPLIER1")
+  }
+  override def afterAll {
+//    drop()
+  }
diff --git 
new file mode 100644
index 0000000..0ee2475
--- /dev/null
@@ -0,0 +1,76 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.test.util.PlanTest
+class SelectSelectExactChildrenSuite extends PlanTest {
+  object Match extends DefaultMatchMaker {
+    val patterns = SelectSelectNoChildDelta :: Nil
+  }
+  val testRelation1 = 
+  val testRelation2 = 
+//  test("pro-typical lower select") {
+//    val fact = testRelation1.subquery('fact)
+//    val dim = testRelation2.subquery('dim)
+//    val lowerSTSelect =
+//      fact
+//        .select('faid,'flid,Year('date) as 'year)
+//        .analyze
+//    val lowerUQSelect =
+//      fact.join(dim)
+//          .where("fact.flid".attr === "dim.lid".attr && "".attr 
=== "USA")
+//          .select('faid,'flid,Year('date) as 'year, 'state)
+//          .analyze
+//    val matched = 
+//    val correctAnswer = 
+//      lowerSTSelect.join(dim)
+//          .where("fact.flid".attr === "dim.lid".attr && "".attr 
=== "USA") 
+//          .select('faid,'flid,Year('date) as 'year, 'state)
+//          .analyze.model
+//    comparePlans(matched, correctAnswer)
+//  }
+//  val testSummaryDataset =
+//    s"""
+//       |SELECT faid, flid, year_proj(date) as year, count(*) as cnt
+//       |FROM Fact
+//       |GROUP BY faid, flid, year_proj(date)
+//    """.stripMargin.trim
+//  val testUserQuery = 
+//    s"""
+//       |SELECT faid, state, year_proj(date) as year, count(*) as cnt
+//       |FROM Fact
+//       |  INNER JOIN Dim 
+//       |  ON Fact.flid = Dim.lid AND = "USA"
+//       |GROUP BY Fact.faid,Fact.state,year_proj(
+//       |HAVING count(*) > 2
+//    """.stripMargin.trim
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..7fac508
--- /dev/null
@@ -0,0 +1,80 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.test.util.PlanTest
+import org.scalatest.BeforeAndAfter
+class Tpcds_1_4_Suite extends PlanTest with BeforeAndAfter {
+//  test("test using tpc-ds queries") {
+//    tpcds1_4Tables.foreach { create_table =>
+//      hiveClient.runSqlHive(create_table)
+//    }
+//    val dest = "case_30"
+//    val dest = "case_32"
+    val dest = "case_3"
+//    tpcds_1_4_testCases.foreach { testcase =>
+//      if (testcase._1 == dest) {
+//        val mqoSession = new MQOSession(testHive.sparkSession)
+//        val summaryDF = testHive.sparkSession.sql(testcase._2)
+//        mqoSession.sharedState.registerSummaryDataset(summaryDF)
+//        Try(mqoSession.rewrite(testcase._3).withSummaryData) match {
+//          case Success(rewrittenPlan) =>
+//            println(s"""\n\n===== REWRITTEN MODULAR PLAN for ${testcase._1} 
=====\n\n$rewrittenPlan \n""")
+//            Try(rewrittenPlan.asCompactSQL) match {
+//              case Success(s) =>
+//                println(s"\n\n===== CONVERTED SQL for ${testcase._1} 
+//                if (!s.trim.equals(testcase._4)) {
+//                  println(
+//                      s"""
+//                      |=== FAIL: SQLs do not match ===
+//                      |${sideBySide(s, testcase._4).mkString("\n")}
+//                      """.stripMargin)
+//                      }
+//              case Failure(e) => println(s"""\n\n===== CONVERTED SQL for 
${testcase._1} failed =====\n\n${e.toString}""")
+//            }
+//          case Failure(e) => println(s"""\n\n==== MODULARIZE the logical 
query plan for ${testcase._1} failed =====\n\n${e.toString}""")
+//        }
+//        val rewrittenSQL = rewrittenPlan.asCompactSQL
+//        val rewrittenSQL = mqoSession.rewrite(testcase._3).toCompactSQL
+//        if (!rewrittenSQL.equals(testcase._4)) {
+//          fail(
+//              s"""
+//              |=== FAIL: SQLs do not match ===
+//              |${sideBySide(rewrittenSQL, testcase._4).mkString("\n")}
+//              """.stripMargin)
+//              }
+//        }
+//    }
+//  }
\ No newline at end of file
diff --git 
new file mode 100644
index 0000000..02bbff3
--- /dev/null
@@ -0,0 +1,214 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+object TestSQLBatch {
+  // seq of (summaryDataset(MV), testUserSQL(Q), correctRewrittenSQL)
+  val sampleTestCases = Seq(
+    ("case_1",
+     s"""
+        |SELECT i_item_id
+        |FROM Item
+        |WHERE i_item_sk = 1
+     """.stripMargin.trim,
+     s"""
+        |SELECT i_item_id, i_item_sk
+        |FROM Item
+        |WHERE i_item_sk = 1 and i_item_id > 0
+     """.stripMargin.trim,
+     s"""
+        |SELECT item.`i_item_id`, item.`i_item_sk` 
+        |FROM
+        |  item
+        |WHERE
+        |  (item.`i_item_sk` = 1) AND (item.`i_item_id` > 0)
+     """.stripMargin.trim),
+    ("case_2",
+     s"""
+        |SELECT i_item_id
+        |FROM Item
+        |WHERE i_item_sk = 1
+     """.stripMargin.trim,
+     s"""
+        |SELECT i_item_id, i_item_sk
+        |FROM Item
+        |WHERE i_item_sk = 1 or i_item_id > 0
+     """.stripMargin.trim,
+     s"""
+        |SELECT item.`i_item_id`, item.`i_item_sk` 
+        |FROM
+        |  item
+        |WHERE
+        |  ((item.`i_item_sk` = 1) OR (item.`i_item_id` > 0))
+     """.stripMargin.trim),
+    ("case_3",
+     s"""
+        |SELECT faid, flid, date
+        |FROM Fact
+     """.stripMargin.trim,
+     s"""
+        |SELECT faid, flid, year(date) as year
+        |FROM Fact
+     """.stripMargin.trim,
+     s"""
+        |SELECT gen_subsumer_0.`faid`, gen_subsumer_0.`flid`, 
year(CAST(gen_subsumer_0.`date` AS DATE)) AS `year` 
+        |FROM
+        |  (SELECT fact.`faid`, fact.`flid`, fact.`date` 
+        |  FROM
+        |    fact) gen_subsumer_0
+     """.stripMargin.trim),
+    ("case_4",
+     s"""
+        |SELECT faid, flid, date
+        |FROM Fact
+     """.stripMargin.trim,
+     s"""
+        |SELECT faid, flid
+        |FROM Fact
+        |WHERE year(date) = 2000
+     """.stripMargin.trim,
+     s"""
+        |SELECT gen_subsumer_0.`faid`, gen_subsumer_0.`flid` 
+        |FROM
+        |  (SELECT fact.`faid`, fact.`flid`, fact.`date` 
+        |  FROM
+        |    fact) gen_subsumer_0 
+        |WHERE
+        |  (year(CAST(gen_subsumer_0.`date` AS DATE)) = 2000)
+     """.stripMargin.trim),
+    ("case_5",
+     s"""
+        |SELECT faid, flid, date
+        |FROM Fact
+        |WHERE year(date) = 2000
+     """.stripMargin.trim,
+     s"""
+        |SELECT faid, flid
+        |FROM Fact
+        |WHERE year(date) = 2000
+     """.stripMargin.trim,
+     s"""
+        |SELECT gen_subsumer_0.`faid`, gen_subsumer_0.`flid` 
+        |FROM
+        |  (SELECT fact.`faid`, fact.`flid`, fact.`date` 
+        |  FROM
+        |    fact
+        |  WHERE
+        |    (year(CAST(fact.`date` AS DATE)) = 2000)) gen_subsumer_0 
+        |WHERE
+        |  (year(CAST(gen_subsumer_0.`date` AS DATE)) = 2000)
+     """.stripMargin.trim),
+    ("case_6",
+     s"""
+        |SELECT faid, flid, date
+        |FROM Fact
+        |WHERE year(date) in (2000,2001)
+     """.stripMargin.trim,
+     s"""
+        |SELECT faid, flid
+        |FROM Fact
+        |WHERE year(date) = 2000
+     """.stripMargin.trim,
+     s"""
+        |SELECT fact.`faid`, fact.`flid` 
+        |FROM
+        |  fact
+        |WHERE
+        |  (year(CAST(fact.`date` AS DATE)) = 2000)
+     """.stripMargin.trim),
+    ("case_7",
+     s"""
+        |SELECT faid, flid, year(date) as year, count(*) as cnt
+        |FROM Fact
+        |GROUP BY faid, flid, year(date)
+     """.stripMargin.trim,
+     s"""
+        |SELECT faid, year(date) as year, count(*) as cnt
+        |FROM Fact
+        |GROUP BY Fact.faid,year(
+        |HAVING count(*) > 2
+     """.stripMargin.trim,
+     s"""
+        |SELECT gen_subsumer_0.`faid`, gen_subsumer_0.`year` AS `year`, 
sum(gen_subsumer_0.`cnt`) AS `cnt` 
+        |FROM
+        |  (SELECT fact.`faid`, fact.`flid`, year(CAST(fact.`date` AS DATE)) 
AS `year`, count(1) AS `cnt` 
+        |  FROM
+        |    fact
+        |  GROUP BY fact.`faid`, fact.`flid`, year(CAST(fact.`date` AS DATE))) 
+        |GROUP BY gen_subsumer_0.`faid`, gen_subsumer_0.`year`
+        |HAVING (sum(gen_subsumer_0.`cnt`) > 2L)
+     """.stripMargin.trim),
+    ("case_8",
+     s"""
+        |SELECT date
+        |FROM Fact
+     """.stripMargin.trim,
+     s"""
+        |SELECT year(date)
+        |FROM Fact
+     """.stripMargin.trim,
+     s"""
+        |SELECT year(CAST(gen_subsumer_0.`date` AS DATE)) AS `year(CAST(date 
+        |FROM
+        |  (SELECT fact.`date` 
+        |  FROM
+        |    fact) gen_subsumer_0
+     """.stripMargin.trim),
+    ("case_9",
+     s"""
+        |SELECT faid, flid
+        |FROM Fact
+        |WHERE faid > 0
+     """.stripMargin.trim,
+     s"""
+        |SELECT faid
+        |FROM Fact
+        |WHERE faid > 0 AND flid > 0
+     """.stripMargin.trim,
+     s"""
+        |SELECT gen_subsumer_0.`faid` 
+        |FROM
+        |  (SELECT fact.`faid`, fact.`flid` 
+        |  FROM
+        |    fact
+        |  WHERE
+        |    (fact.`faid` > 0)) gen_subsumer_0 
+        |WHERE
+        |  (gen_subsumer_0.`faid` > 0) AND (gen_subsumer_0.`flid` > 0)
+     """.stripMargin.trim),
+    ("case_10",
+     s"""
+        |SELECT faid, flid
+        |FROM Fact
+        |WHERE faid > 0
+     """.stripMargin.trim,
+     s"""
+        |SELECT faid
+        |FROM Fact
+        |WHERE faid > 0 OR flid > 0
+     """.stripMargin.trim,
+     s"""
+        |SELECT fact.`faid` 
+        |FROM
+        |  fact
+        |WHERE
+        |  ((fact.`faid` > 0) OR (fact.`flid` > 0))
+     """.stripMargin.trim))
\ No newline at end of file

Reply via email to