This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 51235d4  [CARBONDATA-3387] Support Partition with MV datamap & Show DataMap Status
51235d4 is described below

commit 51235d4cf239ea0d167623fed5ae339796d56eae
Author: Indhumathi27 <indhumathi...@gmail.com>
AuthorDate: Mon May 13 11:08:31 2019 +0530

    [CARBONDATA-3387] Support Partition with MV datamap & Show DataMap Status
    
    This PR includes:
    
    Support Partition with MV DataMap [DataMap with a single parent table]
    
    Show DataMap status and parent table to DataMap table segment sync information with the SHOW DATAMAP DDL
    
    Optimization for incremental data load.
    In the scenario below, reloading the MV can be avoided:
    Main table segments: 0,1,2
    MV: 0 => 0,1,2
    After the main table is compacted, segment 0.1 of the main table would be reloaded
    to the MV; this is avoided by changing the mapping {0,1,2} => {0.1}
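    A minimal, self-contained sketch of this remapping decision (plain Java with
    hypothetical types for illustration only, not the actual CarbonData API):

    import java.util.Arrays;
    import java.util.List;

    public class SegmentRemapSketch {
      // A parent-table load: its segment name and, if compacted, the merged segment name.
      static final class ParentLoad {
        final String loadName;
        final String mergedLoadName; // null if this segment was never compacted
        ParentLoad(String loadName, String mergedLoadName) {
          this.loadName = loadName;
          this.mergedLoadName = mergedLoadName;
        }
      }

      // Returns the merged segment the MV mapping can be rewritten to, or null if a reload is needed.
      static String remapTarget(List<String> mappedSegments, List<ParentLoad> parentLoads) {
        String target = null;
        for (String mapped : mappedSegments) {
          for (ParentLoad load : parentLoads) {
            if (!load.loadName.equalsIgnoreCase(mapped)) {
              continue;
            }
            if (load.mergedLoadName == null
                || (target != null && !target.equalsIgnoreCase(load.mergedLoadName))) {
              return null; // not every mapped segment was merged into the same target
            }
            target = load.mergedLoadName;
          }
        }
        return target;
      }

      public static void main(String[] args) {
        // MV segment 0 maps to main table segments {0,1,2}, which were compacted into 0.1.
        List<ParentLoad> loads = Arrays.asList(
            new ParentLoad("0", "0.1"), new ParentLoad("1", "0.1"), new ParentLoad("2", "0.1"));
        // Prints "0.1": rewrite the mapping to {0.1} instead of reloading the MV segment.
        System.out.println(remapTarget(Arrays.asList("0", "1", "2"), loads));
      }
    }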
    
    This closes #3216
---
 .../core/constants/CarbonCommonConstants.java      |   2 +
 .../carbondata/core/datamap/DataMapProvider.java   |  64 +-
 .../core/metadata/schema/table/DataMapSchema.java  |  13 +
 datamap/mv/core/pom.xml                            |   2 +-
 .../carbondata/mv/datamap/MVDataMapProvider.scala  |  12 +-
 .../apache/carbondata/mv/datamap/MVHelper.scala    |  75 ++-
 .../org/apache/carbondata/mv/datamap/MVUtil.scala  |   3 +-
 .../mv/rewrite/MVIncrementalLoadingTestcase.scala  |  23 +
 .../mv/rewrite/TestAllOperationsOnMV.scala         | 138 ++++-
 .../mv/rewrite/TestPartitionWithMV.scala           | 688 +++++++++++++++++++++
 datamap/mv/plan/pom.xml                            |   2 +-
 .../mv/plans/util/BirdcageOptimizer.scala          |   4 +-
 .../testsuite/datamap/TestDataMapCommand.scala     |  10 +-
 ...StandardPartitionWithPreaggregateTestCase.scala |  10 +
 .../scala/org/apache/spark/sql/CarbonEnv.scala     |   5 +-
 .../datamap/CarbonCreateDataMapCommand.scala       |  36 +-
 .../command/datamap/CarbonDataMapShowCommand.scala |  54 +-
 .../command/management/CarbonLoadDataCommand.scala |  10 +-
 .../execution/command/mv/DataMapListeners.scala    | 113 +++-
 .../CarbonAlterTableDropHivePartitionCommand.scala |   4 -
 .../preaaggregate/PreAggregateListeners.scala      |   2 +-
 .../command/table/CarbonDropTableCommand.scala     |  14 +-
 .../spark/sql/execution/strategy/DDLStrategy.scala |   4 +
 .../processing/util/CarbonLoaderUtil.java          |  43 ++
 24 files changed, 1280 insertions(+), 51 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 9375414..e78ea17 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2174,4 +2174,6 @@ public final class CarbonCommonConstants {
    */
   public static final String PARENT_TABLES = "parent_tables";
 
+  public static final String LOAD_SYNC_TIME = "load_sync_time";
+
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
index fe2e7dd..c4ee49b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
@@ -264,23 +264,52 @@ public abstract class DataMapProvider {
     } else {
       for (RelationIdentifier relationIdentifier : relationIdentifiers) {
         List<String> dataMapTableSegmentList = new ArrayList<>();
+        // Get all segments for parent relationIdentifier
+        List<String> mainTableSegmentList =
+            DataMapUtil.getMainTableValidSegmentList(relationIdentifier);
+        boolean ifTableStatusUpdateRequired = false;
         for (LoadMetadataDetails loadMetaDetail : listOfLoadFolderDetails) {
           if (loadMetaDetail.getSegmentStatus() == SegmentStatus.SUCCESS
               || loadMetaDetail.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) {
            Map<String, List<String>> segmentMaps =
                DataMapSegmentStatusUtil.getSegmentMap(loadMetaDetail.getExtraInfo());
-            dataMapTableSegmentList.addAll(segmentMaps.get(
-                relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
-                    + relationIdentifier.getTableName()));
+            String mainTableMetaDataPath =
+                CarbonTablePath.getMetadataPath(relationIdentifier.getTablePath());
+            LoadMetadataDetails[] parentTableLoadMetaDataDetails =
+                SegmentStatusManager.readLoadMetadata(mainTableMetaDataPath);
+            String table = relationIdentifier.getDatabaseName() + CarbonCommonConstants.POINT
+                + relationIdentifier.getTableName();
+            for (String segmentId : mainTableSegmentList) {
+              // In case if dataMap segment(0) is mapped to mainTable segments{0,1,2} and if
+              // {0,1,2} segments of mainTable are compacted to 0.1. Then,
+              // on next rebuild/load to dataMap, no need to load segment(0.1) again. Update the
+              // segmentMapping of dataMap segment from {0,1,2} to {0.1}
+              if (!checkIfSegmentsToBeReloaded(parentTableLoadMetaDataDetails,
+                  segmentMaps.get(table), segmentId)) {
+                ifTableStatusUpdateRequired = true;
+                // Update loadMetaDetail with updated segment info and clear old segmentMap
+                Map<String, List<String>> updatedSegmentMap = new HashMap<>();
+                List<String> segmentList = new ArrayList<>();
+                segmentList.add(segmentId);
+                updatedSegmentMap.put(table, segmentList);
+                dataMapTableSegmentList.add(segmentId);
+                loadMetaDetail.setExtraInfo(new Gson().toJson(updatedSegmentMap));
+                segmentMaps.get(table).clear();
+              }
+            }
+            dataMapTableSegmentList.addAll(segmentMaps.get(table));
           }
         }
        List<String> dataMapSegmentList = new ArrayList<>(dataMapTableSegmentList);
-        // Get all segments for parent relationIdentifier
-        List<String> mainTableSegmentList =
-            DataMapUtil.getMainTableValidSegmentList(relationIdentifier);
         dataMapTableSegmentList.removeAll(mainTableSegmentList);
         mainTableSegmentList.removeAll(dataMapSegmentList);
-        if (mainTableSegmentList.isEmpty()) {
+        if (ifTableStatusUpdateRequired && mainTableSegmentList.isEmpty()) {
+          SegmentStatusManager.writeLoadDetailsIntoFile(CarbonTablePath
+                  .getTableStatusFilePath(dataMapSchema.getRelationIdentifier().getTablePath()),
+              listOfLoadFolderDetails
+                  .toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
+          return false;
+        } else if (mainTableSegmentList.isEmpty()) {
           return false;
         }
         if (!dataMapTableSegmentList.isEmpty()) {
@@ -333,6 +362,27 @@ public abstract class DataMapProvider {
   }
 
   /**
+   * This method checks if dataMap table segment has to be reloaded again or not
+   */
+  private boolean checkIfSegmentsToBeReloaded(LoadMetadataDetails[] loadMetaDataDetails,
+      List<String> segmentIds, String segmentId) {
+    boolean isToBeLoadedAgain = true;
+    for (String loadName : segmentIds) {
+      for (LoadMetadataDetails loadMetadataDetail : loadMetaDataDetails) {
+        if (loadMetadataDetail.getLoadName().equalsIgnoreCase(loadName)) {
+          if (null != loadMetadataDetail.getMergedLoadName() && loadMetadataDetail
+              .getMergedLoadName().equalsIgnoreCase(segmentId)) {
+            isToBeLoadedAgain = false;
+          } else {
+            return true;
+          }
+        }
+      }
+    }
+    return isToBeLoadedAgain;
+  }
+
+  /**
   * Provide the datamap catalog instance or null if this datamap not required to rewrite
    * the query.
    */
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index b927ce0..6ca3676 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -86,6 +86,11 @@ public class DataMapSchema implements Serializable, Writable {
    */
   private Map<String, Set<String>> mainTableColumnList;
 
+  /**
+   * DataMap table column order map as per Select query
+   */
+  private Map<Integer, String> columnsOrderMap;
+
   public DataMapSchema(String dataMapName, String providerName) {
     this.dataMapName = dataMapName;
     this.providerName = providerName;
@@ -264,4 +269,12 @@ public class DataMapSchema implements Serializable, Writable {
  public void setMainTableColumnList(Map<String, Set<String>> mainTableColumnList) {
     this.mainTableColumnList = mainTableColumnList;
   }
+
+  public Map<Integer, String> getColumnsOrderMap() {
+    return columnsOrderMap;
+  }
+
+  public void setColumnsOrderMap(Map<Integer, String> columnsOrderMap) {
+    this.columnsOrderMap = columnsOrderMap;
+  }
 }
diff --git a/datamap/mv/core/pom.xml b/datamap/mv/core/pom.xml
index dbfb22e..0a1f0e2 100644
--- a/datamap/mv/core/pom.xml
+++ b/datamap/mv/core/pom.xml
@@ -78,7 +78,7 @@
           <systemProperties>
             <java.awt.headless>true</java.awt.headless>
           </systemProperties>
-          <testFailureIgnore>false</testFailureIgnore>
+          <!-- testFailureIgnore>false</testFailureIgnore -->
           <failIfNoTests>false</failIfNoTests>
         </configuration>
       </plugin>
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
index 90b7dbc..8bdac4e 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.indexstore.Blocklet
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
 import org.apache.carbondata.mv.rewrite.{SummaryDataset, SummaryDatasetCatalog}
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
 
 @InterfaceAudience.Internal
 class MVDataMapProvider(
@@ -163,9 +164,18 @@ class MVDataMapProvider(
         SparkSQLUtil.execute(loadCommand, sparkSession)
       } catch {
         case ex: Exception =>
+          // If load to dataMap table fails, disable the dataMap and if newLoad is still
+          // in INSERT_IN_PROGRESS state, mark for delete the newLoad and update table status file
           DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
           LOGGER.error("Data Load failed for DataMap: ", ex)
-          return false
+          CarbonLoaderUtil.updateTableStatusInCaseOfFailure(
+            newLoadName,
+            dataMapTable.getAbsoluteTableIdentifier,
+            dataMapTable.getTableName,
+            dataMapTable.getDatabaseName,
+            dataMapTable.getTablePath,
+            dataMapTable.getMetadataPath)
+          throw ex
       } finally {
         unsetMainTableSegments()
       }
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 4bcaa1d..8d60a06 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -28,11 +28,11 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Coalesce, Expression, NamedExpression, ScalaUDF, SortOrder}
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, LogicalPlan, Project}
-import org.apache.spark.sql.execution.command.{Field, TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.command.{Field, PartitionerField, TableModel, TableNewProcessor}
 import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
-import org.apache.spark.util.DataMapUtil
+import org.apache.spark.util.{DataMapUtil, PartitionUtils}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -55,6 +55,11 @@ object MVHelper {
       queryString: String,
       ifNotExistsSet: Boolean = false): Unit = {
     val dmProperties = dataMapSchema.getProperties.asScala
+    if (dmProperties.contains("streaming") && 
dmProperties("streaming").equalsIgnoreCase("true")) {
+      throw new MalformedCarbonCommandException(
+        s"MV datamap does not support streaming"
+      )
+    }
     val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
     val query = sparkSession.sql(updatedQuery)
     val logicalPlan = MVHelper.dropDummFuc(query.queryExecution.analyzed)
@@ -98,6 +103,13 @@ object MVHelper {
           throw new MalformedCarbonCommandException(
             s"Non-Carbon table does not support creating MV datamap")
       }
+      if (!mainCarbonTable.get.getTableInfo.isTransactionalTable) {
+        throw new MalformedCarbonCommandException("Unsupported operation on 
NonTransactional table")
+      }
+      if (mainCarbonTable.get.isChildTable || mainCarbonTable.get.isChildDataMap) {
+        throw new MalformedCarbonCommandException(
+          "Cannot create Datamap on child table " + 
mainCarbonTable.get.getTableUniqueName)
+      }
       parentTables.add(mainCarbonTable.get.getTableName)
       if (!mainCarbonTable.isEmpty && mainCarbonTable.get.isStreamingSink) {
         throw new MalformedCarbonCommandException(
@@ -121,6 +133,31 @@ object MVHelper {
           tableProperties)
     }
     dmProperties.foreach(t => tableProperties.put(t._1, t._2))
+    val usePartitioning = dmProperties.getOrElse("partitioning", 
"true").toBoolean
+    var partitionerFields: Seq[PartitionerField] = Seq.empty
+    // Inherit partition from parent table if datamap is mapped to single parent table
+    if (parentTablesList.size() == 1) {
+      val partitionInfo = parentTablesList.get(0).getPartitionInfo
+      val parentPartitionColumns = if (!usePartitioning) {
+        Seq.empty
+      } else if (parentTablesList.get(0).isHivePartitionTable) {
+        partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
+      } else {
+        Seq()
+      }
+      partitionerFields = PartitionUtils
+        .getPartitionerFields(parentPartitionColumns, fieldRelationMap)
+    }
+
+    var order = 0
+    val columnOrderMap = new java.util.HashMap[Integer, String]()
+    if (partitionerFields.nonEmpty) {
+      fields.foreach { field =>
+        columnOrderMap.put(order, field.column)
+        order += 1
+      }
+    }
+
     // TODO Use a proper DB
     val tableIdentifier =
     TableIdentifier(dataMapSchema.getDataMapName + "_table",
@@ -131,7 +168,7 @@ object MVHelper {
       new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
       tableIdentifier.table.toLowerCase,
       fields,
-      Seq(),
+      partitionerFields,
       tableProperties,
       None,
       isAlterFlow = false,
@@ -167,6 +204,7 @@ object MVHelper {
       }
     }
     dataMapSchema.setMainTableColumnList(mainTableToColumnsMap)
+    dataMapSchema.setColumnsOrderMap(columnOrderMap)
     dataMapSchema.setCtasQuery(updatedQueryWithDb)
     dataMapSchema
       .setRelationIdentifier(new RelationIdentifier(tableIdentifier.database.get,
@@ -519,7 +557,8 @@ object MVHelper {
       case s: Select if s.dataMapTableRelation.isDefined =>
         val relation =
           s.dataMapTableRelation.get.asInstanceOf[MVPlanWrapper].plan.asInstanceOf[Select]
-        val mappings = s.outputList zip relation.outputList
+        val outputList = getUpdatedOutputList(relation.outputList, s.dataMapTableRelation)
+        val mappings = s.outputList zip outputList
         val oList = for ((o1, o2) <- mappings) yield {
           if (o1.name != o2.name) Alias(o2, o1.name)(exprId = o1.exprId) else o2
         }
@@ -528,7 +567,8 @@ object MVHelper {
         val relation =
           g.dataMapTableRelation.get.asInstanceOf[MVPlanWrapper].plan.asInstanceOf[Select]
         val in = relation.asInstanceOf[Select].outputList
-        val mappings = g.outputList zip relation.outputList
+        val outputList = getUpdatedOutputList(relation.outputList, g.dataMapTableRelation)
+        val mappings = g.outputList zip outputList
         val oList = for ((left, right) <- mappings) yield {
           left match {
           case Alias(agg@AggregateExpression(fun@Sum(child), _, _, _), name) =>
@@ -579,7 +619,8 @@ object MVHelper {
               relation,
               aliasMap)
             if (isFullRefresh(g.dataMapTableRelation.get.asInstanceOf[MVPlanWrapper])) {
-              val mappings = g.outputList zip relation.outputList
+              val outputList = getUpdatedOutputList(relation.outputList, g.dataMapTableRelation)
+              val mappings = g.outputList zip outputList
               val oList = for ((o1, o2) <- mappings) yield {
                 if (o1.name != o2.name) Alias(o2, o1.name)(exprId = o1.exprId) else o2
               }
@@ -707,5 +748,27 @@ object MVHelper {
       rewrittenPlan
     }
   }
+
+  private def getUpdatedOutputList(outputList: Seq[NamedExpression],
+      dataMapTableRelation: Option[ModularPlan]): Seq[NamedExpression] = {
+    dataMapTableRelation.collect {
+      case mv: MVPlanWrapper =>
+        val dataMapSchema = mv.dataMapSchema
+        val columnsOrderMap = dataMapSchema.getColumnsOrderMap
+        if (null != columnsOrderMap && !columnsOrderMap.isEmpty) {
+          val updatedOutputList = new util.ArrayList[NamedExpression]()
+          var i = 0
+          while (i < columnsOrderMap.size()) {
+            updatedOutputList
+              .add(outputList.filter(f => f.name.equalsIgnoreCase(columnsOrderMap.get(i))).head)
+            i = i + 1
+          }
+          updatedOutputList.asScala
+        } else {
+          outputList
+        }
+      case _ => outputList
+    }.get
+  }
 }
 
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
index 6852695..3d42c88 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
@@ -106,7 +106,7 @@ object MVUtil {
             arrayBuffer += relation
           }
           fieldToDataMapFieldMap +=
-          getFieldToDataMapFields(name, attr.dataType, attr.qualifier, "", arrayBuffer, "")
+          getFieldToDataMapFields(name, attr.dataType, None, "", arrayBuffer, "")
         }
     }
     fieldToDataMapFieldMap
@@ -134,6 +134,7 @@ object MVUtil {
               attr.aggregateFunction.nodeName,
               arrayBuffer,
               "")
+            aggregateType = "count"
           } else {
             aggregateType = attr.aggregateFunction.nodeName
           }
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
index 2e64055..b663ecf 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
@@ -544,6 +544,29 @@ class MVIncrementalLoadingTestcase extends QueryTest with BeforeAndAfterAll {
     sql("drop table IF EXISTS main_table")
   }
 
+  test("test compaction on main table and rebuild") {
+    createTableFactTable("test_table")
+    loadDataToFactTable("test_table")
+    sql("drop datamap if exists datamap1")
+    sql(
+      "create datamap datamap1 using 'mv'  with deferred rebuild  as select 
empname, designation " +
+      "from test_table")
+    loadDataToFactTable("test_table")
+    loadDataToFactTable("test_table")
+    sql(s"rebuild datamap datamap1")
+    sql("alter table test_table compact 'major'")
+    sql(s"rebuild datamap datamap1")
+    val dataMapTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME,
+      "datamap1_table")
+    val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath)
+    assert(loadMetadataDetails.length == 1)
+    var segmentMap = DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(0).getExtraInfo)
+    val segmentList = new java.util.ArrayList[String]()
+    segmentList.add("0.1")
+    assert(segmentList.containsAll(segmentMap.get("default.test_table")))
+  }
+
   def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = {
     val tables = logicalPlan collect {
       case l: LogicalRelation => l.catalogTable.get
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
index 3978bd1..159cdbc 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
@@ -186,19 +186,6 @@ class TestAllOperationsOnMV extends QueryTest with BeforeAndAfterEach {
     }.getMessage.contains("Delete segment operation is not supported on mv 
table")
   }
 
-  test("test partition table with mv") {
-    sql("drop table if exists par_table")
-    sql("CREATE TABLE par_table(id INT, name STRING, age INT) PARTITIONED 
BY(city string) STORED BY 'carbondata'")
-    sql("insert into par_table values(1,'abc',3,'def')")
-    sql("drop datamap if exists p1")
-    sql("create datamap p1 using 'mv' WITH DEFERRED REBUILD as select city, id 
from par_table")
-    sql("rebuild datamap p1")
-    intercept[MalformedCarbonCommandException] {
-      sql("alter table par_table drop partition (city='def')")
-    }.getMessage.contains("Drop Partition is not supported for datamap table 
or for tables which have child datamap")
-    sql("drop datamap if exists p1")
-  }
-
   test("test direct load to mv datamap table") {
     sql("drop table IF EXISTS maintable")
     sql("create table maintable(name string, c_code int, price int) stored by 
'carbondata'")
@@ -251,5 +238,130 @@ class TestAllOperationsOnMV extends QueryTest with BeforeAndAfterEach {
     sql("drop table if exists noncarbon")
   }
 
+  //Test show datamap
+  test("test datamap status with single table") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 
'carbondata'")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("drop datamap if exists dm1 ")
+    sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select price 
from maintable")
+    checkExistence(sql("show datamap on table maintable"), true, "DISABLED")
+    sql("rebuild datamap dm1")
+    var result = sql("show datamap on table maintable").collectAsList()
+    assert(result.get(0).get(4).toString.equalsIgnoreCase("ENABLED"))
+    assert(result.get(0).get(5).toString.contains("{\"default.maintable\":\"0\""))
+    sql("insert into table maintable select 'abc',21,2000")
+    checkExistence(sql("show datamap on table maintable"), true, "DISABLED")
+    sql("rebuild datamap dm1")
+    result = sql("show datamap on table maintable").collectAsList()
+    assert(result.get(0).get(4).toString.equalsIgnoreCase("ENABLED"))
+    assert(result.get(0).get(5).toString.contains("{\"default.maintable\":\"1\""))
+    sql("drop table IF EXISTS maintable")
+  }
+
+  test("test datamap status with multiple tables") {
+    sql("drop table if exists products")
+    sql("create table products (product string, amount int) stored by 
'carbondata' ")
+    sql(s"load data INPATH '$resourcesPath/products.csv' into table products")
+    sql("drop table if exists sales")
+    sql("create table sales (product string, quantity int) stored by 
'carbondata'")
+    sql(s"load data INPATH '$resourcesPath/sales_data.csv' into table sales")
+    sql("drop datamap if exists innerjoin")
+    sql(
+      "Create datamap innerjoin using 'mv'  with deferred rebuild as Select 
p.product, p.amount, " +
+      "s.quantity, s.product from " +
+      "products p, sales s where p.product=s.product")
+    checkExistence(sql("show datamap on table products"), true, "DISABLED")
+    checkExistence(sql("show datamap on table sales"), true, "DISABLED")
+    sql("rebuild datamap innerjoin")
+    var result = sql("show datamap on table products").collectAsList()
+    assert(result.get(0).get(4).toString.equalsIgnoreCase("ENABLED"))
+    assert(result.get(0).get(5).toString.contains("\"default.products\":\"0\",\"default.sales\":\"0\"}"))
+    result = sql("show datamap on table sales").collectAsList()
+    assert(result.get(0).get(4).toString.equalsIgnoreCase("ENABLED"))
+    assert(result.get(0).get(5).toString.contains("\"default.products\":\"0\",\"default.sales\":\"0\"}"))
+    sql(s"load data INPATH '$resourcesPath/sales_data.csv' into table sales")
+    checkExistence(sql("show datamap on table products"), true, "DISABLED")
+    checkExistence(sql("show datamap on table sales"), true, "DISABLED")
+    sql("rebuild datamap innerjoin")
+    result = sql("show datamap on table sales").collectAsList()
+    assert(result.get(0).get(4).toString.equalsIgnoreCase("ENABLED"))
+    assert(result.get(0).get(5).toString.contains("\"default.products\":\"0\",\"default.sales\":\"1\"}"))
+    sql("drop table if exists products")
+    sql("drop table if exists sales")
+  }
+
+  test("directly drop datamap table") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 
'carbondata'")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("drop datamap if exists dm1 ")
+    sql("create datamap dm1 using 'mv' WITH DEFERRED REBUILD as select price 
from maintable")
+    intercept[ProcessMetaDataException] {
+      sql("drop table dm1_table")
+    }.getMessage.contains("Child table which is associated with datamap cannot 
be dropped, use DROP DATAMAP command to drop")
+    sql("drop table IF EXISTS maintable")
+  }
+
+  test("create datamap on child table") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 
'carbondata'")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("drop datamap if exists dm1 ")
+    sql("create datamap dm1 using 'mv' as select name, price from maintable")
+    intercept[Exception] {
+      sql("create datamap dm_agg on table dm1_table using 'preaggregate' as 
select maintable_name, sum(maintable_price) from dm1_table group by 
maintable_name")
+    }.getMessage.contains("Cannot create DataMap on child table 
default.dm1_table")
+    intercept[Exception] {
+      sql("create datamap dm_agg using 'mv' as select maintable_name, 
sum(maintable_price) from dm1_table group by maintable_name")
+    }.getMessage.contains("Cannot create DataMap on child table 
default.dm1_table")
+  }
+
+  test("create datamap if already exists") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 
'carbondata'")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("drop datamap if exists dm1 ")
+    sql("create datamap dm1 using 'mv' as select name from maintable")
+    intercept[Exception] {
+      sql("create datamap dm1 using 'mv' as select price from maintable")
+    }.getMessage.contains("DataMap with name dm1 already exists in storage")
+    checkAnswer(sql("select name from maintable"), Seq(Row("abc")))
+  }
+
+  test("test create datamap with select query having 'like' expression") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 
'carbondata'")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("select name from maintable where name like '%b%'").show(false)
+    sql("drop datamap if exists dm_like ")
+    sql("create datamap dm_like using 'mv' as select name from maintable where 
name like '%b%'")
+    checkAnswer(sql("select name from maintable where name like '%b%'"), 
Seq(Row("abc")))
+    sql("drop table IF EXISTS maintable")
+  }
+
+  test("test datamap with streaming dmproperty") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 
'carbondata'")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("drop datamap if exists dm ")
+    intercept[MalformedCarbonCommandException] {
+      sql("create datamap dm using 'mv' dmproperties('STREAMING'='true') as 
select name from maintable")
+    }.getMessage.contains("MV datamap does not support streaming")
+    sql("drop table IF EXISTS maintable")
+  }
+
+  test("test set streaming after creating datamap table") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 
'carbondata'")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("drop datamap if exists dm ")
+    sql("create datamap dm using 'mv' as select name from maintable")
+    intercept[MalformedCarbonCommandException] {
+      sql("ALTER TABLE dm_table SET TBLPROPERTIES('streaming'='true')")
+    }.getMessage.contains("Datamap table does not support set streaming 
property")
+    sql("drop table IF EXISTS maintable")
+  }
+
 }
 
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala
new file mode 100644
index 0000000..56b1f9f
--- /dev/null
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala
@@ -0,0 +1,688 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.carbondata.mv.rewrite
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datastore.impl.FileFactory
+
+/**
+ * Test class for MV to verify partition scenarios
+ */
+class TestPartitionWithMV extends QueryTest with BeforeAndAfterAll {
+
+  val testData = s"$resourcesPath/sample.csv"
+
+  override def beforeAll(): Unit = {
+    sql("drop database if exists partition_mv cascade")
+    sql("create database partition_mv")
+    sql("use partition_mv")
+    sql(
+      """
+        | CREATE TABLE par(id INT, name STRING, age INT) PARTITIONED BY(city STRING)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(
+      """
+        | CREATE TABLE maintable(id int, name string, city string) partitioned by (age int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+  }
+
+  override def afterAll(): Unit = {
+    sql("drop database if exists partition_mv cascade")
+    sql("use default")
+  }
+
+  // Create mv table on partition with partition column in aggregation only.
+  test("test mv table creation on partition table with partition col as 
aggregation") {
+    sql("create datamap p1 on table par using 'mv' as select id, sum(city) 
from par group by id")
+    assert(!CarbonEnv.getCarbonTable(Some("partition_mv"), 
"p1_table")(sqlContext.sparkSession).isHivePartitionTable)
+  }
+
+  // Create mv table on partition with partition column in projection and 
aggregation only.
+  test("test mv table creation on partition table with partition col as 
projection") {
+    sql("create datamap p2 on table par using 'mv' as select id, city, 
min(city) from par group by id,city ")
+    assert(CarbonEnv.getCarbonTable(Some("partition_mv"), 
"p2_table")(sqlContext.sparkSession).isHivePartitionTable)
+  }
+
+  // Create mv table on partition with partition column as group by.
+  test("test mv table creation on partition table with partition col as group 
by") {
+    sql("create datamap p3 on table par using 'mv' as select id, city, 
max(city) from par group by id,city ")
+    assert(CarbonEnv.getCarbonTable(Some("partition_mv"), 
"p3_table")(sqlContext.sparkSession).isHivePartitionTable)
+  }
+
+  // Create mv table on partition without partition column.
+  test("test mv table creation on partition table without partition column") {
+    sql("create datamap p4 on table par using 'mv' as select name, count(id) 
from par group by name ")
+    assert(!CarbonEnv.getCarbonTable(Some("partition_mv"), 
"p4_table")(sqlContext.sparkSession).isHivePartitionTable)
+    sql("drop datamap if exists p4")
+  }
+
+  test("test data correction with insert overwrite") {
+    sql("drop table if exists partitionone")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionone (empname String, age int)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("drop datamap if exists p1")
+    sql("create datamap p1 on table partitionone using 'mv' as select empname, 
year, sum(year),month,day from partitionone group by empname, year, month,day")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert overwrite table partitionone values('v',2,2014,1,1)")
+    checkAnswer(sql("select * from partitionone"), Seq(Row("v",2,2014,1,1)))
+    checkAnswer(sql("select * from p1_table"), Seq(Row("v",2014,2014,1,1)))
+    checkAnswer(sql("select empname, sum(year) from partitionone group by 
empname, year, month,day"), Seq(Row("v", 2014)))
+    val df1 = sql(s"select empname, sum(year) from partitionone group by 
empname, year, month,day")
+    val analyzed1 = df1.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed1, "p1"))
+    assert(CarbonEnv.getCarbonTable(Some("partition_mv"), 
"p1_table")(sqlContext.sparkSession).isHivePartitionTable)
+  }
+
+  test("test data correction with insert overwrite on different value") {
+    sql("drop table if exists partitionone")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionone (empname String, age int)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("drop datamap if exists p1")
+    sql("create datamap p1 on table partitionone using 'mv' as select empname, 
year, sum(year),month,day from partitionone group by empname, year, month,day")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert overwrite table partitionone values('v',2,2015,1,1)")
+    checkAnswer(sql("select * from partitionone"), Seq(Row("k",2,2014,1,1), 
Row("v",2,2015,1,1)))
+    val df1 = sql(s"select empname, sum(year) from partitionone group by 
empname, year, month,day")
+    val analyzed1 = df1.queryExecution.analyzed
+    assert(verifyMVDataMap(analyzed1, "p1"))
+    checkAnswer(sql("select * from p1_table"), Seq(Row("k",2014,2014,1,1), 
Row("v",2015,2015,1,1)))
+  }
+
+  test("test to check column ordering in parent and child table") {
+    sql("drop table if exists partitionone")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionone (empname String, age int)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("drop datamap if exists p1")
+    sql("create datamap p1 on table partitionone using 'mv' as select empname, 
 year, sum(year),month,day from partitionone group by empname, month, year,day")
+    val parentTable = CarbonEnv.getCarbonTable(Some("partition_mv"), 
"partitionone")(sqlContext.sparkSession)
+    val childTable = CarbonEnv.getCarbonTable(Some("partition_mv"), 
"p1_table")(sqlContext.sparkSession)
+    val parentPartitionColumns = 
parentTable.getPartitionInfo.getColumnSchemaList
+    val childPartitionColumns = childTable.getPartitionInfo.getColumnSchemaList
+    
assert(parentPartitionColumns.asScala.zip(childPartitionColumns.asScala).forall 
{
+      case (a,b) =>
+        a.getColumnName
+          .equalsIgnoreCase(b.getColumnName
+            .substring(b.getColumnName.lastIndexOf("_") + 1, 
b.getColumnName.length))
+    })
+  }
+
+  test("test data after minor compaction on partition table with mv") {
+    sql("drop table if exists partitionone")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionone (empname String, age int)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("drop datamap if exists p1")
+    sql("create datamap p1 on table partitionone using 'mv' as select empname, 
 year, sum(year),month,day from partitionone group by empname, year, month,day")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert into partitionone values('k',2,2014,1,2)")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert into partitionone values('k',2,2015,2,1)")
+    sql("alter table partitionone compact 'minor'")
+    val showSegments = sql("show segments for table 
partitionone").collect().map{a=> (a.get(0), a.get(1))}
+    assert(showSegments.count(_._2 == "Success") == 1)
+    assert(showSegments.count(_._2 == "Compacted") == 4)
+    assert(CarbonEnv.getCarbonTable(Some("partition_mv"), 
"p1_table")(sqlContext.sparkSession).isHivePartitionTable)
+  }
+
+  test("test data after major compaction on partition table with mv") {
+    sql("drop table if exists partitionone")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionone (empname String, age int)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("drop datamap if exists p1")
+    sql("create datamap p1 on table partitionone using 'mv' as select empname, 
 year, sum(year),month,day from partitionone group by empname, year, month,day")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert into partitionone values('k',2,2014,1,2)")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert into partitionone values('k',2,2015,2,1)")
+    sql("insert into partitionone values('k',2,2015,2,1)")
+    sql("alter table partitionone compact 'major'")
+    val showSegments = sql("show segments for table 
partitionone").collect().map{a=> (a.get(0), a.get(1))}
+    assert(showSegments.count(_._2 == "Success") == 1)
+    assert(showSegments.count(_._2 == "Compacted") == 5)
+  }
+
+  test("test drop partition 1") {
+    sql("drop table if exists partitionone")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionone (empname String, age int)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("drop datamap if exists p1")
+    sql("create datamap p1 on table partitionone using 'mv' as select empname, 
 year, sum(year),month,day from partitionone group by empname, year, month, 
day")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert into partitionone values('k',2,2014,1,2)")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert into partitionone values('k',2,2015,2,1)")
+    sql("insert into partitionone values('k',2,2015,2,1)")
+    sql("alter table partitionone drop partition(day=1)")
+    checkAnswer(sql("select * from partitionone"), Seq(Row("k",2,2014,1,2)))
+    checkAnswer(sql("select * from p1_table"), Seq(Row("k",2014,2014,1,2)))
+  }
+
+  test("test drop partition 2") {
+    sql("drop table if exists partitionone")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionone (empname String, age int)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("drop datamap if exists p1")
+    sql("create datamap p1 on table partitionone using 'mv' as select empname, 
 year, sum(year),month,day from partitionone group by empname, year, month, 
day")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert into partitionone values('k',2,2014,1,2)")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert into partitionone values('k',2,2015,2,3)")
+    sql("insert into partitionone values('k',2,2015,2,1)")
+    sql("alter table partitionone drop partition(day=1)")
+    checkAnswer(sql("select * from partitionone"), Seq(Row("k",2,2014,1,2), 
Row("k",2,2015,2,3)))
+    checkAnswer(sql("select * from p1_table"), Seq(Row("k",2014,2014,1,2), 
Row("k",2015,2015,2,3)))
+  }
+
+  test("test drop partition directory") {
+    sql("drop table if exists droppartition")
+    sql(
+      """
+        | CREATE TABLE if not exists droppartition (empname String, age int)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("drop datamap if exists p1")
+    sql("create datamap p1 using 'mv' as select empname,  year, 
sum(year),month,day from droppartition group by empname, year, month, day")
+    sql("insert into droppartition values('k',2,2014,1,1)")
+    sql("insert into droppartition values('k',2,2015,2,3)")
+    sql("alter table droppartition drop partition(year=2015,month=2,day=3)")
+    sql("clean files for table droppartition")
+    val table = CarbonEnv.getCarbonTable(Option("partition_mv"), 
"droppartition")(sqlContext.sparkSession)
+    val dataMapTable = CarbonEnv.getCarbonTable(Option("partition_mv"), 
"droppartition")(sqlContext.sparkSession)
+    val dataMaptablePath = dataMapTable.getTablePath
+    val tablePath = table.getTablePath
+    val carbonFiles = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("year=2015")
+    }
+    val dataMapCarbonFiles = 
FileFactory.getCarbonFile(dataMaptablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("year=2015")
+    }
+    assert(dataMapCarbonFiles.length == 0)
+    assert(carbonFiles.length == 0)
+  }
+
+  test("test data with filter query") {
+    sql("drop table if exists partitionone")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionone (empname String, age int)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("drop datamap if exists p1")
+    sql("create datamap p1 on table partitionone using 'mv' as select empname, 
 year, sum(year),month,day from partitionone group by empname, year, month, 
day")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert into partitionone values('k',2,2014,1,2)")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert into partitionone values('k',2,2015,2,3)")
+    sql("insert into partitionone values('k',2,2015,2,1)")
+    sql("alter table partitionone drop partition(day=1)")
+    checkAnswer(sql("select empname, sum(year) from partitionone where day=3 
group by empname, year, month, day"), Seq(Row("k",2015)))
+    checkAnswer(sql("select * from p1_table"), Seq(Row("k",2014,2014,1,2), 
Row("k",2015,2015,2,3)))
+  }
+
+  test("test drop partition 3") {
+    sql("drop table if exists partitionone")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionone (empname String,age int)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("drop datamap if exists p1")
+    sql("create datamap p1 on table partitionone using 'mv' as select empname, 
 year, sum(year),month,day from partitionone group by empname, year, month, 
day")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert into partitionone values('k',2,2014,1,2)")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert into partitionone values('k',2,2015,2,3)")
+    sql("insert into partitionone values('k',2,2015,2,1)")
+    sql("alter table partitionone drop partition(day=1,month=1)")
+    checkAnswer(sql("select * from partitionone"), Seq(Row("k",2,2014,1,2), 
Row("k",2,2015, 2,3), Row("k",2,2015, 2,1)))
+    checkAnswer(sql("select * from p1_table"), Seq(Row("k",2014,2014,1,2), 
Row("k",2015,2015,2,3), Row("k",2015,2015,2,1)))
+  }
+
+  test("test drop partition 4") {
+    sql("drop table if exists partitionone")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionone (empname String, age int)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("drop datamap if exists p1")
+    sql("create datamap p1 on table partitionone using 'mv' as select empname, 
 year, sum(year),month,day from partitionone group by empname, year, month, 
day")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert into partitionone values('k',2,2014,1,2)")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert into partitionone values('k',2,2015,2,3)")
+    sql("insert into partitionone values('k',2,2015,2,1)")
+    sql("alter table partitionone drop partition(year=2014,day=1)")
+    checkAnswer(sql("select * from partitionone"), Seq(Row("k",2,2014,1,2), 
Row("k",2,2015, 2,3), Row("k",2,2015, 2,1)))
+    checkAnswer(sql("select * from p1_table"), Seq(Row("k",2014,2014,1,2), 
Row("k",2015,2015, 2,3), Row("k",2015,2015, 2,1)))
+  }
+
+  test("test drop partition 5") {
+    sql("drop table if exists partitionone")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionone (empname String, age int)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("drop datamap if exists p1")
+    sql("create datamap p1 on table partitionone using 'mv' as select empname, 
 year, sum(year),month,day from partitionone group by empname, year, month, 
day")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert into partitionone values('k',2,2014,1,2)")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert into partitionone values('k',2,2015,2,3)")
+    sql("insert into partitionone values('k',2,2015,2,1)")
+    sql("alter table partitionone drop partition(year=2014,month=1, day=1)")
+
+    checkAnswer(sql("select * from partitionone"), Seq(Row("k",2,2014,1,2), 
Row("k",2,2015, 2,3), Row("k",2,2015, 2,1)))
+    checkAnswer(sql("select * from p1_table"), Seq(Row("k",2014,2014,1,2), 
Row("k",2015,2015,2,3), Row("k",2015,2015,2,1)))
+  }
+
+  test("test drop partition 6") {
+    sql("drop table if exists partitionone")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionone (empname String, age int)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("drop datamap if exists p1")
+    sql("create datamap p1 on table partitionone using 'mv' as select empname, 
 year, sum(year),month,day from partitionone group by empname, year, month, 
day")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert into partitionone values('k',2,2014,1,2)")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert into partitionone values('k',2,2015,2,3)")
+    sql("insert into partitionone values('k',2,2015,2,1)")
+    sql("alter table partitionone drop partition(year=2014,month=1, day=1)")
+    checkAnswer(sql("select * from partitionone"), Seq(Row("k",2,2014,1,2), 
Row("k",2,2015, 2,3), Row("k",2,2015, 2,1)))
+    checkAnswer(sql("select * from p1_table"), Seq(Row("k",2014,2014,1,2), 
Row("k",2015,2015,2,3), Row("k",2015,2015,2,1)))
+  }
+
+  test("test drop partition 7") {
+    sql("drop table if exists partitionone")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionone (empname String,age int)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("drop datamap if exists p1")
+    sql("drop datamap if exists p2")
+    sql(
+      "create datamap p1 on table partitionone using 'mv' as select empname, 
year,sum(year),day from partitionone group by empname, year, day")
+    sql(
+      "create datamap p2 on table partitionone using 'mv' as select empname, 
month,sum(year) from partitionone group by empname, month")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert into partitionone values('k',2,2014,1,2)")
+
+        val exceptionMessage = intercept[Exception] {
+      sql("alter table partitionone drop partition(year=2014,month=1, day=1)")
+    }.getMessage
+    assert(exceptionMessage.contains("Cannot drop partition as one of the 
partition"))
+    assert(exceptionMessage.contains("p2"))
+    assert(exceptionMessage.contains("p1"))
+  }
+
+  test("test drop partition 8") {
+    sql("drop table if exists partitionone")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionone (empname String, age int)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("drop datamap if exists p1")
+    sql("drop datamap if exists p2")
+    sql("drop datamap if exists p3")
+    sql(
+      "create datamap p1 on table partitionone using 'mv' as select empname, 
year,month,sum(year) from partitionone group by empname, year, month")
+    sql(
+      "create datamap p2 on table partitionone using 'mv' as select empname, 
month, day, sum(year) from partitionone group by empname, month, day")
+    sql(
+      "create datamap p3 on table partitionone using 'mv' as select 
empname,month,sum(year) from partitionone group by empname, month")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert into partitionone values('k',2,2014,1,2)")
+    val exceptionMessage = intercept[Exception] {
+      sql("alter table partitionone drop partition(year=2014,month=1, day=1)")
+    }.getMessage
+    assert(exceptionMessage.contains("Cannot drop partition as one of the 
partition"))
+    assert(!exceptionMessage.contains("p2"))
+    assert(exceptionMessage.contains("p3"))
+    assert(exceptionMessage.contains("p1"))
+  }
+
+  test("test drop partition 9") {
+    sql("drop table if exists partitionone")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionone (empname String)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("drop datamap if exists p1")
+    sql(
+      "create datamap p1 on table partitionone using 'mv' as select empname, 
sum(year) from partitionone group by empname")
+    sql("insert into partitionone values('k',2014,1,1)")
+    sql("insert into partitionone values('k',2014,1,2)")
+    val exceptionMessage = intercept[Exception] {
+      sql("alter table partitionone drop partition(year=2014,month=1, day=1)")
+    }.getMessage
+    assert(exceptionMessage.contains("Cannot drop partition as one of the 
partition"))
+    assert(exceptionMessage.contains("p1"))
+  }
+
+  test("test drop partition 10") {
+    sql("drop table if exists partitionone")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionone (empname String, age int)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("drop datamap if exists p1")
+    sql("drop datamap if exists p2")
+    sql(
+      "create datamap p1 on table partitionone using 'mv' as select empname, 
sum(year) from partitionone group by empname")
+    sql(
+      "create datamap p2 on table partitionone using 'mv' as select empname, 
year,sum(year),month,day from partitionone group by empname, year, month, day")
+    sql("insert into partitionone values('k',2,2014,1,1)")
+    sql("insert into partitionone values('k',2,2014,1,2)")
+    val exceptionMessage = intercept[Exception] {
+      sql("alter table partitionone drop partition(year=2014,month=1, day=1)")
+    }.getMessage
+    assert(exceptionMessage.contains("Cannot drop partition as one of the 
partition"))
+    assert(exceptionMessage.contains("p1"))
+    sql("drop datamap p1 on table partitionone")
+    sql("alter table partitionone drop partition(year=2014,month=1, day=1)")
+  }
+
+  test("test drop partition 11") {
+    sql("drop table if exists partitionone")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionone (empname String)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("drop datamap if exists p1")
+    sql(
+      "create datamap p1 on table partitionone using 'mv' as select empname, 
year, sum(year) from partitionone group by empname, year")
+    sql("insert into partitionone values('k',2014,1,1)")
+    val exceptionMessage = intercept[Exception] {
+      sql("alter table p1_table drop partition(partitionone_year=1)")
+    }.getMessage
+    assert(exceptionMessage.contains("Cannot drop partition directly on child 
table"))
+  }
+
+  test("test drop partition 12") {
+    sql("drop table if exists partitionone")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionone (empname String)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("drop datamap if exists p1")
+    sql(
+      "create datamap p1 on table partitionone using 'mv' as select empname, 
sum(year) from partitionone group by empname")
+    sql("insert into partitionone values('k',2014,1,1)")
+    val exceptionMessage = intercept[Exception] {
+      sql("alter table p1_table drop partition(year=2014)")
+    }.getMessage
+    assert(exceptionMessage.contains("operation failed for 
partition_mv.p1_table: Not a partitioned table"))
+  }
+
+  test("test add partition on mv table") {
+    sql("drop table if exists partitionone")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionone (empname String)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("drop datamap if exists p1")
+    sql(
+      "create datamap p1 on table partitionone using 'mv' as select empname, 
sum(year) from partitionone group by empname")
+    assert(intercept[Exception] {
+      sql("alter table p1_table add partition(c=1)")
+    }.getMessage.equals("Cannot add partition directly on non partitioned 
table"))
+  }
+
+  test("test if alter rename is blocked on partition table with mv") {
+    sql("drop table if exists partitionone")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionone (empname String, id int)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("drop datamap if exists p1")
+    sql(
+      "create datamap p1 on table partitionone using 'mv' as select empname, 
sum(year) from partitionone group by empname")
+    intercept[Exception] {
+      sql("alter table partitionone rename to p")
+    }
+  }
+
+  test("test dropping partition which has already been deleted") {
+    sql("drop table if exists partitiontable")
+    sql("create table partitiontable(id int,name string) partitioned by (email 
string) " +
+        "stored by 'carbondata' tblproperties('sort_scope'='global_sort')")
+    sql("insert into table partitiontable select 1,'huawei','abc'")
+    sql("create datamap ag1 on table partitiontable using 'mv' as select 
count(email),id" +
+        " from partitiontable group by id")
+    sql("create datamap ag2 on table partitiontable using 'mv' as select 
sum(email),name" +
+        " from partitiontable group by name")
+    sql("create datamap ag3 on table partitiontable using 'mv' as select 
max(email),name" +
+        " from partitiontable group by name")
+    sql("create datamap ag4 on table partitiontable using 'mv' as select 
min(email),name" +
+        " from partitiontable group by name")
+    sql("create datamap ag5 on table partitiontable using 'mv' as select 
avg(email),name" +
+        " from partitiontable group by name")
+    sql("alter table partitiontable add partition (email='def')")
+    sql("insert into table partitiontable select 1,'huawei','def'")
+    sql("drop datamap ag1 on table partitiontable")
+    sql("drop datamap ag2 on table partitiontable")
+    sql("drop datamap ag3 on table partitiontable")
+    sql("drop datamap ag4 on table partitiontable")
+    sql("drop datamap ag5 on table partitiontable")
+    sql("alter table partitiontable drop partition(email='def')")
+    assert(intercept[Exception] {
+      sql("alter table partitiontable drop partition(email='def')")
+    }.getMessage.contains("No partition is dropped. One partition spec 
'Map(email -> def)' does not exist in table 'partitiontable' database 
'partition_mv'"))
+    sql("drop table if exists partitiontable")
+  }
+
+  test("test mv table creation with count(*) on Partition table") {
+    sql("drop table if exists partitiontable")
+    sql("create table partitiontable(id int,name string) partitioned by (email 
string) " +
+        "stored by 'carbondata' tblproperties('sort_scope'='global_sort')")
+    sql("insert into table partitiontable select 1,'huawei','abc'")
+    sql("drop datamap if exists ag1")
+    sql("create datamap ag1 on table partitiontable using 'mv' as select 
count(*),id" +
+        " from partitiontable group by id")
+    sql("insert into table partitiontable select 1,'huawei','def'")
+    assert(sql("show datamap on table 
partitiontable").collect().head.get(0).toString.equalsIgnoreCase("ag1"))
+    sql("drop datamap ag1 on table partitiontable")
+  }
+
+  test("test blocking partitioning of mv table") {
+    sql("drop table if exists updatetime_8")
+    sql("create table updatetime_8" +
+        "(countryid smallint,hs_len smallint,minstartdate string,startdate 
string,newdate string,minnewdate string) partitioned by (imex smallint) stored 
by 'carbondata' 
tblproperties('sort_scope'='global_sort','sort_columns'='countryid,imex,hs_len,minstartdate,startdate,newdate,minnewdate','table_blocksize'='256')")
+    sql("drop datamap if exists ag")
+    sql("create datamap ag on table updatetime_8 using 'mv' 
dmproperties('partitioning'='false') as select imex,sum(hs_len) from 
updatetime_8 group by imex")
+    val carbonTable = CarbonEnv.getCarbonTable(Some("partition_mv"), 
"ag_table")(sqlContext.sparkSession)
+    assert(!carbonTable.isHivePartitionTable)
+    sql("drop table if exists updatetime_8")
+  }
+
+  test("Test data updation after compaction on Partition with mv tables") {
+    sql("drop table if exists partitionallcompaction")
+    sql(
+      "create table partitionallcompaction(empno int,empname 
String,designation String," +
+      "workgroupcategory int,workgroupcategoryname String,deptno 
int,projectjoindate timestamp," +
+      "projectenddate date,attendance int,utilization int,salary int) 
partitioned by (deptname " +
+      "String,doj timestamp,projectcode int) stored  by 'carbondata' 
tblproperties" +
+      "('sort_scope'='global_sort')")
+    sql(
+      "create datamap sensor_1 on table partitionallcompaction using 'mv' as 
select " +
+      "sum(salary),doj, deptname,projectcode from partitionallcompaction group 
by doj," +
+      "deptname,projectcode")
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE
+         |partitionallcompaction OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= 
'"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE
+         |partitionallcompaction OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= 
'"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE
+         |partitionallcompaction PARTITION(deptname='Learning', doj, 
projectcode) OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"') """.stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE
+         |partitionallcompaction PARTITION(deptname='configManagement', doj, 
projectcode) OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE
+         |partitionallcompaction PARTITION(deptname='network', doj, 
projectcode) OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE
+         |partitionallcompaction PARTITION(deptname='protocol', doj, 
projectcode) OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE
+         |partitionallcompaction PARTITION(deptname='security', doj, 
projectcode) OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql("ALTER TABLE partitionallcompaction COMPACT 'MINOR'").collect()
+    checkAnswer(sql("select count(empno) from partitionallcompaction where 
empno=14"),
+      Seq(Row(5)))
+    sql("drop table if exists partitionallcompaction")
+  }
+
+  test("Test data updation in Aggregate query after compaction on Partitioned 
table with mv table") {
+    sql("drop table if exists updatetime_8")
+    sql("create table updatetime_8" +
+        "(countryid smallint,hs_len smallint,minstartdate string,startdate 
string,newdate string,minnewdate string) partitioned by (imex smallint) stored 
by 'carbondata' 
tblproperties('sort_scope'='global_sort','sort_columns'='countryid,imex,hs_len,minstartdate,startdate,newdate,minnewdate','table_blocksize'='256')")
+    sql("drop datamap if exists ag")
+    sql("create datamap ag on table updatetime_8 using 'mv' as select 
sum(hs_len), imex from updatetime_8 group by imex")
+    sql("insert into updatetime_8 select 21,20,'fbv','gbv','wvsw','vwr',23")
+    sql("insert into updatetime_8 select 21,20,'fbv','gbv','wvsw','vwr',24")
+    sql("insert into updatetime_8 select 21,20,'fbv','gbv','wvsw','vwr',23")
+    sql("insert into updatetime_8 select 21,21,'fbv','gbv','wvsw','vwr',24")
+    sql("insert into updatetime_8 select 21,21,'fbv','gbv','wvsw','vwr',24")
+    sql("insert into updatetime_8 select 21,21,'fbv','gbv','wvsw','vwr',24")
+    sql("insert into updatetime_8 select 21,21,'fbv','gbv','wvsw','vwr',25")
+    sql("insert into updatetime_8 select 21,21,'fbv','gbv','wvsw','vwr',25")
+    sql("alter table updatetime_8 compact 'minor'")
+    sql("alter table updatetime_8 compact 'minor'")
+    checkAnswer(sql("select sum(hs_len) from updatetime_8 group by 
imex"),Seq(Row(40),Row(42),Row(83)))
+  }
+
+  test("check partitioning for child tables with various combinations") {
+    sql("drop table if exists partitionone")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionone (empname String, id int)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(
+      "create datamap p7 on table partitionone using 'mv' as select empname, 
year, day, sum(year), sum(day) from partitionone group by empname, year, day")
+    sql(
+      "create datamap p1 on table partitionone using 'mv' as select empname, 
sum(year) from partitionone group by empname")
+    sql(
+      "create datamap p2 on table partitionone using 'mv' as select empname, 
year,sum(year) from partitionone group by empname, year")
+    sql(
+      "create datamap p3 on table partitionone using 'mv' as select empname, 
year, month, sum(year), sum(month) from partitionone group by empname, year, 
month")
+    sql(
+      "create datamap p4 on table partitionone using 'mv' as select empname, 
year,month,day,sum(year) from partitionone group by empname, year, month, day")
+    sql(
+      "create datamap p5 on table partitionone using 'mv' as select empname, 
month,sum(year) from partitionone group by empname, month")
+    sql(
+      "create datamap p6 on table partitionone using 'mv' as select empname, 
month, day, sum(year), sum(month) from partitionone group by empname, month, 
day")
+    
assert(!CarbonEnv.getCarbonTable(Some("partition_mv"),"p1_table")(sqlContext.sparkSession).isHivePartitionTable)
+    
assert(CarbonEnv.getCarbonTable(Some("partition_mv"),"p2_table")(sqlContext.sparkSession).getPartitionInfo.getColumnSchemaList.size()
 == 1)
+    
assert(CarbonEnv.getCarbonTable(Some("partition_mv"),"p3_table")(sqlContext.sparkSession).getPartitionInfo.getColumnSchemaList.size
 == 2)
+    
assert(CarbonEnv.getCarbonTable(Some("partition_mv"),"p4_table")(sqlContext.sparkSession).getPartitionInfo.getColumnSchemaList.size
 == 3)
+    
assert(!CarbonEnv.getCarbonTable(Some("partition_mv"),"p5_table")(sqlContext.sparkSession).isHivePartitionTable)
+    
assert(!CarbonEnv.getCarbonTable(Some("partition_mv"),"p6_table")(sqlContext.sparkSession).isHivePartitionTable)
+    
assert(!CarbonEnv.getCarbonTable(Some("partition_mv"),"p7_table")(sqlContext.sparkSession).isHivePartitionTable)
+    sql("drop table if exists partitionone")
+  }
+
+  test("test partition at last column") {
+    sql("drop table if exists partitionone")
+    sql("create table partitionone(a int,b int) partitioned by (c int) stored 
by 'carbondata'")
+    sql("insert into partitionone values(1,2,3)")
+    sql("drop datamap if exists dm1")
+    sql("create datamap dm1 on table partitionone using 'mv' as select 
c,sum(b) from partitionone group by c")
+    checkAnswer(sql("select c,sum(b) from partitionone group by c"), 
Seq(Row(3,2)))
+    sql("drop table if exists partitionone")
+  }
+
+  def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean 
= {
+    val tables = logicalPlan collect {
+      case l: LogicalRelation => l.catalogTable.get
+    }
+    tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName + "_table"))
+  }
+
+}
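
A hedged reading of the "check partitioning for child tables with various combinations" test above: the MV child table inherits Hive partitioning only when the parent's partition columns used in the MV form a prefix of the parent's partition order (year, then month, then day). Summarised as a sketch (datamap names as in the test, not part of the patch):

    // p2: group by (empname, year)             -> p2_table partitioned on 1 column (year)
    // p3: group by (empname, year, month)      -> p3_table partitioned on 2 columns
    // p4: group by (empname, year, month, day) -> p4_table partitioned on 3 columns
    // p1, p5, p6, p7: partition columns absent or not a prefix -> child table not partitioned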
diff --git a/datamap/mv/plan/pom.xml b/datamap/mv/plan/pom.xml
index 6c965e8..753d48b 100644
--- a/datamap/mv/plan/pom.xml
+++ b/datamap/mv/plan/pom.xml
@@ -72,7 +72,7 @@
           <systemProperties>
             <java.awt.headless>true</java.awt.headless>
           </systemProperties>
-          <testFailureIgnore>false</testFailureIgnore>
+          <!-- testFailureIgnore>false</testFailureIgnore -->
           <failIfNoTests>false</failIfNoTests>
         </configuration>
       </plugin>
diff --git 
a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
 
b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
index 9182a89..0bbacc4 100644
--- 
a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
+++ 
b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
@@ -96,7 +96,9 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] {
         //      OptimizeIn(conf),
         ConstantFolding,
         ReorderAssociativeOperator,
-        LikeSimplification,
+        // No need to apply LikeSimplification rule while creating datamap
+        // as modular plan asCompactSql will be set in datamapschema
+        //        LikeSimplification,
         BooleanSimplification,
         SimplifyConditionals,
         RemoveDispensableExpressions,
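
For context on the rule commented out above (illustrative, not part of the patch): Catalyst's LikeSimplification rewrites simple LIKE patterns into StartsWith/EndsWith/Contains, so an optimized plan no longer serializes back to the user's original LIKE predicate. Since the MV's compact SQL is regenerated from the modular plan and stored in the DataMapSchema, the rule is skipped while creating a datamap. A minimal sketch, assuming a hypothetical table maintable and an active SparkSession:

    // With LikeSimplification enabled, the LIKE below would become StartsWith(empname, "a")
    // before the plan is turned back into SQL text for the DataMapSchema.
    val optimized = spark.sql("select empname from maintable where empname like 'a%'")
      .queryExecution.optimizedPlan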
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index ffe1977..b163ee9 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -229,9 +229,9 @@ class TestDataMapCommand extends QueryTest with 
BeforeAndAfterAll {
        """.stripMargin)
     var result = sql(s"show datamap on table $tableName").cache()
     checkAnswer(sql(s"show datamap on table $tableName"),
-      Seq(Row(datamapName, "bloomfilter", s"default.$tableName", 
"'bloom_fpp'='0.001', 'bloom_size'='32000', 'index_columns'='a'"),
-        Row(datamapName2, "bloomfilter", s"default.$tableName", 
"'index_columns'='b'"),
-        Row(datamapName3, "bloomfilter", s"default.$tableName", 
"'index_columns'='c'")))
+      Seq(Row(datamapName, "bloomfilter", s"default.$tableName", 
"'bloom_fpp'='0.001', 'bloom_size'='32000', 'index_columns'='a'", "ENABLED", 
"NA"),
+        Row(datamapName2, "bloomfilter", s"default.$tableName", 
"'index_columns'='b'","ENABLED", "NA"),
+        Row(datamapName3, "bloomfilter", s"default.$tableName", 
"'index_columns'='c'", "ENABLED", "NA")))
     result.unpersist()
     sql(s"drop table if exists $tableName")
 
@@ -248,7 +248,7 @@ class TestDataMapCommand extends QueryTest with 
BeforeAndAfterAll {
          | GROUP BY mytime
        """.stripMargin)
     checkAnswer(sql(s"show datamap on table $tableName"),
-      Seq(Row("agg0_hour", "timeSeries", s"default.${tableName}_agg0_hour", 
"'event_time'='mytime', 'hour_granularity'='1'")))
+      Seq(Row("agg0_hour", "timeSeries", s"default.${tableName}_agg0_hour", 
"'event_time'='mytime', 'hour_granularity'='1'","NA", "NA")))
     sql(s"drop table if exists $tableName")
 
     // for preaggreate datamap, the property is empty
@@ -262,7 +262,7 @@ class TestDataMapCommand extends QueryTest with 
BeforeAndAfterAll {
          | FROM $tableName GROUP BY name
          | """.stripMargin)
     checkAnswer(sql(s"show datamap on table $tableName"),
-      Seq(Row("agg0", "preaggregate", s"default.${tableName}_agg0", "")))
+      Seq(Row("agg0", "preaggregate", s"default.${tableName}_agg0", 
"","NA","NA")))
     sql(s"drop table if exists $tableName")
   }
 
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala
index c3d3456..b55fec7 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala
@@ -639,6 +639,16 @@ class StandardPartitionWithPreaggregateTestCase extends 
QueryTest with BeforeAnd
     
assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"),"partitionone_p7")(sqlContext.sparkSession).isHivePartitionTable)
   }
 
+  test("test partition at last column") {
+    sql("drop table if exists partitionone")
+    sql("create table partitionone(a int,b int) partitioned by (c int) stored 
by 'carbondata'")
+    sql("insert into partitionone values(1,2,3)")
+    sql("drop datamap if exists dm1")
+    sql("create datamap dm1 on table partitionone using 'preaggregate' as 
select c,sum(b) from partitionone group by c")
+    checkAnswer(sql("select c,sum(b) from partitionone group by c"), 
Seq(Row(3,2)))
+    sql("drop table if exists partitionone")
+  }
+
   def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit 
= {
     var isValidPlan = false
     plan.transform {
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 3c6b265..8d4994e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -201,7 +201,10 @@ object CarbonEnv {
       .addListener(classOf[AlterTableColRenameAndDataTypeChangePreEvent],
         DataMapChangeDataTypeorRenameColumnPreListener)
       .addListener(classOf[AlterTableAddColumnPreEvent], 
DataMapAddColumnsPreListener)
-
+      .addListener(classOf[AlterTableDropPartitionMetaEvent],
+        DataMapAlterTableDropPartitionMetaListener)
+      .addListener(classOf[AlterTableDropPartitionPreStatusEvent],
+        DataMapAlterTableDropPartitionPreStatusListener)
   }
 
   /**
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index d9a6490..2b62bf2 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -101,6 +101,27 @@ case class CarbonCreateDataMapCommand(
         s" with provider ${dataMapSchema.getProviderName}")
     }
 
+    if (null != mainTable) {
+      if (mainTable.isChildTable || mainTable.isChildDataMap) {
+        throw new MalformedDataMapCommandException(
+          "Cannot create DataMap on child table " + 
mainTable.getTableUniqueName)
+      }
+    }
+    if (!dataMapSchema.isIndexDataMap && !dataMapSchema.getProviderName
+      .equalsIgnoreCase(DataMapClassProvider.PREAGGREGATE.getShortName) && 
!dataMapSchema
+      
.getProviderName.equalsIgnoreCase(DataMapClassProvider.TIMESERIES.getShortName))
 {
+      if (DataMapStoreManager.getInstance().getAllDataMapSchemas.asScala
+        
.exists(_.getDataMapName.equalsIgnoreCase(dataMapSchema.getDataMapName))) {
+        if (!ifNotExistsSet) {
+          throw new MalformedDataMapCommandException(
+            "DataMap with name " + dataMapSchema.getDataMapName + " already 
exists in storage")
+        }
+        else {
+          return Seq.empty
+        }
+      }
+    }
+
     val systemFolderLocation: String = 
CarbonProperties.getInstance().getSystemFolderLocation
     val operationContext: OperationContext = new OperationContext()
 
@@ -142,10 +163,15 @@ case class CarbonCreateDataMapCommand(
         dataMapProvider.initMeta(queryString.orNull)
         DataMapStatusManager.disableDataMap(dataMapName)
       case _ =>
+        val createDataMapPreExecutionEvent: CreateDataMapPreExecutionEvent =
+          CreateDataMapPreExecutionEvent(sparkSession,
+            systemFolderLocation, tableIdentifier.orNull)
+        
OperationListenerBus.getInstance().fireEvent(createDataMapPreExecutionEvent,
+          operationContext)
         dataMapProvider.initMeta(queryString.orNull)
     }
     val createDataMapPostExecutionEvent: CreateDataMapPostExecutionEvent =
-      new CreateDataMapPostExecutionEvent(sparkSession,
+      CreateDataMapPostExecutionEvent(sparkSession,
         systemFolderLocation, tableIdentifier, dmProviderName)
     
OperationListenerBus.getInstance().fireEvent(createDataMapPostExecutionEvent,
       operationContext)
@@ -183,10 +209,16 @@ case class CarbonCreateDataMapCommand(
 
   override def undoMetadata(sparkSession: SparkSession, exception: Exception): 
Seq[Row] = {
     if (dataMapProvider != null) {
+      val table =
+        if (mainTable != null) {
+          Some(TableIdentifier(mainTable.getTableName, 
Some(mainTable.getDatabaseName)))
+        } else {
+          None
+        }
         CarbonDropDataMapCommand(
           dataMapName,
           true,
-          Some(TableIdentifier(mainTable.getTableName, 
Some(mainTable.getDatabaseName))),
+          table,
           forceDrop = false).run(sparkSession)
     }
     Seq.empty
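
Illustration of the duplicate-name check added above (hypothetical tables t1/t2, not part of the patch): a non-index, non-preaggregate, non-timeseries datamap name must now be unique across storage unless IF NOT EXISTS is given.

    sql("create datamap dm1 on table t1 using 'mv' as select a, sum(b) from t1 group by a")
    intercept[Exception] {
      // fails with "DataMap with name dm1 already exists in storage"
      sql("create datamap dm1 on table t2 using 'mv' as select c, sum(d) from t2 group by c")
    }
    // with IF NOT EXISTS the command simply returns without creating anything
    sql("create datamap if not exists dm1 on table t2 using 'mv' as select c, sum(d) from t2 group by c")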
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
index 3cee810..30cd3ef 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
@@ -20,16 +20,22 @@ package org.apache.spark.sql.execution.command.datamap
 import java.util
 
 import scala.collection.JavaConverters._
+import scala.util.control.Breaks._
 
+import com.google.gson.Gson
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 import org.apache.spark.sql.types.StringType
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datamap.status.{DataMapSegmentStatusUtil, 
DataMapStatus, DataMapStatusManager}
 import 
org.apache.carbondata.core.metadata.schema.datamap.{DataMapClassProvider, 
DataMapProperty}
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, 
SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 /**
  * Show the datamaps on the table
@@ -43,7 +49,9 @@ case class CarbonDataMapShowCommand(tableIdentifier: 
Option[TableIdentifier])
     Seq(AttributeReference("DataMapName", StringType, nullable = false)(),
       AttributeReference("ClassName", StringType, nullable = false)(),
       AttributeReference("Associated Table", StringType, nullable = false)(),
-      AttributeReference("DataMap Properties", StringType, nullable = false)())
+      AttributeReference("DataMap Properties", StringType, nullable = false)(),
+      AttributeReference("DataMap Status", StringType, nullable = false)(),
+      AttributeReference("Sync Status", StringType, nullable = false)())
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
@@ -92,7 +100,49 @@ case class CarbonDataMapShowCommand(tableIdentifier: 
Option[TableIdentifier])
               .map(p => s"'${ p._1 }'='${ p._2 }'").toSeq
               .sorted.mkString(", ")
           }
-        Row(s.getDataMapName, s.getProviderName, table, dmPropertieStr)
+          // Get datamap status and sync information details
+          var dataMapStatus = "NA"
+          var syncInfo: String = "NA"
+          if (!s.getProviderName.equalsIgnoreCase(
+            DataMapClassProvider.PREAGGREGATE.getShortName) && 
!s.getProviderName.equalsIgnoreCase(
+            DataMapClassProvider.TIMESERIES.getShortName)) {
+            if (DataMapStatusManager.getEnabledDataMapStatusDetails
+              .exists(_.getDataMapName.equalsIgnoreCase(s.getDataMapName))) {
+              dataMapStatus = DataMapStatus.ENABLED.name()
+            } else {
+              dataMapStatus = DataMapStatus.DISABLED.name()
+            }
+            val loadMetadataDetails = SegmentStatusManager
+              .readLoadMetadata(CarbonTablePath
+                .getMetadataPath(s.getRelationIdentifier.getTablePath))
+            if (!s.isIndexDataMap && loadMetadataDetails.nonEmpty) {
+              breakable({
+                for (i <- loadMetadataDetails.length - 1 to 0 by -1) {
+                  if 
(loadMetadataDetails(i).getSegmentStatus.equals(SegmentStatus.SUCCESS)) {
+                    val segmentMaps =
+                      
DataMapSegmentStatusUtil.getSegmentMap(loadMetadataDetails(i).getExtraInfo)
+                    val syncInfoMap = new util.HashMap[String, String]()
+                    val iterator = segmentMaps.entrySet().iterator()
+                    while (iterator.hasNext) {
+                      val entry = iterator.next()
+                      syncInfoMap.put(entry.getKey, 
entry.getValue.get(entry.getValue.size() - 1))
+                    }
+                    val loadEndTime =
+                      if (loadMetadataDetails(i).getLoadEndTime ==
+                          CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
+                        "NA"
+                      } else {
+                        new 
java.sql.Timestamp(loadMetadataDetails(i).getLoadEndTime).toString
+                      }
+                    syncInfoMap.put(CarbonCommonConstants.LOAD_SYNC_TIME, 
loadEndTime)
+                    syncInfo = new Gson().toJson(syncInfoMap)
+                    break()
+                  }
+                }
+              })
+            }
+          }
+          Row(s.getDataMapName, s.getProviderName, table, dmPropertieStr, 
dataMapStatus, syncInfo)
       }
     } else {
       Seq.empty
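
A hedged sketch of how the two new columns surface (values below are hypothetical; the sync map pairs each mapped parent table with its latest synced segment plus a load_sync_time entry):

    sql("show datamap on table maintable").show(false)
    // DataMapName | ClassName | Associated Table  | DataMap Properties | DataMap Status | Sync Status
    // dm1         | mv        | default.dm1_table |                    | ENABLED        | {"default.maintable":"2","load_sync_time":"2019-05-13 11:08:31.0"}
    // preaggregate and timeseries datamaps keep "NA" in both new columns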
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 41494c5..c5e5282 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -946,7 +946,7 @@ case class CarbonLoadDataCommand(
       }
     }
     // Only select the required columns
-    val output = if (partition.nonEmpty) {
+    var output = if (partition.nonEmpty) {
       val lowerCasePartition = partition.map { case (key, value) => 
(key.toLowerCase, value) }
       catalogTable.schema.map { attr =>
         attributes.find(_.name.equalsIgnoreCase(attr.name)).get
@@ -954,6 +954,14 @@ case class CarbonLoadDataCommand(
     } else {
       catalogTable.schema.map(f => 
attributes.find(_.name.equalsIgnoreCase(f.name)).get)
     }
+    // Rearrange the partition column at the end of output list
+    if (catalogTable.partitionColumnNames.nonEmpty &&
+        (loadModel.getCarbonDataLoadSchema.getCarbonTable.isChildTable ||
+         loadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap) && 
output.nonEmpty) {
+      val partitionOutPut =
+        catalogTable.partitionColumnNames.map(col => 
output.find(_.name.equalsIgnoreCase(col)).get)
+      output = output.filterNot(partitionOutPut.contains(_)) ++ partitionOutPut
+    }
     val partitionsLen = rdd.partitions.length
 
     // If it is global sort scope then appl sort logical plan on the sort 
columns
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala
index 5b1d7e5..fcee878 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mv/DataMapListeners.scala
@@ -20,11 +20,14 @@ package org.apache.spark.sql.execution.command.mv
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.AlterTableModel
 import 
org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
+import 
org.apache.spark.sql.execution.command.partition.CarbonAlterTableDropHivePartitionCommand
 import org.apache.spark.util.DataMapUtil
 
+import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
DataMapSchema}
@@ -137,12 +140,11 @@ object LoadPostDataMapListener extends 
OperationEventListener {
               .getDataMapProvider(carbonTable, dataMapSchema, sparkSession)
             try {
               provider.rebuild()
+              DataMapStatusManager.enableDataMap(dataMapSchema.getDataMapName)
             } catch {
               case ex: Exception =>
                 
DataMapStatusManager.disableDataMap(dataMapSchema.getDataMapName)
-                throw ex
             }
-            DataMapStatusManager.enableDataMap(dataMapSchema.getDataMapName)
           }
         }
       }
@@ -275,3 +277,108 @@ object DataMapChangeDataTypeorRenameColumnPreListener
     }
   }
 }
+
+object DataMapAlterTableDropPartitionMetaListener extends 
OperationEventListener {
+  /**
+   * Called on a specified event occurrence
+   *
+   * @param event
+   * @param operationContext
+   */
+  override def onEvent(event: Event, operationContext: OperationContext): Unit 
= {
+    val dropPartitionEvent = 
event.asInstanceOf[AlterTableDropPartitionMetaEvent]
+    val parentCarbonTable = dropPartitionEvent.parentCarbonTable
+    val partitionsToBeDropped = dropPartitionEvent.specs.flatMap(_.keys)
+    if (DataMapUtil.hasMVDataMap(parentCarbonTable)) {
+      // used as a flag to block direct drop partition on datamap tables fired 
by the user
+      operationContext.setProperty("isInternalDropCall", "true")
+      // Filter out all the tables which don't have the partition being 
dropped.
+      val dataMapSchemaList = DataMapStoreManager.getInstance
+        .getDataMapSchemasOfTable(parentCarbonTable).asScala
+      val childTablesWithoutPartitionColumns =
+        dataMapSchemaList.filter { dataMapSchema =>
+          val childColumns = dataMapSchema.getMainTableColumnList
+            .get(parentCarbonTable.getTableName).asScala
+          val partitionColExists =
+            partitionsToBeDropped.forall {
+              partition =>
+                childColumns.exists { childColumn =>
+                  childColumn.equalsIgnoreCase(partition)
+                }
+            }
+          !partitionColExists
+        }
+      if (childTablesWithoutPartitionColumns.nonEmpty) {
+        throw new MetadataProcessException(s"Cannot drop partition as one of 
the partition is not" +
+                                           s" participating in the following 
datamaps ${
+                                             
childTablesWithoutPartitionColumns.toList
+                                               
.map(_.getRelationIdentifier.getTableName)
+                                           }. Please drop the specified child 
tables to " +
+                                           s"continue")
+      } else {
+        // blocked drop partition for child tables having more than one parent 
table
+        val nonPartitionChildTables = 
dataMapSchemaList.filter(_.getParentTables.size() >= 2)
+        if (nonPartitionChildTables.nonEmpty) {
+          throw new MetadataProcessException(
+            s"Cannot drop partition if child Table is mapped to more than one 
parent table. Drop " +
+            s"datamaps ${ nonPartitionChildTables.toList.map(_.getDataMapName) 
}  to continue")
+        }
+        val childDropPartitionCommands =
+          dataMapSchemaList.map { dataMapSchema =>
+            val tableIdentifier = 
TableIdentifier(dataMapSchema.getRelationIdentifier.getTableName,
+              Some(dataMapSchema.getRelationIdentifier.getDatabaseName))
+            if 
(!CarbonEnv.getCarbonTable(tableIdentifier)(SparkSession.getActiveSession.get)
+              .isHivePartitionTable) {
+              throw new MetadataProcessException(
+                "Cannot drop partition as one of the partition is not 
participating in the " +
+                "following datamap " + dataMapSchema.getDataMapName +
+                ". Please drop the specified datamap to continue")
+            }
+            // as the datamap table columns start with parent table name 
therefore the
+            // partition column also has to be updated with parent table name 
to generate
+            // partitionSpecs for the child table.
+            val childSpecs = dropPartitionEvent.specs.map {
+              spec =>
+                spec.map {
+                  case (key, value) => (s"${ parentCarbonTable.getTableName 
}_$key", value)
+                }
+            }
+            CarbonAlterTableDropHivePartitionCommand(
+              tableIdentifier,
+              childSpecs,
+              dropPartitionEvent.ifExists,
+              dropPartitionEvent.purge,
+              dropPartitionEvent.retainData,
+              operationContext)
+          }
+        operationContext.setProperty("dropPartitionCommands", 
childDropPartitionCommands)
+        
childDropPartitionCommands.foreach(_.processMetadata(SparkSession.getActiveSession.get))
+      }
+    } else if (parentCarbonTable.isChildTable) {
+      if (operationContext.getProperty("isInternalDropCall") == null) {
+        throw new UnsupportedOperationException("Cannot drop partition 
directly on child table")
+      }
+    }
+  }
+}
+
+object DataMapAlterTableDropPartitionPreStatusListener extends 
OperationEventListener {
+  /**
+   * Called on a specified event occurrence
+   *
+   * @param event
+   * @param operationContext
+   */
+  override protected def onEvent(event: Event,
+      operationContext: OperationContext) = {
+    val preStatusListener = 
event.asInstanceOf[AlterTableDropPartitionPreStatusEvent]
+    val carbonTable = preStatusListener.carbonTable
+    val childDropPartitionCommands = 
operationContext.getProperty("dropPartitionCommands")
+    if (childDropPartitionCommands != null && 
DataMapUtil.hasMVDataMap(carbonTable)) {
+      val childCommands =
+        
childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
+      childCommands.foreach(_.processData(SparkSession.getActiveSession.get))
+    }
+  }
+}
+
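
Illustration of the drop-partition propagation added above (names hypothetical, not part of the patch): the listener rewrites the partition spec with the parent table name prefix before running the drop on each MV child table, and direct drops on a child table are rejected.

    // user command on the parent (partitioned) table
    sql("alter table partitionone drop partition(year=2014)")
    // for an MV datamap p2 with child table p2_table the listener runs the equivalent of:
    //   alter table p2_table drop partition(partitionone_year=2014)
    // while a direct "alter table p2_table drop partition(...)" from the user fails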
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index 9119375..44e51a1 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -70,10 +70,6 @@ case class CarbonAlterTableDropHivePartitionCommand(
     table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
     setAuditTable(table)
     setAuditInfo(Map("partition" -> specs.mkString(",")))
-    if (DataMapUtil.hasMVDataMap(table) || table.isChildTable) {
-      throw new MalformedCarbonCommandException(
-        "Drop Partition is not supported for datamap table or for tables which 
have child datamap")
-    }
     if (table.isHivePartitionTable) {
       var locks = List.empty[ICarbonLock]
       try {
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index c4c3539..adf89cd 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.{DataMapStoreManager, DataMapUtil, 
Segment}
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock}
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index a117814..ff0177b 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -67,6 +67,18 @@ case class CarbonDropTableCommand(
         lock => carbonLocks +=
                 CarbonLockUtil.getLockObject(identifier, lock)
       }
+      // check for directly drop datamap table
+      if (carbonTable.isChildTable && !dropChildTable) {
+        if (!ifExistsSet) {
+          throwMetadataException(dbName, tableName,
+            "Child table which is associated with datamap cannot be dropped, " 
+
+            "use DROP DATAMAP command to drop")
+        } else {
+          LOGGER.info("Skipping Drop table " + tableName +
+                      " because Child table which is associated with datamap 
cannot be dropped")
+          return Seq.empty
+        }
+      }
 
       if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
         throw new ConcurrentOperationException(carbonTable, "loading", "drop 
table")
@@ -198,7 +210,7 @@ case class CarbonDropTableCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     // clear driver side index and dictionary cache
-    if (carbonTable != null) {
+    if (carbonTable != null && !(carbonTable.isChildTable && !dropChildTable)) 
{
       ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
       // delete the table folder
       val tablePath = carbonTable.getTablePath
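
Illustration of the child-table drop guard above (names hypothetical): dropping an MV child table directly is rejected; dropping the datamap removes the child table as well.

    intercept[Exception] {
      sql("drop table dm1_table") // "... use DROP DATAMAP command to drop"
    }
    sql("drop datamap dm1 on table maintable") // drops the datamap and its child table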
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 91d7675..4791687 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -326,6 +326,10 @@ class DDLStrategy(sparkSession: SparkSession) extends 
SparkStrategy {
               throw new MalformedCarbonCommandException(
                 "The table which has MV datamap does not support set streaming 
property")
             }
+            if (carbonTable.isChildTable) {
+              throw new MalformedCarbonCommandException(
+                "Datamap table does not support set streaming property")
+            }
           }
         }
         ExecutedCommandExec(CarbonAlterTableSetCommand(tableName, properties, 
isView)) :: Nil
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 3769671..29bd0c6 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -1184,4 +1184,47 @@ public final class CarbonLoaderUtil {
       FileFactory.deleteFile(filePath, FileFactory.getFileType(filePath));
     }
   }
+
+  /**
+   * Update specified segment status for load to MarkedForDelete in case of 
failure
+   */
+  public static void updateTableStatusInCaseOfFailure(String loadName,
+      AbsoluteTableIdentifier absoluteTableIdentifier, String tableName, 
String databaseName,
+      String tablePath, String metaDataPath) throws IOException {
+    SegmentStatusManager segmentStatusManager = new 
SegmentStatusManager(absoluteTableIdentifier);
+    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+    try {
+      if (carbonLock.lockWithRetries()) {
+        LOGGER.info("Acquired lock for table" + databaseName + "." + tableName
+            + " for table status updation");
+        LoadMetadataDetails[] loadMetadataDetails =
+            SegmentStatusManager.readLoadMetadata(metaDataPath);
+        boolean ifTableStatusUpdateRequired = false;
+        for (LoadMetadataDetails loadMetadataDetail : loadMetadataDetails) {
+          if (loadMetadataDetail.getSegmentStatus() == 
SegmentStatus.INSERT_IN_PROGRESS && loadName
+              .equalsIgnoreCase(loadMetadataDetail.getLoadName())) {
+            
loadMetadataDetail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
+            ifTableStatusUpdateRequired = true;
+          }
+        }
+        if (ifTableStatusUpdateRequired) {
+          SegmentStatusManager
+              
.writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(tablePath),
+                  loadMetadataDetails);
+        }
+      } else {
+        LOGGER.error(
+            "Not able to acquire the lock for Table status updation for table 
" + databaseName + "."
+                + tableName);
+      }
+    } finally {
+      if (carbonLock.unlock()) {
+        LOGGER.info("Table unlocked successfully after table status updation" 
+ databaseName + "."
+            + tableName);
+      } else {
+        LOGGER.error("Unable to unlock Table lock for table" + databaseName + 
"." + tableName
+            + " during table status updation");
+      }
+    }
+  }
 }
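
A minimal usage sketch for the new helper (caller and variable names are hypothetical; the signature matches the method added above):

    // childTable: CarbonTable of the table whose in-progress segment "2" should be
    // marked MARKED_FOR_DELETE after a failed load
    CarbonLoaderUtil.updateTableStatusInCaseOfFailure(
      "2",
      childTable.getAbsoluteTableIdentifier,
      childTable.getTableName,
      childTable.getDatabaseName,
      childTable.getTablePath,
      CarbonTablePath.getMetadataPath(childTable.getTablePath))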
