GEODE-2490: Avoid processing tombstone GC message in-line
Currently, the tombstone messages sent to replicas are processed in-line instead of being handed off to a thread pool. Depending on the number of nodes in the cluster, this processing may take a long time, impacting other cache operations that are required to be processed in-line.
The change provided here ensures that tombstone messages are not processed in-line and are instead processed in a separate thread. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/826bdbfe Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/826bdbfe Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/826bdbfe Branch: refs/heads/feature/GEODE-2460 Commit: 826bdbfe2ae2f4b3cd27760584684bc35e19e9b7 Parents: 974d72c Author: Anil <aging...@pivotal.io> Authored: Wed Feb 15 17:35:22 2017 -0800 Committer: Anil <aging...@pivotal.io> Committed: Fri Feb 17 17:44:46 2017 -0800 ---------------------------------------------------------------------- .../cache/DistributedTombstoneOperation.java | 12 ++++ .../geode/cache30/ClientServerCCEDUnitTest.java | 62 +++++++++++++++++++- 2 files changed, 73 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/826bdbfe/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedTombstoneOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedTombstoneOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedTombstoneOperation.java index 0765e16..1759c86 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedTombstoneOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedTombstoneOperation.java @@ -115,6 +115,12 @@ public class DistributedTombstoneOperation extends DistributedCacheOperation { return this.regionGCVersions; } + @Override + public boolean supportsDirectAck() { + // Set to false to force TombstoneMessage to use shared connection w/o in-line processing + return false; + } + public static class TombstoneMessage extends CacheOperationMessage implements SerializationVersions 
{ // protected long regionVersion; @@ -130,6 +136,12 @@ public class DistributedTombstoneOperation extends DistributedCacheOperation { public TombstoneMessage() {} @Override + public int getProcessorType() { + // Set to STANDARD to keep it from being processed in-line + return DistributionManager.STANDARD_EXECUTOR; + } + + @Override protected InternalCacheEvent createEvent(DistributedRegion rgn) throws EntryNotFoundException { RegionEventImpl event = createRegionEvent(rgn); event.setEventID(this.eventID); http://git-wip-us.apache.org/repos/asf/geode/blob/826bdbfe/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java index 75cd95b..e33074d 100644 --- a/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java @@ -37,7 +37,6 @@ import org.apache.geode.test.junit.categories.ClientServerTest; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; - import org.apache.geode.cache.AttributesFactory; import org.apache.geode.cache.CacheListener; import org.apache.geode.cache.DataPolicy; @@ -51,12 +50,18 @@ import org.apache.geode.cache.client.ClientRegionFactory; import org.apache.geode.cache.client.ClientRegionShortcut; import org.apache.geode.cache.server.CacheServer; import org.apache.geode.cache.util.CacheListenerAdapter; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; import org.apache.geode.internal.AvailablePortHelper; import org.apache.geode.internal.cache.AbstractRegionEntry; +import 
org.apache.geode.internal.cache.DistributedTombstoneOperation.TombstoneMessage; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.ha.HARegionQueue; +import org.apache.geode.internal.cache.partitioned.PRTombstoneMessage; import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier; import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy; +import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.test.dunit.Assert; import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.LogWriterUtils; @@ -219,6 +224,61 @@ public class ClientServerCCEDUnitTest extends JUnit4CacheTestCase { checkClientDoesNotReceiveGC(vm3); } + @Test + public void testTombstoneMessageSentToReplicatesAreNotProcessedInLine() { + Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + + final String name = "Region"; + + createServerRegion(vm0, name, true); + createEntries(vm0); + createServerRegion(vm1, name, true); + + try { + vm1.invoke(() -> { + DistributionMessageObserver.setInstance(new PRTombstoneMessageObserver()); + }); + destroyEntries(vm0); + forceGC(vm0); + + vm1.invoke(() -> { + PRTombstoneMessageObserver mo = + (PRTombstoneMessageObserver) DistributionMessageObserver.getInstance(); + Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> { + return mo.tsMessageProcessed >= 1; + }); + assertTrue("Tombstone GC message is not expected.", mo.thName.contains( + LocalizedStrings.DistributionManager_POOLED_MESSAGE_PROCESSOR.toLocalizedString())); + }); + + } finally { + vm1.invoke(() -> { + DistributionMessageObserver.setInstance(null); + }); + } + } + + private class PRTombstoneMessageObserver extends DistributionMessageObserver { + public int tsMessageProcessed = 0; + public int prTsMessageProcessed = 0; + public String thName; + + @Override + public void afterProcessMessage(DistributionManager dm, DistributionMessage message) { + thName = 
Thread.currentThread().getName(); + + if (message instanceof TombstoneMessage) { + tsMessageProcessed++; + } + + if (message instanceof PRTombstoneMessage) { + prTsMessageProcessed++; + } + } + } + /** * for bug #40791 we pull tombstones into clients on get(), getAll() and registerInterest() to * protect the client cache from stray putAll events sitting in backup queues on the server