Author: rgodfrey
Date: Mon May 12 13:19:50 2008
New Revision: 655630

URL: http://svn.apache.org/viewvc?rev=655630&view=rev
Log:
More fixing up of refactoring stuff; getting all maven tests passing and 
implementing management methods

Added:
    
incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java
      - copied unchanged from r655618, 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java
Modified:
    
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/Main.java
    
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
    
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
    
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
    
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
    
incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    
incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
    
incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
    
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Mon May 12 13:19:50 2008
@@ -495,9 +495,6 @@
                 // Deliver Message
                 deliveryContext.requeue(unacked);
 
-                // Should we allow access To the DM to directy deliver the 
message?
-                // As we don't need to check for Consumers or worry about 
incrementing the message count?
-                // unacked.queue.getDeliveryManager().deliver(_storeContext, 
unacked.queue.getName(), unacked.message, false);
             }
             else
             {

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/Main.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/Main.java
 Mon May 12 13:19:50 2008
@@ -279,6 +279,12 @@
             ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator());
         }
 
+
+        if(connectorConfig.useBiasedWrites)
+        {
+            System.setProperty("org.apache.qpid.use_write_biased_pool","true");
+        }
+
         int port = connectorConfig.port;
 
         String portStr = commandLine.getOptionValue("p");

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
 Mon May 12 13:19:50 2008
@@ -49,6 +49,7 @@
 
     public void update(long deliveryTag, boolean multiple)
     {
+        _unacked.clear();
         if (!multiple)
         {
             if(_individual == null)

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 Mon May 12 13:19:50 2008
@@ -278,9 +278,17 @@
     }
 
     /** Threadsafe. Increment the reference count on the message. */
-    public void incrementReference()
+    public boolean incrementReference()
     {
-        _referenceCount.incrementAndGet();
+        if(_referenceCount.incrementAndGet() <= 1)
+        {
+            _referenceCount.decrementAndGet();
+            return false;
+        }
+        else
+        {
+            return true;
+        }
         // if (_log.isDebugEnabled())
         // {
         // _log.debug("Ref count on message " + debugIdentity() + " 
incremented " + 
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
@@ -298,6 +306,7 @@
      */
     public void decrementReference(StoreContext storeContext) throws 
MessageCleanupException
     {
+
         int count = _referenceCount.decrementAndGet();
 
         // note that the operation of decrementing the reference count and 
then removing the message does not
@@ -306,6 +315,11 @@
         // not relying on the all the increments having taken place before the 
delivery manager decrements.
         if (count == 0)
         {
+            // set the reference count way below 0 so that we can detect that 
the message has been deleted
+            // this is to guard against the message being spontaneously 
recreated (from the mgmt console)
+            // by copying from other queues at the same time as it is being 
removed.
+            _referenceCount.set(Integer.MIN_VALUE/2);
+
             try
             {
                 // if (_log.isDebugEnabled())

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 Mon May 12 13:19:50 2008
@@ -103,10 +103,6 @@
 
     void start();
 
-    void enqueueMovedMessages(final StoreContext storeContext, final 
List<QueueEntry> foundMessagesList);
-
-
-
     long getMaximumMessageSize();
 
     void setMaximumMessageSize(long value);

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
 Mon May 12 13:19:50 2008
@@ -38,7 +38,7 @@
             throws AMQException
     {
 
-        final int priorities = arguments.containsKey(X_QPID_PRIORITIES) ? 
arguments.getInteger(X_QPID_PRIORITIES) : 1;
+        final int priorities = arguments == null ? 1 : 
arguments.containsKey(X_QPID_PRIORITIES) ? 
arguments.getInteger(X_QPID_PRIORITIES) : 1;
 
         if(priorities > 1)
         {

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
 Mon May 12 13:19:50 2008
@@ -34,7 +34,8 @@
         AVAILABLE,
         ACQUIRED,
         EXPIRED,
-        DEQUEUED
+        DEQUEUED,
+        DELETED
     }
 
     public static interface StateChangeListener
@@ -62,7 +63,7 @@
     }
 
 
-    public final class DeletedState extends EntryState
+    public final class DequeuedState extends EntryState
     {
 
         public State getState()
@@ -71,6 +72,16 @@
         }
     }
 
+
+    public final class DeletedState extends EntryState
+    {
+
+        public State getState()
+        {
+            return State.DELETED;
+        }
+    }
+
     public final class ExpiredState extends EntryState
     {
 
@@ -113,6 +124,7 @@
 
     final static EntryState AVAILABLE_STATE = new AvailableState();
     final static EntryState DELETED_STATE = new DeletedState();
+    final static EntryState DEQUEUED_STATE = new DequeuedState();
     final static EntryState EXPIRED_STATE = new ExpiredState();
     final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new 
NonSubscriptionAcquiredState();
 
@@ -165,7 +177,7 @@
 
     void restoreCredit();
 
-    void discard(StoreContext storeContext) throws AMQException;
+    void discard(StoreContext storeContext) throws FailedDequeueException, 
MessageCleanupException;
 
     boolean isQueueDeleted();
 

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
 Mon May 12 13:19:50 2008
@@ -252,13 +252,18 @@
 
     public void dequeue(final StoreContext storeContext) throws 
FailedDequeueException
     {
+        EntryState state = _state;
 
-
-        getQueue().dequeue(storeContext, this);
-        if(_stateChangeListeners != null)
+        if((state.getState() == State.ACQUIRED) 
&&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
         {
-            notifyStateChange(_state.getState() , QueueEntry.State.DEQUEUED);
+            getQueue().dequeue(storeContext, this);
+            if(_stateChangeListeners != null)
+            {
+                notifyStateChange(state.getState() , 
QueueEntry.State.DEQUEUED);
+            }
+
         }
+
     }
 
     private void notifyStateChange(final State oldState, final State newState)
@@ -271,8 +276,10 @@
 
     public void dispose(final StoreContext storeContext) throws 
MessageCleanupException
     {
-        getMessage().decrementReference(storeContext);
-        delete();
+        if(delete())
+        {
+            getMessage().decrementReference(storeContext);
+        }
     }
 
     public void restoreCredit()

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 Mon May 12 13:19:50 2008
@@ -5,6 +5,7 @@
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionList;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -135,9 +136,6 @@
     private AtomicReference _asynchronousRunner = new AtomicReference(null);
     private AtomicInteger _deliveredMessages = new AtomicInteger();
 
-
-
-
     protected SimpleAMQQueue(AMQShortString name, boolean durable, 
AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
             throws AMQException
     {
@@ -428,6 +426,7 @@
         if(entry.immediateAndNotDelivered())
         {
             dequeue(storeContext, entry);
+            entry.dispose(storeContext);
         }
         else if(!entry.isAcquired())
         {
@@ -582,7 +581,7 @@
             {
                 _virtualHost.getMessageStore().dequeueMessage(storeContext, 
getName(), msg.getMessageId());
             }
-            entry.delete();
+            //entry.dispose(storeContext);
 
         }
         catch (MessageCleanupException e)
@@ -685,7 +684,13 @@
 
     public long getOldestMessageArrivalTime()
     {
-        return 0;  //To change body of implemented methods use File | Settings 
| File Templates.
+        QueueEntry entry = getOldestQueueEntry();
+        return entry == null ? Long.MAX_VALUE : 
entry.getMessage().getArrivalTime();
+    }
+
+    protected QueueEntry getOldestQueueEntry()
+    {
+        return _entries.next(_entries.getHead());
     }
 
     public boolean isDeleted()
@@ -809,35 +814,217 @@
     }
 
 
-    public void moveMessagesToAnotherQueue(long fromMessageId,
-                                           long toMessageId,
+    public void moveMessagesToAnotherQueue(final long fromMessageId,
+                                           final long toMessageId,
                                            String queueName,
                                            StoreContext storeContext)
     {
-        //To change body of implemented methods use File | Settings | File 
Templates.
+
+        AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new 
AMQShortString(queueName));
+        MessageStore store = getVirtualHost().getMessageStore();
+
+
+        List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+                                        {
+
+                                            public boolean accept(QueueEntry 
entry)
+                                            {
+                                                final long messageId = 
entry.getMessage().getMessageId();
+                                                return (messageId >= 
fromMessageId)
+                                                       && (messageId <= 
toMessageId)
+                                                       && entry.acquire();
+                                            }
+
+                                            public boolean filterComplete()
+                                            {
+                                                return false;
+                                            }
+                                        });
+
+
+        try
+        {
+            store.beginTran(storeContext);
+
+            // Move the messages in on the message store.
+            for (QueueEntry entry : entries)
+            {
+                AMQMessage message = entry.getMessage();
+
+                if(message.isPersistent() && toQueue.isDurable())
+                {
+                    store.enqueueMessage(storeContext, toQueue.getName(), 
message.getMessageId());
+                }
+                // dequeue does not decrement the reference count
+                entry.dequeue(storeContext);
+            }
+
+            // Commit and flush the move transactions.
+            try
+            {
+                store.commitTran(storeContext);
+            }
+            catch (AMQException e)
+            {
+                throw new RuntimeException("Failed to commit transaction 
whilst moving messages on message store.", e);
+            }
+        }
+        catch (AMQException e)
+        {
+            try
+            {
+                store.abortTran(storeContext);
+            }
+            catch (AMQException rollbackEx)
+            {
+                _logger.error("Failed to rollback transaction when error 
occured moving messages", rollbackEx);
+            }
+            throw new RuntimeException(e);
+        }
+
+        try
+        {
+            for (QueueEntry entry : entries)
+            {
+                toQueue.enqueue(storeContext, entry.getMessage());
+
+            }
+        }
+        catch (MessageCleanupException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (AMQException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+
     }
 
-    public void copyMessagesToAnotherQueue(long fromMessageId,
-                                           long toMessageId,
+    public void copyMessagesToAnotherQueue(final long fromMessageId,
+                                           final long toMessageId,
                                            String queueName,
-                                           StoreContext storeContext)
+                                           final StoreContext storeContext)
     {
-        //To change body of implemented methods use File | Settings | File 
Templates.
+        AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new 
AMQShortString(queueName));
+        MessageStore store = getVirtualHost().getMessageStore();
+
+
+        List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+                                        {
+
+                                            public boolean accept(QueueEntry 
entry)
+                                            {
+                                                final long messageId = 
entry.getMessage().getMessageId();
+                                                if((messageId >= fromMessageId)
+                                                       && (messageId <= 
toMessageId))
+                                                {
+                                                    if(!entry.isDeleted())
+                                                    {
+                                                        return 
entry.getMessage().incrementReference();
+                                                    }
+                                                }
+
+                                                return false;
+                                            }
+
+                                            public boolean filterComplete()
+                                            {
+                                                return false;
+                                            }
+                                        });
+
+        try
+        {
+            store.beginTran(storeContext);
+
+            // Move the messages in on the message store.
+            for (QueueEntry entry : entries)
+            {
+                AMQMessage message = entry.getMessage();
+
+                if(message.isReferenced() && message.isPersistent() && 
toQueue.isDurable())
+                {
+                    store.enqueueMessage(storeContext, toQueue.getName(), 
message.getMessageId());
+                }
+            }
+
+            // Commit and flush the move transactions.
+            try
+            {
+                store.commitTran(storeContext);
+            }
+            catch (AMQException e)
+            {
+                throw new RuntimeException("Failed to commit transaction 
whilst moving messages on message store.", e);
+            }
+        }
+        catch (AMQException e)
+        {
+            try
+            {
+                store.abortTran(storeContext);
+            }
+            catch (AMQException rollbackEx)
+            {
+                _logger.error("Failed to rollback transaction when error 
occured moving messages", rollbackEx);
+            }
+            throw new RuntimeException(e);
+        }
+
+        try
+        {
+            for (QueueEntry entry : entries)
+            {
+                if(entry.getMessage().isReferenced())
+                {
+                    toQueue.enqueue(storeContext, entry.getMessage());
+                }
+            }
+        }
+        catch (MessageCleanupException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (AMQException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+
     }
 
     public void removeMessagesFromQueue(long fromMessageId, long toMessageId, 
StoreContext storeContext)
     {
-        //To change body of implemented methods use File | Settings | File 
Templates.
-    }
 
+        try
+        {
+            QueueEntryIterator queueListIterator = _entries.iterator();
 
-    public void enqueueMovedMessages(final StoreContext storeContext, final 
List<QueueEntry> foundMessagesList)
-    {
-        //To change body of implemented methods use File | Settings | File 
Templates.
-    }
 
+            while(queueListIterator.advance())
+            {
+                QueueEntry node = queueListIterator.getNode();
+
+                final long messageId = node.getMessage().getMessageId();
+
+                if((messageId >= fromMessageId)
+                           && (messageId <= toMessageId)
+                           && !node.isDeleted()
+                           && node.acquire())
+                {
+                    node.discard(storeContext);
+                }
 
+            }
+        }
+        catch (AMQException e)
+        {
+            throw new RuntimeException(e);
+        }
 
+    }
 
     public void quiesce()
     {
@@ -868,10 +1055,7 @@
             QueueEntry node = queueListIterator.getNode();
             if(!node.isDeleted() && node.acquire())
             {
-                node.dequeue(storeContext);
-
-                node.dispose(storeContext);                
-
+                node.discard(storeContext);
                 noDeletes = false;
             }
 
@@ -889,10 +1073,7 @@
             QueueEntry node = queueListIterator.getNode();
             if(!node.isDeleted() && node.acquire())
             {
-                node.dequeue(storeContext);
-
-                node.dispose(storeContext);
-
+                node.discard(storeContext);
                 count++;
             }
 
@@ -1046,9 +1227,7 @@
                 if(node.acquire())
                 {
                     final StoreContext reapingStoreContext = new 
StoreContext();
-                    node.dequeue(reapingStoreContext);
-                    node.dispose(reapingStoreContext);
-
+                    node.discard(reapingStoreContext);
                 }
             }
             QueueEntry newNode = _entries.next(node);
@@ -1209,10 +1388,7 @@
             if(!node.isDeleted() && node.expired() && node.acquire())
             {
 
-                node.dequeue(storeContext);
-
-                node.dispose(storeContext);
-
+                node.discard(storeContext);
             }
 
         }

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
 Mon May 12 13:19:50 2008
@@ -203,12 +203,15 @@
 
         MessageMetaData mmd = new MessageMetaData(publishBody, 
contentHeaderBody, _contentBodies.size(), arrivalTime);
 
-        _messageStore.storeMessageMetaData(storeContext, _messageId, mmd);
+
 
 
         _persistent =  contentHeaderBody.properties instanceof 
BasicContentHeaderProperties &&
                ((BasicContentHeaderProperties) 
contentHeaderBody.properties).getDeliveryMode() == 2;
 
+        _messageStore.storeMessageMetaData(storeContext, _messageId, mmd);
+
+
         populateFromMessageMetaData(mmd);
     }
 

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
 Mon May 12 13:19:50 2008
@@ -97,6 +97,10 @@
                 defaultValue = "false")
     public boolean _multiThreadNIO;
 
+    @Configured(path = "advanced.useWriteBiasedPool",
+                    defaultValue = "true")        
+    public boolean useBiasedWrites;
+
 
     public IoAcceptor createAcceptor()
     {

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 Mon May 12 13:19:50 2008
@@ -538,7 +538,8 @@
         try
         {
 
-            
TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, 
brokerDetail);
+            TransportConnection.connect(_protocolHandler,brokerDetail);
+            
             // this blocks until the connection has been set up or when an 
error
             // has prevented the connection being set up
 

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
 Mon May 12 13:19:50 2008
@@ -29,6 +29,7 @@
 import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
 import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.pool.ReadWriteThreadModel;
 import org.slf4j.Logger;
@@ -346,4 +347,9 @@
         }
     }
 
+    public static synchronized void connect(final AMQProtocolHandler 
protocolHandler, final BrokerDetails brokerDetail)
+            throws AMQTransportConnectionException, IOException
+    {
+        getInstance(brokerDetail).connect(protocolHandler, brokerDetail);
+    }
 }

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
 Mon May 12 13:19:50 2008
@@ -87,6 +87,8 @@
     /** Holds the number of executor threads to create. */
     private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", 
DEFAULT_POOL_SIZE);
 
+    private final boolean _useBiasedPool = 
Boolean.getBoolean("org.apache.qpid.use_write_biased_pool");
+
     /**
      * Retrieves the singleton instance of this reference counter.
      *
@@ -117,11 +119,19 @@
 //                _pool = Executors.newFixedThreadPool(_poolSize);
 
                 // Use a job queue that biases to writes
-                _pool =  new ThreadPoolExecutor(_poolSize, _poolSize,
-                                      0L, TimeUnit.MILLISECONDS,
-                                      new ReadWriteJobQueue());
+                if(_useBiasedPool)
+                {
+                    _pool =  new ThreadPoolExecutor(_poolSize, _poolSize,
+                                          0L, TimeUnit.MILLISECONDS,
+                                          new ReadWriteJobQueue());
+                }
+                else
+                {
+                    _pool = Executors.newFixedThreadPool(_poolSize);
+                }
             }
 
+
             return _pool;
         }
     }

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
 Mon May 12 13:19:50 2008
@@ -243,9 +243,10 @@
         }
 
 
-        public void incrementReference()
+        public boolean incrementReference()
         {
             _count++;
+            return true;
         }
 
         public void decrementReference(StoreContext context)


Reply via email to