[CARBONDATA-2347][LUCENE_DATAMAP]load issue in lucene datamap, make multiple directory based on taskId
make the datamap distributable object based on lucene index path written during load Added Lucene Listener and Fixed Show Datamap This closes #2113 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/860e144d Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/860e144d Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/860e144d Branch: refs/heads/master Commit: 860e144d4ac87a5d19a4572b2ffa7d2f192e4162 Parents: ceac8ab Author: akashrn5 <akashnilu...@gmail.com> Authored: Thu Mar 29 19:59:36 2018 +0530 Committer: QiangCai <qiang...@qq.com> Committed: Wed Apr 18 15:56:44 2018 +0800 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 13 ++ .../core/datamap/DataMapStoreManager.java | 8 +- .../core/datamap/IndexDataMapProvider.java | 34 ++-- .../carbondata/core/datamap/TableDataMap.java | 7 + .../core/datamap/dev/DataMapFactory.java | 11 +- .../core/datamap/dev/DataMapWriter.java | 2 +- .../blockletindex/BlockletDataMapFactory.java | 9 +- .../core/metadata/SegmentFileStore.java | 10 +- .../schema/datamap/DataMapClassProvider.java | 17 +- .../metadata/schema/table/DataMapSchema.java | 1 - .../table/DataMapSchemaStorageProvider.java | 3 +- .../table/DiskBasedDMSchemaStorageProvider.java | 27 +++- .../datamap/examples/MinMaxDataWriter.java | 2 +- .../examples/MinMaxIndexDataMapFactory.java | 14 +- .../lucene/LuceneCoarseGrainDataMapFactory.java | 4 + .../lucene/LuceneDataMapDistributable.java | 13 +- .../lucene/LuceneDataMapFactoryBase.java | 93 ++++++++--- .../datamap/lucene/LuceneDataMapWriter.java | 80 +++++++-- .../datamap/lucene/LuceneFineGrainDataMap.java | 20 ++- .../lucene/LuceneFineGrainDataMapFactory.java | 12 +- .../lucene/LuceneCoarseGrainDataMapSuite.scala | 70 -------- .../lucene/LuceneFineGrainDataMapSuite.scala | 148 ----------------- .../lucene/LuceneCoarseGrainDataMapSuite.scala | 74 +++++++++ 
.../lucene/LuceneFineGrainDataMapSuite.scala | 161 +++++++++++++++++++ .../testsuite/datamap/CGDataMapTestCase.scala | 32 ++-- .../testsuite/datamap/DataMapWriterSuite.scala | 13 +- .../testsuite/datamap/FGDataMapTestCase.scala | 15 +- .../testsuite/datamap/TestDataMapStatus.scala | 15 +- .../TestInsertAndOtherCommandConcurrent.scala | 14 +- .../datamap/CarbonCreateDataMapCommand.scala | 25 ++- .../datamap/CarbonDataMapShowCommand.scala | 16 +- .../CarbonProjectForDeleteCommand.scala | 1 - .../command/table/CarbonDropTableCommand.scala | 7 +- .../sql/execution/strategy/DDLStrategy.scala | 2 - .../datamap/DataMapWriterListener.java | 4 +- .../store/writer/AbstractFactDataWriter.java | 3 +- 36 files changed, 631 insertions(+), 349 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index df995e0..6ab1ce5 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1653,6 +1653,19 @@ public final class CarbonCommonConstants { "carbon.compaction.prefetch.enable"; public static final String CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT = "false"; + /** + * compression mode used by lucene for index writing, this conf will be passed to lucene writer + * while writing index files. 
+ */ + public static final String CARBON_LUCENE_COMPRESSION_MODE = "carbon.lucene.compression.mode"; + + /** + * default lucene index compression mode, in this mode the index size will be larger and speed is + * given priority, another mode is compression mode, where the index size is given importance to + * make it less and not the index writing speed. + */ + public static final String CARBON_LUCENE_COMPRESSION_MODE_DEFAULT = "speed"; + private CarbonCommonConstants() { } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java index 02cf2a5..169cbde 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java @@ -252,8 +252,9 @@ public final class DataMapStoreManager { (Class<?
extends DataMapFactory>) Class.forName(dataMapSchema.getProviderName()); dataMapFactory = factoryClass.newInstance(); } catch (ClassNotFoundException e) { - throw new MalformedDataMapCommandException( - "DataMap '" + dataMapSchema.getProviderName() + "' not found"); + // try to create DataMapClassProvider instance by taking providerName as short name + dataMapFactory = + IndexDataMapProvider.getDataMapFactoryByShortName(dataMapSchema.getProviderName()); } catch (Throwable e) { throw new MetadataProcessException( "failed to create DataMap '" + dataMapSchema.getProviderName() + "'", e); @@ -272,7 +273,7 @@ public final class DataMapStoreManager { tableIndices = new ArrayList<>(); } - dataMapFactory.init(table.getAbsoluteTableIdentifier(), dataMapSchema); + dataMapFactory.init(table, dataMapSchema); BlockletDetailsFetcher blockletDetailsFetcher; SegmentPropertiesFetcher segmentPropertiesFetcher = null; if (dataMapFactory instanceof BlockletDetailsFetcher) { @@ -348,6 +349,7 @@ public final class DataMapStoreManager { if (tableDataMap != null && dataMapName .equalsIgnoreCase(tableDataMap.getDataMapSchema().getDataMapName())) { tableDataMap.clear(); + tableDataMap.deleteDatamapData(); tableIndices.remove(i); break; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java index 02ff1a1..85a4341 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java @@ -24,6 +24,7 @@ import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.exceptions.MetadataProcessException; import 
org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException; import org.apache.carbondata.core.datamap.dev.DataMapFactory; +import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; import org.apache.carbondata.core.metadata.schema.table.DataMapSchemaStorageProvider; @@ -46,10 +47,12 @@ public class IndexDataMapProvider implements DataMapProvider { "Parent table is required to create index datamap"); } ArrayList<RelationIdentifier> relationIdentifiers = new ArrayList<>(); - dataMapSchema.setParentTables(relationIdentifiers); - relationIdentifiers.add( + RelationIdentifier relationIdentifier = new RelationIdentifier(mainTable.getDatabaseName(), mainTable.getTableName(), - mainTable.getTableInfo().getFactTable().getTableId())); + mainTable.getTableInfo().getFactTable().getTableId()); + relationIdentifiers.add(relationIdentifier); + dataMapSchema.setRelationIdentifier(relationIdentifier); + dataMapSchema.setParentTables(relationIdentifiers); DataMapFactory dataMapFactory = createIndexDataMapFactory(dataMapSchema); DataMapStoreManager.getInstance().registerDataMap(mainTable, dataMapSchema, dataMapFactory); storageProvider.saveSchema(dataMapSchema); @@ -62,7 +65,8 @@ public class IndexDataMapProvider implements DataMapProvider { @Override public void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema) throws IOException { - storageProvider.dropSchema(dataMapSchema.getDataMapName()); + storageProvider.dropSchema(dataMapSchema.getDataMapName(), + dataMapSchema.getParentTables().get(0).getTableName(), dataMapSchema.getProviderName()); } @Override @@ -99,25 +103,29 @@ public class IndexDataMapProvider implements DataMapProvider { return dataMapFactory; } - private DataMapFactory getDataMapFactoryByShortName(String providerName) + public static DataMapFactory 
getDataMapFactoryByShortName(String providerName) throws MalformedDataMapCommandException { + try { + DataMapRegistry.registerDataMap( + DataMapClassProvider.getDataMapProviderOnName(providerName).getClassName(), + DataMapClassProvider.getDataMapProviderOnName(providerName).getShortName()); + } catch (UnsupportedOperationException ex) { + throw new MalformedDataMapCommandException("DataMap '" + providerName + "' not found", ex); + } DataMapFactory dataMapFactory; - String className = DataMapRegistry.getDataMapClassName(providerName); + String className = DataMapRegistry.getDataMapClassName(providerName.toLowerCase()); if (className != null) { try { Class<? extends DataMapFactory> datamapClass = - (Class<? extends DataMapFactory>) Class.forName(providerName); + (Class<? extends DataMapFactory>) Class.forName(className); dataMapFactory = datamapClass.newInstance(); } catch (ClassNotFoundException ex) { - throw new MalformedDataMapCommandException( - "DataMap '" + providerName + "' not found", ex); + throw new MalformedDataMapCommandException("DataMap '" + providerName + "' not found", ex); } catch (Throwable ex) { - throw new MetadataProcessException( - "failed to create DataMap '" + providerName + "'", ex); + throw new MetadataProcessException("failed to create DataMap '" + providerName + "'", ex); } } else { - throw new MalformedDataMapCommandException( - "DataMap '" + providerName + "' not found"); + throw new MalformedDataMapCommandException("DataMap '" + providerName + "' not found"); } return dataMapFactory; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java index 6689e3b..8143b1c 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java @@ -197,6 +197,13 @@ public final class TableDataMap extends OperationEventListener { dataMapFactory.clear(); } + /** + * delete datamap data if any + */ + public void deleteDatamapData() { + dataMapFactory.deleteDatamapData(); + } + public DataMapSchema getDataMapSchema() { return dataMapSchema; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java index d27b255..70f2772 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java @@ -24,7 +24,7 @@ import org.apache.carbondata.core.datamap.DataMapDistributable; import org.apache.carbondata.core.datamap.DataMapLevel; import org.apache.carbondata.core.datamap.DataMapMeta; import org.apache.carbondata.core.datamap.Segment; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; import org.apache.carbondata.core.readcommitter.ReadCommittedScope; import org.apache.carbondata.events.Event; @@ -35,9 +35,9 @@ import org.apache.carbondata.events.Event; public interface DataMapFactory<T extends DataMap> { /** - * Initialization of Datamap factory with the identifier and datamap name + * Initialization of Datamap factory with the carbonTable and datamap name */ - void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) + void init(CarbonTable carbonTable, DataMapSchema 
dataMapSchema) throws IOException, MalformedDataMapCommandException; /** @@ -87,4 +87,9 @@ public interface DataMapFactory<T extends DataMap> { * Type of datamap whether it is FG or CG */ DataMapLevel getDataMapType(); + + /** + * delete datamap data if any + */ + void deleteDatamapData(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java index 6a3ee18..29670a1 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java @@ -54,7 +54,7 @@ public abstract class DataMapWriter { * * @param blockId file name of the carbondata file */ - public abstract void onBlockStart(String blockId) throws IOException; + public abstract void onBlockStart(String blockId, long taskId) throws IOException; /** * End of block notification http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index 2425c4c..4c8ac0c 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -43,6 +43,7 @@ import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher; import 
org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.SegmentFileStore; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; import org.apache.carbondata.core.readcommitter.ReadCommittedScope; import org.apache.carbondata.core.util.path.CarbonTablePath; @@ -72,8 +73,8 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory private Cache<TableBlockIndexUniqueIdentifier, CoarseGrainDataMap> cache; @Override - public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) { - this.identifier = identifier; + public void init(CarbonTable carbonTable, DataMapSchema dataMapSchema) { + this.identifier = carbonTable.getAbsoluteTableIdentifier(); cache = CacheProvider.getInstance() .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP); } @@ -262,6 +263,10 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory return null; } + @Override public void deleteDatamapData() { + + } + @Override public SegmentProperties getSegmentProperties(Segment segment, ReadCommittedScope readCommittedScope) throws IOException { List<CoarseGrainDataMap> dataMaps = getDataMaps(segment, readCommittedScope); http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java index 5902148..d609c56 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java @@ -672,7 +672,7 @@ public class SegmentFileStore { } private 
static void deletePhysicalPartition(List<PartitionSpec> partitionSpecs, - Map<String, List<String>> locationMap) { + Map<String, List<String>> locationMap) throws IOException { for (Map.Entry<String, List<String>> entry : locationMap.entrySet()) { Path location = new Path(entry.getKey()).getParent(); if (partitionSpecs != null) { @@ -681,10 +681,10 @@ public class SegmentFileStore { FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString())); } } else { - // delete the segment folder if it is empty - CarbonFile file = FileFactory.getCarbonFile(location.toString()); - if (file.listFiles().length == 0) { - file.delete(); + // delete the segment folder + CarbonFile segmentPath = FileFactory.getCarbonFile(location.toString()); + if (null != segmentPath) { + FileFactory.deleteAllCarbonFilesOfDir(segmentPath); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java index 3934444..4ab400d 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java @@ -28,7 +28,8 @@ package org.apache.carbondata.core.metadata.schema.datamap; public enum DataMapClassProvider { PREAGGREGATE("org.apache.carbondata.core.datamap.AggregateDataMap", "preaggregate"), - TIMESERIES("org.apache.carbondata.core.datamap.TimeSeriesDataMap", "timeseries"); + TIMESERIES("org.apache.carbondata.core.datamap.TimeSeriesDataMap", "timeseries"), + LUCENE("org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory","lucene"); /** * Fully qualified 
class name of datamap @@ -64,8 +65,22 @@ public enum DataMapClassProvider { return TIMESERIES; } else if (PREAGGREGATE.isEqual(dataMapClass)) { return PREAGGREGATE; + } else if (LUCENE.isEqual(dataMapClass)) { + return LUCENE; } else { throw new UnsupportedOperationException("Unknown datamap provider/class " + dataMapClass); } } + + public static DataMapClassProvider getDataMapProviderOnName(String dataMapShortname) { + if (TIMESERIES.isEqual(dataMapShortname)) { + return TIMESERIES; + } else if (PREAGGREGATE.isEqual(dataMapShortname)) { + return PREAGGREGATE; + } else if (LUCENE.isEqual(dataMapShortname)) { + return LUCENE; + } else { + throw new UnsupportedOperationException("Unknown datamap provider" + dataMapShortname); + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java index 6b592fb..d9f83b3 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java @@ -70,7 +70,6 @@ public class DataMapSchema implements Serializable, Writable { */ protected TableSchema childSchema; - public DataMapSchema(String dataMapName, String providerName) { this.dataMapName = dataMapName; this.providerName = providerName; http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java 
b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java index 6b9bca5..ed13201 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaStorageProvider.java @@ -63,6 +63,7 @@ public interface DataMapSchemaStorageProvider { * Drop the schema from the storage by using dataMapName. * @param dataMapName */ - void dropSchema(String dataMapName) throws IOException; + void dropSchema(String dataMapName, String tableName, String dataMapProviderName) + throws IOException; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java index d49a9ae..9e34131 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java @@ -51,9 +51,8 @@ public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStoragePro BufferedWriter brWriter = null; DataOutputStream dataOutputStream = null; Gson gsonObjectToWrite = new Gson(); - String schemaPath = - storePath + CarbonCommonConstants.FILE_SEPARATOR + dataMapSchema.getDataMapName() - + ".dmschema"; + String schemaPath = getSchemaPath(storePath, dataMapSchema.getDataMapName(), + dataMapSchema.relationIdentifier.getTableName(), dataMapSchema.getProviderName()); FileFactory.FileType fileType = FileFactory.getFileType(schemaPath); if (FileFactory.isFileExist(schemaPath, 
fileType)) { throw new IOException( @@ -129,9 +128,9 @@ public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStoragePro return dataMapSchemas; } - @Override public void dropSchema(String dataMapName) throws IOException { - String schemaPath = - storePath + CarbonCommonConstants.FILE_SEPARATOR + dataMapName + ".dmschema"; + @Override public void dropSchema(String dataMapName, String tableName, String dataMapProviderName) + throws IOException { + String schemaPath = getSchemaPath(storePath, dataMapName, tableName, dataMapProviderName); if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) { throw new IOException("DataMap with name " + dataMapName + " does not exists in storage"); } @@ -140,4 +139,20 @@ public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStoragePro throw new IOException("DataMap with name " + dataMapName + " cannot be deleted"); } } + + /** + * it returns the schema path for the datamap + * @param storePath + * @param dataMapName + * @param tableName + * @param dataMapProviderName + * @return + */ + public static String getSchemaPath(String storePath, String dataMapName, String tableName, + String dataMapProviderName) { + String schemaPath = storePath + CarbonCommonConstants.FILE_SEPARATOR + tableName + + CarbonCommonConstants.UNDERSCORE + dataMapName + CarbonCommonConstants.UNDERSCORE + + dataMapProviderName + ".dmschema"; + return schemaPath; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java ---------------------------------------------------------------------- diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java index d2dbaa5..e68b481 100644 --- 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java @@ -90,7 +90,7 @@ public class MinMaxDataWriter extends DataMapWriter { } } - @Override public void onBlockStart(String blockId) { + @Override public void onBlockStart(String blockId, long taskId) { blockMinMaxMap = new HashMap<Integer, BlockletMinMax>(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java ---------------------------------------------------------------------- diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java index 758a67c..01aaffa 100644 --- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java @@ -58,17 +58,11 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory { // this is an example for datamap, we can choose the columns and operations that // will be supported by this datamap. Furthermore, we can add cache-support for this datamap. 
- @Override public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) + @Override public void init(CarbonTable carbonTable, DataMapSchema dataMapSchema) throws IOException, MalformedDataMapCommandException { - this.identifier = identifier; + this.identifier = carbonTable.getAbsoluteTableIdentifier(); this.dataMapName = dataMapSchema.getDataMapName(); - String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName(); - CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName); - if (null == carbonTable) { - throw new IOException("Failed to get carbon table with name " + tableUniqueName); - } - // columns that will be indexed List<CarbonColumn> allColumns = carbonTable.getCreateOrderColumn(identifier.getTableName()); List<String> minMaxCols = (List) CollectionUtils.collect(allColumns, new Transformer() { @@ -163,4 +157,8 @@ public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory { @Override public DataMapMeta getMeta() { return this.dataMapMeta; } + + @Override public void deleteDatamapData() { + + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java ---------------------------------------------------------------------- diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java index 7308841..f63656b 100644 --- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java +++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java @@ -75,4 +75,8 @@ public class LuceneCoarseGrainDataMapFactory extends LuceneDataMapFactoryBase<Co return DataMapLevel.CG; } + @Override public void 
deleteDatamapData() { + + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java ---------------------------------------------------------------------- diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java index 19e4035..1d47ee8 100644 --- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java +++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java @@ -26,11 +26,22 @@ class LuceneDataMapDistributable extends DataMapDistributable { // TODO: seems no one use this? private String dataPath; - LuceneDataMapDistributable(String dataPath) { + private String indexPath; + + LuceneDataMapDistributable(String dataPath, String indexPath) { this.dataPath = dataPath; + this.indexPath = indexPath; } public String getDataPath() { return dataPath; } + + public String getIndexPath() { + return indexPath; + } + + public void setIndexPath(String indexPath) { + this.indexPath = indexPath; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java ---------------------------------------------------------------------- diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java index ab4e9ee..672880f 100644 --- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java +++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java @@ -17,6 +17,7 @@ package org.apache.carbondata.datamap.lucene; +import 
java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -32,13 +33,17 @@ import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.dev.DataMap; import org.apache.carbondata.core.datamap.dev.DataMapFactory; import org.apache.carbondata.core.datamap.dev.DataMapWriter; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.CarbonMetadata; import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.scan.filter.intf.ExpressionType; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.events.Event; @@ -79,29 +84,21 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac */ AbsoluteTableIdentifier tableIdentifier = null; + /** + * indexed carbon columns for lucene + */ + List<String> indexedCarbonColumns = null; + + @Override - public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) + public void init(CarbonTable carbonTable, DataMapSchema dataMapSchema) throws IOException, MalformedDataMapCommandException { - Objects.requireNonNull(identifier); + Objects.requireNonNull(carbonTable.getAbsoluteTableIdentifier()); Objects.requireNonNull(dataMapSchema); - this.tableIdentifier = identifier; + this.tableIdentifier = carbonTable.getAbsoluteTableIdentifier(); this.dataMapName = 
dataMapSchema.getDataMapName(); - // get carbonmetadata from carbonmetadata instance - CarbonMetadata carbonMetadata = CarbonMetadata.getInstance(); - - String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName(); - - // get carbon table - CarbonTable carbonTable = carbonMetadata.getCarbonTable(tableUniqueName); - if (carbonTable == null) { - String errorMessage = - String.format("failed to get carbon table with name %s", tableUniqueName); - LOGGER.error(errorMessage); - throw new IOException(errorMessage); - } - // validate DataMapSchema and get index columns List<String> indexedColumns = validateAndGetIndexedColumns(dataMapSchema, carbonTable); @@ -151,7 +148,7 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac } } } - List<String> textColumnList = new ArrayList<String>(textColumns.length); + indexedCarbonColumns = new ArrayList<>(textColumns.length); for (int i = 0; i < textColumns.length; i++) { CarbonColumn column = carbonTable.getColumnByName(carbonTable.getTableName(), textColumns[i]); if (null == column) { @@ -161,10 +158,41 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac throw new MalformedDataMapCommandException( "TEXT_COLUMNS only supports String column. 
" + "Unsupported column: " + textColumns[i] + ", DataType: " + column.getDataType()); + } else if (column.getEncoder().contains(Encoding.DICTIONARY)) { + throw new MalformedDataMapCommandException( + "TEXT_COLUMNS cannot contain dictionary column " + column.getColName()); + } + indexedCarbonColumns.add(column.getColName()); + } + return indexedCarbonColumns; + } + + /** + * this method will delete the datamap folders during drop datamap + * @throws MalformedDataMapCommandException + */ + private void deleteDatamap() throws MalformedDataMapCommandException { + SegmentStatusManager ssm = new SegmentStatusManager(tableIdentifier); + try { + List<Segment> validSegments = ssm.getValidAndInvalidSegments().getValidSegments(); + for (Segment segment : validSegments) { + String segmentId = segment.getSegmentNo(); + String datamapPath = + CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), segmentId) + + File.separator + dataMapName; + if (FileFactory.isFileExist(datamapPath)) { + CarbonFile file = + FileFactory.getCarbonFile(datamapPath, FileFactory.getFileType(datamapPath)); + CarbonUtil.deleteFoldersAndFilesSilent(file); + } } - textColumnList.add(column.getColName()); + } catch (IOException ex) { + throw new MalformedDataMapCommandException( + "drop datamap failed, failed to delete datamap directory"); + } catch (InterruptedException ex) { + throw new MalformedDataMapCommandException( + "drop datamap failed, failed to delete datamap directory"); } - return textColumnList; } /** @@ -173,8 +201,8 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac @Override public DataMapWriter createWriter(Segment segment, String writeDirectoryPath) { LOGGER.info("lucene data write to " + writeDirectoryPath); - return new LuceneDataMapWriter( - tableIdentifier, dataMapName, segment, writeDirectoryPath, true); + return new LuceneDataMapWriter(tableIdentifier, dataMapName, segment, writeDirectoryPath, true, + indexedCarbonColumns); } /** @@ -183,9 
+211,14 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac @Override public List<DataMapDistributable> toDistributable(Segment segment) { List<DataMapDistributable> lstDataMapDistribute = new ArrayList<DataMapDistributable>(); - DataMapDistributable luceneDataMapDistributable = new LuceneDataMapDistributable( - CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), segment.getSegmentNo())); - lstDataMapDistribute.add(luceneDataMapDistributable); + CarbonFile[] indexDirs = LuceneDataMapWriter + .getAllIndexDirs(tableIdentifier.getTablePath(), segment.getSegmentNo(), dataMapName); + for (CarbonFile indexDir : indexDirs) { + DataMapDistributable luceneDataMapDistributable = new LuceneDataMapDistributable( + CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), segment.getSegmentNo()), + indexDir.getAbsolutePath()); + lstDataMapDistribute.add(luceneDataMapDistributable); + } return lstDataMapDistribute; } @@ -210,6 +243,14 @@ abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFac } + @Override public void deleteDatamapData() { + try { + deleteDatamap(); + } catch (MalformedDataMapCommandException ex) { + LOGGER.error(ex, "failed to delete datamap directory "); + } + } + /** * Return metadata of this datamap */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java ---------------------------------------------------------------------- diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java index 4286e5a..83454b3 100644 --- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java +++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java @@ -20,23 +20,30 @@ package 
org.apache.carbondata.datamap.lucene; import java.io.File; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.util.List; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.dev.DataMapWriter; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat; +import org.apache.lucene.codecs.lucene62.Lucene62Codec; import org.apache.lucene.document.Document; import org.apache.lucene.document.DoublePoint; import org.apache.lucene.document.Field; @@ -77,6 +84,8 @@ public class LuceneDataMapWriter extends DataMapWriter { private boolean isFineGrain = true; + private List<String> indexedCarbonColumns = null; + private static final String BLOCKID_NAME = "blockId"; private static final String BLOCKLETID_NAME = "blockletId"; @@ -86,30 +95,31 @@ public class LuceneDataMapWriter extends DataMapWriter { private static final String ROWID_NAME = "rowId"; LuceneDataMapWriter(AbsoluteTableIdentifier identifier, 
String dataMapName, Segment segment, - String writeDirectoryPath, boolean isFineGrain) { + String writeDirectoryPath, boolean isFineGrain, List<String> indexedCarbonColumns) { super(identifier, segment, writeDirectoryPath); this.dataMapName = dataMapName; this.isFineGrain = isFineGrain; + this.indexedCarbonColumns = indexedCarbonColumns; } - private String getIndexPath() { + private String getIndexPath(long taskId) { if (isFineGrain) { - return genDataMapStorePath(identifier.getTablePath(), segmentId, dataMapName); + return genDataMapStorePathOnTaskId(identifier.getTablePath(), segmentId, dataMapName, taskId); } else { // TODO: where write data in coarse grain data map - return genDataMapStorePath(identifier.getTablePath(), segmentId, dataMapName); + return genDataMapStorePathOnTaskId(identifier.getTablePath(), segmentId, dataMapName, taskId); } } /** * Start of new block notification. */ - public void onBlockStart(String blockId) throws IOException { + public void onBlockStart(String blockId, long taskId) throws IOException { // save this block id for lucene index , used in onPageAdd function this.blockId = blockId; // get index path, put index data into segment's path - String strIndexPath = getIndexPath(); + String strIndexPath = getIndexPath(taskId); Path indexPath = FileFactory.getPath(strIndexPath); FileSystem fs = FileFactory.getFileSystem(indexPath); @@ -124,6 +134,18 @@ public class LuceneDataMapWriter extends DataMapWriter { // create a index writer Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration()); + + IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer); + if (CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE, + CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT) + .equalsIgnoreCase(CarbonCommonConstants.CARBON_LUCENE_COMPRESSION_MODE_DEFAULT)) { + indexWriterConfig.setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_SPEED)); + } else 
{ + indexWriterConfig + .setCodec(new Lucene62Codec(Lucene50StoredFieldsFormat.Mode.BEST_COMPRESSION)); + } + indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer)); } @@ -202,8 +224,10 @@ public class LuceneDataMapWriter extends DataMapWriter { // add other fields for (int colIdx = 0; colIdx < columnsCount; colIdx++) { - if (!pages[colIdx].getNullBits().get(rowId)) { - addField(doc, pages[colIdx], rowId, Field.Store.NO); + if (indexedCarbonColumns.contains(pages[colIdx].getColumnSpec().getFieldName())) { + if (!pages[colIdx].getNullBits().get(rowId)) { + addField(doc, pages[colIdx], rowId, Field.Store.NO); + } } } @@ -226,7 +250,7 @@ public class LuceneDataMapWriter extends DataMapWriter { String fieldName = page.getColumnSpec().getFieldName(); //get field type - DataType type = page.getDataType(); + DataType type = page.getColumnSpec().getSchemaDataType(); if (type == DataTypes.BYTE) { // byte type , use int range to deal with byte, lucene has no byte type @@ -322,8 +346,44 @@ public class LuceneDataMapWriter extends DataMapWriter { /** * Return store path for datamap */ - static String genDataMapStorePath(String tablePath, String segmentId, String dataMapName) { + public static String genDataMapStorePath(String tablePath, String segmentId, String dataMapName) { return CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName; } + /** + * Return store path for datamap based on the taskId, if three tasks get launched during loading, + * then three folders will be created based on the three task Ids and lucene index file will be + * written into those folders + * @return store path based on taskID + */ + private static String genDataMapStorePathOnTaskId(String tablePath, String segmentId, + String dataMapName, long taskId) { + return CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName + + File.separator + dataMapName + CarbonCommonConstants.UNDERSCORE + taskId + + 
CarbonCommonConstants.UNDERSCORE + System.currentTimeMillis(); + } + + /** + * returns all the directories of lucene index files for query + * @param tablePath + * @param segmentId + * @param dataMapName + * @return + */ + public static CarbonFile[] getAllIndexDirs(String tablePath, String segmentId, + final String dataMapName) { + String dmPath = + CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName; + FileFactory.FileType fileType = FileFactory.getFileType(dmPath); + final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath, fileType); + return dirPath.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + if (file.isDirectory() && file.getName().startsWith(dataMapName)) { + return true; + } else { + return false; + } + } + }); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java ---------------------------------------------------------------------- diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java index faa4cbe..3caefd2 100644 --- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java +++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java @@ -67,14 +67,14 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap { LogServiceFactory.getLogService(LuceneFineGrainDataMap.class.getName()); /** - * searcher object for this datamap + * index Reader object to create searcher object */ - private IndexSearcher indexSearcher = null; + private IndexReader indexReader = null; /** - * default max values to return + * searcher object for this datamap */ - private static int MAX_RESULT_NUMBER = 100; + private IndexSearcher indexSearcher = null; /** * analyzer 
for lucene index @@ -113,7 +113,7 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap { // open this index path , use HDFS default configuration Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration()); - IndexReader indexReader = DirectoryReader.open(indexDir); + indexReader = DirectoryReader.open(indexDir); if (indexReader == null) { throw new RuntimeException("failed to create index reader object"); } @@ -153,6 +153,10 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap { // only for test , query all data String strQuery = getQueryString(filterExp.getFilterExpression()); + if (null == strQuery) { + return null; + } + String[] sFields = new String[fields.size()]; fields.toArray(sFields); @@ -163,9 +167,11 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap { // use MultiFieldQueryParser to parser query QueryParser queryParser = new MultiFieldQueryParser(sFields, analyzer); + queryParser.setAllowLeadingWildcard(true); Query query; try { - query = queryParser.parse(strQuery); + // always send lowercase string to lucene as it is case sensitive + query = queryParser.parse(strQuery.toLowerCase()); } catch (ParseException e) { String errorMessage = String.format( "failed to filter block with query %s, detail is %s", strQuery, e.getMessage()); @@ -176,7 +182,7 @@ public class LuceneFineGrainDataMap extends FineGrainDataMap { // execute index search TopDocs result; try { - result = indexSearcher.search(query, MAX_RESULT_NUMBER); + result = indexSearcher.search(query, indexReader.maxDoc()); } catch (IOException e) { String errorMessage = String.format("failed to search lucene data, detail is %s", e.getMessage()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java ---------------------------------------------------------------------- diff --git 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java index 23c9928..151e674 100644 --- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java +++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java @@ -62,7 +62,17 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine public List<FineGrainDataMap> getDataMaps(DataMapDistributable distributable, ReadCommittedScope readCommittedScope) throws IOException { - return getDataMaps(distributable.getSegment(), readCommittedScope); + List<FineGrainDataMap> lstDataMap = new ArrayList<>(); + FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer); + String indexPath = ((LuceneDataMapDistributable) distributable).getIndexPath(); + try { + dataMap.init(new DataMapModel(indexPath)); + } catch (MemoryException e) { + LOGGER.error(String.format("failed to get lucene datamap , detail is %s", e.getMessage())); + return lstDataMap; + } + lstDataMap.add(dataMap); + return lstDataMap; } @Override http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala ---------------------------------------------------------------------- diff --git a/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala b/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala deleted file mode 100644 index f128afe..0000000 --- a/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.datamap.lucene - -import org.apache.spark.sql.test.util.QueryTest -import org.scalatest.BeforeAndAfterAll - -class LuceneCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { - - val file2 = resourcesPath + "/datamap_input.csv" - - override protected def beforeAll(): Unit = { - //n should be about 5000000 of reset if size is default 1024 - val n = 15000 - LuceneFineGrainDataMapSuite.createFile(file2, n * 4, n) - sql("DROP TABLE IF EXISTS normal_test") - sql( - """ - | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT) - | STORED BY 'org.apache.carbondata.format' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') - """.stripMargin) - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')") - } - - test("test lucene coarse grain data map") { - sql("DROP TABLE IF EXISTS datamap_test") - sql( - """ - | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) - | STORED BY 'org.apache.carbondata.format' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') - """.stripMargin) - - sql( - s""" - | CREATE DATAMAP dm ON TABLE datamap_test - | USING '${classOf[LuceneCoarseGrainDataMapFactory].getName}' - | 
DMProperties('TEXT_COLUMNS'='name,city') - """.stripMargin) - - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") - - checkAnswer(sql("select * from datamap_test where name='n502670'"), - sql("select * from normal_test where name='n502670'")) - } - - override protected def afterAll(): Unit = { - LuceneFineGrainDataMapSuite.deleteFile(file2) - sql("DROP TABLE IF EXISTS normal_test") - sql("DROP TABLE IF EXISTS datamap_test") - } - -} - http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala ---------------------------------------------------------------------- diff --git a/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala deleted file mode 100644 index bfcfa67..0000000 --- a/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.datamap.lucene - -import java.io.{File, PrintWriter} - -import scala.util.Random - -import org.apache.spark.sql.test.util.QueryTest -import org.scalatest.BeforeAndAfterAll - -import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException - -class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { - - val file2 = resourcesPath + "/datamap_input.csv" - - override protected def beforeAll(): Unit = { - //n should be about 5000000 of reset if size is default 1024 - val n = 15000 - LuceneFineGrainDataMapSuite.createFile(file2) - sql("DROP TABLE IF EXISTS normal_test") - sql( - """ - | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT) - | STORED BY 'org.apache.carbondata.format' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') - """.stripMargin) - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')") - - sql("DROP TABLE IF EXISTS datamap_test") - sql( - """ - | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) - | STORED BY 'carbondata' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') - """.stripMargin) - } - - test("validate TEXT_COLUMNS DataMap property") { - // require TEXT_COLUMNS - var exception = intercept[MalformedDataMapCommandException](sql( - s""" - | CREATE DATAMAP dm1 ON TABLE datamap_test - | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory' - """.stripMargin)) - - assertResult("Lucene DataMap require proper TEXT_COLUMNS property.")(exception.getMessage) - - // illegal argumnet. 
- exception = intercept[MalformedDataMapCommandException](sql( - s""" - | CREATE DATAMAP dm1 ON TABLE datamap_test - | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory' - | DMProperties('text_COLUMNS'='name, ') - """.stripMargin)) - - assertResult("TEXT_COLUMNS contains illegal argument.")(exception.getMessage) - - // not exists - exception = intercept[MalformedDataMapCommandException](sql( - s""" - | CREATE DATAMAP dm1 ON TABLE datamap_test - | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory' - | DMProperties('text_COLUMNS'='city,school') - """.stripMargin)) - - assertResult("TEXT_COLUMNS: school does not exist in table. Please check create DataMap statement.")(exception.getMessage) - - // duplicate columns - exception = intercept[MalformedDataMapCommandException](sql( - s""" - | CREATE DATAMAP dm1 ON TABLE datamap_test - | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory' - | DMProperties('text_COLUMNS'='name,city,name') - """.stripMargin)) - - assertResult("TEXT_COLUMNS has duplicate columns :name")(exception.getMessage) - - // only support String DataType - exception = intercept[MalformedDataMapCommandException](sql( - s""" - | CREATE DATAMAP dm1 ON TABLE datamap_test - | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory' - | DMProperties('text_COLUMNS'='city,id') - """.stripMargin)) - - assertResult("TEXT_COLUMNS only supports String column. 
Unsupported column: id, DataType: INT")(exception.getMessage) - } - - test("test lucene fine grain data map") { - sql( - s""" - | CREATE DATAMAP dm ON TABLE datamap_test - | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory' - | DMProperties('TEXT_COLUMNS'='Name , cIty') - """.stripMargin) - - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") - - // sql("select * from normal_test where name='n34000'").show - checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10')"), sql(s"select * from datamap_test where name='n10'")) -// checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10*')"), sql(s"SELECT * FROM datamap_test WHERE name like 'n10%'")) - checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('city:c020')"), sql(s"SELECT * FROM datamap_test WHERE city='c020'")) - - // checkAnswer( - // sql("select * from datamap_test where match('name:n34000')"), - // sql("select * from normal_test where name='n34000'")) - } - - override protected def afterAll(): Unit = { - LuceneFineGrainDataMapSuite.deleteFile(file2) - sql("DROP TABLE IF EXISTS normal_test") - sql("DROP TABLE IF EXISTS datamap_test") - } -} - -object LuceneFineGrainDataMapSuite { - def createFile(fileName: String, line: Int = 10000, start: Int = 0) = { - val write = new PrintWriter(new File(fileName)) - for (i <- start until (start + line)) { - write.println(i + "," + "n" + i + "," + "c0" + i + "," + Random.nextInt(80)) - } - write.close() - } - - def deleteFile(fileName: String): Unit = { - val file = new File(fileName) - if (file.exists()) { - file.delete() - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala new file mode 100644 index 0000000..245d147 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.datamap.lucene + +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.{BeforeAndAfterAll, Ignore} + +/** + * Ignored test class as CG datamap is not supported yet + */ +@Ignore +class LuceneCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { + + val file2 = resourcesPath + "/datamap_input.csv" + + override protected def beforeAll(): Unit = { + //n should be about 5000000 of reset if size is default 1024 + val n = 15000 + LuceneFineGrainDataMapSuite.createFile(file2, n * 4, n) + sql("DROP TABLE IF EXISTS normal_test") + sql( + """ + | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')") + } + + test("test lucene coarse grain data map") { + sql("DROP TABLE IF EXISTS datamap_test") + sql( + """ + | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + + sql( + s""" + | CREATE DATAMAP dm ON TABLE datamap_test + | USING 'lucene' + | DMProperties('TEXT_COLUMNS'='name,city') + """.stripMargin) + + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") + + checkAnswer(sql("select * from datamap_test where name='n502670'"), + sql("select * from normal_test where name='n502670'")) + } + + override protected def afterAll(): Unit = { + LuceneFineGrainDataMapSuite.deleteFile(file2) + sql("DROP TABLE IF EXISTS normal_test") + sql("DROP TABLE IF EXISTS datamap_test") + } + +} + http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala 
---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala new file mode 100644 index 0000000..c5ea2c7 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.datamap.lucene + +import java.io.{File, PrintWriter} + +import scala.util.Random + +import org.apache.spark.sql.CarbonEnv +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { + + val file2 = resourcesPath + "/datamap_input.csv" + + override protected def beforeAll(): Unit = { + //n should be about 5000000 of reset if size is default 1024 + val n = 15000 + LuceneFineGrainDataMapSuite.createFile(file2) + sql("create database if not exists lucene") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION, + CarbonEnv.getDatabaseLocation("lucene", sqlContext.sparkSession)) + sql("use lucene") + sql("DROP TABLE IF EXISTS normal_test") + sql( + """ + | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')") + + sql("DROP TABLE IF EXISTS datamap_test") + sql( + """ + | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) + | STORED BY 'carbondata' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + } + + test("validate TEXT_COLUMNS DataMap property") { + // require TEXT_COLUMNS + var exception = intercept[MalformedDataMapCommandException](sql( + s""" + | CREATE DATAMAP dm1 ON TABLE datamap_test + | USING 'lucene' + """.stripMargin)) + + assertResult("Lucene DataMap require proper TEXT_COLUMNS property.")(exception.getMessage) + + // illegal argumnet. 
+ exception = intercept[MalformedDataMapCommandException](sql( + s""" + | CREATE DATAMAP dm1 ON TABLE datamap_test + | USING 'lucene' + | DMProperties('text_COLUMNS'='name, ') + """.stripMargin)) + + assertResult("TEXT_COLUMNS contains illegal argument.")(exception.getMessage) + + // not exists + exception = intercept[MalformedDataMapCommandException](sql( + s""" + | CREATE DATAMAP dm1 ON TABLE datamap_test + | USING 'lucene' + | DMProperties('text_COLUMNS'='city,school') + """.stripMargin)) + + assertResult("TEXT_COLUMNS: school does not exist in table. Please check create DataMap statement.")(exception.getMessage) + + // duplicate columns + exception = intercept[MalformedDataMapCommandException](sql( + s""" + | CREATE DATAMAP dm1 ON TABLE datamap_test + | USING 'lucene' + | DMProperties('text_COLUMNS'='name,city,name') + """.stripMargin)) + + assertResult("TEXT_COLUMNS has duplicate columns :name")(exception.getMessage) + + // only support String DataType + exception = intercept[MalformedDataMapCommandException](sql( + s""" + | CREATE DATAMAP dm1 ON TABLE datamap_test + | USING 'lucene' + | DMProperties('text_COLUMNS'='city,id') + """.stripMargin)) + + assertResult("TEXT_COLUMNS only supports String column. 
Unsupported column: id, DataType: INT")(exception.getMessage) + } + + test("test lucene fine grain data map") { + sql( + s""" + | CREATE DATAMAP dm ON TABLE datamap_test + | USING 'lucene' + | DMProperties('TEXT_COLUMNS'='Name , cIty') + """.stripMargin) + + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") + + // sql("select * from normal_test where name='n34000'").show + checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10')"), sql(s"select * from datamap_test where name='n10'")) +// checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10*')"), sql(s"SELECT * FROM datamap_test WHERE name like 'n10%'")) + checkAnswer(sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('city:c020')"), sql(s"SELECT * FROM datamap_test WHERE city='c020'")) + + // checkAnswer( + // sql("select * from datamap_test where match('name:n34000')"), + // sql("select * from normal_test where name='n34000'")) + } + + override protected def afterAll(): Unit = { + LuceneFineGrainDataMapSuite.deleteFile(file2) + sql("DROP TABLE IF EXISTS normal_test") + sql("DROP TABLE IF EXISTS datamap_test") + sql("use default") + sql("drop database if exists lucene cascade") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_SYSTEM_FOLDER_LOCATION, + CarbonProperties.getStorePath) + } +} + +object LuceneFineGrainDataMapSuite { + def createFile(fileName: String, line: Int = 10000, start: Int = 0) = { + val write = new PrintWriter(new File(fileName)) + for (i <- start until (start + line)) { + write.println(i + "," + "n" + i + "," + "c0" + i + "," + Random.nextInt(80)) + } + write.close() + } + + def deleteFile(fileName: String): Unit = { + val file = new File(fileName) + if (file.exists()) { + file.delete() + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala 
---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala index 0c96247..20e01ff 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala @@ -36,7 +36,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.datastore.page.ColumnPage import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec} import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable -import org.apache.carbondata.core.metadata.schema.table.DataMapSchema +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, DiskBasedDMSchemaStorageProvider} import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} import org.apache.carbondata.core.readcommitter.ReadCommittedScope import org.apache.carbondata.core.scan.expression.Expression @@ -55,8 +55,8 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory { /** * Initialization of Datamap factory with the identifier and datamap name */ - override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = { - this.identifier = identifier + override def init(carbonTable: CarbonTable, dataMapSchema: DataMapSchema): Unit = { + this.identifier = carbonTable.getAbsoluteTableIdentifier this.dataMapSchema = dataMapSchema } @@ -143,6 +143,12 @@ class CGDataMapFactory extends CoarseGrainDataMapFactory { new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava, List(ExpressionType.EQUALS, ExpressionType.IN).asJava) } + + /** + * 
delete datamap data if any + */ + override def deleteDatamapData(): Unit = { + } } class CGDataMap extends CoarseGrainDataMap { @@ -243,7 +249,7 @@ class CGDataMapWriter(identifier: AbsoluteTableIdentifier, * * @param blockId file name of the carbondata file */ - override def onBlockStart(blockId: String): Unit = { + override def onBlockStart(blockId: String, taskId: Long): Unit = { currentBlockId = blockId } @@ -326,6 +332,8 @@ class CGDataMapWriter(identifier: AbsoluteTableIdentifier, class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll { val file2 = resourcesPath + "/compaction/fil2.csv" + val systemFolderStoreLocation = CarbonProperties.getInstance().getSystemFolderLocation + override protected def beforeAll(): Unit = { //n should be about 5000000 of reset if size is default 1024 val n = 150000 @@ -384,9 +392,10 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll { | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') """.stripMargin) - sql(s"create datamap test_cg_datamap on table datamap_store_test using '${classOf[CGDataMapFactory].getName}' as select id, name from datamap_store_test") + val dataMapProvider = classOf[CGDataMapFactory].getName + sql(s"create datamap test_cg_datamap on table datamap_store_test using '$dataMapProvider' as select id, name from datamap_store_test") - val loc = CarbonProperties.getInstance().getSystemFolderLocation + "/test_cg_datamap.dmschema" + val loc = DiskBasedDMSchemaStorageProvider.getSchemaPath(systemFolderStoreLocation, "test_cg_datamap", "datamap_store_test", dataMapProvider) assert(FileFactory.isFileExist(loc)) } @@ -400,9 +409,10 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll { | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') """.stripMargin) - sql(s"create datamap test_cg_datamap1 on table datamap_store_test1 using '${classOf[CGDataMapFactory].getName}' as select id, name from datamap_store_test") + val dataMapProvider = 
classOf[CGDataMapFactory].getName + sql(s"create datamap test_cg_datamap1 on table datamap_store_test1 using '$dataMapProvider' as select id, name from datamap_store_test") - val loc = CarbonProperties.getInstance().getSystemFolderLocation + "/test_cg_datamap1.dmschema" + val loc = DiskBasedDMSchemaStorageProvider.getSchemaPath(systemFolderStoreLocation, "test_cg_datamap1", "datamap_store_test1", dataMapProvider) assert(FileFactory.isFileExist(loc)) @@ -420,9 +430,10 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll { | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') """.stripMargin) - sql(s"create datamap test_cg_datamap2 on table datamap_store_test2 using '${classOf[CGDataMapFactory].getName}' as select id, name from datamap_store_test") + val dataMapProvider = classOf[CGDataMapFactory].getName + sql(s"create datamap test_cg_datamap2 on table datamap_store_test2 using '$dataMapProvider' as select id, name from datamap_store_test") - val loc = CarbonProperties.getInstance().getSystemFolderLocation + "/test_cg_datamap2.dmschema" + val loc = DiskBasedDMSchemaStorageProvider.getSchemaPath(systemFolderStoreLocation,"test_cg_datamap2", "datamap_store_test2", dataMapProvider) assert(FileFactory.isFileExist(loc)) @@ -442,4 +453,5 @@ class CGDataMapTestCase extends QueryTest with BeforeAndAfterAll { sql("DROP TABLE IF EXISTS datamap_store_test1") sql("DROP TABLE IF EXISTS datamap_store_test2") } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala index 270c676..5fa5209 100644 --- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala @@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.metadata.schema.table.DataMapSchema import org.apache.carbondata.core.readcommitter.ReadCommittedScope +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} import org.apache.carbondata.core.scan.filter.intf.ExpressionType import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.events.Event @@ -43,9 +44,9 @@ class C2DataMapFactory() extends CoarseGrainDataMapFactory { var identifier: AbsoluteTableIdentifier = _ - override def init(identifier: AbsoluteTableIdentifier, + override def init(carbonTable: CarbonTable, dataMapSchema: DataMapSchema): Unit = { - this.identifier = identifier + this.identifier = carbonTable.getAbsoluteTableIdentifier } override def fireEvent(event: Event): Unit = ??? @@ -72,6 +73,12 @@ class C2DataMapFactory() extends CoarseGrainDataMapFactory { ??? } + /** + * delete datamap data if any + */ + override def deleteDatamapData(): Unit = { + ??? 
+ } } class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll { @@ -203,7 +210,7 @@ object DataMapWriterSuite { * * @param blockId file name of the carbondata file */ - override def onBlockStart(blockId: String) = { + override def onBlockStart(blockId: String, taskId: Long) = { callbackSeq :+= s"block start $blockId" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala index 060d06a..551f9e1 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala @@ -38,7 +38,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.datastore.page.ColumnPage import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec} import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable -import org.apache.carbondata.core.metadata.schema.table.DataMapSchema +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} import org.apache.carbondata.core.readcommitter.ReadCommittedScope import org.apache.carbondata.core.scan.expression.Expression @@ -57,8 +57,8 @@ class FGDataMapFactory extends FineGrainDataMapFactory { /** * Initialization of Datamap factory with the identifier and datamap name */ - override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: 
DataMapSchema): Unit = { - this.identifier = identifier + override def init(carbonTable: CarbonTable, dataMapSchema: DataMapSchema): Unit = { + this.identifier = carbonTable.getAbsoluteTableIdentifier this.dataMapSchema = dataMapSchema } @@ -141,6 +141,13 @@ class FGDataMapFactory extends FineGrainDataMapFactory { new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava, List(ExpressionType.EQUALS, ExpressionType.IN).asJava) } + + /** + * delete datamap data if any + */ + override def deleteDatamapData(): Unit = { + ??? + } } class FGDataMap extends FineGrainDataMap { @@ -271,7 +278,7 @@ class FGDataMapWriter(identifier: AbsoluteTableIdentifier, * * @param blockId file name of the carbondata file */ - override def onBlockStart(blockId: String): Unit = { + override def onBlockStart(blockId: String, taskId: Long): Unit = { currentBlockId = blockId } http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala index be9cc51..280c20d 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala @@ -34,6 +34,7 @@ import org.apache.carbondata.core.datastore.page.ColumnPage import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.DataMapSchema import org.apache.carbondata.core.readcommitter.ReadCommittedScope +import 
org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} import org.apache.carbondata.core.scan.filter.intf.ExpressionType import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.events.Event @@ -204,7 +205,7 @@ class TestDataMap() extends CoarseGrainDataMapFactory { override def onBlockletStart(blockletId: Int): Unit = { } - override def onBlockStart(blockId: String): Unit = { + override def onBlockStart(blockId: String, taskId: Long): Unit = { // trigger the second SQL to execute } @@ -218,8 +219,14 @@ class TestDataMap() extends CoarseGrainDataMapFactory { override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = ??? - override def init(identifier: AbsoluteTableIdentifier, - dataMapSchema: DataMapSchema): Unit = { - this.identifier = identifier + override def init(carbonTable: CarbonTable, dataMapSchema: DataMapSchema): Unit = { + this.identifier = carbonTable.getAbsoluteTableIdentifier + } + + /** + * delete datamap data if any + */ + override def deleteDatamapData(): Unit = { + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala index 86f0f10..5c9709c 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala @@ -36,6 +36,7 @@ import 
org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, RelationIdentifier} import org.apache.carbondata.core.readcommitter.ReadCommittedScope +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, RelationIdentifier} import org.apache.carbondata.core.scan.filter.intf.ExpressionType import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.events.Event @@ -306,7 +307,7 @@ class WaitingDataMap() extends CoarseGrainDataMapFactory { override def onBlockletStart(blockletId: Int): Unit = { } - override def onBlockStart(blockId: String): Unit = { + override def onBlockStart(blockId: String, taskId: Long): Unit = { // trigger the second SQL to execute Global.overwriteRunning = true @@ -324,8 +325,15 @@ class WaitingDataMap() extends CoarseGrainDataMapFactory { override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = ??? - override def init(identifier: AbsoluteTableIdentifier, + override def init(carbonTable: CarbonTable, dataMapSchema: DataMapSchema): Unit = { - this.identifier = identifier + this.identifier = carbonTable.getAbsoluteTableIdentifier + } + + /** + * delete datamap data if any + */ + override def deleteDatamapData(): Unit = { + } } \ No newline at end of file