http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala
new file mode 100644
index 0000000..795ef6a
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter
+import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainIndexDataMap, AbstractCoarseGrainIndexDataMapFactory}
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events.Event
+
+class C2IndexDataMapFactory() extends AbstractCoarseGrainIndexDataMapFactory {
+
+  var identifier: AbsoluteTableIdentifier = _
+
+  override def init(identifier: AbsoluteTableIdentifier,
+      dataMapSchema: DataMapSchema): Unit = {
+    this.identifier = identifier
+  }
+
+  override def fireEvent(event: Event): Unit = ???
+
+  override def clear(segment: Segment): Unit = {}
+
+  override def clear(): Unit = {}
+
+  override def getDataMaps(distributable: DataMapDistributable): util.List[AbstractCoarseGrainIndexDataMap] = ???
+
+  override def getDataMaps(segment: Segment): util.List[AbstractCoarseGrainIndexDataMap] = ???
+
+  override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter =
+    IndexDataMapWriterSuite.dataMapWriterC2Mock(identifier, segment, dataWritePath)
+
+  override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, List(ExpressionType.EQUALS).asJava)
+
+  /**
+   * Get all distributable objects of a segment.
+   *
+   * @return
+   */
+  override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = {
+    ???
+  }
+
+}
+
+class IndexDataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
+  def buildTestData(numRows: Int): DataFrame = {
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to numRows, 1)
+      .map(x => ("a" + x, "b", x))
+      .toDF("c1", "c2", "c3")
+  }
+
+  def dropTable(): Unit = {
+    sql("DROP TABLE IF EXISTS carbon1")
+    sql("DROP TABLE IF EXISTS carbon2")
+  }
+
+  override def beforeAll {
+    dropTable()
+  }
+
+  test("test write datamap 2 pages") {
+    sql(s"CREATE TABLE carbon1(c1 STRING, c2 STRING, c3 INT) STORED BY 
'org.apache.carbondata.format'")
+    // register datamap writer
+    sql(s"CREATE DATAMAP test ON TABLE carbon1 USING 
'${classOf[C2IndexDataMapFactory].getName}'")
+    val df = buildTestData(33000)
+
+    // save dataframe to carbon file
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon1")
+      .option("tempCSV", "false")
+      .option("sort_columns","c1")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    assert(IndexDataMapWriterSuite.callbackSeq.head.contains("block start"))
+    assert(IndexDataMapWriterSuite.callbackSeq.last.contains("block end"))
+    assert(
+      IndexDataMapWriterSuite.callbackSeq.slice(1, IndexDataMapWriterSuite.callbackSeq.length - 1) == Seq(
+        "blocklet start 0",
+        "add page data: blocklet 0, page 0",
+        "add page data: blocklet 0, page 1",
+        "blocklet end: 0"
+      ))
+    IndexDataMapWriterSuite.callbackSeq = Seq()
+  }
+
+  test("test write datamap 2 blocklet") {
+    sql(s"CREATE TABLE carbon2(c1 STRING, c2 STRING, c3 INT) STORED BY 
'org.apache.carbondata.format'")
+    sql(s"CREATE DATAMAP test ON TABLE carbon2 USING 
'${classOf[C2IndexDataMapFactory].getName}'")
+
+    CarbonProperties.getInstance()
+      .addProperty("carbon.blockletgroup.size.in.mb", "1")
+    CarbonProperties.getInstance()
+      .addProperty("carbon.number.of.cores.while.loading",
+        CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)
+
+    val df = buildTestData(300000)
+
+    // save dataframe to carbon file
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon2")
+      .option("tempCSV", "false")
+      .option("sort_columns","c1")
+      .option("SORT_SCOPE","GLOBAL_SORT")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    assert(IndexDataMapWriterSuite.callbackSeq.head.contains("block start"))
+    assert(IndexDataMapWriterSuite.callbackSeq.last.contains("block end"))
+    // Corrected test case: the minimum "carbon.blockletgroup.size.in.mb" cannot be
+    // less than 64 MB, so all pages still land in a single blocklet.
+    assert(
+      IndexDataMapWriterSuite.callbackSeq.slice(1, IndexDataMapWriterSuite.callbackSeq.length - 1) == Seq(
+        "blocklet start 0",
+        "add page data: blocklet 0, page 0",
+        "add page data: blocklet 0, page 1",
+        "add page data: blocklet 0, page 2",
+        "add page data: blocklet 0, page 3",
+        "add page data: blocklet 0, page 4",
+        "add page data: blocklet 0, page 5",
+        "add page data: blocklet 0, page 6",
+        "add page data: blocklet 0, page 7",
+        "add page data: blocklet 0, page 8",
+        "add page data: blocklet 0, page 9",
+        "blocklet end: 0"
+      ))
+    IndexDataMapWriterSuite.callbackSeq = Seq()
+  }
+
+  override def afterAll {
+    dropTable()
+  }
+}
+
+object IndexDataMapWriterSuite {
+
+  var callbackSeq: Seq[String] = Seq[String]()
+
+  def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segment: Segment,
+      dataWritePath: String) =
+    new AbstractDataMapWriter(identifier, segment, dataWritePath) {
+
+    override def onPageAdded(
+        blockletId: Int,
+        pageId: Int,
+        pages: Array[ColumnPage]): Unit = {
+      assert(pages.length == 1)
+      assert(pages(0).getDataType == DataTypes.STRING)
+      val bytes: Array[Byte] = pages(0).getByteArrayPage()(0)
+      assert(bytes.sameElements(Seq(0, 1, 'b'.toByte)))
+      callbackSeq :+= s"add page data: blocklet $blockletId, page $pageId"
+    }
+
+    override def onBlockletEnd(blockletId: Int): Unit = {
+      callbackSeq :+= s"blocklet end: $blockletId"
+    }
+
+    override def onBlockEnd(blockId: String): Unit = {
+      callbackSeq :+= s"block end $blockId"
+    }
+
+    override def onBlockletStart(blockletId: Int): Unit = {
+      callbackSeq :+= s"blocklet start $blockletId"
+    }
+
+    /**
+     * Start of new block notification.
+     *
+     * @param blockId file name of the carbondata file
+     */
+    override def onBlockStart(blockId: String) = {
+      callbackSeq :+= s"block start $blockId"
+    }
+
+    /**
+     * This is called during closing of the writer. After this call, no more data
+     * will be sent to this class.
+     */
+    override def finish() = {
+
+    }
+  }
+}
\ No newline at end of file
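
The assertions above pin down the callback contract of AbstractDataMapWriter: each carbondata file produces one "block start"/"block end" pair, each blocklet inside it produces a "blocklet start"/"blocklet end" pair, and every column page arrives through onPageAdded in between. A minimal sketch of that ordering, using a simplified trait in place of the real AbstractDataMapWriter (the names below are illustrative, not CarbonData API):

object CallbackOrderSketch {
  // Simplified stand-in for the writer callbacks exercised by the suite above.
  trait WriterCallbacks {
    def onBlockStart(blockId: String): Unit
    def onBlockletStart(blockletId: Int): Unit
    def onPageAdded(blockletId: Int, pageId: Int): Unit
    def onBlockletEnd(blockletId: Int): Unit
    def onBlockEnd(blockId: String): Unit
  }

  // Drives the callbacks in the order the assertions expect:
  // block start -> blocklet start -> pages -> blocklet end -> block end.
  def drive(w: WriterCallbacks, blockId: String, blocklets: Int, pagesPerBlocklet: Int): Unit = {
    w.onBlockStart(blockId)
    for (b <- 0 until blocklets) {
      w.onBlockletStart(b)
      for (p <- 0 until pagesPerBlocklet) w.onPageAdded(b, p)
      w.onBlockletEnd(b)
    }
    w.onBlockEnd(blockId)
  }
}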

http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
deleted file mode 100644
index 717af6f..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.datamap
-
-import java.io.{File, FilenameFilter}
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.common.exceptions.MetadataProcessException
-import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.CarbonMetadata
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonTablePath
-
-class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
-
-  val testData = s"$resourcesPath/sample.csv"
-
-  override def beforeAll {
-    sql("drop table if exists datamaptest")
-    sql("drop table if exists datamapshowtest")
-    sql("drop table if exists uniqdata")
-    sql("create table datamaptest (a string, b string, c string) stored by 
'carbondata'")
-  }
-
-  val newClass = "org.apache.spark.sql.CarbonSource"
-
-  test("test datamap create: don't support using non-exist class") {
-    intercept[MetadataProcessException] {
-      sql(s"CREATE DATAMAP datamap1 ON TABLE datamaptest USING '$newClass'")
-    }
-  }
-
-  test("test datamap create with dmproperties: don't support using non-exist 
class") {
-    intercept[MetadataProcessException] {
-      sql(s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' 
DMPROPERTIES('key'='value')")
-    }
-  }
-
-  test("test datamap create with existing name: don't support using non-exist 
class") {
-    intercept[MetadataProcessException] {
-      sql(
-        s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' 
DMPROPERTIES('key'='value')")
-    }
-  }
-
-  test("test datamap create with preagg") {
-    sql("drop datamap if exists datamap3 on table datamaptest")
-    sql(
-      "create datamap datamap3 on table datamaptest using 'preaggregate' 
dmproperties('key'='value') as select count(a) from datamaptest")
-    val table = CarbonMetadata.getInstance().getCarbonTable("default", 
"datamaptest")
-    assert(table != null)
-    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
-    assert(dataMapSchemaList.size() == 1)
-    assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap3"))
-    assert(dataMapSchemaList.get(0).getProperties.get("key").equals("value"))
-    
assert(dataMapSchemaList.get(0).getChildSchema.getTableName.equals("datamaptest_datamap3"))
-  }
-
-  test("check hivemetastore after drop datamap") {
-    try {
-      CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
-          "true")
-      sql("drop table if exists hiveMetaStoreTable")
-      sql("create table hiveMetaStoreTable (a string, b string, c string) 
stored by 'carbondata'")
-
-      sql(
-        "create datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable 
using 'preaggregate' dmproperties('key'='value') as select count(a) from 
hiveMetaStoreTable")
-      checkExistence(sql("show datamap on table hiveMetaStoreTable"), true, 
"datamap_hiveMetaStoreTable")
-
-      sql("drop datamap datamap_hiveMetaStoreTable on table 
hiveMetaStoreTable")
-      checkExistence(sql("show datamap on table hiveMetaStoreTable"), false, 
"datamap_hiveMetaStoreTable")
-
-    } finally {
-      sql("drop table hiveMetaStoreTable")
-      CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
-          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
-    }
-  }
-
-  test("drop the table having pre-aggregate") {
-    try {
-      CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
-          "true")
-      sql("drop table if exists hiveMetaStoreTable_1")
-      sql("create table hiveMetaStoreTable_1 (a string, b string, c string) 
stored by 'carbondata'")
-
-      sql(
-        "create datamap datamap_hiveMetaStoreTable_1 on table 
hiveMetaStoreTable_1 using 'preaggregate' dmproperties('key'='value') as select 
count(a) from hiveMetaStoreTable_1")
-
-      checkExistence(sql("show datamap on table hiveMetaStoreTable_1"),
-        true,
-        "datamap_hiveMetaStoreTable_1")
-
-      sql("drop datamap datamap_hiveMetaStoreTable_1 on table 
hiveMetaStoreTable_1")
-      checkExistence(sql("show datamap on table hiveMetaStoreTable_1"),
-        false,
-        "datamap_hiveMetaStoreTable_1")
-      assert(sql("show datamap on table 
hiveMetaStoreTable_1").collect().length == 0)
-      sql("drop table hiveMetaStoreTable_1")
-
-      checkExistence(sql("show tables"), false, "datamap_hiveMetaStoreTable_1")
-    }
-    finally {
-      CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
-          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
-    }
-  }
-
-  test("test datamap create with preagg with duplicate name") {
-    sql(
-      s"""
-         | CREATE DATAMAP datamap10 ON TABLE datamaptest
-         | USING 'preaggregate'
-         | DMPROPERTIES('key'='value')
-         | AS SELECT COUNT(a) FROM datamaptest
-         """.stripMargin)
-    intercept[MalformedDataMapCommandException] {
-      sql(
-        s"""
-           | CREATE DATAMAP datamap10 ON TABLE datamaptest
-           | USING 'preaggregate'
-           | DMPROPERTIES('key'='value')
-           | AS SELECT COUNT(a) FROM datamaptest
-         """.stripMargin)
-    }
-    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
-    assert(table != null)
-    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
-    assert(dataMapSchemaList.size() == 2)
-  }
-
-  test("test drop non-exist datamap") {
-    intercept[NoSuchDataMapException] {
-      sql("drop datamap nonexist on table datamaptest")
-    }
-    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
-    assert(table != null)
-    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
-    assert(dataMapSchemaList.size() == 2)
-  }
-
-  test("test show datamap without preaggregate: don't support using non-exist 
class") {
-    intercept[MetadataProcessException] {
-      sql("drop table if exists datamapshowtest")
-      sql("create table datamapshowtest (a string, b string, c string) stored 
by 'carbondata'")
-      sql(s"CREATE DATAMAP datamap1 ON TABLE datamapshowtest USING '$newClass' 
DMPROPERTIES('key'='value')")
-      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' 
DMPROPERTIES('key'='value')")
-      checkExistence(sql("SHOW DATAMAP ON TABLE datamapshowtest"), true, 
"datamap1", "datamap2", "(NA)", newClass)
-    }
-  }
-
-  test("test show datamap with preaggregate: don't support using non-exist 
class") {
-    intercept[MetadataProcessException] {
-      sql("drop table if exists datamapshowtest")
-      sql("create table datamapshowtest (a string, b string, c string) stored 
by 'carbondata'")
-      sql("create datamap datamap1 on table datamapshowtest using 
'preaggregate' as select count(a) from datamapshowtest")
-      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' 
DMPROPERTIES('key'='value')")
-      val frame = sql("show datamap on table datamapshowtest")
-      assert(frame.collect().length == 2)
-      checkExistence(frame, true, "datamap1", "datamap2", "(NA)", newClass, 
"default.datamapshowtest_datamap1")
-    }
-  }
-
-  test("test show datamap with no datamap") {
-    sql("drop table if exists datamapshowtest")
-    sql("create table datamapshowtest (a string, b string, c string) stored by 
'carbondata'")
-    assert(sql("show datamap on table datamapshowtest").collect().length == 0)
-  }
-
-  test("test show datamap after dropping datamap: don't support using 
non-exist class") {
-    intercept[MetadataProcessException] {
-      sql("drop table if exists datamapshowtest")
-      sql("create table datamapshowtest (a string, b string, c string) stored 
by 'carbondata'")
-      sql("create datamap datamap1 on table datamapshowtest using 
'preaggregate' as select count(a) from datamapshowtest")
-      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' 
DMPROPERTIES('key'='value')")
-      sql("drop datamap datamap1 on table datamapshowtest")
-      val frame = sql("show datamap on table datamapshowtest")
-      assert(frame.collect().length == 1)
-      checkExistence(frame, true, "datamap2", "(NA)", newClass)
-    }
-  }
-
-  test("test if preaggregate load is successfull for hivemetastore") {
-    try {
-      CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
-      sql("DROP TABLE IF EXISTS maintable")
-      sql(
-        """
-          | CREATE TABLE maintable(id int, name string, city string, age int)
-          | STORED BY 'org.apache.carbondata.format'
-        """.stripMargin)
-      sql(
-        s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id"""
-
-          .stripMargin)
-      sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
-      checkAnswer(sql(s"select * from maintable_preagg_sum"),
-        Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
-    } finally {
-      CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
-          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
-    }
-  }
-
-  test("test preaggregate load for decimal column for hivemetastore") {
-    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
 "true")
-    sql("CREATE TABLE uniqdata(CUST_ID int,CUST_NAME 
String,ACTIVE_EMUI_VERSION string,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 
bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 
decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 
int) STORED BY 'org.apache.carbondata.format'")
-    sql("insert into uniqdata select 
9000,'CUST_NAME_00000','ACTIVE_EMUI_VERSION_00000','1970-01-01 
01:00:03','1970-01-01 
02:00:03',123372036854,-223372036854,12345678901.1234000000,22345678901.1234000000,11234567489.7976000000,-11234567489.7976000000,1")
-    sql("create datamap uniqdata_agg on table uniqdata using 'preaggregate' as 
select min(DECIMAL_COLUMN1) from uniqdata group by DECIMAL_COLUMN1")
-    checkAnswer(sql("select * from uniqdata_uniqdata_agg"), 
Seq(Row(12345678901.1234000000, 12345678901.1234000000)))
-    sql("drop datamap if exists uniqdata_agg on table uniqdata")
-  }
-
-  test("create pre-agg table with path") {
-    sql("drop table if exists main_preagg")
-    sql("drop table if exists main ")
-    val warehouse = s"$metastoredb/warehouse"
-    val path = warehouse + "/" + System.nanoTime + "_preAggTestPath"
-    sql(
-      s"""
-         | create table main(
-         |     year int,
-         |     month int,
-         |     name string,
-         |     salary int)
-         | stored by 'carbondata'
-         | tblproperties('sort_columns'='month,year,name')
-      """.stripMargin)
-    sql("insert into main select 10,11,'amy',12")
-    sql("insert into main select 10,11,'amy',14")
-    sql(
-      s"""
-         | create datamap preagg
-         | on table main
-         | using 'preaggregate'
-         | dmproperties ('path'='$path')
-         | as select name,avg(salary)
-         |    from main
-         |    group by name
-       """.stripMargin)
-    assertResult(true)(new File(path).exists())
-    assertResult(true)(new File(s"${CarbonTablePath.getSegmentPath(path, 
"0")}")
-      .list(new FilenameFilter {
-        override def accept(dir: File, name: String): Boolean = {
-          name.contains(CarbonCommonConstants.FACT_FILE_EXT)
-        }
-      }).length > 0)
-    checkAnswer(sql("select name,avg(salary) from main group by name"), 
Row("amy", 13.0))
-    checkAnswer(sql("select * from main_preagg"), Row("amy", 26, 2))
-    sql("drop datamap preagg on table main")
-    assertResult(false)(new File(path).exists())
-    sql("drop table main")
-  }
-
-  override def afterAll {
-    sql("DROP TABLE IF EXISTS maintable")
-    sql("drop table if exists uniqdata")
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
-      CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
-    sql("drop table if exists datamaptest")
-    sql("drop table if exists datamapshowtest")
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala
new file mode 100644
index 0000000..a05a8c2
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.io.{File, FilenameFilter}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.MetadataProcessException
+import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestIndexDataMapCommand extends QueryTest with BeforeAndAfterAll {
+
+  val testData = s"$resourcesPath/sample.csv"
+
+  override def beforeAll {
+    sql("drop table if exists datamaptest")
+    sql("drop table if exists datamapshowtest")
+    sql("drop table if exists uniqdata")
+    sql("create table datamaptest (a string, b string, c string) stored by 
'carbondata'")
+  }
+
+  val newClass = "org.apache.spark.sql.CarbonSource"
+
+  test("test datamap create: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
+      sql(s"CREATE DATAMAP datamap1 ON TABLE datamaptest USING '$newClass'")
+    }
+  }
+
+  test("test datamap create with dmproperties: don't support using non-exist 
class") {
+    intercept[MetadataProcessException] {
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' 
DMPROPERTIES('key'='value')")
+    }
+  }
+
+  test("test datamap create with existing name: don't support using non-exist 
class") {
+    intercept[MetadataProcessException] {
+      sql(
+        s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' 
DMPROPERTIES('key'='value')")
+    }
+  }
+
+  test("test datamap create with preagg") {
+    sql("drop datamap if exists datamap3 on table datamaptest")
+    sql(
+      "create datamap datamap3 on table datamaptest using 'preaggregate' as 
select count(a) from datamaptest")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", 
"datamaptest")
+    assert(table != null)
+    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
+    assert(dataMapSchemaList.size() == 1)
+    assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap3"))
+    
assert(dataMapSchemaList.get(0).getChildSchema.getTableName.equals("datamaptest_datamap3"))
+  }
+
+  test("check hivemetastore after drop datamap") {
+    try {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          "true")
+      sql("drop table if exists hiveMetaStoreTable")
+      sql("create table hiveMetaStoreTable (a string, b string, c string) 
stored by 'carbondata'")
+
+      sql(
+        "create datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable 
using 'preaggregate' as select count(a) from hiveMetaStoreTable")
+      checkExistence(sql("show datamap on table hiveMetaStoreTable"), true, 
"datamap_hiveMetaStoreTable")
+
+      sql("drop datamap datamap_hiveMetaStoreTable on table 
hiveMetaStoreTable")
+      checkExistence(sql("show datamap on table hiveMetaStoreTable"), false, 
"datamap_hiveMetaStoreTable")
+
+    } finally {
+      sql("drop table hiveMetaStoreTable")
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+    }
+  }
+
+  test("drop the table having pre-aggregate") {
+    try {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          "true")
+      sql("drop table if exists hiveMetaStoreTable_1")
+      sql("create table hiveMetaStoreTable_1 (a string, b string, c string) 
stored by 'carbondata'")
+
+      sql(
+        "create datamap datamap_hiveMetaStoreTable_1 on table 
hiveMetaStoreTable_1 using 'preaggregate' as select count(a) from 
hiveMetaStoreTable_1")
+
+      checkExistence(sql("show datamap on table hiveMetaStoreTable_1"),
+        true,
+        "datamap_hiveMetaStoreTable_1")
+
+      sql("drop datamap datamap_hiveMetaStoreTable_1 on table 
hiveMetaStoreTable_1")
+      checkExistence(sql("show datamap on table hiveMetaStoreTable_1"),
+        false,
+        "datamap_hiveMetaStoreTable_1")
+      assert(sql("show datamap on table 
hiveMetaStoreTable_1").collect().length == 0)
+      sql("drop table hiveMetaStoreTable_1")
+
+      checkExistence(sql("show tables"), false, "datamap_hiveMetaStoreTable_1")
+    }
+    finally {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+    }
+  }
+
+  test("test datamap create with preagg with duplicate name") {
+    sql(
+      s"""
+         | CREATE DATAMAP datamap10 ON TABLE datamaptest
+         | USING 'preaggregate'
+         | AS SELECT COUNT(a) FROM datamaptest
+         """.stripMargin)
+    intercept[MalformedDataMapCommandException] {
+      sql(
+        s"""
+           | CREATE DATAMAP datamap10 ON TABLE datamaptest
+           | USING 'preaggregate'
+           | AS SELECT COUNT(a) FROM datamaptest
+         """.stripMargin)
+    }
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
+    assert(table != null)
+    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
+    assert(dataMapSchemaList.size() == 2)
+  }
+
+  test("test drop non-exist datamap") {
+    intercept[NoSuchDataMapException] {
+      sql("drop datamap nonexist on table datamaptest")
+    }
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
+    assert(table != null)
+    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
+    assert(dataMapSchemaList.size() == 2)
+  }
+
+  test("test show datamap without preaggregate: don't support using non-exist 
class") {
+    intercept[MetadataProcessException] {
+      sql("drop table if exists datamapshowtest")
+      sql("create table datamapshowtest (a string, b string, c string) stored 
by 'carbondata'")
+      sql(s"CREATE DATAMAP datamap1 ON TABLE datamapshowtest USING '$newClass' 
")
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' 
")
+      checkExistence(sql("SHOW DATAMAP ON TABLE datamapshowtest"), true, 
"datamap1", "datamap2", "(NA)", newClass)
+    }
+  }
+
+  test("test show datamap with preaggregate: don't support using non-exist 
class") {
+    intercept[MetadataProcessException] {
+      sql("drop table if exists datamapshowtest")
+      sql("create table datamapshowtest (a string, b string, c string) stored 
by 'carbondata'")
+      sql("create datamap datamap1 on table datamapshowtest using 
'preaggregate' as select count(a) from datamapshowtest")
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' 
")
+      val frame = sql("show datamap on table datamapshowtest")
+      assert(frame.collect().length == 2)
+      checkExistence(frame, true, "datamap1", "datamap2", "(NA)", newClass, 
"default.datamapshowtest_datamap1")
+    }
+  }
+
+  test("test show datamap with no datamap") {
+    sql("drop table if exists datamapshowtest")
+    sql("create table datamapshowtest (a string, b string, c string) stored by 
'carbondata'")
+    assert(sql("show datamap on table datamapshowtest").collect().length == 0)
+  }
+
+  test("test show datamap after dropping datamap: don't support using 
non-exist class") {
+    intercept[MetadataProcessException] {
+      sql("drop table if exists datamapshowtest")
+      sql("create table datamapshowtest (a string, b string, c string) stored 
by 'carbondata'")
+      sql("create datamap datamap1 on table datamapshowtest using 
'preaggregate' as select count(a) from datamapshowtest")
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' 
")
+      sql("drop datamap datamap1 on table datamapshowtest")
+      val frame = sql("show datamap on table datamapshowtest")
+      assert(frame.collect().length == 1)
+      checkExistence(frame, true, "datamap2", "(NA)", newClass)
+    }
+  }
+
+  test("test if preaggregate load is successfull for hivemetastore") {
+    try {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, 
"true")
+      sql("DROP TABLE IF EXISTS maintable")
+      sql(
+        """
+          | CREATE TABLE maintable(id int, name string, city string, age int)
+          | STORED BY 'org.apache.carbondata.format'
+        """.stripMargin)
+      sql(
+        s"""create datamap preagg_sum on table maintable using 'preaggregate' 
as select id,sum(age) from maintable group by id"""
+
+          .stripMargin)
+      sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+      checkAnswer(sql(s"select * from maintable_preagg_sum"),
+        Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+    } finally {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+    }
+  }
+
+  test("test preaggregate load for decimal column for hivemetastore") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
+    sql("CREATE TABLE uniqdata(CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION string,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format'")
+    sql("insert into uniqdata select 9000,'CUST_NAME_00000','ACTIVE_EMUI_VERSION_00000','1970-01-01 01:00:03','1970-01-01 02:00:03',123372036854,-223372036854,12345678901.1234000000,22345678901.1234000000,11234567489.7976000000,-11234567489.7976000000,1")
+    sql("create datamap uniqdata_agg on table uniqdata using 'preaggregate' as select min(DECIMAL_COLUMN1) from uniqdata group by DECIMAL_COLUMN1")
+    checkAnswer(sql("select * from uniqdata_uniqdata_agg"), Seq(Row(12345678901.1234000000, 12345678901.1234000000)))
+    sql("drop datamap if exists uniqdata_agg on table uniqdata")
+  }
+
+  test("create pre-agg table with path") {
+    sql("drop table if exists main_preagg")
+    sql("drop table if exists main ")
+    val warehouse = s"$metastoredb/warehouse"
+    val path = warehouse + "/" + System.nanoTime + "_preAggTestPath"
+    sql(
+      s"""
+         | create table main(
+         |     year int,
+         |     month int,
+         |     name string,
+         |     salary int)
+         | stored by 'carbondata'
+         | tblproperties('sort_columns'='month,year,name')
+      """.stripMargin)
+    sql("insert into main select 10,11,'amy',12")
+    sql("insert into main select 10,11,'amy',14")
+    sql(
+      s"""
+         | create datamap preagg
+         | on table main
+         | using 'preaggregate'
+         | dmproperties ('path'='$path')
+         | as select name,avg(salary)
+         |    from main
+         |    group by name
+       """.stripMargin)
+    assertResult(true)(new File(path).exists())
+    assertResult(true)(new File(s"${CarbonTablePath.getSegmentPath(path, 
"0")}")
+      .list(new FilenameFilter {
+        override def accept(dir: File, name: String): Boolean = {
+          name.contains(CarbonCommonConstants.FACT_FILE_EXT)
+        }
+      }).length > 0)
+    checkAnswer(sql("select name,avg(salary) from main group by name"), 
Row("amy", 13.0))
+    checkAnswer(sql("select * from main_preagg"), Row("amy", 26, 2))
+    sql("drop datamap preagg on table main")
+    assertResult(false)(new File(path).exists())
+    sql("drop table main")
+  }
+
+  override def afterAll {
+    sql("DROP TABLE IF EXISTS maintable")
+    sql("drop table if exists uniqdata")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+      CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+    sql("drop table if exists datamaptest")
+    sql("drop table if exists datamapshowtest")
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
new file mode 100644
index 0000000..2b3a306
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap;
+
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+
+import static org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.PREAGGREGATE;
+import static org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES;
+
+public class DataMapManager {
+
+  private static DataMapManager INSTANCE;
+
+  private DataMapManager() { }
+
+  public static synchronized DataMapManager get() {
+    if (INSTANCE == null) {
+      INSTANCE = new DataMapManager();
+    }
+    return INSTANCE;
+  }
+
+  /**
+   * Return a DataMapProvider instance for the specified dataMapSchema.
+   */
+  public DataMapProvider getDataMapProvider(DataMapSchema dataMapSchema) {
+    DataMapProvider provider;
+    if (dataMapSchema.getClassName().equalsIgnoreCase(PREAGGREGATE.toString())) {
+      provider = new PreAggregateDataMapProvider();
+    } else if (dataMapSchema.getClassName().equalsIgnoreCase(TIMESERIES.toString())) {
+      provider = new TimeseriesDataMapProvider();
+    } else {
+      provider = new IndexDataMapProvider();
+    }
+    return provider;
+  }
+
+}
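
getDataMapProvider dispatches purely on the provider string stored in the schema: the two built-in MV-style providers are matched by short name, and any other string falls through to IndexDataMapProvider, which resolves it later as a factory class name or a registered short name. A rough Scala restatement of that dispatch (a sketch only; the string results stand in for the provider instances):

// Illustrative restatement of DataMapManager.getDataMapProvider's dispatch.
def providerFor(className: String): String = className.toLowerCase match {
  case "preaggregate" => "PreAggregateDataMapProvider"
  case "timeseries"   => "TimeseriesDataMapProvider"
  case _              => "IndexDataMapProvider" // class name or registered short name, resolved later
}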

http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java
new file mode 100644
index 0000000..0cf0d04
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+/**
+ * Properties that can be specified when creating a DataMap.
+ */
+@InterfaceAudience.Internal
+public class DataMapProperty {
+
+  /**
+   * Used to specify the store location of the datamap
+   */
+  public static final String PATH = "path";
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java
new file mode 100644
index 0000000..a71e0d8
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.processing.exception.DataLoadingException;
+
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * DataMap is an accelerator for certain types of queries. Developers can add new DataMap
+ * implementations to improve query performance.
+ *
+ * Currently two types of DataMap are supported:
+ * <ol>
+ *   <li> MVDataMap: materialized-view style DataMap that accelerates OLAP queries,
+ * like SPJG queries (select, predicate, join, groupby) </li>
+ *   <li> IndexDataMap: index style DataMap that accelerates filter queries </li>
+ * </ol>
+ *
+ * <p>
+ * In the following command <br>
+ * {@code CREATE DATAMAP dm ON TABLE main USING 'provider'}, <br>
+ * the <b>provider</b> string can be a short name or the class name of the DataMap implementation.
+ *
+ * <br>Currently CarbonData supports the following providers:
+ * <ol>
+ *   <li> preaggregate: a type of MVDataMap that pre-aggregates a single table </li>
+ *   <li> timeseries: a type of MVDataMap that pre-aggregates based on a time dimension
+ *     of the table </li>
+ *   <li> class name of an {@link org.apache.carbondata.core.datamap.dev.IndexDataMapFactory}
+ * implementation: developers can implement a new type of IndexDataMap by extending
+ * {@link org.apache.carbondata.core.datamap.dev.IndexDataMapFactory} </li>
+ * </ol>
+ *
+ * @since 1.4.0
+ */
+@InterfaceAudience.Developer("DataMap")
+@InterfaceStability.Unstable
+public interface DataMapProvider {
+
+  /**
+   * Initialize a datamap's metadata.
+   * This is called when the user creates a datamap, for example "CREATE DATAMAP dm ON TABLE mainTable".
+   * Implementation should initialize metadata for the datamap, like creating a table.
+   */
+  void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement,
+      SparkSession sparkSession) throws MalformedDataMapCommandException;
+
+  /**
+   * Initialize a datamap's data.
+   * This is called when the user creates a datamap, for example "CREATE DATAMAP dm ON TABLE mainTable".
+   * Implementation should initialize data for the datamap, like creating data folders.
+   */
+  void initData(CarbonTable mainTable, SparkSession sparkSession);
+
+  /**
+   * Opposite operation of {@link #initMeta(CarbonTable, DataMapSchema, String, SparkSession)}.
+   * This is called when the user drops a datamap, for example "DROP DATAMAP dm ON TABLE mainTable".
+   * Implementation should clean all metadata for the datamap.
+   */
+  void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, SparkSession sparkSession);
+
+  /**
+   * Opposite operation of {@link #initData(CarbonTable, SparkSession)}.
+   * This is called when the user drops a datamap, for example "DROP DATAMAP dm ON TABLE mainTable".
+   * Implementation should clean all data for the datamap.
+   */
+  void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema, SparkSession sparkSession);
+
+  /**
+   * Rebuild the datamap by loading all existing data from mainTable.
+   * This is called when refreshing the datamap:
+   * 1. after datamap creation, if `autoRefreshDataMap` is set to true
+   * 2. when the user manually triggers a refresh datamap command
+   */
+  void rebuild(CarbonTable mainTable, SparkSession sparkSession) throws DataLoadingException;
+
+  /**
+   * Build the datamap incrementally by loading the specified segment data.
+   * This is called when the user manually triggers a refresh datamap command.
+   */
+  void incrementalBuild(CarbonTable mainTable, String[] segmentIds, SparkSession sparkSession)
+    throws DataLoadingException;
+
+}
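
Each init* method is paired with a free* counterpart, so providers can be written symmetrically. A minimal no-op implementation of the interface in Scala, sketched directly against the signatures above (assumes the carbondata and spark jars on the classpath; not part of this patch):

import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
import org.apache.carbondata.datamap.DataMapProvider
import org.apache.spark.sql.SparkSession

// A do-nothing provider showing the lifecycle pairing: initMeta/freeMeta manage
// metadata, initData/freeData manage data, rebuild/incrementalBuild load data.
class NoOpDataMapProvider extends DataMapProvider {
  override def initMeta(mainTable: CarbonTable, dataMapSchema: DataMapSchema,
      ctasSqlStatement: String, sparkSession: SparkSession): Unit = ()
  override def initData(mainTable: CarbonTable, sparkSession: SparkSession): Unit = ()
  override def freeMeta(mainTable: CarbonTable, dataMapSchema: DataMapSchema,
      sparkSession: SparkSession): Unit = ()
  override def freeData(mainTable: CarbonTable, dataMapSchema: DataMapSchema,
      sparkSession: SparkSession): Unit = ()
  override def rebuild(mainTable: CarbonTable, sparkSession: SparkSession): Unit = ()
  override def incrementalBuild(mainTable: CarbonTable, segmentIds: Array[String],
      sparkSession: SparkSession): Unit = throw new UnsupportedOperationException
}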

http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
new file mode 100644
index 0000000..e11e522
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.exceptions.MetadataProcessException;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
+import org.apache.carbondata.core.datamap.DataMapRegistry;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datamap.dev.IndexDataMapFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.format.TableInfo;
+
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil;
+
+@InterfaceAudience.Internal
+public class IndexDataMapProvider implements DataMapProvider {
+
+  private TableInfo originalTableInfo;
+
+  @Override
+  public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement,
+      SparkSession sparkSession) throws MalformedDataMapCommandException {
+    IndexDataMapFactory dataMapFactory = createIndexDataMapFactory(dataMapSchema);
+    DataMapStoreManager.getInstance().registerDataMap(
+        mainTable.getAbsoluteTableIdentifier(), dataMapSchema, dataMapFactory);
+    originalTableInfo = PreAggregateUtil.updateMainTable(mainTable, dataMapSchema, sparkSession);
+  }
+
+  @Override
+  public void initData(CarbonTable mainTable, SparkSession sparkSession) {
+    // Nothing is needed to do by default
+  }
+
+  @Override
+  public void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema,
+      SparkSession sparkSession) {
+    PreAggregateUtil.updateSchemaInfo(mainTable, originalTableInfo, sparkSession);
+  }
+
+  @Override
+  public void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema,
+      SparkSession sparkSession) {
+    DataMapStoreManager.getInstance().clearDataMap(
+        mainTable.getAbsoluteTableIdentifier(), dataMapSchema.getDataMapName());
+  }
+
+  @Override
+  public void rebuild(CarbonTable mainTable, SparkSession sparkSession) {
+    // Nothing is needed to do by default
+  }
+
+  @Override
+  public void incrementalBuild(CarbonTable mainTable, String[] segmentIds,
+      SparkSession sparkSession) {
+    throw new UnsupportedOperationException();
+  }
+
+  private IndexDataMapFactory createIndexDataMapFactory(DataMapSchema dataMapSchema)
+      throws MalformedDataMapCommandException {
+    IndexDataMapFactory dataMapFactory;
+    try {
+      // try to create an IndexDataMapFactory instance by taking the provider name as a class name
+      Class<? extends IndexDataMapFactory> providerClass =
+          (Class<? extends IndexDataMapFactory>) Class.forName(dataMapSchema.getClassName());
+      dataMapFactory = providerClass.newInstance();
+    } catch (ClassNotFoundException e) {
+      // fall back to treating the provider name as a registered short name
+      dataMapFactory = getDataMapFactoryByShortName(dataMapSchema.getClassName());
+    } catch (Throwable e) {
+      throw new MetadataProcessException(
+          "failed to create DataMapProvider '" + dataMapSchema.getClassName() + "'", e);
+    }
+    return dataMapFactory;
+  }
+
+  private IndexDataMapFactory getDataMapFactoryByShortName(String providerName)
+      throws MalformedDataMapCommandException {
+    IndexDataMapFactory dataMapFactory;
+    String className = DataMapRegistry.getDataMapClassName(providerName);
+    if (className != null) {
+      try {
+        // instantiate the class name registered for this short name
+        Class<? extends IndexDataMapFactory> datamapClass =
+            (Class<? extends IndexDataMapFactory>) Class.forName(className);
+        dataMapFactory = datamapClass.newInstance();
+      } catch (ClassNotFoundException ex) {
+        throw new MalformedDataMapCommandException(
+            "DataMap '" + providerName + "' not found", ex);
+      } catch (Throwable ex) {
+        throw new MetadataProcessException(
+            "failed to create DataMap '" + providerName + "'", ex);
+      }
+    } else {
+      throw new MalformedDataMapCommandException(
+          "DataMap '" + providerName + "' not found");
+    }
+    return dataMapFactory;
+  }
+}
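
Resolution is therefore two-step: the provider string is first tried as a fully qualified IndexDataMapFactory class name, and only on ClassNotFoundException is it treated as a short name to look up in DataMapRegistry. A standalone Scala sketch of that fallback, with a plain Map standing in for the registry (illustrative only, not CarbonData API):

// Two-step provider resolution: class name first, then registered short name.
def resolveFactoryClass(provider: String, registry: Map[String, String]): Class[_] =
  try {
    Class.forName(provider) // treat the provider string as a class name
  } catch {
    case _: ClassNotFoundException =>
      registry.get(provider) match {
        // note: load the class name registered for the short name
        case Some(className) => Class.forName(className)
        case None => throw new IllegalArgumentException(s"DataMap '$provider' not found")
      }
  }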

http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
new file mode 100644
index 0000000..dc53900
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateTableHelper;
+import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand;
+import scala.Some;
+
+@InterfaceAudience.Internal
+public class PreAggregateDataMapProvider implements DataMapProvider {
+  protected PreAggregateTableHelper helper;
+  protected CarbonDropTableCommand dropTableCommand;
+
+  @Override
+  public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement,
+      SparkSession sparkSession) throws MalformedDataMapCommandException {
+    validateDmProperty(dataMapSchema);
+    helper = new PreAggregateTableHelper(
+        mainTable, dataMapSchema.getDataMapName(), dataMapSchema.getClassName(),
+        dataMapSchema.getProperties(), ctasSqlStatement, null, false);
+    helper.initMeta(sparkSession);
+  }
+
+  private void validateDmProperty(DataMapSchema dataMapSchema)
+      throws MalformedDataMapCommandException {
+    if (!dataMapSchema.getProperties().isEmpty()) {
+      if (dataMapSchema.getProperties().size() > 1 ||
+          !dataMapSchema.getProperties().containsKey(DataMapProperty.PATH)) {
+        throw new MalformedDataMapCommandException(
+            "Only 'path' dmproperty is allowed for this datamap");
+      }
+    }
+  }
+
+  @Override
+  public void initData(CarbonTable mainTable, SparkSession sparkSession) {
+    // Nothing is needed to do by default
+  }
+
+  @Override
+  public void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema,
+      SparkSession sparkSession) {
+    dropTableCommand = new CarbonDropTableCommand(
+        true,
+        new Some<>(dataMapSchema.getRelationIdentifier().getDatabaseName()),
+        dataMapSchema.getRelationIdentifier().getTableName(),
+        true);
+    dropTableCommand.processMetadata(sparkSession);
+  }
+
+  @Override
+  public void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema,
+      SparkSession sparkSession) {
+    if (dropTableCommand != null) {
+      dropTableCommand.processData(sparkSession);
+    }
+  }
+
+  @Override
+  public void rebuild(CarbonTable mainTable, SparkSession sparkSession) {
+    if (helper != null) {
+      helper.initData(sparkSession);
+    }
+  }
+
+  @Override
+  public void incrementalBuild(CarbonTable mainTable, String[] segmentIds,
+      SparkSession sparkSession) {
+    throw new UnsupportedOperationException();
+  }
+}
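
validateDmProperty means a pre-aggregate datamap accepts at most one dmproperty, 'path', which relocates the child table's store; anything else is rejected. The "create pre-agg table with path" test in this patch uses exactly this shape, e.g. (Scala test style, path value illustrative):

// The only dmproperty the preaggregate provider accepts is 'path'.
sql(
  s"""
     | CREATE DATAMAP preagg ON TABLE main
     | USING 'preaggregate'
     | DMPROPERTIES ('path'='/tmp/preagg_store')
     | AS SELECT name, avg(salary) FROM main GROUP BY name
   """.stripMargin)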

http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java
new file mode 100644
index 0000000..a66f26a
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap;
+
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateTableHelper;
+import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil;
+import scala.Some;
+import scala.Tuple2;
+
+@InterfaceAudience.Internal
+public class TimeseriesDataMapProvider extends PreAggregateDataMapProvider {
+
+  @Override
+  public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement,
+      SparkSession sparkSession) {
+    Map<String, String> dmProperties = dataMapSchema.getProperties();
+    String dmProviderName = dataMapSchema.getClassName();
+    TimeSeriesUtil.validateTimeSeriesGranularity(dmProperties, dmProviderName);
+    Tuple2<String, String> details =
+        TimeSeriesUtil.getTimeSeriesGranularityDetails(dmProperties, dmProviderName);
+    dmProperties.remove(details._1());
+    helper = new PreAggregateTableHelper(
+        mainTable, dataMapSchema.getDataMapName(), dataMapSchema.getClassName(),
+        dmProperties, ctasSqlStatement, new Some(details._1()), false);
+    helper.initMeta(sparkSession);
+  }
+
+}
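
TimeseriesDataMapProvider differs from its parent only in initMeta: it
validates the granularity dmproperty, removes that entry from the property
map, and passes it to PreAggregateTableHelper as the time-series function. For
orientation, a sketch of the user-facing statement this serves (table and
column names are invented, and the dmproperty names assume the
event_time/hour_granularity convention of this release):

    sparkSession.sql(
      """
        | CREATE DATAMAP agg_sales_hour ON TABLE sales
        | USING 'timeseries'
        | DMPROPERTIES ('event_time'='order_time', 'hour_granularity'='1')
        | AS SELECT order_time, sum(amount) FROM sales GROUP BY order_time
      """.stripMargin)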

http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 39e73ee..9315208 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -94,7 +94,7 @@ class CarbonEnv {
            properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
           }
           LOGGER.info(s"carbon env initial: $storePath")
-          // trigger event for CarbonEnv init
+          // trigger event for CarbonEnv create
           val operationContext = new OperationContext
           val carbonEnvInitPreEvent: CarbonEnvInitPreEvent =
             CarbonEnvInitPreEvent(sparkSession, carbonSessionInfo, storePath)
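
For context, the pre-event above travels over OperationListenerBus, so any
interested component can hook CarbonEnv creation. A sketch of a hypothetical
listener, assuming the addListener(eventClass, listener) registration API used
elsewhere in the events package:

    OperationListenerBus.getInstance.addListener(classOf[CarbonEnvInitPreEvent],
      new OperationEventListener {
        // e.g. provision the store location before CarbonEnv finishes creation
        override protected def onEvent(event: Event, context: OperationContext): Unit = ()
      })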

http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index c6d86b3..8532e9d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -19,20 +19,13 @@ package org.apache.spark.sql.execution.command.datamap
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil}
-import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
-import org.apache.spark.sql.hive.CarbonRelation
 
-import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider
-import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider._
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.datamap.{DataMapManager, DataMapProvider}
 
 /**
  * Below command class will be used to create datamap on table
@@ -47,59 +40,25 @@ case class CarbonCreateDataMapCommand(
     ifNotExistsSet: Boolean = false)
   extends AtomicRunnableCommand {
 
-  var createPreAggregateTableCommands: CreatePreAggregateTableCommand = _
-  var tableIsExists: Boolean = false
+  private var dataMapProvider: DataMapProvider = _
+  private var mainTable: CarbonTable = _
+  private var dataMapSchema: DataMapSchema = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     // since streaming segment does not support building index and pre-aggregate yet,
     // so streaming table does not support create datamap
-    val carbonTable =
-    CarbonEnv.getCarbonTable(tableIdentifier.database, tableIdentifier.table)(sparkSession)
-    if (carbonTable.isStreamingTable) {
+    mainTable =
+      CarbonEnv.getCarbonTable(tableIdentifier.database, tableIdentifier.table)(sparkSession)
+    if (mainTable.isStreamingTable) {
       throw new MalformedCarbonCommandException("Streaming table does not support creating datamap")
     }
-    validateDataMapName(carbonTable)
 
-    if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
-        dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
-      TimeSeriesUtil.validateTimeSeriesGranularity(dmProperties, dmClassName)
-      createPreAggregateTableCommands = if (dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
-        val details = TimeSeriesUtil.getTimeSeriesGranularityDetails(dmProperties, dmClassName)
-        val updatedDmProperties = dmProperties - details._1
-        CreatePreAggregateTableCommand(
-          dataMapName,
-          tableIdentifier,
-          DataMapProvider.TIMESERIES,
-          updatedDmProperties,
-          queryString.get,
-          Some(details._1))
-      } else {
-        CreatePreAggregateTableCommand(
-          dataMapName,
-          tableIdentifier,
-          DataMapProvider.PREAGGREGATE,
-          dmProperties,
-          queryString.get
-        )
-      }
-      try {
-        createPreAggregateTableCommands.processMetadata(sparkSession)
-      } catch {
-        case e: Throwable => throw new MetadataProcessException(s"Failed to create datamap " +
-                                                                s"'$dataMapName'", e)
-      }
-    } else {
-      val dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
-      dataMapSchema.setProperties(new java.util.HashMap[String, String](dmProperties.asJava))
-      val dbName = CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession)
-      val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
-        Some(dbName),
-        tableIdentifier.table)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
-      DataMapStoreManager.getInstance().createAndRegisterDataMap(
-        carbonTable.getAbsoluteTableIdentifier, dataMapSchema)
-      // Save DataMapSchema in the  schema file of main table
-      PreAggregateUtil.updateMainTable(carbonTable, dataMapSchema, sparkSession)
-    }
+    validateDataMapName(mainTable)
+    dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
+    dataMapSchema.setProperties(new java.util.HashMap[String, String](dmProperties.asJava))
+    dataMapProvider = DataMapManager.get().getDataMapProvider(dataMapSchema)
+    dataMapProvider.initMeta(mainTable, dataMapSchema, queryString.orNull, sparkSession)
+
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    LOGGER.audit(s"DataMap $dataMapName successfully added to Table ${tableIdentifier.table}")
     Seq.empty
@@ -115,29 +74,20 @@ case class CarbonCreateDataMapCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
-      dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
-      if (!tableIsExists) {
-        createPreAggregateTableCommands.processData(sparkSession)
-      } else {
-        Seq.empty
+    if (dataMapProvider != null) {
+      dataMapProvider.initData(mainTable, sparkSession)
+      if (mainTable.isAutoRefreshDataMap) {
+        dataMapProvider.rebuild(mainTable, sparkSession)
       }
-    } else {
-      Seq.empty
     }
+    Seq.empty
   }
 
   override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
-    if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
-      dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
-      if (!tableIsExists && createPreAggregateTableCommands != null) {
-        createPreAggregateTableCommands.undoMetadata(sparkSession, exception)
-      } else {
-        Seq.empty
-      }
-    } else {
-      throw new MalformedDataMapCommandException("Unknown datamap provider/class " + dmClassName)
+    if (dataMapProvider != null) {
+      dataMapProvider.freeMeta(mainTable, dataMapSchema, sparkSession)
     }
+    Seq.empty
   }
 }
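
The net effect of this rewrite: the create command no longer special-cases the
preaggregate and timeseries providers. It builds a DataMapSchema, asks
DataMapManager for the matching provider, and relies on the
AtomicRunnableCommand contract (processMetadata, then processData, with
undoMetadata invoked if data processing fails). A sketch of an invocation,
with invented table and datamap names and named arguments so no parameter
order is assumed:

    CarbonCreateDataMapCommand(
      dataMapName = "agg_sales",
      tableIdentifier = TableIdentifier("sales", Some("default")),
      dmClassName = "preaggregate",
      dmProperties = Map.empty,
      queryString = Some("select country, sum(amount) from sales group by country")
    ).run(sparkSession)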
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index e89d15a..f773a55 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -25,16 +25,15 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
-import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 
 import org.apache.carbondata.common.exceptions.MetadataProcessException
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, NoSuchDataMapException}
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.datamap.{DataMapManager, DataMapProvider}
 import org.apache.carbondata.events._
 
 /**
@@ -52,8 +51,10 @@ case class CarbonDropDataMapCommand(
     forceDrop: Boolean = false)
   extends AtomicRunnableCommand {
 
-  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-  var commandToRun: CarbonDropTableCommand = _
+  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  private var dataMapProvider: DataMapProvider = _
+  private var mainTable: CarbonTable = _
+  private var dataMapSchema: DataMapSchema = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
@@ -66,7 +67,6 @@ case class CarbonDropDataMapCommand(
    catalog.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
    val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
     try {
-      forceDropTableFromMetaStore(sparkSession)
       locksToBeAcquired foreach {
        lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock)
       }
@@ -82,35 +82,33 @@ case class CarbonDropDataMapCommand(
      // If force drop is true then remove the datamap from hivemetastore. No need to remove from
       // parent as the first condition would have taken care of it.
      if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList.size() > 0) {
-        val dataMapSchema = carbonTable.get.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex.
+        mainTable = carbonTable.get
+        val dataMapSchemaOp = mainTable.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex.
           find(_._1.getDataMapName.equalsIgnoreCase(dataMapName))
-        if (dataMapSchema.isDefined) {
+        if (dataMapSchemaOp.isDefined) {
+          dataMapSchema = dataMapSchemaOp.get._1
           val operationContext = new OperationContext
           val dropDataMapPreEvent =
             DropDataMapPreEvent(
-              Some(dataMapSchema.get._1),
+              Some(dataMapSchema),
               ifExistsSet,
               sparkSession)
          OperationListenerBus.getInstance.fireEvent(dropDataMapPreEvent, operationContext)
-          carbonTable.get.getTableInfo.getDataMapSchemaList.remove(dataMapSchema.get._2)
+          mainTable.getTableInfo.getDataMapSchemaList.remove(dataMapSchemaOp.get._2)
           val schemaConverter = new ThriftWrapperSchemaConverterImpl
           PreAggregateUtil.updateSchemaInfo(
-            carbonTable.get,
+            mainTable,
             schemaConverter.fromWrapperToExternalTableInfo(
-              carbonTable.get.getTableInfo,
+              mainTable.getTableInfo,
               dbName,
               tableName))(sparkSession)
-          commandToRun = CarbonDropTableCommand(
-            ifExistsSet = true,
-            Some(dataMapSchema.get._1.getRelationIdentifier.getDatabaseName),
-            dataMapSchema.get._1.getRelationIdentifier.getTableName,
-            dropChildTable = true
-          )
-          commandToRun.processMetadata(sparkSession)
+          dataMapProvider = DataMapManager.get.getDataMapProvider(dataMapSchema)
+          dataMapProvider.freeMeta(mainTable, dataMapSchema, sparkSession)
+
           // fires the event after dropping datamap from main table schema
           val dropDataMapPostEvent =
             DropDataMapPostEvent(
-              Some(dataMapSchema.get._1),
+              Some(dataMapSchema),
               ifExistsSet,
               sparkSession)
          OperationListenerBus.getInstance.fireEvent(dropDataMapPostEvent, operationContext)
@@ -118,7 +116,7 @@ case class CarbonDropDataMapCommand(
           throw new NoSuchDataMapException(dataMapName, tableName)
         }
       } else if (carbonTable.isDefined &&
-        carbonTable.get.getTableInfo.getDataMapSchemaList.size() == 0) {
+                 carbonTable.get.getTableInfo.getDataMapSchemaList.size() == 0) {
         if (!ifExistsSet) {
           throw new NoSuchDataMapException(dataMapName, tableName)
         }
@@ -142,39 +140,10 @@ case class CarbonDropDataMapCommand(
     Seq.empty
   }
 
-  /**
-   * Used to drop child datamap from hive metastore if it exists.
-   * forceDrop will be true only when an exception occurs in main table status updation for
-   * parent table or in processData from CreatePreAggregateTableCommand.
-   */
-  private def forceDropTableFromMetaStore(sparkSession: SparkSession): Unit = {
-    if (forceDrop) {
-      val childTableName = tableName + "_" + dataMapName
-      LOGGER.info(s"Trying to force drop $childTableName from metastore")
-      val childCarbonTable: Option[CarbonTable] = try {
-        Some(CarbonEnv.getCarbonTable(databaseNameOp, childTableName)(sparkSession))
-      } catch {
-        case _: Exception =>
-          LOGGER.warn(s"Child table $childTableName not found in metastore")
-          None
-      }
-      if (childCarbonTable.isDefined) {
-        commandToRun = CarbonDropTableCommand(
-          ifExistsSet = true,
-          Some(childCarbonTable.get.getDatabaseName),
-          childCarbonTable.get.getTableName,
-          dropChildTable = true)
-        commandToRun.processMetadata(sparkSession)
-      }
-    }
-  }
-
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     // delete the table folder
-    if (commandToRun != null) {
-      DataMapStoreManager.getInstance().clearDataMap(
-        commandToRun.carbonTable.getAbsoluteTableIdentifier, dataMapName)
-      commandToRun.processData(sparkSession)
+    if (dataMapProvider != null) {
+      dataMapProvider.freeData(mainTable, dataMapSchema, sparkSession)
     }
     Seq.empty
   }
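
Dropping now mirrors creation: processMetadata removes the schema entry from
the main table and calls provider.freeMeta while the table locks are held, and
processData then calls provider.freeData to delete the child table contents. A
sketch of an invocation (invented names, named arguments so no parameter order
is assumed):

    CarbonDropDataMapCommand(
      dataMapName = "agg_sales",
      ifExistsSet = true,
      databaseNameOp = Some("default"),
      tableName = "sales"
    ).run(sparkSession)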

http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index c1f86ef..705455f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -93,7 +93,7 @@ case class CarbonAlterTableDropPartitionCommand(
     partitionInfo.dropPartition(partitionIndex)
 
     // read TableInfo
-    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)(sparkSession)
+    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl()
    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
       dbName, tableName, tablePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index efd1216..99691d2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -88,7 +88,7 @@ case class CarbonAlterTableSplitPartitionCommand(
     updatePartitionInfo(partitionInfo, partitionIds)
 
     // read TableInfo
-    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)(sparkSession)
+    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl()
    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
       dbName, tableName, tablePath)
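
Both partition commands pick up the same API change: CarbonMetaStore's
getThriftTableInfo no longer takes the SparkSession as a curried second
argument. Side by side, per this patch:

    // before: the metastore read needed the session
    val oldTableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)(sparkSession)
    // after: the table alone identifies what to read
    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)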

http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
deleted file mode 100644
index c02ac4f..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.preaaggregate
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand
-import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
-import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
-import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
-import org.apache.spark.sql.optimizer.CarbonFilters
-import org.apache.spark.sql.parser.CarbonSpark2SqlParser
-
-import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider
-import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-
-/**
- * Below command class will be used to create pre-aggregate table
- * and updating the parent table about the child table information
- * It will be either success or nothing happen in case of failure:
- * 1. failed to create pre aggregate table.
- * 2. failed to update main table
- *
- */
-case class CreatePreAggregateTableCommand(
-    dataMapName: String,
-    parentTableIdentifier: TableIdentifier,
-    dataMapProvider: DataMapProvider,
-    dmProperties: Map[String, String],
-    queryString: String,
-    timeSeriesFunction: Option[String] = None,
-    ifNotExistsSet: Boolean = false)
-  extends AtomicRunnableCommand {
-
-  var parentTable: CarbonTable = _
-  var loadCommand: CarbonLoadDataCommand = _
-
-  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
-    val df = sparkSession.sql(updatedQuery)
-    val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
-      df.logicalPlan, queryString)
-    val fields = fieldRelationMap.keySet.toSeq
-    val tableProperties = mutable.Map[String, String]()
-    dmProperties.foreach(t => tableProperties.put(t._1, t._2))
-
-    parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
-    if (!parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table)) {
-      throw new MalformedDataMapCommandException(
-        "Parent table name is different in select and create")
-    }
-    var neworder = Seq[String]()
-    val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala
-    parentOrder.foreach(parentcol =>
-      fields.filter(col => (fieldRelationMap.get(col).get.aggregateFunction.isEmpty) &&
-                           (parentcol.equals(fieldRelationMap.get(col).get.
-                             columnTableRelationList.get(0).parentColumnName)))
-        .map(cols => neworder :+= cols.column)
-    )
-    tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, neworder.mkString(","))
-    tableProperties.put("sort_scope", parentTable.getTableInfo.getFactTable.
-      getTableProperties.asScala.getOrElse("sort_scope", CarbonCommonConstants
-      .LOAD_SORT_SCOPE_DEFAULT))
-    tableProperties
-      .put(CarbonCommonConstants.TABLE_BLOCKSIZE, parentTable.getBlockSizeInMB.toString)
-    val tableIdentifier =
-      TableIdentifier(parentTableIdentifier.table + "_" + dataMapName,
-        parentTableIdentifier.database)
-    // prepare table model of the collected tokens
-    val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(
-      ifNotExistPresent = ifNotExistsSet,
-      new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
-      tableIdentifier.table.toLowerCase,
-      fields,
-      Seq(),
-      tableProperties,
-      None,
-      isAlterFlow = false,
-      None)
-    // updating the relation identifier, this will be stored in child table
-    // which can be used during dropping of pre-aggreate table as parent table will
-    // also get updated
-    if(timeSeriesFunction.isDefined) {
-      TimeSeriesUtil.validateTimeSeriesEventTime(dmProperties, parentTable)
-      TimeSeriesUtil.validateEventTimeColumnExitsInSelect(
-        fieldRelationMap,
-        dmProperties.get(TimeSeriesUtil.TIMESERIES_EVENTTIME).get)
-      TimeSeriesUtil.updateTimeColumnSelect(fieldRelationMap,
-        dmProperties.get(TimeSeriesUtil.TIMESERIES_EVENTTIME).get,
-      timeSeriesFunction.get)
-    }
-    tableModel.parentTable = Some(parentTable)
-    tableModel.dataMapRelation = Some(fieldRelationMap)
-    val tablePath = if (dmProperties.contains("path")) {
-      dmProperties("path")
-    } else {
-      CarbonEnv.getTablePath(tableModel.databaseNameOp, tableModel.tableName)(sparkSession)
-    }
-    CarbonCreateTableCommand(TableNewProcessor(tableModel),
-      tableModel.ifNotExistsSet, Some(tablePath), isVisible = false).run(sparkSession)
-
-    val table = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
-    val tableInfo = table.getTableInfo
-    // child schema object which will be updated on parent table about the
-    val childSchema = tableInfo.getFactTable.buildChildSchema(
-      dataMapName,
-      dataMapProvider.getClassName,
-      tableInfo.getDatabaseName,
-      queryString,
-      "AGGREGATION")
-    dmProperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
-
-    // updating the parent table about child table
-    PreAggregateUtil.updateMainTable(
-      parentTable,
-      childSchema,
-      sparkSession)
-    // After updating the parent carbon table with data map entry extract the latest table object
-    // to be used in further create process.
-    parentTable = CarbonEnv.getCarbonTable(parentTableIdentifier.database,
-      parentTableIdentifier.table)(sparkSession)
-    val updatedLoadQuery = if (timeSeriesFunction.isDefined) {
-      PreAggregateUtil.createTimeSeriesSelectQueryFromMain(childSchema.getChildSchema,
-        parentTable.getTableName,
-        parentTable.getDatabaseName)
-    } else {
-      queryString
-    }
-    val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
-      updatedLoadQuery)).drop("preAggLoad")
-    loadCommand = PreAggregateUtil.createLoadCommandForChild(
-      childSchema.getChildSchema.getListOfColumns,
-      tableIdentifier,
-      dataFrame,
-      false,
-      sparkSession = sparkSession)
-    loadCommand.processMetadata(sparkSession)
-    Seq.empty
-  }
-
-  override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
-    // drop child table and undo the change in table info of main table
-    CarbonDropDataMapCommand(
-      dataMapName,
-      ifExistsSet = true,
-      parentTableIdentifier.database,
-      parentTableIdentifier.table,
-      forceDrop = true).run(sparkSession)
-    Seq.empty
-  }
-
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    // load child table if parent table has existing segments
-    // This will be used to check if the parent table has any segments or not. If not then no
-    // need to fire load for pre-aggregate table. Therefore reading the load details for PARENT
-    // table.
-    SegmentStatusManager.deleteLoadsAndUpdateMetadata(parentTable, false)
-    val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
-    if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
-      load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {
-      throw new UnsupportedOperationException(
-        "Cannot create pre-aggregate table when insert is in progress on main 
table")
-    } else if (loadAvailable.nonEmpty) {
-      // Passing segmentToLoad as * because we want to load all the segments into the
-      // pre-aggregate table even if the user has set some segments on the parent table.
-      loadCommand.dataFrame = Some(PreAggregateUtil
-        .getDataFrame(sparkSession, loadCommand.logicalPlan.get))
-      PreAggregateUtil.startDataLoadForDataMap(
-        TableIdentifier(parentTable.getTableName, Some(parentTable.getDatabaseName)),
-        segmentToLoad = "*",
-        validateSegments = true,
-        loadCommand,
-        isOverwrite = false,
-        sparkSession)
-    }
-    Seq.empty
-  }
-}
-
-
