This is an automated email from the ASF dual-hosted git repository. bschuchardt pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 4803096 GEODE-6928 peer-to-peer SSL stream corruption with conserve-sockets=false 4803096 is described below commit 48030961f82f91360d8c88f19516758ba0d6affe Author: Bruce Schuchardt <bschucha...@pivotal.io> AuthorDate: Mon Jul 1 13:00:21 2019 -0700 GEODE-6928 peer-to-peer SSL stream corruption with conserve-sockets=false Changed NioSslEngine so that it no longer modifies the decrypted SSL buffer after reading a direct-ack response. This allows the readFully method to correctly see which bytes have already been consumed and to compact the buffer for subsequent reads, if necessary. The cluster communication test now verifies that no connection is aborted and retried (i.e., the reconnect-attempts statistic does not change) during operation distribution. Without this fix, that check fails. --- .../org/apache/geode/ClusterCommunicationsDUnitTest.java | 14 +++++++++++--- .../org/apache/geode/distributed/internal/DMStats.java | 3 +++ .../geode/distributed/internal/DistributionStats.java | 5 +++++ .../distributed/internal/LonerDistributionManager.java | 5 +++++ .../geode/internal/cache/AbstractUpdateOperation.java | 2 +- .../main/java/org/apache/geode/internal/net/NioFilter.java | 7 +++++++ .../java/org/apache/geode/internal/net/NioSslEngine.java | 7 +++++++ .../main/java/org/apache/geode/internal/tcp/MsgReader.java | 8 +------- 8 files changed, 40 insertions(+), 11 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java index 4d7bb23..eca86ed 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java @@ -66,6 +66,7 @@ import org.apache.geode.cache.RegionShortcut; import org.apache.geode.distributed.DistributedMember; import 
org.apache.geode.distributed.Locator; import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DMStats; import org.apache.geode.distributed.internal.DirectReplyProcessor; import org.apache.geode.distributed.internal.DistributionMessage; import org.apache.geode.distributed.internal.InternalDistributedSystem; @@ -143,7 +144,9 @@ public class ClusterCommunicationsDUnitTest implements Serializable { for (int i = 1; i <= NUM_SERVERS; i++) { verifyCreatedEntry(getVM(i)); } - performUpdate(getVM(1)); + for (int iteration = 1; iteration < 6; iteration++) { + performUpdate(getVM(1)); + } for (int i = 1; i <= NUM_SERVERS; i++) { verifyUpdatedEntry(getVM(i)); } @@ -239,8 +242,13 @@ public class ClusterCommunicationsDUnitTest implements Serializable { } private void performUpdate(VM memberVM) { - memberVM.invoke("perform update", () -> cache - .getRegion(regionName).put("testKey", "updatedTestValue")); + memberVM.invoke("perform update", () -> { + DMStats stats = ((InternalDistributedSystem) cache.getDistributedSystem()) + .getDistributionManager().getStats(); + int reconnectAttempts = stats.getReconnectAttempts(); + cache.getRegion(regionName).put("testKey", "updatedTestValue"); + assertThat(stats.getReconnectAttempts()).isEqualTo(reconnectAttempts); + }); } private void performCreateWithLargeValue(VM memberVM) { diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java index 410d74f..f91bf55 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java @@ -297,6 +297,9 @@ public interface DMStats { */ void incReconnectAttempts(); + + int getReconnectAttempts(); + /** * @since GemFire 4.1 */ diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java index f644209..9d7836c 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java @@ -1657,6 +1657,11 @@ public class DistributionStats implements DMStats { } @Override + public int getReconnectAttempts() { + return stats.getInt(reconnectAttemptsId); + } + + @Override public void incLostLease() { stats.incInt(lostConnectionLeaseId, 1); } diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java index a676a7e..248ff96 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java @@ -553,6 +553,11 @@ public class LonerDistributionManager implements DistributionManager { public void incReconnectAttempts() {} @Override + public int getReconnectAttempts() { + return 0; + } + + @Override public void incLostLease() {} @Override diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java index e467673..96b03b0 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java @@ -135,7 +135,7 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation boolean doUpdate = true; // start with assumption we have key and need value if (shouldDoRemoteCreate(rgn, ev)) { if (logger.isDebugEnabled()) { - logger.debug("doPutOrCreate: attempting to create entry"); + logger.debug("doPutOrCreate: attempting to update or 
create entry"); } final long startPut = CachePerfStats.getStatTime(); final boolean isBucket = rgn.isUsedForPartitionedRegionBucket(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java index 8e41ef1..01556dc 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java @@ -57,6 +57,13 @@ public interface NioFilter { throws IOException; /** + * When done reading a direct ack message invoke this method + */ + default void doneReadingDirectAck(ByteBuffer unwrappedBuffer) { + doneReading(unwrappedBuffer); + } + + /** * You must invoke this when done reading from the unwrapped buffer */ default void doneReading(ByteBuffer unwrappedBuffer) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java index 9bf969d..09472d7 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java @@ -369,6 +369,13 @@ public class NioSslEngine implements NioFilter { return peerAppData; } + @Override + public void doneReadingDirectAck(ByteBuffer unwrappedBuffer) { + // nothing needs to be done - the next direct-ack message will be + // read into the same buffer and compaction will be done during + // read-operations + } + @Override public void close(SocketChannel socketChannel) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java index 0a33428..789c34b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java @@ -58,9 +58,6 @@ public class MsgReader { 
Assert.assertTrue(unwrappedBuffer.remaining() >= Connection.MSG_HEADER_BYTES); - int position = unwrappedBuffer.position(); - int limit = unwrappedBuffer.limit(); - try { int nioMessageLength = unwrappedBuffer.getInt(); /* nioMessageVersion = */ @@ -94,12 +91,9 @@ public class MsgReader { Assert.assertTrue(nioInputBuffer.remaining() >= header.messageLength); this.getStats().incMessagesBeingReceived(true, header.messageLength); long startSer = this.getStats().startMsgDeserialization(); - int position = nioInputBuffer.position(); - int limit = nioInputBuffer.limit(); try { byteBufferInputStream.setBuffer(nioInputBuffer); ReplyProcessor21.initMessageRPId(); - // dumpState("readMessage ready to deserialize", null, nioInputBuffer, position, limit); return (DistributionMessage) InternalDataSerializer.readDSFID(byteBufferInputStream); } catch (RuntimeException e) { throw e; @@ -108,7 +102,7 @@ public class MsgReader { } finally { this.getStats().endMsgDeserialization(startSer); this.getStats().decMessagesBeingReceived(header.messageLength); - ioFilter.doneReading(nioInputBuffer); + ioFilter.doneReadingDirectAck(nioInputBuffer); } }