Repository: qpid-jms Updated Branches: refs/heads/master f29284c6c -> fa1334718
QPIDJMS-401 Refactor fixed producer send path Remove need for instanceof on each send call when the common case is that the send fires and we know what the type is. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/fa133471 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/fa133471 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/fa133471 Branch: refs/heads/master Commit: fa1334718e4b8a57748cc3db9022b560cd4443db Parents: f29284c Author: Timothy Bish <tabish...@gmail.com> Authored: Wed Jul 11 15:20:43 2018 -0400 Committer: Timothy Bish <tabish...@gmail.com> Committed: Wed Jul 11 15:20:43 2018 -0400 ---------------------------------------------------------------------- .../jms/provider/amqp/AmqpFixedProducer.java | 38 ++++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/fa133471/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index 0f50b5c..4b2e5b1 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -107,18 +107,18 @@ public class AmqpFixedProducer extends AmqpProducer { blocked.put(envelope.getMessageId(), send); getParent().getProvider().pumpToProtonTransport(request); } else { - doSend(envelope, request); - } - } + // If the transaction has failed due to remote termination etc then we just indicate + // the send has succeeded until the a new transaction is started. 
+ if (session.isTransacted() && session.isTransactionFailed()) { + request.onSuccess(); + return; + } - private void doSend(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException { - // If the transaction has failed due to remote termination etc then we just indicate - // the send has succeeded until the a new transaction is started. - if (session.isTransacted() && session.isTransactionFailed()) { - request.onSuccess(); - return; + doSend(envelope, new InFlightSend(envelope, request)); } + } + private void doSend(JmsOutboundMessageDispatch envelope, InFlightSend send) throws IOException, JMSException { LOG.trace("Producer sending message: {}", envelope); boolean presettle = envelope.isPresettle() || isPresettle(); @@ -143,15 +143,8 @@ public class AmqpFixedProducer extends AmqpProducer { AmqpProvider provider = getParent().getProvider(); - InFlightSend send = null; - if (request instanceof InFlightSend) { - send = (InFlightSend) request; - } else { - send = new InFlightSend(envelope, request); - - if (!presettle && getSendTimeout() != JmsConnectionInfo.INFINITE) { - send.requestTimeout = getParent().getProvider().scheduleRequestTimeout(send, getSendTimeout(), send); - } + if (!presettle && getSendTimeout() != JmsConnectionInfo.INFINITE && send.requestTimeout == null) { + send.requestTimeout = getParent().getProvider().scheduleRequestTimeout(send, getSendTimeout(), send); } if (presettle) { @@ -166,7 +159,7 @@ public class AmqpFixedProducer extends AmqpProducer { // Put it on the wire and let it fail if the connection is broken, if it does // get written then continue on to determine when we should complete it. - if (provider.pumpToProtonTransport(request, false)) { + if (provider.pumpToProtonTransport(send, false)) { // For presettled messages we can just mark as successful and we are done, but // for any other message we still track it until the remote settles. 
If the send // was tagged as asynchronous we must mark the original request as complete but @@ -190,6 +183,13 @@ public class AmqpFixedProducer extends AmqpProducer { LOG.trace("Dispatching previously held send"); InFlightSend held = blockedSends.next(); try { + // If the transaction has failed due to remote termination etc then we just indicate + // the send has succeeded until the a new transaction is started. + if (session.isTransacted() && session.isTransactionFailed()) { + held.onSuccess(); + return; + } + doSend(held.getEnvelope(), held); } catch (JMSException e) { throw IOExceptionSupport.create(e); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org