This is an automated email from the ASF dual-hosted git repository. boglesby pushed a commit to branch feature/GEODE-5478 in repository https://gitbox.apache.org/repos/asf/geode.git
commit b4b3cc34ffc0bdcfbd0461cf0b8da642251bea7f Author: Barry Oglesby <bogle...@pivotal.io> AuthorDate: Thu Jul 26 10:37:39 2018 -0700 GEODE-5478: Modified BucketRedundancyTracker to not increment low redundancy bucket count twice --- ...edRegionLowBucketRedundancyDistributedTest.java | 196 +++++++++++++++++++++ .../internal/cache/BucketRedundancyTracker.java | 2 +- .../cache/BucketRedundancyTrackerTest.java | 23 +++ 3 files changed, 220 insertions(+), 1 deletion(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionLowBucketRedundancyDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionLowBucketRedundancyDistributedTest.java new file mode 100644 index 0000000..979937b --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionLowBucketRedundancyDistributedTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache; + +import static org.apache.geode.cache.RegionShortcut.PARTITION; +import static org.apache.geode.cache.RegionShortcut.PARTITION_PERSISTENT; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +import org.awaitility.Awaitility; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; +import org.apache.geode.test.junit.rules.serializable.SerializableTestName; + +public class PartitionedRegionLowBucketRedundancyDistributedTest implements Serializable { + + public String regionName; + + @Rule + public ClusterStartupRule startupRule = new ClusterStartupRule(); + + @Rule + public SerializableTestName testName = new SerializableTestName(); + + @Before + public void setUp() { + regionName = testName.getMethodName() + "_region"; + } + + @Test + public void testTwoServersWithOneRedundantCopy() throws Exception { + // Start locator + MemberVM locator = startupRule.startLocatorVM(0); + int locatorPort = locator.getPort(); + + // Start server1 and create region + MemberVM server1 = startServerAndCreateRegion(1, locatorPort, PARTITION, 1); + + // Do puts in server1 + server1.getVM().invoke(() -> doPuts(500)); + + // Verify lowBucketRedundancyCount == 113 in server1 + server1.getVM().invoke(() -> waitForLowBucketRedundancyCount(113)); + + // Start server2 and create region + MemberVM server2 = startServerAndCreateRegion(2, locatorPort, PARTITION, 1); + + // Verify lowBucketRedundancyCount == 0 in both servers + server1.getVM().invoke(() -> waitForLowBucketRedundancyCount(0)); + server2.getVM().invoke(() -> 
waitForLowBucketRedundancyCount(0)); + + // Stop server2 + server2.stopMember(false); + + // Verify lowBucketRedundancyCount == 113 in server1 + server1.getVM().invoke(() -> waitForLowBucketRedundancyCount(113)); + } + + @Test + public void testThreeServersWithTwoRedundantCopies() throws Exception { + // Start locator + MemberVM locator = startupRule.startLocatorVM(0); + int locatorPort = locator.getPort(); + + // Start server1 and create region + MemberVM server1 = startServerAndCreateRegion(1, locatorPort, PARTITION, 2); + + // Do puts in server1 + server1.getVM().invoke(() -> doPuts(500)); + + // Verify lowBucketRedundancyCount == 113 in server1 + server1.getVM().invoke(() -> waitForLowBucketRedundancyCount(113)); + + // Start server2 and create region + MemberVM server2 = startServerAndCreateRegion(2, locatorPort, PARTITION, 2); + + // Verify lowBucketRedundancyCount == 113 in both servers + server1.getVM().invoke(() -> waitForLowBucketRedundancyCount(113)); + server2.getVM().invoke(() -> waitForLowBucketRedundancyCount(113)); + + // Start server3 and create region + MemberVM server3 = startServerAndCreateRegion(3, locatorPort, PARTITION, 2); + + // Verify lowBucketRedundancyCount == 0 in all servers + server1.getVM().invoke(() -> waitForLowBucketRedundancyCount(0)); + server2.getVM().invoke(() -> waitForLowBucketRedundancyCount(0)); + server3.getVM().invoke(() -> waitForLowBucketRedundancyCount(0)); + } + + @Test + public void testFourServersWithPersistentRegionAndOneRedundantCopy() throws Exception { + // Start locator + MemberVM locator = startupRule.startLocatorVM(0); + int locatorPort = locator.getPort(); + + // Start servers and create regions + MemberVM server1 = startServerAndCreateRegion(1, locatorPort, PARTITION_PERSISTENT, 1); + MemberVM server2 = startServerAndCreateRegion(2, locatorPort, PARTITION_PERSISTENT, 1); + MemberVM server3 = startServerAndCreateRegion(3, locatorPort, PARTITION_PERSISTENT, 1); + MemberVM server4 = 
startServerAndCreateRegion(4, locatorPort, PARTITION_PERSISTENT, 1); + + // Do puts in server1 + server1.getVM().invoke(() -> doPuts(500)); + + // Verify lowBucketRedundancyCount == 0 in all servers + server1.getVM().invoke(() -> waitForLowBucketRedundancyCount(0)); + server2.getVM().invoke(() -> waitForLowBucketRedundancyCount(0)); + server3.getVM().invoke(() -> waitForLowBucketRedundancyCount(0)); + server4.getVM().invoke(() -> waitForLowBucketRedundancyCount(0)); + + // Stop servers 1 and 2 + server1.stopMember(false); + server2.stopMember(false); + + server3.getVM().invoke(() -> waitForMembers(1)); + server4.getVM().invoke(() -> waitForMembers(1)); + + // Restart servers 1 and 2 + server1 = startupRule.startServerVM(1, locatorPort); + server2 = startupRule.startServerVM(2, locatorPort); + + // Asynchronously recreate the regions in servers 1 and 2 (since they are recovering persistent + // data) + AsyncInvocation recreateRegionInServer1 = + server1.getVM().invokeAsync(() -> createRegion(PARTITION_PERSISTENT, 1)); + AsyncInvocation recreateRegionInServer2 = + server2.getVM().invokeAsync(() -> createRegion(PARTITION_PERSISTENT, 1)); + recreateRegionInServer1.await(); + recreateRegionInServer2.await(); + + // Verify lowBucketRedundancyCount == 0 in all servers + server1.getVM().invoke(() -> waitForLowBucketRedundancyCount(0)); + server2.getVM().invoke(() -> waitForLowBucketRedundancyCount(0)); + server3.getVM().invoke(() -> waitForLowBucketRedundancyCount(0)); + server4.getVM().invoke(() -> waitForLowBucketRedundancyCount(0)); + } + + private MemberVM startServerAndCreateRegion(int vmId, int locatorPort, RegionShortcut shortcut, + int redundantCopies) { + // Start server + MemberVM server = startupRule.startServerVM(vmId, locatorPort); + + // Create region + server.getVM().invoke(() -> createRegion(shortcut, redundantCopies)); + + return server; + } + + private void createRegion(RegionShortcut shortcut, int redundantCopies) { + PartitionAttributesFactory<?, ?> paf 
= new PartitionAttributesFactory(); + paf.setRedundantCopies(redundantCopies); + ClusterStartupRule.getCache().createRegionFactory(shortcut) + .setPartitionAttributes(paf.create()).create(regionName); + } + + private void doPuts(int numPuts) { + Region region = ClusterStartupRule.getCache().getRegion(regionName); + for (int i = 0; i < numPuts; i++) { + region.put("key" + i, "value" + i); + } + } + + private void waitForLowBucketRedundancyCount(int count) { + PartitionedRegion region = + (PartitionedRegion) ClusterStartupRule.getCache().getRegion(regionName); + Awaitility.await().atMost(30, TimeUnit.SECONDS).until( + () -> assertThat(region.getPrStats().getLowRedundancyBucketCount()).isEqualTo(count)); + } + + private void waitForMembers(int count) { + Awaitility.await().atMost(30, TimeUnit.SECONDS).until( + () -> assertThat(ClusterStartupRule.getCache().getMembers().size()).isEqualTo(count)); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRedundancyTracker.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRedundancyTracker.java index f46ef0c..20d2f9e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRedundancyTracker.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRedundancyTracker.java @@ -97,7 +97,7 @@ class BucketRedundancyTracker { if (redundancySatisfied) { regionRedundancyTracker.incrementLowRedundancyBucketCount(); redundancySatisfied = false; - } else if (!hasAnyCopies && updatedRedundancy >= 0) { + } else if (!hasAnyCopies && !hasEverHadCopies && updatedRedundancy >= 0) { regionRedundancyTracker.incrementLowRedundancyBucketCount(); } } else if (!redundancySatisfied && updatedRedundancy == targetRedundancy) { diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRedundancyTrackerTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRedundancyTrackerTest.java index f6c2102..f9410bf 100644 --- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRedundancyTrackerTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRedundancyTrackerTest.java @@ -81,6 +81,23 @@ public class BucketRedundancyTrackerTest { } @Test + public void decrementsBucketCountOnIncrementBeforeNoCopies() { + bucketRedundancyTracker = + new BucketRedundancyTracker(2, regionRedundancyTracker); + bucketRedundancyTracker.updateStatistics(3); + // Verify decrementLowRedundancyBucketCount is invoked. Note: It won't decrement below 0. + verify(regionRedundancyTracker, times(1)).decrementLowRedundancyBucketCount(); + bucketRedundancyTracker.updateStatistics(2); + // Verify incrementLowRedundancyBucketCount is invoked. + verify(regionRedundancyTracker, times(1)).incrementLowRedundancyBucketCount(); + bucketRedundancyTracker.updateStatistics(1); + bucketRedundancyTracker.updateStatistics(2); + // Verify incrementLowRedundancyBucketCount is not invoked again when the count goes to 2. + verify(regionRedundancyTracker, times(1)).incrementLowRedundancyBucketCount(); + assertEquals(1, bucketRedundancyTracker.getCurrentRedundancy()); + } + + @Test public void bucketCountNotDecrementedOnClosingBucketThatNeverHadCopies() { verify(regionRedundancyTracker, never()).decrementLowRedundancyBucketCount(); assertEquals(-1, bucketRedundancyTracker.getCurrentRedundancy()); @@ -104,8 +121,14 @@ public class BucketRedundancyTrackerTest { @Test public void decrementsBucketCountOnHavingAtLeastOneCopyOfBucket() { bucketRedundancyTracker.updateStatistics(1); + // Verify incrementLowRedundancyBucketCount is invoked. + verify(regionRedundancyTracker, times(1)).incrementLowRedundancyBucketCount(); bucketRedundancyTracker.updateStatistics(0); bucketRedundancyTracker.updateStatistics(1); + // Verify incrementLowRedundancyBucketCount is not invoked again when the count goes to 1. 
+ verify(regionRedundancyTracker, times(1)).incrementLowRedundancyBucketCount(); + // Verify decrementLowRedundancyBucketCount is not invoked. + verify(regionRedundancyTracker, never()).decrementLowRedundancyBucketCount(); verify(regionRedundancyTracker, times(1)).decrementNoCopiesBucketCount(); assertEquals(0, bucketRedundancyTracker.getCurrentRedundancy()); }