This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-10242
in repository https://gitbox.apache.org/repos/asf/geode.git

commit d6f4e1f61aad5bcaac8d27b100a80ea645f53bf4
Author: Eric Shu <e...@pivotal.io>
AuthorDate: Wed Apr 27 10:45:51 2022 -0700

    GEODE-10242: Do not release primary lock prematurely
    
     * When deposing the primary during rebalance, do not release the primary
       lock before all colocated child buckets have deposed their primaries.
       This ensures that the node that becomes the new primary can only
       acquire the primary lock afterwards.
     * All colocated buckets now share the same primaryMoveReadWriteLock.
       While a parent bucket is being moved, no operations are executed on
       its child buckets either, so moving the primary for all colocated
       buckets should be faster and parent locks no longer need to be held.
---
 .../apache/geode/internal/cache/BucketAdvisor.java | 42 ++++++------
 .../apache/geode/internal/cache/BucketRegion.java  | 39 ++---------
 .../geode/internal/cache/BucketAdvisorTest.java    | 80 +++++++++++++++++-----
 3 files changed, 93 insertions(+), 68 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
index 2b70f868d2..ea012690ba 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
@@ -43,6 +43,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.TestOnly;
 
 import org.apache.geode.CancelException;
 import org.apache.geode.DataSerializer;
@@ -142,9 +143,9 @@ public class BucketAdvisor extends CacheDistributionAdvisor 
{
    * A read/write lock to prevent making this bucket not primary while a write 
is in progress on the
    * bucket.
    */
-  private final ReadWriteLock primaryMoveReadWriteLock = new 
ReentrantReadWriteLock();
-  private final Lock primaryMoveReadLock = primaryMoveReadWriteLock.readLock();
-  private final Lock primaryMoveWriteLock = 
primaryMoveReadWriteLock.writeLock();
+  private final ReadWriteLock primaryMoveReadWriteLock;
+  private final Lock primaryMoveReadLock;
+  private final Lock primaryMoveWriteLock;
 
   /**
    * The advisor for the bucket region that we are colocated with, if this 
region is a colocated
@@ -181,6 +182,14 @@ public class BucketAdvisor extends 
CacheDistributionAdvisor {
     redundancyTracker =
         new BucketRedundancyTracker(pRegion.getRedundantCopies(), 
pRegion.getRedundancyTracker());
     resetParentAdvisor(bucket.getId());
+
+    if (parentAdvisor == null) {
+      primaryMoveReadWriteLock = new ReentrantReadWriteLock();
+    } else {
+      primaryMoveReadWriteLock = parentAdvisor.primaryMoveReadWriteLock;
+    }
+    primaryMoveReadLock = primaryMoveReadWriteLock.readLock();
+    primaryMoveWriteLock = primaryMoveReadWriteLock.writeLock();
   }
 
   public static BucketAdvisor createBucketAdvisor(Bucket bucket, RegionAdvisor 
regionAdvisor) {
@@ -240,19 +249,6 @@ public class BucketAdvisor extends 
CacheDistributionAdvisor {
     return primaryMoveReadLock;
   }
 
-  /**
-   * Returns the lock that prevents the parent's primary from moving while 
active writes are in
-   * progress. This should be locked before checking if the local bucket is 
primary.
-   *
-   * @return the lock for in-progress write operations
-   */
-  Lock getParentPrimaryMoveReadLock() {
-    if (parentAdvisor != null) {
-      return parentAdvisor.getPrimaryMoveReadLock();
-    }
-    return null;
-  }
-
   /**
    * Try to lock the primary bucket to make sure no operation is on-going at 
current bucket.
    *
@@ -309,7 +305,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor 
{
    * Caller must synchronize on this BucketAdvisor.
    *
    */
-  private void deposePrimaryForColocatedChildren() {
+  void deposePrimaryForColocatedChildren() {
     boolean deposedChildPrimaries = true;
     List<PartitionedRegion> colocatedChildPRs = 
ColocationHelper.getColocatedChildRegions(pRegion);
     if (colocatedChildPRs != null) {
@@ -845,7 +841,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor 
{
    *
    * @param member the member who is not primary
    */
-  private void removePrimary(InternalDistributedMember member) {
+  void removePrimary(InternalDistributedMember member) {
     boolean needToVolunteerForPrimary = false;
     if (!isClosed()) { // hole: requestPrimaryState not hosting
       initializationGate();
@@ -896,9 +892,10 @@ public class BucketAdvisor extends 
CacheDistributionAdvisor {
           ((BucketRegion) br).beforeReleasingPrimaryLockDuringDemotion();
         }
 
-        releasePrimaryLock();
         // this was a deposePrimary call so we need to depose children as well
         deposePrimaryForColocatedChildren();
+        releasePrimaryLock();
+
         if (pRegion.isFixedPartitionedRegion()) {
           deposeOtherPrimaryBucketForFixedPartition();
         }
@@ -1688,6 +1685,11 @@ public class BucketAdvisor extends 
CacheDistributionAdvisor {
     notifyAll(); // wake up any threads in waitForPrimaryMember
   }
 
+  @TestOnly
+  void setPrimaryMemberForTest(InternalDistributedMember member) {
+    primaryMember.set(member);
+  }
+
   void setHadPrimary() {
     everHadPrimary = true;
   }
@@ -1809,7 +1811,7 @@ public class BucketAdvisor extends 
CacheDistributionAdvisor {
   /**
    * Releases the primary lock for this bucket.
    */
-  private void releasePrimaryLock() {
+  void releasePrimaryLock() {
     // We don't have a lock if we have a parent advisor
     if (parentAdvisor != null) {
       return;
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 421a854498..ffb1109115 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -799,7 +799,7 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
   }
 
   /**
-   * lock this bucket and, if present, its colocated "parent"
+   * lock this bucket
    *
    * @param tryLock - whether to use tryLock (true) or a blocking lock (false)
    * @return true if locks were obtained and are still held
@@ -832,41 +832,20 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
 
   private boolean lockPrimaryStateReadLock(boolean tryLock) {
     Lock primaryMoveReadLock = getBucketAdvisor().getPrimaryMoveReadLock();
-    Lock parentLock = getBucketAdvisor().getParentPrimaryMoveReadLock();
     for (;;) {
       boolean interrupted = Thread.interrupted();
       try {
         // Get the lock. If we have to wait here, it's because
         // this VM is actively becoming "not primary". We don't want
         // to throw an exception until this VM is actually no longer
-        // primary, so we wait here for not primary to complete. See bug #39963
-        if (parentLock != null) {
-          if (tryLock) {
-            boolean locked = parentLock.tryLock();
-            if (!locked) {
-              return false;
-            }
-          } else {
-            parentLock.lockInterruptibly();
-          }
-          if (tryLock) {
-            boolean locked = primaryMoveReadLock.tryLock();
-            if (!locked) {
-              parentLock.unlock();
-              return false;
-            }
-          } else {
-            primaryMoveReadLock.lockInterruptibly();
+        // primary, so we wait here for not primary to complete.
+        if (tryLock) {
+          boolean locked = primaryMoveReadLock.tryLock();
+          if (!locked) {
+            return false;
           }
         } else {
-          if (tryLock) {
-            boolean locked = primaryMoveReadLock.tryLock();
-            if (!locked) {
-              return false;
-            }
-          } else {
-            primaryMoveReadLock.lockInterruptibly();
-          }
+          primaryMoveReadLock.lockInterruptibly();
         }
         break; // success
       } catch (InterruptedException e) {
@@ -886,10 +865,6 @@ public class BucketRegion extends DistributedRegion 
implements Bucket {
   public void doUnlockForPrimary() {
     Lock primaryMoveReadLock = getBucketAdvisor().getPrimaryMoveReadLock();
     primaryMoveReadLock.unlock();
-    Lock parentLock = getBucketAdvisor().getParentPrimaryMoveReadLock();
-    if (parentLock != null) {
-      parentLock.unlock();
-    }
   }
 
   /**
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
index 817386cb7f..9e7bb65ea2 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketAdvisorTest.java
@@ -19,11 +19,11 @@ import static org.apache.geode.cache.Region.SEPARATOR;
 import static 
org.apache.geode.internal.cache.CacheServerImpl.CACHE_SERVER_BIND_ADDRESS_NOT_AVAILABLE_EXCEPTION_MESSAGE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
@@ -33,8 +33,13 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InOrder;
+import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
 
 import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.server.CacheServer;
@@ -43,10 +48,18 @@ import 
org.apache.geode.distributed.internal.membership.InternalDistributedMembe
 import org.apache.geode.internal.cache.partitioned.Bucket;
 import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
 
-public class BucketAdvisorTest {
+class BucketAdvisorTest {
+  @Mock
+  private PartitionedRegion partitionedRegion;
+  @Mock
+  private Bucket bucket;
+  @Mock
+  private RegionAdvisor regionAdvisor;
+
+  private AutoCloseable closeable;
 
   @Test
-  public void shouldBeMockable() throws Exception {
+  void shouldBeMockable() throws Exception {
     BucketAdvisor mockBucketAdvisor = mock(BucketAdvisor.class);
     InternalDistributedMember mockInternalDistributedMember = 
mock(InternalDistributedMember.class);
 
@@ -58,7 +71,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void 
whenServerStopsAfterTheFirstIsRunningCheckThenItShouldNotBeAddedToLocations() {
+  void 
whenServerStopsAfterTheFirstIsRunningCheckThenItShouldNotBeAddedToLocations() {
     InternalCache mockCache = mock(InternalCache.class);
     ProxyBucketRegion mockBucket = mock(ProxyBucketRegion.class);
     RegionAdvisor mockRegionAdvisor = mock(RegionAdvisor.class);
@@ -86,7 +99,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void 
whenServerThrowsIllegalStateExceptionWithoutBindAddressMsgThenExceptionMustBeThrown()
 {
+  void 
whenServerThrowsIllegalStateExceptionWithoutBindAddressMsgThenExceptionMustBeThrown()
 {
     InternalCache mockCache = mock(InternalCache.class);
     ProxyBucketRegion mockBucket = mock(ProxyBucketRegion.class);
     RegionAdvisor mockRegionAdvisor = mock(RegionAdvisor.class);
@@ -114,7 +127,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void volunteerForPrimaryIgnoresMissingPrimaryElector() {
+  void volunteerForPrimaryIgnoresMissingPrimaryElector() {
     DistributionManager distributionManager = mock(DistributionManager.class);
     when(distributionManager.getId()).thenReturn(new 
InternalDistributedMember("localhost", 321));
 
@@ -153,12 +166,13 @@ public class BucketAdvisorTest {
         mock(BucketAdvisor.VolunteeringDelegate.class);
     advisorSpy.setVolunteeringDelegate(volunteeringDelegate);
     advisorSpy.initializePrimaryElector(missingElectorId);
-    assertEquals(missingElectorId, advisorSpy.getPrimaryElector());
+    assertThat(missingElectorId).isEqualTo(advisorSpy.getPrimaryElector());
     advisorSpy.volunteerForPrimary();
     verify(volunteeringDelegate).volunteerForPrimary();
   }
 
-  BucketAdvisor mockBucketAdvisorWithShadowBucketsDestroyedMap(Map<String, 
Boolean> shadowBuckets) {
+  BucketAdvisor mockBucketAdvisorWithShadowBucketsDestroyedMap(
+      Map<String, Boolean> shadowBuckets) {
     DistributionManager distributionManager = mock(DistributionManager.class);
     when(distributionManager.getId()).thenReturn(new 
InternalDistributedMember("localhost", 321));
 
@@ -180,7 +194,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void 
markAllShadowBucketsAsNonDestroyedShouldClearTheShadowBucketsDestroyedMap() {
+  void 
markAllShadowBucketsAsNonDestroyedShouldClearTheShadowBucketsDestroyedMap() {
     Map<String, Boolean> buckets = of(SEPARATOR + "b1", false, SEPARATOR + 
"b2", true);
     BucketAdvisor bucketAdvisor = 
mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
 
@@ -190,7 +204,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void 
markAllShadowBucketsAsDestroyedShouldSetTheFlagAsTrueForEveryKnownShadowBucket()
 {
+  void 
markAllShadowBucketsAsDestroyedShouldSetTheFlagAsTrueForEveryKnownShadowBucket()
 {
     Map<String, Boolean> buckets =
         of(SEPARATOR + "b1", false, SEPARATOR + "b2", false, SEPARATOR + "b3", 
false);
     BucketAdvisor bucketAdvisor = 
mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
@@ -201,7 +215,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void 
markShadowBucketAsDestroyedShouldSetTheFlagAsTrueOnlyForTheSpecificBucket() {
+  void 
markShadowBucketAsDestroyedShouldSetTheFlagAsTrueOnlyForTheSpecificBucket() {
     Map<String, Boolean> buckets = of(SEPARATOR + "b1", false);
     BucketAdvisor bucketAdvisor = 
mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
 
@@ -217,7 +231,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void isShadowBucketDestroyedShouldReturnCorrectly() {
+  void isShadowBucketDestroyedShouldReturnCorrectly() {
     Map<String, Boolean> buckets = of(SEPARATOR + "b1", true, SEPARATOR + 
"b2", false);
     BucketAdvisor bucketAdvisor = 
mockBucketAdvisorWithShadowBucketsDestroyedMap(buckets);
 
@@ -230,7 +244,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void 
testGetAllHostingMembersReturnsNoMembersWhenBucketAdvisorHasNoProfiles() {
+  void 
testGetAllHostingMembersReturnsNoMembersWhenBucketAdvisorHasNoProfiles() {
     DistributionManager distributionManager = mock(DistributionManager.class);
     when(distributionManager.getId()).thenReturn(new 
InternalDistributedMember("localhost", 321));
 
@@ -252,7 +266,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void 
testGetAllHostingMembersReturnsMemberWhenBucketAdvisorHasOneProfileWithHostingBucket()
 {
+  void 
testGetAllHostingMembersReturnsMemberWhenBucketAdvisorHasOneProfileWithHostingBucket()
 {
     DistributionManager distributionManager = mock(DistributionManager.class);
     InternalDistributedMember memberId = new 
InternalDistributedMember("localhost", 321);
 
@@ -282,7 +296,7 @@ public class BucketAdvisorTest {
   }
 
   @Test
-  public void 
testGetAllHostingMembersReturnsMemberWhenBucketAdvisorHasTwoProfilesAndOneIsHostingBucket()
 {
+  void 
testGetAllHostingMembersReturnsMemberWhenBucketAdvisorHasTwoProfilesAndOneIsHostingBucket()
 {
     DistributionManager distributionManager = mock(DistributionManager.class);
     InternalDistributedMember memberId = new 
InternalDistributedMember("localhost", 321);
     InternalDistributedMember memberId2 = new 
InternalDistributedMember("localhost", 323);
@@ -317,4 +331,38 @@ public class BucketAdvisorTest {
 
     assertThat(bucketAdvisor.adviseInitialized().size()).isEqualTo(1);
   }
+
+  @Test
+  void 
removePrimaryDeposePrimaryForColocatedChildrenBeforeReleasePrimaryLock() {
+    when(regionAdvisor.getPartitionedRegion()).thenReturn(partitionedRegion);
+    when(regionAdvisor.getBucket(any(Integer.class))).thenReturn(bucket);
+    when(partitionedRegion.getPartitionAttributes()).thenReturn(new 
PartitionAttributesImpl());
+    
when(bucket.getDistributionManager()).thenReturn(mock(DistributionManager.class));
+    BucketAdvisor bucketAdvisor = 
spy(BucketAdvisor.createBucketAdvisor(bucket, regionAdvisor));
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    DistributionManager manager = mock(DistributionManager.class);
+    doReturn(true).when(bucketAdvisor).isPrimary();
+    doReturn(manager).when(bucketAdvisor).getDistributionManager();
+    when(manager.getId()).thenReturn(member);
+    bucketAdvisor.setPrimaryMemberForTest(member);
+    bucketAdvisor.setInitialized();
+    doNothing().when(bucketAdvisor).deposePrimaryForColocatedChildren();
+
+    InOrder order = inOrder(bucketAdvisor);
+    bucketAdvisor.removePrimary(member);
+
+    order.verify(bucketAdvisor).deposePrimaryForColocatedChildren();
+    order.verify(bucketAdvisor).releasePrimaryLock();
+    assertThat(bucketAdvisor.basicGetPrimaryMember()).isNull();
+  }
+
+  @BeforeEach
+  void init() {
+    closeable = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterEach
+  void close() throws Exception {
+    closeable.close();
+  }
 }

Reply via email to