This is an automated email from the ASF dual-hosted git repository. jbarrett pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
commit 051f9cc7304847f05e8346ce6672cd5442b04b2e Author: Jacob Barrett <jbarr...@pivotal.io> AuthorDate: Thu Apr 7 23:28:31 2022 -0700 GEODE-8228: Cleanup static analyzer in WANTestBase. --- ...nedRegionSingleHopWithServerGroupDUnitTest.java | 6 +- .../cache/CustomerIDPartitionResolver.java | 11 +- .../internal/cache/wan/MyAsyncEventListener.java | 15 +- .../geode/internal/cache/wan/QueueListener.java | 50 +- .../commands/WanCopyRegionCommandDUnitTest.java | 15 +- .../cache/wan/CacheClientNotifierDUnitTest.java | 4 +- .../geode/internal/cache/wan/WANTestBase.java | 531 +++++++++------------ .../ConcurrentParallelGatewaySenderDUnitTest.java | 2 +- ...ParallelGatewaySenderOperation_2_DUnitTest.java | 90 ++-- .../ParallelGatewaySenderOperationsDUnitTest.java | 63 ++- .../parallel/ParallelWANConflationDUnitTest.java | 86 ++-- ...rallelWANPropagationConcurrentOpsDUnitTest.java | 48 +- .../wan/parallel/ParallelWANStatsDUnitTest.java | 2 +- .../serial/SerialGatewaySenderQueueDUnitTest.java | 49 +- .../cache/wan/serial/SerialWANStatsDUnitTest.java | 54 +-- 15 files changed, 446 insertions(+), 580 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleHopWithServerGroupDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleHopWithServerGroupDUnitTest.java index 355c2814fe..83142d51e3 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleHopWithServerGroupDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleHopWithServerGroupDUnitTest.java @@ -44,6 +44,7 @@ import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.DataPolicy; import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.PartitionResolver; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionFactory; import org.apache.geode.cache.client.Pool; @@ -770,8 +771,11 @@ public class PartitionedRegionSingleHopWithServerGroupDUnitTest extends JUnit4Ca int localMaxMemory) { PartitionAttributesFactory<K, V> paf = new PartitionAttributesFactory<>(); + @SuppressWarnings("unchecked") + final PartitionResolver<K, V> customerIDPartitionResolver = + (PartitionResolver<K, V>) new CustomerIDPartitionResolver("CustomerIDPartitionResolver"); paf.setRedundantCopies(redundantCopies).setTotalNumBuckets(totalNoofBuckets) - .setPartitionResolver(new CustomerIDPartitionResolver<>("CustomerIDPartitionResolver")); + .setPartitionResolver(customerIDPartitionResolver); if (colocatedRegionName != null) { paf.setColocatedWith(colocatedRegionName); diff --git a/geode-dunit/src/main/java/org/apache/geode/internal/cache/CustomerIDPartitionResolver.java b/geode-dunit/src/main/java/org/apache/geode/internal/cache/CustomerIDPartitionResolver.java index 22c26a4a42..2f805e7598 100755 --- a/geode-dunit/src/main/java/org/apache/geode/internal/cache/CustomerIDPartitionResolver.java +++ b/geode-dunit/src/main/java/org/apache/geode/internal/cache/CustomerIDPartitionResolver.java @@ -23,15 +23,10 @@ import org.apache.geode.internal.cache.execute.data.CustId; import org.apache.geode.internal.cache.execute.data.OrderId; import org.apache.geode.internal.cache.execute.data.ShipmentId; -public class CustomerIDPartitionResolver<K, V> implements PartitionResolver<K, V> { +public class CustomerIDPartitionResolver implements PartitionResolver<Object, Object> { - private static final CustomerIDPartitionResolver<Object, Object> customerIDPartitionResolver = - null; private String id; - - private String resolverName; - public CustomerIDPartitionResolver() {} public CustomerIDPartitionResolver(String resolverID) { @@ -40,11 +35,11 @@ public class CustomerIDPartitionResolver<K, V> implements PartitionResolver<K, V @Override public String getName() { - return resolverName; + return id; } @Override - public Serializable getRoutingObject(EntryOperation opDetails) { + public Serializable getRoutingObject(EntryOperation<Object, Object> opDetails) { Serializable routingbject = null; diff --git a/geode-junit/src/main/java/org/apache/geode/internal/cache/wan/MyAsyncEventListener.java b/geode-junit/src/main/java/org/apache/geode/internal/cache/wan/MyAsyncEventListener.java index 4a312316d4..69c240d047 100644 --- a/geode-junit/src/main/java/org/apache/geode/internal/cache/wan/MyAsyncEventListener.java +++ b/geode-junit/src/main/java/org/apache/geode/internal/cache/wan/MyAsyncEventListener.java @@ -28,15 +28,11 @@ import org.apache.geode.logging.internal.log4j.api.LogService; public class MyAsyncEventListener implements AsyncEventListener, Declarable { private static final Logger logger = LogService.getLogger(); - private final Map eventsMap; - - public MyAsyncEventListener() { - eventsMap = new HashMap(); - } + private final Map<Object, Object> eventsMap = new HashMap<>(); @Override - public synchronized boolean processEvents(List<AsyncEvent> events) { - for (AsyncEvent event : events) { + public synchronized boolean processEvents(@SuppressWarnings("rawtypes") List<AsyncEvent> events) { + for (AsyncEvent<?, ?> event : events) { if (eventsMap.containsKey(event.getKey())) { logger.debug("This is a duplicate event --> {}", event.getKey()); } @@ -47,7 +43,8 @@ public class MyAsyncEventListener implements AsyncEventListener, Declarable { return true; } - public Map getEventsMap() { - return eventsMap; + @SuppressWarnings("unchecked") + public <K, V> Map<K, V> getEventsMap() { + return (Map<K, V>) eventsMap; } } diff --git a/geode-junit/src/main/java/org/apache/geode/internal/cache/wan/QueueListener.java b/geode-junit/src/main/java/org/apache/geode/internal/cache/wan/QueueListener.java index 1863868fa3..ffb5fac5d6 100644 --- a/geode-junit/src/main/java/org/apache/geode/internal/cache/wan/QueueListener.java +++ b/geode-junit/src/main/java/org/apache/geode/internal/cache/wan/QueueListener.java @@ -22,59 +22,41 @@ import org.apache.geode.cache.CacheListener; import org.apache.geode.cache.EntryEvent; import org.apache.geode.cache.RegionEvent; -public class QueueListener implements CacheListener { - public List createList = Collections.synchronizedList(new ArrayList()); - public List destroyList = Collections.synchronizedList(new ArrayList()); - public List updateList = Collections.synchronizedList(new ArrayList()); +public class QueueListener<K, V> implements CacheListener<K, V> { + public List<K> createList = Collections.synchronizedList(new ArrayList<>()); + public List<K> destroyList = Collections.synchronizedList(new ArrayList<>()); + public List<K> updateList = Collections.synchronizedList(new ArrayList<>()); @Override - public void afterCreate(EntryEvent event) { + public void afterCreate(EntryEvent<K, V> event) { createList.add(event.getKey()); } @Override - public void afterDestroy(EntryEvent event) { + public void afterDestroy(EntryEvent<K, V> event) { destroyList.add(event.getKey()); } @Override - public void afterInvalidate(EntryEvent event) { - // TODO Auto-generated method stub - - } + public void afterInvalidate(EntryEvent<K, V> event) {} @Override - public void afterRegionClear(RegionEvent event) { - // TODO Auto-generated method stub - - } + public void afterRegionClear(RegionEvent<K, V> event) {} @Override - public void afterRegionCreate(RegionEvent event) { - // TODO Auto-generated method stub - - } + public void afterRegionCreate(RegionEvent<K, V> event) {} @Override - public void afterRegionDestroy(RegionEvent event) { - // TODO Auto-generated method stub - - } + public void afterRegionDestroy(RegionEvent<K, V> event) {} @Override - public void afterRegionInvalidate(RegionEvent event) { - // TODO Auto-generated method stub - - } + public void afterRegionInvalidate(RegionEvent<K, V> event) {} @Override - public void afterRegionLive(RegionEvent event) { - // TODO Auto-generated method stub - - } + public void afterRegionLive(RegionEvent<K, V> event) {} @Override - public void afterUpdate(EntryEvent event) { + public void afterUpdate(EntryEvent<K, V> event) { updateList.add(event.getKey()); } @@ -83,9 +65,5 @@ public class QueueListener implements CacheListener { } @Override - public void close() { - // TODO Auto-generated method stub - - } - + public void close() {} } diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommandDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommandDUnitTest.java index 7ee8eece56..b05bf48264 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommandDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommandDUnitTest.java @@ -37,7 +37,6 @@ import static org.assertj.core.api.Assertions.assertThat; import java.text.NumberFormat; import java.text.ParseException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -257,6 +256,10 @@ public class WanCopyRegionCommandDUnitTest extends WANTestBase { } } + /** + * Used by {@link #testSenderOrReceiverGoesDownDuringExecution} in annotation. + */ + @SuppressWarnings("unused") private Object[] parametersToTestSenderOrReceiverGoesDownDuringExecution() { return new Object[] { new Object[] {true, true, Gateway.SENDER, false}, @@ -673,7 +676,7 @@ public class WanCopyRegionCommandDUnitTest extends WANTestBase { // Check that no entries are left in the queue from B to C for (VM serverInB : serversInB) { - List<Integer> stats1 = serverInB.invoke(() -> getSenderStats(senderIdInB, 0)); + serverInB.invoke(() -> getSenderStats(senderIdInB, 0)); } // Check that the region's data is the same in sites "A" and "B" @@ -1390,8 +1393,7 @@ public class WanCopyRegionCommandDUnitTest extends WANTestBase { .hasTableSection(ResultModel.MEMBER_STATUS_SECTION) .hasColumn("Member") .hasSize(members); - String[] oksList = - (String[]) (new ArrayList(Collections.nCopies(members, "OK"))).toArray(new String[0]); + String[] oksList = Collections.nCopies(members, "OK").toArray(new String[0]); command .hasTableSection(ResultModel.MEMBER_STATUS_SECTION) .hasColumn("Status") @@ -1417,8 +1419,7 @@ public class WanCopyRegionCommandDUnitTest extends WANTestBase { .hasTableSection(ResultModel.MEMBER_STATUS_SECTION) .hasColumn("Message") .hasSize(members); - String[] errorsList = - (String[]) (new ArrayList(Collections.nCopies(members, "ERROR"))).toArray(new String[0]); + String[] errorsList = Collections.nCopies(members, "ERROR").toArray(new String[0]); command .hasTableSection(ResultModel.MEMBER_STATUS_SECTION) .hasColumn("Status") @@ -1488,7 +1489,7 @@ public class WanCopyRegionCommandDUnitTest extends WANTestBase { } private void waitForWanCopyRegionCommandToStart(boolean useParallel, boolean usePartitionedRegion, - List<VM> servers) throws InterruptedException { + List<VM> servers) { // Wait for the command execution to be registered in the service final int executionsExpected = useParallel && usePartitionedRegion ? servers.size() : 1; await().untilAsserted( diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/CacheClientNotifierDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/CacheClientNotifierDUnitTest.java index 6e40b648cb..2231a2a2c5 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/CacheClientNotifierDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/CacheClientNotifierDUnitTest.java @@ -266,11 +266,11 @@ public class CacheClientNotifierDUnitTest extends WANTestBase { CacheServerTestUtil.enableShufflingOfEndpoints(); } - RegionFactory factory = cache.createRegionFactory(RegionShortcut.LOCAL) + RegionFactory<?, ?> factory = cache.createRegionFactory(RegionShortcut.LOCAL) .setScope(Scope.DISTRIBUTED_NO_ACK) .setPoolName(p.getName()); region = factory.create(regionName); - region.registerInterest("ALL_KEYS"); + region.registerInterestForAllKeys(); assertNotNull(region); if (isDurable) { cache.readyForEvents(); diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java index 1a99038815..62f4ca9749 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java @@ -54,7 +54,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -77,6 +76,7 @@ import javax.management.ObjectName; import org.apache.commons.io.FileUtils; import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; import org.junit.Before; import org.junit.BeforeClass; import org.junit.experimental.categories.Category; @@ -105,7 +105,6 @@ import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.client.ClientCacheFactory; import org.apache.geode.cache.client.ClientRegionShortcut; -import org.apache.geode.cache.client.Pool; import org.apache.geode.cache.client.PoolManager; import org.apache.geode.cache.client.internal.ConnectionStats; import org.apache.geode.cache.client.internal.LocatorDiscoveryCallbackAdapter; @@ -137,7 +136,6 @@ import org.apache.geode.internal.cache.CustomerIDPartitionResolver; import org.apache.geode.internal.cache.ForceReattemptException; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.InternalCacheBuilder; -import org.apache.geode.internal.cache.InternalRegion; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.PoolStats; import org.apache.geode.internal.cache.RegionQueue; @@ -182,11 +180,12 @@ import org.apache.geode.test.dunit.WaitCriterion; import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; import org.apache.geode.test.junit.categories.WanTest; +@SuppressWarnings("deprecation") @Category({WanTest.class}) public class WANTestBase extends DistributedTestCase { protected static Cache cache; - protected static Region region; + protected static Region<?, ?> region; protected static PartitionedRegion customerRegion; protected static PartitionedRegion orderRegion; @@ -205,8 +204,8 @@ public class WANTestBase extends DistributedTestCase { protected static VM vm6; protected static VM vm7; - protected static QueueListener listener1; - protected static QueueListener listener2; + protected static QueueListener<Object, Object> listener1; + protected static QueueListener<Object, Object> listener2; protected static AsyncEventListener eventListener1; protected static AsyncEventListener eventListener2; @@ -222,7 +221,7 @@ public class WANTestBase extends DistributedTestCase { private static final Logger logger = LogService.getLogger(); @BeforeClass - public static void beforeClassWANTestBase() throws Exception { + public static void beforeClassWANTestBase() { vm0 = getHost(0).getVM(0); vm1 = getHost(0).getVM(1); vm2 = getHost(0).getVM(2); @@ -404,7 +403,7 @@ public class WANTestBase extends DistributedTestCase { receiver.start(); } catch (IOException e) { e.printStackTrace(); - org.apache.geode.test.dunit.Assert.fail("Failed to start GatewayReceiver on port " + port, e); + fail("Failed to start GatewayReceiver on port " + port, e); } return port; } @@ -417,7 +416,7 @@ public class WANTestBase extends DistributedTestCase { IgnoredException exp2 = addIgnoredException(GatewaySenderException.class.getName()); try { - RegionFactory fact = cache.createRegionFactory(RegionShortcut.REPLICATE); + RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.REPLICATE); if (senderIds != null) { StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); while (tokenizer.hasMoreTokens()) { @@ -444,7 +443,7 @@ public class WANTestBase extends DistributedTestCase { IgnoredException exp2 = addIgnoredException(GatewaySenderException.class.getName()); try { - RegionFactory fact = cache.createRegionFactory(RegionShortcut.REPLICATE_PROXY); + RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.REPLICATE_PROXY); if (senderIds != null) { StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); while (tokenizer.hasMoreTokens()) { @@ -463,7 +462,7 @@ public class WANTestBase extends DistributedTestCase { } public static void createNormalRegion(String regionName, String senderIds) { - RegionFactory fact = cache.createRegionFactory(RegionShortcut.LOCAL); + RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.LOCAL); if (senderIds != null) { StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); while (tokenizer.hasMoreTokens()) { @@ -477,7 +476,7 @@ public class WANTestBase extends DistributedTestCase { public static void createPersistentReplicatedRegion(String regionName, String senderIds, Boolean offHeap) { - RegionFactory fact = cache.createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT); + RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT); if (senderIds != null) { StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); while (tokenizer.hasMoreTokens()) { @@ -495,7 +494,7 @@ public class WANTestBase extends DistributedTestCase { IgnoredException exp1 = addIgnoredException(ForceReattemptException.class.getName()); try { - RegionFactory fact = cache.createRegionFactory(RegionShortcut.REPLICATE); + RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.REPLICATE); if (asyncQueueIds != null) { StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ","); while (tokenizer.hasMoreTokens()) { @@ -516,7 +515,7 @@ public class WANTestBase extends DistributedTestCase { IgnoredException exp = addIgnoredException(ForceReattemptException.class.getName()); try { - RegionFactory fact = cache.createRegionFactory(RegionShortcut.REPLICATE); + RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.REPLICATE); if (senderIds != null) { StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); while (tokenizer.hasMoreTokens()) { @@ -541,7 +540,7 @@ public class WANTestBase extends DistributedTestCase { public static void createReplicatedRegion(String regionName, String senderIds, Scope scope, DataPolicy policy, Boolean offHeap, boolean statisticsEnabled, boolean concurrencyChecksEnabled) { - RegionFactory fact = cache.createRegionFactory(); + RegionFactory<?, ?> fact = cache.createRegionFactory(); if (senderIds != null) { StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); while (tokenizer.hasMoreTokens()) { @@ -607,7 +606,7 @@ public class WANTestBase extends DistributedTestCase { IgnoredException exp1 = addIgnoredException(PartitionOfflineException.class.getName()); try { - RegionFactory fact = cache.createRegionFactory(shortcut); + RegionFactory<?, ?> fact = cache.createRegionFactory(shortcut); if (senderIds != null) { StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); while (tokenizer.hasMoreTokens()) { @@ -615,7 +614,7 @@ public class WANTestBase extends DistributedTestCase { fact.addGatewaySenderId(senderId); } } - PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + PartitionAttributesFactory<?, ?> pfact = new PartitionAttributesFactory<>(); pfact.setTotalNumBuckets(totalNumBuckets); pfact.setRedundantCopies(redundantCopies); pfact.setRecoveryDelay(0); @@ -638,7 +637,7 @@ public class WANTestBase extends DistributedTestCase { IgnoredException exp1 = addIgnoredException(PartitionOfflineException.class.getName()); try { - RegionFactory fact = cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT); + RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT); if (senderIds != null) { StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); while (tokenizer.hasMoreTokens()) { @@ -646,7 +645,7 @@ public class WANTestBase extends DistributedTestCase { fact.addGatewaySenderId(senderId); } } - PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + PartitionAttributesFactory<?, ?> pfact = new PartitionAttributesFactory<>(); pfact.setTotalNumBuckets(totalNumBuckets); pfact.setRedundantCopies(redundantCopies); pfact.setRecoveryDelay(0); @@ -665,7 +664,7 @@ public class WANTestBase extends DistributedTestCase { IgnoredException exp1 = addIgnoredException(PartitionOfflineException.class.getName()); try { - RegionFactory fact = cache.createRegionFactory(RegionShortcut.PARTITION); + RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.PARTITION); if (senderIds != null) { StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); while (tokenizer.hasMoreTokens()) { @@ -674,7 +673,7 @@ public class WANTestBase extends DistributedTestCase { } } - PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + PartitionAttributesFactory<?, ?> pfact = new PartitionAttributesFactory<>(); pfact.setTotalNumBuckets(totalNumBuckets); pfact.setRedundantCopies(redundantCopies); pfact.setRecoveryDelay(0); @@ -689,19 +688,19 @@ public class WANTestBase extends DistributedTestCase { public static void addSenderThroughAttributesMutator(String regionName, String senderIds) { final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName); - AttributesMutator mutator = r.getAttributesMutator(); + AttributesMutator<?, ?> mutator = r.getAttributesMutator(); mutator.addGatewaySenderId(senderIds); } public static void addAsyncEventQueueThroughAttributesMutator(String regionName, String queueId) { final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName); - AttributesMutator mutator = r.getAttributesMutator(); + AttributesMutator<?, ?> mutator = r.getAttributesMutator(); mutator.addAsyncEventQueueId(queueId); } public static void createPartitionedRegionAsAccessor(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets) { - RegionFactory fact = cache.createRegionFactory(RegionShortcut.PARTITION_PROXY); + RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.PARTITION_PROXY); if (senderIds != null) { StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); while (tokenizer.hasMoreTokens()) { @@ -709,7 +708,7 @@ public class WANTestBase extends DistributedTestCase { fact.addGatewaySenderId(senderId); } } - PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + PartitionAttributesFactory<?, ?> pfact = new PartitionAttributesFactory<>(); pfact.setTotalNumBuckets(totalNumBuckets); pfact.setRedundantCopies(redundantCopies); fact.setPartitionAttributes(pfact.create()); @@ -718,7 +717,7 @@ public class WANTestBase extends DistributedTestCase { public static void createPartitionedRegionWithSerialParallelSenderIds(String regionName, String serialSenderIds, String parallelSenderIds, String colocatedWith, Boolean offHeap) { - RegionFactory fact = cache.createRegionFactory(RegionShortcut.PARTITION); + RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.PARTITION); if (serialSenderIds != null) { StringTokenizer tokenizer = new StringTokenizer(serialSenderIds, ","); while (tokenizer.hasMoreTokens()) { @@ -733,7 +732,7 @@ public class WANTestBase extends DistributedTestCase { fact.addGatewaySenderId(senderId); } } - PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + PartitionAttributesFactory<?, ?> pfact = new PartitionAttributesFactory<>(); pfact.setColocatedWith(colocatedWith); fact.setPartitionAttributes(pfact.create()); fact.setOffHeap(offHeap); @@ -749,7 +748,7 @@ public class WANTestBase extends DistributedTestCase { addIgnoredException(PartitionOfflineException.class.getName()); try { - RegionFactory fact = cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT); + RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT); if (senderIds != null) { StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); while (tokenizer.hasMoreTokens()) { @@ -757,7 +756,7 @@ public class WANTestBase extends DistributedTestCase { fact.addGatewaySenderId(senderId); } } - PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + PartitionAttributesFactory<?, ?> pfact = new PartitionAttributesFactory<>(); pfact.setTotalNumBuckets(totalNumBuckets); pfact.setRedundantCopies(redundantCopies); fact.setPartitionAttributes(pfact.create()); @@ -774,7 +773,7 @@ public class WANTestBase extends DistributedTestCase { IgnoredException exp = addIgnoredException(ForceReattemptException.class.getName()); try { - RegionFactory fact = cache.createRegionFactory(RegionShortcut.PARTITION); + RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.PARTITION); if (senderIds != null) { StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); while (tokenizer.hasMoreTokens()) { @@ -783,7 +782,7 @@ public class WANTestBase extends DistributedTestCase { } } - PartitionAttributesFactory paf = new PartitionAttributesFactory(); + PartitionAttributesFactory<Object, Object> paf = new PartitionAttributesFactory<>(); paf.setRedundantCopies(redundantCopies).setTotalNumBuckets(totalNumBuckets) .setPartitionResolver(new CustomerIDPartitionResolver("CustomerIDPartitionResolver")); fact.setPartitionAttributes(paf.create()); @@ -792,7 +791,7 @@ public class WANTestBase extends DistributedTestCase { (PartitionedRegion) fact.create(customerRegionName); logger.info("Partitioned Region CUSTOMER created Successfully :" + customerRegion.toString()); - paf = new PartitionAttributesFactory(); + paf = new PartitionAttributesFactory<>(); paf.setRedundantCopies(redundantCopies).setTotalNumBuckets(totalNumBuckets) .setColocatedWith(customerRegionName) .setPartitionResolver(new CustomerIDPartitionResolver("CustomerIDPartitionResolver")); @@ -810,7 +809,7 @@ public class WANTestBase extends DistributedTestCase { (PartitionedRegion) fact.create(orderRegionName); logger.info("Partitioned Region ORDER created Successfully :" + orderRegion.toString()); - paf = new PartitionAttributesFactory(); + paf = new PartitionAttributesFactory<>(); paf.setRedundantCopies(redundantCopies).setTotalNumBuckets(totalNumBuckets) .setColocatedWith(orderRegionName) .setPartitionResolver(new CustomerIDPartitionResolver("CustomerIDPartitionResolver")); @@ -834,7 +833,7 @@ public class WANTestBase extends DistributedTestCase { public static void createColocatedPartitionedRegions(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap) { - RegionFactory fact = cache.createRegionFactory(RegionShortcut.PARTITION); + RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.PARTITION); if (senderIds != null) { StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); while (tokenizer.hasMoreTokens()) { @@ -842,7 +841,7 @@ public class WANTestBase extends DistributedTestCase { fact.addGatewaySenderId(senderId); } } - PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + PartitionAttributesFactory<?, ?> pfact = new PartitionAttributesFactory<>(); pfact.setTotalNumBuckets(totalNumBuckets); pfact.setRedundantCopies(redundantCopies); fact.setPartitionAttributes(pfact.create()); @@ -859,7 +858,7 @@ public class WANTestBase extends DistributedTestCase { public static void createColocatedPartitionedRegions2(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap) { - RegionFactory fact = cache.createRegionFactory(RegionShortcut.PARTITION); + RegionFactory<?, ?> fact = cache.createRegionFactory(RegionShortcut.PARTITION); if (senderIds != null) { StringTokenizer tokenizer = new StringTokenizer(senderIds, ","); while (tokenizer.hasMoreTokens()) { @@ -867,7 +866,7 @@ public class WANTestBase extends DistributedTestCase { fact.addGatewaySenderId(senderId); } } - PartitionAttributesFactory pfact = new PartitionAttributesFactory(); + PartitionAttributesFactory<?, ?> pfact = new PartitionAttributesFactory<>(); pfact.setTotalNumBuckets(totalNumBuckets); pfact.setRedundantCopies(redundantCopies); fact.setPartitionAttributes(pfact.create()); @@ -903,15 +902,6 @@ public class WANTestBase extends DistributedTestCase { }); } - private static CacheListener myListener; - - public static void removeCacheListener() { - cache.getRegion(getTestMethodName() + "_RR_1").getAttributesMutator() - .removeCacheListener(myListener); - - } - - public static void createCache(Integer locPort) { createCache(false, locPort); } @@ -1027,7 +1017,7 @@ public class WANTestBase extends DistributedTestCase { try { server1.start(); } catch (IOException e) { - org.apache.geode.test.dunit.Assert.fail("Failed to start the Server", e); + fail("Failed to start the Server", e); } assertThat(server1.isRunning()).isTrue(); @@ -1038,15 +1028,14 @@ public class WANTestBase extends DistributedTestCase { * Returns a Map that contains the count for number of cache server and number of Receivers. * */ - public static Map getCacheServers() { - List cacheServers = cache.getCacheServers(); + public static Map<String, Integer> getCacheServers() { + List<CacheServer> cacheServers = cache.getCacheServers(); - Map cacheServersMap = new HashMap(); - Iterator itr = cacheServers.iterator(); + Map<String, Integer> cacheServersMap = new HashMap<>(); int bridgeServerCounter = 0; int receiverServerCounter = 0; - while (itr.hasNext()) { - CacheServerImpl cacheServer = (CacheServerImpl) itr.next(); + for (final CacheServer server : cacheServers) { + CacheServerImpl cacheServer = (CacheServerImpl) server; if (cacheServer.getAcceptor().isGatewayReceiver()) { receiverServerCounter++; } else { @@ -1065,11 +1054,11 @@ public class WANTestBase extends DistributedTestCase { } public static void startSenderInVMsAsync(String senderId, VM... vms) { - List<AsyncInvocation> tasks = new LinkedList<>(); + List<AsyncInvocation<?>> tasks = new LinkedList<>(); for (VM vm : vms) { tasks.add(vm.invokeAsync(() -> startSender(senderId))); } - for (AsyncInvocation invocation : tasks) { + for (AsyncInvocation<?> invocation : tasks) { try { invocation.await(); } catch (InterruptedException e) { @@ -1080,11 +1069,11 @@ public class WANTestBase extends DistributedTestCase { public static void startSenderwithCleanQueuesInVMsAsync(String senderId, VM... vms) { - List<AsyncInvocation> tasks = new LinkedList<>(); + List<AsyncInvocation<?>> tasks = new LinkedList<>(); for (VM vm : vms) { tasks.add(vm.invokeAsync(() -> startSenderwithCleanQueues(senderId))); } - for (AsyncInvocation invocation : tasks) { + for (AsyncInvocation<?> invocation : tasks) { try { invocation.await(); } catch (InterruptedException e) { @@ -1134,11 +1123,11 @@ public class WANTestBase extends DistributedTestCase { sender.test_setBatchConflationEnabled(true); } - public static Map getSenderToReceiverConnectionInfo(String senderId) { + public static Map<String, Object> getSenderToReceiverConnectionInfo(String senderId) { GatewaySender sender = getGatewaySender(senderId); - Map connectionInfo = null; + Map<String, Object> connectionInfo = null; if (!sender.isParallel() && ((AbstractGatewaySender) sender).isPrimary()) { - connectionInfo = new HashMap(); + connectionInfo = new HashMap<>(); GatewaySenderEventDispatcher dispatcher = ((AbstractGatewaySender) sender).getEventProcessor().getDispatcher(); if (dispatcher instanceof GatewaySenderEventRemoteDispatcher) { @@ -1152,29 +1141,6 @@ public class WANTestBase extends DistributedTestCase { return connectionInfo; } - public static void moveAllPrimaryBuckets(String senderId, final DistributedMember destination, - final String regionName) { - - AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId); - final RegionQueue regionQueue; - regionQueue = sender.getQueues().toArray(new RegionQueue[1])[0]; - if (sender.isParallel()) { - ConcurrentParallelGatewaySenderQueue parallelGatewaySenderQueue = - (ConcurrentParallelGatewaySenderQueue) regionQueue; - PartitionedRegion prQ = - parallelGatewaySenderQueue.getRegions().toArray(new PartitionedRegion[1])[0]; - - Set<Integer> primaryBucketIds = prQ.getDataStore().getAllLocalPrimaryBucketIds(); - for (int bid : primaryBucketIds) { - movePrimary(destination, regionName, bid); - } - - // double check after moved all primary buckets - primaryBucketIds = prQ.getDataStore().getAllLocalPrimaryBucketIds(); - assertThat(primaryBucketIds.isEmpty()).isTrue(); - } - } - public static void movePrimary(final DistributedMember destination, final String regionName, final int bucketId) { PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName); @@ -1204,8 +1170,9 @@ public class WANTestBase extends DistributedTestCase { AbstractGatewaySender sender = (AbstractGatewaySender) CacheFactory.getAnyInstance().getGatewaySender(senderId); - Collection statsCollection = sender.getProxy().getEndpointManager().getAllStats().values(); - assertThat(statsCollection.iterator().next() instanceof ConnectionStats).isTrue(); + Collection<ConnectionStats> statsCollection = + sender.getProxy().getEndpointManager().getAllStats().values(); + assertThat(statsCollection.iterator().next()).isNotNull(); } public static List<Integer> getSenderStats(String senderId, final int expectedQueueSize) { @@ -1217,8 +1184,7 @@ public class WANTestBase extends DistributedTestCase { if (sender.isParallel()) { ConcurrentParallelGatewaySenderQueue parallelGatewaySenderQueue = (ConcurrentParallelGatewaySenderQueue) regionQueue; - PartitionedRegion pr = - parallelGatewaySenderQueue.getRegions().toArray(new PartitionedRegion[1])[0]; + parallelGatewaySenderQueue.getRegions(); } await() .untilAsserted(() -> assertThat(regionQueue.size()).as( @@ -1431,7 +1397,7 @@ public class WANTestBase extends DistributedTestCase { } public static void waitForSenderNonRunningState(String senderId) { - try (IgnoredException ie = addIgnoredException("Could not connect")) { + try (IgnoredException ignored = addIgnoredException("Could not connect")) { Set<GatewaySender> senders = cache.getGatewaySenders(); final GatewaySender sender = getGatewaySenderById(senders, senderId); await() @@ -1460,16 +1426,16 @@ public class WANTestBase extends DistributedTestCase { return null; } - public static HashMap checkQueue() { - HashMap listenerAttrs = new HashMap(); + public static Map<String, List<?>> checkQueue() { + HashMap<String, List<?>> listenerAttrs = new HashMap<>(); listenerAttrs.put("Create", listener1.createList); listenerAttrs.put("Update", listener1.updateList); listenerAttrs.put("Destroy", listener1.destroyList); return listenerAttrs; } - public static void checkQueueOnSecondary(final Map primaryUpdatesMap) { - final HashMap secondaryUpdatesMap = new HashMap(); + public static void checkQueueOnSecondary(final Map<String, List<?>> primaryUpdatesMap) { + final HashMap<String, List<?>> secondaryUpdatesMap = new HashMap<>(); secondaryUpdatesMap.put("Create", listener1.createList); secondaryUpdatesMap.put("Update", listener1.updateList); secondaryUpdatesMap.put("Destroy", listener1.destroyList); @@ -1484,31 +1450,33 @@ public class WANTestBase extends DistributedTestCase { }); } - public static HashMap checkQueue2() { - HashMap listenerAttrs = new HashMap(); + public static Map<String, List<?>> checkQueue2() { + HashMap<String, List<?>> listenerAttrs = new HashMap<>(); listenerAttrs.put("Create", listener2.createList); listenerAttrs.put("Update", listener2.updateList); listenerAttrs.put("Destroy", listener2.destroyList); return listenerAttrs; } - public static HashMap checkPR(String regionName) { + public static <K> Map<String, List<K>> checkPR(String regionName) { PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName); - QueueListener listener = (QueueListener) region.getCacheListener(); + @SuppressWarnings("unchecked") + QueueListener<K, ?> listener = (QueueListener<K, ?>) region.getCacheListener(); - HashMap listenerAttrs = new HashMap(); + HashMap<String, List<K>> listenerAttrs = new HashMap<>(); listenerAttrs.put("Create", listener.createList); listenerAttrs.put("Update", listener.updateList); listenerAttrs.put("Destroy", listener.destroyList); return listenerAttrs; } - public static HashMap checkBR(String regionName, int numBuckets) { + public static <K> Map<String, List<K>> checkBR(String regionName, int numBuckets) { PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName); - HashMap listenerAttrs = new HashMap(); + HashMap<String, List<K>> listenerAttrs = new HashMap<>(); for (int i = 0; i < numBuckets; i++) { BucketRegion br = region.getBucketRegion(i); - QueueListener listener = (QueueListener) br.getCacheListener(); + @SuppressWarnings("unchecked") + QueueListener<K, ?> listener = (QueueListener<K, ?>) br.getCacheListener(); listenerAttrs.put("Create" + i, listener.createList); listenerAttrs.put("Update" + i, listener.updateList); listenerAttrs.put("Destroy" + i, listener.destroyList); @@ -1516,17 +1484,18 @@ public class WANTestBase extends DistributedTestCase { return listenerAttrs; } - public static HashMap checkQueue_BR(String senderId, int numBuckets) { + public static <K> Map<String, List<K>> checkQueue_BR(String senderId, int numBuckets) { GatewaySender sender = getGatewaySender(senderId); RegionQueue parallelQueue = ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0]; PartitionedRegion region = (PartitionedRegion) parallelQueue.getRegion(); - HashMap listenerAttrs = new HashMap(); + Map<String, List<K>> listenerAttrs = new HashMap<>(); for (int i = 0; i < numBuckets; i++) { BucketRegion br = region.getBucketRegion(i); if (br != null) { - QueueListener listener = (QueueListener) br.getCacheListener(); + @SuppressWarnings("unchecked") + QueueListener<K, ?> listener = (QueueListener<K, ?>) br.getCacheListener(); if (listener != null) { listenerAttrs.put("Create" + i, listener.createList); listenerAttrs.put("Update" + i, listener.updateList); @@ -1546,8 +1515,9 @@ public class WANTestBase extends DistributedTestCase { PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName); for (int i = 0; i < numBuckets; i++) { BucketRegion br = region.getBucketRegion(i); - AttributesMutator mutator = br.getAttributesMutator(); - listener1 = new QueueListener(); + @SuppressWarnings("unchecked") + AttributesMutator<Object, Object> mutator = br.getAttributesMutator(); + listener1 = new QueueListener<>(); mutator.addCacheListener(listener1); } } @@ -1566,8 +1536,9 @@ public class WANTestBase extends DistributedTestCase { for (int i = 0; i < numBuckets; i++) { BucketRegion br = region.getBucketRegion(i); if (br != null) { - AttributesMutator mutator = br.getAttributesMutator(); - CacheListener listener = new QueueListener(); + @SuppressWarnings("unchecked") + AttributesMutator<Object, Object> mutator = br.getAttributesMutator(); + CacheListener<Object, Object> listener = new QueueListener<>(); mutator.addCacheListener(listener); } } @@ -1590,15 +1561,15 @@ public class WANTestBase extends DistributedTestCase { } private void addCacheListenerOnRegion(String regionName) { - Region<?, ?> region = cache.getRegion(regionName); - AttributesMutator mutator = region.getAttributesMutator(); - listener1 = new QueueListener(); + Region<Object, Object> region = cache.getRegion(regionName); + AttributesMutator<Object, Object> mutator = region.getAttributesMutator(); + listener1 = new QueueListener<>(); mutator.addCacheListener(listener1); } private void addCacheQueueListener(String senderId, boolean isParallel) { GatewaySender sender = getGatewaySender(senderId); - listener1 = new QueueListener(); + listener1 = new QueueListener<>(); if (!isParallel) { Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues(); for (RegionQueue q : queues) { @@ -1613,7 +1584,7 @@ public class WANTestBase extends DistributedTestCase { private void addSecondCacheQueueListener(String senderId, boolean isParallel) { GatewaySender sender = getGatewaySender(senderId); - listener2 = new QueueListener(); + listener2 = new QueueListener<>(); if (!isParallel) { Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues(); for (RegionQueue q : queues) { @@ -1655,11 +1626,11 @@ public class WANTestBase extends DistributedTestCase { } public static void stopSenderInVMsAsync(String senderId, VM... vms) { - List<AsyncInvocation<Void>> tasks = new LinkedList<>(); + List<AsyncInvocation<?>> tasks = new LinkedList<>(); for (VM vm : vms) { tasks.add(vm.invokeAsync(() -> stopSender(senderId))); } - for (AsyncInvocation invocation : tasks) { + for (AsyncInvocation<?> invocation : tasks) { try { invocation.await(); } catch (InterruptedException e) { @@ -1680,7 +1651,7 @@ public class WANTestBase extends DistributedTestCase { } sender.stop(); - Set<RegionQueue> queues = null; + Set<RegionQueue> queues; if (eventProcessor instanceof ConcurrentSerialGatewaySenderEventProcessor) { queues = ((ConcurrentSerialGatewaySenderEventProcessor) eventProcessor).getQueues(); for (RegionQueue queue : queues) { @@ -1931,7 +1902,7 @@ public class WANTestBase extends DistributedTestCase { public static String createSenderWithDiskStore(String dsName, int remoteDsId, boolean isParallel, Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent, GatewayEventFilter filter, String dsStore, boolean isManualStart) { - File persistentDirectory = null; + final File persistentDirectory; if (dsStore == null) { persistentDirectory = new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); @@ -2159,7 +2130,7 @@ public class WANTestBase extends DistributedTestCase { try { server.start(); } catch (IOException e) { - org.apache.geode.test.dunit.Assert.fail("Failed to start server ", e); + fail("Failed to start server ", e); } } @@ -2190,7 +2161,7 @@ public class WANTestBase extends DistributedTestCase { try { server.start(); } catch (IOException e) { - org.apache.geode.test.dunit.Assert.fail("Failed to start server ", e); + fail("Failed to start server ", e); } return port; } @@ -2206,11 +2177,11 @@ public class WANTestBase extends DistributedTestCase { public static void createClientWithLocatorAndRegion(int port0, String host, String regionName) { createClientWithLocatorAndRegion(port0, host); - RegionFactory factory = cache.createRegionFactory(RegionShortcut.LOCAL); + RegionFactory<?, ?> factory = cache.createRegionFactory(RegionShortcut.LOCAL); factory.setPoolName("pool"); region = factory.create(regionName); - region.registerInterest("ALL_KEYS"); + region.registerInterestForAllKeys(); logger.info("Distributed Region " + regionName + " created Successfully :" + region.toString()); } @@ -2225,9 +2196,8 @@ public class WANTestBase extends DistributedTestCase { assertThat(cache).isNotNull(); CacheServerTestUtil.disableShufflingOfEndpoints(); - Pool p; try { - p = PoolManager.createFactory().addLocator(host, port0).setPingInterval(250) + PoolManager.createFactory().addLocator(host, port0).setPingInterval(250) .setSubscriptionEnabled(true).setSubscriptionRedundancy(-1).setReadTimeout(20000) .setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10).setRetryAttempts(3) .create("pool"); @@ -2374,9 +2344,9 @@ public class WANTestBase extends DistributedTestCase { } public static void doPutAll(String regionName, int numPuts, int size) { - Region<?, ?> r = cache.getRegion(SEPARATOR + regionName); + Region<Long, String> r = cache.getRegion(SEPARATOR + regionName); for (long i = 0; i < numPuts; i++) { - Map putAllMap = new HashMap(); + Map<Long, String> putAllMap = new HashMap<>(); for (long j = 0; j < size; j++) { putAllMap.put((size * i) + j, "Value_" + i); } @@ -2393,24 +2363,24 @@ public class WANTestBase extends DistributedTestCase { } } - public static void putGivenKeyValue(String regionName, Map keyValues) { - Region<Object, Object> r = cache.getRegion(SEPARATOR + regionName); - for (Object key : keyValues.keySet()) { + public static <K, V> void putGivenKeyValue(String regionName, Map<K, V> keyValues) { + Region<K, V> r = cache.getRegion(SEPARATOR + regionName); + for (K key : keyValues.keySet()) { r.put(key, keyValues.get(key)); } } - public static void doOrderAndShipmentPutsInsideTransactions(Map keyValues, + public static void doOrderAndShipmentPutsInsideTransactions(Map<Object, Object> keyValues, int eventsPerTransaction) { - Region orderRegion = cache.getRegion(orderRegionName); - Region shipmentRegion = cache.getRegion(shipmentRegionName); + Region<Object, Object> orderRegion = cache.getRegion(orderRegionName); + Region<Object, Object> shipmentRegion = cache.getRegion(shipmentRegionName); int eventInTransaction = 0; CacheTransactionManager cacheTransactionManager = cache.getCacheTransactionManager(); for (Object key : keyValues.keySet()) { if (eventInTransaction == 0) { cacheTransactionManager.begin(); } - Region r; + Region<Object, Object> r; if (key instanceof OrderId) { r = orderRegion; } else { @@ -2427,12 +2397,12 @@ public class WANTestBase extends DistributedTestCase { } } - public static void doPutsInsideTransactions(String regionName, Map keyValues, + public static <K, V> void doPutsInsideTransactions(String regionName, Map<K, V> keyValues, int eventsPerTransaction) { - Region<Object, Object> r = cache.getRegion(Region.SEPARATOR + regionName); + Region<K, V> r = cache.getRegion(Region.SEPARATOR + regionName); int eventInTransaction = 0; CacheTransactionManager cacheTransactionManager = cache.getCacheTransactionManager(); - for (Object key : keyValues.keySet()) { + for (K key : keyValues.keySet()) { if (eventInTransaction == 0) { cacheTransactionManager.begin(); } @@ -2469,18 +2439,19 @@ public class WANTestBase extends DistributedTestCase { } - public static Map putCustomerPartitionedRegion(int numPuts) { + public static Map<CustId, Customer> putCustomerPartitionedRegion(int numPuts) { String valueSuffix = ""; return putCustomerPartitionedRegion(numPuts, valueSuffix); } - public static Map updateCustomerPartitionedRegion(int numPuts) { + public static Map<CustId, Customer> updateCustomerPartitionedRegion(int numPuts) { String valueSuffix = "_update"; return putCustomerPartitionedRegion(numPuts, valueSuffix); } - protected static Map putCustomerPartitionedRegion(int numPuts, String valueSuffix) { - Map custKeyValues = new HashMap(); + protected static Map<CustId, Customer> putCustomerPartitionedRegion(int numPuts, + String valueSuffix) { + Map<CustId, Customer> custKeyValues = new HashMap<>(); for (int i = 1; i <= numPuts; i++) { CustId custid = new CustId(i); Customer customer = new Customer("name" + i, "Address" + i + valueSuffix); @@ -2490,7 +2461,7 @@ public class WANTestBase extends DistributedTestCase { assertThat(customerRegion.get(custid)).isEqualTo(customer); custKeyValues.put(custid, customer); } catch (Exception e) { - org.apache.geode.test.dunit.Assert.fail( + fail( "putCustomerPartitionedRegion : failed while doing put operation in CustomerPartitionedRegion ", e); } @@ -2499,8 +2470,8 @@ public class WANTestBase extends DistributedTestCase { return custKeyValues; } - public static Map putOrderPartitionedRegion(int numPuts) { - Map orderKeyValues = new HashMap(); + public static Map<OrderId, Order> putOrderPartitionedRegion(int numPuts) { + Map<OrderId, Order> orderKeyValues = new HashMap<>(); for (int i = 1; i <= numPuts; i++) { CustId custid = new CustId(i); int oid = i + 1; @@ -2513,7 +2484,7 @@ public class WANTestBase extends DistributedTestCase { assertThat(orderRegion.get(orderId)).isEqualTo(order); } catch (Exception e) { - org.apache.geode.test.dunit.Assert.fail( + fail( "putOrderPartitionedRegion : failed while doing put operation in OrderPartitionedRegion ", e); } @@ -2522,8 +2493,8 @@ public class WANTestBase extends DistributedTestCase { return orderKeyValues; } - public static Map putOrderPartitionedRegionUsingCustId(int numPuts) { - Map orderKeyValues = new HashMap(); + public static Map<CustId, Order> putOrderPartitionedRegionUsingCustId(int numPuts) { + Map<CustId, Order> orderKeyValues = new HashMap<>(); for (int i = 1; i <= numPuts; i++) { CustId custid = new CustId(i); Order order = new Order("ORDER" + i); @@ -2534,7 +2505,7 @@ public class WANTestBase extends DistributedTestCase { assertThat(orderRegion.get(custid)).isEqualTo(order); } catch (Exception e) { - org.apache.geode.test.dunit.Assert.fail( + fail( "putOrderPartitionedRegionUsingCustId : failed while doing put operation in OrderPartitionedRegion ", e); } @@ -2543,8 +2514,8 @@ public class WANTestBase extends DistributedTestCase { return orderKeyValues; } - public static Map updateOrderPartitionedRegion(int numPuts) { - Map orderKeyValues = new HashMap(); + public static Map<OrderId, Order> updateOrderPartitionedRegion(int numPuts) { + Map<OrderId, Order> orderKeyValues = new HashMap<>(); for (int i = 1; i <= numPuts; i++) { CustId custid = new CustId(i); int oid = i + 1; @@ -2557,7 +2528,7 @@ public class WANTestBase extends DistributedTestCase { assertThat(orderRegion.get(orderId)).isEqualTo(order); } catch (Exception e) { - org.apache.geode.test.dunit.Assert.fail( + fail( "updateOrderPartitionedRegion : failed while doing put operation in OrderPartitionedRegion ", e); } @@ -2566,8 +2537,8 @@ public class WANTestBase extends DistributedTestCase { return orderKeyValues; } - public static Map updateOrderPartitionedRegionUsingCustId(int numPuts) { - Map orderKeyValues = new HashMap(); + public static Map<CustId, Order> updateOrderPartitionedRegionUsingCustId(int numPuts) { + Map<CustId, Order> orderKeyValues = new HashMap<>(); for (int i = 1; i <= numPuts; i++) { CustId custid = new CustId(i); Order order = new Order("ORDER" + i + "_update"); @@ -2577,7 +2548,7 @@ public class WANTestBase extends DistributedTestCase { assertThat(orderRegion.get(custid)).isEqualTo(order); orderKeyValues.put(custid, order); } catch (Exception e) { - org.apache.geode.test.dunit.Assert.fail( + fail( "updateOrderPartitionedRegionUsingCustId : failed while doing put operation in OrderPartitionedRegion ", e); } @@ -2586,8 +2557,8 @@ public class WANTestBase extends DistributedTestCase { return orderKeyValues; } - public static Map putShipmentPartitionedRegion(int numPuts) { - Map shipmentKeyValue = new HashMap(); + public static Map<ShipmentId, Shipment> putShipmentPartitionedRegion(int numPuts) { + Map<ShipmentId, Shipment> shipmentKeyValue = new HashMap<>(); for (int i = 1; i <= numPuts; i++) { CustId custid = new CustId(i); int oid = i + 1; @@ -2601,7 +2572,7 @@ public class WANTestBase extends DistributedTestCase { assertThat(shipmentRegion.get(shipmentId)).isEqualTo(shipment); shipmentKeyValue.put(shipmentId, shipment); } catch (Exception e) { - org.apache.geode.test.dunit.Assert.fail( + fail( "putShipmentPartitionedRegion : failed while doing put operation in ShipmentPartitionedRegion ", e); } @@ -2610,7 +2581,7 @@ public class WANTestBase extends DistributedTestCase { return shipmentKeyValue; } - public static void putcolocatedPartitionedRegion(int numPuts) { + public static void putColocatedPartitionedRegion(int numPuts) { for (int i = 1; i <= numPuts; i++) { CustId custid = new CustId(i); Customer customer = new Customer("Customer" + custid, "Address" + custid); @@ -2626,8 +2597,8 @@ public class WANTestBase extends DistributedTestCase { } } - public static Map putShipmentPartitionedRegionUsingCustId(int numPuts) { - Map shipmentKeyValue = new HashMap(); + public static Map<CustId, Shipment> putShipmentPartitionedRegionUsingCustId(int numPuts) { + Map<CustId, Shipment> shipmentKeyValue = new HashMap<>(); for (int i = 1; i <= numPuts; i++) { CustId custid = new CustId(i); Shipment shipment = new Shipment("Shipment" + i); @@ -2637,7 +2608,7 @@ public class WANTestBase extends DistributedTestCase { assertThat(shipmentRegion.get(custid)).isEqualTo(shipment); shipmentKeyValue.put(custid, shipment); } catch (Exception e) { - org.apache.geode.test.dunit.Assert.fail( + fail( "putShipmentPartitionedRegionUsingCustId : failed while doing put operation in ShipmentPartitionedRegion ", e); } @@ -2646,8 +2617,8 @@ public class WANTestBase extends DistributedTestCase { return shipmentKeyValue; } - public static Map updateShipmentPartitionedRegion(int numPuts) { - Map shipmentKeyValue = new HashMap(); + public static Map<ShipmentId, Shipment> updateShipmentPartitionedRegion(int numPuts) { + Map<ShipmentId, Shipment> shipmentKeyValue = new HashMap<>(); for (int i = 1; i <= numPuts; i++) { CustId custid = new CustId(i); int oid = i + 1; @@ -2661,7 +2632,7 @@ public class WANTestBase extends DistributedTestCase { assertThat(shipmentRegion.get(shipmentId)).isEqualTo(shipment); shipmentKeyValue.put(shipmentId, shipment); } catch (Exception e) { - org.apache.geode.test.dunit.Assert.fail( + fail( "updateShipmentPartitionedRegion : failed while doing put operation in ShipmentPartitionedRegion ", e); } @@ -2670,8 +2641,8 @@ public class WANTestBase extends DistributedTestCase { return shipmentKeyValue; } - public static Map updateShipmentPartitionedRegionUsingCustId(int numPuts) { - Map shipmentKeyValue = new HashMap(); + public static Map<CustId, Shipment> updateShipmentPartitionedRegionUsingCustId(int numPuts) { + Map<CustId, Shipment> shipmentKeyValue = new HashMap<>(); for (int i = 1; i <= numPuts; i++) { CustId custid = new CustId(i); Shipment shipment = new Shipment("Shipment" + i + "_update"); @@ -2681,7 +2652,7 @@ public class WANTestBase extends DistributedTestCase { assertThat(shipmentRegion.get(custid)).isEqualTo(shipment); shipmentKeyValue.put(custid, shipment); } catch (Exception e) { - org.apache.geode.test.dunit.Assert.fail( + fail( "updateShipmentPartitionedRegionUsingCustId : failed while doing put operation in ShipmentPartitionedRegion ", e); } @@ -2729,7 +2700,7 @@ public class WANTestBase extends DistributedTestCase { try { mgr.begin(); for (j = 0; j < putsPerTransaction; j++) { - long key = keyOffset + ((j + (10 * i)) * 100); + long key = keyOffset + ((j + (10L * i)) * 100); String value = "Value_" + key; r.put(key, value); } @@ -2810,6 +2781,7 @@ public class WANTestBase extends DistributedTestCase { break; } } + assertThat(sender).isNotNull(); if (sender.isParallel()) { final Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues(); @@ -2839,6 +2811,8 @@ public class WANTestBase extends DistributedTestCase { break; } } + assertThat(sender).isNotNull(); + if (sender.isParallel()) { int totalSize = 0; Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues(); @@ -2857,14 +2831,14 @@ public class WANTestBase extends DistributedTestCase { final AtomicInteger threadNum = new AtomicInteger(); @Override - public Thread newThread(final Runnable r) { + public Thread newThread(final @NotNull Runnable r) { Thread result = new Thread(r, "Client Put Thread-" + threadNum.incrementAndGet()); result.setDaemon(true); return result; } }); - final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName); + final Region<Integer, Integer> r = cache.getRegion(SEPARATOR + regionName); List<Callable<Object>> tasks = new ArrayList<>(); for (long i = 0; i < 5; i++) { @@ -2876,11 +2850,11 @@ public class WANTestBase extends DistributedTestCase { for (Future<Object> f : l) { f.get(); } - } catch (InterruptedException e1) { // TODO: eats exception + } catch (InterruptedException | ExecutionException e1) { + // TODO: eats exception e1.printStackTrace(); - } catch (ExecutionException e) { // TODO: eats exceptions - e.printStackTrace(); } + execService.shutdown(); } @@ -2913,30 +2887,30 @@ public class WANTestBase extends DistributedTestCase { boolean concurrencyChecksEnabled) { assertThat(vm1.invoke(() -> getRegionSize(regionName))) .isEqualTo(vm2.invoke(() -> getRegionSize(regionName))); - Map regionData1 = vm1.invoke(() -> getRegionData(regionName)); - Map regionData2 = vm2.invoke(() -> getRegionData(regionName)); + Map<?, ?> regionData1 = vm1.invoke(() -> getRegionData(regionName)); + Map<?, ?> regionData2 = vm2.invoke(() -> getRegionData(regionName)); assertThat(regionData1).isEqualTo(regionData2); if (concurrencyChecksEnabled) { - Map regionKeysTimestamps1 = vm1.invoke(() -> getKeysTimestamps(regionName)); - Map regionKeysTimestamps2 = vm2.invoke(() -> getKeysTimestamps(regionName)); + Map<?, ?> regionKeysTimestamps1 = vm1.invoke(() -> getKeysTimestamps(regionName)); + Map<?, ?> regionKeysTimestamps2 = vm2.invoke(() -> getKeysTimestamps(regionName)); assertThat(regionKeysTimestamps1).isEqualTo(regionKeysTimestamps2); } } - private Map getRegionData(String regionName) { - final Region<?, ?> region = cache.getRegion(SEPARATOR + regionName); - Map map = new HashMap(); - for (Object key : region.keySet()) { + private <K, V> Map<K, V> getRegionData(String regionName) { + final Region<K, V> region = cache.getRegion(SEPARATOR + regionName); + Map<K, V> map = new HashMap<>(); + for (K key : region.keySet()) { map.put(key, region.get(key)); } return map; } - private Map getKeysTimestamps(String regionName) { - final Region<?, ?> region = cache.getRegion(SEPARATOR + regionName); - Map map = new HashMap(); - for (Object key : region.keySet()) { + private <K> Map<K, Long> getKeysTimestamps(String regionName) { + final Region<K, ?> region = cache.getRegion(SEPARATOR + regionName); + Map<K, Long> map = new HashMap<>(); + for (K key : region.keySet()) { map.put(key, getTimestampForEntry(key, regionName)); } return map; @@ -2964,8 +2938,9 @@ public class WANTestBase extends DistributedTestCase { theListener = asyncQueue.getAsyncEventListener(); } } + assertThat(theListener).isNotNull(); - final Map eventsMap = ((MyAsyncEventListener) theListener).getEventsMap(); + final Map<?, ?> eventsMap = ((MyAsyncEventListener) theListener).getEventsMap(); await() .untilAsserted(() -> assertThat(eventsMap.size()).as( "Expected map entries: " + expectedSize + " but actual entries: " + eventsMap.size()) @@ -2982,6 +2957,8 @@ public class WANTestBase extends DistributedTestCase { } } + assertThat(theAsyncEventQueue).isNotNull(); + final GatewaySender sender = ((AsyncEventQueueImpl) theAsyncEventQueue).getSender(); if (sender.isParallel()) { @@ -3016,8 +2993,9 @@ public class WANTestBase extends DistributedTestCase { theListener = asyncQueue.getAsyncEventListener(); } } + assertThat(theListener).isNotNull(); - final Map eventsMap = ((MyAsyncEventListener) theListener).getEventsMap(); + final Map<?, ?> eventsMap = ((MyAsyncEventListener) theListener).getEventsMap(); logger.info("The events map size is " + eventsMap.size()); return eventsMap.size(); } @@ -3118,11 +3096,11 @@ public class WANTestBase extends DistributedTestCase { return r.keySet().size(); } - public static void validateRegionContents(String regionName, final Map keyValues) { - final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName); + public static <K, V> void validateRegionContents(String regionName, final Map<K, V> keyValues) { + final Region<K, V> r = cache.getRegion(SEPARATOR + regionName); await().untilAsserted(() -> { boolean matchFlag = true; - for (Object key : keyValues.keySet()) { + for (K key : keyValues.keySet()) { if (!r.get(key).equals(keyValues.get(key))) { logger.info("The values are for key " + " " + key + " " + r.get(key) + " in the map " + keyValues.get(key)); @@ -3145,11 +3123,11 @@ public class WANTestBase extends DistributedTestCase { } public static void addCacheListenerAndDestroyRegion(String regionName) { - final Region<?, ?> region = cache.getRegion(SEPARATOR + regionName); - CacheListenerAdapter cl = new CacheListenerAdapter() { + final Region<Long, Object> region = cache.getRegion(SEPARATOR + regionName); + CacheListenerAdapter<Long, Object> cl = new CacheListenerAdapter<Long, Object>() { @Override - public void afterCreate(EntryEvent event) { - if ((Long) event.getKey() == 99) { + public void afterCreate(EntryEvent<Long, Object> event) { + if (event.getKey() == 99) { region.destroyRegion(); } } @@ -3213,10 +3191,9 @@ public class WANTestBase extends DistributedTestCase { List<Locator> locatorsConfigured = Locator.getLocators(); Locator locator = locatorsConfigured.get(0); LocatorMembershipListener listener = ((InternalLocator) locator).getLocatorMembershipListener(); - if (listener == null) { - fail( - "No locator membership listener available. WAN is likely not enabled. Is this test in the WAN project?"); - } + assertThat(listener) + .as("No locator membership listener available. WAN is likely not enabled. Is this test in the WAN project?") + .isNotNull(); final Map<Integer, Set<DistributionLocatorId>> allSiteMetaData = listener.getAllLocatorsInfo(); System.out.println("allSiteMetaData : " + allSiteMetaData); @@ -3232,7 +3209,7 @@ public class WANTestBase extends DistributedTestCase { break; } } - if (false == completeFlag) { + if (!completeFlag) { break; } } @@ -3299,43 +3276,6 @@ public class WANTestBase extends DistributedTestCase { } - // Ensure that the sender's queue(s) have been closed. - public static void validateQueueClosedForConcurrentSerialGatewaySender(final String senderId) { - GatewaySender sender = getGatewaySender(senderId); - final Set<RegionQueue> regionQueue; - if (sender instanceof AbstractGatewaySender) { - regionQueue = ((AbstractGatewaySender) sender).getQueues(); - } else { - regionQueue = null; - } - assertThat(regionQueue).isEqualTo(null); - } - - public static void validateQueueContentsForConcurrentSerialGatewaySender(final String senderId, - final int regionSize) { - GatewaySender sender = getGatewaySender(senderId); - final Set<RegionQueue> regionQueue; - if (!sender.isParallel()) { - regionQueue = ((AbstractGatewaySender) sender).getQueues(); - } else { - regionQueue = null; - } - await().untilAsserted(() -> { - int size = 0; - for (RegionQueue q : regionQueue) { - size += q.size(); - } - assertThat(regionSize == size).isEqualTo(true); - }); - } - - public static Integer getSecondaryQueueContentSize(final String senderId) { - GatewaySender sender = getGatewaySender(senderId); - AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender; - int size = abstractSender.getSecondaryEventQueueSize(); - return size; - } - public static String displayQueueContent(final RegionQueue queue) { if (queue instanceof ParallelGatewaySenderQueue) { ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) queue; @@ -3391,8 +3331,8 @@ public class WANTestBase extends DistributedTestCase { } return size; } else if (sender.isParallel()) { - RegionQueue regionQueue = null; - regionQueue = ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0]; + RegionQueue regionQueue = + ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0]; if (regionQueue instanceof ConcurrentParallelGatewaySenderQueue) { return ((ConcurrentParallelGatewaySenderQueue) regionQueue).localSize(includeSecondary); } else if (regionQueue instanceof ParallelGatewaySenderQueue) { @@ -3428,22 +3368,18 @@ public class WANTestBase extends DistributedTestCase { GatewaySender sender = getGatewaySender(senderId); final AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender; RegionQueue queue = abstractSender.getEventProcessor().queue; - await().untilAsserted(() -> { - assertThat(abstractSender.getEventQueueSize()).as( - "Expected events in all primary queues are drained but actual is " - + abstractSender.getEventQueueSize() + ". Queue content is: " - + displayQueueContent(queue)) - .isEqualTo(0); - }); + await().untilAsserted(() -> assertThat(abstractSender.getEventQueueSize()).as( + "Expected events in all primary queues are drained but actual is " + + abstractSender.getEventQueueSize() + ". Queue content is: " + + displayQueueContent(queue)) + .isEqualTo(0)); assertThat(abstractSender.getEventQueueSize()).as( "Expected events in all primary queues after drain is 0").isEqualTo(0); - await().untilAsserted(() -> { - assertThat(abstractSender.getSecondaryEventQueueSize()).as( - "Expected events in all secondary queues are drained but actual is " - + abstractSender.getSecondaryEventQueueSize() + ". Queue content is: " - + displayQueueContent(queue)) - .isEqualTo(0); - }); + await().untilAsserted(() -> assertThat(abstractSender.getSecondaryEventQueueSize()).as( + "Expected events in all secondary queues are drained but actual is " + + abstractSender.getSecondaryEventQueueSize() + ". Queue content is: " + + displayQueueContent(queue)) + .isEqualTo(0)); assertThat(abstractSender.getSecondaryEventQueueSize()).as( "Expected events in all secondary queues after drain is 0").isEqualTo(0); } finally { @@ -3673,19 +3609,17 @@ public class WANTestBase extends DistributedTestCase { AbstractGatewaySender sender = (AbstractGatewaySender) getGatewaySender(senderId); assertThat(sender).isNull(); - String queueRegionNameSuffix = null; + final String queueRegionNameSuffix; if (isParallel) { queueRegionNameSuffix = ParallelGatewaySenderQueue.QSTRING; } else { queueRegionNameSuffix = "_SERIAL_GATEWAY_SENDER_QUEUE"; } - Set<InternalRegion> allRegions = ((GemFireCacheImpl) cache).getAllRegions(); - for (InternalRegion region : allRegions) { - if (region.getName().indexOf(senderId + queueRegionNameSuffix) != -1) { - fail("Region underlying the sender is not destroyed."); - } - } + assertThat(((GemFireCacheImpl) cache).getAllRegions()).allSatisfy( + r -> assertThat(r.getName()) + .as("Region underlying the sender is not destroyed.") + .doesNotContain(senderId + queueRegionNameSuffix)); } public static void destroyAsyncEventQueue(String id) { @@ -3712,18 +3646,20 @@ public class WANTestBase extends DistributedTestCase { public static class MyLocatorCallback extends LocatorDiscoveryCallbackAdapter { - private final Set discoveredLocators = new HashSet(); + private final Set<InetSocketAddress> discoveredLocators = new HashSet<>(); - private final Set removedLocators = new HashSet(); + private final Set<InetSocketAddress> removedLocators = new HashSet<>(); + @SuppressWarnings("unchecked") @Override - public synchronized void locatorsDiscovered(List locators) { + public synchronized void locatorsDiscovered(@SuppressWarnings("rawtypes") List locators) { discoveredLocators.addAll(locators); notifyAll(); } + @SuppressWarnings("unchecked") @Override - public synchronized void locatorsRemoved(List locators) { + public synchronized void locatorsRemoved(@SuppressWarnings("rawtypes") List locators) { removedLocators.addAll(locators); notifyAll(); } @@ -3734,7 +3670,8 @@ public class WANTestBase extends DistributedTestCase { } - private synchronized boolean waitFor(Set set, InetSocketAddress locator, long time) + private synchronized boolean waitFor(Set<InetSocketAddress> set, InetSocketAddress locator, + long time) throws InterruptedException { long remaining = time; long endTime = System.currentTimeMillis() + time; @@ -3745,27 +3682,27 @@ public class WANTestBase extends DistributedTestCase { return set.contains(locator); } - public synchronized Set getDiscovered() { - return new HashSet(discoveredLocators); + public synchronized Set<InetSocketAddress> getDiscovered() { + return new HashSet<>(discoveredLocators); } } - protected static class PutTask implements Callable { - private final Region region; + protected static class PutTask implements Callable<Object> { + private final Region<Integer, Integer> region; private final AtomicInteger key_value; private final int numPuts; - public PutTask(Region region, AtomicInteger key_value, int numPuts) { + public PutTask(Region<Integer, Integer> region, AtomicInteger key_value, int numPuts) { this.region = region; this.key_value = key_value; this.numPuts = numPuts; } @Override - public Object call() throws Exception { + public Object call() { while (true) { int key = key_value.incrementAndGet(); if (key < numPuts) { @@ -3789,31 +3726,27 @@ public class WANTestBase extends DistributedTestCase { public MyGatewayEventFilter() {} @Override - public boolean beforeEnqueue(GatewayQueueEvent event) { + public boolean beforeEnqueue(@SuppressWarnings("rawtypes") GatewayQueueEvent event) { beforeEnqueueInvoked = true; return !((Long) event.getKey() >= 500 && (Long) event.getKey() < 600); } @Override - public boolean beforeTransmit(GatewayQueueEvent event) { + public boolean beforeTransmit(@SuppressWarnings("rawtypes") GatewayQueueEvent event) { beforeTransmitInvoked = true; return !((Long) event.getKey() >= 600 && (Long) event.getKey() < 700); } @Override - public void close() { - // TODO Auto-generated method stub - - } + public void close() {} public String toString() { return Id; } @Override - public void afterAcknowledgement(GatewayQueueEvent event) { + public void afterAcknowledgement(@SuppressWarnings("rawtypes") GatewayQueueEvent event) { afterAckInvoked = true; - // TODO Auto-generated method stub } public boolean equals(Object obj) { @@ -3837,31 +3770,28 @@ public class WANTestBase extends DistributedTestCase { public MyGatewayEventFilter_AfterAck() {} @Override - public boolean beforeEnqueue(GatewayQueueEvent event) { + public boolean beforeEnqueue(@SuppressWarnings("rawtypes") GatewayQueueEvent event) { return true; } @Override - public boolean beforeTransmit(GatewayQueueEvent event) { + public boolean beforeTransmit(@SuppressWarnings("rawtypes") GatewayQueueEvent event) { return true; } @Override - public void close() { - // TODO Auto-generated method stub - - } + public void close() {} public String toString() { return Id; } @Override - public void afterAcknowledgement(GatewayQueueEvent event) { + public void afterAcknowledgement(@SuppressWarnings("rawtypes") GatewayQueueEvent event) { ackList.add((Long) event.getKey()); } - public Set getAckList() { + public Set<Long> getAckList() { return ackList; } @@ -3888,34 +3818,30 @@ public class WANTestBase extends DistributedTestCase { public PDXGatewayEventFilter() {} @Override - public boolean beforeEnqueue(GatewayQueueEvent event) { + public boolean beforeEnqueue(@SuppressWarnings("rawtypes") GatewayQueueEvent event) { System.out.println("Invoked enqueue for " + event); beforeEnqueueInvoked++; return true; } @Override - public boolean beforeTransmit(GatewayQueueEvent event) { + public boolean beforeTransmit(@SuppressWarnings("rawtypes") GatewayQueueEvent event) { System.out.println("Invoked transmit for " + event); beforeTransmitInvoked++; return true; } @Override - public void close() { - // TODO Auto-generated method stub - - } + public void close() {} public String toString() { return Id; } @Override - public void afterAcknowledgement(GatewayQueueEvent event) { + public void afterAcknowledgement(@SuppressWarnings("rawtypes") GatewayQueueEvent event) { System.out.println("Invoked afterAck for " + event); afterAckInvoked++; - // TODO Auto-generated method stub } public boolean equals(Object obj) { @@ -3933,17 +3859,17 @@ public class WANTestBase extends DistributedTestCase { @Override public final void preTearDown() throws Exception { cleanupVM(); - List<AsyncInvocation> invocations = new ArrayList<>(); + List<AsyncInvocation<?>> invocations = new ArrayList<>(); final Host host = getHost(0); for (int i = 0; i < host.getVMCount(); i++) { invocations.add(host.getVM(i).invokeAsync(WANTestBase::cleanupVM)); } - for (AsyncInvocation invocation : invocations) { + for (AsyncInvocation<?> invocation : invocations) { invocation.await(); } } - public static void cleanupVM() throws IOException { + public static void cleanupVM() { if (Locator.hasLocator()) { Locator.getLocator().stop(); } @@ -4004,7 +3930,6 @@ public class WANTestBase extends DistributedTestCase { * * @param vm reference to VM */ - @SuppressWarnings("serial") public static void checkAsyncQueueMBean(final VM vm, final boolean shouldExist) { SerializableRunnable checkAsyncQueueMBean = new SerializableRunnable("Check Async Queue MBean") { @@ -4027,7 +3952,6 @@ public class WANTestBase extends DistributedTestCase { * * @param vm reference to VM */ - @SuppressWarnings("serial") public static void checkProxyReceiver(final VM vm, final DistributedMember senderMember) { SerializableRunnable checkProxySender = new SerializableRunnable("Check Proxy Receiver") { @Override @@ -4057,7 +3981,6 @@ public class WANTestBase extends DistributedTestCase { * * @param vm reference to VM */ - @SuppressWarnings("serial") public static void checkProxySender(final VM vm, final DistributedMember senderMember) { SerializableRunnable checkProxySender = new SerializableRunnable("Check Proxy Sender") { @Override @@ -4081,9 +4004,9 @@ public class WANTestBase extends DistributedTestCase { DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean(); await().untilAsserted(() -> { Map<String, Boolean> dsMap = dsBean.viewRemoteClusterStatus(); - dsMap.entrySet().stream() - .forEach(entry -> assertThat(entry.getValue()).as( - "Should be true " + entry.getKey()).isTrue()); + assertThat(dsMap).doesNotContainValue(false); + // dsMap.entrySet().forEach(entry -> assertThat(entry.getValue()).as( + // "Should be true " + entry.getKey()).isTrue()); }); } @@ -4097,7 +4020,6 @@ public class WANTestBase extends DistributedTestCase { * * @param vm reference to VM */ - @SuppressWarnings("serial") public static void checkReceiverMBean(final VM vm) { SerializableRunnable checkMBean = new SerializableRunnable("Check Receiver MBean") { @Override @@ -4110,7 +4032,6 @@ public class WANTestBase extends DistributedTestCase { vm.invoke(checkMBean); } - @SuppressWarnings("serial") public static void checkReceiverNavigationAPIS(final VM vm, final DistributedMember receiverMember) { SerializableRunnable checkNavigationAPIS = @@ -4143,7 +4064,6 @@ public class WANTestBase extends DistributedTestCase { * * @param vm reference to VM */ - @SuppressWarnings("serial") public static void checkSenderMBean(final VM vm, final String regionPath, boolean connected) { SerializableRunnable checkMBean = new SerializableRunnable("Check Sender MBean") { @Override @@ -4166,7 +4086,6 @@ public class WANTestBase extends DistributedTestCase { vm.invoke(checkMBean); } - @SuppressWarnings("serial") public static void checkSenderNavigationAPIS(final VM vm, final DistributedMember senderMember) { SerializableRunnable checkNavigationAPIS = new SerializableRunnable("Check Sender Navigation APIs") { @@ -4200,7 +4119,6 @@ public class WANTestBase extends DistributedTestCase { * * @param vm reference to VM */ - @SuppressWarnings("serial") public static void startGatewaySender(final VM vm) { SerializableRunnable stopGatewaySender = new SerializableRunnable("Start Gateway Sender") { @Override @@ -4219,7 +4137,6 @@ public class WANTestBase extends DistributedTestCase { * * @param vm reference to VM */ - @SuppressWarnings("serial") public static void stopGatewaySender(final VM vm) { SerializableRunnable stopGatewaySender = new SerializableRunnable("Stop Gateway Sender") { @Override @@ -4238,7 +4155,6 @@ public class WANTestBase extends DistributedTestCase { * * @param vm reference to VM */ - @SuppressWarnings("serial") public static void checkProxyAsyncQueue(final VM vm, final DistributedMember senderMember, final boolean shouldExist) { SerializableRunnable checkProxyAsyncQueue = @@ -4287,7 +4203,6 @@ public class WANTestBase extends DistributedTestCase { * * @param vm reference to VM */ - @SuppressWarnings("serial") public static void checkRemoteClusterStatus(final VM vm, final DistributedMember senderMember) { SerializableRunnable checkProxySender = new SerializableRunnable("DS Map Size") { @Override diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java index 34dcabe93d..497db7641d 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java @@ -335,7 +335,7 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { vm6.invoke(() -> WANTestBase.waitForSenderRunningState("ln")); vm7.invoke(() -> WANTestBase.waitForSenderRunningState("ln")); - vm4.invoke(() -> WANTestBase.putcolocatedPartitionedRegion(1000)); + vm4.invoke(() -> WANTestBase.putColocatedPartitionedRegion(1000)); // verify all buckets drained on all sender nodes. vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java index 976f6dbb06..648ec7bb1c 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java @@ -89,20 +89,15 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes vm3.invoke(() -> WANTestBase.waitForSenderRunningState("ln")); vm3.invoke(() -> WANTestBase.waitForSenderRunningState("ln2")); - AsyncInvocation asyncPuts = vm2.invokeAsync(() -> { - WANTestBase.doPuts(getTestMethodName() + "_PR", 1000); - }); + AsyncInvocation<Void> asyncPuts = + vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); // Guarantee some entries are in the queue even if the asyncPuts thread is slow - vm2.invoke(() -> { - WANTestBase.doPuts(getTestMethodName() + "_PR", 100); - }); + vm2.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 100)); vm2.invoke(() -> await() .until(() -> WANTestBase.getSenderStats("ln", -1).get(3) > 0)); vm2.invoke(() -> WANTestBase.stopSender("ln")); // Things have dispatched // Dispatch additional values - vm2.invoke(() -> { - WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 1000, 1100); - }); + vm2.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 1000, 1100)); await().until(asyncPuts::isDone); @@ -111,20 +106,17 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes vm4.invoke(() -> await() .untilAsserted(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1100))); - vm3.invoke(() -> { - await() - .untilAsserted( - () -> assertThat(WANTestBase.getQueueContentSize("ln2", true) == 0).as( - WANTestBase.getQueueContentSize("ln2", true) + " was the size").isTrue()); - }); + vm3.invoke(() -> await() + .untilAsserted( + () -> assertThat(WANTestBase.getQueueContentSize("ln2", true) == 0).as( + WANTestBase.getQueueContentSize("ln2", true) + " was the size").isTrue())); } // to test that when userPR is locally destroyed, shadow Pr is also locally // destroyed and on recreation userPr , shadow Pr is also recreated. @Test - public void testParallelGatewaySender_SingleNode_UserPR_localDestroy_RecreateRegion() - throws Exception { + public void testParallelGatewaySender_SingleNode_UserPR_localDestroy_RecreateRegion() { Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; Integer nyPort = locatorPorts[1]; @@ -167,8 +159,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes } @Test - public void testParallelGatewaySender_SingleNode_UserPR_Destroy_RecreateRegion() - throws Exception { + public void testParallelGatewaySender_SingleNode_UserPR_Destroy_RecreateRegion() { Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; Integer nyPort = locatorPorts[1]; @@ -255,8 +246,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes } @Test - public void testParallelGatewaySender_SingleNode_UserPR_Destroy_SimultaneousPut_RecreateRegion() - throws Exception { + public void testParallelGatewaySender_SingleNode_UserPR_Destroy_SimultaneousPut_RecreateRegion() { Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; Integer nyPort = locatorPorts[1]; @@ -270,7 +260,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes vm4.invoke(() -> resumeSender("ln")); - AsyncInvocation putAsync = + AsyncInvocation<Void> putAsync = vm4.invokeAsync(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 10, 101)); try { putAsync.join(); @@ -286,12 +276,8 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes // before destroy, there is wait for queue to drain, so data will be // dispatched - vm2.invoke(() -> validateRegionSizeWithinRange(getTestMethodName() + "_PR", 10, 101)); // possible - // size - // is - // more - // than - // 10 + // possible size is more than 10 + vm2.invoke(() -> validateRegionSizeWithinRange(getTestMethodName() + "_PR", 10, 101)); vm4.invoke( () -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap())); @@ -300,12 +286,8 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes vm4.invoke(() -> validateRegionSize(getTestMethodName() + "_PR", 10)); - vm2.invoke(() -> validateRegionSizeWithinRange(getTestMethodName() + "_PR", 20, 101)); // possible - // size - // is - // more - // than - // 20 + // possible size is more than 20 + vm2.invoke(() -> validateRegionSizeWithinRange(getTestMethodName() + "_PR", 20, 101)); } finally { vm4.invoke( ConcurrentParallelGatewaySenderOperation_2_DUnitTest::clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME); @@ -313,7 +295,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes } @Test - public void testParallelGatewaySender_SingleNode_UserPR_Destroy_NodeDown() throws Exception { + public void testParallelGatewaySender_SingleNode_UserPR_Destroy_NodeDown() { IgnoredException.addIgnoredException("Broken pipe"); IgnoredException.addIgnoredException("Connection reset"); IgnoredException.addIgnoredException("Unexpected IOException"); @@ -333,10 +315,10 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes vm6.invoke(() -> WANTestBase.resumeSender("ln")); Wait.pause(200); - AsyncInvocation localDestroyAsync = + AsyncInvocation<Void> localDestroyAsync = vm4.invokeAsync(() -> WANTestBase.destroyRegion(getTestMethodName() + "_PR")); - AsyncInvocation closeAsync = vm4.invokeAsync(WANTestBase::closeCache); + AsyncInvocation<Void> closeAsync = vm4.invokeAsync(WANTestBase::closeCache); try { localDestroyAsync.join(); closeAsync.join(); @@ -358,8 +340,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes } @Test - public void testParallelGatewaySender_SingleNode_UserPR_Close_SimultaneousPut_RecreateRegion() - throws Exception { + public void testParallelGatewaySender_SingleNode_UserPR_Close_SimultaneousPut_RecreateRegion() { Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; Integer nyPort = locatorPorts[1]; @@ -385,9 +366,9 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes vm2.invoke(() -> validateRegionSize(regionName, 0)); - AsyncInvocation putAsync = + AsyncInvocation<Void> putAsync = vm4.invokeAsync(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 10, 2000)); - AsyncInvocation localDestroyAsync = + AsyncInvocation<Void> localDestroyAsync = vm4.invokeAsync(() -> ConcurrentParallelGatewaySenderOperation_2_DUnitTest .closeRegion(getTestMethodName() + "_PR")); try { @@ -412,8 +393,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes } @Test - public void testParallelGatewaySenders_SingleNode_UserPR_localDestroy_RecreateRegion() - throws Exception { + public void testParallelGatewaySenders_SingleNode_UserPR_localDestroy_RecreateRegion() { Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; Integer nyPort = locatorPorts[1]; @@ -474,8 +454,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes } @Test - public void testParallelGatewaySender_MultipleNode_UserPR_localDestroy_Recreate() - throws Exception { + public void testParallelGatewaySender_MultipleNode_UserPR_localDestroy_Recreate() { Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; Integer nyPort = locatorPorts[1]; @@ -490,7 +469,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes String regionName = getTestMethodName() + "_PR"; vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts(regionName, 10)); + AsyncInvocation<Void> inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts(regionName, 10)); Wait.pause(1000); vm5.invoke(() -> localDestroyRegion(regionName)); @@ -518,8 +497,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes } @Test - public void testParallelGatewaySenders_MultipleNode_UserPR_localDestroy_Recreate() - throws Exception { + public void testParallelGatewaySenders_MultipleNode_UserPR_localDestroy_Recreate() { Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; Integer nyPort = locatorPorts[1]; @@ -538,7 +516,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes vm6.invoke(() -> WANTestBase.createPartitionedRegion(regionName, null, 1, 100, isOffHeap())); vm7.invoke(() -> WANTestBase.createPartitionedRegion(regionName, null, 1, 100, isOffHeap())); - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts(regionName, 10)); + AsyncInvocation<Void> inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts(regionName, 10)); Wait.pause(1000); vm5.invoke(() -> WANTestBase.localDestroyRegion(regionName)); @@ -567,8 +545,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes } @Test - public void testParallelGatewaySender_ColocatedPartitionedRegions_localDestroy() - throws Exception { + public void testParallelGatewaySender_ColocatedPartitionedRegions_localDestroy() { Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; Integer nyPort = locatorPorts[1]; @@ -584,7 +561,8 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 100, isOffHeap())); - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.putcolocatedPartitionedRegion(10)); + AsyncInvocation<Void> inv1 = + vm4.invokeAsync(() -> WANTestBase.putColocatedPartitionedRegion(10)); Wait.pause(1000); try { @@ -609,7 +587,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes } @Test - public void testParallelGatewaySender_ColocatedPartitionedRegions_destroy() throws Exception { + public void testParallelGatewaySender_ColocatedPartitionedRegions_destroy() { Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; Integer nyPort = locatorPorts[1]; @@ -626,7 +604,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes vm2.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion(null, 1, 100, isOffHeap())); - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.putcolocatedPartitionedRegion(2000)); + vm4.invokeAsync(() -> WANTestBase.putColocatedPartitionedRegion(2000)); Wait.pause(1000); try { @@ -649,14 +627,14 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes } public static void closeRegion(String regionName) { - Region r = cache.getRegion(SEPARATOR + regionName); + Region<?, ?> r = cache.getRegion(SEPARATOR + regionName); assertThat(r).isNotNull(); r.close(); } public static void validateRegionSizeWithinRange(String regionName, final int min, final int max) { - final Region r = cache.getRegion(SEPARATOR + regionName); + final Region<?, ?> r = cache.getRegion(SEPARATOR + regionName); assertThat(r).isNotNull(); WaitCriterion wc = new WaitCriterion() { @Override diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java index 9a75d6b9a1..a9fe2e04c3 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java @@ -73,7 +73,6 @@ import org.apache.geode.internal.offheap.MemoryAllocatorImpl; import org.apache.geode.internal.offheap.OffHeapClearRequired; import org.apache.geode.test.awaitility.GeodeAwaitility; import org.apache.geode.test.dunit.AsyncInvocation; -import org.apache.geode.test.dunit.IgnoredException; import org.apache.geode.test.dunit.RMIException; import org.apache.geode.test.dunit.VM; import org.apache.geode.test.dunit.rules.ClusterStartupRule; @@ -86,7 +85,6 @@ import org.apache.geode.test.junit.rules.serializable.SerializableTestName; * DUnit test for operations on ParallelGatewaySender */ @Category(WanTest.class) -@SuppressWarnings("serial") public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { @Rule @@ -115,7 +113,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { } @Test(timeout = 300_000) - public void testStopOneConcurrentGatewaySenderWithSSL() throws Exception { + public void testStopOneConcurrentGatewaySenderWithSSL() { Integer lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1)); Integer nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort)); @@ -198,7 +196,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { * Normal scenario in which the sender is paused in between. */ @Test - public void testParallelPropagationSenderPause() throws Exception { + public void testParallelPropagationSenderPause() { Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; Integer nyPort = locatorPorts[1]; @@ -241,7 +239,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { int numPuts = 1000; // now, the senders are started. So, start the puts - AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getUniqueName() + "_PR", numPuts)); + AsyncInvocation<Void> async = vm4.invokeAsync(() -> doPuts(getUniqueName() + "_PR", numPuts)); // now, pause all of the senders vm4.invoke(() -> pauseSender("ln")); @@ -268,7 +266,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { * again. */ @Test - public void testParallelPropagationSenderResumeNegativeScenario() throws Exception { + public void testParallelPropagationSenderResumeNegativeScenario() { Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; Integer nyPort = locatorPorts[1]; @@ -323,7 +321,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { * Normal scenario in which a sender is stopped. */ @Test - public void testParallelPropagationSenderStop() throws Exception { + public void testParallelPropagationSenderStop() { addIgnoredException("Broken pipe"); Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; @@ -352,7 +350,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { * Normal scenario in which a sender is stopped and then started again. */ @Test - public void testParallelPropagationSenderStartAfterStop() throws Exception { + public void testParallelPropagationSenderStartAfterStop() { addIgnoredException("Broken pipe"); Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; @@ -470,35 +468,30 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { vm2.invoke(() -> validateRegionSizeRemainsSame(getUniqueName() + "_PR", 200)); // SECOND RUN: start async puts on region - ArrayList<Integer> vm4List = null; - ArrayList<Integer> vm5List = null; - ArrayList<Integer> vm6List = null; - ArrayList<Integer> vm7List = null; boolean foundEventsDroppedDueToPrimarySenderNotRunning = false; int count = 0; do { stopSenders(); - AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getUniqueName() + "_PR", 5000)); + AsyncInvocation<Void> async = vm4.invokeAsync(() -> doPuts(getUniqueName() + "_PR", 5000)); // when puts are happening by another thread, start the senders startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); async.join(); - vm4List = + ArrayList<Integer> vm4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); - vm5List = + ArrayList<Integer> vm5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); - vm6List = + ArrayList<Integer> vm6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); - vm7List = + ArrayList<Integer> vm7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStatsForDroppedEvents("ln")); if (vm4List.get(0) + vm5List.get(0) + vm6List.get(0) + vm7List.get(0) > 0) { foundEventsDroppedDueToPrimarySenderNotRunning = true; } count++; - } while (foundEventsDroppedDueToPrimarySenderNotRunning == false && count < 5); - assertThat(foundEventsDroppedDueToPrimarySenderNotRunning); + } while (!foundEventsDroppedDueToPrimarySenderNotRunning && count < 5); // verify all the buckets on all the sender nodes are drained validateParallelSenderQueueAllBucketsDrained(); @@ -513,7 +506,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { * Normal scenario in which a sender is stopped and then started again on accessor node. */ @Test - public void testParallelPropagationSenderStartAfterStopOnAccessorNode() throws Exception { + public void testParallelPropagationSenderStartAfterStopOnAccessorNode() { addIgnoredException("Broken pipe"); addIgnoredException("Connection reset"); addIgnoredException("Unexpected IOException"); @@ -590,7 +583,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { vm6.invoke(() -> verifySenderPausedState("ln")); vm7.invoke(() -> verifySenderPausedState("ln")); - AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getUniqueName() + "_PR", 1000)); + AsyncInvocation<Void> async = vm4.invokeAsync(() -> doPuts(getUniqueName() + "_PR", 1000)); vm4.invoke(() -> resumeSender("ln")); vm5.invoke(() -> resumeSender("ln")); @@ -811,9 +804,8 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { GeodeAwaitility.await() .untilAsserted(() -> assertThat(System.getProperty(MAX_MESSAGE_SIZE_PROPERTY)) .isEqualTo(String.valueOf(1024 * 1024))); - GeodeAwaitility.await().untilAsserted(() -> { - assertThat(sender.getStatistics().getBatchesResized()).isGreaterThan(0); - }); + GeodeAwaitility.await().untilAsserted( + () -> assertThat(sender.getStatistics().getBatchesResized()).isGreaterThan(0)); }); } @@ -891,8 +883,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { } @Test - public void testParallelGatewaySenderConcurrentPutClearNoOffheapOrphans() - throws Exception { + public void testParallelGatewaySenderConcurrentPutClearNoOffheapOrphans() { MemberVM locator = clusterStartupRule.startLocatorVM(1, new Properties()); Properties properties = new Properties(); properties.put(OFF_HEAP_MEMORY_SIZE_NAME, "100"); @@ -901,15 +892,17 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { final String gatewaySenderId = "ln"; server.invoke(() -> { - IgnoredException ie = addIgnoredException("could not get remote locator"); + addIgnoredException("could not get remote locator"); InternalCache cache = ClusterStartupRule.getCache(); GatewaySender sender = cache.createGatewaySenderFactory().setParallel(true).create(gatewaySenderId, 1); - Region userRegion = cache.createRegionFactory(RegionShortcut.PARTITION).setOffHeap(true) - .addGatewaySenderId("ln").create(regionName); + Region<Object, Object> userRegion = + cache.createRegionFactory(RegionShortcut.PARTITION).setOffHeap(true) + .addGatewaySenderId("ln").create(regionName); PartitionedRegion shadowRegion = (PartitionedRegion) ((AbstractGatewaySender) sender) .getEventProcessor().getQueue().getRegion(); - CacheWriter mockCacheWriter = mock(CacheWriter.class); + @SuppressWarnings("unchecked") + CacheWriter<Object, Object> mockCacheWriter = mock(CacheWriter.class); CountDownLatch cacheWriterLatch = new CountDownLatch(1); CountDownLatch shadowRegionClearLatch = new CountDownLatch(1); @@ -987,7 +980,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { } @Test - public void testParallelGWSenderUpdateAttrWhileEntriesInQueue() throws Exception { + public void testParallelGWSenderUpdateAttrWhileEntriesInQueue() { Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; Integer nyPort = locatorPorts[1]; @@ -1044,7 +1037,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { vm6.invoke(() -> waitForSenderRunningState("ln")); vm7.invoke(() -> waitForSenderRunningState("ln")); - AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getUniqueName() + "_PR", 5000)); + AsyncInvocation<Void> async = vm4.invokeAsync(() -> doPuts(getUniqueName() + "_PR", 5000)); updateBatchSize(100); updateBatchTimeInterval(150); @@ -1057,7 +1050,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { } @Test - public void testParallelGWSenderUpdateAttrWhilePaused() throws Exception { + public void testParallelGWSenderUpdateAttrWhilePaused() { Integer[] locatorPorts = createLNAndNYLocators(); Integer lnPort = locatorPorts[0]; Integer nyPort = locatorPorts[1]; @@ -1135,7 +1128,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { vm6.invoke(() -> waitForSenderRunningState("ln")); vm7.invoke(() -> waitForSenderRunningState("ln")); - AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getUniqueName() + "_PR", 5000)); + AsyncInvocation<Void> async = vm4.invokeAsync(() -> doPuts(getUniqueName() + "_PR", 5000)); checkBatchSize(10); updateBatchSize(100); @@ -1184,7 +1177,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { vm6.invoke(() -> waitForSenderRunningState("ln")); vm7.invoke(() -> waitForSenderRunningState("ln")); - AsyncInvocation async = vm4.invokeAsync(() -> doPuts(getUniqueName() + "_PR", 5000)); + AsyncInvocation<Void> async = vm4.invokeAsync(() -> doPuts(getUniqueName() + "_PR", 5000)); checkBatchSize(10); updateBatchSize(100); diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java index 33e09a6168..e7dafb0e95 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java @@ -25,6 +25,12 @@ import java.util.Random; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.geode.internal.cache.execute.data.CustId; +import org.apache.geode.internal.cache.execute.data.Customer; +import org.apache.geode.internal.cache.execute.data.Order; +import org.apache.geode.internal.cache.execute.data.OrderId; +import org.apache.geode.internal.cache.execute.data.Shipment; +import org.apache.geode.internal.cache.execute.data.ShipmentId; import org.apache.geode.internal.cache.wan.WANTestBase; import org.apache.geode.test.dunit.IgnoredException; import org.apache.geode.test.junit.categories.WanTest; @@ -43,7 +49,7 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { } @Test - public void testParallelPropagationConflationDisabled() throws Exception { + public void testParallelPropagationConflationDisabled() { initialSetUp(); createSendersNoConflation(); @@ -54,11 +60,11 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { createReceiverPrs(); - final Map keyValues = putKeyValues(); + final Map<Integer, Object> keyValues = putKeyValues(); vm4.invoke(() -> checkQueueSize("ln", keyValues.size())); - final Map updateKeyValues = updateKeyValues(); + final Map<Integer, Object> updateKeyValues = updateKeyValues(); vm4.invoke(() -> checkQueueSize("ln", (keyValues.size() + updateKeyValues.size()))); @@ -82,7 +88,7 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { * */ @Test - public void testParallelPropagationBatchConflation() throws Exception { + public void testParallelPropagationBatchConflation() { initialSetUp(); vm4.invoke(() -> createSender("ln", 2, true, 100, 50, false, false, null, true)); @@ -98,7 +104,7 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { createReceiverPrs(); - final Map keyValues = new HashMap(); + final Map<Integer, Integer> keyValues = new HashMap<>(); for (int i = 1; i <= 10; i++) { for (int j = 1; j <= 10; j++) { @@ -135,12 +141,12 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { assertThat((v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0).as( "No events conflated in batch").isTrue(); - verifySecondaryEventQueuesDrained("ln"); + verifySecondaryEventQueuesDrained(); vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10)); validateEventsProcessedByPQRM(100, 1); } - private void verifySecondaryEventQueuesDrained(final String senderId) { + private void verifySecondaryEventQueuesDrained() { await().untilAsserted(() -> { int vm4SecondarySize = vm4.invoke(() -> getSecondaryQueueSizeInStats("ln")); int vm5SecondarySize = vm5.invoke(() -> getSecondaryQueueSizeInStats("ln")); @@ -155,16 +161,16 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { } @Test - public void testParallelPropagationConflation() throws Exception { + public void testParallelPropagationConflation() { doTestParallelPropagationConflation(0); } @Test - public void testParallelPropagationConflationRedundancy2() throws Exception { + public void testParallelPropagationConflationRedundancy2() { doTestParallelPropagationConflation(2); } - public void doTestParallelPropagationConflation(int redundancy) throws Exception { + public void doTestParallelPropagationConflation(int redundancy) { initialSetUp(); createSendersWithConflation(); @@ -175,10 +181,10 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { createReceiverPrs(); - final Map keyValues = putKeyValues(); + final Map<Integer, Object> keyValues = putKeyValues(); vm4.invoke(() -> checkQueueSize("ln", keyValues.size())); - final Map updateKeyValues = updateKeyValues(); + final Map<Integer, Object> updateKeyValues = updateKeyValues(); vm4.invoke(() -> checkQueueSize("ln", keyValues.size() + updateKeyValues.size())); // creates // aren't @@ -205,7 +211,7 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { // after dispatch, both primary and secondary queues are empty vm4.invoke(() -> checkQueueSize("ln", 0)); - verifySecondaryEventQueuesDrained("ln"); + verifySecondaryEventQueuesDrained(); validateSecondaryEventQueueSize(0, redundancy); validateEventsProcessedByPQRM(totalEventsProcessedByPQRM, redundancy); } @@ -246,7 +252,7 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { } @Test - public void testParallelPropagationConflationOfRandomKeys() throws Exception { + public void testParallelPropagationConflationOfRandomKeys() { initialSetUp(); createSendersWithConflation(); @@ -257,11 +263,11 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { createReceiverPrs(); - final Map keyValues = putKeyValues(); + final Map<Integer, Object> keyValues = putKeyValues(); vm4.invoke(() -> checkQueueSize("ln", keyValues.size())); - final Map updateKeyValues = new HashMap(); + final Map<Integer, Object> updateKeyValues = new HashMap<>(); while (updateKeyValues.size() != 10) { int key = (new Random()).nextInt(keyValues.size()); updateKeyValues.put(key, key + "_updated"); @@ -285,7 +291,7 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { } @Test - public void testParallelPropagationColocatedRegionConflation() throws Exception { + public void testParallelPropagationColocatedRegionConflation() { initialSetUp(); createSendersWithConflation(); @@ -296,16 +302,19 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { createOrderShipmentOnReceivers(); - Map custKeyValues = vm4.invoke(() -> putCustomerPartitionedRegion(20)); - Map orderKeyValues = vm4.invoke(() -> putOrderPartitionedRegion(20)); - Map shipmentKeyValues = vm4.invoke(() -> putShipmentPartitionedRegion(20)); + Map<CustId, Customer> custKeyValues = vm4.invoke(() -> putCustomerPartitionedRegion(20)); + Map<OrderId, Order> orderKeyValues = vm4.invoke(() -> putOrderPartitionedRegion(20)); + Map<ShipmentId, Shipment> shipmentKeyValues = + vm4.invoke(() -> putShipmentPartitionedRegion(20)); vm4.invoke(() -> WANTestBase.checkQueueSize("ln", (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues.size()))); - Map updatedCustKeyValues = vm4.invoke(() -> updateCustomerPartitionedRegion(10)); - Map updatedOrderKeyValues = vm4.invoke(() -> updateOrderPartitionedRegion(10)); - Map updatedShipmentKeyValues = vm4.invoke(() -> updateShipmentPartitionedRegion(10)); + Map<CustId, Customer> updatedCustKeyValues = + vm4.invoke(() -> updateCustomerPartitionedRegion(10)); + Map<OrderId, Order> updatedOrderKeyValues = vm4.invoke(() -> updateOrderPartitionedRegion(10)); + Map<ShipmentId, Shipment> updatedShipmentKeyValues = + vm4.invoke(() -> updateShipmentPartitionedRegion(10)); int sum = (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues.size()) + updatedCustKeyValues.size() + updatedOrderKeyValues.size() + updatedShipmentKeyValues.size(); @@ -337,7 +346,7 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { // // This is the same as the previous test, except for the UsingCustId methods @Test - public void testParallelPropagationColocatedRegionConflationSameKey() throws Exception { + public void testParallelPropagationColocatedRegionConflationSameKey() { initialSetUp(); createSendersWithConflation(); @@ -348,16 +357,19 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { createOrderShipmentOnReceivers(); - Map custKeyValues = vm4.invoke(() -> putCustomerPartitionedRegion(20)); - Map orderKeyValues = vm4.invoke(() -> putOrderPartitionedRegionUsingCustId(20)); - Map shipmentKeyValues = vm4.invoke(() -> putShipmentPartitionedRegionUsingCustId(20)); + Map<CustId, Customer> custKeyValues = vm4.invoke(() -> putCustomerPartitionedRegion(20)); + Map<CustId, Order> orderKeyValues = vm4.invoke(() -> putOrderPartitionedRegionUsingCustId(20)); + Map<CustId, Shipment> shipmentKeyValues = + vm4.invoke(() -> putShipmentPartitionedRegionUsingCustId(20)); vm4.invoke(() -> checkQueueSize("ln", (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues.size()))); - Map updatedCustKeyValues = vm4.invoke(() -> updateCustomerPartitionedRegion(10)); - Map updatedOrderKeyValues = vm4.invoke(() -> updateOrderPartitionedRegionUsingCustId(10)); - Map updatedShipmentKeyValues = + Map<CustId, Customer> updatedCustKeyValues = + vm4.invoke(() -> updateCustomerPartitionedRegion(10)); + Map<CustId, Order> updatedOrderKeyValues = + vm4.invoke(() -> updateOrderPartitionedRegionUsingCustId(10)); + Map<CustId, Shipment> updatedShipmentKeyValues = vm4.invoke(() -> updateShipmentPartitionedRegionUsingCustId(10)); int sum = (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues.size()) + updatedCustKeyValues.size() + updatedOrderKeyValues.size() @@ -388,8 +400,8 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { validateColocatedRegionContents(custKeyValues, orderKeyValues, shipmentKeyValues); } - protected void validateColocatedRegionContents(Map custKeyValues, Map orderKeyValues, - Map shipmentKeyValues) { + protected void validateColocatedRegionContents(Map<?, ?> custKeyValues, Map<?, ?> orderKeyValues, + Map<?, ?> shipmentKeyValues) { vm2.invoke(() -> validateRegionSize(WANTestBase.customerRegionName, custKeyValues.size())); vm2.invoke(() -> validateRegionSize(WANTestBase.orderRegionName, orderKeyValues.size())); vm2.invoke(() -> validateRegionSize(WANTestBase.shipmentRegionName, shipmentKeyValues.size())); @@ -419,8 +431,8 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { vm7.invoke(() -> createCustomerOrderShipmentPartitionedRegion("ln", 0, 8, isOffHeap())); } - protected Map updateKeyValues() { - final Map updateKeyValues = new HashMap(); + protected Map<Integer, Object> updateKeyValues() { + final Map<Integer, Object> updateKeyValues = new HashMap<>(); for (int i = 0; i < 10; i++) { updateKeyValues.put(i, i + "_updated"); } @@ -429,8 +441,8 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { return updateKeyValues; } - protected Map putKeyValues() { - final Map keyValues = new HashMap(); + protected Map<Integer, Object> putKeyValues() { + final Map<Integer, Object> keyValues = new HashMap<>(); for (int i = 0; i < 20; i++) { keyValues.put(i, i); } @@ -440,7 +452,7 @@ public class ParallelWANConflationDUnitTest extends WANTestBase { return keyValues; } - protected void validateReceiverRegionSize(final Map keyValues) { + protected <K, V> void validateReceiverRegionSize(final Map<K, V> keyValues) { vm2.invoke(() -> validateRegionSize(getTestMethodName(), keyValues.size())); vm3.invoke(() -> validateRegionSize(getTestMethodName(), keyValues.size())); diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java index 46285563eb..44616cf237 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java @@ -17,8 +17,8 @@ package org.apache.geode.internal.cache.wan.parallel; import static org.junit.Assert.assertEquals; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; +import java.util.Map; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -75,13 +75,13 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.pauseSender("ln")); vm5.invoke(() -> WANTestBase.pauseSender("ln")); - AsyncInvocation async1 = + AsyncInvocation<?> async1 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 700)); - AsyncInvocation async2 = + AsyncInvocation<?> async2 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); - AsyncInvocation async3 = + AsyncInvocation<?> async3 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 800)); - AsyncInvocation async4 = + AsyncInvocation<?> async4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); async1.join(); @@ -134,13 +134,13 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase { vm4.invoke(() -> WANTestBase.waitForSenderRunningState("ln")); vm5.invoke(() -> WANTestBase.waitForSenderRunningState("ln")); - AsyncInvocation async1 = + AsyncInvocation<?> async1 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 700)); - AsyncInvocation async2 = + AsyncInvocation<?> async2 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); - AsyncInvocation async3 = + AsyncInvocation<?> async3 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 800)); - AsyncInvocation async4 = + AsyncInvocation<?> async4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); async1.join(); @@ -152,7 +152,7 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase { } @Test - public void testParallelQueueDrainInOrder_PR() throws Exception { + public void testParallelQueueDrainInOrder_PR() { Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); @@ -207,24 +207,24 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase { vm6.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); - HashMap vm4BRUpdates = + Map<String, List<Object>> vm4BRUpdates = vm4.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4)); - HashMap vm5BRUpdates = + Map<String, List<Object>> vm5BRUpdates = vm5.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4)); - HashMap vm6BRUpdates = + Map<String, List<Object>> vm6BRUpdates = vm6.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4)); - HashMap vm7BRUpdates = + Map<String, List<Object>> vm7BRUpdates = vm7.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4)); - List b0SenderUpdates = (List) vm4BRUpdates.get("Create0"); - List b1SenderUpdates = (List) vm4BRUpdates.get("Create1"); - List b2SenderUpdates = (List) vm4BRUpdates.get("Create2"); - List b3SenderUpdates = (List) vm4BRUpdates.get("Create3"); + List<Object> b0SenderUpdates = vm4BRUpdates.get("Create0"); + List<Object> b1SenderUpdates = vm4BRUpdates.get("Create1"); + List<Object> b2SenderUpdates = vm4BRUpdates.get("Create2"); + List<Object> b3SenderUpdates = vm4BRUpdates.get("Create3"); - HashMap vm4QueueBRUpdates = vm4.invoke(() -> WANTestBase.checkQueue_BR("ln", 4)); - HashMap vm5QueueBRUpdates = vm5.invoke(() -> WANTestBase.checkQueue_BR("ln", 4)); - HashMap vm6QueueBRUpdates = vm6.invoke(() -> WANTestBase.checkQueue_BR("ln", 4)); - HashMap vm7QueueBRUpdates = vm7.invoke(() -> WANTestBase.checkQueue_BR("ln", 4)); + Map<?, ?> vm4QueueBRUpdates = vm4.invoke(() -> WANTestBase.checkQueue_BR("ln", 4)); + Map<?, ?> vm5QueueBRUpdates = vm5.invoke(() -> WANTestBase.checkQueue_BR("ln", 4)); + Map<?, ?> vm6QueueBRUpdates = vm6.invoke(() -> WANTestBase.checkQueue_BR("ln", 4)); + Map<?, ?> vm7QueueBRUpdates = vm7.invoke(() -> WANTestBase.checkQueue_BR("ln", 4)); assertEquals(vm4QueueBRUpdates, vm5QueueBRUpdates); assertEquals(vm4QueueBRUpdates, vm6QueueBRUpdates); @@ -235,9 +235,9 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase { vm6.invoke(() -> WANTestBase.resumeSender("ln")); vm7.invoke(() -> WANTestBase.resumeSender("ln")); vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); - HashMap receiverUpdates = + Map<String, List<Long>> receiverUpdates = vm2.invoke(() -> WANTestBase.checkPR(getTestMethodName() + "_PR")); - List<Long> createList = (List) receiverUpdates.get("Create"); + List<Long> createList = receiverUpdates.get("Create"); ArrayList<Long> b0ReceiverUpdates = new ArrayList<>(); ArrayList<Long> b1ReceiverUpdates = new ArrayList<>(); ArrayList<Long> b2ReceiverUpdates = new ArrayList<>(); diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java index b6dccd2120..79f7afb438 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java @@ -438,7 +438,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase { final Map<Object, Object> custKeyValue = new HashMap<>(); CustId custId = new CustId(intCustId); custKeyValue.put(custId, new Customer()); - customerData.add(new HashMap()); + customerData.add(new HashMap<>()); vm4.invoke(() -> WANTestBase.putGivenKeyValue(customerRegionName, custKeyValue)); for (int i = 0; i < transactions; i++) { diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java index b69d7f645f..7fc0d15fd9 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java @@ -22,8 +22,8 @@ import static org.assertj.core.api.Assertions.fail; import java.io.File; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Set; @@ -31,9 +31,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.apache.geode.cache.CacheFactory; -import org.apache.geode.cache.DiskStore; import org.apache.geode.cache.DiskStoreFactory; -import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionFactory; import org.apache.geode.cache.RegionShortcut; import org.apache.geode.cache.wan.GatewayEventFilter; @@ -99,9 +97,9 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase { () -> WANTestBase.createReplicatedProxyRegion(getTestMethodName() + "_RR", "ln", isOffHeap())); - AsyncInvocation a1 = + AsyncInvocation<Void> a1 = vm6.invokeAsync(() -> WANTestBase.doPutsSameKey(getTestMethodName() + "_RR", 2000, "DA")); - AsyncInvocation a2 = + AsyncInvocation<Void> a2 = vm7.invokeAsync(() -> WANTestBase.doPutsSameKey(getTestMethodName() + "_RR", 2000, "DA")); a1.await(); @@ -133,7 +131,7 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase { @Test - public void testPrimarySecondaryQueueDrainInOrder_RR() throws Exception { + public void testPrimarySecondaryQueueDrainInOrder_RR() { Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); @@ -186,8 +184,8 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase { // secondary queue size stats in serial queue should be 0 assertThat(v4List.get(10) + v5List.get(10)).isEqualTo(0); - HashMap primarySenderUpdates = vm4.invoke(WANTestBase::checkQueue); - HashMap secondarySenderUpdates = vm5.invoke(WANTestBase::checkQueue); + Map<String, List<?>> primarySenderUpdates = vm4.invoke(WANTestBase::checkQueue); + Map<String, List<?>> secondarySenderUpdates = vm5.invoke(WANTestBase::checkQueue); assertThat(secondarySenderUpdates).isEqualTo(primarySenderUpdates); vm4.invoke(() -> WANTestBase.resumeSender("ln")); @@ -206,10 +204,10 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase { Wait.pause(5000); vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 1000)); primarySenderUpdates = vm4.invoke(WANTestBase::checkQueue); - HashMap receiverUpdates = vm2.invoke(WANTestBase::checkQueue); + Map<String, List<?>> receiverUpdates = vm2.invoke(WANTestBase::checkQueue); - List destroyList = (List) primarySenderUpdates.get("Destroy"); - List createList = (List) receiverUpdates.get("Create"); + List<?> destroyList = primarySenderUpdates.get("Destroy"); + List<?> createList = receiverUpdates.get("Create"); for (int i = 0; i < 1000; i++) { assertThat(createList.get(i)).isEqualTo(destroyList.get(i)); } @@ -226,12 +224,12 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase { } - protected void checkPrimarySenderUpdatesOnVM5(HashMap primarySenderUpdates) { + protected void checkPrimarySenderUpdatesOnVM5(Map<String, List<?>> primarySenderUpdates) { vm5.invoke(() -> WANTestBase.checkQueueOnSecondary(primarySenderUpdates)); } @Test - public void testPrimarySecondaryQueueDrainInOrder_PR() throws Exception { + public void testPrimarySecondaryQueueDrainInOrder_PR() { Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); @@ -272,8 +270,8 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase { vm6.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); Wait.pause(5000); - HashMap primarySenderUpdates = vm4.invoke(WANTestBase::checkQueue); - HashMap secondarySenderUpdates = vm5.invoke(WANTestBase::checkQueue); + Map<String, List<?>> primarySenderUpdates = vm4.invoke(WANTestBase::checkQueue); + Map<String, List<?>> secondarySenderUpdates = vm5.invoke(WANTestBase::checkQueue); checkPrimarySenderUpdatesOnVM5(primarySenderUpdates); vm4.invoke(() -> WANTestBase.resumeSender("ln")); @@ -297,8 +295,7 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase { public void test_ValidateSerialGatewaySenderQueueAttributes_1() { Integer localLocPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer remoteLocPort = - vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocPort)); + vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocPort)); WANTestBase test = new WANTestBase(); Properties props = test.getDistributedSystemProperties(); @@ -313,7 +310,7 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase { File[] dirs1 = new File[] {directory}; DiskStoreFactory dsf = cache.createDiskStoreFactory(); dsf.setDiskDirs(dirs1); - DiskStore diskStore = dsf.create("FORNY"); + dsf.create("FORNY"); GatewaySenderFactory fact = cache.createGatewaySenderFactory(); fact.setBatchConflationEnabled(true); @@ -334,9 +331,9 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase { try { GatewaySender sender1 = fact.create("TKSender", 2); - RegionFactory regionFactory = cache.createRegionFactory(RegionShortcut.PARTITION); + RegionFactory<?, ?> regionFactory = cache.createRegionFactory(RegionShortcut.PARTITION); regionFactory.addGatewaySenderId(sender1.getId()); - Region region = regionFactory.create("test_ValidateGatewaySenderAttributes"); + regionFactory.create("test_ValidateGatewaySenderAttributes"); Set<GatewaySender> senders = cache.getGatewaySenders(); assertThat(1).isEqualTo(senders.size()); GatewaySender gatewaySender = senders.iterator().next(); @@ -357,8 +354,7 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase { public void test_ValidateSerialGatewaySenderQueueAttributes_2() { Integer localLocPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer remoteLocPort = - vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocPort)); + vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocPort)); WANTestBase test = new WANTestBase(); Properties props = test.getDistributedSystemProperties(); @@ -384,9 +380,9 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase { final IgnoredException exp = IgnoredException.addIgnoredException("Could not connect"); try { GatewaySender sender1 = fact.create("TKSender", 2); - RegionFactory regionFactory = cache.createRegionFactory(RegionShortcut.PARTITION); + RegionFactory<?, ?> regionFactory = cache.createRegionFactory(RegionShortcut.PARTITION); regionFactory.addGatewaySenderId(sender1.getId()); - Region region = regionFactory.create("test_ValidateGatewaySenderAttributes"); + regionFactory.create("test_ValidateGatewaySenderAttributes"); Set<GatewaySender> senders = cache.getGatewaySenders(); assertThat(1).isEqualTo(senders.size()); GatewaySender gatewaySender = senders.iterator().next(); @@ -458,8 +454,7 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase { // Create sender with batchSize disabled vm4.invoke(() -> WANTestBase.createCache(lnPort)); - String senderId = "ln"; - final String builder = String.valueOf(senderId); + final String senderId = "ln"; vm4.invoke(() -> { InternalGatewaySenderFactory gateway = (InternalGatewaySenderFactory) cache.createGatewaySenderFactory(); @@ -475,7 +470,7 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase { // Create region with the sender ids vm4.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", - builder, isOffHeap())); + senderId, isOffHeap())); // Do puts int numPuts = 100; diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java index 0bc35b7c03..de4a19abaa 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java @@ -33,6 +33,7 @@ import org.apache.geode.internal.cache.wan.WANTestBase; import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.junit.categories.WanTest; +@SuppressWarnings("deprecation") @Category({WanTest.class}) public class SerialWANStatsDUnitTest extends WANTestBase { @@ -122,7 +123,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase { vm6.invoke(() -> WANTestBase.createReplicatedRegion(testName + "_RR", "ln", isOffHeap())); vm7.invoke(() -> WANTestBase.createReplicatedRegion(testName + "_RR", "ln", isOffHeap())); - final Map keyValues = new HashMap(); + final Map<Integer, String> keyValues = new HashMap<>(); int entries = 12; for (int i = 0; i < entries; i++) { keyValues.put(i, i + "_Value"); @@ -179,7 +180,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase { vm6.invoke(() -> WANTestBase.createReplicatedRegion(testName + "_RR", "ln", isOffHeap())); vm7.invoke(() -> WANTestBase.createReplicatedRegion(testName + "_RR", "ln", isOffHeap())); - final Map keyValues = new HashMap(); + final Map<Integer, String> keyValues = new HashMap<>(); int entries = 12; for (int i = 0; i < entries; i++) { keyValues.put(i, i + "_Value"); @@ -279,17 +280,17 @@ public class SerialWANStatsDUnitTest extends WANTestBase { int entries = entriesPerInvocation * clients; - List<AsyncInvocation> invocations = new ArrayList<>(clients); + List<AsyncInvocation<?>> invocations = new ArrayList<>(clients); for (int i = 0; i < clients; i++) { final int index = i; - AsyncInvocation asyncInvocation = + AsyncInvocation<?> asyncInvocation = vm4.invokeAsync(() -> WANTestBase.doPutsInsideTransactions(regionName, data.get(index), eventsPerTransaction)); invocations.add(asyncInvocation); } try { - for (AsyncInvocation invocation : invocations) { + for (AsyncInvocation<?> invocation : invocations) { invocation.await(); } } catch (InterruptedException e) { @@ -340,7 +341,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase { vm6.invoke(() -> WANTestBase.createReplicatedRegion(testName + "_RR", "ln", isOffHeap())); vm7.invoke(() -> WANTestBase.createReplicatedRegion(testName + "_RR", "ln", isOffHeap())); - final Map keyValues = new HashMap(); + final Map<Integer, String> keyValues = new HashMap<>(); int entries = 24; for (int i = 0; i < entries; i++) { keyValues.put(i, i + "_Value"); @@ -371,8 +372,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase { } @Test - public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEventsSendsBatchesWithCompleteTransactions() - throws Exception { + public void testReplicatedSerialPropagationWithBatchRedistWithGroupTransactionEventsSendsBatchesWithCompleteTransactions() { Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); @@ -404,7 +404,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase { vm6.invoke(() -> WANTestBase.createReplicatedRegion(testName + "_RR", "ln", isOffHeap())); vm7.invoke(() -> WANTestBase.createReplicatedRegion(testName + "_RR", "ln", isOffHeap())); - final Map keyValues = new HashMap(); + final Map<Integer, String> keyValues = new HashMap<>(); int entries = 24; for (int i = 0; i < entries; i++) { keyValues.put(i, i + "_Value"); @@ -445,7 +445,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase { } @Test - public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception { + public void testReplicatedSerialPropagationWithMultipleDispatchers() { Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); @@ -484,7 +484,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase { } @Test - public void testWANStatsTwoWanSites() throws Exception { + public void testWANStatsTwoWanSites() { Integer lnPort = createFirstLocatorWithDSId(1); Integer nyPort = vm0.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); @@ -568,18 +568,17 @@ public class SerialWANStatsDUnitTest extends WANTestBase { vm6.invoke(() -> WANTestBase.createReplicatedRegion(testName + "_RR", "ln", isOffHeap())); vm7.invoke(() -> WANTestBase.createReplicatedRegion(testName + "_RR", "ln", isOffHeap())); - AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts(testName + "_RR", 10000)); + AsyncInvocation<?> inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts(testName + "_RR", 10000)); pause(2000); - AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender("ln")); + AsyncInvocation<Boolean> inv2 = vm4.invokeAsync(() -> WANTestBase.killSender("ln")); Boolean isKilled = Boolean.FALSE; try { - isKilled = (Boolean) inv2.getResult(); + isKilled = inv2.getResult(); } catch (Throwable e) { fail("Unexpected exception while killing a sender"); } - AsyncInvocation inv3 = null; if (!isKilled) { - inv3 = vm5.invokeAsync(() -> WANTestBase.killSender("ln")); + AsyncInvocation<?> inv3 = vm5.invokeAsync(() -> WANTestBase.killSender("ln")); inv3.join(); } inv1.join(); @@ -630,25 +629,24 @@ public class SerialWANStatsDUnitTest extends WANTestBase { vm6.invoke(() -> WANTestBase.createReplicatedRegion(testName + "_RR", "ln", isOffHeap())); vm7.invoke(() -> WANTestBase.createReplicatedRegion(testName + "_RR", "ln", isOffHeap())); - AsyncInvocation inv1 = + AsyncInvocation<?> inv1 = vm6.invokeAsync(() -> WANTestBase.doTxPutsWithRetryIfError(testName + "_RR", 2, 5000, 0)); - AsyncInvocation inv2 = + AsyncInvocation<?> inv2 = vm7.invokeAsync(() -> WANTestBase.doTxPutsWithRetryIfError(testName + "_RR", 2, 5000, 1)); vm2.invoke(() -> await() .untilAsserted(() -> assertThat(getRegionSize(testName + "_RR") > 40).as( "Waiting for some batches to be received").isEqualTo(true))); - AsyncInvocation inv3 = vm4.invokeAsync(() -> WANTestBase.killSender("ln")); - Boolean isKilled = Boolean.FALSE; + AsyncInvocation<Boolean> inv3 = vm4.invokeAsync(() -> WANTestBase.killSender("ln")); + boolean isKilled = false; try { - isKilled = (Boolean) inv3.getResult(); + isKilled = inv3.getResult(); } catch (Throwable e) { fail("Unexpected exception while killing a sender"); } - AsyncInvocation inv4; if (!isKilled) { - inv4 = vm5.invokeAsync(() -> WANTestBase.killSender("ln")); + AsyncInvocation<?> inv4 = vm5.invokeAsync(() -> WANTestBase.killSender("ln")); inv4.join(); } inv1.join(); @@ -747,7 +745,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase { * */ @Test - public void testReplicatedSerialPropagationWithRemoteRegionDestroy() throws Exception { + public void testReplicatedSerialPropagationWithRemoteRegionDestroy() { int numEntries = 2000; Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); @@ -785,7 +783,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase { startSenderInVMs("ln", vm4, vm5); // start puts in RR_1 in another thread - AsyncInvocation inv1 = + AsyncInvocation<?> inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts(testName + "_RR_1", numEntries)); // destroy RR_1 in remote site vm2.invoke(() -> WANTestBase.destroyRegion(testName + "_RR_1", 5)); @@ -861,7 +859,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase { } @Test - public void testSerialPropagationConflation() throws Exception { + public void testSerialPropagationConflation() { Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); @@ -884,8 +882,8 @@ public class SerialWANStatsDUnitTest extends WANTestBase { vm2.invoke(() -> WANTestBase.createPartitionedRegion(testName, null, 1, 100, isOffHeap())); vm3.invoke(() -> WANTestBase.createPartitionedRegion(testName, null, 1, 100, isOffHeap())); - final Map keyValues = new HashMap(); - final Map updateKeyValues = new HashMap(); + final Map<Integer, Object> keyValues = new HashMap<>(); + final Map<Integer, String> updateKeyValues = new HashMap<>(); for (int i = 0; i < 1000; i++) { keyValues.put(i, i); }