Add retry to metadata cache sync broadcasts

Signed-off-by: shaofengshi <shaofeng...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ecc01458
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ecc01458
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ecc01458

Branch: refs/heads/ranger
Commit: ecc01458c4ad361aaf863505884d51474a8fec9d
Parents: 32d9d23
Author: lichao <cha...@mobvoi.com>
Authored: Sun Sep 10 03:03:23 2017 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Mon Sep 11 11:09:29 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  6 ++++-
 .../main/resources/kylin-defaults.properties    |  3 +++
 .../kylin/metadata/cachesync/Broadcaster.java   | 27 +++++++++++++++++++-
 .../kylin/rest/service/CacheServiceTest.java    |  4 +++
 4 files changed, 38 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/ecc01458/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index ff76be2..7d20648 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -253,6 +253,10 @@ abstract public class KylinConfigBase implements Serializable {
         return StorageURL.valueOf(getOptional("kylin.metadata.url", 
"kylin_metadata@hbase"));
     }
 
+    public int getCacheSyncRetrys() {
+        return Integer.parseInt(getOptional("kylin.metadata.sync.retries", 
"3"));
+    }
+
     // for test only
     public void setMetadataUrl(String metadataUrl) {
         setProperty("kylin.metadata.url", metadataUrl);
@@ -346,7 +350,7 @@ abstract public class KylinConfigBase implements Serializable {
     public String getSegmentAdvisor() {
         return getOptional("kylin.cube.segment-advisor", 
"org.apache.kylin.cube.CubeSegmentAdvisor");
     }
-    
+
     public double getJobCuboidSizeRatio() {
         return 
Double.parseDouble(getOptional("kylin.cube.size-estimate-ratio", "0.25"));
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ecc01458/core-common/src/main/resources/kylin-defaults.properties
----------------------------------------------------------------------
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index cf0d226..31ed60e 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -20,6 +20,9 @@
 # The metadata store in hbase
 kylin.metadata.url=kylin_metadata@hbase
 
+# Max number of attempts (first attempt included) to broadcast a metadata cache sync event
+kylin.metadata.sync.retries=3
+
 # Working folder in HDFS, better be qualified absolute path, make sure user has the right permission to this directory
 kylin.env.hdfs-working-dir=/kylin
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/ecc01458/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
index 00b8857..532ae74 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -112,6 +112,7 @@ public class Broadcaster {
 
     private Broadcaster(final KylinConfig config) {
         this.config = config;
+        final int retryLimitTimes = config.getCacheSyncRetrys();
 
         final String[] nodes = config.getRestServers();
         if (nodes == null || nodes.length < 1) {
@@ -129,6 +130,12 @@ public class Broadcaster {
                 while (true) {
                     try {
                         final BroadcastEvent broadcastEvent = 
broadcastEvents.takeFirst();
+                        
broadcastEvent.setRetryTime(broadcastEvent.getRetryTime() + 1);
+                        if (broadcastEvent.getRetryTime() > retryLimitTimes) {
+                            logger.info("broadcastEvent retry up to limit 
times, broadcastEvent:{}", broadcastEvent);
+                            continue;
+                        }
+
                         String[] restServers = config.getRestServers();
                         logger.debug("Servers in the cluster: " + 
Arrays.toString(restServers));
                         for (final String node : restServers) {
@@ -146,7 +153,16 @@ public class Broadcaster {
                                         
restClientMap.get(node).wipeCache(broadcastEvent.getEntity(),
                                                 broadcastEvent.getEvent(), 
broadcastEvent.getCacheKey());
                                     } catch (IOException e) {
-                                        logger.warn("Thread failed during wipe 
cache at " + broadcastEvent, e);
+                                        logger.warn("Thread failed during wipe 
cache at {}, error msg: {}",
+                                                broadcastEvent, e);
+                                        // when sync failed, put back to queue
+                                        try {
+                                            
broadcastEvents.putLast(broadcastEvent);
+                                        } catch (InterruptedException ex) {
+                                            logger.warn(
+                                                    "error reentry failed 
broadcastEvent to queue, broacastEvent:{}, error: {} ",
+                                                    broadcastEvent, ex);
+                                        }
                                     }
                                 }
                             });
@@ -322,6 +338,7 @@ public class Broadcaster {
     }
 
     public static class BroadcastEvent {
+        private int retryTime;
         private String entity;
         private String event;
         private String cacheKey;
@@ -333,6 +350,14 @@ public class Broadcaster {
             this.cacheKey = cacheKey;
         }
 
+        public int getRetryTime() {
+            return retryTime;
+        }
+
+        public void setRetryTime(int retryTime) {
+            this.retryTime = retryTime;
+        }
+
         public String getEntity() {
             return entity;
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/ecc01458/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index 704e45d..ccc8edc 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -155,6 +155,10 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
     private void waitForCounterAndClear(long count) {
         int retryTimes = 0;
         while ((!counter.compareAndSet(count, 0L))) {
+            // take into account wipe retry causing counter larger than count
+            if (counter.get() > count) {
+                counter.decrementAndGet();
+            }
             if (++retryTimes > 30) {
                 throw new RuntimeException("timeout");
             }

Reply via email to