Author: ritchiem
Date: Wed Oct 10 02:59:56 2007
New Revision: 583415

URL: http://svn.apache.org/viewvc?rev=583415&view=rev
Log:
QPID-578 : Queue Deletion causes unacked msgs to be discard resulting in the 
RejcectHandler throwing NPE when it too tries to discard the message.

Modified:
    
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
    
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java

Modified: 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?rev=583415&r1=583414&r2=583415&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
 Wed Oct 10 02:59:56 2007
@@ -23,7 +23,6 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.BasicRejectBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.ack.UnacknowledgedMessage;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -71,7 +70,7 @@
         {
             _logger.debug("Rejecting:" + evt.getMethod().deliveryTag +
                           ": Requeue:" + evt.getMethod().requeue +
-//                              ": Resend:" + evt.getMethod().resend +
+                          //": Resend:" + evt.getMethod().resend +
                           " on channel:" + channel.debugIdentity());
         }
 
@@ -86,17 +85,31 @@
         }
         else
         {
+            if (message.queue == null || message.queue.isDeleted())
+            {
+                _logger.warn("Message's Queue as already been purged, unable 
to Reject. " +
+                             "Dropping message should use Dead Letter Queue");
+                //sendtoDeadLetterQueue(msg)                
+                return;
+            }
+
+            if (!message.message.isReferenced())
+            {
+                _logger.warn("Message as already been purged, unable to 
Reject.");
+                return;
+            }
+
 
             if (_logger.isTraceEnabled())
             {
                 _logger.trace("Rejecting: DT:" + deliveryTag + "-" + 
message.message.debugIdentity() +
                               ": Requeue:" + evt.getMethod().requeue +
-//                              ": Resend:" + evt.getMethod().resend +
+                              //": Resend:" + evt.getMethod().resend +
                               " on channel:" + channel.debugIdentity());
             }
 
             // If we haven't requested message to be resent to this consumer 
then reject it from ever getting it.
-//            if (!evt.getMethod().resend)
+            //if (!evt.getMethod().resend)
             {
                 
message.message.reject(message.message.getDeliveredSubscription(message.queue));
             }
@@ -108,6 +121,7 @@
             else
             {
                 _logger.warn("Dropping message as requeue not required and 
there is no dead letter queue");
+                //sendtoDeadLetterQueue(AMQMessage message)
 //                message.queue = channel.getDefaultDeadLetterQueue();
 //                channel.requeue(deliveryTag);
             }

Modified: 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=583415&r1=583414&r2=583415&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 Wed Oct 10 02:59:56 2007
@@ -130,6 +130,11 @@
 
     }
 
+    public boolean isReferenced()
+    {
+        return _referenceCount.get() > 0;
+    }    
+
     /**
      * Used to iterate through all the body frames associated with this 
message. Will not keep all the data in memory
      * therefore is memory-efficient.

Modified: 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=583415&r1=583414&r2=583415&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 Wed Oct 10 02:59:56 2007
@@ -238,6 +238,11 @@
         return _autoDelete;
     }
 
+    public boolean isDeleted()
+    {
+        return _deleted.get();
+    }    
+
     /** @return no of messages(undelivered) on the queue. */
     public int getMessageCount()
     {


Reply via email to