/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.mv.rewrite

import java.io.File

import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

/**
 * Verifies that queries from the TPC-H benchmark are transparently rewritten
 * to use 'mv' datamaps. Two identical copies of each TPC-H table are loaded:
 * the base tables (LINEITEM, ORDERS, ...) feed the datamaps, while the
 * "<name>1" copies keep plain data available for cross-checking rewritten
 * query results against an un-rewritten plan.
 */
class MVTpchTestCase extends QueryTest with BeforeAndAfterAll {

  override def beforeAll {
    drop()
    val projectPath = new File(this.getClass.getResource("/").getPath + "../../../../../")
      .getCanonicalPath.replaceAll("\\\\", "/")
    val integrationPath = s"$projectPath/integration"
    val resourcesPath = s"$integrationPath/spark-common-test/src/test/resources"
    // Build the base set and the "...1" cross-check copies from the same CSVs.
    Seq("", "1").foreach { suffix =>
      createTables(suffix)
      loadTables(resourcesPath, suffix)
    }
  }

  /**
   * Creates the six TPC-H carbon tables, appending `suffix` to each table
   * name ("" for the base set, "1" for the verification copies).
   */
  private def createTables(suffix: String): Unit = {
    sql(s"""create table if not exists LINEITEM$suffix( L_SHIPDATE date, L_SHIPMODE string, L_SHIPINSTRUCT string, L_RETURNFLAG string, L_RECEIPTDATE date, L_ORDERKEY INT , L_PARTKEY INT , L_SUPPKEY string, L_LINENUMBER int, L_QUANTITY double, L_EXTENDEDPRICE double, L_DISCOUNT double, L_TAX double, L_LINESTATUS string, L_COMMITDATE date, L_COMMENT string) STORED BY 'org.apache.carbondata.format'""")
    sql(s"""create table if not exists ORDERS$suffix( O_ORDERDATE date, O_ORDERPRIORITY string, O_ORDERSTATUS string, O_ORDERKEY int, O_CUSTKEY string, O_TOTALPRICE double, O_CLERK string, O_SHIPPRIORITY int, O_COMMENT string) STORED BY 'org.apache.carbondata.format'""")
    sql(s"""create table if not exists CUSTOMER$suffix( C_MKTSEGMENT string, C_NATIONKEY string, C_CUSTKEY string, C_NAME string, C_ADDRESS string, C_PHONE string, C_ACCTBAL double, C_COMMENT string) STORED BY 'org.apache.carbondata.format'""")
    sql(s"""create table if not exists REGION$suffix( R_NAME string, R_REGIONKEY string, R_COMMENT string) STORED BY 'org.apache.carbondata.format'""")
    sql(s"""create table if not exists NATION$suffix( N_NAME string, N_NATIONKEY string, N_REGIONKEY string, N_COMMENT string) STORED BY 'org.apache.carbondata.format'""")
    sql(s"""create table if not exists SUPPLIER$suffix(S_COMMENT string,S_SUPPKEY string,S_NAME string, S_ADDRESS string, S_NATIONKEY string, S_PHONE string, S_ACCTBAL double) STORED BY 'org.apache.carbondata.format'""")
  }

  /**
   * Loads the TPC-H CSV fixtures under `resourcesPath/tpch` into the tables
   * named with `suffix`. The FILEHEADER option maps CSV column order onto the
   * (differently ordered) table schemas above.
   */
  private def loadTables(resourcesPath: String, suffix: String): Unit = {
    sql(s"""load data inpath "$resourcesPath/tpch/lineitem.csv" into table LINEITEM$suffix options('DELIMITER'='|','FILEHEADER'='L_ORDERKEY,L_PARTKEY,L_SUPPKEY,L_LINENUMBER,L_QUANTITY,L_EXTENDEDPRICE,L_DISCOUNT,L_TAX,L_RETURNFLAG,L_LINESTATUS,L_SHIPDATE,L_COMMITDATE,L_RECEIPTDATE,L_SHIPINSTRUCT,L_SHIPMODE,L_COMMENT')""")
    sql(s"""load data inpath "$resourcesPath/tpch/orders.csv" into table ORDERS$suffix options('DELIMITER'='|','FILEHEADER'='O_ORDERKEY,O_CUSTKEY,O_ORDERSTATUS,O_TOTALPRICE,O_ORDERDATE,O_ORDERPRIORITY,O_CLERK,O_SHIPPRIORITY,O_COMMENT')""")
    sql(s"""load data inpath "$resourcesPath/tpch/customers.csv" into table CUSTOMER$suffix options('DELIMITER'='|','FILEHEADER'='C_CUSTKEY,C_NAME,C_ADDRESS,C_NATIONKEY,C_PHONE,C_ACCTBAL,C_MKTSEGMENT,C_COMMENT')""")
    sql(s"""load data inpath "$resourcesPath/tpch/region.csv" into table REGION$suffix options('DELIMITER'='|','FILEHEADER'='R_REGIONKEY,R_NAME,R_COMMENT')""")
    sql(s"""load data inpath "$resourcesPath/tpch/nation.csv" into table NATION$suffix options('DELIMITER'='|','FILEHEADER'='N_NATIONKEY,N_NAME,N_REGIONKEY,N_COMMENT')""")
    sql(s"""load data inpath "$resourcesPath/tpch/supplier.csv" into table SUPPLIER$suffix options('DELIMITER'='|','FILEHEADER'='S_SUPPKEY,S_NAME,S_ADDRESS,S_NATIONKEY,S_PHONE,S_ACCTBAL,S_COMMENT')""")
  }

  /**
   * Creates datamap `datamapName` from `mvQuery`, rebuilds it, runs
   * `userQuery` and asserts its analyzed plan was rewritten to scan the
   * datamap table. When `expectAggregate` is set, also asserts that the
   * rewritten plan still carries an aggregate expression. The datamap is
   * dropped afterwards so tests stay independent.
   */
  private def verifyRewrite(datamapName: String,
      mvQuery: String,
      userQuery: String,
      expectAggregate: Boolean = false): Unit = {
    sql(s"drop datamap if exists $datamapName")
    sql(s"create datamap $datamapName using 'mv' as $mvQuery")
    sql(s"rebuild datamap $datamapName")
    val analyzed = sql(userQuery).queryExecution.analyzed
    assert(verifyMVDataMap(analyzed, datamapName))
    if (expectAggregate) {
      assert(verifyAgg(analyzed))
    }
    sql(s"drop datamap $datamapName")
  }

  test("test create datamap with tpch1") {
    verifyRewrite("datamap1",
      "select l_returnflag, l_linestatus,l_shipdate, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge,count(*) as count_order from lineitem group by l_returnflag, l_linestatus,l_shipdate",
      "select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge,count(*) as count_order from lineitem where l_shipdate <= date('1998-09-02') group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus")
  }

  test("test create datamap with tpch1 with order") {
    verifyRewrite("datamap2",
      "select l_returnflag, l_linestatus,l_shipdate, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge from lineitem group by l_returnflag, l_linestatus,l_shipdate order by l_returnflag, l_linestatus",
      "select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge from lineitem where l_shipdate <= date('1998-09-02') group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus")
  }

  test("test create datamap with tpch1 with sub group by") {
    verifyRewrite("datamap3",
      "select l_returnflag, l_linestatus,l_shipdate, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price, sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge from lineitem group by l_returnflag, l_linestatus,l_shipdate",
      "select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice*(1-l_discount)) as sum_disc_price from lineitem where l_shipdate <= date('1998-09-02') group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus")
  }

  ignore("test create datamap with tpch3") {
    verifyRewrite("datamap4",
      "select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority from customer, orders, lineitem where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority",
      "select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority from customer, orders, lineitem where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate limit 10")
  }

  test("test create datamap with tpch3 with no filters on mv") {
    verifyRewrite("datamap5",
      "select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority,c_mktsegment,l_shipdate from customer, orders, lineitem where c_custkey = o_custkey and l_orderkey = o_orderkey group by l_orderkey, o_orderdate, o_shippriority,c_mktsegment,l_shipdate",
      "select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority from customer, orders, lineitem where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate limit 10")
  }

  ignore("test create datamap with tpch3 with filters on mv and all filter columns on projection") {
    verifyRewrite("datamap5",
      "select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority,c_mktsegment,l_shipdate from customer, orders, lineitem where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority,c_mktsegment,l_shipdate",
      "select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority from customer, orders, lineitem where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey and o_orderdate < date('1995-03-15') and l_shipdate > date('1995-03-15') group by l_orderkey, o_orderdate, o_shippriority order by revenue desc, o_orderdate limit 10")
  }

  ignore("test create datamap with tpch4 (core issue)") {
    verifyRewrite("datamap6",
      "select o_orderpriority, count(*) as order_count from orders where o_orderdate >= date('1993-07-01') and o_orderdate < date('1993-10-01') and exists ( select * from lineitem where l_orderkey = o_orderkey and l_commitdate < l_receiptdate ) group by o_orderpriority order by o_orderpriority",
      "select o_orderpriority, count(*) as order_count from orders where o_orderdate >= date('1993-07-01') and o_orderdate < date('1993-10-01') and exists ( select * from lineitem where l_orderkey = o_orderkey and l_commitdate < l_receiptdate ) group by o_orderpriority order by o_orderpriority")
  }

  ignore("test create datamap with tpch5") {
    verifyRewrite("datamap7",
      "select n_name, sum(l_extendedprice * (1 - l_discount)) as revenue from customer, orders, lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'ASIA' and o_orderdate >= date('1994-01-01') and o_orderdate < date('1995-01-01') group by n_name",
      "select n_name, sum(l_extendedprice * (1 - l_discount)) as revenue from customer, orders, lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'ASIA' and o_orderdate >= date('1994-01-01') and o_orderdate < date('1995-01-01') group by n_name order by revenue desc")
  }

  test("test create datamap with tpch5 with no filters on mv") {
    verifyRewrite("datamap8",
      "select n_name,o_orderdate,r_name, sum(l_extendedprice * (1 - l_discount)) as revenue from customer, orders, lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey group by n_name,o_orderdate,r_name",
      "select n_name, sum(l_extendedprice * (1 - l_discount)) as revenue from customer, orders, lineitem, supplier, nation, region where c_custkey = o_custkey and l_orderkey = o_orderkey and l_suppkey = s_suppkey and c_nationkey = s_nationkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'ASIA' and o_orderdate >= date('1994-01-01') and o_orderdate < date('1995-01-01') group by n_name order by revenue desc")
  }

  test("test create datamap with tpch6") {
    verifyRewrite("datamap9",
      "select sum(l_extendedprice * l_discount) as revenue from lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24",
      "select sum(l_extendedprice * l_discount) as revenue from lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24",
      expectAggregate = true)
  }

  test("test create datamap with tpch6 with no filters on mv") {
    verifyRewrite("datamap10",
      "select sum(l_extendedprice * l_discount) as revenue,l_shipdate,l_discount,l_quantity from lineitem group by l_shipdate,l_discount,l_quantity",
      "select sum(l_extendedprice * l_discount) as revenue from lineitem where l_shipdate >= date('1994-01-01') and l_shipdate < date('1995-01-01') and l_discount between 0.05 and 0.07 and l_quantity < 24",
      expectAggregate = true)
  }

  test("test create datamap with tpch7 part of query1") {
    verifyRewrite("datamap11",
      "select l_shipdate,n_name , l_extendedprice , l_discount from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey",
      "select year(l_shipdate) as l_year, l_extendedprice * (1 - l_discount) as volume from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey and ( (n1.n_name = 'FRANCE') or (n1.n_name = 'GERMANY') ) and l_shipdate between date('1995-01-01') and date('1996-12-31')")
  }

  test("test create datamap with tpch7 part of query2 (core issue)") {
    verifyRewrite("datamap12",
      "select n1.n_name, l_shipdate, l_extendedprice ,l_discount from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey",
      "select supp_nation, l_year, sum(volume) as revenue from ( select n1.n_name as supp_nation, year(l_shipdate) as l_year, l_extendedprice * (1 - l_discount) as volume from supplier,lineitem,orders,customer,nation n1 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n1.n_nationkey and ( (n1.n_name = 'FRANCE' ) or (n1.n_name = 'GERMANY') ) and l_shipdate between date('1995-01-01') and date('1996-12-31') ) as shipping group by supp_nation, l_year order by supp_nation, l_year")
  }

  ignore("test create datamap with tpch7 part of query3 (self join issue)") {
    verifyRewrite("datamap13",
      "select n1.n_name as supp_nation, n2.n_name as cust_nation, l_shipdate, l_extendedprice * (1 - l_discount) as volume from supplier,lineitem,orders,customer,nation n1,nation n2 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n2.n_nationkey",
      "select supp_nation, cust_nation, l_year, sum(volume) as revenue from ( select n1.n_name as supp_nation, n2.n_name as cust_nation, year(l_shipdate) as l_year, l_extendedprice * (1 - l_discount) as volume from supplier,lineitem,orders,customer,nation n1,nation n2 where s_suppkey = l_suppkey and o_orderkey = l_orderkey and c_custkey = o_custkey and s_nationkey = n1.n_nationkey and c_nationkey = n2.n_nationkey and ( (n1.n_name = 'FRANCE' and n2.n_name = 'GERMANY') or (n1.n_name = 'GERMANY' and n2.n_name = 'FRANCE') ) and l_shipdate between date('1995-01-01') and date('1996-12-31') ) as shipping group by supp_nation, cust_nation, l_year order by supp_nation, cust_nation, l_year")
  }

  /**
   * Returns true when `logicalPlan` scans the datamap's backing table
   * ("<dataMapName>_table"), i.e. the query was rewritten to use the MV.
   */
  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
    val tables = logicalPlan collect {
      case l: LogicalRelation => l.catalogTable.get
    }
    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName + "_table"))
  }

  /** Returns true when `logicalPlan` contains at least one aggregate expression. */
  def verifyAgg(logicalPlan: LogicalPlan): Boolean = {
    var aggExpExists = false
    logicalPlan transformExpressions {
      case a: AggregateExpression =>
        aggExpExists = true
        a
    }
    aggExpExists
  }

  /** Drops every TPC-H table created by this suite (base and "...1" copies). */
  def drop(): Unit = {
    Seq("LINEITEM", "ORDERS", "CUSTOMER", "REGION", "NATION", "SUPPLIER").foreach { table =>
      sql(s"drop table IF EXISTS $table")
      sql(s"drop table IF EXISTS ${table}1")
    }
  }

  override def afterAll {
    // Clean up so a failed run does not leave twelve TPC-H tables behind.
    drop()
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.mv.rewrite

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.test.util.PlanTest

/**
 * Plan-matching suite for the SelectSelectNoChildDelta pattern.
 * The concrete test cases are currently disabled; the suite keeps the
 * match-maker and the two test relations ready for when they are re-enabled.
 */
class SelectSelectExactChildrenSuite extends PlanTest {

  /** Match-maker restricted to the single pattern exercised by this suite. */
  object Match extends DefaultMatchMaker {
    val patterns = List(SelectSelectNoChildDelta)
  }

  // Fact-style relation (used as 'fact in the pending tests).
  val testRelation1 = LocalRelation(
    'tid.int, 'fpgid.int, 'flid.int, 'date.timestamp,
    'faid.int, 'price.double, 'qty.int, 'disc.string)

  // Dimension-style relation keyed by location id (used as 'dim).
  val testRelation2 = LocalRelation('lid.int, 'city.string, 'state.string, 'country.string)

  // TODO(review): the original commit carries a commented-out "pro-typical
  // lower select" test (joining fact with dim, filtering on country = "USA")
  // plus sample summary-dataset / user-query SQL with a HAVING clause;
  // re-enable once Match.execute(...) and the .model conversions compile in
  // this module.
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/Tpcds_1_4_Suite.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/Tpcds_1_4_Suite.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/Tpcds_1_4_Suite.scala new file mode 100644 index 0000000..7fac508 --- /dev/null +++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/Tpcds_1_4_Suite.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.carbondata.mv.rewrite

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.test.util.PlanTest
import org.scalatest.BeforeAndAfter

import org.apache.carbondata.mv.testutil.Tpcds_1_4_Tables._

/**
 * Harness for replaying the TPC-DS 1.4 rewrite test cases against the MV
 * matcher. The entire test body is currently commented out; only the case
 * selector `dest` is live. To re-enable, uncomment the test and pick the
 * desired case id below.
 */
class Tpcds_1_4_Suite extends PlanTest with BeforeAndAfter {

//  test("test using tpc-ds queries") {
//
//    tpcds1_4Tables.foreach { create_table =>
//      hiveClient.runSqlHive(create_table)
//    }

//    val dest = "case_30"
//    val dest = "case_32"
  // Id of the single TPC-DS test case the (disabled) loop below would run.
  val dest = "case_3"

//  tpcds_1_4_testCases.foreach { testcase =>
//    if (testcase._1 == dest) {
//      val mqoSession = new MQOSession(testHive.sparkSession)
//      val summaryDF = testHive.sparkSession.sql(testcase._2)
//      mqoSession.sharedState.registerSummaryDataset(summaryDF)
//
//      Try(mqoSession.rewrite(testcase._3).withSummaryData) match {
//        case Success(rewrittenPlan) =>
//          println(s"""\n\n===== REWRITTEN MODULAR PLAN for ${testcase._1} =====\n\n$rewrittenPlan \n""")
//
//          Try(rewrittenPlan.asCompactSQL) match {
//            case Success(s) =>
//              println(s"\n\n===== CONVERTED SQL for ${testcase._1} =====\n\n${s}\n")
//              if (!s.trim.equals(testcase._4)) {
//                println(
//                  s"""
//                     |=== FAIL: SQLs do not match ===
//                     |${sideBySide(s, testcase._4).mkString("\n")}
//                  """.stripMargin)
//              }
//
//            case Failure(e) => println(s"""\n\n===== CONVERTED SQL for ${testcase._1} failed =====\n\n${e.toString}""")
//          }
//
//        case Failure(e) => println(s"""\n\n==== MODULARIZE the logical query plan for ${testcase._1} failed =====\n\n${e.toString}""")
//      }

//      val rewrittenSQL = rewrittenPlan.asCompactSQL
//      val rewrittenSQL = mqoSession.rewrite(testcase._3).toCompactSQL

//      if (!rewrittenSQL.equals(testcase._4)) {
//        fail(
//          s"""
//             |=== FAIL: SQLs do not match ===
//             |${sideBySide(rewrittenSQL, testcase._4).mkString("\n")}
//          """.stripMargin)
//      }
//    }
//
//  }
//
//  }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestSQLBatch.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestSQLBatch.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestSQLBatch.scala new file mode 100644 index 0000000..02bbff3 --- /dev/null +++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/matching/TestSQLBatch.scala @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */


package org.apache.carbondata.mv.rewrite.matching

/**
 * Shared fixture of SQL matching test cases for the MV rewrite engine.
 * Each tuple is (caseName, summaryDataset(MV) SQL, user query SQL,
 * expected rewritten SQL). The expected rewritten SQL is compared verbatim
 * by consuming tests, so its exact text — including whitespace — is
 * significant.
 */
object TestSQLBatch {

  // seq of (caseName, summaryDataset(MV), testUserSQL(Q), correctRewrittenSQL)
  val sampleTestCases = Seq(
    // MV filter matches exactly; query adds an AND-ed predicate.
    ("case_1",
      s"""
         |SELECT i_item_id
         |FROM Item
         |WHERE i_item_sk = 1
       """.stripMargin.trim,
      s"""
         |SELECT i_item_id, i_item_sk
         |FROM Item
         |WHERE i_item_sk = 1 and i_item_id > 0
       """.stripMargin.trim,
      s"""
         |SELECT item.`i_item_id`, item.`i_item_sk`
         |FROM
         | item
         |WHERE
         | (item.`i_item_sk` = 1) AND (item.`i_item_id` > 0)
       """.stripMargin.trim),
    // OR-ed predicate is not subsumed by the MV filter; no rewrite to MV.
    ("case_2",
      s"""
         |SELECT i_item_id
         |FROM Item
         |WHERE i_item_sk = 1
       """.stripMargin.trim,
      s"""
         |SELECT i_item_id, i_item_sk
         |FROM Item
         |WHERE i_item_sk = 1 or i_item_id > 0
       """.stripMargin.trim,
      s"""
         |SELECT item.`i_item_id`, item.`i_item_sk`
         |FROM
         | item
         |WHERE
         | ((item.`i_item_sk` = 1) OR (item.`i_item_id` > 0))
       """.stripMargin.trim),
    // Derived expression (year(date)) computed on top of the MV subsumer.
    ("case_3",
      s"""
         |SELECT faid, flid, date
         |FROM Fact
       """.stripMargin.trim,
      s"""
         |SELECT faid, flid, year(date) as year
         |FROM Fact
       """.stripMargin.trim,
      s"""
         |SELECT gen_subsumer_0.`faid`, gen_subsumer_0.`flid`, year(CAST(gen_subsumer_0.`date` AS DATE)) AS `year`
         |FROM
         | (SELECT fact.`faid`, fact.`flid`, fact.`date`
         | FROM
         | fact) gen_subsumer_0
       """.stripMargin.trim),
    // Query filter applied on top of the unfiltered MV subsumer.
    ("case_4",
      s"""
         |SELECT faid, flid, date
         |FROM Fact
       """.stripMargin.trim,
      s"""
         |SELECT faid, flid
         |FROM Fact
         |WHERE year(date) = 2000
       """.stripMargin.trim,
      s"""
         |SELECT gen_subsumer_0.`faid`, gen_subsumer_0.`flid`
         |FROM
         | (SELECT fact.`faid`, fact.`flid`, fact.`date`
         | FROM
         | fact) gen_subsumer_0
         |WHERE
         | (year(CAST(gen_subsumer_0.`date` AS DATE)) = 2000)
       """.stripMargin.trim),
    // MV and query share the identical filter; filter is re-applied outside.
    ("case_5",
      s"""
         |SELECT faid, flid, date
         |FROM Fact
         |WHERE year(date) = 2000
       """.stripMargin.trim,
      s"""
         |SELECT faid, flid
         |FROM Fact
         |WHERE year(date) = 2000
       """.stripMargin.trim,
      s"""
         |SELECT gen_subsumer_0.`faid`, gen_subsumer_0.`flid`
         |FROM
         | (SELECT fact.`faid`, fact.`flid`, fact.`date`
         | FROM
         | fact
         | WHERE
         | (year(CAST(fact.`date` AS DATE)) = 2000)) gen_subsumer_0
         |WHERE
         | (year(CAST(gen_subsumer_0.`date` AS DATE)) = 2000)
       """.stripMargin.trim),
    // MV's IN filter is broader than the query's; rewrite goes to base table.
    ("case_6",
      s"""
         |SELECT faid, flid, date
         |FROM Fact
         |WHERE year(date) in (2000,2001)
       """.stripMargin.trim,
      s"""
         |SELECT faid, flid
         |FROM Fact
         |WHERE year(date) = 2000
       """.stripMargin.trim,
      s"""
         |SELECT fact.`faid`, fact.`flid`
         |FROM
         | fact
         |WHERE
         | (year(CAST(fact.`date` AS DATE)) = 2000)
       """.stripMargin.trim),
    // Coarser GROUP BY re-aggregates the MV's counts via sum(cnt).
    ("case_7",
      s"""
         |SELECT faid, flid, year(date) as year, count(*) as cnt
         |FROM Fact
         |GROUP BY faid, flid, year(date)
       """.stripMargin.trim,
      s"""
         |SELECT faid, year(date) as year, count(*) as cnt
         |FROM Fact
         |GROUP BY Fact.faid,year(Fact.date)
         |HAVING count(*) > 2
       """.stripMargin.trim,
      s"""
         |SELECT gen_subsumer_0.`faid`, gen_subsumer_0.`year` AS `year`, sum(gen_subsumer_0.`cnt`) AS `cnt`
         |FROM
         | (SELECT fact.`faid`, fact.`flid`, year(CAST(fact.`date` AS DATE)) AS `year`, count(1) AS `cnt`
         | FROM
         | fact
         | GROUP BY fact.`faid`, fact.`flid`, year(CAST(fact.`date` AS DATE))) gen_subsumer_0
         |GROUP BY gen_subsumer_0.`faid`, gen_subsumer_0.`year`
         |HAVING (sum(gen_subsumer_0.`cnt`) > 2L)
       """.stripMargin.trim),
    // Expression over an MV column with an auto-generated output alias.
    ("case_8",
      s"""
         |SELECT date
         |FROM Fact
       """.stripMargin.trim,
      s"""
         |SELECT year(date)
         |FROM Fact
       """.stripMargin.trim,
      s"""
         |SELECT year(CAST(gen_subsumer_0.`date` AS DATE)) AS `year(CAST(date AS DATE))`
         |FROM
         | (SELECT fact.`date`
         | FROM
         | fact) gen_subsumer_0
       """.stripMargin.trim),
    // AND-ed extra predicate evaluated on top of the filtered MV subsumer.
    ("case_9",
      s"""
         |SELECT faid, flid
         |FROM Fact
         |WHERE faid > 0
       """.stripMargin.trim,
      s"""
         |SELECT faid
         |FROM Fact
         |WHERE faid > 0 AND flid > 0
       """.stripMargin.trim,
      s"""
         |SELECT gen_subsumer_0.`faid`
         |FROM
         | (SELECT fact.`faid`, fact.`flid`
         | FROM
         | fact
         | WHERE
         | (fact.`faid` > 0)) gen_subsumer_0
         |WHERE
         | (gen_subsumer_0.`faid` > 0) AND (gen_subsumer_0.`flid` > 0)
       """.stripMargin.trim),
    // OR-ed predicate escapes the MV filter; rewrite goes to base table.
    ("case_10",
      s"""
         |SELECT faid, flid
         |FROM Fact
         |WHERE faid > 0
       """.stripMargin.trim,
      s"""
         |SELECT faid
         |FROM Fact
         |WHERE faid > 0 OR flid > 0
       """.stripMargin.trim,
      s"""
         |SELECT fact.`faid`
         |FROM
         | fact
         |WHERE
         | ((fact.`faid` > 0) OR (fact.`flid` > 0))
       """.stripMargin.trim))
}