Repository: qpid-jms
Updated Branches:
  refs/heads/master a2992a757 -> 867c1ba43
Use a single Netty buffer to read all message bytes and grow as needed.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/867c1ba4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/867c1ba4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/867c1ba4

Branch: refs/heads/master
Commit: 867c1ba43446885f90bbc0a09e134976ac28aaee
Parents: a2992a7
Author: Timothy Bish <tabish...@gmail.com>
Authored: Tue Sep 30 17:02:03 2014 -0400
Committer: Timothy Bish <tabish...@gmail.com>
Committed: Tue Sep 30 17:02:03 2014 -0400

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpConsumer.java | 24 +++++++++++---------
 1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/867c1ba4/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 41fef20..bced2b4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -16,7 +16,9 @@
  */
 package org.apache.qpid.jms.provider.amqp;
 
-import java.io.ByteArrayOutputStream;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -64,12 +66,13 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
     protected static final Symbol JMS_NO_LOCAL_SYMBOL = Symbol.valueOf("no-local");
     protected static final Symbol JMS_SELECTOR_SYMBOL = Symbol.valueOf("jms-selector");
 
+    private static final int INITIAL_BUFFER_CAPACITY = 1024 * 128;
+
     protected final AmqpSession session;
     protected final Map<JmsInboundMessageDispatch, Delivery> delivered = new LinkedHashMap<JmsInboundMessageDispatch, Delivery>();
     protected boolean presettle;
 
-    private final ByteArrayOutputStream streamBuffer = new ByteArrayOutputStream();
-    private final byte incomingBuffer[] = new byte[1024 * 64];
+    private final ByteBuf incomingBuffer = Unpooled.buffer(INITIAL_BUFFER_CAPACITY);
 
     private final AtomicLong _incomingSequence = new AtomicLong(0);
 
@@ -418,22 +421,21 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
 
     // TODO - Find more efficient ways to produce the Message instance.
     protected Message decodeIncomingMessage(Delivery incoming) {
-        byte[] buffer;
         int count;
 
-        while ((count = endpoint.recv(incomingBuffer, 0, incomingBuffer.length)) > 0) {
-            streamBuffer.write(incomingBuffer, 0, count);
+        while ((count = endpoint.recv(incomingBuffer.array(), incomingBuffer.writerIndex(), incomingBuffer.writableBytes())) > 0) {
+            incomingBuffer.writerIndex(incomingBuffer.writerIndex() + count);
+            if (!incomingBuffer.isWritable()) {
+                incomingBuffer.capacity((int) (incomingBuffer.capacity() * 1.5));
+            }
         }
 
-        // TODO - This will copy, replace with something better later. Pooled Netty Buffer ?
-        buffer = streamBuffer.toByteArray();
-
         try {
             Message protonMessage = Message.Factory.create();
-            protonMessage.decode(buffer, 0, buffer.length);
+            protonMessage.decode(incomingBuffer.array(), 0, incomingBuffer.readableBytes());
             return protonMessage;
         } finally {
-            streamBuffer.reset();
+            incomingBuffer.clear();
         }
     }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org