This is an automated email from the ASF dual-hosted git repository. mcmellawatt pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new bc2a2fa GEODE-6708: Ensuring single drainer and preventing NPE bc2a2fa is described below commit bc2a2fa5af374cfedfba4dc1abe6cbc2a7b719c8 Author: Ryan McMahon <rmcma...@pivotal.io> AuthorDate: Thu Apr 25 17:19:31 2019 -0700 GEODE-6708: Ensuring single drainer and preventing NPE --- .../cache/tier/sockets/CacheClientNotifier.java | 19 ++++- .../ClientRegistrationEventQueueManager.java | 99 ++++++++++++++-------- .../ClientRegistrationEventQueueManagerTest.java | 59 ++++++++++++- 3 files changed, 136 insertions(+), 41 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java index a7dc8a4..da141d2 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java @@ -40,6 +40,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.logging.log4j.Logger; @@ -171,15 +172,15 @@ public class CacheClientNotifier { try { if (isClientPermitted(clientRegistrationMetadata, clientProxyMembershipID)) { registrationQueueManager.create(clientProxyMembershipID, new ConcurrentLinkedQueue<>(), - new ReentrantReadWriteLock()); + new ReentrantReadWriteLock(), new ReentrantLock()); try { registerClientInternal(clientRegistrationMetadata, socket, isPrimary, acceptorId, notifyBySubscription); } finally { - registrationQueueManager.drain( - clientProxyMembershipID, - this); + if (isProxyInitialized(clientProxyMembershipID)) { + registrationQueueManager.drain(clientProxyMembershipID, this); + } } } } catch (final AuthenticationRequiredException ex) { @@ -1220,6 +1221,16 @@ public class CacheClientNotifier { } /** + * Determines whether a client proxy has been initialized + * + * @param clientProxyMembershipID The client proxy membership ID + * @return Whether the client proxy is initialized + */ + private boolean isProxyInitialized(final ClientProxyMembershipID clientProxyMembershipID) { + return getClientProxy(clientProxyMembershipID) != null; + } + + /** * Returns the <code>CacheClientProxy</code> associated to the membershipID * * * @return the <code>CacheClientProxy</code> associated to the membershipID diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java index 122a192..23d8ef5 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java @@ -20,6 +20,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.logging.log4j.Logger; @@ -95,39 +96,58 @@ class ClientRegistrationEventQueueManager { void drain(final ClientProxyMembershipID clientProxyMembershipID, final CacheClientNotifier cacheClientNotifier) { - // As an optimization, we drain as many events from the queue as we can - // before taking out a lock to drain the remaining events - if (logger.isDebugEnabled()) { - logger.debug("Draining events from registration queue for client proxy " - + clientProxyMembershipID - + " without synchronization"); - } - - drainEventsReceivedWhileRegisteringClient(clientProxyMembershipID, cacheClientNotifier); - ClientRegistrationEventQueue registrationEventQueue = registeringProxyEventQueues.get(clientProxyMembershipID); - registrationEventQueue.lockForDraining(); - try { - if (logger.isDebugEnabled()) { - logger.debug("Draining remaining events from registration queue for client proxy " - + clientProxyMembershipID + " with synchronization"); - } + if (registrationEventQueue != null) { + // It is possible that several client registration threads are active for the same + // ClientProxyMembershipID, in which case we only want a single drainer to drain + // and remove the queue. + registrationEventQueue.lockForSingleDrainer(); + try { + // See if the queue is still available after acquiring the lock as it may have + // been removed from registeringProxyEventQueues by the previous thread + if (registeringProxyEventQueues.containsKey(clientProxyMembershipID)) { + // As an optimization, we drain as many events from the queue as we can + // before taking out a lock to drain the remaining events. When we lock for draining, + // it prevents additional events from being added to the queue while the queue is drained + // and removed. + if (logger.isDebugEnabled()) { + logger.debug("Draining events from registration queue for client proxy " + + clientProxyMembershipID + + " without synchronization"); + } + + drainEventsReceivedWhileRegisteringClient(clientProxyMembershipID, registrationEventQueue, + cacheClientNotifier); + + // Prevents additional events from being added to the queue while we process and remove it + registrationEventQueue.lockForDraining(); + try { + if (logger.isDebugEnabled()) { + logger.debug("Draining remaining events from registration queue for client proxy " + + clientProxyMembershipID + " with synchronization"); + } - drainEventsReceivedWhileRegisteringClient(clientProxyMembershipID, cacheClientNotifier); + drainEventsReceivedWhileRegisteringClient(clientProxyMembershipID, + registrationEventQueue, + cacheClientNotifier); - registeringProxyEventQueues.remove(clientProxyMembershipID); - } finally { - registrationEventQueue.unlockForDraining(); + registeringProxyEventQueues.remove(clientProxyMembershipID); + } finally { + registrationEventQueue.unlockForDraining(); + } + } + } finally { + registrationEventQueue.unlockForSingleDrainer(); + } } } private void drainEventsReceivedWhileRegisteringClient(final ClientProxyMembershipID proxyID, + final ClientRegistrationEventQueue registrationEventQueue, final CacheClientNotifier cacheClientNotifier) { ClientRegistrationEvent queuedEvent; - ClientRegistrationEventQueue registrationEventQueue = registeringProxyEventQueues.get(proxyID); - while ((queuedEvent = registrationEventQueue.poll()) != null) { InternalCacheEvent internalCacheEvent = queuedEvent.internalCacheEvent; Conflatable conflatable = queuedEvent.conflatable; @@ -139,23 +159,28 @@ class ClientRegistrationEventQueueManager { public ClientRegistrationEventQueue create( final ClientProxyMembershipID clientProxyMembershipID, final Queue<ClientRegistrationEvent> eventQueue, - final ReadWriteLock putDrainLock) { + final ReadWriteLock eventAddDrainLock, + final ReentrantLock singleDrainerLock) { final ClientRegistrationEventQueue clientRegistrationEventQueue = new ClientRegistrationEventQueue(eventQueue, - putDrainLock); - registeringProxyEventQueues.put(clientProxyMembershipID, + eventAddDrainLock, singleDrainerLock); + registeringProxyEventQueues.putIfAbsent(clientProxyMembershipID, clientRegistrationEventQueue); return clientRegistrationEventQueue; } class ClientRegistrationEventQueue { - Queue<ClientRegistrationEvent> eventQueue; - ReadWriteLock readWriteLock; + private final Queue<ClientRegistrationEvent> eventQueue; + private final ReadWriteLock eventAddDrainLock; + private final ReentrantLock singleDrainerLock; ClientRegistrationEventQueue( - final Queue<ClientRegistrationEvent> eventQueue, final ReadWriteLock readWriteLock) { + final Queue<ClientRegistrationEvent> eventQueue, + final ReadWriteLock eventAddDrainLock, + final ReentrantLock singleDrainerLock) { this.eventQueue = eventQueue; - this.readWriteLock = readWriteLock; + this.eventAddDrainLock = eventAddDrainLock; + this.singleDrainerLock = singleDrainerLock; } boolean isEmpty() { @@ -171,19 +196,27 @@ class ClientRegistrationEventQueueManager { } private void lockForDraining() { - readWriteLock.writeLock().lock(); + eventAddDrainLock.writeLock().lock(); } private void unlockForDraining() { - readWriteLock.writeLock().unlock(); + eventAddDrainLock.writeLock().unlock(); } private void lockForPutting() { - readWriteLock.readLock().lock(); + eventAddDrainLock.readLock().lock(); } private void unlockForPutting() { - readWriteLock.readLock().unlock(); + eventAddDrainLock.readLock().unlock(); + } + + private void lockForSingleDrainer() { + singleDrainerLock.lock(); + } + + private void unlockForSingleDrainer() { + singleDrainerLock.unlock(); } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java index 801d178..f9af401 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java @@ -27,7 +27,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.junit.Test; @@ -68,7 +70,7 @@ public class ClientRegistrationEventQueueManagerTest { }); clientRegistrationEventQueueManager.create(clientProxyMembershipID, - new ConcurrentLinkedQueue<>(), mockPutDrainLock); + new ConcurrentLinkedQueue<>(), mockPutDrainLock, new ReentrantLock()); InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class); LocalRegion localRegion = mock(LocalRegion.class); @@ -125,7 +127,7 @@ public class ClientRegistrationEventQueueManagerTest { ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class); clientRegistrationEventQueueManager.create(clientProxyMembershipID, - new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock()); + new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock(), new ReentrantLock()); InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class); when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class)); @@ -170,7 +172,7 @@ public class ClientRegistrationEventQueueManagerTest { ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue = clientRegistrationEventQueueManager.create(clientProxyMembershipID, - new ConcurrentLinkedQueue<>(), mockPutDrainLock); + new ConcurrentLinkedQueue<>(), mockPutDrainLock, new ReentrantLock()); InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class); when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class)); @@ -180,7 +182,7 @@ public class ClientRegistrationEventQueueManagerTest { CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class); CompletableFuture<Void> addEventsToQueueTask = CompletableFuture.runAsync(() -> { - for (int i = 0; i < 100000; ++i) { + for (int numAdds = 0; numAdds < 100000; ++numAdds) { // In thread one, we add events to the queue clientRegistrationEventQueueManager .add(internalCacheEvent, conflatable, filterClientIDs, cacheClientNotifier); @@ -196,4 +198,53 @@ public class ClientRegistrationEventQueueManagerTest { assertThat(clientRegistrationEventQueue.isEmpty()).isTrue(); } + + @Test + public void twoThreadsRegisteringSameClientNoEventsLost() + throws ExecutionException, InterruptedException { + ClientRegistrationEventQueueManager clientRegistrationEventQueueManager = + new ClientRegistrationEventQueueManager(); + + InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class); + when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class)); + + Conflatable conflatable = mock(Conflatable.class); + Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>(); + CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class); + ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class); + + ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue = + clientRegistrationEventQueueManager.create(clientProxyMembershipID, + new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock(), new ReentrantLock()); + + for (int registrationIterations = 0; registrationIterations < 1000; ++registrationIterations) { + Runnable clientRegistrationSimulation = () -> { + for (int numAdds = 0; numAdds < getRandomNumberOfAdds(); ++numAdds) { + // In thread one, we add events to the queue + clientRegistrationEventQueueManager + .add(internalCacheEvent, conflatable, filterClientIDs, cacheClientNotifier); + } + // In thread two, we drain events from the queue + clientRegistrationEventQueueManager.drain(clientProxyMembershipID, cacheClientNotifier); + }; + + CompletableFuture<Void> registrationFutureOne = + CompletableFuture.runAsync(clientRegistrationSimulation); + CompletableFuture<Void> registrationFutureTwo = + CompletableFuture.runAsync(clientRegistrationSimulation); + + CompletableFuture.allOf(registrationFutureOne, registrationFutureTwo).get(); + + assertThat(clientRegistrationEventQueue.isEmpty()).isTrue(); + } + } + + /* + * This helps to create contention between registration threads during the drain phase + */ + private static int getRandomNumberOfAdds() { + int min = 10_000; + int max = 50_000; + return ThreadLocalRandom.current().nextInt(min, max + 1); + } }