This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4803096  GEODE-6928 peer-to-peer SSL stream corruption with conserve-sockets=false
4803096 is described below

commit 48030961f82f91360d8c88f19516758ba0d6affe
Author: Bruce Schuchardt <bschucha...@pivotal.io>
AuthorDate: Mon Jul 1 13:00:21 2019 -0700

    GEODE-6928 peer-to-peer SSL stream corruption with conserve-sockets=false
    
    Modified the NioSslEngine to not modify the decrypted SSL buffer after
    reading a direct-ack response.  This allows the readFully method to
    correctly see what bytes have already been consumed and correctly
    compact the buffer for subsequent reads, if necessary.
    
    The cluster communication test is modified to check for aborted
    connections created (retries) during operation distribution.  Without the
    fix for the problem this check would fail.
---
 .../org/apache/geode/ClusterCommunicationsDUnitTest.java   | 14 +++++++++++---
 .../org/apache/geode/distributed/internal/DMStats.java     |  3 +++
 .../geode/distributed/internal/DistributionStats.java      |  5 +++++
 .../distributed/internal/LonerDistributionManager.java     |  5 +++++
 .../geode/internal/cache/AbstractUpdateOperation.java      |  2 +-
 .../main/java/org/apache/geode/internal/net/NioFilter.java |  7 +++++++
 .../java/org/apache/geode/internal/net/NioSslEngine.java   |  7 +++++++
 .../main/java/org/apache/geode/internal/tcp/MsgReader.java |  8 +-------
 8 files changed, 40 insertions(+), 11 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
index 4d7bb23..eca86ed 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
@@ -66,6 +66,7 @@ import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.Locator;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DirectReplyProcessor;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -143,7 +144,9 @@ public class ClusterCommunicationsDUnitTest implements Serializable {
     for (int i = 1; i <= NUM_SERVERS; i++) {
       verifyCreatedEntry(getVM(i));
     }
-    performUpdate(getVM(1));
+    for (int iteration = 1; iteration < 6; iteration++) {
+      performUpdate(getVM(1));
+    }
     for (int i = 1; i <= NUM_SERVERS; i++) {
       verifyUpdatedEntry(getVM(i));
     }
@@ -239,8 +242,13 @@ public class ClusterCommunicationsDUnitTest implements Serializable {
   }
 
   private void performUpdate(VM memberVM) {
-    memberVM.invoke("perform update", () -> cache
-        .getRegion(regionName).put("testKey", "updatedTestValue"));
+    memberVM.invoke("perform update", () -> {
+      DMStats stats = ((InternalDistributedSystem) cache.getDistributedSystem())
+          .getDistributionManager().getStats();
+      int reconnectAttempts = stats.getReconnectAttempts();
+      cache.getRegion(regionName).put("testKey", "updatedTestValue");
+      assertThat(stats.getReconnectAttempts()).isEqualTo(reconnectAttempts);
+    });
   }
 
   private void performCreateWithLargeValue(VM memberVM) {
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java
index 410d74f..f91bf55 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java
@@ -297,6 +297,9 @@ public interface DMStats {
    */
   void incReconnectAttempts();
 
+
+  int getReconnectAttempts();
+
   /**
    * @since GemFire 4.1
    */
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
index f644209..9d7836c 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java
@@ -1657,6 +1657,11 @@ public class DistributionStats implements DMStats {
   }
 
   @Override
+  public int getReconnectAttempts() {
+    return stats.getInt(reconnectAttemptsId);
+  }
+
+  @Override
   public void incLostLease() {
     stats.incInt(lostConnectionLeaseId, 1);
   }
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
index a676a7e..248ff96 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
@@ -553,6 +553,11 @@ public class LonerDistributionManager implements DistributionManager {
     public void incReconnectAttempts() {}
 
     @Override
+    public int getReconnectAttempts() {
+      return 0;
+    }
+
+    @Override
     public void incLostLease() {}
 
     @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
index e467673..96b03b0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
@@ -135,7 +135,7 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
       boolean doUpdate = true; // start with assumption we have key and need value
       if (shouldDoRemoteCreate(rgn, ev)) {
         if (logger.isDebugEnabled()) {
-          logger.debug("doPutOrCreate: attempting to create entry");
+          logger.debug("doPutOrCreate: attempting to update or create entry");
         }
         final long startPut = CachePerfStats.getStatTime();
         final boolean isBucket = rgn.isUsedForPartitionedRegionBucket();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
index 8e41ef1..01556dc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
@@ -57,6 +57,13 @@ public interface NioFilter {
       throws IOException;
 
   /**
+   * When done reading a direct ack message invoke this method
+   */
+  default void doneReadingDirectAck(ByteBuffer unwrappedBuffer) {
+    doneReading(unwrappedBuffer);
+  }
+
+  /**
    * You must invoke this when done reading from the unwrapped buffer
    */
   default void doneReading(ByteBuffer unwrappedBuffer) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index 9bf969d..09472d7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -369,6 +369,13 @@ public class NioSslEngine implements NioFilter {
     return peerAppData;
   }
 
+  @Override
+  public void doneReadingDirectAck(ByteBuffer unwrappedBuffer) {
+    // nothing needs to be done - the next direct-ack message will be
+    // read into the same buffer and compaction will be done during
+    // read-operations
+  }
+
 
   @Override
   public void close(SocketChannel socketChannel) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index 0a33428..789c34b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -58,9 +58,6 @@ public class MsgReader {
 
     Assert.assertTrue(unwrappedBuffer.remaining() >= Connection.MSG_HEADER_BYTES);
 
-    int position = unwrappedBuffer.position();
-    int limit = unwrappedBuffer.limit();
-
     try {
       int nioMessageLength = unwrappedBuffer.getInt();
       /* nioMessageVersion = */
@@ -94,12 +91,9 @@ public class MsgReader {
     Assert.assertTrue(nioInputBuffer.remaining() >= header.messageLength);
     this.getStats().incMessagesBeingReceived(true, header.messageLength);
     long startSer = this.getStats().startMsgDeserialization();
-    int position = nioInputBuffer.position();
-    int limit = nioInputBuffer.limit();
     try {
       byteBufferInputStream.setBuffer(nioInputBuffer);
       ReplyProcessor21.initMessageRPId();
-      // dumpState("readMessage ready to deserialize", null, nioInputBuffer, position, limit);
       return (DistributionMessage) InternalDataSerializer.readDSFID(byteBufferInputStream);
     } catch (RuntimeException e) {
       throw e;
@@ -108,7 +102,7 @@ public class MsgReader {
     } finally {
       this.getStats().endMsgDeserialization(startSer);
       this.getStats().decMessagesBeingReceived(header.messageLength);
-      ioFilter.doneReading(nioInputBuffer);
+      ioFilter.doneReadingDirectAck(nioInputBuffer);
     }
   }
 

Reply via email to