Author: ritchiem
Date: Wed Jul 25 05:40:24 2007
New Revision: 559427

URL: http://svn.apache.org/viewvc?view=rev&rev=559427
Log:
AMQMessage - added //todo-s and removed unused parameter StoreContext from 
expired() method call.
ConcurrentSelectorDeliveryManager - Update to reflect expired() call change. 
Created new _reapingStoreContext to be used when performing reaper operations 
such as message dequeue due to expiration. Removed old commented code.

Modified:
    
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
    
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
    
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java

Modified: 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?view=diff&rev=559427&r1=559426&r2=559427
==============================================================================
--- 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
 (original)
+++ 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
 Wed Jul 25 05:40:24 2007
@@ -35,6 +35,7 @@
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.protocol.HeartbeatConfig;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.security.auth.AuthenticationResult;
@@ -72,7 +73,7 @@
 
         SaslServer ss = null;
         try
-        {
+        {                       
             ss = authMgr.createSaslServer(String.valueOf(body.mechanism), 
session.getLocalFQDN());
 
             if (ss == null)

Modified: 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=559427&r1=559426&r2=559427
==============================================================================
--- 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
 (original)
+++ 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
 Wed Jul 25 05:40:24 2007
@@ -204,6 +204,7 @@
         if (message instanceof AMQDataBlock)
         {
             amqProtocolSession.dataBlockReceived((AMQDataBlock) message);
+                        
         }
         else if (message instanceof ByteBuffer)
         {

Modified: 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=559427&r1=559426&r2=559427
==============================================================================
--- 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 (original)
+++ 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 Wed Jul 25 05:40:24 2007
@@ -81,12 +81,17 @@
     // private AtomicBoolean _taken = new AtomicBoolean(false);
     private TransientMessageData _transientMessageData = new 
TransientMessageData();
 
+    //todo: this should be part of a messageOnQueue object
     private Set<Subscription> _rejectedBy = null;
 
+    //todo: this should be part of a messageOnQueue object
     private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, 
AtomicBoolean>();
+    //todo: this should be part of a messageOnQueue object
     private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new 
HashMap<AMQQueue, Subscription>();
 
     private final int hashcode = System.identityHashCode(this);
+
+    //todo: this should be part of a messageOnQueue object
     private long _expiration;
 
     public String debugIdentity()
@@ -652,14 +657,13 @@
     /**
      * Checks to see if the message has expired. If it has the message is 
dequeued.
      *
-     * @param storecontext
-     * @param queue
+     * @param queue The queue to check the expiration against. (Currently not 
used)
      *
      * @return true if the message has expire
      *
      * @throws AMQException
      */
-    public boolean expired(StoreContext storecontext, AMQQueue queue) throws 
AMQException
+    public boolean expired(AMQQueue queue) throws AMQException
     {
         // note: If the storecontext isn't need then we can remove the 
getChannel() from Subscription.
 

Modified: 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=559427&r1=559426&r2=559427
==============================================================================
--- 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 (original)
+++ 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 Wed Jul 25 05:40:24 2007
@@ -87,6 +87,10 @@
     private final Object _queueHeadLock = new Object();
     private String _processingThreadName = "";
 
+
+    /** Used by any reaping thread to purge messages */
+    private StoreContext _reapingStoreContext = new StoreContext();
+
     ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, 
AMQQueue queue)
     {
 
@@ -463,17 +467,19 @@
             assert removed == message;
 
             // if the message expired then the _totalMessageSize needs 
adjusting
-            if (message.expired(sub.getChannel().getStoreContext(), _queue))
+            if (message.expired(_queue))
             {
                 _totalMessageSize.addAndGet(-message.getSize());
 
-                message.dequeue(sub.getChannel().getStoreContext(), _queue);
+                // Use the reapingStoreContext as any sub(if we have one) may 
be in a tx.
+                message.dequeue(_reapingStoreContext, _queue);
 
                 if (_log.isInfoEnabled())
                 {
                     _log.info(debugIdentity() + " Doing clean up of the main 
_message queue.");
                 }
             }
+
             //else the clean up is not required as the message has already 
been taken for this queue therefore
             // it was the responsibility of the code that took the message to 
ensure the _totalMessageSize was updated.
 
@@ -513,15 +519,15 @@
         // if the message is null then don't purge as we have no messagse.
         if (message != null)
         {
+            // Check that the message hasn't expired.
+            if (message.expired(_queue))
+            {
+                return true;
+            }
+
             // if we have a subscriber perform message checks
             if (sub != null)
             {
-                // Check that the message hasn't expired.
-                if (message.expired(sub.getChannel().getStoreContext(), 
_queue))
-                {
-                    return true;
-                }
-
                 // if we have a queue browser(we don't purge) so check mark 
the message as taken
                 purge = ((!sub.isBrowser() || message.isTaken(_queue)));
             }
@@ -640,7 +646,14 @@
         }
         catch (AMQException e)
         {
-            message.release(_queue);
+            if (message != null)
+            {
+                message.release(_queue);
+            }
+            else
+            {
+                _log.error(debugIdentity() + "Unable to release message as it 
is null. " + e, e);
+            }
             _log.error(debugIdentity() + "Unable to deliver message as dequeue 
failed: " + e, e);
         }
     }
@@ -719,25 +732,6 @@
 
     }
 
-//    private void sendNextMessage(Subscription sub)
-//    {
-//        if (sub.filtersMessages())
-//        {
-//            sendNextMessage(sub, sub.getPreDeliveryQueue());
-//            if (sub.isAutoClose())
-//            {
-//                if (sub.getPreDeliveryQueue().isEmpty())
-//                {
-//                    sub.close();
-//                }
-//            }
-//        }
-//        else
-//        {
-//            sendNextMessage(sub, _messages);
-//        }
-//    }
-
     public void deliver(StoreContext context, AMQShortString name, AMQMessage 
msg, boolean deliverFirst) throws AMQException
     {
 
@@ -746,8 +740,6 @@
         {
             _log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ") 
:" + msg);
         }
-        // This shouldn't be done here.
-//        msg.release();
 
         //Check if we have someone to deliver the message to.
         _lock.lock();


Reply via email to