This is an automated email from the ASF dual-hosted git repository. alberto pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 7ccdc8297b GEODE-10155: Avoid threads hanging when function execution times-out (#7493) 7ccdc8297b is described below commit 7ccdc8297b1ded06175f41543884d945d5995c60 Author: Alberto Gomez <alberto.go...@est.tech> AuthorDate: Mon Jun 6 20:18:46 2022 +0200 GEODE-10155: Avoid threads hanging when function execution times-out (#7493) * GEODE-10155: Avoid threads hanging when function execution times-out * GEODE-10155: Updated after review * GEODE-10155: More changes after review * GEODE-10155: Changes after more reviews * GEODE-10155: Some more changes after review * GEODE-10155: More changes after review * GEODE-10155: More clean-up after review --- ...nctionExecutionNoSingleHopDistributedTest.java} | 895 ++++++++++----------- .../cache/execute/PRClientServerTestBase.java | 351 ++++---- .../PartitionedRegionFunctionResultSender.java | 77 +- .../PartitionedRegionFunctionResultSenderTest.java | 174 ++++ .../internal/cache/functions/TestFunction.java | 23 +- .../geode/test/junit/rules/ServerStarterRule.java | 9 + .../sanctioned-geode-dunit-serializables.txt | 2 +- 7 files changed, 864 insertions(+), 667 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest.java similarity index 53% rename from geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest.java rename to geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest.java index a76a7157b8..1c66b3d2e0 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest.java @@ -14,10 +14,9 @@ */ package org.apache.geode.internal.cache.execute; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.Serializable; import java.util.ArrayList; @@ -27,6 +26,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; import org.apache.logging.log4j.Logger; import org.junit.Test; @@ -37,6 +38,7 @@ import org.junit.runners.Parameterized.UseParametersRunnerFactory; import org.apache.geode.cache.CacheClosedException; import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.ServerConnectivityException; import org.apache.geode.cache.execute.Execution; import org.apache.geode.cache.execute.Function; import org.apache.geode.cache.execute.FunctionAdapter; @@ -50,11 +52,10 @@ import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.functions.TestFunction; import org.apache.geode.logging.internal.log4j.api.LogService; -import org.apache.geode.test.dunit.Assert; +import org.apache.geode.test.awaitility.GeodeAwaitility; import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.IgnoredException; import org.apache.geode.test.dunit.SerializableRunnableIF; -import org.apache.geode.test.dunit.ThreadUtils; import org.apache.geode.test.dunit.Wait; import org.apache.geode.test.dunit.WaitCriterion; import org.apache.geode.test.junit.categories.ClientServerTest; @@ -64,7 +65,7 @@ import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactor @Category({ClientServerTest.class, FunctionServiceTest.class}) @RunWith(Parameterized.class) @UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class) -public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest +public class PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest extends PRClientServerTestBase { private static final Logger logger = LogService.getLogger(); @@ -77,7 +78,7 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest private static final int retryCount = 0; - public PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest() { + public PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest() { super(); } @@ -87,10 +88,10 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest @Test public void testServerAllKeyExecution_byInstance() { createScenario(); - Function function = new TestFunction(true, TEST_FUNCTION2); + Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2); registerFunctionAtServer(function); isByName = Boolean.FALSE; - client.invoke(() -> PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest + client.invoke(() -> PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest .serverAllKeyExecution(isByName)); } @@ -100,7 +101,7 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest @Test public void testServerGetAllFunction() { createScenario(); - client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::getAll); + client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::getAll); } /* @@ -109,7 +110,7 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest @Test public void testServerPutAllFunction() { createScenario(); - client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::putAll); + client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::putAll); } /* @@ -119,10 +120,10 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest @Test public void testServerSingleKeyExecution_byName() { createScenario(); - Function function = new TestFunction(true, TEST_FUNCTION2); + Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2); registerFunctionAtServer(function); isByName = Boolean.TRUE; - client.invoke(() -> PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest + client.invoke(() -> PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest .serverSingleKeyExecution(isByName)); } @@ -135,7 +136,7 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest public void testserverSingleKeyExecution_FunctionInvocationTargetException() { createScenario(); client.invoke( - PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::serverSingleKeyExecution_FunctionInvocationTargetException); + PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::serverSingleKeyExecution_FunctionInvocationTargetException); } /* @@ -144,7 +145,7 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest @Test public void testBucketFilter() { createScenarioForBucketFilter(); - Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_BUCKET_FILTER); + Function<Object> function = new TestFunction<>(true, TestFunction.TEST_FUNCTION_BUCKET_FILTER); registerFunctionAtServer(function); Set<Integer> bucketFilterSet = new HashSet<>(); @@ -161,7 +162,7 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest @Test public void testBucketFilterOverride() { createScenarioForBucketFilter(); - Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_BUCKET_FILTER); + Function<Object> function = new TestFunction<>(true, TestFunction.TEST_FUNCTION_BUCKET_FILTER); registerFunctionAtServer(function); // test multi key filter Set<Integer> bucketFilterSet = new HashSet<>(); @@ -181,10 +182,10 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest @Test public void testServerSingleKeyExecution_SocketTimeOut() { createScenario(); - Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT); + Function<Object> function = new TestFunction<>(true, TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT); registerFunctionAtServer(function); isByName = Boolean.TRUE; - client.invoke(() -> PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest + client.invoke(() -> PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest .serverSingleKeyExecutionSocketTimeOut(isByName)); } @@ -195,10 +196,10 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest @Test public void testServerSingleKeyExecution_byInstance() { createScenario(); - Function function = new TestFunction(true, TEST_FUNCTION2); + Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2); registerFunctionAtServer(function); isByName = Boolean.FALSE; - client.invoke(() -> PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest + client.invoke(() -> PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest .serverSingleKeyExecution(isByName)); } @@ -209,7 +210,7 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest public void testServerSingleKeyExecution_byInlineFunction() { createScenario(); client.invoke( - PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::serverSingleKeyExecution_Inline); + PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::serverSingleKeyExecution_Inline); } /* @@ -219,26 +220,26 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest @Test public void testserverMultiKeyExecution_byName() { createScenario(); - Function function = new TestFunction(true, TEST_FUNCTION2); + Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2); registerFunctionAtServer(function); isByName = Boolean.TRUE; - client.invoke(() -> PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest + client.invoke(() -> PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest .serverMultiKeyExecution(isByName)); server1.invoke( - PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::checkBucketsOnServer); + PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::checkBucketsOnServer); server2.invoke( - PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::checkBucketsOnServer); + PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::checkBucketsOnServer); server3.invoke( - PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::checkBucketsOnServer); + PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::checkBucketsOnServer); } @Test - public void testserverMultiKeyExecution_SocektTimeOut() { + public void testserverMultiKeyExecution_SocketTimeOut() { createScenario(); - Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT); + Function<Object> function = new TestFunction<>(true, TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT); registerFunctionAtServer(function); isByName = Boolean.TRUE; - client.invoke(() -> PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest + client.invoke(() -> PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest .serverMultiKeyExecutionSocketTimeOut(isByName)); } @@ -249,7 +250,7 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest public void testserverMultiKeyExecution_byInlineFunction() { createScenario(); client.invoke( - PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::serverMultiKeyExecution_Inline); + PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::serverMultiKeyExecution_Inline); } /* @@ -262,7 +263,7 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest public void testserverMultiKeyExecution_FunctionInvocationTargetException() { createScenario(); client.invoke( - PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::serverMultiKeyExecution_FunctionInvocationTargetException); + PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::serverMultiKeyExecution_FunctionInvocationTargetException); } /* @@ -272,10 +273,10 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest @Test public void testserverMultiKeyExecutionNoResult_byName() { createScenario(); - Function function = new TestFunction(false, TEST_FUNCTION7); + Function<Object> function = new TestFunction<>(false, TEST_FUNCTION7); registerFunctionAtServer(function); isByName = Boolean.TRUE; - client.invoke(() -> PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest + client.invoke(() -> PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest .serverMultiKeyExecutionNoResult(isByName)); } @@ -286,10 +287,10 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest @Test public void testserverMultiKeyExecution_byInstance() { createScenario(); - Function function = new TestFunction(true, TEST_FUNCTION2); + Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2); registerFunctionAtServer(function); isByName = Boolean.FALSE; - client.invoke(() -> PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest + client.invoke(() -> PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest .serverMultiKeyExecution(isByName)); } @@ -300,10 +301,10 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest @Test public void testserverMultiKeyExecutionOnASingleBucket_byName() { createScenario(); - Function function = new TestFunction(true, TEST_FUNCTION2); + Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2); registerFunctionAtServer(function); isByName = Boolean.TRUE; - client.invoke(() -> PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest + client.invoke(() -> PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest .serverMultiKeyExecutionOnASingleBucket(isByName)); } @@ -314,10 +315,10 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest @Test public void testserverMultiKeyExecutionOnASingleBucket_byInstance() { createScenario(); - Function function = new TestFunction(true, TEST_FUNCTION2); + Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2); registerFunctionAtServer(function); isByName = Boolean.FALSE; - client.invoke(() -> PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest + client.invoke(() -> PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest .serverMultiKeyExecutionOnASingleBucket(isByName)); } @@ -326,33 +327,30 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest * failover to other available server */ @Test - public void testServerFailoverWithTwoServerAliveHA() { + public void testServerFailoverWithTwoServerAliveHA() throws InterruptedException { IgnoredException.addIgnoredException("FunctionInvocationTargetException"); - ArrayList commonAttributes = + ArrayList<Object> commonAttributes = createCommonServerAttributes("TestPartitionedRegion", null, 1, null); createClientServerScenarion(commonAttributes, 20, 20, 20); - Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_HA); + Function<Object> function = new TestFunction<>(true, TestFunction.TEST_FUNCTION_HA); registerFunctionAtServer(function); - server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::stopServerHA); - server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::stopServerHA); - client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::putOperation); + server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::stopServerHA); + server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::stopServerHA); + client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::putOperation); int AsyncInvocationArrSize = 1; - AsyncInvocation[] async = new AsyncInvocation[AsyncInvocationArrSize]; + AsyncInvocation<?>[] async = new AsyncInvocation<?>[AsyncInvocationArrSize]; async[0] = client.invokeAsync( - PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::executeFunctionHA); - server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::startServerHA); - server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::startServerHA); - server1.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::stopServerHA); + PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::executeFunctionHA); + server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::startServerHA); + server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::startServerHA); + server1.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::stopServerHA); client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest .verifyDeadAndLiveServers(2)); - ThreadUtils.join(async[0], 6 * 60 * 1000); - if (async[0].getException() != null) { - Assert.fail("UnExpected Exception Occurred : ", async[0].getException()); - } - List l = (List) async[0].getReturnValue(); - assertEquals(2, l.size()); + List<?> l = (List<?>) async[0].get(); + + assertThat(l).hasSize(2); } /* @@ -360,31 +358,28 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest * failover to other available server */ @Test - public void testServerCacheClosedFailoverWithTwoServerAliveHA() { + public void testServerCacheClosedFailoverWithTwoServerAliveHA() throws InterruptedException { IgnoredException.addIgnoredException("FunctionInvocationTargetException"); - ArrayList commonAttributes = + ArrayList<Object> commonAttributes = createCommonServerAttributes("TestPartitionedRegion", null, 1, null); createClientServerScenarion(commonAttributes, 20, 20, 20); - Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_HA); + Function<Object> function = new TestFunction<>(true, TestFunction.TEST_FUNCTION_HA); registerFunctionAtServer(function); - server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::stopServerHA); - server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::stopServerHA); - client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::putOperation); + server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::stopServerHA); + server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::stopServerHA); + client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::putOperation); int AsyncInvocationArrSize = 1; - AsyncInvocation[] async = new AsyncInvocation[AsyncInvocationArrSize]; + AsyncInvocation<?>[] async = new AsyncInvocation<?>[AsyncInvocationArrSize]; async[0] = client.invokeAsync( - PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::executeFunctionHA); - server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::startServerHA); - server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::startServerHA); - server1.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::closeCacheHA); - client.invoke(() -> PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest + PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::executeFunctionHA); + server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::startServerHA); + server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::startServerHA); + server1.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::closeCacheHA); + client.invoke(() -> PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest .verifyDeadAndLiveServers(2)); - ThreadUtils.join(async[0], 5 * 60 * 1000); - if (async[0].getException() != null) { - Assert.fail("UnExpected Exception Occurred : ", async[0].getException()); - } - List l = (List) async[0].getReturnValue(); - assertEquals(2, l.size()); + + List<?> l = (List<?>) async[0].get(); + assertThat(l).hasSize(2); } @Test @@ -392,20 +387,90 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest createScenario(); server1 .invoke( - (SerializableRunnableIF) PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::registerFunction); + (SerializableRunnableIF) PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::registerFunction); server1 .invoke( - (SerializableRunnableIF) PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::registerFunction); + (SerializableRunnableIF) PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::registerFunction); server1 .invoke( - (SerializableRunnableIF) PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::registerFunction); + (SerializableRunnableIF) PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::registerFunction); client .invoke( - (SerializableRunnableIF) PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::registerFunction); + (SerializableRunnableIF) PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::registerFunction); client.invoke( PRClientServerRegionFunctionExecutionDUnitTest::FunctionExecution_Inline_Bug40714); } + /** + * This test case verifies that if the execution of a function handled + * by a Function Execution thread times out at the client, the ServerConnection + * thread will eventually be released. + * In order to test this, a slow function will be executed by a client + * with a small time-out a number of times equal to the number of servers + * in the cluster * the max number of threads configured. + * After the function executions have timed-out, another request will be + * sent by the client to any server and it should be served timely. + * If the ServerConnection threads had not been released, this new + * request will never be served because there would be not ServerConnection + * threads available and the test case will time-out. + */ + @Test + public void testClientFunctionExecutionTimingOutDoesNotLeaveServerConnectionThreadsHanged() { + // Set client connect-timeout to a very high value so that if there are no + // ServerConnection threads available the test will time-out before the client times-out. + int connectTimeout = (int) (GeodeAwaitility.getTimeout().toMillis() * 2); + int maxThreads = 2; + createScenarioWithClientConnectTimeout(connectTimeout, maxThreads); + + // The function must be executed a number of times equal + // to the number of servers * the max-threads, to check if all the + // threads are hanged. + int executions = (3 * maxThreads); + + // functionTimeoutSecs should be lower than the + // time taken by the slow function to return all + // the results + int functionTimeoutSecs = 2; + + Function<?> function = new TestFunction<>(true, TestFunction.TEST_FUNCTION_SLOW); + registerFunctionAtServer(function); + + // Run the function that will time-out at the client + // the number of specified times. + IntStream.range(0, executions) + .forEach(i -> assertThatThrownBy(() -> client + .invoke(() -> executeSlowFunctionOnRegionNoFilter(function, PartitionedRegionName, + functionTimeoutSecs))) + .getCause().getCause().isInstanceOf(ServerConnectivityException.class)); + + // Make sure that the get returns timely. If it hangs, it means + // that there are no threads available in the servers to handle the + // request because they were hanged due to the previous function + // executions. + await().until(() -> { + client.invoke(() -> executeGet(PartitionedRegionName, "key")); + return true; + }); + } + + private Object executeGet(String regionName, Object key) { + Region<?, ?> region = cache.getRegion(regionName); + return region.get(key); + } + + private Object executeSlowFunctionOnRegionNoFilter(Function<?> function, String regionName, + int functionTimeoutSecs) { + FunctionService.registerFunction(function); + Region<?, ?> region = cache.getRegion(regionName); + + Execution execution = FunctionService.onRegion(region); + + Object[] args = {Boolean.TRUE}; + return execution.setArguments(args).execute(function.getId(), functionTimeoutSecs, + TimeUnit.SECONDS).getResult(); + } + + public static void registerFunction() { FunctionService.registerFunction(new FunctionAdapter() { @Override @@ -457,57 +522,56 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest public static void executeFunction() { - Region region = cache.getRegion(PartitionedRegionName); - assertNotNull(region); + Region<Object, Object> region = cache.getRegion(PartitionedRegionName); + assertThat(region).isNotNull(); final HashSet<String> testKeysSet = new HashSet<>(); for (int i = (totalNumBuckets * 10); i > 0; i--) { testKeysSet.add("execKey-" + i); } DistributedSystem.setThreadsSocketPolicy(false); - Function function = new TestFunction(true, TEST_FUNCTION2); + Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2); FunctionService.registerFunction(function); Execution dataSet = FunctionService.onRegion(region); try { - ResultCollector rc1 = + ResultCollector<?, ?> rc1 = dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(function.getId()); - HashMap resultMap = ((HashMap) rc1.getResult()); - assertEquals(3, resultMap.size()); + HashMap<?, ?> resultMap = ((HashMap<?, ?>) rc1.getResult()); + assertThat(resultMap).hasSize(3); - for (Object o : resultMap.entrySet()) { - Map.Entry entry = (Map.Entry) o; - ArrayList resultListForMember = (ArrayList) entry.getValue(); + for (Map.Entry<?, ?> o : resultMap.entrySet()) { + ArrayList<?> resultListForMember = (ArrayList<?>) o.getValue(); for (Object result : resultListForMember) { - assertEquals(Boolean.TRUE, result); + assertThat(result).isEqualTo(true); } } } catch (Exception e) { logger.info("Got an exception : " + e.getMessage()); - assertTrue(e instanceof CacheClosedException); + assertThat(e).isInstanceOf(CacheClosedException.class); } } private static Object executeFunctionHA() { - Region region = cache.getRegion(PartitionedRegionName); + Region<Object, Object> region = cache.getRegion(PartitionedRegionName); final HashSet<String> testKeysSet = new HashSet<>(); for (int i = (totalNumBuckets * 10); i > 0; i--) { testKeysSet.add("execKey-" + i); } DistributedSystem.setThreadsSocketPolicy(false); - Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_HA); + Function<Object> function = new TestFunction<>(true, TestFunction.TEST_FUNCTION_HA); FunctionService.registerFunction(function); Execution dataSet = FunctionService.onRegion(region); - ResultCollector rc1 = + ResultCollector<?, ?> rc1 = dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(function.getId()); - List l = ((List) rc1.getResult()); + List<?> l = ((List<?>) rc1.getResult()); logger.info("Result size : " + l.size()); return l; } private static void putOperation() { Region<String, Integer> region = cache.getRegion(PartitionedRegionName); - assertNotNull(region); + assertThat(region).isNotNull(); final HashSet<String> testKeysSet = new HashSet<>(); for (int i = (totalNumBuckets * 10); i > 0; i--) { testKeysSet.add("execKey-" + i); @@ -520,132 +584,119 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest } private void createScenario() { - ArrayList commonAttributes = + ArrayList<Object> commonAttributes = createCommonServerAttributes("TestPartitionedRegion", null, 0, null); createClientServerScenarioNoSingleHop(commonAttributes, 20, 20, 20); } + private void createScenarioWithClientConnectTimeout(int connectTimeout, int maxThreads) { + ArrayList<Object> commonAttributes = + createCommonServerAttributes("TestPartitionedRegion", null, 0, null); + createClientServerScenarioNoSingleHop(commonAttributes, 20, 20, 20, maxThreads, connectTimeout); + } + + private void createScenarioForBucketFilter() { - ArrayList commonAttributes = createCommonServerAttributes("TestPartitionedRegion", + ArrayList<Object> commonAttributes = createCommonServerAttributes("TestPartitionedRegion", new BucketFilterPRResolver(), 0, null); createClientServerScenarioNoSingleHop(commonAttributes, 20, 20, 20); } private static void checkBucketsOnServer() { PartitionedRegion region = (PartitionedRegion) cache.getRegion(PartitionedRegionName); - HashMap localBucket2RegionMap = (HashMap) region.getDataStore().getSizeLocally(); + HashMap<Integer, Integer> localBucket2RegionMap = + (HashMap<Integer, Integer>) region.getDataStore().getSizeLocally(); logger.info( "Size of the " + PartitionedRegionName + " in this VM :- " + localBucket2RegionMap.size()); - Set entrySet = localBucket2RegionMap.entrySet(); - assertNotNull(entrySet); + Set<Map.Entry<Integer, Integer>> entrySet = localBucket2RegionMap.entrySet(); + assertThat(entrySet).isNotNull(); } private static void serverAllKeyExecution(Boolean isByName) { Region<String, Integer> region = cache.getRegion(PartitionedRegionName); - assertNotNull(region); + assertThat(region).isNotNull(); final HashSet<String> testKeysSet = new HashSet<>(); for (int i = (totalNumBuckets / 2); i > 0; i--) { testKeysSet.add("execKey-" + i); } DistributedSystem.setThreadsSocketPolicy(false); - Function function = new TestFunction(true, TEST_FUNCTION2); + Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2); FunctionService.registerFunction(function); Execution dataSet = FunctionService.onRegion(region); - try { - int j = 0; - HashSet<Integer> origVals = new HashSet<>(); - for (String item : testKeysSet) { - Integer val = j++; - origVals.add(val); - region.put(item, val); - } - ResultCollector rc1 = executeOnAll(dataSet, Boolean.TRUE, function, isByName); - List resultList = (List) rc1.getResult(); - logger.info("Result size : " + resultList.size()); - logger.info("Result are SSSS : " + resultList); - assertEquals(3, resultList.size()); - - for (Object result : resultList) { - assertEquals(Boolean.TRUE, result); - } - ResultCollector rc2 = executeOnAll(dataSet, testKeysSet, function, isByName); - List l2 = ((List) rc2.getResult()); - assertEquals(3, l2.size()); - HashSet<Integer> foundVals = new HashSet<>(); - for (Object value : l2) { - ArrayList subL = (ArrayList) (value); - assertTrue(subL.size() > 0); - for (Object o : subL) { - assertTrue(foundVals.add((Integer) o)); - } - } - assertEquals(origVals, foundVals); - } catch (Exception e) { - Assert.fail("Test failed after the put operation", e); + int j = 0; + HashSet<Integer> origVals = new HashSet<>(); + for (String item : testKeysSet) { + Integer val = j++; + origVals.add(val); + region.put(item, val); + } + ResultCollector<?, ?> rc1 = executeOnAll(dataSet, Boolean.TRUE, function, isByName); + List<?> resultList = (List<?>) rc1.getResult(); + assertThat(resultList).hasSize(3); + for (Object result : resultList) { + assertThat(result).isEqualTo(true); } + ResultCollector<?, ?> rc2 = executeOnAll(dataSet, testKeysSet, function, isByName); + List<?> l2 = (List<?>) rc2.getResult(); + assertThat(l2).hasSize(3); + HashSet<Integer> foundVals = new HashSet<>(); + for (Object value : l2) { + List<?> subL = (List<?>) value; + assertThat(subL).hasSizeGreaterThan(0); + for (Object o : subL) { + assertThat(foundVals.add((Integer) o)).isTrue(); + } + } + assertThat(foundVals).containsExactlyInAnyOrderElementsOf(origVals); } public static void getAll() { Region<String, Integer> region = cache.getRegion(PartitionedRegionName); - assertNotNull(region); + assertThat(region).isNotNull(); final List<String> testKeysList = new ArrayList<>(); for (int i = (totalNumBuckets * 3); i > 0; i--) { testKeysList.add("execKey-" + i); } DistributedSystem.setThreadsSocketPolicy(false); - try { - int j = 0; - Map<String, Integer> origVals = new HashMap<>(); - for (String key : testKeysList) { - Integer val = j++; - origVals.put(key, val); - region.put(key, val); - } - Map resultMap = region.getAll(testKeysList); - assertEquals(resultMap, origVals); - Wait.pause(2000); - Map secondResultMap = region.getAll(testKeysList); - assertEquals(secondResultMap, origVals); - - } catch (Exception e) { - Assert.fail("Test failed after the put operation", e); - + int j = 0; + Map<String, Integer> origVals = new HashMap<>(); + for (String key : testKeysList) { + Integer val = j++; + origVals.put(key, val); + region.put(key, val); } + Map<String, Integer> resultMap = region.getAll(testKeysList); + assertThat(resultMap).containsExactlyInAnyOrderEntriesOf(origVals); + await().untilAsserted( + () -> assertThat(region.getAll(testKeysList)).containsExactlyInAnyOrderEntriesOf(origVals)); } public static void putAll() { Region<String, Integer> region = cache.getRegion(PartitionedRegionName); - assertNotNull(region); + assertThat(region).isNotNull(); final List<String> testKeysList = new ArrayList<>(); for (int i = (totalNumBuckets * 3); i > 0; i--) { testKeysList.add("execKey-" + i); } DistributedSystem.setThreadsSocketPolicy(false); - try { - int j = 0; - Map<String, Integer> origVals = new HashMap<>(); - for (String key : testKeysList) { - Integer val = j++; - origVals.put(key, val); - region.put(key, val); - } - Map resultMap = region.getAll(testKeysList); - assertEquals(resultMap, origVals); - Wait.pause(2000); - Map secondResultMap = region.getAll(testKeysList); - assertEquals(secondResultMap, origVals); - - } catch (Exception e) { - Assert.fail("Test failed after the put operation", e); - + int j = 0; + Map<String, Integer> origVals = new HashMap<>(); + for (String key : testKeysList) { + Integer val = j++; + origVals.put(key, val); + region.put(key, val); } + Map<String, Integer> resultMap = region.getAll(testKeysList); + assertThat(resultMap).containsExactlyInAnyOrderEntriesOf(origVals); + await().untilAsserted( + () -> assertThat(region.getAll(testKeysList)).containsExactlyInAnyOrderEntriesOf(origVals)); } private static void serverMultiKeyExecutionOnASingleBucket(Boolean isByName) { Region<String, Integer> region = cache.getRegion(PartitionedRegionName); - assertNotNull(region); + assertThat(region).isNotNull(); final HashSet<String> testKeysSet = new HashSet<>(); for (int i = (totalNumBuckets * 2); i > 0; i--) { testKeysSet.add("execKey-" + i); @@ -656,192 +707,163 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest region.put(value, val); } DistributedSystem.setThreadsSocketPolicy(false); - for (String o : testKeysSet) { - try { - Set<String> singleKeySet = Collections.singleton(o); - Function function = new TestFunction(true, TEST_FUNCTION2); - FunctionService.registerFunction(function); - Execution dataSet = FunctionService.onRegion(region); - ResultCollector rc1 = execute(dataSet, singleKeySet, Boolean.TRUE, function, isByName); - List l = ((List) rc1.getResult()); - assertEquals(1, l.size()); - - ResultCollector rc2 = - execute(dataSet, singleKeySet, new HashSet<>(singleKeySet), function, isByName); - List l2 = ((List) rc2.getResult()); - - assertEquals(1, l2.size()); - List subList = (List) l2.iterator().next(); - assertEquals(1, subList.size()); - assertEquals(region.get(singleKeySet.iterator().next()), subList.iterator().next()); - } catch (Exception expected) { - logger.info("Exception : " + expected.getMessage()); - expected.printStackTrace(); - fail("Test failed after the put operation"); - } + for (String key : testKeysSet) { + Set<String> singleKeySet = Collections.singleton(key); + Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2); + FunctionService.registerFunction(function); + Execution dataSet = FunctionService.onRegion(region); + ResultCollector<?, ?> rc1 = execute(dataSet, singleKeySet, Boolean.TRUE, function, isByName); + List<?> list1 = (List<?>) rc1.getResult(); + assertThat(list1).hasSize(1); + + ResultCollector<?, ?> rc2 = + execute(dataSet, singleKeySet, new HashSet<>(singleKeySet), function, isByName); + List<?> list2 = (List<?>) rc2.getResult(); + + assertThat(list2).hasSize(1); + List<Integer> subList = (List<Integer>) list2.iterator().next(); + assertThat(subList).hasSize(1); + assertThat(subList).containsOnly(region.get(singleKeySet.iterator().next())); } } private static void serverMultiKeyExecution(Boolean isByName) { Region<String, Integer> region = cache.getRegion(PartitionedRegionName); - assertNotNull(region); + assertThat(region).isNotNull(); final HashSet<String> testKeysSet = new HashSet<>(); for (int i = (totalNumBuckets * 2); i > 0; i--) { testKeysSet.add("execKey-" + i); } DistributedSystem.setThreadsSocketPolicy(false); - Function function = new TestFunction(true, TEST_FUNCTION2); + Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2); FunctionService.registerFunction(function); Execution dataSet = FunctionService.onRegion(region); - try { - int j = 0; - HashSet<Integer> origVals = new HashSet<>(); - for (String element : testKeysSet) { - Integer val = j++; - origVals.add(val); - region.put(element, val); - } - ResultCollector rc1 = execute(dataSet, testKeysSet, Boolean.TRUE, function, isByName); - List l = ((List) rc1.getResult()); - logger.info("Result size : " + l.size()); - assertEquals(3, l.size()); - for (Object item : l) { - assertEquals(Boolean.TRUE, item); - } - - ResultCollector rc2 = execute(dataSet, testKeysSet, testKeysSet, function, isByName); - List l2 = ((List) rc2.getResult()); - assertEquals(3, l2.size()); - HashSet<Integer> foundVals = new HashSet<>(); - for (Object value : l2) { - ArrayList subL = (ArrayList) value; - assertTrue(subL.size() > 0); - for (Object o : subL) { - assertTrue(foundVals.add((Integer) o)); - } - } - assertEquals(origVals, foundVals); - } catch (Exception e) { - Assert.fail("Test failed after the put operation", e); + int j = 0; + HashSet<Integer> origVals = new HashSet<>(); + for (String element : testKeysSet) { + Integer val = j++; + origVals.add(val); + region.put(element, val); + } + ResultCollector<?, ?> rc1 = execute(dataSet, testKeysSet, Boolean.TRUE, function, isByName); + List<?> l = (List<?>) rc1.getResult(); + assertThat(l).hasSize(3); + for (Object item : l) { + assertThat(item).isEqualTo(true); + } + ResultCollector<?, ?> rc2 = execute(dataSet, testKeysSet, testKeysSet, function, isByName); + List<?> l2 = (List<?>) rc2.getResult(); + assertThat(l2).hasSize(3); + HashSet<Integer> foundVals = new HashSet<>(); + for (Object value : l2) { + List<?> subL = (List<?>) value; + assertThat(subL).hasSizeGreaterThan(0); + for (Object o : subL) { + assertThat(foundVals.add((Integer) o)).isTrue(); + } } + assertThat(foundVals).containsExactlyInAnyOrderElementsOf(origVals); } private static void serverMultiKeyExecutionSocketTimeOut(Boolean isByName) { Region<String, Integer> region = cache.getRegion(PartitionedRegionName); - assertNotNull(region); + assertThat(region).isNotNull(); final HashSet<String> testKeysSet = new HashSet<>(); for (int i = (totalNumBuckets * 2); i > 0; i--) { testKeysSet.add("execKey-" + i); } DistributedSystem.setThreadsSocketPolicy(false); - Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT); + Function<Object> function = new TestFunction<>(true, TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT); FunctionService.registerFunction(function); Execution dataSet = FunctionService.onRegion(region); - try { - int j = 0; - for (String value : testKeysSet) { - Integer val = j++; - region.put(value, val); - } - ResultCollector rc1 = execute(dataSet, testKeysSet, Boolean.TRUE, function, isByName); - List l = ((List) rc1.getResult()); - logger.info("Result size : " + l.size()); - assertEquals(3, l.size()); - for (Object o : l) { - assertEquals(Boolean.TRUE, o); - } - - } catch (Exception e) { - Assert.fail("Test failed after the function execution", e); + int j = 0; + for (String value : testKeysSet) { + Integer val = j++; + region.put(value, val); + } + ResultCollector<?, ?> rc1 = execute(dataSet, testKeysSet, Boolean.TRUE, function, isByName); + List<?> l = (List<?>) rc1.getResult(); + logger.info("Result size : " + l.size()); + assertThat(l).hasSize(3); + for (Object o : l) { + assertThat(o).isEqualTo(true); } } private static void serverSingleKeyExecutionSocketTimeOut(Boolean isByName) { Region<String, Integer> region = cache.getRegion(PartitionedRegionName); - assertNotNull(region); + assertThat(region).isNotNull(); final String testKey = "execKey"; final Set<String> testKeysSet = new HashSet<>(); testKeysSet.add(testKey); DistributedSystem.setThreadsSocketPolicy(false); - Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT); + Function<Object> function = new TestFunction<>(true, TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT); FunctionService.registerFunction(function); Execution dataSet = FunctionService.onRegion(region); region.put(testKey, 1); - try { - ResultCollector rs = execute(dataSet, testKeysSet, Boolean.TRUE, function, isByName); - assertEquals(Boolean.TRUE, ((List) rs.getResult()).get(0)); - ResultCollector rs2 = execute(dataSet, testKeysSet, testKey, function, isByName); - assertEquals(testKey, ((List) rs2.getResult()).get(0)); + ResultCollector<?, ?> rs = execute(dataSet, testKeysSet, Boolean.TRUE, function, isByName); + assertThat(((List<?>) rs.getResult()).get(0)).isEqualTo(true); - } catch (Exception ex) { - ex.printStackTrace(); - logger.info("Exception : ", ex); - Assert.fail("Test failed after the put operation", ex); - } + ResultCollector<?, ?> rs2 = execute(dataSet, testKeysSet, testKey, function, isByName); + assertThat(((List<?>) rs2.getResult()).get(0)).isEqualTo(testKey); } private static void serverMultiKeyExecution_Inline() { Region<String, Integer> region = cache.getRegion(PartitionedRegionName); - assertNotNull(region); + assertThat(region).isNotNull(); final HashSet<String> testKeysSet = new HashSet<>(); for (int i = (totalNumBuckets * 2); i > 0; i--) { testKeysSet.add("execKey-" + i); } DistributedSystem.setThreadsSocketPolicy(false); Execution dataSet = FunctionService.onRegion(region); - try { - int j = 0; - for (String value : testKeysSet) { - Integer val = j++; - region.put(value, val); - } - ResultCollector rc1 = - dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new FunctionAdapter() { - @Override - public void execute(FunctionContext context) { - @SuppressWarnings("unchecked") - final ResultSender<Object> resultSender = context.getResultSender(); - if (context.getArguments() instanceof String) { - resultSender.lastResult("Success"); - } else if (context.getArguments() instanceof Boolean) { - resultSender.lastResult(Boolean.TRUE); - } - } - @Override - public String getId() { - return getClass().getName(); + int j = 0; + for (String value : testKeysSet) { + Integer val = j++; + region.put(value, val); + } + ResultCollector<?, ?> rc1 = + dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new FunctionAdapter() { + @Override + public void execute(FunctionContext context) { + @SuppressWarnings("unchecked") + final ResultSender<Object> resultSender = context.getResultSender(); + if (context.getArguments() instanceof String) { + resultSender.lastResult("Success"); + } else if (context.getArguments() instanceof Boolean) { + resultSender.lastResult(Boolean.TRUE); } + } - @Override - public boolean hasResult() { - return true; - } - }); - List l = ((List) rc1.getResult()); - logger.info("Result size : " + l.size()); - assertEquals(3, l.size()); - for (Object o : l) { - assertEquals(Boolean.TRUE, o); - } - } catch (Exception e) { - logger.info("Exception : " + e.getMessage()); - e.printStackTrace(); - fail("Test failed after the put operation"); + @Override + public String getId() { + return getClass().getName(); + } + @Override + public boolean hasResult() { + return true; + } + }); + List<?> list = (List<?>) rc1.getResult(); + logger.info("Result size : " + list.size()); + assertThat(list).hasSize(3); + for (Object item : list) { + assertThat(item).isEqualTo(true); } } private static void serverMultiKeyExecution_FunctionInvocationTargetException() { Region<String, Integer> region = cache.getRegion(PartitionedRegionName); - assertNotNull(region); + assertThat(region).isNotNull(); final HashSet<String> testKeysSet = new HashSet<>(); for (int i = (totalNumBuckets * 2); i > 0; i--) { testKeysSet.add("execKey-" + i); @@ -853,49 +875,43 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest Integer val = j++; region.put(o, val); } - try { - ResultCollector rc1 = - dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new FunctionAdapter() { - @Override - public void execute(FunctionContext context) { - if (context.isPossibleDuplicate()) { - context.getResultSender().lastResult(retryCount); - return; - } - if (context.getArguments() instanceof Boolean) { - throw new FunctionInvocationTargetException("I have been thrown from TestFunction"); - } + ResultCollector<?, ?> rc1 = + dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new FunctionAdapter() { + @Override + public void execute(FunctionContext context) { + if (context.isPossibleDuplicate()) { + context.getResultSender().lastResult(retryCount); + return; } - - @Override - public String getId() { - return getClass().getName(); + if (context.getArguments() instanceof Boolean) { + throw new FunctionInvocationTargetException("I have been thrown from TestFunction"); } + } - @Override - public boolean hasResult() { - return true; - } - }); + @Override + public String getId() { + return getClass().getName(); + } - List list = (ArrayList) rc1.getResult(); - assertEquals(list.get(0), 0); - } catch (Throwable e) { - e.printStackTrace(); - Assert.fail("This is not expected Exception", e); - } + @Override + public boolean hasResult() { + return true; + } + }); + List<?> list = (List<?>) rc1.getResult(); + assertThat(list.get(0)).isEqualTo(0); } private static void serverMultiKeyExecutionNoResult(Boolean isByName) { Region<String, Integer> region = cache.getRegion(PartitionedRegionName); - assertNotNull(region); + assertThat(region).isNotNull(); final HashSet<String> testKeysSet = new HashSet<>(); for (int i = (totalNumBuckets * 2); i > 0; i--) { testKeysSet.add("execKey-" + i); } DistributedSystem.setThreadsSocketPolicy(false); - Function function = new TestFunction(false, TEST_FUNCTION7); + Function<Object> function = new TestFunction<>(false, TEST_FUNCTION7); FunctionService.registerFunction(function); Execution dataSet = FunctionService.onRegion(region); try { @@ -906,18 +922,11 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest Integer val = j++; region.put(o, val); } - ResultCollector rc1 = execute(dataSet, testKeysSet, Boolean.TRUE, function, isByName); - rc1.getResult(); - Thread.sleep(20000); - fail("Test failed after the put operation"); - } catch (FunctionException expected) { - expected.printStackTrace(); - logger.info("Exception : " + expected.getMessage()); - assertTrue(expected.getMessage() - .startsWith((String.format("Cannot %s result as the Function#hasResult() is false", - "return any")))); - } catch (Exception notexpected) { - Assert.fail("Test failed during execute or sleeping", notexpected); + ResultCollector<?, ?> rc1 = execute(dataSet, testKeysSet, Boolean.TRUE, function, isByName); + assertThatThrownBy(rc1::getResult).isExactlyInstanceOf(FunctionException.class) + .hasMessageStartingWith( + String.format("Cannot %s result as the Function#hasResult() is false", + "return any")); } finally { cache.getLogger() .info("<ExpectedException action=remove>" + "FunctionException" + "</ExpectedException>"); @@ -926,179 +935,149 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest private static void serverSingleKeyExecution(Boolean isByName) { Region<String, Integer> region = cache.getRegion(PartitionedRegionName); - assertNotNull(region); + assertThat(region).isNotNull(); final String testKey = "execKey"; final Set<String> testKeysSet = new HashSet<>(); testKeysSet.add(testKey); DistributedSystem.setThreadsSocketPolicy(false); - Function function = new TestFunction(true, TEST_FUNCTION2); + Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2); FunctionService.registerFunction(function); Execution dataSet = FunctionService.onRegion(region); - try { - execute(dataSet, testKeysSet, Boolean.TRUE, function, isByName); - } catch (Exception expected) { - assertTrue(expected.getMessage().contains("No target node found for KEY = " + testKey) - || expected.getMessage().startsWith("Server could not send the reply") - || expected.getMessage().startsWith("Unexpected exception during")); - } + + execute(dataSet, testKeysSet, Boolean.TRUE, function, isByName); region.put(testKey, 1); - try { - ResultCollector rs = execute(dataSet, testKeysSet, Boolean.TRUE, function, isByName); - assertEquals(Boolean.TRUE, ((List) rs.getResult()).get(0)); - ResultCollector rs2 = execute(dataSet, testKeysSet, testKey, function, isByName); - assertEquals(1, ((List) rs2.getResult()).get(0)); + ResultCollector<?, ?> rs = execute(dataSet, testKeysSet, Boolean.TRUE, function, isByName); + assertThat(((List<?>) rs.getResult()).get(0)).isEqualTo(true); - HashMap<String, Integer> putData = new HashMap<>(); - putData.put(testKey + "1", 2); - putData.put(testKey + "2", 3); + ResultCollector<?, ?> rs2 = execute(dataSet, testKeysSet, testKey, function, isByName); + assertThat(((List<?>) rs2.getResult()).get(0)).isEqualTo(1); - ResultCollector rs1 = execute(dataSet, testKeysSet, putData, function, isByName); - assertEquals(Boolean.TRUE, ((List) rs1.getResult()).get(0)); + HashMap<String, Integer> putData = new HashMap<>(); + putData.put(testKey + "1", 2); + putData.put(testKey + "2", 3); - assertEquals((Integer) 2, region.get(testKey + "1")); - assertEquals((Integer) 3, region.get(testKey + "2")); + ResultCollector<?, ?> rs1 = execute(dataSet, testKeysSet, putData, function, isByName); + assertThat(((List<?>) rs1.getResult()).get(0)).isEqualTo(true); - } catch (Exception ex) { - ex.printStackTrace(); - logger.info("Exception : ", ex); - Assert.fail("Test failed after the put operation", ex); - } + assertThat(region.get(testKey + "1")).isEqualTo(2); + assertThat(region.get(testKey + "2")).isEqualTo(3); } private static void serverSingleKeyExecution_FunctionInvocationTargetException() { Region<String, Integer> region = cache.getRegion(PartitionedRegionName); - assertNotNull(region); + assertThat(region).isNotNull(); final String testKey = "execKey"; final Set<String> testKeysSet = new HashSet<>(); testKeysSet.add(testKey); DistributedSystem.setThreadsSocketPolicy(false); - Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_REEXECUTE_EXCEPTION); + Function<Object> function = + new TestFunction<>(true, TestFunction.TEST_FUNCTION_REEXECUTE_EXCEPTION); FunctionService.registerFunction(function); Execution dataSet = FunctionService.onRegion(region); region.put(testKey, 1); - try { - ResultCollector rs = execute(dataSet, testKeysSet, Boolean.TRUE, function, false); - ArrayList list = (ArrayList) rs.getResult(); - assertTrue(((Integer) list.get(0)) >= 5); - } catch (Exception ex) { - ex.printStackTrace(); - Assert.fail("This is not expected Exception", ex); - } + + ResultCollector<?, ?> rs = execute(dataSet, testKeysSet, Boolean.TRUE, function, false); + ArrayList<?> list = (ArrayList<?>) rs.getResult(); + assertThat(((Integer) list.get(0))).isGreaterThanOrEqualTo(5); } private static void serverSingleKeyExecution_Inline() { Region<String, Integer> region = cache.getRegion(PartitionedRegionName); - assertNotNull(region); + assertThat(region).isNotNull(); final String testKey = "execKey"; final Set<String> testKeysSet = new HashSet<>(); testKeysSet.add(testKey); DistributedSystem.setThreadsSocketPolicy(false); Execution dataSet = FunctionService.onRegion(region); - try { - cache.getLogger() - .info("<ExpectedException action=add>" + "No target node found for KEY = " - + "|Server could not send the reply" + "|Unexpected exception during" - + "</ExpectedException>"); - dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new FunctionAdapter() { - @Override - public void execute(FunctionContext context) { - @SuppressWarnings("unchecked") - final ResultSender<Object> resultSender = context.getResultSender(); - if (context.getArguments() instanceof String) { - resultSender.lastResult("Success"); - } - resultSender.lastResult("Failure"); - } - @Override - public String getId() { - return getClass().getName(); + FunctionAdapter functionAdapter = new FunctionAdapter() { + @Override + public void execute(FunctionContext context) { + @SuppressWarnings("unchecked") + final ResultSender<Object> resultSender = context.getResultSender(); + if (context.getArguments() instanceof String) { + resultSender.lastResult("Success"); } + resultSender.lastResult("Failure"); + } - @Override - public boolean hasResult() { - return true; - } - }); - } catch (Exception expected) { - logger.debug("Exception occurred : " + expected.getMessage()); - assertTrue(expected.getMessage().contains("No target node found for KEY = " + testKey) - || expected.getMessage().startsWith("Server could not send the reply") - || expected.getMessage().startsWith("Unexpected exception during")); - } finally { - cache.getLogger() - .info("<ExpectedException action=remove>" + "No target node found for KEY = " - + "|Server could not send the reply" + "|Unexpected exception during" - + "</ExpectedException>"); - } + @Override + public String getId() { + return getClass().getName(); + } + + @Override + public boolean hasResult() { + return true; + } + }; + + dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(functionAdapter); region.put(testKey, 1); - try { - ResultCollector rs = - dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new FunctionAdapter() { - @Override - public void execute(FunctionContext context) { - @SuppressWarnings("unchecked") - final ResultSender<Object> resultSender = context.getResultSender(); - if (context.getArguments() instanceof String) { - resultSender.lastResult("Success"); - } else { - resultSender.lastResult("Failure"); - } - } - @Override - public String getId() { - return getClass().getName(); + ResultCollector<?, ?> rs = + dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new FunctionAdapter() { + @Override + public void execute(FunctionContext context) { + @SuppressWarnings("unchecked") + final ResultSender<Object> resultSender = context.getResultSender(); + if (context.getArguments() instanceof String) { + resultSender.lastResult("Success"); + } else { + resultSender.lastResult("Failure"); } + } - @Override - public boolean hasResult() { - return true; - } - }); - assertEquals("Failure", ((List) rs.getResult()).get(0)); - - ResultCollector rs2 = - dataSet.withFilter(testKeysSet).setArguments(testKey).execute(new FunctionAdapter() { - @Override - public void execute(FunctionContext context) { - @SuppressWarnings("unchecked") - final ResultSender<Object> resultSender = context.getResultSender(); - if (context.getArguments() instanceof String) { - resultSender.lastResult("Success"); - } else { - resultSender.lastResult("Failure"); - } - } + @Override + public String getId() { + return getClass().getName(); + } - @Override - public String getId() { - return getClass().getName(); + @Override + public boolean hasResult() { + return true; + } + }); + assertThat(((List<?>) rs.getResult()).get(0)).isEqualTo("Failure"); + + ResultCollector<?, ?> rs2 = + dataSet.withFilter(testKeysSet).setArguments(testKey).execute(new FunctionAdapter() { + @Override + public void execute(FunctionContext context) { + @SuppressWarnings("unchecked") + final ResultSender<Object> resultSender = context.getResultSender(); + if (context.getArguments() instanceof String) { + resultSender.lastResult("Success"); + } else { + resultSender.lastResult("Failure"); } + } - @Override - public boolean hasResult() { - return true; - } - }); - assertEquals("Success", ((List) rs2.getResult()).get(0)); + @Override + public String getId() { + return getClass().getName(); + } + + @Override + public boolean hasResult() { + return true; + } + }); + + assertThat(((List<?>) rs2.getResult()).get(0)).isEqualTo("Success"); - } catch (Exception ex) { - ex.printStackTrace(); - logger.info("Exception : ", ex); - Assert.fail("Test failed after the put operation", ex); - } } - private static ResultCollector execute(Execution dataSet, Set testKeysSet, Serializable args, - Function function, Boolean isByName) { + private static ResultCollector<?, ?> execute(Execution dataSet, Set<?> testKeysSet, + Serializable args, + Function<?> function, Boolean isByName) { if (isByName) {// by name return dataSet.withFilter(testKeysSet).setArguments(args).execute(function.getId()); } else { // By Instance @@ -1106,8 +1085,8 @@ public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest } } - private static ResultCollector executeOnAll(Execution dataSet, Serializable args, - Function function, Boolean isByName) { + private static ResultCollector<?, ?> executeOnAll(Execution dataSet, Serializable args, + Function<?> function, Boolean isByName) { if (isByName) {// by name return dataSet.setArguments(args).execute(function.getId()); } else { // By Instance diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerTestBase.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerTestBase.java index 1a0c92d1aa..107b50a26e 100755 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerTestBase.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerTestBase.java @@ -17,11 +17,9 @@ package org.apache.geode.internal.cache.execute; import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; -import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -46,6 +44,7 @@ import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.Scope; import org.apache.geode.cache.client.Pool; +import org.apache.geode.cache.client.PoolFactory; import org.apache.geode.cache.client.PoolManager; import org.apache.geode.cache.client.internal.PoolImpl; import org.apache.geode.cache.execute.Function; @@ -57,8 +56,6 @@ import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.internal.cache.functions.TestFunction; import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil; import org.apache.geode.logging.internal.log4j.api.LogService; -import org.apache.geode.test.dunit.Assert; -import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.NetworkUtils; import org.apache.geode.test.dunit.SerializableCallableIF; import org.apache.geode.test.dunit.SerializableRunnable; @@ -79,6 +76,8 @@ public class PRClientServerTestBase extends JUnit4CacheTestCase { protected static Cache cache = null; + protected String hostName; + static String PartitionedRegionName = "TestPartitionedRegion"; // default name protected static String regionName = "TestRegion"; // default name @@ -93,15 +92,15 @@ public class PRClientServerTestBase extends JUnit4CacheTestCase { @Override public final void postSetUp() throws Exception { - Host host = Host.getHost(0); - server1 = host.getVM(0); - server2 = host.getVM(1); - server3 = host.getVM(2); - client = host.getVM(3); + server1 = VM.getVM(0); + server2 = VM.getVM(1); + server3 = VM.getVM(2); + client = VM.getVM(3); + hostName = NetworkUtils.getServerHostName(); postSetUpPRClientServerTestBase(); } - protected void postSetUpPRClientServerTestBase() throws Exception {} + protected void postSetUpPRClientServerTestBase() {} private enum ExecuteFunctionMethod { ExecuteFunctionByObject, ExecuteFunctionById @@ -119,7 +118,8 @@ public class PRClientServerTestBase extends JUnit4CacheTestCase { return ExecuteFunctionMethod.ExecuteFunctionByObject == functionExecutionType; } - ArrayList createCommonServerAttributes(String regionName, PartitionResolver pr, int red, + ArrayList<Object> createCommonServerAttributes(String regionName, PartitionResolver<?, ?> pr, + int red, String colocatedWithRegion) { ArrayList<Object> commonAttributes = new ArrayList<>(); commonAttributes.add(regionName); // 0 @@ -130,123 +130,111 @@ public class PRClientServerTestBase extends JUnit4CacheTestCase { return commonAttributes; } - public static Integer createCacheServer(ArrayList commonAttributes, Integer localMaxMemory) { - AttributesFactory factory = new AttributesFactory(); - PartitionAttributesFactory paf = new PartitionAttributesFactory(); + public static Integer createCacheServer(ArrayList<Object> commonAttributes, + Integer localMaxMemory) { + return createCacheServer(commonAttributes, localMaxMemory, -1); + } + + public static Integer createCacheServer(ArrayList<Object> commonAttributes, + Integer localMaxMemory, + int maxThreads) { + AttributesFactory<Object, Object> factory = new AttributesFactory<>(); + PartitionAttributesFactory<Object, Object> paf = new PartitionAttributesFactory<>(); - paf.setPartitionResolver((PartitionResolver) commonAttributes.get(1)); + paf.setPartitionResolver((PartitionResolver<Object, Object>) commonAttributes.get(1)); paf.setRedundantCopies((Integer) commonAttributes.get(2)); paf.setTotalNumBuckets((Integer) commonAttributes.get(3)); paf.setColocatedWith((String) commonAttributes.get(4)); paf.setLocalMaxMemory(localMaxMemory); - PartitionAttributes partitionAttributes = paf.create(); + PartitionAttributes<?, ?> partitionAttributes = paf.create(); factory.setDataPolicy(DataPolicy.PARTITION); factory.setPartitionAttributes(partitionAttributes); - RegionAttributes attrs = factory.create(); + RegionAttributes<Object, Object> attrs = factory.create(); - Region region = cache.createRegion((String) commonAttributes.get(0), attrs); - assertNotNull(region); + Region<Object, Object> region = cache.createRegion((String) commonAttributes.get(0), attrs); + assertThat(region).isNotNull(); CacheServer server1 = cache.addCacheServer(); - assertNotNull(server1); + assertThat(server1).isNotNull(); int port = getRandomAvailableTCPPort(); server1.setPort(port); - try { - server1.start(); - } catch (IOException e) { - Assert.fail("Failed to start the Server", e); + if (maxThreads > 0) { + server1.setMaxThreads(maxThreads); } - assertTrue(server1.isRunning()); + assertThatNoException().isThrownBy(server1::start); + assertThat(server1.isRunning()).isTrue(); return server1.getPort(); } - private static Integer createSelectorCacheServer(ArrayList commonAttributes, + private static Integer createSelectorCacheServer(ArrayList<Object> commonAttributes, Integer localMaxMemory) throws Exception { - AttributesFactory factory = new AttributesFactory(); - PartitionAttributesFactory paf = new PartitionAttributesFactory(); + AttributesFactory<Object, Object> factory = new AttributesFactory<>(); + PartitionAttributesFactory<Object, Object> paf = new PartitionAttributesFactory<>(); - paf.setPartitionResolver((PartitionResolver) commonAttributes.get(1)); + paf.setPartitionResolver((PartitionResolver<Object, Object>) commonAttributes.get(1)); paf.setRedundantCopies((Integer) commonAttributes.get(2)); paf.setTotalNumBuckets((Integer) commonAttributes.get(3)); paf.setColocatedWith((String) commonAttributes.get(4)); paf.setLocalMaxMemory(localMaxMemory); - PartitionAttributes partitionAttributes = paf.create(); + PartitionAttributes<?, ?> partitionAttributes = paf.create(); factory.setDataPolicy(DataPolicy.PARTITION); factory.setPartitionAttributes(partitionAttributes); - RegionAttributes attrs = factory.create(); + RegionAttributes<Object, Object> attrs = factory.create(); - Region region = cache.createRegion((String) commonAttributes.get(0), attrs); - assertNotNull(region); + Region<Object, Object> region = cache.createRegion((String) commonAttributes.get(0), attrs); + assertThat(region).isNotNull(); CacheServer server1 = cache.addCacheServer(); - assertNotNull(server1); + assertThat(server1).isNotNull(); int port = getRandomAvailableTCPPort(); server1.setPort(port); server1.setMaxThreads(16); server1.start(); - assertTrue(server1.isRunning()); + assertThat(server1.isRunning()).isTrue(); return server1.getPort(); } - private static Integer createCacheServerWith2Regions(ArrayList commonAttributes, + private static Integer createCacheServerWith2Regions(ArrayList<Object> commonAttributes, Integer localMaxMemory) throws Exception { - AttributesFactory factory = new AttributesFactory(); - PartitionAttributesFactory paf = new PartitionAttributesFactory(); + AttributesFactory<Object, Object> factory = new AttributesFactory<>(); + PartitionAttributesFactory<Object, Object> paf = new PartitionAttributesFactory<>(); - paf.setPartitionResolver((PartitionResolver) commonAttributes.get(1)); + paf.setPartitionResolver((PartitionResolver<Object, Object>) commonAttributes.get(1)); paf.setRedundantCopies((Integer) commonAttributes.get(2)); paf.setTotalNumBuckets((Integer) commonAttributes.get(3)); paf.setColocatedWith((String) commonAttributes.get(4)); paf.setLocalMaxMemory(localMaxMemory); - PartitionAttributes partitionAttributes = paf.create(); + PartitionAttributes<?, ?> partitionAttributes = paf.create(); factory.setDataPolicy(DataPolicy.PARTITION); factory.setPartitionAttributes(partitionAttributes); - RegionAttributes attrs = factory.create(); + RegionAttributes<Object, Object> attrs = factory.create(); - Region region1 = cache.createRegion(PartitionedRegionName + "1", attrs); - assertNotNull(region1); - Region region2 = cache.createRegion(PartitionedRegionName + "2", attrs); - assertNotNull(region2); + Region<Object, Object> region1 = cache.createRegion(PartitionedRegionName + "1", attrs); + assertThat(region1).isNotNull(); + Region<Object, Object> region2 = cache.createRegion(PartitionedRegionName + "2", attrs); + assertThat(region2).isNotNull(); CacheServer server1 = cache.addCacheServer(); - assertNotNull(server1); + assertThat(server1).isNotNull(); int port = getRandomAvailableTCPPort(); server1.setPort(port); server1.start(); - assertTrue(server1.isRunning()); + assertThat(server1.isRunning()).isTrue(); return server1.getPort(); } public static Integer createCacheServer() throws Exception { CacheServer server1 = cache.addCacheServer(); - assertNotNull(server1); + assertThat(server1).isNotNull(); int port = getRandomAvailableTCPPort(); server1.setPort(port); server1.start(); - assertTrue(server1.isRunning()); + assertThat(server1.isRunning()).isTrue(); return server1.getPort(); } - private static Integer createCacheServerWithDR() throws Exception { - AttributesFactory factory = new AttributesFactory(); - factory.setScope(Scope.DISTRIBUTED_ACK); - factory.setDataPolicy(DataPolicy.REPLICATE); - assertNotNull(cache); - Region region = cache.createRegion(regionName, factory.create()); - assertNotNull(region); - - CacheServer server1 = cache.addCacheServer(); - assertNotNull(server1); - int port = getRandomAvailableTCPPort(); - server1.setPort(port); - server1.start(); - assertTrue(server1.isRunning()); - - return server1.getPort(); - } - - public static void createCacheClient(String host, Integer port1, Integer port2, Integer port3) { + public static void createCacheClient(String host, int port1, int port2, int port3) { CacheServerTestUtil.disableShufflingOfEndpoints(); Pool p; @@ -260,16 +248,16 @@ public class PRClientServerTestBase extends JUnit4CacheTestCase { CacheServerTestUtil.enableShufflingOfEndpoints(); } pool = (PoolImpl) p; - AttributesFactory factory = new AttributesFactory(); + AttributesFactory<Object, Object> factory = new AttributesFactory<>(); factory.setScope(Scope.LOCAL); factory.setDataPolicy(DataPolicy.EMPTY); factory.setPoolName(p.getName()); - RegionAttributes attrs = factory.create(); - Region region = cache.createRegion(PartitionedRegionName, attrs); - assertNotNull(region); + RegionAttributes<Object, Object> attrs = factory.create(); + Region<Object, Object> region = cache.createRegion(PartitionedRegionName, attrs); + assertThat(region).isNotNull(); } - private static void createCacheClient_SingleConnection(String host, Integer port1) { + private static void createCacheClient_SingleConnection(String host, int port1) { CacheServerTestUtil.disableShufflingOfEndpoints(); Pool p; @@ -282,17 +270,17 @@ public class PRClientServerTestBase extends JUnit4CacheTestCase { CacheServerTestUtil.enableShufflingOfEndpoints(); } pool = (PoolImpl) p; - AttributesFactory factory = new AttributesFactory(); + AttributesFactory<Object, Object> factory = new AttributesFactory<>(); factory.setScope(Scope.LOCAL); factory.setDataPolicy(DataPolicy.EMPTY); factory.setPoolName(p.getName()); - RegionAttributes attrs = factory.create(); - Region region = cache.createRegion(PartitionedRegionName, attrs); - assertNotNull(region); + RegionAttributes<Object, Object> attrs = factory.create(); + Region<Object, Object> region = cache.createRegion(PartitionedRegionName, attrs); + assertThat(region).isNotNull(); } - private static void createCacheClientWith2Regions(String host, Integer port1, Integer port2, - Integer port3) { + private static void createCacheClientWith2Regions(String host, int port1, int port2, + int port3) { CacheServerTestUtil.disableShufflingOfEndpoints(); Pool p; @@ -306,22 +294,22 @@ public class PRClientServerTestBase extends JUnit4CacheTestCase { CacheServerTestUtil.enableShufflingOfEndpoints(); } pool = (PoolImpl) p; - AttributesFactory factory = new AttributesFactory(); + AttributesFactory<Object, Object> factory = new AttributesFactory<>(); factory.setDataPolicy(DataPolicy.EMPTY); factory.setPoolName(p.getName()); - RegionAttributes attrs = factory.create(); - Region region1 = cache.createRegion(PartitionedRegionName + "1", attrs); - assertNotNull(region1); + RegionAttributes<Object, Object> attrs = factory.create(); + Region<Object, Object> region1 = cache.createRegion(PartitionedRegionName + "1", attrs); + assertThat(region1).isNotNull(); - factory = new AttributesFactory(); + factory = new AttributesFactory<>(); factory.setDataPolicy(DataPolicy.EMPTY); attrs = factory.create(); - Region region2 = cache.createRegion(PartitionedRegionName + "2", attrs); - assertNotNull(region2); + Region<Object, Object> region2 = cache.createRegion(PartitionedRegionName + "2", attrs); + assertThat(region2).isNotNull(); } - private static void createSingleHopCacheClient(String host, Integer port1, Integer port2, - Integer port3) { + private static void createSingleHopCacheClient(String host, int port1, int port2, + int port3) { CacheServerTestUtil.disableShufflingOfEndpoints(); Pool p; @@ -335,17 +323,17 @@ public class PRClientServerTestBase extends JUnit4CacheTestCase { CacheServerTestUtil.enableShufflingOfEndpoints(); } pool = (PoolImpl) p; - AttributesFactory factory = new AttributesFactory(); + AttributesFactory<Object, Object> factory = new AttributesFactory<>(); factory.setScope(Scope.LOCAL); factory.setDataPolicy(DataPolicy.EMPTY); factory.setPoolName(p.getName()); - RegionAttributes attrs = factory.create(); - Region region = cache.createRegion(PartitionedRegionName, attrs); - assertNotNull(region); + RegionAttributes<Object, Object> attrs = factory.create(); + Region<Object, Object> region = cache.createRegion(PartitionedRegionName, attrs); + assertThat(region).isNotNull(); } - private static void createNoSingleHopCacheClient(String host, Integer port1, Integer port2, - Integer port3) { + private static void createNoSingleHopCacheClient(String host, int port1, int port2, + int port3) { CacheServerTestUtil.disableShufflingOfEndpoints(); Pool p; @@ -359,36 +347,44 @@ public class PRClientServerTestBase extends JUnit4CacheTestCase { CacheServerTestUtil.enableShufflingOfEndpoints(); } pool = (PoolImpl) p; - AttributesFactory factory = new AttributesFactory(); + AttributesFactory<Object, Object> factory = new AttributesFactory<>(); factory.setScope(Scope.LOCAL); factory.setDataPolicy(DataPolicy.EMPTY); factory.setPoolName(p.getName()); - RegionAttributes attrs = factory.create(); - Region region = cache.createRegion(PartitionedRegionName, attrs); - assertNotNull(region); + RegionAttributes<Object, Object> attrs = factory.create(); + Region<Object, Object> region = cache.createRegion(PartitionedRegionName, attrs); + assertThat(region).isNotNull(); } - private static void createCacheClientWithoutRegion(String host, Integer port1, Integer port2, - Integer port3) { + private static void createNoSingleHopCacheClient(String host, + int port1, int port2, int port3, int connectTimeout) { CacheServerTestUtil.disableShufflingOfEndpoints(); - logger - .info("PRClientServerTestBase#createCacheClientWithoutRegion : creating pool"); Pool p; try { - p = PoolManager.createFactory().addServer(host, port1) - .addServer(host, port2).addServer(host, port3).setPingInterval(250) - .setSubscriptionEnabled(true).setSubscriptionRedundancy(-1).setReadTimeout(2000) - .setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10).setRetryAttempts(1) - .create("PRClientServerTestBaseWithoutRegion"); + PoolFactory factory = PoolManager.createFactory().addServer(host, port1) + .addServer(host, port2).addServer(host, port3).setPingInterval(2000) + .setSubscriptionEnabled(true).setReadTimeout(2000) + .setSocketBufferSize(1000).setRetryAttempts(0) + .setSocketConnectTimeout(connectTimeout) + .setPRSingleHopEnabled(false); + + p = factory.create("PRClientServerTestBase"); } finally { CacheServerTestUtil.enableShufflingOfEndpoints(); } pool = (PoolImpl) p; + AttributesFactory<Object, Object> factory = new AttributesFactory<>(); + factory.setScope(Scope.LOCAL); + factory.setDataPolicy(DataPolicy.EMPTY); + factory.setPoolName(p.getName()); + RegionAttributes<Object, Object> attrs = factory.create(); + Region<Object, Object> region = cache.createRegion(PartitionedRegionName, attrs); + assertThat(region).isNotNull(); } - private static void createCacheClientWithDistributedRegion(String host, Integer port1, - Integer port2, Integer port3) throws Exception { + private static void createCacheClientWithoutRegion(String host, int port1, int port2, + int port3) { CacheServerTestUtil.disableShufflingOfEndpoints(); logger .info("PRClientServerTestBase#createCacheClientWithoutRegion : creating pool"); @@ -398,70 +394,64 @@ public class PRClientServerTestBase extends JUnit4CacheTestCase { p = PoolManager.createFactory().addServer(host, port1) .addServer(host, port2).addServer(host, port3).setPingInterval(250) .setSubscriptionEnabled(true).setSubscriptionRedundancy(-1).setReadTimeout(2000) - .setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10).setRetryAttempts(0) + .setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10).setRetryAttempts(1) .create("PRClientServerTestBaseWithoutRegion"); } finally { CacheServerTestUtil.enableShufflingOfEndpoints(); } pool = (PoolImpl) p; - AttributesFactory factory = new AttributesFactory(); - factory.setScope(Scope.DISTRIBUTED_ACK); - factory.setDataPolicy(DataPolicy.REPLICATE); - assertNotNull(cache); - Region region = cache.createRegion(regionName, factory.create()); - assertNotNull(region); } - void createClientServerScenarion(ArrayList commonAttributes, int localMaxMemoryServer1, + void createClientServerScenarion(ArrayList<Object> commonAttributes, int localMaxMemoryServer1, int localMaxMemoryServer2, int localMaxMemoryServer3) { createCacheInClientServer(); - Integer port1 = server1.invoke(() -> PRClientServerTestBase + int port1 = server1.invoke(() -> PRClientServerTestBase .createCacheServer(commonAttributes, localMaxMemoryServer1)); - Integer port2 = server2.invoke(() -> PRClientServerTestBase + int port2 = server2.invoke(() -> PRClientServerTestBase .createCacheServer(commonAttributes, localMaxMemoryServer2)); - Integer port3 = server3.invoke(() -> PRClientServerTestBase + int port3 = server3.invoke(() -> PRClientServerTestBase .createCacheServer(commonAttributes, localMaxMemoryServer3)); client.invoke(() -> PRClientServerTestBase - .createCacheClient(NetworkUtils.getServerHostName(server1.getHost()), port1, port2, port3)); + .createCacheClient(hostName, port1, port2, port3)); } - void createClientServerScenarion_SingleConnection(ArrayList commonAttributes, + void createClientServerScenarion_SingleConnection(ArrayList<Object> commonAttributes, int localMaxMemoryServer1, int localMaxMemoryServer2) { createCacheInClientServer(); - Integer port1 = server1.invoke(() -> PRClientServerTestBase + int port1 = server1.invoke(() -> PRClientServerTestBase .createCacheServer(commonAttributes, localMaxMemoryServer1)); server2.invoke(() -> PRClientServerTestBase.createCacheServer(commonAttributes, localMaxMemoryServer2)); client.invoke(() -> PRClientServerTestBase.createCacheClient_SingleConnection( - NetworkUtils.getServerHostName(server1.getHost()), port1)); + hostName, port1)); } - void createClientServerScenarionWith2Regions(ArrayList commonAttributes, + void createClientServerScenarionWith2Regions(ArrayList<Object> commonAttributes, int localMaxMemoryServer1, int localMaxMemoryServer2, int localMaxMemoryServer3) { createCacheInClientServer(); - Integer port1 = server1.invoke(() -> PRClientServerTestBase + int port1 = server1.invoke(() -> PRClientServerTestBase .createCacheServerWith2Regions(commonAttributes, localMaxMemoryServer1)); - Integer port2 = server2.invoke(() -> PRClientServerTestBase + int port2 = server2.invoke(() -> PRClientServerTestBase .createCacheServerWith2Regions(commonAttributes, localMaxMemoryServer2)); - Integer port3 = server3.invoke(() -> PRClientServerTestBase + int port3 = server3.invoke(() -> PRClientServerTestBase .createCacheServerWith2Regions(commonAttributes, localMaxMemoryServer3)); client.invoke(() -> PRClientServerTestBase.createCacheClientWith2Regions( - NetworkUtils.getServerHostName(server1.getHost()), port1, port2, port3)); + hostName, port1, port2, port3)); } - void createClientServerScenarioSingleHop(ArrayList commonAttributes, + void createClientServerScenarioSingleHop(ArrayList<Object> commonAttributes, int localMaxMemoryServer1, int localMaxMemoryServer2, int localMaxMemoryServer3) { createCacheInClientServer(); - Integer port1 = server1.invoke(() -> PRClientServerTestBase + int port1 = server1.invoke(() -> PRClientServerTestBase .createCacheServer(commonAttributes, localMaxMemoryServer1)); - Integer port2 = server2.invoke(() -> PRClientServerTestBase + int port2 = server2.invoke(() -> PRClientServerTestBase .createCacheServer(commonAttributes, localMaxMemoryServer2)); - Integer port3 = server3.invoke(() -> PRClientServerTestBase + int port3 = server3.invoke(() -> PRClientServerTestBase .createCacheServer(commonAttributes, localMaxMemoryServer3)); // Workaround for the issue that hostnames returned by the client metadata may // not match those configured by the pool, leading to multiple copies @@ -475,33 +465,49 @@ public class PRClientServerTestBase extends JUnit4CacheTestCase { return cache.getDistributedSystem().getDistributedMember().getHost(); } - void createClientServerScenarioNoSingleHop(ArrayList commonAttributes, + void createClientServerScenarioNoSingleHop(ArrayList<Object> commonAttributes, int localMaxMemoryServer1, int localMaxMemoryServer2, int localMaxMemoryServer3) { createCacheInClientServer(); - Integer port1 = server1.invoke(() -> PRClientServerTestBase + int port1 = server1.invoke(() -> PRClientServerTestBase .createCacheServer(commonAttributes, localMaxMemoryServer1)); - Integer port2 = server2.invoke(() -> PRClientServerTestBase + int port2 = server2.invoke(() -> PRClientServerTestBase .createCacheServer(commonAttributes, localMaxMemoryServer2)); - Integer port3 = server3.invoke(() -> PRClientServerTestBase + int port3 = server3.invoke(() -> PRClientServerTestBase .createCacheServer(commonAttributes, localMaxMemoryServer3)); client.invoke(() -> PRClientServerTestBase.createNoSingleHopCacheClient( - NetworkUtils.getServerHostName(server1.getHost()), port1, port2, port3)); + hostName, port1, port2, port3)); + } + + void createClientServerScenarioNoSingleHop(ArrayList<Object> commonAttributes, + int localMaxMemoryServer1, int localMaxMemoryServer2, + int localMaxMemoryServer3, + int maxThreads, + int connectTimeout) { + createCacheInClientServer(); + int port1 = server1.invoke(() -> PRClientServerTestBase + .createCacheServer(commonAttributes, localMaxMemoryServer1, maxThreads)); + int port2 = server2.invoke(() -> PRClientServerTestBase + .createCacheServer(commonAttributes, localMaxMemoryServer2, maxThreads)); + int port3 = server3.invoke(() -> PRClientServerTestBase + .createCacheServer(commonAttributes, localMaxMemoryServer3, maxThreads)); + client.invoke(() -> PRClientServerTestBase.createNoSingleHopCacheClient( + hostName, port1, port2, port3, connectTimeout)); } - void createClientServerScenarioSelectorNoSingleHop(ArrayList commonAttributes, + void createClientServerScenarioSelectorNoSingleHop(ArrayList<Object> commonAttributes, int localMaxMemoryServer1, int localMaxMemoryServer2, int localMaxMemoryServer3) { createCacheInClientServer(); - Integer port1 = server1.invoke(() -> PRClientServerTestBase + int port1 = server1.invoke(() -> PRClientServerTestBase .createSelectorCacheServer(commonAttributes, localMaxMemoryServer1)); - Integer port2 = server2.invoke(() -> PRClientServerTestBase + int port2 = server2.invoke(() -> PRClientServerTestBase .createSelectorCacheServer(commonAttributes, localMaxMemoryServer2)); - Integer port3 = server3.invoke(() -> PRClientServerTestBase + int port3 = server3.invoke(() -> PRClientServerTestBase .createSelectorCacheServer(commonAttributes, localMaxMemoryServer3)); client.invoke(() -> PRClientServerTestBase.createNoSingleHopCacheClient( - NetworkUtils.getServerHostName(server1.getHost()), port1, port2, port3)); + hostName, port1, port2, port3)); } @@ -509,15 +515,15 @@ public class PRClientServerTestBase extends JUnit4CacheTestCase { logger.info( "PRClientServerTestBase#createClientServerScenarionWithoutRegion : creating client server"); createCacheInClientServer(); - Integer port1 = server1.invoke( + int port1 = server1.invoke( (SerializableCallableIF<Integer>) PRClientServerTestBase::createCacheServer); - Integer port2 = server2.invoke( + int port2 = server2.invoke( (SerializableCallableIF<Integer>) PRClientServerTestBase::createCacheServer); - Integer port3 = server3.invoke( + int port3 = server3.invoke( (SerializableCallableIF<Integer>) PRClientServerTestBase::createCacheServer); client.invoke(() -> PRClientServerTestBase.createCacheClientWithoutRegion( - NetworkUtils.getServerHostName(server1.getHost()), port1, port2, port3)); + hostName, port1, port2, port3)); } void runOnAllServers(SerializableRunnable runnable) { @@ -526,7 +532,7 @@ public class PRClientServerTestBase extends JUnit4CacheTestCase { server3.invoke(runnable); } - void registerFunctionAtServer(Function function) { + void registerFunctionAtServer(Function<?> function) { server1.invoke(PRClientServerTestBase.class, "registerFunction", new Object[] {function}); server2.invoke(PRClientServerTestBase.class, "registerFunction", new Object[] {function}); @@ -534,7 +540,7 @@ public class PRClientServerTestBase extends JUnit4CacheTestCase { server3.invoke(PRClientServerTestBase.class, "registerFunction", new Object[] {function}); } - public static void registerFunction(Function function) { + public static void registerFunction(Function<?> function) { FunctionService.registerFunction(function); } @@ -559,24 +565,20 @@ public class PRClientServerTestBase extends JUnit4CacheTestCase { } private void createCache(Properties props) { - try { - DistributedSystem ds = getSystem(props); - assertNotNull(ds); - ds.disconnect(); - ds = getSystem(props); - cache = CacheFactory.create(ds); - assertNotNull(cache); - } catch (Exception e) { - Assert.fail("Failed while creating the cache", e); - } + DistributedSystem ds = getSystem(props); + assertThat(ds).isNotNull(); + ds.disconnect(); + ds = getSystem(props); + cache = CacheFactory.create(ds); + assertThat(cache).isNotNull(); } static void startServerHA() throws Exception { Wait.pause(2000); - Collection bridgeServers = cache.getCacheServers(); + Collection<?> bridgeServers = cache.getCacheServers(); logger .info("Start Server cache servers list : " + bridgeServers.size()); - Iterator bridgeIterator = bridgeServers.iterator(); + Iterator<?> bridgeIterator = bridgeServers.iterator(); CacheServer bridgeServer = (CacheServer) bridgeIterator.next(); logger.info("start Server cache server" + bridgeServer); bridgeServer.start(); @@ -584,7 +586,7 @@ public class PRClientServerTestBase extends JUnit4CacheTestCase { static void stopServerHA() { Wait.pause(1000); - Iterator iter = cache.getCacheServers().iterator(); + Iterator<?> iter = cache.getCacheServers().iterator(); if (iter.hasNext()) { CacheServer server = (CacheServer) iter.next(); server.stop(); @@ -616,13 +618,13 @@ public class PRClientServerTestBase extends JUnit4CacheTestCase { void serverBucketFilterExecution(Set<Integer> bucketFilterSet) { Region<Integer, Integer> region = cache.getRegion(PartitionedRegionName); - assertNotNull(region); + assertThat(region).isNotNull(); final HashSet<Integer> testKeysSet = new HashSet<>(); for (int i = 150; i > 0; i--) { testKeysSet.add(i); } DistributedSystem.setThreadsSocketPolicy(false); - Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_BUCKET_FILTER); + Function<?> function = new TestFunction<>(true, TestFunction.TEST_FUNCTION_BUCKET_FILTER); if (shouldRegisterFunctionsOnClient()) { FunctionService.registerFunction(function); } @@ -636,24 +638,24 @@ public class PRClientServerTestBase extends JUnit4CacheTestCase { ResultCollector<Integer, List<Integer>> rc = dataSet.withBucketFilter(bucketFilterSet).execute(function.getId()); List<Integer> results = rc.getResult(); - assertEquals(bucketFilterSet.size(), results.size()); + assertThat(results.size()).isEqualTo(bucketFilterSet.size()); for (Integer bucket : results) { bucketFilterSet.remove(bucket); } - assertTrue(bucketFilterSet.isEmpty()); + assertThat(bucketFilterSet).isEmpty(); } void serverBucketFilterOverrideExecution(Set<Integer> bucketFilterSet, Set<Integer> ketFilterSet) { Region<Integer, Integer> region = cache.getRegion(PartitionedRegionName); - assertNotNull(region); + assertThat(region).isNotNull(); final HashSet<Integer> testKeysSet = new HashSet<>(); for (int i = 150; i > 0; i--) { testKeysSet.add(i); } DistributedSystem.setThreadsSocketPolicy(false); - Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_BUCKET_FILTER); + Function<?> function = new TestFunction<>(true, TestFunction.TEST_FUNCTION_BUCKET_FILTER); if (shouldRegisterFunctionsOnClient()) { FunctionService.registerFunction(function); } @@ -671,20 +673,21 @@ public class PRClientServerTestBase extends JUnit4CacheTestCase { ResultCollector<Integer, List<Integer>> rc = dataSet.withBucketFilter(bucketFilterSet) .withFilter(ketFilterSet).execute(function.getId()); List<Integer> results = rc.getResult(); - assertEquals(expectedBucketSet.size(), results.size()); + assertThat(results.size()).isEqualTo(expectedBucketSet.size()); for (Integer bucket : results) { expectedBucketSet.remove(bucket); } - assertTrue(expectedBucketSet.isEmpty()); + assertThat(expectedBucketSet).isEmpty(); } - public static class BucketFilterPRResolver implements PartitionResolver, Serializable { + public static class BucketFilterPRResolver + implements PartitionResolver<Object, Object>, Serializable { @Override public void close() {} @Override - public Object getRoutingObject(EntryOperation opDetails) { + public Object getRoutingObject(EntryOperation<Object, Object> opDetails) { Object key = opDetails.getKey(); return getBucketID(key); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionResultSender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionResultSender.java index 69a0450294..a63c8a1b52 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionResultSender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionResultSender.java @@ -16,6 +16,8 @@ package org.apache.geode.internal.cache.execute; +import java.util.function.BiFunction; + import org.apache.logging.log4j.Logger; import org.apache.geode.cache.execute.Function; @@ -23,8 +25,10 @@ import org.apache.geode.cache.execute.FunctionException; import org.apache.geode.cache.execute.ResultCollector; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.internal.cache.ForceReattemptException; import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.execute.metrics.FunctionStats; import org.apache.geode.internal.cache.execute.metrics.FunctionStatsManager; import org.apache.geode.internal.cache.partitioned.PartitionedRegionFunctionStreamingMessage; import org.apache.geode.internal.serialization.KnownVersion; @@ -43,7 +47,7 @@ public class PartitionedRegionFunctionResultSender implements InternalResultSend private static final Logger logger = LogService.getLogger(); - PartitionedRegionFunctionStreamingMessage msg = null; + private final PartitionedRegionFunctionStreamingMessage msg; private final DistributionManager dm; @@ -53,15 +57,15 @@ public class PartitionedRegionFunctionResultSender implements InternalResultSend private final boolean forwardExceptions; - private ResultCollector rc; + private final ResultCollector rc; - private ServerToClientFunctionResultSender serverSender; + private final ServerToClientFunctionResultSender serverSender; private boolean localLastResultReceived = false; - private boolean onlyLocal = false; + private final boolean onlyLocal; - private boolean onlyRemote = false; + private final boolean onlyRemote; private boolean completelyDoneFromRemote = false; @@ -73,6 +77,7 @@ public class PartitionedRegionFunctionResultSender implements InternalResultSend private BucketMovedException bme; + private BiFunction<String, InternalDistributedSystem, FunctionStats> functionStatsFunctionProvider; public KnownVersion getClientVersion() { if (serverSender != null && serverSender.sc != null) { // is a client-server connection @@ -81,41 +86,40 @@ public class PartitionedRegionFunctionResultSender implements InternalResultSend return null; } - /** - * Have to combine next two constructor in one and make a new class which will send Results back. - * - */ public PartitionedRegionFunctionResultSender(DistributionManager dm, PartitionedRegion pr, - long time, PartitionedRegionFunctionStreamingMessage msg, Function function, - int[] bucketArray) { - this.msg = msg; - this.dm = dm; - this.pr = pr; - this.time = time; - this.function = function; - this.bucketArray = bucketArray; - - forwardExceptions = false; + long time, PartitionedRegionFunctionStreamingMessage msg, + Function function, int[] bucketArray) { + this(dm, pr, time, null, null, false, false, false, function, bucketArray, msg, + (x, y) -> FunctionStatsManager.getFunctionStats((String) x, (InternalDistributedSystem) y)); } - /** - * Have to combine next two constructor in one and make a new class which will send Results back. - * - */ public PartitionedRegionFunctionResultSender(DistributionManager dm, PartitionedRegion partitionedRegion, long time, ResultCollector rc, ServerToClientFunctionResultSender sender, boolean onlyLocal, boolean onlyRemote, boolean forwardExceptions, Function function, int[] bucketArray) { + this(dm, partitionedRegion, time, rc, sender, onlyLocal, onlyRemote, forwardExceptions, + function, bucketArray, null, + (x, y) -> FunctionStatsManager.getFunctionStats((String) x, (InternalDistributedSystem) y)); + } + + PartitionedRegionFunctionResultSender(DistributionManager dm, + PartitionedRegion partitionedRegion, long time, ResultCollector rc, + ServerToClientFunctionResultSender sender, boolean onlyLocal, boolean onlyRemote, + boolean forwardExceptions, Function function, int[] bucketArray, + PartitionedRegionFunctionStreamingMessage msg, + BiFunction functionStatsFunctionProvider) { this.dm = dm; pr = partitionedRegion; this.time = time; this.rc = rc; + this.msg = msg; serverSender = sender; this.onlyLocal = onlyLocal; this.onlyRemote = onlyRemote; this.forwardExceptions = forwardExceptions; this.function = function; this.bucketArray = bucketArray; + this.functionStatsFunctionProvider = functionStatsFunctionProvider; } private void checkForBucketMovement(Object oneResult) { @@ -201,7 +205,7 @@ public class PartitionedRegionFunctionResultSender implements InternalResultSend // call a synchronized method as local node is also waiting to send lastResult lastResult(oneResult, rc, false, true, dm.getDistributionManagerId()); } - FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem()) + functionStatsFunctionProvider.apply(function.getId(), dm.getSystem()) .incResultsReceived(); } // incrementing result sent stats. @@ -210,7 +214,7 @@ public class PartitionedRegionFunctionResultSender implements InternalResultSend // time the stats for the result sent is again incremented : Once the PR team comes with the // concept of the Streaming FunctionOperation // for the partitioned Region then it will be simple to fix this problem. - FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem()) + functionStatsFunctionProvider.apply(function.getId(), dm.getSystem()) .incResultsReturned(); } } @@ -319,14 +323,14 @@ public class PartitionedRegionFunctionResultSender implements InternalResultSend if (dm == null) { FunctionStatsManager.getFunctionStats(function.getId()).incResultsReceived(); } else { - FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem()) + functionStatsFunctionProvider.apply(function.getId(), dm.getSystem()) .incResultsReceived(); } } if (dm == null) { FunctionStatsManager.getFunctionStats(function.getId()).incResultsReturned(); } else { - FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem()) + functionStatsFunctionProvider.apply(function.getId(), dm.getSystem()) .incResultsReturned(); } } @@ -360,21 +364,31 @@ public class PartitionedRegionFunctionResultSender implements InternalResultSend "PartitionedRegionFunctionResultSender adding result to ResultCollector on local node {}", oneResult); rc.addResult(dm.getDistributionManagerId(), oneResult); - FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem()) + functionStatsFunctionProvider.apply(function.getId(), dm.getSystem()) .incResultsReceived(); } // incrementing result sent stats. - FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem()) + functionStatsFunctionProvider.apply(function.getId(), dm.getSystem()) .incResultsReturned(); } } private void clientSend(Object oneResult, DistributedMember memberID) { - serverSender.sendResult(oneResult, memberID); + try { + serverSender.sendResult(oneResult, memberID); + } catch (FunctionException e) { + logger.warn("Exception when sending result to client", e); + setException(e); + } } private void lastClientSend(DistributedMember memberID, Object lastResult) { - serverSender.lastResult(lastResult, memberID); + try { + serverSender.lastResult(lastResult, memberID); + } catch (FunctionException e) { + logger.warn("Exception when sending last result to client", e); + setException(e); + } } @Override @@ -411,5 +425,4 @@ public class PartitionedRegionFunctionResultSender implements InternalResultSend public boolean isLastResultReceived() { return localLastResultReceived; } - } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionResultSenderTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionResultSenderTest.java new file mode 100644 index 0000000000..9ac049d663 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionResultSenderTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geode.internal.cache.execute; + +import static org.apache.geode.internal.cache.execute.PartitionedRegionFunctionResultSenderTest.MethodToInvoke.LAST_RESULT; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.runner.RunWith; + +import org.apache.geode.cache.execute.Function; +import org.apache.geode.cache.execute.FunctionContext; +import org.apache.geode.cache.execute.FunctionException; +import org.apache.geode.cache.execute.ResultCollector; +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.PartitionedRegionDataStore; +import org.apache.geode.internal.cache.execute.metrics.FunctionStats; +import org.apache.geode.test.junit.runners.GeodeParamsRunner; + +@RunWith(GeodeParamsRunner.class) +public class PartitionedRegionFunctionResultSenderTest { + private DistributionManager dm = mock(DistributionManager.class); + private PartitionedRegion region = mock(PartitionedRegion.class); + private PartitionedRegionDataStore dataStore = mock(PartitionedRegionDataStore.class); + private ATestResultCollector rc = new ATestResultCollector(); + private ServerToClientFunctionResultSender serverToClientFunctionResultSender = + mock(ServerToClientFunctionResultSender.class); + private FunctionStats functionStats = mock(FunctionStats.class); + + enum MethodToInvoke { + SEND_EXCEPTION, LAST_RESULT + } + + @BeforeEach + public void setUp() { + when(region.getDataStore()).thenReturn(dataStore); + when(dataStore.areAllBucketsHosted(any())).thenReturn(true); + } + + @Test + public void whenResponseToClientInLastResultFailsEndResultsIsCalled_NotOnlyLocal_OnlyRemote() { + doThrow(new FunctionException()).when(serverToClientFunctionResultSender) + .lastResult(any(), any()); + PartitionedRegionFunctionResultSender sender = + new PartitionedRegionFunctionResultSender(dm, + region, 1, rc, serverToClientFunctionResultSender, false, true, true, + new TestFunction(), new int[2], null, (x, y) -> functionStats); + + sender.lastResult(new Object(), true, rc, null); + + assertThat(rc.isEndResultsCalled()).isEqualTo(true); + } + + @ParameterizedTest(name = "{displayName} with {arguments}") + @EnumSource(MethodToInvoke.class) + public void whenResponseToClientInLastResultFailsEndResultsIsCalled_OnlyLocal_NotOnlyRemote( + MethodToInvoke methodToInvoke) { + doThrow(new FunctionException("IOException")).when(serverToClientFunctionResultSender) + .lastResult(any(), any()); + + PartitionedRegionFunctionResultSender sender = + new PartitionedRegionFunctionResultSender(dm, + region, 1, rc, serverToClientFunctionResultSender, true, false, true, + new TestFunction(), new int[2]); + + if (methodToInvoke == LAST_RESULT) { + sender.lastResult(new Object()); + } else { + sender.sendException(new Exception()); + } + + assertThat(rc.isEndResultsCalled()).isEqualTo(true); + } + + @ParameterizedTest(name = "{displayName} with {arguments}") + @EnumSource(MethodToInvoke.class) + public void whenResponseToClientInSendResultFailsEndResultsIsCalled_NotOnlyLocal_OnlyRemote( + MethodToInvoke methodToInvoke) { + doThrow(new FunctionException("IOException")).when(serverToClientFunctionResultSender) + .sendResult(any(), any()); + PartitionedRegionFunctionResultSender sender = + new PartitionedRegionFunctionResultSender(dm, + region, 1, rc, serverToClientFunctionResultSender, false, true, true, + new TestFunction(), new int[2], null, (x, y) -> functionStats); + + if (methodToInvoke == LAST_RESULT) { + sender.lastResult(new Object()); + } else { + sender.sendException(new Exception()); + } + + assertThat(rc.isEndResultsCalled()).isEqualTo(true); + } + + @ParameterizedTest(name = "{displayName} with {arguments}") + @EnumSource(MethodToInvoke.class) + public void whenResponseToClientInSendResultFailsEndResultsIsCalled_NotOnlyLocal_NotOnlyRemote( + MethodToInvoke methodToInvoke) { + doThrow(new FunctionException("IOException")).when(serverToClientFunctionResultSender) + .sendResult(any(), any()); + PartitionedRegionFunctionResultSender sender = + new PartitionedRegionFunctionResultSender(dm, + region, 1, rc, serverToClientFunctionResultSender, false, false, true, + new TestFunction(), new int[2], null, (x, y) -> functionStats); + + if (methodToInvoke == LAST_RESULT) { + sender.lastResult(new Object(), true, rc, null); + } else { + sender.sendException(new Exception()); + } + + assertThat(rc.isEndResultsCalled()).isEqualTo(true); + } + + private static class TestFunction implements Function { + @Override + public void execute(FunctionContext context) {} + } + + private static class ATestResultCollector implements ResultCollector { + private volatile boolean isEndResultsCalled = false; + + @Override + public Object getResult() throws FunctionException { + return null; + } + + @Override + public Object getResult(long timeout, TimeUnit unit) + throws FunctionException, InterruptedException { + return null; + } + + @Override + public void addResult(DistributedMember memberID, Object resultOfSingleExecution) {} + + @Override + public void endResults() { + isEndResultsCalled = true; + } + + @Override + public void clearResults() {} + + public boolean isEndResultsCalled() { + return isEndResultsCalled; + } + } +} diff --git a/geode-dunit/src/main/java/org/apache/geode/internal/cache/functions/TestFunction.java b/geode-dunit/src/main/java/org/apache/geode/internal/cache/functions/TestFunction.java index 951a9b7ca1..a46840fd0e 100755 --- a/geode-dunit/src/main/java/org/apache/geode/internal/cache/functions/TestFunction.java +++ b/geode-dunit/src/main/java/org/apache/geode/internal/cache/functions/TestFunction.java @@ -99,6 +99,7 @@ public class TestFunction<T> implements Function<T>, Declarable2, DataSerializab public static final String TEST_FUNCTION_SINGLE_HOP_FORCE_NETWORK_HOP = "executeFunctionSingleHopForceNetworkHop"; public static final String TEST_FUNCTION_GET_NETWORK_HOP = "executeFunctionGetNetworkHop"; + public static final String TEST_FUNCTION_SLOW = "SlowFunction"; private static final String ID = "id"; private static final String HAVE_RESULTS = "haveResults"; private final Properties props; @@ -197,6 +198,8 @@ public class TestFunction<T> implements Function<T>, Declarable2, DataSerializab executeSingleHopForceNetworkHop(context); } else if (id.equals(TEST_FUNCTION_GET_NETWORK_HOP)) { executeGetNetworkHop(context); + } else if (id.equals(TEST_FUNCTION_SLOW)) { + executeSlowFunction(context); } else if (noAckTest.equals("true")) { execute1(context); } @@ -1041,6 +1044,22 @@ public class TestFunction<T> implements Function<T>, Declarable2, DataSerializab context.getResultSender().lastResult(networkHopType); } + private void executeSlowFunction(FunctionContext context) { + int entries = 4; + int waitBetweenEntriesMs = 5000; + for (int i = 0; i < entries; i++) { + try { + Thread.sleep(waitBetweenEntriesMs); + } catch (InterruptedException e) { + context.getResultSender().sendException(e); + Thread.currentThread().interrupt(); + return; + } + context.getResultSender().sendResult(i); + } + context.getResultSender().lastResult(entries); + } + /** * Get the function identifier, used by clients to invoke this function * @@ -1096,12 +1115,12 @@ public class TestFunction<T> implements Function<T>, Declarable2, DataSerializab @Override public boolean isHA() { - if (getId().equals(TEST_FUNCTION10)) { return true; } if (getId().equals(TEST_FUNCTION_NONHA_SERVER) || getId().equals(TEST_FUNCTION_NONHA_REGION) - || getId().equals(TEST_FUNCTION_NONHA_NOP) || getId().equals(TEST_FUNCTION_NONHA)) { + || getId().equals(TEST_FUNCTION_NONHA_NOP) || getId().equals(TEST_FUNCTION_NONHA) + || getId().equals(TEST_FUNCTION_SLOW)) { return false; } return Boolean.parseBoolean(props.getProperty(HAVE_RESULTS)); diff --git a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java index bce7ed9ade..f23ee9ad23 100644 --- a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java +++ b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java @@ -66,6 +66,7 @@ public class ServerStarterRule extends MemberStarterRule<ServerStarterRule> impl private PdxSerializer pdxSerializer = null; private boolean pdxReadSerialized = false; private boolean pdxReadSerializedUserSet = false; + private int maxThreads = -1; // By default we start one server per jvm private int serverCount = 1; @@ -123,6 +124,11 @@ public class ServerStarterRule extends MemberStarterRule<ServerStarterRule> impl servers.clear(); } + public ServerStarterRule withMaxThreads(int maxThreads) { + this.maxThreads = maxThreads; + return this; + } + public ServerStarterRule withPDXPersistent() { pdxPersistent = true; pdxPersistentUserSet = true; @@ -219,6 +225,9 @@ public class ServerStarterRule extends MemberStarterRule<ServerStarterRule> impl } else { server.setPort(0); } + if (maxThreads >= 0) { + server.setMaxThreads(maxThreads); + } try { server.start(); } catch (IOException e) { diff --git a/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt b/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt index 81f334b6bf..70a8520acc 100644 --- a/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt +++ b/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt @@ -180,4 +180,4 @@ org/apache/geode/test/junit/rules/LocatorLauncherStartupRule,false,autoStart:boo org/apache/geode/test/junit/rules/LocatorStarterRule,false org/apache/geode/test/junit/rules/MemberStarterRule,false,autoStart:boolean,availableHttpPort:int,availableJmxPort:int,cleanWorkingDir:boolean,firstLevelChildrenFile:java/util/List,httpPort:int,jmxPort:int,logFile:boolean,memberPort:int,name:java/lang/String,properties:java/util/Properties,restore:org/apache/geode/test/junit/rules/accessible/AccessibleRestoreSystemProperties,systemProperties:java/util/Properties org/apache/geode/test/junit/rules/ServerLauncherStartupRule,false,autoStart:boolean,builderOperator:java/util/function/UnaryOperator,launcher:org/apache/geode/distributed/ServerLauncher,properties:java/util/Properties,temp:org/junit/rules/TemporaryFolder -org/apache/geode/test/junit/rules/ServerStarterRule,false,availableLocatorPort:int,embeddedLocatorPort:int,pdxPersistent:boolean,pdxPersistentUserSet:boolean,pdxReadSerialized:boolean,pdxReadSerializedUserSet:boolean,pdxSerializer:org/apache/geode/pdx/PdxSerializer,regions:java/util/Map,serverCount:int +org/apache/geode/test/junit/rules/ServerStarterRule,false,availableLocatorPort:int,embeddedLocatorPort:int,maxThreads:int,pdxPersistent:boolean,pdxPersistentUserSet:boolean,pdxReadSerialized:boolean,pdxReadSerializedUserSet:boolean,pdxSerializer:org/apache/geode/pdx/PdxSerializer,regions:java/util/Map,serverCount:int