kirklund commented on a change in pull request #6373: URL: https://github.com/apache/geode/pull/6373#discussion_r688163114
########## File path: geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java ########## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.cache.query.partitioned; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.stream.IntStream; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.query.QueryService; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.internal.cache.DistributedClearOperation; +import org.apache.geode.internal.cache.DistributedClearOperation.ClearRegionMessage; +import org.apache.geode.internal.cache.PartitionedRegionClearMessage; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +public class PRClearCreateIndexDUnitTest implements Serializable { + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(4, true); + + private MemberVM primary, secondary; + private ClientVM client; + + @Before + public void before() throws Exception { + int locatorPort = ClusterStartupRule.getDUnitLocatorPort(); + primary = cluster.startServerVM(0, locatorPort); + secondary = cluster.startServerVM(1, locatorPort); + + // create region on server1 first, making sure server1 has the primary bucket + primary.invoke(() -> { + DistributionMessageObserver.setInstance(new MessageObserver()); + Region<Object, Object> region = + ClusterStartupRule.memberStarter.createPartitionRegion("regionA", + f -> f.setTotalNumBuckets(1).setRedundantCopies(1)); + IntStream.range(0, 100).forEach(i -> region.put(i, "value" + i)); + }); + + // server2 has the secondary bucket + secondary.invoke(() -> { + DistributionMessageObserver.setInstance(new MessageObserver()); + ClusterStartupRule.memberStarter.createPartitionRegion("regionA", + f -> f.setTotalNumBuckets(1).setRedundantCopies(1)); + }); + } + + @After + public void after() throws Exception { + primary.invoke(() -> { + DistributionMessageObserver.setInstance(null); + }); + secondary.invoke(() -> { + DistributionMessageObserver.setInstance(null); + }); + } + + // All tests create index on secondary members. 
These tests are making sure we are requesting + // locks for clear on secondary members as well. If we create index on the primary, the clear + // and createIndex will run sequentially so there would be no error. But if we create index on + // the secondary member and if the secondary member will not + // request a lock for clear operation, it will result in an EntryDestroyedException when create + // index is happening. + + // Note: OP_LOCK_FOR_CLEAR, OP_CLEAR, OP_UNLOCK_FOR_CLEAR are messages for secondary members + // OP_LOCK_FOR_PR_CLEAR, OP_UNLOCK_FOR_PR_CLEAR, OP_PR_CLEAR can be for anybody + + @Test + // all local buckets are primary, so only OP_LOCK_FOR_CLEAR and OP_CLEAR messages are sent to the + // secondary member + // in the end an OP_PR_CLEAR is sent to the secondary for no effect + public void clearFromPrimaryMember() throws Exception { + AsyncInvocation createIndex = secondary.invokeAsync(PRClearCreateIndexDUnitTest::createIndex); + AsyncInvocation clear = primary.invokeAsync(PRClearCreateIndexDUnitTest::clear); + + createIndex.get(); Review comment: These calls to `AsyncInvocation.get()` should actually be `AsyncInvocation.await()` since there's no return value.
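For illustration only, a minimal sketch of the suggested change in the test above, also folding in the `AsyncInvocation<Void>` suggestion made further down (the `createIndex`/`clear` helpers are the ones already referenced in the diff):
```
AsyncInvocation<Void> createIndex = secondary.invokeAsync(PRClearCreateIndexDUnitTest::createIndex);
AsyncInvocation<Void> clear = primary.invokeAsync(PRClearCreateIndexDUnitTest::clear);

// await() blocks until the remote invocation finishes, so the test still
// fails if either remote call threw, without pretending there is a result to fetch
createIndex.await();
clear.await();
```
`await()` still surfaces any exception thrown in the remote VM, so the test keeps the same failure behavior while dropping the unused return value.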
########## File path: geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java ########## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.cache.query.partitioned; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.stream.IntStream; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.query.QueryService; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.internal.cache.DistributedClearOperation; +import org.apache.geode.internal.cache.DistributedClearOperation.ClearRegionMessage; +import org.apache.geode.internal.cache.PartitionedRegionClearMessage; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +public class PRClearCreateIndexDUnitTest implements Serializable { + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(4, true); + + private MemberVM primary, secondary; Review comment: The spotless format really shouldn't be allowing this. Please declare each field on its own line. ########## File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClearPRDuringGIIProviderDUnitTest.java ########## @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License.
+ */ +package org.apache.geode.internal.cache; + +import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT; +import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_OVERFLOW; +import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_PERSISTENT; +import static org.apache.geode.internal.cache.InitialImageOperation.getGIITestHookForCheckingPurpose; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.fail; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.stream.IntStream; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.PartitionedRegionPartialClearException; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.Assert; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.SerializableRunnable; +import org.apache.geode.test.dunit.WaitCriterion; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +@RunWith(Parameterized.class) +public class ClearPRDuringGIIProviderDUnitTest implements Serializable { + protected static final String REGION_NAME = "testPR"; + protected static final int TOTAL_BUCKET_NUM = 10; + protected static final int DATA_SIZE = 100; + protected static final int NUM_SERVERS = 3; + + @Parameterized.Parameter(0) + public RegionShortcut regionShortcut; + + @Parameterized.Parameter(1) + public InitialImageOperation.GIITestHookType giiTestHookType; + + protected int locatorPort; + protected MemberVM[] memberVMS; + + private static final Logger logger = LogManager.getLogger(); + + static RegionShortcut[] regionTypes() { + return new RegionShortcut[] { + PARTITION_REDUNDANT, PARTITION_REDUNDANT_OVERFLOW, PARTITION_REDUNDANT_PERSISTENT + }; + } + + @Parameterized.Parameters(name = "{index}: regionShortcut={0} {1}") + public static Collection<Object[]> getCombinations() { + List<Object[]> params = new ArrayList<>(); + RegionShortcut[] regionShortcuts = regionTypes(); + Arrays.stream(regionShortcuts).forEach(regionShortcut -> { + params.add( + new Object[] {regionShortcut, InitialImageOperation.GIITestHookType.AfterSentImageReply}); + params.add( + new Object[] {regionShortcut, InitialImageOperation.GIITestHookType.DuringPackingImage}); + params.add(new Object[] {regionShortcut, + InitialImageOperation.GIITestHookType.AfterReceivedRequestImage}); + }); + return params; + } + + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(NUM_SERVERS + 1); + + @Before + public void setUp() throws Exception { + // serverVMs[1] is the accessor, which is feeder and invokes clear + // serverVMs[2] 
will add GII TestHook of requester + // serverVMs[3] will be GII provider for the specified bucket region + memberVMS = new MemberVM[NUM_SERVERS + 1]; + memberVMS[0] = cluster.startLocatorVM(0); + locatorPort = memberVMS[0].getPort(); + memberVMS[1] = cluster.startServerVM(1, locatorPort); + memberVMS[1].invoke(() -> initAccessor()); + for (int i = 2; i <= NUM_SERVERS; i++) { + memberVMS[i] = cluster.startServerVM(i, locatorPort); + memberVMS[i].invoke(() -> initDataStore(regionShortcut)); + } + feed("valueOne"); + verifyRegionSizes(DATA_SIZE); + } + + @After + public final void preTearDown() throws Exception { + for (int i = 1; i <= NUM_SERVERS; i++) { + memberVMS[i].invoke(() -> InitialImageOperation.resetAllGIITestHooks()); + } + } + + private void initDataStore(RegionShortcut regionShortcut) { + RegionFactory factory = getCache().createRegionFactory(regionShortcut); + factory.setPartitionAttributes( + new PartitionAttributesFactory().setTotalNumBuckets(TOTAL_BUCKET_NUM).create()); + factory.create(REGION_NAME); + } + + private void initAccessor() { + RegionFactory factory = getCache().createRegionFactory(PARTITION_REDUNDANT); + factory.setPartitionAttributes(new PartitionAttributesFactory() + .setTotalNumBuckets(TOTAL_BUCKET_NUM).setLocalMaxMemory(0).create()); + factory.create(REGION_NAME); + } + + private void feed(String valueStub) { + memberVMS[1].invoke(() -> { + Region region = getCache().getRegion(REGION_NAME); + IntStream.range(0, DATA_SIZE).forEach(i -> region.put(i, valueStub + i)); + }); + } + + private void verifyRegionSize(int expectedNum) { + Region region = getCache().getRegion(REGION_NAME); + assertThat(region.size()).isEqualTo(expectedNum); + } + + protected void giiTestHookSyncWithClear(boolean clearBeforeGII) { + // set test hook at server3, the provider + memberVMS[3].invoke(() -> { + PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(REGION_NAME); + List<Integer> localBucketList = pr.getLocalBucketsListTestOnly(); + final String bucketName = "_B__testPR_" + localBucketList.get(0); + + PauseDuringGIICallback myGIITestHook = + // using bucket name for region name to ensure callback is triggered + new PauseDuringGIICallback(giiTestHookType, bucketName); + InitialImageOperation.setGIITestHook(myGIITestHook); + }); + + memberVMS[2].invoke(() -> getCache().getRegion(REGION_NAME).close()); + feed("valueTwo"); + + for (int i = 1; i < memberVMS.length; i++) { + memberVMS[i].invoke(() -> { + DistributionMessageObserver.setInstance(new PauseDuringClearDistributionMessageObserver()); + }); + } + + AsyncInvocation asyncGII = memberVMS[2].invokeAsync(() -> initDataStore(regionShortcut)); + AsyncInvocation asyncClear = + memberVMS[1].invokeAsync(() -> getCache().getRegion(REGION_NAME).clear()); + + waitForGIITeskHookStarted(memberVMS[3], giiTestHookType); + + if (clearBeforeGII) { + for (int i = 1; i < memberVMS.length; i++) { + memberVMS[i].invoke(() -> { + PauseDuringClearDistributionMessageObserver observer = + (PauseDuringClearDistributionMessageObserver) DistributionMessageObserver + .getInstance(); + DistributionMessageObserver.setInstance(null); + observer.latch.countDown(); + }); + } + + memberVMS[3].invoke(() -> { + InitialImageOperation.resetGIITestHook(giiTestHookType, true); + }); + } else { + memberVMS[3].invoke(() -> { + InitialImageOperation.resetGIITestHook(giiTestHookType, true); + }); + + for (int i = 1; i < memberVMS.length; i++) { + memberVMS[i].invoke(() -> { + PauseDuringClearDistributionMessageObserver observer = + 
(PauseDuringClearDistributionMessageObserver) DistributionMessageObserver + .getInstance(); + DistributionMessageObserver.setInstance(null); + observer.latch.countDown(); + }); + } + } + + try { + asyncGII.join(10000); + } catch (InterruptedException ex) { + Assert.fail("Async recreate region interupted" + ex.getMessage()); + } + try { + asyncClear.join(10000); + } catch (InterruptedException ex) { + Assert.fail("Async clear interupted" + ex.getMessage()); + } + + if (asyncClear.exceptionOccurred()) { + assertThat(asyncClear.getException() instanceof PartitionedRegionPartialClearException); + } else { + verifyRegionSizes(0); + } + } + + @Test + public void clearBeforeGIIShouldClearTheRegion() { + giiTestHookSyncWithClear(true); + } + + @Test + public void clearAfterGIIShouldClearTheRegion() { + giiTestHookSyncWithClear(false); + } + + private void verifyRegionSizes(int expectedSize) { + for (int i = 2; i < memberVMS.length; i++) { + memberVMS[i].invoke(() -> { + PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(REGION_NAME); + for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) { + logger.info("verifyRegionSizes:" + br.getFullPath() + ":" + + br.getBucketAdvisor().isPrimary() + ":" + br.size()); + } + }); + } + for (int i = 1; i < memberVMS.length; i++) { + memberVMS[i].invoke(() -> verifyRegionSize(expectedSize)); + } + } + + public void waitForGIITeskHookStarted(final MemberVM vm, + final InitialImageOperation.GIITestHookType callbacktype) { + SerializableRunnable waitForCallbackStarted = new SerializableRunnable() { + @Override + public void run() { + + final InitialImageOperation.GIITestHook callback = + getGIITestHookForCheckingPurpose(callbacktype); + WaitCriterion ev = new WaitCriterion() { + + @Override + public boolean done() { + return (callback != null && callback.isRunning); + } + + @Override + public String description() { + return null; + } + }; + + GeodeAwaitility.await().untilAsserted(ev); + if (callback == null || !callback.isRunning) { + fail("GII tesk hook is not started yet"); + } Review comment: Please use assertions instead: ``` assertThat(callback).isNotNull(); assertThat(callback.isRunning).isTrue(); ``` In general, I would try to avoid having something like `callback` be null so that you don't need the `isNotNull`. ########## File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClearPRDuringGIIProviderDUnitTest.java ########## @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License.
+ */ +package org.apache.geode.internal.cache; + +import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT; +import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_OVERFLOW; +import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_PERSISTENT; +import static org.apache.geode.internal.cache.InitialImageOperation.getGIITestHookForCheckingPurpose; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.fail; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.stream.IntStream; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.PartitionedRegionPartialClearException; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.Assert; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.SerializableRunnable; +import org.apache.geode.test.dunit.WaitCriterion; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +@RunWith(Parameterized.class) +public class ClearPRDuringGIIProviderDUnitTest implements Serializable { + protected static final String REGION_NAME = "testPR"; + protected static final int TOTAL_BUCKET_NUM = 10; + protected static final int DATA_SIZE = 100; + protected static final int NUM_SERVERS = 3; + + @Parameterized.Parameter(0) + public RegionShortcut regionShortcut; + + @Parameterized.Parameter(1) + public InitialImageOperation.GIITestHookType giiTestHookType; + + protected int locatorPort; + protected MemberVM[] memberVMS; + + private static final Logger logger = LogManager.getLogger(); + + static RegionShortcut[] regionTypes() { + return new RegionShortcut[] { + PARTITION_REDUNDANT, PARTITION_REDUNDANT_OVERFLOW, PARTITION_REDUNDANT_PERSISTENT + }; + } + + @Parameterized.Parameters(name = "{index}: regionShortcut={0} {1}") + public static Collection<Object[]> getCombinations() { + List<Object[]> params = new ArrayList<>(); + RegionShortcut[] regionShortcuts = regionTypes(); + Arrays.stream(regionShortcuts).forEach(regionShortcut -> { + params.add( + new Object[] {regionShortcut, InitialImageOperation.GIITestHookType.AfterSentImageReply}); + params.add( + new Object[] {regionShortcut, InitialImageOperation.GIITestHookType.DuringPackingImage}); + params.add(new Object[] {regionShortcut, + InitialImageOperation.GIITestHookType.AfterReceivedRequestImage}); + }); + return params; + } + + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(NUM_SERVERS + 1); + + @Before + public void setUp() throws Exception { + // serverVMs[1] is the accessor, which is feeder and invokes clear + // serverVMs[2] 
will add GII TestHook of requester + // serverVMs[3] will be GII provider for the specified bucket region + memberVMS = new MemberVM[NUM_SERVERS + 1]; + memberVMS[0] = cluster.startLocatorVM(0); + locatorPort = memberVMS[0].getPort(); + memberVMS[1] = cluster.startServerVM(1, locatorPort); + memberVMS[1].invoke(() -> initAccessor()); + for (int i = 2; i <= NUM_SERVERS; i++) { + memberVMS[i] = cluster.startServerVM(i, locatorPort); + memberVMS[i].invoke(() -> initDataStore(regionShortcut)); + } + feed("valueOne"); + verifyRegionSizes(DATA_SIZE); + } + + @After + public final void preTearDown() throws Exception { + for (int i = 1; i <= NUM_SERVERS; i++) { + memberVMS[i].invoke(() -> InitialImageOperation.resetAllGIITestHooks()); + } + } + + private void initDataStore(RegionShortcut regionShortcut) { + RegionFactory factory = getCache().createRegionFactory(regionShortcut); + factory.setPartitionAttributes( + new PartitionAttributesFactory().setTotalNumBuckets(TOTAL_BUCKET_NUM).create()); + factory.create(REGION_NAME); + } + + private void initAccessor() { + RegionFactory factory = getCache().createRegionFactory(PARTITION_REDUNDANT); + factory.setPartitionAttributes(new PartitionAttributesFactory() + .setTotalNumBuckets(TOTAL_BUCKET_NUM).setLocalMaxMemory(0).create()); + factory.create(REGION_NAME); + } + + private void feed(String valueStub) { + memberVMS[1].invoke(() -> { + Region region = getCache().getRegion(REGION_NAME); + IntStream.range(0, DATA_SIZE).forEach(i -> region.put(i, valueStub + i)); + }); + } + + private void verifyRegionSize(int expectedNum) { + Region region = getCache().getRegion(REGION_NAME); + assertThat(region.size()).isEqualTo(expectedNum); + } + + protected void giiTestHookSyncWithClear(boolean clearBeforeGII) { + // set test hook at server3, the provider + memberVMS[3].invoke(() -> { + PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(REGION_NAME); + List<Integer> localBucketList = pr.getLocalBucketsListTestOnly(); + final String bucketName = "_B__testPR_" + localBucketList.get(0); + + PauseDuringGIICallback myGIITestHook = + // using bucket name for region name to ensure callback is triggered + new PauseDuringGIICallback(giiTestHookType, bucketName); + InitialImageOperation.setGIITestHook(myGIITestHook); + }); + + memberVMS[2].invoke(() -> getCache().getRegion(REGION_NAME).close()); + feed("valueTwo"); + + for (int i = 1; i < memberVMS.length; i++) { + memberVMS[i].invoke(() -> { + DistributionMessageObserver.setInstance(new PauseDuringClearDistributionMessageObserver()); + }); + } + + AsyncInvocation asyncGII = memberVMS[2].invokeAsync(() -> initDataStore(regionShortcut)); + AsyncInvocation asyncClear = + memberVMS[1].invokeAsync(() -> getCache().getRegion(REGION_NAME).clear()); + + waitForGIITeskHookStarted(memberVMS[3], giiTestHookType); + + if (clearBeforeGII) { + for (int i = 1; i < memberVMS.length; i++) { + memberVMS[i].invoke(() -> { + PauseDuringClearDistributionMessageObserver observer = + (PauseDuringClearDistributionMessageObserver) DistributionMessageObserver + .getInstance(); + DistributionMessageObserver.setInstance(null); + observer.latch.countDown(); + }); + } + + memberVMS[3].invoke(() -> { + InitialImageOperation.resetGIITestHook(giiTestHookType, true); + }); + } else { + memberVMS[3].invoke(() -> { + InitialImageOperation.resetGIITestHook(giiTestHookType, true); + }); + + for (int i = 1; i < memberVMS.length; i++) { + memberVMS[i].invoke(() -> { + PauseDuringClearDistributionMessageObserver observer = + 
(PauseDuringClearDistributionMessageObserver) DistributionMessageObserver + .getInstance(); + DistributionMessageObserver.setInstance(null); + observer.latch.countDown(); + }); + } + } + + try { + asyncGII.join(10000); + } catch (InterruptedException ex) { + Assert.fail("Async recreate region interupted" + ex.getMessage()); + } + try { + asyncClear.join(10000); + } catch (InterruptedException ex) { + Assert.fail("Async clear interupted" + ex.getMessage()); + } + + if (asyncClear.exceptionOccurred()) { + assertThat(asyncClear.getException() instanceof PartitionedRegionPartialClearException); + } else { + verifyRegionSizes(0); + } + } + + @Test + public void clearBeforeGIIShouldClearTheRegion() { + giiTestHookSyncWithClear(true); + } + + @Test + public void clearAfterGIIShouldClearTheRegion() { + giiTestHookSyncWithClear(false); + } + + private void verifyRegionSizes(int expectedSize) { + for (int i = 2; i < memberVMS.length; i++) { + memberVMS[i].invoke(() -> { + PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(REGION_NAME); + for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) { + logger.info("verifyRegionSizes:" + br.getFullPath() + ":" + + br.getBucketAdvisor().isPrimary() + ":" + br.size()); + } + }); + } + for (int i = 1; i < memberVMS.length; i++) { + memberVMS[i].invoke(() -> verifyRegionSize(expectedSize)); + } + } + + public void waitForGIITeskHookStarted(final MemberVM vm, + final InitialImageOperation.GIITestHookType callbacktype) { + SerializableRunnable waitForCallbackStarted = new SerializableRunnable() { + @Override + public void run() { + + final InitialImageOperation.GIITestHook callback = + getGIITestHookForCheckingPurpose(callbacktype); + WaitCriterion ev = new WaitCriterion() { + + @Override + public boolean done() { + return (callback != null && callback.isRunning); + } + + @Override + public String description() { + return null; + } + }; + + GeodeAwaitility.await().untilAsserted(ev); + if (callback == null || !callback.isRunning) { + fail("GII tesk hook is not started yet"); + } + } + }; + vm.invoke(waitForCallbackStarted); + } + + private static class PauseDuringGIICallback extends InitialImageOperation.GIITestHook { + private Object lockObject = new Object(); + + public PauseDuringGIICallback(InitialImageOperation.GIITestHookType type, String region_name) { + super(type, region_name); + } + + @Override + public void reset() { + synchronized (this.lockObject) { + this.lockObject.notify(); + } + } + + @Override + public void run() { + synchronized (this.lockObject) { + try { + isRunning = true; + this.lockObject.wait(); + } catch (InterruptedException e) { + } + } + } + } // Mycallback + + private class PauseDuringClearDistributionMessageObserver + extends DistributionMessageObserver { + private CountDownLatch latch = new CountDownLatch(1); Review comment: You'll want to have a tearDown in the test class that counts this down. This will prevent the test from leaving an orphaned thread. In general, it's cleaner to have the test own the CountDownLatch and pass a reference into this inner class via a constructor. ########## File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java ########## @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.InterestResultPolicy; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionEvent; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.cache.util.CacheListenerAdapter; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.DUnitBlackboard; +import org.apache.geode.test.dunit.SerializableCallableIF; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +public class PartitionedRegionAfterClearNotificationDUnitTest implements Serializable { + protected static final String REGION_NAME = "testPR"; + protected static final int NUM_ENTRIES = 100; + + protected int locatorPort; + protected MemberVM locator; + protected MemberVM dataStore1; + protected MemberVM dataStore2; + protected MemberVM dataStore3; + protected MemberVM accessor; + + protected ClientVM client1; + protected ClientVM client2; + + private static volatile DUnitBlackboard blackboard; + + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(7); + + @Before + public void setUp() throws Exception { + locator = cluster.startLocatorVM(0); + locatorPort = locator.getPort(); + dataStore1 = cluster.startServerVM(1, getProperties(), locatorPort); + dataStore2 = cluster.startServerVM(2, getProperties(), locatorPort); + dataStore3 = cluster.startServerVM(3, getProperties(), locatorPort); + accessor = cluster.startServerVM(4, getProperties(), locatorPort); + + client1 = cluster.startClientVM(5, + c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); + client2 = cluster.startClientVM(6, + c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); + + dataStore1.invoke(this::initDataStore); + dataStore2.invoke(this::initDataStore); + 
dataStore3.invoke(this::initDataStore); + accessor.invoke(this::initAccessor); + + getBlackboard().initBlackboard(); + } + + protected RegionShortcut getRegionShortCut() { + return RegionShortcut.PARTITION_REDUNDANT; + } + + protected Properties getProperties() { + Properties properties = new Properties(); + return properties; + } + + private Region getRegion(boolean isClient) { + if (isClient) { + return getClientCache().getRegion(REGION_NAME); + } else { + return getCache().getRegion(REGION_NAME); + } + } + + private void verifyRegionSize(boolean isClient, int expectedNum) { + GeodeAwaitility.await() + .untilAsserted(() -> assertThat(getRegion(isClient).size()).isEqualTo(expectedNum)); + } + + private void initClientCache() { + Region region = getClientCache().createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY) + .create(REGION_NAME); + region.registerInterestForAllKeys(InterestResultPolicy.KEYS); + } + + private void stopServers() { + List<CacheServer> cacheServers = getCache().getCacheServers(); + for (CacheServer server : cacheServers) { + server.stop(); + } + } + + private void initDataStore() { + getCache().createRegionFactory(getRegionShortCut()) + .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create()) + .addCacheListener(new CountingCacheListener()) + .create(REGION_NAME); + } + + private void initAccessor() { + RegionShortcut shortcut = getRegionShortCut(); + getCache().createRegionFactory(shortcut) + .setPartitionAttributes( + new PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create()) + .addCacheListener(new CountingCacheListener()) + .create(REGION_NAME); + } + + private void feed(boolean isClient) { + Region region = getRegion(isClient); + IntStream.range(0, NUM_ENTRIES).forEach(i -> region.put(i, "value" + i)); + } + + private void verifyServerRegionSize(int expectedNum) { + accessor.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore1.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore2.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore3.invoke(() -> verifyRegionSize(false, expectedNum)); + } + + private void verifyClientRegionSize(int expectedNum) { + client1.invoke(() -> verifyRegionSize(true, expectedNum)); + client2.invoke(() -> verifyRegionSize(true, expectedNum)); + } + + private void verifyCacheListenerTriggerCount(MemberVM serverVM) { + SerializableCallableIF<Integer> getListenerTriggerCount = () -> { + CountingCacheListener countingCacheListener = + (CountingCacheListener) getRegion(false).getAttributes() + .getCacheListeners()[0]; + return countingCacheListener.getClears(); + }; + + int count = accessor.invoke(getListenerTriggerCount) + + dataStore1.invoke(getListenerTriggerCount) + + dataStore2.invoke(getListenerTriggerCount) + + dataStore3.invoke(getListenerTriggerCount); + assertThat(count).isEqualTo(4); + + if (serverVM != null) { + assertThat(serverVM.invoke(getListenerTriggerCount)).isEqualTo(1); + } + } + + @Test + public void invokeClearOnDataStoreAndVerifyListenerCount() { + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + + dataStore1.invoke(() -> getRegion(false).clear()); + + verifyServerRegionSize(0); + verifyCacheListenerTriggerCount(dataStore1); + } + + @Test + public void invokeClearOnAccessorAndVerifyListenerCount() { + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + accessor.invoke(() -> getRegion(false).clear()); + verifyServerRegionSize(0); + 
verifyCacheListenerTriggerCount(accessor); + } + + @Test + public void invokeClearFromClientAndVerifyListenerCount() { + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + client1.invoke(() -> feed(true)); + verifyClientRegionSize(NUM_ENTRIES); + verifyServerRegionSize(NUM_ENTRIES); + + client1.invoke(() -> getRegion(true).clear()); + + verifyServerRegionSize(0); + verifyClientRegionSize(0); + verifyCacheListenerTriggerCount(null); + } + + @Test + public void invokeClearFromClientWithAccessorAsServer() { + dataStore1.invoke(this::stopServers); + dataStore2.invoke(this::stopServers); + dataStore3.invoke(this::stopServers); + + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + client1.invoke(() -> feed(true)); + verifyClientRegionSize(NUM_ENTRIES); + verifyServerRegionSize(NUM_ENTRIES); + + client1.invoke(() -> getRegion(true).clear()); + + verifyServerRegionSize(0); + verifyClientRegionSize(0); + verifyCacheListenerTriggerCount(null); + } + + @Test + public void invokeClearFromDataStoreWithClientInterest() { + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + + dataStore1.invoke(() -> getRegion(false).clear()); + + verifyServerRegionSize(0); + verifyCacheListenerTriggerCount(dataStore1); + } + + @Test(expected = AssertionError.class) + public void verifyTheLocksAreClearedWhenMemberDepartsAfterTakingClearLockOnRemoteMembers() + throws Exception { + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + dataStore2.invoke(() -> DistributionMessageObserver.setInstance( + testHookToKillMemberCallingClearBeforeMessageProcessed())); + + AsyncInvocation ds1ClearAsync = dataStore1.invokeAsync(() -> getRegion(false).clear()); + + getBlackboard().waitForGate("CLOSE_CACHE", 30, SECONDS); Review comment: All of these timeouts should change to `GeodeAwaitility.getTimeout().getSeconds()` to avoid intermittent failures on slow machines. ########## File path: geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java ########## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.geode.cache.query.partitioned; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.stream.IntStream; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.query.QueryService; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.internal.cache.DistributedClearOperation; +import org.apache.geode.internal.cache.DistributedClearOperation.ClearRegionMessage; +import org.apache.geode.internal.cache.PartitionedRegionClearMessage; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +public class PRClearCreateIndexDUnitTest implements Serializable { + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(4, true); + + private MemberVM primary, secondary; + private ClientVM client; + + @Before + public void before() throws Exception { + int locatorPort = ClusterStartupRule.getDUnitLocatorPort(); + primary = cluster.startServerVM(0, locatorPort); + secondary = cluster.startServerVM(1, locatorPort); + + // create region on server1 first, making sure server1 has the primary bucket + primary.invoke(() -> { + DistributionMessageObserver.setInstance(new MessageObserver()); + Region<Object, Object> region = + ClusterStartupRule.memberStarter.createPartitionRegion("regionA", + f -> f.setTotalNumBuckets(1).setRedundantCopies(1)); + IntStream.range(0, 100).forEach(i -> region.put(i, "value" + i)); + }); + + // server2 has the secondary bucket + secondary.invoke(() -> { + DistributionMessageObserver.setInstance(new MessageObserver()); + ClusterStartupRule.memberStarter.createPartitionRegion("regionA", + f -> f.setTotalNumBuckets(1).setRedundantCopies(1)); + }); + } + + @After + public void after() throws Exception { + primary.invoke(() -> { + DistributionMessageObserver.setInstance(null); + }); + secondary.invoke(() -> { + DistributionMessageObserver.setInstance(null); + }); + } + + // All tests create index on secondary members. These tests are making sure we are requesting + // locks for clear on secondary members as well. If we create index on the primary, the clear + // and createIndex will run sequentially so there would be no error. But if we create index on + // the secondary member and if the secondary member will not + // request a lock for clear operation, it will result in an EntryDestroyedException when create + // index is happening. 
+ + // Note: OP_LOCK_FOR_CLEAR, OP_CLEAR, OP_UNLOCK_FOR_CLEAR are messages for secondary members + // OP_LOCK_FOR_PR_CLEAR, OP_UNLOCK_FOR_PR_CLEAR, OP_PR_CLEAR can be for anybody + + @Test + // all local buckets are primary, so only OP_LOCK_FOR_CLEAR and OP_CLEAR messages are sent to the + // secondary member + // in the end an OP_PR_CLEAR is sent to the secondary for no effect + public void clearFromPrimaryMember() throws Exception { + AsyncInvocation createIndex = secondary.invokeAsync(PRClearCreateIndexDUnitTest::createIndex); Review comment: When there's no return from an invokeAsync, the type should be `AsyncInvocation<Void>`. In general, we should always declare types on everything. ########## File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClearPRDuringGIIProviderDUnitTest.java ########## @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT; +import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_OVERFLOW; +import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_PERSISTENT; +import static org.apache.geode.internal.cache.InitialImageOperation.getGIITestHookForCheckingPurpose; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.fail; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.stream.IntStream; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.PartitionedRegionPartialClearException; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.Assert; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.SerializableRunnable; +import org.apache.geode.test.dunit.WaitCriterion; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import 
org.apache.geode.test.dunit.rules.MemberVM; + +@RunWith(Parameterized.class) +public class ClearPRDuringGIIProviderDUnitTest implements Serializable { + protected static final String REGION_NAME = "testPR"; + protected static final int TOTAL_BUCKET_NUM = 10; + protected static final int DATA_SIZE = 100; + protected static final int NUM_SERVERS = 3; + + @Parameterized.Parameter(0) + public RegionShortcut regionShortcut; + + @Parameterized.Parameter(1) + public InitialImageOperation.GIITestHookType giiTestHookType; + + protected int locatorPort; + protected MemberVM[] memberVMS; + + private static final Logger logger = LogManager.getLogger(); + + static RegionShortcut[] regionTypes() { + return new RegionShortcut[] { + PARTITION_REDUNDANT, PARTITION_REDUNDANT_OVERFLOW, PARTITION_REDUNDANT_PERSISTENT + }; + } + + @Parameterized.Parameters(name = "{index}: regionShortcut={0} {1}") + public static Collection<Object[]> getCombinations() { + List<Object[]> params = new ArrayList<>(); + RegionShortcut[] regionShortcuts = regionTypes(); + Arrays.stream(regionShortcuts).forEach(regionShortcut -> { + params.add( + new Object[] {regionShortcut, InitialImageOperation.GIITestHookType.AfterSentImageReply}); + params.add( + new Object[] {regionShortcut, InitialImageOperation.GIITestHookType.DuringPackingImage}); + params.add(new Object[] {regionShortcut, + InitialImageOperation.GIITestHookType.AfterReceivedRequestImage}); + }); + return params; + } + + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(NUM_SERVERS + 1); + + @Before + public void setUp() throws Exception { + // serverVMs[1] is the accessor, which is feeder and invokes clear + // serverVMs[2] will add GII TestHook of requester + // serverVMs[3] will be GII provider for the specified bucket region + memberVMS = new MemberVM[NUM_SERVERS + 1]; + memberVMS[0] = cluster.startLocatorVM(0); + locatorPort = memberVMS[0].getPort(); + memberVMS[1] = cluster.startServerVM(1, locatorPort); + memberVMS[1].invoke(() -> initAccessor()); + for (int i = 2; i <= NUM_SERVERS; i++) { + memberVMS[i] = cluster.startServerVM(i, locatorPort); + memberVMS[i].invoke(() -> initDataStore(regionShortcut)); + } + feed("valueOne"); + verifyRegionSizes(DATA_SIZE); + } + + @After + public final void preTearDown() throws Exception { + for (int i = 1; i <= NUM_SERVERS; i++) { + memberVMS[i].invoke(() -> InitialImageOperation.resetAllGIITestHooks()); + } + } + + private void initDataStore(RegionShortcut regionShortcut) { + RegionFactory factory = getCache().createRegionFactory(regionShortcut); + factory.setPartitionAttributes( + new PartitionAttributesFactory().setTotalNumBuckets(TOTAL_BUCKET_NUM).create()); + factory.create(REGION_NAME); + } + + private void initAccessor() { + RegionFactory factory = getCache().createRegionFactory(PARTITION_REDUNDANT); + factory.setPartitionAttributes(new PartitionAttributesFactory() + .setTotalNumBuckets(TOTAL_BUCKET_NUM).setLocalMaxMemory(0).create()); + factory.create(REGION_NAME); + } + + private void feed(String valueStub) { + memberVMS[1].invoke(() -> { + Region region = getCache().getRegion(REGION_NAME); + IntStream.range(0, DATA_SIZE).forEach(i -> region.put(i, valueStub + i)); + }); + } + + private void verifyRegionSize(int expectedNum) { + Region region = getCache().getRegion(REGION_NAME); + assertThat(region.size()).isEqualTo(expectedNum); + } + + protected void giiTestHookSyncWithClear(boolean clearBeforeGII) { + // set test hook at server3, the provider + memberVMS[3].invoke(() -> { + PartitionedRegion pr = 
(PartitionedRegion) getCache().getRegion(REGION_NAME); + List<Integer> localBucketList = pr.getLocalBucketsListTestOnly(); + final String bucketName = "_B__testPR_" + localBucketList.get(0); + + PauseDuringGIICallback myGIITestHook = + // using bucket name for region name to ensure callback is triggered + new PauseDuringGIICallback(giiTestHookType, bucketName); + InitialImageOperation.setGIITestHook(myGIITestHook); + }); + + memberVMS[2].invoke(() -> getCache().getRegion(REGION_NAME).close()); + feed("valueTwo"); + + for (int i = 1; i < memberVMS.length; i++) { + memberVMS[i].invoke(() -> { + DistributionMessageObserver.setInstance(new PauseDuringClearDistributionMessageObserver()); + }); + } + + AsyncInvocation asyncGII = memberVMS[2].invokeAsync(() -> initDataStore(regionShortcut)); + AsyncInvocation asyncClear = + memberVMS[1].invokeAsync(() -> getCache().getRegion(REGION_NAME).clear()); + + waitForGIITeskHookStarted(memberVMS[3], giiTestHookType); + + if (clearBeforeGII) { + for (int i = 1; i < memberVMS.length; i++) { + memberVMS[i].invoke(() -> { + PauseDuringClearDistributionMessageObserver observer = + (PauseDuringClearDistributionMessageObserver) DistributionMessageObserver + .getInstance(); + DistributionMessageObserver.setInstance(null); + observer.latch.countDown(); + }); + } + + memberVMS[3].invoke(() -> { + InitialImageOperation.resetGIITestHook(giiTestHookType, true); + }); + } else { + memberVMS[3].invoke(() -> { + InitialImageOperation.resetGIITestHook(giiTestHookType, true); + }); + + for (int i = 1; i < memberVMS.length; i++) { + memberVMS[i].invoke(() -> { + PauseDuringClearDistributionMessageObserver observer = + (PauseDuringClearDistributionMessageObserver) DistributionMessageObserver + .getInstance(); + DistributionMessageObserver.setInstance(null); + observer.latch.countDown(); + }); + } + } + + try { + asyncGII.join(10000); + } catch (InterruptedException ex) { + Assert.fail("Async recreate region interupted" + ex.getMessage()); + } + try { + asyncClear.join(10000); + } catch (InterruptedException ex) { + Assert.fail("Async clear interupted" + ex.getMessage()); + } + + if (asyncClear.exceptionOccurred()) { + assertThat(asyncClear.getException() instanceof PartitionedRegionPartialClearException); + } else { + verifyRegionSizes(0); + } + } + + @Test + public void clearBeforeGIIShouldClearTheRegion() { + giiTestHookSyncWithClear(true); + } + + @Test + public void clearAfterGIIShouldClearTheRegion() { + giiTestHookSyncWithClear(false); + } + + private void verifyRegionSizes(int expectedSize) { + for (int i = 2; i < memberVMS.length; i++) { + memberVMS[i].invoke(() -> { + PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(REGION_NAME); + for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) { + logger.info("verifyRegionSizes:" + br.getFullPath() + ":" + + br.getBucketAdvisor().isPrimary() + ":" + br.size()); + } + }); + } + for (int i = 1; i < memberVMS.length; i++) { + memberVMS[i].invoke(() -> verifyRegionSize(expectedSize)); + } + } + + public void waitForGIITeskHookStarted(final MemberVM vm, + final InitialImageOperation.GIITestHookType callbacktype) { + SerializableRunnable waitForCallbackStarted = new SerializableRunnable() { + @Override + public void run() { + + final InitialImageOperation.GIITestHook callback = + getGIITestHookForCheckingPurpose(callbacktype); + WaitCriterion ev = new WaitCriterion() { Review comment: Please don't use `WaitCriterion` anymore. 
You can just directly specify the content of `done()` within the call to `await()`: ``` GeodeAwaitility.await().until(() -> callback != null && callback.isRunning); ``` Note that you want to use `await().until` instead of `await().untilAsserted` since `callback != null && callback.isRunning` never throws anything. ########## File path: geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearQueryIndexDUnitTest.java ########## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.cache.query.partitioned; + +import static org.apache.geode.distributed.ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.apache.geode.test.junit.rules.VMProvider.invokeInEveryMember; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ServerOperationException; +import org.apache.geode.cache.query.Index; +import org.apache.geode.cache.query.IndexStatistics; +import org.apache.geode.cache.query.Query; +import org.apache.geode.cache.query.QueryService; +import org.apache.geode.cache.query.SelectResults; +import org.apache.geode.cache.query.data.City; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.DUnitBlackboard; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; +import org.apache.geode.test.junit.rules.ClientCacheRule; +import org.apache.geode.test.junit.rules.ExecutorServiceRule; + +public class PRClearQueryIndexDUnitTest { + public static final String MUMBAI_QUERY = "select * from /cities c where c.name = 'MUMBAI'"; + public static final String ID_10_QUERY = "select * from /cities c where c.id = 10"; + @ClassRule + public static ClusterStartupRule cluster = new ClusterStartupRule(4, true); + + private static MemberVM server1; + private static MemberVM server2; + + private static DUnitBlackboard blackboard; + + @Rule + public ClientCacheRule clientCacheRule = new ClientCacheRule(); + + @Rule + public ExecutorServiceRule executor = ExecutorServiceRule.builder().build(); + + private ClientCache clientCache; + private Region cities; + + // class test setup. 
set up the servers, regions and indexes on the servers + @BeforeClass + public static void beforeClass() { + int locatorPort = ClusterStartupRule.getDUnitLocatorPort(); + server1 = cluster.startServerVM(1, s -> s.withConnectionToLocator(locatorPort) + .withProperty(SERIALIZABLE_OBJECT_FILTER, "org.apache.geode.cache.query.data.*") + .withRegion(RegionShortcut.PARTITION, "cities")); + server2 = cluster.startServerVM(2, s -> s.withConnectionToLocator(locatorPort) + .withProperty(SERIALIZABLE_OBJECT_FILTER, "org.apache.geode.cache.query.data.*") + .withRegion(RegionShortcut.PARTITION, "cities")); + + server1.invoke(() -> { + Cache cache = ClusterStartupRule.getCache(); + Region region = cache.getRegion("cities"); + // create indexes + QueryService queryService = cache.getQueryService(); + queryService.createKeyIndex("cityId", "c.id", "/cities c"); + queryService.createIndex("cityName", "c.name", "/cities c"); + assertThat(cache.getQueryService().getIndexes(region)) + .extracting(Index::getName).containsExactlyInAnyOrder("cityId", "cityName"); + }); + + server2.invoke(() -> { + Cache cache = ClusterStartupRule.getCache(); + Region region = cache.getRegion("cities"); + assertThat(cache.getQueryService().getIndexes(region)) + .extracting(Index::getName).containsExactlyInAnyOrder("cityId", "cityName"); + }); + } + + // before every test method, create the client cache and region + @Before + public void before() throws Exception { + int locatorPort = ClusterStartupRule.getDUnitLocatorPort(); + clientCache = clientCacheRule.withLocatorConnection(locatorPort).createCache(); + cities = clientCacheRule.createProxyRegion("cities"); + } + + @Test + public void clearOnEmptyRegion() throws Exception { + cities.clear(); + invokeInEveryMember(() -> { + verifyIndexesAfterClear("cities", "cityId", "cityName"); + }, server1, server2); + + IntStream.range(0, 10).forEach(i -> cities.put(i, new City(i))); + cities.clear(); + invokeInEveryMember(() -> { + verifyIndexesAfterClear("cities", "cityId", "cityName"); + }, server1, server2); + } + + @Test + public void createIndexWhileClear() throws Exception { + IntStream.range(0, 1000).forEach(i -> cities.put(i, new City(i))); + + // create index while clear + AsyncInvocation createIndex = server1.invokeAsync("create index", () -> { + Cache cache = ClusterStartupRule.getCache(); + QueryService queryService = cache.getQueryService(); + Index cityZip = queryService.createIndex("cityZip", "c.zip", "/cities c"); + assertThat(cityZip).isNotNull(); + }); + + // do clear for 3 times at the same time to increease the concurrency of clear and createIndex + for (int i = 0; i < 3; i++) { + cities.clear(); + } + createIndex.await(); + + invokeInEveryMember(() -> { + verifyIndexesAfterClear("cities", "cityId", "cityName"); + }, server1, server2); + + QueryService queryService = clientCache.getQueryService(); + Query query = + queryService.newQuery("select * from /cities c where c.zip < " + (City.ZIP_START + 10)); + assertThat(((SelectResults) query.execute()).size()).isEqualTo(0); + + IntStream.range(0, 10).forEach(i -> cities.put(i, new City(i))); + assertThat(((SelectResults) query.execute()).size()).isEqualTo(10); + } + + @Test + public void createIndexWhileClearOnReplicateRegion() throws Exception { + invokeInEveryMember(() -> { + Cache cache = ClusterStartupRule.getCache(); + cache.createRegionFactory(RegionShortcut.PARTITION) + .create("replicateCities"); + }, server1, server2); + + Region replicateCities = clientCacheRule.createProxyRegion("replicateCities"); + 
IntStream.range(0, 1000).forEach(i -> replicateCities.put(i, new City(i))); + + // create index while clear + AsyncInvocation createIndex = server1.invokeAsync("create index on replicate regions", () -> { + Cache cache = ClusterStartupRule.getCache(); + QueryService queryService = cache.getQueryService(); + Index cityZip = queryService.createIndex("cityZip_replicate", "c.zip", "/replicateCities c"); + assertThat(cityZip).isNotNull(); + }); + + // do clear at the same time for 3 timese + for (int i = 0; i < 3; i++) { + replicateCities.clear(); + } + createIndex.await(); + + invokeInEveryMember(() -> { + verifyIndexesAfterClear("replicateCities", "cityZip_replicate"); + }, server1, server2); + + QueryService queryService = clientCache.getQueryService(); + Query query = + queryService + .newQuery("select * from /replicateCities c where c.zip < " + (City.ZIP_START + 10)); + assertThat(((SelectResults) query.execute()).size()).isEqualTo(0); + + IntStream.range(0, 10).forEach(i -> replicateCities.put(i, new City(i))); + assertThat(((SelectResults) query.execute()).size()).isEqualTo(10); + } + + @Test + public void removeIndexWhileClear() throws Exception { + // create cityZip index + server1.invoke("create index", () -> { + Cache cache = ClusterStartupRule.getCache(); + QueryService queryService = cache.getQueryService(); + Index cityZip = queryService.createIndex("cityZip", "c.zip", "/cities c"); + assertThat(cityZip).isNotNull(); + }); + + // remove index while clear + // removeIndex has to be invoked on each server. It's not distributed + AsyncInvocation removeIndex1 = server1.invokeAsync("remove index", + PRClearQueryIndexDUnitTest::removeCityZipIndex); + AsyncInvocation removeIndex2 = server2.invokeAsync("remove index", + PRClearQueryIndexDUnitTest::removeCityZipIndex); + + cities.clear(); + removeIndex1.await(); + removeIndex2.await(); + + // make sure removeIndex and clear operations are successful + invokeInEveryMember(() -> { + InternalCache internalCache = ClusterStartupRule.getCache(); + QueryService qs = internalCache.getQueryService(); + Region region = internalCache.getRegion("cities"); + assertThat(region.size()).isEqualTo(0); + // verify only 2 indexes created in the beginning of the tests exist + assertThat(qs.getIndexes(region)).extracting(Index::getName) + .containsExactlyInAnyOrder("cityId", "cityName"); + }, server1, server2); + } + + private static void removeCityZipIndex() { + Cache cache = ClusterStartupRule.getCache(); + QueryService qs = cache.getQueryService(); + Region<Object, Object> region = cache.getRegion("cities"); + Index cityZip = qs.getIndex(region, "cityZip"); + if (cityZip != null) { + qs.removeIndex(cityZip); + } + } + + @Test + public void verifyQuerySucceedsAfterClear() throws Exception { + // put in some data + IntStream.range(0, 100).forEach(i -> cities.put(i, new City(i))); + + QueryService queryService = clientCache.getQueryService(); + Query query = queryService.newQuery(MUMBAI_QUERY); + Query query2 = queryService.newQuery(ID_10_QUERY); + assertThat(((SelectResults) query.execute()).size()).isEqualTo(50); + assertThat(((SelectResults) query2.execute()).size()).isEqualTo(1); + + cities.clear(); + invokeInEveryMember(() -> { + verifyIndexesAfterClear("cities", "cityId", "cityName"); + }, server1, server2); + + assertThat(((SelectResults) query.execute()).size()).isEqualTo(0); + assertThat(((SelectResults) query2.execute()).size()).isEqualTo(0); + } + + private static void verifyIndexesAfterClear(String regionName, String... 
indexes) { + InternalCache internalCache = ClusterStartupRule.getCache(); + QueryService qs = internalCache.getQueryService(); + Region region = internalCache.getRegion(regionName); + assertThat(region.size()).isEqualTo(0); + for (String indexName : indexes) { + Index index = qs.getIndex(region, indexName); + IndexStatistics statistics = index.getStatistics(); + assertThat(statistics.getNumberOfKeys()).isEqualTo(0); + assertThat(statistics.getNumberOfValues()).isEqualTo(0); + } + } + + @Test + public void concurrentClearAndQuery() { + QueryService queryService = clientCache.getQueryService(); + Query query = queryService.newQuery(MUMBAI_QUERY); + Query query2 = queryService.newQuery(ID_10_QUERY); + + IntStream.range(0, 100).forEach(i -> cities.put(i, new City(i))); + + server1.invokeAsync(() -> { + Cache cache = ClusterStartupRule.getCache(); + Region region = cache.getRegion("cities"); + region.clear(); + }); + + await().untilAsserted(() -> { + assertThat(((SelectResults) query.execute()).size()).isEqualTo(0); + assertThat(((SelectResults) query2.execute()).size()).isEqualTo(0); + }); + } + + @Test + public void concurrentClearAndPut() throws Exception { + AsyncInvocation puts = server1.invokeAsync(() -> { + Cache cache = ClusterStartupRule.getCache(); + Region region = cache.getRegion("cities"); + for (int i = 0; i < 1000; i++) { + // wait for gate to open + getBlackboard().waitForGate("proceedToPut", 60, TimeUnit.SECONDS); + region.put(i, new City(i)); + } + }); + + AsyncInvocation clears = server2.invokeAsync(() -> { + Cache cache = ClusterStartupRule.getCache(); + Region region = cache.getRegion("cities"); + // do clear 10 times + for (int i = 0; i < 10; i++) { + try { + // don't allow put to proceed. It's like "close the gate" + getBlackboard().clearGate("proceedToPut"); + region.clear(); + verifyIndexesAfterClear("cities", "cityId", "cityName"); + } finally { + // allow put to proceed. 
It's like "open the gate" + getBlackboard().signalGate("proceedToPut"); + } + } + }); + + puts.await(); + clears.await(); + } + + @Test + public void serverLeavingAndJoiningWhilePutAndClear() throws Exception { + int locatorPort = ClusterStartupRule.getDUnitLocatorPort(); + Future<Void> startStopServer = executor.submit(() -> { + for (int i = 0; i < 3; i++) { + MemberVM server3 = cluster.startServerVM(3, s -> s.withConnectionToLocator(locatorPort) + .withProperty(SERIALIZABLE_OBJECT_FILTER, "org.apache.geode.cache.query.data.*") + .withRegion(RegionShortcut.PARTITION, "cities")); + server3.stop(false); + } + }); + + Future<Void> putAndClear = executor.submit(() -> { + for (int i = 0; i < 30; i++) { + IntStream.range(0, 100).forEach(j -> cities.put(j, new City(j))); + try { + cities.clear(); + + // only verify if clear is successful + QueryService queryService = clientCache.getQueryService(); + Query query = queryService.newQuery(MUMBAI_QUERY); + Query query2 = queryService.newQuery(ID_10_QUERY); + assertThat(((SelectResults) query.execute()).size()).isEqualTo(0); + assertThat(((SelectResults) query2.execute()).size()).isEqualTo(0); + } catch (ServerOperationException e) { + assertThat(e.getCause().getMessage()) + .contains("Unable to clear all the buckets from the partitioned region cities") + .contains("either data (buckets) moved or member departed"); + } + } + }); + startStopServer.get(60, TimeUnit.SECONDS); + putAndClear.get(60, TimeUnit.SECONDS); + } + + private static DUnitBlackboard getBlackboard() { + if (blackboard == null) { + blackboard = new DUnitBlackboard(); + } + return blackboard; + } Review comment: Please use `DistributedBlackboard` instead: ``` @Rule public DistributedBlackboard blackboard = new DistributedBlackboard(); ``` ########## File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java ########## @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.InterestResultPolicy; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionEvent; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.cache.util.CacheListenerAdapter; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.DUnitBlackboard; +import org.apache.geode.test.dunit.SerializableCallableIF; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +public class PartitionedRegionAfterClearNotificationDUnitTest implements Serializable { + protected static final String REGION_NAME = "testPR"; + protected static final int NUM_ENTRIES = 100; + + protected int locatorPort; + protected MemberVM locator; + protected MemberVM dataStore1; + protected MemberVM dataStore2; + protected MemberVM dataStore3; + protected MemberVM accessor; + + protected ClientVM client1; + protected ClientVM client2; + + private static volatile DUnitBlackboard blackboard; Review comment: Please use the DistributedBlackboard rule instead: ``` @Rule public DistributedBlackboard blackboard = new DistributedBlackboard(); ``` ########## File path: geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearQueryIndexDUnitTest.java ########## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */
+        // wait for gate to open
+        getBlackboard().waitForGate("proceedToPut", 60, TimeUnit.SECONDS);

Review comment:
       We shouldn't be using timeouts less than the default that is in GeodeAwaitility. You can simply use:
```
getBlackboard().waitForGate("proceedToPut", GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
```

##########
File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClearPRDuringGIIProviderDUnitTest.java
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */ +package org.apache.geode.internal.cache; + +import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT; +import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_OVERFLOW; +import static org.apache.geode.cache.RegionShortcut.PARTITION_REDUNDANT_PERSISTENT; +import static org.apache.geode.internal.cache.InitialImageOperation.getGIITestHookForCheckingPurpose; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.fail; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.stream.IntStream; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.PartitionedRegionPartialClearException; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.Assert; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.SerializableRunnable; +import org.apache.geode.test.dunit.WaitCriterion; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +@RunWith(Parameterized.class) +public class ClearPRDuringGIIProviderDUnitTest implements Serializable { + protected static final String REGION_NAME = "testPR"; + protected static final int TOTAL_BUCKET_NUM = 10; + protected static final int DATA_SIZE = 100; + protected static final int NUM_SERVERS = 3; + + @Parameterized.Parameter(0) + public RegionShortcut regionShortcut; + + @Parameterized.Parameter(1) + public InitialImageOperation.GIITestHookType giiTestHookType; + + protected int locatorPort; + protected MemberVM[] memberVMS; + + private static final Logger logger = LogManager.getLogger(); + + static RegionShortcut[] regionTypes() { + return new RegionShortcut[] { + PARTITION_REDUNDANT, PARTITION_REDUNDANT_OVERFLOW, PARTITION_REDUNDANT_PERSISTENT + }; + } + + @Parameterized.Parameters(name = "{index}: regionShortcut={0} {1}") + public static Collection<Object[]> getCombinations() { + List<Object[]> params = new ArrayList<>(); + RegionShortcut[] regionShortcuts = regionTypes(); + Arrays.stream(regionShortcuts).forEach(regionShortcut -> { + params.add( + new Object[] {regionShortcut, InitialImageOperation.GIITestHookType.AfterSentImageReply}); + params.add( + new Object[] {regionShortcut, InitialImageOperation.GIITestHookType.DuringPackingImage}); + params.add(new Object[] {regionShortcut, + InitialImageOperation.GIITestHookType.AfterReceivedRequestImage}); + }); + return params; + } + + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(NUM_SERVERS + 1); + + @Before + public void setUp() throws Exception { + // serverVMs[1] is the accessor, which is feeder and invokes clear + // serverVMs[2] 
will add GII TestHook of requester + // serverVMs[3] will be GII provider for the specified bucket region + memberVMS = new MemberVM[NUM_SERVERS + 1]; + memberVMS[0] = cluster.startLocatorVM(0); + locatorPort = memberVMS[0].getPort(); + memberVMS[1] = cluster.startServerVM(1, locatorPort); + memberVMS[1].invoke(() -> initAccessor()); + for (int i = 2; i <= NUM_SERVERS; i++) { + memberVMS[i] = cluster.startServerVM(i, locatorPort); + memberVMS[i].invoke(() -> initDataStore(regionShortcut)); + } + feed("valueOne"); + verifyRegionSizes(DATA_SIZE); + } + + @After + public final void preTearDown() throws Exception { + for (int i = 1; i <= NUM_SERVERS; i++) { + memberVMS[i].invoke(() -> InitialImageOperation.resetAllGIITestHooks()); + } + } + + private void initDataStore(RegionShortcut regionShortcut) { + RegionFactory factory = getCache().createRegionFactory(regionShortcut); + factory.setPartitionAttributes( + new PartitionAttributesFactory().setTotalNumBuckets(TOTAL_BUCKET_NUM).create()); + factory.create(REGION_NAME); + } + + private void initAccessor() { + RegionFactory factory = getCache().createRegionFactory(PARTITION_REDUNDANT); + factory.setPartitionAttributes(new PartitionAttributesFactory() + .setTotalNumBuckets(TOTAL_BUCKET_NUM).setLocalMaxMemory(0).create()); + factory.create(REGION_NAME); + } + + private void feed(String valueStub) { + memberVMS[1].invoke(() -> { + Region region = getCache().getRegion(REGION_NAME); + IntStream.range(0, DATA_SIZE).forEach(i -> region.put(i, valueStub + i)); + }); + } + + private void verifyRegionSize(int expectedNum) { + Region region = getCache().getRegion(REGION_NAME); + assertThat(region.size()).isEqualTo(expectedNum); + } + + protected void giiTestHookSyncWithClear(boolean clearBeforeGII) { + // set test hook at server3, the provider + memberVMS[3].invoke(() -> { + PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(REGION_NAME); + List<Integer> localBucketList = pr.getLocalBucketsListTestOnly(); + final String bucketName = "_B__testPR_" + localBucketList.get(0); + + PauseDuringGIICallback myGIITestHook = + // using bucket name for region name to ensure callback is triggered + new PauseDuringGIICallback(giiTestHookType, bucketName); + InitialImageOperation.setGIITestHook(myGIITestHook); + }); + + memberVMS[2].invoke(() -> getCache().getRegion(REGION_NAME).close()); + feed("valueTwo"); + + for (int i = 1; i < memberVMS.length; i++) { + memberVMS[i].invoke(() -> { + DistributionMessageObserver.setInstance(new PauseDuringClearDistributionMessageObserver()); + }); + } + + AsyncInvocation asyncGII = memberVMS[2].invokeAsync(() -> initDataStore(regionShortcut)); + AsyncInvocation asyncClear = + memberVMS[1].invokeAsync(() -> getCache().getRegion(REGION_NAME).clear()); + + waitForGIITeskHookStarted(memberVMS[3], giiTestHookType); + + if (clearBeforeGII) { + for (int i = 1; i < memberVMS.length; i++) { + memberVMS[i].invoke(() -> { + PauseDuringClearDistributionMessageObserver observer = + (PauseDuringClearDistributionMessageObserver) DistributionMessageObserver + .getInstance(); + DistributionMessageObserver.setInstance(null); + observer.latch.countDown(); + }); + } + + memberVMS[3].invoke(() -> { + InitialImageOperation.resetGIITestHook(giiTestHookType, true); + }); + } else { + memberVMS[3].invoke(() -> { + InitialImageOperation.resetGIITestHook(giiTestHookType, true); + }); + + for (int i = 1; i < memberVMS.length; i++) { + memberVMS[i].invoke(() -> { + PauseDuringClearDistributionMessageObserver observer = + 
(PauseDuringClearDistributionMessageObserver) DistributionMessageObserver + .getInstance(); + DistributionMessageObserver.setInstance(null); + observer.latch.countDown(); + }); + } + } + + try { + asyncGII.join(10000); + } catch (InterruptedException ex) { + Assert.fail("Async recreate region interupted" + ex.getMessage()); + } + try { + asyncClear.join(10000); + } catch (InterruptedException ex) { + Assert.fail("Async clear interupted" + ex.getMessage()); + } + + if (asyncClear.exceptionOccurred()) { + assertThat(asyncClear.getException() instanceof PartitionedRegionPartialClearException); + } else { + verifyRegionSizes(0); + } + } + + @Test + public void clearBeforeGIIShouldClearTheRegion() { + giiTestHookSyncWithClear(true); + } + + @Test + public void clearAfterGIIShouldClearTheRegion() { + giiTestHookSyncWithClear(false); + } + + private void verifyRegionSizes(int expectedSize) { + for (int i = 2; i < memberVMS.length; i++) { + memberVMS[i].invoke(() -> { + PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(REGION_NAME); + for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) { + logger.info("verifyRegionSizes:" + br.getFullPath() + ":" + + br.getBucketAdvisor().isPrimary() + ":" + br.size()); + } + }); + } + for (int i = 1; i < memberVMS.length; i++) { + memberVMS[i].invoke(() -> verifyRegionSize(expectedSize)); + } + } + + public void waitForGIITeskHookStarted(final MemberVM vm, + final InitialImageOperation.GIITestHookType callbacktype) { + SerializableRunnable waitForCallbackStarted = new SerializableRunnable() { + @Override + public void run() { + + final InitialImageOperation.GIITestHook callback = + getGIITestHookForCheckingPurpose(callbacktype); + WaitCriterion ev = new WaitCriterion() { + + @Override + public boolean done() { + return (callback != null && callback.isRunning); + } + + @Override + public String description() { + return null; + } + }; + + GeodeAwaitility.await().untilAsserted(ev); + if (callback == null || !callback.isRunning) { + fail("GII tesk hook is not started yet"); + } + } + }; + vm.invoke(waitForCallbackStarted); + } + + private static class PauseDuringGIICallback extends InitialImageOperation.GIITestHook { + private Object lockObject = new Object(); + + public PauseDuringGIICallback(InitialImageOperation.GIITestHookType type, String region_name) { + super(type, region_name); + } + + @Override + public void reset() { + synchronized (this.lockObject) { + this.lockObject.notify(); + } + } + + @Override + public void run() { + synchronized (this.lockObject) { + try { + isRunning = true; + this.lockObject.wait(); + } catch (InterruptedException e) { + } Review comment: Please use ErrorCollector or DistributedErrorCollector instead of eating exceptions: ``` @Rule public DistributedErrorCollector errorCollector = new DistributedErrorCollector(); ``` ``` } catch (InterruptedException e) { errorCollector.addError(e); } ``` You'll need to change PauseDuringGIICallback to non-static to use the Rule or change PauseDuringGIICallback to have a field of type DistributedErrorCollector which is passed in. ########## File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java ########## @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache; +import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.InterestResultPolicy; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionEvent; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.cache.util.CacheListenerAdapter; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.DUnitBlackboard; +import org.apache.geode.test.dunit.SerializableCallableIF; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +public class PartitionedRegionAfterClearNotificationDUnitTest implements Serializable { + protected static final String REGION_NAME = "testPR"; + protected static final int NUM_ENTRIES = 100; + + protected int locatorPort; + protected MemberVM locator; + protected MemberVM dataStore1; + protected MemberVM dataStore2; + protected MemberVM dataStore3; + protected MemberVM accessor; + + protected ClientVM client1; + protected ClientVM client2; + + private static volatile DUnitBlackboard blackboard; + + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(7); + + @Before + public void setUp() throws Exception { + locator = cluster.startLocatorVM(0); + locatorPort = locator.getPort(); + dataStore1 = cluster.startServerVM(1, getProperties(), locatorPort); + dataStore2 = cluster.startServerVM(2, getProperties(), locatorPort); + dataStore3 = cluster.startServerVM(3, getProperties(), locatorPort); + accessor = cluster.startServerVM(4, getProperties(), locatorPort); + + client1 = cluster.startClientVM(5, + c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); + client2 = cluster.startClientVM(6, + c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort))); + + dataStore1.invoke(this::initDataStore); + dataStore2.invoke(this::initDataStore); + 
dataStore3.invoke(this::initDataStore); + accessor.invoke(this::initAccessor); + + getBlackboard().initBlackboard(); + } + + protected RegionShortcut getRegionShortCut() { + return RegionShortcut.PARTITION_REDUNDANT; + } + + protected Properties getProperties() { + Properties properties = new Properties(); + return properties; + } + + private Region getRegion(boolean isClient) { + if (isClient) { + return getClientCache().getRegion(REGION_NAME); + } else { + return getCache().getRegion(REGION_NAME); + } + } + + private void verifyRegionSize(boolean isClient, int expectedNum) { + GeodeAwaitility.await() + .untilAsserted(() -> assertThat(getRegion(isClient).size()).isEqualTo(expectedNum)); + } + + private void initClientCache() { + Region region = getClientCache().createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY) + .create(REGION_NAME); + region.registerInterestForAllKeys(InterestResultPolicy.KEYS); + } + + private void stopServers() { + List<CacheServer> cacheServers = getCache().getCacheServers(); + for (CacheServer server : cacheServers) { + server.stop(); + } + } + + private void initDataStore() { + getCache().createRegionFactory(getRegionShortCut()) + .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create()) + .addCacheListener(new CountingCacheListener()) + .create(REGION_NAME); + } + + private void initAccessor() { + RegionShortcut shortcut = getRegionShortCut(); + getCache().createRegionFactory(shortcut) + .setPartitionAttributes( + new PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create()) + .addCacheListener(new CountingCacheListener()) + .create(REGION_NAME); + } + + private void feed(boolean isClient) { + Region region = getRegion(isClient); + IntStream.range(0, NUM_ENTRIES).forEach(i -> region.put(i, "value" + i)); + } + + private void verifyServerRegionSize(int expectedNum) { + accessor.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore1.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore2.invoke(() -> verifyRegionSize(false, expectedNum)); + dataStore3.invoke(() -> verifyRegionSize(false, expectedNum)); + } + + private void verifyClientRegionSize(int expectedNum) { + client1.invoke(() -> verifyRegionSize(true, expectedNum)); + client2.invoke(() -> verifyRegionSize(true, expectedNum)); + } + + private void verifyCacheListenerTriggerCount(MemberVM serverVM) { + SerializableCallableIF<Integer> getListenerTriggerCount = () -> { + CountingCacheListener countingCacheListener = + (CountingCacheListener) getRegion(false).getAttributes() + .getCacheListeners()[0]; + return countingCacheListener.getClears(); + }; + + int count = accessor.invoke(getListenerTriggerCount) + + dataStore1.invoke(getListenerTriggerCount) + + dataStore2.invoke(getListenerTriggerCount) + + dataStore3.invoke(getListenerTriggerCount); + assertThat(count).isEqualTo(4); + + if (serverVM != null) { + assertThat(serverVM.invoke(getListenerTriggerCount)).isEqualTo(1); + } + } + + @Test + public void invokeClearOnDataStoreAndVerifyListenerCount() { + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + + dataStore1.invoke(() -> getRegion(false).clear()); + + verifyServerRegionSize(0); + verifyCacheListenerTriggerCount(dataStore1); + } + + @Test + public void invokeClearOnAccessorAndVerifyListenerCount() { + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + accessor.invoke(() -> getRegion(false).clear()); + verifyServerRegionSize(0); + 
verifyCacheListenerTriggerCount(accessor); + } + + @Test + public void invokeClearFromClientAndVerifyListenerCount() { + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + client1.invoke(() -> feed(true)); + verifyClientRegionSize(NUM_ENTRIES); + verifyServerRegionSize(NUM_ENTRIES); + + client1.invoke(() -> getRegion(true).clear()); + + verifyServerRegionSize(0); + verifyClientRegionSize(0); + verifyCacheListenerTriggerCount(null); + } + + @Test + public void invokeClearFromClientWithAccessorAsServer() { + dataStore1.invoke(this::stopServers); + dataStore2.invoke(this::stopServers); + dataStore3.invoke(this::stopServers); + + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + client1.invoke(() -> feed(true)); + verifyClientRegionSize(NUM_ENTRIES); + verifyServerRegionSize(NUM_ENTRIES); + + client1.invoke(() -> getRegion(true).clear()); + + verifyServerRegionSize(0); + verifyClientRegionSize(0); + verifyCacheListenerTriggerCount(null); + } + + @Test + public void invokeClearFromDataStoreWithClientInterest() { + client1.invoke(this::initClientCache); + client2.invoke(this::initClientCache); + + accessor.invoke(() -> feed(false)); + verifyServerRegionSize(NUM_ENTRIES); + + dataStore1.invoke(() -> getRegion(false).clear()); + + verifyServerRegionSize(0); + verifyCacheListenerTriggerCount(dataStore1); + } + + @Test(expected = AssertionError.class) Review comment: Please use AssertJ's `catchThrowable` instead. If any line throws AssertionError, this test will still fail. ########## File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClearPRDuringGIIProviderDUnitTest.java ########## @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */
+  private static class PauseDuringGIICallback extends InitialImageOperation.GIITestHook {
+    private Object lockObject = new Object();

Review comment:
       Any synchronization object should be final, so just add the final keyword.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
