mhansonp commented on a change in pull request #6310: URL: https://github.com/apache/geode/pull/6310#discussion_r612880592
########## File path: geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearQueryIndexDUnitTest.java ########## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.cache.query.partitioned; + +import static org.apache.geode.distributed.ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.apache.geode.test.junit.rules.VMProvider.invokeInEveryMember; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ServerOperationException; +import org.apache.geode.cache.query.Index; +import org.apache.geode.cache.query.IndexStatistics; +import org.apache.geode.cache.query.Query; +import 
org.apache.geode.cache.query.QueryService; +import org.apache.geode.cache.query.SelectResults; +import org.apache.geode.cache.query.data.City; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.DUnitBlackboard; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; +import org.apache.geode.test.junit.rules.ClientCacheRule; +import org.apache.geode.test.junit.rules.ExecutorServiceRule; + +public class PRClearQueryIndexDUnitTest { + public static final String MUMBAI_QUERY = "select * from /cities c where c.name = 'MUMBAI'"; + public static final String ID_10_QUERY = "select * from /cities c where c.id = 10"; + @ClassRule + public static ClusterStartupRule cluster = new ClusterStartupRule(4, true); + + private static MemberVM server1; + private static MemberVM server2; + + private static DUnitBlackboard blackboard; + + @Rule + public ClientCacheRule clientCacheRule = new ClientCacheRule(); + + @Rule + public ExecutorServiceRule executor = ExecutorServiceRule.builder().build(); + + private ClientCache clientCache; + private Region cities; + + // class test setup. 
set up the servers, regions and indexes on the servers + @BeforeClass + public static void beforeClass() { + int locatorPort = ClusterStartupRule.getDUnitLocatorPort(); + server1 = cluster.startServerVM(1, s -> s.withConnectionToLocator(locatorPort) + .withProperty(SERIALIZABLE_OBJECT_FILTER, "org.apache.geode.cache.query.data.*") + .withRegion(RegionShortcut.PARTITION, "cities")); + server2 = cluster.startServerVM(2, s -> s.withConnectionToLocator(locatorPort) + .withProperty(SERIALIZABLE_OBJECT_FILTER, "org.apache.geode.cache.query.data.*") + .withRegion(RegionShortcut.PARTITION, "cities")); + + server1.invoke(() -> { + Cache cache = ClusterStartupRule.getCache(); + Region region = cache.getRegion("cities"); + // create indexes + QueryService queryService = cache.getQueryService(); + queryService.createKeyIndex("cityId", "c.id", "/cities c"); + queryService.createIndex("cityName", "c.name", "/cities c"); + assertThat(cache.getQueryService().getIndexes(region)) + .extracting(Index::getName).containsExactlyInAnyOrder("cityId", "cityName"); + }); + + server2.invoke(() -> { + Cache cache = ClusterStartupRule.getCache(); + Region region = cache.getRegion("cities"); + assertThat(cache.getQueryService().getIndexes(region)) Review comment: Is there a replication time for this or something that can cause it to be flaky? ########## File path: geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java ########## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.cache.query.partitioned; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.stream.IntStream; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.query.QueryService; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.internal.cache.DistributedClearOperation; +import org.apache.geode.internal.cache.DistributedClearOperation.ClearRegionMessage; +import org.apache.geode.internal.cache.PartitionedRegionClearMessage; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +public class PRClearCreateIndexDUnitTest implements Serializable { + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(4, true); + + private MemberVM primary, secondary; + private ClientVM client; + + @Before + public void before() throws Exception { + int locatorPort = ClusterStartupRule.getDUnitLocatorPort(); + primary = cluster.startServerVM(0, locatorPort); + secondary = cluster.startServerVM(1, 
locatorPort); + + // create region on server1 first, making sure server1 has the primary bucket + primary.invoke(() -> { + DistributionMessageObserver.setInstance(new MessageObserver()); + Region<Object, Object> region = + ClusterStartupRule.memberStarter.createPartitionRegion("regionA", + f -> f.setTotalNumBuckets(1).setRedundantCopies(1)); + IntStream.range(0, 100).forEach(i -> region.put(i, "value" + i)); + }); + + // server2 has the secondary bucket + secondary.invoke(() -> { + DistributionMessageObserver.setInstance(new MessageObserver()); + ClusterStartupRule.memberStarter.createPartitionRegion("regionA", + f -> f.setTotalNumBuckets(1).setRedundantCopies(1)); + }); + } + + @After + public void after() throws Exception { + primary.invoke(() -> { + DistributionMessageObserver.setInstance(null); + }); + secondary.invoke(() -> { + DistributionMessageObserver.setInstance(null); + }); + } + + // All tests create index on secondary members. These tests are making sure we are requesting + // locks for clear on secondary members as well. If we create index on the primary, the clear + // and createIndex will run sequentially so there would be no error. But if we create index on + // the secondary member and if the secondary member will not + // request a lock for clear operation, it will result in an EntryDestroyedException when create + // index is happening. 
+ + // Note: OP_LOCK_FOR_CLEAR, OP_CLEAR, OP_UNLOCK_FOR_CLEAR are messages for secondary members + // OP_LOCK_FOR_PR_CLEAR, OP_UNLOCK_FOR_PR_CLEAR, OP_PR_CLEAR can be for anybody + + @Test + // all local buckets are primary, so only OP_LOCK_FOR_CLEAR and OP_CLEAR messages are sent to the + // secondary member + // in the end an OP_PR_CLEAR is sent to the secondary for no effect + public void clearFromPrimaryMember() throws Exception { + AsyncInvocation createIndex = secondary.invokeAsync(PRClearCreateIndexDUnitTest::createIndex); + AsyncInvocation clear = primary.invokeAsync(PRClearCreateIndexDUnitTest::clear); + + createIndex.get(); + clear.get(); + + // assert that secondary member received these messages + primary.invoke(() -> verifyEvents(false, false, false, false)); + secondary.invoke(() -> verifyEvents(false, true, true, true)); + } + + @Test + // all local buckets are secondary, so an OP_PR_CLEAR is sent to the primary member, from there + // a OP_LOCK_FOR_CLEAR and OP_CLEAR messages are sent back to the secondary + public void clearFromSecondaryMember() throws Exception { + AsyncInvocation createIndex = secondary.invokeAsync(PRClearCreateIndexDUnitTest::createIndex); + AsyncInvocation clear = secondary.invokeAsync(PRClearCreateIndexDUnitTest::clear); + + createIndex.get(); + clear.get(); + + // assert that secondary member received these messages + primary.invoke(() -> verifyEvents(false, true, false, false)); + secondary.invoke(() -> verifyEvents(false, false, true, true)); + } + + /** + * For interested client connecting to secondary member + * 1. locks all local primary region + * 2. send OP_LOCK_FOR_PR_CLEAR to lock all other members + * 3. send OP_PR_CLEAR to primary to clear + * 4. 
primary will send a OP_CLEAR message back to the secondary to clear + */ + @Test + public void clearFromInterestedClientConnectingToSecondaryMember() throws Exception { + int port = secondary.getPort(); + client = cluster.startClientVM(2, c -> c.withServerConnection(port).withPoolSubscription(true)); + AsyncInvocation createIndex = secondary.invokeAsync(PRClearCreateIndexDUnitTest::createIndex); + + AsyncInvocation clear = client.invokeAsync(() -> { + Thread.sleep(200); + ClientCache clientCache = ClusterStartupRule.getClientCache(); + Region<Object, Object> regionA = + clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY).create("regionA"); + regionA.registerInterestForAllKeys(); + regionA.clear(); + }); + + createIndex.get(); + clear.get(); + primary.invoke(() -> verifyEvents(true, true, false, false)); + secondary.invoke(() -> verifyEvents(false, false, true, true)); + } + + @Test + /** + * For interested client connecting to primary member, behaves like starting from primary member + * except it locks first + * 1. locks local primary regions + * 2. send OP_LOCK_FOR_PR_CLEAR to lock all other members' primary buckets + * 3. send a OP_LOCK_FOR_CLEAR message to lock all secondary buckets + * 4. send OP_CLEAR to clear all secondary buckets + */ + public void clearFromInterestedClientConnectingToPrimaryMember() throws Exception { + int port = primary.getPort(); + client = cluster.startClientVM(2, c -> c.withServerConnection(port).withPoolSubscription(true)); + AsyncInvocation createIndex = secondary.invokeAsync(PRClearCreateIndexDUnitTest::createIndex); + + AsyncInvocation clear = client.invokeAsync(() -> { Review comment: Using a type of Object will keep the AsyncInvocation from warning. Assuming there is no return. 
########## File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionOverflowClearDUnitTest.java ########## @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION; +import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_PORT; +import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER; +import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_PORT; +import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_START; +import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; +import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE; +import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT; +import static org.apache.geode.distributed.ConfigurationProperties.MEMBER_TIMEOUT; +import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION; +import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPorts; +import static 
org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.apache.geode.test.dunit.VM.getVM; +import static org.apache.geode.test.dunit.VM.getVMId; +import static org.apache.geode.test.dunit.VM.toArray; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.DiskStoreFactory; +import org.apache.geode.cache.EvictionAction; +import org.apache.geode.cache.EvictionAttributes; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.distributed.LocatorLauncher; +import org.apache.geode.distributed.ServerLauncher; +import org.apache.geode.distributed.internal.InternalLocator; +import org.apache.geode.management.internal.cli.util.CommandStringBuilder; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.rules.DistributedRule; +import org.apache.geode.test.junit.rules.GfshCommandRule; +import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder; + +public class PartitionedRegionOverflowClearDUnitTest implements Serializable { + + @Rule + public DistributedRule distributedRule = new DistributedRule(5); + + @Rule + public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder(); + + @Rule + public transient GfshCommandRule gfsh = new GfshCommandRule(); + + private VM locator; + private VM server1; + private VM server2; + private VM accessor; + private VM client; + + private static final 
String LOCATOR_NAME = "locator"; + private static final String SERVER1_NAME = "server1"; + private static final String SERVER2_NAME = "server2"; + private static final String SERVER3_NAME = "server3"; + + private File locatorDir; + private File server1Dir; + private File server2Dir; + private File server3Dir; + + private String locatorString; + + private int locatorPort; + private int locatorJmxPort; + private int locatorHttpPort; + private int serverPort1; + private int serverPort2; + private int serverPort3; + + private static final AtomicReference<LocatorLauncher> LOCATOR_LAUNCHER = new AtomicReference<>(); + + private static final AtomicReference<ServerLauncher> SERVER_LAUNCHER = new AtomicReference<>(); + + private static final AtomicReference<ClientCache> CLIENT_CACHE = new AtomicReference<>(); + + private static final String OVERFLOW_REGION_NAME = "testOverflowRegion"; + + public static final int NUM_ENTRIES = 1000; + + @Before + public void setup() throws Exception { + locator = getVM(0); + server1 = getVM(1); + server2 = getVM(2); + accessor = getVM(3); + client = getVM(4); Review comment: like the clear role names ########## File path: geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java ########## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. 
See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.cache.query.partitioned; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.stream.IntStream; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.query.QueryService; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; +import org.apache.geode.internal.cache.DistributedClearOperation; +import org.apache.geode.internal.cache.DistributedClearOperation.ClearRegionMessage; +import org.apache.geode.internal.cache.PartitionedRegionClearMessage; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +public class PRClearCreateIndexDUnitTest implements Serializable { + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(4, true); + + private MemberVM primary, secondary; + private ClientVM client; + + @Before + public void before() throws Exception { + int locatorPort = ClusterStartupRule.getDUnitLocatorPort(); + primary = cluster.startServerVM(0, locatorPort); + secondary = cluster.startServerVM(1, locatorPort); + + // create region on server1 first, making sure server1 has the primary bucket + primary.invoke(() -> { + DistributionMessageObserver.setInstance(new MessageObserver()); + Region<Object, Object> region = + ClusterStartupRule.memberStarter.createPartitionRegion("regionA", + f -> 
f.setTotalNumBuckets(1).setRedundantCopies(1)); + IntStream.range(0, 100).forEach(i -> region.put(i, "value" + i)); + }); + + // server2 has the secondary bucket + secondary.invoke(() -> { + DistributionMessageObserver.setInstance(new MessageObserver()); + ClusterStartupRule.memberStarter.createPartitionRegion("regionA", + f -> f.setTotalNumBuckets(1).setRedundantCopies(1)); + }); + } + + @After + public void after() throws Exception { + primary.invoke(() -> { + DistributionMessageObserver.setInstance(null); + }); + secondary.invoke(() -> { + DistributionMessageObserver.setInstance(null); + }); + } + + // All tests create index on secondary members. These tests are making sure we are requesting + // locks for clear on secondary members as well. If we create index on the primary, the clear + // and createIndex will run sequentially so there would be no error. But if we create index on + // the secondary member and if the secondary member will not + // request a lock for clear operation, it will result in an EntryDestroyedException when create + // index is happening. 
+ + // Note: OP_LOCK_FOR_CLEAR, OP_CLEAR, OP_UNLOCK_FOR_CLEAR are messages for secondary members + // OP_LOCK_FOR_PR_CLEAR, OP_UNLOCK_FOR_PR_CLEAR, OP_PR_CLEAR can be for anybody + + @Test + // all local buckets are primary, so only OP_LOCK_FOR_CLEAR and OP_CLEAR messages are sent to the + // secondary member + // in the end an OP_PR_CLEAR is sent to the secondary for no effect + public void clearFromPrimaryMember() throws Exception { + AsyncInvocation createIndex = secondary.invokeAsync(PRClearCreateIndexDUnitTest::createIndex); + AsyncInvocation clear = primary.invokeAsync(PRClearCreateIndexDUnitTest::clear); + + createIndex.get(); + clear.get(); + + // assert that secondary member received these messages + primary.invoke(() -> verifyEvents(false, false, false, false)); + secondary.invoke(() -> verifyEvents(false, true, true, true)); + } + + @Test + // all local buckets are secondary, so an OP_PR_CLEAR is sent to the primary member, from there + // a OP_LOCK_FOR_CLEAR and OP_CLEAR messages are sent back to the secondary + public void clearFromSecondaryMember() throws Exception { + AsyncInvocation createIndex = secondary.invokeAsync(PRClearCreateIndexDUnitTest::createIndex); + AsyncInvocation clear = secondary.invokeAsync(PRClearCreateIndexDUnitTest::clear); + + createIndex.get(); + clear.get(); + + // assert that secondary member received these messages + primary.invoke(() -> verifyEvents(false, true, false, false)); + secondary.invoke(() -> verifyEvents(false, false, true, true)); + } + + /** + * For interested client connecting to secondary member + * 1. locks all local primary region + * 2. send OP_LOCK_FOR_PR_CLEAR to lock all other members + * 3. send OP_PR_CLEAR to primary to clear + * 4. 
primary will send a OP_CLEAR message back to the secondary to clear + */ + @Test + public void clearFromInterestedClientConnectingToSecondaryMember() throws Exception { + int port = secondary.getPort(); + client = cluster.startClientVM(2, c -> c.withServerConnection(port).withPoolSubscription(true)); + AsyncInvocation createIndex = secondary.invokeAsync(PRClearCreateIndexDUnitTest::createIndex); + + AsyncInvocation clear = client.invokeAsync(() -> { + Thread.sleep(200); + ClientCache clientCache = ClusterStartupRule.getClientCache(); + Region<Object, Object> regionA = + clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY).create("regionA"); + regionA.registerInterestForAllKeys(); + regionA.clear(); + }); + + createIndex.get(); + clear.get(); + primary.invoke(() -> verifyEvents(true, true, false, false)); + secondary.invoke(() -> verifyEvents(false, false, true, true)); + } + + @Test + /** + * For interested client connecting to primary member, behaves like starting from primary member + * except it locks first + * 1. locks local primary regions + * 2. send OP_LOCK_FOR_PR_CLEAR to lock all other members' primary buckets + * 3. send a OP_LOCK_FOR_CLEAR message to lock all secondary buckets + * 4. send OP_CLEAR to clear all secondary buckets + */ + public void clearFromInterestedClientConnectingToPrimaryMember() throws Exception { + int port = primary.getPort(); + client = cluster.startClientVM(2, c -> c.withServerConnection(port).withPoolSubscription(true)); + AsyncInvocation createIndex = secondary.invokeAsync(PRClearCreateIndexDUnitTest::createIndex); + + AsyncInvocation clear = client.invokeAsync(() -> { + Thread.sleep(200); Review comment: Is there something we could wait for? 
########## File path: geode-core/src/distributedTest/java/org/apache/geode/cache/PRCacheListenerDistributedTest.java ########## @@ -36,47 +57,591 @@ @RunWith(Parameterized.class) @UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class) @SuppressWarnings("serial") -public class PRCacheListenerDistributedTest extends ReplicateCacheListenerDistributedTest { +public class PRCacheListenerDistributedTest implements Serializable { + + protected static final String CLEAR = "CLEAR"; + protected static final String REGION_DESTROY = "REGION_DESTROY"; + private static final String CREATES = "CREATES"; + private static final String UPDATES = "UPDATES"; + private static final String INVALIDATES = "INVALIDATES"; + private static final String DESTROYS = "DESTROYS"; + private static final int ENTRY_VALUE = 0; + private static final int UPDATED_ENTRY_VALUE = 1; + private static final String KEY = "key-1"; + @Rule + public DistributedRule distributedRule = new DistributedRule(); + @Rule + public CacheRule cacheRule = CacheRule.builder().createCacheInAll().build(); + @Rule + public SerializableTestName testName = new SerializableTestName(); + @Rule + public DistributedCounters sharedCountersRule = new DistributedCounters(); + @Rule + public DistributedErrorCollector errorCollector = new DistributedErrorCollector(); + protected String regionName; - @Parameters(name = "{index}: redundancy={0}") - public static Iterable<Integer> data() { - return Arrays.asList(0, 3); + @Parameters + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { + {1, Boolean.FALSE}, + {3, Boolean.TRUE}, + }); } @Parameter public int redundancy; - @Override + @Parameter(1) + public Boolean withData; + protected Region<String, Integer> createRegion(final String name, final CacheListener<String, Integer> listener) { + return createPartitionedRegion(name, listener, false); + } + + protected Region<String, Integer> createAccessorRegion(final String name, + final 
CacheListener<String, Integer> listener) { + return createPartitionedRegion(name, listener, true); + } + + private Region<String, Integer> createPartitionedRegion(String name, + CacheListener<String, Integer> listener, boolean accessor) { + LogService.getLogger() + .info("Params [Redundancy: " + redundancy + " withData:" + withData + "]"); PartitionAttributesFactory<String, Integer> paf = new PartitionAttributesFactory<>(); paf.setRedundantCopies(redundancy); + if (accessor) { + paf.setLocalMaxMemory(0); + } RegionFactory<String, Integer> regionFactory = cacheRule.getCache().createRegionFactory(); - regionFactory.addCacheListener(listener); + if (listener != null) { + regionFactory.addCacheListener(listener); + } regionFactory.setDataPolicy(DataPolicy.PARTITION); regionFactory.setPartitionAttributes(paf.create()); return regionFactory.create(name); } - @Override + private void withData(Region region) { + if (withData) { + // Fewer buckets. + // Covers case where node doesn't have any buckets depending on redundancy. + region.put("key1", "value1"); + region.put("key2", "value2"); + } + } + protected int expectedCreates() { return 1; } - @Override protected int expectedUpdates() { return 1; } - @Override protected int expectedInvalidates() { return 1; } - @Override protected int expectedDestroys() { return 1; } + + @Test + public void afterRegionDestroyIsInvokedInEveryMember() { + CacheListener<String, Integer> listener = new RegionDestroyCountingCacheListener(); + Region<String, Integer> region = createRegion(regionName, listener); + + for (int i = 0; i < getVMCount(); i++) { + getVM(i).invoke(() -> { + withData(createRegion(regionName, listener)); + }); + } Review comment: Seems like this could become a method. It is used a lot. 
########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java ########## @@ -0,0 +1,506 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelException; +import org.apache.geode.cache.CacheWriterException; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.OperationAbortedException; +import org.apache.geode.cache.PartitionedRegionPartialClearException; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.serialization.KnownVersion; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class PartitionedRegionClear { + + private static final Logger logger = 
LogService.getLogger(); + + protected static final String CLEAR_OPERATION = "_clearOperation"; + + private final int retryTime = 2 * 60 * 1000; Review comment: can this be capitalized? ########## File path: geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java ########## @@ -0,0 +1,506 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelException; +import org.apache.geode.cache.CacheWriterException; +import org.apache.geode.cache.Operation; +import org.apache.geode.cache.OperationAbortedException; +import org.apache.geode.cache.PartitionedRegionPartialClearException; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.MembershipListener; +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.serialization.KnownVersion; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class PartitionedRegionClear { + + private static final Logger logger = LogService.getLogger(); + + protected static final String CLEAR_OPERATION = "_clearOperation"; + + private final int retryTime = 2 * 60 * 1000; + + private final PartitionedRegion partitionedRegion; + + protected final LockForListenerAndClientNotification lockForListenerAndClientNotification = + new LockForListenerAndClientNotification(); + + private volatile boolean membershipChange = false; + + protected final PartitionedRegionClearListener partitionedRegionClearListener = + new PartitionedRegionClearListener(); + + public PartitionedRegionClear(PartitionedRegion partitionedRegion) { + this.partitionedRegion = partitionedRegion; + partitionedRegion.getDistributionManager() + .addMembershipListener(partitionedRegionClearListener); + } + + public boolean isLockedForListenerAndClientNotification() { + return lockForListenerAndClientNotification.isLocked(); + } + + void acquireDistributedClearLock(String clearLock) { + try { + 
partitionedRegion.getPartitionedRegionLockService().lock(clearLock, -1, -1); + } catch (IllegalStateException e) { + partitionedRegion.lockCheckReadiness(); + throw e; + } + } + + void releaseDistributedClearLock(String clearLock) { Review comment: Isn't clear lock just an object? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
