This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
The following commit(s) were added to refs/heads/master by this push: new e04e3f8 QPIDJMS-476 Enhance anonymous fallback producer to allow asynchronous sends e04e3f8 is described below commit e04e3f847859cddecc4704d6a9c628cf435bf97f Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Mon Sep 30 15:07:50 2019 -0400 QPIDJMS-476 Enhance anonymous fallback producer to allow asynchronous sends Improves performance of anonymous fallback producer mode by allowing asynchronous sends and sends that reuse existing cache anonymous fallback producer instances avoiding the need for open -> send -> close on each send. --- .../amqp/AmqpAnonymousFallbackProducer.java | 388 +++++- .../qpid/jms/provider/amqp/AmqpConnection.java | 37 + .../qpid/jms/provider/amqp/AmqpFixedProducer.java | 9 +- .../qpid/jms/provider/amqp/AmqpProvider.java | 41 + .../AnonymousFallbackProducerIntegrationTest.java | 1428 ++++++++++++++++++++ .../jms/integration/ProducerIntegrationTest.java | 278 ---- .../jms/integration/SessionIntegrationTest.java | 10 +- .../provider/failover/FailoverIntegrationTest.java | 61 + qpid-jms-docs/Configuration.md | 2 + 9 files changed, 1908 insertions(+), 346 deletions(-) diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java index 63e5cbb..b3ae647 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java @@ -16,16 +16,26 @@ */ package org.apache.qpid.jms.provider.amqp; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledFuture; +import org.apache.qpid.jms.JmsDestination; import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; import 
org.apache.qpid.jms.meta.JmsProducerId; import org.apache.qpid.jms.meta.JmsProducerInfo; import org.apache.qpid.jms.provider.AsyncResult; +import org.apache.qpid.jms.provider.NoOpAsyncResult; import org.apache.qpid.jms.provider.ProviderException; import org.apache.qpid.jms.provider.WrappedAsyncResult; import org.apache.qpid.jms.provider.amqp.builders.AmqpProducerBuilder; +import org.apache.qpid.jms.provider.exceptions.ProviderIllegalStateException; import org.apache.qpid.jms.util.IdGenerator; +import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Sender; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,8 +50,11 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { private static final Logger LOG = LoggerFactory.getLogger(AmqpAnonymousFallbackProducer.class); private static final IdGenerator producerIdGenerator = new IdGenerator(); + private final AmqpConnection connection; + private final Map<JmsDestination, AmqpFallbackProducer> producerCache = new LinkedHashMap<>(1, 0.75f, true); private final String producerIdKey = producerIdGenerator.generateId(); private long producerIdCount; + private final ScheduledFuture<?> cacheProducerTimeoutTask; /** * Creates the Anonymous Producer object. 
@@ -53,36 +66,94 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { */ public AmqpAnonymousFallbackProducer(AmqpSession session, JmsProducerInfo info) { super(session, info); + + this.connection = session.getConnection(); + + final long sweeperInterval = connection.getAnonymousProducerCacheTimeout(); + if (sweeperInterval > 0 && connection.getAnonymousProducerCacheSize() > 0) { + LOG.trace("Cached Producer timeout monitoring enabled: interval = {}ms", sweeperInterval); + cacheProducerTimeoutTask = connection.scheduleWithFixedDelay(new CachedProducerSweeper(), sweeperInterval); + } else { + LOG.trace("No Cached Producer timeout monitoring enabled based on configuration."); + cacheProducerTimeoutTask = null; + } } @Override public void send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws ProviderException { LOG.trace("Started send chain for anonymous producer: {}", getProducerId()); + AmqpFallbackProducer producer = producerCache.get(envelope.getDestination()); + + if (producer != null && !producer.isAwaitingClose()) { + producer.send(envelope, request); + } else { + handleSendWhenCachedProducerNotAvailable(envelope, request); + } + } + + private void handleSendWhenCachedProducerNotAvailable(JmsOutboundMessageDispatch envelope, AsyncResult request) throws ProviderException { + AmqpFallbackProducer producer = producerCache.get(envelope.getDestination()); + + if (producer != null && producer.isAwaitingClose()) { + // Producer timed out, or was closed due to send failure wait for close to finish then try + // to open a new link and send this new message. This prevents the cache from carrying more + // than one producer on the same address (destination). 
+ producer.close(new SendPendingCloseRequest(producer, envelope, request)); + } else if (producerCache.size() < connection.getAnonymousProducerCacheSize()) { + startSendWithNewProducer(envelope, request); + } else { + startSendAfterOldestProducerEvicted(envelope, request); + } + } + private void startSendAfterOldestProducerEvicted(JmsOutboundMessageDispatch envelope, AsyncResult request) throws ProviderException { + // No producer in cache yet, or connection limit is zero so we always fall into trying to close oldest if one is open. + if (producerCache.isEmpty()) { + startSendWithNewProducer(envelope, request); + } else { + // Least recently used producer which will be closed to make room for the new one. + AmqpFallbackProducer producer = producerCache.values().iterator().next(); + LOG.trace("Next send will commence after producer: {} has been closed", producer); + producer.close(new SendPendingCloseRequest(producer, envelope, request)); + } + } - // Create a new ProducerInfo for the short lived producer that's created to perform the - // send to the given AMQP target. + private void startSendWithNewProducer(JmsOutboundMessageDispatch envelope, AsyncResult request) throws ProviderException { + LOG.trace("Next send will commence after producer for destination {} been opened", envelope.getDestination()); + + // Manufactured Producer state for a fixed producer that sends to the target of this specific envelope. JmsProducerInfo info = new JmsProducerInfo(getNextProducerId()); info.setDestination(envelope.getDestination()); info.setPresettle(this.getResourceInfo().isPresettle()); // We open a Fixed Producer instance with the target destination. Once it opens // it will trigger the open event which will in turn trigger the send event. - // The created producer will be closed immediately after the delivery has been acknowledged. 
- AmqpProducerBuilder builder = new AmqpProducerBuilder(session, info); - builder.buildResource(new AnonymousSendRequest(request, builder, envelope, envelope.isCompletionRequired())); - - // Force sends to be sent synchronous so that the temporary producer instance can handle - // the failures and perform necessary completion work on the send. - envelope.setSendAsync(false); - envelope.setCompletionRequired(false); + AmqpProducerBuilder builder = new AmqpFallbackProducerBuilder(session, info); + builder.buildResource(new FallbackProducerOpenRequest(request, builder, envelope)); getParent().getProvider().pumpToProtonTransport(request); } @Override public void close(AsyncResult request) { - request.onSuccess(); + if (cacheProducerTimeoutTask != null) { + cacheProducerTimeoutTask.cancel(false); + } + + if (producerCache.isEmpty()) { + request.onSuccess(); + } else { + AmqpAnonymousFallbackProducerCloseRequest aggregate = + new AmqpAnonymousFallbackProducerCloseRequest(request, producerCache.size()); + + LOG.trace("Anonymous Fallback Producer close will wait for close on {} cached producers", producerCache.size()); + final List<AmqpFallbackProducer> pending = new ArrayList<>(producerCache.values()); + for (AmqpFallbackProducer producer : pending) { + producer.close(aggregate); + } + + producerCache.clear(); + } } @Override @@ -104,116 +175,303 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { return new JmsProducerId(producerIdKey, -1, producerIdCount++); } - //----- AsyncResult objects used to complete the sends -------------------// + //----- AsyncResult instances used to manage the life-cycle of the fallback producers - private abstract class AnonymousRequest extends WrappedAsyncResult { + /* + * AsyncResult instance that waits for the successful open of a fixed producer instance that + * will be used for sends and possibly cached for later reuse. 
The handler ensures that proper + * cleanup occurs if the producer couldn't be opened or if the initial send attempt fails. + */ + private final class FallbackProducerOpenRequest extends WrappedAsyncResult { - protected final JmsOutboundMessageDispatch envelope; - private final boolean completionRequired; + private final JmsOutboundMessageDispatch envelope; + private final AmqpProducerBuilder producerBuilder; - public AnonymousRequest(AsyncResult sendResult, JmsOutboundMessageDispatch envelope, boolean completionRequired) { + public FallbackProducerOpenRequest(AsyncResult sendResult, AmqpProducerBuilder producerBuilder, JmsOutboundMessageDispatch envelope) { super(sendResult); + this.envelope = envelope; - this.completionRequired = completionRequired; + this.producerBuilder = producerBuilder; + } + + @Override + public void onSuccess() { + LOG.trace("Open phase of anonymous send complete: {} ", getProducerId()); + + AmqpFallbackProducer producer = (AmqpFallbackProducer) producerBuilder.getResource(); + // The fixed producer opened so we start tracking it and once send returns indicating that + // it handled the request (not necessarily sent it but knows it exists) then we start the + // close clock and if not reused again this producer will eventually time itself out of + // existence. + producerCache.put(envelope.getDestination(), producer); + + boolean closeNow = connection.getAnonymousProducerCacheSize() <= 0; + + try { + producer.send(envelope, getWrappedRequest()); + } catch (ProviderException e) { + closeNow = true; // close on send fail to speed up processing of next send or ultimate close on error. + super.onFailure(e); + } + + // No cache configured so close the only entry in the map but don't remove it until it + // has reported itself closed so that next send waits for that to happen preventing + // runaway link openings. (Can also trigger if send failed immediately). 
+ if (closeNow) { + LOG.trace("Immediate close of fallback producer {} after send triggered.", producer); + producer.close(new CloseRequest(producer)); + } } - /** - * In all cases of the chain of events that make up the send for an anonymous - * producer a failure will trigger the original send request to fail. - */ @Override public void onFailure(ProviderException result) { - LOG.debug("Send failed during {} step in chain: {}", this.getClass().getName(), getProducerId()); + LOG.debug("Anonymous fallback Send failed because open of new fixed producer failed: {}", getProducerId()); + + // Producer open failed so it was never in the cache, just close it now to ensure + // that everything is cleaned up internally. The originating send will be failed + // by failing this AsyncResult as it wraps the original request. + producerBuilder.getResource().close(NoOpAsyncResult.INSTANCE); super.onFailure(result); } + } + + /* + * Close request handler for the individual anonymous fallback producer instances in the cache + * that will ensure the entry in the cache is cleared when the close completes. If the close + * fails the handler ensures that the provider listener gets a shot of reporting it. 
+ */ + private final class CloseRequest implements AsyncResult { + + private final AmqpFallbackProducer producer; + + public CloseRequest(AmqpFallbackProducer producer) { + this.producer = producer; + } - public boolean isCompletionRequired() { - return completionRequired; + @Override + public void onFailure(ProviderException result) { + LOG.trace("Close of anonymous producer {} failed: {}", producer, result); + producerCache.remove(producer.getResourceInfo().getDestination()); + producer.getParent().getProvider().fireProviderException(result); } - public abstract AmqpProducer getProducer(); + @Override + public void onSuccess() { + LOG.trace("Close of anonymous producer {} complete", producer); + producerCache.remove(producer.getResourceInfo().getDestination()); + } + + @Override + public boolean isComplete() { + return producer.isClosed(); + } } - private final class AnonymousSendRequest extends AnonymousRequest { + /* + * Close handler for the full anonymous fallback producer instance which waits for all + * the cached fallback producers to close before singling completion to the caller. The + * signal will be success if all closed successfully and failure if any one of them failed. 
+ */ + private final class AmqpAnonymousFallbackProducerCloseRequest extends WrappedAsyncResult { - private final AmqpProducerBuilder producerBuilder; + private int pendingCloseRequests; + private ProviderException firstFailure; - public AnonymousSendRequest(AsyncResult sendResult, AmqpProducerBuilder producerBuilder, JmsOutboundMessageDispatch envelope, boolean completionRequired) { - super(sendResult, envelope, completionRequired); + public AmqpAnonymousFallbackProducerCloseRequest(AsyncResult request, int pendingCloseRequets) { + super(request); - this.producerBuilder = producerBuilder; + this.pendingCloseRequests = pendingCloseRequets; + } + + @Override + public void onFailure(ProviderException result) { + LOG.trace("Close of one anonymous producer done (with error), remaining: {}", pendingCloseRequests); + pendingCloseRequests--; + firstFailure = firstFailure == null ? result : firstFailure; + if (pendingCloseRequests == 0) { + super.onFailure(firstFailure); + } } @Override public void onSuccess() { - LOG.trace("Open phase of anonymous send complete: {} ", getProducerId()); - AnonymousSendCompleteRequest send = new AnonymousSendCompleteRequest(this); - try { - getProducer().send(envelope, send); - } catch (ProviderException e) { - super.onFailure(e); + LOG.trace("Close of one anonymous producer done, remaining: {}", pendingCloseRequests); + pendingCloseRequests--; + if (pendingCloseRequests == 0) { + if (firstFailure == null) { + super.onSuccess(); + } else { + super.onFailure(firstFailure); + } } } @Override - public AmqpProducer getProducer() { - return producerBuilder.getResource(); + public boolean isComplete() { + return pendingCloseRequests == 0; } } - private final class AnonymousSendCompleteRequest extends AnonymousRequest { - - private final AmqpProducer producer; + /* + * Request that will initiate a new send operation as soon as the pending close of an already + * cached fallback producer completes. 
The send happens regardless of the outcome of the fallback + * close as it is unknown why that failed. The send could then fail but that would allow for normal + * send failure processing to kick in. + */ + private final class SendPendingCloseRequest implements AsyncResult { - public AnonymousSendCompleteRequest(AnonymousSendRequest open) { - super(open.getWrappedRequest(), open.envelope, open.isCompletionRequired()); + private final JmsOutboundMessageDispatch envelope; + private final AsyncResult sendRequest; + private final AmqpFallbackProducer pendingCloseProducer; - this.producer = open.getProducer(); + public SendPendingCloseRequest(AmqpFallbackProducer producer, JmsOutboundMessageDispatch envelope, AsyncResult sendRequest) { + this.envelope = envelope; + this.sendRequest = sendRequest; + this.pendingCloseProducer = producer; } @Override public void onFailure(ProviderException result) { - LOG.trace("Send phase of anonymous send failed: {} ", getProducerId()); - AnonymousCloseRequest close = new AnonymousCloseRequest(this); - producer.close(close); - super.onFailure(result); + LOG.trace("Close of anonymous producer {} failed: {}", pendingCloseProducer, result); + producerCache.remove(pendingCloseProducer.getResourceInfo().getDestination()); + pendingCloseProducer.getParent().getProvider().fireProviderException(result); + + try { + send(envelope, sendRequest); + } catch (ProviderException ex) { + sendRequest.onFailure(ex); + } } @Override public void onSuccess() { - LOG.trace("Send phase of anonymous send complete: {} ", getProducerId()); - AnonymousCloseRequest close = new AnonymousCloseRequest(this); - producer.close(close); + LOG.trace("Close of anonymous producer {} complete", pendingCloseProducer); + producerCache.remove(pendingCloseProducer.getResourceInfo().getDestination()); + + try { + send(envelope, sendRequest); + } catch (ProviderException ex) { + sendRequest.onFailure(ex); + } + } + + @Override + public boolean isComplete() { + return 
pendingCloseProducer.getRemoteState() == EndpointState.CLOSED; } + } + + /* + * Timeout task used to close any anonymous fallback producer instances that have been + * inactive for longer than the configured timeout. + */ + private final class CachedProducerSweeper implements Runnable { @Override - public AmqpProducer getProducer() { - return producer; + public void run() { + final List<AmqpFallbackProducer> pending = new ArrayList<>(producerCache.values()); + for (AmqpFallbackProducer producer : pending) { + if (producer.isExpired()) { + LOG.trace("Cached Producer {} has timed out, initiating close", producer); + producer.close(new CloseRequest(producer)); + } + } } } - private final class AnonymousCloseRequest extends AnonymousRequest { + /* + * Extended fixed destination producer that provides insight into events like remote close which + * should evict producers from the cache. + */ + private final class AmqpFallbackProducer extends AmqpFixedProducer { - private final AmqpProducer producer; + private int lastExpiryCheckValue; + private int activityCounter; - public AnonymousCloseRequest(AnonymousSendCompleteRequest sendComplete) { - super(sendComplete.getWrappedRequest(), sendComplete.envelope, sendComplete.isCompletionRequired()); + public AmqpFallbackProducer(AmqpSession session, JmsProducerInfo info, Sender sender, int maxInactiveTime) { + super(session, info, sender); + } - this.producer = sendComplete.getProducer(); + @Override + public void send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws ProviderException { + activityCounter++; + super.send(envelope, request); } @Override - public void onSuccess() { - LOG.trace("Close phase of anonymous send complete: {} ", getProducerId()); - super.onSuccess(); - if (isCompletionRequired()) { - getParent().getProvider().getProviderListener().onCompletedMessageSend(envelope); + public void close(AsyncResult request) { + if (isAwaitingClose()) { + // A pending close either from failed send or from 
expired producer can be superseded + // by a new send on close request without issue as it will handle any close failures + // otherwise something terribly wrong has happened. Similarly it can be replaced with + // the close request aggregate that is used when the main producer is closed. + if (closeRequest instanceof CloseRequest) { + this.closeRequest = request; + } else { + LOG.error("Close called on producer that already has a pending send on close request: " , this); + request.onFailure(new ProviderIllegalStateException( + "Illegal send on close call encountered in anonymous fallback producer")); + } + } else { + super.close(request); + } + + // Ensure that in all cases the close attempt is pushed to the wire immediately. + getParent().getProvider().pumpToProtonTransport(); + } + + public boolean isExpired() { + // When awaiting close the producer shouldn't be treated as expired as we already closed it + // and when it eventually closes it will be removed from the cache and any pending work will + // be triggered. + if (!isAwaitingClose() && activityCounter == lastExpiryCheckValue) { + return true; + } else { + lastExpiryCheckValue = activityCounter; + return false; + } + } + + @Override + public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws ProviderException { + activityCounter++; // Sender is receiving delivery updates (outcomes) so still active. + super.processDeliveryUpdates(provider, delivery); + } + + @Override + public void processFlowUpdates(AmqpProvider provider) throws ProviderException { + activityCounter++; // Sender just got a flow so allow for blocked sends to reactivate. 
+ super.processFlowUpdates(provider); + } + + @Override + public void processRemoteClose(AmqpProvider provider) throws ProviderException { + // When not already closing the remote close will clear a cache slot for use by + // another send to some other producer or to this same producer which will need to + // open a new link as the remote has forcibly closed this one. If awaiting close + // then we initiated it an as such we will respond to it from the AsyncResult that + // was passed to the close method and clean out the cache slot once the state change + // has been processed. + if (!this.isAwaitingClose()) { + producerCache.remove(this.getResourceInfo().getDestination()); } + + super.processRemoteClose(provider); + } + } + + /* + * Builder of the internal producer implementation type used when creating new fixed producers. + */ + private final class AmqpFallbackProducerBuilder extends AmqpProducerBuilder { + + public AmqpFallbackProducerBuilder(AmqpSession parent, JmsProducerInfo resourceInfo) { + super(parent, resourceInfo); } @Override - public AmqpProducer getProducer() { - return producer; + protected AmqpProducer createResource(AmqpSession parent, JmsProducerInfo resourceInfo, Sender endpoint) { + return new AmqpFallbackProducer(getParent(), getResourceInfo(), endpoint, connection.getAnonymousProducerCacheTimeout()); } } } diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java index 5d55e6c..6bdca10 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java @@ -44,6 +44,8 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class); + // TODO - URI configuration + private AmqpSubscriptionTracker 
subTracker = new AmqpSubscriptionTracker(); private final AmqpJmsMessageFactory amqpMessageFactory; @@ -218,6 +220,20 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn } /** + * @return the configured max number of cached anonymous fallback producers to keep. + */ + public int getAnonymousProducerCacheSize() { + return getProvider().getAnonymousFallbackCacheSize(); + } + + /** + * @return The configured time before a cache anonymous producer link is close due to inactivity. + */ + public int getAnonymousProducerCacheTimeout() { + return getProvider().getAnonymousFallbackCacheTimeout(); + } + + /** * @return the AMQP based JmsMessageFactory for this Connection. */ public AmqpJmsMessageFactory getAmqpMessageFactory() { @@ -258,6 +274,27 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn return getProvider().getScheduler().schedule(task, delay, TimeUnit.MILLISECONDS); } + /** + * Allows a connection resource to schedule a task for future execution which will start after the + * given delay and then repeat with a fixed delay between the end of one execution of the task and + * the beginning of the next execution. + * + * @param task + * The Runnable task to be executed after the given delay. + * @param delay + * The delay in milliseconds to schedule the given task for execution. + * + * @return a ScheduledFuture instance that can be used to cancel the task. 
+ */ + public ScheduledFuture<?> scheduleWithFixedDelay(final Runnable task, long delay) { + if (task == null) { + LOG.trace("Resource attempted to schedule a null task."); + return null; + } + + return getProvider().getScheduler().scheduleWithFixedDelay(task, delay, delay, TimeUnit.MILLISECONDS); + } + @Override public String toString() { return "AmqpConnection { " + getResourceInfo().getId() + " }"; diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index 69fc240..f7e9c2a 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -295,8 +295,13 @@ public class AmqpFixedProducer extends AmqpProducer { @Override public void handleResourceClosure(AmqpProvider provider, ProviderException error) { if (error == null) { - // TODO: create/use a more specific/appropriate exception type? - error = new ProviderException("Producer closed remotely before message transfer result was notified"); + // In case close was expected but remote did provide error context we propagate that to the outstanding + // sends that will be failed in order to provide more specific context to the send failure. 
+ if (getEndpoint().getRemoteCondition() != null) { + error = AmqpSupport.convertToNonFatalException(provider, getEndpoint(), getEndpoint().getRemoteCondition()); + } else { + error = new ProviderException("Producer closed remotely before message transfer result was notified"); + } } Collection<InFlightSend> inflightSends = new ArrayList<InFlightSend>(sent.values()); diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index c3c85a1..73a5789 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -119,6 +119,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP private static final NoOpAsyncResult NOOP_REQUEST = new NoOpAsyncResult(); private static final int DEFAULT_MAX_WRITE_BYTES_BEFORE_FLUSH = 128 * 1024; + private static final int DEFAULT_ANONYMOUS_FALLBACK_CACHE_TIMEOUT = 30000; + private static final int DEFAULT_ANONYMOUS_FALLBACK_CACHE_SIZE = 1; private volatile ProviderListener listener; private volatile AmqpConnection connection; @@ -137,6 +139,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP private long sessionOutoingWindow = -1; // Use proton default private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE; private int maxWriteBytesBeforeFlush = DEFAULT_MAX_WRITE_BYTES_BEFORE_FLUSH; + private int anonymousFallbackCacheTimeout = DEFAULT_ANONYMOUS_FALLBACK_CACHE_TIMEOUT; + private int anonymousFallbackCacheSize = DEFAULT_ANONYMOUS_FALLBACK_CACHE_SIZE; private boolean allowNonSecureRedirects; @@ -1272,6 +1276,43 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP } /** + * @return the configured max number of cached anonymous fallback producers to keep. 
+ */ + public int getAnonymousFallbackCacheSize() { + return anonymousFallbackCacheSize; + } + + /** + * Sets the number of anonymous fallback producers to keep open in a cache in order to improve + * overall performance of anonymous fallback producer sends. + * + * @param size + * The number of fallback producers to cache. + */ + public void setAnonymousFallbackCacheSize(int size) { + this.anonymousFallbackCacheSize = size; + } + + /** + * @return The configured time before a cache anonymous producer link is close due to inactivity. + */ + public int getAnonymousFallbackCacheTimeout() { + return anonymousFallbackCacheTimeout; + } + + /** + * Sets the timeout used to close cached anonymous producers that have not sent any messages in that + * time period. The value is set in milliseconds with a value less that or equal to zero resulting in + * no timeout being applied. + * + * @param timeout + * Time in milliseconds that a cache anonymous producer can be idle before being close. + */ + public void setAnonymousFallbackCacheTimeout(int timeout) { + this.anonymousFallbackCacheTimeout = timeout; + } + + /** * Sets the max frame size (in bytes). * * Values of -1 indicates to use the proton default. diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/AnonymousFallbackProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/AnonymousFallbackProducerIntegrationTest.java new file mode 100644 index 0000000..6cd4bcb --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/AnonymousFallbackProducerIntegrationTest.java @@ -0,0 +1,1428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.integration; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import javax.jms.CompletionListener; +import javax.jms.Connection; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.ResourceAllocationException; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsDefaultConnectionListener; +import org.apache.qpid.jms.JmsSendTimedOutException; +import org.apache.qpid.jms.test.QpidJmsTestCase; +import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; +import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError; +import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability; +import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected; +import 
org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher; +import org.apache.qpid.jms.util.QpidJMSTestRunner; +import org.apache.qpid.jms.util.Repeat; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests for the Anonymous Fallback producer implementation. + * + * DO NOT add capability to indicate server support for ANONYMOUS-RELAY for any of these tests + */ +@RunWith(QpidJMSTestRunner.class) +public class AnonymousFallbackProducerIntegrationTest extends QpidJmsTestCase { + + private static final Logger LOG = LoggerFactory.getLogger(AnonymousFallbackProducerIntegrationTest.class); + + private final IntegrationTestFixture testFixture = new IntegrationTestFixture(); + + @Test(timeout = 20000) + public void testCloseSenderWithNoActiveFallbackProducers() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + + Connection connection = testFixture.establishConnecton(testPeer); + testPeer.expectBegin(); + testPeer.expectClose(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(null); + + producer.close(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Repeat(repetitions = 1) + @Test(timeout = 20000) + public void testRemotelyCloseProducerDuringSyncSendNoCache() throws Exception { + doTestRemotelyCloseProducerDuringSyncSend(0); + } + + @Repeat(repetitions = 1) + @Test(timeout = 20000) + public void 
testRemotelyCloseProducerDuringSyncSendOneCached() throws Exception { + doTestRemotelyCloseProducerDuringSyncSend(1); + } + + private void doTestRemotelyCloseProducerDuringSyncSend(int cacheSize) throws Exception { + final String BREAD_CRUMB = "ErrorMessageBreadCrumb"; + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + + // Use a long timeout to ensure no early evictions in this test. + Connection connection = testFixture.establishConnecton(testPeer, + "?amqp.anonymousFallbackCacheSize=" + cacheSize + "&amqp.anonymousFallbackCacheTimeout=60000"); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Expect producer creation, give it credit. + testPeer.expectSenderAttach(); + + String text = "myMessage"; + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true)); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + messageMatcher.setPropertiesMatcher(propsMatcher); + messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text)); + + // Expect a message to be sent, but don't send a disposition in + // response, simply remotely close the producer instead. 
+ testPeer.expectTransfer(messageMatcher, nullValue(), false, false, null, false); + testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB); + testPeer.expectClose(); + + Queue queue = session.createQueue("myQueue"); + final MessageProducer producer = session.createProducer(null); + + Message message = session.createTextMessage(text); + + try { + producer.send(queue, message); + fail("Expected exception to be thrown"); + } catch (JMSException jmse) { + LOG.trace("JMSException thrown from send: ", jmse); + // Expected but requires some context to be correct. + assertTrue(jmse instanceof ResourceAllocationException); + assertNotNull("Expected exception to have a message", jmse.getMessage()); + assertTrue("Expected breadcrumb to be present in message", jmse.getMessage().contains(BREAD_CRUMB)); + } + + connection.close(); + + testPeer.waitForAllHandlersToComplete(3000); + } + } + + @Repeat(repetitions = 1) + @Test(timeout = 20000) + public void testRemotelyCloseProducerWithSendWaitingForCreditNoCache() throws Exception { + doTestRemotelyCloseProducerWithSendWaitingForCredit(0); + } + + @Repeat(repetitions = 1) + @Test(timeout = 20000) + public void testRemotelyCloseProducerWithSendWaitingForCreditOneCached() throws Exception { + doTestRemotelyCloseProducerWithSendWaitingForCredit(1); + } + + private void doTestRemotelyCloseProducerWithSendWaitingForCredit(int cacheSize) throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // Use a long timeout to ensure no early evictions in this test. + Connection connection = testFixture.establishConnecton(testPeer, + "?amqp.anonymousFallbackCacheSize=" + cacheSize + "&amqp.anonymousFallbackCacheTimeout=60000"); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Expect producer creation, don't give it credit. 
+ testPeer.expectSenderAttachWithoutGrantingCredit(); + + // Producer has no credit so the send should block waiting for it, then fail when the remote close occurs + testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Producer closed", 50); + testPeer.expectClose(); + + Queue queue = session.createQueue("myQueue"); + final MessageProducer producer = session.createProducer(null); + + Message message = session.createTextMessage("myMessage"); + + try { + producer.send(queue, message); + fail("Expected exception to be thrown due to close of producer"); + } catch (ResourceAllocationException rae) { + // Expected if remote close beat the send to the provider + } catch (IllegalStateException ise) { + // Can happen if send fires before remote close is processed. + } + + connection.close(); + + testPeer.waitForAllHandlersToComplete(3000); + } + } + + @Repeat(repetitions = 1) + @Test(timeout = 20000) + public void testRemotelyCloseConnectionDuringSyncSendNoCache() throws Exception { + doTestRemotelyCloseConnectionDuringSyncSend(0); + } + + @Repeat(repetitions = 1) + @Test(timeout = 20000) + public void testRemotelyCloseConnectionDuringSyncSendOneCached() throws Exception { + doTestRemotelyCloseConnectionDuringSyncSend(1); + } + + private void doTestRemotelyCloseConnectionDuringSyncSend(int cacheSize) throws Exception { + final String BREAD_CRUMB = "ErrorMessageBreadCrumb"; + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // Use a long timeout to ensure no early evictions in this test. + Connection connection = testFixture.establishConnecton(testPeer, + "?amqp.anonymousFallbackCacheSize=" + cacheSize + "&amqp.anonymousFallbackCacheTimeout=60000"); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Expect producer creation, give it credit. 
+ testPeer.expectSenderAttach(); + + String text = "myMessage"; + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true)); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + messageMatcher.setPropertiesMatcher(propsMatcher); + messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text)); + + // Expect a message to be sent, but don't send a disposition in + // response, simply remotely close the connection instead. + testPeer.expectTransfer(messageMatcher, nullValue(), false, false, null, false); + testPeer.remotelyCloseConnection(true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB); + + Queue queue = session.createQueue("myQueue"); + final MessageProducer producer = session.createProducer(null); + + Message message = session.createTextMessage(text); + + try { + producer.send(queue, message); + fail("Expected exception to be thrown"); + } catch (JMSException jmse) { + // Expected exception with specific context + assertTrue(jmse instanceof ResourceAllocationException); + assertNotNull("Expected exception to have a message", jmse.getMessage()); + assertTrue("Expected breadcrumb to be present in message", jmse.getMessage().contains(BREAD_CRUMB)); + } + + testPeer.waitForAllHandlersToComplete(3000); + + connection.close(); + } + } + + @Repeat(repetitions = 1) + @Test(timeout = 20000) + public void testSendWhenLinkCreditIsZeroAndTimeoutNoCache() throws Exception { + doTestSendWhenLinkCreditIsZeroAndTimeout(0); + } + + @Repeat(repetitions = 1) + @Test(timeout = 20000) + public void testSendWhenLinkCreditIsZeroAndTimeoutCacheOne() throws Exception { + 
doTestSendWhenLinkCreditIsZeroAndTimeout(1); + } + + private void doTestSendWhenLinkCreditIsZeroAndTimeout(int cacheSize) throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // Use a long timeout to ensure no early evictions in this test. + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, + "?amqp.anonymousFallbackCacheSize=" + cacheSize + "&amqp.anonymousFallbackCacheTimeout=60000"); + connection.setSendTimeout(500); + + testPeer.expectBegin(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String queueName = "myQueue"; + Queue queue = session.createQueue(queueName); + + Message message = session.createTextMessage("text"); + + // Expect the producer to attach. Don't send any credit so that the client will + // block on a send and we can test our timeouts. + testPeer.expectSenderAttachWithoutGrantingCredit(); + if (cacheSize == 0) { + testPeer.expectDetach(true, true, true); + } + testPeer.expectClose(); + + MessageProducer producer = session.createProducer(null); + + try { + producer.send(queue, message); + fail("Send should time out."); + } catch (JmsSendTimedOutException jmsEx) { + LOG.info("Caught expected error: {}", jmsEx.getMessage()); + } catch (Throwable error) { + fail("Send should time out, but got: " + error.getMessage()); + } + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Repeat(repetitions = 1) + @Test(timeout = 20000) + public void testSyncSendFailureHandled() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, + "?amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=60000"); + + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + + // Expect 
no AMQP traffic when we create the anonymous producer, as it will wait + // for an actual send to occur on the producer before anything occurs on the wire + + // Create an anonymous producer + MessageProducer producer = session.createProducer(null); + assertNotNull("Producer object was null", producer); + + // Expect a new message sent by the above producer to cause creation of a new + // sender link to the given destination, then closing the link after the message is sent. + TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(topicName)); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + + testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher, nullValue(), new Rejected(), true); + testPeer.expectDetach(true, true, true); + + Message message = session.createMessage(); + try { + producer.send(dest, message); + fail("Send should fail"); + } catch (JMSException jmsEx) { + LOG.debug("Caught expected error from failed send."); + } + + // Repeat the send and observe another attach->transfer->detach. 
+ testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher); + testPeer.expectDetach(true, true, true); + + producer.send(dest, message); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Repeat(repetitions = 1) + @Test(timeout = 20000) + public void testAsyncSendFailureHandled() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + + final CountDownLatch sendFailureReportedToListener = new CountDownLatch(1); + final AtomicReference<Throwable> sendFailureError = new AtomicReference<>(); + + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, + "?jms.forceAsyncSend=true&amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=60000"); + + connection.setExceptionListener((error) -> { + sendFailureError.compareAndSet(null, error); + sendFailureReportedToListener.countDown(); + }); + + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + + // Expect no AMQP traffic when we create the anonymous producer, as it will wait + // for an actual send to occur on the producer before anything occurs on the wire + + //Create an anonymous producer + MessageProducer producer = session.createProducer(null); + assertNotNull("Producer object was null", producer); + + // Expect a new message sent by the above producer to cause creation of a new + // sender link to the given destination, then closing the link after the message is sent. 
+ TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(topicName)); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + + final String BREAD_CRUMB = "SEND FAILURE EXPECTED"; + + org.apache.qpid.jms.test.testpeer.describedtypes.Error rejectError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error(); + rejectError.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED); + rejectError.setDescription(BREAD_CRUMB); + + testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher, nullValue(), new Rejected().setError(rejectError), true); + testPeer.expectDetach(true, true, true); + + // The send should return without throwing since it fires asynchronously; the rejection is reported to the connection's ExceptionListener instead. + Message message = session.createMessage(); + try { + producer.send(dest, message); + } catch (JMSException jmsEx) { + LOG.debug("Caught expected error from failed send."); + fail("Send should not fail as it should have fired asynchronously"); + } + + // Repeat the send and observe another attach->transfer->detach. 
+ testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher); + testPeer.expectDetach(true, true, true); + + assertTrue("Send failure not reported to exception handler", sendFailureReportedToListener.await(5, TimeUnit.SECONDS)); + assertNotNull(sendFailureError.get()); + assertTrue(sendFailureError.get() instanceof ResourceAllocationException); + assertTrue(sendFailureError.get().getMessage().contains(BREAD_CRUMB)); + + producer.send(dest, message); + + // Send here is asynchronous so we need to wait for disposition to arrive and detach to happen + testPeer.waitForAllHandlersToComplete(1000); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Repeat(repetitions = 1) + @Test(timeout = 20000) + public void testAsyncCompletionListenerSendFailureHandled() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, + "?amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=60000"); + + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + + // Expect no AMQP traffic when we create the anonymous producer, as it will wait + // for an actual send to occur on the producer before anything occurs on the wire + + //Create an anonymous producer + MessageProducer producer = session.createProducer(null); + assertNotNull("Producer object was null", producer); + + // Expect a new message sent by the above producer to cause creation of a new + // sender link to the given destination, then closing the link after the message is sent. 
+ TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(topicName)); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + String content = "testContent"; + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); + messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); + messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true)); + messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(content)); + + TestJmsCompletionListener completionListener = new TestJmsCompletionListener(); + Message message = session.createTextMessage(content); + + final String BREAD_CRUMB = "SEND FAILURE EXPECTED"; + + org.apache.qpid.jms.test.testpeer.describedtypes.Error rejectError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error(); + rejectError.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED); + rejectError.setDescription(BREAD_CRUMB); + + testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher, nullValue(), new Rejected().setError(rejectError), true); + testPeer.expectDetach(true, true, true); + + // A send with a completion listener must not throw from send() even when the peer + // rejects the message; the failure is delivered to the listener instead. + try { + producer.send(dest, message, completionListener); + } catch (JMSException jmsEx) { + LOG.debug("Caught unexpected error from failed send."); + fail("Send should not fail for asychrnous completion sends"); + } + + // Repeat the send (but accept this time) and observe another attach->transfer->detach. 
+ testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher); + testPeer.expectDetach(true, true, true); + + assertTrue("Send failure not reported to exception handler", completionListener.awaitCompletion(5, TimeUnit.SECONDS)); + assertNotNull(completionListener.exception); + assertTrue(completionListener.exception instanceof ResourceAllocationException); + assertTrue(completionListener.exception.getMessage().contains(BREAD_CRUMB)); + + TestJmsCompletionListener completionListener2 = new TestJmsCompletionListener(); + + producer.send(dest, message, completionListener2); + + assertTrue("Did not get completion callback", completionListener2.awaitCompletion(5, TimeUnit.SECONDS)); + assertNull(completionListener2.exception); + Message receivedMessage2 = completionListener2.message; + assertNotNull(receivedMessage2); + assertTrue(receivedMessage2 instanceof TextMessage); + assertEquals(content, ((TextMessage) receivedMessage2).getText()); + + // Asynchronous send requires a wait otherwise we can close before the detach which we are testing for. 
+ testPeer.waitForAllHandlersToComplete(1000); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Repeat(repetitions = 1) + @Test(timeout = 20000) + public void testAsyncCompletionListenerSendWhenNoCacheConfigured() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, + "?amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=60000"); + + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + + // Expect no AMQP traffic when we create the anonymous producer, as it will wait + // for an actual send to occur on the producer before anything occurs on the wire + + //Create an anonymous producer + MessageProducer producer = session.createProducer(null); + assertNotNull("Producer object was null", producer); + + // Expect a new message sent by the above producer to cause creation of a new + // sender link to the given destination, then closing the link after the message is sent. 
+ TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(topicName)); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + String content = "testContent"; + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); + messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); + messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true)); + messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(content)); + + testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher); + testPeer.expectDetach(true, true, true); + + TestJmsCompletionListener completionListener = new TestJmsCompletionListener(); + Message message = session.createTextMessage(content); + + producer.send(dest, message, completionListener); + + assertTrue("Did not get completion callback", completionListener.awaitCompletion(5, TimeUnit.SECONDS)); + assertNull(completionListener.exception); + Message receivedMessage = completionListener.message; + assertNotNull(receivedMessage); + assertTrue(receivedMessage instanceof TextMessage); + assertEquals(content, ((TextMessage) receivedMessage).getText()); + + // Repeat the send and observe another attach->transfer->detach. 
+ testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher); + testPeer.expectDetach(true, true, true); + + TestJmsCompletionListener completionListener2 = new TestJmsCompletionListener(); + + producer.send(dest, message, completionListener2); + + assertTrue("Did not get completion callback", completionListener2.awaitCompletion(5, TimeUnit.SECONDS)); + assertNull(completionListener2.exception); + Message receivedMessage2 = completionListener2.message; + assertNotNull(receivedMessage2); + assertTrue(receivedMessage2 instanceof TextMessage); + assertEquals(content, ((TextMessage) receivedMessage2).getText()); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Repeat(repetitions = 1) + @Test(timeout = 20000) + public void testRemotelyEndFallbackProducerCompletesAsyncSends() throws Exception { + final String BREAD_CRUMB = "ErrorMessage"; + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + final CountDownLatch producerClosed = new CountDownLatch(1); + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, + "?amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=60000"); + connection.addConnectionListener(new JmsDefaultConnectionListener() { + @Override + public void onProducerClosed(MessageProducer producer, Throwable exception) { + producerClosed.countDown(); + } + }); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create a producer, then remotely detach its link afterwards. 
+ testPeer.expectSenderAttach(); + + Queue queue = session.createQueue("myQueue"); + final MessageProducer producer = session.createProducer(queue); + + final int MSG_COUNT = 3; + + for (int i = 0; i < MSG_COUNT; ++i) { + testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher()); + } + + testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB, 50); + + TestJmsCompletionListener listener = new TestJmsCompletionListener(MSG_COUNT); + try { + for (int i = 0; i < MSG_COUNT; ++i) { + Message message = session.createTextMessage("content"); + producer.send(message, listener); + } + } catch (JMSException e) { + LOG.warn("Caught unexpected error: {}", e.getMessage()); + fail("No expected exception for this send."); + } + + testPeer.waitForAllHandlersToComplete(2000); + + // Verify the sends gets marked as having failed + assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS)); + assertEquals(MSG_COUNT, listener.errorCount); + + // Verify the producer gets marked closed + assertTrue("Producer closed callback didn't trigger", producerClosed.await(5, TimeUnit.SECONDS)); + try { + producer.getDeliveryMode(); + fail("Expected ISE to be thrown due to being closed"); + } catch (IllegalStateException jmsise) { + String errorMessage = jmsise.getCause().getMessage(); + assertTrue(errorMessage.contains(AmqpError.RESOURCE_LIMIT_EXCEEDED.toString())); + assertTrue(errorMessage.contains(BREAD_CRUMB)); + } + + // Try closing it explicitly, should effectively no-op in client. + // The test peer will throw during close if it sends anything. 
+ producer.close(); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Repeat(repetitions = 1) + @Test(timeout = 20000) + public void testRemotelyCloseSessionAndAttemptAsyncCompletionSendThrowsAndLeavesMessageReadable() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, + "?amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=60000"); + + final CountDownLatch sessionClosed = new CountDownLatch(1); + + connection.addConnectionListener(new JmsDefaultConnectionListener() { + + @Override + public void onSessionClosed(Session session, Throwable cause) { + sessionClosed.countDown(); + } + }); + + testPeer.expectBegin(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + MessageProducer producer = session.createProducer(null); + + // Expect a new message sent by the above producer to cause creation of a new + // sender link to the given destination, then closing the link after the message is sent. + TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo("myQueue")); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + String content = "testContent"; + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); + messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); + messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true)); + messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(content)); + + // Perform a send and observe an attach->transfer->detach. 
+ testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher); + testPeer.expectDetach(true, true, true); + testPeer.remotelyEndLastOpenedSession(true); + + Message message1 = session.createTextMessage(content); + Message message2 = session.createTextMessage(content); + assertNull("Should not yet have a JMSDestination", message1.getJMSDestination()); + assertNull("Should not yet have a JMSDestination", message2.getJMSDestination()); + producer.send(queue, message1); + + testPeer.waitForAllHandlersToComplete(2000); + + assertTrue("Session should have been closed", sessionClosed.await(2, TimeUnit.SECONDS)); + + TestJmsCompletionListener listener = new TestJmsCompletionListener(); + try { + producer.send(queue, message2, listener); + fail("Expected exception to be thrown for this send."); + } catch (JMSException e) { + LOG.trace("Caught expected exception: {}", e.getMessage()); + } + + assertFalse("Should not get async callback", listener.awaitCompletion(5, TimeUnit.MILLISECONDS)); + + // Message should be readable but not carry a destination as it wasn't actually sent anywhere + assertNull("Should not have a readable JMSDestination", message2.getJMSDestination()); + assertEquals("Message body not as expected", content, ((TextMessage) message2).getText()); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(2000); + } + } + + @Repeat(repetitions = 1) + @Test(timeout = 20000) + public void testFallbackProducerRecoversFromRefusalOfSenderOpenOnNextSend() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, + "?amqp.anonymousFallbackCacheSize=1&amqp.anonymousFallbackCacheTimeout=0"); + + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + + // Expect no AMQP traffic when we 
create the anonymous producer, as it will wait + // for an actual send to occur on the producer before anything occurs on the wire + + // Create an anonymous producer + MessageProducer producer = session.createProducer(null); + assertNotNull("Producer object was null", producer); + + // Expect a new message sent by the above producer to cause creation of a new + // sender link to the given destination. + TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(topicName)); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + Topic dest = session.createTopic(topicName); + + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + + Message message = session.createMessage(); + + // Refuse the attach which should result in fallback producer detaching the refused + // link attach and the send should then fail. 
+ testPeer.expectSenderAttach(targetMatcher, true, false); + testPeer.expectDetach(true, false, false); + + try { + producer.send(dest, message); + fail("Send should have failed because sender link was refused."); + } catch (JMSException ex) { + LOG.trace("Caught expected exception: ", ex); + } + + testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher); + testPeer.expectDetach(true, true, true); + + producer.send(dest, message); + producer.close(); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Repeat(repetitions = 1) + @Test(timeout = 60000) + public void testRepeatedSendToSameAddressWhenCacheSizeOfOneKeepsFallbackProducerInCache() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + + final int MESSAGE_COUNT = 25; + + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, + "?amqp.anonymousFallbackCacheSize=1&amqp.anonymousFallbackCacheTimeout=200"); + + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + + // Expect no AMQP traffic when we create the anonymous producer, as it will wait + // for an actual send to occur on the producer before anything occurs on the wire + + // Create an anonymous producer + MessageProducer producer = session.createProducer(null); + assertNotNull("Producer object was null", producer); + + // Expect a new message sent by the above producer to cause creation of a new + // sender link to the given destination. 
+ TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(topicName)); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + testPeer.expectSenderAttach(targetMatcher, false, true); + + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + + Topic dest = session.createTopic(topicName); + Message message = session.createMessage(); + + // Setup our expectations + for (int i = 1; i <= MESSAGE_COUNT; ++i) { + testPeer.expectTransfer(messageMatcher); + } + + testPeer.expectDetach(true, true, true); + + // First round of sends should open and cache sender links + for (int i = 1; i <= MESSAGE_COUNT; ++i) { + producer.send(dest, message); + } + + LOG.debug("Finished with send cycle, producer should now timeout"); + + // The eviction timer should reduce the cache to zero after we go idle + testPeer.waitForAllHandlersToComplete(3000); + + producer.close(); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Repeat(repetitions = 1) + @Test(timeout = 20000) + public void testSendToMultipleDestinationsOpensNewSendersWhenCaching() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + + final int CACHE_SIZE = 5; + + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, + "?amqp.anonymousFallbackCacheSize=" + CACHE_SIZE + "&amqp.anonymousFallbackCacheTimeout=0"); + + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + + // Expect no 
AMQP traffic when we create the anonymous producer, as it will wait + // for an actual send to occur on the producer before anything occurs on the wire + + // Create an anonymous producer + MessageProducer producer = session.createProducer(null); + assertNotNull("Producer object was null", producer); + + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + + // First round of sends should open and cache sender links + for (int i = 1; i <= CACHE_SIZE; ++i) { + Topic dest = session.createTopic(topicName + i); + + // Expect a new message sent by the above producer to cause creation of a new + // sender link to the given destination. + TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(dest.getTopicName())); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + Message message = session.createMessage(); + + testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher); + + producer.send(dest, message); + } + + // This round of sends should reuse the cached links + for (int i = 1; i <= CACHE_SIZE; ++i) { + Topic dest = session.createTopic(topicName + i); + + TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(dest.getTopicName())); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + Message message = session.createMessage(); + + testPeer.expectTransfer(messageMatcher); + + producer.send(dest, message); + } + + // Cached senders should all close when the producer is closed. 
+ for (int i = 1; i <= CACHE_SIZE; ++i) { + testPeer.expectDetach(true, true, true); + } + + producer.close(); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Repeat(repetitions = 1) + @Test(timeout = 30000) + public void testCachedFallbackProducersAreTimedOut() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + + final int CACHE_SIZE = 5; + + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, + "?amqp.anonymousFallbackCacheSize=" + CACHE_SIZE + "&amqp.anonymousFallbackCacheTimeout=300"); + + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + + // Expect no AMQP traffic when we create the anonymous producer, as it will wait + // for an actual send to occur on the producer before anything occurs on the wire + + // Create an anonymous producer + MessageProducer producer = session.createProducer(null); + assertNotNull("Producer object was null", producer); + + // First round of sends should open and cache sender links + for (int i = 1; i <= CACHE_SIZE; ++i) { + Topic dest = session.createTopic(topicName + i); + + // Expect a new message sent by the above producer to cause creation of a new + // sender link to the given destination. 
+ TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(dest.getTopicName())); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + + Message message = session.createMessage(); + + testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher); + + producer.send(dest, message); + } + + // Cached senders should all close when the cache timeout is reached and they are expired + for (int i = 1; i <= CACHE_SIZE; ++i) { + testPeer.expectDetach(true, true, true); + } + + // On a slow CI machine we could fail here due to the timeouts not having run. 
+ testPeer.waitForAllHandlersToComplete(6000); + + producer.close(); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Repeat(repetitions = 1) + @Test(timeout = 20000) + public void testCachedFallbackProducerEvictedBySendToUncachedAddress() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + + final int CACHE_SIZE = 2; + + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, + "?amqp.anonymousFallbackCacheSize=" + CACHE_SIZE + "&amqp.anonymousFallbackCacheTimeout=0"); + + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + + // Expect no AMQP traffic when we create the anonymous producer, as it will wait + // for an actual send to occur on the producer before anything occurs on the wire + + // Create an anonymous producer + MessageProducer producer = session.createProducer(null); + assertNotNull("Producer object was null", producer); + + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + + // First round of sends should open and cache sender links + for (int i = 1; i <= CACHE_SIZE; ++i) { + Topic dest = session.createTopic(topicName + i); + + // Expect a new message sent by the above producer to cause creation of a new + // sender link to the given destination. 
+ TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(dest.getTopicName())); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + Message message = session.createMessage(); + + testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher); + + producer.send(dest, message); + } + + // Second round using different addresses for the sends should evict old links and open new ones + for (int i = 1; i <= CACHE_SIZE; ++i) { + Topic dest = session.createTopic(topicName + UUID.randomUUID().toString()); + + // Expect a new message sent by the above producer to cause creation of a new + // sender link to the given destination. + TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(dest.getTopicName())); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + Message message = session.createMessage(); + + testPeer.expectDetach(true, true, true); + testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher); + + producer.send(dest, message); + } + + // The current cached senders should all close when the producer is closed. 
+ for (int i = 1; i <= CACHE_SIZE; ++i) { + testPeer.expectDetach(true, true, true); + } + + producer.close(); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Repeat(repetitions = 1) + @Test(timeout = 20000) + public void testCachedFallbackProducerEvictedBySendToUncachedAddressHandlesDelayedResponse() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, + "?amqp.anonymousFallbackCacheSize=1&amqp.anonymousFallbackCacheTimeout=0"); + + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + + // Expect no AMQP traffic when we create the anonymous producer, as it will wait + // for an actual send to occur on the producer before anything occurs on the wire + + // Create an anonymous producer + MessageProducer producer = session.createProducer(null); + assertNotNull("Producer object was null", producer); + + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + + Topic dest1 = session.createTopic(topicName + 1); + Topic dest2 = session.createTopic(topicName + 2); + + // Expect a new message sent by the above producer to cause creation of a new + // sender link to the given destination. 
+ TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(dest1.getTopicName())); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + Message message = session.createMessage(); + + testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher); + + producer.send(dest1, message); + + // Expect new send to a different destination to detach the previous cached link + // and once the response arrives the send should complete normally. + targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(dest2.getTopicName())); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + message = session.createMessage(); + + testPeer.expectDetach(true, false, false); + // Workaround to allow a deferred detach at a later time. + testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(false, true, AmqpError.RESOURCE_DELETED, "error", 20); + + testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher); + testPeer.expectDetach(true, true, true); + + producer.send(dest2, message); + producer.close(); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Repeat(repetitions = 1) + @Test(timeout = 20000) + public void testRemotelyCloseCachedFallbackProducerFreesSlotInCache() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + + final int CACHE_SIZE = 3; + + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, + "?amqp.anonymousFallbackCacheSize=" + CACHE_SIZE + "&amqp.anonymousFallbackCacheTimeout=0"); + + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + + // Expect no AMQP traffic when we create the anonymous producer, as it will 
wait + // for an actual send to occur on the producer before anything occurs on the wire + + // Create an anonymous producer + MessageProducer producer = session.createProducer(null); + assertNotNull("Producer object was null", producer); + + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + + // First round of sends should open and cache sender links + for (int i = 1; i < CACHE_SIZE; ++i) { + Topic dest = session.createTopic(topicName + i); + + // Expect a new message sent to the above created destination to result in a new + // sender link attached to the given destination. + TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(dest.getTopicName())); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + Message message = session.createMessage(); + + testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher); + + producer.send(dest, message); + } + + Topic dest = session.createTopic(topicName + CACHE_SIZE); + + // Expect a new message sent to the above created destination to result in a new + // sender link attached to the given destination. 
+ TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(dest.getTopicName())); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + Message message = session.createMessage(); + + testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher); + testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true); + + producer.send(dest, message); + + // Must ensure that the next send only fires after the remote detach has occurred otherwise + // it will definitely evict another producer from the cache. + testPeer.waitForAllHandlersToComplete(1000); + + dest = session.createTopic(topicName + UUID.randomUUID().toString()); + + // Expect a new message sent to the above created destination to result in a new + // sender link attached to the given destination. Existing cached producers should + // remain in the cache as a slot should now be open. + targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(dest.getTopicName())); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + message = session.createMessage(); + + testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher); + + producer.send(dest, message); + + // The current cached senders should all close when the producer is closed. 
+ for (int i = 1; i <= CACHE_SIZE; ++i) { + testPeer.expectDetach(true, true, true); + } + + producer.close(); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Repeat(repetitions = 1) + @Test(timeout = 20000) + public void testFailureOfOneCacheProducerCloseOnPropagatedToMainProducerClose() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + + final int CACHE_SIZE = 3; + + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, + "?amqp.anonymousFallbackCacheSize=" + CACHE_SIZE + "&amqp.anonymousFallbackCacheTimeout=0"); + + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + + // Expect no AMQP traffic when we create the anonymous producer, as it will wait + // for an actual send to occur on the producer before anything occurs on the wire + + // Create an anonymous producer + MessageProducer producer = session.createProducer(null); + assertNotNull("Producer object was null", producer); + + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + + // First round of sends should open and cache sender links + for (int i = 1; i <= CACHE_SIZE; ++i) { + Topic dest = session.createTopic(topicName + i); + + // Expect a new message sent by the above producer to cause creation of a new + // sender link to the given destination. 
+ TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(dest.getTopicName())); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + Message message = session.createMessage(); + + testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher); + + producer.send(dest, message); + } + + // The current cached senders should all close when the producer is closed. + for (int i = 1; i < CACHE_SIZE; ++i) { + testPeer.expectDetach(true, true, true); + } + + // Last one carries error but since we asked for close it should be ignored as we got what we wanted. + testPeer.expectDetach(true, true, true, AmqpError.RESOURCE_LOCKED, "Some error on detach"); + + try { + producer.close(); + } catch (JMSException ex) { + LOG.trace("Caught unexpected error: ", ex); + fail("Should not have thrown an error as close was requested so errors are ignored."); + } + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + private class TestJmsCompletionListener implements CompletionListener { + + private final CountDownLatch completed; + + @SuppressWarnings("unused") + public volatile int successCount; + public volatile int errorCount; + + public volatile Message message; + public volatile Exception exception; + + public TestJmsCompletionListener() { + this(1); + } + + public TestJmsCompletionListener(int expected) { + this.completed = new CountDownLatch(expected); + } + + public boolean awaitCompletion(long timeout, TimeUnit units) throws InterruptedException { + return completed.await(timeout, units); + } + + @Override + public void onCompletion(Message message) { + LOG.info("JmsCompletionListener onCompletion called with message: {}", message); + this.message = message; + this.successCount++; + + completed.countDown(); + } + + @Override + public void onException(Message message, Exception exception) { + 
LOG.info("JmsCompletionListener onException called with message: {} error {}", message, exception); + + this.message = message; + this.exception = exception; + this.errorCount++; + + completed.countDown(); + } + } +} diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java index 0e5ce7d..5f74521 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java @@ -76,7 +76,6 @@ import org.apache.qpid.jms.test.Wait; import org.apache.qpid.jms.test.testpeer.ListDescribedType; import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError; -import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability; import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted; import org.apache.qpid.jms.test.testpeer.describedtypes.Modified; import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected; @@ -1220,7 +1219,6 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { assertTrue(errorMessage.contains(BREAD_CRUMB)); } - // Try closing it explicitly, should effectively no-op in client. // The test peer will throw during close if it sends anything. 
producer.close(); @@ -2717,282 +2715,6 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) - public void testAnonymousProducerSendFailureHandledWhenAnonymousRelayNodeIsNotSupported() throws Exception { - try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - - // DO NOT add capability to indicate server support for ANONYMOUS-RELAY - - Connection connection = testFixture.establishConnecton(testPeer); - connection.start(); - - testPeer.expectBegin(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - String topicName = "myTopic"; - Topic dest = session.createTopic(topicName); - - // Expect no AMQP traffic when we create the anonymous producer, as it will wait - // for an actual send to occur on the producer before anything occurs on the wire - - //Create an anonymous producer - MessageProducer producer = session.createProducer(null); - assertNotNull("Producer object was null", producer); - - // Expect a new message sent by the above producer to cause creation of a new - // sender link to the given destination, then closing the link after the message is sent. 
- TargetMatcher targetMatcher = new TargetMatcher(); - targetMatcher.withAddress(equalTo(topicName)); - targetMatcher.withDynamic(equalTo(false)); - targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); - - MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); - MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); - TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); - messageMatcher.setHeadersMatcher(headersMatcher); - messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); - - testPeer.expectSenderAttach(targetMatcher, false, false); - testPeer.expectTransfer(messageMatcher, nullValue(), new Rejected(), true); - testPeer.expectDetach(true, true, true); - - Message message = session.createMessage(); - try { - producer.send(dest, message); - fail("Send should fail"); - } catch (JMSException jmsEx) { - LOG.debug("Caught expected error from failed send."); - } - - //Repeat the send and observe another attach->transfer->detach. 
- testPeer.expectSenderAttach(targetMatcher, false, false); - testPeer.expectTransfer(messageMatcher); - testPeer.expectDetach(true, true, true); - - producer.send(dest, message); - - testPeer.expectClose(); - connection.close(); - - testPeer.waitForAllHandlersToComplete(1000); - } - } - - @Test(timeout = 20000) - public void testAnonymousProducerAsyncSendFailureHandledWhenAnonymousRelayNodeIsNotSupported() throws Exception { - try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - - // DO NOT add capability to indicate server support for ANONYMOUS-RELAY - - Connection connection = testFixture.establishConnecton(testPeer, "?jms.forceAsyncSend=true"); - - connection.start(); - - testPeer.expectBegin(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - String topicName = "myTopic"; - Topic dest = session.createTopic(topicName); - - // Expect no AMQP traffic when we create the anonymous producer, as it will wait - // for an actual send to occur on the producer before anything occurs on the wire - - //Create an anonymous producer - MessageProducer producer = session.createProducer(null); - assertNotNull("Producer object was null", producer); - - // Expect a new message sent by the above producer to cause creation of a new - // sender link to the given destination, then closing the link after the message is sent. 
- TargetMatcher targetMatcher = new TargetMatcher(); - targetMatcher.withAddress(equalTo(topicName)); - targetMatcher.withDynamic(equalTo(false)); - targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); - - MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); - MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); - TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); - messageMatcher.setHeadersMatcher(headersMatcher); - messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); - - testPeer.expectSenderAttach(targetMatcher, false, false); - testPeer.expectTransfer(messageMatcher, nullValue(), new Rejected(), true); - testPeer.expectDetach(true, true, true); - - // Producer should act as synchronous regardless of asynchronous send setting. - Message message = session.createMessage(); - try { - producer.send(dest, message); - fail("Send should fail"); - } catch (JMSException jmsEx) { - LOG.debug("Caught expected error from failed send."); - } - - //Repeat the send and observe another attach->transfer->detach. 
- testPeer.expectSenderAttach(targetMatcher, false, false); - testPeer.expectTransfer(messageMatcher); - testPeer.expectDetach(true, true, true); - - producer.send(dest, message); - - testPeer.expectClose(); - connection.close(); - - testPeer.waitForAllHandlersToComplete(1000); - } - } - - @Test(timeout = 20000) - public void testAnonymousProducerAsyncCompletionListenerSendFailureHandledWhenAnonymousRelayNodeIsNotSupported() throws Exception { - try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - - // DO NOT add capability to indicate server support for ANONYMOUS-RELAY - - Connection connection = testFixture.establishConnecton(testPeer); - - connection.start(); - - testPeer.expectBegin(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - String topicName = "myTopic"; - Topic dest = session.createTopic(topicName); - - // Expect no AMQP traffic when we create the anonymous producer, as it will wait - // for an actual send to occur on the producer before anything occurs on the wire - - //Create an anonymous producer - MessageProducer producer = session.createProducer(null); - assertNotNull("Producer object was null", producer); - - // Expect a new message sent by the above producer to cause creation of a new - // sender link to the given destination, then closing the link after the message is sent. 
- TargetMatcher targetMatcher = new TargetMatcher(); - targetMatcher.withAddress(equalTo(topicName)); - targetMatcher.withDynamic(equalTo(false)); - targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); - - String content = "testContent"; - TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); - messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); - messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); - messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true)); - messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(content)); - - TestJmsCompletionListener completionListener = new TestJmsCompletionListener(); - Message message = session.createTextMessage(content); - - testPeer.expectSenderAttach(targetMatcher, false, false); - testPeer.expectTransfer(messageMatcher, nullValue(), new Rejected(), true); - testPeer.expectDetach(true, true, true); - - // The fallback producer acts as synchronous regardless of the completion listener, - // so exceptions are thrown from send. Only onComplete uses the listener. - try { - producer.send(dest, message, completionListener); - fail("Send should fail"); - } catch (JMSException jmsEx) { - LOG.debug("Caught expected error from failed send."); - } - - //Repeat the send (but accept this time) and observe another attach->transfer->detach. 
- testPeer.expectSenderAttach(targetMatcher, false, false); - testPeer.expectTransfer(messageMatcher); - testPeer.expectDetach(true, true, true); - - TestJmsCompletionListener completionListener2 = new TestJmsCompletionListener(); - - producer.send(dest, message, completionListener2); - - assertTrue("Did not get completion callback", completionListener2.awaitCompletion(5, TimeUnit.SECONDS)); - assertNull(completionListener2.exception); - Message receivedMessage2 = completionListener2.message; - assertNotNull(receivedMessage2); - assertTrue(receivedMessage2 instanceof TextMessage); - assertEquals(content, ((TextMessage) receivedMessage2).getText()); - - testPeer.expectClose(); - connection.close(); - - testPeer.waitForAllHandlersToComplete(1000); - } - } - - @Test(timeout = 20000) - public void testAnonymousProducerAsyncCompletionListenerSendWhenAnonymousRelayNodeIsNotSupported() throws Exception { - try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - - // DO NOT add capability to indicate server support for ANONYMOUS-RELAY - - Connection connection = testFixture.establishConnecton(testPeer); - - connection.start(); - - testPeer.expectBegin(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - String topicName = "myTopic"; - Topic dest = session.createTopic(topicName); - - // Expect no AMQP traffic when we create the anonymous producer, as it will wait - // for an actual send to occur on the producer before anything occurs on the wire - - //Create an anonymous producer - MessageProducer producer = session.createProducer(null); - assertNotNull("Producer object was null", producer); - - // Expect a new message sent by the above producer to cause creation of a new - // sender link to the given destination, then closing the link after the message is sent. 
- TargetMatcher targetMatcher = new TargetMatcher(); - targetMatcher.withAddress(equalTo(topicName)); - targetMatcher.withDynamic(equalTo(false)); - targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); - - String content = "testContent"; - TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); - messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); - messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); - messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true)); - messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(content)); - - testPeer.expectSenderAttach(targetMatcher, false, false); - testPeer.expectTransfer(messageMatcher); - testPeer.expectDetach(true, true, true); - - TestJmsCompletionListener completionListener = new TestJmsCompletionListener(); - Message message = session.createTextMessage(content); - - producer.send(dest, message, completionListener); - - assertTrue("Did not get completion callback", completionListener.awaitCompletion(5, TimeUnit.SECONDS)); - assertNull(completionListener.exception); - Message receivedMessage = completionListener.message; - assertNotNull(receivedMessage); - assertTrue(receivedMessage instanceof TextMessage); - assertEquals(content, ((TextMessage) receivedMessage).getText()); - - //Repeat the send and observe another attach->transfer->detach. 
- testPeer.expectSenderAttach(targetMatcher, false, false); - testPeer.expectTransfer(messageMatcher); - testPeer.expectDetach(true, true, true); - - TestJmsCompletionListener completionListener2 = new TestJmsCompletionListener(); - - producer.send(dest, message, completionListener2); - - assertTrue("Did not get completion callback", completionListener2.awaitCompletion(5, TimeUnit.SECONDS)); - assertNull(completionListener2.exception); - Message receivedMessage2 = completionListener2.message; - assertNotNull(receivedMessage2); - assertTrue(receivedMessage2 instanceof TextMessage); - assertEquals(content, ((TextMessage) receivedMessage2).getText()); - - testPeer.expectClose(); - connection.close(); - - testPeer.waitForAllHandlersToComplete(1000); - } - } - - @Test(timeout = 20000) public void testSendingMessageSetsJMSDeliveryTimeWithDelay() throws Exception { doSendingMessageSetsJMSDeliveryTimeTestImpl(true); } diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index a5bca62..46e7ba5 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -1079,21 +1079,25 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } } + @Repeat(repetitions = 1) @Test(timeout = 20000) public void testCreateAnonymousProducerTargetContainsQueueCapabilityWhenAnonymousRelayNodeIsNotSupported() throws Exception { doCreateAnonymousProducerTargetContainsCapabilityWhenAnonymousRelayNodeIsNotSupportedTestImpl(Queue.class); } + @Repeat(repetitions = 1) @Test(timeout = 20000) public void testCreateAnonymousProducerTargetContainsTopicCapabilityWhenAnonymousRelayNodeIsNotSupported() throws Exception { 
doCreateAnonymousProducerTargetContainsCapabilityWhenAnonymousRelayNodeIsNotSupportedTestImpl(Topic.class); } + @Repeat(repetitions = 1) @Test(timeout = 20000) public void testCreateAnonymousProducerTargetContainsTempQueueCapabilityWhenAnonymousRelayNodeIsNotSupported() throws Exception { doCreateAnonymousProducerTargetContainsCapabilityWhenAnonymousRelayNodeIsNotSupportedTestImpl(TemporaryQueue.class); } + @Repeat(repetitions = 1) @Test(timeout = 20000) public void testCreateAnonymousProducerTargetContainsTempTopicCapabilityWhenAnonymousRelayNodeIsNotSupported() throws Exception { doCreateAnonymousProducerTargetContainsCapabilityWhenAnonymousRelayNodeIsNotSupportedTestImpl(TemporaryQueue.class); @@ -1505,7 +1509,10 @@ public class SessionIntegrationTest extends QpidJmsTestCase { // DO NOT add capability to indicate server support for ANONYMOUS-RELAY - Connection connection = testFixture.establishConnecton(testPeer); + // Configure for a known state such that no fallback producers are cached. 
+ Connection connection = testFixture.establishConnecton(testPeer, + "?amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=0"); + connection.start(); testPeer.expectBegin(); @@ -1547,6 +1554,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { testPeer.expectDetach(true, true, true); producer.send(dest, message); + producer.close(); testPeer.expectClose(); connection.close(); diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java index 5e572b3..9352fee 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java @@ -3609,6 +3609,67 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { } } + @Repeat(repetitions = 1) + @Test(timeout = 20000) + public void testFailoverHandlesAnonymousFallbackWaitingForClose() throws Exception { + try (TestAmqpPeer originalPeer = new TestAmqpPeer(); + TestAmqpPeer finalPeer = new TestAmqpPeer();) { + + // DO NOT add capability to indicate server support for ANONYMOUS-RELAY + + // Create a peer to connect to, then one to reconnect to + final String originalURI = createPeerURI(originalPeer); + final String finalURI = createPeerURI(finalPeer); + + LOG.info("Original peer is at: {}", originalURI); + LOG.info("Final peer is at: {}", finalURI); + + originalPeer.expectSaslAnonymous(); + originalPeer.expectOpen(); + originalPeer.expectBegin(); + originalPeer.expectBegin(); + originalPeer.expectSenderAttach(); + originalPeer.expectTransfer(new TransferPayloadCompositeMatcher()); + // Ensure that sender detach is not answered so that next send must wait for close + originalPeer.expectDetach(true, false, false); + originalPeer.dropAfterLastHandler(20); // Wait for sender to get into wait state + + // --- 
Post Failover Expectations of sender --- // + finalPeer.expectSaslAnonymous(); + finalPeer.expectOpen(); + finalPeer.expectBegin(); + finalPeer.expectBegin(); + finalPeer.expectSenderAttach(); + finalPeer.expectTransfer(new TransferPayloadCompositeMatcher()); + finalPeer.expectDetach(true, true, true); + finalPeer.expectClose(); + + final JmsConnection connection = establishAnonymousConnecton( + "failover.initialReconnectDelay=25" + + "&failover.nested.amqp.anonymousFallbackCacheSize=0" + + "&failover.nested.amqp.anonymousFallbackCacheTimeout=0", + originalPeer, finalPeer); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + MessageProducer producer = session.createProducer(null); + + // Send 2 messages + String text = "myMessage"; + + TextMessage message = session.createTextMessage(text); + + producer.send(queue, message); + producer.send(queue, message); + + producer.close(); + connection.close(); + + finalPeer.waitForAllHandlersToComplete(1000); + } + } + @Test(timeout = 20000) public void testPassthroughCreateTemporaryQueueFailsWhenLinkRefusedAndAttachResponseWriteIsNotDeferred() throws Exception { doCreateTemporaryDestinationFailsWhenLinkRefusedTestImpl(false, false); diff --git a/qpid-jms-docs/Configuration.md b/qpid-jms-docs/Configuration.md index 87d95bd..923ebea 100644 --- a/qpid-jms-docs/Configuration.md +++ b/qpid-jms-docs/Configuration.md @@ -209,6 +209,8 @@ These options apply to the behaviour of certain AMQP functionality. + **amqp.maxFrameSize** The connection max-frame-size value in bytes. Default is 1048576. + **amqp.drainTimeout** The time in milliseconds that the client will wait for a response from the remote when a consumer drain request is made. If no response is seen in the allotted timeout period the link will be considered failed and the associated consumer will be closed. Default is 60000. 
+ **amqp.allowNonSecureRedirects** Controls whether an AMQP connection will allow for a redirect to an alternative host over a connection that is not secure when the existing connection is secure, e.g. redirecting an SSL connection to a raw TCP connection. This value defaults to false. +**amqp.anonymousFallbackCacheSize** Controls the number of underlying per-destination fallback sending links that are cached for an anonymous producer to improve performance of sending when a peer doesn't offer support for the anonymous relay. By default only one sender link is cached which means that sending to multiple destinations will cause the cached sender to be closed and a new sender to be opened each time the destination changes. Increasing the cache size can reduce the amount of [...] +**amqp.anonymousFallbackCacheTimeout** Controls how long in milliseconds an underlying per-destination fallback sender link can remain in an anonymous producer's cache when inactive before it is automatically closed. The default is 30000 milliseconds (30 seconds) and can be set to zero to disable the timeouts. ### Failover Configuration options --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org