This is an automated email from the ASF dual-hosted git repository. agingade pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 0e550cc GEODE-5828: Add dunit test reproducing the issue and incorporate review changes (#2584) 0e550cc is described below commit 0e550cce06dbf3c5d0dfddffa20a6bca743179a0 Author: agingade <aging...@pivotal.io> AuthorDate: Wed Oct 10 12:09:09 2018 -0700 GEODE-5828: Add dunit test reproducing the issue and incorporate review changes (#2584) This ticket was already closed by checkin db8ba67. As part of this checkin, a dunit test is added to reproduce the issue and verify that the previous checkin fixed it. This checkin also incorporates review comments missed in the previous checkin. --- ...ntServerTransactionFailoverDistributedTest.java | 124 ++++++++++++++++++++- .../geode/internal/cache/TXCommitMessage.java | 2 +- 2 files changed, 124 insertions(+), 2 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java index 9f411f4..c8acb15 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java @@ -14,8 +14,12 @@ */ package org.apache.geode.internal.cache; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.geode.test.dunit.VM.getHostName; import static org.apache.geode.test.dunit.VM.getVM; +import static org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase.getBlackboard; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import java.io.Serializable; @@ -26,6 +30,7 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import 
org.apache.geode.cache.CacheTransactionManager; import org.apache.geode.cache.PartitionAttributes; import org.apache.geode.cache.PartitionAttributesFactory; import org.apache.geode.cache.Region; @@ -38,11 +43,16 @@ import org.apache.geode.cache.client.PoolManager; import org.apache.geode.cache.client.internal.PoolImpl; import org.apache.geode.cache.server.CacheServer; import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.DistributionMessageObserver; import org.apache.geode.distributed.internal.ServerLocation; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil; import org.apache.geode.internal.cache.tier.sockets.ClientHealthMonitor; import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; import org.apache.geode.internal.logging.LogService; +import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.VM; import org.apache.geode.test.dunit.rules.CacheRule; import org.apache.geode.test.dunit.rules.ClientCacheRule; @@ -113,8 +123,13 @@ public class ClientServerTransactionFailoverDistributedTest implements Serializa } private int createServerRegion(int totalNumBuckets, boolean isAccessor) throws Exception { + return createServerRegion(totalNumBuckets, isAccessor, 0); + } + + private int createServerRegion(int totalNumBuckets, boolean isAccessor, int redundancy) + throws Exception { PartitionAttributesFactory factory = new PartitionAttributesFactory(); - factory.setTotalNumBuckets(totalNumBuckets); + factory.setTotalNumBuckets(totalNumBuckets).setRedundantCopies(redundancy); if (isAccessor) { factory.setLocalMaxMemory(0); } @@ -338,4 +353,111 @@ public class ClientServerTransactionFailoverDistributedTest implements Serializa 
Thread.sleep(1000); } } + + @Test + public void txCommitGetsAppliedOnAllTheReplicasAfterHostIsShutDownAndIfOneOfTheNodeHasCommitted() + throws Exception { + getBlackboard().initBlackboard(); + VM client = server4; + + port1 = server1.invoke(() -> createServerRegion(1, false, 2)); + + server1.invoke(() -> { + Region region = cacheRule.getCache().getRegion(regionName); + region.put("Key-1", "Value-1"); + region.put("Key-2", "Value-2"); + }); + + port2 = server2.invoke(() -> createServerRegion(1, false, 2)); + + server3.invoke(() -> createServerRegion(1, false, 2)); + + client.invoke(() -> createClientRegion(true, port1, port2)); + + server1.invoke(() -> { + DistributionMessageObserver.setInstance( + new DistributionMessageObserver() { + @Override + public void beforeSendMessage(ClusterDistributionManager dm, + DistributionMessage message) { + if (message instanceof TXCommitMessage.CommitProcessForTXIdMessage) { + InternalDistributedMember m = message.getRecipients()[0]; + message.resetRecipients(); + message.setRecipient(m); + } + } + }); + }); + + server2.invoke(() -> { + DistributionMessageObserver.setInstance( + new DistributionMessageObserver() { + @Override + public void beforeProcessMessage(ClusterDistributionManager dm, + DistributionMessage message) { + if (message instanceof TXCommitMessage.CommitProcessForTXIdMessage) { + getBlackboard().signalGate("bounce"); + } + } + }); + }); + + server3.invoke(() -> { + DistributionMessageObserver.setInstance( + new DistributionMessageObserver() { + @Override + public void beforeProcessMessage(ClusterDistributionManager dm, + DistributionMessage message) { + if (message instanceof TXCommitMessage.CommitProcessForTXIdMessage) { + getBlackboard().signalGate("bounce"); + } + } + }); + }); + + AsyncInvocation clientAsync = client.invokeAsync(() -> { + { + CacheTransactionManager transactionManager = + clientCacheRule.getClientCache().getCacheTransactionManager(); + Region region = 
clientCacheRule.getClientCache().getRegion(regionName); + transactionManager.begin(); + region.put("TxKey-1", "TxValue-1"); + region.put("TxKey-2", "TxValue-2"); + transactionManager.commit(); + } + }); + + await().atMost(60, SECONDS).until(() -> getBlackboard().isGateSignaled("bounce")); + server1.invoke(() -> { + DistributionMessageObserver.setInstance(null); + }); + server1.bounceForcibly(); + + clientAsync.join(); + + server2.invoke(() -> { + Region region = cacheRule.getCache().getRegion(regionName); + assertThat(region.get("TxKey-1")).isEqualTo("TxValue-1"); + assertThat(region.get("TxKey-2")).isEqualTo("TxValue-2"); + }); + + server3.invoke(() -> { + Region region = cacheRule.getCache().getRegion(regionName); + assertThat(region.get("TxKey-1")).isEqualTo("TxValue-1"); + assertThat(region.get("TxKey-2")).isEqualTo("TxValue-2"); + }); + + client.invoke(() -> { + Region region = clientCacheRule.getClientCache().getRegion(regionName); + assertThat(region.get("TxKey-1")).isEqualTo("TxValue-1"); + assertThat(region.get("TxKey-2")).isEqualTo("TxValue-2"); + }); + + server2.invoke(() -> { + DistributionMessageObserver.setInstance(null); + }); + server3.invoke(() -> { + DistributionMessageObserver.setInstance(null); + }); + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java index c88a086..4e0b896 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java @@ -1985,7 +1985,7 @@ public class TXCommitMessage extends PooledDistributionMessage if (!getSender().equals(id)) { return; } - getDistributionManager().removeMembershipListener(this); + distributionManager.removeMembershipListener(this); synchronized (this) { if (isProcessing() || this.departureNoticed) {