Author: rgodfrey
Date: Thu May 29 06:04:37 2008
New Revision: 661325
URL: http://svn.apache.org/viewvc?rev=661325&view=rev
Log:
Made the subscription sendLock a straight lock; re-enabled per-subscription
async delivery
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=661325&r1=661324&r2=661325&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu May 29 06:04:37 2008
@@ -831,7 +831,7 @@
// may need to deliver queued messages
for (Subscription s : _tag2SubscriptionMap.values())
{
- s.getQueue().deliverAsync();
+ s.getQueue().deliverAsync(s);
}
}
}
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=661325&r1=661324&r2=661325&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu May 29 06:04:37 2008
@@ -278,7 +278,7 @@
}
- deliverAsync();
+ deliverAsync(subscription);
}
@@ -456,8 +456,7 @@
private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
throws AMQException
{
- // the send lock is a read/write lock that prevents the subscription from changing status while we are in this
- // block
+
sub.getSendLock();
try
{
@@ -772,7 +771,7 @@
_activeSubscriberCount.incrementAndGet();
}
- deliverAsync();
+ deliverAsync(sub);
}
}
@@ -1662,7 +1661,7 @@
public void stateChanged(QueueEntry entry, QueueEntry.State oldSate,
QueueEntry.State newState)
{
entry.removeStateChangeListener(this);
- deliverAsync();
+ deliverAsync(_sub);
}
}
}
\ No newline at end of file
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=661325&r1=661324&r2=661325&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Thu May 29 06:04:37 2008
@@ -73,7 +73,7 @@
boolean wouldSuspend(QueueEntry msg);
- Object getSendLock();
+ void getSendLock();
void releaseSendLock();
void resend(final QueueEntry entry) throws AMQException;
Modified:
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=661325&r1=661324&r2=661325&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Thu May 29 06:04:37 2008
@@ -22,9 +22,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -66,9 +65,6 @@
private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
private final Lock _stateChangeLock;
- private final Lock _stateChangeExclusiveLock;
-
-
static final class BrowserSubscription extends SubscriptionImpl
{
@@ -287,9 +283,9 @@
_deliveryMethod = deliveryMethod;
_recordMethod = recordMethod;
- ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- _stateChangeLock = readWriteLock.readLock();
- _stateChangeExclusiveLock = readWriteLock.writeLock();
+
+ _stateChangeLock = new ReentrantLock();
+
if (arguments != null)
{
@@ -445,7 +441,7 @@
boolean closed = false;
State state = getState();
- _stateChangeExclusiveLock.lock();
+ _stateChangeLock.lock();
try
{
while(!closed && state != State.CLOSED)
@@ -464,7 +460,7 @@
}
finally
{
- _stateChangeExclusiveLock.unlock();
+ _stateChangeLock.unlock();
}
@@ -495,10 +491,9 @@
return !_creditManager.useCreditForMessage(msg.getMessage());//_channel.wouldSuspend(msg.getMessage());
}
- public Object getSendLock()
+ public void getSendLock()
{
_stateChangeLock.lock();
- return _deleted;
}
public void releaseSendLock()
Modified:
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=661325&r1=661324&r2=661325&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Thu May 29 06:04:37 2008
@@ -86,7 +86,7 @@
//no-op
}
- public Object getSendLock()
+ public void getSendLock()
{
return new Object();
}