This is an automated email from the ASF dual-hosted git repository. eshu11 pushed a commit to branch feature/GEODE-6853 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 96e614641606959df40b0e48d975660608fbed0e Author: eshu <e...@pivotal.io> AuthorDate: Mon Jun 10 17:27:38 2019 -0700 GEODE-6853: Do not request region sync during message processing. * Move scheduling of the region synchronization task to DistributedRegion. * Use the timer task to request region sync with no delay when processing a region sync message. --- .../distributed/internal/DistributionAdvisor.java | 45 ++++-------------- .../geode/internal/cache/DistributedRegion.java | 50 +++++++++++++++++++- .../internal/cache/InitialImageOperation.java | 4 +- .../internal/DistributionAdvisorTest.java | 55 ++++++++++++++++++++++ .../internal/cache/DistributedRegionTest.java | 36 ++++++++++++++ .../internal/cache/InitialImageOperationTest.java | 7 +-- 6 files changed, 154 insertions(+), 43 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java index 78d33a4..7093360 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java @@ -41,7 +41,6 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe import org.apache.geode.internal.Assert; import org.apache.geode.internal.DataSerializableFixedID; import org.apache.geode.internal.InternalDataSerializer; -import org.apache.geode.internal.SystemTimer; import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile; import org.apache.geode.internal.cache.DistributedRegion; @@ -277,50 +276,22 @@ public class DistributionAdvisor { // interval. This allows client caches to retry an operation that might otherwise be recovered // through the sync operation. Without associated event information this could cause the // retried operation to be mishandled.
See GEODE-5505 - final long delay = dr.getGemFireCache().getCacheServers().stream() - .mapToLong(CacheServer::getMaximumTimeBetweenPings).max().orElse(0L); - dr.getGemFireCache().getCCPTimer().schedule(new SystemTimer.SystemTimerTask() { - @Override - public void run2() { - while (!dr.isInitialized()) { - if (dr.isDestroyed()) { - return; - } else { - try { - if (isDebugEnabled) { - logger.debug( - "da.syncForCrashedMember waiting for region to finish initializing: {}", dr); - } - Thread.sleep(100); - } catch (InterruptedException e) { - return; - } - } - } - if (dr.getDataPolicy().withPersistence() && persistentId == null) { - // Fix for 46704. The lost member may be a replicate - // or an empty accessor. We don't need to do a synchronization - // in that case, because those members send their writes to - // a persistent member. - if (isDebugEnabled) { - logger.debug( - "da.syncForCrashedMember skipping sync because crashed member is not persistent: {}", - id); - } - return; - } - dr.synchronizeForLostMember(id, lostVersionID); - } - }, delay); + final long delay = getDelay(dr); /* NOTE(review): the removed task returned without syncing when the region was destroyed or the wait was interrupted, and skipped the sync when the region is persistent but the crashed member's persistentId is null (fix for 46704). The replacement in DistributedRegion.performSynchronizeForLostMemberTask tests getPersistentID() on the region itself and continues past waitUntilInitialized() even on those early exits -- confirm both changes are intended; testing persistentId here, before scheduling, would keep the old semantics. */ + dr.scheduleSynchronizeForLostMember(id, lostVersionID, delay); if (dr.getConcurrencyChecksEnabled()) { dr.setRegionSynchronizeScheduled(lostVersionID); } } - private PersistentMemberID getPersistentID(CacheProfile cp) { + PersistentMemberID getPersistentID(CacheProfile cp) { return cp.persistentID; } + /** Returns the largest getMaximumTimeBetweenPings() among dr's cache servers, or 0L when there are none. */ long getDelay(DistributedRegion dr) { + return dr.getGemFireCache().getCacheServers().stream() + .mapToLong(CacheServer::getMaximumTimeBetweenPings).max().orElse(0L); + } + /** find the region for a delta-gii operation (synch) */ public DistributedRegion getRegionForDeltaGII() { if (advisee instanceof DistributedRegion) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index 2898438..a8022b6 100755 --- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -87,6 +87,7 @@ import org.apache.geode.distributed.internal.locks.DLockRemoteToken; import org.apache.geode.distributed.internal.locks.DLockService; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.Assert; +import org.apache.geode.internal.SystemTimer; import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile; import org.apache.geode.internal.cache.InitialImageOperation.GIIStatus; import org.apache.geode.internal.cache.RegionMap.ARMLockTestHook; @@ -1273,11 +1274,40 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute } } + /** Schedules performSynchronizeForLostMemberTask(member, lostVersionID) on the cache's CCP timer with the given delay (units are whatever SystemTimer.schedule expects -- presumably ms, TODO confirm). */ public void scheduleSynchronizeForLostMember(InternalDistributedMember member, + VersionSource lostVersionID, long delay) { + getGemFireCache().getCCPTimer().schedule(new SystemTimer.SystemTimerTask() { + @Override + public void run2() { + performSynchronizeForLostMemberTask(member, lostVersionID); + } + }, delay); + } + + void performSynchronizeForLostMemberTask(InternalDistributedMember member, + VersionSource lostVersionID) { + waitUntilInitialized(); /* NOTE(review): waitUntilInitialized() also returns when the region is destroyed or the sleep is interrupted, yet execution continues into synchronizeForLostMember; the DistributionAdvisor task this replaces returned in those cases. Consider having it report success and bailing out otherwise. */ + + if (getDataPolicy().withPersistence() && getPersistentID() == null) { /* NOTE(review): getPersistentID() is called on this region rather than taken from the crashed member's profile, although the comment and log message below are about the crashed member -- confirm which ID is meant. */ + // Fix for 46704. The lost member may be a replicate + // or an empty accessor. We don't need to do a synchronization + // in that case, because those members send their writes to + // a persistent member. + if (logger.isDebugEnabled()) { + logger.debug( + "da.syncForCrashedMember skipping sync because crashed member is not persistent: {}", + member); + } + return; + } + synchronizeForLostMember(member, lostVersionID); + } + /** * If this region has concurrency controls enabled this will pull any missing changes from other * replicates using InitialImageOperation and a filtered chunking protocol.
*/ - public void synchronizeForLostMember(InternalDistributedMember lostMember, + void synchronizeForLostMember(InternalDistributedMember lostMember, VersionSource lostVersionID) { if (!getConcurrencyChecksEnabled()) { return; @@ -1328,6 +1358,24 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute return false; } + /** Polls every 100 ms until isInitialized() is true; returns early, without telling the caller why, if the region is destroyed or the sleep is interrupted. */ void waitUntilInitialized() { + while (!isInitialized()) { + if (isDestroyed()) { + return; + } else { + try { + if (logger.isDebugEnabled()) { + logger.debug( + "da.syncForCrashedMember waiting for region to finish initializing: {}", this); + } + Thread.sleep(100); + } catch (InterruptedException e) { /* NOTE(review): Thread.sleep clears the interrupt status when it throws and it is not restored here; consider Thread.currentThread().interrupt() before returning. */ + return; + } + } + } + } + /** remove any partial entries received in a failed GII */ void cleanUpAfterFailedGII(boolean recoverFromDisk) { DiskRegion dskRgn = getDiskRegion(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java index 072f97c..d0811c3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java @@ -1918,9 +1918,9 @@ public class InitialImageOperation { InternalDistributedMember lostMember, VersionSource lostVersionSource) { if (region.setRegionSynchronizedWithIfNotScheduled(lostVersionSource)) { // if region synchronization has not been scheduled or performed, - // we do synchronization with others right away as we received the synchronization request + // we do synchronization with no delay as we received the synchronization request // indicating timed task has been triggered on other nodes - region.synchronizeForLostMember(lostMember, lostVersionSource); + region.scheduleSynchronizeForLostMember(lostMember, lostVersionSource, 0); /* hands the sync to the CCP timer instead of running it while this message is being processed */ } } diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionAdvisorTest.java
b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionAdvisorTest.java index e92459c..a541338 100644 --- a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionAdvisorTest.java +++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionAdvisorTest.java @@ -14,14 +14,40 @@ */ package org.apache.geode.distributed.internal; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import org.junit.Before; import org.junit.Test; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.CacheDistributionAdvisor; +import org.apache.geode.internal.cache.DistributedRegion; +import org.apache.geode.internal.cache.persistence.PersistentMemberID; +import org.apache.geode.internal.cache.versions.VersionSource; + public class DistributionAdvisorTest { + private DistributionAdvisor distributionAdvisor; + private InternalDistributedMember member; + private DistributedRegion distributedRegion; + private DistributionAdvisor.Profile profile; + private VersionSource lostVersionID; + private PersistentMemberID persistentMemberID; + private long delay = 100; + + @Before + public void setup() { + distributionAdvisor = mock(DistributionAdvisor.class); + member = mock(InternalDistributedMember.class); + distributedRegion = mock(DistributedRegion.class); + profile = mock(CacheDistributionAdvisor.CacheProfile.class); + lostVersionID = mock(VersionSource.class); + persistentMemberID = mock(PersistentMemberID.class); + } @Test public void shouldBeMockable() throws Exception { @@ -29,4 +55,33 @@ public class DistributionAdvisorTest { mockDistributionAdvisor.initialize(); verify(mockDistributionAdvisor, times(1)).initialize(); } + + @Test + public void regionSyncScheduledForLostMember() { +
when(distributionAdvisor.getRegionForDeltaGII()).thenReturn(distributedRegion); + when(distributionAdvisor.getDelay(distributedRegion)).thenReturn(delay); + when(distributedRegion.getConcurrencyChecksEnabled()).thenReturn(true); + doCallRealMethod().when(distributionAdvisor).syncForCrashedMember(member, profile); + + distributionAdvisor.syncForCrashedMember(member, profile); + + /* getPersistentID(profile) is not stubbed, so the mocked advisor returns null; member is expected as the lost version ID, presumably via a fallback in syncForCrashedMember that this diff does not show. */ verify(distributedRegion).scheduleSynchronizeForLostMember(member, member, delay); + verify(distributedRegion).setRegionSynchronizeScheduled(member); + } + + @Test + public void regionSyncScheduledForLostPersistentMember() { + when(distributionAdvisor.getRegionForDeltaGII()).thenReturn(distributedRegion); + when(distributionAdvisor.getPersistentID((CacheDistributionAdvisor.CacheProfile) profile)) + .thenReturn(persistentMemberID); + when(persistentMemberID.getVersionMember()).thenReturn(lostVersionID); + when(distributionAdvisor.getDelay(distributedRegion)).thenReturn(delay); + when(distributedRegion.getConcurrencyChecksEnabled()).thenReturn(true); + doCallRealMethod().when(distributionAdvisor).syncForCrashedMember(member, profile); + + distributionAdvisor.syncForCrashedMember(member, profile); + + verify(distributedRegion).scheduleSynchronizeForLostMember(member, lostVersionID, delay); + verify(distributedRegion).setRegionSynchronizeScheduled(lostVersionID); + } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java index 143072f..23640a4 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java @@ -19,6 +19,7 @@ import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.doCallRealMethod; +import static
org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -26,7 +27,10 @@ import static org.mockito.Mockito.when; import org.junit.Before; import org.junit.Test; +import org.mockito.InOrder; +import org.apache.geode.cache.DataPolicy; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.versions.RegionVersionHolder; import org.apache.geode.internal.cache.versions.RegionVersionVector; import org.apache.geode.internal.cache.versions.VersionSource; @@ -36,12 +40,14 @@ public class DistributedRegionTest { private RegionVersionVector vector; private RegionVersionHolder holder; private VersionSource lostMemberVersionID; + private InternalDistributedMember member; @Before public void setup() { vector = mock(RegionVersionVector.class); holder = mock(RegionVersionHolder.class); lostMemberVersionID = mock(VersionSource.class); + member = mock(InternalDistributedMember.class); } @Test @@ -146,4 +152,34 @@ public class DistributedRegionTest { verify(holder, never()).setRegionSynchronizeScheduledOrDoneIfNot(); } + + @Test + public void regionSyncInvokedInPerformSynchronizeForLostMemberTaskAfterRegionInitialized() { + DistributedRegion distributedRegion = mock(DistributedRegion.class); + when(distributedRegion.getDataPolicy()).thenReturn(mock(DataPolicy.class)); + doCallRealMethod().when(distributedRegion).performSynchronizeForLostMemberTask(member, + lostMemberVersionID); + InOrder inOrder = inOrder(distributedRegion); + + distributedRegion.performSynchronizeForLostMemberTask(member, lostMemberVersionID); + + /* NOTE(review): waitUntilInitialized() is not a real call on this mock, so the destroyed/interrupted early-exit paths are not exercised by this test. */ inOrder.verify(distributedRegion).waitUntilInitialized(); + inOrder.verify(distributedRegion).synchronizeForLostMember(member, lostMemberVersionID); + } + + @Test + public void emptyAccessorOfPersistentRegionDoesNotSynchronizeForLostMember() { + DataPolicy dataPolicy = mock(DataPolicy.class); +
DistributedRegion distributedRegion = mock(DistributedRegion.class); + when(distributedRegion.getDataPolicy()).thenReturn(dataPolicy); + when(dataPolicy.withPersistence()).thenReturn(true); + when(distributedRegion.getPersistentID()).thenReturn(null); /* NOTE(review): this stubs the region's own getPersistentID(), whereas the removed DistributionAdvisor check used the crashed member's persistentId -- confirm this models the intended case. */ + doCallRealMethod().when(distributedRegion).performSynchronizeForLostMemberTask(member, + lostMemberVersionID); + + distributedRegion.performSynchronizeForLostMemberTask(member, lostMemberVersionID); + + verify(distributedRegion).waitUntilInitialized(); + verify(distributedRegion, never()).synchronizeForLostMember(member, lostMemberVersionID); + } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/InitialImageOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/InitialImageOperationTest.java index 26663a5..61bed57 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/InitialImageOperationTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/InitialImageOperationTest.java @@ -77,12 +77,12 @@ public class InitialImageOperationTest { } @Test - public void synchronizeForLostMemberIsInvokedIfRegionHasNotScheduledOrDoneSynchronization() { + public void scheduleSynchronizeForLostMemberIsInvokedIfRegionHasNotScheduledOrDoneSynchronization() { when(distributedRegion.setRegionSynchronizedWithIfNotScheduled(versionSource)).thenReturn(true); message.synchronizeIfNotScheduled(distributedRegion, lostMember, versionSource); - verify(distributedRegion).synchronizeForLostMember(lostMember, versionSource); + verify(distributedRegion).scheduleSynchronizeForLostMember(lostMember, versionSource, 0); } @Test @@ -92,6 +92,7 @@ public class InitialImageOperationTest { message.synchronizeIfNotScheduled(distributedRegion, lostMember, versionSource); - verify(distributedRegion, never()).synchronizeForLostMember(lostMember, versionSource); + verify(distributedRegion, never()).scheduleSynchronizeForLostMember(lostMember, versionSource, + 0); } }