Author: aidan
Date: Thu Jun 26 03:25:36 2008
New Revision: 671845

URL: http://svn.apache.org/viewvc?rev=671845&view=rev
Log:
QPID-854 QPID-999 : Merge Changes to the client to make the dispatcher 
responsible for closing the queue browser when all the messages have been 
processed.

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=671845&r1=671844&r2=671845&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Jun 26 03:25:36 2008
@@ -802,6 +802,10 @@
 
                         deregisterConsumer(consumer);
                     }
+                    else
+                    {
+                        _queue.add(new UnprocessedMessage.CloseConsumerMessage(consumer));
+                    }
                 }
             }
         }
@@ -1150,6 +1154,13 @@
 
     public StreamMessage createStreamMessage() throws JMSException
     {
+        // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived
+        // calls through connection.closeAllSessions which is also called by the public connection.close()
+        // with a null cause
+        // When we are closing the Session due to a protocol session error we simply create a new AMQException
+        // with the correct error code and text this is cleary WRONG as the instanceof check below will fail.
+        // We need to determin here if the connection should be
+
         synchronized (_connection.getFailoverMutex())
         {
             checkNotClosed();
@@ -1599,7 +1610,7 @@
 
                 deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
             }
-            else
+            else // Queue Browser
             {
 
                 if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection)))
@@ -2778,7 +2789,8 @@
                             _lock.wait();
                         }
 
-                        if (tagLE(deliveryTag, _rollbackMark.get()))
+                        if (!(message instanceof UnprocessedMessage.CloseConsumerMessage)
+                            && tagLE(deliveryTag, _rollbackMark.get()))
                         {
                             rejectMessage(message, true);
                         }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=671845&r1=671844&r2=671845&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Jun 26 03:25:36 2008
@@ -507,6 +507,12 @@
 
             throw e;
         }
+        else if (o instanceof UnprocessedMessage.CloseConsumerMessage)
+        {
+            _closed.set(true);
+            deregisterConsumer();
+            return null;
+        }
         else
         {
             return (AbstractJMSMessage) o;
@@ -561,6 +567,7 @@
             }
             else
             {
+               // FIXME: wow this is ugly
                 // //fixme this probably is not right
                 // if (!isNoConsume())
                 { // done in BasicCancelOK Handler but not sending one so just deregister.
@@ -615,6 +622,36 @@
     }
 
     /**
+     * @param closeMessage
+     *            this message signals that we should close the browser
+     */
+    public void notifyCloseMessage(UnprocessedMessage.CloseConsumerMessage closeMessage)
+    {
+        if (isMessageListenerSet())
+        {
+            // Currently only possible to get this msg type with a browser.
+            // If we get the message here then we should probably just close
+            // this consumer.
+            // Though an AutoClose consumer with message listener is quite odd..
+            // Just log out the fact so we know where we are
+            _logger.warn("Using an AutoCloseconsumer with message listener is not supported.");
+        }
+        else
+        {
+            try
+            {
+                _synchronousQueue.put(closeMessage);
+            }
+            catch (InterruptedException e)
+            {
+                _logger.info(" SynchronousQueue.put interupted. Usually result of connection closing,"
+                        + "but we shouldn't have close yet");
+            }
+        }
+    }
+
+    
+    /**
      * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case of a
      * message listener or a synchronous receive() caller.
      *
@@ -622,6 +659,12 @@
      */
     void notifyMessage(UnprocessedMessage messageFrame)
     {
+        if (messageFrame instanceof UnprocessedMessage.CloseConsumerMessage)
+        {
+            notifyCloseMessage((UnprocessedMessage.CloseConsumerMessage) messageFrame);
+            return;
+        }
+
         final boolean debug = _logger.isDebugEnabled();
 
         if (debug)

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?rev=671845&r1=671844&r2=671845&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Thu Jun 26 03:25:36 2008
@@ -107,4 +107,177 @@
     {
         return "";
     }    
-}
+    
+    public static final class CloseConsumerMessage extends UnprocessedMessage
+    {
+        AMQShortString _consumerTag;
+
+        public CloseConsumerMessage(int channelId, long deliveryId, AMQShortString consumerTag,
+                AMQShortString exchange, AMQShortString routingKey, boolean redelivered)
+        {
+            super(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
+            _consumerTag = consumerTag;
+        }
+
+        public CloseConsumerMessage(BasicMessageConsumer consumer)
+        {
+            this(0, 0, consumer.getConsumerTag(), null, null, false);
+        }
+
+        public BasicDeliverBody getDeliverBody()
+             {
+                 return new BasicDeliverBody()
+                  {
+    
+                     public AMQShortString getConsumerTag()
+                     {
+                         return _consumerTag;
+                     }
+
+                    @Override
+                    public long getDeliveryTag()
+                    {
+                        return 0;
+                    }
+
+                    @Override
+                    public AMQShortString getExchange()
+                    {
+                        return null;
+                    }
+
+                    @Override
+                    public boolean getRedelivered()
+                    {
+                        return false;
+                    }
+
+                    @Override
+                    public AMQShortString getRoutingKey()
+                    {
+                        return null;
+                    }
+
+                    @Override
+                    public boolean execute(MethodDispatcher methodDispatcher, int channelId) throws AMQException
+                    {
+                        return false;
+                    }
+
+                    @Override
+                    public AMQFrame generateFrame(int channelId)
+                    {
+                        return null;
+                    }
+
+                    @Override
+                    public AMQChannelException getChannelException(AMQConstant code, String message)
+                    {
+                        return null;
+                    }
+
+                    @Override
+                    public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause)
+                    {
+                        return null;
+                    }
+
+                    @Override
+                    public AMQChannelException getChannelNotFoundException(int channelId)
+                    {
+                        return null;
+                    }
+
+                    @Override
+                    public int getClazz()
+                    {
+                        return 0;
+                    }
+
+                    @Override
+                    public AMQConnectionException getConnectionException(AMQConstant code, String message)
+                    {
+                        return null;
+                    }
+
+                    @Override
+                    public AMQConnectionException getConnectionException(AMQConstant code, String message,
+                            Throwable cause)
+                    {
+                        return null;
+                    }
+
+                    @Override
+                    public byte getMajor()
+                    {
+                        return 0;
+                    }
+
+                    @Override
+                    public int getMethod()
+                    {
+                        return 0;
+                    }
+
+                    @Override
+                    public byte getMinor()
+                    {
+                        return 0;
+                    }
+
+                    @Override
+                    public int getSize()
+                    {
+                        return 0;
+                    }
+
+                    @Override
+                    public void writeMethodPayload(ByteBuffer buffer)
+                    {
+                    }
+
+                    @Override
+                    public void writePayload(ByteBuffer buffer)
+                    {
+                    }
+
+                    @Override
+                    public byte getFrameType()
+                    {
+                        return 0;
+                    }
+
+                    @Override
+                    public void handle(int channelId, AMQVersionAwareProtocolSession amqMinaProtocolSession)
+                            throws AMQException
+                    {
+                        
+                    }
+                  };
+             }
+
+        @Override
+        public List getBodies()
+        {
+            return null;
+        }
+
+        @Override
+        public Object getContentHeader()
+        {
+            return null;
+        }
+
+        @Override
+        public void receiveBody(Object nativeMessageBody)
+        {
+            
+        }
+
+        @Override
+        public void setContentHeader(Object nativeMessageHeader)
+        {
+            
+        }
+    }
+}
\ No newline at end of file


Reply via email to