This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-8020b in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-8020b by this push: new d9c3ed4 fixing TODO in MsgStreamerList after seeing a thread create a VersionedMsgStreamer d9c3ed4 is described below commit d9c3ed4529994917112a9f21af0639815d36bca2 Author: Bruce Schuchardt <bschucha...@pivotal.io> AuthorDate: Wed Apr 29 08:54:27 2020 -0700 fixing TODO in MsgStreamerList after seeing a thread create a VersionedMsgStreamer the test doesn't have multiple versions, so I think this TODO is relevant to the SSL decryption exceptions we're seeing (GEODE-8020). [warn 2020/04/28 14:22:11.075 PDT <vm_0_thr_4_bridge1_host1_12580> tid=0x86] BRUCE: java.lang.Exception: stack trace at org.apache.geode.internal.net.BufferPool.acquireDirectBuffer(BufferPool.java:115) at org.apache.geode.internal.net.BufferPool.acquireDirectSenderBuffer(BufferPool.java:64) at org.apache.geode.internal.tcp.MsgStreamer.<init>(MsgStreamer.java:132) at org.apache.geode.internal.tcp.VersionedMsgStreamer.<init>(VersionedMsgStreamer.java:37) at org.apache.geode.internal.tcp.MsgStreamer.create(MsgStreamer.java:199) --- .../org/apache/geode/internal/net/BufferPool.java | 9 +++++++ .../org/apache/geode/internal/tcp/Connection.java | 10 +++++++- .../org/apache/geode/internal/tcp/MsgStreamer.java | 2 +- .../apache/geode/internal/tcp/MsgStreamerList.java | 29 ++++++++-------------- 4 files changed, 29 insertions(+), 21 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java index 0997c6e..80a09a7 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java @@ -19,11 +19,15 @@ import java.nio.ByteBuffer; import java.util.IdentityHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.logging.log4j.Logger; + import org.apache.geode.distributed.internal.DMStats; import 
org.apache.geode.internal.Assert; +import org.apache.geode.logging.internal.log4j.api.LogService; public class BufferPool { private final DMStats stats; + private static final Logger logger = LogService.getLogger(); /** * Buffers may be acquired from the Buffers pool @@ -88,6 +92,8 @@ public class BufferPool { } else if (bb.capacity() >= size) { bb.rewind(); bb.limit(size); +// logger.warn("BRUCE: acquiring pooled buffer {} hash {}", bb, Integer.toHexString(System.identityHashCode(bb))); + return bb; } else { // wasn't big enough so put it back in the queue @@ -105,6 +111,8 @@ public class BufferPool { ref = bufferQueue.poll(); } result = ByteBuffer.allocateDirect(size); +// logger.warn("BRUCE: allocating new pooled buffer {} hash {}", result, Integer.toHexString(System.identityHashCode(result))); +// logger.warn("BRUCE: ", new Exception("stack trace")); } else { // if we are using heap buffers then don't bother with keeping them around result = ByteBuffer.allocate(size); @@ -228,6 +236,7 @@ public class BufferPool { private void releaseBuffer(ByteBuffer bb, boolean send) { if (bb.isDirect()) { BBSoftReference bbRef = new BBSoftReference(bb, send); +// logger.warn("BRUCE: releasing pooled buffer {} hash {}", bb, Integer.toHexString(System.identityHashCode(bb))); bufferQueue.offer(bbRef); } else { if (send) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java index 719f48e..1723e09 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java @@ -2725,7 +2725,15 @@ public class Connection implements Runnable { private void processInputBuffer() throws ConnectionException, IOException { inputBuffer.flip(); - ByteBuffer peerDataBuffer = ioFilter.unwrap(inputBuffer); + ByteBuffer peerDataBuffer; + try { + peerDataBuffer = ioFilter.unwrap(inputBuffer); + } catch 
(SSLException e) { +// if (e.getMessage().contains("bad record MAC")) { +// logger.warn("BRUCE: exception unwrapping buffer {} hash {}", inputBuffer, Integer.toHexString(System.identityHashCode(inputBuffer))); +// } + throw e; + } peerDataBuffer.flip(); boolean done = false; diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java index 9783397..1c3a2b5 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java @@ -155,7 +155,7 @@ public class MsgStreamer extends OutputStream int numVersioned = 0; for (Object c : cons) { con = (Connection) c; - if ((version = con.getRemoteVersion()) != null) { + if ((version = con.getRemoteVersion()) != null && Version.CURRENT_ORDINAL > version.ordinal()) { if (versionToConnMap == null) { versionToConnMap = new Object2ObjectOpenHashMap(); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java index 3d2446c..08b573c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java @@ -63,25 +63,16 @@ public class MsgStreamerList implements BaseMsgStreamer { for (MsgStreamer streamer : this.streamers) { if (ex != null) { streamer.release(); - // TODO: shouldn't we call continue here? - // It seems wrong to call writeMessage on a streamer we have just released. - // But why do we call release on a streamer when we had an exception on one - // of the previous streamer? - // release clears the direct bb and returns it to the pool but leaves - // it has the "buffer". THen we call writeMessage and it will use "buffer" - // that has also been returned to the pool. 
- // I think we only have a MsgStreamerList when a DS has a mix of versions - // which usually is just during a rolling upgrade so that might be why we - // haven't noticed this causing a bug. - } - try { - result += streamer.writeMessage(); - // if there is an exception we need to finish the - // loop and release the other streamer's buffers - } catch (RuntimeException e) { - ex = e; - } catch (IOException e) { - ioex = e; + } else { + try { + result += streamer.writeMessage(); + // if there is an exception we need to finish the + // loop and release the other streamer's buffers + } catch (RuntimeException e) { + ex = e; + } catch (IOException e) { + ioex = e; + } } } if (ex != null) {