This is an automated email from the ASF dual-hosted git repository. jackylk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new f906eef [CARBONDATA-3658] Prune and Cache only Matched partitioned segments for filter on Partitioned table f906eef is described below commit f906eef03238a070f0f2eb58a3fca37b24d64643 Author: Indhumathi27 <indhumathi...@gmail.com> AuthorDate: Wed Jan 8 19:57:08 2020 +0530 [CARBONDATA-3658] Prune and Cache only Matched partitioned segments for filter on Partitioned table Why is this PR needed? For filter on partition column, we are currently loading all index files to cache. Since VMs in cloud scenarios do not have much memory, all index files will be too big to store in the driver. What changes were proposed in this PR? For filter on partition column, load only index for matched segments. Does this PR introduce any user interface change? No Is any new testcase added? Yes This closes #3568 --- .../carbondata/core/datamap/TableDataMap.java | 17 ++++- .../core/datamap/dev/DataMapFactory.java | 20 ++++++ .../core/indexstore/SegmentPropertiesFetcher.java | 6 +- .../blockletindex/BlockletDataMapFactory.java | 72 +++++++++++++++++----- .../bloom/BloomCoarseGrainDataMapFactory.java | 7 +++ .../lucene/LuceneFineGrainDataMapFactory.java | 7 +++ .../testsuite/datamap/CGDataMapTestCase.scala | 8 +++ .../testsuite/datamap/DataMapWriterSuite.scala | 9 +++ .../testsuite/datamap/FGDataMapTestCase.scala | 8 +++ .../testsuite/datamap/TestDataMapStatus.scala | 9 +++ .../iud/TestInsertAndOtherCommandConcurrent.scala | 9 +++ .../sql/commands/TestCarbonShowCacheCommand.scala | 72 ++++++++++++++++++++++ 12 files changed, 225 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java index 5f097e3..7ff1645 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java @@ -115,11 +115,21 @@ 
public final class TableDataMap extends OperationEventListener { final List<PartitionSpec> partitions) throws IOException { final List<ExtendedBlocklet> blocklets = new ArrayList<>(); List<Segment> segments = getCarbonSegments(allsegments); - final Map<Segment, List<DataMap>> dataMaps = dataMapFactory.getDataMaps(segments); + final Map<Segment, List<DataMap>> dataMaps; + if (filter == null || filter.isEmpty() || partitions == null || partitions.isEmpty()) { + dataMaps = dataMapFactory.getDataMaps(segments); + } else { + dataMaps = dataMapFactory.getDataMaps(segments, partitions); + } // for non-filter queries // for filter queries int totalFiles = 0; int datamapsCount = 0; + // In case if filter has matched partitions, then update the segments with datamap's + // segment list, as getDataMaps will return segments that matches the partition. + if (null != partitions && !partitions.isEmpty()) { + segments = new ArrayList<>(dataMaps.keySet()); + } for (Segment segment : segments) { for (DataMap dataMap: dataMaps.get(segment)) { totalFiles += dataMap.getNumberOfEntries(); @@ -171,7 +181,8 @@ public final class TableDataMap extends OperationEventListener { Map<Segment, List<DataMap>> dataMaps) throws IOException { for (Segment segment : segments) { List<Blocklet> pruneBlocklets = new ArrayList<>(); - SegmentProperties segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment); + SegmentProperties segmentProperties = + segmentPropertiesFetcher.getSegmentProperties(segment, partitions); if (filter.isResolvedOnSegment(segmentProperties)) { for (DataMap dataMap : dataMaps.get(segment)) { pruneBlocklets.addAll( @@ -391,7 +402,7 @@ public final class TableDataMap extends OperationEventListener { List<Blocklet> blocklets = new ArrayList<>(); for (DataMap dataMap : dataMaps) { blocklets.addAll(dataMap.prune(filterExp, - segmentPropertiesFetcher.getSegmentProperties(distributable.getSegment()), + 
segmentPropertiesFetcher.getSegmentProperties(distributable.getSegment(), partitions), partitions)); } BlockletSerializer serializer = new BlockletSerializer(); diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java index f4e9dd9..0895459 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java @@ -32,6 +32,7 @@ import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.features.TableOperation; +import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; @@ -91,11 +92,30 @@ public abstract class DataMapFactory<T extends DataMap> { } /** + * Get the datamap for all segments with matched partitions. Load datamaps to cache, only if it + * matches the partition. 
+ */ + public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments, + List<PartitionSpec> partitionSpecs) throws IOException { + Map<Segment, List<CoarseGrainDataMap>> dataMaps = new HashMap<>(); + for (Segment segment : segments) { + dataMaps.put(segment, (List<CoarseGrainDataMap>) this.getDataMaps(segment, partitionSpecs)); + } + return dataMaps; + } + + /** * Get the datamap for segmentId */ public abstract List<T> getDataMaps(Segment segment) throws IOException; /** + * Get the datamap for segmentId with matched partitions + */ + public abstract List<T> getDataMaps(Segment segment, List<PartitionSpec> partitions) + throws IOException; + + /** * Get datamaps for distributable object. */ public abstract List<T> getDataMaps(DataMapDistributable distributable) diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java index 03f8a1d..04bf922 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java @@ -18,6 +18,7 @@ package org.apache.carbondata.core.indexstore; import java.io.IOException; +import java.util.List; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.dev.DataMap; @@ -30,12 +31,15 @@ public interface SegmentPropertiesFetcher { /** * get the Segment properties based on the SegmentID. 
- * @param segmentId + * @param segment * @return * @throws IOException */ SegmentProperties getSegmentProperties(Segment segment) throws IOException; + SegmentProperties getSegmentProperties(Segment segment, List<PartitionSpec> partitionSpecs) + throws IOException; + SegmentProperties getSegmentPropertiesFromDataMap(DataMap coarseGrainDataMap) throws IOException; } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index c2c3647..eb840dd 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -124,6 +124,14 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory */ public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments) throws IOException { + return getDataMaps(segments, null); + } + + /** + * Get the datamap for all segments + */ + public Map<Segment, List<CoarseGrainDataMap>> getDataMaps(List<Segment> segments, + List<PartitionSpec> partitionsToPrune) throws IOException { List<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers = new ArrayList<>(); Map<Segment, List<CoarseGrainDataMap>> dataMaps = new HashMap<>(); @@ -132,12 +140,9 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory segmentMap.put(segment.getSegmentNo(), segment); Set<TableBlockIndexUniqueIdentifier> identifiers = getTableBlockIndexUniqueIdentifiers(segment); - - for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) { - tableBlockIndexUniqueIdentifierWrappers.add( - new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, - this.getCarbonTable())); - } + // get tableBlockIndexUniqueIdentifierWrappers from segment file info + 
getTableBlockUniqueIdentifierWrappers(partitionsToPrune, + tableBlockIndexUniqueIdentifierWrappers, identifiers); } List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers = cache.getAll(tableBlockIndexUniqueIdentifierWrappers); @@ -153,18 +158,49 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory return dataMaps; } + /** + * get tableBlockUniqueIdentifierWrappers from segment info. If partitionsToPrune is defined, + * then get tableBlockUniqueIdentifierWrappers for the matched partitions. + */ + private void getTableBlockUniqueIdentifierWrappers(List<PartitionSpec> partitionsToPrune, + List<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers, + Set<TableBlockIndexUniqueIdentifier> identifiers) { + for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) { + if (null != partitionsToPrune && !partitionsToPrune.isEmpty()) { + // add only tableBlockUniqueIdentifier that matches the partition + // get the indexFile Parent path and compare with the PartitionPath, if matches, then add + // the corresponding tableBlockIndexUniqueIdentifier for pruning + for (PartitionSpec partitionSpec : partitionsToPrune) { + if (partitionSpec.getLocation().toString() + .equalsIgnoreCase(tableBlockIndexUniqueIdentifier.getIndexFilePath())) { + tableBlockIndexUniqueIdentifierWrappers.add( + new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, + this.getCarbonTable())); + } + } + } else { + tableBlockIndexUniqueIdentifierWrappers.add( + new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, + this.getCarbonTable())); + } + } + } + @Override public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws IOException { + return getDataMaps(segment, null); + } + + @Override + public List<CoarseGrainDataMap> getDataMaps(Segment segment, + List<PartitionSpec> partitionsToPrune) throws IOException { List<CoarseGrainDataMap> dataMaps = new ArrayList<>(); 
Set<TableBlockIndexUniqueIdentifier> identifiers = getTableBlockIndexUniqueIdentifiers(segment); List<TableBlockIndexUniqueIdentifierWrapper> tableBlockIndexUniqueIdentifierWrappers = new ArrayList<>(identifiers.size()); - for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier : identifiers) { - tableBlockIndexUniqueIdentifierWrappers.add( - new TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier, - this.getCarbonTable())); - } + getTableBlockUniqueIdentifierWrappers(partitionsToPrune, + tableBlockIndexUniqueIdentifierWrappers, identifiers); List<BlockletDataMapIndexWrapper> blockletDataMapIndexWrappers = cache.getAll(tableBlockIndexUniqueIdentifierWrappers); for (BlockletDataMapIndexWrapper wrapper : blockletDataMapIndexWrappers) { @@ -429,7 +465,13 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory @Override public SegmentProperties getSegmentProperties(Segment segment) throws IOException { - List<CoarseGrainDataMap> dataMaps = getDataMaps(segment); + return getSegmentProperties(segment, null); + } + + @Override + public SegmentProperties getSegmentProperties(Segment segment, List<PartitionSpec> partitions) + throws IOException { + List<CoarseGrainDataMap> dataMaps = getDataMaps(segment, partitions); assert (dataMaps.size() > 0); CoarseGrainDataMap coarseGrainDataMap = dataMaps.get(0); assert (coarseGrainDataMap instanceof BlockDataMap); @@ -449,10 +491,10 @@ public class BlockletDataMapFactory extends CoarseGrainDataMapFactory public List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions) throws IOException { List<Blocklet> blocklets = new ArrayList<>(); - List<CoarseGrainDataMap> dataMaps = getDataMaps(segment); + List<CoarseGrainDataMap> dataMaps = getDataMaps(segment, partitions); for (CoarseGrainDataMap dataMap : dataMaps) { - blocklets.addAll( - dataMap.prune((FilterResolverIntf) null, getSegmentProperties(segment), partitions)); + blocklets.addAll(dataMap + 
.prune((FilterResolverIntf) null, getSegmentProperties(segment, partitions), partitions)); } return blocklets; } diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java index 4cb5385..432df4c 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java @@ -46,6 +46,7 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.features.TableOperation; +import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; @@ -283,6 +284,12 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa } @Override + public List<CoarseGrainDataMap> getDataMaps(Segment segment, List<PartitionSpec> partitionSpecs) + throws IOException { + return getDataMaps(segment); + } + + @Override public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable) throws IOException { List<CoarseGrainDataMap> dataMaps = new ArrayList<>(); diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java index 27151fb..e6c6010 100644 --- a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java +++ 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java @@ -31,6 +31,7 @@ import org.apache.carbondata.core.datamap.dev.DataMapWriter; import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainDataMap; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.features.TableOperation; +import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; @@ -65,6 +66,12 @@ public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<Fine return lstDataMap; } + @Override + public List<FineGrainDataMap> getDataMaps(Segment segment, List<PartitionSpec> partitions) + throws IOException { + return getDataMaps(segment); + } + /** * Get datamaps for distributable object. */ diff --git a/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala index 826ffd0..bc95b12 100644 --- a/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala +++ b/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala @@ -153,6 +153,14 @@ class CGDataMapFactory( shardName: String, segmentProperties: SegmentProperties): DataMapBuilder = { ??? 
} + + /** + * Get the datamap for segmentId and partitionSpecs + */ + override def getDataMaps(segment: Segment, + partitions: java.util.List[PartitionSpec]): java.util.List[CoarseGrainDataMap] = { + getDataMaps(segment); + } } class CGDataMap extends CoarseGrainDataMap { diff --git a/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala index dfa3f57..74fbc2a 100644 --- a/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala +++ b/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala @@ -32,6 +32,7 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, Coa import org.apache.carbondata.core.datastore.block.SegmentProperties import org.apache.carbondata.core.datastore.page.ColumnPage import org.apache.carbondata.core.features.TableOperation +import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} @@ -92,6 +93,14 @@ class C2DataMapFactory( shardName: String, segmentProperties: SegmentProperties): DataMapBuilder = { ??? } + + /** + * Get the datamap for segmentId and partitionSpecs + */ + override def getDataMaps(segment: Segment, + partitions: util.List[PartitionSpec]): util.List[CoarseGrainDataMap] = { + ??? 
+ } } class DataMapWriterSuite extends CarbonQueryTest with BeforeAndAfterAll { diff --git a/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala index c93c247..1e72524 100644 --- a/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala +++ b/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala @@ -151,6 +151,14 @@ class FGDataMapFactory(carbonTable: CarbonTable, shardName: String, segmentProperties: SegmentProperties): DataMapBuilder = { ??? } + + /** + * Get the datamap for segmentId + */ + override def getDataMaps(segment: Segment, + partitions: java.util.List[PartitionSpec]): java.util.List[FineGrainDataMap] = { + getDataMaps(segment) + } } class FGDataMap extends FineGrainDataMap { diff --git a/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala b/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala index bca6123..88558e8 100644 --- a/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala +++ b/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapStatus.scala @@ -34,6 +34,7 @@ import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Se import org.apache.carbondata.core.datastore.block.SegmentProperties import org.apache.carbondata.core.datastore.page.ColumnPage import org.apache.carbondata.core.features.TableOperation +import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} 
import org.apache.carbondata.core.scan.filter.intf.ExpressionType import org.apache.carbondata.core.util.CarbonProperties @@ -310,4 +311,12 @@ class TestDataMapFactory( } override def supportRebuild(): Boolean = true + + /** + * Get the datamap for segmentId and partitionSpecs + */ + override def getDataMaps(segment: Segment, + partitions: util.List[PartitionSpec]): util.List[CoarseGrainDataMap] = { + ??? + } } diff --git a/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala index 1bc4411..fc2cc71 100644 --- a/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala +++ b/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala @@ -36,6 +36,7 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties import org.apache.carbondata.core.datastore.page.ColumnPage import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.features.TableOperation +import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} import org.apache.carbondata.core.scan.filter.intf.ExpressionType @@ -368,4 +369,12 @@ class WaitingDataMapFactory( shardName: String, segmentProperties: SegmentProperties): DataMapBuilder = { ??? } + + /** + * Get the datamap for segmentId and partitionSpecs + */ + override def getDataMaps(segment: Segment, + partitions: util.List[PartitionSpec]): util.List[CoarseGrainDataMap] = { + ??? 
+ } } \ No newline at end of file diff --git a/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala b/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala index aa9bf52..a683eed 100644 --- a/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala +++ b/integration/spark-carbon-common-test/src/test/scala/org/apache/carbondata/sql/commands/TestCarbonShowCacheCommand.scala @@ -17,11 +17,15 @@ package org.apache.carbondata.sql.commands +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.{CarbonEnv, Row} import org.apache.spark.sql.test.util.QueryTest import org.junit.Assert import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.core.cache.CacheProvider import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory @@ -219,4 +223,72 @@ class TestCarbonShowCacheCommand extends QueryTest with BeforeAndAfterAll { assertResult(2)(result5.length) assertResult("5/5 index files cached")(result5(0).getString(2)) } + + test("test index files cached for table with single partition") { + sql("drop table if exists partitionTable") + sql("create table partitionTable(col1 int, col2 string) partitioned by (col3 string) stored as carbondata") + sql("insert into partitionTable values(1,'aa','bb'),(1,'aa1','bb1')") + sql("insert into partitionTable values(1,'cc','dd')") + sql("insert into partitionTable values(2,'aa','bb')") + sql("insert into partitionTable values(1,'aa','ee')") + checkAnswer(sql("select * from partitionTable where col3='bb'"), Seq(Row(1,"aa","bb"),Row(2,"aa","bb"))) + var showCache = sql("SHOW METACACHE on table partitionTable").collect() + val tableIdentifier = new TableIdentifier("partitionTable", Some("default")) + val 
carbonTablePath = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession).getTablePath + var result = CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet().asScala.filter(index => index.startsWith(carbonTablePath)) + assert(result.exists(index => index.startsWith(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "col3=bb")) && result.size == 2) + assert(showCache(0).get(2).toString.equalsIgnoreCase("2/5 index files cached")) + checkAnswer(sql("select * from partitionTable where col3='ee'"), Seq(Row(1,"aa","ee"))) + showCache = sql("SHOW METACACHE on table partitionTable").collect() + result = CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet().asScala.filter(index => index.startsWith(carbonTablePath)) + assert(result.exists(index => + index.startsWith(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "col3=bb") || + index.startsWith(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "col3=ee") && + result.size == 3)) + assert(showCache(0).get(2).toString.equalsIgnoreCase("3/5 index files cached")) + sql("drop table if exists partitionTable") + } + + test("test index files cached for table with multiple partition") { + sql("drop table if exists partitionTable") + sql("create table partitionTable(col1 int, col2 string) partitioned by (col3 string, col4 string, col5 int) stored as carbondata") + sql("insert into partitionTable values(1,'aa','bb','cc',1),(1,'aa1','bb1','ff',3)") + sql("insert into partitionTable values(1,'cc','dd','ff',3)") + sql("insert into partitionTable values(2,'aa','bb','gg',2)") + sql("insert into partitionTable values(1,'aa','ee','kk',4)") + checkAnswer(sql("select * from partitionTable where col3='bb' and col4='cc'"), Seq(Row(1,"aa","bb","cc",1))) + var showCache = sql("SHOW METACACHE on table partitionTable").collect() + val tableIdentifier = new TableIdentifier("partitionTable", Some("default")) + val carbonTablePath = 
CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession).getTablePath + var result = CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet().asScala.filter(index => index.startsWith(carbonTablePath)) + assert(result.exists(index => index.startsWith(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "col3=bb/col4=cc")) && result.size == 1) + assert(showCache(0).get(2).toString.equalsIgnoreCase("1/5 index files cached")) + checkAnswer(sql("select * from partitionTable where col3='bb'"), Seq(Row(1,"aa","bb","cc",1),Row(2,"aa","bb","gg",2))) + showCache = sql("SHOW METACACHE on table partitionTable").collect() + result = CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet().asScala.filter(index => index.startsWith(carbonTablePath)) + assert(result.exists(index => index.startsWith(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "col3=bb/col4=cc")|| + index.startsWith(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "col3=bb/col4=gg")) && result.size == 2) + assert(showCache(0).get(2).toString.equalsIgnoreCase("2/5 index files cached")) + sql("drop table if exists partitionTable") + } + + test("test index files cached for table with partition without filter") { + sql("drop table if exists partitionTable") + sql("create table partitionTable(col1 int, col2 string) partitioned by (col3 string) stored as carbondata") + sql("insert into partitionTable values(1,'aa','bb'),(1,'aa1','bb1')") + sql("insert into partitionTable values(1,'cc','dd')") + sql("insert into partitionTable values(2,'aa','bb')") + sql("insert into partitionTable values(1,'aa','ee')") + checkAnswer(sql("select * from partitionTable where col3='bb'"), Seq(Row(1,"aa","bb"),Row(2,"aa","bb"))) + var showCache = sql("SHOW METACACHE on table partitionTable").collect() + val tableIdentifier = new TableIdentifier("partitionTable", Some("default")) + val carbonTablePath = CarbonEnv.getCarbonTable(tableIdentifier)(sqlContext.sparkSession).getTablePath + var result = 
CacheProvider.getInstance().getCarbonCache.getCacheMap.keySet().asScala.filter(index => index.startsWith(carbonTablePath)) + assert(result.exists(index => index.startsWith(carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR + "col3=bb")) && result.size == 2) + assert(showCache(0).get(2).toString.equalsIgnoreCase("2/5 index files cached")) + sql("select * from partitionTable").collect() + showCache = sql("SHOW METACACHE on table partitionTable").collect() + assert(showCache(0).get(2).toString.equalsIgnoreCase("5/5 index files cached")) + sql("drop table if exists partitionTable") + } }