Repository: activemq
Updated Branches:
  refs/heads/master 03b19b9da -> b29ccf348


[AMQ-6854] log warn if page in is blocked on usage preventing dispatch. Reuse 
blockedProducerWarningInterval to tweak reporting


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b29ccf34
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b29ccf34
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b29ccf34

Branch: refs/heads/master
Commit: b29ccf348803249b9d62cc607a8c7e22f06cf8a5
Parents: 03b19b9
Author: gtully <gary.tu...@gmail.com>
Authored: Thu Nov 2 16:04:30 2017 +0000
Committer: gtully <gary.tu...@gmail.com>
Committed: Thu Nov 2 16:04:30 2017 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |  59 +++----
 .../usecases/UsageBlockedDispatchTest.java      | 156 +++++++++++++++++++
 2 files changed, 188 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b29ccf34/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 04ef3fd..c43a8ba 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -2029,38 +2029,43 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
             } finally {
                 messagesLock.writeLock().unlock();
             }
-            // Only add new messages, not already pagedIn to avoid multiple
-            // dispatch attempts
-            pagedInMessagesLock.writeLock().lock();
-            try {
-                if(isPrioritizedMessages()) {
-                    resultList = new PrioritizedPendingList();
-                } else {
-                    resultList = new OrderedPendingList();
-                }
-                for (QueueMessageReference ref : result) {
-                    if (!pagedInMessages.contains(ref)) {
-                        pagedInMessages.addMessageLast(ref);
-                        resultList.addMessageLast(ref);
+
+            if (count > 0) {
+                // Only add new messages, not already pagedIn to avoid multiple
+                // dispatch attempts
+                pagedInMessagesLock.writeLock().lock();
+                try {
+                    if (isPrioritizedMessages()) {
+                        resultList = new PrioritizedPendingList();
                     } else {
-                        ref.decrementReferenceCount();
-                        // store should have trapped duplicate in it's index, 
or cursor audit trapped insert
-                        // or producerBrokerExchange suppressed send.
-                        // note: jdbc store will not trap unacked messages as 
a duplicate b/c it gives each message a unique sequence id
-                        LOG.warn("{}, duplicate message {} - {} from cursor, 
is cursor audit disabled or too constrained? Redirecting to dlq", this, 
ref.getMessageId(), ref.getMessage().getMessageId().getFutureOrSequenceLong());
-                        if (store != null) {
-                            ConnectionContext connectionContext = 
createConnectionContext();
-                            dropMessage(ref);
-                            if (gotToTheStore(ref.getMessage())) {
-                                LOG.debug("Duplicate message {} from cursor, 
removing from store", this, ref.getMessage());
-                                store.removeMessage(connectionContext, new 
MessageAck(ref.getMessage(), MessageAck.POSION_ACK_TYPE, 1));
+                        resultList = new OrderedPendingList();
+                    }
+                    for (QueueMessageReference ref : result) {
+                        if (!pagedInMessages.contains(ref)) {
+                            pagedInMessages.addMessageLast(ref);
+                            resultList.addMessageLast(ref);
+                        } else {
+                            ref.decrementReferenceCount();
+                            // store should have trapped duplicate in it's 
index, or cursor audit trapped insert
+                            // or producerBrokerExchange suppressed send.
+                            // note: jdbc store will not trap unacked messages 
as a duplicate b/c it gives each message a unique sequence id
+                            LOG.warn("{}, duplicate message {} - {} from 
cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, 
ref.getMessageId(), ref.getMessage().getMessageId().getFutureOrSequenceLong());
+                            if (store != null) {
+                                ConnectionContext connectionContext = 
createConnectionContext();
+                                dropMessage(ref);
+                                if (gotToTheStore(ref.getMessage())) {
+                                    LOG.debug("Duplicate message {} from 
cursor, removing from store", this, ref.getMessage());
+                                    store.removeMessage(connectionContext, new 
MessageAck(ref.getMessage(), MessageAck.POSION_ACK_TYPE, 1));
+                                }
+                                
broker.getRoot().sendToDeadLetterQueue(connectionContext, ref.getMessage(), 
null, new Throwable("duplicate paged in from cursor for " + destination));
                             }
-                            
broker.getRoot().sendToDeadLetterQueue(connectionContext, ref.getMessage(), 
null, new Throwable("duplicate paged in from cursor for " + destination));
                         }
                     }
+                } finally {
+                    pagedInMessagesLock.writeLock().unlock();
                 }
-            } finally {
-                pagedInMessagesLock.writeLock().unlock();
+            } else if (!messages.hasSpace() && isFlowControlLogRequired()) {
+                LOG.warn("{} cursor blocked, no space available to page in 
messages; usage: {}", this, this.systemUsage.getMemoryUsage());
             }
         } else {
             // Avoid return null list, if condition is not validated

http://git-wip-us.apache.org/repos/asf/activemq/blob/b29ccf34/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchTest.java
new file mode 100644
index 0000000..7767672
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchTest.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
+
+import javax.jms.*;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class UsageBlockedDispatchTest extends TestSupport {
+
+    private static final int MESSAGES_COUNT = 100;
+    private static byte[] buf = new byte[2 * 1024];
+    private BrokerService broker;
+
+    protected long messageReceiveTimeout = 4000L;
+
+    private String connectionUri;
+
+    @Override
+    public void setUp() throws Exception {
+
+        broker = new BrokerService();
+        broker.setDataDirectory("target" + File.separator + "activemq-data");
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.setAdvisorySupport(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+
+        setDefaultPersistenceAdapter(broker);
+        SystemUsage sysUsage = broker.getSystemUsage();
+        sysUsage.getMemoryUsage().setLimit(100*1024);
+
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        defaultPolicy.setProducerFlowControl(false);
+        defaultPolicy.setCursorMemoryHighWaterMark(100);
+        defaultPolicy.setMemoryLimit(50*1024);
+
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(defaultPolicy);
+        broker.setDestinationPolicy(policyMap);
+        broker.setSystemUsage(sysUsage);
+
+        broker.addConnector("tcp://localhost:0").setName("Default");
+        broker.start();
+
+        connectionUri = 
broker.getTransportConnectors().get(0).getPublishableConnectString();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    public void testFillMemToBlockConsumer() throws Exception {
+
+        ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(connectionUri);
+        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
+        prefetch.setTopicPrefetch(10);
+        factory.setPrefetchPolicy(prefetch);
+
+        final Connection producerConnection = factory.createConnection();
+        producerConnection.start();
+
+        Session producerSession = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(null);
+        BytesMessage message = producerSession.createBytesMessage();
+        message.writeBytes(buf);
+
+        int numFillers = 4;
+        ArrayList<ActiveMQQueue> fillers = new ArrayList<ActiveMQQueue>();
+        for (int i=0; i<numFillers; i++) {
+            fillers.add(new ActiveMQQueue("Q" + i));
+        }
+
+        // fill cache and consume all memory
+        for (int idx = 0; idx < MESSAGES_COUNT; ++idx) {
+            for (ActiveMQQueue q : fillers) {
+                producer.send(q, message);
+            }
+        }
+        ActiveMQQueue willGetAPage = new ActiveMQQueue("Q" + numFillers++);
+        for (int idx = 0; idx < MESSAGES_COUNT; ++idx) {
+            producer.send(willGetAPage, message);
+        }
+
+        ActiveMQQueue shouldBeStuckForDispatch = new ActiveMQQueue("Q" + 
numFillers);
+        for (int idx = 0; idx < MESSAGES_COUNT; ++idx) {
+            producer.send(shouldBeStuckForDispatch, message);
+        }
+
+        Connection consumerConnection = factory.createConnection();
+        consumerConnection.start();
+
+        Session consumerSession = consumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = 
consumerSession.createConsumer(willGetAPage);
+
+        Message m = consumer.receive(messageReceiveTimeout);
+        assertNotNull("got a message", m);
+
+        final AtomicBoolean gotExpectedLogEvent = new AtomicBoolean(false);
+        Appender appender = new DefaultTestAppender() {
+
+            @Override
+            public void doAppend(LoggingEvent event) {
+                if (event.getLevel() == Level.WARN && 
event.getRenderedMessage().contains("cursor blocked")) {
+                    gotExpectedLogEvent.set(true);
+                }
+            }
+        };
+
+        try {
+            
org.apache.log4j.Logger.getLogger(Queue.class).addAppender(appender);
+
+            MessageConsumer noDispatchConsumer = 
consumerSession.createConsumer(shouldBeStuckForDispatch);
+
+            m = noDispatchConsumer.receive(messageReceiveTimeout);
+            assertNull("did not get a message", m);
+
+            assertTrue("Got the new warning about the blocked cursor", 
gotExpectedLogEvent.get());
+        } finally {
+            
org.apache.log4j.Logger.getLogger(Queue.class).removeAppender(appender);
+            org.apache.log4j.Logger.getRootLogger().removeAppender(appender);
+        }
+    }
+}

Reply via email to