Author: gtully
Date: Wed Feb  4 15:19:56 2009
New Revision: 740765

URL: http://svn.apache.org/viewvc?rev=740765&view=rev
Log:
Move setBatch to the MessageStore interface to keep cursors store-agnostic -
http://issues.apache.org/activemq/browse/AMQ-2020 - some store-specific tests
to follow.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=740765&r1=740764&r2=740765&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Wed Feb  4 15:19:56 2009
@@ -75,15 +75,7 @@
     }
     
     protected void setBatch(MessageId messageId) {
-        AMQMessageStore amqStore = (AMQMessageStore) store;
-        try {
-            amqStore.flush();
-        } catch (InterruptedIOException e) {
-            LOG.debug("flush on setBatch resulted in exception", e);        
-        }
-        KahaReferenceStore kahaStore = 
-            (KahaReferenceStore) amqStore.getReferenceStore();
-        kahaStore.setBatch(messageId);
+        store.setBatch(messageId);
         batchResetNeeded = false;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java?rev=740765&r1=740764&r2=740765&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java Wed Feb  4 15:19:56 2009
@@ -17,6 +17,7 @@
 package org.apache.activemq.store;
 
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.usage.MemoryUsage;
 
@@ -42,4 +43,7 @@
 
     public void setMemoryUsage(MemoryUsage memoryUsage) {
     }
+    
+    public void setBatch(MessageId messageId) {
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=740765&r1=740764&r2=740765&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java Wed Feb  4 15:19:56 2009
@@ -110,4 +110,11 @@
     void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception;
 
     void dispose(ConnectionContext context);
+
+    /**
+     * allow caching cursors to set the current batch offset when cache is exhausted
+     * @param messageId
+     */
+    void setBatch(MessageId messageId);
+    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?rev=740765&r1=740764&r2=740765&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java Wed Feb  4 15:19:56 2009
@@ -92,4 +92,8 @@
         delegate.resetBatching();
 
     }
+
+    public void setBatch(MessageId messageId) {
+        delegate.setBatch(messageId);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=740765&r1=740764&r2=740765&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Wed Feb  4 15:19:56 2009
@@ -134,4 +134,8 @@
         delegate.resetBatching();
 
     }
+
+    public void setBatch(MessageId messageId) {
+        delegate.setBatch(messageId);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=740765&r1=740764&r2=740765&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Wed Feb  4 15:19:56 2009
@@ -558,4 +558,14 @@
         referenceStore.dispose(context);
         super.dispose(context);
     }
+
+    public void setBatch(MessageId messageId) {
+        try {
+            flush();
+        } catch (InterruptedIOException e) {
+            LOG.debug("flush on setBatch resulted in exception", e);
+        }
+        getReferenceStore().setBatch(messageId);
+    }
+    
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java?rev=740765&r1=740764&r2=740765&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java Wed Feb  4 15:19:56 2009
@@ -46,6 +46,7 @@
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.amq.AMQPersistenceAdapter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -70,16 +71,20 @@
     final int ackWindow = 50;
     final int ackBatchSize = 50;
     final int fullWindow = 200;
-    final int count = 20000;
+    protected int count = 20000;
 
     public void setUp() throws Exception {
-        brokerService = new BrokerService();
+        brokerService = createBroker();
         brokerService.setUseJmx(false);
         brokerService.deleteAllMessages();
         brokerService.start();        
     }
 
-    public void tearDown() throws Exception {
+    protected BrokerService createBroker() throws Exception {
+        return new BrokerService();
+    }
+
+       public void tearDown() throws Exception {
         brokerService.stop();
     }
 
@@ -92,8 +97,7 @@
     }
 
     public void doTestNoDuplicateAfterCacheFullAndAcked(final int auditDepth) throws Exception {
-        final AMQPersistenceAdapter persistenceAdapter = 
-            (AMQPersistenceAdapter) brokerService.getPersistenceAdapter();
+        final PersistenceAdapter persistenceAdapter =  brokerService.getPersistenceAdapter();
         final MessageStore queueMessageStore = 
             persistenceAdapter.createQueueMessageStore(destination);
         final ConnectionContext contextNotInTx = new ConnectionContext();
@@ -127,10 +131,9 @@
             Message message = getMessage(i);
             queue.send(producerExchange, message);
         }
-        
-        assertEquals("store count is correct", count, queueMessageStore
-                .getMessageCount());
 
+        assertEquals("store count is correct", count, queueMessageStore.getMessageCount());
+        
         // pull from store in small windows
         Subscription subscription = new Subscription() {
 
@@ -305,7 +308,6 @@
                     if (removeIndex % 1000 == 0) {
                         LOG.info("acked: " + removeIndex);
                         persistenceAdapter.checkpoint(true);
-                        persistenceAdapter.cleanup();
                     }
                 }
             }


Reply via email to