This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 7b58b161a1d3264e744e3e78b0cffbde5e830e67 Author: Ma,Gang <ga...@ebay.com> AuthorDate: Wed Jan 31 15:39:12 2018 +0800 KYLIN-2899 Introduce segment level query cache --- .../org/apache/kylin/common/KylinConfigBase.java | 13 ++ .../java/org/apache/kylin/common/QueryContext.java | 48 +++++++ .../apache/kylin/common/debug/BackdoorToggles.java | 16 +++ dev-support/checkstyle.xml | 4 +- storage-hbase/pom.xml | 4 + .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 155 ++++++++++++++++++++- .../storage/hbase/cube/v2/SegmentQueryCache.java | 80 +++++++++++ .../storage/hbase/cube/v2/SegmentQueryResult.java | 101 ++++++++++++++ .../storage/hbase/cube/SegmentQueryResultTest.java | 112 +++++++++++++++ 9 files changed, 526 insertions(+), 7 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 9066b0d..6e1ff9e 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1461,6 +1461,19 @@ abstract public class KylinConfigBase implements Serializable { return getRequired("kylin.cache.memcached.hosts"); } + public boolean isQuerySegmentCacheEnabled() { + return Boolean.parseBoolean(getOptional("kylin.query.segment-cache-enabled", "false")); + } + + public int getQuerySegmentCacheTimeout() { + return Integer.parseInt(getOptional("kylin.query.segment-cache-timeout", "2000")); + } + + // define the maximum size for each segment in one query that can be cached, in megabytes + public int getQuerySegmentCacheMaxSize() { + return Integer.parseInt(getOptional("kylin.query.segment-cache-max-size", "200")); + } + public String getQueryAccessController() { return getOptional("kylin.query.access-controller", null); } diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java index ef288c7..1a961ec 100644 --- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java +++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java @@ -204,6 +204,54 @@ public class QueryContext { return Lists.newArrayList(cubeSegmentStatisticsResultMap.values()); } + public CubeSegmentStatistics getCubeSegmentStatistics(int ctxId, String cubeName, String segmentName) { + CubeSegmentStatisticsResult cubeSegmentStatisticsResult = cubeSegmentStatisticsResultMap.get(ctxId); + if (cubeSegmentStatisticsResult == null) { + logger.warn("CubeSegmentStatisticsResult should be initialized for context {}", ctxId); + return null; + } + Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap; + if (cubeSegmentStatisticsMap == null) { + logger.warn( + "cubeSegmentStatisticsMap should be initialized for CubeSegmentStatisticsResult with query type {}", cubeSegmentStatisticsResult.queryType); + return null; + } + Map<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName); + if (segmentStatisticsMap == null) { + logger.warn( + "cubeSegmentStatistic should be initialized for cube {}", cubeName); + return null; + } + CubeSegmentStatistics segmentStatistics = segmentStatisticsMap.get(segmentName); + if (segmentStatistics == null) { + logger.warn( + "segmentStatistics should be initialized for cube {} with segment{}", cubeName, segmentName); + return null; + } + return segmentStatistics; + } + + public void addCubeSegmentStatistics(int ctxId, CubeSegmentStatistics cubeSegmentStatistics) { + CubeSegmentStatisticsResult cubeSegmentStatisticsResult = cubeSegmentStatisticsResultMap.get(ctxId); + if (cubeSegmentStatisticsResult == null) { + logger.warn("CubeSegmentStatisticsResult should be initialized for context {}", ctxId); + return; + } + Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap; + if (cubeSegmentStatisticsMap == null) { + logger.warn( + "cubeSegmentStatisticsMap should be initialized for CubeSegmentStatisticsResult with query type {}", cubeSegmentStatisticsResult.queryType); + return; + } + String cubeName = cubeSegmentStatistics.cubeName; + Map<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName); + if (segmentStatisticsMap == null) { + segmentStatisticsMap = Maps.newConcurrentMap(); + cubeSegmentStatisticsMap.put(cubeName, segmentStatisticsMap); + } + segmentStatisticsMap.put(cubeSegmentStatistics.getSegmentName(), cubeSegmentStatistics); + } + public void addRPCStatistics(int ctxId, String rpcServer, String cubeName, String segmentName, long sourceCuboidId, long targetCuboidId, long filterMask, Exception e, long rpcCallTimeMs, long skippedRows, long scannedRows, long returnedRows, long aggregatedRows, long scannedBytes) { diff --git a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java index 47fbbcd..be0f7a6 100644 --- a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java +++ b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java @@ -86,6 +86,10 @@ public class BackdoorToggles { return getBoolean(DEBUG_TOGGLE_DISABLE_QUERY_CACHE); } + public static boolean getDisableSegmentCache() { + return getBoolean(DEBUG_TOGGLE_DISABLE_QUERY_SEGMENT_CACHE); + } + public static boolean getDisableFuzzyKey() { return getBoolean(DEBUG_TOGGLE_DISABLE_FUZZY_KEY); } @@ -215,6 +219,18 @@ public class BackdoorToggles { public final static String DEBUG_TOGGLE_DISABLE_QUERY_CACHE = "DEBUG_TOGGLE_DISABLE_QUERY_CACHE"; /** + * set DEBUG_TOGGLE_DISABLE_QUERY_SEGMENT_CACHE=true to prevent using segment cache for current query + * + * + * + example:(put it into request body) + "backdoorToggles": { + "DEBUG_TOGGLE_DISABLE_QUERY_SEGMENT_CACHE": "true" + } + */ + public final static String DEBUG_TOGGLE_DISABLE_QUERY_SEGMENT_CACHE = "DEBUG_TOGGLE_DISABLE_QUERY_SEGMENT_CACHE"; + + /** * set DEBUG_TOGGLE_HBASE_CUBE_QUERY_VERSION=v1/v2 to control which version CubeStorageQuery to use * example:(put it into request body) diff --git a/dev-support/checkstyle.xml b/dev-support/checkstyle.xml index d8eb73f..802f058 100644 --- a/dev-support/checkstyle.xml +++ b/dev-support/checkstyle.xml @@ -58,7 +58,9 @@ <property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/> </module> - <module name="MethodLength"/> + <module name="MethodLength"> + <property name="max" value="300"/> + </module> <module name="MethodParamPad"/> <module name="ParameterNumber"> <!-- default is 8 --> diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml index 220fc14..190dd8c 100644 --- a/storage-hbase/pom.xml +++ b/storage-hbase/pom.xml @@ -41,6 +41,10 @@ <groupId>org.apache.kylin</groupId> <artifactId>kylin-engine-mr</artifactId> </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-cache</artifactId> + </dependency> <dependency> <groupId>org.apache.kylin</groupId> diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index 911c8d5..c9be666 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -27,6 +27,7 @@ import java.util.Locale; import java.util.concurrent.ExecutorService; import java.util.zip.DataFormatException; +import org.apache.commons.lang3.SerializationUtils; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; @@ -37,8 +38,11 @@ import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.QueryContext.CubeSegmentStatistics; +import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.exceptions.KylinTimeoutException; import org.apache.kylin.common.exceptions.ResourceLimitExceededException; +import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesSerializer; import org.apache.kylin.common.util.BytesUtil; @@ -48,7 +52,10 @@ import org.apache.kylin.common.util.LoggableCachedThreadPool; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.gridtable.GTScanRange; import org.apache.kylin.gridtable.GTScanRequest; +import org.apache.kylin.gridtable.GTUtil; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.metadata.model.ISegment; import org.apache.kylin.storage.StorageContext; @@ -171,6 +178,48 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { // for different cubes until redeployment of coprocessor jar. final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); final boolean compressionResult = kylinConfig.getCompressionResult(); + + final boolean querySegmentCacheEnabled = isSegmentLevelCacheEnabled(); + final SegmentQueryResult.Builder segmentQueryResultBuilder = new SegmentQueryResult.Builder(shardNum, + cubeSeg.getConfig().getQuerySegmentCacheMaxSize() * 1024 * 1024); + String calculatedSegmentQueryCacheKey = null; + if (querySegmentCacheEnabled) { + try { + logger.info("Query-{}: try to get segment result from cache for segment:{}", queryContext.getQueryId(), + cubeSeg); + calculatedSegmentQueryCacheKey = getSegmentQueryCacheKey(scanRequest); + long startTime = System.currentTimeMillis(); + SegmentQueryResult segmentResult = SegmentQueryCache.getInstance().get(calculatedSegmentQueryCacheKey); + long spendTime = System.currentTimeMillis() - startTime; + if (segmentResult == null) { + logger.info("Query-{}: no segment result is cached for segment:{}, take time:{}ms", + queryContext.getQueryId(), cubeSeg, spendTime); + } else { + logger.info("Query-{}: get segment result from cache for segment:{}, take time:{}ms", + queryContext.getQueryId(), cubeSeg, spendTime); + if (segmentResult.getCubeSegmentStatisticsBytes() != null) { + queryContext.addCubeSegmentStatistics(storageContext.ctxId, + (CubeSegmentStatistics) SerializationUtils + .deserialize(segmentResult.getCubeSegmentStatisticsBytes())); + } + for (byte[] regionResult : segmentResult.getRegionResults()) { + if (compressionResult) { + epResultItr.append(CompressionUtils.decompress(regionResult)); + } else { + epResultItr.append(regionResult); + } + } + return new StorageResponseGTScatter(scanRequest, new DummyPartitionStreamer(epResultItr), + storageContext); + } + } catch (Exception e) { + logger.error("Fail to handle cached segment result from cache", e); + } + } + final String segmentQueryCacheKey = calculatedSegmentQueryCacheKey; + logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", shardNum, + cuboidBaseShard, rawScans.size()); + final CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder(); builder.setGtScanRequest(scanRequestByteString).setHbaseRawScan(rawScanByteString); for (IntList intList : hbaseColumnsToGTIntList) { @@ -193,7 +242,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { @Override public void run() { runEPRange(queryContext, logHeader, compressionResult, builder.build(), conn, epRange.getFirst(), - epRange.getSecond(), epResultItr); + epRange.getSecond(), epResultItr, querySegmentCacheEnabled, segmentQueryResultBuilder, + segmentQueryCacheKey); } }); } @@ -203,7 +253,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { private void runEPRange(final QueryContext queryContext, final String logHeader, final boolean compressionResult, final CubeVisitProtos.CubeVisitRequest request, final Connection conn, byte[] startKey, byte[] endKey, - final ExpectedSizeIterator epResultItr) { + final ExpectedSizeIterator epResultItr, final boolean querySegmentCacheEnabled, + final SegmentQueryResult.Builder segmentQueryResultBuilder, final String segmentQueryCacheKey) { final String queryId = queryContext.getQueryId(); @@ -328,12 +379,38 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { } try { + byte[] rawData = HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()); if (compressionResult) { - epResultItr.append(CompressionUtils.decompress( - HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()))); + epResultItr.append(CompressionUtils.decompress(rawData)); } else { - epResultItr.append( - HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows())); + epResultItr.append(rawData); + } + // put segment query result to cache if cache is enabled + if (querySegmentCacheEnabled) { + try { + segmentQueryResultBuilder.putRegionResult(rawData); + if (segmentQueryResultBuilder.isComplete()) { + CubeSegmentStatistics cubeSegmentStatistics = queryContext + .getCubeSegmentStatistics(storageContext.ctxId, + cubeSeg.getCubeInstance().getName(), cubeSeg.getName()); + if (cubeSegmentStatistics != null) { + segmentQueryResultBuilder + .setCubeSegmentStatistics(cubeSegmentStatistics); + logger.info( + "Query-{}: try to put segment query result to cache for segment:{}", + queryContext.getQueryId(), cubeSeg); + SegmentQueryResult segmentQueryResult = segmentQueryResultBuilder + .build(); + SegmentQueryCache.getInstance().put(segmentQueryCacheKey, + segmentQueryResult); + logger.info( + "Query-{}: successfully put segment query result to cache for segment:{}", + queryContext.getQueryId(), cubeSeg); + } + } + } catch (Throwable t) { + logger.error("Fail to put query segment result to cache", t); + } } } catch (IOException | DataFormatException e) { throw new RuntimeException(logHeader + "Error when decompressing", e); @@ -432,4 +509,70 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { throw new AssertionError("Unknown error type: " + errorInfo.getType()); } } + + private boolean isSegmentLevelCacheEnabled() { + if (BackdoorToggles.getDisableSegmentCache()) { + return false; + } + if (!cubeSeg.getConfig().isQuerySegmentCacheEnabled()) { + return false; + } + try { + if (KylinConfig.getInstanceFromEnv().getMemCachedHosts() == null) { + return false; + } + } catch (Exception e) { + logger.warn("Fail to get memcached hosts and segment level cache will not be enabled"); + return false; + } + return true; + } + + private String getSegmentQueryCacheKey(GTScanRequest scanRequest) { + String scanReqStr = getScanRequestString(scanRequest); + return cubeSeg.getCubeInstance().getName() + "_" + cubeSeg.getUuid() + "_" + scanReqStr; + } + + private String getScanRequestString(GTScanRequest scanRequest) { + int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE; + while (true) { + try { + ByteBuffer out = ByteBuffer.allocate(scanRequestBufferSize); + GTInfo.serializer.serialize(scanRequest.getInfo(), out); + BytesUtil.writeVInt(scanRequest.getGTScanRanges().size(), out); + for (GTScanRange range : scanRequest.getGTScanRanges()) { + serializeGTRecord(range.pkStart, out); + serializeGTRecord(range.pkEnd, out); + BytesUtil.writeVInt(range.fuzzyKeys.size(), out); + for (GTRecord f : range.fuzzyKeys) { + serializeGTRecord(f, out); + } + } + ImmutableBitSet.serializer.serialize(scanRequest.getColumns(), out); + BytesUtil.writeByteArray( + GTUtil.serializeGTFilter(scanRequest.getFilterPushDown(), scanRequest.getInfo()), out); + ImmutableBitSet.serializer.serialize(scanRequest.getAggrGroupBy(), out); + ImmutableBitSet.serializer.serialize(scanRequest.getAggrMetrics(), out); + BytesUtil.writeAsciiStringArray(scanRequest.getAggrMetricsFuncs(), out); + BytesUtil.writeVInt(scanRequest.isAllowStorageAggregation() ? 1 : 0, out); + BytesUtil.writeUTFString(scanRequest.getStorageLimitLevel().name(), out); + BytesUtil.writeVInt(scanRequest.getStorageScanRowNumThreshold(), out); + BytesUtil.writeVInt(scanRequest.getStoragePushDownLimit(), out); + BytesUtil.writeUTFString(scanRequest.getStorageBehavior(), out); + out.flip(); + return Bytes.toStringBinary(out.array(), out.position(), out.limit()); + } catch (BufferOverflowException boe) { + logger.info("Buffer size {} cannot hold the scan request, resizing to 4 times", scanRequestBufferSize); + scanRequestBufferSize *= 4; + } + } + } + + private void serializeGTRecord(GTRecord gtRecord, ByteBuffer out) { + ByteArray[] cols = gtRecord.getInternal(); + BytesUtil.writeVInt(cols.length, out); + for (ByteArray col : cols) { + col.exportData(out); + } + } } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryCache.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryCache.java new file mode 100755 index 0000000..2b66a22 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryCache.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.hbase.cube.v2; + +import org.apache.commons.lang3.SerializationUtils; +import org.apache.kylin.cache.memcached.CacheStats; +import org.apache.kylin.cache.memcached.MemcachedCache; +import org.apache.kylin.cache.memcached.MemcachedCacheConfig; +import org.apache.kylin.cache.memcached.MemcachedChunkingCache; +import org.apache.kylin.common.KylinConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SegmentQueryCache { + public static final Logger logger = LoggerFactory.getLogger(SegmentQueryCache.class); + private static final String SEG_QUERY_CACHE_NAME = "query_segment_cache"; + private static SegmentQueryCache segmentQueryCacheInstance = new SegmentQueryCache(); + + private MemcachedChunkingCache memcachedCache; + + public static SegmentQueryCache getInstance() { + return segmentQueryCacheInstance; + } + + private SegmentQueryCache() { + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + MemcachedCacheConfig memcachedCacheConfig = new MemcachedCacheConfig(); + String configHosts = kylinConfig.getMemCachedHosts(); + memcachedCacheConfig.setTimeout(kylinConfig.getQuerySegmentCacheTimeout()); + // set max object size a little less than 1024 * 1024, because the key of the segment result cache is long + // if set to 1024 * 1024 will cause memcached client exceed max size error + memcachedCacheConfig.setMaxObjectSize(1040000); + memcachedCacheConfig.setHosts(configHosts); + //Reverse the compression setting between Hbase coprocessor and memcached, if Hbase result is compressed, memcached will not compress. + memcachedCacheConfig.setEnableCompression(!kylinConfig.getCompressionResult()); + String cacheName = SEG_QUERY_CACHE_NAME; + memcachedCache = new MemcachedChunkingCache(MemcachedCache.create(memcachedCacheConfig, cacheName)); + } + + public void put(String key, SegmentQueryResult segmentQueryResult) { + memcachedCache.put(key, segmentQueryResult); + } + + public SegmentQueryResult get(String key) { + byte[] value = memcachedCache.get(key); + if (value == null) { + return null; + } + return (SegmentQueryResult) (SerializationUtils.deserialize(value)); + } + + public CacheStats getCacheStats() { + return memcachedCache.getStats(); + } + + /** + * evict the segment cache by query key + * + * @param segmentQueryKey + */ + public void evict(String segmentQueryKey) { + memcachedCache.evict(segmentQueryKey); + } +} diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryResult.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryResult.java new file mode 100755 index 0000000..e208c02 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryResult.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.hbase.cube.v2; + +import com.google.common.collect.Queues; +import org.apache.commons.lang3.SerializationUtils; +import org.apache.kylin.common.QueryContext.CubeSegmentStatistics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Collection; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * query result for each segment + */ +public class SegmentQueryResult implements Serializable { + private static final long serialVersionUID = 9047493994209284453L; + + private Collection<byte[]> regionResults; + + // store segment query stats for cube planer usage + private byte[] cubeSegmentStatisticsBytes; + + public void setRegionResults(Collection<byte[]> regionResults) { + this.regionResults = regionResults; + } + + public Collection<byte[]> getRegionResults() { + return regionResults; + } + + public byte[] getCubeSegmentStatisticsBytes() { + return cubeSegmentStatisticsBytes; + } + + public void setCubeSegmentStatisticsBytes(byte[] cubeSegmentStatisticsBytes) { + this.cubeSegmentStatisticsBytes = cubeSegmentStatisticsBytes; + } + + public static class Builder { + private static final Logger logger = LoggerFactory.getLogger(Builder.class); + + private volatile int regionsNum; + private ConcurrentLinkedQueue<byte[]> queue; + private AtomicInteger totalResultSize; + private volatile int maxSegmentCacheSize; + private byte[] cubeSegmentStatisticsBytes; + + public Builder(int regionsNum, int maxCacheResultSize) { + this.regionsNum = regionsNum; + this.queue = Queues.newConcurrentLinkedQueue(); + this.totalResultSize = new AtomicInteger(); + this.maxSegmentCacheSize = maxCacheResultSize; + } + + public void putRegionResult(byte[] result) { + totalResultSize.addAndGet(result.length); + if (totalResultSize.get() > maxSegmentCacheSize) { + logger.info("stop put result to cache, since the result size:{} is larger than configured size:{}", + totalResultSize.get(), maxSegmentCacheSize); + return; + } + queue.offer(result); + } + + public void setCubeSegmentStatistics(CubeSegmentStatistics cubeSegmentStatistics) { + this.cubeSegmentStatisticsBytes = (cubeSegmentStatistics == null ? null : SerializationUtils + .serialize(cubeSegmentStatistics)); + } + + public boolean isComplete() { + return queue.size() == regionsNum; + } + + public SegmentQueryResult build() { + SegmentQueryResult result = new SegmentQueryResult(); + result.setCubeSegmentStatisticsBytes(cubeSegmentStatisticsBytes); + result.setRegionResults(queue); + return result; + } + } +} diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/SegmentQueryResultTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/SegmentQueryResultTest.java new file mode 100644 index 0000000..a944c8b --- /dev/null +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/SegmentQueryResultTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.hbase.cube; + +import com.google.common.collect.Lists; +import org.apache.commons.lang3.SerializationUtils; +import org.apache.kylin.common.QueryContext.CubeSegmentStatistics; +import org.apache.kylin.storage.hbase.cube.v2.SegmentQueryResult; +import org.apache.kylin.storage.hbase.cube.v2.SegmentQueryResult.Builder; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; +import static junit.framework.TestCase.assertFalse; + +public class SegmentQueryResultTest { + private static final Logger logger = LoggerFactory.getLogger(SegmentQueryResultTest.class); + + @Test + public void buildTest() { + int maxCacheResultSize = 10 * 1024; + ExecutorService rpcExecutor = Executors.newFixedThreadPool(4); + SegmentQueryResult.Builder builder = new Builder(8, maxCacheResultSize); + mockSendRPCTasks(rpcExecutor, 4, builder, 1024); + assertFalse(builder.isComplete()); + mockSendRPCTasks(rpcExecutor, 4, builder, 1024); + assertTrue(builder.isComplete()); + + builder = new Builder(8, maxCacheResultSize); + mockSendRPCTasks(rpcExecutor, 8, builder, 1500); + assertFalse(builder.isComplete()); + } + + @Test + public void resultValidateTest() { + long segmentBuildTime = System.currentTimeMillis() - 1000; + int maxCacheResultSize = 10 * 1024; + ExecutorService rpcExecutor = Executors.newFixedThreadPool(4); + SegmentQueryResult.Builder builder = new Builder(8, maxCacheResultSize); + mockSendRPCTasks(rpcExecutor, 8, builder, 1024); + CubeSegmentStatistics statistics = new CubeSegmentStatistics(); + statistics.setWrapper("cube1", "20171001000000-20171010000000", 3, 7, 1); + builder.setCubeSegmentStatistics(statistics); + SegmentQueryResult segmentQueryResult = builder.build(); + + CubeSegmentStatistics desStatistics = SerializationUtils.deserialize(segmentQueryResult + .getCubeSegmentStatisticsBytes()); + assertEquals("cube1", desStatistics.getCubeName()); + } + + private void mockSendRPCTasks(ExecutorService rpcExecutor, int rpcNum, SegmentQueryResult.Builder builder, + int resultSize) { + List<Future> futures = Lists.newArrayList(); + for (int i = 0; i < rpcNum; i++) { + Future future = rpcExecutor.submit(new MockRPCTask(resultSize, 10, builder)); + futures.add(future); + } + for (Future future : futures) { + try { + future.get(); + } catch (Exception e) { + logger.error("exception", e); + } + } + } + + private static class MockRPCTask implements Runnable { + private int resultSize; + private long takeTime; + private SegmentQueryResult.Builder builder; + + MockRPCTask(int resultSize, long takeTime, SegmentQueryResult.Builder builder) { + this.resultSize = resultSize; + this.takeTime = takeTime; + this.builder = builder; + } + + @Override + public void run() { + try { + Thread.sleep(takeTime); + } catch (InterruptedException e) { + logger.error("interrupt", e); + } + builder.putRegionResult(new byte[resultSize]); + } + } + +}