Repository: kylin Updated Branches: refs/heads/2.0-rc 82e258b69 -> a035cc27a
KYLIN-1227 fix bug & add topN to v1 query engine & restore testing v1 query engine in case need it as a fallback for v2 Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a035cc27 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a035cc27 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a035cc27 Branch: refs/heads/2.0-rc Commit: a035cc27a8c7eaebde5d4db3e1ac031c16b1c045 Parents: 82e258b Author: honma <ho...@ebay.com> Authored: Mon Dec 28 15:56:47 2015 +0800 Committer: honma <ho...@ebay.com> Committed: Mon Dec 28 15:57:08 2015 +0800 ---------------------------------------------------------------------- .../kylin/query/test/ITCombinationTest.java | 10 ++++-- .../kylin/query/test/ITKylinQueryTest.java | 2 +- .../apache/kylin/query/test/KylinTestBase.java | 34 ++++++++++++++++++++ .../kylin/storage/hbase/HBaseStorage.java | 9 ++++-- .../storage/hbase/cube/v1/CubeStorageQuery.java | 12 ++++--- .../hbase/cube/v1/CubeTupleConverter.java | 25 ++++++-------- .../coprocessor/observer/ObserverEnabler.java | 10 +++--- 7 files changed, 72 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/a035cc27/query/src/test/java/org/apache/kylin/query/test/ITCombinationTest.java ---------------------------------------------------------------------- diff --git a/query/src/test/java/org/apache/kylin/query/test/ITCombinationTest.java b/query/src/test/java/org/apache/kylin/query/test/ITCombinationTest.java index bbff87d..200d6d2 100644 --- a/query/src/test/java/org/apache/kylin/query/test/ITCombinationTest.java +++ b/query/src/test/java/org/apache/kylin/query/test/ITCombinationTest.java @@ -22,6 +22,7 @@ import java.sql.SQLException; import java.util.Arrays; import java.util.Collection; +import org.apache.kylin.storage.hbase.HBaseStorage; import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -41,6 +42,7 @@ public class ITCombinationTest extends ITKylinQueryTest { @AfterClass public static void tearDown() { clean(); + HBaseStorage.overwriteStorageQuery = null; } /** @@ -51,10 +53,10 @@ public class ITCombinationTest extends ITKylinQueryTest { @Parameterized.Parameters public static Collection<Object[]> configs() { // return Arrays.asList(new Object[][] { { "inner", "unset" }, { "left", "unset" }, { "inner", "off" }, { "left", "off" }, { "inner", "on" }, { "left", "on" }, }); - return Arrays.asList(new Object[][] { { "inner", "on" }, { "left", "on" } }); + return Arrays.asList(new Object[][] { { "inner", "on", "v2" }, { "left", "on", "v1" }, { "left", "on", "v2" } }); } - public ITCombinationTest(String joinType, String coprocessorToggle) throws Exception { + public ITCombinationTest(String joinType, String coprocessorToggle,String queryEngine) throws Exception { ITKylinQueryTest.clean(); @@ -68,5 +70,9 @@ public class ITCombinationTest extends ITKylinQueryTest { } else if (coprocessorToggle.equals("unset")) { // unset } + + if ("v1".equalsIgnoreCase(queryEngine)) { + HBaseStorage.overwriteStorageQuery = HBaseStorage.v1CubeStorageQuery; + } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/a035cc27/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java ---------------------------------------------------------------------- diff --git a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java index 72c366b..90d327c 100644 --- a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java +++ b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java @@ -168,7 +168,7 @@ public class ITKylinQueryTest extends KylinTestBase { @Test public void testTableauQuery() throws Exception { - batchExecuteQuery("src/test/resources/query/sql_tableau"); + execAndCompResultSize("src/test/resources/query/sql_tableau", null, true); } @Test http://git-wip-us.apache.org/repos/asf/kylin/blob/a035cc27/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java ---------------------------------------------------------------------- diff --git a/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java b/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java index 0399f8c..45b5d2c 100644 --- a/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java +++ b/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java @@ -373,6 +373,40 @@ public class KylinTestBase { } } + + protected void execAndCompResultSize(String queryFolder, String[] exclusiveQuerys, boolean needSort) throws Exception { + printInfo("---------- test folder: " + queryFolder); + Set<String> exclusiveSet = buildExclusiveSet(exclusiveQuerys); + + List<File> sqlFiles = getFilesFromFolder(new File(queryFolder), ".sql"); + for (File sqlFile : sqlFiles) { + String queryName = StringUtils.split(sqlFile.getName(), '.')[0]; + if (exclusiveSet.contains(queryName)) { + continue; + } + String sql = getTextFromFile(sqlFile); + + // execute Kylin + printInfo("Query Result from Kylin - " + queryName + " (" + queryFolder + ")"); + IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection); + ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort); + + // execute H2 + printInfo("Query Result from H2 - " + queryName); + H2Connection h2Conn = new H2Connection(h2Connection, null); + h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new TestH2DataTypeFactory()); + ITable h2Table = executeQuery(h2Conn, queryName, sql, needSort); + + // compare the result + Assert.assertEquals(h2Table.getRowCount(), kylinTable.getRowCount()); + + compQueryCount++; + if (kylinTable.getRowCount() == 0) { + zeroResultQueries.add(sql); + } + } + } + protected void execAndCompDynamicQuery(String queryFolder, String[] exclusiveQuerys, boolean needSort) throws Exception { printInfo("---------- test folder: " + queryFolder); Set<String> exclusiveSet = buildExclusiveSet(exclusiveQuerys); http://git-wip-us.apache.org/repos/asf/kylin/blob/a035cc27/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java index e7c8116..c61212c 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java @@ -44,8 +44,9 @@ import com.google.common.base.Preconditions; //used by reflection public class HBaseStorage implements IStorage { - private final static String v2CubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v2.CubeStorageQuery"; - private final static String v1CubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v1.CubeStorageQuery"; + public final static String v2CubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v2.CubeStorageQuery"; + public final static String v1CubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v1.CubeStorageQuery"; + public static String overwriteStorageQuery = null;//for test case private final static String defaultIIStorageQuery = "org.apache.kylin.storage.hbase.ii.InvertedIndexStorageQuery"; @@ -71,7 +72,9 @@ public class HBaseStorage implements IStorage { } else if (realization.getType() == RealizationType.CUBE) { String cubeStorageQuery; - if ("v1".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryVersion())) { + if (overwriteStorageQuery != null) { + cubeStorageQuery = overwriteStorageQuery; + } else if ("v1".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryVersion())) { cubeStorageQuery = v1CubeStorageQuery; } else { cubeStorageQuery = v2CubeStorageQuery;//by default use v2 http://git-wip-us.apache.org/repos/asf/kylin/blob/a035cc27/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java index 2fa0490..3d782eb 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.HConnection; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.common.util.ShardingHash; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -144,7 +145,7 @@ public class CubeStorageQuery implements ICachableStorageQuery { List<RowValueDecoder> valueDecoders = translateAggregation(cubeDesc.getHBaseMapping(), metrics, context); // memory hungry distinct count are pushed down to coprocessor, no need to set threshold any more - // setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory + setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial setLimit(filter, context); @@ -488,7 +489,9 @@ public class CubeStorageQuery implements ICachableStorageQuery { Collection<ColumnValueRange> andRanges = translateToAndDimRanges(andFilter.getChildren(), cubeSegment); - result.add(andRanges); + if (andRanges != null) { + result.add(andRanges); + } } return preprocessConstantConditions(result); @@ -691,8 +694,9 @@ public class CubeStorageQuery implements ICachableStorageQuery { short cuboidShardNum = segment.getCuboidShardNum(scan.getCuboid().getId()); short cuboidShardBase = segment.getCuboidBaseShard(scan.getCuboid().getId()); for (short i = 0; i < cuboidShardNum; ++i) { - byte[] newStartKey = duplicateKeyAndChangeShard(i, startKey); - byte[] newStopKey = duplicateKeyAndChangeShard(i, stopKey); + short newShard = ShardingHash.normalize(cuboidShardBase, i, segment.getTotalShards()); + byte[] newStartKey = duplicateKeyAndChangeShard(newShard, startKey); + byte[] newStopKey = duplicateKeyAndChangeShard(newShard, stopKey); HBaseKeyRange newRange = new HBaseKeyRange(segment, scan.getCuboid(), newStartKey, newStopKey, // scan.getFuzzyKeys(), scan.getFlatOrAndFilter(), scan.getPartitionColumnStartDate(), scan.getPartitionColumnEndDate()); ret.add(newRange); http://git-wip-us.apache.org/repos/asf/kylin/blob/a035cc27/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java index 8813901..7415b62 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java @@ -12,7 +12,6 @@ import org.apache.kylin.common.topn.Counter; import org.apache.kylin.common.topn.TopNCounter; import org.apache.kylin.common.util.Array; import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -21,12 +20,9 @@ import org.apache.kylin.cube.kv.RowKeyDecoder; import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; import org.apache.kylin.dict.Dictionary; import org.apache.kylin.dict.lookup.LookupStringTable; -import org.apache.kylin.metadata.model.DataType; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.tuple.ITuple; -import org.apache.kylin.storage.StorageContext; import org.apache.kylin.storage.hbase.steps.RowValueDecoder; import org.apache.kylin.storage.tuple.Tuple; import org.apache.kylin.storage.tuple.TupleInfo; @@ -40,7 +36,7 @@ public class CubeTupleConverter { final TupleInfo tupleInfo; final RowKeyDecoder rowKeyDecoder; final List<RowValueDecoder> rowValueDecoders; - final List<IDerivedColumnFiller> derivedColFillers; + final List<IDerivedColumnFiller> derivedColFillers; final int[] dimensionTupleIdx; final int[][] metricsMeasureIdx; final int[][] metricsTupleIdx; @@ -57,7 +53,7 @@ public class CubeTupleConverter { this.rowValueDecoders = rowValueDecoders; this.derivedColFillers = Lists.newArrayList(); this.topNCol = topNCol; - + List<TblColRef> dimCols = cuboid.getColumns(); // pre-calculate dimension index mapping to tuple @@ -66,7 +62,6 @@ public class CubeTupleConverter { TblColRef col = dimCols.get(i); dimensionTupleIdx[i] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1; } - // pre-calculate metrics index mapping to tuple metricsMeasureIdx = new int[rowValueDecoders.size()][]; @@ -79,7 +74,7 @@ public class CubeTupleConverter { metricsTupleIdx[i] = new int[selectedMeasures.cardinality()]; for (int j = 0, mi = selectedMeasures.nextSetBit(0); j < metricsMeasureIdx[i].length; j++, mi = selectedMeasures.nextSetBit(mi + 1)) { FunctionDesc aggrFunc = measures[mi].getFunction(); - + int tupleIdx; // a rewrite metrics is identified by its rewrite field name if (aggrFunc.needRewrite()) { @@ -99,10 +94,10 @@ public class CubeTupleConverter { if (this.topNCol != null) { this.topNColTupleIdx = tupleInfo.hasColumn(this.topNCol) ? tupleInfo.getColumnIndex(this.topNCol) : -1; this.topNMeasureTupleIdx = metricsTupleIdx[0][0]; - - this.topNColDict = (Dictionary<String>)cubeSeg.getDictionary(this.topNCol); + + this.topNColDict = (Dictionary<String>) cubeSeg.getDictionary(this.topNCol); } - + // prepare derived columns and filler Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCube().getHostToDerivedInfo(dimCols, null); for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) { @@ -128,12 +123,12 @@ public class CubeTupleConverter { private Tuple tuple; private Iterator<Counter> topNCounterIterator; private Counter<ByteArray> counter; - + private TopNCounterTupleIterator(Tuple tuple, TopNCounter topNCounter) { this.tuple = tuple; this.topNCounterIterator = topNCounter.iterator(); } - + @Override public boolean hasNext() { return topNCounterIterator.hasNext(); @@ -146,7 +141,7 @@ public class CubeTupleConverter { String colValue = topNColDict.getValueFromId(key); tuple.setDimensionValue(topNColTupleIdx, colValue); tuple.setMeasureValue(topNMeasureTupleIdx, counter.getCount()); - + return tuple; } @@ -155,7 +150,7 @@ public class CubeTupleConverter { throw new UnsupportedOperationException(); } } - + public void translateResult(Result hbaseRow, Tuple tuple) { try { byte[] rowkey = hbaseRow.getRow(); http://git-wip-us.apache.org/repos/asf/kylin/blob/a035cc27/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java index 4750ea4..0e9b3a4 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java @@ -38,8 +38,8 @@ import org.apache.kylin.storage.StorageContext; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector; -import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType; +import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator; import org.apache.kylin.storage.hbase.cube.v1.RegionScannerAdapter; import org.apache.kylin.storage.hbase.cube.v1.ResultScannerAdapter; import org.apache.kylin.storage.hbase.steps.RowValueDecoder; @@ -119,10 +119,10 @@ public class ObserverEnabler { return r; } - // if (RowValueDecoder.hasMemHungryCountDistinct(rowValueDecoders)) { - // logger.info("Coprocessor is disabled because there is memory hungry count distinct"); - // return false; - // } + if (RowValueDecoder.hasMemHungryCountDistinct(rowValueDecoders)) { + logger.info("Coprocessor is disabled because there is memory hungry count distinct"); + return false; + } if (context.isExactAggregation()) { logger.info("Coprocessor is disabled because exactAggregation is true");