AlbumenJ commented on code in PR #10885:
URL: https://github.com/apache/dubbo/pull/10885#discussion_r1043000896


##########
dubbo-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/LdsProtocol.java:
##########
@@ -32,27 +34,75 @@
 import 
io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds;
 import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
 
-import java.util.Collections;
-import java.util.Objects;
+import java.util.HashMap;
 import java.util.Set;
-import java.util.function.Consumer;
+import java.util.Objects;
+import java.util.HashSet;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
+import java.util.function.Consumer;
 
 import static 
org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_ERROR_RESPONSE_XDS;
 
 public class LdsProtocol extends AbstractProtocol<ListenerResult, 
DeltaListener> {
-
     private static final ErrorTypeAwareLogger logger = 
LoggerFactory.getErrorTypeAwareLogger(LdsProtocol.class);
 
-    public LdsProtocol(XdsChannel xdsChannel, Node node, int pollingPoolSize, 
int pollingTimeout) {
-        super(xdsChannel, node, pollingPoolSize, pollingTimeout);
+    private StreamObserver<DiscoveryRequest> requestObserver;
+
+    private CompletableFuture<ListenerResult> future;
+    public LdsProtocol(XdsChannel xdsChannel, Node node, int pollingTimeout) {
+        super(xdsChannel, node, pollingTimeout);
     }
 
     @Override
     public String getTypeUrl() {
         return "type.googleapis.com/envoy.config.listener.v3.Listener";
     }
 
+    private HashMap<String, Object> resourcesMap = new HashMap<>();
+
+    @Override
+    public boolean isExistResource(Set<String> resourceNames) {
+        for (String resourceName : resourceNames) {
+            if (!resourcesMap.containsKey(resourceName)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public ListenerResult getCacheResource(Set<String> resourceNames) {
+        Set<String> resourceSet = new HashSet<>();
+        if (!resourceNames.isEmpty() && isExistResource(resourceNames)) {
+            for (String resourceName : resourceNames) {
+                resourceSet.add((String) resourcesMap.get(resourceName));
+            }
+        } else {
+            if (requestObserver == null) {
+                future = new CompletableFuture<>();
+                requestObserver = xdsChannel.createDeltaDiscoveryRequest(new 
ResponseObserver(future));
+            }
+            resourceNames.addAll(resourcesMap.keySet());
+            requestObserver.onNext(buildDiscoveryRequest(resourceNames));
+            try {
+                return future.get();
+            } catch (InterruptedException e) {
+                logger.error("InterruptedException occur when request control 
panel. error={}", e);
+                Thread.currentThread().interrupt();
+            }  catch (Exception e) {
+                logger.error("Error occur when request control panel. error=. 
",e);
+            }
+        }
+        return new ListenerResult(resourceSet);
+    }

Review Comment:
   Move this method impl to `AbstractProtocol`.



##########
dubbo-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/EdsProtocol.java:
##########
@@ -43,15 +48,61 @@ public class EdsProtocol extends 
AbstractProtocol<EndpointResult, DeltaEndpoint>
 
     private static final ErrorTypeAwareLogger logger = 
LoggerFactory.getErrorTypeAwareLogger(EdsProtocol.class);
 
-    public EdsProtocol(XdsChannel xdsChannel, Node node, int pollingPoolSize, 
int pollingTimeout) {
-        super(xdsChannel, node, pollingPoolSize, pollingTimeout);
+    private StreamObserver<DiscoveryRequest> requestObserver;
+
+    private HashMap<String, Object> resourcesMap = new HashMap<>();
+
+
+    public EdsProtocol(XdsChannel xdsChannel, Node node, int pollingTimeout) {
+        super(xdsChannel, node, pollingTimeout);
     }
 
     @Override
     public String getTypeUrl() {
         return 
"type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
     }
 
+    @Override
+    public boolean isExistResource(Set<String> resourceNames) {
+        for (String resourceName : resourceNames) {
+            if (!resourcesMap.containsKey(resourceName)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public EndpointResult getCacheResource(Set<String> resourceNames) {
+        Set<Endpoint> resourceSet = new HashSet<>();
+        if (!resourceNames.isEmpty() && isExistResource(resourceNames)) {
+            for (String resourceName : resourceNames) {
+                resourceSet.addAll((Set<Endpoint>) 
resourcesMap.get(resourceName));
+            }
+        } else {
+            CompletableFuture<EndpointResult> future = new 
CompletableFuture<>();
+            if (requestObserver == null) {
+                requestObserver = xdsChannel.createDeltaDiscoveryRequest(new 
ResponseObserver(future));
+            }

Review Comment:
   `future` will never get response if `requestObserver` is not null when 
`resourceNames` update.



##########
dubbo-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/impl/RdsProtocol.java:
##########
@@ -43,22 +46,73 @@ public class RdsProtocol extends 
AbstractProtocol<RouteResult, DeltaRoute> {
 
     private static final ErrorTypeAwareLogger logger = 
LoggerFactory.getErrorTypeAwareLogger(RdsProtocol.class);
 
-    public RdsProtocol(XdsChannel xdsChannel, Node node, int pollingPoolSize, 
int pollingTimeout) {
-        super(xdsChannel, node, pollingPoolSize, pollingTimeout);
+    private StreamObserver<DiscoveryRequest> requestObserver;
+
+    private HashMap<String, Object> resourcesMap = new HashMap<>();

Review Comment:
   Use concurrent map



##########
dubbo-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/AbstractProtocol.java:
##########
@@ -93,103 +65,23 @@ public AbstractProtocol(XdsChannel xdsChannel, Node node, 
int pollingPoolSize, i
      */
     public abstract String getTypeUrl();
 
+    public abstract boolean isExistResource(Set<String> resourceNames);
+
+    public abstract T getCacheResource(Set<String> resourceNames);
+
+    public abstract StreamObserver<DiscoveryRequest> getStreamObserver();
     @Override
     public T getResource(Set<String> resourceNames) {
-        long request = requestId.getAndIncrement();
         resourceNames = resourceNames == null ? Collections.emptySet() : 
resourceNames;
-
-        // Store Request Parameter, which will be used for ACK
-        requestParam.put(request, resourceNames);
-
-        // create observer
-        StreamObserver<DiscoveryRequest> requestObserver = 
xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(request));
-
-        // use future to get async result
-        CompletableFuture<T> future = new CompletableFuture<>();
-        requestObserverMap.put(request, requestObserver);
-        streamResult.put(request, future);
-
-        // send request to control panel
-        requestObserver.onNext(buildDiscoveryRequest(resourceNames));
-
-        try {
-            // get result
-            return future.get();
-        } catch (InterruptedException | ExecutionException e) {
-            logger.error(REGISTRY_ERROR_REQUEST_XDS, "", "", "Error occur when 
request control panel.");
-            return null;
-        } finally {
-            // close observer
-            //requestObserver.onCompleted();
-
-            // remove temp
-            streamResult.remove(request);
-            requestObserverMap.remove(request);
-            requestParam.remove(request);
-        }
+        return getCacheResource(resourceNames);
     }
-
     @Override
-    public long observeResource(Set<String> resourceNames, Consumer<T> 
consumer) {
-        long request = requestId.getAndIncrement();
+    public void observeResource(Set<String> resourceNames, Consumer<T> 
consumer) {
         resourceNames = resourceNames == null ? Collections.emptySet() : 
resourceNames;
-
-        // Store Request Parameter, which will be used for ACK
-        requestParam.put(request, resourceNames);
-
         // call once for full data
         consumer.accept(getResource(resourceNames));
-
-        // channel reused
-        StreamObserver<DiscoveryRequest> requestObserver = 
xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(request));
-        requestObserverMap.put(request, requestObserver);
-
-        ScheduledFuture<?> scheduledFuture = 
pollingExecutor.scheduleAtFixedRate(() -> {
-            try {
-                // origin request, may changed by updateObserve
-                Set<String> names = requestParam.get(request);
-
-                // use future to get async result, future complete on 
StreamObserver onNext
-                CompletableFuture<T> future = new CompletableFuture<>();
-                streamResult.put(request, future);
-
-                // observer reused
-                StreamObserver<DiscoveryRequest> observer = 
requestObserverMap.get(request);
-
-                if (observer == null) {
-                    observer = xdsChannel.createDeltaDiscoveryRequest(new 
ResponseObserver(request));
-                    requestObserverMap.put(request, observer);
-                }
-
-                // send request to control panel
-                observer.onNext(buildDiscoveryRequest(names));
-
-                try {
-                    // get result
-                    consumer.accept(future.get());
-                } catch (InterruptedException | ExecutionException e) {
-                    logger.error(REGISTRY_ERROR_REQUEST_XDS, "", "", "Error 
occur when request control panel.");
-                } finally {
-                    // close observer
-                    //requestObserver.onCompleted();
-
-                    // remove temp
-                    streamResult.remove(request);
-                }
-            } catch (Throwable t) {
-                logger.error(REGISTRY_ERROR_REQUEST_XDS, "", "", "Error when 
requesting observe data. Type: " + getTypeUrl(), t);
-            }
-        }, pollingTimeout, pollingTimeout, TimeUnit.SECONDS);
-
-        observeScheduledMap.put(request, scheduledFuture);
-
-        return request;
-    }
-
-    @Override
-    public void updateObserve(long request, Set<String> resourceNames) {
-        // send difference in resourceNames
-        requestParam.put(request, resourceNames);
+        this.observeResourcesName = resourceNames;
+        this.observeConsumer = consumer;

Review Comment:
   Should support multi observe.
   E.g. Consumer 1 consumer resource names 1. Consumer 2 consumer resource 
names 2.



##########
dubbo-xds/src/main/java/org/apache/dubbo/registry/xds/util/PilotExchanger.java:
##########
@@ -45,61 +46,51 @@ public class PilotExchanger {
     private ListenerResult listenerResult;
 
     private RouteResult routeResult;
-
-    private final AtomicLong observeRouteRequest = new AtomicLong(-1);
-
-    private final Map<String, Long> domainObserveRequest = new 
ConcurrentHashMap<>();
+    private final AtomicBoolean isRdsObserve = new AtomicBoolean(false);
+    private final HashSet<String> domainObserveRequest = new HashSet<>();
 
     private final Map<String, Set<Consumer<Set<Endpoint>>>> 
domainObserveConsumer = new ConcurrentHashMap<>();
 
     private PilotExchanger(URL url) {
         xdsChannel = new XdsChannel(url);
-        int pollingPoolSize = url.getParameter("pollingPoolSize", 10);
         int pollingTimeout = url.getParameter("pollingTimeout", 10);
-        LdsProtocol ldsProtocol = new LdsProtocol(xdsChannel, 
NodeBuilder.build(), pollingPoolSize, pollingTimeout);
-        this.rdsProtocol = new RdsProtocol(xdsChannel, NodeBuilder.build(), 
pollingPoolSize, pollingTimeout);
-        this.edsProtocol = new EdsProtocol(xdsChannel, NodeBuilder.build(), 
pollingPoolSize, pollingTimeout);
+        LdsProtocol ldsProtocol = new LdsProtocol(xdsChannel, 
NodeBuilder.build(), pollingTimeout);
+        this.rdsProtocol = new RdsProtocol(xdsChannel, NodeBuilder.build(), 
pollingTimeout);
+        this.edsProtocol = new EdsProtocol(xdsChannel, NodeBuilder.build(), 
pollingTimeout);
 
         this.listenerResult = ldsProtocol.getListeners();
         this.routeResult = 
rdsProtocol.getResource(listenerResult.getRouteConfigNames());
 
         // Observer RDS update
         if (CollectionUtils.isNotEmpty(listenerResult.getRouteConfigNames())) {
-            this.observeRouteRequest.set(createRouteObserve());
+            createRouteObserve();
+            isRdsObserve.set(true);
         }
+
         // Observe LDS updated
         ldsProtocol.observeListeners((newListener) -> {
             // update local cache
             if (!newListener.equals(listenerResult)) {
                 this.listenerResult = newListener;
                 // update RDS observation
-                synchronized (observeRouteRequest) {
-                    if (observeRouteRequest.get() == -1) {
-                        this.observeRouteRequest.set(createRouteObserve());
-                    } else {
-                        rdsProtocol.updateObserve(observeRouteRequest.get(), 
newListener.getRouteConfigNames());
-                    }
+                synchronized (isRdsObserve) {

Review Comment:
   Why sync on a atomic boolean object?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@dubbo.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@dubbo.apache.org
For additional commands, e-mail: notifications-h...@dubbo.apache.org

Reply via email to