This is an automated email from the ASF dual-hosted git repository. jackylk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 2b222d4 [CARBONDATA-3629] Fix Select query failure on aggregation of same column on MV 2b222d4 is described below commit 2b222d4d279e7f0c53c062b0740d4aed3904a960 Author: Indhumathi27 <indhumathi...@gmail.com> AuthorDate: Tue Dec 24 13:15:02 2019 +0530 [CARBONDATA-3629] Fix Select query failure on aggregation of same column on MV Problem: If MV datamap is created with SELECT a,sum(a) from maintable group by a and later queried with SELECT sum(a) from maintable, query fails as rewritten plan output list doesn't match with the table. Solution: Check if Aggregation exists in GroupBy Node and copy select node with aliasMap This closes #3530 --- .../org/apache/carbondata/mv/datamap/MVUtil.scala | 39 ++++++++++++++--- .../carbondata/mv/rewrite/DefaultMatchMaker.scala | 51 ++++++++++++++++------ .../mv/rewrite/TestAllOperationsOnMV.scala | 29 ++++++++++++ .../TestMVTimeSeriesCreateDataMapCommand.scala | 13 +++++- docs/datamap/mv-datamap-guide.md | 11 ++++- .../command/timeseries/TimeSeriesUtil.scala | 4 +- 6 files changed, 122 insertions(+), 25 deletions(-) diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala index fe76cc3..f3e8091 100644 --- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala @@ -49,23 +49,37 @@ class MVUtil { case select: Select => select.children.map { case groupBy: GroupBy => - getFieldsFromProject(groupBy.outputList, groupBy.predicateList, logicalRelation) + getFieldsFromProject(groupBy.outputList, groupBy.predicateList, + logicalRelation, groupBy.flagSpec) case _: ModularRelation => - getFieldsFromProject(select.outputList, select.predicateList, logicalRelation) + getFieldsFromProject(select.outputList, select.predicateList, + logicalRelation, 
select.flagSpec) }.head case groupBy: GroupBy => groupBy.child match { case select: Select => - getFieldsFromProject(groupBy.outputList, select.predicateList, logicalRelation) + getFieldsFromProject(groupBy.outputList, select.predicateList, + logicalRelation, select.flagSpec) case _: ModularRelation => - getFieldsFromProject(groupBy.outputList, groupBy.predicateList, logicalRelation) + getFieldsFromProject(groupBy.outputList, groupBy.predicateList, + logicalRelation, groupBy.flagSpec) } } } + /** + * Create's main table to datamap table field relation map by using modular plan generated from + * user query + * @param outputList of the modular plan + * @param predicateList of the modular plan + * @param logicalRelation list of main table from query + * @param flagSpec to get SortOrder attribute if exists + * @return fieldRelationMap + */ def getFieldsFromProject(outputList: Seq[NamedExpression], predicateList: Seq[Expression], - logicalRelation: Seq[LogicalRelation]): mutable.LinkedHashMap[Field, DataMapField] = { + logicalRelation: Seq[LogicalRelation], + flagSpec: Seq[Seq[Any]]): mutable.LinkedHashMap[Field, DataMapField] = { var fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField] fieldToDataMapFieldMap ++= getFieldsFromProject(outputList, logicalRelation) var finalPredicateList: Seq[NamedExpression] = Seq.empty @@ -75,6 +89,21 @@ class MVUtil { finalPredicateList = finalPredicateList.:+(attr) } } + // collect sort by columns + if (flagSpec.nonEmpty) { + flagSpec.map { f => + f.map { + case list: ArrayBuffer[_] => + list.map { + case s: SortOrder => + s.collect { + case attr: AttributeReference => + finalPredicateList = finalPredicateList.:+(attr) + } + } + } + } + } fieldToDataMapFieldMap ++= getFieldsFromProject(finalPredicateList.distinct, logicalRelation) fieldToDataMapFieldMap } diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala index 616d0bd..7e8eb96 100644 --- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala @@ -385,16 +385,15 @@ object GroupbyGroupbyNoChildDelta extends DefaultMatchPattern { gb_2a.predicateList.exists(_.semanticEquals(expr))) val isGroupingRmE = gb_2a.predicateList.forall(expr => gb_2q.predicateList.exists(_.semanticEquals(expr))) + val isOutputEmR = gb_2q.outputList.forall { + case a @ Alias(_, _) => + gb_2a.outputList.exists{ + case a1: Alias => a1.child.semanticEquals(a.child) + case exp => exp.semanticEquals(a.child) + } + case exp => gb_2a.outputList.exists(_.semanticEquals(exp)) + } if (isGroupingEmR && isGroupingRmE) { - val isOutputEmR = gb_2q.outputList.forall { - case a @ Alias(_, _) => - gb_2a.outputList.exists{ - case a1: Alias => a1.child.semanticEquals(a.child) - case exp => exp.semanticEquals(a.child) - } - case exp => gb_2a.outputList.exists(_.semanticEquals(exp)) - } - if (isOutputEmR) { // Mappings of output of two plans by checking semantic equals. 
val mappings = gb_2a.outputList.zipWithIndex.map { case(exp, index) => @@ -424,11 +423,35 @@ object GroupbyGroupbyNoChildDelta extends DefaultMatchPattern { Utils.tryMatch( gb_2a, gb_2q, aliasMap).flatMap { case g: GroupBy => - Some(g.copy(child = g.child.withNewChildren( - g.child.children.map { - case modular.Select(_, _, _, _, _, _, _, _, _, _) => gb_2a; - case other => other - }))); + // Check any agg function exists on outputlist, in case of expressions like + // sum(a), then create new alias and copy to group by node + val aggFunExists = g.outputList.exists { f => + f.find { + case _: AggregateExpression => true + case _ => false + }.isDefined + } + if (aggFunExists && !isGroupingRmE && isOutputEmR) { + val tChildren = new collection.mutable.ArrayBuffer[ModularPlan]() + val sel_1a = g.child.asInstanceOf[Select] + + val usel_1a = sel_1a.copy(outputList = sel_1a.outputList) + tChildren += gb_2a + val sel_1q_temp = sel_1a.copy( + predicateList = sel_1a.predicateList, + children = tChildren, + joinEdges = sel_1a.joinEdges, + aliasMap = Seq(0 -> rewrite.newSubsumerName()).toMap) + + val res = factorOutSubsumer(sel_1q_temp, usel_1a, sel_1q_temp.aliasMap) + Some(g.copy(child = res)) + } else { + Some(g.copy(child = g.child.withNewChildren( + g.child.children.map { + case modular.Select(_, _, _, _, _, _, _, _, _, _) => gb_2a; + case other => other + }))); + } case _ => None}.map(Seq(_)).getOrElse(Nil) } else { Nil diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala index a81cd2f..1750ce7 100644 --- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala +++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala @@ -541,6 +541,35 @@ class TestAllOperationsOnMV extends QueryTest with BeforeAndAfterEach { sql("drop table IF EXISTS maintable") } + 
test("test query aggregation on mv datamap ") { + sql("drop table if exists maintable") + sql("create table maintable(name string, age int, add string) stored by 'carbondata'") + sql("insert into maintable values('abc',1,'a'),('def',2,'b'),('ghi',3,'c')") + val res = sql("select sum(age) from maintable") + sql("drop datamap if exists mv3") + sql("create datamap mv3 on table maintable using 'mv' as select age,sum(age) from maintable group by age") + val df = sql("select sum(age) from maintable") + TestUtil.verifyMVDataMap(df.queryExecution.analyzed, "mv3") + checkAnswer(res, df) + sql("drop table if exists maintable") + } + + test("test order by columns not given in projection") { + sql("drop table IF EXISTS maintable") + sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'") + sql("insert into table maintable select 'abc',21,2000") + val res = sql("select name from maintable order by c_code") + sql("drop datamap if exists dm1") + sql("create datamap dm1 using 'mv' as select name from maintable order by c_code") + val df = sql("select name from maintable order by c_code") + TestUtil.verifyMVDataMap(df.queryExecution.analyzed, "dm1") + checkAnswer(res, df) + intercept[Exception] { + sql("alter table maintable drop columns(c_code)") + }.getMessage.contains("Column name cannot be dropped because it exists in mv datamap: dm1") + sql("drop table if exists maintable") + } + test("drop meta cache on mv datamap table") { sql("drop table IF EXISTS maintable") sql("create table maintable(name string, c_code int, price int) stored by 'carbondata'") diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesCreateDataMapCommand.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesCreateDataMapCommand.scala index 4feab2f..699b189 100644 --- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesCreateDataMapCommand.scala +++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesCreateDataMapCommand.scala @@ -101,8 +101,17 @@ class TestMVTimeSeriesCreateDataMapCommand extends QueryTest with BeforeAndAfter sql( "create datamap datamap1 on table maintable_new using 'mv' as " + "select timeseries(projectjoindate,'second') from maintable_new") - }.getMessage - .contains("Granularity should be DAY,MONTH or YEAR, for timeseries column of Date type") + }.getMessage.contains("Granularity should be of DAY/WEEK/MONTH/YEAR, for timeseries column of Date type") + intercept[MalformedCarbonCommandException] { + sql( + "create datamap datamap1 on table maintable_new using 'mv' as " + + "select timeseries(projectjoindate,'five_minute') from maintable_new") + }.getMessage.contains("Granularity should be of DAY/WEEK/MONTH/YEAR, for timeseries column of Date type") + intercept[MalformedCarbonCommandException] { + sql( + "create datamap datamap1 on table maintable_new using 'mv' as " + + "select timeseries(projectjoindate,'hour') from maintable_new") + }.getMessage.contains("Granularity should be of DAY/WEEK/MONTH/YEAR, for timeseries column of Date type") sql("drop table IF EXISTS maintable_new") } diff --git a/docs/datamap/mv-datamap-guide.md b/docs/datamap/mv-datamap-guide.md index a0c3f1a..b75243f 100644 --- a/docs/datamap/mv-datamap-guide.md +++ b/docs/datamap/mv-datamap-guide.md @@ -73,7 +73,7 @@ EXPLAIN SELECT a, sum(b) from maintable group by a; GROUP BY country, sex ``` **NOTE**: - * Group by/Filter columns has to be provided in projection list while creating mv datamap + * Group by columns has to be provided in projection list while creating mv datamap * If only single parent table is involved in mv datamap creation, then TableProperties of Parent table (if not present in a aggregate function like sum(col)) listed below will be inherited to datamap table @@ -91,7 +91,14 @@ EXPLAIN SELECT a, sum(b) from maintable group by a; 12. NO_INVERTED_INDEX 13. 
COLUMN_COMPRESSOR - * All columns of main table at once cannot participate in mv datamap table creation + * Creating MV datamap with select query containing only project of all columns of maintable is unsupported + + **Example:** + If table 'x' contains columns 'a,b,c', + then creating MV datamap with below queries is not supported. + + 1. ```select a,b,c from x``` + 2. ```select * from x``` * TableProperties can be provided in DMProperties excluding LOCAL_DICTIONARY_INCLUDE, LOCAL_DICTIONARY_EXCLUDE, DICTIONARY_INCLUDE, DICTIONARY_EXCLUDE, INVERTED_INDEX, NO_INVERTED_INDEX, SORT_COLUMNS, LONG_STRING_COLUMNS, RANGE_COLUMN & COLUMN_META_CACHE diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala index e72cf9b..f79e7d8 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala @@ -154,10 +154,10 @@ object TimeSeriesUtil { timeSeriesFunction: String): Unit = { for (granularity <- Granularity.values()) { if (timeSeriesFunction.equalsIgnoreCase(granularity.getName - .substring(0, granularity.getName.indexOf(CarbonCommonConstants.UNDERSCORE)))) { + .substring(0, granularity.getName.lastIndexOf(CarbonCommonConstants.UNDERSCORE)))) { if (!supportedGranularitiesForDate.contains(granularity.getName)) { throw new MalformedCarbonCommandException( - "Granularity should be DAY,MONTH or YEAR, for timeseries column of Date type") + "Granularity should be of DAY/WEEK/MONTH/YEAR, for timeseries column of Date type") } } }