[CARBONDATA-2416] Support DEFERRED REBUILD when creating DataMap

1. The REFRESH DATAMAP command is renamed to REBUILD DATAMAP
2. When creating a datamap, the user can choose to load the datamap immediately or
later by manually triggering the REBUILD DATAMAP command

This closes #2255


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/747be9b1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/747be9b1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/747be9b1

Branch: refs/heads/master
Commit: 747be9b111ab0a12e7550124a9facccacf8ad861
Parents: fb12897
Author: Jacky Li <jacky.li...@qq.com>
Authored: Tue May 1 17:17:33 2018 +0800
Committer: QiangCai <qiang...@qq.com>
Committed: Wed May 9 18:59:03 2018 +0800

----------------------------------------------------------------------
 .../carbondata/core/datamap/DataMapChooser.java |  75 +++--
 .../core/datamap/DataMapProvider.java           |   3 +-
 .../core/datamap/DataMapStoreManager.java       |  19 +-
 .../core/datamap/dev/DataMapBuilder.java        |  38 +++
 .../core/datamap/dev/DataMapFactory.java        |   5 +-
 .../core/datamap/dev/DataMapRefresher.java      |  36 ---
 .../datamap/status/DataMapStatusDetail.java     |   4 +
 .../datamap/status/DataMapStatusManager.java    |  30 +-
 .../blockletindex/BlockletDataMapFactory.java   |   4 +-
 .../schema/datamap/DataMapProperty.java         |  39 +++
 .../metadata/schema/table/DataMapSchema.java    |  16 +-
 .../bloom/BloomCoarseGrainDataMapFactory.java   |   6 +-
 .../datamap/bloom/BloomDataMapBuilder.java      |  91 ++++++
 .../datamap/bloom/BloomDataMapRefresher.java    |  91 ------
 .../examples/MinMaxIndexDataMapFactory.java     |  30 +-
 .../datamap/lucene/LuceneDataMapBuilder.java    | 224 +++++++++++++
 .../lucene/LuceneDataMapFactoryBase.java        |   9 +-
 .../datamap/lucene/LuceneDataMapRefresher.java  | 224 -------------
 .../hadoop/api/CarbonInputFormat.java           |  32 +-
 .../hadoop/api/CarbonOutputCommitter.java       |   2 +-
 .../lucene/LuceneFineGrainDataMapSuite.scala    |  68 ++--
 .../preaggregate/TestPreAggCreateCommand.scala  |  10 +
 .../testsuite/datamap/CGDataMapTestCase.scala   |   6 +-
 .../testsuite/datamap/DataMapWriterSuite.scala  |   6 +-
 .../testsuite/datamap/FGDataMapTestCase.scala   |   8 +-
 .../testsuite/datamap/TestDataMapStatus.scala   |  71 ++++-
 .../detailquery/SearchModeTestCase.scala        |   2 -
 .../TestInsertAndOtherCommandConcurrent.scala   |  59 ++--
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   3 +-
 .../carbondata/datamap/DataMapProperty.java     |  33 --
 .../datamap/IndexDataMapProvider.java           |   2 +-
 .../datamap/PreAggregateDataMapProvider.java    |  12 +-
 .../datamap/IndexDataMapRebuildRDD.scala        | 318 +++++++++++++++++++
 .../datamap/IndexDataMapRefreshRDD.scala        | 317 ------------------
 .../spark/rdd/CarbonDataRDDFactory.scala        |   4 +-
 .../org/apache/spark/sql/CarbonSession.scala    |  11 +
 .../datamap/CarbonCreateDataMapCommand.scala    |  29 +-
 .../datamap/CarbonDataMapRebuildCommand.scala   |  56 ++++
 .../datamap/CarbonDataMapRefreshCommand.scala   |  56 ----
 .../datasources/SparkCarbonFileFormat.scala     |   5 +-
 .../sql/parser/CarbonSpark2SqlParser.scala      |  20 +-
 .../bloom/BloomCoarseGrainDataMapSuite.scala    | 187 +++++++++--
 .../datamap/DataMapWriterListener.java          |   8 +-
 .../store/worker/SearchRequestHandler.java      |  56 ++--
 .../scala/org/apache/spark/rpc/Master.scala     |  18 +-
 .../org/apache/spark/search/Searcher.scala      |   8 +-
 46 files changed, 1318 insertions(+), 1033 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
index 478254d..7cdabd6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
@@ -21,7 +21,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Objects;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
@@ -29,6 +29,8 @@ import 
org.apache.carbondata.core.datamap.dev.expr.AndDataMapExprWrapper;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapperImpl;
 import org.apache.carbondata.core.datamap.dev.expr.OrDataMapExprWrapper;
+import org.apache.carbondata.core.datamap.status.DataMapStatusDetail;
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
@@ -56,34 +58,42 @@ import 
org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 @InterfaceAudience.Internal
 public class DataMapChooser {
 
-  private static DataMapChooser INSTANCE;
+  private CarbonTable carbonTable;
+  private List<TableDataMap> cgDataMaps;
+  private List<TableDataMap> fgDataMaps;
 
-  private DataMapChooser() { }
-
-  public static DataMapChooser get() {
-    if (INSTANCE == null) {
-      INSTANCE = new DataMapChooser();
+  public DataMapChooser(CarbonTable carbonTable) throws IOException {
+    this.carbonTable = carbonTable;
+    // read all datamaps for this table and populate CG and FG datamap list
+    List<TableDataMap> visibleDataMaps =
+        DataMapStoreManager.getInstance().getAllVisibleDataMap(carbonTable);
+    Map<String, DataMapStatusDetail> map = 
DataMapStatusManager.readDataMapStatusMap();
+    cgDataMaps = new ArrayList<>(visibleDataMaps.size());
+    fgDataMaps = new ArrayList<>(visibleDataMaps.size());
+    for (TableDataMap visibleDataMap : visibleDataMaps) {
+      DataMapStatusDetail status = 
map.get(visibleDataMap.getDataMapSchema().getDataMapName());
+      if (status != null && status.isEnabled()) {
+        DataMapLevel level = 
visibleDataMap.getDataMapFactory().getDataMapLevel();
+        if (level == DataMapLevel.CG) {
+          cgDataMaps.add(visibleDataMap);
+        } else {
+          fgDataMaps.add(visibleDataMap);
+        }
+      }
     }
-    return INSTANCE;
   }
 
   /**
    * Return a chosen datamap based on input filter. See {@link DataMapChooser}
    */
-  public DataMapExprWrapper choose(CarbonTable carbonTable, FilterResolverIntf 
filter)
-      throws IOException {
-    Objects.requireNonNull(carbonTable);
+  public DataMapExprWrapper choose(FilterResolverIntf filter) {
     if (filter != null) {
       Expression expression = filter.getFilterExpression();
       // First check for FG datamaps if any exist
-      List<TableDataMap> allDataMapFG =
-          DataMapStoreManager.getInstance().getAllVisibleDataMap(carbonTable, 
DataMapLevel.FG);
-      ExpressionTuple tuple = selectDataMap(expression, allDataMapFG, filter);
+      ExpressionTuple tuple = selectDataMap(expression, fgDataMaps, filter);
       if (tuple.dataMapExprWrapper == null) {
         // Check for CG datamap
-        List<TableDataMap> allDataMapCG =
-            
DataMapStoreManager.getInstance().getAllVisibleDataMap(carbonTable, 
DataMapLevel.CG);
-        tuple = selectDataMap(expression, allDataMapCG, filter);
+        tuple = selectDataMap(expression, cgDataMaps, filter);
       }
       if (tuple.dataMapExprWrapper != null) {
         return tuple.dataMapExprWrapper;
@@ -97,33 +107,22 @@ public class DataMapChooser {
   /**
    * Return a chosen FG datamap based on input filter. See {@link 
DataMapChooser}
    */
-  public DataMapExprWrapper chooseFGDataMap(CarbonTable carbonTable,
-      FilterResolverIntf resolverIntf) throws IOException {
-    if (resolverIntf != null) {
-      Expression expression = resolverIntf.getFilterExpression();
-      // First check for FG datamaps if any exist
-      List<TableDataMap> allDataMapFG =
-          DataMapStoreManager.getInstance().getAllVisibleDataMap(carbonTable, 
DataMapLevel.FG);
-      ExpressionTuple tuple = selectDataMap(expression, allDataMapFG, 
resolverIntf);
-      if (tuple.dataMapExprWrapper != null) {
-        return tuple.dataMapExprWrapper;
-      }
-    }
-    // Return the default datamap if no other datamap exists.
-    return null;
+  public DataMapExprWrapper chooseFGDataMap(FilterResolverIntf resolverIntf) {
+    return chooseDataMap(DataMapLevel.FG, resolverIntf);
   }
 
   /**
    * Return a chosen CG datamap based on input filter. See {@link 
DataMapChooser}
    */
-  public DataMapExprWrapper chooseCGDataMap(CarbonTable carbonTable,
-      FilterResolverIntf resolverIntf) throws IOException {
+  public DataMapExprWrapper chooseCGDataMap(FilterResolverIntf resolverIntf) {
+    return chooseDataMap(DataMapLevel.CG, resolverIntf);
+  }
+
+  private DataMapExprWrapper chooseDataMap(DataMapLevel level, 
FilterResolverIntf resolverIntf) {
     if (resolverIntf != null) {
       Expression expression = resolverIntf.getFilterExpression();
-      // Check for CG datamap
-      List<TableDataMap> allDataMapCG =
-          DataMapStoreManager.getInstance().getAllVisibleDataMap(carbonTable, 
DataMapLevel.CG);
-      ExpressionTuple tuple = selectDataMap(expression, allDataMapCG, 
resolverIntf);
+      List<TableDataMap> datamaps = level == DataMapLevel.CG ? cgDataMaps : 
fgDataMaps;
+      ExpressionTuple tuple = selectDataMap(expression, datamaps, 
resolverIntf);
       if (tuple.dataMapExprWrapper != null) {
         return tuple.dataMapExprWrapper;
       }
@@ -137,7 +136,7 @@ public class DataMapChooser {
    * @param resolverIntf
    * @return
    */
-  public DataMapExprWrapper getDefaultDataMap(CarbonTable carbonTable,
+  public static DataMapExprWrapper getDefaultDataMap(CarbonTable carbonTable,
       FilterResolverIntf resolverIntf) {
     // Return the default datamap if no other datamap exists.
     return new DataMapExprWrapperImpl(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
index 775b912..05ba7cf 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
@@ -104,13 +104,12 @@ public abstract class DataMapProvider {
    * Rebuild the datamap by loading all existing data from mainTable
    * This is called when refreshing the datamap when
    * 1. after datamap creation and if `autoRefreshDataMap` is set to true
-   * 2. user manually trigger refresh datamap command
+   * 2. user manually trigger REBUILD DATAMAP command
    */
   public abstract void rebuild() throws IOException, NoSuchDataMapException;
 
   /**
    * Build the datamap incrementally by loading specified segment data
-   * This is called when user manually trigger refresh datamap
    */
   public abstract void incrementalBuild(String[] segmentIds) throws 
IOException;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 29a1106..9f7af2b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -79,26 +79,9 @@ public final class DataMapStoreManager {
   }
 
   /**
-   * It gives all visible datamaps of type @mapType except the default datamap.
-   */
-  public List<TableDataMap> getAllVisibleDataMap(CarbonTable carbonTable, 
DataMapLevel mapType)
-      throws IOException {
-    List<TableDataMap> dataMaps = new ArrayList<>();
-    List<TableDataMap> tableIndices = getAllVisibleDataMap(carbonTable);
-    if (tableIndices != null) {
-      for (TableDataMap dataMap : tableIndices) {
-        if (mapType == dataMap.getDataMapFactory().getDataMapLevel()) {
-          dataMaps.add(dataMap);
-        }
-      }
-    }
-    return dataMaps;
-  }
-
-  /**
    * It only gives the visible datamaps
    */
-  private List<TableDataMap> getAllVisibleDataMap(CarbonTable carbonTable) 
throws IOException {
+  List<TableDataMap> getAllVisibleDataMap(CarbonTable carbonTable) throws 
IOException {
     CarbonSessionInfo sessionInfo = 
ThreadLocalSessionInfo.getCarbonSessionInfo();
     List<TableDataMap> allDataMaps = getAllDataMap(carbonTable);
     Iterator<TableDataMap> dataMapIterator = allDataMaps.iterator();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapBuilder.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapBuilder.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapBuilder.java
new file mode 100644
index 0000000..570a1ce
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapBuilder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap.dev;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+/**
+ * DataMapBuilder is used to implement REBUILD DATAMAP command, it reads all 
existing
+ * data in main table and load them into the DataMap. All existing index data 
will be deleted
+ * if there are existing data in the datamap.
+ */
+@InterfaceAudience.Developer("DataMap")
+public interface DataMapBuilder {
+  void initialize() throws IOException;
+
+  void addRow(int blockletId, int pageId, int rowId, Object[] values) throws 
IOException;
+
+  void finish() throws IOException;
+
+  void close() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index ae34be7..ad709a0 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -61,10 +61,11 @@ public abstract class DataMapFactory<T extends DataMap> {
       throws IOException;
 
   /**
-   * Create a new Refresher for this datamap, to rebuild the specified
+   * Create a new DataMapBuilder for this datamap, to rebuild the specified
    * segment and shard data in the main table.
+   * TODO: refactor to unify with DataMapWriter
    */
-  public abstract DataMapRefresher createRefresher(Segment segment, String 
shardName)
+  public abstract DataMapBuilder createBuilder(Segment segment, String 
shardName)
       throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapRefresher.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapRefresher.java
 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapRefresher.java
deleted file mode 100644
index 770ceca..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapRefresher.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datamap.dev;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-
-/**
- * Interface to rebuild the datamap for main table with existing data
- */
-@InterfaceAudience.Developer("DataMap")
-public interface DataMapRefresher {
-  void initialize() throws IOException;
-
-  void addRow(int blockletId, int pageId, int rowId, Object[] values) throws 
IOException;
-
-  void finish() throws IOException;
-
-  void close() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusDetail.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusDetail.java
 
b/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusDetail.java
index 1ecb1b1..d1e5921 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusDetail.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusDetail.java
@@ -51,6 +51,10 @@ public class DataMapStatusDetail implements Serializable {
     return status;
   }
 
+  public boolean isEnabled() {
+    return status == DataMapStatus.ENABLED;
+  }
+
   public void setStatus(DataMapStatus status) {
     this.status = status;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
 
b/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
index dcad80b..b540146 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/status/DataMapStatusManager.java
@@ -18,7 +18,9 @@ package org.apache.carbondata.core.datamap.status;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
@@ -51,6 +53,15 @@ public class DataMapStatusManager {
     return storageProvider.getDataMapStatusDetails();
   }
 
+  public static Map<String, DataMapStatusDetail> readDataMapStatusMap() throws 
IOException {
+    DataMapStatusDetail[] details = storageProvider.getDataMapStatusDetails();
+    Map<String, DataMapStatusDetail> map = new HashMap<>(details.length);
+    for (DataMapStatusDetail detail : details) {
+      map.put(detail.getDataMapName(), detail);
+    }
+    return map;
+  }
+
   public static void disableDataMap(String dataMapName) throws IOException, 
NoSuchDataMapException {
     DataMapSchema dataMapSchema = getDataMapSchema(dataMapName);
     if (dataMapSchema != null) {
@@ -60,10 +71,19 @@ public class DataMapStatusManager {
     }
   }
 
-  public static void disableDataMapsOfTable(CarbonTable table) throws 
IOException {
+  /**
+   * This method will disable all lazy (DEFERRED REBUILD) datamap in the given 
table
+   */
+  public static void disableAllLazyDataMaps(CarbonTable table) throws 
IOException {
     List<DataMapSchema> allDataMapSchemas =
         DataMapStoreManager.getInstance().getDataMapSchemasOfTable(table);
-    storageProvider.updateDataMapStatus(allDataMapSchemas, 
DataMapStatus.DISABLED);
+    List<DataMapSchema> dataMapToBeDisabled = new 
ArrayList<>(allDataMapSchemas.size());
+    for (DataMapSchema dataMap : allDataMapSchemas) {
+      if (dataMap.isLazy()) {
+        dataMapToBeDisabled.add(dataMap);
+      }
+    }
+    storageProvider.updateDataMapStatus(dataMapToBeDisabled, 
DataMapStatus.DISABLED);
   }
 
   public static void enableDataMap(String dataMapName) throws IOException, 
NoSuchDataMapException {
@@ -75,12 +95,6 @@ public class DataMapStatusManager {
     }
   }
 
-  public static void enableDataMapsOfTable(CarbonTable table) throws 
IOException {
-    List<DataMapSchema> allDataMapSchemas =
-        DataMapStoreManager.getInstance().getDataMapSchemasOfTable(table);
-    storageProvider.updateDataMapStatus(allDataMapSchemas, 
DataMapStatus.ENABLED);
-  }
-
   public static void dropDataMap(String dataMapName) throws IOException, 
NoSuchDataMapException {
     DataMapSchema dataMapSchema = getDataMapSchema(dataMapName);
     if (dataMapSchema != null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index c3df721..e502251 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.CacheableDataMap;
 import org.apache.carbondata.core.datamap.dev.DataMap;
-import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
+import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import 
org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory;
@@ -91,7 +91,7 @@ public class BlockletDataMapFactory extends 
CoarseGrainDataMapFactory
   }
 
   @Override
-  public DataMapRefresher createRefresher(Segment segment, String shardName) {
+  public DataMapBuilder createBuilder(Segment segment, String shardName) {
     throw new UnsupportedOperationException("not implemented");
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProperty.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProperty.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProperty.java
new file mode 100644
index 0000000..9bd78da
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapProperty.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema.datamap;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+/**
+ * Property that can be specified when creating DataMap
+ */
+@InterfaceAudience.Internal
+public class DataMapProperty {
+
+  /**
+   * Used to specify the store location of the datamap
+   */
+  public static final String PARTITIONING = "partitioning";
+  public static final String PATH = "path";
+
+  /**
+   * For datamap created with 'WITH DEFERRED REBUILD' syntax, we will add this
+   * property internally
+   */
+  public static final String DEFERRED_REBUILD = "_internal.deferred.rebuild";
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index b22a3d4..611f298 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -28,6 +28,8 @@ import java.util.Objects;
 
 import 
org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider;
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapProperty;
+
 import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.INDEX_COLUMNS;
 
 import com.google.gson.Gson;
@@ -157,7 +159,16 @@ public class DataMapSchema implements Serializable, 
Writable {
     }
   }
 
-  @Override public void write(DataOutput out) throws IOException {
+  /**
+   * Return true if this datamap is lazy (created with DEFERRED REBUILD syntax)
+   */
+  public boolean isLazy() {
+    String deferredRebuild = 
getProperties().get(DataMapProperty.DEFERRED_REBUILD);
+    return deferredRebuild != null && deferredRebuild.equalsIgnoreCase("true");
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
     out.writeUTF(dataMapName);
     out.writeUTF(providerName);
     boolean isRelationIdentifierExists = null != relationIdentifier;
@@ -181,7 +192,8 @@ public class DataMapSchema implements Serializable, 
Writable {
     }
   }
 
-  @Override public void readFields(DataInput in) throws IOException {
+  @Override
+  public void readFields(DataInput in) throws IOException {
     this.dataMapName = in.readUTF();
     this.providerName = in.readUTF();
     boolean isRelationIdnentifierExists = in.readBoolean();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
index 1d6eab7..95c21fa 100644
--- 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
+++ 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java
@@ -33,9 +33,9 @@ import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
-import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
@@ -173,8 +173,8 @@ public class BloomCoarseGrainDataMapFactory extends 
DataMapFactory<CoarseGrainDa
   }
 
   @Override
-  public DataMapRefresher createRefresher(Segment segment, String shardName) 
throws IOException {
-    return new BloomDataMapRefresher(getCarbonTable().getTablePath(), 
this.dataMapName,
+  public DataMapBuilder createBuilder(Segment segment, String shardName) 
throws IOException {
+    return new BloomDataMapBuilder(getCarbonTable().getTablePath(), 
this.dataMapName,
         this.dataMapMeta.getIndexedColumns(), segment, shardName,
         this.bloomFilterSize, this.bloomFilterFpp);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
----------------------------------------------------------------------
diff --git 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
new file mode 100644
index 0000000..fa1aef7
--- /dev/null
+++ 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.bloom;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * Implementation for BloomFilter DataMap to rebuild the datamap for main 
table with existing data
+ */
+@InterfaceAudience.Internal
+public class BloomDataMapBuilder extends BloomDataMapWriter implements 
DataMapBuilder {
+
+  BloomDataMapBuilder(String tablePath, String dataMapName, List<CarbonColumn> 
indexColumns,
+      Segment segment, String shardName, int bloomFilterSize, double 
bloomFilterFpp)
+      throws IOException {
+    super(tablePath, dataMapName, indexColumns, segment, shardName,
+        bloomFilterSize, bloomFilterFpp);
+  }
+
+  @Override
+  public void initialize() throws IOException {
+    super.resetBloomFilters();
+  }
+
+  @Override
+  public void addRow(int blockletId, int pageId, int rowId, Object[] values) {
+    if (currentBlockletId != blockletId) {
+      // a new blocklet has started: flush the bloom filters to the datamap file
+      super.writeBloomDataMapFile();
+      currentBlockletId = blockletId;
+    }
+    // for each indexed column, add this row's value to that column's bloom filter
+    List<CarbonColumn> indexColumns = getIndexColumns();
+    for (int i = 0; i < indexColumns.size(); i++) {
+      Object data = values[i];
+      DataType dataType = indexColumns.get(i).getDataType();
+      byte[] indexValue;
+      if (DataTypes.STRING == dataType) {
+        indexValue = getStringData(data);
+      } else if (DataTypes.BYTE_ARRAY == dataType) {
+        byte[] originValue = (byte[]) data;
+        // byte array is LV encoded and L is a short, so skip the 2-byte length prefix
+        indexValue = new byte[originValue.length - 2];
+        System.arraycopy(originValue, 2, indexValue, 0, originValue.length - 
2);
+      } else {
+        indexValue = CarbonUtil.getValueAsBytes(dataType, data);
+      }
+      indexBloomFilters.get(i).put(indexValue);
+    }
+  }
+
+  @Override
+  public void finish() throws IOException {
+    super.finish();
+  }
+
+  @Override
+  public void close() throws IOException {
+    releaseResouce();
+  }
+
+  @Override
+  protected byte[] getStringData(Object data) {
+    return ((String) 
data).getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java
----------------------------------------------------------------------
diff --git 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java
 
b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java
deleted file mode 100644
index 8e05133..0000000
--- 
a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.datamap.bloom;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.carbondata.common.annotations.InterfaceAudience;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.Segment;
-import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.util.CarbonUtil;
-
-/**
- * Implementation for BloomFilter DataMap to rebuild the datamap for main 
table with existing data
- */
-@InterfaceAudience.Internal
-public class BloomDataMapRefresher extends BloomDataMapWriter implements 
DataMapRefresher {
-
-  BloomDataMapRefresher(String tablePath, String dataMapName, 
List<CarbonColumn> indexColumns,
-      Segment segment, String shardName, int bloomFilterSize, double 
bloomFilterFpp)
-      throws IOException {
-    super(tablePath, dataMapName, indexColumns, segment, shardName,
-        bloomFilterSize, bloomFilterFpp);
-  }
-
-  @Override
-  public void initialize() throws IOException {
-    super.resetBloomFilters();
-  }
-
-  @Override
-  public void addRow(int blockletId, int pageId, int rowId, Object[] values) {
-    if (currentBlockletId != blockletId) {
-      // new blocklet started, flush bloom filter to datamap fileh
-      super.writeBloomDataMapFile();
-      currentBlockletId = blockletId;
-    }
-    // for each indexed column, add the data to bloom filter
-    List<CarbonColumn> indexColumns = getIndexColumns();
-    for (int i = 0; i < indexColumns.size(); i++) {
-      Object data = values[i];
-      DataType dataType = indexColumns.get(i).getDataType();
-      byte[] indexValue;
-      if (DataTypes.STRING == dataType) {
-        indexValue = getStringData(data);
-      } else if (DataTypes.BYTE_ARRAY == dataType) {
-        byte[] originValue = (byte[]) data;
-        // String and byte array is LV encoded, L is short type
-        indexValue = new byte[originValue.length - 2];
-        System.arraycopy(originValue, 2, indexValue, 0, originValue.length - 
2);
-      } else {
-        indexValue = CarbonUtil.getValueAsBytes(dataType, data);
-      }
-      indexBloomFilters.get(i).put(indexValue);
-    }
-  }
-
-  @Override
-  public void finish() throws IOException {
-    super.finish();
-  }
-
-  @Override
-  public void close() throws IOException {
-    releaseResouce();
-  }
-
-  @Override
-  protected byte[] getStringData(Object data) {
-    return ((String) 
data).getBytes(CarbonCommonConstants.DEFAULT_CHARSET_CLASS);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
index 4197b79..84b9e65 100644
--- 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
+++ 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
@@ -21,14 +21,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import 
org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
-import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import 
org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory;
@@ -42,8 +41,6 @@ import 
org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.Transformer;
 import org.apache.commons.lang3.StringUtils;
 
 /**
@@ -52,25 +49,18 @@ import org.apache.commons.lang3.StringUtils;
 public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
   private static final LogService LOGGER = LogServiceFactory.getLogService(
       MinMaxIndexDataMapFactory.class.getName());
-  private DataMapSchema dataMapSchema;
   private DataMapMeta dataMapMeta;
   private String dataMapName;
   private AbsoluteTableIdentifier identifier;
 
-  public MinMaxIndexDataMapFactory(CarbonTable carbonTable) {
-    super(carbonTable);
-  }
+  public MinMaxIndexDataMapFactory(CarbonTable carbonTable, DataMapSchema 
dataMapSchema) {
+    super(carbonTable, dataMapSchema);
 
-  // this is an example for datamap, we can choose the columns and operations 
that
-  // will be supported by this datamap. Furthermore, we can add cache-support 
for this datamap.
-  @Override public void init(DataMapSchema dataMapSchema)
-      throws IOException, MalformedDataMapCommandException {
-    this.dataMapSchema = dataMapSchema;
-    this.identifier = carbonTable.getAbsoluteTableIdentifier();
-    this.dataMapName = dataMapSchema.getDataMapName();
+    // this is an example for datamap, we can choose the columns and 
operations that
+    // will be supported by this datamap. Furthermore, we can add 
cache-support for this datamap.
 
     // columns that will be indexed
-    List<CarbonColumn> allColumns = 
carbonTable.getCreateOrderColumn(identifier.getTableName());
+    List<CarbonColumn> allColumns = 
getCarbonTable().getCreateOrderColumn(identifier.getTableName());
 
     // operations that will be supported on the indexed columns
     List<ExpressionType> optOperations = new ArrayList<>();
@@ -91,12 +81,14 @@ public class MinMaxIndexDataMapFactory extends 
CoarseGrainDataMapFactory {
    * @param shardName
    * @return
    */
-  @Override public DataMapWriter createWriter(Segment segment, String 
shardName) {
-    return new MinMaxDataWriter(carbonTable, dataMapSchema, segment, shardName,
+  @Override
+  public DataMapWriter createWriter(Segment segment, String shardName) {
+    return new MinMaxDataWriter(getCarbonTable(), getDataMapSchema(), segment, 
shardName,
         dataMapMeta.getIndexedColumns());
   }
 
-  @Override public DataMapRefresher createRefresher(Segment segment, String 
shardName)
+  @Override
+  public DataMapBuilder createBuilder(Segment segment, String shardName)
       throws IOException {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapBuilder.java
----------------------------------------------------------------------
diff --git 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapBuilder.java
 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapBuilder.java
new file mode 100644
index 0000000..35c07f0
--- /dev/null
+++ 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapBuilder.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat;
+import org.apache.lucene.codecs.lucene62.Lucene62Codec;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.DoublePoint;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.FloatPoint;
+import org.apache.lucene.document.IntPoint;
+import org.apache.lucene.document.IntRangeField;
+import org.apache.lucene.document.LongPoint;
+import org.apache.lucene.document.StoredField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.solr.store.hdfs.HdfsDirectory;
+
+public class LuceneDataMapBuilder implements DataMapBuilder {
+  // logger category is this class, so rebuild messages are attributed to the builder
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(LuceneDataMapBuilder.class.getName());
+
+  private String dataMapPath;
+
+  private List<CarbonColumn> indexColumns;
+
+  private int columnsCount;
+
+  private IndexWriter indexWriter = null;
+
+  private IndexWriter pageIndexWriter = null;
+
+  private Analyzer analyzer = null;
+
+  LuceneDataMapBuilder(String tablePath, String dataMapName,
+      Segment segment, String shardName, List<CarbonColumn> indexColumns) {
+    this.dataMapPath = CarbonTablePath.getDataMapStorePathOnShardName(
+        tablePath, segment.getSegmentNo(), dataMapName, shardName);
+    this.indexColumns = indexColumns;
+    this.columnsCount = indexColumns.size();
+  }
+
+  @Override
+  public void initialize() throws IOException {
+    // resolve the index directory for this shard from dataMapPath
+    Path indexPath = FileFactory.getPath(dataMapPath);
+    FileSystem fs = FileFactory.getFileSystem(indexPath);
+
+    // if index path exists, should delete it because we are
+    // rebuilding the whole datamap for all segments
+    if (fs.exists(indexPath)) {
+      fs.delete(indexPath, true);
+    }
+    if (!fs.mkdirs(indexPath)) {
+      LOGGER.error("Failed to create directory " + indexPath);
+    }
+
+    if (null == analyzer) {
+      analyzer = new StandardAnalyzer();
+    }
+
+    // open the index directory that the main index writer will write into
+    Directory indexDir = new HdfsDirectory(indexPath, 
FileFactory.getConfiguration());
+
+    IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer);
+    if (CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE,
+            CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)
+        
.equalsIgnoreCase(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT))
 {
+      indexWriterConfig.setCodec(new 
Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_SPEED));
+    } else {
+      indexWriterConfig
+          .setCodec(new 
Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_COMPRESSION));
+    }
+    // pass the config built above; a fresh IndexWriterConfig would drop the codec
+    indexWriter = new IndexWriter(indexDir, indexWriterConfig);
+  }
+
+  private IndexWriter createPageIndexWriter() throws IOException {
+    // save index data into ram, write into disk after one page finished
+    RAMDirectory ramDir = new RAMDirectory();
+    return new IndexWriter(ramDir, new IndexWriterConfig(analyzer));
+  }
+
+  private void addPageIndex(IndexWriter pageIndexWriter) throws IOException {
+
+    Directory directory = pageIndexWriter.getDirectory();
+
+    // close ram writer
+    pageIndexWriter.close();
+
+    // add ram index data into disk
+    indexWriter.addIndexes(directory);
+
+    // delete this ram data
+    directory.close();
+  }
+
+  @Override
+  public void addRow(int blockletId, int pageId, int rowId, Object[] values) 
throws IOException {
+    if (rowId == 0) {
+      if (pageIndexWriter != null) {
+        addPageIndex(pageIndexWriter);
+      }
+      pageIndexWriter = createPageIndexWriter();
+    }
+
+    // create a new document
+    Document doc = new Document();
+
+    // add blocklet Id
+    doc.add(new IntPoint(LuceneDataMapWriter.BLOCKLETID_NAME, (int) 
values[columnsCount]));
+    doc.add(new StoredField(LuceneDataMapWriter.BLOCKLETID_NAME, (int) 
values[columnsCount]));
+
+    // add page id
+    doc.add(new IntPoint(LuceneDataMapWriter.PAGEID_NAME, (int) 
values[columnsCount + 1]));
+    doc.add(new StoredField(LuceneDataMapWriter.PAGEID_NAME, (int) 
values[columnsCount + 1]));
+
+    // add row id
+    doc.add(new IntPoint(LuceneDataMapWriter.ROWID_NAME, rowId));
+    doc.add(new StoredField(LuceneDataMapWriter.ROWID_NAME, rowId));
+
+    // add other fields
+    for (int colIdx = 0; colIdx < columnsCount; colIdx++) {
+      CarbonColumn column = indexColumns.get(colIdx);
+      addField(doc, column.getColName(), column.getDataType(), values[colIdx]);
+    }
+
+    pageIndexWriter.addDocument(doc);
+  }
+
+  private boolean addField(Document doc, String fieldName, DataType type, 
Object value) {
+    if (type == DataTypes.STRING) {
+      doc.add(new TextField(fieldName, (String) value, Field.Store.NO));
+    } else if (type == DataTypes.BYTE) {
+      // byte type , use int range to deal with byte, lucene has no byte type
+      IntRangeField field =
+          new IntRangeField(fieldName, new int[] { Byte.MIN_VALUE }, new int[] 
{ Byte.MAX_VALUE });
+      field.setIntValue((int) value);
+      doc.add(field);
+    } else if (type == DataTypes.SHORT) {
+      // short type , use int range to deal with short type, lucene has no 
short type
+      IntRangeField field = new IntRangeField(fieldName, new int[] { 
Short.MIN_VALUE },
+          new int[] { Short.MAX_VALUE });
+      field.setShortValue((short) value);
+      doc.add(field);
+    } else if (type == DataTypes.INT) {
+      // int type , use int point to deal with int type
+      doc.add(new IntPoint(fieldName, (int) value));
+    } else if (type == DataTypes.LONG) {
+      // long type , use long point to deal with long type
+      doc.add(new LongPoint(fieldName, (long) value));
+    } else if (type == DataTypes.FLOAT) {
+      doc.add(new FloatPoint(fieldName, (float) value));
+    } else if (type == DataTypes.DOUBLE) {
+      doc.add(new DoublePoint(fieldName, (double) value));
+    } else if (type == DataTypes.DATE) {
+      // TODO: how to get data value
+    } else if (type == DataTypes.TIMESTAMP) {
+      // TODO: how to get
+    } else if (type == DataTypes.BOOLEAN) {
+      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new 
int[] { 1 });
+      field.setIntValue((boolean) value ? 1 : 0);
+      doc.add(field);
+    } else {
+      LOGGER.error("unsupport data type " + type);
+      throw new RuntimeException("unsupported data type " + type);
+    }
+    return true;
+  }
+
+  @Override
+  public void finish() throws IOException {
+    if (indexWriter != null && pageIndexWriter != null) {
+      addPageIndex(pageIndexWriter);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (indexWriter != null) {
+      indexWriter.close();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
----------------------------------------------------------------------
diff --git 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
index d52cef9..4c6aec3 100644
--- 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
+++ 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -34,8 +34,8 @@ import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datamap.dev.DataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
-import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -153,15 +153,16 @@ abstract class LuceneDataMapFactoryBase<T extends 
DataMap> extends DataMapFactor
   }
 
   @Override
-  public DataMapRefresher createRefresher(Segment segment, String shardName) {
-    return new LuceneDataMapRefresher(getCarbonTable().getTablePath(), 
dataMapName,
+  public DataMapBuilder createBuilder(Segment segment, String shardName) {
+    return new LuceneDataMapBuilder(getCarbonTable().getTablePath(), 
dataMapName,
         segment, shardName, dataMapMeta.getIndexedColumns());
   }
 
   /**
    * Get all distributable objects of a segmentid
    */
-  @Override public List<DataMapDistributable> toDistributable(Segment segment) 
{
+  @Override
+  public List<DataMapDistributable> toDistributable(Segment segment) {
     List<DataMapDistributable> lstDataMapDistribute = new ArrayList<>();
     CarbonFile[] indexDirs =
         getAllIndexDirs(tableIdentifier.getTablePath(), 
segment.getSegmentNo());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java
----------------------------------------------------------------------
diff --git 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java
 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java
deleted file mode 100644
index ee500ef..0000000
--- 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapRefresher.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.datamap.lucene;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.Segment;
-import org.apache.carbondata.core.datamap.dev.DataMapRefresher;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat;
-import org.apache.lucene.codecs.lucene62.Lucene62Codec;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.DoublePoint;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.FloatPoint;
-import org.apache.lucene.document.IntPoint;
-import org.apache.lucene.document.IntRangeField;
-import org.apache.lucene.document.LongPoint;
-import org.apache.lucene.document.StoredField;
-import org.apache.lucene.document.TextField;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.RAMDirectory;
-import org.apache.solr.store.hdfs.HdfsDirectory;
-
-public class LuceneDataMapRefresher implements DataMapRefresher {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(LuceneDataMapWriter.class.getName());
-
-  private String dataMapPath;
-
-  private List<CarbonColumn> indexColumns;
-
-  private int columnsCount;
-
-  private IndexWriter indexWriter = null;
-
-  private IndexWriter pageIndexWriter = null;
-
-  private Analyzer analyzer = null;
-
-  LuceneDataMapRefresher(String tablePath, String dataMapName,
-      Segment segment, String shardName, List<CarbonColumn> indexColumns) {
-    this.dataMapPath = CarbonTablePath.getDataMapStorePathOnShardName(
-        tablePath, segment.getSegmentNo(), dataMapName, shardName);
-    this.indexColumns = indexColumns;
-    this.columnsCount = indexColumns.size();
-  }
-
-  @Override
-  public void initialize() throws IOException {
-    // get index path, put index data into segment's path
-    Path indexPath = FileFactory.getPath(dataMapPath);
-    FileSystem fs = FileFactory.getFileSystem(indexPath);
-
-    // if index path exists, should delete it because we are
-    // rebuilding the whole datamap for all segments
-    if (fs.exists(indexPath)) {
-      fs.delete(indexPath, true);
-    }
-    if (!fs.mkdirs(indexPath)) {
-      LOGGER.error("Failed to create directory " + indexPath);
-    }
-
-    if (null == analyzer) {
-      analyzer = new StandardAnalyzer();
-    }
-
-    // create a index writer
-    Directory indexDir = new HdfsDirectory(indexPath, 
FileFactory.getConfiguration());
-
-    IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer);
-    if (CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE,
-            CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)
-        
.equalsIgnoreCase(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT))
 {
-      indexWriterConfig.setCodec(new 
Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_SPEED));
-    } else {
-      indexWriterConfig
-          .setCodec(new 
Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_COMPRESSION));
-    }
-
-    indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
-  }
-
-  private IndexWriter createPageIndexWriter() throws IOException {
-    // save index data into ram, write into disk after one page finished
-    RAMDirectory ramDir = new RAMDirectory();
-    return new IndexWriter(ramDir, new IndexWriterConfig(analyzer));
-  }
-
-  private void addPageIndex(IndexWriter pageIndexWriter) throws IOException {
-
-    Directory directory = pageIndexWriter.getDirectory();
-
-    // close ram writer
-    pageIndexWriter.close();
-
-    // add ram index data into disk
-    indexWriter.addIndexes(directory);
-
-    // delete this ram data
-    directory.close();
-  }
-
-  @Override
-  public void addRow(int blockletId, int pageId, int rowId, Object[] values) 
throws IOException {
-    if (rowId == 0) {
-      if (pageIndexWriter != null) {
-        addPageIndex(pageIndexWriter);
-      }
-      pageIndexWriter = createPageIndexWriter();
-    }
-
-    // create a new document
-    Document doc = new Document();
-
-    // add blocklet Id
-    doc.add(new IntPoint(LuceneDataMapWriter.BLOCKLETID_NAME, (int) 
values[columnsCount]));
-    doc.add(new StoredField(LuceneDataMapWriter.BLOCKLETID_NAME, (int) 
values[columnsCount]));
-
-    // add page id
-    doc.add(new IntPoint(LuceneDataMapWriter.PAGEID_NAME, (int) 
values[columnsCount + 1]));
-    doc.add(new StoredField(LuceneDataMapWriter.PAGEID_NAME, (int) 
values[columnsCount + 1]));
-
-    // add row id
-    doc.add(new IntPoint(LuceneDataMapWriter.ROWID_NAME, rowId));
-    doc.add(new StoredField(LuceneDataMapWriter.ROWID_NAME, rowId));
-
-    // add other fields
-    for (int colIdx = 0; colIdx < columnsCount; colIdx++) {
-      CarbonColumn column = indexColumns.get(colIdx);
-      addField(doc, column.getColName(), column.getDataType(), values[colIdx]);
-    }
-
-    pageIndexWriter.addDocument(doc);
-  }
-
-  private boolean addField(Document doc, String fieldName, DataType type, 
Object value) {
-    if (type == DataTypes.STRING) {
-      doc.add(new TextField(fieldName, (String) value, Field.Store.NO));
-    } else if (type == DataTypes.BYTE) {
-      // byte type , use int range to deal with byte, lucene has no byte type
-      IntRangeField field =
-          new IntRangeField(fieldName, new int[] { Byte.MIN_VALUE }, new int[] 
{ Byte.MAX_VALUE });
-      field.setIntValue((int) value);
-      doc.add(field);
-    } else if (type == DataTypes.SHORT) {
-      // short type , use int range to deal with short type, lucene has no 
short type
-      IntRangeField field = new IntRangeField(fieldName, new int[] { 
Short.MIN_VALUE },
-          new int[] { Short.MAX_VALUE });
-      field.setShortValue((short) value);
-      doc.add(field);
-    } else if (type == DataTypes.INT) {
-      // int type , use int point to deal with int type
-      doc.add(new IntPoint(fieldName, (int) value));
-    } else if (type == DataTypes.LONG) {
-      // long type , use long point to deal with long type
-      doc.add(new LongPoint(fieldName, (long) value));
-    } else if (type == DataTypes.FLOAT) {
-      doc.add(new FloatPoint(fieldName, (float) value));
-    } else if (type == DataTypes.DOUBLE) {
-      doc.add(new DoublePoint(fieldName, (double) value));
-    } else if (type == DataTypes.DATE) {
-      // TODO: how to get data value
-    } else if (type == DataTypes.TIMESTAMP) {
-      // TODO: how to get
-    } else if (type == DataTypes.BOOLEAN) {
-      IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new 
int[] { 1 });
-      field.setIntValue((boolean) value ? 1 : 0);
-      doc.add(field);
-    } else {
-      LOGGER.error("unsupport data type " + type);
-      throw new RuntimeException("unsupported data type " + type);
-    }
-    return true;
-  }
-
-  @Override
-  public void finish() throws IOException {
-    if (indexWriter != null && pageIndexWriter != null) {
-      addPageIndex(pageIndexWriter);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (indexWriter != null) {
-      indexWriter.close();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index a5d4df2..c5365d5 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -28,7 +28,6 @@ import java.util.List;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal;
 import org.apache.carbondata.core.datamap.DataMapChooser;
-import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
@@ -420,7 +419,7 @@ m filterExpression
     DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
     List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
     // First prune using default datamap on driver side.
-    DataMapExprWrapper dataMapExprWrapper = DataMapChooser.get()
+    DataMapExprWrapper dataMapExprWrapper = DataMapChooser
         .getDefaultDataMap(getOrCreateCarbonTable(job.getConfiguration()), resolver);
     List<ExtendedBlocklet> prunedBlocklets =
         dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
@@ -428,9 +427,10 @@ m filterExpression
     ExplainCollector.recordDefaultDataMapPruning(
         dataMapExprWrapper.getDataMapSchema(), prunedBlocklets.size());
 
+    DataMapChooser chooser = new DataMapChooser(getOrCreateCarbonTable(job.getConfiguration()));
+
     // Get the available CG datamaps and prune further.
-    DataMapExprWrapper cgDataMapExprWrapper = DataMapChooser.get()
-        .chooseCGDataMap(getOrCreateCarbonTable(job.getConfiguration()), resolver);
+    DataMapExprWrapper cgDataMapExprWrapper = chooser.chooseCGDataMap(resolver);
     if (cgDataMapExprWrapper != null) {
       // Prune segments from already pruned blocklets
       pruneSegments(segmentIds, prunedBlocklets);
@@ -447,19 +447,19 @@ m filterExpression
           cgDataMapExprWrapper.getDataMapSchema(), prunedBlocklets.size());
     }
     // Now try to prune with FG DataMap.
-    dataMapExprWrapper = DataMapChooser.get()
-        .chooseFGDataMap(getOrCreateCarbonTable(job.getConfiguration()), resolver);
-    if (dataMapExprWrapper != null && dataMapExprWrapper.getDataMapLevel() == DataMapLevel.FG
-        && isFgDataMapPruningEnable(job.getConfiguration()) && dataMapJob != null) {
-      // Prune segments from already pruned blocklets
-      pruneSegments(segmentIds, prunedBlocklets);
-      prunedBlocklets =
-          executeDataMapJob(carbonTable, resolver, segmentIds, dataMapExprWrapper, dataMapJob,
-              partitionsToPrune);
+    if (isFgDataMapPruningEnable(job.getConfiguration()) && dataMapJob != null) {
+      DataMapExprWrapper fgDataMapExprWrapper = chooser.chooseFGDataMap(resolver);
+      if (fgDataMapExprWrapper != null) {
+        // Prune segments from already pruned blocklets
+        pruneSegments(segmentIds, prunedBlocklets);
+        prunedBlocklets =
+            executeDataMapJob(carbonTable, resolver, segmentIds, fgDataMapExprWrapper, dataMapJob,
+                partitionsToPrune);
 
-      ExplainCollector.recordFGDataMapPruning(
-          dataMapExprWrapper.getDataMapSchema(), prunedBlocklets.size());
-    }
+        ExplainCollector.recordFGDataMapPruning(
+            fgDataMapExprWrapper.getDataMapSchema(), prunedBlocklets.size());
+      }
+    } // TODO: add a else branch to push FGDataMap pruning to reader side
     return prunedBlocklets;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 2851de2..0bcb188 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -168,7 +168,7 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
       } else {
         CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, uuid);
       }
-      DataMapStatusManager.disableDataMapsOfTable(carbonTable);
+      DataMapStatusManager.disableAllLazyDataMaps(carbonTable);
       if (operationContext != null) {
         LoadEvents.LoadTablePostStatusUpdateEvent postStatusUpdateEvent =
             new LoadEvents.LoadTablePostStatusUpdateEvent(loadModel);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
index 406ee41..9981ff5 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -30,7 +30,7 @@ import 
org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandExcept
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.datamap.DataMapStoreManager
-import 
org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 
 class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
 
@@ -62,14 +62,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with 
BeforeAndAfterAll {
         | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
       """.stripMargin)
 
-    sql("DROP TABLE IF EXISTS datamap_test4")
-
-    sql(
-      """
-        | CREATE TABLE datamap_test4(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'carbondata'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT', 
'autorefreshdatamap' = 'false')
-      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')")
   }
 
   test("validate INDEX_COLUMNS DataMap property") {
@@ -132,38 +125,38 @@ class LuceneFineGrainDataMapSuite extends QueryTest with 
BeforeAndAfterAll {
          | DMProperties('INDEX_COLUMNS'='Name , cIty')
       """.stripMargin)
 
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test 
OPTIONS('header'='false')")
     checkAnswer(sql("SELECT * FROM datamap_test WHERE 
TEXT_MATCH('name:n10')"), sql(s"select * from datamap_test where name='n10'"))
     checkAnswer(sql("SELECT * FROM datamap_test WHERE 
TEXT_MATCH('city:c020')"), sql(s"SELECT * FROM datamap_test WHERE city='c020'"))
 
     sql("drop datamap dm on table datamap_test")
   }
 
-  test("test lucene refresh data map") {
-
+  test("test lucene rebuild data map") {
+    sql("DROP TABLE IF EXISTS datamap_test4")
+    sql(
+      """
+        | CREATE TABLE datamap_test4(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT', 
'autorefreshdatamap' = 'false')
+      """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test4 
OPTIONS('header'='false')")
 
-    //sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test4 
OPTIONS('header'='false')")
-
     sql(
       s"""
          | CREATE DATAMAP dm4 ON TABLE datamap_test4
          | USING 'lucene'
-         | DMProperties('INDEX_COLUMNS'='Name , cIty')
+         | WITH DEFERRED REBUILD
+         | DMProperties('INDEX_COLUMNS'='name , city')
       """.stripMargin)
 
-    //sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test4 
OPTIONS('header'='false')")
+    sql("REBUILD DATAMAP dm4 ON TABLE datamap_test4")
 
-    sql("refresh datamap dm4 ON TABLE datamap_test4")
-
-    checkAnswer(sql("SELECT * FROM datamap_test4 WHERE 
TEXT_MATCH('name:n10')"), sql(s"select * from datamap_test4 where name='n10'"))
+    checkAnswer(sql("SELECT * FROM datamap_test4 WHERE 
TEXT_MATCH('name:n10')"), sql(s"select * from datamap_test where name='n10'"))
     checkAnswer(sql("SELECT * FROM datamap_test4 WHERE 
TEXT_MATCH('city:c020')"), sql(s"SELECT * FROM datamap_test4 WHERE 
city='c020'"))
 
-    sql("drop datamap dm4 on table datamap_test4")
-
+    sql("drop table datamap_test4")
   }
 
-
   test("test lucene fine grain data map drop") {
     sql("DROP TABLE IF EXISTS datamap_test1")
     sql(
@@ -549,6 +542,7 @@ class LuceneFineGrainDataMapSuite extends QueryTest with 
BeforeAndAfterAll {
   }
 
   test("test lucene fine grain data map with text-match limit") {
+
     sql(
       s"""
          | CREATE DATAMAP dm ON TABLE datamap_test
@@ -556,7 +550,6 @@ class LuceneFineGrainDataMapSuite extends QueryTest with 
BeforeAndAfterAll {
          | DMProperties('INDEX_COLUMNS'='name , city')
       """.stripMargin)
 
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test 
OPTIONS('header'='false')")
     checkAnswer(sql("select count(*) from datamap_test where 
TEXT_MATCH_WITH_LIMIT('name:n10*',10)"),Seq(Row(10)))
     checkAnswer(sql("select count(*) from datamap_test where 
TEXT_MATCH_WITH_LIMIT('name:n10*',50)"),Seq(Row(50)))
     sql("drop datamap dm on table datamap_test")
@@ -570,17 +563,16 @@ class LuceneFineGrainDataMapSuite extends QueryTest with 
BeforeAndAfterAll {
          | DMProperties('INDEX_COLUMNS'='name , city')
       """.stripMargin)
 
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test 
OPTIONS('header'='false')")
-    sql("DROP TABLE IF EXISTS tabl1")
     sql(
       """
         | CREATE TABLE table1(id INT, name STRING, city STRING, age INT)
         | STORED BY 'carbondata'
         | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
       """.stripMargin)
-    sql("INSERT OVERWRITE TABLE table1 select *from datamap_test where 
TEXT_MATCH('name:n*')")
+    sql("INSERT OVERWRITE TABLE table1 select * from datamap_test where TEXT_MATCH('name:n*')")
     checkAnswer(sql("select count(*) from table1"),Seq(Row(10000)))
     sql("drop datamap dm on table datamap_test")
+    sql("drop table table1")
   }
 
   test("explain query with lucene datamap") {
@@ -698,7 +690,31 @@ class LuceneFineGrainDataMapSuite extends QueryTest with 
BeforeAndAfterAll {
       sql(s"select * from datamap_test5 where name='n10'"))
     checkAnswer(sql("SELECT * FROM datamap_test5 WHERE 
TEXT_MATCH('city:c020')"),
       sql(s"SELECT * FROM datamap_test5 WHERE city='c020'"))
+    sql("DROP TABLE IF EXISTS datamap_test5")
+  }
 
+  test("test lucene fine grain datamap rebuild") {
+    sql("DROP TABLE IF EXISTS datamap_test5")
+    sql(
+      """
+        | CREATE TABLE datamap_test5(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP dm ON TABLE datamap_test5
+         | USING 'lucene'
+         | WITH DEFERRED REBUILD
+         | DMProperties('INDEX_COLUMNS'='city')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test5 OPTIONS('header'='false')")
+    val map = DataMapStatusManager.readDataMapStatusMap()
+    assert(!map.get("dm").isEnabled)
+    sql("REBUILD DATAMAP dm ON TABLE datamap_test5")
+    checkAnswer(sql("SELECT * FROM datamap_test5 WHERE TEXT_MATCH('city:c020')"),
+      sql(s"SELECT * FROM datamap_test5 WHERE city='c020'"))
+    sql("DROP TABLE IF EXISTS datamap_test5")
   }
 
   test("test text_match on normal table") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index 629e9d9..fb25141 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -436,6 +436,16 @@ class TestPreAggCreateCommand extends QueryTest with 
BeforeAndAfterAll {
     }
   }
 
+  test("test pre agg datamap with deferred rebuild") {
+    val e = intercept[MalformedDataMapCommandException] {
+      sql("create datamap failure on table PreAggMain1 " +
+          "using 'preaggregate' " +
+          "with deferred rebuild " +
+          "as select a as a1,sum(b) as sum from PreAggMain1 group by a")
+    }
+    assert(e.getMessage.contains("DEFERRED REBUILD is not supported on this DataMap"))
+  }
+
   // TODO: Need to Fix
   ignore("test creation of multiple preaggregate of same name concurrently") {
     sql("DROP TABLE IF EXISTS tbl_concurr")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index b441bb4..d8fc46f 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, 
Segment}
-import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapRefresher, 
DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapBuilder, 
DataMapWriter}
 import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, 
CoarseGrainDataMapFactory}
 import org.apache.carbondata.core.datastore.FileReader
 import org.apache.carbondata.core.datastore.block.SegmentProperties
@@ -147,8 +147,8 @@ class CGDataMapFactory(
     false
   }
 
-  override def createRefresher(segment: Segment,
-      shardName: String): DataMapRefresher = {
+  override def createBuilder(segment: Segment,
+      shardName: String): DataMapBuilder = {
     ???
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 5e0c10a..ffbcf67 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, 
Segment}
-import org.apache.carbondata.core.datamap.dev.{DataMapRefresher, DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.{DataMapBuilder, DataMapWriter}
 import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, 
CoarseGrainDataMapFactory}
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.features.TableOperation
@@ -83,8 +83,8 @@ class C2DataMapFactory(
     false
   }
 
-  override def createRefresher(segment: Segment,
-      shardName: String): DataMapRefresher = {
+  override def createBuilder(segment: Segment,
+      shardName: String): DataMapBuilder = {
     ???
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/747be9b1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 976e580..535a112 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -27,7 +27,9 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta}
 import org.apache.carbondata.core.datamap.Segment
-import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapRefresher, 
DataMapWriter}
+import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapBuilder, 
DataMapWriter}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, 
Segment}
+import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapBuilder, 
DataMapWriter}
 import org.apache.carbondata.core.datamap.dev.fgdatamap.{FineGrainBlocklet, 
FineGrainDataMap, FineGrainDataMapFactory}
 import org.apache.carbondata.core.datastore.FileReader
 import org.apache.carbondata.core.datastore.block.SegmentProperties
@@ -141,8 +143,8 @@ class FGDataMapFactory(carbonTable: CarbonTable,
     false
   }
 
-  override def createRefresher(segment: Segment,
-      shardName: String): DataMapRefresher = {
+  override def createBuilder(segment: Segment,
+      shardName: String): DataMapBuilder = {
     ???
   }
 }

Reply via email to