Author: rgodfrey
Date: Thu May 29 06:04:37 2008
New Revision: 661325

URL: http://svn.apache.org/viewvc?rev=661325&view=rev
Log:
Made subscription sendLock straight lock, re-enabled per subscription async 
delivery

Modified:
    
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
    
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=661325&r1=661324&r2=661325&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Thu May 29 06:04:37 2008
@@ -831,7 +831,7 @@
                 // may need to deliver queued messages
                 for (Subscription s : _tag2SubscriptionMap.values())
                 {
-                    s.getQueue().deliverAsync();
+                    s.getQueue().deliverAsync(s);
                 }
             }
         }

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=661325&r1=661324&r2=661325&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 Thu May 29 06:04:37 2008
@@ -278,7 +278,7 @@
         }
 
 
-        deliverAsync();
+        deliverAsync(subscription);
 
     }
 
@@ -456,8 +456,7 @@
     private void deliverToSubscription(final Subscription sub, final 
QueueEntry entry)
             throws AMQException
     {
-        // the send lock is a read/write lock that prevents the subscription 
from changing status while we are in this
-        // block
+
         sub.getSendLock();
         try
         {
@@ -772,7 +771,7 @@
                 _activeSubscriberCount.incrementAndGet();
 
             }
-            deliverAsync();
+            deliverAsync(sub);
         }
     }
 
@@ -1662,7 +1661,7 @@
         public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, 
QueueEntry.State newState)
         {
             entry.removeStateChangeListener(this);
-            deliverAsync();
+            deliverAsync(_sub);
         }
     }
 }
\ No newline at end of file

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=661325&r1=661324&r2=661325&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
 Thu May 29 06:04:37 2008
@@ -73,7 +73,7 @@
 
     boolean wouldSuspend(QueueEntry msg);
 
-    Object getSendLock();
+    void getSendLock();
     void releaseSendLock();
 
     void resend(final QueueEntry entry) throws AMQException;

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=661325&r1=661324&r2=661325&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
 Thu May 29 06:04:37 2008
@@ -22,9 +22,8 @@
 
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
@@ -66,9 +65,6 @@
     
     private QueueEntry.SubscriptionAcquiredState _owningState = new 
QueueEntry.SubscriptionAcquiredState(this);
     private final Lock _stateChangeLock;
-    private final Lock _stateChangeExclusiveLock;
-
-
 
     static final class BrowserSubscription extends SubscriptionImpl
     {
@@ -287,9 +283,9 @@
         _deliveryMethod = deliveryMethod;
         _recordMethod = recordMethod;
 
-        ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
-        _stateChangeLock = readWriteLock.readLock();
-        _stateChangeExclusiveLock = readWriteLock.writeLock();
+
+        _stateChangeLock = new ReentrantLock();
+
 
         if (arguments != null)
         {
@@ -445,7 +441,7 @@
         boolean closed = false;
         State state = getState();
 
-        _stateChangeExclusiveLock.lock();
+        _stateChangeLock.lock();
         try
         {
             while(!closed && state != State.CLOSED)
@@ -464,7 +460,7 @@
         }
         finally
         {
-            _stateChangeExclusiveLock.unlock();
+            _stateChangeLock.unlock();
         }
 
 
@@ -495,10 +491,9 @@
         return 
!_creditManager.useCreditForMessage(msg.getMessage());//_channel.wouldSuspend(msg.getMessage());
     }
 
-    public Object getSendLock()
+    public void getSendLock()
     {
         _stateChangeLock.lock();
-        return _deleted;
     }
 
     public void releaseSendLock()

Modified: 
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?rev=661325&r1=661324&r2=661325&view=diff
==============================================================================
--- 
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
 (original)
+++ 
incubator/qpid/branches/broker-queue-refactor/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
 Thu May 29 06:04:37 2008
@@ -86,7 +86,7 @@
         //no-op
     }
 
-    public Object getSendLock()
+    public void getSendLock()
     {
         return new Object();
     }


Reply via email to