Repository: activemq
Updated Branches:
  refs/heads/activemq-5.15.x 8aeb634bb -> 35e3a9c4d


This closes #327 - with thanks to Heath Kesler


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/35e3a9c4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/35e3a9c4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/35e3a9c4

Branch: refs/heads/activemq-5.15.x
Commit: 35e3a9c4d5ce010307c7bdb07a73cca3f4f49a73
Parents: 8aeb634
Author: jgoodyear <jamie.goody...@gmail.com>
Authored: Tue Dec 4 12:24:24 2018 -0330
Committer: jgoodyear <jamie.goody...@gmail.com>
Committed: Tue Dec 4 12:24:24 2018 -0330

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  13 +-
 .../org/apache/activemq/bugs/AMQ7118Test.java   | 228 +++++++++++++++++++
 .../apache/activemq/bugs/amq7118/activemq.xml   | 109 +++++++++
 3 files changed, 349 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/35e3a9c4/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index cb7173f..5fb1e90 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -149,6 +149,7 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
         protected Location ackMessageFileMapLocation = null;
         protected transient ActiveMQMessageAuditNoSync 
producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
         protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new 
HashMap<>();
+        protected transient AtomicBoolean ackMessageFileMapDirtyFlag = new 
AtomicBoolean(false);
         protected int version = VERSION;
         protected int openwireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
 
@@ -825,6 +826,7 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
                 KahaAckMessageFileMapCommand audit = 
(KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
                 ObjectInputStream objectIn = new 
ObjectInputStream(audit.getAckMessageFileMap().newInput());
                 metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) 
objectIn.readObject();
+                metadata.ackMessageFileMapDirtyFlag.lazySet(true);
                 requiresReplay = false;
             } catch (Exception e) {
                 LOG.warn("Cannot recover ackMessageFileMap", e);
@@ -1634,6 +1636,8 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
             referenceFileIds = new HashSet<>();
             referenceFileIds.add(messageLocation.getDataFileId());
             metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), 
referenceFileIds);
+            metadata.ackMessageFileMapDirtyFlag.lazySet(true);
+
         } else {
             Integer id = Integer.valueOf(messageLocation.getDataFileId());
             if (!referenceFileIds.contains(id)) {
@@ -1760,7 +1764,10 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
 
         metadata.state = OPEN_STATE;
         metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
-        metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
+        if (metadata.ackMessageFileMapDirtyFlag.get() || 
(metadata.ackMessageFileMapLocation == null)) {
+            metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
+        }
+        metadata.ackMessageFileMapDirtyFlag.lazySet(false);
         Location[] inProgressTxRange = getInProgressTxLocationRange();
         metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
         tx.store(metadata.page, metadataMarshaller, true);
@@ -1931,6 +1938,7 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
                     }
                     if (gcCandidateSet.contains(candidate)) {
                         ackMessageFileMapMod |= 
(metadata.ackMessageFileMap.remove(candidate) != null);
+                        metadata.ackMessageFileMapDirtyFlag.lazySet(true);
                     } else {
                         if (LOG.isTraceEnabled()) {
                             LOG.trace("not removing data file: " + candidate
@@ -1945,6 +1953,7 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
                 for (Integer candidate : gcCandidateSet) {
                     for (Set<Integer> ackFiles : 
metadata.ackMessageFileMap.values()) {
                         ackMessageFileMapMod |= ackFiles.remove(candidate);
+                        metadata.ackMessageFileMapDirtyFlag.lazySet(true);
                     }
                 }
                 if (ackMessageFileMapMod) {
@@ -2127,6 +2136,7 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
                 referenceFileIds = new HashSet<>();
                 referenceFileIds.addAll(entry.getValue());
                 metadata.ackMessageFileMap.put(entry.getKey(), 
referenceFileIds);
+                metadata.ackMessageFileMapDirtyFlag.lazySet(true);
             } else {
                 referenceFileIds.addAll(entry.getValue());
             }
@@ -2135,6 +2145,7 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
         // remove the old location data from the ack map so that the old 
journal log file can
         // be removed on next GC.
         metadata.ackMessageFileMap.remove(journalToRead);
+        metadata.ackMessageFileMapDirtyFlag.lazySet(true);
 
         indexLock.writeLock().unlock();
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/35e3a9c4/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7118Test.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7118Test.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7118Test.java
new file mode 100644
index 0000000..0e845d2
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7118Test.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.commons.lang.StringUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+public class AMQ7118Test {
+
+    protected static final Logger LOG = 
LoggerFactory.getLogger(AMQ7118Test.class);
+
+    protected static Random r = new Random();
+    final static String WIRE_LEVEL_ENDPOINT = "tcp://localhost:61616";
+    protected BrokerService broker;
+    protected Connection producerConnection;
+    protected Session pSession;
+    protected Connection cConnection;
+    protected Session cSession;
+    private final String xbean = "xbean:";
+    private final String confBase = 
"src/test/resources/org/apache/activemq/bugs/amq7118";
+    int checkpointIndex = 0;
+
+    private static final ActiveMQConnectionFactory 
ACTIVE_MQ_CONNECTION_FACTORY = new 
ActiveMQConnectionFactory(WIRE_LEVEL_ENDPOINT);
+
+    @Before
+    public void setup() throws Exception {
+        deleteData(new File("target/data"));
+        createBroker();
+    }
+
+    @After
+    public void shutdown() throws Exception {
+        broker.stop();
+    }
+
+    public void setupProducerConnection() throws Exception {
+        producerConnection = ACTIVE_MQ_CONNECTION_FACTORY.createConnection();
+        producerConnection.start();
+        pSession = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+    }
+
+    public void setupConsumerConnection() throws Exception {
+        cConnection = ACTIVE_MQ_CONNECTION_FACTORY.createConnection();
+        cConnection.setClientID("myClient1");
+        cConnection.start();
+        cSession = cConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+    private void createBroker() throws Exception {
+        broker = new BrokerService();
+        broker = BrokerFactory.createBroker(xbean + confBase + 
"/activemq.xml");
+        broker.start();
+    }
+
+
+    @Test
+    public void testCompaction() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        setupProducerConnection();
+        setupConsumerConnection();
+
+        Topic topic = pSession.createTopic("test");
+
+        MessageConsumer consumer = cSession.createDurableSubscriber(topic, 
"clientId1");
+
+        LOG.info("Produce message to test topic");
+        produce(pSession, topic, 1, 512 ); // just one message
+
+        LOG.info("Consume message from test topic");
+        Message msg = consumer.receive(5000);
+        assertNotNull(msg);
+
+        LOG.info("Produce more messages to test topic and get into PFC");
+        boolean sent = produce(cSession, topic, 20, 512 * 1024); // Fill the 
store
+
+        assertFalse("Never got to PFC condition", sent);
+
+        LOG.info("PFC hit");
+
+        //We hit PFC, so shut down the producer
+        producerConnection.close();
+
+        //Lets check the db-*.log file count before checkpointUpdate
+        checkFiles(false, 21, "db-21.log");
+
+        // Force checkFiles update
+        checkFiles(true, 23, "db-23.log");
+
+        //The ackMessageFileMap should be clean, so no more writing
+        checkFiles(true, 23, "db-23.log");
+
+        //One more time just to be sure - The ackMessageFileMap should be 
clean, so no more writing
+        checkFiles(true, 23, "db-23.log");
+
+        //Read out the rest of the messages
+        LOG.info("Consuming the rest of the files...");
+        for (int i = 0; i < 20; i++) {
+            msg = consumer.receive(5000);
+        }
+        LOG.info("All messages Consumed.");
+
+        //Clean up the log files and be sure its stable
+        checkFiles(true, 2, "db-33.log");
+        checkFiles(true, 3, "db-34.log");
+        checkFiles(true, 2, "db-34.log");
+        checkFiles(true, 2, "db-34.log");
+        checkFiles(true, 2, "db-34.log");
+
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    protected static boolean produce(Session session, Topic topic, int 
messageCount, int messageSize) throws JMSException {
+        MessageProducer producer = session.createProducer(topic);
+
+        for (int i = 0; i < messageCount; i++) {
+            TextMessage helloMessage = 
session.createTextMessage(StringUtils.repeat("a", messageSize));
+
+            try {
+                producer.send(helloMessage);
+            } catch (ResourceAllocationException e){
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    private void deleteData(File file) {
+        String[] entries = file.list();
+        if (entries == null) return;
+        for (String s : entries) {
+            File currentFile = new File(file.getPath(), s);
+            if (currentFile.isDirectory()) {
+                deleteData(currentFile);
+            }
+            currentFile.delete();
+        }
+        file.delete();
+    }
+
+    private void checkFiles(boolean doCheckpoint, int expectedCount, String 
lastFileName) throws Exception {
+
+        File dbfiles = new File("target/data/kahadb");
+        FilenameFilter lff = new FilenameFilter(){
+            @Override
+            public boolean accept(File dir, String name) {
+                return name.toLowerCase().startsWith("db-") && 
name.toLowerCase().endsWith("log");
+            }
+        };
+
+        if(doCheckpoint) {
+            LOG.info("Initiating checkpointUpdate "+ ++checkpointIndex + " 
...");
+            broker.getPersistenceAdapter().checkpoint(true);
+            TimeUnit.SECONDS.sleep(2);
+            LOG.info("Checkpoint complete.");
+        }
+        File files[] = dbfiles.listFiles(lff);
+        Arrays.sort(files,  new DBFileComparator() );
+        logfiles(files);
+        assertEquals(expectedCount, files.length);
+        assertEquals(lastFileName, files[files.length-1].getName());
+
+    }
+
+    private void logfiles(File[] files){
+
+        LOG.info("Files found in KahaDB:");
+        for (File file : files) {
+            LOG.info("    " + file.getName());
+        }
+    }
+
+    private class DBFileComparator implements Comparator<File> {
+        @Override
+        public int compare(File o1, File o2) {
+            int n1 = extractNumber(o1.getName());
+            int n2 = extractNumber(o2.getName());
+            return n1 - n2;
+        }
+
+        private int extractNumber(String name) {
+            int i = 0;
+            try {
+                int s = name.indexOf('-')+1;
+                int e = name.lastIndexOf('.');
+                String number = name.substring(s, e);
+                i = Integer.parseInt(number);
+            } catch(Exception e) {
+                i = 0; // if filename does not match the format
+                // then default to 0
+            }
+            return i;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/35e3a9c4/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq7118/activemq.xml
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq7118/activemq.xml
 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq7118/activemq.xml
new file mode 100644
index 0000000..b610ae0
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq7118/activemq.xml
@@ -0,0 +1,109 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<!-- START SNIPPET: example -->
+<beans
+  xmlns="http://www.springframework.org/schema/beans";
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+  xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd
+  http://activemq.apache.org/schema/core 
http://activemq.apache.org/schema/core/activemq-core.xsd";>
+
+    <!--
+        The <broker> element is used to configure the ActiveMQ broker.
+    -->
+    <broker xmlns="http://activemq.apache.org/schema/core"; 
brokerName="localhost" dataDirectory="target/data">
+
+        <destinationPolicy>
+            <policyMap>
+              <policyEntries>
+                <policyEntry topic=">" >
+                    <!-- The constantPendingMessageLimitStrategy is used to 
prevent
+                         slow topic consumers to block producers and affect 
other consumers
+                         by limiting the number of messages that are retained
+                         For more information, see:
+
+                         http://activemq.apache.org/slow-consumer-handling.html
+
+                    -->
+                  <pendingMessageLimitStrategy>
+                    <constantPendingMessageLimitStrategy limit="1000"/>
+                  </pendingMessageLimitStrategy>
+                </policyEntry>
+              </policyEntries>
+            </policyMap>
+        </destinationPolicy>
+
+
+        <!--
+            The managementContext is used to configure how ActiveMQ is exposed 
in
+            JMX. By default, ActiveMQ uses the MBean server that is started by
+            the JVM. For more information, see:
+
+            http://activemq.apache.org/jmx.html
+        -->
+        <managementContext>
+            <managementContext createConnector="false"/>
+        </managementContext>
+
+        <!--
+            Configure message persistence for the broker. The default 
persistence
+            mechanism is the KahaDB store (identified by the kahaDB tag).
+            For more information, see:
+
+            http://activemq.apache.org/persistence.html
+        -->
+        <persistenceAdapter>
+            <kahaDB directory="target/data/kahadb" journalMaxFileLength="1k"/>
+        </persistenceAdapter>
+
+
+          <!--
+            The systemUsage controls the maximum amount of space the broker 
will
+            use before disabling caching and/or slowing down producers. For 
more information, see:
+            http://activemq.apache.org/producer-flow-control.html
+          -->
+          <systemUsage>
+            <systemUsage sendFailIfNoSpace="true">
+                <memoryUsage>
+                    <memoryUsage percentOfJvmHeap="70" />
+                </memoryUsage>
+                <storeUsage>
+                    <storeUsage limit="10 mb" total="10000000"/>
+                </storeUsage>
+            </systemUsage>
+        </systemUsage>
+
+        <!--
+            The transport connectors expose ActiveMQ over a given protocol to
+            clients and other brokers. For more information, see:
+
+            http://activemq.apache.org/configuring-transports.html
+        -->
+        <transportConnectors>
+            <!-- DOS protection, limit concurrent connections to 1000 and 
frame size to 100MB -->
+            <transportConnector name="openwire" 
uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
+        </transportConnectors>
+
+        <!-- destroy the spring context on shutdown to stop jetty -->
+        <shutdownHooks>
+            <bean xmlns="http://www.springframework.org/schema/beans"; 
class="org.apache.activemq.hooks.SpringContextHook" />
+        </shutdownHooks>
+
+    </broker>
+
+</beans>
+<!-- END SNIPPET: example -->
+

Reply via email to