[CARBONDATA-2347][LUCENE_DATAMAP] Fix load issue in lucene datamap: create multiple 
directories based on taskId

Make the datamap distributable object based on the lucene index path written during 
load.
Added a Lucene listener and fixed SHOW DATAMAP.

This closes #2113


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/860e144d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/860e144d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/860e144d

Branch: refs/heads/master
Commit: 860e144d4ac87a5d19a4572b2ffa7d2f192e4162
Parents: ceac8ab
Author: akashrn5 <akashnilu...@gmail.com>
Authored: Thu Mar 29 19:59:36 2018 +0530
Committer: QiangCai <qiang...@qq.com>
Committed: Wed Apr 18 15:56:44 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  13 ++
 .../core/datamap/DataMapStoreManager.java       |   8 +-
 .../core/datamap/IndexDataMapProvider.java      |  34 ++--
 .../carbondata/core/datamap/TableDataMap.java   |   7 +
 .../core/datamap/dev/DataMapFactory.java        |  11 +-
 .../core/datamap/dev/DataMapWriter.java         |   2 +-
 .../blockletindex/BlockletDataMapFactory.java   |   9 +-
 .../core/metadata/SegmentFileStore.java         |  10 +-
 .../schema/datamap/DataMapClassProvider.java    |  17 +-
 .../metadata/schema/table/DataMapSchema.java    |   1 -
 .../table/DataMapSchemaStorageProvider.java     |   3 +-
 .../table/DiskBasedDMSchemaStorageProvider.java |  27 +++-
 .../datamap/examples/MinMaxDataWriter.java      |   2 +-
 .../examples/MinMaxIndexDataMapFactory.java     |  14 +-
 .../lucene/LuceneCoarseGrainDataMapFactory.java |   4 +
 .../lucene/LuceneDataMapDistributable.java      |  13 +-
 .../lucene/LuceneDataMapFactoryBase.java        |  93 ++++++++---
 .../datamap/lucene/LuceneDataMapWriter.java     |  80 +++++++--
 .../datamap/lucene/LuceneFineGrainDataMap.java  |  20 ++-
 .../lucene/LuceneFineGrainDataMapFactory.java   |  12 +-
 .../lucene/LuceneCoarseGrainDataMapSuite.scala  |  70 --------
 .../lucene/LuceneFineGrainDataMapSuite.scala    | 148 -----------------
 .../lucene/LuceneCoarseGrainDataMapSuite.scala  |  74 +++++++++
 .../lucene/LuceneFineGrainDataMapSuite.scala    | 161 +++++++++++++++++++
 .../testsuite/datamap/CGDataMapTestCase.scala   |  32 ++--
 .../testsuite/datamap/DataMapWriterSuite.scala  |  13 +-
 .../testsuite/datamap/FGDataMapTestCase.scala   |  15 +-
 .../testsuite/datamap/TestDataMapStatus.scala   |  15 +-
 .../TestInsertAndOtherCommandConcurrent.scala   |  14 +-
 .../datamap/CarbonCreateDataMapCommand.scala    |  25 ++-
 .../datamap/CarbonDataMapShowCommand.scala      |  16 +-
 .../CarbonProjectForDeleteCommand.scala         |   1 -
 .../command/table/CarbonDropTableCommand.scala  |   7 +-
 .../sql/execution/strategy/DDLStrategy.scala    |   2 -
 .../datamap/DataMapWriterListener.java          |   4 +-
 .../store/writer/AbstractFactDataWriter.java    |   3 +-
 36 files changed, 631 insertions(+), 349 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index df995e0..6ab1ce5 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1653,6 +1653,19 @@ public final class CarbonCommonConstants {
       "carbon.compaction.prefetch.enable";
   public static final String CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT = 
"false";
 
+  /**
+   * compression mode used by lucene for index writing, this conf will be 
passed to lucene writer
+   * while writing index files.
+   */
+  public static final String CARBON_LUCENE_COMPRESSION_MODE = 
"carbon.lucene.compression.mode";
+
+  /**
+   * default lucene index compression mode. In this mode index writing time will be 
less, as speed is
+   * given priority. The other mode is compression mode, where a smaller index size is 
given priority
+   * over the index writing speed.
+   */
+  public static final String CARBON_LUCENE_COMPRESSION_MODE_DEFAULT = "speed";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 02cf2a5..169cbde 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -252,8 +252,9 @@ public final class DataMapStoreManager {
           (Class<? extends DataMapFactory>) 
Class.forName(dataMapSchema.getProviderName());
       dataMapFactory = factoryClass.newInstance();
     } catch (ClassNotFoundException e) {
-      throw new MalformedDataMapCommandException(
-          "DataMap '" + dataMapSchema.getProviderName() + "' not found");
+      // try to create DataMapClassProvider instance by taking providerName as 
short name
+      dataMapFactory =
+          
IndexDataMapProvider.getDataMapFactoryByShortName(dataMapSchema.getProviderName());
     } catch (Throwable e) {
       throw new MetadataProcessException(
           "failed to create DataMap '" + dataMapSchema.getProviderName() + 
"'", e);
@@ -272,7 +273,7 @@ public final class DataMapStoreManager {
       tableIndices = new ArrayList<>();
     }
 
-    dataMapFactory.init(table.getAbsoluteTableIdentifier(), dataMapSchema);
+    dataMapFactory.init(table, dataMapSchema);
     BlockletDetailsFetcher blockletDetailsFetcher;
     SegmentPropertiesFetcher segmentPropertiesFetcher = null;
     if (dataMapFactory instanceof BlockletDetailsFetcher) {
@@ -348,6 +349,7 @@ public final class DataMapStoreManager {
         if (tableDataMap != null && dataMapName
             
.equalsIgnoreCase(tableDataMap.getDataMapSchema().getDataMapName())) {
           tableDataMap.clear();
+          tableDataMap.deleteDatamapData();
           tableIndices.remove(i);
           break;
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
 
b/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
index 02ff1a1..85a4341 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
@@ -24,6 +24,7 @@ import 
org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.exceptions.MetadataProcessException;
 import 
org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import 
org.apache.carbondata.core.metadata.schema.table.DataMapSchemaStorageProvider;
@@ -46,10 +47,12 @@ public class IndexDataMapProvider implements 
DataMapProvider {
           "Parent table is required to create index datamap");
     }
     ArrayList<RelationIdentifier> relationIdentifiers = new ArrayList<>();
-    dataMapSchema.setParentTables(relationIdentifiers);
-    relationIdentifiers.add(
+    RelationIdentifier relationIdentifier =
         new RelationIdentifier(mainTable.getDatabaseName(), 
mainTable.getTableName(),
-            mainTable.getTableInfo().getFactTable().getTableId()));
+            mainTable.getTableInfo().getFactTable().getTableId());
+    relationIdentifiers.add(relationIdentifier);
+    dataMapSchema.setRelationIdentifier(relationIdentifier);
+    dataMapSchema.setParentTables(relationIdentifiers);
     DataMapFactory dataMapFactory = createIndexDataMapFactory(dataMapSchema);
     DataMapStoreManager.getInstance().registerDataMap(mainTable, 
dataMapSchema, dataMapFactory);
     storageProvider.saveSchema(dataMapSchema);
@@ -62,7 +65,8 @@ public class IndexDataMapProvider implements DataMapProvider {
 
   @Override
   public void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema) 
throws IOException {
-    storageProvider.dropSchema(dataMapSchema.getDataMapName());
+    storageProvider.dropSchema(dataMapSchema.getDataMapName(),
+        dataMapSchema.getParentTables().get(0).getTableName(), 
dataMapSchema.getProviderName());
   }
 
   @Override
@@ -99,25 +103,29 @@ public class IndexDataMapProvider implements 
DataMapProvider {
     return dataMapFactory;
   }
 
-  private DataMapFactory getDataMapFactoryByShortName(String providerName)
+  public static DataMapFactory getDataMapFactoryByShortName(String 
providerName)
       throws MalformedDataMapCommandException {
+    try {
+      DataMapRegistry.registerDataMap(
+          
DataMapClassProvider.getDataMapProviderOnName(providerName).getClassName(),
+          
DataMapClassProvider.getDataMapProviderOnName(providerName).getShortName());
+    } catch (UnsupportedOperationException ex) {
+      throw new MalformedDataMapCommandException("DataMap '" + providerName + 
"' not found", ex);
+    }
     DataMapFactory dataMapFactory;
-    String className = DataMapRegistry.getDataMapClassName(providerName);
+    String className = 
DataMapRegistry.getDataMapClassName(providerName.toLowerCase());
     if (className != null) {
       try {
         Class<? extends DataMapFactory> datamapClass =
-            (Class<? extends DataMapFactory>) Class.forName(providerName);
+            (Class<? extends DataMapFactory>) Class.forName(className);
         dataMapFactory = datamapClass.newInstance();
       } catch (ClassNotFoundException ex) {
-        throw new MalformedDataMapCommandException(
-            "DataMap '" + providerName + "' not found", ex);
+        throw new MalformedDataMapCommandException("DataMap '" + providerName 
+ "' not found", ex);
       } catch (Throwable ex) {
-        throw new MetadataProcessException(
-            "failed to create DataMap '" + providerName + "'", ex);
+        throw new MetadataProcessException("failed to create DataMap '" + 
providerName + "'", ex);
       }
     } else {
-      throw new MalformedDataMapCommandException(
-          "DataMap '" + providerName + "' not found");
+      throw new MalformedDataMapCommandException("DataMap '" + providerName + 
"' not found");
     }
     return dataMapFactory;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 6689e3b..8143b1c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -197,6 +197,13 @@ public final class TableDataMap extends 
OperationEventListener {
     dataMapFactory.clear();
   }
 
+  /**
+   * delete datamap data if any
+   */
+  public void deleteDatamapData() {
+    dataMapFactory.deleteDatamapData();
+  }
+
   public DataMapSchema getDataMapSchema() {
     return dataMapSchema;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index d27b255..70f2772 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -24,7 +24,7 @@ import 
org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapLevel;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.events.Event;
@@ -35,9 +35,9 @@ import org.apache.carbondata.events.Event;
 public interface DataMapFactory<T extends DataMap> {
 
   /**
-   * Initialization of Datamap factory with the identifier and datamap name
+   * Initialization of Datamap factory with the carbonTable and datamap name
    */
-  void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema)
+  void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
       throws IOException, MalformedDataMapCommandException;
 
   /**
@@ -87,4 +87,9 @@ public interface DataMapFactory<T extends DataMap> {
    *  Type of datamap whether it is FG or CG
    */
   DataMapLevel getDataMapType();
+
+  /**
+   * delete datamap data if any
+   */
+  void deleteDatamapData();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
index 6a3ee18..29670a1 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java
@@ -54,7 +54,7 @@ public abstract class DataMapWriter {
    *
    * @param blockId file name of the carbondata file
    */
-  public abstract void onBlockStart(String blockId) throws IOException;
+  public abstract void onBlockStart(String blockId, long taskId) throws 
IOException;
 
   /**
    * End of block notification

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 2425c4c..4c8ac0c 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -43,6 +43,7 @@ import 
org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -72,8 +73,8 @@ public class BlockletDataMapFactory extends 
CoarseGrainDataMapFactory
   private Cache<TableBlockIndexUniqueIdentifier, CoarseGrainDataMap> cache;
 
   @Override
-  public void init(AbsoluteTableIdentifier identifier, DataMapSchema 
dataMapSchema) {
-    this.identifier = identifier;
+  public void init(CarbonTable carbonTable, DataMapSchema dataMapSchema) {
+    this.identifier = carbonTable.getAbsoluteTableIdentifier();
     cache = CacheProvider.getInstance()
         .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP);
   }
@@ -262,6 +263,10 @@ public class BlockletDataMapFactory extends 
CoarseGrainDataMapFactory
     return null;
   }
 
+  @Override public void deleteDatamapData() {
+
+  }
+
   @Override public SegmentProperties getSegmentProperties(Segment segment,
       ReadCommittedScope readCommittedScope) throws IOException {
     List<CoarseGrainDataMap> dataMaps = getDataMaps(segment, 
readCommittedScope);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 5902148..d609c56 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -672,7 +672,7 @@ public class SegmentFileStore {
   }
 
   private static void deletePhysicalPartition(List<PartitionSpec> 
partitionSpecs,
-      Map<String, List<String>> locationMap) {
+      Map<String, List<String>> locationMap) throws IOException {
     for (Map.Entry<String, List<String>> entry : locationMap.entrySet()) {
       Path location = new Path(entry.getKey()).getParent();
       if (partitionSpecs != null) {
@@ -681,10 +681,10 @@ public class SegmentFileStore {
           
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString()));
         }
       } else {
-        // delete the segment folder if it is empty
-        CarbonFile file = FileFactory.getCarbonFile(location.toString());
-        if (file.listFiles().length == 0) {
-          file.delete();
+        // delete the segment folder
+        CarbonFile segmentPath = 
FileFactory.getCarbonFile(location.toString());
+        if (null != segmentPath) {
+          FileFactory.deleteAllCarbonFilesOfDir(segmentPath);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
index 3934444..4ab400d 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
@@ -28,7 +28,8 @@ package org.apache.carbondata.core.metadata.schema.datamap;
 
 public enum DataMapClassProvider {
   PREAGGREGATE("org.apache.carbondata.core.datamap.AggregateDataMap", 
"preaggregate"),
-  TIMESERIES("org.apache.carbondata.core.datamap.TimeSeriesDataMap", 
"timeseries");
+  TIMESERIES("org.apache.carbondata.core.datamap.TimeSeriesDataMap", 
"timeseries"),
+  
LUCENE("org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory","lucene");
 
   /**
    * Fully qualified class name of datamap
@@ -64,8 +65,22 @@ public enum DataMapClassProvider {
       return TIMESERIES;
     } else if (PREAGGREGATE.isEqual(dataMapClass)) {
       return PREAGGREGATE;
+    } else if (LUCENE.isEqual(dataMapClass)) {
+      return LUCENE;
     } else {
       throw new UnsupportedOperationException("Unknown datamap provider/class 
" + dataMapClass);
     }
   }
+
+  public static DataMapClassProvider getDataMapProviderOnName(String 
dataMapShortname) {
+    if (TIMESERIES.isEqual(dataMapShortname)) {
+      return TIMESERIES;
+    } else if (PREAGGREGATE.isEqual(dataMapShortname)) {
+      return PREAGGREGATE;
+    } else if (LUCENE.isEqual(dataMapShortname)) {
+      return LUCENE;
+    } else {
+      throw new UnsupportedOperationException("Unknown datamap provider" + 
dataMapShortname);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index 6b592fb..d9f83b3 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -70,7 +70,6 @@ public class DataMapSchema implements Serializable, Writable {
    */
   protected TableSchema childSchema;
 
-
   public DataMapSchema(String dataMapName, String providerName) {
     this.dataMapName = dataMapName;
     this.providerName = providerName;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java
index 6b9bca5..ed13201 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java
@@ -63,6 +63,7 @@ public interface DataMapSchemaStorageProvider {
    * Drop the schema from the storage by using dataMapName.
    * @param dataMapName
    */
-  void dropSchema(String dataMapName) throws IOException;
+  void dropSchema(String dataMapName, String tableName, String 
dataMapProviderName)
+      throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
index d49a9ae..9e34131 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java
@@ -51,9 +51,8 @@ public class DiskBasedDMSchemaStorageProvider implements 
DataMapSchemaStoragePro
     BufferedWriter brWriter = null;
     DataOutputStream dataOutputStream = null;
     Gson gsonObjectToWrite = new Gson();
-    String schemaPath =
-        storePath + CarbonCommonConstants.FILE_SEPARATOR + 
dataMapSchema.getDataMapName()
-            + ".dmschema";
+    String schemaPath = getSchemaPath(storePath, 
dataMapSchema.getDataMapName(),
+        dataMapSchema.relationIdentifier.getTableName(), 
dataMapSchema.getProviderName());
     FileFactory.FileType fileType = FileFactory.getFileType(schemaPath);
     if (FileFactory.isFileExist(schemaPath, fileType)) {
       throw new IOException(
@@ -129,9 +128,9 @@ public class DiskBasedDMSchemaStorageProvider implements 
DataMapSchemaStoragePro
     return dataMapSchemas;
   }
 
-  @Override public void dropSchema(String dataMapName) throws IOException {
-    String schemaPath =
-        storePath + CarbonCommonConstants.FILE_SEPARATOR + dataMapName + 
".dmschema";
+  @Override public void dropSchema(String dataMapName, String tableName, 
String dataMapProviderName)
+      throws IOException {
+    String schemaPath = getSchemaPath(storePath, dataMapName, tableName, 
dataMapProviderName);
     if (!FileFactory.isFileExist(schemaPath, 
FileFactory.getFileType(schemaPath))) {
       throw new IOException("DataMap with name " + dataMapName + " does not 
exists in storage");
     }
@@ -140,4 +139,20 @@ public class DiskBasedDMSchemaStorageProvider implements 
DataMapSchemaStoragePro
       throw new IOException("DataMap with name " + dataMapName + " cannot be 
deleted");
     }
   }
+
+  /**
+   * it returns the schema path for the datamap
+   * @param storePath
+   * @param dataMapName
+   * @param tableName
+   * @param dataMapProviderName
+   * @return
+   */
+  public static String getSchemaPath(String storePath, String dataMapName, 
String tableName,
+      String dataMapProviderName) {
+    String schemaPath = storePath + CarbonCommonConstants.FILE_SEPARATOR + 
tableName
+        + CarbonCommonConstants.UNDERSCORE + dataMapName + 
CarbonCommonConstants.UNDERSCORE
+        + dataMapProviderName + ".dmschema";
+    return schemaPath;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
----------------------------------------------------------------------
diff --git 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
index d2dbaa5..e68b481 100644
--- 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
+++ 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
@@ -90,7 +90,7 @@ public class MinMaxDataWriter extends DataMapWriter {
     }
   }
 
-  @Override public void onBlockStart(String blockId) {
+  @Override public void onBlockStart(String blockId, long taskId) {
     blockMinMaxMap = new HashMap<Integer, BlockletMinMax>();
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
index 758a67c..01aaffa 100644
--- 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
+++ 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
@@ -58,17 +58,11 @@ public class MinMaxIndexDataMapFactory extends 
CoarseGrainDataMapFactory {
 
   // this is an example for datamap, we can choose the columns and operations 
that
   // will be supported by this datamap. Furthermore, we can add cache-support 
for this datamap.
-  @Override public void init(AbsoluteTableIdentifier identifier, DataMapSchema 
dataMapSchema)
+  @Override public void init(CarbonTable carbonTable, DataMapSchema 
dataMapSchema)
       throws IOException, MalformedDataMapCommandException {
-    this.identifier = identifier;
+    this.identifier = carbonTable.getAbsoluteTableIdentifier();
     this.dataMapName = dataMapSchema.getDataMapName();
 
-    String tableUniqueName = 
identifier.getCarbonTableIdentifier().getTableUniqueName();
-    CarbonTable carbonTable = 
CarbonMetadata.getInstance().getCarbonTable(tableUniqueName);
-    if (null == carbonTable) {
-      throw new IOException("Failed to get carbon table with name " + 
tableUniqueName);
-    }
-
     // columns that will be indexed
     List<CarbonColumn> allColumns = 
carbonTable.getCreateOrderColumn(identifier.getTableName());
     List<String> minMaxCols = (List) CollectionUtils.collect(allColumns, new 
Transformer() {
@@ -163,4 +157,8 @@ public class MinMaxIndexDataMapFactory extends 
CoarseGrainDataMapFactory {
   @Override public DataMapMeta getMeta() {
     return this.dataMapMeta;
   }
+
+  @Override public void deleteDatamapData() {
+
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
index 7308841..f63656b 100644
--- 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
+++ 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java
@@ -75,4 +75,8 @@ public class LuceneCoarseGrainDataMapFactory extends 
LuceneDataMapFactoryBase<Co
     return DataMapLevel.CG;
   }
 
+  @Override public void deleteDatamapData() {
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java
----------------------------------------------------------------------
diff --git 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java
 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java
index 19e4035..1d47ee8 100644
--- 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java
+++ 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java
@@ -26,11 +26,22 @@ class LuceneDataMapDistributable extends 
DataMapDistributable {
   // TODO: seems no one use this?
   private String dataPath;
 
-  LuceneDataMapDistributable(String dataPath) {
+  private String indexPath;
+
+  LuceneDataMapDistributable(String dataPath, String indexPath) {
     this.dataPath = dataPath;
+    this.indexPath = indexPath;
   }
 
   public String getDataPath() {
     return dataPath;
   }
+
+  public String getIndexPath() {
+    return indexPath;
+  }
+
+  public void setIndexPath(String indexPath) {
+    this.indexPath = indexPath;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
----------------------------------------------------------------------
diff --git 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
index ab4e9ee..672880f 100644
--- 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
+++ 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.datamap.lucene;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -32,13 +33,17 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
@@ -79,29 +84,21 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
    */
   AbsoluteTableIdentifier tableIdentifier = null;
 
+  /**
+   * indexed carbon columns for lucene
+   */
+  List<String> indexedCarbonColumns = null;
+
+
   @Override
-  public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema)
+  public void init(CarbonTable carbonTable, DataMapSchema dataMapSchema)
       throws IOException, MalformedDataMapCommandException {
-    Objects.requireNonNull(identifier);
+    Objects.requireNonNull(carbonTable.getAbsoluteTableIdentifier());
     Objects.requireNonNull(dataMapSchema);
 
-    this.tableIdentifier = identifier;
+    this.tableIdentifier = carbonTable.getAbsoluteTableIdentifier();
     this.dataMapName = dataMapSchema.getDataMapName();
 
-    // get carbonmetadata from carbonmetadata instance
-    CarbonMetadata carbonMetadata = CarbonMetadata.getInstance();
-
-    String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
-
-    // get carbon table
-    CarbonTable carbonTable = carbonMetadata.getCarbonTable(tableUniqueName);
-    if (carbonTable == null) {
-      String errorMessage =
-          String.format("failed to get carbon table with name %s", tableUniqueName);
-      LOGGER.error(errorMessage);
-      throw new IOException(errorMessage);
-    }
-
     // validate DataMapSchema and get index columns
     List<String> indexedColumns =  validateAndGetIndexedColumns(dataMapSchema, carbonTable);
 
@@ -151,7 +148,7 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
         }
       }
     }
-    List<String> textColumnList = new ArrayList<String>(textColumns.length);
+    indexedCarbonColumns = new ArrayList<>(textColumns.length);
     for (int i = 0; i < textColumns.length; i++) {
       CarbonColumn column = carbonTable.getColumnByName(carbonTable.getTableName(), textColumns[i]);
       if (null == column) {
@@ -161,10 +158,41 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
         throw new MalformedDataMapCommandException(
             "TEXT_COLUMNS only supports String column. " + "Unsupported column: " + textColumns[i]
                 + ", DataType: " + column.getDataType());
+      } else if (column.getEncoder().contains(Encoding.DICTIONARY)) {
+        throw new MalformedDataMapCommandException(
+            "TEXT_COLUMNS cannot contain dictionary column " + column.getColName());
+      }
+      indexedCarbonColumns.add(column.getColName());
+    }
+    return indexedCarbonColumns;
+  }
+
+  /**
+   * this method will delete the datamap folders during drop datamap
+   * @throws MalformedDataMapCommandException
+   */
+  private void deleteDatamap() throws MalformedDataMapCommandException {
+    SegmentStatusManager ssm = new SegmentStatusManager(tableIdentifier);
+    try {
+      List<Segment> validSegments = ssm.getValidAndInvalidSegments().getValidSegments();
+      for (Segment segment : validSegments) {
+        String segmentId = segment.getSegmentNo();
+        String datamapPath =
+            CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), segmentId)
+                + File.separator + dataMapName;
+        if (FileFactory.isFileExist(datamapPath)) {
+          CarbonFile file =
+              FileFactory.getCarbonFile(datamapPath, FileFactory.getFileType(datamapPath));
+          CarbonUtil.deleteFoldersAndFilesSilent(file);
+        }
       }
-      textColumnList.add(column.getColName());
+    } catch (IOException ex) {
+      throw new MalformedDataMapCommandException(
+          "drop datamap failed, failed to delete datamap directory");
+    } catch (InterruptedException ex) {
+      throw new MalformedDataMapCommandException(
+          "drop datamap failed, failed to delete datamap directory");
     }
-    return textColumnList;
   }
 
   /**
@@ -173,8 +201,8 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
   @Override
   public DataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
     LOGGER.info("lucene data write to " + writeDirectoryPath);
-    return new LuceneDataMapWriter(
-        tableIdentifier, dataMapName, segment, writeDirectoryPath, true);
+    return new LuceneDataMapWriter(tableIdentifier, dataMapName, segment, writeDirectoryPath, true,
+        indexedCarbonColumns);
   }
 
   /**
@@ -183,9 +211,14 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
   @Override
   public List<DataMapDistributable> toDistributable(Segment segment) {
     List<DataMapDistributable> lstDataMapDistribute = new ArrayList<DataMapDistributable>();
-    DataMapDistributable luceneDataMapDistributable = new LuceneDataMapDistributable(
-        CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), segment.getSegmentNo()));
-    lstDataMapDistribute.add(luceneDataMapDistributable);
+    CarbonFile[] indexDirs = LuceneDataMapWriter
+        .getAllIndexDirs(tableIdentifier.getTablePath(), segment.getSegmentNo(), dataMapName);
+    for (CarbonFile indexDir : indexDirs) {
+      DataMapDistributable luceneDataMapDistributable = new LuceneDataMapDistributable(
+          CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), segment.getSegmentNo()),
+          indexDir.getAbsolutePath());
+      lstDataMapDistribute.add(luceneDataMapDistributable);
+    }
     return lstDataMapDistribute;
   }
 
@@ -210,6 +243,14 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac
 
   }
 
+  @Override public void deleteDatamapData() {
+    try {
+      deleteDatamap();
+    } catch (MalformedDataMapCommandException ex) {
+      LOGGER.error(ex, "failed to delete datamap directory ");
+    }
+  }
+
   /**
    * Return metadata of this datamap
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
index 4286e5a..83454b3 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
@@ -20,23 +20,30 @@ package org.apache.carbondata.datamap.lucene;
 import java.io.File;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.util.List;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat;
+import org.apache.lucene.codecs.lucene62.Lucene62Codec;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.DoublePoint;
 import org.apache.lucene.document.Field;
@@ -77,6 +84,8 @@ public class LuceneDataMapWriter extends DataMapWriter {
 
   private boolean isFineGrain = true;
 
+  private List<String> indexedCarbonColumns = null;
+
   private static final String BLOCKID_NAME = "blockId";
 
   private static final String BLOCKLETID_NAME = "blockletId";
@@ -86,30 +95,31 @@ public class LuceneDataMapWriter extends DataMapWriter {
   private static final String ROWID_NAME = "rowId";
 
   LuceneDataMapWriter(AbsoluteTableIdentifier identifier, String dataMapName, Segment segment,
-      String writeDirectoryPath, boolean isFineGrain) {
+      String writeDirectoryPath, boolean isFineGrain, List<String> indexedCarbonColumns) {
     super(identifier, segment, writeDirectoryPath);
     this.dataMapName = dataMapName;
     this.isFineGrain = isFineGrain;
+    this.indexedCarbonColumns = indexedCarbonColumns;
   }
 
-  private String getIndexPath() {
+  private String getIndexPath(long taskId) {
     if (isFineGrain) {
-      return genDataMapStorePath(identifier.getTablePath(), segmentId, dataMapName);
+      return genDataMapStorePathOnTaskId(identifier.getTablePath(), segmentId, dataMapName, taskId);
     } else {
       // TODO: where write data in coarse grain data map
-      return genDataMapStorePath(identifier.getTablePath(), segmentId, dataMapName);
+      return genDataMapStorePathOnTaskId(identifier.getTablePath(), segmentId, dataMapName, taskId);
     }
   }
 
   /**
    * Start of new block notification.
    */
-  public void onBlockStart(String blockId) throws IOException {
+  public void onBlockStart(String blockId, long taskId) throws IOException {
     // save this block id for lucene index , used in onPageAdd function
     this.blockId = blockId;
 
     // get index path, put index data into segment's path
-    String strIndexPath = getIndexPath();
+    String strIndexPath = getIndexPath(taskId);
     Path indexPath = FileFactory.getPath(strIndexPath);
     FileSystem fs = FileFactory.getFileSystem(indexPath);
 
@@ -124,6 +134,18 @@ public class LuceneDataMapWriter extends DataMapWriter {
 
     // create a index writer
     Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration());
+
+    IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer);
+    if (CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE,
+            CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)
+        .equalsIgnoreCase(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)) {
+      indexWriterConfig.setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_SPEED));
+    } else {
+      indexWriterConfig
+          .setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_COMPRESSION));
+    }
+
     indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer));
 
   }
@@ -202,8 +224,10 @@ public class LuceneDataMapWriter extends DataMapWriter {
 
       // add other fields
       for (int colIdx = 0; colIdx < columnsCount; colIdx++) {
-        if (!pages[colIdx].getNullBits().get(rowId)) {
-          addField(doc, pages[colIdx], rowId, Field.Store.NO);
+        if (indexedCarbonColumns.contains(pages[colIdx].getColumnSpec().getFieldName())) {
+          if (!pages[colIdx].getNullBits().get(rowId)) {
+            addField(doc, pages[colIdx], rowId, Field.Store.NO);
+          }
         }
       }
 
@@ -226,7 +250,7 @@ public class LuceneDataMapWriter extends DataMapWriter {
     String fieldName = page.getColumnSpec().getFieldName();
 
     //get field type
-    DataType type = page.getDataType();
+    DataType type = page.getColumnSpec().getSchemaDataType();
 
     if (type == DataTypes.BYTE) {
       // byte type , use int range to deal with byte, lucene has no byte type
@@ -322,8 +346,44 @@ public class LuceneDataMapWriter extends DataMapWriter {
   /**
    * Return store path for datamap
    */
-  static String genDataMapStorePath(String tablePath, String segmentId, String dataMapName) {
+  public static String genDataMapStorePath(String tablePath, String segmentId, String dataMapName) {
     return CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName;
   }
 
+  /**
+   * Return store path for datamap based on the taskId, if three tasks get launched during loading,
+   * then three folders will be created based on the three task Ids and lucene index file will be
+   * written into those folders
+   * @return store path based on taskID
+   */
+  private static String genDataMapStorePathOnTaskId(String tablePath, String segmentId,
+      String dataMapName, long taskId) {
+    return CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName
+        + File.separator + dataMapName + CarbonCommonConstants.UNDERSCORE + taskId
+        + CarbonCommonConstants.UNDERSCORE + System.currentTimeMillis();
+  }
+
+  /**
+   * returns all the directories of lucene index files for query
+   * @param tablePath
+   * @param segmentId
+   * @param dataMapName
+   * @return
+   */
+  public static CarbonFile[] getAllIndexDirs(String tablePath, String segmentId,
+      final String dataMapName) {
+    String dmPath =
+        CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName;
+    FileFactory.FileType fileType = FileFactory.getFileType(dmPath);
+    final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath, fileType);
+    return dirPath.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+        if (file.isDirectory() && file.getName().startsWith(dataMapName)) {
+          return true;
+        } else {
+          return false;
+        }
+      }
+    });
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
index faa4cbe..3caefd2 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java
@@ -67,14 +67,14 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
       LogServiceFactory.getLogService(LuceneFineGrainDataMap.class.getName());
 
   /**
-   * searcher object for this datamap
+   * index Reader object to create searcher object
    */
-  private IndexSearcher indexSearcher = null;
+  private IndexReader indexReader = null;
 
   /**
-   * default max values to return
+   * searcher object for this datamap
    */
-  private static int MAX_RESULT_NUMBER = 100;
+  private IndexSearcher indexSearcher = null;
 
   /**
    * analyzer for lucene index
@@ -113,7 +113,7 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
     // open this index path , use HDFS default configuration
     Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration());
 
-    IndexReader indexReader = DirectoryReader.open(indexDir);
+    indexReader = DirectoryReader.open(indexDir);
     if (indexReader == null) {
       throw new RuntimeException("failed to create index reader object");
     }
@@ -153,6 +153,10 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
     // only for test , query all data
     String strQuery = getQueryString(filterExp.getFilterExpression());
 
+    if (null == strQuery) {
+      return null;
+    }
+
     String[] sFields = new String[fields.size()];
     fields.toArray(sFields);
 
@@ -163,9 +167,11 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
 
     // use MultiFieldQueryParser to parser query
     QueryParser queryParser = new MultiFieldQueryParser(sFields, analyzer);
+    queryParser.setAllowLeadingWildcard(true);
     Query query;
     try {
-      query = queryParser.parse(strQuery);
+      // always send lowercase string to lucene as it is case sensitive
+      query = queryParser.parse(strQuery.toLowerCase());
     } catch (ParseException e) {
       String errorMessage = String.format(
           "failed to filter block with query %s, detail is %s", strQuery, e.getMessage());
@@ -176,7 +182,7 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap {
     // execute index search
     TopDocs result;
     try {
-      result = indexSearcher.search(query, MAX_RESULT_NUMBER);
+      result = indexSearcher.search(query, indexReader.maxDoc());
     } catch (IOException e) {
       String errorMessage =
           String.format("failed to search lucene data, detail is %s", e.getMessage());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
index 23c9928..151e674 100644
--- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
+++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java
@@ -62,7 +62,17 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine
   public List<FineGrainDataMap> getDataMaps(DataMapDistributable distributable,
       ReadCommittedScope readCommittedScope)
       throws IOException {
-    return getDataMaps(distributable.getSegment(), readCommittedScope);
+    List<FineGrainDataMap> lstDataMap = new ArrayList<>();
+    FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer);
+    String indexPath = ((LuceneDataMapDistributable) distributable).getIndexPath();
+    try {
+      dataMap.init(new DataMapModel(indexPath));
+    } catch (MemoryException e) {
+      LOGGER.error(String.format("failed to get lucene datamap , detail is %s", e.getMessage()));
+      return lstDataMap;
+    }
+    lstDataMap.add(dataMap);
+    return lstDataMap;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala b/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
deleted file mode 100644
index f128afe..0000000
--- a/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.datamap.lucene
-
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-class LuceneCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
-
-  val file2 = resourcesPath + "/datamap_input.csv"
-
-  override protected def beforeAll(): Unit = {
-    //n should be about 5000000 of reset if size is default 1024
-    val n = 15000
-    LuceneFineGrainDataMapSuite.createFile(file2, n * 4, n)
-    sql("DROP TABLE IF EXISTS normal_test")
-    sql(
-      """
-        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
-      """.stripMargin)
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test 
OPTIONS('header'='false')")
-  }
-
-  test("test lucene coarse grain data map") {
-    sql("DROP TABLE IF EXISTS datamap_test")
-    sql(
-      """
-        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
-      """.stripMargin)
-
-    sql(
-      s"""
-         | CREATE DATAMAP dm ON TABLE datamap_test
-         | USING '${classOf[LuceneCoarseGrainDataMapFactory].getName}'
-         | DMProperties('TEXT_COLUMNS'='name,city')
-      """.stripMargin)
-
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test 
OPTIONS('header'='false')")
-
-    checkAnswer(sql("select * from datamap_test where name='n502670'"),
-      sql("select * from normal_test where name='n502670'"))
-  }
-
-  override protected def afterAll(): Unit = {
-    LuceneFineGrainDataMapSuite.deleteFile(file2)
-    sql("DROP TABLE IF EXISTS normal_test")
-    sql("DROP TABLE IF EXISTS datamap_test")
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
deleted file mode 100644
index bfcfa67..0000000
--- a/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.datamap.lucene
-
-import java.io.{File, PrintWriter}
-
-import scala.util.Random
-
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import 
org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
-
-class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
-
-  val file2 = resourcesPath + "/datamap_input.csv"
-
-  override protected def beforeAll(): Unit = {
-    //n should be about 5000000 of reset if size is default 1024
-    val n = 15000
-    LuceneFineGrainDataMapSuite.createFile(file2)
-    sql("DROP TABLE IF EXISTS normal_test")
-    sql(
-      """
-        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
-      """.stripMargin)
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test 
OPTIONS('header'='false')")
-
-    sql("DROP TABLE IF EXISTS datamap_test")
-    sql(
-      """
-        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
-        | STORED BY 'carbondata'
-        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
-      """.stripMargin)
-  }
-
-  test("validate TEXT_COLUMNS DataMap property") {
-    // require TEXT_COLUMNS
-    var exception = intercept[MalformedDataMapCommandException](sql(
-      s"""
-         | CREATE DATAMAP dm1 ON TABLE datamap_test
-         | USING 
'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
-      """.stripMargin))
-
-    assertResult("Lucene DataMap require proper TEXT_COLUMNS 
property.")(exception.getMessage)
-
-    // illegal argumnet.
-    exception = intercept[MalformedDataMapCommandException](sql(
-      s"""
-         | CREATE DATAMAP dm1 ON TABLE datamap_test
-         | USING 
'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
-         | DMProperties('text_COLUMNS'='name, ')
-      """.stripMargin))
-
-    assertResult("TEXT_COLUMNS contains illegal 
argument.")(exception.getMessage)
-
-    // not exists
-    exception = intercept[MalformedDataMapCommandException](sql(
-      s"""
-         | CREATE DATAMAP dm1 ON TABLE datamap_test
-         | USING 
'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
-         | DMProperties('text_COLUMNS'='city,school')
-    """.stripMargin))
-
-    assertResult("TEXT_COLUMNS: school does not exist in table. Please check 
create DataMap statement.")(exception.getMessage)
-
-    // duplicate columns
-    exception = intercept[MalformedDataMapCommandException](sql(
-      s"""
-         | CREATE DATAMAP dm1 ON TABLE datamap_test
-         | USING 
'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
-         | DMProperties('text_COLUMNS'='name,city,name')
-      """.stripMargin))
-
-    assertResult("TEXT_COLUMNS has duplicate columns 
:name")(exception.getMessage)
-
-    // only support String DataType
-    exception = intercept[MalformedDataMapCommandException](sql(
-    s"""
-         | CREATE DATAMAP dm1 ON TABLE datamap_test
-         | USING 
'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
-         | DMProperties('text_COLUMNS'='city,id')
-      """.stripMargin))
-
-    assertResult("TEXT_COLUMNS only supports String column. Unsupported 
column: id, DataType: INT")(exception.getMessage)
-  }
-
-  test("test lucene fine grain data map") {
-    sql(
-      s"""
-         | CREATE DATAMAP dm ON TABLE datamap_test
-         | USING 
'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory'
-         | DMProperties('TEXT_COLUMNS'='Name , cIty')
-      """.stripMargin)
-
-    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test 
OPTIONS('header'='false')")
-
-    //    sql("select * from normal_test where name='n34000'").show
-    checkAnswer(sql("SELECT * FROM datamap_test WHERE 
TEXT_MATCH('name:n10')"), sql(s"select * from datamap_test where name='n10'"))
-//    checkAnswer(sql("SELECT * FROM datamap_test WHERE 
TEXT_MATCH('name:n10*')"), sql(s"SELECT * FROM datamap_test WHERE name like 
'n10%'"))
-    checkAnswer(sql("SELECT * FROM datamap_test WHERE 
TEXT_MATCH('city:c020')"), sql(s"SELECT * FROM datamap_test WHERE city='c020'"))
-
-    //    checkAnswer(
-    //      sql("select * from datamap_test where match('name:n34000')"),
-    //      sql("select * from normal_test where name='n34000'"))
-  }
-
-  override protected def afterAll(): Unit = {
-    LuceneFineGrainDataMapSuite.deleteFile(file2)
-    sql("DROP TABLE IF EXISTS normal_test")
-    sql("DROP TABLE IF EXISTS datamap_test")
-  }
-}
-
-object LuceneFineGrainDataMapSuite {
-  def createFile(fileName: String, line: Int = 10000, start: Int = 0) = {
-    val write = new PrintWriter(new File(fileName))
-    for (i <- start until (start + line)) {
-      write.println(i + "," + "n" + i + "," + "c0" + i + "," + 
Random.nextInt(80))
-    }
-    write.close()
-  }
-
-  def deleteFile(fileName: String): Unit = {
-      val file = new File(fileName)
-      if (file.exists()) {
-        file.delete()
-      }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
new file mode 100644
index 0000000..245d147
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, Ignore}
+
+/**
+ * Ignored test class as CG datamap is not supported yet
+ */
+@Ignore
+class LuceneCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
+
+  val file2 = resourcesPath + "/datamap_input.csv"
+
+  override protected def beforeAll(): Unit = {
+    //n should be about 5000000 of reset if size is default 1024
+    val n = 15000
+    LuceneFineGrainDataMapSuite.createFile(file2, n * 4, n)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql(
+      """
+        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test 
OPTIONS('header'='false')")
+  }
+
+  test("test lucene coarse grain data map") {
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+
+      sql(
+        s"""
+           | CREATE DATAMAP dm ON TABLE datamap_test
+           | USING 'lucene'
+           | DMProperties('TEXT_COLUMNS'='name,city')
+      """.stripMargin)
+
+     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test 
OPTIONS('header'='false')")
+
+     checkAnswer(sql("select * from datamap_test where name='n502670'"),
+     sql("select * from normal_test where name='n502670'"))
+  }
+
+  override protected def afterAll(): Unit = {
+    LuceneFineGrainDataMapSuite.deleteFile(file2)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql("DROP TABLE IF EXISTS datamap_test")
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
new file mode 100644
index 0000000..c5ea2c7
--- /dev/null
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.lucene
+
+import java.io.{File, PrintWriter}
+
+import scala.util.Random
+
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import 
org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll {
+
+  val file2 = resourcesPath + "/datamap_input.csv"
+
+  override protected def beforeAll(): Unit = {
+    //n should be about 5000000 of reset if size is default 1024
+    val n = 15000
+    LuceneFineGrainDataMapSuite.createFile(file2)
+    sql("create database if not exists lucene")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION,
+        CarbonEnv.getDatabaseLocation("lucene", sqlContext.sparkSession))
+    sql("use lucene")
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql(
+      """
+        | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test 
OPTIONS('header'='false')")
+
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql(
+      """
+        | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
+      """.stripMargin)
+  }
+
+  test("validate TEXT_COLUMNS DataMap property") {
+    // require TEXT_COLUMNS
+    var exception = intercept[MalformedDataMapCommandException](sql(
+      s"""
+         | CREATE DATAMAP dm1 ON TABLE datamap_test
+         | USING 'lucene'
+      """.stripMargin))
+
+    assertResult("Lucene DataMap require proper TEXT_COLUMNS 
property.")(exception.getMessage)
+
+    // illegal argument.
+    exception = intercept[MalformedDataMapCommandException](sql(
+      s"""
+         | CREATE DATAMAP dm1 ON TABLE datamap_test
+         | USING 'lucene'
+         | DMProperties('text_COLUMNS'='name, ')
+      """.stripMargin))
+
+    assertResult("TEXT_COLUMNS contains illegal 
argument.")(exception.getMessage)
+
+    // not exists
+    exception = intercept[MalformedDataMapCommandException](sql(
+      s"""
+         | CREATE DATAMAP dm1 ON TABLE datamap_test
+         | USING 'lucene'
+         | DMProperties('text_COLUMNS'='city,school')
+    """.stripMargin))
+
+    assertResult("TEXT_COLUMNS: school does not exist in table. Please check 
create DataMap statement.")(exception.getMessage)
+
+    // duplicate columns
+    exception = intercept[MalformedDataMapCommandException](sql(
+      s"""
+         | CREATE DATAMAP dm1 ON TABLE datamap_test
+         | USING 'lucene'
+         | DMProperties('text_COLUMNS'='name,city,name')
+      """.stripMargin))
+
+    assertResult("TEXT_COLUMNS has duplicate columns 
:name")(exception.getMessage)
+
+    // only support String DataType
+    exception = intercept[MalformedDataMapCommandException](sql(
+    s"""
+         | CREATE DATAMAP dm1 ON TABLE datamap_test
+         | USING 'lucene'
+         | DMProperties('text_COLUMNS'='city,id')
+      """.stripMargin))
+
+    assertResult("TEXT_COLUMNS only supports String column. Unsupported 
column: id, DataType: INT")(exception.getMessage)
+  }
+
+  test("test lucene fine grain data map") {
+    sql(
+      s"""
+         | CREATE DATAMAP dm ON TABLE datamap_test
+         | USING 'lucene'
+         | DMProperties('TEXT_COLUMNS'='Name , cIty')
+      """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test 
OPTIONS('header'='false')")
+
+    //    sql("select * from normal_test where name='n34000'").show
+    checkAnswer(sql("SELECT * FROM datamap_test WHERE 
TEXT_MATCH('name:n10')"), sql(s"select * from datamap_test where name='n10'"))
+//    checkAnswer(sql("SELECT * FROM datamap_test WHERE 
TEXT_MATCH('name:n10*')"), sql(s"SELECT * FROM datamap_test WHERE name like 
'n10%'"))
+    checkAnswer(sql("SELECT * FROM datamap_test WHERE 
TEXT_MATCH('city:c020')"), sql(s"SELECT * FROM datamap_test WHERE city='c020'"))
+
+    //    checkAnswer(
+    //      sql("select * from datamap_test where match('name:n34000')"),
+    //      sql("select * from normal_test where name='n34000'"))
+  }
+
+  override protected def afterAll(): Unit = {
+    LuceneFineGrainDataMapSuite.deleteFile(file2)
+    sql("DROP TABLE IF EXISTS normal_test")
+    sql("DROP TABLE IF EXISTS datamap_test")
+    sql("use default")
+    sql("drop database if exists lucene cascade")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION,
+        CarbonProperties.getStorePath)
+  }
+}
+
+object LuceneFineGrainDataMapSuite {
+  def createFile(fileName: String, line: Int = 10000, start: Int = 0) = {
+    val write = new PrintWriter(new File(fileName))
+    for (i <- start until (start + line)) {
+      write.println(i + "," + "n" + i + "," + "c0" + i + "," + 
Random.nextInt(80))
+    }
+    write.close()
+  }
+
+  def deleteFile(fileName: String): Unit = {
+      val file = new File(fileName)
+      if (file.exists()) {
+        file.delete()
+      }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 0c96247..20e01ff 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -36,7 +36,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec}
 import 
org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
DataMapSchema, DiskBasedDMSchemaStorageProvider}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
CarbonMetadata}
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope
 import org.apache.carbondata.core.scan.expression.Expression
@@ -55,8 +55,8 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory {
   /**
    * Initialization of Datamap factory with the identifier and datamap name
    */
-  override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: 
DataMapSchema): Unit = {
-    this.identifier = identifier
+  override def init(carbonTable: CarbonTable, dataMapSchema: DataMapSchema): 
Unit = {
+    this.identifier = carbonTable.getAbsoluteTableIdentifier
     this.dataMapSchema = dataMapSchema
   }
 
@@ -143,6 +143,12 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory {
     new 
DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
       List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
   }
+
+  /**
+   * delete datamap data if any
+   */
+  override def deleteDatamapData(): Unit = {
+  }
 }
 
 class CGDataMap extends CoarseGrainDataMap {
@@ -243,7 +249,7 @@ class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
    *
    * @param blockId file name of the carbondata file
    */
-  override def onBlockStart(blockId: String): Unit = {
+  override def onBlockStart(blockId: String, taskId: Long): Unit = {
     currentBlockId = blockId
   }
 
@@ -326,6 +332,8 @@ class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
 class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll {
 
   val file2 = resourcesPath + "/compaction/fil2.csv"
+  val systemFolderStoreLocation = 
CarbonProperties.getInstance().getSystemFolderLocation
+
   override protected def beforeAll(): Unit = {
     //n should be about 5000000 of reset if size is default 1024
     val n = 150000
@@ -384,9 +392,10 @@ class CGDataMapTestCase extends QueryTest with 
BeforeAndAfterAll {
         | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
       """.stripMargin)
 
-    sql(s"create datamap test_cg_datamap on table datamap_store_test using 
'${classOf[CGDataMapFactory].getName}' as select  id, name from 
datamap_store_test")
+    val dataMapProvider = classOf[CGDataMapFactory].getName
+    sql(s"create datamap test_cg_datamap on table datamap_store_test using 
'$dataMapProvider' as select  id, name from datamap_store_test")
 
-    val loc = CarbonProperties.getInstance().getSystemFolderLocation + 
"/test_cg_datamap.dmschema"
+    val loc = 
DiskBasedDMSchemaStorageProvider.getSchemaPath(systemFolderStoreLocation, 
"test_cg_datamap", "datamap_store_test", dataMapProvider)
 
     assert(FileFactory.isFileExist(loc))
   }
@@ -400,9 +409,10 @@ class CGDataMapTestCase extends QueryTest with 
BeforeAndAfterAll {
         | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
       """.stripMargin)
 
-    sql(s"create datamap test_cg_datamap1 on table datamap_store_test1 using 
'${classOf[CGDataMapFactory].getName}' as select  id, name from 
datamap_store_test")
+    val dataMapProvider = classOf[CGDataMapFactory].getName
+    sql(s"create datamap test_cg_datamap1 on table datamap_store_test1 using 
'$dataMapProvider' as select  id, name from datamap_store_test")
 
-    val loc = CarbonProperties.getInstance().getSystemFolderLocation + 
"/test_cg_datamap1.dmschema"
+    val loc = 
DiskBasedDMSchemaStorageProvider.getSchemaPath(systemFolderStoreLocation, 
"test_cg_datamap1", "datamap_store_test1", dataMapProvider)
 
     assert(FileFactory.isFileExist(loc))
 
@@ -420,9 +430,10 @@ class CGDataMapTestCase extends QueryTest with 
BeforeAndAfterAll {
         | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT')
       """.stripMargin)
 
-    sql(s"create datamap test_cg_datamap2 on table datamap_store_test2 using 
'${classOf[CGDataMapFactory].getName}' as select  id, name from 
datamap_store_test")
+    val dataMapProvider = classOf[CGDataMapFactory].getName
+    sql(s"create datamap test_cg_datamap2 on table datamap_store_test2 using 
'$dataMapProvider' as select  id, name from datamap_store_test")
 
-    val loc = CarbonProperties.getInstance().getSystemFolderLocation + 
"/test_cg_datamap2.dmschema"
+    val loc = 
DiskBasedDMSchemaStorageProvider.getSchemaPath(systemFolderStoreLocation,"test_cg_datamap2",
 "datamap_store_test2", dataMapProvider)
 
     assert(FileFactory.isFileExist(loc))
 
@@ -442,4 +453,5 @@ class CGDataMapTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS datamap_store_test1")
     sql("DROP TABLE IF EXISTS datamap_store_test2")
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 270c676..5fa5209 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -35,6 +35,7 @@ import 
org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
DataMapSchema}
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
@@ -43,9 +44,9 @@ class C2DataMapFactory() extends CoarseGrainDataMapFactory {
 
   var identifier: AbsoluteTableIdentifier = _
 
-  override def init(identifier: AbsoluteTableIdentifier,
+  override def init(carbonTable: CarbonTable,
       dataMapSchema: DataMapSchema): Unit = {
-    this.identifier = identifier
+    this.identifier = carbonTable.getAbsoluteTableIdentifier
   }
 
   override def fireEvent(event: Event): Unit = ???
@@ -72,6 +73,12 @@ class C2DataMapFactory() extends CoarseGrainDataMapFactory {
     ???
   }
 
+  /**
+   * delete datamap data if any
+   */
+  override def deleteDatamapData(): Unit = {
+    ???
+  }
 }
 
 class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
@@ -203,7 +210,7 @@ object DataMapWriterSuite {
      *
      * @param blockId file name of the carbondata file
      */
-    override def onBlockStart(blockId: String) = {
+    override def onBlockStart(blockId: String, taskId: Long) = {
       callbackSeq :+= s"block start $blockId"
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 060d06a..551f9e1 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -38,7 +38,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec}
 import 
org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
-import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
DataMapSchema}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
CarbonMetadata}
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope
 import org.apache.carbondata.core.scan.expression.Expression
@@ -57,8 +57,8 @@ class FGDataMapFactory extends FineGrainDataMapFactory {
   /**
    * Initialization of Datamap factory with the identifier and datamap name
    */
-  override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: 
DataMapSchema): Unit = {
-    this.identifier = identifier
+  override def init(carbonTable: CarbonTable, dataMapSchema: DataMapSchema): 
Unit = {
+    this.identifier = carbonTable.getAbsoluteTableIdentifier
     this.dataMapSchema = dataMapSchema
   }
 
@@ -141,6 +141,13 @@ class FGDataMapFactory extends FineGrainDataMapFactory {
     new 
DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava,
       List(ExpressionType.EQUALS, ExpressionType.IN).asJava)
   }
+
+  /**
+   * delete datamap data if any
+   */
+  override def deleteDatamapData(): Unit = {
+    ???
+  }
 }
 
 class FGDataMap extends FineGrainDataMap {
@@ -271,7 +278,7 @@ class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
    *
    * @param blockId file name of the carbondata file
    */
-  override def onBlockStart(blockId: String): Unit = {
+  override def onBlockStart(blockId: String, taskId: Long): Unit = {
     currentBlockId = blockId
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
index be9cc51..280c20d 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
DataMapSchema}
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
@@ -204,7 +205,7 @@ class TestDataMap() extends CoarseGrainDataMapFactory {
 
       override def onBlockletStart(blockletId: Int): Unit = { }
 
-      override def onBlockStart(blockId: String): Unit = {
+      override def onBlockStart(blockId: String, taskId: Long): Unit = {
         // trigger the second SQL to execute
       }
 
@@ -218,8 +219,14 @@ class TestDataMap() extends CoarseGrainDataMapFactory {
 
   override def toDistributable(segmentId: Segment): 
util.List[DataMapDistributable] = ???
 
-  override def init(identifier: AbsoluteTableIdentifier,
-      dataMapSchema: DataMapSchema): Unit = {
-    this.identifier = identifier
+  override def init(carbonTable: CarbonTable, dataMapSchema: DataMapSchema): 
Unit = {
+    this.identifier = carbonTable.getAbsoluteTableIdentifier
+  }
+
+  /**
+   * delete datamap data if any
+   */
+  override def deleteDatamapData(): Unit = {
+
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index 86f0f10..5c9709c 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -36,6 +36,7 @@ import 
org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, 
RelationIdentifier}
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
DataMapSchema, RelationIdentifier}
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
@@ -306,7 +307,7 @@ class WaitingDataMap() extends CoarseGrainDataMapFactory {
 
       override def onBlockletStart(blockletId: Int): Unit = { }
 
-      override def onBlockStart(blockId: String): Unit = {
+      override def onBlockStart(blockId: String, taskId: Long): Unit = {
         // trigger the second SQL to execute
         Global.overwriteRunning = true
 
@@ -324,8 +325,15 @@ class WaitingDataMap() extends CoarseGrainDataMapFactory {
 
   override def toDistributable(segmentId: Segment): 
util.List[DataMapDistributable] = ???
 
-  override def init(identifier: AbsoluteTableIdentifier,
+  override def init(carbonTable: CarbonTable,
       dataMapSchema: DataMapSchema): Unit = {
-    this.identifier = identifier
+    this.identifier = carbonTable.getAbsoluteTableIdentifier
+  }
+
+  /**
+   * delete datamap data if any
+   */
+  override def deleteDatamapData(): Unit = {
+
   }
 }
\ No newline at end of file

Reply via email to