Repository: qpid-proton-j Updated Branches: refs/heads/master 84b3ce477 -> e5a7dcade
PROTON-1828: add ability to limit outgoing frame sizes Project: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/commit/e5a7dcad Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/tree/e5a7dcad Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton-j/diff/e5a7dcad Branch: refs/heads/master Commit: e5a7dcade2996b2b68967949ddf1377f954bf579 Parents: 84b3ce4 Author: Robbie Gemmell <rob...@apache.org> Authored: Fri Apr 13 16:03:27 2018 +0100 Committer: Robbie Gemmell <rob...@apache.org> Committed: Fri Apr 13 16:03:27 2018 +0100 ---------------------------------------------------------------------- .../apache/qpid/proton/engine/Transport.java | 13 ++ .../qpid/proton/engine/impl/TransportImpl.java | 20 ++- .../proton/engine/impl/TransportImplTest.java | 130 ++++++++++++++++++- 3 files changed, 161 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/e5a7dcad/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java index 35b2d50..f8de042 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java @@ -308,4 +308,17 @@ public interface Transport extends Endpoint void setEmitFlowEventOnSend(boolean emitFlowEventOnSend); boolean isEmitFlowEventOnSend(); + + /** + * Set an upper limit on the size of outgoing frames that will be sent + * to the peer. Allows constraining the transport not to emit Transfer + * frames over a given size even when the peer's max frame size allows it. + * + * Must be set before receiving the peer's Open frame to have effect. 
+ * + * @param size the size limit to apply + */ + void setOutboundFrameSizeLimit(int size); + + int getOutboundFrameSizeLimit(); } http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/e5a7dcad/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java index 1d0103e..afadb5f 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java @@ -104,6 +104,7 @@ public class TransportImpl extends EndpointImpl private int _maxFrameSize = DEFAULT_MAX_FRAME_SIZE; private int _remoteMaxFrameSize = MIN_MAX_FRAME_SIZE; + private int _outboundFrameSizeLimit = 0; private int _channelMax = CHANNEL_MAX_LIMIT; private int _remoteChannelMax = CHANNEL_MAX_LIMIT; @@ -1105,12 +1106,19 @@ public class TransportImpl extends EndpointImpl _open = open; } + int effectiveMaxFrameSize = _remoteMaxFrameSize; if(open.getMaxFrameSize().longValue() > 0) { _remoteMaxFrameSize = (int) open.getMaxFrameSize().longValue(); - _frameWriter.setMaxFrameSize(_remoteMaxFrameSize); + effectiveMaxFrameSize = (int) Math.min(open.getMaxFrameSize().longValue(), Integer.MAX_VALUE); } + if(_outboundFrameSizeLimit > 0) { + effectiveMaxFrameSize = (int) Math.min(open.getMaxFrameSize().longValue(), _outboundFrameSizeLimit); + } + + _frameWriter.setMaxFrameSize(effectiveMaxFrameSize); + if (open.getChannelMax().longValue() > 0) { _remoteChannelMax = (int) open.getChannelMax().longValue(); @@ -1779,4 +1787,14 @@ public class TransportImpl extends EndpointImpl _additionalTransportLayers.add(layer); } } + + @Override + public void setOutboundFrameSizeLimit(int limit) { + _outboundFrameSizeLimit = limit; + } + + @Override + public int 
getOutboundFrameSizeLimit() { + return _outboundFrameSizeLimit; + } } http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/e5a7dcad/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java index ead411f..50c04fd 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java @@ -34,6 +34,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedList; +import java.util.Random; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Binary; @@ -78,7 +79,7 @@ public class TransportImplTest private static final TransportFrame TRANSPORT_FRAME_BEGIN = new TransportFrame(CHANNEL_ID, new Begin(), null); private static final TransportFrame TRANSPORT_FRAME_OPEN = new TransportFrame(CHANNEL_ID, new Open(), null); - private static final int BUFFER_SIZE = 4096; + private static final int BUFFER_SIZE = 8 * 1024; @Rule public ExpectedException _expectedException = ExpectedException.none(); @@ -2503,6 +2504,122 @@ public class TransportImplTest } } + @Test + public void testMaxFrameSizeOfPeerHasEffect() + { + doMaxFrameSizeTestImpl(0, 0, 5700, 1); + doMaxFrameSizeTestImpl(1024, 0, 5700, 6); + } + + @Test + public void testMaxFrameSizeOutgoingFrameSizeLimitHasEffect() + { + doMaxFrameSizeTestImpl(0, 512, 5700, 12); + doMaxFrameSizeTestImpl(1024, 512, 5700, 12); + doMaxFrameSizeTestImpl(1024, 2048, 5700, 6); + } + + void doMaxFrameSizeTestImpl(int remoteMaxFrameSize, int outboundFrameSizeLimit, int contentLength, int expectedNumFrames) + { + MockTransportImpl transport = new MockTransportImpl(); + 
transport.setEmitFlowEventOnSend(false); + + // If we have been given an outboundFrameSizeLimit, configure it + if(outboundFrameSizeLimit != 0) { + transport.setOutboundFrameSizeLimit(outboundFrameSizeLimit); + } + + Connection connection = Proton.connection(); + transport.bind(connection); + + Session session = connection.session(); + session.open(); + + String linkName = "mySender"; + Sender sender = session.sender(linkName); + sender.open(); + + String messageContent = createLargeContent(contentLength); + sendMessage(sender, "tag1", messageContent); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 0, transport.writes.size()); + + // Now open the connection, expect the Open, Begin and Attach frames but + // nothing else as the peer has not yet responded or granted any credit. + connection.open(); + + pumpMockTransport(transport); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size()); + + assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open); + assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin); + assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach); + + // Send the necessary responses to open/begin/attach then give sender credit + Open open = new Open(); + if(remoteMaxFrameSize != 0) { + open.setMaxFrameSize(UnsignedInteger.valueOf(remoteMaxFrameSize)); + } + transport.handleFrame(new TransportFrame(0, open, null)); + + Begin begin = new Begin(); + begin.setRemoteChannel(UnsignedShort.valueOf((short) 0)); + transport.handleFrame(new TransportFrame(0, begin, null)); + + Attach attach = new Attach(); + attach.setHandle(UnsignedInteger.ZERO); + attach.setRole(Role.RECEIVER); + attach.setName(linkName); + attach.setInitialDeliveryCount(UnsignedInteger.ZERO); + transport.handleFrame(new TransportFrame(0, attach, null)); + + Flow flow = new Flow(); + flow.setHandle(UnsignedInteger.ZERO); + 
flow.setDeliveryCount(UnsignedInteger.ZERO); + flow.setNextIncomingId(UnsignedInteger.ONE); + flow.setNextOutgoingId(UnsignedInteger.ZERO); + flow.setIncomingWindow(UnsignedInteger.valueOf(1024)); + flow.setOutgoingWindow(UnsignedInteger.valueOf(1024)); + flow.setLinkCredit(UnsignedInteger.valueOf(10)); + + transport.handleFrame(new TransportFrame(0, flow, null)); + + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size()); + + // Now pump the transport again and expect transfers for the message + pumpMockTransport(transport); + + // This calc isn't entirely precise, there is some added performative/frame overhead not + // accounted for...but values are chosen to work, and verified here. + final int frameCount; + if(remoteMaxFrameSize == 0 && outboundFrameSizeLimit == 0) { + frameCount = 1; + } else if(remoteMaxFrameSize == 0 && outboundFrameSizeLimit != 0) { + frameCount = (int) Math.ceil((double)contentLength / (double) outboundFrameSizeLimit); + } else { + int effectiveMaxFrameSize; + if(outboundFrameSizeLimit != 0) { + effectiveMaxFrameSize = Math.min(outboundFrameSizeLimit, remoteMaxFrameSize); + } else { + effectiveMaxFrameSize = remoteMaxFrameSize; + } + + frameCount = (int) Math.ceil((double)contentLength / (double) effectiveMaxFrameSize); + } + + assertEquals("Unexpected number of frames calculated", expectedNumFrames, frameCount); + + final int start = 3; + final int totalExpected = start + frameCount; + assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), totalExpected, transport.writes.size()); + for(int i = start; i < totalExpected; i++) { + assertTrue("Unexpected frame type", transport.writes.get(i) instanceof Transfer); + } + } + private void processInput(MockTransportImpl transport, ByteBuffer data) { while (data.remaining() > 0) { @@ -2515,4 +2632,15 @@ public class TransportImplTest } } + + private static String createLargeContent(int length) { + Random rand = new 
Random(System.currentTimeMillis()); + + byte[] payload = new byte[length]; + for (int i = 0; i < length; i++) { + payload[i] = (byte) (64 + 1 + rand.nextInt(9)); + } + + return new String(payload, StandardCharsets.UTF_8); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org