This is an automated email from the ASF dual-hosted git repository.

agingade pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0e550cc  GEODE-5828: Add dunit test reproducing the issue and 
incorporate review changes (#2584)
0e550cc is described below

commit 0e550cce06dbf3c5d0dfddffa20a6bca743179a0
Author: agingade <aging...@pivotal.io>
AuthorDate: Wed Oct 10 12:09:09 2018 -0700

    GEODE-5828: Add dunit test reproducing the issue and incorporate review 
changes (#2584)
    
    This ticket is already closed by the checkin db8ba67
    As part of this checkin dunit test is added reproduce the issue and verify 
the previous checking fixed the issue.
    Incorporate review comments missed in the previous checkin.
---
 ...ntServerTransactionFailoverDistributedTest.java | 124 ++++++++++++++++++++-
 .../geode/internal/cache/TXCommitMessage.java      |   2 +-
 2 files changed, 124 insertions(+), 2 deletions(-)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java
index 9f411f4..c8acb15 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java
@@ -14,8 +14,12 @@
  */
 package org.apache.geode.internal.cache;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.geode.test.dunit.VM.getHostName;
 import static org.apache.geode.test.dunit.VM.getVM;
+import static 
org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase.getBlackboard;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 
 import java.io.Serializable;
@@ -26,6 +30,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
+import org.apache.geode.cache.CacheTransactionManager;
 import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
@@ -38,11 +43,16 @@ import org.apache.geode.cache.client.PoolManager;
 import org.apache.geode.cache.client.internal.PoolImpl;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.DistributionMessageObserver;
 import org.apache.geode.distributed.internal.ServerLocation;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil;
 import org.apache.geode.internal.cache.tier.sockets.ClientHealthMonitor;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.rules.CacheRule;
 import org.apache.geode.test.dunit.rules.ClientCacheRule;
@@ -113,8 +123,13 @@ public class 
ClientServerTransactionFailoverDistributedTest implements Serializa
   }
 
   private int createServerRegion(int totalNumBuckets, boolean isAccessor) 
throws Exception {
+    return createServerRegion(totalNumBuckets, isAccessor, 0);
+  }
+
+  private int createServerRegion(int totalNumBuckets, boolean isAccessor, int 
redundancy)
+      throws Exception {
     PartitionAttributesFactory factory = new PartitionAttributesFactory();
-    factory.setTotalNumBuckets(totalNumBuckets);
+    factory.setTotalNumBuckets(totalNumBuckets).setRedundantCopies(redundancy);
     if (isAccessor) {
       factory.setLocalMaxMemory(0);
     }
@@ -338,4 +353,111 @@ public class 
ClientServerTransactionFailoverDistributedTest implements Serializa
       Thread.sleep(1000);
     }
   }
+
+  @Test
+  public void 
txCommitGetsAppliedOnAllTheReplicasAfterHostIsShutDownAndIfOneOfTheNodeHasCommitted()
+      throws Exception {
+    getBlackboard().initBlackboard();
+    VM client = server4;
+
+    port1 = server1.invoke(() -> createServerRegion(1, false, 2));
+
+    server1.invoke(() -> {
+      Region region = cacheRule.getCache().getRegion(regionName);
+      region.put("Key-1", "Value-1");
+      region.put("Key-2", "Value-2");
+    });
+
+    port2 = server2.invoke(() -> createServerRegion(1, false, 2));
+
+    server3.invoke(() -> createServerRegion(1, false, 2));
+
+    client.invoke(() -> createClientRegion(true, port1, port2));
+
+    server1.invoke(() -> {
+      DistributionMessageObserver.setInstance(
+          new DistributionMessageObserver() {
+            @Override
+            public void beforeSendMessage(ClusterDistributionManager dm,
+                DistributionMessage message) {
+              if (message instanceof 
TXCommitMessage.CommitProcessForTXIdMessage) {
+                InternalDistributedMember m = message.getRecipients()[0];
+                message.resetRecipients();
+                message.setRecipient(m);
+              }
+            }
+          });
+    });
+
+    server2.invoke(() -> {
+      DistributionMessageObserver.setInstance(
+          new DistributionMessageObserver() {
+            @Override
+            public void beforeProcessMessage(ClusterDistributionManager dm,
+                DistributionMessage message) {
+              if (message instanceof 
TXCommitMessage.CommitProcessForTXIdMessage) {
+                getBlackboard().signalGate("bounce");
+              }
+            }
+          });
+    });
+
+    server3.invoke(() -> {
+      DistributionMessageObserver.setInstance(
+          new DistributionMessageObserver() {
+            @Override
+            public void beforeProcessMessage(ClusterDistributionManager dm,
+                DistributionMessage message) {
+              if (message instanceof 
TXCommitMessage.CommitProcessForTXIdMessage) {
+                getBlackboard().signalGate("bounce");
+              }
+            }
+          });
+    });
+
+    AsyncInvocation clientAsync = client.invokeAsync(() -> {
+      {
+        CacheTransactionManager transactionManager =
+            clientCacheRule.getClientCache().getCacheTransactionManager();
+        Region region = clientCacheRule.getClientCache().getRegion(regionName);
+        transactionManager.begin();
+        region.put("TxKey-1", "TxValue-1");
+        region.put("TxKey-2", "TxValue-2");
+        transactionManager.commit();
+      }
+    });
+
+    await().atMost(60, SECONDS).until(() -> 
getBlackboard().isGateSignaled("bounce"));
+    server1.invoke(() -> {
+      DistributionMessageObserver.setInstance(null);
+    });
+    server1.bounceForcibly();
+
+    clientAsync.join();
+
+    server2.invoke(() -> {
+      Region region = cacheRule.getCache().getRegion(regionName);
+      assertThat(region.get("TxKey-1")).isEqualTo("TxValue-1");
+      assertThat(region.get("TxKey-2")).isEqualTo("TxValue-2");
+    });
+
+    server3.invoke(() -> {
+      Region region = cacheRule.getCache().getRegion(regionName);
+      assertThat(region.get("TxKey-1")).isEqualTo("TxValue-1");
+      assertThat(region.get("TxKey-2")).isEqualTo("TxValue-2");
+    });
+
+    client.invoke(() -> {
+      Region region = clientCacheRule.getClientCache().getRegion(regionName);
+      assertThat(region.get("TxKey-1")).isEqualTo("TxValue-1");
+      assertThat(region.get("TxKey-2")).isEqualTo("TxValue-2");
+    });
+
+    server2.invoke(() -> {
+      DistributionMessageObserver.setInstance(null);
+    });
+    server3.invoke(() -> {
+      DistributionMessageObserver.setInstance(null);
+    });
+  }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index c88a086..4e0b896 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -1985,7 +1985,7 @@ public class TXCommitMessage extends 
PooledDistributionMessage
     if (!getSender().equals(id)) {
       return;
     }
-    getDistributionManager().removeMembershipListener(this);
+    distributionManager.removeMembershipListener(this);
 
     synchronized (this) {
       if (isProcessing() || this.departureNoticed) {

Reply via email to