This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 7b58b161a1d3264e744e3e78b0cffbde5e830e67
Author: Ma,Gang <ga...@ebay.com>
AuthorDate: Wed Jan 31 15:39:12 2018 +0800

    KYLIN-2899 Introduce segment level query cache
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  13 ++
 .../java/org/apache/kylin/common/QueryContext.java |  48 +++++++
 .../apache/kylin/common/debug/BackdoorToggles.java |  16 +++
 dev-support/checkstyle.xml                         |   4 +-
 storage-hbase/pom.xml                              |   4 +
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java        | 155 ++++++++++++++++++++-
 .../storage/hbase/cube/v2/SegmentQueryCache.java   |  80 +++++++++++
 .../storage/hbase/cube/v2/SegmentQueryResult.java  | 101 ++++++++++++++
 .../storage/hbase/cube/SegmentQueryResultTest.java | 112 +++++++++++++++
 9 files changed, 526 insertions(+), 7 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 9066b0d..6e1ff9e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1461,6 +1461,19 @@ abstract public class KylinConfigBase implements Serializable {
         return getRequired("kylin.cache.memcached.hosts");
     }
 
+    public boolean isQuerySegmentCacheEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.query.segment-cache-enabled", "false"));
+    }
+
+    public int getQuerySegmentCacheTimeout() {
+        return Integer.parseInt(getOptional("kylin.query.segment-cache-timeout", "2000"));
+    }
+
+    // Maximum size of a single segment's query result that can be cached, in megabytes
+    public int getQuerySegmentCacheMaxSize() {
+        return Integer.parseInt(getOptional("kylin.query.segment-cache-max-size", "200"));
+    }
+
     public String getQueryAccessController() {
         return getOptional("kylin.query.access-controller", null);
     }
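
For reference, a minimal kylin.properties sketch enabling the new cache; the
property names and defaults come straight from the accessors above, and only
segment-cache-enabled needs to be flipped from its default:

    kylin.query.segment-cache-enabled=true
    # memcached operation timeout, in milliseconds (default 2000)
    kylin.query.segment-cache-timeout=2000
    # per-segment cap on the cacheable result, in megabytes (default 200)
    kylin.query.segment-cache-max-size=200
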
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index ef288c7..1a961ec 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -204,6 +204,54 @@ public class QueryContext {
         return Lists.newArrayList(cubeSegmentStatisticsResultMap.values());
     }
 
+    public CubeSegmentStatistics getCubeSegmentStatistics(int ctxId, String cubeName, String segmentName) {
+        CubeSegmentStatisticsResult cubeSegmentStatisticsResult = cubeSegmentStatisticsResultMap.get(ctxId);
+        if (cubeSegmentStatisticsResult == null) {
+            logger.warn("CubeSegmentStatisticsResult should be initialized for context {}", ctxId);
+            return null;
+        }
+        Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap;
+        if (cubeSegmentStatisticsMap == null) {
+            logger.warn(
+                    "cubeSegmentStatisticsMap should be initialized for CubeSegmentStatisticsResult with query type {}",
+                    cubeSegmentStatisticsResult.queryType);
+            return null;
+        }
+        Map<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName);
+        if (segmentStatisticsMap == null) {
+            logger.warn("segmentStatisticsMap should be initialized for cube {}", cubeName);
+            return null;
+        }
+        CubeSegmentStatistics segmentStatistics = segmentStatisticsMap.get(segmentName);
+        if (segmentStatistics == null) {
+            logger.warn("segmentStatistics should be initialized for cube {} with segment {}", cubeName, segmentName);
+            return null;
+        }
+        return segmentStatistics;
+    }
+
+    public void addCubeSegmentStatistics(int ctxId, CubeSegmentStatistics cubeSegmentStatistics) {
+        CubeSegmentStatisticsResult cubeSegmentStatisticsResult = cubeSegmentStatisticsResultMap.get(ctxId);
+        if (cubeSegmentStatisticsResult == null) {
+            logger.warn("CubeSegmentStatisticsResult should be initialized for context {}", ctxId);
+            return;
+        }
+        Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = cubeSegmentStatisticsResult.cubeSegmentStatisticsMap;
+        if (cubeSegmentStatisticsMap == null) {
+            logger.warn(
+                    "cubeSegmentStatisticsMap should be initialized for CubeSegmentStatisticsResult with query type {}",
+                    cubeSegmentStatisticsResult.queryType);
+            return;
+        }
+        String cubeName = cubeSegmentStatistics.cubeName;
+        Map<String, CubeSegmentStatistics> segmentStatisticsMap = cubeSegmentStatisticsMap.get(cubeName);
+        if (segmentStatisticsMap == null) {
+            segmentStatisticsMap = Maps.newConcurrentMap();
+            cubeSegmentStatisticsMap.put(cubeName, segmentStatisticsMap);
+        }
+        segmentStatisticsMap.put(cubeSegmentStatistics.getSegmentName(), cubeSegmentStatistics);
+    }
+
     public void addRPCStatistics(int ctxId, String rpcServer, String cubeName, String segmentName, long sourceCuboidId,
             long targetCuboidId, long filterMask, Exception e, long rpcCallTimeMs, long skippedRows, long scannedRows,
             long returnedRows, long aggregatedRows, long scannedBytes) {
diff --git a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
index 47fbbcd..be0f7a6 100644
--- a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
+++ b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
@@ -86,6 +86,10 @@ public class BackdoorToggles {
         return getBoolean(DEBUG_TOGGLE_DISABLE_QUERY_CACHE);
     }
 
+    public static boolean getDisableSegmentCache() {
+        return getBoolean(DEBUG_TOGGLE_DISABLE_QUERY_SEGMENT_CACHE);
+    }
+
     public static boolean getDisableFuzzyKey() {
         return getBoolean(DEBUG_TOGGLE_DISABLE_FUZZY_KEY);
     }
@@ -215,6 +219,18 @@ public class BackdoorToggles {
     public final static String DEBUG_TOGGLE_DISABLE_QUERY_CACHE = "DEBUG_TOGGLE_DISABLE_QUERY_CACHE";
 
     /**
+     * set DEBUG_TOGGLE_DISABLE_QUERY_SEGMENT_CACHE=true to prevent using the segment cache for the current query
+     *
+     example:(put it into request body)
+     "backdoorToggles": {
+     "DEBUG_TOGGLE_DISABLE_QUERY_SEGMENT_CACHE": "true"
+     }
+     */
+    public final static String DEBUG_TOGGLE_DISABLE_QUERY_SEGMENT_CACHE = "DEBUG_TOGGLE_DISABLE_QUERY_SEGMENT_CACHE";
+
+    /**
      * set DEBUG_TOGGLE_HBASE_CUBE_QUERY_VERSION=v1/v2 to control which version CubeStorageQuery to use
      *
      example:(put it into request body)
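
Putting the new toggle in context, a hedged example of a complete query
request body (the sql/project fields follow Kylin's standard query REST API;
the sample cube and project names here are placeholders):

    POST /kylin/api/query
    {
        "sql": "select count(*) from KYLIN_SALES",
        "project": "learn_kylin",
        "backdoorToggles": {
            "DEBUG_TOGGLE_DISABLE_QUERY_SEGMENT_CACHE": "true"
        }
    }
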
diff --git a/dev-support/checkstyle.xml b/dev-support/checkstyle.xml
index d8eb73f..802f058 100644
--- a/dev-support/checkstyle.xml
+++ b/dev-support/checkstyle.xml
@@ -58,7 +58,9 @@
             <property name="ignorePattern"
                       value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
         </module>
-        <module name="MethodLength"/>
+        <module name="MethodLength">
+            <property name="max" value="300"/>
+        </module>
         <module name="MethodParamPad"/>
         <module name="ParameterNumber">
             <!-- default is 8 -->
diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml
index 220fc14..190dd8c 100644
--- a/storage-hbase/pom.xml
+++ b/storage-hbase/pom.xml
@@ -41,6 +41,10 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-engine-mr</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-cache</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.kylin</groupId>
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 911c8d5..c9be666 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -27,6 +27,7 @@ import java.util.Locale;
 import java.util.concurrent.ExecutorService;
 import java.util.zip.DataFormatException;
 
+import org.apache.commons.lang3.SerializationUtils;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
@@ -37,8 +38,11 @@ import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.QueryContext.CubeSegmentStatistics;
+import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.exceptions.KylinTimeoutException;
 import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
+import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
@@ -48,7 +52,10 @@ import org.apache.kylin.common.util.LoggableCachedThreadPool;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRange;
 import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTUtil;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.storage.StorageContext;
@@ -171,6 +178,48 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         // for different cubes until redeployment of coprocessor jar.
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         final boolean compressionResult = kylinConfig.getCompressionResult();
+
+        final boolean querySegmentCacheEnabled = isSegmentLevelCacheEnabled();
+        final SegmentQueryResult.Builder segmentQueryResultBuilder = new SegmentQueryResult.Builder(shardNum,
+                cubeSeg.getConfig().getQuerySegmentCacheMaxSize() * 1024 * 1024);
+        String calculatedSegmentQueryCacheKey = null;
+        if (querySegmentCacheEnabled) {
+            try {
+                logger.info("Query-{}: try to get segment result from cache for segment:{}", queryContext.getQueryId(),
+                        cubeSeg);
+                calculatedSegmentQueryCacheKey = getSegmentQueryCacheKey(scanRequest);
+                long startTime = System.currentTimeMillis();
+                SegmentQueryResult segmentResult = SegmentQueryCache.getInstance().get(calculatedSegmentQueryCacheKey);
+                long spendTime = System.currentTimeMillis() - startTime;
+                if (segmentResult == null) {
+                    logger.info("Query-{}: no segment result is cached for segment:{}, take time:{}ms",
+                            queryContext.getQueryId(), cubeSeg, spendTime);
+                } else {
+                    logger.info("Query-{}: get segment result from cache for segment:{}, take time:{}ms",
+                            queryContext.getQueryId(), cubeSeg, spendTime);
+                    if (segmentResult.getCubeSegmentStatisticsBytes() != null) {
+                        queryContext.addCubeSegmentStatistics(storageContext.ctxId,
+                                (CubeSegmentStatistics) SerializationUtils
+                                        .deserialize(segmentResult.getCubeSegmentStatisticsBytes()));
+                    }
+                    for (byte[] regionResult : segmentResult.getRegionResults()) {
+                        if (compressionResult) {
+                            epResultItr.append(CompressionUtils.decompress(regionResult));
+                        } else {
+                            epResultItr.append(regionResult);
+                        }
+                    }
+                    return new StorageResponseGTScatter(scanRequest, new DummyPartitionStreamer(epResultItr),
+                            storageContext);
+                }
+            } catch (Exception e) {
+                logger.error("Failed to handle segment result from cache", e);
+            }
+        }
+        final String segmentQueryCacheKey = calculatedSegmentQueryCacheKey;
+        logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", shardNum,
+                cuboidBaseShard, rawScans.size());
+
         final CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
         builder.setGtScanRequest(scanRequestByteString).setHbaseRawScan(rawScanByteString);
         for (IntList intList : hbaseColumnsToGTIntList) {
@@ -193,7 +242,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                 @Override
                 public void run() {
                     runEPRange(queryContext, logHeader, compressionResult, builder.build(), conn, epRange.getFirst(),
-                            epRange.getSecond(), epResultItr);
+                            epRange.getSecond(), epResultItr, querySegmentCacheEnabled, segmentQueryResultBuilder,
+                            segmentQueryCacheKey);
                 }
             });
         }
@@ -203,7 +253,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
     private void runEPRange(final QueryContext queryContext, final String logHeader, final boolean compressionResult,
             final CubeVisitProtos.CubeVisitRequest request, final Connection conn, byte[] startKey, byte[] endKey,
-            final ExpectedSizeIterator epResultItr) {
+            final ExpectedSizeIterator epResultItr, final boolean querySegmentCacheEnabled,
+            final SegmentQueryResult.Builder segmentQueryResultBuilder, final String segmentQueryCacheKey) {
 
         final String queryId = queryContext.getQueryId();
 
@@ -328,12 +379,38 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                             }
 
                             try {
+                                byte[] rawData = HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows());
                                 if (compressionResult) {
-                                    epResultItr.append(CompressionUtils.decompress(
-                                            HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows())));
+                                    epResultItr.append(CompressionUtils.decompress(rawData));
                                 } else {
-                                    epResultItr.append(
-                                            HBaseZeroCopyByteString.zeroCopyGetBytes(result.getCompressedRows()));
+                                    epResultItr.append(rawData);
+                                }
+                                // put segment query result to cache if cache is enabled
+                                if (querySegmentCacheEnabled) {
+                                    try {
+                                        segmentQueryResultBuilder.putRegionResult(rawData);
+                                        if (segmentQueryResultBuilder.isComplete()) {
+                                            CubeSegmentStatistics cubeSegmentStatistics = queryContext
+                                                    .getCubeSegmentStatistics(storageContext.ctxId,
+                                                            cubeSeg.getCubeInstance().getName(), cubeSeg.getName());
+                                            if (cubeSegmentStatistics != null) {
+                                                segmentQueryResultBuilder.setCubeSegmentStatistics(cubeSegmentStatistics);
+                                                logger.info(
+                                                        "Query-{}: try to put segment query result to cache for segment:{}",
+                                                        queryContext.getQueryId(), cubeSeg);
+                                                SegmentQueryResult segmentQueryResult = segmentQueryResultBuilder.build();
+                                                SegmentQueryCache.getInstance().put(segmentQueryCacheKey, segmentQueryResult);
+                                                logger.info(
+                                                        "Query-{}: successfully put segment query result to cache for segment:{}",
+                                                        queryContext.getQueryId(), cubeSeg);
+                                            }
+                                        }
+                                    } catch (Throwable t) {
+                                        logger.error("Failed to put segment query result to cache", t);
+                                    }
                                 }
                             } catch (IOException | DataFormatException e) {
                                 throw new RuntimeException(logHeader + "Error when decompressing", e);
@@ -432,4 +509,70 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             throw new AssertionError("Unknown error type: " + errorInfo.getType());
         }
     }
+
+    private boolean isSegmentLevelCacheEnabled() {
+        if (BackdoorToggles.getDisableSegmentCache()) {
+            return false;
+        }
+        if (!cubeSeg.getConfig().isQuerySegmentCacheEnabled()) {
+            return false;
+        }
+        try {
+            if (KylinConfig.getInstanceFromEnv().getMemCachedHosts() == null) {
+                return false;
+            }
+        } catch (Exception e) {
+            logger.warn("Failed to get memcached hosts; segment-level cache will not be enabled");
+            return false;
+        }
+        return true;
+    }
+
+    private String getSegmentQueryCacheKey(GTScanRequest scanRequest) {
+        String scanReqStr = getScanRequestString(scanRequest);
+        return cubeSeg.getCubeInstance().getName() + "_" + cubeSeg.getUuid() + "_" + scanReqStr;
+    }
+
+    private String getScanRequestString(GTScanRequest scanRequest) {
+        int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
+        while (true) {
+            try {
+                ByteBuffer out = ByteBuffer.allocate(scanRequestBufferSize);
+                GTInfo.serializer.serialize(scanRequest.getInfo(), out);
+                BytesUtil.writeVInt(scanRequest.getGTScanRanges().size(), out);
+                for (GTScanRange range : scanRequest.getGTScanRanges()) {
+                    serializeGTRecord(range.pkStart, out);
+                    serializeGTRecord(range.pkEnd, out);
+                    BytesUtil.writeVInt(range.fuzzyKeys.size(), out);
+                    for (GTRecord f : range.fuzzyKeys) {
+                        serializeGTRecord(f, out);
+                    }
+                }
+                ImmutableBitSet.serializer.serialize(scanRequest.getColumns(), out);
+                BytesUtil.writeByteArray(
+                        GTUtil.serializeGTFilter(scanRequest.getFilterPushDown(), scanRequest.getInfo()), out);
+                ImmutableBitSet.serializer.serialize(scanRequest.getAggrGroupBy(), out);
+                ImmutableBitSet.serializer.serialize(scanRequest.getAggrMetrics(), out);
+                BytesUtil.writeAsciiStringArray(scanRequest.getAggrMetricsFuncs(), out);
+                BytesUtil.writeVInt(scanRequest.isAllowStorageAggregation() ? 1 : 0, out);
+                BytesUtil.writeUTFString(scanRequest.getStorageLimitLevel().name(), out);
+                BytesUtil.writeVInt(scanRequest.getStorageScanRowNumThreshold(), out);
+                BytesUtil.writeVInt(scanRequest.getStoragePushDownLimit(), out);
+                BytesUtil.writeUTFString(scanRequest.getStorageBehavior(), out);
+                out.flip();
+                return Bytes.toStringBinary(out.array(), out.position(), out.limit());
+            } catch (BufferOverflowException boe) {
+                logger.info("Buffer size {} cannot hold the scan request, resizing to 4 times", scanRequestBufferSize);
+                scanRequestBufferSize *= 4;
+            }
+        }
+    }
+
+    private void serializeGTRecord(GTRecord gtRecord, ByteBuffer out) {
+        ByteArray[] cols = gtRecord.getInternal();
+        BytesUtil.writeVInt(cols.length, out);
+        for (ByteArray col : cols) {
+            col.exportData(out);
+        }
+    }
 }
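
A condensed sketch of the control flow this file adds, using names from the
diff (epResultItr, cubeSeg and storageContext are fields/locals of the
enclosing class assumed to be in scope):

    // read path, before submitting coprocessor RPCs:
    String key = getSegmentQueryCacheKey(scanRequest);
    SegmentQueryResult hit = SegmentQueryCache.getInstance().get(key);
    if (hit != null) {
        // replay the cached per-region byte arrays into epResultItr
        // (decompressing if needed) and return without touching HBase
    }

    // write path, inside each region's RPC callback:
    segmentQueryResultBuilder.putRegionResult(rawData);
    if (segmentQueryResultBuilder.isComplete()) {
        // the last shard to report publishes the whole segment result
        SegmentQueryCache.getInstance().put(key, segmentQueryResultBuilder.build());
    }
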
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryCache.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryCache.java
new file mode 100755
index 0000000..2b66a22
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryCache.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.cache.memcached.CacheStats;
+import org.apache.kylin.cache.memcached.MemcachedCache;
+import org.apache.kylin.cache.memcached.MemcachedCacheConfig;
+import org.apache.kylin.cache.memcached.MemcachedChunkingCache;
+import org.apache.kylin.common.KylinConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SegmentQueryCache {
+    public static final Logger logger = LoggerFactory.getLogger(SegmentQueryCache.class);
+    private static final String SEG_QUERY_CACHE_NAME = "query_segment_cache";
+    private static SegmentQueryCache segmentQueryCacheInstance = new SegmentQueryCache();
+
+    private MemcachedChunkingCache memcachedCache;
+
+    public static SegmentQueryCache getInstance() {
+        return segmentQueryCacheInstance;
+    }
+
+    private SegmentQueryCache() {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        MemcachedCacheConfig memcachedCacheConfig = new MemcachedCacheConfig();
+        String configHosts = kylinConfig.getMemCachedHosts();
+        memcachedCacheConfig.setTimeout(kylinConfig.getQuerySegmentCacheTimeout());
+        // Set the max object size slightly below 1024 * 1024: the segment result cache key is long,
+        // and a full 1 MB limit would make the memcached client raise a max-size-exceeded error.
+        memcachedCacheConfig.setMaxObjectSize(1040000);
+        memcachedCacheConfig.setHosts(configHosts);
+        // Invert the compression setting relative to the HBase coprocessor: if the HBase result is already compressed, memcached will not compress again.
+        memcachedCacheConfig.setEnableCompression(!kylinConfig.getCompressionResult());
+        String cacheName = SEG_QUERY_CACHE_NAME;
+        memcachedCache = new MemcachedChunkingCache(MemcachedCache.create(memcachedCacheConfig, cacheName));
+    }
+
+    public void put(String key, SegmentQueryResult segmentQueryResult) {
+        memcachedCache.put(key, segmentQueryResult);
+    }
+
+    public SegmentQueryResult get(String key) {
+        byte[] value = memcachedCache.get(key);
+        if (value == null) {
+            return null;
+        }
+        return (SegmentQueryResult) (SerializationUtils.deserialize(value));
+    }
+
+    public CacheStats getCacheStats() {
+        return memcachedCache.getStats();
+    }
+
+    /**
+     * Evict the cached segment result for the given query key.
+     *
+     * @param segmentQueryKey the segment query cache key
+     */
+    public void evict(String segmentQueryKey) {
+        memcachedCache.evict(segmentQueryKey);
+    }
+}
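
A minimal usage sketch for the singleton above; the segmentQueryKey variable
is hypothetical here, while in CubeHBaseEndpointRPC the key is built as
cubeName + "_" + segmentUuid + "_" + the serialized scan request:

    SegmentQueryCache cache = SegmentQueryCache.getInstance();
    SegmentQueryResult cached = cache.get(segmentQueryKey);   // null on miss
    if (cached == null) {
        // run the query, collect region results via SegmentQueryResult.Builder,
        // then publish the built result for later queries of the same segment
        cache.put(segmentQueryKey, builtResult);
    }
    cache.evict(segmentQueryKey);   // drop an entry, e.g. after a segment refresh
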
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryResult.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryResult.java
new file mode 100755
index 0000000..e208c02
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SegmentQueryResult.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import com.google.common.collect.Queues;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.common.QueryContext.CubeSegmentStatistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * query result for each segment
+ */
+public class SegmentQueryResult implements Serializable {
+    private static final long serialVersionUID = 9047493994209284453L;
+
+    private Collection<byte[]> regionResults;
+
+    // store segment query stats for cube planner usage
+    private byte[] cubeSegmentStatisticsBytes;
+
+    public void setRegionResults(Collection<byte[]> regionResults) {
+        this.regionResults = regionResults;
+    }
+
+    public Collection<byte[]> getRegionResults() {
+        return regionResults;
+    }
+
+    public byte[] getCubeSegmentStatisticsBytes() {
+        return cubeSegmentStatisticsBytes;
+    }
+
+    public void setCubeSegmentStatisticsBytes(byte[] cubeSegmentStatisticsBytes) {
+        this.cubeSegmentStatisticsBytes = cubeSegmentStatisticsBytes;
+    }
+
+    public static class Builder {
+        private static final Logger logger = LoggerFactory.getLogger(Builder.class);
+
+        private volatile int regionsNum;
+        private ConcurrentLinkedQueue<byte[]> queue;
+        private AtomicInteger totalResultSize;
+        private volatile int maxSegmentCacheSize;
+        private byte[] cubeSegmentStatisticsBytes;
+
+        public Builder(int regionsNum, int maxCacheResultSize) {
+            this.regionsNum = regionsNum;
+            this.queue = Queues.newConcurrentLinkedQueue();
+            this.totalResultSize = new AtomicInteger();
+            this.maxSegmentCacheSize = maxCacheResultSize;
+        }
+
+        public void putRegionResult(byte[] result) {
+            totalResultSize.addAndGet(result.length);
+            if (totalResultSize.get() > maxSegmentCacheSize) {
+                logger.info("stop putting results into cache, since the result size {} exceeds the configured max {}",
+                        totalResultSize.get(), maxSegmentCacheSize);
+                return;
+            }
+            queue.offer(result);
+        }
+
+        public void setCubeSegmentStatistics(CubeSegmentStatistics cubeSegmentStatistics) {
+            this.cubeSegmentStatisticsBytes = (cubeSegmentStatistics == null ? null
+                    : SerializationUtils.serialize(cubeSegmentStatistics));
+        }
+
+        public boolean isComplete() {
+            return queue.size() == regionsNum;
+        }
+
+        public SegmentQueryResult build() {
+            SegmentQueryResult result = new SegmentQueryResult();
+            result.setCubeSegmentStatisticsBytes(cubeSegmentStatisticsBytes);
+            result.setRegionResults(queue);
+            return result;
+        }
+    }
+}
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/SegmentQueryResultTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/SegmentQueryResultTest.java
new file mode 100644
index 0000000..a944c8b
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/SegmentQueryResultTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.kylin.common.QueryContext.CubeSegmentStatistics;
+import org.apache.kylin.storage.hbase.cube.v2.SegmentQueryResult;
+import org.apache.kylin.storage.hbase.cube.v2.SegmentQueryResult.Builder;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.TestCase.assertFalse;
+
+public class SegmentQueryResultTest {
+    private static final Logger logger = LoggerFactory.getLogger(SegmentQueryResultTest.class);
+
+    @Test
+    public void buildTest() {
+        int maxCacheResultSize = 10 * 1024;
+        ExecutorService rpcExecutor = Executors.newFixedThreadPool(4);
+        SegmentQueryResult.Builder builder = new Builder(8, maxCacheResultSize);
+        mockSendRPCTasks(rpcExecutor, 4, builder, 1024);
+        assertFalse(builder.isComplete());
+        mockSendRPCTasks(rpcExecutor, 4, builder, 1024);
+        assertTrue(builder.isComplete());
+
+        builder = new Builder(8, maxCacheResultSize);
+        mockSendRPCTasks(rpcExecutor, 8, builder, 1500);
+        assertFalse(builder.isComplete());
+    }
+
+    @Test
+    public void resultValidateTest() {
+        long segmentBuildTime = System.currentTimeMillis() - 1000;
+        int maxCacheResultSize = 10 * 1024;
+        ExecutorService rpcExecutor = Executors.newFixedThreadPool(4);
+        SegmentQueryResult.Builder builder = new Builder(8, maxCacheResultSize);
+        mockSendRPCTasks(rpcExecutor, 8, builder, 1024);
+        CubeSegmentStatistics statistics = new CubeSegmentStatistics();
+        statistics.setWrapper("cube1", "20171001000000-20171010000000", 3, 7, 1);
+        builder.setCubeSegmentStatistics(statistics);
+        SegmentQueryResult segmentQueryResult = builder.build();
+
+        CubeSegmentStatistics desStatistics = SerializationUtils
+                .deserialize(segmentQueryResult.getCubeSegmentStatisticsBytes());
+        assertEquals("cube1", desStatistics.getCubeName());
+    }
+
+    private void mockSendRPCTasks(ExecutorService rpcExecutor, int rpcNum, SegmentQueryResult.Builder builder,
+                                  int resultSize) {
+        List<Future> futures = Lists.newArrayList();
+        for (int i = 0; i < rpcNum; i++) {
+            Future future = rpcExecutor.submit(new MockRPCTask(resultSize, 10, builder));
+            futures.add(future);
+        }
+        for (Future future : futures) {
+            try {
+                future.get();
+            } catch (Exception e) {
+                logger.error("exception", e);
+            }
+        }
+    }
+
+    private static class MockRPCTask implements Runnable {
+        private int resultSize;
+        private long takeTime;
+        private SegmentQueryResult.Builder builder;
+
+        MockRPCTask(int resultSize, long takeTime, SegmentQueryResult.Builder builder) {
+            this.resultSize = resultSize;
+            this.takeTime = takeTime;
+            this.builder = builder;
+        }
+
+        @Override
+        public void run() {
+            try {
+                Thread.sleep(takeTime);
+            } catch (InterruptedException e) {
+                logger.error("interrupt", e);
+            }
+            builder.putRegionResult(new byte[resultSize]);
+        }
+    }
+
+}
