[CARBONDATA-2528][MV] Fixed order by in MV and aggregation functions inside projection expressions
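(For context, a minimal sketch of the scenario this change enables, mirroring the new tests added to MVCreateTestCase.scala at the end of this patch; the sql(...) and verifyMVDataMap helpers and the fact_table1 table come from that test suite.)

  // Aggregates nested inside a projection expression, e.g. sum(salary)+sum(utilization),
  // cannot be incrementally rebuilt, so the MV is marked with the internal
  // "full_refresh" DM property at creation time.
  sql("create datamap MV_order using 'mv' as " +
    "select empname, sum(salary)+sum(utilization) as total " +
    "from fact_table1 group by empname")
  sql("rebuild datamap MV_order")

  // An order-by query over the same projection is now rewritten to read the MV table
  // directly, without re-applying the group by on the already aggregated data.
  val frame = sql("select empname, sum(salary)+sum(utilization) as total " +
    "from fact_table1 group by empname order by empname")
  assert(verifyMVDataMap(frame.queryExecution.analyzed, "MV_order"))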
Problem: Order by queries and queries with functions like sum(a)+sum(b) are not working in MV. Please check the jira for more details.

Solution: Queries which have projection functions like sum(a)+sum(b) cannot be incrementally loaded, so a new internal DM property is introduced to avoid applying group by on the final query.

This closes #2453

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0c33857f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0c33857f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0c33857f

Branch: refs/heads/carbonstore
Commit: 0c33857fd23611da393a3097511e66ddd81149f2
Parents: bc12de0
Author: ravipesala <ravi.pes...@gmail.com>
Authored: Thu Jun 14 11:40:07 2018 +0530
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Sun Jul 15 14:49:39 2018 +0800

----------------------------------------------------------------------
 .../apache/carbondata/mv/datamap/MVHelper.scala | 156 +++++++++++++++----
 .../mv/rewrite/DefaultMatchMaker.scala          |  21 ++-
 .../carbondata/mv/rewrite/Navigator.scala       |  26 +---
 .../mv/rewrite/SummaryDatasetCatalog.scala      |  16 +-
 .../mv/rewrite/MVCreateTestCase.scala           |  65 ++++++++
 5 files changed, 225 insertions(+), 59 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c33857f/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index f104d9b..fe761c0 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -20,11 +20,12 @@ import java.util
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Expression, NamedExpression, ScalaUDF}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Expression, NamedExpression, ScalaUDF, SortOrder}
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
 import org.apache.spark.sql.execution.command.{Field, TableModel, TableNewProcessor}
@@ -33,10 +34,9 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, DataMapSchemaStorageProvider, RelationIdentifier}
-import org.apache.carbondata.mv.plans.modular
+import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, RelationIdentifier}
 import org.apache.carbondata.mv.plans.modular.{GroupBy, Matchable, ModularPlan, Select}
-import org.apache.carbondata.mv.rewrite.{DefaultMatchMaker, QueryRewrite}
+import org.apache.carbondata.mv.rewrite.{MVPlanWrapper, QueryRewrite}
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**
@@ -51,6 +51,7 @@
object MVHelper { val dmProperties = dataMapSchema.getProperties.asScala val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString) val logicalPlan = sparkSession.sql(updatedQuery).drop("preAgg").queryExecution.analyzed + val fullRebuild = isFullReload(logicalPlan) val fields = logicalPlan.output.map { attr => val name = updateColumnName(attr) val rawSchema = '`' + name + '`' + ' ' + attr.dataType.typeName @@ -113,6 +114,7 @@ object MVHelper { new RelationIdentifier(table.database, table.identifier.table, "") } dataMapSchema.setParentTables(new util.ArrayList[RelationIdentifier](parentIdents.asJava)) + dataMapSchema.getProperties.put("full_refresh", fullRebuild.toString) DataMapStoreManager.getInstance().saveDataMapSchema(dataMapSchema) } @@ -147,6 +149,34 @@ object MVHelper { }.filter(_.isDefined).map(_.get) } + + /** + * Check if we can do incremental load on the mv table. Some cases like aggregation functions + * which are present inside other expressions like sum(a)+sum(b) cannot be incremental loaded. + */ + private def isFullReload(logicalPlan: LogicalPlan): Boolean = { + var isFullReload = false + logicalPlan.transformAllExpressions { + case a: Alias => + a + case agg: AggregateExpression => agg + case c: Cast => + isFullReload = c.child.find { + case agg: AggregateExpression => false + case _ => false + }.isDefined || isFullReload + c + case exp: Expression => + // Check any aggregation function present inside other expression. + isFullReload = exp.find { + case agg: AggregateExpression => true + case _ => false + }.isDefined || isFullReload + exp + } + isFullReload + } + def getAttributeMap(subsumer: Seq[NamedExpression], subsume: Seq[NamedExpression]): Map[AttributeKey, NamedExpression] = { if (subsumer.length == subsume.length) { @@ -179,7 +209,8 @@ object MVHelper { case _ => false } - override def hashCode: Int = exp.hashCode + // Basically we want to use it as simple linked list so hashcode is hardcoded. 
+ override def hashCode: Int = 1 } @@ -215,7 +246,7 @@ object MVHelper { expressions.map { case alias@Alias(agg: AggregateExpression, name) => - attrMap.get(AttributeKey(alias)).map { exp => + attrMap.get(AttributeKey(agg)).map { exp => Alias(getAttribute(exp), name)(alias.exprId, alias.qualifier, alias.explicitMetadata, @@ -233,9 +264,16 @@ object MVHelper { } }.getOrElse(attr) uattr + case alias@Alias(expression: Expression, name) => + attrMap.get(AttributeKey(expression)).map { exp => + Alias(getAttribute(exp), name)(alias.exprId, + alias.qualifier, + alias.explicitMetadata, + alias.isGenerated) + }.getOrElse(alias) case expression: Expression => - val uattr = attrMap.getOrElse(AttributeKey(expression), expression) - uattr + val uattr = attrMap.get(AttributeKey(expression)) + uattr.getOrElse(expression) } } @@ -292,14 +330,16 @@ object MVHelper { def updateDataMap(subsumer: ModularPlan, rewrite: QueryRewrite): ModularPlan = { subsumer match { case s: Select if s.dataMapTableRelation.isDefined => - val relation = s.dataMapTableRelation.get.asInstanceOf[Select] + val relation = + s.dataMapTableRelation.get.asInstanceOf[MVPlanWrapper].plan.asInstanceOf[Select] val mappings = s.outputList zip relation.outputList val oList = for ((o1, o2) <- mappings) yield { if (o1.name != o2.name) Alias(o2, o1.name)(exprId = o1.exprId) else o2 } relation.copy(outputList = oList).setRewritten() case g: GroupBy if g.dataMapTableRelation.isDefined => - val relation = g.dataMapTableRelation.get.asInstanceOf[Select] + val relation = + g.dataMapTableRelation.get.asInstanceOf[MVPlanWrapper].plan.asInstanceOf[Select] val in = relation.asInstanceOf[Select].outputList val mappings = g.outputList zip relation.outputList val oList = for ((left, right) <- mappings) yield { @@ -341,32 +381,43 @@ object MVHelper { case select: Select => select.children match { - case Seq(s: Select) if s.dataMapTableRelation.isDefined => - val relation = s.dataMapTableRelation.get.asInstanceOf[Select] - val child = updateDataMap(s, rewrite).asInstanceOf[Select] - val aliasMap = getAttributeMap(relation.outputList, s.outputList) - var outputSel = - updateOutPutList(select.outputList, relation, aliasMap, keepAlias = true) - val pred = updateSelectPredicates(select.predicateList, aliasMap, true) - select.copy(outputList = outputSel, - inputList = child.outputList, - predicateList = pred, - children = Seq(child)).setRewritten() - case Seq(g: GroupBy) if g.dataMapTableRelation.isDefined => - val relation = g.dataMapTableRelation.get.asInstanceOf[Select] + val relation = + g.dataMapTableRelation.get.asInstanceOf[MVPlanWrapper].plan.asInstanceOf[Select] val aliasMap = getAttributeMap(relation.outputList, g.outputList) - - val outputSel = - updateOutPutList(select.outputList, relation, aliasMap, keepAlias = false) - val child = updateDataMap(g, rewrite).asInstanceOf[Matchable] - // TODO Remove the unnecessary columns from selection. - // Only keep columns which are required by parent. 
- val inputSel = child.outputList - select.copy( - outputList = outputSel, - inputList = inputSel, - children = Seq(child)).setRewritten() + val updatedFlagSpec: Seq[Seq[ArrayBuffer[SortOrder]]] = updateSortOrder( + keepAlias = false, + select, + relation, + aliasMap) + if (isFullRefresh(g.dataMapTableRelation.get.asInstanceOf[MVPlanWrapper])) { + val mappings = g.outputList zip relation.outputList + val oList = for ((o1, o2) <- mappings) yield { + if (o1.name != o2.name) Alias(o2, o1.name)(exprId = o1.exprId) else o2 + } + + val outList = select.outputList.map{ f => + oList.find(_.name.equals(f.name)).get + } + // Directly keep the relation as child. + select.copy( + outputList = outList, + children = Seq(relation), + aliasMap = relation.aliasMap, + flagSpec = updatedFlagSpec).setRewritten() + } else { + val outputSel = + updateOutPutList(select.outputList, relation, aliasMap, keepAlias = false) + val child = updateDataMap(g, rewrite).asInstanceOf[Matchable] + // TODO Remove the unnecessary columns from selection. + // Only keep columns which are required by parent. + val inputSel = child.outputList + select.copy( + outputList = outputSel, + inputList = inputSel, + flagSpec = updatedFlagSpec, + children = Seq(child)).setRewritten() + } case _ => select } @@ -376,6 +427,43 @@ object MVHelper { } /** + * Updates the flagspec of given select plan with attributes of relation select plan + */ + private def updateSortOrder(keepAlias: Boolean, + select: Select, + relation: Select, + aliasMap: Map[AttributeKey, NamedExpression]) = { + val updatedFlagSpec = select.flagSpec.map { f => + f.map { + case list: ArrayBuffer[SortOrder] => + list.map { s => + val expressions = + updateOutPutList( + Seq(s.child.asInstanceOf[Attribute]), + relation, + aliasMap, + keepAlias = false) + SortOrder(expressions.head, s.direction, s.sameOrderExpressions) + } + } + } + updatedFlagSpec + } + + /** + * It checks whether full referesh for the table is required. It means we no need to apply + * aggregation function or group by functions on the mv table. + */ + private def isFullRefresh(mvPlanWrapper: MVPlanWrapper): Boolean = { + val fullRefesh = mvPlanWrapper.dataMapSchema.getProperties.get("full_refresh") + if (fullRefesh != null) { + fullRefesh.toBoolean + } else { + false + } + } + + /** * Rewrite the updated mv query with corresponding MV table. 
*/ def rewriteWithMVTable(rewrittenPlan: ModularPlan, rewrite: QueryRewrite): ModularPlan = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c33857f/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala index 6bb1c6e..3e8b2cd 100644 --- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala @@ -19,6 +19,7 @@ package org.apache.carbondata.mv.rewrite import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Expression, PredicateHelper, _} +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter} import org.apache.carbondata.mv.plans.modular.{JoinEdge, Matchable, ModularPlan, _} @@ -498,13 +499,31 @@ object GroupbyGroupbySelectOnlyChildDelta extends DefaultMatchPattern with Predi val aliasMap = AttributeMap(gb_2a.outputList.collect { case a: Alias => (a.toAttribute, a) }) + val res = Utils.tryMatch(gb_2a, gb_2q, aliasMap).flatMap { - case g: GroupBy => Some(g.copy(child = sel_2c1)); + case g: GroupBy => + + // Check any agg function exists on outputlist, in case of expressions like + // sum(a)+sum(b) , outputlist directly replaces with alias with in place of function + // so we should remove the groupby clause in those cases. + val aggFunExists = g.outputList.exists { f => + f.find { + case ag: AggregateExpression => true + case _ => false + }.isDefined + } + if (aggFunExists) { + Some(g.copy(child = sel_2c1)) + } else { + // Remove group by clause. 
+ Some(g.copy(child = sel_2c1, predicateList = Seq.empty)) + } case _ => None }.map { wip => factorOutSubsumer(wip, gb_2a, sel_1c1.aliasMap) }.map(Seq(_)) .getOrElse(Nil) + res } // TODO: implement regrouping with 1:N rejoin (rejoin tables being the "1" side) // via catalog service http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c33857f/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala index 5dc2245..76df4c2 100644 --- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala @@ -102,29 +102,6 @@ private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVSession) } } - private def updateDatamap(rchild: ModularPlan, subsume: ModularPlan) = { - val update = rchild match { - case s: Select if s.dataMapTableRelation.isDefined => - true - case g: GroupBy if g.dataMapTableRelation.isDefined => - true - case _ => false - } - - if (update) { - subsume match { - case s: Select => - s.copy(children = Seq(rchild)) - - case g: GroupBy => - g.copy(child = rchild) - case _ => subsume - } - } else { - subsume - } - } - // add Select operator as placeholder on top of subsumee to facilitate matching def unifySubsumee(subsumee: ModularPlan): Option[ModularPlan] = { subsumee match { @@ -144,6 +121,9 @@ private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVSession) dataMapRelation: ModularPlan): ModularPlan = { // Update datamap table relation to the subsumer modular plan val updatedSubsumer = subsumer match { + // In case of order by it adds extra select but that can be ignored while doing selection. + case s@ Select(_, _, _, _, _, Seq(g: GroupBy), _, _, _, _) => + s.copy(children = Seq(g.copy(dataMapTableRelation = Some(dataMapRelation)))) case s: Select => s.copy(dataMapTableRelation = Some(dataMapRelation)) case g: GroupBy => g.copy(dataMapTableRelation = Some(dataMapRelation)) case other => other http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c33857f/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala index a7dbdef..1072e96 100644 --- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala @@ -21,6 +21,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.FindDataSourceTable import org.apache.spark.sql.parser.CarbonSpark2SqlParser @@ -39,6 +40,15 @@ private[mv] case class SummaryDataset(signature: Option[Signature], dataMapSchema: DataMapSchema, relation: ModularPlan) +/** + * It is wrapper on datamap relation along with schema. 
+ */ +case class MVPlanWrapper(plan: ModularPlan, dataMapSchema: DataMapSchema) extends ModularPlan { + override def output: Seq[Attribute] = plan.output + + override def children: Seq[ModularPlan] = plan.children +} + private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession) extends DataMapCatalog[SummaryDataset] { @@ -123,7 +133,11 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession) Seq.empty, None) - summaryDatasets += SummaryDataset(signature, planToRegister, dataMapSchema, select) + summaryDatasets += SummaryDataset( + signature, + planToRegister, + dataMapSchema, + MVPlanWrapper(select, dataMapSchema)) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c33857f/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala index 0aa7b30..01884c5 100644 --- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala +++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala @@ -395,6 +395,10 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { "select t1.empname as c1, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname") val analyzed = frame.queryExecution.analyzed assert(!verifyMVDataMap(analyzed, "datamap25")) + val frame1 = sql( + "select t1.empname as c1, t2.designation from fact_table1 t1 inner join fact_table2 t2 on (t1.empname = t2.empname) inner join fact_table3 t3 on (t1.empname=t3.empname)") + val analyzed1 = frame1.queryExecution.analyzed + assert(verifyMVDataMap(analyzed1, "datamap25")) checkAnswer(frame, sql("select t1.empname, t2.designation from fact_table4 t1,fact_table5 t2 where t1.empname = t2.empname")) sql(s"drop datamap datamap25") } @@ -665,6 +669,67 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { sql(s"drop datamap datamap45") } + test("jira carbondata-2523") { + + sql("drop datamap if exists mv13") + sql("drop table if exists test4") + sql("create table test4 ( name string,age int,salary int) stored by 'carbondata'") + + sql(" insert into test4 select 'babu',12,12").show() + sql("create datamap mv13 using 'mv' as select name,sum(salary) from test4 group by name") + sql("rebuild datamap mv13") + val frame = sql( + "select name,sum(salary) from test4 group by name") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "mv13")) + } + + test("jira carbondata-2528-1") { + + sql("drop datamap if exists MV_order") + sql("create datamap MV_order using 'mv' as select empname,sum(salary) as total from fact_table1 group by empname") + sql("rebuild datamap MV_order") + val frame = sql( + "select empname,sum(salary) as total from fact_table1 group by empname order by empname") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "MV_order")) + } + + test("jira carbondata-2528-2") { + + sql("drop datamap if exists MV_order") + sql("drop datamap if exists MV_desc_order") + sql("create datamap MV_order using 'mv' as select empname,sum(salary)+sum(utilization) as total from fact_table1 group by empname") + sql("rebuild datamap MV_order") + val frame = sql( + "select empname,sum(salary)+sum(utilization) as total from fact_table1 group by empname order by empname") + val analyzed = 
frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "MV_order")) + } + + test("jira carbondata-2528-3") { + + sql("drop datamap if exists MV_order") + sql("create datamap MV_order using 'mv' as select empname,sum(salary)+sum(utilization) as total from fact_table1 group by empname order by empname DESC") + sql("rebuild datamap MV_order") + val frame = sql( + "select empname,sum(salary)+sum(utilization) as total from fact_table1 group by empname order by empname DESC") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "MV_order")) + sql("drop datamap if exists MV_order") + } + + test("jira carbondata-2528-4") { + + sql("drop datamap if exists MV_order") + sql("create datamap MV_order using 'mv' as select empname,sum(salary)+sum(utilization) as total from fact_table1 group by empname order by empname DESC") + sql("rebuild datamap MV_order") + val frame = sql( + "select empname,sum(salary)+sum(utilization) as total from fact_table1 where empname = 'ravi' group by empname order by empname DESC") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "MV_order")) + sql("drop datamap if exists MV_order") + } def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = { val tables = logicalPlan collect {