This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch 2.8.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git


The following commit(s) were added to refs/heads/2.8.x by this push:
     new 863e8871a [SCB-2804]Fix instance not found when instances frequently 
change enhancement: keep cache versions (#3959)
863e8871a is described below

commit 863e8871a71d5a6250dd756cc7b8558976735608
Author: liubao68 <bi...@qq.com>
AuthorDate: Tue Oct 10 09:11:13 2023 +0800

    [SCB-2804]Fix instance not found when instances frequently change 
enhancement: keep cache versions (#3959)
---
 .../servicecomb/registry/consumer/AppManager.java  |  8 ++--
 .../registry/consumer/MicroserviceManager.java     | 16 +++-----
 .../registry/consumer/MicroserviceVersions.java    | 45 +++++++++++++++++++---
 .../servicecomb/serviceregistry/RegistryUtils.java |  8 +---
 .../registry/AbstractServiceRegistry.java          |  2 +
 .../servicecomb/serviceregistry/TestConsumers.java |  6 ++-
 6 files changed, 57 insertions(+), 28 deletions(-)

diff --git 
a/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/AppManager.java
 
b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/AppManager.java
index ef72228c2..f149516e9 100644
--- 
a/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/AppManager.java
+++ 
b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/AppManager.java
@@ -23,7 +23,7 @@ import java.util.concurrent.CompletableFuture;
 
 import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
 import org.apache.servicecomb.foundation.common.event.EventManager;
-import org.apache.servicecomb.registry.api.MicroserviceKey;
+import 
org.apache.servicecomb.registry.api.event.MicroserviceInstanceChangedEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,13 +70,13 @@ public class AppManager {
     return 
microserviceManager.getOrCreateMicroserviceVersions(microserviceName);
   }
 
-  public void onMicroserviceInstancesChanged(MicroserviceKey microserviceKey) {
-    MicroserviceManager microserviceManager = 
apps.get(microserviceKey.getAppId());
+  public void onMicroserviceInstanceChanged(MicroserviceInstanceChangedEvent 
changedEvent) {
+    MicroserviceManager microserviceManager = 
apps.get(changedEvent.getKey().getAppId());
     if (microserviceManager == null) {
       return;
     }
 
-    microserviceManager.onMicroserviceInstancesChanged(microserviceKey);
+    microserviceManager.onMicroserviceInstanceChanged(changedEvent);
   }
 
   public void pullInstances() {
diff --git 
a/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/MicroserviceManager.java
 
b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/MicroserviceManager.java
index 226bd7323..862ea506c 100644
--- 
a/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/MicroserviceManager.java
+++ 
b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/MicroserviceManager.java
@@ -18,12 +18,11 @@
 package org.apache.servicecomb.registry.consumer;
 
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
 import 
org.apache.servicecomb.foundation.vertx.executor.SinglePoolBlockingExecutor;
-import org.apache.servicecomb.registry.api.MicroserviceKey;
+import 
org.apache.servicecomb.registry.api.event.MicroserviceInstanceChangedEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -129,15 +128,12 @@ public class MicroserviceManager {
   /**
    * Update instance information triggered by event, called when instance list 
changed.
    */
-  public void onMicroserviceInstancesChanged(MicroserviceKey microserviceKey) {
+  public void onMicroserviceInstanceChanged(MicroserviceInstanceChangedEvent 
changedEvent) {
     synchronized (lock) {
-      for (Entry<String, MicroserviceVersions> item : 
versionsByName.entrySet()) {
-        if (item.getKey().equals(microserviceKey.getServiceName())) {
-          versionsByName.remove(item.getKey());
-          item.getValue().destroy();
-          LOGGER.info("remove microservice version when instance changed, 
appId={}, microserviceName={}.",
-              appId, item.getKey());
-        }
+      for (MicroserviceVersions microserviceVersions : 
versionsByName.values()) {
+        microserviceVersions.onMicroserviceInstanceChanged(changedEvent);
+
+        tryRemoveInvalidMicroservice(microserviceVersions);
       }
     }
   }
diff --git 
a/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/MicroserviceVersions.java
 
b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/MicroserviceVersions.java
index c2c2446ba..4e665b2db 100644
--- 
a/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/MicroserviceVersions.java
+++ 
b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/consumer/MicroserviceVersions.java
@@ -32,6 +32,7 @@ import 
org.apache.servicecomb.foundation.common.utils.SPIServiceUtils;
 import org.apache.servicecomb.registry.DiscoveryManager;
 import org.apache.servicecomb.registry.api.event.CreateMicroserviceEvent;
 import org.apache.servicecomb.registry.api.event.DestroyMicroserviceEvent;
+import 
org.apache.servicecomb.registry.api.event.MicroserviceInstanceChangedEvent;
 import org.apache.servicecomb.registry.api.registry.MicroserviceInstance;
 import org.apache.servicecomb.registry.api.registry.MicroserviceInstanceStatus;
 import org.apache.servicecomb.registry.api.registry.MicroserviceInstances;
@@ -232,13 +233,19 @@ public class MicroserviceVersions {
     synchronized (lock) {
       MergedInstances mergedInstances = mergeInstances(pulledInstances, 
instances);
       instances = mergedInstances.instanceIdMap.values();
-      // clear cache
-      versions.forEach((key, value) -> value.setInstances(new ArrayList<>()));
+
+      // set instances to empty for no instance versions
+      versions.forEach((key, value) -> {
+        if (!mergedInstances.microserviceIdMap.containsKey(key)) {
+          value.setInstances(new ArrayList<>());
+        }
+      });
+
+      // update instances
       for (Entry<String, List<MicroserviceInstance>> entry : 
mergedInstances.microserviceIdMap.entrySet()) {
-        // always update microservice versions, because we allow microservice 
info override, like schema info
-        MicroserviceVersion newVersion = 
createMicroserviceVersion(entry.getKey(), entry.getValue());
-        newVersion.setInstances(entry.getValue());
-        versions.put(entry.getKey(), newVersion);
+        versions.computeIfAbsent(entry.getKey(),
+                microserviceId -> createMicroserviceVersion(microserviceId, 
entry.getValue()))
+            .setInstances(entry.getValue());
       }
 
       for (MicroserviceVersionRule microserviceVersionRule : 
versionRules.values()) {
@@ -297,6 +304,32 @@ public class MicroserviceVersions {
     return microserviceVersionRule;
   }
 
+  public void onMicroserviceInstanceChanged(MicroserviceInstanceChangedEvent 
changedEvent) {
+    if (!isEventAccept(changedEvent)) {
+      return;
+    }
+    // pull instances always replace old instances, not append
+    //
+    // pull result and watch event sequence is not defined even inside SC.
+    // it's not safe to trust the event, so we just send a new pull request
+    //
+    // CREATE/UPDATE:
+    //   if pull 1/2/3, and then add 4, but "add 4" received before pull 
result, will lost 4.
+    // DELETE:
+    //   if pull 1/2/3, and then delete 3, but "delete 3" received before pull 
result, will have wrong 3.
+    // EXPIRE::
+    //   black/white config in SC changed, we must refresh all data from sc.
+    pullInstances();
+  }
+
+  protected boolean isEventAccept(MicroserviceInstanceChangedEvent 
changedEvent) {
+    return (appId.equals(changedEvent.getKey().getAppId()) &&
+        microserviceName.equals(changedEvent.getKey().getServiceName())) ||
+        microserviceName.equals(
+            changedEvent.getKey().getAppId() + 
DefinitionConst.APP_SERVICE_SEPARATOR + changedEvent.getKey()
+                .getServiceName());
+  }
+
   public void destroy() {
     synchronized (lock) {
       for (MicroserviceVersion microserviceVersion : versions.values()) {
diff --git 
a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java
 
b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java
index fb398bba8..72022f6a1 100644
--- 
a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java
+++ 
b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java
@@ -36,7 +36,6 @@ import 
org.apache.servicecomb.foundation.common.event.EnableExceptionPropagation
 import org.apache.servicecomb.foundation.common.event.EventManager;
 import org.apache.servicecomb.foundation.common.utils.BeanUtils;
 import org.apache.servicecomb.registry.DiscoveryManager;
-import org.apache.servicecomb.registry.api.MicroserviceKey;
 import 
org.apache.servicecomb.registry.api.event.MicroserviceInstanceRegisteredEvent;
 import org.apache.servicecomb.registry.api.registry.FindInstancesResponse;
 import org.apache.servicecomb.registry.api.registry.Microservice;
@@ -98,12 +97,7 @@ public final class RegistryUtils {
     executeOnEachServiceRegistry(serviceRegistries::add);
     aggregateServiceRegistryCache = new 
AggregateServiceRegistryCache(serviceRegistries);
     aggregateServiceRegistryCache
-        .setCacheRefreshedWatcher(refreshedCaches -> {
-          MicroserviceKey microserviceKey = new MicroserviceKey();
-          microserviceKey.setAppId(refreshedCaches.get(0).getKey().getAppId());
-          
microserviceKey.setServiceName(refreshedCaches.get(0).getKey().getServiceName());
-          
DiscoveryManager.INSTANCE.getAppManager().onMicroserviceInstancesChanged(microserviceKey);
-        });
+        .setCacheRefreshedWatcher(refreshedCaches -> 
DiscoveryManager.INSTANCE.getAppManager().pullInstances());
 
     executeOnEachServiceRegistry(
         serviceRegistry -> serviceRegistry
diff --git 
a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
 
b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
index 0640a5933..cf32dde8c 100644
--- 
a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
+++ 
b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.configuration.Configuration;
 import 
org.apache.servicecomb.foundation.common.concurrency.SuppressedRunnableWrapper;
+import org.apache.servicecomb.registry.DiscoveryManager;
 import 
org.apache.servicecomb.registry.api.event.MicroserviceInstanceChangedEvent;
 import org.apache.servicecomb.registry.api.registry.BasePath;
 import org.apache.servicecomb.registry.api.registry.Microservice;
@@ -276,6 +277,7 @@ public abstract class AbstractServiceRegistry implements 
ServiceRegistry {
       executorService.execute(new SuppressedRunnableWrapper(
           () -> {
             serviceRegistryCache.onMicroserviceInstanceChanged(changedEvent);
+            
DiscoveryManager.INSTANCE.getAppManager().onMicroserviceInstanceChanged(changedEvent);
           }));
     } catch (Exception e) {
       LOGGER.info("instance changed event ignored, {}", e.getMessage());
diff --git 
a/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/TestConsumers.java
 
b/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/TestConsumers.java
index 1458cf1e5..977a5e2f2 100644
--- 
a/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/TestConsumers.java
+++ 
b/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/TestConsumers.java
@@ -23,6 +23,7 @@ import 
org.apache.servicecomb.foundation.test.scaffolding.log.LogCollector;
 import org.apache.servicecomb.registry.DiscoveryManager;
 import org.apache.servicecomb.registry.RegistrationManager;
 import org.apache.servicecomb.registry.api.MicroserviceKey;
+import 
org.apache.servicecomb.registry.api.event.MicroserviceInstanceChangedEvent;
 import org.apache.servicecomb.registry.api.registry.Microservice;
 import org.apache.servicecomb.registry.api.registry.MicroserviceInstance;
 import org.apache.servicecomb.registry.consumer.MicroserviceVersion;
@@ -82,9 +83,12 @@ public class TestConsumers extends TestRegistryBase {
     mockNotExist();
 
     MicroserviceKey key = new MicroserviceKey();
+    MicroserviceInstanceChangedEvent event = new 
MicroserviceInstanceChangedEvent();
+    event.setKey(key);
+
     key.setAppId(appId);
     key.setServiceName(serviceName);
-    microserviceManager.onMicroserviceInstancesChanged(key);
+    eventBus.post(event);
     long begin = System.currentTimeMillis();
     while (microserviceManager.getVersionsByName().size() > 0 && 
System.currentTimeMillis() - begin < 1000) {
       Thread.yield();

Reply via email to