Repository: activemq Updated Branches: refs/heads/activemq-5.12.x 3faf87ba8 -> 14f6abc55
https://issues.apache.org/jira/browse/AMQ-6014 - fix up reference counting for durable subs such that ack on one sub does not decrement usage for offline subs. Fix and test (cherry picked from commit 1ad0117932ae73603b963860c92c0980a3572b9e) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e54c9ccf Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e54c9ccf Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e54c9ccf Branch: refs/heads/activemq-5.12.x Commit: e54c9ccfa0a2bbc419fc10c39825610bf078c3db Parents: 3faf87b Author: gtully <gary.tu...@gmail.com> Authored: Mon Oct 19 16:22:29 2015 +0100 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Fri Nov 13 18:41:21 2015 +0000 ---------------------------------------------------------------------- .../broker/region/DurableTopicSubscription.java | 8 +- .../broker/region/PrefetchSubscription.java | 3 +- .../TopicProducerDurableSubFlowControlTest.java | 201 +++++++++++++++++++ 3 files changed, 209 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/e54c9ccf/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 44228d7..b895cc7 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -228,9 +228,12 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us if (keepDurableSubsActive && pending.isTransient()) 
{ pending.addMessageFirst(node); pending.rollback(node.getMessageId()); - } else { - node.decrementReferenceCount(); + // not sure why pending.addMessageFirst does not take ownership of message reference + // by incrementing + node.incrementReferenceCount(); } + // createMessageDispatch increments on remove from pending for dispatch + node.decrementReferenceCount(); } if (!topicsToDeactivate.isEmpty()) { @@ -262,6 +265,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us protected MessageDispatch createMessageDispatch(MessageReference node, Message message) { MessageDispatch md = super.createMessageDispatch(node, message); if (node != QueueMessageReference.NULL_MESSAGE) { + node.incrementReferenceCount(); Integer count = redeliveredMessages.get(node.getMessageId()); if (count != null) { md.setRedeliveryCounter(count.intValue()); http://git-wip-us.apache.org/repos/asf/activemq/blob/e54c9ccf/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index ef1b372..a1cff89 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -689,7 +689,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription { // related to remove subscription action synchronized(dispatchLock) { pending.remove(); - node.decrementReferenceCount(); if (!isDropped(node) && canDispatch(node)) { // Message may have been sitting in the pending @@ -709,6 +708,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription { count++; } } + // decrement after dispatch has taken ownership to avoid usage 
jitter + node.decrementReferenceCount(); } } else if (!isSlowConsumer()) { setSlowConsumer(true); http://git-wip-us.apache.org/repos/asf/activemq/blob/e54c9ccf/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerDurableSubFlowControlTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerDurableSubFlowControlTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerDurableSubFlowControlTest.java new file mode 100644 index 0000000..9bc7c11 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerDurableSubFlowControlTest.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.activemq.usecases;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
+import javax.management.ObjectName;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Exercises producer flow control on a topic that has one offline and one
+ * active durable subscription.
+ *
+ * <p>The broker is started with a 2MB per-topic memory limit, a cursor memory
+ * high water mark of 10 and producer flow control enabled. The test creates
+ * durable subscription "DurableSub-0" and closes it immediately (leaving it
+ * offline), creates "DurableSub-1" with this class as its listener, and then
+ * sends {@code numMessagesToSend} messages from a background thread. It
+ * passes when the producer finishes without error within five minutes and
+ * every subscription that still has messages in its cursor reports a cursor
+ * usage above 5%.
+ */
+public class TopicProducerDurableSubFlowControlTest extends TestCase implements MessageListener {
+    private static final Logger LOG = LoggerFactory.getLogger(TopicProducerDurableSubFlowControlTest.class);
+    private static final String brokerName = "testBroker";
+    private static final String brokerUrl = "vm://" + brokerName;
+    protected static final int destinationMemLimit = 2097152; // 2MB
+    private static final AtomicLong produced = new AtomicLong();
+    private static final AtomicLong consumed = new AtomicLong();
+    private static final int numMessagesToSend = 10000;
+
+    private BrokerService broker;
+
+    /** Error raised by the producing thread, or {@code null} if it ran to completion. */
+    private volatile Throwable producerFailure;
+
+    @Override
+    protected void setUp() throws Exception {
+        doSetup(true);
+    }
+
+    /**
+     * Configures and starts the embedded broker.
+     *
+     * @param deleteAll passed to {@code setDeleteAllMessagesOnStartup}
+     * @throws Exception if the broker cannot be configured or started
+     */
+    private void doSetup(boolean deleteAll) throws Exception {
+        // Setup and start the broker
+        broker = new BrokerService();
+        broker.setBrokerName(brokerName);
+        broker.setSchedulerSupport(false);
+        broker.setUseJmx(true);
+        broker.setUseShutdownHook(false);
+        broker.addConnector(brokerUrl);
+        broker.setAdvisorySupport(false);
+
+        broker.getSystemUsage().getMemoryUsage().setLimit(destinationMemLimit * 10);
+
+        broker.setDeleteAllMessagesOnStartup(deleteAll);
+
+        // Setup the destination policy
+        PolicyMap pm = new PolicyMap();
+
+        // Setup the topic destination policy
+        PolicyEntry tpe = new PolicyEntry();
+        tpe.setTopic(">");
+        tpe.setMemoryLimit(destinationMemLimit);
+        tpe.setCursorMemoryHighWaterMark(10);
+        tpe.setProducerFlowControl(true);
+        tpe.setAdvisoryWhenFull(true);
+        tpe.setExpireMessagesPeriod(0);
+
+        pm.setPolicyEntries(Arrays.asList(new PolicyEntry[]{tpe}));
+
+        setDestinationPolicy(broker, pm);
+
+        // Start the broker
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    /**
+     * Installs the destination policy on the broker; a separate method so
+     * subclasses can adjust the policy before it is applied.
+     */
+    protected void setDestinationPolicy(BrokerService broker, PolicyMap pm) {
+        broker.setDestinationPolicy(pm);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        // setUp may have failed before the broker was assigned
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    public void testTopicProducerFlowControl() throws Exception {
+
+        // Create the connection factory
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
+        connectionFactory.setAlwaysSyncSend(true);
+
+        Connection listenerConnection = null;
+        Connection producerConnection = null;
+        try {
+            // Start the test destination listener
+            listenerConnection = connectionFactory.createConnection();
+            listenerConnection.setClientID("cliId1");
+            listenerConnection.start();
+            Session listenerSession = listenerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Created and closed straight away, so this subscription stays offline
+            TopicSubscriber durable = listenerSession.createDurableSubscriber(createDestination(), "DurableSub-0");
+            durable.close();
+
+            durable = listenerSession.createDurableSubscriber(createDestination(), "DurableSub-1");
+            durable.setMessageListener(this);
+
+            // Start producing the test messages
+            producerConnection = connectionFactory.createConnection();
+            final Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer producer = session.createProducer(createDestination());
+
+            final Thread producingThread = new Thread("Producing Thread") {
+                @Override
+                public void run() {
+                    try {
+                        for (long i = 0; i < numMessagesToSend; i++) {
+                            producer.send(session.createTextMessage("test"));
+
+                            long count = produced.incrementAndGet();
+                            if (count % 10000 == 0) {
+                                LOG.info("Produced " + count + " messages");
+                            }
+                        }
+                    } catch (Throwable ex) {
+                        // Hand the failure to the test thread; otherwise a dead
+                        // producer looks the same as a finished one.
+                        producerFailure = ex;
+                        LOG.error("Producer failed", ex);
+                    } finally {
+                        try {
+                            producer.close();
+                            session.close();
+                        } catch (JMSException e) {
+                            LOG.debug("Ignoring failure while closing producer resources", e);
+                        }
+                    }
+                }
+            };
+
+            producingThread.start();
+
+            List<ObjectName> subON = new ArrayList<>();
+
+            final List<DurableSubscriptionViewMBean> subViews = new ArrayList<>();
+            subON.addAll(Arrays.asList(broker.getAdminView().getInactiveDurableTopicSubscribers()));
+            subON.addAll(Arrays.asList(broker.getAdminView().getDurableTopicSubscribers()));
+            assertFalse("have a sub", subON.isEmpty());
+
+            for (ObjectName subName : subON) {
+                subViews.add((DurableSubscriptionViewMBean)
+                        broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true));
+            }
+
+            LOG.info("Wait for producer to stop");
+
+            assertTrue("producer thread is done", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    for (DurableSubscriptionViewMBean sub : subViews) {
+                        logSubscriptionStats(sub);
+                    }
+
+                    return !producingThread.isAlive();
+                }
+            }, 5 * 60 * 1000));
+
+            assertNull("producer failed: " + producerFailure, producerFailure);
+
+            for (DurableSubscriptionViewMBean sub : subViews) {
+                logSubscriptionStats(sub);
+
+                if (sub.cursorSize() > 0) {
+                    assertTrue("Has a decent usage", sub.getCursorPercentUsage() > 5);
+                }
+            }
+        } finally {
+            // Closed only after the assertions: the subscription views above
+            // are read while the subscriber connection is still open.
+            closeQuietly(producerConnection);
+            closeQuietly(listenerConnection);
+        }
+    }
+
+    /** Logs name, cursor size and cursor memory usage of one durable subscription. */
+    private static void logSubscriptionStats(DurableSubscriptionViewMBean sub) throws Exception {
+        LOG.info("name: {}", sub.getSubscriptionName());
+        LOG.info("cursor size: {}", sub.cursorSize());
+        LOG.info("mem usage: {}", sub.getCursorMemoryUsage());
+        LOG.info("mem % usage: {}", sub.getCursorPercentUsage());
+    }
+
+    /** Closes the connection if it was opened; a failure to close is logged, not rethrown. */
+    private static void closeQuietly(Connection connection) {
+        if (connection == null) {
+            return;
+        }
+        try {
+            connection.close();
+        } catch (JMSException e) {
+            LOG.debug("Ignoring failure while closing connection", e);
+        }
+    }
+
+    /** Returns the topic used by the test; subclasses may override to target another destination. */
+    protected ActiveMQTopic createDestination() throws Exception {
+        return new ActiveMQTopic("test");
+    }
+
+    @Override
+    public void onMessage(Message message) {
+        long count = consumed.incrementAndGet();
+        if (count % 10000 == 0) {
+            LOG.info("\tConsumed " + count + " messages");
+        }
+    }
+}