This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-6853
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 96e614641606959df40b0e48d975660608fbed0e
Author: eshu <e...@pivotal.io>
AuthorDate: Mon Jun 10 17:27:38 2019 -0700

    GEODE-6853: Do not request region sync during message processing.
    
     * Move schedule region synchronize message to DistributedRegion.
     * Use the timed task to request region sync with no delay when processing 
region sync message.
---
 .../distributed/internal/DistributionAdvisor.java  | 45 ++++--------------
 .../geode/internal/cache/DistributedRegion.java    | 50 +++++++++++++++++++-
 .../internal/cache/InitialImageOperation.java      |  4 +-
 .../internal/DistributionAdvisorTest.java          | 55 ++++++++++++++++++++++
 .../internal/cache/DistributedRegionTest.java      | 36 ++++++++++++++
 .../internal/cache/InitialImageOperationTest.java  |  7 +--
 6 files changed, 154 insertions(+), 43 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
index 78d33a4..7093360 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
@@ -41,7 +41,6 @@ import 
org.apache.geode.distributed.internal.membership.InternalDistributedMembe
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.DataSerializableFixedID;
 import org.apache.geode.internal.InternalDataSerializer;
-import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
 import org.apache.geode.internal.cache.DistributedRegion;
@@ -277,50 +276,22 @@ public class DistributionAdvisor {
     // interval. This allows client caches to retry an operation that might 
otherwise be recovered
     // through the sync operation. Without associated event information this 
could cause the
     // retried operation to be mishandled. See GEODE-5505
-    final long delay = dr.getGemFireCache().getCacheServers().stream()
-        .mapToLong(CacheServer::getMaximumTimeBetweenPings).max().orElse(0L);
-    dr.getGemFireCache().getCCPTimer().schedule(new 
SystemTimer.SystemTimerTask() {
-      @Override
-      public void run2() {
-        while (!dr.isInitialized()) {
-          if (dr.isDestroyed()) {
-            return;
-          } else {
-            try {
-              if (isDebugEnabled) {
-                logger.debug(
-                    "da.syncForCrashedMember waiting for region to finish 
initializing: {}", dr);
-              }
-              Thread.sleep(100);
-            } catch (InterruptedException e) {
-              return;
-            }
-          }
-        }
-        if (dr.getDataPolicy().withPersistence() && persistentId == null) {
-          // Fix for 46704. The lost member may be a replicate
-          // or an empty accessor. We don't need to do a synchronization
-          // in that case, because those members send their writes to
-          // a persistent member.
-          if (isDebugEnabled) {
-            logger.debug(
-                "da.syncForCrashedMember skipping sync because crashed member 
is not persistent: {}",
-                id);
-          }
-          return;
-        }
-        dr.synchronizeForLostMember(id, lostVersionID);
-      }
-    }, delay);
+    final long delay = getDelay(dr);
+    dr.scheduleSynchronizeForLostMember(id, lostVersionID, delay);
     if (dr.getConcurrencyChecksEnabled()) {
       dr.setRegionSynchronizeScheduled(lostVersionID);
     }
   }
 
-  private PersistentMemberID getPersistentID(CacheProfile cp) {
+  PersistentMemberID getPersistentID(CacheProfile cp) {
     return cp.persistentID;
   }
 
+  long getDelay(DistributedRegion dr) {
+    return dr.getGemFireCache().getCacheServers().stream()
+        .mapToLong(CacheServer::getMaximumTimeBetweenPings).max().orElse(0L);
+  }
+
   /** find the region for a delta-gii operation (synch) */
   public DistributedRegion getRegionForDeltaGII() {
     if (advisee instanceof DistributedRegion) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 2898438..a8022b6 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -87,6 +87,7 @@ import 
org.apache.geode.distributed.internal.locks.DLockRemoteToken;
 import org.apache.geode.distributed.internal.locks.DLockService;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
 import org.apache.geode.internal.cache.InitialImageOperation.GIIStatus;
 import org.apache.geode.internal.cache.RegionMap.ARMLockTestHook;
@@ -1273,11 +1274,40 @@ public class DistributedRegion extends LocalRegion 
implements InternalDistribute
     }
   }
 
+  public void scheduleSynchronizeForLostMember(InternalDistributedMember 
member,
+      VersionSource lostVersionID, long delay) {
+    getGemFireCache().getCCPTimer().schedule(new SystemTimer.SystemTimerTask() 
{
+      @Override
+      public void run2() {
+        performSynchronizeForLostMemberTask(member, lostVersionID);
+      }
+    }, delay);
+  }
+
+  void performSynchronizeForLostMemberTask(InternalDistributedMember member,
+      VersionSource lostVersionID) {
+    waitUntilInitialized();
+
+    if (getDataPolicy().withPersistence() && getPersistentID() == null) {
+      // Fix for 46704. The lost member may be a replicate
+      // or an empty accessor. We don't need to do a synchronization
+      // in that case, because those members send their writes to
+      // a persistent member.
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "da.syncForCrashedMember skipping sync because crashed member is 
not persistent: {}",
+            member);
+      }
+      return;
+    }
+    synchronizeForLostMember(member, lostVersionID);
+  }
+
   /**
    * If this region has concurrency controls enabled this will pull any 
missing changes from other
    * replicates using InitialImageOperation and a filtered chunking protocol.
    */
-  public void synchronizeForLostMember(InternalDistributedMember lostMember,
+  void synchronizeForLostMember(InternalDistributedMember lostMember,
       VersionSource lostVersionID) {
     if (!getConcurrencyChecksEnabled()) {
       return;
@@ -1328,6 +1358,24 @@ public class DistributedRegion extends LocalRegion 
implements InternalDistribute
     return false;
   }
 
+  void waitUntilInitialized() {
+    while (!isInitialized()) {
+      if (isDestroyed()) {
+        return;
+      } else {
+        try {
+          if (logger.isDebugEnabled()) {
+            logger.debug(
+                "da.syncForCrashedMember waiting for region to finish 
initializing: {}", this);
+          }
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          return;
+        }
+      }
+    }
+  }
+
   /** remove any partial entries received in a failed GII */
   void cleanUpAfterFailedGII(boolean recoverFromDisk) {
     DiskRegion dskRgn = getDiskRegion();
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
index 072f97c..d0811c3 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
@@ -1918,9 +1918,9 @@ public class InitialImageOperation {
         InternalDistributedMember lostMember, VersionSource lostVersionSource) 
{
       if (region.setRegionSynchronizedWithIfNotScheduled(lostVersionSource)) {
         // if region synchronization has not been scheduled or performed,
-        // we do synchronization with others right away as we received the 
synchronization request
+        // we do synchronization with no delay as we received the 
synchronization request
         // indicating timed task has been triggered on other nodes
-        region.synchronizeForLostMember(lostMember, lostVersionSource);
+        region.scheduleSynchronizeForLostMember(lostMember, lostVersionSource, 
0);
       }
     }
 
diff --git 
a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionAdvisorTest.java
 
b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionAdvisorTest.java
index e92459c..a541338 100644
--- 
a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionAdvisorTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionAdvisorTest.java
@@ -14,14 +14,40 @@
  */
 package org.apache.geode.distributed.internal;
 
+import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+import org.junit.Before;
 import org.junit.Test;
 
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.CacheDistributionAdvisor;
+import org.apache.geode.internal.cache.DistributedRegion;
+import org.apache.geode.internal.cache.persistence.PersistentMemberID;
+import org.apache.geode.internal.cache.versions.VersionSource;
+
 
 public class DistributionAdvisorTest {
+  private DistributionAdvisor distributionAdvisor;
+  private InternalDistributedMember member;
+  private DistributedRegion distributedRegion;
+  private DistributionAdvisor.Profile profile;
+  private VersionSource lostVersionID;
+  private PersistentMemberID persistentMemberID;
+  private long delay = 100;
+
+  @Before
+  public void setup() {
+    distributionAdvisor = mock(DistributionAdvisor.class);
+    member = mock(InternalDistributedMember.class);
+    distributedRegion = mock(DistributedRegion.class);
+    profile = mock(CacheDistributionAdvisor.CacheProfile.class);
+    lostVersionID = mock(VersionSource.class);
+    persistentMemberID = mock(PersistentMemberID.class);
+  }
 
   @Test
   public void shouldBeMockable() throws Exception {
@@ -29,4 +55,33 @@ public class DistributionAdvisorTest {
     mockDistributionAdvisor.initialize();
     verify(mockDistributionAdvisor, times(1)).initialize();
   }
+
+  @Test
+  public void regionSyncScheduledForLostMember() {
+    
when(distributionAdvisor.getRegionForDeltaGII()).thenReturn(distributedRegion);
+    when(distributionAdvisor.getDelay(distributedRegion)).thenReturn(delay);
+    when(distributedRegion.getConcurrencyChecksEnabled()).thenReturn(true);
+    doCallRealMethod().when(distributionAdvisor).syncForCrashedMember(member, 
profile);
+
+    distributionAdvisor.syncForCrashedMember(member, profile);
+
+    verify(distributedRegion).scheduleSynchronizeForLostMember(member, member, 
delay);
+    verify(distributedRegion).setRegionSynchronizeScheduled(member);
+  }
+
+  @Test
+  public void regionSyncScheduledForLostPersistentMember() {
+    
when(distributionAdvisor.getRegionForDeltaGII()).thenReturn(distributedRegion);
+    
when(distributionAdvisor.getPersistentID((CacheDistributionAdvisor.CacheProfile)
 profile))
+        .thenReturn(persistentMemberID);
+    when(persistentMemberID.getVersionMember()).thenReturn(lostVersionID);
+    when(distributionAdvisor.getDelay(distributedRegion)).thenReturn(delay);
+    when(distributedRegion.getConcurrencyChecksEnabled()).thenReturn(true);
+    doCallRealMethod().when(distributionAdvisor).syncForCrashedMember(member, 
profile);
+
+    distributionAdvisor.syncForCrashedMember(member, profile);
+
+    verify(distributedRegion).scheduleSynchronizeForLostMember(member, 
lostVersionID, delay);
+    verify(distributedRegion).setRegionSynchronizeScheduled(lostVersionID);
+  }
 }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java
index 143072f..23640a4 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionTest.java
@@ -19,6 +19,7 @@ import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -26,7 +27,10 @@ import static org.mockito.Mockito.when;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.InOrder;
 
+import org.apache.geode.cache.DataPolicy;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.versions.RegionVersionHolder;
 import org.apache.geode.internal.cache.versions.RegionVersionVector;
 import org.apache.geode.internal.cache.versions.VersionSource;
@@ -36,12 +40,14 @@ public class DistributedRegionTest {
   private RegionVersionVector vector;
   private RegionVersionHolder holder;
   private VersionSource lostMemberVersionID;
+  private InternalDistributedMember member;
 
   @Before
   public void setup() {
     vector = mock(RegionVersionVector.class);
     holder = mock(RegionVersionHolder.class);
     lostMemberVersionID = mock(VersionSource.class);
+    member = mock(InternalDistributedMember.class);
   }
 
   @Test
@@ -146,4 +152,34 @@ public class DistributedRegionTest {
 
     verify(holder, never()).setRegionSynchronizeScheduledOrDoneIfNot();
   }
+
+  @Test
+  public void 
regionSyncInvokedInPerformSynchronizeForLostMemberTaskAfterRegionInitialized() {
+    DistributedRegion distributedRegion = mock(DistributedRegion.class);
+    when(distributedRegion.getDataPolicy()).thenReturn(mock(DataPolicy.class));
+    
doCallRealMethod().when(distributedRegion).performSynchronizeForLostMemberTask(member,
+        lostMemberVersionID);
+    InOrder inOrder = inOrder(distributedRegion);
+
+    distributedRegion.performSynchronizeForLostMemberTask(member, 
lostMemberVersionID);
+
+    inOrder.verify(distributedRegion).waitUntilInitialized();
+    inOrder.verify(distributedRegion).synchronizeForLostMember(member, 
lostMemberVersionID);
+  }
+
+  @Test
+  public void emptyAccessorOfPersistentRegionDoesNotSynchronizeForLostMember() 
{
+    DataPolicy dataPolicy = mock(DataPolicy.class);
+    DistributedRegion distributedRegion = mock(DistributedRegion.class);
+    when(distributedRegion.getDataPolicy()).thenReturn(dataPolicy);
+    when(dataPolicy.withPersistence()).thenReturn(true);
+    when(distributedRegion.getPersistentID()).thenReturn(null);
+    
doCallRealMethod().when(distributedRegion).performSynchronizeForLostMemberTask(member,
+        lostMemberVersionID);
+
+    distributedRegion.performSynchronizeForLostMemberTask(member, 
lostMemberVersionID);
+
+    verify(distributedRegion).waitUntilInitialized();
+    verify(distributedRegion, never()).synchronizeForLostMember(member, 
lostMemberVersionID);
+  }
 }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/InitialImageOperationTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/InitialImageOperationTest.java
index 26663a5..61bed57 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/InitialImageOperationTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/InitialImageOperationTest.java
@@ -77,12 +77,12 @@ public class InitialImageOperationTest {
   }
 
   @Test
-  public void 
synchronizeForLostMemberIsInvokedIfRegionHasNotScheduledOrDoneSynchronization() 
{
+  public void 
scheduleSynchronizeForLostMemberIsInvokedIfRegionHasNotScheduledOrDoneSynchronization()
 {
     
when(distributedRegion.setRegionSynchronizedWithIfNotScheduled(versionSource)).thenReturn(true);
 
     message.synchronizeIfNotScheduled(distributedRegion, lostMember, 
versionSource);
 
-    verify(distributedRegion).synchronizeForLostMember(lostMember, 
versionSource);
+    verify(distributedRegion).scheduleSynchronizeForLostMember(lostMember, 
versionSource, 0);
   }
 
   @Test
@@ -92,6 +92,7 @@ public class InitialImageOperationTest {
 
     message.synchronizeIfNotScheduled(distributedRegion, lostMember, 
versionSource);
 
-    verify(distributedRegion, never()).synchronizeForLostMember(lostMember, 
versionSource);
+    verify(distributedRegion, 
never()).scheduleSynchronizeForLostMember(lostMember, versionSource,
+        0);
   }
 }

Reply via email to