https://issues.apache.org/jira/browse/AMQ-5542 fix (via revert below) and test 
case applied with thanks.

Revert "resolve https://issues.apache.org/activemq/browse/AMQ-2736, logic issue 
in code that keeps data files with acks around pending message file gc. thanks 
jgenender - test case to follow"

This reverts commit dd68c61e65f24b7dc498b36e34960a4bc46ded4b.

resolves: https://issues.apache.org/jira/browse/AMQ-5542 and applies test case 
that nicely demonstrates the defect, thanks


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e828dc79
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e828dc79
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e828dc79

Branch: refs/heads/activemq-5.11.x
Commit: e828dc791f59691d2d0ba2ce169082e4c1984139
Parents: b997bd4
Author: gtully <gary.tu...@gmail.com>
Authored: Fri Jan 30 14:45:26 2015 +0000
Committer: Hadrian Zbarcea <hadr...@apache.org>
Committed: Thu Feb 12 18:18:29 2015 -0500

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  3 +-
 .../org/apache/activemq/bugs/AMQ2832Test.java   | 51 ++++++++++++++++++++
 2 files changed, 52 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e828dc79/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 35a59ce..477f42c 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1672,14 +1672,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             if (LOG.isTraceEnabled()) {
                 LOG.trace("gc candidates: " + gcCandidateSet);
             }
-            final TreeSet<Integer> gcCandidates = new TreeSet<Integer>(gcCandidateSet);
             Iterator<Integer> candidates = gcCandidateSet.iterator();
             while (candidates.hasNext()) {
                 Integer candidate = candidates.next();
                 Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
                 if (referencedFileIds != null) {
                     for (Integer referencedFileId : referencedFileIds) {
-                        if (completeFileSet.contains(referencedFileId) && !gcCandidates.contains(referencedFileId)) {
+                        if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
                             // active file that is not targeted for deletion is referenced so don't delete
                             candidates.remove();
                             break;

http://git-wip-us.apache.org/repos/asf/activemq/blob/e828dc79/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
index 319fcc2..22ad6ab 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.bugs;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -117,7 +118,57 @@ public class AMQ2832Test {
         }
     }
 
+   /**
+    * Scenario:
+    * db-1.log has an unacknowledged message,
+    * db-2.log contains acks for the messages from db-1.log,
+    * db-3.log contains acks for the messages from db-2.log
+    *
+    * Expected behavior: since db-1.log is blocked, db-2.log and db-3.log should not be removed during the cleanup.
+    * Current situation on 5.10.0, 5.10.1 is that db-3.log is removed causing all messages from db-2.log, whose acks were in db-3.log, to be replayed.
+    *
+    * @throws Exception
+    */
     @Test
+    public void testAckChain() throws Exception {
+       startBroker();
+
+       StagedConsumer consumer = new StagedConsumer();
+       // file #1
+       produceMessagesToConsumeMultipleDataFiles(5);
+       // acknowledge first 2 messages and leave the 3rd one unacknowledged blocking db-1.log
+       consumer.receive(3);
+
+       // send messages by consuming and acknowledging every message right after sent in order to get KahadbAdd and Remove command to be saved together
+       // this is necessary in order to get KahaAddMessageCommand to be saved in one db file and the corresponding KahaRemoveMessageCommand in the next one
+       produceAndConsumeImmediately(20, consumer);
+       consumer.receive(2).acknowledge(); // consume and ack the last 2 unconsumed
+
+       // now we have 3 files written and started with #4
+       consumer.close();
+
+       broker.stop();
+       broker.waitUntilStopped();
+
+       recoverBroker();
+
+       consumer = new StagedConsumer();
+       Message message = consumer.receive(1);
+       assertNotNull("One message stays unacked from db-1.log", message);
+       message.acknowledge();
+       message = consumer.receive(1);
+       assertNull("There should not be any unconsumed messages any more", message);
+       consumer.close();
+   }
+
+   private void produceAndConsumeImmediately(int numOfMsgs, StagedConsumer consumer) throws Exception {
+      for (int i = 0; i < numOfMsgs; i++) {
+         produceMessagesToConsumeMultipleDataFiles(1);
+         consumer.receive(1).acknowledge();
+      }
+   }
+
+   @Test
     public void testAckRemovedMessageReplayedAfterRecovery() throws Exception {
 
         startBroker();

Reply via email to