This is an automated email from the ASF dual-hosted git repository. burcham pushed a commit to branch support/1.12 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.12 by this push: new 26f7c17 GEODE-8542: java.lang.IllegalStateException: tcp message exceeded max… (#5562) 26f7c17 is described below commit 26f7c17b49b4e83f2af40fca66c8719658a53bf2 Author: Bruce Schuchardt <bschucha...@pivotal.io> AuthorDate: Tue Sep 29 10:17:56 2020 -0700 GEODE-8542: java.lang.IllegalStateException: tcp message exceeded max… (#5562) Limit the size of message chunks to the maximum message size allowed by org.apache.geode.internal.tcp.Connection. (cherry picked from commit b439d3301dc15a81a9917b05ca4bd0717d1718bc) (cherry picked from commit cb07f831b1ce1023608e59f873b015f1ae2768bc) --- .../org/apache/geode/internal/tcp/MsgStreamer.java | 9 +- .../apache/geode/internal/tcp/MsgStreamerTest.java | 129 +++++++++++++++++++++ 2 files changed, 137 insertions(+), 1 deletion(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java index ed25ce3..69f8047 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java @@ -27,6 +27,7 @@ import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap; import it.unimi.dsi.fastutil.objects.ObjectIterator; import org.apache.geode.DataSerializer; +import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.distributed.internal.DMStats; import org.apache.geode.distributed.internal.DistributionMessage; import org.apache.geode.internal.Assert; @@ -129,7 +130,8 @@ public class MsgStreamer extends OutputStream this.stats = stats; this.msg = msg; this.cons = cons; - this.buffer = bufferPool.acquireDirectSenderBuffer(sendBufferSize); + int bufferSize = Math.min(sendBufferSize, Connection.MAX_MSG_SIZE); + this.buffer = bufferPool.acquireDirectSenderBuffer(bufferSize); this.buffer.clear(); this.buffer.position(Connection.MSG_HEADER_BYTES); this.msgId = MsgIdGenerator.NO_MSG_ID; @@ -347,6 +349,11 @@ public class MsgStreamer extends OutputStream this.buffer.position(Connection.MSG_HEADER_BYTES); } + @VisibleForTesting + protected ByteBuffer getBuffer() { + return buffer; + } + @Override public void close() throws IOException { try { diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java new file mode 100644 index 0000000..22f5756 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.tcp; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; + +import javax.net.ssl.SSLException; + +import org.junit.Test; + +import org.apache.geode.distributed.internal.DMStats; +import org.apache.geode.distributed.internal.DistributionMessage; +import org.apache.geode.distributed.internal.SerialAckedMessage; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.net.BufferPool; +import org.apache.geode.internal.serialization.Version; + +public class MsgStreamerTest { + private DMStats stats = mock(DMStats.class); + private BufferPool pool = spy(new BufferPool(stats)); + Connection connection1 = mock(Connection.class); + Connection connection2 = mock(Connection.class); + + // This test relies on GEODE-8020 fix, which has not yet been back-ported to 1.12 + // @Test + // public void create() { + // final BaseMsgStreamer msgStreamer = createMsgStreamer(false); + // assertThat(msgStreamer).isInstanceOf(MsgStreamer.class); + // } + + @Test + public void createWithMixedVersions() { + final BaseMsgStreamer msgStreamer = createMsgStreamer(true); + assertThat(msgStreamer).isInstanceOf(MsgStreamerList.class); + } + + @Test + public void streamerListRelease() throws IOException { + final MsgStreamerList msgStreamer = (MsgStreamerList) createMsgStreamer(true); + msgStreamer.writeMessage(); + verify(pool, times(2)).releaseSenderBuffer(isA(ByteBuffer.class)); + } + + @Test + public void streamerListReleaseWithException() throws IOException { + final MsgStreamerList msgStreamer = (MsgStreamerList) createMsgStreamer(true); + // if the first streamer throws an exception while writing the message we should still only + // release two buffers (one for each streamer) + doThrow(new SSLException("")).when(connection1).sendPreserialized(any(ByteBuffer.class), + any(Boolean.class), any(DistributionMessage.class)); + msgStreamer.writeMessage(); + verify(pool, times(2)).releaseSenderBuffer(isA(ByteBuffer.class)); + } + + @Test + public void streamerRespectsMaxMessageSize() { + InternalDistributedMember member1; + member1 = new InternalDistributedMember("localhost", 1234); + + DistributionMessage message = new SerialAckedMessage(); + message.setRecipients(Arrays.asList(member1)); + + when(connection1.getRemoteAddress()).thenReturn(member1); + when(connection1.getRemoteVersion()).thenReturn(Version.CURRENT); + // create a streamer for a Connection that has a buffer size that's larger than the + // biggest message we can actually send. This is picked up by the MsgStreamer to allocate + // a buffer + when(connection1.getSendBufferSize()).thenReturn(Connection.MAX_MSG_SIZE + 1); + List<Connection> connections = Arrays.asList(connection1); + + final BaseMsgStreamer msgStreamer = + MsgStreamer.create(connections, message, false, stats, pool); + // the streamer ought to have limited the message buffer to MAX_MSG_SIZE + assertThat(((MsgStreamer) msgStreamer).getBuffer().capacity()) + .isEqualTo(Connection.MAX_MSG_SIZE); + } + + + + protected BaseMsgStreamer createMsgStreamer(boolean mixedDestinationVersions) { + + InternalDistributedMember member1, member2; + member1 = new InternalDistributedMember("localhost", 1234); + member2 = new InternalDistributedMember("localhost", 2345); + + DistributionMessage message = new SerialAckedMessage(); + message.setRecipients(Arrays.asList(member1, member2)); + + when(connection1.getRemoteAddress()).thenReturn(member1); + when(connection1.getRemoteVersion()).thenReturn(Version.CURRENT); + when(connection1.getSendBufferSize()).thenReturn(Connection.SMALL_BUFFER_SIZE); + when(connection2.getRemoteAddress()).thenReturn(member2); + when(connection2.getSendBufferSize()).thenReturn(Connection.SMALL_BUFFER_SIZE); + if (mixedDestinationVersions) { + when(connection2.getRemoteVersion()).thenReturn(Version.GEODE_1_12_0); + } else { + when(connection2.getRemoteVersion()).thenReturn(Version.CURRENT); + } + List<Connection> connections = Arrays.asList(connection1, connection2); + + return MsgStreamer.create(connections, message, false, stats, pool); + } +}