moremind commented on code in PR #5001:
URL: https://github.com/apache/shenyu/pull/5001#discussion_r1293209618


##########
shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java:
##########
@@ -85,88 +85,159 @@ public ConsulSyncDataService(final ConsulClient 
consulClient,
         super(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers, 
proxySelectorDataSubscribers, discoveryUpstreamDataSubscribers);
         this.consulClient = consulClient;
         this.consulConfig = consulConfig;
-        this.executor = new ScheduledThreadPoolExecutor(1,
+        // corePool is the total number of watcher nodes
+        this.executor = new ScheduledThreadPoolExecutor(7,
                 ShenyuThreadFactory.create("consul-config-watch", true));
-        consulIndexes.put(ConsulConstants.SYNC_PRE_FIX, 0L);
-        initUpdateMap();
-        start();
+        watcherData();
     }
 
-    /**
-     * init config key and update method mapping.
-     */
-    private void initUpdateMap() {
-        groupMap.put(ConsulConstants.PLUGIN_DATA, this::updatePluginData);
-        groupMap.put(ConsulConstants.SELECTOR_DATA, this::updateSelectorMap);
-        groupMap.put(ConsulConstants.RULE_DATA, this::updateRuleMap);
-        groupMap.put(ConsulConstants.META_DATA, this::updateMetaDataMap);
-        groupMap.put(ConsulConstants.AUTH_DATA, this::updateAuthMap);
-        groupMap.put(ConsulConstants.PROXY_SELECTOR_DATA_ID, 
this::updateSelectorDataMap);
-        groupMap.put(ConsulConstants.DISCOVERY_UPSTREAM, 
this::updateDiscoveryUpstreamMap);
+    private void watcherData() {
+        watcherData0(DefaultPathConstants.PLUGIN_PARENT);
+        watcherData0(DefaultPathConstants.SELECTOR_PARENT);
+        watcherData0(DefaultPathConstants.RULE_PARENT);
+        watcherData0(DefaultPathConstants.PROXY_SELECTOR);
+        watcherData0(DefaultPathConstants.DISCOVERY_UPSTREAM);
+        watcherData0(DefaultPathConstants.APP_AUTH_PARENT);
+        watcherData0(DefaultPathConstants.META_DATA);
     }
 
-    private void watchConfigKeyValues() {
-        if (this.running.get()) {
-            for (String context : this.consulIndexes.keySet()) {
-                try {
-                    Long currentIndex = this.consulIndexes.get(context);
-                    if (Objects.isNull(currentIndex)) {
-                        currentIndex = 
ConsulConstants.INIT_CONFIG_VERSION_INDEX;
-                    }
-                    Response<List<GetValue>> response = 
this.consulClient.getKVValues(context, null,
-                            new QueryParams(consulConfig.getWaitTime(), 
currentIndex));
-                    if (Objects.isNull(response.getValue()) || 
response.getValue().isEmpty()) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("No value for context " + context);
-                        }
-                        continue;
-                    }
-                    Long newIndex = response.getConsulIndex();
-                    if (Objects.isNull(newIndex) || Objects.equals(newIndex, 
currentIndex)) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Same index for context " + context);
-                        }
-                        continue;
+    private void watcherData0(final String registerPath) {
+        consulIndexes.put(registerPath, 0L);
+        BiConsumer<String, String> updateHandler = (changeData, decodedValue) 
-> this.event(changeData, decodedValue, registerPath, EventType.PUT);
+        Consumer<String> deleteHandler = removeKey -> this.event(removeKey, 
null, registerPath, EventType.DELETE);
+        this.executor.schedule(() -> watchConfigKeyValues(registerPath, 
updateHandler, deleteHandler), -1, TimeUnit.MILLISECONDS);
+    }
+
+    private void watchConfigKeyValues(final String watchPathRoot,
+                                      final BiConsumer<String, String> 
updateHandler,
+                                      final Consumer<String> deleteHandler) {
+        try {
+            Long currentIndex = this.consulIndexes.get(watchPathRoot);
+            if (Objects.isNull(currentIndex)) {
+                currentIndex = ConsulConstants.INIT_CONFIG_VERSION_INDEX;
+            }
+            Response<List<GetValue>> response = 
this.consulClient.getKVValues(watchPathRoot, null,
+                    new 
QueryParams(TimeUnit.MILLISECONDS.toSeconds(consulConfig.getWaitTime()), 
currentIndex));
+            if (Objects.isNull(response.getValue()) || 
response.getValue().isEmpty()) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("No value for watchPathRoot " + watchPathRoot);
+                }
+                this.executor.schedule(() -> 
watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler),
+                        consulConfig.getWatchDelay(), TimeUnit.MILLISECONDS);
+                return;
+            }
+            Long newIndex = response.getConsulIndex();
+            if (Objects.isNull(newIndex)) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Same index for watchPathRoot " + watchPathRoot);
+                }
+                this.executor.schedule(() -> 
watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler),
+                        consulConfig.getWatchDelay(), TimeUnit.MILLISECONDS);
+                return;
+            }
+            if (Objects.equals(newIndex, currentIndex)) {
+                this.executor.schedule(() -> 
watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler),
+                        -1, TimeUnit.MILLISECONDS);
+                return;
+            }
+            if (!this.consulIndexes.containsValue(newIndex)
+                    && 
!currentIndex.equals(ConsulConstants.INIT_CONFIG_VERSION_INDEX)) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("watchPathRoot " + watchPathRoot + " has new 
index " + newIndex);
+                }
+                final Long lastIndex = currentIndex;
+                final List<ConsulData> lastDatas = 
cacheConsulDataKeyMap.get(watchPathRoot);
+                response.getValue().forEach(data -> {
+                    if (data.getModifyIndex() == lastIndex) {
+                        //data has not changed
+                        return;
                     }
-                    if (!this.consulIndexes.containsValue(newIndex)
-                            && 
!currentIndex.equals(ConsulConstants.INIT_CONFIG_VERSION_INDEX)) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Context " + context + " has new index " 
+ newIndex);
+                    if (lastDatas != null) {

Review Comment:
   remove !=



##########
shenyu-sync-data-center/shenyu-sync-data-consul/src/main/java/org/apache/shenyu/sync/data/consul/ConsulSyncDataService.java:
##########
@@ -85,88 +85,159 @@ public ConsulSyncDataService(final ConsulClient 
consulClient,
         super(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers, 
proxySelectorDataSubscribers, discoveryUpstreamDataSubscribers);
         this.consulClient = consulClient;
         this.consulConfig = consulConfig;
-        this.executor = new ScheduledThreadPoolExecutor(1,
+        // corePool is the total number of watcher nodes
+        this.executor = new ScheduledThreadPoolExecutor(7,
                 ShenyuThreadFactory.create("consul-config-watch", true));
-        consulIndexes.put(ConsulConstants.SYNC_PRE_FIX, 0L);
-        initUpdateMap();
-        start();
+        watcherData();
     }
 
-    /**
-     * init config key and update method mapping.
-     */
-    private void initUpdateMap() {
-        groupMap.put(ConsulConstants.PLUGIN_DATA, this::updatePluginData);
-        groupMap.put(ConsulConstants.SELECTOR_DATA, this::updateSelectorMap);
-        groupMap.put(ConsulConstants.RULE_DATA, this::updateRuleMap);
-        groupMap.put(ConsulConstants.META_DATA, this::updateMetaDataMap);
-        groupMap.put(ConsulConstants.AUTH_DATA, this::updateAuthMap);
-        groupMap.put(ConsulConstants.PROXY_SELECTOR_DATA_ID, 
this::updateSelectorDataMap);
-        groupMap.put(ConsulConstants.DISCOVERY_UPSTREAM, 
this::updateDiscoveryUpstreamMap);
+    private void watcherData() {
+        watcherData0(DefaultPathConstants.PLUGIN_PARENT);
+        watcherData0(DefaultPathConstants.SELECTOR_PARENT);
+        watcherData0(DefaultPathConstants.RULE_PARENT);
+        watcherData0(DefaultPathConstants.PROXY_SELECTOR);
+        watcherData0(DefaultPathConstants.DISCOVERY_UPSTREAM);
+        watcherData0(DefaultPathConstants.APP_AUTH_PARENT);
+        watcherData0(DefaultPathConstants.META_DATA);
     }
 
-    private void watchConfigKeyValues() {
-        if (this.running.get()) {
-            for (String context : this.consulIndexes.keySet()) {
-                try {
-                    Long currentIndex = this.consulIndexes.get(context);
-                    if (Objects.isNull(currentIndex)) {
-                        currentIndex = 
ConsulConstants.INIT_CONFIG_VERSION_INDEX;
-                    }
-                    Response<List<GetValue>> response = 
this.consulClient.getKVValues(context, null,
-                            new QueryParams(consulConfig.getWaitTime(), 
currentIndex));
-                    if (Objects.isNull(response.getValue()) || 
response.getValue().isEmpty()) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("No value for context " + context);
-                        }
-                        continue;
-                    }
-                    Long newIndex = response.getConsulIndex();
-                    if (Objects.isNull(newIndex) || Objects.equals(newIndex, 
currentIndex)) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Same index for context " + context);
-                        }
-                        continue;
+    private void watcherData0(final String registerPath) {
+        consulIndexes.put(registerPath, 0L);
+        BiConsumer<String, String> updateHandler = (changeData, decodedValue) 
-> this.event(changeData, decodedValue, registerPath, EventType.PUT);
+        Consumer<String> deleteHandler = removeKey -> this.event(removeKey, 
null, registerPath, EventType.DELETE);
+        this.executor.schedule(() -> watchConfigKeyValues(registerPath, 
updateHandler, deleteHandler), -1, TimeUnit.MILLISECONDS);
+    }
+
+    private void watchConfigKeyValues(final String watchPathRoot,
+                                      final BiConsumer<String, String> 
updateHandler,
+                                      final Consumer<String> deleteHandler) {
+        try {
+            Long currentIndex = this.consulIndexes.get(watchPathRoot);
+            if (Objects.isNull(currentIndex)) {
+                currentIndex = ConsulConstants.INIT_CONFIG_VERSION_INDEX;
+            }
+            Response<List<GetValue>> response = 
this.consulClient.getKVValues(watchPathRoot, null,
+                    new 
QueryParams(TimeUnit.MILLISECONDS.toSeconds(consulConfig.getWaitTime()), 
currentIndex));
+            if (Objects.isNull(response.getValue()) || 
response.getValue().isEmpty()) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("No value for watchPathRoot " + watchPathRoot);
+                }
+                this.executor.schedule(() -> 
watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler),
+                        consulConfig.getWatchDelay(), TimeUnit.MILLISECONDS);
+                return;
+            }
+            Long newIndex = response.getConsulIndex();
+            if (Objects.isNull(newIndex)) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Same index for watchPathRoot " + watchPathRoot);
+                }
+                this.executor.schedule(() -> 
watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler),
+                        consulConfig.getWatchDelay(), TimeUnit.MILLISECONDS);
+                return;
+            }
+            if (Objects.equals(newIndex, currentIndex)) {
+                this.executor.schedule(() -> 
watchConfigKeyValues(watchPathRoot, updateHandler, deleteHandler),
+                        -1, TimeUnit.MILLISECONDS);
+                return;
+            }
+            if (!this.consulIndexes.containsValue(newIndex)
+                    && 
!currentIndex.equals(ConsulConstants.INIT_CONFIG_VERSION_INDEX)) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("watchPathRoot " + watchPathRoot + " has new 
index " + newIndex);
+                }
+                final Long lastIndex = currentIndex;
+                final List<ConsulData> lastDatas = 
cacheConsulDataKeyMap.get(watchPathRoot);
+                response.getValue().forEach(data -> {
+                    if (data.getModifyIndex() == lastIndex) {
+                        //data has not changed
+                        return;
                     }
-                    if (!this.consulIndexes.containsValue(newIndex)
-                            && 
!currentIndex.equals(ConsulConstants.INIT_CONFIG_VERSION_INDEX)) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Context " + context + " has new index " 
+ newIndex);
+                    if (lastDatas != null) {
+                        final ConsulData consulData = lastDatas.stream()
+                                .filter(lastData -> 
data.getKey().equals(lastData.getConsulKey())).findFirst().orElse(null);
+                        if (consulData != null && 
!StringUtils.isBlank(consulData.getConsulDataMd5())

Review Comment:
   remove !=



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to