AlbumenJ commented on code in PR #10885: URL: https://github.com/apache/dubbo/pull/10885#discussion_r1048221006
########## dubbo-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/AbstractProtocol.java: ########## @@ -16,182 +16,127 @@ */ package org.apache.dubbo.registry.xds.util.protocol; + import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; + + import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.common.utils.NamedThreadFactory; +import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository; import org.apache.dubbo.registry.xds.util.XdsChannel; - import io.envoyproxy.envoy.config.core.v3.Node; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; import io.grpc.stub.StreamObserver; +import org.apache.dubbo.rpc.model.ApplicationModel; import java.util.Collections; -import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletableFuture; +import java.util.List; +import java.util.Map; +import java.util.ArrayList; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_ERROR_REQUEST_XDS; -public abstract class AbstractProtocol<T, S extends DeltaResource<T>> implements XdsProtocol<T> { +public abstract class AbstractProtocol<T, S extends DeltaResource<T>, R> implements XdsProtocol<T> { private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractProtocol.class); - protected final XdsChannel xdsChannel; + protected XdsChannel xdsChannel; protected final Node node; - /** - * Store Request Parameter ( resourceNames ) - * K - requestId, V - resourceNames - */ - protected final Map<Long, Set<String>> requestParam = new ConcurrentHashMap<>(); - - /** - * Store ADS Request Observer ( StreamObserver in Streaming Request ) - * K - requestId, V - StreamObserver - */ - private final Map<Long, StreamObserver<DiscoveryRequest>> requestObserverMap = new ConcurrentHashMap<>(); - - /** - * Store Delta-ADS Request Observer ( StreamObserver in Streaming Request ) - * K - requestId, V - StreamObserver - */ - private final Map<Long, ScheduledFuture<?>> observeScheduledMap = new ConcurrentHashMap<>(); - - /** - * Store CompletableFuture for Request ( used to fetch async result in ResponseObserver ) - * K - requestId, V - CompletableFuture - */ - private final Map<Long, CompletableFuture<T>> streamResult = new ConcurrentHashMap<>(); + private final int pollingTimeout; - private final ScheduledExecutorService pollingExecutor; + private final Object consumerObserveMapUpdate = new Object(); Review Comment: Replace to `ReentrantLock` ########## dubbo-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/AbstractProtocol.java: ########## @@ -16,182 +16,127 @@ */ package org.apache.dubbo.registry.xds.util.protocol; + import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; + + import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.common.utils.NamedThreadFactory; +import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository; import org.apache.dubbo.registry.xds.util.XdsChannel; - import io.envoyproxy.envoy.config.core.v3.Node; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; import io.grpc.stub.StreamObserver; +import org.apache.dubbo.rpc.model.ApplicationModel; import java.util.Collections; -import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletableFuture; +import java.util.List; +import java.util.Map; +import java.util.ArrayList; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_ERROR_REQUEST_XDS; -public abstract class AbstractProtocol<T, S extends DeltaResource<T>> implements XdsProtocol<T> { +public abstract class AbstractProtocol<T, S extends DeltaResource<T>, R> implements XdsProtocol<T> { private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractProtocol.class); - protected final XdsChannel xdsChannel; + protected XdsChannel xdsChannel; protected final Node node; - /** - * Store Request Parameter ( resourceNames ) - * K - requestId, V - resourceNames - */ - protected final Map<Long, Set<String>> requestParam = new ConcurrentHashMap<>(); - - /** - * Store ADS Request Observer ( StreamObserver in Streaming Request ) - * K - requestId, V - StreamObserver - */ - private final Map<Long, StreamObserver<DiscoveryRequest>> requestObserverMap = new ConcurrentHashMap<>(); - - /** - * Store Delta-ADS Request Observer ( StreamObserver in Streaming Request ) - * K - requestId, V - StreamObserver - */ - private final Map<Long, ScheduledFuture<?>> observeScheduledMap = new ConcurrentHashMap<>(); - - /** - * Store CompletableFuture for Request ( used to fetch async result in ResponseObserver ) - * K - requestId, V - CompletableFuture - */ - private final Map<Long, CompletableFuture<T>> streamResult = new ConcurrentHashMap<>(); + private final int pollingTimeout; - private final ScheduledExecutorService pollingExecutor; + private final Object consumerObserveMapUpdate = new Object(); - private final int pollingTimeout; + private Set<String> observeResourcesName; - protected final static AtomicLong requestId = new AtomicLong(0); + private final Map<Set<String>, List<Consumer<T>>> consumerObserveMap = new ConcurrentHashMap<>(); - public AbstractProtocol(XdsChannel xdsChannel, Node node, int pollingPoolSize, int pollingTimeout) { + public AbstractProtocol(XdsChannel xdsChannel, Node node, int pollingTimeout) { this.xdsChannel = xdsChannel; this.node = node; - this.pollingExecutor = new ScheduledThreadPoolExecutor(pollingPoolSize, new NamedThreadFactory("Dubbo-registry-xds")); this.pollingTimeout = pollingTimeout; } + protected Map<String, R> resourcesMap = new ConcurrentHashMap<>(); + + private CompletableFuture<T> future; + + private StreamObserver<DiscoveryRequest> requestObserver; + /** * Abstract method to obtain Type-URL from sub-class * * @return Type-URL of xDS */ public abstract String getTypeUrl(); - @Override - public T getResource(Set<String> resourceNames) { - long request = requestId.getAndIncrement(); - resourceNames = resourceNames == null ? Collections.emptySet() : resourceNames; - - // Store Request Parameter, which will be used for ACK - requestParam.put(request, resourceNames); - - // create observer - StreamObserver<DiscoveryRequest> requestObserver = xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(request)); - - // use future to get async result - CompletableFuture<T> future = new CompletableFuture<>(); - requestObserverMap.put(request, requestObserver); - streamResult.put(request, future); - - // send request to control panel - requestObserver.onNext(buildDiscoveryRequest(resourceNames)); - - try { - // get result - return future.get(); - } catch (InterruptedException | ExecutionException e) { - logger.error(REGISTRY_ERROR_REQUEST_XDS, "", "", "Error occur when request control panel."); - return null; - } finally { - // close observer - //requestObserver.onCompleted(); - - // remove temp - streamResult.remove(request); - requestObserverMap.remove(request); - requestParam.remove(request); - } - } + public abstract boolean isExistResource(Set<String> resourceNames); Review Comment: Remove this abstract method. Should be handled in `AbstractProtocol`. ########## dubbo-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/AbstractProtocol.java: ########## @@ -16,182 +16,127 @@ */ package org.apache.dubbo.registry.xds.util.protocol; + import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; + + import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.common.utils.NamedThreadFactory; +import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository; import org.apache.dubbo.registry.xds.util.XdsChannel; - import io.envoyproxy.envoy.config.core.v3.Node; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; import io.grpc.stub.StreamObserver; +import org.apache.dubbo.rpc.model.ApplicationModel; import java.util.Collections; -import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletableFuture; +import java.util.List; +import java.util.Map; +import java.util.ArrayList; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_ERROR_REQUEST_XDS; -public abstract class AbstractProtocol<T, S extends DeltaResource<T>> implements XdsProtocol<T> { +public abstract class AbstractProtocol<T, S extends DeltaResource<T>, R> implements XdsProtocol<T> { private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractProtocol.class); - protected final XdsChannel xdsChannel; + protected XdsChannel xdsChannel; protected final Node node; - /** - * Store Request Parameter ( resourceNames ) - * K - requestId, V - resourceNames - */ - protected final Map<Long, Set<String>> requestParam = new ConcurrentHashMap<>(); - - /** - * Store ADS Request Observer ( StreamObserver in Streaming Request ) - * K - requestId, V - StreamObserver - */ - private final Map<Long, StreamObserver<DiscoveryRequest>> requestObserverMap = new ConcurrentHashMap<>(); - - /** - * Store Delta-ADS Request Observer ( StreamObserver in Streaming Request ) - * K - requestId, V - StreamObserver - */ - private final Map<Long, ScheduledFuture<?>> observeScheduledMap = new ConcurrentHashMap<>(); - - /** - * Store CompletableFuture for Request ( used to fetch async result in ResponseObserver ) - * K - requestId, V - CompletableFuture - */ - private final Map<Long, CompletableFuture<T>> streamResult = new ConcurrentHashMap<>(); + private final int pollingTimeout; Review Comment: rename to `checkInterval` ########## dubbo-xds/src/test/java/org/apache/dubbo/registry/xds/istio/IstioEnvMock.java: ########## @@ -0,0 +1,47 @@ +package org.apache.dubbo.registry.xds.istio; Review Comment: Add ASF license header for all of the newly created files ########## dubbo-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/AbstractProtocol.java: ########## @@ -248,13 +196,30 @@ private void returnResult(T result) { @Override public void onCompleted() { - logger.info("xDS Client completed, requestId: " + requestId); - clear(); - } - - private void clear() { - requestObserverMap.remove(requestId); + logger.info("xDS Client completed"); } } + private void triggerReConnectTask() { + AtomicBoolean isConnectFail = new AtomicBoolean(false); + ScheduledExecutorService scheduledFuture = ApplicationModel.defaultModel().getFrameworkModel().getBeanFactory() + .getBean(FrameworkExecutorRepository.class).getSharedScheduledExecutor(); Review Comment: Should fetch `ApplicationModel` in contructor. `ApplicationModel.defaultModel()` is forbidden here. ########## dubbo-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/AbstractProtocol.java: ########## @@ -16,182 +16,127 @@ */ package org.apache.dubbo.registry.xds.util.protocol; + import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; + + import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.common.utils.NamedThreadFactory; +import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository; import org.apache.dubbo.registry.xds.util.XdsChannel; - import io.envoyproxy.envoy.config.core.v3.Node; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; import io.grpc.stub.StreamObserver; +import org.apache.dubbo.rpc.model.ApplicationModel; import java.util.Collections; -import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletableFuture; +import java.util.List; +import java.util.Map; +import java.util.ArrayList; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_ERROR_REQUEST_XDS; -public abstract class AbstractProtocol<T, S extends DeltaResource<T>> implements XdsProtocol<T> { +public abstract class AbstractProtocol<T, S extends DeltaResource<T>, R> implements XdsProtocol<T> { private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractProtocol.class); - protected final XdsChannel xdsChannel; + protected XdsChannel xdsChannel; protected final Node node; - /** - * Store Request Parameter ( resourceNames ) - * K - requestId, V - resourceNames - */ - protected final Map<Long, Set<String>> requestParam = new ConcurrentHashMap<>(); - - /** - * Store ADS Request Observer ( StreamObserver in Streaming Request ) - * K - requestId, V - StreamObserver - */ - private final Map<Long, StreamObserver<DiscoveryRequest>> requestObserverMap = new ConcurrentHashMap<>(); - - /** - * Store Delta-ADS Request Observer ( StreamObserver in Streaming Request ) - * K - requestId, V - StreamObserver - */ - private final Map<Long, ScheduledFuture<?>> observeScheduledMap = new ConcurrentHashMap<>(); - - /** - * Store CompletableFuture for Request ( used to fetch async result in ResponseObserver ) - * K - requestId, V - CompletableFuture - */ - private final Map<Long, CompletableFuture<T>> streamResult = new ConcurrentHashMap<>(); + private final int pollingTimeout; - private final ScheduledExecutorService pollingExecutor; + private final Object consumerObserveMapUpdate = new Object(); - private final int pollingTimeout; + private Set<String> observeResourcesName; - protected final static AtomicLong requestId = new AtomicLong(0); + private final Map<Set<String>, List<Consumer<T>>> consumerObserveMap = new ConcurrentHashMap<>(); - public AbstractProtocol(XdsChannel xdsChannel, Node node, int pollingPoolSize, int pollingTimeout) { + public AbstractProtocol(XdsChannel xdsChannel, Node node, int pollingTimeout) { this.xdsChannel = xdsChannel; this.node = node; - this.pollingExecutor = new ScheduledThreadPoolExecutor(pollingPoolSize, new NamedThreadFactory("Dubbo-registry-xds")); this.pollingTimeout = pollingTimeout; } + protected Map<String, R> resourcesMap = new ConcurrentHashMap<>(); + + private CompletableFuture<T> future; + + private StreamObserver<DiscoveryRequest> requestObserver; + /** * Abstract method to obtain Type-URL from sub-class * * @return Type-URL of xDS */ public abstract String getTypeUrl(); - @Override - public T getResource(Set<String> resourceNames) { - long request = requestId.getAndIncrement(); - resourceNames = resourceNames == null ? Collections.emptySet() : resourceNames; - - // Store Request Parameter, which will be used for ACK - requestParam.put(request, resourceNames); - - // create observer - StreamObserver<DiscoveryRequest> requestObserver = xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(request)); - - // use future to get async result - CompletableFuture<T> future = new CompletableFuture<>(); - requestObserverMap.put(request, requestObserver); - streamResult.put(request, future); - - // send request to control panel - requestObserver.onNext(buildDiscoveryRequest(resourceNames)); - - try { - // get result - return future.get(); - } catch (InterruptedException | ExecutionException e) { - logger.error(REGISTRY_ERROR_REQUEST_XDS, "", "", "Error occur when request control panel."); - return null; - } finally { - // close observer - //requestObserver.onCompleted(); - - // remove temp - streamResult.remove(request); - requestObserverMap.remove(request); - requestParam.remove(request); - } - } + public abstract boolean isExistResource(Set<String> resourceNames); - @Override - public long observeResource(Set<String> resourceNames, Consumer<T> consumer) { - long request = requestId.getAndIncrement(); - resourceNames = resourceNames == null ? Collections.emptySet() : resourceNames; + public abstract void updateResourceCollection(R resourceCollection, Set<String> resourceNames); - // Store Request Parameter, which will be used for ACK - requestParam.put(request, resourceNames); - - // call once for full data - consumer.accept(getResource(resourceNames)); + public abstract R getResourceCollection(); - // channel reused - StreamObserver<DiscoveryRequest> requestObserver = xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(request)); - requestObserverMap.put(request, requestObserver); + public abstract T getDsResult(R resourceCollection); - ScheduledFuture<?> scheduledFuture = pollingExecutor.scheduleAtFixedRate(() -> { + public T getCacheResource(Set<String> resourceNames) { + R resourceCollection = getResourceCollection(); + if (!resourceNames.isEmpty() && isExistResource(resourceNames)) { + updateResourceCollection(resourceCollection, resourceNames); + } else { + if (requestObserver == null) { + future = new CompletableFuture<>(); + requestObserver = xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(future)); Review Comment: Observe resource in `consumerObserveMap` ########## dubbo-xds/src/main/java/org/apache/dubbo/registry/xds/util/protocol/AbstractProtocol.java: ########## @@ -16,182 +16,127 @@ */ package org.apache.dubbo.registry.xds.util.protocol; + import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; + + import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.common.utils.NamedThreadFactory; +import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository; import org.apache.dubbo.registry.xds.util.XdsChannel; - import io.envoyproxy.envoy.config.core.v3.Node; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; import io.grpc.stub.StreamObserver; +import org.apache.dubbo.rpc.model.ApplicationModel; import java.util.Collections; -import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletableFuture; +import java.util.List; +import java.util.Map; +import java.util.ArrayList; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_ERROR_REQUEST_XDS; -public abstract class AbstractProtocol<T, S extends DeltaResource<T>> implements XdsProtocol<T> { +public abstract class AbstractProtocol<T, S extends DeltaResource<T>, R> implements XdsProtocol<T> { private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractProtocol.class); - protected final XdsChannel xdsChannel; + protected XdsChannel xdsChannel; protected final Node node; - /** - * Store Request Parameter ( resourceNames ) - * K - requestId, V - resourceNames - */ - protected final Map<Long, Set<String>> requestParam = new ConcurrentHashMap<>(); - - /** - * Store ADS Request Observer ( StreamObserver in Streaming Request ) - * K - requestId, V - StreamObserver - */ - private final Map<Long, StreamObserver<DiscoveryRequest>> requestObserverMap = new ConcurrentHashMap<>(); - - /** - * Store Delta-ADS Request Observer ( StreamObserver in Streaming Request ) - * K - requestId, V - StreamObserver - */ - private final Map<Long, ScheduledFuture<?>> observeScheduledMap = new ConcurrentHashMap<>(); - - /** - * Store CompletableFuture for Request ( used to fetch async result in ResponseObserver ) - * K - requestId, V - CompletableFuture - */ - private final Map<Long, CompletableFuture<T>> streamResult = new ConcurrentHashMap<>(); + private final int pollingTimeout; - private final ScheduledExecutorService pollingExecutor; + private final Object consumerObserveMapUpdate = new Object(); - private final int pollingTimeout; + private Set<String> observeResourcesName; - protected final static AtomicLong requestId = new AtomicLong(0); + private final Map<Set<String>, List<Consumer<T>>> consumerObserveMap = new ConcurrentHashMap<>(); - public AbstractProtocol(XdsChannel xdsChannel, Node node, int pollingPoolSize, int pollingTimeout) { + public AbstractProtocol(XdsChannel xdsChannel, Node node, int pollingTimeout) { this.xdsChannel = xdsChannel; this.node = node; - this.pollingExecutor = new ScheduledThreadPoolExecutor(pollingPoolSize, new NamedThreadFactory("Dubbo-registry-xds")); this.pollingTimeout = pollingTimeout; } + protected Map<String, R> resourcesMap = new ConcurrentHashMap<>(); + + private CompletableFuture<T> future; + + private StreamObserver<DiscoveryRequest> requestObserver; + /** * Abstract method to obtain Type-URL from sub-class * * @return Type-URL of xDS */ public abstract String getTypeUrl(); - @Override - public T getResource(Set<String> resourceNames) { - long request = requestId.getAndIncrement(); - resourceNames = resourceNames == null ? Collections.emptySet() : resourceNames; - - // Store Request Parameter, which will be used for ACK - requestParam.put(request, resourceNames); - - // create observer - StreamObserver<DiscoveryRequest> requestObserver = xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(request)); - - // use future to get async result - CompletableFuture<T> future = new CompletableFuture<>(); - requestObserverMap.put(request, requestObserver); - streamResult.put(request, future); - - // send request to control panel - requestObserver.onNext(buildDiscoveryRequest(resourceNames)); - - try { - // get result - return future.get(); - } catch (InterruptedException | ExecutionException e) { - logger.error(REGISTRY_ERROR_REQUEST_XDS, "", "", "Error occur when request control panel."); - return null; - } finally { - // close observer - //requestObserver.onCompleted(); - - // remove temp - streamResult.remove(request); - requestObserverMap.remove(request); - requestParam.remove(request); - } - } + public abstract boolean isExistResource(Set<String> resourceNames); - @Override - public long observeResource(Set<String> resourceNames, Consumer<T> consumer) { - long request = requestId.getAndIncrement(); - resourceNames = resourceNames == null ? Collections.emptySet() : resourceNames; + public abstract void updateResourceCollection(R resourceCollection, Set<String> resourceNames); - // Store Request Parameter, which will be used for ACK - requestParam.put(request, resourceNames); - - // call once for full data - consumer.accept(getResource(resourceNames)); + public abstract R getResourceCollection(); - // channel reused - StreamObserver<DiscoveryRequest> requestObserver = xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(request)); - requestObserverMap.put(request, requestObserver); + public abstract T getDsResult(R resourceCollection); - ScheduledFuture<?> scheduledFuture = pollingExecutor.scheduleAtFixedRate(() -> { + public T getCacheResource(Set<String> resourceNames) { + R resourceCollection = getResourceCollection(); + if (!resourceNames.isEmpty() && isExistResource(resourceNames)) { + updateResourceCollection(resourceCollection, resourceNames); + } else { + if (requestObserver == null) { + future = new CompletableFuture<>(); + requestObserver = xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(future)); Review Comment: remove this `future` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@dubbo.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@dubbo.apache.org For additional commands, e-mail: notifications-h...@dubbo.apache.org