This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git


The following commit(s) were added to refs/heads/master by this push:
     new e04e3f8  QPIDJMS-476 Enhance anonymous fallback producer to allow 
asynchronous sends
e04e3f8 is described below

commit e04e3f847859cddecc4704d6a9c628cf435bf97f
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Mon Sep 30 15:07:50 2019 -0400

    QPIDJMS-476 Enhance anonymous fallback producer to allow asynchronous sends
    
    Improves performance of anonymous fallback producer mode by allowing
    asynchronous sends and sends that reuse existing cached anonymous
    fallback producer instances, avoiding the need for open -> send -> close
    on each send.
---
 .../amqp/AmqpAnonymousFallbackProducer.java        |  388 +++++-
 .../qpid/jms/provider/amqp/AmqpConnection.java     |   37 +
 .../qpid/jms/provider/amqp/AmqpFixedProducer.java  |    9 +-
 .../qpid/jms/provider/amqp/AmqpProvider.java       |   41 +
 .../AnonymousFallbackProducerIntegrationTest.java  | 1428 ++++++++++++++++++++
 .../jms/integration/ProducerIntegrationTest.java   |  278 ----
 .../jms/integration/SessionIntegrationTest.java    |   10 +-
 .../provider/failover/FailoverIntegrationTest.java |   61 +
 qpid-jms-docs/Configuration.md                     |    2 +
 9 files changed, 1908 insertions(+), 346 deletions(-)

diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
index 63e5cbb..b3ae647 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
@@ -16,16 +16,26 @@
  */
 package org.apache.qpid.jms.provider.amqp;
 
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
 
+import org.apache.qpid.jms.JmsDestination;
 import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
 import org.apache.qpid.jms.meta.JmsProducerId;
 import org.apache.qpid.jms.meta.JmsProducerInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.NoOpAsyncResult;
 import org.apache.qpid.jms.provider.ProviderException;
 import org.apache.qpid.jms.provider.WrappedAsyncResult;
 import org.apache.qpid.jms.provider.amqp.builders.AmqpProducerBuilder;
+import org.apache.qpid.jms.provider.exceptions.ProviderIllegalStateException;
 import org.apache.qpid.jms.util.IdGenerator;
+import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Sender;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,8 +50,11 @@ public class AmqpAnonymousFallbackProducer extends 
AmqpProducer {
     private static final Logger LOG = 
LoggerFactory.getLogger(AmqpAnonymousFallbackProducer.class);
     private static final IdGenerator producerIdGenerator = new IdGenerator();
 
+    private final AmqpConnection connection;
+    private final Map<JmsDestination, AmqpFallbackProducer> producerCache = 
new LinkedHashMap<>(1, 0.75f, true);
     private final String producerIdKey = producerIdGenerator.generateId();
     private long producerIdCount;
+    private final ScheduledFuture<?> cacheProducerTimeoutTask;
 
     /**
      * Creates the Anonymous Producer object.
@@ -53,36 +66,94 @@ public class AmqpAnonymousFallbackProducer extends 
AmqpProducer {
      */
     public AmqpAnonymousFallbackProducer(AmqpSession session, JmsProducerInfo 
info) {
         super(session, info);
+
+        this.connection = session.getConnection();
+
+        final long sweeperInterval = 
connection.getAnonymousProducerCacheTimeout();
+        if (sweeperInterval > 0 && connection.getAnonymousProducerCacheSize() 
> 0) {
+            LOG.trace("Cached Producer timeout monitoring enabled: interval = 
{}ms", sweeperInterval);
+            cacheProducerTimeoutTask = connection.scheduleWithFixedDelay(new 
CachedProducerSweeper(), sweeperInterval);
+        } else {
+            LOG.trace("No Cached Producer timeout monitoring enabled based on 
configuration.");
+            cacheProducerTimeoutTask = null;
+        }
     }
 
     @Override
     public void send(JmsOutboundMessageDispatch envelope, AsyncResult request) 
throws ProviderException {
         LOG.trace("Started send chain for anonymous producer: {}", 
getProducerId());
+        AmqpFallbackProducer producer = 
producerCache.get(envelope.getDestination());
+
+        if (producer != null && !producer.isAwaitingClose()) {
+            producer.send(envelope, request);
+        } else {
+            handleSendWhenCachedProducerNotAvailable(envelope, request);
+        }
+    }
+
+    private void 
handleSendWhenCachedProducerNotAvailable(JmsOutboundMessageDispatch envelope, 
AsyncResult request) throws ProviderException {
+        AmqpFallbackProducer producer = 
producerCache.get(envelope.getDestination());
+
+        if (producer != null && producer.isAwaitingClose()) {
+            // Producer timed out, or was closed due to send failure wait for 
close to finish then try
+            // to open a new link and send this new message.  This prevents 
the cache from carrying more
+            // than one producer on the same address (destination).
+            producer.close(new SendPendingCloseRequest(producer, envelope, 
request));
+        } else if (producerCache.size() < 
connection.getAnonymousProducerCacheSize()) {
+            startSendWithNewProducer(envelope, request);
+        } else {
+            startSendAfterOldestProducerEvicted(envelope, request);
+        }
+    }
 
+    private void 
startSendAfterOldestProducerEvicted(JmsOutboundMessageDispatch envelope, 
AsyncResult request) throws ProviderException {
+        // No producer in cache yet, or connection limit is zero so we always 
fall into trying to close oldest if one is open.
+        if (producerCache.isEmpty()) {
+            startSendWithNewProducer(envelope, request);
+        } else {
+            // Least recently used producer which will be closed to make room 
for the new one.
+            AmqpFallbackProducer producer = 
producerCache.values().iterator().next();
+            LOG.trace("Next send will commence after producer: {} has been 
closed", producer);
+            producer.close(new SendPendingCloseRequest(producer, envelope, 
request));
+        }
+    }
 
-        // Create a new ProducerInfo for the short lived producer that's 
created to perform the
-        // send to the given AMQP target.
+    private void startSendWithNewProducer(JmsOutboundMessageDispatch envelope, 
AsyncResult request) throws ProviderException {
+        LOG.trace("Next send will commence after producer for destination {} 
been opened", envelope.getDestination());
+
+        // Manufactured Producer state for a fixed producer that sends to the 
target of this specific envelope.
         JmsProducerInfo info = new JmsProducerInfo(getNextProducerId());
         info.setDestination(envelope.getDestination());
         info.setPresettle(this.getResourceInfo().isPresettle());
 
         // We open a Fixed Producer instance with the target destination.  
Once it opens
         // it will trigger the open event which will in turn trigger the send 
event.
-        // The created producer will be closed immediately after the delivery 
has been acknowledged.
-        AmqpProducerBuilder builder = new AmqpProducerBuilder(session, info);
-        builder.buildResource(new AnonymousSendRequest(request, builder, 
envelope, envelope.isCompletionRequired()));
-
-        // Force sends to be sent synchronous so that the temporary producer 
instance can handle
-        // the failures and perform necessary completion work on the send.
-        envelope.setSendAsync(false);
-        envelope.setCompletionRequired(false);
+        AmqpProducerBuilder builder = new AmqpFallbackProducerBuilder(session, 
info);
+        builder.buildResource(new FallbackProducerOpenRequest(request, 
builder, envelope));
 
         getParent().getProvider().pumpToProtonTransport(request);
     }
 
     @Override
     public void close(AsyncResult request) {
-        request.onSuccess();
+        if (cacheProducerTimeoutTask != null) {
+            cacheProducerTimeoutTask.cancel(false);
+        }
+
+        if (producerCache.isEmpty()) {
+            request.onSuccess();
+        } else {
+            AmqpAnonymousFallbackProducerCloseRequest aggregate =
+                new AmqpAnonymousFallbackProducerCloseRequest(request, 
producerCache.size());
+
+            LOG.trace("Anonymous Fallback Producer close will wait for close 
on {} cached producers", producerCache.size());
+            final List<AmqpFallbackProducer> pending = new 
ArrayList<>(producerCache.values());
+            for (AmqpFallbackProducer producer : pending) {
+                producer.close(aggregate);
+            }
+
+            producerCache.clear();
+        }
     }
 
     @Override
@@ -104,116 +175,303 @@ public class AmqpAnonymousFallbackProducer extends 
AmqpProducer {
         return new JmsProducerId(producerIdKey, -1, producerIdCount++);
     }
 
-    //----- AsyncResult objects used to complete the sends 
-------------------//
+    //----- AsyncResult instances used to manage the life-cycle of the 
fallback producers
 
-    private abstract class AnonymousRequest extends WrappedAsyncResult {
+    /*
+     * AsyncResult instance that waits for the successful open of a fixed 
producer instance that
+     * will be used for sends and possibly cached for later reuse.  The 
handler ensures that proper
+     * cleanup occurs if the producer couldn't be opened or if the initial 
send attempt fails.
+     */
+    private final class FallbackProducerOpenRequest extends WrappedAsyncResult 
{
 
-        protected final JmsOutboundMessageDispatch envelope;
-        private final boolean completionRequired;
+        private final JmsOutboundMessageDispatch envelope;
+        private final AmqpProducerBuilder producerBuilder;
 
-        public AnonymousRequest(AsyncResult sendResult, 
JmsOutboundMessageDispatch envelope, boolean completionRequired) {
+        public FallbackProducerOpenRequest(AsyncResult sendResult, 
AmqpProducerBuilder producerBuilder, JmsOutboundMessageDispatch envelope) {
             super(sendResult);
+
             this.envelope = envelope;
-            this.completionRequired = completionRequired;
+            this.producerBuilder = producerBuilder;
+        }
+
+        @Override
+        public void onSuccess() {
+            LOG.trace("Open phase of anonymous send complete: {} ", 
getProducerId());
+
+            AmqpFallbackProducer producer = (AmqpFallbackProducer) 
producerBuilder.getResource();
+            // The fixed producer opened so we start tracking it and once send 
returns indicating that
+            // it handled the request (not necessarily sent it but knows it 
exists) then we start the
+            // close clock and if not reused again this producer will 
eventually time itself out of
+            // existence.
+            producerCache.put(envelope.getDestination(), producer);
+
+            boolean closeNow = connection.getAnonymousProducerCacheSize() <= 0;
+
+            try {
+                producer.send(envelope, getWrappedRequest());
+            } catch (ProviderException e) {
+                closeNow = true;  // close on send fail to speed up processing 
of next send or ultimate close on error.
+                super.onFailure(e);
+            }
+
+            // No cache configured so close the only entry in the map but 
don't remove it until it
+            // has reported itself closed so that next send waits for that to 
happen preventing
+            // runaway link openings.  (Can also trigger if send failed 
immediately).
+            if (closeNow) {
+                LOG.trace("Immediate close of fallback producer {} after send 
triggered.", producer);
+                producer.close(new CloseRequest(producer));
+            }
         }
 
-        /**
-         * In all cases of the chain of events that make up the send for an 
anonymous
-         * producer a failure will trigger the original send request to fail.
-         */
         @Override
         public void onFailure(ProviderException result) {
-            LOG.debug("Send failed during {} step in chain: {}", 
this.getClass().getName(), getProducerId());
+            LOG.debug("Anonymous fallback Send failed because open of new 
fixed producer failed: {}", getProducerId());
+
+            // Producer open failed so it was never in the cache, just close 
it now to ensure
+            // that everything is cleaned up internally.  The originating send 
will be failed
+            // by failing this AsyncResult as it wraps the original request.
+            producerBuilder.getResource().close(NoOpAsyncResult.INSTANCE);
             super.onFailure(result);
         }
+    }
+
+    /*
+     * Close request handler for the individual anonymous fallback producer 
instances in the cache
+     * that will ensure the entry in the cache is cleared when the close 
completes.  If the close
+     * fails, the handler ensures that the provider listener gets a chance to 
report it.
+     */
+    private final class CloseRequest implements AsyncResult {
+
+        private final AmqpFallbackProducer producer;
+
+        public CloseRequest(AmqpFallbackProducer producer) {
+            this.producer = producer;
+        }
 
-        public boolean isCompletionRequired() {
-            return completionRequired;
+        @Override
+        public void onFailure(ProviderException result) {
+            LOG.trace("Close of anonymous producer {} failed: {}", producer, 
result);
+            producerCache.remove(producer.getResourceInfo().getDestination());
+            producer.getParent().getProvider().fireProviderException(result);
         }
 
-        public abstract AmqpProducer getProducer();
+        @Override
+        public void onSuccess() {
+            LOG.trace("Close of anonymous producer {} complete", producer);
+            producerCache.remove(producer.getResourceInfo().getDestination());
+        }
+
+        @Override
+        public boolean isComplete() {
+            return producer.isClosed();
+        }
     }
 
-    private final class AnonymousSendRequest extends AnonymousRequest {
+    /*
+     * Close handler for the full anonymous fallback producer instance which 
waits for all
+     * the cached fallback producers to close before signaling completion to 
the caller.  The
+     * signal will be success if all closed successfully and failure if any 
one of them failed.
+     */
+    private final class AmqpAnonymousFallbackProducerCloseRequest extends 
WrappedAsyncResult {
 
-        private final AmqpProducerBuilder producerBuilder;
+        private int pendingCloseRequests;
+        private ProviderException firstFailure;
 
-        public AnonymousSendRequest(AsyncResult sendResult, 
AmqpProducerBuilder producerBuilder, JmsOutboundMessageDispatch envelope, 
boolean completionRequired) {
-            super(sendResult, envelope, completionRequired);
+        public AmqpAnonymousFallbackProducerCloseRequest(AsyncResult request, 
int pendingCloseRequets) {
+            super(request);
 
-            this.producerBuilder = producerBuilder;
+            this.pendingCloseRequests = pendingCloseRequets;
+        }
+
+        @Override
+        public void onFailure(ProviderException result) {
+            LOG.trace("Close of one anonymous producer done (with error), 
remaining: {}", pendingCloseRequests);
+            pendingCloseRequests--;
+            firstFailure = firstFailure == null ? result : firstFailure;
+            if (pendingCloseRequests == 0) {
+                super.onFailure(firstFailure);
+            }
         }
 
         @Override
         public void onSuccess() {
-            LOG.trace("Open phase of anonymous send complete: {} ", 
getProducerId());
-            AnonymousSendCompleteRequest send = new 
AnonymousSendCompleteRequest(this);
-            try {
-                getProducer().send(envelope, send);
-            } catch (ProviderException e) {
-                super.onFailure(e);
+            LOG.trace("Close of one anonymous producer done, remaining: {}", 
pendingCloseRequests);
+            pendingCloseRequests--;
+            if (pendingCloseRequests == 0) {
+                if (firstFailure == null) {
+                    super.onSuccess();
+                } else {
+                    super.onFailure(firstFailure);
+                }
             }
         }
 
         @Override
-        public AmqpProducer getProducer() {
-            return producerBuilder.getResource();
+        public boolean isComplete() {
+            return pendingCloseRequests == 0;
         }
     }
 
-    private final class AnonymousSendCompleteRequest extends AnonymousRequest {
-
-        private final AmqpProducer producer;
+    /*
+     * Request that will initiate a new send operation as soon as the pending 
close of an already
+     * cached fallback producer completes.  The send happens regardless of the 
outcome of the fallback
+     * close as it is unknown why that failed.  The send could then fail but 
that would allow for normal
+     * send failure processing to kick in.
+     */
+    private final class SendPendingCloseRequest implements AsyncResult {
 
-        public AnonymousSendCompleteRequest(AnonymousSendRequest open) {
-            super(open.getWrappedRequest(), open.envelope, 
open.isCompletionRequired());
+        private final JmsOutboundMessageDispatch envelope;
+        private final AsyncResult sendRequest;
+        private final AmqpFallbackProducer pendingCloseProducer;
 
-            this.producer = open.getProducer();
+        public SendPendingCloseRequest(AmqpFallbackProducer producer, 
JmsOutboundMessageDispatch envelope, AsyncResult sendRequest) {
+            this.envelope = envelope;
+            this.sendRequest = sendRequest;
+            this.pendingCloseProducer = producer;
         }
 
         @Override
         public void onFailure(ProviderException result) {
-            LOG.trace("Send phase of anonymous send failed: {} ", 
getProducerId());
-            AnonymousCloseRequest close = new AnonymousCloseRequest(this);
-            producer.close(close);
-            super.onFailure(result);
+            LOG.trace("Close of anonymous producer {} failed: {}", 
pendingCloseProducer, result);
+            
producerCache.remove(pendingCloseProducer.getResourceInfo().getDestination());
+            
pendingCloseProducer.getParent().getProvider().fireProviderException(result);
+
+            try {
+                send(envelope, sendRequest);
+            } catch (ProviderException ex) {
+                sendRequest.onFailure(ex);
+            }
         }
 
         @Override
         public void onSuccess() {
-            LOG.trace("Send phase of anonymous send complete: {} ", 
getProducerId());
-            AnonymousCloseRequest close = new AnonymousCloseRequest(this);
-            producer.close(close);
+            LOG.trace("Close of anonymous producer {} complete", 
pendingCloseProducer);
+            
producerCache.remove(pendingCloseProducer.getResourceInfo().getDestination());
+
+            try {
+                send(envelope, sendRequest);
+            } catch (ProviderException ex) {
+                sendRequest.onFailure(ex);
+            }
+        }
+
+        @Override
+        public boolean isComplete() {
+            return pendingCloseProducer.getRemoteState() == 
EndpointState.CLOSED;
         }
+    }
+
+    /*
+     * Timeout task used to close any anonymous fallback producer instances 
that have been
+     * inactive for longer than the configured timeout.
+     */
+    private final class CachedProducerSweeper implements Runnable {
 
         @Override
-        public AmqpProducer getProducer() {
-            return producer;
+        public void run() {
+            final List<AmqpFallbackProducer> pending = new 
ArrayList<>(producerCache.values());
+            for (AmqpFallbackProducer producer : pending) {
+                if (producer.isExpired()) {
+                    LOG.trace("Cached Producer {} has timed out, initiating 
close", producer);
+                    producer.close(new CloseRequest(producer));
+                }
+            }
         }
     }
 
-    private final class AnonymousCloseRequest extends AnonymousRequest {
+    /*
+     * Extended fixed destination producer that provides insight into events 
like remote close which
+     * should evict producers from the cache.
+     */
+    private final class AmqpFallbackProducer extends AmqpFixedProducer {
 
-        private final AmqpProducer producer;
+        private int lastExpiryCheckValue;
+        private int activityCounter;
 
-        public AnonymousCloseRequest(AnonymousSendCompleteRequest 
sendComplete) {
-            super(sendComplete.getWrappedRequest(), sendComplete.envelope, 
sendComplete.isCompletionRequired());
+        public AmqpFallbackProducer(AmqpSession session, JmsProducerInfo info, 
Sender sender, int maxInactiveTime) {
+            super(session, info, sender);
+        }
 
-            this.producer = sendComplete.getProducer();
+        @Override
+        public void send(JmsOutboundMessageDispatch envelope, AsyncResult 
request) throws ProviderException {
+            activityCounter++;
+            super.send(envelope, request);
         }
 
         @Override
-        public void onSuccess() {
-            LOG.trace("Close phase of anonymous send complete: {} ", 
getProducerId());
-            super.onSuccess();
-            if (isCompletionRequired()) {
-                
getParent().getProvider().getProviderListener().onCompletedMessageSend(envelope);
+        public void close(AsyncResult request) {
+            if (isAwaitingClose()) {
+                // A pending close, either from a failed send or from an expired 
producer, can be superseded
+                // by a new send-on-close request without issue as it will 
handle any close failures;
+                // otherwise something terribly wrong has happened.  Similarly 
it can be replaced with
+                // the close request aggregate that is used when the main 
producer is closed.
+                if (closeRequest instanceof CloseRequest) {
+                    this.closeRequest = request;
+                } else {
+                    LOG.error("Close called on producer that already has a 
pending send on close request: {}", this);
+                    request.onFailure(new ProviderIllegalStateException(
+                            "Illegal send on close call encountered in 
anonymous fallback producer"));
+                }
+            } else {
+                super.close(request);
+            }
+
+            // Ensure that in all cases the close attempt is pushed to the 
wire immediately.
+            getParent().getProvider().pumpToProtonTransport();
+        }
+
+        public boolean isExpired() {
+            // When awaiting close the producer shouldn't be treated as 
expired as we already closed it
+            // and when it eventually closes it will be removed from the cache 
and any pending work will
+            // be triggered.
+            if (!isAwaitingClose() && activityCounter == lastExpiryCheckValue) 
{
+                return true;
+            } else {
+                lastExpiryCheckValue = activityCounter;
+                return false;
+            }
+        }
+
+        @Override
+        public void processDeliveryUpdates(AmqpProvider provider, Delivery 
delivery) throws ProviderException {
+            activityCounter++; // Sender is receiving delivery updates 
(outcomes) so still active.
+            super.processDeliveryUpdates(provider, delivery);
+        }
+
+        @Override
+        public void processFlowUpdates(AmqpProvider provider) throws 
ProviderException {
+            activityCounter++; // Sender just got a flow so allow for blocked 
sends to reactivate.
+            super.processFlowUpdates(provider);
+        }
+
+        @Override
+        public void processRemoteClose(AmqpProvider provider) throws 
ProviderException {
+            // When not already closing the remote close will clear a cache 
slot for use by
+            // another send to some other producer or to this same producer 
which will need to
+            // open a new link as the remote has forcibly closed this one.  If 
awaiting close
+            // then we initiated it an as such we will respond to it from the 
AsyncResult that
+            // was passed to the close method and clean out the cache slot 
once the state change
+            // has been processed.
+            if (!this.isAwaitingClose()) {
+                producerCache.remove(this.getResourceInfo().getDestination());
             }
+
+            super.processRemoteClose(provider);
+        }
+    }
+
+    /*
+     * Builder of the internal producer implementation type used when creating 
new fixed producers.
+     */
+    private final class AmqpFallbackProducerBuilder extends 
AmqpProducerBuilder {
+
+        public AmqpFallbackProducerBuilder(AmqpSession parent, JmsProducerInfo 
resourceInfo) {
+            super(parent, resourceInfo);
         }
 
         @Override
-        public AmqpProducer getProducer() {
-            return producer;
+        protected AmqpProducer createResource(AmqpSession parent, 
JmsProducerInfo resourceInfo, Sender endpoint) {
+            return new AmqpFallbackProducer(getParent(), getResourceInfo(), 
endpoint, connection.getAnonymousProducerCacheTimeout());
         }
     }
 }
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index 5d55e6c..6bdca10 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -44,6 +44,8 @@ public class AmqpConnection extends 
AmqpAbstractResource<JmsConnectionInfo, Conn
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AmqpConnection.class);
 
+    // TODO - URI configuration
+
     private AmqpSubscriptionTracker subTracker = new AmqpSubscriptionTracker();
 
     private final AmqpJmsMessageFactory amqpMessageFactory;
@@ -218,6 +220,20 @@ public class AmqpConnection extends 
AmqpAbstractResource<JmsConnectionInfo, Conn
     }
 
     /**
+     * @return the configured max number of cached anonymous fallback 
producers to keep.
+     */
+    public int getAnonymousProducerCacheSize() {
+        return getProvider().getAnonymousFallbackCacheSize();
+    }
+
+    /**
+     * @return The configured time, in milliseconds, before a cached anonymous 
producer link is closed due to inactivity.
+     */
+    public int getAnonymousProducerCacheTimeout() {
+        return getProvider().getAnonymousFallbackCacheTimeout();
+    }
+
+    /**
      * @return the AMQP based JmsMessageFactory for this Connection.
      */
     public AmqpJmsMessageFactory getAmqpMessageFactory() {
@@ -258,6 +274,27 @@ public class AmqpConnection extends 
AmqpAbstractResource<JmsConnectionInfo, Conn
         return getProvider().getScheduler().schedule(task, delay, 
TimeUnit.MILLISECONDS);
     }
 
+    /**
+     * Allows a connection resource to schedule a task for future execution 
which will start after the
+     * given delay and then repeat with a fixed delay between the end of one 
execution of the task and
+     * the beginning of the next execution.
+     *
+     * @param task
+     *      The Runnable task to be executed after the given delay.
+     * @param delay
+     *      The delay in milliseconds to schedule the given task for execution.
+     *
+     * @return a ScheduledFuture instance that can be used to cancel the task.
+     */
+    public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable task, long 
delay) {
+        if (task == null) {
+            LOG.trace("Resource attempted to schedule a null task.");
+            return null;
+        }
+
+        return getProvider().getScheduler().scheduleWithFixedDelay(task, 
delay, delay, TimeUnit.MILLISECONDS);
+    }
+
     @Override
     public String toString() {
         return "AmqpConnection { " + getResourceInfo().getId() + " }";
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 69fc240..f7e9c2a 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -295,8 +295,13 @@ public class AmqpFixedProducer extends AmqpProducer {
     @Override
     public void handleResourceClosure(AmqpProvider provider, ProviderException 
error) {
         if (error == null) {
-            // TODO: create/use a more specific/appropriate exception type?
-            error = new ProviderException("Producer closed remotely before 
message transfer result was notified");
+            // In case close was expected but remote did provide error context 
we propagate that to the outstanding
+            // sends that will be failed in order to provide more specific 
context to the send failure.
+            if (getEndpoint().getRemoteCondition() != null) {
+                error = AmqpSupport.convertToNonFatalException(provider, 
getEndpoint(), getEndpoint().getRemoteCondition());
+            } else {
+                error = new ProviderException("Producer closed remotely before 
message transfer result was notified");
+            }
         }
 
         Collection<InFlightSend> inflightSends = new 
ArrayList<InFlightSend>(sent.values());
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index c3c85a1..73a5789 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -119,6 +119,8 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
     private static final NoOpAsyncResult NOOP_REQUEST = new NoOpAsyncResult();
 
     private static final int DEFAULT_MAX_WRITE_BYTES_BEFORE_FLUSH = 128 * 1024;
+    private static final int DEFAULT_ANONYMOUS_FALLBACK_CACHE_TIMEOUT = 30000;
+    private static final int DEFAULT_ANONYMOUS_FALLBACK_CACHE_SIZE = 1;
 
     private volatile ProviderListener listener;
     private volatile AmqpConnection connection;
@@ -137,6 +139,8 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
     private long sessionOutoingWindow = -1; // Use proton default
     private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
     private int maxWriteBytesBeforeFlush = 
DEFAULT_MAX_WRITE_BYTES_BEFORE_FLUSH;
+    private int anonymousFallbackCacheTimeout = 
DEFAULT_ANONYMOUS_FALLBACK_CACHE_TIMEOUT;
+    private int anonymousFallbackCacheSize = 
DEFAULT_ANONYMOUS_FALLBACK_CACHE_SIZE;
 
     private boolean allowNonSecureRedirects;
 
@@ -1272,6 +1276,43 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
     }
 
     /**
+     * @return the configured max number of cached anonymous fallback 
producers to keep.
+     */
+    public int getAnonymousFallbackCacheSize() {
+        return anonymousFallbackCacheSize;
+    }
+
+    /**
+     * Sets the number of anonymous fallback producers to keep open in a cache 
in order to improve
+     * overall performance of anonymous fallback producer sends.
+     *
+     * @param size
+     *                 The number of fallback producers to cache.
+     */
+    public void setAnonymousFallbackCacheSize(int size) {
+        this.anonymousFallbackCacheSize = size;
+    }
+
+    /**
+     * @return The configured time before a cached anonymous producer link is 
closed due to inactivity.
+     */
+    public int getAnonymousFallbackCacheTimeout() {
+        return anonymousFallbackCacheTimeout;
+    }
+
+    /**
+     * Sets the timeout used to close cached anonymous producers that have not 
sent any messages in that
+     * time period.  The value is set in milliseconds with a value less than 
or equal to zero resulting in
+     * no timeout being applied.
+     *
+     * @param timeout
+     *                 Time in milliseconds that a cached anonymous producer 
can be idle before being closed.
+     */
+    public void setAnonymousFallbackCacheTimeout(int timeout) {
+        this.anonymousFallbackCacheTimeout = timeout;
+    }
+
+    /**
      * Sets the max frame size (in bytes).
      *
      * Values of -1 indicates to use the proton default.
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/AnonymousFallbackProducerIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/AnonymousFallbackProducerIntegrationTest.java
new file mode 100644
index 0000000..6cd4bcb
--- /dev/null
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/AnonymousFallbackProducerIntegrationTest.java
@@ -0,0 +1,1428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.CompletionListener;
+import javax.jms.Connection;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.ResourceAllocationException;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsDefaultConnectionListener;
+import org.apache.qpid.jms.JmsSendTimedOutException;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
+import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
+import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher;
+import 
org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
+import 
org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
+import 
org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
+import 
org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import 
org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
+import org.apache.qpid.jms.util.QpidJMSTestRunner;
+import org.apache.qpid.jms.util.Repeat;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for the Anonymous Fallback producer implementation.
+ *
+ * DO NOT add capability to indicate server support for ANONYMOUS-RELAY for 
any of these tests
+ */
+@RunWith(QpidJMSTestRunner.class)
+public class AnonymousFallbackProducerIntegrationTest extends QpidJmsTestCase {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AnonymousFallbackProducerIntegrationTest.class);
+
+    private final IntegrationTestFixture testFixture = new 
IntegrationTestFixture();
+
+    @Test(timeout = 20000)
+    public void testCloseSenderWithNoActiveFallbackProducers() throws 
Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            Connection connection = testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin();
+            testPeer.expectClose();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(null);
+
+            producer.close();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void testRemotelyCloseProducerDuringSyncSendNoCache() throws 
Exception {
+        doTestRemotelyCloseProducerDuringSyncSend(0);
+    }
+
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void testRemotelyCloseProducerDuringSyncSendOneCached() throws 
Exception {
+        doTestRemotelyCloseProducerDuringSyncSend(1);
+    }
+
+    private void doTestRemotelyCloseProducerDuringSyncSend(int cacheSize) 
throws Exception {
+        final String BREAD_CRUMB = "ErrorMessageBreadCrumb";
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            // Use a long timeout to ensure no early evictions in this test.
+            Connection connection = testFixture.establishConnecton(testPeer,
+                "?amqp.anonymousFallbackCacheSize=" + cacheSize + 
"&amqp.anonymousFallbackCacheTimeout=60000");
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            // Expect producer creation, give it credit.
+            testPeer.expectSenderAttach();
+
+            String text = "myMessage";
+            MessageHeaderSectionMatcher headersMatcher = new 
MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new 
MessageAnnotationsSectionMatcher(true);
+            MessagePropertiesSectionMatcher propsMatcher = new 
MessagePropertiesSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+            messageMatcher.setPropertiesMatcher(propsMatcher);
+            messageMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(text));
+
+            // Expect a message to be sent, but don't send a disposition in
+            // response, simply remotely close the producer instead.
+            testPeer.expectTransfer(messageMatcher, nullValue(), false, false, 
null, false);
+            testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, 
true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB);
+            testPeer.expectClose();
+
+            Queue queue = session.createQueue("myQueue");
+            final MessageProducer producer = session.createProducer(null);
+
+            Message message = session.createTextMessage(text);
+
+            try {
+                producer.send(queue, message);
+                fail("Expected exception to be thrown");
+            } catch (JMSException jmse) {
+                LOG.trace("JMSException thrown from send: ", jmse);
+                // Expected but requires some context to be correct.
+                assertTrue(jmse instanceof ResourceAllocationException);
+                assertNotNull("Expected exception to have a message", 
jmse.getMessage());
+                assertTrue("Expected breadcrumb to be present in message", 
jmse.getMessage().contains(BREAD_CRUMB));
+            }
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void testRemotelyCloseProducerWithSendWaitingForCreditNoCache() 
throws Exception {
+        doTestRemotelyCloseProducerWithSendWaitingForCredit(0);
+    }
+
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void testRemotelyCloseProducerWithSendWaitingForCreditOneCached() 
throws Exception {
+        doTestRemotelyCloseProducerWithSendWaitingForCredit(1);
+    }
+
+    private void doTestRemotelyCloseProducerWithSendWaitingForCredit(int 
cacheSize) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Use a long timeout to ensure no early evictions in this test.
+            Connection connection = testFixture.establishConnecton(testPeer,
+                "?amqp.anonymousFallbackCacheSize=" + cacheSize + 
"&amqp.anonymousFallbackCacheTimeout=60000");
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            // Expect producer creation, don't give it credit.
+            testPeer.expectSenderAttachWithoutGrantingCredit();
+
+            // Producer has no credit so the send should block waiting for it, 
then fail when the remote close occurs
+            testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, 
true, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Producer closed", 50);
+            testPeer.expectClose();
+
+            Queue queue = session.createQueue("myQueue");
+            final MessageProducer producer = session.createProducer(null);
+
+            Message message = session.createTextMessage("myMessage");
+
+            try {
+                producer.send(queue, message);
+                fail("Expected exception to be thrown due to close of 
producer");
+            } catch (ResourceAllocationException rae) {
+                // Expected if remote close beat the send to the provider
+            } catch (IllegalStateException ise) {
+                // Can happen if send fires before remote close is processed.
+            }
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void testRemotelyCloseConnectionDuringSyncSendNoCache() throws 
Exception {
+        doTestRemotelyCloseConnectionDuringSyncSend(0);
+    }
+
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void testRemotelyCloseConnectionDuringSyncSendOneCached() throws 
Exception {
+        doTestRemotelyCloseConnectionDuringSyncSend(1);
+    }
+
+    private void doTestRemotelyCloseConnectionDuringSyncSend(int cacheSize) 
throws Exception {
+        final String BREAD_CRUMB = "ErrorMessageBreadCrumb";
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Use a long timeout to ensure no early evictions in this test.
+            Connection connection = testFixture.establishConnecton(testPeer,
+                "?amqp.anonymousFallbackCacheSize=" + cacheSize + 
"&amqp.anonymousFallbackCacheTimeout=60000");
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            // Expect producer creation, give it credit.
+            testPeer.expectSenderAttach();
+
+            String text = "myMessage";
+            MessageHeaderSectionMatcher headersMatcher = new 
MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new 
MessageAnnotationsSectionMatcher(true);
+            MessagePropertiesSectionMatcher propsMatcher = new 
MessagePropertiesSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+            messageMatcher.setPropertiesMatcher(propsMatcher);
+            messageMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(text));
+
+            // Expect a message to be sent, but don't send a disposition in
+            // response, simply remotely close the connection instead.
+            testPeer.expectTransfer(messageMatcher, nullValue(), false, false, 
null, false);
+            testPeer.remotelyCloseConnection(true, 
AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB);
+
+            Queue queue = session.createQueue("myQueue");
+            final MessageProducer producer = session.createProducer(null);
+
+            Message message = session.createTextMessage(text);
+
+            try {
+                producer.send(queue, message);
+                fail("Expected exception to be thrown");
+            } catch (JMSException jmse) {
+                // Expected exception with specific context
+                assertTrue(jmse instanceof ResourceAllocationException);
+                assertNotNull("Expected exception to have a message", 
jmse.getMessage());
+                assertTrue("Expected breadcrumb to be present in message", 
jmse.getMessage().contains(BREAD_CRUMB));
+            }
+
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            connection.close();
+        }
+    }
+
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void testSendWhenLinkCreditIsZeroAndTimeoutNoCache() throws 
Exception {
+        doTestSendWhenLinkCreditIsZeroAndTimeout(0);
+    }
+
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void testSendWhenLinkCreditIsZeroAndTimeoutCacheOne() throws 
Exception {
+        doTestSendWhenLinkCreditIsZeroAndTimeout(1);
+    }
+
+    private void doTestSendWhenLinkCreditIsZeroAndTimeout(int cacheSize) 
throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Use a long timeout to ensure no early evictions in this test.
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer,
+                "?amqp.anonymousFallbackCacheSize=" + cacheSize + 
"&amqp.anonymousFallbackCacheTimeout=60000");
+            connection.setSendTimeout(500);
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+
+            Message message = session.createTextMessage("text");
+
+            // Expect the producer to attach. Don't send any credit so that 
the client will
+            // block on a send and we can test our timeouts.
+            testPeer.expectSenderAttachWithoutGrantingCredit();
+            if (cacheSize == 0) {
+                testPeer.expectDetach(true, true, true);
+            }
+            testPeer.expectClose();
+
+            MessageProducer producer = session.createProducer(null);
+
+            try {
+                producer.send(queue, message);
+                fail("Send should time out.");
+            } catch (JmsSendTimedOutException jmsEx) {
+                LOG.info("Caught expected error: {}", jmsEx.getMessage());
+            } catch (Throwable error) {
+                fail("Send should time out, but got: " + error.getMessage());
+            }
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void testSyncSendFailureHandled() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer,
+                
"?amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=60000");
+
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+
+            // Expect no AMQP traffic when we create the anonymous producer, 
as it will wait
+            // for an actual send to occur on the producer before anything 
occurs on the wire
+
+            // Create an anonymous producer
+            MessageProducer producer = session.createProducer(null);
+            assertNotNull("Producer object was null", producer);
+
+            // Expect a new message sent by the above producer to cause 
creation of a new
+            // sender link to the given destination, then closing the link 
after the message is sent.
+            TargetMatcher targetMatcher = new TargetMatcher();
+            targetMatcher.withAddress(equalTo(topicName));
+            targetMatcher.withDynamic(equalTo(false));
+            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+            MessageHeaderSectionMatcher headersMatcher = new 
MessageHeaderSectionMatcher(true);
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new 
MessageAnnotationsSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher, nullValue(), new 
Rejected(), true);
+            testPeer.expectDetach(true, true, true);
+
+            Message message = session.createMessage();
+            try {
+                producer.send(dest, message);
+                fail("Send should fail");
+            } catch (JMSException jmsEx) {
+                LOG.debug("Caught expected error from failed send.");
+            }
+
+            // Repeat the send and observe another attach->transfer->detach.
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher);
+            testPeer.expectDetach(true, true, true);
+
+            producer.send(dest, message);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void testAsyncSendFailureHandled() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch sendFailureReportedToListener = new 
CountDownLatch(1);
+            final AtomicReference<Throwable> sendFailureError = new 
AtomicReference<>();
+
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer,
+                
"?jms.forceAsyncSend=true&amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=60000");
+
+            connection.setExceptionListener((error) -> {
+                sendFailureError.compareAndSet(null, error);
+                sendFailureReportedToListener.countDown();
+            });
+
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+
+            // Expect no AMQP traffic when we create the anonymous producer, 
as it will wait
+            // for an actual send to occur on the producer before anything 
occurs on the wire
+
+            //Create an anonymous producer
+            MessageProducer producer = session.createProducer(null);
+            assertNotNull("Producer object was null", producer);
+
+            // Expect a new message sent by the above producer to cause 
creation of a new
+            // sender link to the given destination, then closing the link 
after the message is sent.
+            TargetMatcher targetMatcher = new TargetMatcher();
+            targetMatcher.withAddress(equalTo(topicName));
+            targetMatcher.withDynamic(equalTo(false));
+            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+            MessageHeaderSectionMatcher headersMatcher = new 
MessageHeaderSectionMatcher(true);
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new 
MessageAnnotationsSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+            final String BREAD_CRUMB = "SEND FAILURE EXPECTED";
+
+            org.apache.qpid.jms.test.testpeer.describedtypes.Error rejectError 
= new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
+            rejectError.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED);
+            rejectError.setDescription(BREAD_CRUMB);
+
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher, nullValue(), new 
Rejected().setError(rejectError), true);
+            testPeer.expectDetach(true, true, true);
+
+            // Send is forced asynchronous (jms.forceAsyncSend=true), so it must 
not throw; the rejection is reported to the ExceptionListener.
+            Message message = session.createMessage();
+            try {
+                producer.send(dest, message);
+            } catch (JMSException jmsEx) {
+                LOG.debug("Caught expected error from failed send.");
+                fail("Send should not fail as it should have fired 
asynchronously");
+            }
+
+            // Repeat the send and observe another attach->transfer->detach.
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher);
+            testPeer.expectDetach(true, true, true);
+
+            assertTrue("Send failure not reported to exception handler", 
sendFailureReportedToListener.await(5, TimeUnit.SECONDS));
+            assertNotNull(sendFailureError.get());
+            assertTrue(sendFailureError.get() instanceof 
ResourceAllocationException);
+            
assertTrue(sendFailureError.get().getMessage().contains(BREAD_CRUMB));
+
+            producer.send(dest, message);
+
+            // Send here is asynchronous so we need to wait for disposition to 
arrive and detach to happen
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void testAsyncCompletionListenerSendFailureHandled() throws 
Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer,
+                
"?amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=60000");
+
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+
+            // Expect no AMQP traffic when we create the anonymous producer, 
as it will wait
+            // for an actual send to occur on the producer before anything 
occurs on the wire
+
+            //Create an anonymous producer
+            MessageProducer producer = session.createProducer(null);
+            assertNotNull("Producer object was null", producer);
+
+            // Expect a new message sent by the above producer to cause 
creation of a new
+            // sender link to the given destination, then closing the link 
after the message is sent.
+            TargetMatcher targetMatcher = new TargetMatcher();
+            targetMatcher.withAddress(equalTo(topicName));
+            targetMatcher.withDynamic(equalTo(false));
+            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+            String content = "testContent";
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new 
MessageHeaderSectionMatcher(true));
+            messageMatcher.setMessageAnnotationsMatcher(new 
MessageAnnotationsSectionMatcher(true));
+            messageMatcher.setPropertiesMatcher(new 
MessagePropertiesSectionMatcher(true));
+            messageMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(content));
+
+            TestJmsCompletionListener completionListener = new 
TestJmsCompletionListener();
+            Message message = session.createTextMessage(content);
+
+            final String BREAD_CRUMB = "SEND FAILURE EXPECTED";
+
+            org.apache.qpid.jms.test.testpeer.describedtypes.Error rejectError 
= new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
+            rejectError.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED);
+            rejectError.setDescription(BREAD_CRUMB);
+
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher, nullValue(), new 
Rejected().setError(rejectError), true);
+            testPeer.expectDetach(true, true, true);
+
+            // A send with a CompletionListener is asynchronous, so send itself 
must not throw;
+            // the rejection is expected to be delivered to the completion 
listener instead.
+            try {
+                producer.send(dest, message, completionListener);
+            } catch (JMSException jmsEx) {
+                LOG.debug("Caught unexpected error from failed send.");
+                fail("Send should not fail for asychrnous completion sends");
+            }
+
+            // Repeat the send (but accept this time) and observe another 
attach->transfer->detach.
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher);
+            testPeer.expectDetach(true, true, true);
+
+            assertTrue("Send failure not reported to exception handler", 
completionListener.awaitCompletion(5, TimeUnit.SECONDS));
+            assertNotNull(completionListener.exception);
+            assertTrue(completionListener.exception instanceof 
ResourceAllocationException);
+            
assertTrue(completionListener.exception.getMessage().contains(BREAD_CRUMB));
+
+            TestJmsCompletionListener completionListener2 = new 
TestJmsCompletionListener();
+
+            producer.send(dest, message, completionListener2);
+
+            assertTrue("Did not get completion callback", 
completionListener2.awaitCompletion(5, TimeUnit.SECONDS));
+            assertNull(completionListener2.exception);
+            Message receivedMessage2 = completionListener2.message;
+            assertNotNull(receivedMessage2);
+            assertTrue(receivedMessage2 instanceof TextMessage);
+            assertEquals(content, ((TextMessage) receivedMessage2).getText());
+
+            // Asynchronous send requires a wait otherwise we can close before 
the detach which we are testing for.
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void testAsyncCompletionListenerSendWhenNoCacheConfigured() throws 
Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer,
+                
"?amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=60000");
+
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+
+            // Expect no AMQP traffic when we create the anonymous producer, 
as it will wait
+            // for an actual send to occur on the producer before anything 
occurs on the wire
+
+            //Create an anonymous producer
+            MessageProducer producer = session.createProducer(null);
+            assertNotNull("Producer object was null", producer);
+
+            // Expect a new message sent by the above producer to cause 
creation of a new
+            // sender link to the given destination, then closing the link 
after the message is sent.
+            TargetMatcher targetMatcher = new TargetMatcher();
+            targetMatcher.withAddress(equalTo(topicName));
+            targetMatcher.withDynamic(equalTo(false));
+            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+            String content = "testContent";
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new 
MessageHeaderSectionMatcher(true));
+            messageMatcher.setMessageAnnotationsMatcher(new 
MessageAnnotationsSectionMatcher(true));
+            messageMatcher.setPropertiesMatcher(new 
MessagePropertiesSectionMatcher(true));
+            messageMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(content));
+
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher);
+            testPeer.expectDetach(true, true, true);
+
+            TestJmsCompletionListener completionListener = new 
TestJmsCompletionListener();
+            Message message = session.createTextMessage(content);
+
+            producer.send(dest, message, completionListener);
+
+            assertTrue("Did not get completion callback", 
completionListener.awaitCompletion(5, TimeUnit.SECONDS));
+            assertNull(completionListener.exception);
+            Message receivedMessage = completionListener.message;
+            assertNotNull(receivedMessage);
+            assertTrue(receivedMessage instanceof TextMessage);
+            assertEquals(content, ((TextMessage) receivedMessage).getText());
+
+            // Repeat the send and observe another attach->transfer->detach.
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher);
+            testPeer.expectDetach(true, true, true);
+
+            TestJmsCompletionListener completionListener2 = new 
TestJmsCompletionListener();
+
+            producer.send(dest, message, completionListener2);
+
+            assertTrue("Did not get completion callback", 
completionListener2.awaitCompletion(5, TimeUnit.SECONDS));
+            assertNull(completionListener2.exception);
+            Message receivedMessage2 = completionListener2.message;
+            assertNotNull(receivedMessage2);
+            assertTrue(receivedMessage2 instanceof TextMessage);
+            assertEquals(content, ((TextMessage) receivedMessage2).getText());
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /**
+     * Verifies that asynchronous sends whose transfers the peer is scripted not to answer are
+     * completed with an error once the peer remotely detaches the producer's sender link, and
+     * that the producer is then reported closed with the remote error condition and description.
+     */
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void testRemotelyEndFallbackProducerCompletesAsyncSends() throws Exception {
+        final String BREAD_CRUMB = "ErrorMessage";
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final CountDownLatch producerClosed = new CountDownLatch(1);
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer,
+                "?amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=60000");
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onProducerClosed(MessageProducer producer, Throwable exception) {
+                    producerClosed.countDown();
+                }
+            });
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Create a producer, then remotely detach its sender link after the sends below.
+            testPeer.expectSenderAttach();
+
+            Queue queue = session.createQueue("myQueue");
+            final MessageProducer producer = session.createProducer(queue);
+
+            final int MSG_COUNT = 3;
+
+            // Leave every transfer unanswered so the sends are still pending at detach time.
+            for (int i = 0; i < MSG_COUNT; ++i) {
+                testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher());
+            }
+
+            // NOTE(review): the trailing 50 is presumably a delay in milliseconds - confirm against TestAmqpPeer.
+            testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB, 50);
+
+            TestJmsCompletionListener listener = new TestJmsCompletionListener(MSG_COUNT);
+            try {
+                for (int i = 0; i < MSG_COUNT; ++i) {
+                    Message message = session.createTextMessage("content");
+                    producer.send(message, listener);
+                }
+            } catch (JMSException e) {
+                // Log the full exception (not only its message) so the failure can be diagnosed.
+                LOG.warn("Caught unexpected error: ", e);
+                fail("No expected exception for this send.");
+            }
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            // Verify the sends get marked as having failed
+            assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS));
+            assertEquals(MSG_COUNT, listener.errorCount);
+
+            // Verify the producer gets marked closed
+            assertTrue("Producer closed callback didn't trigger", producerClosed.await(5, TimeUnit.SECONDS));
+            try {
+                producer.getDeliveryMode();
+                fail("Expected ISE to be thrown due to being closed");
+            } catch (IllegalStateException jmsise) {
+                // Fail with a readable assertion rather than an NPE if no cause was attached.
+                assertNotNull("Expected the ISE to carry the remote close cause", jmsise.getCause());
+                String errorMessage = jmsise.getCause().getMessage();
+                assertTrue(errorMessage.contains(AmqpError.RESOURCE_LIMIT_EXCEEDED.toString()));
+                assertTrue(errorMessage.contains(BREAD_CRUMB));
+            }
+
+            // Try closing it explicitly, should effectively no-op in client.
+            // The test peer will throw during close if it sends anything.
+            producer.close();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /**
+     * Once the peer has remotely ended the session, an async-completion send through the
+     * anonymous producer must throw to the caller, must not fire the completion listener, and
+     * must leave the unsent message readable with no JMSDestination set.
+     */
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void testRemotelyCloseSessionAndAttemptAsyncCompletionSendThrowsAndLeavesMessageReadable() throws Exception {
+        try (TestAmqpPeer peer = new TestAmqpPeer()) {
+            final CountDownLatch sessionClosedLatch = new CountDownLatch(1);
+            final String body = "testContent";
+
+            JmsConnection jmsConnection = (JmsConnection) testFixture.establishConnecton(peer,
+                "?amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=60000");
+
+            // Track the session-closed notification triggered by the remote end.
+            jmsConnection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onSessionClosed(Session closedSession, Throwable error) {
+                    sessionClosedLatch.countDown();
+                }
+            });
+
+            peer.expectBegin();
+
+            Session session = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            MessageProducer anonymousProducer = session.createProducer(null);
+
+            // The first send should attach a sender link to the queue address, and the link
+            // should be closed again once the message has been sent.
+            TargetMatcher expectedTarget = new TargetMatcher();
+            expectedTarget.withAddress(equalTo("myQueue"));
+            expectedTarget.withDynamic(equalTo(false));
+            expectedTarget.withDurable(equalTo(TerminusDurability.NONE));
+
+            TransferPayloadCompositeMatcher expectedPayload = new TransferPayloadCompositeMatcher();
+            expectedPayload.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+            expectedPayload.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+            expectedPayload.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
+            expectedPayload.setMessageContentMatcher(new EncodedAmqpValueMatcher(body));
+
+            // Script attach -> transfer -> detach, followed by the peer ending the session.
+            peer.expectSenderAttach(expectedTarget, false, false);
+            peer.expectTransfer(expectedPayload);
+            peer.expectDetach(true, true, true);
+            peer.remotelyEndLastOpenedSession(true);
+
+            Message sentMessage = session.createTextMessage(body);
+            Message rejectedMessage = session.createTextMessage(body);
+            assertNull("Should not yet have a JMSDestination", sentMessage.getJMSDestination());
+            assertNull("Should not yet have a JMSDestination", rejectedMessage.getJMSDestination());
+            anonymousProducer.send(queue, sentMessage);
+
+            peer.waitForAllHandlersToComplete(2000);
+
+            assertTrue("Session should have been closed", sessionClosedLatch.await(2, TimeUnit.SECONDS));
+
+            // The second send targets the now-closed session and must fail synchronously.
+            TestJmsCompletionListener completionListener = new TestJmsCompletionListener();
+            try {
+                anonymousProducer.send(queue, rejectedMessage, completionListener);
+                fail("Expected exception to be thrown for this send.");
+            } catch (JMSException expected) {
+                LOG.trace("Caught expected exception: {}", expected.getMessage());
+            }
+
+            assertFalse("Should not get async callback", completionListener.awaitCompletion(5, TimeUnit.MILLISECONDS));
+
+            // The message never went anywhere, so it stays readable and has no destination.
+            assertNull("Should not have a readable JMSDestination", rejectedMessage.getJMSDestination());
+            assertEquals("Message body not as expected", body, ((TextMessage) rejectedMessage).getText());
+
+            peer.expectClose();
+            jmsConnection.close();
+
+            peer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void 
testFallbackProducerRecoversFromRefusalOfSenderOpenOnNextSend() throws 
Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer,
+                
"?amqp.anonymousFallbackCacheSize=1&amqp.anonymousFallbackCacheTimeout=0");
+
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+
+            // Expect no AMQP traffic when we create the anonymous producer, 
as it will wait
+            // for an actual send to occur on the producer before anything 
occurs on the wire
+
+            // Create an anonymous producer
+            MessageProducer producer = session.createProducer(null);
+            assertNotNull("Producer object was null", producer);
+
+            // Expect a new message sent by the above producer to cause 
creation of a new
+            // sender link to the given destination.
+            TargetMatcher targetMatcher = new TargetMatcher();
+            targetMatcher.withAddress(equalTo(topicName));
+            targetMatcher.withDynamic(equalTo(false));
+            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+            Topic dest = session.createTopic(topicName);
+
+            MessageHeaderSectionMatcher headersMatcher = new 
MessageHeaderSectionMatcher(true);
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new 
MessageAnnotationsSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+            Message message = session.createMessage();
+
+            // Refuse the attach which should result in fallback producer 
detaching the refused
+            // link attach and the send should then fail.
+            testPeer.expectSenderAttach(targetMatcher, true, false);
+            testPeer.expectDetach(true, false, false);
+
+            try {
+                producer.send(dest, message);
+                fail("Send should have failed because sender link was 
refused.");
+            } catch (JMSException ex) {
+                LOG.trace("Caught expected exception: ", ex);
+            }
+
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher);
+            testPeer.expectDetach(true, true, true);
+
+            producer.send(dest, message);
+            producer.close();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /**
+     * With a cache size of one, repeated sends to a single address are expected to share one
+     * sender link: the peer is scripted for a single attach, MESSAGE_COUNT transfers and a
+     * single detach that arrives once the producer goes idle.
+     */
+    @Repeat(repetitions = 1)
+    @Test(timeout = 60000)
+    public void testRepeatedSendToSameAddressWhenCacheSizeOfOneKeepsFallbackProducerInCache() throws Exception {
+        try (TestAmqpPeer peer = new TestAmqpPeer()) {
+            final int MESSAGE_COUNT = 25;
+            final String topicName = "myTopic";
+
+            JmsConnection jmsConnection = (JmsConnection) testFixture.establishConnecton(peer,
+                "?amqp.anonymousFallbackCacheSize=1&amqp.anonymousFallbackCacheTimeout=200");
+            jmsConnection.start();
+
+            peer.expectBegin();
+            Session session = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Creating the anonymous producer is not expected to generate any AMQP traffic;
+            // nothing should reach the wire until a send is performed.
+            MessageProducer anonymousProducer = session.createProducer(null);
+            assertNotNull("Producer object was null", anonymousProducer);
+
+            // Only one sender link to the topic address is expected for the whole test.
+            TargetMatcher expectedTarget = new TargetMatcher();
+            expectedTarget.withAddress(equalTo(topicName));
+            expectedTarget.withDynamic(equalTo(false));
+            expectedTarget.withDurable(equalTo(TerminusDurability.NONE));
+
+            peer.expectSenderAttach(expectedTarget, false, true);
+
+            TransferPayloadCompositeMatcher expectedPayload = new TransferPayloadCompositeMatcher();
+            expectedPayload.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+            expectedPayload.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+            Topic topic = session.createTopic(topicName);
+            Message outbound = session.createMessage();
+
+            // Script every transfer up front, then the single detach that follows them.
+            for (int pending = MESSAGE_COUNT; pending > 0; pending--) {
+                peer.expectTransfer(expectedPayload);
+            }
+
+            peer.expectDetach(true, true, true);
+
+            // All sends go to the same destination and should reuse the cached sender.
+            for (int pending = MESSAGE_COUNT; pending > 0; pending--) {
+                anonymousProducer.send(topic, outbound);
+            }
+
+            LOG.debug("Finished with send cycle, producer should now timeout");
+
+            // Once idle, the eviction timer should empty the cache and trigger the detach.
+            peer.waitForAllHandlersToComplete(3000);
+
+            anonymousProducer.close();
+
+            peer.expectClose();
+            jmsConnection.close();
+
+            peer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /**
+     * Sends to CACHE_SIZE distinct topics, expecting one sender attach per topic, then sends to
+     * the same topics again expecting only transfers (no new attach), and finally expects one
+     * detach per cached sender when the anonymous producer is closed.
+     */
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void testSendToMultipleDestinationsOpensNewSendersWhenCaching() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            final int CACHE_SIZE = 5;
+
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer,
+                "?amqp.anonymousFallbackCacheSize=" + CACHE_SIZE + "&amqp.anonymousFallbackCacheTimeout=0");
+
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+
+            // Expect no AMQP traffic when we create the anonymous producer, as it will wait
+            // for an actual send to occur on the producer before anything occurs on the wire
+
+            // Create an anonymous producer
+            MessageProducer producer = session.createProducer(null);
+            assertNotNull("Producer object was null", producer);
+
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+            // First round of sends should open and cache sender links
+            for (int i = 1; i <= CACHE_SIZE; ++i) {
+                Topic dest = session.createTopic(topicName + i);
+
+                // Expect a new message sent by the above producer to cause creation of a new
+                // sender link to the given destination.
+                TargetMatcher targetMatcher = new TargetMatcher();
+                targetMatcher.withAddress(equalTo(dest.getTopicName()));
+                targetMatcher.withDynamic(equalTo(false));
+                targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+                Message message = session.createMessage();
+
+                testPeer.expectSenderAttach(targetMatcher, false, false);
+                testPeer.expectTransfer(messageMatcher);
+
+                producer.send(dest, message);
+            }
+
+            // This round of sends should reuse the cached links, so only a transfer is
+            // scripted for each destination: no attach is expected and no target matcher
+            // is needed.
+            for (int i = 1; i <= CACHE_SIZE; ++i) {
+                Topic dest = session.createTopic(topicName + i);
+                Message message = session.createMessage();
+
+                testPeer.expectTransfer(messageMatcher);
+
+                producer.send(dest, message);
+            }
+
+            // Cached senders should all close when the producer is closed.
+            for (int i = 1; i <= CACHE_SIZE; ++i) {
+                testPeer.expectDetach(true, true, true);
+            }
+
+            producer.close();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /**
+     * Fills the fallback producer cache with CACHE_SIZE senders and then goes idle, expecting
+     * each cached sender link to be detached once the configured cache timeout expires.
+     */
+    @Repeat(repetitions = 1)
+    @Test(timeout = 30000)
+    public void testCachedFallbackProducersAreTimedOut() throws Exception {
+        try (TestAmqpPeer peer = new TestAmqpPeer()) {
+            final int CACHE_SIZE = 5;
+            final String topicName = "myTopic";
+            final String uriOptions =
+                "?amqp.anonymousFallbackCacheSize=" + CACHE_SIZE + "&amqp.anonymousFallbackCacheTimeout=300";
+
+            JmsConnection jmsConnection = (JmsConnection) testFixture.establishConnecton(peer, uriOptions);
+            jmsConnection.start();
+
+            peer.expectBegin();
+            Session session = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Creating the anonymous producer is not expected to generate any AMQP traffic;
+            // nothing should reach the wire until a send is performed.
+            MessageProducer anonymousProducer = session.createProducer(null);
+            assertNotNull("Producer object was null", anonymousProducer);
+
+            // One send per distinct topic: each should attach and cache a new sender link.
+            for (int index = 1; index <= CACHE_SIZE; index++) {
+                Topic topic = session.createTopic(topicName + index);
+
+                TargetMatcher expectedTarget = new TargetMatcher();
+                expectedTarget.withAddress(equalTo(topic.getTopicName()));
+                expectedTarget.withDynamic(equalTo(false));
+                expectedTarget.withDurable(equalTo(TerminusDurability.NONE));
+
+                TransferPayloadCompositeMatcher expectedPayload = new TransferPayloadCompositeMatcher();
+                expectedPayload.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+                expectedPayload.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+                Message outbound = session.createMessage();
+
+                peer.expectSenderAttach(expectedTarget, false, false);
+                peer.expectTransfer(expectedPayload);
+
+                anonymousProducer.send(topic, outbound);
+            }
+
+            // Every cached sender should be closed when its cache timeout elapses.
+            for (int remaining = CACHE_SIZE; remaining > 0; remaining--) {
+                peer.expectDetach(true, true, true);
+            }
+
+            // Generous wait: on a slow CI machine the expiry timers may not have run yet.
+            peer.waitForAllHandlersToComplete(6000);
+
+            anonymousProducer.close();
+
+            peer.expectClose();
+            jmsConnection.close();
+
+            peer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /**
+     * Fills the cache with CACHE_SIZE senders, then sends to the same number of previously
+     * unseen addresses, expecting each such send to detach one cached link before attaching a
+     * new one. Closing the producer is expected to detach the CACHE_SIZE links still cached.
+     */
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void testCachedFallbackProducerEvictedBySendToUncachedAddress() throws Exception {
+        try (TestAmqpPeer peer = new TestAmqpPeer()) {
+            final int CACHE_SIZE = 2;
+            final String topicName = "myTopic";
+            final String uriOptions =
+                "?amqp.anonymousFallbackCacheSize=" + CACHE_SIZE + "&amqp.anonymousFallbackCacheTimeout=0";
+
+            JmsConnection jmsConnection = (JmsConnection) testFixture.establishConnecton(peer, uriOptions);
+            jmsConnection.start();
+
+            peer.expectBegin();
+            Session session = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Creating the anonymous producer is not expected to generate any AMQP traffic;
+            // nothing should reach the wire until a send is performed.
+            MessageProducer anonymousProducer = session.createProducer(null);
+            assertNotNull("Producer object was null", anonymousProducer);
+
+            TransferPayloadCompositeMatcher expectedPayload = new TransferPayloadCompositeMatcher();
+            expectedPayload.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+            expectedPayload.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+
+            // Round one: each send attaches a new sender link which is then cached.
+            for (int index = 1; index <= CACHE_SIZE; index++) {
+                Topic topic = session.createTopic(topicName + index);
+
+                TargetMatcher expectedTarget = new TargetMatcher();
+                expectedTarget.withAddress(equalTo(topic.getTopicName()));
+                expectedTarget.withDynamic(equalTo(false));
+                expectedTarget.withDurable(equalTo(TerminusDurability.NONE));
+
+                Message outbound = session.createMessage();
+
+                peer.expectSenderAttach(expectedTarget, false, false);
+                peer.expectTransfer(expectedPayload);
+
+                anonymousProducer.send(topic, outbound);
+            }
+
+            // Round two: sends to uncached addresses should evict (detach) an old link and
+            // then attach a link for the new address.
+            for (int round = 0; round < CACHE_SIZE; round++) {
+                Topic topic = session.createTopic(topicName + UUID.randomUUID().toString());
+
+                TargetMatcher expectedTarget = new TargetMatcher();
+                expectedTarget.withAddress(equalTo(topic.getTopicName()));
+                expectedTarget.withDynamic(equalTo(false));
+                expectedTarget.withDurable(equalTo(TerminusDurability.NONE));
+
+                Message outbound = session.createMessage();
+
+                peer.expectDetach(true, true, true);
+                peer.expectSenderAttach(expectedTarget, false, false);
+                peer.expectTransfer(expectedPayload);
+
+                anonymousProducer.send(topic, outbound);
+            }
+
+            // Closing the producer should close every sender currently held in the cache.
+            for (int remaining = CACHE_SIZE; remaining > 0; remaining--) {
+                peer.expectDetach(true, true, true);
+            }
+
+            anonymousProducer.close();
+
+            peer.expectClose();
+            jmsConnection.close();
+
+            peer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /**
+     * With a cache size of one, sends to a first topic (attach + transfer) and then to a second
+     * topic. The second send is expected to detach the cached link for the first topic; the
+     * peer's detach response is scripted separately so that it arrives later, after which the
+     * send should proceed with a new attach and transfer. Closing the producer is expected to
+     * detach the remaining link.
+     */
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void 
testCachedFallbackProducerEvictedBySendToUncachedAddressHandlesDelayedResponse()
 throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer,
+                
"?amqp.anonymousFallbackCacheSize=1&amqp.anonymousFallbackCacheTimeout=0");
+
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+
+            // Expect no AMQP traffic when we create the anonymous producer, 
as it will wait
+            // for an actual send to occur on the producer before anything 
occurs on the wire
+
+            // Create an anonymous producer
+            MessageProducer producer = session.createProducer(null);
+            assertNotNull("Producer object was null", producer);
+
+            // The same payload matcher instance is used for both transfers below.
+            MessageHeaderSectionMatcher headersMatcher = new 
MessageHeaderSectionMatcher(true);
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new 
MessageAnnotationsSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+            Topic dest1 = session.createTopic(topicName + 1);
+            Topic dest2 = session.createTopic(topicName + 2);
+
+            // Expect a new message sent by the above producer to cause 
creation of a new
+            // sender link to the given destination.
+            TargetMatcher targetMatcher = new TargetMatcher();
+            targetMatcher.withAddress(equalTo(dest1.getTopicName()));
+            targetMatcher.withDynamic(equalTo(false));
+            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+            Message message = session.createMessage();
+
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher);
+
+            producer.send(dest1, message);
+
+            // Expect new send to a different destination to detach the 
previous cached link
+            // and once the response arrives the send should complete normally.
+            targetMatcher = new TargetMatcher();
+            targetMatcher.withAddress(equalTo(dest2.getTopicName()));
+            targetMatcher.withDynamic(equalTo(false));
+            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+            message = session.createMessage();
+
+            // NOTE(review): the false arguments here presumably suppress the peer's immediate
+            // detach response so that it can be supplied by the call below - confirm against
+            // TestAmqpPeer.expectDetach.
+            testPeer.expectDetach(true, false, false);
+            // Workaround to allow a deferred detach at a later time.
+            // NOTE(review): the trailing 20 is presumably a delay in milliseconds - confirm.
+            testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(false, 
true, AmqpError.RESOURCE_DELETED, "error", 20);
+
+            // Link for the second destination; its detach is expected from producer.close().
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher);
+            testPeer.expectDetach(true, true, true);
+
+            producer.send(dest2, message);
+            producer.close();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /**
+     * Opens CACHE_SIZE sender links through the anonymous producer, has the peer remotely
+     * detach the last one opened, and then sends to a new address. The new send is expected to
+     * attach a link without any existing cached link being detached first, and closing the
+     * producer is expected to detach CACHE_SIZE links.
+     */
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void testRemotelyCloseCachedFallbackProducerFreesSlotInCache() 
throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            final int CACHE_SIZE = 3;
+
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer,
+                "?amqp.anonymousFallbackCacheSize=" + CACHE_SIZE + 
"&amqp.anonymousFallbackCacheTimeout=0");
+
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+
+            // Expect no AMQP traffic when we create the anonymous producer, 
as it will wait
+            // for an actual send to occur on the producer before anything 
occurs on the wire
+
+            // Create an anonymous producer
+            MessageProducer producer = session.createProducer(null);
+            assertNotNull("Producer object was null", producer);
+
+            MessageHeaderSectionMatcher headersMatcher = new 
MessageHeaderSectionMatcher(true);
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new 
MessageAnnotationsSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+            // Note the strict bound (i < CACHE_SIZE): this loop opens CACHE_SIZE - 1 links;
+            // the final link is opened separately below so it can be remotely detached.
+            // First round of sends should open and cache sender links
+            for (int i = 1; i < CACHE_SIZE; ++i) {
+                Topic dest = session.createTopic(topicName + i);
+
+                // Expect a new message sent to the above created destination 
to result in a new
+                // sender link attached to the given destination.
+                TargetMatcher targetMatcher = new TargetMatcher();
+                targetMatcher.withAddress(equalTo(dest.getTopicName()));
+                targetMatcher.withDynamic(equalTo(false));
+                targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+                Message message = session.createMessage();
+
+                testPeer.expectSenderAttach(targetMatcher, false, false);
+                testPeer.expectTransfer(messageMatcher);
+
+                producer.send(dest, message);
+            }
+
+            Topic dest = session.createTopic(topicName + CACHE_SIZE);
+
+            // Expect a new message sent to the above created destination to 
result in a new
+            // sender link attached to the given destination.
+            TargetMatcher targetMatcher = new TargetMatcher();
+            targetMatcher.withAddress(equalTo(dest.getTopicName()));
+            targetMatcher.withDynamic(equalTo(false));
+            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+            Message message = session.createMessage();
+
+            // After this link's transfer the peer remotely detaches it.
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher);
+            testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, 
true);
+
+            producer.send(dest, message);
+
+            // Must ensure that the next send only fires after the remote 
detach has occurred otherwise
+            // it will definitely evict another producer from the cache.
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            dest = session.createTopic(topicName + 
UUID.randomUUID().toString());
+
+            // Expect a new message sent to the above created destination to 
result in a new
+            // sender link attached to the given destination.  Existing cached 
producers should
+            // remain in the cache as a slot should now be open.
+            targetMatcher = new TargetMatcher();
+            targetMatcher.withAddress(equalTo(dest.getTopicName()));
+            targetMatcher.withDynamic(equalTo(false));
+            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+            message = session.createMessage();
+
+            // No expectDetach is scripted before this attach, i.e. no eviction is expected.
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher);
+
+            producer.send(dest, message);
+
+            // CACHE_SIZE detaches: the CACHE_SIZE - 1 links from the loop above plus the link
+            // opened after the remote detach.
+            // The current cached senders should all close when the producer 
is closed.
+            for (int i = 1; i <= CACHE_SIZE; ++i) {
+                testPeer.expectDetach(true, true, true);
+            }
+
+            producer.close();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void 
testFailureOfOneCacheProducerCloseOnPropagatedToMainProducerClose() throws 
Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            final int CACHE_SIZE = 3;
+
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer,
+                "?amqp.anonymousFallbackCacheSize=" + CACHE_SIZE + 
"&amqp.anonymousFallbackCacheTimeout=0");
+
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+
+            // Expect no AMQP traffic when we create the anonymous producer, 
as it will wait
+            // for an actual send to occur on the producer before anything 
occurs on the wire
+
+            // Create an anonymous producer
+            MessageProducer producer = session.createProducer(null);
+            assertNotNull("Producer object was null", producer);
+
+            MessageHeaderSectionMatcher headersMatcher = new 
MessageHeaderSectionMatcher(true);
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new 
MessageAnnotationsSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+            // First round of sends should open and cache sender links
+            for (int i = 1; i <= CACHE_SIZE; ++i) {
+                Topic dest = session.createTopic(topicName + i);
+
+                // Expect a new message sent by the above producer to cause 
creation of a new
+                // sender link to the given destination.
+                TargetMatcher targetMatcher = new TargetMatcher();
+                targetMatcher.withAddress(equalTo(dest.getTopicName()));
+                targetMatcher.withDynamic(equalTo(false));
+                targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+                Message message = session.createMessage();
+
+                testPeer.expectSenderAttach(targetMatcher, false, false);
+                testPeer.expectTransfer(messageMatcher);
+
+                producer.send(dest, message);
+            }
+
+            // The current cached senders should all close when the producer 
is closed.
+            for (int i = 1; i < CACHE_SIZE; ++i) {
+                testPeer.expectDetach(true, true, true);
+            }
+
+            // Last one carries error but since we asked for close it should 
be ignored as we got what we wanted.
+            testPeer.expectDetach(true, true, true, AmqpError.RESOURCE_LOCKED, 
"Some error on detach");
+
+            try {
+                producer.close();
+            } catch (JMSException ex) {
+                LOG.trace("Caught unexpected error: ", ex);
+                fail("Should not have thrown an error as close was requested 
so errors are ignored.");
+            }
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    private class TestJmsCompletionListener implements CompletionListener {
+
+        private final CountDownLatch completed;
+
+        @SuppressWarnings("unused")
+        public volatile int successCount;
+        public volatile int errorCount;
+
+        public volatile Message message;
+        public volatile Exception exception;
+
+        public TestJmsCompletionListener() {
+            this(1);
+        }
+
+        public TestJmsCompletionListener(int expected) {
+            this.completed = new CountDownLatch(expected);
+        }
+
+        public boolean awaitCompletion(long timeout, TimeUnit units) throws 
InterruptedException {
+            return completed.await(timeout, units);
+        }
+
+        @Override
+        public void onCompletion(Message message) {
+            LOG.info("JmsCompletionListener onCompletion called with message: 
{}", message);
+            this.message = message;
+            this.successCount++;
+
+            completed.countDown();
+        }
+
+        @Override
+        public void onException(Message message, Exception exception) {
+            LOG.info("JmsCompletionListener onException called with message: 
{} error {}", message, exception);
+
+            this.message = message;
+            this.exception = exception;
+            this.errorCount++;
+
+            completed.countDown();
+        }
+    }
+}
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 0e5ce7d..5f74521 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -76,7 +76,6 @@ import org.apache.qpid.jms.test.Wait;
 import org.apache.qpid.jms.test.testpeer.ListDescribedType;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
-import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Modified;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
@@ -1220,7 +1219,6 @@ public class ProducerIntegrationTest extends 
QpidJmsTestCase {
                 assertTrue(errorMessage.contains(BREAD_CRUMB));
             }
 
-
             // Try closing it explicitly, should effectively no-op in client.
             // The test peer will throw during close if it sends anything.
             producer.close();
@@ -2717,282 +2715,6 @@ public class ProducerIntegrationTest extends 
QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
-    public void 
testAnonymousProducerSendFailureHandledWhenAnonymousRelayNodeIsNotSupported() 
throws Exception {
-        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-
-            // DO NOT add capability to indicate server support for 
ANONYMOUS-RELAY
-
-            Connection connection = testFixture.establishConnecton(testPeer);
-            connection.start();
-
-            testPeer.expectBegin();
-            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-
-            String topicName = "myTopic";
-            Topic dest = session.createTopic(topicName);
-
-            // Expect no AMQP traffic when we create the anonymous producer, 
as it will wait
-            // for an actual send to occur on the producer before anything 
occurs on the wire
-
-            //Create an anonymous producer
-            MessageProducer producer = session.createProducer(null);
-            assertNotNull("Producer object was null", producer);
-
-            // Expect a new message sent by the above producer to cause 
creation of a new
-            // sender link to the given destination, then closing the link 
after the message is sent.
-            TargetMatcher targetMatcher = new TargetMatcher();
-            targetMatcher.withAddress(equalTo(topicName));
-            targetMatcher.withDynamic(equalTo(false));
-            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
-
-            MessageHeaderSectionMatcher headersMatcher = new 
MessageHeaderSectionMatcher(true);
-            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new 
MessageAnnotationsSectionMatcher(true);
-            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
-            messageMatcher.setHeadersMatcher(headersMatcher);
-            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
-
-            testPeer.expectSenderAttach(targetMatcher, false, false);
-            testPeer.expectTransfer(messageMatcher, nullValue(), new 
Rejected(), true);
-            testPeer.expectDetach(true, true, true);
-
-            Message message = session.createMessage();
-            try {
-                producer.send(dest, message);
-                fail("Send should fail");
-            } catch (JMSException jmsEx) {
-                LOG.debug("Caught expected error from failed send.");
-            }
-
-            //Repeat the send and observe another attach->transfer->detach.
-            testPeer.expectSenderAttach(targetMatcher, false, false);
-            testPeer.expectTransfer(messageMatcher);
-            testPeer.expectDetach(true, true, true);
-
-            producer.send(dest, message);
-
-            testPeer.expectClose();
-            connection.close();
-
-            testPeer.waitForAllHandlersToComplete(1000);
-        }
-    }
-
-    @Test(timeout = 20000)
-    public void 
testAnonymousProducerAsyncSendFailureHandledWhenAnonymousRelayNodeIsNotSupported()
 throws Exception {
-        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-
-            // DO NOT add capability to indicate server support for 
ANONYMOUS-RELAY
-
-            Connection connection = testFixture.establishConnecton(testPeer, 
"?jms.forceAsyncSend=true");
-
-            connection.start();
-
-            testPeer.expectBegin();
-            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-
-            String topicName = "myTopic";
-            Topic dest = session.createTopic(topicName);
-
-            // Expect no AMQP traffic when we create the anonymous producer, 
as it will wait
-            // for an actual send to occur on the producer before anything 
occurs on the wire
-
-            //Create an anonymous producer
-            MessageProducer producer = session.createProducer(null);
-            assertNotNull("Producer object was null", producer);
-
-            // Expect a new message sent by the above producer to cause 
creation of a new
-            // sender link to the given destination, then closing the link 
after the message is sent.
-            TargetMatcher targetMatcher = new TargetMatcher();
-            targetMatcher.withAddress(equalTo(topicName));
-            targetMatcher.withDynamic(equalTo(false));
-            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
-
-            MessageHeaderSectionMatcher headersMatcher = new 
MessageHeaderSectionMatcher(true);
-            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new 
MessageAnnotationsSectionMatcher(true);
-            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
-            messageMatcher.setHeadersMatcher(headersMatcher);
-            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
-
-            testPeer.expectSenderAttach(targetMatcher, false, false);
-            testPeer.expectTransfer(messageMatcher, nullValue(), new 
Rejected(), true);
-            testPeer.expectDetach(true, true, true);
-
-            // Producer should act as synchronous regardless of asynchronous 
send setting.
-            Message message = session.createMessage();
-            try {
-                producer.send(dest, message);
-                fail("Send should fail");
-            } catch (JMSException jmsEx) {
-                LOG.debug("Caught expected error from failed send.");
-            }
-
-            //Repeat the send and observe another attach->transfer->detach.
-            testPeer.expectSenderAttach(targetMatcher, false, false);
-            testPeer.expectTransfer(messageMatcher);
-            testPeer.expectDetach(true, true, true);
-
-            producer.send(dest, message);
-
-            testPeer.expectClose();
-            connection.close();
-
-            testPeer.waitForAllHandlersToComplete(1000);
-        }
-    }
-
-    @Test(timeout = 20000)
-    public void 
testAnonymousProducerAsyncCompletionListenerSendFailureHandledWhenAnonymousRelayNodeIsNotSupported()
 throws Exception {
-        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-
-            // DO NOT add capability to indicate server support for 
ANONYMOUS-RELAY
-
-            Connection connection = testFixture.establishConnecton(testPeer);
-
-            connection.start();
-
-            testPeer.expectBegin();
-            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-
-            String topicName = "myTopic";
-            Topic dest = session.createTopic(topicName);
-
-            // Expect no AMQP traffic when we create the anonymous producer, 
as it will wait
-            // for an actual send to occur on the producer before anything 
occurs on the wire
-
-            //Create an anonymous producer
-            MessageProducer producer = session.createProducer(null);
-            assertNotNull("Producer object was null", producer);
-
-            // Expect a new message sent by the above producer to cause 
creation of a new
-            // sender link to the given destination, then closing the link 
after the message is sent.
-            TargetMatcher targetMatcher = new TargetMatcher();
-            targetMatcher.withAddress(equalTo(topicName));
-            targetMatcher.withDynamic(equalTo(false));
-            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
-
-            String content = "testContent";
-            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
-            messageMatcher.setHeadersMatcher(new 
MessageHeaderSectionMatcher(true));
-            messageMatcher.setMessageAnnotationsMatcher(new 
MessageAnnotationsSectionMatcher(true));
-            messageMatcher.setPropertiesMatcher(new 
MessagePropertiesSectionMatcher(true));
-            messageMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(content));
-
-            TestJmsCompletionListener completionListener = new 
TestJmsCompletionListener();
-            Message message = session.createTextMessage(content);
-
-            testPeer.expectSenderAttach(targetMatcher, false, false);
-            testPeer.expectTransfer(messageMatcher, nullValue(), new 
Rejected(), true);
-            testPeer.expectDetach(true, true, true);
-
-            // The fallback producer acts as synchronous regardless of the 
completion listener,
-            // so exceptions are thrown from send. Only onComplete uses the 
listener.
-            try {
-                producer.send(dest, message, completionListener);
-                fail("Send should fail");
-            } catch (JMSException jmsEx) {
-                LOG.debug("Caught expected error from failed send.");
-            }
-
-            //Repeat the send (but accept this time) and observe another 
attach->transfer->detach.
-            testPeer.expectSenderAttach(targetMatcher, false, false);
-            testPeer.expectTransfer(messageMatcher);
-            testPeer.expectDetach(true, true, true);
-
-            TestJmsCompletionListener completionListener2 = new 
TestJmsCompletionListener();
-
-            producer.send(dest, message, completionListener2);
-
-            assertTrue("Did not get completion callback", 
completionListener2.awaitCompletion(5, TimeUnit.SECONDS));
-            assertNull(completionListener2.exception);
-            Message receivedMessage2 = completionListener2.message;
-            assertNotNull(receivedMessage2);
-            assertTrue(receivedMessage2 instanceof TextMessage);
-            assertEquals(content, ((TextMessage) receivedMessage2).getText());
-
-            testPeer.expectClose();
-            connection.close();
-
-            testPeer.waitForAllHandlersToComplete(1000);
-        }
-    }
-
-    @Test(timeout = 20000)
-    public void 
testAnonymousProducerAsyncCompletionListenerSendWhenAnonymousRelayNodeIsNotSupported()
 throws Exception {
-        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-
-            // DO NOT add capability to indicate server support for 
ANONYMOUS-RELAY
-
-            Connection connection = testFixture.establishConnecton(testPeer);
-
-            connection.start();
-
-            testPeer.expectBegin();
-            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-
-            String topicName = "myTopic";
-            Topic dest = session.createTopic(topicName);
-
-            // Expect no AMQP traffic when we create the anonymous producer, 
as it will wait
-            // for an actual send to occur on the producer before anything 
occurs on the wire
-
-            //Create an anonymous producer
-            MessageProducer producer = session.createProducer(null);
-            assertNotNull("Producer object was null", producer);
-
-            // Expect a new message sent by the above producer to cause 
creation of a new
-            // sender link to the given destination, then closing the link 
after the message is sent.
-            TargetMatcher targetMatcher = new TargetMatcher();
-            targetMatcher.withAddress(equalTo(topicName));
-            targetMatcher.withDynamic(equalTo(false));
-            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
-
-            String content = "testContent";
-            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
-            messageMatcher.setHeadersMatcher(new 
MessageHeaderSectionMatcher(true));
-            messageMatcher.setMessageAnnotationsMatcher(new 
MessageAnnotationsSectionMatcher(true));
-            messageMatcher.setPropertiesMatcher(new 
MessagePropertiesSectionMatcher(true));
-            messageMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(content));
-
-            testPeer.expectSenderAttach(targetMatcher, false, false);
-            testPeer.expectTransfer(messageMatcher);
-            testPeer.expectDetach(true, true, true);
-
-            TestJmsCompletionListener completionListener = new 
TestJmsCompletionListener();
-            Message message = session.createTextMessage(content);
-
-            producer.send(dest, message, completionListener);
-
-            assertTrue("Did not get completion callback", 
completionListener.awaitCompletion(5, TimeUnit.SECONDS));
-            assertNull(completionListener.exception);
-            Message receivedMessage = completionListener.message;
-            assertNotNull(receivedMessage);
-            assertTrue(receivedMessage instanceof TextMessage);
-            assertEquals(content, ((TextMessage) receivedMessage).getText());
-
-            //Repeat the send and observe another attach->transfer->detach.
-            testPeer.expectSenderAttach(targetMatcher, false, false);
-            testPeer.expectTransfer(messageMatcher);
-            testPeer.expectDetach(true, true, true);
-
-            TestJmsCompletionListener completionListener2 = new 
TestJmsCompletionListener();
-
-            producer.send(dest, message, completionListener2);
-
-            assertTrue("Did not get completion callback", 
completionListener2.awaitCompletion(5, TimeUnit.SECONDS));
-            assertNull(completionListener2.exception);
-            Message receivedMessage2 = completionListener2.message;
-            assertNotNull(receivedMessage2);
-            assertTrue(receivedMessage2 instanceof TextMessage);
-            assertEquals(content, ((TextMessage) receivedMessage2).getText());
-
-            testPeer.expectClose();
-            connection.close();
-
-            testPeer.waitForAllHandlersToComplete(1000);
-        }
-    }
-
-    @Test(timeout = 20000)
     public void testSendingMessageSetsJMSDeliveryTimeWithDelay() throws 
Exception {
         doSendingMessageSetsJMSDeliveryTimeTestImpl(true);
     }
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index a5bca62..46e7ba5 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -1079,21 +1079,25 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
         }
     }
 
+    @Repeat(repetitions = 1)
     @Test(timeout = 20000)
     public void 
testCreateAnonymousProducerTargetContainsQueueCapabilityWhenAnonymousRelayNodeIsNotSupported()
 throws Exception {
         
doCreateAnonymousProducerTargetContainsCapabilityWhenAnonymousRelayNodeIsNotSupportedTestImpl(Queue.class);
     }
 
+    @Repeat(repetitions = 1)
     @Test(timeout = 20000)
     public void 
testCreateAnonymousProducerTargetContainsTopicCapabilityWhenAnonymousRelayNodeIsNotSupported()
 throws Exception {
         
doCreateAnonymousProducerTargetContainsCapabilityWhenAnonymousRelayNodeIsNotSupportedTestImpl(Topic.class);
     }
 
+    @Repeat(repetitions = 1)
     @Test(timeout = 20000)
     public void 
testCreateAnonymousProducerTargetContainsTempQueueCapabilityWhenAnonymousRelayNodeIsNotSupported()
 throws Exception {
         
doCreateAnonymousProducerTargetContainsCapabilityWhenAnonymousRelayNodeIsNotSupportedTestImpl(TemporaryQueue.class);
     }
 
+    @Repeat(repetitions = 1)
     @Test(timeout = 20000)
     public void 
testCreateAnonymousProducerTargetContainsTempTopicCapabilityWhenAnonymousRelayNodeIsNotSupported()
 throws Exception {
         
doCreateAnonymousProducerTargetContainsCapabilityWhenAnonymousRelayNodeIsNotSupportedTestImpl(TemporaryQueue.class);
@@ -1505,7 +1509,10 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
 
             // DO NOT add capability to indicate server support for 
ANONYMOUS-RELAY
 
-            Connection connection = testFixture.establishConnecton(testPeer);
+            // Configure for a known state such that no fallback producers are 
cached.
+            Connection connection = testFixture.establishConnecton(testPeer,
+                
"?amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=0");
+
             connection.start();
 
             testPeer.expectBegin();
@@ -1547,6 +1554,7 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
             testPeer.expectDetach(true, true, true);
 
             producer.send(dest, message);
+            producer.close();
 
             testPeer.expectClose();
             connection.close();
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 5e572b3..9352fee 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -3609,6 +3609,67 @@ public class FailoverIntegrationTest extends 
QpidJmsTestCase {
         }
     }
 
+    @Repeat(repetitions = 1)
+    @Test(timeout = 20000)
+    public void testFailoverHandlesAnonymousFallbackWaitingForClose() throws 
Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            // DO NOT add capability to indicate server support for 
ANONYMOUS-RELAY
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.expectSenderAttach();
+            originalPeer.expectTransfer(new TransferPayloadCompositeMatcher());
+            // Ensure that sender detach is not answered so that next send 
must wait for close
+            originalPeer.expectDetach(true, false, false);
+            originalPeer.dropAfterLastHandler(20);  // Wait for sender to get 
into wait state
+
+            // --- Post Failover Expectations of sender --- //
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectSenderAttach();
+            finalPeer.expectTransfer(new TransferPayloadCompositeMatcher());
+            finalPeer.expectDetach(true, true, true);
+            finalPeer.expectClose();
+
+            final JmsConnection connection = establishAnonymousConnecton(
+                    "failover.initialReconnectDelay=25" +
+                    "&failover.nested.amqp.anonymousFallbackCacheSize=0" +
+                    "&failover.nested.amqp.anonymousFallbackCacheTimeout=0",
+                    originalPeer, finalPeer);
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageProducer producer = session.createProducer(null);
+
+            // Send 2 messages
+            String text = "myMessage";
+
+            TextMessage message = session.createTextMessage(text);
+
+            producer.send(queue, message);
+            producer.send(queue, message);
+
+            producer.close();
+            connection.close();
+
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
     @Test(timeout = 20000)
     public void 
testPassthroughCreateTemporaryQueueFailsWhenLinkRefusedAndAttachResponseWriteIsNotDeferred()
 throws Exception {
         doCreateTemporaryDestinationFailsWhenLinkRefusedTestImpl(false, false);
diff --git a/qpid-jms-docs/Configuration.md b/qpid-jms-docs/Configuration.md
index 87d95bd..923ebea 100644
--- a/qpid-jms-docs/Configuration.md
+++ b/qpid-jms-docs/Configuration.md
@@ -209,6 +209,8 @@ These options apply to the behaviour of certain AMQP 
functionality.
 + **amqp.maxFrameSize** The connection max-frame-size value in bytes. Default 
is 1048576.
 + **amqp.drainTimeout** The time in milliseconds that the client will wait for 
a response from the remote when a consumer drain request is made. If no 
response is seen in the allotted timeout period the link will be considered 
failed and the associated consumer will be closed. Default is 60000.
 + **amqp.allowNonSecureRedirects** Controls whether an AMQP connection will 
allow for a redirect to an alternative host over a connection that is not 
secure when the existing connection is secure, e.g. redirecting an SSL 
connection to a raw TCP connection.  This value defaults to false.
++ **amqp.anonymousFallbackCacheSize** Controls the number of underlying 
per-destination fallback sending links that are cached for an anonymous 
producer to improve performance of sending when a peer doesn't offer support 
for the anonymous relay. By default only one sender link is cached, which means 
that sending to multiple destinations will cause the cached sender to be closed 
and a new sender to be opened each time the destination changes. Increasing the 
cache size can reduce the amount of  [...]
++ **amqp.anonymousFallbackCacheTimeout** Controls how long in milliseconds an 
underlying per-destination fallback sender link can remain in an anonymous 
producer's cache when inactive before it is automatically closed.  The default 
is 30000 milliseconds (30 seconds) and can be set to zero to disable the 
timeouts.
 
 ### Failover Configuration options
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to