This is an automated email from the ASF dual-hosted git repository. albumenj pushed a commit to branch 3.2 in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push: new 17b75a7895 Refresh valid invokers after connectivity check (#13773) 17b75a7895 is described below commit 17b75a78955effbf82f4cd958c37545209c152e1 Author: Albumen Kevin <jhq0...@gmail.com> AuthorDate: Thu Feb 29 11:26:11 2024 +0800 Refresh valid invokers after connectivity check (#13773) --- .../rpc/cluster/directory/AbstractDirectory.java | 167 +++++++++++++-------- .../org/apache/dubbo/common/utils/LockUtils.java | 54 +++++++ .../apache/dubbo/common/utils/LockUtilsTest.java | 144 ++++++++++++++++++ 3 files changed, 301 insertions(+), 64 deletions(-) diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java index 087eacbade..be68810f4a 100644 --- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java +++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java @@ -25,6 +25,7 @@ import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository; import org.apache.dubbo.common.utils.ConcurrentHashSet; +import org.apache.dubbo.common.utils.LockUtils; import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.metrics.event.MetricsEventBus; @@ -56,6 +57,8 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER; @@ -122,6 +125,8 @@ public abstract class AbstractDirectory<T> implements 
Directory<T> { private volatile ScheduledFuture<?> connectivityCheckFuture; + private final ReentrantLock invokerRefreshLock = new ReentrantLock(); + /** * The max count of invokers for each reconnect task select to try to reconnect. */ @@ -293,17 +298,19 @@ public abstract class AbstractDirectory<T> implements Directory<T> { @Override public void addInvalidateInvoker(Invoker<T> invoker) { - // 1. remove this invoker from validInvokers list, this invoker will not be listed in the next time - if (removeValidInvoker(invoker)) { - // 2. add this invoker to reconnect list - invokersToReconnect.add(invoker); - // 3. try start check connectivity task - checkConnectivity(); - - logger.info("The invoker " + invoker.getUrl() - + " has been added to invalidate list due to connectivity problem. " - + "Will trying to reconnect to it in the background."); - } + LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + // 1. remove this invoker from validInvokers list, this invoker will not be listed in the next time + if (removeValidInvoker(invoker)) { + // 2. add this invoker to reconnect list + invokersToReconnect.add(invoker); + // 3. try start check connectivity task + checkConnectivity(); + + logger.info("The invoker " + invoker.getUrl() + + " has been added to invalidate list due to connectivity problem. " + + "Will trying to reconnect to it in the background."); + } + }); } public void checkConnectivity() { @@ -322,23 +329,30 @@ public abstract class AbstractDirectory<T> implements Directory<T> { // 1. 
pick invokers from invokersToReconnect // limit max reconnectTaskTryCount, prevent this task hang up all the connectivityExecutor // for long time - if (invokersToReconnect.size() < reconnectTaskTryCount) { - invokersToTry.addAll(invokersToReconnect); - } else { - for (int i = 0; i < reconnectTaskTryCount; i++) { - Invoker<T> tInvoker = invokersToReconnect.get( - ThreadLocalRandom.current().nextInt(invokersToReconnect.size())); - if (!invokersToTry.contains(tInvoker)) { - // ignore if is selected, invokersToTry's size is always smaller than - // reconnectTaskTryCount + 1 - invokersToTry.add(tInvoker); + LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + if (invokersToReconnect.size() < reconnectTaskTryCount) { + invokersToTry.addAll(invokersToReconnect); + } else { + for (int i = 0; i < reconnectTaskTryCount; i++) { + Invoker<T> tInvoker = invokersToReconnect.get( + ThreadLocalRandom.current().nextInt(invokersToReconnect.size())); + if (!invokersToTry.contains(tInvoker)) { + // ignore if is selected, invokersToTry's size is always smaller than + // reconnectTaskTryCount + 1 + invokersToTry.add(tInvoker); + } } } - } + }); // 2. try to check the invoker's status for (Invoker<T> invoker : invokersToTry) { - if (invokers.contains(invoker)) { + AtomicBoolean invokerExist = new AtomicBoolean(false); + LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + invokerExist.set(invokers.contains(invoker)); + }); + // Should not lock here, `invoker.isAvailable` may need some time to check + if (invokerExist.get()) { if (invoker.isAvailable()) { needDeleteList.add(invoker); } @@ -348,22 +362,37 @@ public abstract class AbstractDirectory<T> implements Directory<T> { } // 3. 
recover valid invoker - for (Invoker<T> tInvoker : needDeleteList) { - if (invokers.contains(tInvoker)) { - addValidInvoker(tInvoker); - logger.info( - "Recover service address: " + tInvoker.getUrl() + " from invalid list."); + LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + for (Invoker<T> tInvoker : needDeleteList) { + if (invokers.contains(tInvoker)) { + addValidInvoker(tInvoker); + logger.info("Recover service address: " + tInvoker.getUrl() + + " from invalid list."); + } else { + logger.info( + "The invoker " + tInvoker.getUrl() + + " has been removed from invokers list. Will remove it in reconnect list."); + } + invokersToReconnect.remove(tInvoker); } - invokersToReconnect.remove(tInvoker); - } + }); + } catch (Throwable t) { + logger.error( + LoggerCodeConstants.INTERNAL_ERROR, + "", + "", + "Error occurred when check connectivity. ", + t); } finally { checkConnectivityPermit.release(); } // 4. submit new task if it has more to recover - if (!invokersToReconnect.isEmpty()) { - checkConnectivity(); - } + LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + if (!invokersToReconnect.isEmpty()) { + checkConnectivity(); + } + }); MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent( applicationModel, getSummary(), getDirectoryMeta())); }, @@ -382,9 +411,11 @@ public abstract class AbstractDirectory<T> implements Directory<T> { * 4. 
all the invokers disappeared from total invokers should be removed in the disabled invokers list */ public void refreshInvoker() { - if (invokersInitialized) { - refreshInvokerInternal(); - } + LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + if (invokersInitialized) { + refreshInvokerInternal(); + } + }); MetricsEventBus.publish( RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary(), getDirectoryMeta())); } @@ -393,7 +424,7 @@ public abstract class AbstractDirectory<T> implements Directory<T> { return Collections.emptyMap(); } - private synchronized void refreshInvokerInternal() { + private void refreshInvokerInternal() { BitList<Invoker<T>> copiedInvokers = invokers.clone(); refreshInvokers(copiedInvokers, invokersToReconnect); refreshInvokers(copiedInvokers, disabledInvokers); @@ -414,25 +445,29 @@ public abstract class AbstractDirectory<T> implements Directory<T> { @Override public void addDisabledInvoker(Invoker<T> invoker) { - if (invokers.contains(invoker)) { - disabledInvokers.add(invoker); - removeValidInvoker(invoker); - logger.info("Disable service address: " + invoker.getUrl() + "."); - } + LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + if (invokers.contains(invoker)) { + disabledInvokers.add(invoker); + removeValidInvoker(invoker); + logger.info("Disable service address: " + invoker.getUrl() + "."); + } + }); MetricsEventBus.publish( RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary(), getDirectoryMeta())); } @Override public void recoverDisabledInvoker(Invoker<T> invoker) { - if (disabledInvokers.remove(invoker)) { - try { - addValidInvoker(invoker); - logger.info("Recover service address: " + invoker.getUrl() + " from disabled list."); - } catch (Throwable ignore) { + LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + if (disabledInvokers.remove(invoker)) { + try { + addValidInvoker(invoker); + logger.info("Recover service address: " + 
invoker.getUrl() + " from disabled list."); + } catch (Throwable ignore) { + } } - } + }); MetricsEventBus.publish( RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary(), getDirectoryMeta())); } @@ -491,9 +526,11 @@ public abstract class AbstractDirectory<T> implements Directory<T> { } protected void setInvokers(BitList<Invoker<T>> invokers) { - this.invokers = invokers; - refreshInvokerInternal(); - this.invokersInitialized = true; + LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + this.invokers = invokers; + refreshInvokerInternal(); + this.invokersInitialized = true; + }); MetricsEventBus.publish( RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary(), getDirectoryMeta())); @@ -501,29 +538,31 @@ public abstract class AbstractDirectory<T> implements Directory<T> { protected void destroyInvokers() { // set empty instead of clearing to support concurrent access. - this.invokers = BitList.emptyList(); - this.validInvokers = BitList.emptyList(); - this.invokersInitialized = false; + LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + this.invokers = BitList.emptyList(); + this.validInvokers = BitList.emptyList(); + this.invokersInitialized = false; + }); } private boolean addValidInvoker(Invoker<T> invoker) { - boolean result; - synchronized (this.validInvokers) { - result = this.validInvokers.add(invoker); - } + AtomicBoolean result = new AtomicBoolean(false); + LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, () -> { + result.set(this.validInvokers.add(invoker)); + }); MetricsEventBus.publish( RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary(), getDirectoryMeta())); - return result; + return result.get(); } private boolean removeValidInvoker(Invoker<T> invoker) { - boolean result; - synchronized (this.validInvokers) { - result = this.validInvokers.remove(invoker); - } + AtomicBoolean result = new AtomicBoolean(false); + LockUtils.safeLock(invokerRefreshLock, 
LockUtils.DEFAULT_TIMEOUT, () -> { + result.set(this.validInvokers.remove(invoker)); + }); MetricsEventBus.publish( RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary(), getDirectoryMeta())); - return result; + return result.get(); } protected abstract List<Invoker<T>> doList( diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/LockUtils.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/LockUtils.java new file mode 100644 index 0000000000..f441dc9b2f --- /dev/null +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/LockUtils.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.dubbo.common.utils;
+
+import org.apache.dubbo.common.constants.LoggerCodeConstants;
+import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Helpers for running code under a {@link Lock} with a bounded wait.
+ */
+public class LockUtils {
+    private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(LockUtils.class);
+
+    /**
+     * Lock wait timeout in milliseconds (60 seconds), intended to be passed to {@link #safeLock}.
+     */
+    public static final int DEFAULT_TIMEOUT = 60_000;
+
+    /**
+     * Runs {@code runnable} while holding {@code lock}, waiting at most {@code timeout} milliseconds for it.
+     *
+     * <p>If the lock cannot be obtained:
+     * <ul>
+     *   <li>on timeout, an error is logged and {@code runnable} is still executed, but WITHOUT the lock held;</li>
+     *   <li>if the thread is interrupted while waiting, a warning is logged, {@code runnable} is not executed
+     *       and the thread's interrupt status is restored.</li>
+     * </ul>
+     *
+     * <p>The lock is released afterwards only if this call actually acquired it. Calling {@link Lock#unlock()}
+     * after a failed attempt is never correct: {@code ReentrantLock} throws
+     * {@code IllegalMonitorStateException}, and {@code Lock} implementations that do not track ownership may
+     * release a hold that belongs to another thread.
+     *
+     * @param lock     the lock to acquire
+     * @param timeout  maximum time to wait for the lock, in milliseconds
+     * @param runnable the action to run; unchecked exceptions it throws propagate after the lock is released
+     */
+    public static void safeLock(Lock lock, int timeout, Runnable runnable) {
+        boolean locked = false;
+        try {
+            locked = lock.tryLock(timeout, TimeUnit.MILLISECONDS);
+            if (!locked) {
+                logger.error(
+                        LoggerCodeConstants.INTERNAL_ERROR,
+                        "",
+                        "",
+                        "Try to lock failed, timeout: " + timeout,
+                        new TimeoutException());
+            }
+            runnable.run();
+        } catch (InterruptedException e) {
+            logger.warn(LoggerCodeConstants.INTERNAL_ERROR, "", "", "Try to lock failed", e);
+            Thread.currentThread().interrupt();
+        } finally {
+            // Release only a hold this call obtained; a timed-out or interrupted attempt owns nothing.
+            if (locked) {
+                lock.unlock();
+            }
+        }
+    }
+}
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/utils/LockUtilsTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/utils/LockUtilsTest.java
new file mode 100644
index 0000000000..87c1824930
--- /dev/null
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/utils/LockUtilsTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.utils;
+
+import java.lang.Thread.State;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.RepeatedTest;
+
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Tests for {@link LockUtils#safeLock}.
+ *
+ * <p>Durations are measured with {@link System#nanoTime()} (monotonic) instead of
+ * {@link System#currentTimeMillis()}, whose coarse granularity and clock adjustments can make
+ * millisecond-boundary assertions flaky.
+ */
+public class LockUtilsTest {
+    @RepeatedTest(5)
+    void testLockFailed() {
+        ReentrantLock reentrantLock = new ReentrantLock();
+        AtomicBoolean releaseLock = new AtomicBoolean(false);
+        new Thread(() -> {
+                    reentrantLock.lock();
+                    while (!releaseLock.get()) {
+                        try {
+                            Thread.sleep(5);
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    reentrantLock.unlock();
+                })
+                .start();
+
+        await().until(reentrantLock::isLocked);
+
+        // Another thread holds the lock: safeLock waits out the whole timeout, then still runs the action.
+        AtomicLong lockTime = new AtomicLong(0);
+        long startTime = System.nanoTime();
+        LockUtils.safeLock(reentrantLock, 1000, () -> {
+            lockTime.set(System.nanoTime());
+        });
+        Assertions.assertTrue(elapsedMillis(startTime, lockTime) >= 1000);
+        releaseLock.set(true);
+
+        while (reentrantLock.isLocked()) {
+            try {
+                Thread.sleep(5);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        // The lock is free again: the action runs without waiting for the timeout.
+        lockTime.set(0);
+        startTime = System.nanoTime();
+        LockUtils.safeLock(reentrantLock, 1000, () -> {
+            lockTime.set(System.nanoTime());
+        });
+        Assertions.assertTrue(elapsedMillis(startTime, lockTime) < 1000);
+    }
+
+    @RepeatedTest(5)
+    void testReentrant() {
+        ReentrantLock reentrantLock = new ReentrantLock();
+        reentrantLock.lock();
+
+        // The current thread already owns the lock, so safeLock must acquire it immediately.
+        AtomicLong lockTime = new AtomicLong(0);
+        long startTime = System.nanoTime();
+        LockUtils.safeLock(reentrantLock, 1000, () -> {
+            lockTime.set(System.nanoTime());
+        });
+        Assertions.assertTrue(elapsedMillis(startTime, lockTime) < 1000);
+
+        reentrantLock.lock();
+        lockTime.set(0);
+        startTime = System.nanoTime();
+        LockUtils.safeLock(reentrantLock, 1000, () -> {
+            lockTime.set(System.nanoTime());
+        });
+        Assertions.assertTrue(elapsedMillis(startTime, lockTime) < 1000);
+
+        // safeLock released only its own holds: the two manual lock() calls still need two unlock() calls.
+        Assertions.assertTrue(reentrantLock.isLocked());
+        reentrantLock.unlock();
+        Assertions.assertTrue(reentrantLock.isLocked());
+        reentrantLock.unlock();
+        Assertions.assertFalse(reentrantLock.isLocked());
+    }
+
+    @RepeatedTest(5)
+    void testInterrupt() {
+        ReentrantLock reentrantLock = new ReentrantLock();
+        reentrantLock.lock();
+
+        // Interrupting a thread blocked in safeLock aborts it without running the action.
+        AtomicBoolean locked = new AtomicBoolean(false);
+        Thread thread = new Thread(() -> {
+            LockUtils.safeLock(reentrantLock, 10000, () -> {
+                locked.set(true);
+            });
+        });
+        thread.start();
+
+        await().until(() -> thread.getState() == State.TIMED_WAITING);
+        thread.interrupt();
+        await().until(() -> thread.getState() == State.TERMINATED);
+
+        Assertions.assertFalse(locked.get());
+
+        reentrantLock.unlock();
+    }
+
+    @RepeatedTest(5)
+    void testHoldLock() throws InterruptedException {
+        ReentrantLock reentrantLock = new ReentrantLock();
+        reentrantLock.lock();
+
+        // The waiting thread acquires the lock once it is released, well before its own timeout.
+        AtomicLong lockTime = new AtomicLong(0);
+        long startTime = System.nanoTime();
+        Thread thread = new Thread(() -> {
+            LockUtils.safeLock(reentrantLock, 10000, () -> {
+                lockTime.set(System.nanoTime());
+            });
+        });
+        thread.start();
+
+        await().until(() -> thread.getState() == State.TIMED_WAITING);
+        Thread.sleep(1000);
+        reentrantLock.unlock();
+
+        await().until(() -> thread.getState() == State.TERMINATED);
+        long elapsed = elapsedMillis(startTime, lockTime);
+        Assertions.assertTrue(elapsed >= 1000);
+        Assertions.assertTrue(elapsed < 10000);
+    }
+
+    /**
+     * Returns the milliseconds from {@code startNanos} to the {@link System#nanoTime()} value stored in
+     * {@code endNanos}, failing the test if the action never stored a timestamp (value still 0) so that
+     * upper-bound checks cannot pass vacuously.
+     */
+    private static long elapsedMillis(long startNanos, AtomicLong endNanos) {
+        long end = endNanos.get();
+        Assertions.assertTrue(end != 0, "the action passed to safeLock was not executed");
+        return TimeUnit.NANOSECONDS.toMillis(end - startNanos);
+    }
+}