This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 75cab45  [CARBONDATA-3600] Fix creating mv timeseries UDF column as 
partition column
75cab45 is described below

commit 75cab4560c109de369ad3f830b5fed8b77e27182
Author: Indhumathi27 <indhumathi...@gmail.com>
AuthorDate: Sun Dec 1 16:21:04 2019 +0530

    [CARBONDATA-3600] Fix creating mv timeseries UDF column as partition column
    
    Problem:
    Issue 1:
    When trying to create datamap with partition column in timeseries udf,
    throws Exception.
    Issue 2:
    When Create datamap was in progress, Jdbc application is killed. When
    restarting, datamap table not found exception is thrown.
    
    Solution:
    Issue 1:
    While adding fields to FieldRelationMap, in case of group by, no need to
    add UDF expressions.
    If partition column is present in timeseries UDF, child table will not
    inherit partition fields from parent table
    Issue 2:
    Clean up the invalid table and schema from the store
    
    This closes #3493
---
 .../carbondata/mv/datamap/MVDataMapProvider.scala  | 13 +++++--
 .../apache/carbondata/mv/datamap/MVHelper.scala    | 10 +++---
 .../org/apache/carbondata/mv/datamap/MVUtil.scala  | 42 +++++++++++-----------
 .../mv/rewrite/TestPartitionWithMV.scala           | 18 ++++++++++
 .../timeseries/TestMVTimeSeriesLoadAndQuery.scala  | 12 +++++++
 .../scala/org/apache/spark/sql/CarbonEnv.scala     | 36 +++++++++++++++++++
 6 files changed, 103 insertions(+), 28 deletions(-)

diff --git 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
index 15508fd..6499a8f 100644
--- 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
+++ 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
@@ -95,8 +95,17 @@ class MVDataMapProvider(
       dataMapSchema.getRelationIdentifier.getTableName,
       true)
     dropTableCommand.processMetadata(sparkSession)
-    DataMapStoreManager.getInstance.unRegisterDataMapCatalog(dataMapSchema)
-    
DataMapStoreManager.getInstance().dropDataMapSchema(dataMapSchema.getDataMapName)
+    // First, drop datamapschema and unregister datamap from catalog, because 
if in
+    // case, unregister fails, datamapschema will not be deleted from system 
and cannot
+    // create datamap also again
+    try {
+      
DataMapStoreManager.getInstance().dropDataMapSchema(dataMapSchema.getDataMapName)
+    } catch {
+      case e: IOException =>
+        throw e
+    } finally {
+      DataMapStoreManager.getInstance.unRegisterDataMapCatalog(dataMapSchema)
+    }
   }
 
   override def cleanData(): Unit = {
diff --git 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 7ff4c8a..29afec1 100644
--- 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -305,10 +305,10 @@ object MVHelper {
   }
 
   private def isValidSelect(isValidExp: Boolean,
-      s: Select): Boolean = {
+      filterPredicate: Seq[Expression], outputList: Seq[NamedExpression]): 
Boolean = {
     // Make sure all predicates are present in projections.
     var predicateList: Seq[AttributeReference] = Seq.empty
-    s.predicateList.map { f =>
+    filterPredicate.map { f =>
       f.children.collect {
         case p: AttributeReference =>
           predicateList = predicateList.+:(p)
@@ -321,7 +321,7 @@ object MVHelper {
     }
     if (predicateList.nonEmpty) {
       predicateList.forall { p =>
-        s.outputList.exists {
+        outputList.exists {
           case a: Alias =>
             a.semanticEquals(p) || a.child.semanticEquals(p) || a.collect {
               case attr: AttributeReference =>
@@ -364,11 +364,11 @@ object MVHelper {
         }
         g.child match {
           case s: Select =>
-            isValidSelect(isValidExp, s)
+            isValidSelect(isValidExp, s.predicateList, g.outputList)
           case m: ModularRelation => isValidExp
         }
       case s: Select =>
-        isValidSelect(true, s)
+        isValidSelect(true, s.predicateList, s.outputList)
       case _ => true
     }
     if (!isValid) {
diff --git 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
index 7e6ee02..74b0474 100644
--- 
a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
+++ 
b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
@@ -204,27 +204,27 @@ class MVUtil {
           "")
       }
     }
-    groupByExp map { grp =>
-      grp.collect {
-        case attr: AttributeReference =>
-          val carbonTable: CarbonTable = getCarbonTable(logicalRelation, attr)
-          if (null != carbonTable) {
-            val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new
-                ArrayBuffer[ColumnTableRelation]()
-            arrayBuffer += getColumnRelation(attr.name,
-              
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
-              
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
-              
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
-              carbonTable)
-            fieldToDataMapFieldMap +=
-            getFieldToDataMapFields(attr.name,
-              attr.dataType,
-              attr.qualifier,
-              "",
-              arrayBuffer,
-              carbonTable.getTableName)
-          }
-      }
+    groupByExp map {
+      case attr: AttributeReference =>
+        val carbonTable: CarbonTable = getCarbonTable(logicalRelation, attr)
+        if (null != carbonTable) {
+          val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new
+              ArrayBuffer[ColumnTableRelation]()
+          arrayBuffer += getColumnRelation(attr.name,
+            
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+            
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+            
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
+            carbonTable)
+          fieldToDataMapFieldMap +=
+          getFieldToDataMapFields(attr.name,
+            attr.dataType,
+            attr.qualifier,
+            "",
+            arrayBuffer,
+            carbonTable.getTableName)
+        }
+        attr
+      case _ =>
     }
     fieldToDataMapFieldMap
   }
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala
index 3cb0227..0d5b645 100644
--- 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala
@@ -685,4 +685,22 @@ class TestPartitionWithMV extends QueryTest with 
BeforeAndAfterAll {
     sql("drop table if exists partitionone")
   }
 
+  test("test partition on timeseries column") {
+    sql("drop table if exists partitionone")
+    sql("create table partitionone(a int,b int) partitioned by (c timestamp,d 
timestamp) stored by 'carbondata'")
+    sql("insert into partitionone values(1,2,'2017-01-01 01:00:00','2018-01-01 
01:00:00')")
+    sql("drop datamap if exists dm1")
+    sql("create datamap dm1 on table partitionone using 'mv' as select 
timeseries(c,'day'),sum(b) from partitionone group by timeseries(c,'day')")
+    
assert(!CarbonEnv.getCarbonTable(Some("partition_mv"),"dm1_table")(sqlContext.sparkSession).isHivePartitionTable)
+    assert(sql("select timeseries(c,'day'),sum(b) from partitionone group by 
timeseries(c,'day')").count() == 1)
+    sql("drop table if exists partitionone")
+    sql("create table partitionone(a int,b timestamp) partitioned by (c 
timestamp) stored by 'carbondata'")
+    sql("insert into partitionone values(1,'2017-01-01 01:00:00','2018-01-01 
01:00:00')")
+    sql("drop datamap if exists dm1")
+    sql("create datamap dm1 on table partitionone using 'mv' as select 
timeseries(b,'day'),c from partitionone group by timeseries(b,'day'),c")
+    
assert(CarbonEnv.getCarbonTable(Some("partition_mv"),"dm1_table")(sqlContext.sparkSession).isHivePartitionTable)
+    assert(sql("select timeseries(b,'day'),c from partitionone group by 
timeseries(b,'day'),c").count() == 1)
+    sql("drop table if exists partitionone")
+  }
+
 }
diff --git 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
index 8815a94..3bd1e60 100644
--- 
a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
+++ 
b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
@@ -329,6 +329,18 @@ class TestMVTimeSeriesLoadAndQuery extends QueryTest with 
BeforeAndAfterAll {
     checkPlan("datamap1", df)
   }
 
+  test("test create datamap with group by & filter columns not present in 
projection") {
+    sql("drop datamap if exists dm ")
+    intercept[UnsupportedOperationException] {
+      sql("create datamap dm using 'mv' as select 
timeseries(projectjoindate,'day') from maintable where empname='chandler' group 
by timeseries(projectjoindate,'day'),empname")
+    }.getMessage.contains("Group by/Filter columns must be present in project 
columns")
+    sql("create datamap dm using 'mv' as select 
timeseries(projectjoindate,'day'),empname from maintable where 
empname='chandler' group by timeseries(projectjoindate,'day'),empname")
+    var df = sql("select timeseries(projectjoindate,'day'),empname from 
maintable where empname='chandler' group by 
timeseries(projectjoindate,'day'),empname")
+    checkPlan("dm", df)
+    df = sql("select timeseries(projectjoindate,'day') from maintable where 
empname='chandler' group by timeseries(projectjoindate,'day'),empname")
+    checkPlan("dm", df)
+    sql("drop datamap if exists dm ")
+  }
 
   override def afterAll(): Unit = {
     drop()
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 3616cf6..37db135 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,8 +17,11 @@
 
 package org.apache.spark.sql
 
+import java.io.IOException
 import java.util.concurrent.ConcurrentHashMap
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, 
NoSuchTableException}
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -121,11 +124,44 @@ class CarbonEnv {
         CarbonProperties.getInstance
           
.addNonSerializableProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
         initialized = true
+        cleanChildTablesNotRegisteredInHive(sparkSession)
       }
     }
     Profiler.initialize(sparkSession.sparkContext)
     LOGGER.info("Initialize CarbonEnv completed...")
   }
+
+  private def cleanChildTablesNotRegisteredInHive(sparkSession: SparkSession): 
Unit = {
+    // If in case JDBC application is killed/stopped, when create datamap was 
in progress, datamap
+    // table was created and datampschema was saved to the system, but table 
was not registered to
+    // metastore. So, when we restart JDBC application, we need to clean up
+    // stale tables and datamapschema's.
+    val dataMapSchemas = DataMapStoreManager.getInstance().getAllDataMapSchemas
+    dataMapSchemas.asScala.foreach {
+      dataMapSchema =>
+        if (null != dataMapSchema.getRelationIdentifier &&
+            !dataMapSchema.isIndexDataMap) {
+          if (!sparkSession.sessionState
+            .catalog
+            
.tableExists(TableIdentifier(dataMapSchema.getRelationIdentifier.getTableName,
+              Some(dataMapSchema.getRelationIdentifier.getDatabaseName)))) {
+            try {
+              
DataMapStoreManager.getInstance().dropDataMapSchema(dataMapSchema.getDataMapName)
+            } catch {
+              case e: IOException =>
+                throw e
+            } finally {
+              
DataMapStoreManager.getInstance.unRegisterDataMapCatalog(dataMapSchema)
+              if 
(FileFactory.isFileExist(dataMapSchema.getRelationIdentifier.getTablePath)) {
+                
CarbonUtil.deleteFoldersAndFilesSilent(FileFactory.getCarbonFile(dataMapSchema
+                  .getRelationIdentifier
+                  .getTablePath))
+              }
+            }
+          }
+        }
+    }
+  }
 }
 
 object CarbonEnv {

Reply via email to