Author: aidan
Date: Thu Jun 26 03:25:36 2008
New Revision: 671845
URL: http://svn.apache.org/viewvc?rev=671845&view=rev
Log:
QPID-854 QPID-999 : Merge Changes to the client to make the dispatcher
responsible for closing the queue browser when all the messages have been
processed.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=671845&r1=671844&r2=671845&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Thu Jun 26 03:25:36 2008
@@ -802,6 +802,10 @@
deregisterConsumer(consumer);
}
+ else
+ {
+ _queue.add(new
UnprocessedMessage.CloseConsumerMessage(consumer));
+ }
}
}
}
@@ -1150,6 +1154,13 @@
public StreamMessage createStreamMessage() throws JMSException
{
+ // This method needs to be improved. Throwables only arrive here from
the mina : exceptionRecived
+ // calls through connection.closeAllSessions which is also called by
the public connection.close()
+ // with a null cause
+ // When we are closing the Session due to a protocol session error we
simply create a new AMQException
+ // with the correct error code and text this is cleary WRONG as the
instanceof check below will fail.
+ // We need to determin here if the connection should be
+
synchronized (_connection.getFailoverMutex())
{
checkNotClosed();
@@ -1599,7 +1610,7 @@
deleteQueue(AMQTopic.getDurableTopicQueueName(name,
_connection));
}
- else
+ else // Queue Browser
{
if (isQueueBound(getDefaultTopicExchangeName(),
AMQTopic.getDurableTopicQueueName(name, _connection)))
@@ -2778,7 +2789,8 @@
_lock.wait();
}
- if (tagLE(deliveryTag, _rollbackMark.get()))
+ if (!(message instanceof
UnprocessedMessage.CloseConsumerMessage)
+ && tagLE(deliveryTag, _rollbackMark.get()))
{
rejectMessage(message, true);
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=671845&r1=671844&r2=671845&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Thu Jun 26 03:25:36 2008
@@ -507,6 +507,12 @@
throw e;
}
+ else if (o instanceof UnprocessedMessage.CloseConsumerMessage)
+ {
+ _closed.set(true);
+ deregisterConsumer();
+ return null;
+ }
else
{
return (AbstractJMSMessage) o;
@@ -561,6 +567,7 @@
}
else
{
+ // FIXME: wow this is ugly
// //fixme this probably is not right
// if (!isNoConsume())
{ // done in BasicCancelOK Handler but not sending one so just
deregister.
@@ -615,6 +622,36 @@
}
/**
+ * @param closeMessage
+ * this message signals that we should close the browser
+ */
+ public void notifyCloseMessage(UnprocessedMessage.CloseConsumerMessage
closeMessage)
+ {
+ if (isMessageListenerSet())
+ {
+ // Currently only possible to get this msg type with a browser.
+ // If we get the message here then we should probably just close
+ // this consumer.
+ // Though an AutoClose consumer with message listener is quite
odd..
+ // Just log out the fact so we know where we are
+ _logger.warn("Using an AutoCloseconsumer with message listener is
not supported.");
+ }
+ else
+ {
+ try
+ {
+ _synchronousQueue.put(closeMessage);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info(" SynchronousQueue.put interupted. Usually result
of connection closing,"
+ + "but we shouldn't have close yet");
+ }
+ }
+ }
+
+
+ /**
* Called from the AMQSession when a message has arrived for this
consumer. This methods handles both the case of a
* message listener or a synchronous receive() caller.
*
@@ -622,6 +659,12 @@
*/
void notifyMessage(UnprocessedMessage messageFrame)
{
+ if (messageFrame instanceof UnprocessedMessage.CloseConsumerMessage)
+ {
+ notifyCloseMessage((UnprocessedMessage.CloseConsumerMessage)
messageFrame);
+ return;
+ }
+
final boolean debug = _logger.isDebugEnabled();
if (debug)
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?rev=671845&r1=671844&r2=671845&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
Thu Jun 26 03:25:36 2008
@@ -107,4 +107,177 @@
{
return "";
}
-}
+
+ public static final class CloseConsumerMessage extends UnprocessedMessage
+ {
+ AMQShortString _consumerTag;
+
+ public CloseConsumerMessage(int channelId, long deliveryId,
AMQShortString consumerTag,
+ AMQShortString exchange, AMQShortString routingKey, boolean
redelivered)
+ {
+ super(channelId, deliveryId, consumerTag, exchange, routingKey,
redelivered);
+ _consumerTag = consumerTag;
+ }
+
+ public CloseConsumerMessage(BasicMessageConsumer consumer)
+ {
+ this(0, 0, consumer.getConsumerTag(), null, null, false);
+ }
+
+ public BasicDeliverBody getDeliverBody()
+ {
+ return new BasicDeliverBody()
+ {
+
+ public AMQShortString getConsumerTag()
+ {
+ return _consumerTag;
+ }
+
+ @Override
+ public long getDeliveryTag()
+ {
+ return 0;
+ }
+
+ @Override
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ @Override
+ public boolean getRedelivered()
+ {
+ return false;
+ }
+
+ @Override
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+
+ @Override
+ public boolean execute(MethodDispatcher methodDispatcher,
int channelId) throws AMQException
+ {
+ return false;
+ }
+
+ @Override
+ public AMQFrame generateFrame(int channelId)
+ {
+ return null;
+ }
+
+ @Override
+ public AMQChannelException getChannelException(AMQConstant
code, String message)
+ {
+ return null;
+ }
+
+ @Override
+ public AMQChannelException getChannelException(AMQConstant
code, String message, Throwable cause)
+ {
+ return null;
+ }
+
+ @Override
+ public AMQChannelException getChannelNotFoundException(int
channelId)
+ {
+ return null;
+ }
+
+ @Override
+ public int getClazz()
+ {
+ return 0;
+ }
+
+ @Override
+ public AMQConnectionException
getConnectionException(AMQConstant code, String message)
+ {
+ return null;
+ }
+
+ @Override
+ public AMQConnectionException
getConnectionException(AMQConstant code, String message,
+ Throwable cause)
+ {
+ return null;
+ }
+
+ @Override
+ public byte getMajor()
+ {
+ return 0;
+ }
+
+ @Override
+ public int getMethod()
+ {
+ return 0;
+ }
+
+ @Override
+ public byte getMinor()
+ {
+ return 0;
+ }
+
+ @Override
+ public int getSize()
+ {
+ return 0;
+ }
+
+ @Override
+ public void writeMethodPayload(ByteBuffer buffer)
+ {
+ }
+
+ @Override
+ public void writePayload(ByteBuffer buffer)
+ {
+ }
+
+ @Override
+ public byte getFrameType()
+ {
+ return 0;
+ }
+
+ @Override
+ public void handle(int channelId,
AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+
+ }
+ };
+ }
+
+ @Override
+ public List getBodies()
+ {
+ return null;
+ }
+
+ @Override
+ public Object getContentHeader()
+ {
+ return null;
+ }
+
+ @Override
+ public void receiveBody(Object nativeMessageBody)
+ {
+
+ }
+
+ @Override
+ public void setContentHeader(Object nativeMessageHeader)
+ {
+
+ }
+ }
+}
\ No newline at end of file