half way

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/478066d0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/478066d0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/478066d0

Branch: refs/heads/KYLIN-2033
Commit: 478066d03509db845c7b2c6485a27722b95b96f8
Parents: 4c9b67f
Author: Li Yang <liy...@apache.org>
Authored: Thu Sep 22 18:42:37 2016 +0800
Committer: Yang Li <liy...@apache.org>
Committed: Thu Sep 22 21:04:05 2016 +0800

----------------------------------------------------------------------
 .../kylin/common/restclient/AbstractCache.java  |  50 ++++++
 .../common/restclient/AbstractRestCache.java    |  52 ------
 .../kylin/common/restclient/Broadcaster.java    | 164 +++++++++++--------
 .../restclient/CaseInsensitiveStringCache.java  |   5 +-
 .../kylin/common/restclient/RestClient.java     |   4 +-
 .../common/restclient/SingleValueCache.java     |  16 +-
 .../org/apache/kylin/cube/CubeDescManager.java  |  43 ++++-
 .../java/org/apache/kylin/cube/CubeManager.java |  34 +++-
 .../apache/kylin/metadata/MetadataManager.java  |  88 +++++++++-
 .../kylin/metadata/project/ProjectManager.java  |  52 +++++-
 .../kylin/storage/hybrid/HybridManager.java     |  28 +++-
 .../engine/streaming/StreamingManager.java      |  18 +-
 .../kylin/rest/controller/CacheController.java  |  23 ++-
 .../apache/kylin/rest/service/CacheService.java | 110 +++++++------
 .../apache/kylin/rest/service/CubeService.java  |  30 ++++
 .../kylin/rest/service/CacheServiceTest.java    |  15 +-
 .../kylin/source/kafka/KafkaConfigManager.java  |  18 +-
 .../storage/hbase/util/CubeMigrationCLI.java    |   2 +-
 18 files changed, 536 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/478066d0/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractCache.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractCache.java
 
b/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractCache.java
new file mode 100644
index 0000000..42692ea
--- /dev/null
+++ 
b/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractCache.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.restclient;
+
+import org.apache.kylin.common.KylinConfig;
+
+/**
+ */
+public abstract class AbstractCache<K, V> {
+
+    protected final KylinConfig config;
+    protected final String syncEntity;
+
+    protected AbstractCache(KylinConfig config, String syncEntity) {
+        this.config = config;
+        this.syncEntity = syncEntity;
+    }
+
+    public Broadcaster getBroadcaster() {
+        return Broadcaster.getInstance(config);
+    }
+
+    public abstract void put(K key, V value);
+
+    public abstract void putLocal(K key, V value);
+
+    public abstract void remove(K key);
+
+    public abstract void removeLocal(K key);
+
+    public abstract void clear();
+
+    public abstract int size();
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/478066d0/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
 
b/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
deleted file mode 100644
index 584131d..0000000
--- 
a/core-common/src/main/java/org/apache/kylin/common/restclient/AbstractRestCache.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.restclient;
-
-import org.apache.kylin.common.KylinConfig;
-
-/**
- * @author xjiang
- * 
- */
-public abstract class AbstractRestCache<K, V> {
-
-    protected final KylinConfig config;
-    protected final Broadcaster.TYPE syncType;
-
-    protected AbstractRestCache(KylinConfig config, Broadcaster.TYPE syncType) 
{
-        this.config = config;
-        this.syncType = syncType;
-    }
-
-    public Broadcaster getBroadcaster() {
-        return Broadcaster.getInstance(config);
-    }
-
-    public abstract void put(K key, V value);
-
-    public abstract void putLocal(K key, V value);
-
-    public abstract void remove(K key);
-
-    public abstract void removeLocal(K key);
-
-    public abstract void clear();
-
-    public abstract int size();
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/478066d0/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java 
b/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
index 230888f..d02859d 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/restclient/Broadcaster.java
@@ -19,8 +19,10 @@
 package org.apache.kylin.common.restclient;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -36,6 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 /**
  * Broadcast kylin event out
@@ -44,6 +47,10 @@ public class Broadcaster {
 
     private static final Logger logger = 
LoggerFactory.getLogger(Broadcaster.class);
 
+    public static final String SYNC_ALL = "all";                   // the 
special entity to indicate clear all
+    public static final String SYNC_PRJ_SCHEMA = "project_schema"; // the 
special entity to indicate project schema has change, e.g. 
table/model/cube_desc update
+    public static final String SYNC_PRJ_DATA = "project_data";     // the 
special entity to indicate project data has change, e.g. cube/raw_table update
+
     // static cached instances
     private static final ConcurrentHashMap<KylinConfig, Broadcaster> CACHE = 
new ConcurrentHashMap<KylinConfig, Broadcaster>();
 
@@ -78,6 +85,8 @@ public class Broadcaster {
 
     private AtomicLong counter = new AtomicLong();
 
+    private Map<String, List<Listener>> listenerMap = Maps.newConcurrentMap();
+
     private Broadcaster(final KylinConfig config) {
         final String[] nodes = config.getRestServers();
         if (nodes == null || nodes.length < 1) {
@@ -104,7 +113,7 @@ public class Broadcaster {
                                 @Override
                                 public void run() {
                                     try {
-                                        
restClient.wipeCache(broadcastEvent.getType(), broadcastEvent.getAction(), 
broadcastEvent.getName());
+                                        
restClient.wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), 
broadcastEvent.getCacheKey());
                                     } catch (IOException e) {
                                         logger.warn("Thread failed during wipe 
cache at " + broadcastEvent);
                                     }
@@ -119,19 +128,71 @@ public class Broadcaster {
         });
     }
 
+    public void registerListener(Listener listener, String... entities) {
+        for (String entity : entities) {
+            addListener(entity, listener);
+        }
+        addListener(SYNC_ALL, listener);
+        addListener(SYNC_PRJ_SCHEMA, listener);
+        addListener(SYNC_PRJ_DATA, listener);
+    }
+
+    synchronized private void addListener(String entity, Listener listener) {
+        List<Listener> list = listenerMap.get(entity);
+        if (list == null) {
+            list = new ArrayList<>();
+        }
+        list.add(listener);
+        listenerMap.put(entity, list);
+    }
+
+    public void notifyClearAll() throws IOException {
+        notifyListener(SYNC_ALL, Event.UPDATE, SYNC_ALL);
+    }
+    
+    public void notifyProjectSchemaUpdate(String project) throws IOException {
+        notifyListener(SYNC_PRJ_SCHEMA, Event.UPDATE, project);
+    }
+    
+    public void notifyProjectDataUpdate(String project) throws IOException {
+        notifyListener(SYNC_PRJ_DATA, Event.UPDATE, project);
+    }
+    
+    public void notifyListener(String entity, Event event, String cacheKey) 
throws IOException {
+        List<Listener> list = listenerMap.get(entity);
+        if (list == null)
+            return;
+
+        switch (entity) {
+        case SYNC_ALL: 
+            for (Listener l : list)
+                l.onClearAll(this);
+            break;
+        case SYNC_PRJ_SCHEMA: 
+            for (Listener l : list)
+                l.onProjectSchemaChange(this, cacheKey);
+            break;
+        case SYNC_PRJ_DATA: 
+            for (Listener l : list)
+                l.onProjectDataChange(this, cacheKey);
+            break;
+        default:
+            for (Listener l : list)
+                l.onEntityChange(this, entity, event, cacheKey);
+            break;
+        }
+    }
+
     /**
-     * Broadcast the cubedesc event out
-     * 
-     * @param action
-     *            event action
+     * Broadcast an event out
      */
-    public void queue(String type, String action, String key) {
+    public void queue(String entity, String event, String key) {
         if (broadcastEvents == null)
             return;
 
         try {
             counter.incrementAndGet();
-            broadcastEvents.putFirst(new BroadcastEvent(type, action, key));
+            broadcastEvents.putFirst(new BroadcastEvent(entity, event, key));
         } catch (Exception e) {
             counter.decrementAndGet();
             logger.error("error putting BroadcastEvent", e);
@@ -142,12 +203,12 @@ public class Broadcaster {
         return counter.getAndSet(0);
     }
 
-    public enum EVENT {
+    public enum Event {
 
         CREATE("create"), UPDATE("update"), DROP("drop");
         private String text;
 
-        EVENT(String text) {
+        Event(String text) {
             this.text = text;
         }
 
@@ -155,8 +216,8 @@ public class Broadcaster {
             return text;
         }
 
-        public static EVENT getEvent(String event) {
-            for (EVENT one : values()) {
+        public static Event getEvent(String event) {
+            for (Event one : values()) {
                 if (one.getType().equalsIgnoreCase(event)) {
                     return one;
                 }
@@ -166,76 +227,51 @@ public class Broadcaster {
         }
     }
 
-    public enum TYPE {
-        ALL("all"), //
-        PROJECT("project"), //
-        CUBE("cube"), //
-        CUBE_DESC("cube_desc"), //
-        STREAMING("streaming"), //
-        KAFKA("kafka"), //
-        INVERTED_INDEX("inverted_index"), //
-        INVERTED_INDEX_DESC("ii_desc"), // 
-        TABLE("table"), //
-        DATA_MODEL("data_model"), //
-        EXTERNAL_FILTER("external_filter"), //
-        HYBRID("hybrid");
-        
-        private String text;
-
-        TYPE(String text) {
-            this.text = text;
+    abstract public static class Listener {
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
         }
 
-        public String getType() {
-            return text;
+        public void onProjectSchemaChange(Broadcaster broadcaster, String 
project) throws IOException {
         }
-
-        /**
-         * @param type
-         * @return
-         */
-        public static TYPE getType(String type) {
-            for (TYPE one : values()) {
-                if (one.getType().equalsIgnoreCase(type)) {
-                    return one;
-                }
-            }
-
-            return null;
+        
+        public void onProjectDataChange(Broadcaster broadcaster, String 
project) throws IOException {
+        }
+        
+        public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey) throws IOException {
         }
     }
 
     public static class BroadcastEvent {
-        private String type;
-        private String action;
-        private String name;
+        private String entity;
+        private String event;
+        private String cacheKey;
 
-        public BroadcastEvent(String type, String action, String name) {
+        public BroadcastEvent(String entity, String event, String cacheKey) {
             super();
-            this.type = type;
-            this.action = action;
-            this.name = name;
+            this.entity = entity;
+            this.event = event;
+            this.cacheKey = cacheKey;
         }
 
-        public String getType() {
-            return type;
+        public String getEntity() {
+            return entity;
         }
 
-        public String getAction() {
-            return action;
+        public String getEvent() {
+            return event;
         }
 
-        public String getName() {
-            return name;
+        public String getCacheKey() {
+            return cacheKey;
         }
 
         @Override
         public int hashCode() {
             final int prime = 31;
             int result = 1;
-            result = prime * result + ((action == null) ? 0 : 
action.hashCode());
-            result = prime * result + ((name == null) ? 0 : name.hashCode());
-            result = prime * result + ((type == null) ? 0 : type.hashCode());
+            result = prime * result + ((event == null) ? 0 : event.hashCode());
+            result = prime * result + ((cacheKey == null) ? 0 : 
cacheKey.hashCode());
+            result = prime * result + ((entity == null) ? 0 : 
entity.hashCode());
             return result;
         }
 
@@ -251,13 +287,13 @@ public class Broadcaster {
                 return false;
             }
             BroadcastEvent other = (BroadcastEvent) obj;
-            if (!StringUtils.equals(action, other.action)) {
+            if (!StringUtils.equals(event, other.event)) {
                 return false;
             }
-            if (!StringUtils.equals(name, other.name)) {
+            if (!StringUtils.equals(cacheKey, other.cacheKey)) {
                 return false;
             }
-            if (!StringUtils.equals(type, other.type)) {
+            if (!StringUtils.equals(entity, other.entity)) {
                 return false;
             }
             return true;
@@ -265,7 +301,7 @@ public class Broadcaster {
 
         @Override
         public String toString() {
-            return Objects.toStringHelper(this).add("type", type).add("name", 
name).add("action", action).toString();
+            return Objects.toStringHelper(this).add("type", 
entity).add("name", cacheKey).add("action", event).toString();
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/478066d0/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
 
b/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
index 2bcddbf..acc50bd 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/restclient/CaseInsensitiveStringCache.java
@@ -26,8 +26,9 @@ import org.apache.kylin.common.KylinConfig;
  */
 public class CaseInsensitiveStringCache<V> extends SingleValueCache<String, V> 
{
 
-    public CaseInsensitiveStringCache(KylinConfig config, Broadcaster.TYPE 
syncType) {
-        super(config, syncType, new ConcurrentSkipListMap<String, 
V>(String.CASE_INSENSITIVE_ORDER));
+    public CaseInsensitiveStringCache(KylinConfig config, String syncEntity, 
Broadcaster.Listener listener) {
+        super(config, syncEntity, new ConcurrentSkipListMap<String, 
V>(String.CASE_INSENSITIVE_ORDER));
+        getBroadcaster().registerListener(listener, syncEntity);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/478066d0/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java 
b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index 050d911..46a9e9b 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -89,8 +89,8 @@ public class RestClient {
         }
     }
 
-    public void wipeCache(String type, String action, String name) throws 
IOException {
-        String url = baseUrl + "/cache/" + type + "/" + name + "/" + action;
+    public void wipeCache(String entity, String event, String cacheKey) throws 
IOException {
+        String url = baseUrl + "/cache/" + entity + "/" + cacheKey + "/" + 
event;
         HttpPut request = new HttpPut(url);
 
         try {

http://git-wip-us.apache.org/repos/asf/kylin/blob/478066d0/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
 
b/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
index 5d1ca9a..80dff33 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/restclient/SingleValueCache.java
@@ -30,16 +30,16 @@ import org.apache.kylin.common.KylinConfig;
 /**
  * @author xjiang
  */
-public abstract class SingleValueCache<K, V> extends AbstractRestCache<K, V> {
+public abstract class SingleValueCache<K, V> extends AbstractCache<K, V> {
 
     private final ConcurrentMap<K, V> innerCache;
 
-    public SingleValueCache(KylinConfig config, Broadcaster.TYPE syncType) {
-        this(config, syncType, new ConcurrentHashMap<K, V>());
+    public SingleValueCache(KylinConfig config, String syncEntity) {
+        this(config, syncEntity, new ConcurrentHashMap<K, V>());
     }
 
-    public SingleValueCache(KylinConfig config, Broadcaster.TYPE syncType, 
ConcurrentMap<K, V> innerCache) {
-        super(config, syncType);
+    public SingleValueCache(KylinConfig config, String syncEntity, 
ConcurrentMap<K, V> innerCache) {
+        super(config, syncEntity);
         this.innerCache = innerCache;
     }
 
@@ -49,9 +49,9 @@ public abstract class SingleValueCache<K, V> extends 
AbstractRestCache<K, V> {
         innerCache.put(key, value);
 
         if (!exists) {
-            getBroadcaster().queue(syncType.getType(), 
Broadcaster.EVENT.CREATE.getType(), key.toString());
+            getBroadcaster().queue(syncEntity, 
Broadcaster.Event.CREATE.getType(), key.toString());
         } else {
-            getBroadcaster().queue(syncType.getType(), 
Broadcaster.EVENT.UPDATE.getType(), key.toString());
+            getBroadcaster().queue(syncEntity, 
Broadcaster.Event.UPDATE.getType(), key.toString());
         }
     }
 
@@ -65,7 +65,7 @@ public abstract class SingleValueCache<K, V> extends 
AbstractRestCache<K, V> {
         innerCache.remove(key);
 
         if (exists) {
-            getBroadcaster().queue(syncType.getType(), 
Broadcaster.EVENT.DROP.getType(), key.toString());
+            getBroadcaster().queue(syncEntity, 
Broadcaster.Event.DROP.getType(), key.toString());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/478066d0/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
index 1b1cf70..19e0eb8 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
@@ -28,6 +28,7 @@ import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
+import org.apache.kylin.common.restclient.Broadcaster.Event;
 import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -35,14 +36,17 @@ import 
org.apache.kylin.cube.model.validation.CubeMetadataValidator;
 import org.apache.kylin.cube.model.validation.ValidateContext;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Manager class for CubeDesc; extracted from #CubeManager
+ * 
  * @author shaoshi
- *
  */
 public class CubeDescManager {
 
@@ -90,9 +94,44 @@ public class CubeDescManager {
     private CubeDescManager(KylinConfig config) throws IOException {
         logger.info("Initializing CubeDescManager with config " + config);
         this.config = config;
-        this.cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(config, 
Broadcaster.TYPE.CUBE_DESC);
+        this.cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(config, 
"cube_desc", new SyncListener());
         reloadAllCubeDesc();
     }
+    
+    private class SyncListener extends Broadcaster.Listener {
+        
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
+            Cuboid.clearCache();
+        }
+
+        @Override
+        public void onProjectSchemaChange(Broadcaster broadcaster, String 
project) throws IOException {
+            for (IRealization real : 
ProjectManager.getInstance(config).listAllRealizations(project)) {
+                if (real instanceof CubeInstance) {
+                    String descName = ((CubeInstance) real).getDescName();
+                    reloadCubeDescLocal(descName);        
+                }
+            }
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey) throws IOException {
+            String cubeDescName = cacheKey;
+            CubeDesc cubeDesc = getCubeDesc(cubeDescName);
+            String modelName = cubeDesc == null ? null : 
cubeDesc.getModel().getName();
+            
+            if (event == Event.DROP)
+                removeLocalCubeDesc(cubeDescName);
+            else
+                reloadCubeDescLocal(cubeDescName);
+            
+            for (ProjectInstance prj : 
ProjectManager.getInstance(config).findProjectsByModel(modelName)) {
+                broadcaster.notifyProjectSchemaUpdate(prj.getName());
+            }
+        }
+    }
 
     public CubeDesc getCubeDesc(String name) {
         return cubeDescMap.get(name);

http://git-wip-us.apache.org/repos/asf/kylin/blob/478066d0/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java 
b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index fd46b54..f86301f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -40,6 +40,7 @@ import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
+import org.apache.kylin.common.restclient.Broadcaster.Event;
 import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
@@ -55,6 +56,7 @@ import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
@@ -130,10 +132,40 @@ public class CubeManager implements IRealizationProvider {
     private CubeManager(KylinConfig config) throws IOException {
         logger.info("Initializing CubeManager with config " + config);
         this.config = config;
-        this.cubeMap = new CaseInsensitiveStringCache<CubeInstance>(config, 
Broadcaster.TYPE.CUBE);
+        this.cubeMap = new CaseInsensitiveStringCache<CubeInstance>(config, 
"cube", new SyncListener());
         loadAllCubeInstance();
     }
 
+    private class SyncListener extends Broadcaster.Listener {
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
+        }
+
+        @Override
+        public void onProjectSchemaChange(Broadcaster broadcaster, String 
project) throws IOException {
+            for (IRealization real : 
ProjectManager.getInstance(config).listAllRealizations(project)) {
+                if (real instanceof CubeInstance) {
+                    reloadCubeLocal(real.getName());
+                }
+            }
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey) throws IOException {
+            String cubeName = cacheKey;
+            
+            if (event == Event.DROP)
+                removeCubeLocal(cubeName);
+            else
+                reloadCubeLocal(cubeName);
+            
+            for (ProjectInstance prj : 
ProjectManager.getInstance(config).findProjects(RealizationType.CUBE, 
cubeName)) {
+                broadcaster.notifyProjectDataUpdate(prj.getName());
+            }
+        }
+    }
+
     public List<CubeInstance> listAllCubes() {
         return new ArrayList<CubeInstance>(cubeMap.values());
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/478066d0/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java 
b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
index a74dd58..6803941 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataManager.java
@@ -38,6 +38,7 @@ import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
+import org.apache.kylin.common.restclient.Broadcaster.Event;
 import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.metadata.model.ColumnDesc;
@@ -253,10 +254,10 @@ public class MetadataManager {
 
     private void init(KylinConfig config) throws IOException {
         this.config = config;
-        this.srcTableMap = new CaseInsensitiveStringCache<>(config, 
Broadcaster.TYPE.TABLE);
-        this.srcTableExdMap = new CaseInsensitiveStringCache<>(config, 
Broadcaster.TYPE.TABLE);
-        this.dataModelDescMap = new CaseInsensitiveStringCache<>(config, 
Broadcaster.TYPE.DATA_MODEL);
-        this.extFilterMap = new CaseInsensitiveStringCache<>(config, 
Broadcaster.TYPE.EXTERNAL_FILTER);
+        this.srcTableMap = new CaseInsensitiveStringCache<>(config, "table", 
new SrcTableSyncListener());
+        this.srcTableExdMap = new CaseInsensitiveStringCache<>(config, 
"table_ext", new SrcTableExtSyncListener());
+        this.dataModelDescMap = new CaseInsensitiveStringCache<>(config, 
"data_model", new DataModelSyncListener());
+        this.extFilterMap = new CaseInsensitiveStringCache<>(config, 
"external_filter", new ExtFilterSyncListener());
 
         reloadAllSourceTable();
         reloadAllSourceTableExd();
@@ -264,6 +265,85 @@ public class MetadataManager {
         reloadAllExternalFilter();
     }
 
+    private class SrcTableSyncListener extends Broadcaster.Listener {
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey) throws IOException {
+            if (event == Event.DROP)
+                srcTableMap.removeLocal(cacheKey);
+            else
+                reloadSourceTable(cacheKey);
+            
+            for (ProjectInstance prj : 
ProjectManager.getInstance(config).findProjectsByTable(cacheKey)) {
+                broadcaster.notifyProjectSchemaUpdate(prj.getName());
+            }
+        }
+    }
+
+    private class SrcTableExtSyncListener extends Broadcaster.Listener {
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey) throws IOException {
+            if (event == Event.DROP)
+                srcTableExdMap.removeLocal(cacheKey);
+            else
+                reloadSourceTableExt(cacheKey);
+            
+            for (ProjectInstance prj : 
ProjectManager.getInstance(config).findProjectsByTable(cacheKey)) {
+                broadcaster.notifyProjectSchemaUpdate(prj.getName());
+            }
+        }
+    }
+
+    private class DataModelSyncListener extends Broadcaster.Listener {
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
+        }
+
+        @Override
+        public void onProjectSchemaChange(Broadcaster broadcaster, String 
project) throws IOException {
+            for (String model : 
ProjectManager.getInstance(config).getProject(project).getModels()) {
+                reloadDataModelDesc(model);
+            }
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey) throws IOException {
+            if (event == Event.DROP)
+                dataModelDescMap.removeLocal(cacheKey);
+            else
+                reloadDataModelDesc(cacheKey);
+            
+            for (ProjectInstance prj : 
ProjectManager.getInstance(config).findProjectsByModel(cacheKey)) {
+                broadcaster.notifyProjectSchemaUpdate(prj.getName());
+            }
+        }
+    }
+
+    private class ExtFilterSyncListener extends Broadcaster.Listener {
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey) throws IOException {
+            if (event == Event.DROP)
+                extFilterMap.removeLocal(cacheKey);
+            else
+                reloadExtFilter(cacheKey);
+        }
+    }
+
     private void reloadAllSourceTableExd() throws IOException {
         ResourceStore store = getStore();
         logger.debug("Reloading SourceTable exd info from folder " + 
store.getReadableResourcePath(ResourceStore.TABLE_EXD_RESOURCE_ROOT));

http://git-wip-us.apache.org/repos/asf/kylin/blob/478066d0/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
 
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index 1bf9804..972d40f 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -30,6 +30,10 @@ import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
+import org.apache.kylin.common.restclient.Broadcaster.Event;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.badquery.BadQueryHistoryManager;
@@ -87,12 +91,38 @@ public class ProjectManager {
     private ProjectManager(KylinConfig config) throws IOException {
         logger.info("Initializing ProjectManager with metadata url " + config);
         this.config = config;
-        this.projectMap = new 
CaseInsensitiveStringCache<ProjectInstance>(config, Broadcaster.TYPE.PROJECT);
+        this.projectMap = new 
CaseInsensitiveStringCache<ProjectInstance>(config, "project", new 
SyncListener());
         this.l2Cache = new ProjectL2Cache(this);
 
         reloadAllProjects();
     }
 
+    private class SyncListener extends Broadcaster.Listener {
+        
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
+        }
+
+        @Override
+        public void onProjectSchemaChange(Broadcaster broadcaster, String 
project) throws IOException {
+            reloadProjectLocal(project);
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, 
Event event, String cacheKey) throws IOException {
+            String project = cacheKey;
+            
+            if (event == Event.DROP)
+                removeProjectLocal(project);
+            else
+                reloadProjectLocal(project);
+            
+            broadcaster.notifyProjectSchemaUpdate(project);
+            broadcaster.notifyProjectDataUpdate(project);
+        }
+    }
+
     public void clearL2Cache() {
         l2Cache.clear();
     }
@@ -224,6 +254,11 @@ public class ProjectManager {
         projectMap.remove(norm(proj.getName()));
         clearL2Cache();
     }
+    
+    private void removeProjectLocal(String proj) {
+        projectMap.remove(norm(proj));
+        clearL2Cache();
+    }
 
     public boolean isModelInProject(String projectName, String modelName) {
         return this.getProject(projectName).containsModel(modelName);
@@ -235,7 +270,7 @@ public class ProjectManager {
     }
 
     public void removeModelFromProjects(String modelName) throws IOException {
-        for (ProjectInstance projectInstance : findProjects(modelName)) {
+        for (ProjectInstance projectInstance : findProjectsByModel(modelName)) 
{
             projectInstance.removeModel(modelName);
             updateProject(projectInstance);
         }
@@ -344,17 +379,26 @@ public class ProjectManager {
         return result;
     }
 
-    private List<ProjectInstance> findProjects(String modelName) {
+    public List<ProjectInstance> findProjectsByModel(String modelName) {
         List<ProjectInstance> projects = new ArrayList<ProjectInstance>();
         for (ProjectInstance projectInstance : projectMap.values()) {
             if (projectInstance.containsModel(modelName)) {
                 projects.add(projectInstance);
             }
         }
-
         return projects;
     }
 
+    public List<ProjectInstance> findProjectsByTable(String tableIdentity) {
+        List<ProjectInstance> projects = new ArrayList<ProjectInstance>();
+        for (ProjectInstance projectInstance : projectMap.values()) {
+            if (projectInstance.containsTable(tableIdentity)) {
+                projects.add(projectInstance);
+            }
+        }
+        return projects;
+    }
+    
     public ExternalFilterDesc getExternalFilterDesc(String project, String 
extFilter) {
         return l2Cache.getExternalFilterDesc(project, extFilter);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/478066d0/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
----------------------------------------------------------------------
diff --git 
a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java 
b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
index 0f948cb..f772777 100644
--- 
a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
+++ 
b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridManager.java
@@ -28,6 +28,8 @@ import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
+import org.apache.kylin.common.restclient.Broadcaster.Event;
+import org.apache.kylin.cube.CubeManager.SyncListener;
 import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.project.RealizationEntry;
 import org.apache.kylin.metadata.realization.IRealization;
@@ -83,10 +85,34 @@ public class HybridManager implements IRealizationProvider {
     private HybridManager(KylinConfig config) throws IOException {
         logger.info("Initializing HybridManager with config " + config);
         this.config = config;
-        this.hybridMap = new 
CaseInsensitiveStringCache<HybridInstance>(config, Broadcaster.TYPE.HYBRID);
+        this.hybridMap = new 
CaseInsensitiveStringCache<HybridInstance>(config, "hybrid");
+        Broadcaster.getInstance(config).registerListener(new SyncListener(), 
"hybrid", "cube");
         loadAllHybridInstance();
     }
 
+    private class SyncListener implements Broadcaster.Listener {
+        @Override
+        public void clearAll() {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        public void notify(String entity, Event event, String cacheKey) {
+            if (event == Event.CREATE || event == Event.UPDATE) {
+                switch (entity) {
+                case "hybrid":
+                    loadAllHybridInstance();
+                    break;
+                case "cube":
+                    reloadHybridInstanceByChild(RealizationType.CUBE, 
cacheKey);
+                    break;
+                }
+            }
+            
+        }
+    }
+
     private void loadAllHybridInstance() throws IOException {
         ResourceStore store = getStore();
         List<String> paths = 
store.collectResourceRecursively(ResourceStore.HYBRID_RESOURCE_ROOT, ".json");

http://git-wip-us.apache.org/repos/asf/kylin/blob/478066d0/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git 
a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
 
b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index e4e1359..87dd5d5 100644
--- 
a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ 
b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -45,6 +45,7 @@ import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
+import org.apache.kylin.common.restclient.Broadcaster.Event;
 import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.slf4j.Logger;
@@ -72,10 +73,25 @@ public class StreamingManager {
 
     private StreamingManager(KylinConfig config) throws IOException {
         this.config = config;
-        this.streamingMap = new 
CaseInsensitiveStringCache<StreamingConfig>(config, Broadcaster.TYPE.STREAMING);
+        this.streamingMap = new 
CaseInsensitiveStringCache<StreamingConfig>(config, "streaming", new 
SyncListener());
         reloadAllStreaming();
     }
 
+    private class SyncListener implements Broadcaster.Listener {
+        @Override
+        public void clearAll() {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        public void notify(String entity, Event event, String cacheKey) throws 
IOException {
+            if (event == Event.CREATE || event == Event.UPDATE) {
+                reloadStreamingConfigLocal(cacheKey);
+            }
+        }
+    }
+
     private ResourceStore getStore() {
         return ResourceStore.getStore(this.config);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/478066d0/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java
 
b/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java
index 845ffe0..dd9936f 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/controller/CacheController.java
@@ -21,7 +21,7 @@ package org.apache.kylin.rest.controller;
 import java.io.IOException;
 
 import org.apache.kylin.common.restclient.Broadcaster;
-import org.apache.kylin.common.restclient.Broadcaster.EVENT;
+import org.apache.kylin.common.restclient.Broadcaster.Event;
 import org.apache.kylin.rest.service.CacheService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,31 +48,30 @@ public class CacheController extends BasicController {
     /**
      * Wipe system cache
      *
-     * @param type  {@link Broadcaster.TYPE}
-     * @param event {@link Broadcaster.EVENT}
-     * @param name
+     * @param entity  {@link Broadcaster.TYPE}
+     * @param event {@link Broadcaster.Event}
+     * @param cacheKey
      * @return if the action success
      * @throws IOException
      */
-    @RequestMapping(value = "/{type}/{name}/{event}", method = { 
RequestMethod.PUT })
+    @RequestMapping(value = "/{entity}/{cacheKey}/{event}", method = { 
RequestMethod.PUT })
     @ResponseBody
-    public void wipeCache(@PathVariable String type, @PathVariable String 
event, @PathVariable String name) throws IOException {
+    public void wipeCache(@PathVariable String entity, @PathVariable String 
event, @PathVariable String cacheKey) throws IOException {
 
-        Broadcaster.TYPE wipeType = Broadcaster.TYPE.getType(type);
-        EVENT wipeEvent = Broadcaster.EVENT.getEvent(event);
+        Event wipeEvent = Broadcaster.Event.getEvent(event);
 
-        logger.info("wipe cache type: " + wipeType + " event:" + wipeEvent + " 
name:" + name);
+        logger.info("wipe cache entity: " + entity + " event:" + wipeEvent + " 
cache key:" + cacheKey);
 
         switch (wipeEvent) {
         case CREATE:
         case UPDATE:
-            cacheService.rebuildCache(wipeType, name);
+            cacheService.rebuildCache(entity, cacheKey);
             break;
         case DROP:
-            cacheService.removeCache(wipeType, name);
+            cacheService.removeCache(entity, cacheKey);
             break;
         default:
-            throw new RuntimeException("invalid type:" + wipeEvent);
+            throw new RuntimeException("invalid event:" + wipeEvent);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/478066d0/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 9d134d6..c121d06 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -34,6 +34,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.restclient.Broadcaster;
+import org.apache.kylin.common.restclient.Broadcaster.Event;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -66,7 +67,7 @@ public class CacheService extends BasicService {
 
     private static final Logger logger = 
LoggerFactory.getLogger(CacheService.class);
 
-    private static ConcurrentMap<String, DataSource> olapDataSources = new 
ConcurrentHashMap<String, DataSource>();
+    private ConcurrentMap<String, DataSource> olapDataSources = new 
ConcurrentHashMap<String, DataSource>();
 
     @Autowired
     private CubeService cubeService;
@@ -75,25 +76,32 @@ public class CacheService extends BasicService {
     private CacheManager cacheManager;
 
     @PostConstruct
-    public void initCubeChangeListener() throws IOException {
-        CubeManager cubeMgr = CubeManager.getInstance(getConfig());
-        cubeMgr.setCubeChangeListener(new CubeManager.CubeChangeListener() {
-
+    public void initCacheListener() throws IOException {
+        
+        Broadcaster.getInstance(getConfig()).registerListener(new 
Broadcaster.Listener() {
             @Override
-            public void afterCubeCreate(CubeInstance cube) {
-                // no cache need change
+            public void notify(String entity, Event event, String cacheKey) 
throws IOException {
+                switch (entity) {
+                case "cube":
+                    String cubeName = cacheKey;
+                    CubeInstance cube = getCubeManager().getCube(cubeName);
+
+                    cleanDataCache(cube.getUuid());
+                    for (ProjectInstance prj : 
getProjectManager().findProjects(RealizationType.CUBE, cubeName)) {
+                        removeOLAPDataSource(prj.getName());
+                    }
+                    break;
+                case "project":
+                    removeOLAPDataSource(cacheKey);
+                    break;
+                }
             }
-
-            @Override
-            public void afterCubeUpdate(CubeInstance cube) {
-                rebuildCubeCache(cube.getName());
-            }
-
+            
             @Override
-            public void afterCubeDelete(CubeInstance cube) {
-                removeCubeCache(cube.getName(), cube);
+            public void clearAll() throws IOException {
+                
             }
-        });
+        }, "cube", "project");
     }
 
     // for test
@@ -120,7 +128,7 @@ public class CacheService extends BasicService {
         }
     }
 
-    private static void removeOLAPDataSource(String project) {
+    private void removeOLAPDataSource(String project) {
         logger.info("removeOLAPDataSource is called for project " + project);
         if (StringUtils.isEmpty(project))
             throw new IllegalArgumentException("removeOLAPDataSource: project 
name not given");
@@ -129,7 +137,7 @@ public class CacheService extends BasicService {
         olapDataSources.remove(project);
     }
 
-    public static void removeAllOLAPDataSources() {
+    public void removeAllOLAPDataSources() {
         // brutal, yet simplest way
         logger.info("removeAllOLAPDataSources is called.");
         olapDataSources.clear();
@@ -166,40 +174,36 @@ public class CacheService extends BasicService {
         return ret;
     }
 
-    public void rebuildCache(Broadcaster.TYPE cacheType, String cacheKey) {
-        final String log = "rebuild cache type: " + cacheType + " name:" + 
cacheKey;
+    public void rebuildCache(String entity, String cacheKey) {
+        final String log = "rebuild cache type: " + entity + " name:" + 
cacheKey;
         logger.info(log);
         try {
-            switch (cacheType) {
-            case CUBE:
+            switch (entity) {
+            case "cube":
                 rebuildCubeCache(cacheKey);
                 break;
-            case STREAMING:
-                getStreamingManager().reloadStreamingConfigLocal(cacheKey);
+            case "streaming":
+                
                 break;
-            case KAFKA:
-                getKafkaManager().reloadKafkaConfigLocal(cacheKey);
+            case "kafka":
+                
                 break;
-            case CUBE_DESC:
-                getCubeDescManager().reloadCubeDescLocal(cacheKey);
+            case "cube_desc":
+                
                 break;
-            case PROJECT:
+            case "project":
                 reloadProjectCache(cacheKey);
                 break;
-            case TABLE:
-                getMetadataManager().reloadTableCache(cacheKey);
-                CubeDescManager.clearCache();
+            case "table":
                 clearRealizationCache();
                 break;
-            case EXTERNAL_FILTER:
-                getMetadataManager().reloadExtFilter(cacheKey);
-                CubeDescManager.clearCache();
+            case "external_filter":
+                
                 break;
-            case DATA_MODEL:
-                getMetadataManager().reloadDataModelDesc(cacheKey);
-                CubeDescManager.clearCache();
+            case "data_model":
+                
                 break;
-            case ALL:
+            case "all":
                 DictionaryManager.clearCache();
                 MetadataManager.clearCache();
                 CubeDescManager.clearCache();
@@ -214,7 +218,7 @@ public class CacheService extends BasicService {
                 removeAllOLAPDataSources();
                 break;
             default:
-                logger.error("invalid cacheType:" + cacheType);
+                logger.error("invalid cacheType:" + entity);
             }
         } catch (IOException e) {
             throw new RuntimeException("error " + log, e);
@@ -228,9 +232,9 @@ public class CacheService extends BasicService {
     }
 
     private void rebuildCubeCache(String cubeName) {
-        CubeInstance cube = getCubeManager().reloadCubeLocal(cubeName);
-        getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, 
cubeName);
-        
reloadProjectCache(getProjectManager().findProjects(RealizationType.CUBE, 
cubeName));
+        //CubeInstance cube = getCubeManager().reloadCubeLocal(cubeName);
+        //getHybridManager().reloadHybridInstanceByChild(RealizationType.CUBE, 
cubeName);
+        
//reloadProjectCache(getProjectManager().findProjects(RealizationType.CUBE, 
cubeName));
         //clean query related cache first
         if (cube != null) {
             cleanDataCache(cube.getUuid());
@@ -238,28 +242,28 @@ public class CacheService extends BasicService {
         cubeService.updateOnNewSegmentReady(cubeName);
     }
 
-    public void removeCache(Broadcaster.TYPE cacheType, String cacheKey) {
-        final String log = "remove cache type: " + cacheType + " name:" + 
cacheKey;
+    public void removeCache(String entity, String cacheKey) {
+        final String log = "remove cache type: " + entity + " name:" + 
cacheKey;
         try {
-            switch (cacheType) {
-            case CUBE:
+            switch (entity) {
+            case "cube":
                 removeCubeCache(cacheKey, null);
                 break;
-            case CUBE_DESC:
+            case "cube_desc":
                 getCubeDescManager().removeLocalCubeDesc(cacheKey);
                 break;
-            case PROJECT:
+            case "project":
                 ProjectManager.clearCache();
                 break;
-            case TABLE:
+            case "table":
                 throw new UnsupportedOperationException(log);
-            case EXTERNAL_FILTER:
+            case "external_filter":
                 throw new UnsupportedOperationException(log);
-            case DATA_MODEL:
+            case "data_model":
                 getMetadataManager().removeModelCache(cacheKey);
                 break;
             default:
-                throw new RuntimeException("invalid cacheType:" + cacheType);
+                throw new RuntimeException("invalid cacheType:" + entity);
             }
         } catch (IOException e) {
             throw new RuntimeException("error " + log, e);

http://git-wip-us.apache.org/repos/asf/kylin/blob/478066d0/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index e446045..bdf317c 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -28,9 +28,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.WeakHashMap;
 
+import javax.annotation.PostConstruct;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.restclient.Broadcaster;
+import org.apache.kylin.common.restclient.Broadcaster.Event;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -92,6 +96,32 @@ public class CubeService extends BasicService {
     @Autowired
     private AccessService accessService;
 
+    @PostConstruct
+    public void initCacheListener() throws IOException {
+        Broadcaster.getInstance(getConfig()).registerListener(new 
Broadcaster.Listener() {
+            @Override
+            public void notify(String entity, Event event, String cacheKey) 
throws IOException {
+                if (event == Event.UPDATE) {
+                    final String cubeName = cacheKey;
+                    new Thread() { // do not block the event broadcast thread
+                        public void run() {
+                            try {
+                                Thread.sleep(1000);
+                                updateOnNewSegmentReady(cubeName);
+                            } catch (Throwable ex) {
+                                logger.error("Error in 
updateOnNewSegmentReady()", ex);
+                            }
+                        }
+                    }.run();
+                }
+            }
+            
+            @Override
+            public void clearAll() throws IOException {
+            }
+        }, "cube");
+    }
+
     @PostFilter(Constant.ACCESS_POST_FILTER_READ)
     public List<CubeInstance> listAllCubes(final String cubeName, final String 
projectName, final String modelName) {
         List<CubeInstance> cubeInstances = null;

http://git-wip-us.apache.org/repos/asf/kylin/blob/478066d0/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git 
a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java 
b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index 38cd93f..6101fb6 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -115,22 +115,21 @@ public class CacheServiceTest extends 
LocalFileMetadataTestCase {
 
         context.addServlet(new ServletHolder(new BroadcasterReceiveServlet(new 
BroadcasterReceiveServlet.BroadcasterHandler() {
             @Override
-            public void handle(String type, String name, String event) {
+            public void handle(String entity, String cacheKey, String event) {
 
-                Broadcaster.TYPE wipeType = Broadcaster.TYPE.getType(type);
-                Broadcaster.EVENT wipeEvent = 
Broadcaster.EVENT.getEvent(event);
-                final String log = "wipe cache type: " + wipeType + " event:" 
+ wipeEvent + " name:" + name;
+                Broadcaster.Event wipeEvent = 
Broadcaster.Event.getEvent(event);
+                final String log = "wipe cache type: " + entity + " event:" + 
wipeEvent + " name:" + cacheKey;
                 logger.info(log);
                 try {
                     switch (wipeEvent) {
                     case CREATE:
                     case UPDATE:
-                        serviceA.rebuildCache(wipeType, name);
-                        serviceB.rebuildCache(wipeType, name);
+                        serviceA.rebuildCache(entity, cacheKey);
+                        serviceB.rebuildCache(entity, cacheKey);
                         break;
                     case DROP:
-                        serviceA.removeCache(wipeType, name);
-                        serviceB.removeCache(wipeType, name);
+                        serviceA.removeCache(entity, cacheKey);
+                        serviceB.removeCache(entity, cacheKey);
                         break;
                     default:
                         throw new RuntimeException("invalid type:" + 
wipeEvent);

http://git-wip-us.apache.org/repos/asf/kylin/blob/478066d0/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index d594873..a3b675b 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -45,6 +45,7 @@ import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
+import org.apache.kylin.common.restclient.Broadcaster.Event;
 import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
@@ -73,10 +74,25 @@ public class KafkaConfigManager {
 
     private KafkaConfigManager(KylinConfig config) throws IOException {
         this.config = config;
-        this.kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(config, 
Broadcaster.TYPE.KAFKA);
+        this.kafkaMap = new CaseInsensitiveStringCache<KafkaConfig>(config, 
"kafka", new SyncListener());
         reloadAllKafkaConfig();
     }
 
+    private class SyncListener implements Broadcaster.Listener {
+        @Override
+        public void clearAll() {
+            // TODO Auto-generated method stub
+            
+        }
+
+        @Override
+        public void notify(String entity, Event event, String cacheKey) throws 
IOException {
+            if (event == Event.CREATE || event == Event.UPDATE) {
+                reloadKafkaConfigLocal(cacheKey);
+            }
+        }
+    }
+
     private ResourceStore getStore() {
         return ResourceStore.getStore(this.config);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/478066d0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
index 3066fb5..71ab0d5 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
@@ -564,7 +564,7 @@ public class CubeMigrationCLI {
             RestClient restClient = new RestClient(node);
             try {
                 logger.info("update meta cache for " + node);
-                restClient.wipeCache(Broadcaster.TYPE.ALL.getType(), 
Broadcaster.EVENT.UPDATE.getType(), "all");
+                restClient.wipeCache(Broadcaster.SYNC_ALL, 
Broadcaster.Event.UPDATE.getType(), Broadcaster.SYNC_ALL);
             } catch (IOException e) {
                 logger.error(e.getMessage());
             }

Reply via email to