This closes #293 GEODE-1653: Added helper method getAllServers in ExecuteFunctionNoAckOp to get all available servers, instead executing Fire and Forget only on the ones the client is currently connected to.
GEODE-1653: Dunit test for Executing a fire-and-forget function on all servers as opposed to only executing on the ones the client is currently connected to. GEODE-1653: changes after spotlessApply. GEODE-1653: 1. renamed test file to FireAndForgetFunctionOnAllServersDUnitTest and test to testFireAndForgetFunctionOnAllServers 2. replaced thread sleep with waitUntilDeterministic function before verifying results after Fire and Forget Function execution 3. removed commented testServerSingleKeyExecution_Bug43513_OnServer test. GEODE-1653: Replacing Thread's sleep with Awatility, if the condition is not met during the timeout, a ConditionTimeoutException will be thrown. This makes analyzing the failure much simpler. GEODE-1653: Use a larger timeout value such as 60 seconds to prevent test from failing on slower machines or garbage collection during the test. GEODE-1653: 1. Code changes a. Added getAllServers() method to connection source, and converted AutoConnectionSourceImpl's findAllServers logic into it. b. Made relevant changes to both ExecuteFunctionNoAckOp and ExecuteFunctionOp classes 2. Test changes a. removed author tag in the test. b. created a new function (FireAndForgetFunctionOnAllServers) for test and removed it from TestFunction. c. test change about awaitility and actual assertion. d. asserted for pool's current server size after starting a new server and stopping existing server. GEODE-1653: Adding missed out file for FireAndForgetFunctionOnAllServers function. GEODE-1653: Adding @Override to ExplicitConnectionSourceImpl's getAllServers() method. GEODE-1653: fix the test code to double check the getCurrentServer only returns 1 server Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/c6102896 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/c6102896 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/c6102896 Branch: refs/heads/feature/GEODE-2156 Commit: c61028966d51db436e75ba228465767cb1a0bd21 Parents: 44278d1 Author: Amey Barve <aba...@apache.org> Authored: Mon Nov 21 18:09:55 2016 +0530 Committer: zhouxh <gz...@pivotal.io> Committed: Thu Dec 1 16:19:59 2016 -0800 ---------------------------------------------------------------------- .../internal/AutoConnectionSourceImpl.java | 28 ++-- .../cache/client/internal/ConnectionSource.java | 7 + .../client/internal/ExecuteFunctionNoAckOp.java | 5 +- .../client/internal/ExecuteFunctionOp.java | 20 +-- .../internal/ExplicitConnectionSourceImpl.java | 3 +- .../client/internal/QueueManagerJUnitTest.java | 5 + ...eAndForgetFunctionOnAllServersDUnitTest.java | 160 +++++++++++++++++++ ...tServerRegionFunctionExecutionDUnitTest.java | 8 - .../FireAndForgetFunctionOnAllServers.java | 73 +++++++++ .../internal/cache/functions/TestFunction.java | 3 +- 10 files changed, 266 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c6102896/geode-core/src/main/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImpl.java index ca3d979..d9fca87 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImpl.java @@ -120,6 +120,20 @@ public class AutoConnectionSourceImpl implements ConnectionSource { return isBalanced; } + @Override + public List<ServerLocation> getAllServers() { + if (PoolImpl.TEST_DURABLE_IS_NET_DOWN) { + return null; + } + GetAllServersRequest request = new GetAllServersRequest(serverGroup); + GetAllServersResponse response = (GetAllServersResponse) queryLocators(request); + if (response != null) { + return response.getServers(); + } else { + return null; + } + } + public ServerLocation findReplacementServer(ServerLocation currentServer, Set/* <ServerLocation> */ excludedServers) { if (PoolImpl.TEST_DURABLE_IS_NET_DOWN) { @@ -160,20 +174,6 @@ public class AutoConnectionSourceImpl implements ConnectionSource { return response.getServer(); } - - public ArrayList<ServerLocation> findAllServers() { - if (PoolImpl.TEST_DURABLE_IS_NET_DOWN) { - return null; - } - GetAllServersRequest request = new GetAllServersRequest(serverGroup); - GetAllServersResponse response = (GetAllServersResponse) queryLocators(request); - if (response != null) { - return response.getServers(); - } else { - return null; - } - } - public List/* ServerLocation */ findServersForQueue(Set/* <ServerLocation> */ excludedServers, int numServers, ClientProxyMembershipID proxyId, boolean findDurableQueue) { if (PoolImpl.TEST_DURABLE_IS_NET_DOWN) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c6102896/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionSource.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionSource.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionSource.java index b1ce89e..eaccb6d 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionSource.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionSource.java @@ -66,4 +66,11 @@ public interface ConnectionSource { * @return true if the servers have balanced load. */ boolean isBalanced(); + + /** + * get the list of all the servers + * + * @return a list of all servers + */ + List<ServerLocation> getAllServers(); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c6102896/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionNoAckOp.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionNoAckOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionNoAckOp.java index f0dce70..def7389 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionNoAckOp.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionNoAckOp.java @@ -70,7 +70,7 @@ public class ExecuteFunctionNoAckOp { logger.debug("ExecuteFunctionNoAckOp#execute : Sending Function Execution Message:" + op.getMessage() + " to all servers using pool: " + pool); } - servers = pool.getCurrentServers(); + servers = pool.getConnectionSource().getAllServers(); Iterator i = servers.iterator(); while (i.hasNext()) { pool.executeOn((ServerLocation) i.next(), op); @@ -111,7 +111,7 @@ public class ExecuteFunctionNoAckOp { logger.debug("ExecuteFunctionNoAckOp#execute : Sending Function Execution Message:" + op.getMessage() + " to all servers using pool: " + pool); } - servers = pool.getCurrentServers(); + servers = pool.getConnectionSource().getAllServers(); Iterator i = servers.iterator(); while (i.hasNext()) { pool.executeOn((ServerLocation) i.next(), op); @@ -235,5 +235,4 @@ public class ExecuteFunctionNoAckOp { return new Message(1, Version.CURRENT); } } - } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c6102896/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java index cce1c37..d32e0f4 100755 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java @@ -337,15 +337,7 @@ public class ExecuteFunctionOp { Object args, MemberMappedArgument memberMappedArg, byte hasResult, ResultCollector rc, boolean isFnSerializationReqd, UserAttributes attributes) { final List<SingleHopOperationCallable> tasks = new ArrayList<SingleHopOperationCallable>(); - ArrayList<ServerLocation> servers = null; - if (pool.getLocators() == null || pool.getLocators().isEmpty()) { - servers = ((ExplicitConnectionSourceImpl) pool.getConnectionSource()).getAllServers(); - } else { - servers = ((AutoConnectionSourceImpl) pool.getConnectionSource()).findAllServers(); // n/w - // call on - // locator - } - + List<ServerLocation> servers = pool.getConnectionSource().getAllServers(); for (ServerLocation server : servers) { final AbstractOp op = new ExecuteFunctionOpImpl(function, args, memberMappedArg, hasResult, rc, isFnSerializationReqd, (byte) 0, null/* onGroups does not use single-hop for now */, @@ -362,15 +354,7 @@ public class ExecuteFunctionOp { boolean isFnSerializationReqd, boolean isHA, boolean optimizeForWrite, UserAttributes properties) { final List<SingleHopOperationCallable> tasks = new ArrayList<SingleHopOperationCallable>(); - ArrayList<ServerLocation> servers = null; - if (pool.getLocators() == null || pool.getLocators().isEmpty()) { - servers = ((ExplicitConnectionSourceImpl) pool.getConnectionSource()).getAllServers(); - } else { - servers = ((AutoConnectionSourceImpl) pool.getConnectionSource()).findAllServers(); // n/w - // call on - // locator - } - + List<ServerLocation> servers = pool.getConnectionSource().getAllServers(); for (ServerLocation server : servers) { final AbstractOp op = new ExecuteFunctionOpImpl(functionId, args, memberMappedArg, hasResult, rc, isFnSerializationReqd, isHA, optimizeForWrite, (byte) 0, http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c6102896/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExplicitConnectionSourceImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExplicitConnectionSourceImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExplicitConnectionSourceImpl.java index 412ccb8..aee69e4 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExplicitConnectionSourceImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExplicitConnectionSourceImpl.java @@ -257,7 +257,8 @@ public class ExplicitConnectionSourceImpl implements ConnectionSource { return sb.toString(); } - ArrayList<ServerLocation> getAllServers() { + @Override + public ArrayList<ServerLocation> getAllServers() { ArrayList<ServerLocation> list = new ArrayList<ServerLocation>(); list.addAll(this.serverList); return list; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c6102896/geode-core/src/test/java/org/apache/geode/cache/client/internal/QueueManagerJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/QueueManagerJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/QueueManagerJUnitTest.java index 0ec69aa..7f9a459 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/QueueManagerJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/QueueManagerJUnitTest.java @@ -592,6 +592,11 @@ public class QueueManagerJUnitTest { public boolean isBalanced() { return false; } + + @Override + public List<ServerLocation> getAllServers() { + return Collections.emptyList(); + } } private class DummyConnection implements Connection { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c6102896/geode-core/src/test/java/org/apache/geode/internal/cache/FireAndForgetFunctionOnAllServersDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/FireAndForgetFunctionOnAllServersDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/FireAndForgetFunctionOnAllServersDUnitTest.java new file mode 100644 index 0000000..d733ae0 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/FireAndForgetFunctionOnAllServersDUnitTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.*; +import org.apache.geode.cache.client.internal.LocatorTestBase; +import org.apache.geode.cache.client.internal.PoolImpl; +import org.apache.geode.cache.execute.Execution; +import org.apache.geode.cache.execute.Function; +import org.apache.geode.cache.execute.FunctionService; +import org.apache.geode.internal.AvailablePort; +import org.apache.geode.internal.cache.functions.FireAndForgetFunctionOnAllServers; +import org.apache.geode.test.dunit.Assert; +import org.apache.geode.test.dunit.Host; +import org.apache.geode.test.dunit.NetworkUtils; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.junit.categories.DistributedTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static com.jayway.awaitility.Awaitility.*; +import static java.util.concurrent.TimeUnit.*; + +@Category(DistributedTest.class) +public class FireAndForgetFunctionOnAllServersDUnitTest extends LocatorTestBase { + + public FireAndForgetFunctionOnAllServersDUnitTest() { + super(); + } + + @Override + public final void postSetUp() throws Exception { + disconnectAllFromDS(); + } + + @Override + protected final void postTearDownLocatorTestBase() throws Exception { + disconnectAllFromDS(); + } + + @Test + public void testFireAndForgetFunctionOnAllServers() { + + // Test case for Executing a fire-and-forget function on all servers as opposed to only + // executing on the ones the + // client is currently connected to. + + Host host = Host.getHost(0); + VM locator = host.getVM(0); + VM server1 = host.getVM(1); + VM server2 = host.getVM(2); + VM client = host.getVM(3); + + final int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + final String locatorHost = NetworkUtils.getServerHostName(host); + + // Step 1. Start a locator and one cache server. + locator.invoke("Start Locator", () -> startLocator(locatorHost, locatorPort, "")); + + String locString = getLocatorString(host, locatorPort); + + // Step 2. Start a server and create a replicated region "R1". + server1.invoke("Start BridgeServer", + () -> startBridgeServer(new String[] {"R1"}, locString, new String[] {"R1"})); + + // Step 3. Create a client cache with pool mentioning locator. + client.invoke("create client cache and pool mentioning locator", () -> { + ClientCacheFactory ccf = new ClientCacheFactory(); + ccf.addPoolLocator(locatorHost, locatorPort); + ClientCache cache = ccf.create(); + Pool pool1 = PoolManager.createFactory().addLocator(locatorHost, locatorPort) + .setServerGroup("R1").create("R1"); + + Region region1 = cache.createClientRegionFactory(ClientRegionShortcut.PROXY).setPoolName("R1") + .create("R1"); + + // Step 4. Execute the function to put DistributedMemberID into above created replicated + // region. + Function function = new FireAndForgetFunctionOnAllServers(); + FunctionService.registerFunction(function); + + PoolImpl pool = (PoolImpl) pool1; + + await().atMost(60, SECONDS) + .until(() -> Assert.assertEquals(1, pool.getCurrentServers().size())); + + String regionName = "R1"; + Execution dataSet = FunctionService.onServers(pool1); + dataSet.withArgs(regionName).execute(function); + + // Using Awatility, if the condition is not met during the timeout, a + // ConditionTimeoutException will be thrown. This makes analyzing the failure much simpler + // Step 5. Assert for the region keyset size with 1. + await().atMost(60, SECONDS) + .until(() -> Assert.assertEquals(1, region1.keySetOnServer().size())); + + region1.clear(); + + // Step 6. Start another server mentioning locator and create a replicated region "R1". + server2.invoke("Start BridgeServer", + () -> startBridgeServer(new String[] {"R1"}, locString, new String[] {"R1"})); + + // Step 7. Execute the same function to put DistributedMemberID into above created replicated + // region. + await().atMost(60, SECONDS) + .until(() -> Assert.assertEquals(1, pool.getCurrentServers().size())); + dataSet = FunctionService.onServers(pool1); + dataSet.withArgs(regionName).execute(function); + + await().atMost(60, SECONDS) + .until(() -> Assert.assertEquals(2, pool.getCurrentServers().size())); + + // Using Awatility, if the condition is not met during the timeout, a + // ConditionTimeoutException will be thrown. This makes analyzing the failure much simpler + // Step 8. Assert for the region keyset size with 2, since above function was executed on 2 + // servers. + await().atMost(60, SECONDS).until(() -> { + Assert.assertEquals(2, region1.keySetOnServer().size()); + }); + + region1.clear(); + + // Step 8.Stop one of the servers. + server1.invoke("Stop BridgeServer", () -> stopBridgeMemberVM(server1)); + + // Step 9. Execute the same function to put DistributedMemberID into above created replicated + // region. + dataSet = FunctionService.onServers(pool1); + dataSet.withArgs(regionName).execute(function); + + await().atMost(60, SECONDS) + .until(() -> Assert.assertEquals(1, pool.getCurrentServers().size())); + + // Using Awatility, if the condition is not met during the timeout, a + // ConditionTimeoutException will be thrown. This makes analyzing the failure much simpler + // Step 10. Assert for the region keyset size with 1, since only one server was running. + await().atMost(60, SECONDS).until(() -> { + Assert.assertEquals(1, region1.keySetOnServer().size()); + }); + + region1.clear(); + + return null; + }); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c6102896/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionDUnitTest.java index 3b072d6..e4a85b0 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionDUnitTest.java @@ -143,14 +143,6 @@ public class PRClientServerRegionFunctionExecutionDUnitTest extends PRClientServ .serverSingleKeyExecutionOnRegion_SingleConnection()); } - @Ignore("Bug47584") - @Test - public void testServerSingleKeyExecution_Bug43513_OnServer() { - createScenario_SingleConnection(); - client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest - .serverSingleKeyExecutionOnServer_SingleConnection()); - } - @Test public void testServerSingleKeyExecution_SendException() { createScenario(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c6102896/geode-core/src/test/java/org/apache/geode/internal/cache/functions/FireAndForgetFunctionOnAllServers.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/functions/FireAndForgetFunctionOnAllServers.java b/geode-core/src/test/java/org/apache/geode/internal/cache/functions/FireAndForgetFunctionOnAllServers.java new file mode 100644 index 0000000..f292f1a --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/functions/FireAndForgetFunctionOnAllServers.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.functions; + +import org.apache.geode.LogWriter; +import org.apache.geode.cache.*; +import org.apache.geode.cache.execute.Function; +import org.apache.geode.cache.execute.FunctionContext; +import org.apache.geode.distributed.DistributedSystem; +import org.apache.geode.distributed.internal.InternalDistributedSystem; + +/* + * Function for Executing a fire-and-forget function on all servers as opposed to only executing on + * the ones the client is currently connected to. + */ +public class FireAndForgetFunctionOnAllServers implements Function { + + public static final String ID = FireAndForgetFunctionOnAllServers.class.getName(); + + @Override + public boolean hasResult() { + return false; + } + + @Override + public void execute(FunctionContext context) { + DistributedSystem ds = InternalDistributedSystem.getAnyInstance(); + LogWriter logger = ds.getLogWriter(); + Cache cache = CacheFactory.getAnyInstance(); + String regionName = (String) context.getArguments(); + Region<String, Integer> region1 = cache.getRegion(regionName); + if (region1 == null) { + RegionFactory<String, Integer> rf; + rf = cache.createRegionFactory(RegionShortcut.REPLICATE); + region1 = rf.create(regionName); + } + region1.put(ds.getDistributedMember().toString(), 1); + + logger.info("Executing FireAndForgetFunctionOnAllServers on Member : " + + ds.getDistributedMember() + " with Context : " + context); + + if (!hasResult()) { + return; + } + } + + @Override + public String getId() { + return FireAndForgetFunctionOnAllServers.ID; + } + + @Override + public boolean optimizeForWrite() { + return false; + } + + @Override + public boolean isHA() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c6102896/geode-core/src/test/java/org/apache/geode/internal/cache/functions/TestFunction.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/functions/TestFunction.java b/geode-core/src/test/java/org/apache/geode/internal/cache/functions/TestFunction.java index 2a5f51f..438cdd3 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/functions/TestFunction.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/functions/TestFunction.java @@ -15,8 +15,7 @@ package org.apache.geode.internal.cache.functions; import org.apache.geode.LogWriter; -import org.apache.geode.cache.CacheFactory; -import org.apache.geode.cache.Region; +import org.apache.geode.cache.*; import org.apache.geode.cache.control.RebalanceFactory; import org.apache.geode.cache.control.RebalanceOperation; import org.apache.geode.cache.control.RebalanceResults;