Repository: activemq
Updated Branches:
  refs/heads/trunk ab3de0c4c -> 67ead201e


https://issues.apache.org/jira/browse/AMQ-5266 
https://issues.apache.org/jira/browse/AMQ-4485 - a single-destination test 
with a low limit exposed that setBatch was ignored in KahaDB when the sequence 
was not found in the index due to acking - resolved, and validated with a test 
that verifies the DLQ is empty


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/67ead201
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/67ead201
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/67ead201

Branch: refs/heads/trunk
Commit: 67ead201e1cb0e7dc019002d7a2e4be53184261d
Parents: ab3de0c
Author: gtully <gary.tu...@gmail.com>
Authored: Tue Oct 21 16:04:54 2014 +0100
Committer: gtully <gary.tu...@gmail.com>
Committed: Tue Oct 21 16:05:46 2014 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |  4 +-
 .../activemq/store/kahadb/MessageDatabase.java  | 17 +++-----
 .../java/org/apache/activemq/TestSupport.java   |  2 +-
 .../activemq/bugs/AMQ5266SingleDestTest.java    | 45 +++++---------------
 4 files changed, 21 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/67ead201/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 21d7522..5778fed 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1200,7 +1200,7 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
         } finally {
             pagedInMessagesLock.readLock().unlock();
         }
-        messagesLock.readLock().lock();
+        messagesLock.writeLock().lock();
         try{
             try {
                 messages.reset();
@@ -1217,7 +1217,7 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
                 messages.release();
             }
         }finally {
-            messagesLock.readLock().unlock();
+            messagesLock.writeLock().unlock();
         }
         return null;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/67ead201/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 4de5f16..88dde75 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -2791,7 +2791,7 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
         BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
         BTreeIndex<Long, MessageKeys> lowPriorityIndex;
         BTreeIndex<Long, MessageKeys> highPriorityIndex;
-        MessageOrderCursor cursor = new MessageOrderCursor();
+        final MessageOrderCursor cursor = new MessageOrderCursor();
         Long lastDefaultKey;
         Long lastHighKey;
         Long lastLowKey;
@@ -2892,16 +2892,13 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
                 if (defaultPriorityIndex.containsKey(tx, sequence)) {
                     lastDefaultKey = sequence;
                     cursor.defaultCursorPosition = nextPosition.longValue();
-                } else if (highPriorityIndex != null) {
-                    if (highPriorityIndex.containsKey(tx, sequence)) {
-                        lastHighKey = sequence;
-                        cursor.highPriorityCursorPosition = 
nextPosition.longValue();
-                    } else if (lowPriorityIndex.containsKey(tx, sequence)) {
-                        lastLowKey = sequence;
-                        cursor.lowPriorityCursorPosition = 
nextPosition.longValue();
-                    }
+                } else if (highPriorityIndex != null && 
highPriorityIndex.containsKey(tx, sequence)) {
+                    lastHighKey = sequence;
+                    cursor.highPriorityCursorPosition = 
nextPosition.longValue();
+                } else if (lowPriorityIndex.containsKey(tx, sequence)) {
+                    lastLowKey = sequence;
+                    cursor.lowPriorityCursorPosition = 
nextPosition.longValue();
                 } else {
-                    LOG.warn("setBatch: sequence " + sequence + " not found in 
orderindex:" + this);
                     lastDefaultKey = sequence;
                     cursor.defaultCursorPosition = nextPosition.longValue();
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/67ead201/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
index 46eecfb..80aac14 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
@@ -179,7 +179,7 @@ public abstract class TestSupport extends 
CombinationTestSupport {
         return setPersistenceAdapter(broker, defaultPersistenceAdapter);
     }
 
-    public PersistenceAdapter setPersistenceAdapter(BrokerService broker, 
PersistenceAdapterChoice choice) throws IOException {
+    public static PersistenceAdapter setPersistenceAdapter(BrokerService 
broker, PersistenceAdapterChoice choice) throws IOException {
         PersistenceAdapter adapter = null;
         switch (choice) {
         case JDBC:

http://git-wip-us.apache.org/repos/asf/activemq/blob/67ead201/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
index afcf54f..131f807 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
@@ -35,15 +35,13 @@ import javax.jms.TextMessage;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.TestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
-import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.derby.jdbc.EmbeddedDataSource;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -63,7 +61,6 @@ public class AMQ5266SingleDestTest {
     static Logger LOG = LoggerFactory.getLogger(AMQ5266SingleDestTest.class);
     String activemqURL;
     BrokerService brokerService;
-    private EmbeddedDataSource dataSource;
 
     public int numDests = 1;
     public int messageSize = 10*1000;
@@ -84,7 +81,7 @@ public class AMQ5266SingleDestTest {
     public boolean useCache = true;
 
     @Parameterized.Parameter(5)
-    public boolean useDefaultStore = false;
+    public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = 
TestSupport.PersistenceAdapterChoice.KahaDB;
 
     @Parameterized.Parameter(6)
     public boolean optimizeDispatch = false;
@@ -92,7 +89,7 @@ public class AMQ5266SingleDestTest {
     
@Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
     public static Iterable<Object[]> parameters() {
         return Arrays.asList(new Object[][]{
-                {1000,  80,  80,   1024*1024*5,  true, true, false},
+                {1000,  80,  80,   1024*1024*1,  true, 
TestSupport.PersistenceAdapterChoice.KahaDB, false},
         });
     }
 
@@ -102,22 +99,10 @@ public class AMQ5266SingleDestTest {
     public void startBroker() throws Exception {
         brokerService = new BrokerService();
 
-        dataSource = new EmbeddedDataSource();
-        dataSource.setDatabaseName("target/derbyDb");
-        dataSource.setCreateDatabase("create");
-
-        JDBCPersistenceAdapter jdbcPersistenceAdapter = new 
JDBCPersistenceAdapter();
-        jdbcPersistenceAdapter.setDataSource(dataSource);
-        jdbcPersistenceAdapter.setUseLock(false);
-
-        if (!useDefaultStore) {
-            brokerService.setPersistenceAdapter(jdbcPersistenceAdapter);
-        } else {
-            KahaDBPersistenceAdapter kahaDBPersistenceAdapter = 
(KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
-            kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true);
-        }
+        TestSupport.setPersistenceAdapter(brokerService, 
persistenceAdapterChoice);
         brokerService.setDeleteAllMessagesOnStartup(true);
         brokerService.setUseJmx(false);
+        brokerService.setAdvisorySupport(false);
 
 
         PolicyMap policyMap = new PolicyMap();
@@ -133,11 +118,12 @@ public class AMQ5266SingleDestTest {
         policyMap.setDefaultEntry(defaultEntry);
         brokerService.setDestinationPolicy(policyMap);
 
-        brokerService.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 
1024);
+        brokerService.getSystemUsage().getMemoryUsage().setLimit(64 * 1024 * 
1024);
 
         TransportConnector transportConnector = 
brokerService.addConnector("tcp://0.0.0.0:0");
         brokerService.start();
         activemqURL = transportConnector.getPublishableConnectString();
+        activemqURL += "?jms.watchTopicAdvisories=false"; // ensure all 
messages are queue or dlq messages
     }
 
     @After
@@ -145,10 +131,6 @@ public class AMQ5266SingleDestTest {
         if (brokerService != null) {
             brokerService.stop();
         }
-        try {
-            dataSource.setShutdownDatabase("shutdown");
-            dataSource.getConnection();
-        } catch (Exception ignored) {}
     }
 
     @Test
@@ -202,9 +184,6 @@ public class AMQ5266SingleDestTest {
             try {
                 int secs = (int) (endWait - System.currentTimeMillis()) / 1000;
                 LOG.info("Waiting For Consumer Completion. Time left: " + secs 
+ " secs");
-                if (!useDefaultStore) {
-                    DefaultJDBCAdapter.dumpTables(dataSource.getConnection());
-                }
                 Thread.sleep(1000);
             } catch (Exception e) {
             }
@@ -217,11 +196,6 @@ public class AMQ5266SingleDestTest {
         consumer.shutdown();
 
         TimeUnit.SECONDS.sleep(2);
-        LOG.info("DB Contents START");
-        if (!useDefaultStore) {
-            DefaultJDBCAdapter.dumpTables(dataSource.getConnection());
-        }
-        LOG.info("DB Contents END");
 
         LOG.info("Consumer Stats:");
 
@@ -243,6 +217,9 @@ public class AMQ5266SingleDestTest {
             assertEquals("expect to get all messages!", 0, diff);
 
         }
+
+        // verify empty dlq
+        assertEquals("No pending messages", 0l, ((RegionBroker) 
brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
     }
 
     public class ExportQueuePublisher {

Reply via email to