Repository: carbondata Updated Branches: refs/heads/master 57b457153 -> dc2931917
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc293191/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java index c41a08d..15f3e21 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecuterImpl.java @@ -58,7 +58,6 @@ public class RowLevelRangeLessThanFilterExecuterImpl extends RowLevelFilterExecu * flag to check whether default values is present in the filter value list */ private boolean isDefaultValuePresentInFilter; - private int lastDimensionColOrdinal = 0; public RowLevelRangeLessThanFilterExecuterImpl( List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList, List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp, @@ -68,7 +67,6 @@ public class RowLevelRangeLessThanFilterExecuterImpl extends RowLevelFilterExecu null); this.filterRangeValues = filterRangeValues; this.msrFilterRangeValues = msrFilterRangeValues; - lastDimensionColOrdinal = segmentProperties.getLastDimensionColOrdinal(); if (!msrColEvalutorInfoList.isEmpty()) { CarbonMeasure measure = this.msrColEvalutorInfoList.get(0).getMeasure(); comparator = Comparator.getComparatorByDataTypeForMeasure(measure.getDataType()); @@ -123,7 +121,7 @@ public class RowLevelRangeLessThanFilterExecuterImpl extends RowLevelFilterExecu boolean isScanRequired = false; if (isMeasurePresentInCurrentBlock[0] || isDimensionPresentInCurrentBlock[0]) { if (isMeasurePresentInCurrentBlock[0]) { - minValue = blockMinValue[measureChunkIndex[0] + 
lastDimensionColOrdinal]; + minValue = blockMinValue[measureChunkIndex[0]]; isScanRequired = isScanRequired(minValue, msrFilterRangeValues, msrColEvalutorInfoList.get(0).getType()); } else { @@ -233,8 +231,8 @@ public class RowLevelRangeLessThanFilterExecuterImpl extends RowLevelFilterExecu } return bitSetGroup; } else { - int chunkIndex = - segmentProperties.getMeasuresOrdinalToChunkMapping().get(measureChunkIndex[0]); + int chunkIndex = segmentProperties.getMeasuresOrdinalToChunkMapping() + .get(msrColEvalutorInfoList.get(0).getColumnIndex()); if (null == rawBlockletColumnChunks.getMeasureRawColumnChunks()[chunkIndex]) { rawBlockletColumnChunks.getMeasureRawColumnChunks()[chunkIndex] = rawBlockletColumnChunks.getDataBlock().readMeasureChunk( @@ -515,7 +513,7 @@ public class RowLevelRangeLessThanFilterExecuterImpl extends RowLevelFilterExecu rawBlockletColumnChunks.getFileReader(), chunkIndex); } } else if (isMeasurePresentInCurrentBlock[0]) { - int chunkIndex = measureChunkIndex[0]; + int chunkIndex = msrColEvalutorInfoList.get(0).getColumnIndex(); if (null == rawBlockletColumnChunks.getMeasureRawColumnChunks()[chunkIndex]) { rawBlockletColumnChunks.getMeasureRawColumnChunks()[chunkIndex] = rawBlockletColumnChunks.getDataBlock().readMeasureChunk( http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc293191/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/ColumnResolvedFilterInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/ColumnResolvedFilterInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/ColumnResolvedFilterInfo.java index 456a64e..09a9c51 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/ColumnResolvedFilterInfo.java +++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/ColumnResolvedFilterInfo.java @@ -17,6 +17,44 @@ package org.apache.carbondata.core.scan.filter.resolver.resolverinfo; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; -public class ColumnResolvedFilterInfo { +public abstract class ColumnResolvedFilterInfo { + + /** + * in case column min/max cache is configured then this index will map the filter column to + * its index in min/max byte array. For example + * Total columns = 10 + * Columns to be cached = 3 + * Column ordinals to be cached = 1,5,7 + * then when checking for isScanRequired column if we consider the above ordinal 5 and 7 then + * ArrayIndexOutOfBoundsException will be thrown although the min/max value for the ordinal 5 + * is present at array index 1 and for ordinal 7 at array index 2. To avoid this scenario this + * index is maintained + */ + protected int columnIndexInMinMaxByteArray = -1; + /** + * column index in file + */ + protected int columnIndex = -1; + + public void setColumnIndexInMinMaxByteArray(int columnIndexInMinMaxByteArray) { + this.columnIndexInMinMaxByteArray = columnIndexInMinMaxByteArray; + } + + public int getColumnIndexInMinMaxByteArray() { + // -1 means + // 1. On driver side either the filter dimension does not exist in the cached min/max columns + // or columns min/max to be cached are not specified + // 2.
For RowFilterExecutorImpl and ExcludeFilterExecutorImpl this value will be -1 + if (columnIndexInMinMaxByteArray == -1) { + return columnIndex; + } + return columnIndexInMinMaxByteArray; + } + + public abstract CarbonMeasure getMeasure(); + + public abstract CarbonDimension getDimension(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc293191/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/DimColumnResolvedFilterInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/DimColumnResolvedFilterInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/DimColumnResolvedFilterInfo.java index d55a146..13dfbcd 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/DimColumnResolvedFilterInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/DimColumnResolvedFilterInfo.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; import org.apache.carbondata.core.scan.filter.ColumnFilterInfo; import org.apache.carbondata.core.scan.filter.resolver.metadata.FilterResolverMetadata; @@ -37,11 +38,6 @@ public class DimColumnResolvedFilterInfo extends ColumnResolvedFilterInfo implem private static final long serialVersionUID = 3428115141211084114L; /** - * column index in file - */ - private int columnIndex = -1; - - /** * rowIndex */ private int rowIndex = -1; @@ -140,6 +136,11 @@ public class DimColumnResolvedFilterInfo extends ColumnResolvedFilterInfo implem dimColumnResolvedFilterInfo.rowIndex = this.rowIndex; 
dimColumnResolvedFilterInfo.dimensionResolvedFilter = this.dimensionResolvedFilter; dimColumnResolvedFilterInfo.isDimensionExistsInCurrentSilce = isDimensionExistsInCurrentSilce; + dimColumnResolvedFilterInfo.columnIndexInMinMaxByteArray = columnIndexInMinMaxByteArray; return dimColumnResolvedFilterInfo; } + + @Override public CarbonMeasure getMeasure() { + throw new UnsupportedOperationException("Operation not supported"); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc293191/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/MeasureColumnResolvedFilterInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/MeasureColumnResolvedFilterInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/MeasureColumnResolvedFilterInfo.java index 4165424..e64eed9 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/MeasureColumnResolvedFilterInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/MeasureColumnResolvedFilterInfo.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; import org.apache.carbondata.core.scan.filter.ColumnFilterInfo; @@ -38,8 +39,6 @@ public class MeasureColumnResolvedFilterInfo extends ColumnResolvedFilterInfo */ private static final long serialVersionUID = 4222568289115151561L; - private int columnIndex = -1; - private int rowIndex = -1; private boolean isMeasureExistsInCurrentSilce = true; @@ -117,6 +116,10 @@ public class 
MeasureColumnResolvedFilterInfo extends ColumnResolvedFilterInfo return carbonMeasure; } + @Override public CarbonDimension getDimension() { + throw new UnsupportedOperationException("Operation not supported"); + } + public void setMeasureExistsInCurrentSilce(boolean measureExistsInCurrentSilce) { isMeasureExistsInCurrentSilce = measureExistsInCurrentSilce; } @@ -148,6 +151,7 @@ public class MeasureColumnResolvedFilterInfo extends ColumnResolvedFilterInfo msrColumnResolvedFilterInfo.rowIndex = this.rowIndex; msrColumnResolvedFilterInfo.measureResolvedFilter = this.measureResolvedFilter; msrColumnResolvedFilterInfo.setMeasureExistsInCurrentSilce(this.isMeasureExistsInCurrentSilce); + msrColumnResolvedFilterInfo.columnIndexInMinMaxByteArray = columnIndexInMinMaxByteArray; return msrColumnResolvedFilterInfo; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc293191/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java index 9f4c6c5..8fa48a8 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java @@ -50,6 +50,7 @@ import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.util.path.CarbonTablePath; @@ -367,4 +368,44 @@ public class 
BlockletDataMapUtil { } return columnSchemas; } + + /** + * Method to get the min/max values for columns to be cached + * + * @param segmentProperties + * @param minMaxCacheColumns + * @param minMaxValuesForAllColumns + * @return + */ + public static byte[][] getMinMaxForColumnsToBeCached(SegmentProperties segmentProperties, + List<CarbonColumn> minMaxCacheColumns, byte[][] minMaxValuesForAllColumns) { + byte[][] minMaxValuesForColumnsToBeCached = minMaxValuesForAllColumns; + if (null != minMaxCacheColumns) { + minMaxValuesForColumnsToBeCached = new byte[minMaxCacheColumns.size()][]; + int counter = 0; + for (CarbonColumn column : minMaxCacheColumns) { + minMaxValuesForColumnsToBeCached[counter++] = + minMaxValuesForAllColumns[getColumnOrdinal(segmentProperties, column)]; + } + } + return minMaxValuesForColumnsToBeCached; + } + + /** + * compute the column ordinal as per data is stored + * + * @param segmentProperties + * @param column + * @return + */ + public static int getColumnOrdinal(SegmentProperties segmentProperties, CarbonColumn column) { + if (column.isMeasure()) { + // as measures are stored at the end after all dimensions and complex dimensions hence add + // the last dimension ordinal to measure ordinal. 
Segment properties will store min max + // length in one array on the order normal dimension, complex dimension and then measure + return segmentProperties.getLastDimensionColOrdinal() + column.getOrdinal(); + } else { + return column.getOrdinal(); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc293191/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala new file mode 100644 index 0000000..3e1f188 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.spark.testsuite.allqueries + + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonEnv, Row} +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.datamap.dev.DataMap +import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment, TableDataMap} +import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder +import org.apache.carbondata.core.indexstore.blockletindex.{BlockDataMap, BlockletDataMap} +import org.apache.carbondata.core.indexstore.schema.CarbonRowSchema +import org.apache.carbondata.core.indexstore.Blocklet + +/** + * test class for validating COLUMN_META_CACHE and CACHE_LEVEL + */ +class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with BeforeAndAfterAll { + + override def beforeAll(): Unit = { + dropSchema + } + + override def afterAll(): Unit = { + dropSchema + } + + private def dropSchema: Unit = { + sql("drop table if exists metaCache") + sql("drop table if exists column_min_max_cache_test") + } + + private def createAndLoadTable(cacheLevel: String): Unit = { + sql(s"CREATE table column_min_max_cache_test (empno int, empname String, designation String, doj Timestamp, workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, projectcode int, projectjoindate Timestamp, projectenddate Timestamp, attendance int, utilization int,salary int) STORED BY 'carbondata' TBLPROPERTIES('DICTIONARY_INCLUDE'='empno','column_meta_cache'='workgroupcategoryname,designation,salary,attendance', 'CACHE_LEVEL'= '$cacheLevel')") + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO " + + "TABLE column_min_max_cache_test OPTIONS('DELIMITER'=',', " + + "'BAD_RECORDS_LOGGER_ENABLE'='FALSE', 'BAD_RECORDS_ACTION'='FORCE')") + } + + private def getDataMaps(dbName: String, + tableName: String, + segmentId: String): 
List[DataMap[_ <: Blocklet]] = { + val relation: CarbonRelation = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore + .lookupRelation(Some(dbName), tableName)(sqlContext.sparkSession) + .asInstanceOf[CarbonRelation] + val carbonTable = relation.carbonTable + val segment: Segment = Segment.getSegment(segmentId, carbonTable.getTablePath) + val defaultDataMap: TableDataMap = DataMapStoreManager.getInstance() + .getDefaultDataMap(carbonTable) + val dataMaps: List[DataMap[_ <: Blocklet]] = defaultDataMap.getDataMapFactory + .getDataMaps(segment).asScala.toList + dataMaps + } + + private def validateMinMaxColumnsCacheLength(dataMaps: List[DataMap[_ <: Blocklet]], + expectedLength: Int, storeBlockletCount: Boolean = false): Boolean = { + val index = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex + val summarySchema = SegmentPropertiesAndSchemaHolder.getInstance() + .getSegmentPropertiesWrapper(index).getTaskSummarySchema(storeBlockletCount, false) + val minSchemas = summarySchema(0).asInstanceOf[CarbonRowSchema.StructCarbonRowSchema] + .getChildSchemas + minSchemas.length == expectedLength + } + + test("verify if number of columns cached are as per the COLUMN_META_CACHE property dataMap instance is as per CACHE_LEVEL property") { + sql("drop table if exists metaCache") + sql("create table metaCache(name string, c1 string, c2 string) stored by 'carbondata'") + sql("insert into metaCache select 'a','aa','aaa'") + checkAnswer(sql("select * from metaCache"), Row("a", "aa", "aaa")) + var dataMaps = getDataMaps("default", "metaCache", "0") + // validate dataMap is non empty, its an instance of BlockDataMap and minMaxSchema length is 3 + assert(dataMaps.nonEmpty) + assert(dataMaps(0).isInstanceOf[BlockDataMap]) + assert(validateMinMaxColumnsCacheLength(dataMaps, 3, true)) + var segmentPropertyIndex = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex + + // alter table to add column_meta_cache and cache_level + sql( + "alter table 
metaCache set tblproperties('column_meta_cache'='c2,c1', 'CACHE_LEVEL'='BLOCKLET')") + var wrapper = SegmentPropertiesAndSchemaHolder.getInstance() + .getSegmentPropertiesWrapper(segmentPropertyIndex) + // after alter operation cache should be cleaned and cache should be evicted + assert(null == wrapper) + checkAnswer(sql("select * from metaCache"), Row("a", "aa", "aaa")) + // validate dataMap is non empty, its an instance of BlockletDataMap and minMaxSchema length + // is 1 + dataMaps = getDataMaps("default", "metaCache", "0") + assert(dataMaps.nonEmpty) + assert(dataMaps(0).isInstanceOf[BlockletDataMap]) + assert(validateMinMaxColumnsCacheLength(dataMaps, 2)) + + // alter table to add same value as previous with order change for column_meta_cache and cache_level + segmentPropertyIndex = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex + sql( + "alter table metaCache set tblproperties('column_meta_cache'='c1,c2', 'CACHE_LEVEL'='BLOCKLET')") + wrapper = SegmentPropertiesAndSchemaHolder.getInstance() + .getSegmentPropertiesWrapper(segmentPropertyIndex) + // after alter operation cache should not be cleaned as value are unchanged + assert(null != wrapper) + + // alter table to cache no column in column_meta_cache + segmentPropertyIndex = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex + sql( + "alter table metaCache set tblproperties('column_meta_cache'='')") + wrapper = SegmentPropertiesAndSchemaHolder.getInstance() + .getSegmentPropertiesWrapper(segmentPropertyIndex) + + // after alter operation cache should be cleaned and cache should be evicted + assert(null == wrapper) + checkAnswer(sql("select * from metaCache"), Row("a", "aa", "aaa")) + // validate dataMap is non empty, its an instance of BlockletDataMap and minMaxSchema length + // is 0 + dataMaps = getDataMaps("default", "metaCache", "0") + assert(dataMaps.nonEmpty) + assert(dataMaps(0).isInstanceOf[BlockletDataMap]) + assert(validateMinMaxColumnsCacheLength(dataMaps, 0)) + + 
// alter table to cache no column in column_meta_cache + segmentPropertyIndex = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex + sql( + "alter table metaCache unset tblproperties('column_meta_cache', 'cache_level')") + wrapper = SegmentPropertiesAndSchemaHolder.getInstance() + .getSegmentPropertiesWrapper(segmentPropertyIndex) + // after alter operation cache should be cleaned and cache should be evicted + assert(null == wrapper) + checkAnswer(sql("select * from metaCache"), Row("a", "aa", "aaa")) + // validate dataMap is non empty, its an instance of BlockletDataMap and minMaxSchema length + // is 3 + dataMaps = getDataMaps("default", "metaCache", "0") + assert(dataMaps.nonEmpty) + assert(dataMaps(0).isInstanceOf[BlockDataMap]) + assert(validateMinMaxColumnsCacheLength(dataMaps, 3)) + } + + test("test UPDATE scenario after column_meta_cache") { + sql("drop table if exists metaCache") + sql("create table metaCache(name string, c1 string, c2 string) stored by 'carbondata' TBLPROPERTIES('COLUMN_META_CACHE'='')") + sql("insert into metaCache select 'a','aa','aaa'") + sql("insert into metaCache select 'b','bb','bbb'") + sql("update metaCache set(c1)=('new_c1') where c1='aa'").show() + checkAnswer(sql("select c1 from metaCache"), Seq(Row("new_c1"), Row("bb"))) + } + + test("test queries with column_meta_cache and cache_level='BLOCK'") { + dropSchema + // set cache_level + createAndLoadTable("BLOCK") + // check count(*) + checkAnswer(sql("select count(*) from column_min_max_cache_test"), Row(10)) + // check query on cached dimension columns + checkAnswer(sql( + "select count(*) from column_min_max_cache_test where workgroupcategoryname='developer' OR designation='PL'"), + Row(6)) + // check query on cached dimension column and non cached column + checkAnswer(sql( + "select count(*) from column_min_max_cache_test where empname='pramod' and " + + "workgroupcategoryname='developer'"), + Row(1)) + // query on cached column + checkAnswer(sql( + "select 
count(*) from column_min_max_cache_test where workgroupcategoryname='developer'"), + Row(5)) + // check query on non cached column + checkAnswer(sql( + "select count(*) from column_min_max_cache_test where empname='pramod' and " + + "deptname='network'"), + Row(0)) + // check query on cached dimension and measure column + checkAnswer(sql( + "select count(*) from column_min_max_cache_test where attendance='77' and " + + "salary='11248' and workgroupcategoryname='manager'"), + Row(1)) + // check query on cached dimension and measure column with one non cached column + checkAnswer(sql( + "select count(*) from column_min_max_cache_test where attendance='77' and " + + "salary='11248' OR deptname='network'"), + Row(4)) + } + + test("test queries with column_meta_cache and cache_level='BLOCKLET'") { + dropSchema + // set cache_level + createAndLoadTable("BLOCKLET") + // check count(*) + checkAnswer(sql("select count(*) from column_min_max_cache_test"), Row(10)) + // check query on cached dimension columns + checkAnswer(sql( + "select count(*) from column_min_max_cache_test where workgroupcategoryname='developer' OR designation='PL'"), + Row(6)) + // check query on cached dimension column and non cached column + checkAnswer(sql( + "select count(*) from column_min_max_cache_test where empname='pramod' and " + + "workgroupcategoryname='developer'"), + Row(1)) + // query on cached column + checkAnswer(sql( + "select count(*) from column_min_max_cache_test where workgroupcategoryname='developer'"), + Row(5)) + // check query on non cached column + checkAnswer(sql( + "select count(*) from column_min_max_cache_test where empname='pramod' and " + + "deptname='network'"), + Row(0)) + // check query on cached dimension and measure column + checkAnswer(sql( + "select count(*) from column_min_max_cache_test where attendance='77' and " + + "salary='11248' and workgroupcategoryname='manager'"), + Row(1)) + // check query on cached dimension and measure column with one non cached column 
+ checkAnswer(sql( + "select count(*) from column_min_max_cache_test where attendance='77' and " + + "salary='11248' OR deptname='network'"), + Row(4)) + } + + test("test update on column cached") { + dropSchema + // set cache_level + createAndLoadTable("BLOCKLET") + sql("update column_min_max_cache_test set (designation)=('SEG') where empname='ayushi'").show() + checkAnswer(sql( + "select count(*) from column_min_max_cache_test where empname='ayushi' and " + + "designation='SEG'"), + Row(1)) + } + + test("test update on column not cached") { + dropSchema + // set cache_level + createAndLoadTable("BLOCKLET") + sql( + "update column_min_max_cache_test set (workgroupcategoryname)=('solution engrr') where " + + "workgroupcategoryname='developer'") + .show() + checkAnswer(sql( + "select count(*) from column_min_max_cache_test where workgroupcategoryname='solution " + + "engrr'"), + Row(5)) + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc293191/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithColumnMetCacheAndCacheLevelProperty.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithColumnMetCacheAndCacheLevelProperty.scala index 8dadef9..56aef40 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithColumnMetCacheAndCacheLevelProperty.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithColumnMetCacheAndCacheLevelProperty.scala @@ -17,11 +17,14 @@ package org.apache.carbondata.spark.testsuite.createTable +import scala.collection.JavaConverters._ 
+ import org.apache.spark.sql.CarbonEnv import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants /** * test class for validating create table with column_meta_cache and cache_level properties @@ -33,8 +36,13 @@ class TestCreateTableWithColumnMetCacheAndCacheLevelProperty extends QueryTest w key: String, expectedValue: String): Boolean = { val carbonTable = CarbonEnv.getCarbonTable(Option(dbName), tableName)(sqlContext.sparkSession) - val value = carbonTable.getTableInfo.getFactTable.getTableProperties.get(key) - expectedValue.equals(value) + if (key.equalsIgnoreCase(CarbonCommonConstants.COLUMN_META_CACHE)) { + val value = carbonTable.getMinMaxCachedColumnsInCreateOrder.asScala.mkString(",") + expectedValue.equals(value) + } else { + val value = carbonTable.getTableInfo.getFactTable.getTableProperties.get(key) + expectedValue.equals(value) + } } test("validate column_meta_cache with only empty spaces - COLUMN_META_CACHE_01") { @@ -111,6 +119,14 @@ class TestCreateTableWithColumnMetCacheAndCacheLevelProperty extends QueryTest w checkExistence(descResult, false, "COLUMN_META_CACHE") } + test("validate column_meta_cache after column drop - COLUMN_META_CACHE_12") { + sql("drop table if exists column_meta_cache") + sql("create table column_meta_cache(c1 String, c2 String, c3 int, c4 double) stored by 'carbondata' TBLPROPERTIES('column_meta_cache'='c1,c2,c3')") + assert(isExpectedValueValid("default", "column_meta_cache", "column_meta_cache", "c1,c2,c3")) + sql("alter table column_meta_cache drop columns(c2)") + assert(isExpectedValueValid("default", "column_meta_cache", "column_meta_cache", "c1,c3")) + } + test("validate cache_level with only empty spaces - CACHE_LEVEL_01") { sql("drop table if exists cache_level") intercept[MalformedCarbonCommandException] { 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc293191/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala index 2aa4446..cdd9b36 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala @@ -209,7 +209,7 @@ private[sql] case class CarbonDescribeFormattedCommand( // add columns configured in column meta cache if (null != tblProps.get(CarbonCommonConstants.COLUMN_META_CACHE)) { results ++= - Seq(("COLUMN_META_CACHE", carbonTable.getCachedColumns(carbonTable.getTableName).asScala + Seq(("COLUMN_META_CACHE", carbonTable.getMinMaxCachedColumnsInCreateOrder().asScala .map(col => col).mkString(","), "")) } if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc293191/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index 67c33ed..00cd653 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -31,8 +31,9 @@ import org.apache.spark.sql.hive.HiveExternalCatalog._ import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException 
import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} @@ -519,17 +520,25 @@ object AlterTableUtil { // the cache should be loaded again with default value if (propertiesToBeRemoved.contains(CarbonCommonConstants.COLUMN_META_CACHE) && existingTableProperties.get(CarbonCommonConstants.COLUMN_META_CACHE).isDefined) { - ManageDictionaryAndBTree.invalidateBTreeCache(carbonTable) + clearCache(carbonTable) } else if (propertiesToBeRemoved.contains(CarbonCommonConstants.CACHE_LEVEL)) { val cacheLevel = existingTableProperties.get(CarbonCommonConstants.CACHE_LEVEL) if (cacheLevel.isDefined && !cacheLevel.equals(CarbonCommonConstants.CACHE_LEVEL_DEFAULT_VALUE)) { - ManageDictionaryAndBTree.invalidateBTreeCache(carbonTable) + clearCache(carbonTable) } } } } + private def clearCache(carbonTable: CarbonTable): Unit = { + // clear dataMap cache + DataMapStoreManager.getInstance().clearDataMaps(carbonTable.getAbsoluteTableIdentifier) + // clear segmentProperties Cache + SegmentPropertiesAndSchemaHolder.getInstance() + .invalidate(carbonTable.getAbsoluteTableIdentifier) + } + /** * This method will validate the column_meta_cache and cache_level properties and clear the * driver block/blocklet cache @@ -549,7 +558,7 @@ object AlterTableUtil { case Some(newColumnsToBeCached) => if (!checkIfColumnsAreAlreadyCached(carbonTable, tblPropertiesMap .get(CarbonCommonConstants.COLUMN_META_CACHE), newColumnsToBeCached)) { - 
ManageDictionaryAndBTree.invalidateBTreeCache(carbonTable) + clearCache(carbonTable) } case None => // don't do anything @@ -561,7 +570,7 @@ object AlterTableUtil { case Some(newCacheLevelValue) => if (!isCacheLevelValid(tblPropertiesMap.get(CarbonCommonConstants.CACHE_LEVEL), newCacheLevelValue)) { - ManageDictionaryAndBTree.invalidateBTreeCache(carbonTable) + clearCache(carbonTable) } case None => // don't do anything