Author: rgodfrey
Date: Mon May 12 13:19:50 2008
New Revision: 655630
URL: http://svn.apache.org/viewvc?rev=655630&view=rev
Log:
More fixing up of refactoring stuff; getting all maven tests passing and
implementing management methods
Added:
incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java
- copied unchanged from r655618,
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/mina/common/support/IoServiceListenerSupport.java
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/Main.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
---
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
(original)
+++
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Mon May 12 13:19:50 2008
@@ -495,9 +495,6 @@
// Deliver Message
deliveryContext.requeue(unacked);
- // Should we allow access To the DM to directy deliver the
message?
- // As we don't need to check for Consumers or worry about
incrementing the message count?
- // unacked.queue.getDeliveryManager().deliver(_storeContext,
unacked.queue.getName(), unacked.message, false);
}
else
{
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
---
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/Main.java
(original)
+++
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/Main.java
Mon May 12 13:19:50 2008
@@ -279,6 +279,12 @@
ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator());
}
+
+ if(connectorConfig.useBiasedWrites)
+ {
+ System.setProperty("org.apache.qpid.use_write_biased_pool","true");
+ }
+
int port = connectorConfig.port;
String portStr = commandLine.getOptionValue("p");
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
---
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
(original)
+++
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
Mon May 12 13:19:50 2008
@@ -49,6 +49,7 @@
public void update(long deliveryTag, boolean multiple)
{
+ _unacked.clear();
if (!multiple)
{
if(_individual == null)
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
---
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
(original)
+++
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
Mon May 12 13:19:50 2008
@@ -278,9 +278,17 @@
}
/** Threadsafe. Increment the reference count on the message. */
- public void incrementReference()
+ public boolean incrementReference()
{
- _referenceCount.incrementAndGet();
+ if(_referenceCount.incrementAndGet() <= 1)
+ {
+ _referenceCount.decrementAndGet();
+ return false;
+ }
+ else
+ {
+ return true;
+ }
// if (_log.isDebugEnabled())
// {
// _log.debug("Ref count on message " + debugIdentity() + "
incremented " +
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
@@ -298,6 +306,7 @@
*/
public void decrementReference(StoreContext storeContext) throws
MessageCleanupException
{
+
int count = _referenceCount.decrementAndGet();
// note that the operation of decrementing the reference count and
then removing the message does not
@@ -306,6 +315,11 @@
// not relying on the all the increments having taken place before the
delivery manager decrements.
if (count == 0)
{
+ // set the reference count way below 0 so that we can detect that
the message has been deleted
+ // this is to guard against the message being spontaneously
recreated (from the mgmt console)
+ // by copying from other queues at the same time as it is being
removed.
+ _referenceCount.set(Integer.MIN_VALUE/2);
+
try
{
// if (_log.isDebugEnabled())
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
---
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
(original)
+++
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Mon May 12 13:19:50 2008
@@ -103,10 +103,6 @@
void start();
- void enqueueMovedMessages(final StoreContext storeContext, final
List<QueueEntry> foundMessagesList);
-
-
-
long getMaximumMessageSize();
void setMaximumMessageSize(long value);
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
---
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
(original)
+++
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
Mon May 12 13:19:50 2008
@@ -38,7 +38,7 @@
throws AMQException
{
- final int priorities = arguments.containsKey(X_QPID_PRIORITIES) ?
arguments.getInteger(X_QPID_PRIORITIES) : 1;
+ final int priorities = arguments == null ? 1 :
arguments.containsKey(X_QPID_PRIORITIES) ?
arguments.getInteger(X_QPID_PRIORITIES) : 1;
if(priorities > 1)
{
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
---
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
(original)
+++
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
Mon May 12 13:19:50 2008
@@ -34,7 +34,8 @@
AVAILABLE,
ACQUIRED,
EXPIRED,
- DEQUEUED
+ DEQUEUED,
+ DELETED
}
public static interface StateChangeListener
@@ -62,7 +63,7 @@
}
- public final class DeletedState extends EntryState
+ public final class DequeuedState extends EntryState
{
public State getState()
@@ -71,6 +72,16 @@
}
}
+
+ public final class DeletedState extends EntryState
+ {
+
+ public State getState()
+ {
+ return State.DELETED;
+ }
+ }
+
public final class ExpiredState extends EntryState
{
@@ -113,6 +124,7 @@
final static EntryState AVAILABLE_STATE = new AvailableState();
final static EntryState DELETED_STATE = new DeletedState();
+ final static EntryState DEQUEUED_STATE = new DequeuedState();
final static EntryState EXPIRED_STATE = new ExpiredState();
final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new
NonSubscriptionAcquiredState();
@@ -165,7 +177,7 @@
void restoreCredit();
- void discard(StoreContext storeContext) throws AMQException;
+ void discard(StoreContext storeContext) throws FailedDequeueException,
MessageCleanupException;
boolean isQueueDeleted();
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
---
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
(original)
+++
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
Mon May 12 13:19:50 2008
@@ -252,13 +252,18 @@
public void dequeue(final StoreContext storeContext) throws
FailedDequeueException
{
+ EntryState state = _state;
-
- getQueue().dequeue(storeContext, this);
- if(_stateChangeListeners != null)
+ if((state.getState() == State.ACQUIRED)
&&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
- notifyStateChange(_state.getState() , QueueEntry.State.DEQUEUED);
+ getQueue().dequeue(storeContext, this);
+ if(_stateChangeListeners != null)
+ {
+ notifyStateChange(state.getState() ,
QueueEntry.State.DEQUEUED);
+ }
+
}
+
}
private void notifyStateChange(final State oldState, final State newState)
@@ -271,8 +276,10 @@
public void dispose(final StoreContext storeContext) throws
MessageCleanupException
{
- getMessage().decrementReference(storeContext);
- delete();
+ if(delete())
+ {
+ getMessage().decrementReference(storeContext);
+ }
}
public void restoreCredit()
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
---
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
(original)
+++
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
Mon May 12 13:19:50 2008
@@ -5,6 +5,7 @@
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -135,9 +136,6 @@
private AtomicReference _asynchronousRunner = new AtomicReference(null);
private AtomicInteger _deliveredMessages = new AtomicInteger();
-
-
-
protected SimpleAMQQueue(AMQShortString name, boolean durable,
AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
{
@@ -428,6 +426,7 @@
if(entry.immediateAndNotDelivered())
{
dequeue(storeContext, entry);
+ entry.dispose(storeContext);
}
else if(!entry.isAcquired())
{
@@ -582,7 +581,7 @@
{
_virtualHost.getMessageStore().dequeueMessage(storeContext,
getName(), msg.getMessageId());
}
- entry.delete();
+ //entry.dispose(storeContext);
}
catch (MessageCleanupException e)
@@ -685,7 +684,13 @@
public long getOldestMessageArrivalTime()
{
- return 0; //To change body of implemented methods use File | Settings
| File Templates.
+ QueueEntry entry = getOldestQueueEntry();
+ return entry == null ? Long.MAX_VALUE :
entry.getMessage().getArrivalTime();
+ }
+
+ protected QueueEntry getOldestQueueEntry()
+ {
+ return _entries.next(_entries.getHead());
}
public boolean isDeleted()
@@ -809,35 +814,217 @@
}
- public void moveMessagesToAnotherQueue(long fromMessageId,
- long toMessageId,
+ public void moveMessagesToAnotherQueue(final long fromMessageId,
+ final long toMessageId,
String queueName,
StoreContext storeContext)
{
- //To change body of implemented methods use File | Settings | File
Templates.
+
+ AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new
AMQShortString(queueName));
+ MessageStore store = getVirtualHost().getMessageStore();
+
+
+ List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+ {
+
+ public boolean accept(QueueEntry
entry)
+ {
+ final long messageId =
entry.getMessage().getMessageId();
+ return (messageId >=
fromMessageId)
+ && (messageId <=
toMessageId)
+ && entry.acquire();
+ }
+
+ public boolean filterComplete()
+ {
+ return false;
+ }
+ });
+
+
+ try
+ {
+ store.beginTran(storeContext);
+
+ // Move the messages in on the message store.
+ for (QueueEntry entry : entries)
+ {
+ AMQMessage message = entry.getMessage();
+
+ if(message.isPersistent() && toQueue.isDurable())
+ {
+ store.enqueueMessage(storeContext, toQueue.getName(),
message.getMessageId());
+ }
+ // dequeue does not decrement the reference count
+ entry.dequeue(storeContext);
+ }
+
+ // Commit and flush the move transactions.
+ try
+ {
+ store.commitTran(storeContext);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Failed to commit transaction
whilst moving messages on message store.", e);
+ }
+ }
+ catch (AMQException e)
+ {
+ try
+ {
+ store.abortTran(storeContext);
+ }
+ catch (AMQException rollbackEx)
+ {
+ _logger.error("Failed to rollback transaction when error
occured moving messages", rollbackEx);
+ }
+ throw new RuntimeException(e);
+ }
+
+ try
+ {
+ for (QueueEntry entry : entries)
+ {
+ toQueue.enqueue(storeContext, entry.getMessage());
+
+ }
+ }
+ catch (MessageCleanupException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+
}
- public void copyMessagesToAnotherQueue(long fromMessageId,
- long toMessageId,
+ public void copyMessagesToAnotherQueue(final long fromMessageId,
+ final long toMessageId,
String queueName,
- StoreContext storeContext)
+ final StoreContext storeContext)
{
- //To change body of implemented methods use File | Settings | File
Templates.
+ AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new
AMQShortString(queueName));
+ MessageStore store = getVirtualHost().getMessageStore();
+
+
+ List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+ {
+
+ public boolean accept(QueueEntry
entry)
+ {
+ final long messageId =
entry.getMessage().getMessageId();
+ if((messageId >= fromMessageId)
+ && (messageId <=
toMessageId))
+ {
+ if(!entry.isDeleted())
+ {
+ return
entry.getMessage().incrementReference();
+ }
+ }
+
+ return false;
+ }
+
+ public boolean filterComplete()
+ {
+ return false;
+ }
+ });
+
+ try
+ {
+ store.beginTran(storeContext);
+
+ // Copy the messages onto the target queue in the message store.
+ for (QueueEntry entry : entries)
+ {
+ AMQMessage message = entry.getMessage();
+
+ if(message.isReferenced() && message.isPersistent() &&
toQueue.isDurable())
+ {
+ store.enqueueMessage(storeContext, toQueue.getName(),
message.getMessageId());
+ }
+ }
+
+ // Commit and flush the copy transactions.
+ try
+ {
+ store.commitTran(storeContext);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException("Failed to commit transaction
whilst moving messages on message store.", e);
+ }
+ }
+ catch (AMQException e)
+ {
+ try
+ {
+ store.abortTran(storeContext);
+ }
+ catch (AMQException rollbackEx)
+ {
+ _logger.error("Failed to rollback transaction when error
occured moving messages", rollbackEx);
+ }
+ throw new RuntimeException(e);
+ }
+
+ try
+ {
+ for (QueueEntry entry : entries)
+ {
+ if(entry.getMessage().isReferenced())
+ {
+ toQueue.enqueue(storeContext, entry.getMessage());
+ }
+ }
+ }
+ catch (MessageCleanupException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+
}
public void removeMessagesFromQueue(long fromMessageId, long toMessageId,
StoreContext storeContext)
{
- //To change body of implemented methods use File | Settings | File
Templates.
- }
+ try
+ {
+ QueueEntryIterator queueListIterator = _entries.iterator();
- public void enqueueMovedMessages(final StoreContext storeContext, final
List<QueueEntry> foundMessagesList)
- {
- //To change body of implemented methods use File | Settings | File
Templates.
- }
+ while(queueListIterator.advance())
+ {
+ QueueEntry node = queueListIterator.getNode();
+
+ final long messageId = node.getMessage().getMessageId();
+
+ if((messageId >= fromMessageId)
+ && (messageId <= toMessageId)
+ && !node.isDeleted()
+ && node.acquire())
+ {
+ node.discard(storeContext);
+ }
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
public void quiesce()
{
@@ -868,10 +1055,7 @@
QueueEntry node = queueListIterator.getNode();
if(!node.isDeleted() && node.acquire())
{
- node.dequeue(storeContext);
-
- node.dispose(storeContext);
-
+ node.discard(storeContext);
noDeletes = false;
}
@@ -889,10 +1073,7 @@
QueueEntry node = queueListIterator.getNode();
if(!node.isDeleted() && node.acquire())
{
- node.dequeue(storeContext);
-
- node.dispose(storeContext);
-
+ node.discard(storeContext);
count++;
}
@@ -1046,9 +1227,7 @@
if(node.acquire())
{
final StoreContext reapingStoreContext = new
StoreContext();
- node.dequeue(reapingStoreContext);
- node.dispose(reapingStoreContext);
-
+ node.discard(reapingStoreContext);
}
}
QueueEntry newNode = _entries.next(node);
@@ -1209,10 +1388,7 @@
if(!node.isDeleted() && node.expired() && node.acquire())
{
- node.dequeue(storeContext);
-
- node.dispose(storeContext);
-
+ node.discard(storeContext);
}
}
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
---
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
(original)
+++
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
Mon May 12 13:19:50 2008
@@ -203,12 +203,15 @@
MessageMetaData mmd = new MessageMetaData(publishBody,
contentHeaderBody, _contentBodies.size(), arrivalTime);
- _messageStore.storeMessageMetaData(storeContext, _messageId, mmd);
+
_persistent = contentHeaderBody.properties instanceof
BasicContentHeaderProperties &&
((BasicContentHeaderProperties)
contentHeaderBody.properties).getDeliveryMode() == 2;
+ _messageStore.storeMessageMetaData(storeContext, _messageId, mmd);
+
+
populateFromMessageMetaData(mmd);
}
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
---
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
(original)
+++
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
Mon May 12 13:19:50 2008
@@ -97,6 +97,10 @@
defaultValue = "false")
public boolean _multiThreadNIO;
+ @Configured(path = "advanced.useWriteBiasedPool",
+ defaultValue = "true")
+ public boolean useBiasedWrites;
+
public IoAcceptor createAcceptor()
{
Modified:
incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
---
incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
(original)
+++
incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
Mon May 12 13:19:50 2008
@@ -538,7 +538,8 @@
try
{
-
TransportConnection.getInstance(brokerDetail).connect(_protocolHandler,
brokerDetail);
+ TransportConnection.connect(_protocolHandler,brokerDetail);
+
// this blocks until the connection has been set up or when an
error
// has prevented the connection being set up
Modified:
incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
---
incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
(original)
+++
incubator/qpid/branches/broker-queue-refactor/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
Mon May 12 13:19:50 2008
@@ -29,6 +29,7 @@
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.slf4j.Logger;
@@ -346,4 +347,9 @@
}
}
+ public static synchronized void connect(final AMQProtocolHandler
protocolHandler, final BrokerDetails brokerDetail)
+ throws AMQTransportConnectionException, IOException
+ {
+ getInstance(brokerDetail).connect(protocolHandler, brokerDetail);
+ }
}
Modified:
incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
---
incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
(original)
+++
incubator/qpid/branches/broker-queue-refactor/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
Mon May 12 13:19:50 2008
@@ -87,6 +87,8 @@
/** Holds the number of executor threads to create. */
private int _poolSize = Integer.getInteger("amqj.read_write_pool_size",
DEFAULT_POOL_SIZE);
+ private final boolean _useBiasedPool =
Boolean.getBoolean("org.apache.qpid.use_write_biased_pool");
+
/**
* Retrieves the singleton instance of this reference counter.
*
@@ -117,11 +119,19 @@
// _pool = Executors.newFixedThreadPool(_poolSize);
// Use a job queue that biases to writes
- _pool = new ThreadPoolExecutor(_poolSize, _poolSize,
- 0L, TimeUnit.MILLISECONDS,
- new ReadWriteJobQueue());
+ if(_useBiasedPool)
+ {
+ _pool = new ThreadPoolExecutor(_poolSize, _poolSize,
+ 0L, TimeUnit.MILLISECONDS,
+ new ReadWriteJobQueue());
+ }
+ else
+ {
+ _pool = Executors.newFixedThreadPool(_poolSize);
+ }
}
+
return _pool;
}
}
Modified:
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java?rev=655630&r1=655629&r2=655630&view=diff
==============================================================================
---
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
(original)
+++
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
Mon May 12 13:19:50 2008
@@ -243,9 +243,10 @@
}
- public void incrementReference()
+ public boolean incrementReference()
{
_count++;
+ return true;
}
public void decrementReference(StoreContext context)