This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8acaf08  [CARBONDATA-3532] Support Query Rollup for MV TimeSeries Queries
8acaf08 is described below

commit 8acaf08b7494dfd9652380cb86d70eeb76738484
Author: Indhumathi27 <indhumathi...@gmail.com>
AuthorDate: Tue Dec 3 11:19:57 2019 +0530

    [CARBONDATA-3532] Support Query Rollup for MV TimeSeries Queries
    
    Supported Query RollUp for MV TimeSeries queries.
    How is it supported?
    For each timeseries query, check whether the query can be rolled up from the
    existing datasets by replacing the granularity in the given user query. If the
    query is rewritten and rolled up, then add Select and Group By nodes on top of
    the rewritten query.
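    
    For illustration (the table and column names are examples, mirroring the ones used
    in the MVHelper comments): a user query such as
        SELECT timeseries(col,'day') FROM maintable GROUP BY timeseries(col,'day')
    can be served from an 'hour'-granularity datamap by rewriting it against the datamap
    table and re-aggregating the hour buckets up to 'day' with an additional Select and
    Group By on top.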
    
    This closes #3495
---
 .../apache/carbondata/mv/datamap/MVHelper.scala    | 160 +++++++++++-
 .../carbondata/mv/rewrite/DefaultMatchMaker.scala  |  19 +-
 .../apache/carbondata/mv/rewrite/Navigator.scala   | 181 ++++++++++++-
 .../timeseries/TestMVTimeSeriesQueryRollUp.scala   | 279 +++++++++++++++++++++
 .../carbondata/mv/plans/modular/ModularPlan.scala  |  18 ++
 .../org/apache/carbondata/mv/plans/package.scala   |  32 ++-
 .../apache/carbondata/mv/plans/util/Printers.scala |  56 ++++-
 docs/datamap/mv-datamap-guide.md                   |  35 ++-
 .../apache/spark/sql/CarbonToSparkAdapter.scala    |   8 +
 .../apache/spark/sql/CarbonToSparkAdapter.scala    |   8 +
 10 files changed, 779 insertions(+), 17 deletions(-)

diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 12cad37..495ad5b 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.execution.command.timeseries.{TimeSeriesFunction, Ti
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
 import org.apache.spark.sql.types.{ArrayType, DateType, MapType, StructType}
+import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{DataMapUtil, PartitionUtils}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -830,18 +831,173 @@ object MVHelper {
    */
   def rewriteWithMVTable(rewrittenPlan: ModularPlan, rewrite: QueryRewrite): ModularPlan = {
     if (rewrittenPlan.find(_.rewritten).isDefined) {
-      val updatedDataMapTablePlan = rewrittenPlan transform {
+      var updatedMVTablePlan = rewrittenPlan transform {
         case s: Select =>
           MVHelper.updateDataMap(s, rewrite)
         case g: GroupBy =>
           MVHelper.updateDataMap(g, rewrite)
       }
-      updatedDataMapTablePlan
+      if (rewrittenPlan.isRolledUp) {
+        // If the rewritten query is rolled up, then rewrite the query based on the original modular
+        // plan. Make a new outputList based on original modular plan and wrap rewritten plan with
+        // select & group-by nodes with new outputList.
+
+        // For example:
+        // Given User query:
+        // SELECT timeseries(col,'day') from maintable group by timeseries(col,'day')
+        // If plan is rewritten as per 'hour' granularity of datamap1,
+        // then rewritten query will be like,
+        // SELECT datamap1_table.`UDF:timeseries_projectjoindate_hour` AS `UDF:timeseries
+        // (projectjoindate, hour)`
+        // FROM
+        // default.datamap1_table
+        // GROUP BY datamap1_table.`UDF:timeseries_projectjoindate_hour`
+        //
+        // Now, rewrite the rewritten plan as per the 'day' granularity
+        // SELECT timeseries(gen_subsumer_0.`UDF:timeseries(projectjoindate, hour)`,'day' ) AS
+        // `UDF:timeseries(projectjoindate, day)`
+        //  FROM
+        //  (SELECT datamap2_table.`UDF:timeseries_projectjoindate_hour` AS `UDF:timeseries
+        //  (projectjoindate, hour)`
+        //  FROM
+        //    default.datamap2_table
+        //  GROUP BY datamap2_table.`UDF:timeseries_projectjoindate_hour`) gen_subsumer_0
+        // GROUP BY timeseries(gen_subsumer_0.`UDF:timeseries(projectjoindate, hour)`,'day' )
+        rewrite.modularPlan match {
+          case select: Select =>
+            val outputList = select.outputList
+            val rolledUpOutputList = updatedMVTablePlan.asInstanceOf[Select].outputList
+            var finalOutputList: Seq[NamedExpression] = Seq.empty
+            val mapping = outputList zip rolledUpOutputList
+            val newSubsume = rewrite.newSubsumerName()
+
+            mapping.foreach { outputLists =>
+              val name: String = getAliasName(outputLists._2)
+              outputLists._1 match {
+                case a@Alias(scalaUdf: ScalaUDF, aliasName) =>
+                  if (scalaUdf.function.isInstanceOf[TimeSeriesFunction]) {
+                    val newName = newSubsume + ".`" + name + "`"
+                    val transformedUdf = transformTimeSeriesUdf(scalaUdf, newName)
+                    finalOutputList = finalOutputList.:+(Alias(transformedUdf, aliasName)(a.exprId,
+                      a.qualifier).asInstanceOf[NamedExpression])
+                  }
+                case Alias(attr: AttributeReference, _) =>
+                  finalOutputList = finalOutputList.:+(
+                    CarbonToSparkAdapter.createAttributeReference(attr, name, newSubsume))
+                case attr: AttributeReference =>
+                  finalOutputList = finalOutputList.:+(
+                    CarbonToSparkAdapter.createAttributeReference(attr, name, newSubsume))
+              }
+            }
+            val newChildren = new collection.mutable.ArrayBuffer[ModularPlan]()
+            val newAliasMap = new collection.mutable.HashMap[Int, String]()
+
+            val sel_plan = select.copy(outputList = finalOutputList,
+              inputList = finalOutputList,
+              predicateList = Seq.empty)
+            newChildren += sel_plan
+            newAliasMap += (newChildren.indexOf(sel_plan) -> newSubsume)
+            updatedMVTablePlan = select.copy(outputList = finalOutputList,
+              inputList = finalOutputList,
+              aliasMap = newAliasMap.toMap,
+              predicateList = Seq.empty,
+              children = Seq(updatedMVTablePlan)).setRewritten()
+
+          case groupBy: GroupBy =>
+            updatedMVTablePlan match {
+              case select: Select =>
+                val selectOutputList = groupBy.outputList
+                val rolledUpOutputList = updatedMVTablePlan.asInstanceOf[Select].outputList
+                var finalOutputList: Seq[NamedExpression] = Seq.empty
+                var predicateList: Seq[Expression] = Seq.empty
+                val mapping = selectOutputList zip rolledUpOutputList
+                val newSubsume = rewrite.newSubsumerName()
+
+                mapping.foreach { outputLists =>
+                  val aliasName: String = getAliasName(outputLists._2)
+                  outputLists._1 match {
+                    case a@Alias(scalaUdf: ScalaUDF, _) =>
+                      if (scalaUdf.function.isInstanceOf[TimeSeriesFunction]) {
+                        val newName = newSubsume + ".`" + aliasName + "`"
+                        val transformedUdf = transformTimeSeriesUdf(scalaUdf, newName)
+                        groupBy.predicateList.foreach {
+                          case udf: ScalaUDF if udf.isInstanceOf[ScalaUDF] =>
+                            predicateList = predicateList.:+(transformedUdf)
+                          case attr: AttributeReference =>
+                            predicateList = predicateList.:+(
+                              CarbonToSparkAdapter.createAttributeReference(attr,
+                                attr.name,
+                                newSubsume))
+                        }
+                        finalOutputList = finalOutputList.:+(Alias(transformedUdf, a.name)(a
+                          .exprId, a.qualifier).asInstanceOf[NamedExpression])
+                      }
+                    case attr: AttributeReference =>
+                      finalOutputList = finalOutputList.:+(
+                        CarbonToSparkAdapter.createAttributeReference(attr, aliasName, newSubsume))
+                    case Alias(attr: AttributeReference, _) =>
+                      finalOutputList = finalOutputList.:+(
+                        CarbonToSparkAdapter.createAttributeReference(attr, aliasName, newSubsume))
+                    case a@Alias(agg: AggregateExpression, name) =>
+                      val newAgg = agg.transform {
+                        case attr: AttributeReference =>
+                          CarbonToSparkAdapter.createAttributeReference(attr, name, newSubsume)
+                      }
+                      finalOutputList = finalOutputList.:+(Alias(newAgg, name)(a.exprId,
+                        a.qualifier).asInstanceOf[NamedExpression])
+                    case other => other
+                  }
+                }
+                val newChildren = new collection.mutable.ArrayBuffer[ModularPlan]()
+                val newAliasMap = new collection.mutable.HashMap[Int, String]()
+
+                val sel_plan = select.copy(outputList = finalOutputList,
+                  inputList = finalOutputList,
+                  predicateList = Seq.empty)
+                newChildren += sel_plan
+                newAliasMap += (newChildren.indexOf(sel_plan) -> newSubsume)
+                updatedMVTablePlan = select.copy(outputList = finalOutputList,
+                  inputList = finalOutputList,
+                  aliasMap = newAliasMap.toMap,
+                  children = Seq(updatedMVTablePlan)).setRewritten()
+                updatedMVTablePlan = groupBy.copy(outputList = finalOutputList,
+                  inputList = finalOutputList,
+                  predicateList = predicateList,
+                  alias = Some(newAliasMap.mkString),
+                  child = updatedMVTablePlan).setRewritten()
+                updatedMVTablePlan = select.copy(outputList = finalOutputList,
+                  inputList = finalOutputList,
+                  children = Seq(updatedMVTablePlan)).setRewritten()
+            }
+        }
+      }
+      updatedMVTablePlan
     } else {
       rewrittenPlan
     }
   }
 
+  def getAliasName(exp: NamedExpression): String = {
+    exp match {
+      case Alias(_, name) =>
+        name
+      case attr: AttributeReference =>
+        attr.name
+    }
+  }
+
+  private def transformTimeSeriesUdf(scalaUdf: ScalaUDF, newName: String): Expression = {
+    scalaUdf.transformDown {
+      case attr: AttributeReference =>
+        AttributeReference(newName, attr.dataType)(
+          exprId = attr.exprId,
+          qualifier = attr.qualifier)
+      case l: Literal =>
+        Literal(UTF8String.fromString("'" + l.toString() + "'"),
+          org.apache.spark.sql.types.DataTypes.StringType)
+    }
+  }
+
   private def getUpdatedOutputList(outputList: Seq[NamedExpression],
       dataMapTableRelation: Option[ModularPlan]): Seq[NamedExpression] = {
     dataMapTableRelation.collect {
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
index 06dd7a1..50d6ae7 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
@@ -544,14 +544,19 @@ object GroupbyGroupbySelectOnlyChildDelta extends DefaultMatchPattern with Predi
    */
   private def isDerivableForUDF(exprE: Expression, exprListR: Seq[Expression]): Boolean = {
     var canBeDerived = false
-    exprListR.forall {
-      case a: ScalaUDF =>
-        a.references.foreach { a =>
-          canBeDerived = exprE.sql.contains(a.name)
-        }
-        canBeDerived
+    exprE match {
+      case f: ScalaUDF =>
+        canEvaluate(f, exprListR)
       case _ =>
-        canBeDerived
+        exprListR.forall {
+          case a: ScalaUDF =>
+            a.references.foreach { a =>
+              canBeDerived = exprE.sql.contains(a.name)
+            }
+            canBeDerived
+          case _ =>
+            canBeDerived
+        }
     }
   }
 
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
index 54a684e..b4a0ae1 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
@@ -17,8 +17,19 @@
 
 package org.apache.carbondata.mv.rewrite
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import scala.collection.JavaConverters._
+import scala.util.control.Breaks._
 
+import org.apache.spark.sql.CarbonToSparkAdapter
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, Expression, Literal, NamedExpression, ScalaUDF}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan}
+import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.DataTypes
+import org.apache.spark.unsafe.types.UTF8String
+
+import org.apache.carbondata.core.metadata.schema.datamap.Granularity
+import org.apache.carbondata.core.preagg.TimeSeriesFunctionEnum
 import org.apache.carbondata.mv.datamap.MVUtil
 import org.apache.carbondata.mv.expressions.modular._
 import org.apache.carbondata.mv.plans.modular
@@ -27,6 +38,8 @@ import org.apache.carbondata.mv.session.MVSession
 
 private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVSession) {
 
+  var modularPlan: java.util.Set[ModularPlan] = new java.util.HashSet[ModularPlan]()
+
   def rewriteWithSummaryDatasets(plan: ModularPlan, rewrite: QueryRewrite): ModularPlan = {
     val replaced = plan.transformAllExpressions {
       case s: ModularSubquery =>
@@ -66,12 +79,120 @@ private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVSession)
         }
     }
     if (rewrittenPlan.fastEquals(plan)) {
-      None
+      if (modularPlan.asScala.exists(p => p.sameResult(rewrittenPlan))) {
+        return None
+      }
+      getRolledUpModularPlan(rewrittenPlan, rewrite)
     } else {
       Some(rewrittenPlan)
     }
   }
 
+  /**
+   * Check if modular plan can be rolled up by rewriting and matching the modular plan
+   * with existing mv datasets.
+   * @param rewrittenPlan to check if can be rolled up
+   * @param rewrite
+   * @return new modular plan
+   */
+  def getRolledUpModularPlan(rewrittenPlan: ModularPlan,
+      rewrite: QueryRewrite): Option[ModularPlan] = {
+    var canDoRollUp = true
+    // check if modular plan contains timeseries udf
+    val timeSeriesUdf = rewrittenPlan match {
+      case s: Select =>
+        getTimeSeriesUdf(s.outputList)
+      case g: GroupBy =>
+        getTimeSeriesUdf(g.outputList)
+      case _ => (null, null)
+    }
+    // set canDoRollUp to false, in case of Join queries and if filter has timeseries udf function
+    // TODO: support rollUp for join queries
+    rewrite.optimizedPlan.transformDown {
+      case join@Join(_, _, _, _) =>
+        canDoRollUp = false
+        join
+      case f@Filter(condition: Expression, _) =>
+        condition.collect {
+          case s: ScalaUDF if s.function.isInstanceOf[TimeSeriesFunction] =>
+            canDoRollUp = false
+        }
+        f
+    }
+    if (null != timeSeriesUdf._2 && canDoRollUp) {
+      // check for rollup and rewrite the plan
+      // collect all the datasets which contains timeseries datamap's
+      val dataSets = catalog.lookupFeasibleSummaryDatasets(rewrittenPlan)
+      var granularity: java.util.List[TimeSeriesFunctionEnum] = new java.util
+      .ArrayList[TimeSeriesFunctionEnum]()
+      // Get all the lower granularities for the query from datasets
+      dataSets.foreach { dataset =>
+        if (dataset.dataMapSchema.isTimeSeries) {
+          dataset.plan.transformExpressions {
+            case a@Alias(udf: ScalaUDF, _) =>
+              if (udf.function.isInstanceOf[TimeSeriesFunction]) {
+                val gran = udf.children.last.asInstanceOf[Literal].toString().toUpperCase()
+                if (Granularity.valueOf(timeSeriesUdf._2.toUpperCase).ordinal() <=
+                    Granularity.valueOf(gran).ordinal()) {
+                  granularity.add(TimeSeriesFunctionEnum.valueOf(gran))
+                }
+              }
+              a
+          }
+        }
+      }
+      if (!granularity.isEmpty) {
+        granularity = granularity.asScala.sortBy(_.getOrdinal)(Ordering[Int].reverse).asJava
+        var orgTable: String = null
+        // get query Table
+        rewrittenPlan.collect {
+          case m: ModularRelation =>
+            orgTable = m.tableName
+        }
+        var queryGranularity: String = null
+        var newRewrittenPlan = rewrittenPlan
+        // replace granularities in the plan and check if plan can be changed
+        breakable {
+          granularity.asScala.foreach { func =>
+            newRewrittenPlan = rewriteGranularityInPlan(rewrittenPlan, func.getName)
+            modularPlan.add(newRewrittenPlan)
+            val logicalPlan = session
+              .sparkSession
+              .sql(newRewrittenPlan.asCompactSQL)
+              .queryExecution
+              .optimizedPlan
+            modularPlan.clear()
+            var rolledUpTable: String = null
+            logicalPlan.collect {
+              case l: LogicalRelation =>
+                rolledUpTable = l.catalogTable.get.identifier.table
+            }
+            if (!rolledUpTable.equalsIgnoreCase(orgTable)) {
+              queryGranularity = func.getName
+              break()
+            }
+          }
+        }
+        if (null != queryGranularity) {
+          // rewrite the plan and set it as rolled up plan
+          val modifiedPlan = rewriteWithSummaryDatasetsCore(newRewrittenPlan, rewrite)
+          if (modifiedPlan.isDefined) {
+            modifiedPlan.get.map(_.setRolledUp())
+            modifiedPlan
+          } else {
+            None
+          }
+        } else {
+          None
+        }
+      } else {
+        None
+      }
+    } else {
+      None
+    }
+  }
+
   def subsume(
       subsumer: ModularPlan,
       subsumee: ModularPlan,
@@ -206,4 +327,60 @@ private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVSession)
     }
     true
   }
+
+  /**
+   * Replace the identified immediate lower level granularity in the modular plan
+   * to perform rollup
+   *
+   * @param plan             to be re-written
+   * @param queryGranularity to be replaced
+   * @return plan with granularity changed
+   */
+  private def rewriteGranularityInPlan(plan: ModularPlan, queryGranularity: String) = {
+    val newPlan = plan.transformDown {
+      case p => p.transformAllExpressions {
+        case udf: ScalaUDF =>
+          if (udf.function.isInstanceOf[TimeSeriesFunction]) {
+            val transformedUdf = udf.transformDown {
+              case _: Literal =>
+                new Literal(UTF8String.fromString(queryGranularity.toLowerCase),
+                  DataTypes.StringType)
+            }
+            transformedUdf
+          } else {
+            udf
+          }
+        case alias@Alias(udf: ScalaUDF, name) =>
+          if (udf.function.isInstanceOf[TimeSeriesFunction]) {
+            var literal: String = null
+            val transformedUdf = udf.transformDown {
+              case l: Literal =>
+                literal = l.value.toString
+                new Literal(UTF8String.fromString(queryGranularity.toLowerCase),
+                  DataTypes.StringType)
+            }
+            Alias(transformedUdf,
+              name.replace(", " + literal, ", " + queryGranularity))(alias.exprId,
+              alias.qualifier).asInstanceOf[NamedExpression]
+          } else {
+            alias
+          }
+      }
+    }
+    newPlan
+  }
+
+  def getTimeSeriesUdf(outputList: scala.Seq[NamedExpression]): (String, String) = {
+    outputList.collect {
+      case Alias(udf: ScalaUDF, name) =>
+        if (udf.function.isInstanceOf[TimeSeriesFunction]) {
+          udf.children.collect {
+            case l: Literal =>
+              return (name, l.value.toString)
+          }
+        }
+    }
+    (null, null)
+  }
+
 }
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesQueryRollUp.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesQueryRollUp.scala
new file mode 100644
index 0000000..d9c17b3
--- /dev/null
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesQueryRollUp.scala
@@ -0,0 +1,279 @@
+  /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+  package org.apache.carbondata.mv.timeseries
+
+  import org.apache.spark.sql.test.util.QueryTest
+  import org.scalatest.BeforeAndAfterAll
+
+  import org.apache.carbondata.mv.rewrite.TestUtil
+
+  class TestMVTimeSeriesQueryRollUp extends QueryTest with BeforeAndAfterAll {
+
+    override def beforeAll(): Unit = {
+      drop()
+      createTable()
+      loadData("maintable")
+    }
+
+    override def afterAll(): Unit = {
+      drop()
+    }
+
+    test("test timeseries query rollup with simple projection") {
+      val result  = sql("select timeseries(projectjoindate,'day'),projectcode 
from maintable")
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'second'),projectcode from 
maintable")
+      sql(
+        "create datamap datamap2 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'hour'),projectcode from maintable")
+      val df = sql("select timeseries(projectjoindate,'day'),projectcode from 
maintable")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, 
"datamap2"))
+      checkAnswer(result,df)
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+    }
+
+    test("test timeseries query rollup with simple projection with group by - 
scenario-1") {
+      val result  = sql("select timeseries(projectjoindate,'day'),projectcode 
from maintable group by timeseries(projectjoindate,'day'),projectcode")
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'second'),projectcode from 
maintable group by timeseries(projectjoindate,'second'),projectcode")
+      sql(
+        "create datamap datamap2 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'hour'),projectcode from maintable 
group by timeseries(projectjoindate,'hour'),projectcode")
+      var df = sql("select timeseries(projectjoindate,'day'),projectcode from 
maintable group by timeseries(projectjoindate,'day'),projectcode")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, 
"datamap2"))
+      checkAnswer(result,df)
+      df = sql("select timeseries(projectjoindate,'second'),projectcode from 
maintable group by timeseries(projectjoindate,'second'),projectcode")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, 
"datamap1"))
+      df = sql("select timeseries(projectjoindate,'second') from maintable 
group by timeseries(projectjoindate,'second')")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, 
"datamap1"))
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+    }
+
+    test("test timeseries query rollup with simple projection with group by - 
scenario-1 with single datamap") {
+      val result  = sql("select timeseries(projectjoindate,'day'),projectcode 
from maintable group by timeseries(projectjoindate,'day'),projectcode")
+      sql("drop datamap if exists datamap1")
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'second'),projectcode from 
maintable group by timeseries(projectjoindate,'second'),projectcode")
+      var df = sql("select timeseries(projectjoindate,'day'),projectcode from 
maintable group by timeseries(projectjoindate,'day'),projectcode")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, 
"datamap1"))
+      checkAnswer(result,df)
+      df = sql("select timeseries(projectjoindate,'second'),projectcode from 
maintable group by timeseries(projectjoindate,'second'),projectcode")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, 
"datamap1"))
+      df = sql("select timeseries(projectjoindate,'second') from maintable 
group by timeseries(projectjoindate,'second')")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, 
"datamap1"))
+      sql("drop datamap if exists datamap1")
+    }
+
+    test("test timeseries query rollup with simple projection with group by - 
scenario-2") {
+      val result  = sql("select 
timeseries(projectjoindate,'day'),sum(projectcode) from maintable group by 
timeseries(projectjoindate,'day')")
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'second'),sum(projectcode) from 
maintable group by timeseries(projectjoindate,'second')")
+      sql(
+        "create datamap datamap2 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'hour'),sum(projectcode) from 
maintable group by timeseries(projectjoindate,'hour')")
+      val df =sql("select timeseries(projectjoindate,'day'),sum(projectcode) 
from maintable group by timeseries(projectjoindate,'day')")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, 
"datamap2"))
+      checkAnswer(result,df)
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+    }
+
+    test("test timeseries query rollup with simple projection with filter") {
+      val result  = sql("select timeseries(projectjoindate,'day'),projectcode 
from maintable where projectcode=8")
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'second'),projectcode from 
maintable")
+      sql(
+        "create datamap datamap2 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'hour'),projectcode from maintable")
+      val df = sql("select timeseries(projectjoindate,'day'),projectcode from 
maintable where projectcode=8")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, 
"datamap2"))
+      checkAnswer(result,df)
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+    }
+
+    test("test timeseries query rollup with simple projection with group by & 
filter - scenario 1") {
+      val result = sql("select timeseries(projectjoindate,'day'),projectcode 
from maintable where projectcode=8 " +
+                       "group by 
timeseries(projectjoindate,'day'),projectcode")
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql("create datamap datamap1 on table maintable using 'mv' as " +
+          "select timeseries(projectjoindate,'second'),projectcode from 
maintable group by " +
+          "timeseries(projectjoindate,'second'),projectcode")
+      sql("create datamap datamap2 on table maintable using 'mv' as " +
+          "select timeseries(projectjoindate,'hour'),projectcode from 
maintable group by timeseries" +
+          "(projectjoindate,'hour'),projectcode")
+      val df = sql("select timeseries(projectjoindate,'day'),projectcode from 
maintable where projectcode=8 " +
+                   "group by timeseries(projectjoindate,'day'),projectcode")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, 
"datamap2"))
+      checkAnswer(result, df)
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+    }
+
+    test("test timeseries query rollup with simple projection with group by & 
filter - scenario 2") {
+      val result  = sql("select timeseries(projectjoindate,'day'),projectcode 
from maintable where projectcode=8 group by 
timeseries(projectjoindate,'day'),projectcode")
+      sql("drop datamap if exists datamap1")
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'second'),projectcode from 
maintable where projectcode=1 group by 
timeseries(projectjoindate,'second'),projectcode")
+      val df = sql("select timeseries(projectjoindate,'day'),projectcode from 
maintable where projectcode=8 group by 
timeseries(projectjoindate,'day'),projectcode")
+      assert(!TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, 
"datamap1"))
+      checkAnswer(result,df)
+      sql("drop datamap if exists datamap1")
+    }
+
+    test("test timeseries query rollup with simple projection with alias- 
scenario 1") {
+      val result  = sql("select timeseries(projectjoindate,'day') as 
a,projectcode as b from maintable")
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'second'),projectcode from 
maintable")
+      sql(
+        "create datamap datamap2 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'hour'),projectcode from maintable")
+      val df = sql("select timeseries(projectjoindate,'day') as a,projectcode 
as b from maintable")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, 
"datamap2"))
+      checkAnswer(result,df)
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+    }
+
+    test("test timeseries query rollup with simple projection with alias- 
scenario 2") {
+      val result  = sql("select timeseries(projectjoindate,'day'),projectcode 
from maintable")
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'second') as a,projectcode as b 
from maintable")
+      sql(
+        "create datamap datamap2 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'hour') as a,projectcode as b from 
maintable")
+      val df = sql("select timeseries(projectjoindate,'day'),projectcode from 
maintable")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, 
"datamap2"))
+      checkAnswer(result,df)
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+    }
+
+    test("test timeseries query rollup with projection with alias and group 
by- scenario 1") {
+      val result  = sql("select timeseries(projectjoindate,'day') as 
a,sum(projectcode) as b from maintable group by 
timeseries(projectjoindate,'day')")
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'second'),sum(projectcode) from 
maintable group by timeseries(projectjoindate,'second')")
+      sql(
+        "create datamap datamap2 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'hour'),sum(projectcode) from 
maintable group by timeseries(projectjoindate,'hour')")
+      val df = sql("select timeseries(projectjoindate,'day') as 
a,sum(projectcode) as b from maintable group by 
timeseries(projectjoindate,'day')")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, 
"datamap2"))
+      checkAnswer(result,df)
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+    }
+
+    test("test timeseries query rollup with projection with alias and group 
by- scenario 2") {
+      val result  = sql("select 
timeseries(projectjoindate,'day'),sum(projectcode) from maintable group by 
timeseries(projectjoindate,'day')")
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'second') as a,sum(projectcode) as 
b from maintable group by timeseries(projectjoindate,'second')")
+      sql(
+        "create datamap datamap2 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'hour') as a,sum(projectcode) as b 
from maintable group by timeseries(projectjoindate,'hour')")
+      val df = sql("select timeseries(projectjoindate,'day'),sum(projectcode) 
from maintable group by timeseries(projectjoindate,'day')")
+      assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, 
"datamap2"))
+      checkAnswer(result,df)
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+    }
+
+    test("rollup not supported for join queries") {
+      sql("drop table if exists maintable1")
+      sql("CREATE TABLE maintable1 (empno int,empname string, projectcode int, 
projectjoindate " +
+        "Timestamp,salary double) STORED AS CARBONDATA")
+      loadData("maintable1")
+      val result = sql("select 
timeseries(t1.projectjoindate,'day'),count(timeseries(t2.projectjoindate,'day')),sum(t2.projectcode)
 from maintable t1 inner join maintable1 t2 " +
+          "on 
(timeseries(t1.projectjoindate,'day')=timeseries(t2.projectjoindate,'day')) 
group by timeseries(t1.projectjoindate,'day')")
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql("create datamap datamap1 on table maintable using 'mv' as " +
+        "select 
timeseries(t1.projectjoindate,'second'),count(timeseries(t2.projectjoindate,'second')),sum(t2.projectcode)
 from maintable t1 inner join maintable1 t2 " +
+        "on 
(timeseries(t1.projectjoindate,'second')=timeseries(t2.projectjoindate,'second'))
 group by timeseries(t1.projectjoindate,'second')")
+      sql("create datamap datamap2 on table maintable using 'mv' as " +
+        "select 
timeseries(t1.projectjoindate,'hour'),count(timeseries(t2.projectjoindate,'hour')),sum(t2.projectcode)
 from maintable t1 inner join maintable1 t2 " +
+        "on 
(timeseries(t1.projectjoindate,'hour')=timeseries(t2.projectjoindate,'hour')) 
group by timeseries(t1.projectjoindate,'hour')")
+      val df = sql("select 
timeseries(t1.projectjoindate,'day'),count(timeseries(t2.projectjoindate,'day')),sum(t2.projectcode)
 from maintable t1 inner join maintable1 t2 " +
+          "on 
(timeseries(t1.projectjoindate,'day')=timeseries(t2.projectjoindate,'day')) 
group by timeseries(t1.projectjoindate,'day')")
+      assert(!TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, 
"datamap2"))
+      checkAnswer(result,df)
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+    }
+
+    test("rollup not supported for timeseries udf in filter") {
+      val result  = sql("select 
timeseries(projectjoindate,'day'),sum(projectcode) from maintable where 
timeseries(projectjoindate,'day')='2016-02-23 00:00:00' group by 
timeseries(projectjoindate,'day')")
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+      sql(
+        "create datamap datamap1 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'second'),sum(projectcode) from 
maintable group by timeseries(projectjoindate,'second')")
+      sql(
+        "create datamap datamap2 on table maintable using 'mv' as " +
+        "select timeseries(projectjoindate,'hour'),sum(projectcode) from 
maintable group by timeseries(projectjoindate,'hour')")
+      val df = sql("select timeseries(projectjoindate,'day'),sum(projectcode) 
from maintable where timeseries(projectjoindate,'day')='2016-02-23 00:00:00' 
group by timeseries(projectjoindate,'day')")
+      assert(!TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, 
"datamap2"))
+      checkAnswer(result,df)
+      sql("drop datamap if exists datamap1")
+      sql("drop datamap if exists datamap2")
+    }
+
+    def drop(): Unit = {
+      sql("drop table if exists maintable")
+    }
+
+    def createTable(): Unit = {
+      sql(
+        "CREATE TABLE maintable (empno int,empname string, projectcode int, 
projectjoindate " +
+        "Timestamp,salary double) STORED AS CARBONDATA")
+    }
+
+    def loadData(table: String): Unit = {
+      sql(
+        s"""LOAD DATA local inpath '$resourcesPath/mv_sampledata.csv' INTO 
TABLE $table  OPTIONS
+           |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    }
+}
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
index 55d3c5c..246ca0d 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
@@ -96,6 +96,24 @@ abstract class ModularPlan
     _rewritten
   }
 
+  private var rolledUp: Boolean = false
+
+  /**
+   * Marks this plan as a rolled-up plan
+   */
+  private[mv] def setRolledUp(): ModularPlan = {
+    rolledUp = true
+    children.foreach(_.setRolledUp())
+    this
+  }
+
+  /**
+   * Returns true if the plan is rolled up
+   */
+  def isRolledUp: Boolean = {
+    rolledUp
+  }
+
   private var _skip: Boolean = false
 
   private[mv] def setSkip(): ModularPlan = {
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/package.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/package.scala
index 8c799fe..e85c238 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/package.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/package.scala
@@ -18,7 +18,7 @@
 package org.apache.carbondata.mv
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, PredicateHelper, ScalaUDF}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 import org.apache.carbondata.mv.plans.modular.ModularPlan
@@ -42,8 +42,36 @@ package object plans {
       expr.references.subsetOf(plan.outputSet)
     }
 
+
+    /**
+     * If exp is a ScalaUDF, then for each of its children, we have to check if
+     * the children can be derived from another ScalaUDF's children in exprList
+     * @param exp scalaUDF
+     * @param exprList predicate and rejoin output list
+     * @return if udf can be derived from another udf
+     */
+    def canEvaluate(exp: ScalaUDF, exprList: Seq[Expression]): Boolean = {
+      var canBeDerived = false
+      exprList.forall {
+        case udf: ScalaUDF =>
+          if (udf.children.length == exp.children.length) {
+            if (udf.children.zip(exp.children).forall(e => e._1.sql.equalsIgnoreCase(e._2.sql))) {
+              canBeDerived = true
+            }
+          }
+          canBeDerived
+        case _ =>
+          canBeDerived
+      }
+    }
+
     def canEvaluate(expr: Expression, exprList: Seq[Expression]): Boolean = {
-      expr.references.subsetOf(AttributeSet(exprList))
+      expr match {
+        case exp: ScalaUDF =>
+          canEvaluate(exp, exprList)
+        case _ =>
+          expr.references.subsetOf(AttributeSet(exprList))
+      }
     }
   }
 
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
index 366284b..9112005 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
@@ -106,9 +106,9 @@ trait Printers {
     def printSelect(select: Seq[NamedExpression], flags: FlagSet): Unit = {
       if (flags.hasFlag(DISTINCT)) {
         print("SELECT DISTINCT %s "
-          .format(select.map(_.sql) mkString ", "))
+          .format(formatUDF(select)))
       } else {
-        print("SELECT %s ".format(select.map(_.sql) mkString ", "))
+        print("SELECT %s ".format(formatUDF(select)))
       }
     }
 
@@ -153,7 +153,7 @@ trait Printers {
     def printGroupby(groupby: (Seq[Expression], Seq[Seq[Expression]])): Unit = {
 
       if (groupby._1.nonEmpty) {
-        print("GROUP BY %s".format(groupby._1.map(_.sql).mkString(", ")))
+        print("GROUP BY %s".format(formatUDFinExpression(groupby._1)))
         if (groupby._2.nonEmpty) {
           print(" GROUPING SETS(%s)"
            .format(groupby._2.map(e => s"(${ e.map(_.sql).mkString(", ") })").mkString(", ")))
@@ -343,5 +343,55 @@ trait Printers {
     newSQLFragmentOneLinePrinter(
       new PrintWriter(stream))
   }
+
+  def formatUDF(select: Seq[NamedExpression]): String = {
+    val result = select.map {
+      // if scala UDF, then don't do direct sql
+      case a@Alias(child: ScalaUDF, _) =>
+        val udf = child.udfName
+        if (udf.isDefined) {
+          udf.get + "(" + (formatExpressionsInUDF(child.children)) + " ) AS `" + a.name + "`"
+        } else {
+          child.sql
+        }
+      case other =>
+        other.sql
+    }
+    result mkString ", "
+  }
+
+  def formatExpressionsInUDF(exp: Seq[Expression]): String = {
+    val result = exp.map {
+      case attr: AttributeReference =>
+        if (attr.name.startsWith("gen_subsumer_")) {
+          attr.name
+        } else {
+          attr.sql
+        }
+      case literal: Literal =>
+        if (literal.value.toString.startsWith("`") || literal.value.toString.startsWith("'")) {
+          literal.value.toString
+        } else {
+          literal.sql
+        }
+      case other =>
+        other.sql
+    }
+    result mkString ","
+  }
+
+  def formatUDFinExpression(select: Seq[Expression]): String = {
+    val result = select.map {
+      case udf: ScalaUDF if udf.isInstanceOf[ScalaUDF] =>
+        if (udf.udfName.isDefined) {
+          udf.udfName.get + "(" + formatExpressionsInUDF(udf.children) + " )"
+        } else {
+          udf.sql
+        }
+      case other =>
+        other.sql
+    }
+    result mkString ", "
+  }
 }
 // scalastyle:on println
diff --git a/docs/datamap/mv-datamap-guide.md b/docs/datamap/mv-datamap-guide.md
index 1e5a0bc..dd218ee 100644
--- a/docs/datamap/mv-datamap-guide.md
+++ b/docs/datamap/mv-datamap-guide.md
@@ -24,6 +24,7 @@
 * [Compaction](#compacting-mv-datamap)
 * [Data Management](#data-management-with-mv-tables)
 * [MV TimeSeries Support](#mv-timeseries-support)
+* [MV TimeSeries RollUp Support](#mv-timeseries-rollup-support)
 
 ## Quick example
 
@@ -235,4 +236,36 @@ Timeseries queries with Date type support's only year, month, day and week granu
 
  **NOTE**:
 1. Single select statement cannot contain timeseries udf(s) neither with different granularity nor
- with different timestamp/date columns.
\ No newline at end of file
+ with different timestamp/date columns.
+ 
+ ## MV TimeSeries RollUp Support
+  MV TimeSeries queries can be rolled up from existing MV datamaps.
+  ### Query RollUp
+ Consider an example where the query is at hour-level granularity, but an hour-level
+ datamap is not present while a minute-level datamap is. In that case, we can read the
+ data from the minute-level datamap, aggregate it to hour level, and return the output.
+ This is called query rollup.
+ 
+ Consider that the user creates the below timeseries datamap,
+   ```
+   CREATE DATAMAP agg_sales
+   ON TABLE sales
+   USING "MV"
+   AS
+     SELECT timeseries(order_time,'minute'),avg(price)
+     FROM sales
+     GROUP BY timeseries(order_time,'minute')
+   ```
+ and fires the below query with hour-level granularity.
+   ```
+    SELECT timeseries(order_time,'hour'),avg(price)
+    FROM sales
+    GROUP BY timeseries(order_time,'hour')
+   ```
+ Then, the above query can be rolled up from the 'agg_sales' MV datamap by adding an
+ hour-level timeseries aggregation on top of the minute-level datamap. Users can run the
+ EXPLAIN command to check whether a query is rolled up from existing MV datamaps, as
+ sketched below.
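+ 
+ For example, an illustrative sketch (it reuses the hypothetical 'sales' table above; the
+ exact plan text depends on the datamaps actually deployed):
+   ```
+    EXPLAIN
+    SELECT timeseries(order_time,'hour'),avg(price)
+    FROM sales
+    GROUP BY timeseries(order_time,'hour')
+   ```
+ When rollup takes place, the optimized plan in the EXPLAIN output refers to the datamap
+ table (for example, 'agg_sales_table') instead of the main 'sales' table.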
+ 
+  **NOTE**:
+  1. Queries cannot be rolled up if the filter contains a timeseries function.
+  2. RollUp is not yet supported for queries having a join clause or order by functions.
\ No newline at end of file
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala
index 6182a94..3135b99 100644
--- a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala
@@ -52,6 +52,14 @@ object CarbonToSparkAdapter {
       metadata)(exprId, qualifier)
   }
 
+  def createAttributeReference(attr: AttributeReference,
+      attrName: String,
+      newSubsume: String): AttributeReference = {
+    AttributeReference(attrName, attr.dataType)(
+      exprId = attr.exprId,
+      qualifier = Some(newSubsume))
+  }
+
   def createScalaUDF(s: ScalaUDF, reference: AttributeReference): ScalaUDF = {
     ScalaUDF(s.function, s.dataType, Seq(reference), s.inputTypes)
   }
diff --git a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
index 149a9ca..910c30d 100644
--- a/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
+++ b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
@@ -73,6 +73,14 @@ object CarbonToSparkAdapter {
       metadata)(exprId, qualifier)
   }
 
+  def createAttributeReference(attr: AttributeReference,
+      attrName: String,
+      newSubsume: String): AttributeReference = {
+    AttributeReference(attrName, attr.dataType)(
+      exprId = attr.exprId,
+      qualifier = newSubsume.split("\n").map(_.trim))
+  }
+
   def createScalaUDF(s: ScalaUDF, reference: AttributeReference) = {
     ScalaUDF(s.function, s.dataType, Seq(reference), s.inputsNullSafe, s.inputTypes)
   }
