This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 17b75a7895 Refresh valid invokers after connectivity check (#13773)
17b75a7895 is described below

commit 17b75a78955effbf82f4cd958c37545209c152e1
Author: Albumen Kevin <jhq0...@gmail.com>
AuthorDate: Thu Feb 29 11:26:11 2024 +0800

    Refresh valid invokers after connectivity check (#13773)
---
 .../rpc/cluster/directory/AbstractDirectory.java   | 167 +++++++++++++--------
 .../org/apache/dubbo/common/utils/LockUtils.java   |  54 +++++++
 .../apache/dubbo/common/utils/LockUtilsTest.java   | 144 ++++++++++++++++++
 3 files changed, 301 insertions(+), 64 deletions(-)

diff --git 
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
 
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
index 087eacbade..be68810f4a 100644
--- 
a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
+++ 
b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
@@ -25,6 +25,7 @@ import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
 import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.common.utils.LockUtils;
 import org.apache.dubbo.common.utils.NetUtils;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.metrics.event.MetricsEventBus;
@@ -56,6 +57,8 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
@@ -122,6 +125,8 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
 
     private volatile ScheduledFuture<?> connectivityCheckFuture;
 
+    private final ReentrantLock invokerRefreshLock = new ReentrantLock();
+
     /**
      * The max count of invokers for each reconnect task select to try to 
reconnect.
      */
@@ -293,17 +298,19 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
 
     @Override
     public void addInvalidateInvoker(Invoker<T> invoker) {
-        // 1. remove this invoker from validInvokers list, this invoker will 
not be listed in the next time
-        if (removeValidInvoker(invoker)) {
-            // 2. add this invoker to reconnect list
-            invokersToReconnect.add(invoker);
-            // 3. try start check connectivity task
-            checkConnectivity();
-
-            logger.info("The invoker " + invoker.getUrl()
-                    + " has been added to invalidate list due to connectivity 
problem. "
-                    + "Will trying to reconnect to it in the background.");
-        }
+        LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () 
-> {
+            // 1. remove this invoker from validInvokers list, this invoker 
will not be listed in the next time
+            if (removeValidInvoker(invoker)) {
+                // 2. add this invoker to reconnect list
+                invokersToReconnect.add(invoker);
+                // 3. try start check connectivity task
+                checkConnectivity();
+
+                logger.info("The invoker " + invoker.getUrl()
+                        + " has been added to invalidate list due to 
connectivity problem. "
+                        + "Will trying to reconnect to it in the background.");
+            }
+        });
     }
 
     public void checkConnectivity() {
@@ -322,23 +329,30 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
                             // 1. pick invokers from invokersToReconnect
                             // limit max reconnectTaskTryCount, prevent this 
task hang up all the connectivityExecutor
                             // for long time
-                            if (invokersToReconnect.size() < 
reconnectTaskTryCount) {
-                                invokersToTry.addAll(invokersToReconnect);
-                            } else {
-                                for (int i = 0; i < reconnectTaskTryCount; 
i++) {
-                                    Invoker<T> tInvoker = 
invokersToReconnect.get(
-                                            
ThreadLocalRandom.current().nextInt(invokersToReconnect.size()));
-                                    if (!invokersToTry.contains(tInvoker)) {
-                                        // ignore if is selected, 
invokersToTry's size is always smaller than
-                                        // reconnectTaskTryCount + 1
-                                        invokersToTry.add(tInvoker);
+                            LockUtils.safeLock(invokerRefreshLock, 
LockUtils.DEFAULT_TIMEOUT, () -> {
+                                if (invokersToReconnect.size() < 
reconnectTaskTryCount) {
+                                    invokersToTry.addAll(invokersToReconnect);
+                                } else {
+                                    for (int i = 0; i < reconnectTaskTryCount; 
i++) {
+                                        Invoker<T> tInvoker = 
invokersToReconnect.get(
+                                                
ThreadLocalRandom.current().nextInt(invokersToReconnect.size()));
+                                        if (!invokersToTry.contains(tInvoker)) 
{
+                                            // ignore if is selected, 
invokersToTry's size is always smaller than
+                                            // reconnectTaskTryCount + 1
+                                            invokersToTry.add(tInvoker);
+                                        }
                                     }
                                 }
-                            }
+                            });
 
                             // 2. try to check the invoker's status
                             for (Invoker<T> invoker : invokersToTry) {
-                                if (invokers.contains(invoker)) {
+                                AtomicBoolean invokerExist = new 
AtomicBoolean(false);
+                                LockUtils.safeLock(invokerRefreshLock, 
LockUtils.DEFAULT_TIMEOUT, () -> {
+                                    
invokerExist.set(invokers.contains(invoker));
+                                });
+                                // Should not lock here, `invoker.isAvailable` 
may need some time to check
+                                if (invokerExist.get()) {
                                     if (invoker.isAvailable()) {
                                         needDeleteList.add(invoker);
                                     }
@@ -348,22 +362,37 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
                             }
 
                             // 3. recover valid invoker
-                            for (Invoker<T> tInvoker : needDeleteList) {
-                                if (invokers.contains(tInvoker)) {
-                                    addValidInvoker(tInvoker);
-                                    logger.info(
-                                            "Recover service address: " + 
tInvoker.getUrl() + "  from invalid list.");
+                            LockUtils.safeLock(invokerRefreshLock, 
LockUtils.DEFAULT_TIMEOUT, () -> {
+                                for (Invoker<T> tInvoker : needDeleteList) {
+                                    if (invokers.contains(tInvoker)) {
+                                        addValidInvoker(tInvoker);
+                                        logger.info("Recover service address: 
" + tInvoker.getUrl()
+                                                + "  from invalid list.");
+                                    } else {
+                                        logger.info(
+                                                "The invoker " + 
tInvoker.getUrl()
+                                                        + " has been removed 
from invokers list. Will remove it in reconnect list.");
+                                    }
+                                    invokersToReconnect.remove(tInvoker);
                                 }
-                                invokersToReconnect.remove(tInvoker);
-                            }
+                            });
+                        } catch (Throwable t) {
+                            logger.error(
+                                    LoggerCodeConstants.INTERNAL_ERROR,
+                                    "",
+                                    "",
+                                    "Error occurred when check connectivity. ",
+                                    t);
                         } finally {
                             checkConnectivityPermit.release();
                         }
 
                         // 4. submit new task if it has more to recover
-                        if (!invokersToReconnect.isEmpty()) {
-                            checkConnectivity();
-                        }
+                        LockUtils.safeLock(invokerRefreshLock, 
LockUtils.DEFAULT_TIMEOUT, () -> {
+                            if (!invokersToReconnect.isEmpty()) {
+                                checkConnectivity();
+                            }
+                        });
                         
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(
                                 applicationModel, getSummary(), 
getDirectoryMeta()));
                     },
@@ -382,9 +411,11 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
      * 4. all the invokers disappeared from total invokers should be removed 
in the disabled invokers list
      */
     public void refreshInvoker() {
-        if (invokersInitialized) {
-            refreshInvokerInternal();
-        }
+        LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () 
-> {
+            if (invokersInitialized) {
+                refreshInvokerInternal();
+            }
+        });
         MetricsEventBus.publish(
                 RegistryEvent.refreshDirectoryEvent(applicationModel, 
getSummary(), getDirectoryMeta()));
     }
@@ -393,7 +424,7 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
         return Collections.emptyMap();
     }
 
-    private synchronized void refreshInvokerInternal() {
+    private void refreshInvokerInternal() {
         BitList<Invoker<T>> copiedInvokers = invokers.clone();
         refreshInvokers(copiedInvokers, invokersToReconnect);
         refreshInvokers(copiedInvokers, disabledInvokers);
@@ -414,25 +445,29 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
 
     @Override
     public void addDisabledInvoker(Invoker<T> invoker) {
-        if (invokers.contains(invoker)) {
-            disabledInvokers.add(invoker);
-            removeValidInvoker(invoker);
-            logger.info("Disable service address: " + invoker.getUrl() + ".");
-        }
+        LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () 
-> {
+            if (invokers.contains(invoker)) {
+                disabledInvokers.add(invoker);
+                removeValidInvoker(invoker);
+                logger.info("Disable service address: " + invoker.getUrl() + 
".");
+            }
+        });
         MetricsEventBus.publish(
                 RegistryEvent.refreshDirectoryEvent(applicationModel, 
getSummary(), getDirectoryMeta()));
     }
 
     @Override
     public void recoverDisabledInvoker(Invoker<T> invoker) {
-        if (disabledInvokers.remove(invoker)) {
-            try {
-                addValidInvoker(invoker);
-                logger.info("Recover service address: " + invoker.getUrl() + " 
 from disabled list.");
-            } catch (Throwable ignore) {
+        LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () 
-> {
+            if (disabledInvokers.remove(invoker)) {
+                try {
+                    addValidInvoker(invoker);
+                    logger.info("Recover service address: " + invoker.getUrl() 
+ "  from disabled list.");
+                } catch (Throwable ignore) {
 
+                }
             }
-        }
+        });
         MetricsEventBus.publish(
                 RegistryEvent.refreshDirectoryEvent(applicationModel, 
getSummary(), getDirectoryMeta()));
     }
@@ -491,9 +526,11 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
     }
 
     protected void setInvokers(BitList<Invoker<T>> invokers) {
-        this.invokers = invokers;
-        refreshInvokerInternal();
-        this.invokersInitialized = true;
+        LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () 
-> {
+            this.invokers = invokers;
+            refreshInvokerInternal();
+            this.invokersInitialized = true;
+        });
 
         MetricsEventBus.publish(
                 RegistryEvent.refreshDirectoryEvent(applicationModel, 
getSummary(), getDirectoryMeta()));
@@ -501,29 +538,31 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> {
 
     protected void destroyInvokers() {
         // set empty instead of clearing to support concurrent access.
-        this.invokers = BitList.emptyList();
-        this.validInvokers = BitList.emptyList();
-        this.invokersInitialized = false;
+        LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () 
-> {
+            this.invokers = BitList.emptyList();
+            this.validInvokers = BitList.emptyList();
+            this.invokersInitialized = false;
+        });
     }
 
     private boolean addValidInvoker(Invoker<T> invoker) {
-        boolean result;
-        synchronized (this.validInvokers) {
-            result = this.validInvokers.add(invoker);
-        }
+        AtomicBoolean result = new AtomicBoolean(false);
+        LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () 
-> {
+            result.set(this.validInvokers.add(invoker));
+        });
         MetricsEventBus.publish(
                 RegistryEvent.refreshDirectoryEvent(applicationModel, 
getSummary(), getDirectoryMeta()));
-        return result;
+        return result.get();
     }
 
     private boolean removeValidInvoker(Invoker<T> invoker) {
-        boolean result;
-        synchronized (this.validInvokers) {
-            result = this.validInvokers.remove(invoker);
-        }
+        AtomicBoolean result = new AtomicBoolean(false);
+        LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () 
-> {
+            result.set(this.validInvokers.remove(invoker));
+        });
         MetricsEventBus.publish(
                 RegistryEvent.refreshDirectoryEvent(applicationModel, 
getSummary(), getDirectoryMeta()));
-        return result;
+        return result.get();
     }
 
     protected abstract List<Invoker<T>> doList(
diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/LockUtils.java 
b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/LockUtils.java
new file mode 100644
index 0000000000..f441dc9b2f
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/LockUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.utils;
+
+import org.apache.dubbo.common.constants.LoggerCodeConstants;
+import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Helpers for running a task under a {@link Lock} with a bounded wait for the lock.
+ */
+public class LockUtils {
+    private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(LockUtils.class);
+
+    /**
+     * Default time to wait for a lock in {@link #safeLock}, in milliseconds.
+     */
+    public static final int DEFAULT_TIMEOUT = 60_000;
+
+    /**
+     * Runs {@code runnable} while holding {@code lock}, waiting at most {@code timeout} milliseconds for it.
+     *
+     * <p>If the lock cannot be acquired in time, an error is logged and {@code runnable} is still executed,
+     * but without the lock held. If the calling thread is interrupted while waiting, a warning is logged, the
+     * interrupt status is restored and {@code runnable} is not executed.
+     *
+     * <p>The lock is released only when this call acquired it. Unlocking after a failed or interrupted
+     * {@code tryLock} would either unlock a lock the thread does not own or, for a reentrant lock already held
+     * by an enclosing scope, drop that outer hold and silently break mutual exclusion for the rest of it.
+     *
+     * @param lock     the lock to acquire
+     * @param timeout  maximum time to wait for the lock, in milliseconds
+     * @param runnable the task to run
+     */
+    public static void safeLock(Lock lock, int timeout, Runnable runnable) {
+        boolean locked = false;
+        try {
+            locked = lock.tryLock(timeout, TimeUnit.MILLISECONDS);
+            if (!locked) {
+                logger.error(
+                        LoggerCodeConstants.INTERNAL_ERROR,
+                        "",
+                        "",
+                        "Try to lock failed, timeout: " + timeout,
+                        new TimeoutException());
+            }
+            // Runs even when the lock was not acquired (fail-open after the timeout, see method doc).
+            runnable.run();
+        } catch (InterruptedException e) {
+            logger.warn(LoggerCodeConstants.INTERNAL_ERROR, "", "", "Try to lock failed", e);
+            Thread.currentThread().interrupt();
+        } finally {
+            // Release only a hold taken by this call, never one owned by an enclosing scope.
+            if (locked) {
+                lock.unlock();
+            }
+        }
+    }
+}
diff --git 
a/dubbo-common/src/test/java/org/apache/dubbo/common/utils/LockUtilsTest.java 
b/dubbo-common/src/test/java/org/apache/dubbo/common/utils/LockUtilsTest.java
new file mode 100644
index 0000000000..87c1824930
--- /dev/null
+++ 
b/dubbo-common/src/test/java/org/apache/dubbo/common/utils/LockUtilsTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.utils;
+
+import java.lang.Thread.State;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.RepeatedTest;
+
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Tests for {@link LockUtils#safeLock}. Elapsed times are measured with the monotonic {@link System#nanoTime()}
+ * clock rather than wall-clock time, so clock adjustments cannot make the timing assertions flaky.
+ */
+public class LockUtilsTest {
+    @RepeatedTest(5)
+    void testLockFailed() {
+        ReentrantLock reentrantLock = new ReentrantLock();
+        AtomicBoolean releaseLock = new AtomicBoolean(false);
+        Thread holder = new Thread(() -> {
+            reentrantLock.lock();
+            try {
+                while (!releaseLock.get()) {
+                    try {
+                        Thread.sleep(5);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        return;
+                    }
+                }
+            } finally {
+                reentrantLock.unlock();
+            }
+        });
+        // Daemon, and released in `finally` below, so a failing assertion cannot leave this thread spinning.
+        holder.setDaemon(true);
+        holder.start();
+
+        try {
+            await().until(reentrantLock::isLocked);
+            // Lock owned by another thread: the task still runs, but only once the timeout has elapsed.
+            Assertions.assertTrue(millisUntilTaskRuns(reentrantLock, 1000) >= 1000);
+        } finally {
+            releaseLock.set(true);
+        }
+
+        await().until(() -> !reentrantLock.isLocked());
+
+        // Lock free: the task runs without waiting for the timeout.
+        Assertions.assertTrue(millisUntilTaskRuns(reentrantLock, 1000) < 1000);
+    }
+
+    @RepeatedTest(5)
+    void testReentrant() {
+        ReentrantLock reentrantLock = new ReentrantLock();
+        reentrantLock.lock();
+
+        // The current thread already owns the lock, so safeLock must re-enter it without waiting.
+        Assertions.assertTrue(millisUntilTaskRuns(reentrantLock, 1000) < 1000);
+
+        reentrantLock.lock();
+        Assertions.assertTrue(millisUntilTaskRuns(reentrantLock, 1000) < 1000);
+
+        // safeLock released only the holds it took itself: both outer holds are still in place.
+        Assertions.assertTrue(reentrantLock.isLocked());
+        reentrantLock.unlock();
+        Assertions.assertTrue(reentrantLock.isLocked());
+        reentrantLock.unlock();
+        Assertions.assertFalse(reentrantLock.isLocked());
+    }
+
+    @RepeatedTest(5)
+    void testInterrupt() {
+        ReentrantLock reentrantLock = new ReentrantLock();
+        reentrantLock.lock();
+
+        AtomicBoolean locked = new AtomicBoolean(false);
+        Thread thread = new Thread(() -> {
+            LockUtils.safeLock(reentrantLock, 10000, () -> {
+                locked.set(true);
+            });
+        });
+        thread.start();
+
+        // Interrupt the thread while it waits in tryLock: the task must be skipped.
+        await().until(() -> thread.getState() == State.TIMED_WAITING);
+        thread.interrupt();
+        await().until(() -> thread.getState() == State.TERMINATED);
+
+        Assertions.assertFalse(locked.get());
+
+        reentrantLock.unlock();
+    }
+
+    @RepeatedTest(5)
+    void testHoldLock() throws InterruptedException {
+        ReentrantLock reentrantLock = new ReentrantLock();
+        reentrantLock.lock();
+
+        AtomicLong lockTime = new AtomicLong(0);
+        AtomicBoolean ran = new AtomicBoolean(false);
+        long startTime = System.nanoTime();
+        Thread thread = new Thread(() -> {
+            LockUtils.safeLock(reentrantLock, 10000, () -> {
+                lockTime.set(System.nanoTime());
+                ran.set(true);
+            });
+        });
+        thread.start();
+
+        // Hold the lock for about a second, then release it well before the 10s timeout.
+        await().until(() -> thread.getState() == State.TIMED_WAITING);
+        Thread.sleep(1000);
+        reentrantLock.unlock();
+
+        await().until(() -> thread.getState() == State.TERMINATED);
+        Assertions.assertTrue(ran.get());
+        long waitedMillis = TimeUnit.NANOSECONDS.toMillis(lockTime.get() - startTime);
+        Assertions.assertTrue(waitedMillis >= 1000);
+        Assertions.assertTrue(waitedMillis < 10000);
+    }
+
+    /**
+     * Runs a task through {@link LockUtils#safeLock} and returns how long it took, in milliseconds, from the
+     * call until the task started. Fails if the task never ran, so a skipped task cannot pass as a fast one.
+     */
+    private static long millisUntilTaskRuns(ReentrantLock lock, int timeout) {
+        AtomicBoolean ran = new AtomicBoolean(false);
+        AtomicLong taskStart = new AtomicLong();
+        long start = System.nanoTime();
+        LockUtils.safeLock(lock, timeout, () -> {
+            taskStart.set(System.nanoTime());
+            ran.set(true);
+        });
+        Assertions.assertTrue(ran.get(), "task was not executed");
+        return TimeUnit.NANOSECONDS.toMillis(taskStart.get() - start);
+    }
+}

Reply via email to