Repository: geode Updated Branches: refs/heads/develop 11a0b34cd -> 0fde215a1
GEODE-3407: fix deadlock between JMX and Membership. Change InternalClientMembership to not synchronize on CacheFactory by accepting a Cache parameter from CacheServerBridge. The new regression test confirms the bug and this fix. This closes #697 Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/0fde215a Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/0fde215a Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/0fde215a Branch: refs/heads/develop Commit: 0fde215a11f61c12c24da519fd195b50e39f1ee5 Parents: 11a0b34 Author: Kirk Lund <kl...@apache.org> Authored: Mon Aug 7 16:39:04 2017 -0700 Committer: Kirk Lund <kl...@apache.org> Committed: Wed Aug 9 10:00:36 2017 -0700 ---------------------------------------------------------------------- .../cache/tier/InternalClientMembership.java | 30 +++-- .../internal/beans/CacheServerBridge.java | 18 ++- .../internal/beans/ManagementAdapter.java | 2 +- .../management/internal/beans/ServerBridge.java | 16 ++- ...verBridgeClientMembershipRegressionTest.java | 129 +++++++++++++++++++ 5 files changed, 173 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/0fde215a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java index 504081d..6fac66f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/InternalClientMembership.java @@ -27,9 +27,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import
java.util.concurrent.TimeUnit; -import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.internal.cache.InternalCache; -import org.apache.geode.internal.net.SocketCreator; import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; @@ -37,11 +34,14 @@ import org.apache.geode.SystemFailure; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.client.PoolManager; import org.apache.geode.cache.client.internal.PoolImpl; +import org.apache.geode.cache.server.CacheServer; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.DistributedSystemDisconnectedException; import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.ServerLocation; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.CacheServerImpl; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl; import org.apache.geode.internal.cache.tier.sockets.ClientHealthMonitor; import org.apache.geode.internal.i18n.LocalizedStrings; @@ -290,18 +290,22 @@ public class InternalClientMembership { } public static Map getClientQueueSizes() { - Map clientQueueSizes = new HashMap(); - InternalCache c = (InternalCache) CacheFactory.getAnyInstance(); - if (c == null) // Add a NULL Check - return clientQueueSizes; + return getClientQueueSizes((InternalCache) CacheFactory.getAnyInstance()); + } - for (Iterator bsii = c.getCacheServers().iterator(); bsii.hasNext();) { - CacheServerImpl bsi = (CacheServerImpl) bsii.next(); - AcceptorImpl ai = bsi.getAcceptor(); - if (ai != null && ai.getCacheClientNotifier() != null) { - clientQueueSizes.putAll(ai.getCacheClientNotifier().getClientQueueSizes()); + public static Map getClientQueueSizes(final InternalCache cache) { + if (cache == null) { + return 
Collections.emptyMap(); + } + + Map clientQueueSizes = new HashMap(); + for (CacheServer cacheServer : cache.getCacheServers()) { + CacheServerImpl cacheServerImpl = (CacheServerImpl) cacheServer; + AcceptorImpl acceptor = cacheServerImpl.getAcceptor(); + if (acceptor != null && acceptor.getCacheClientNotifier() != null) { + clientQueueSizes.putAll(acceptor.getCacheClientNotifier().getClientQueueSizes()); } - } // for + } return clientQueueSizes; } http://git-wip-us.apache.org/repos/asf/geode/blob/0fde215a/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java index 728402c..3b49f65 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/CacheServerBridge.java @@ -57,6 +57,7 @@ import org.apache.geode.management.ClientHealthStatus; import org.apache.geode.management.ClientQueueDetail; import org.apache.geode.management.ServerLoadData; import org.apache.geode.management.internal.ManagementConstants; +import org.apache.geode.management.internal.beans.stats.MBeanStatsMonitor; import org.apache.geode.management.internal.beans.stats.StatType; import org.apache.geode.management.internal.beans.stats.StatsAverageLatency; import org.apache.geode.management.internal.beans.stats.StatsKey; @@ -101,7 +102,7 @@ public class CacheServerBridge extends ServerBridge { } } - public CacheServerBridge(CacheServer cacheServer, InternalCache cache) { + public CacheServerBridge(final InternalCache cache, final CacheServer cacheServer) { super(cacheServer); this.cacheServer = cacheServer; this.cache = cache; @@ -110,7 +111,18 @@ public class CacheServerBridge extends 
ServerBridge { initializeCacheServerStats(); } - // Dummy constructor for testing purpose only TODO why is this public then? + // For testing only + public CacheServerBridge(final InternalCache cache, final CacheServer cacheServer, + final AcceptorImpl acceptor, final MBeanStatsMonitor monitor) { + super(acceptor, monitor); + this.cacheServer = cacheServer; + this.cache = cache; + this.qs = cache.getQueryService(); + + initializeCacheServerStats(); + } + + // For testing only public CacheServerBridge() { super(); initializeCacheServerStats(); @@ -648,7 +660,7 @@ public class CacheServerBridge extends ServerBridge { } public int getNumSubscriptions() { - Map clientProxyMembershipIDMap = InternalClientMembership.getClientQueueSizes(); + Map clientProxyMembershipIDMap = InternalClientMembership.getClientQueueSizes(cache); return clientProxyMembershipIDMap.keySet().size(); } http://git-wip-us.apache.org/repos/asf/geode/blob/0fde215a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java index 003a8f3..8ea84f5 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java @@ -633,7 +633,7 @@ public class ManagementAdapter { return; } - CacheServerBridge cacheServerBridge = new CacheServerBridge(cacheServer, internalCache); + CacheServerBridge cacheServerBridge = new CacheServerBridge(internalCache, cacheServer); cacheServerBridge.setMemberMBeanBridge(memberMBeanBridge); CacheServerMBean cacheServerMBean = new CacheServerMBean(cacheServerBridge); 
http://git-wip-us.apache.org/repos/asf/geode/blob/0fde215a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ServerBridge.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ServerBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ServerBridge.java index 6834998..6008cdc 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ServerBridge.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ServerBridge.java @@ -29,7 +29,6 @@ public class ServerBridge { protected MBeanStatsMonitor monitor; - protected StatsRate getRequestRate; protected StatsRate putRequestRate; @@ -38,13 +37,20 @@ public class ServerBridge { protected StatsAverageLatency putRequestAvgLatency; - protected AcceptorImpl acceptor; + public ServerBridge(final CacheServer cacheServer) { + this((CacheServerImpl) cacheServer, + new MBeanStatsMonitor(ManagementStrings.SERVER_MONITOR.toLocalizedString())); + } - public ServerBridge(CacheServer cacheServer) { - this.monitor = new MBeanStatsMonitor(ManagementStrings.SERVER_MONITOR.toLocalizedString()); - this.acceptor = ((CacheServerImpl) cacheServer).getAcceptor(); + public ServerBridge(final CacheServerImpl cacheServer, final MBeanStatsMonitor monitor) { + this(cacheServer.getAcceptor(), monitor); + } + + public ServerBridge(final AcceptorImpl acceptor, final MBeanStatsMonitor monitor) { + this.monitor = monitor; + this.acceptor = acceptor; initializeStats(); startMonitor(); } http://git-wip-us.apache.org/repos/asf/geode/blob/0fde215a/geode-core/src/test/java/org/apache/geode/management/internal/beans/CacheServerBridgeClientMembershipRegressionTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/beans/CacheServerBridgeClientMembershipRegressionTest.java 
b/geode-core/src/test/java/org/apache/geode/management/internal/beans/CacheServerBridgeClientMembershipRegressionTest.java new file mode 100644 index 0000000..232df0a --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/management/internal/beans/CacheServerBridgeClientMembershipRegressionTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.management.internal.beans; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.CacheClosedException; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.query.QueryService; +import org.apache.geode.internal.cache.CacheServerImpl; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl; +import org.apache.geode.internal.cache.tier.sockets.CacheServerStats; +import org.apache.geode.management.internal.beans.stats.MBeanStatsMonitor; +import org.apache.geode.test.junit.categories.UnitTest; + +/** + * Regression test that confirms bug GEODE-3407. 
+ * + * <p> + * GEODE-3407: JMX and membership may deadlock on CacheFactory.getAnyInstance + */ +@Category(UnitTest.class) +public class CacheServerBridgeClientMembershipRegressionTest { + + private final AtomicBoolean after = new AtomicBoolean(); + private final AtomicBoolean before = new AtomicBoolean(); + + private CacheServerBridge cacheServerBridge; + + private ExecutorService synchronizing; + private ExecutorService blocking; + private CountDownLatch latch; + + private InternalCache cache; + private CacheServerImpl cacheServer; + private AcceptorImpl acceptor; + private MBeanStatsMonitor monitor; + + @Before + public void setUp() throws Exception { + this.synchronizing = Executors.newSingleThreadExecutor(); + this.blocking = Executors.newSingleThreadExecutor(); + this.latch = new CountDownLatch(1); + + this.cache = mock(InternalCache.class); + this.cacheServer = mock(CacheServerImpl.class); + this.acceptor = mock(AcceptorImpl.class); + this.monitor = mock(MBeanStatsMonitor.class); + + when(cache.getQueryService()).thenReturn(mock(QueryService.class)); + when(acceptor.getStats()).thenReturn(mock(CacheServerStats.class)); + } + + @After + public void tearDown() throws Exception { + if (latch.getCount() > 0) { + latch.countDown(); + } + } + + @Test + public void getNumSubscriptionsDeadlocksOnCacheFactory() throws Exception { + givenCacheFactoryIsSynchronized(); + givenCacheServerBridge(); + + blocking.execute(() -> { + try { + before.set(true); + + // getNumSubscriptions -> getClientQueueSizes -> synchronizes on CacheFactory + cacheServerBridge.getNumSubscriptions(); + + } catch (CacheClosedException ignored) { + } finally { + after.set(true); + } + }); + + await().atMost(10, SECONDS).until(() -> before.get()); + + // if deadlocked, then this line will throw ConditionTimeoutException + await().atMost(10, SECONDS).until(() -> assertThat(after.get()).isTrue()); + } + + private void givenCacheFactoryIsSynchronized() { + synchronizing.execute(() -> { + synchronized 
(CacheFactory.class) { + try { + latch.await(2, MINUTES); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + }); + } + + private void givenCacheServerBridge() { + cacheServerBridge = new CacheServerBridge(cache, cacheServer, acceptor, monitor); + } + +}