Author: gtully Date: Wed Feb 4 15:19:56 2009 New Revision: 740765 URL: http://svn.apache.org/viewvc?rev=740765&view=rev Log: move setBatch to MessageStore interface to keep cursors store agnostic - http://issues.apache.org/activemq/browse/AMQ-2020 - some store specific tests to follow
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=740765&r1=740764&r2=740765&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Wed Feb 4 15:19:56 2009 @@ -75,15 +75,7 @@ } protected void setBatch(MessageId messageId) { - AMQMessageStore amqStore = (AMQMessageStore) store; - try { - amqStore.flush(); - } catch (InterruptedIOException e) { - LOG.debug("flush on setBatch resulted in exception", e); - } - KahaReferenceStore kahaStore = - (KahaReferenceStore) amqStore.getReferenceStore(); - kahaStore.setBatch(messageId); + store.setBatch(messageId); batchResetNeeded = false; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java?rev=740765&r1=740764&r2=740765&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java Wed Feb 4 15:19:56 2009 @@ -17,6 +17,7 @@ package org.apache.activemq.store; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.MessageId; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.usage.MemoryUsage; @@ -42,4 +43,7 @@ public void setMemoryUsage(MemoryUsage memoryUsage) { } + + public void setBatch(MessageId messageId) { + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=740765&r1=740764&r2=740765&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java Wed Feb 4 15:19:56 2009 @@ -110,4 +110,11 @@ void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception; void dispose(ConnectionContext context); + + /** + * allow caching cursors to set the current batch offset when cache is exhausted + * @param messageId + */ + void setBatch(MessageId messageId); + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?rev=740765&r1=740764&r2=740765&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java Wed Feb 4 15:19:56 2009 @@ -92,4 +92,8 @@ delegate.resetBatching(); } + + public void setBatch(MessageId messageId) { + delegate.setBatch(messageId); + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=740765&r1=740764&r2=740765&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Wed Feb 4 15:19:56 2009 @@ -134,4 +134,8 @@ delegate.resetBatching(); } + + public void setBatch(MessageId messageId) { + delegate.setBatch(messageId); + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=740765&r1=740764&r2=740765&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Wed Feb 4 15:19:56 2009 @@ -558,4 +558,14 @@ referenceStore.dispose(context); super.dispose(context); } + + public void setBatch(MessageId messageId) { + try { + flush(); + } catch (InterruptedIOException e) { + LOG.debug("flush on setBatch resulted in exception", e); + } + getReferenceStore().setBatch(messageId); + } + } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java?rev=740765&r1=740764&r2=740765&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java Wed Feb 4 15:19:56 2009 @@ -46,6 +46,7 @@ import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.state.ProducerState; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.amq.AMQPersistenceAdapter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -70,16 +71,20 @@ final int ackWindow = 50; final int ackBatchSize = 50; final int fullWindow = 200; - final int count = 20000; + protected int count = 20000; public void setUp() throws Exception { - brokerService = new BrokerService(); + brokerService = createBroker(); brokerService.setUseJmx(false); brokerService.deleteAllMessages(); brokerService.start(); } - public void tearDown() throws Exception { + protected BrokerService createBroker() throws Exception { + return new BrokerService(); + } + + public void tearDown() throws Exception { brokerService.stop(); } @@ -92,8 +97,7 @@ } public void doTestNoDuplicateAfterCacheFullAndAcked(final int auditDepth) throws Exception { - final AMQPersistenceAdapter persistenceAdapter = - (AMQPersistenceAdapter) brokerService.getPersistenceAdapter(); + final PersistenceAdapter persistenceAdapter = brokerService.getPersistenceAdapter(); final MessageStore queueMessageStore = persistenceAdapter.createQueueMessageStore(destination); final ConnectionContext contextNotInTx = new ConnectionContext(); @@ -127,10 +131,9 @@ Message message = getMessage(i); queue.send(producerExchange, message); } - - assertEquals("store count is correct", count, queueMessageStore - .getMessageCount()); + assertEquals("store count is correct", count, queueMessageStore.getMessageCount()); + // pull from store in small windows Subscription subscription = new Subscription() { @@ -305,7 +308,6 @@ if (removeIndex % 1000 == 0) { LOG.info("acked: " + removeIndex); persistenceAdapter.checkpoint(true); - persistenceAdapter.cleanup(); } } }