Author: ritchiem
Date: Wed Feb 21 07:47:17 2007
New Revision: 510060
URL: http://svn.apache.org/viewvc?view=rev&rev=510060
Log:
QPID-348 Problems related to prefetching of messages
Client caches are now cleared.
Some code in AMQSession and BasicMessageConsumer is commented out pending
broker fixes to ensure channel suspension is respected; tests fail otherwise.
The tests pass at present only because they are not correct; a JIRA has been
raised to fix them (QPID-386).
Spec Changes
Added a recover-ok method as the response to recover. To maintain
compatibility, added a nowait bit to recover: recover-ok is only sent when
nowait is not set.
Java Changes
AMQConnection - wraps the AMQExceptions that can now be thrown by the blocking
session start/stop (channel suspend) calls in JMSAMQException.
AMQSession - added clean-up code for rollback/recover to clear
AMQSession._queue and BasicMessageConsumer._synchronousQueue
BasicMessageConsumer - added rollback method to clean up _synchronousQueue
ChannelCloseMethodHandler - reduced logging level from error to debug for
received channel close methods that carry a non-success error code.
FlowControllingBlockingQueue - added code to return iterator so messages can be
purged cleanly.
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
incubator/qpid/trunk/qpid/specs/amqp.0-8.xml
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java?view=diff&rev=510060&r1=510059&r2=510060
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
Wed Feb 21 07:47:17 2007
@@ -23,6 +23,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicRecoverBody;
+import org.apache.qpid.framing.BasicRecoverOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -43,16 +44,24 @@
public void methodReceived(AMQStateManager stateManager,
AMQMethodEvent<BasicRecoverBody> evt) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
-
+
_logger.debug("Recover received on protocol session " + session + "
and channel " + evt.getChannelId());
AMQChannel channel = session.getChannel(evt.getChannelId());
BasicRecoverBody body = evt.getMethod();
-
+
if (channel == null)
{
throw body.getChannelNotFoundException(evt.getChannelId());
}
channel.resend(session, body.requeue);
+
+ if (!body.nowait)
+ {
+ // AMQP version change: Hardwire the version to 0-8 (major=8,
minor=0)
+ // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions
change.
+
session.writeFrame(BasicRecoverOkBody.createAMQFrame(evt.getChannelId(), (byte)
8, (byte) 0));
+ }
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=510060&r1=510059&r2=510060
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
Wed Feb 21 07:47:17 2007
@@ -532,7 +532,14 @@
if (_started)
{
- session.start();
+ try
+ {
+ session.start();
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
}
return session;
}
@@ -690,7 +697,14 @@
while (it.hasNext())
{
final AMQSession s = (AMQSession) ((Map.Entry)
it.next()).getValue();
- s.start();
+ try
+ {
+ s.start();
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
}
_started = true;
}
@@ -703,7 +717,14 @@
{
for (Iterator i = _sessions.values().iterator(); i.hasNext();)
{
- ((AMQSession) i.next()).stop();
+ try
+ {
+ ((AMQSession) i.next()).stop();
+ }
+ catch (AMQException e)
+ {
+ throw new JMSAMQException(e);
+ }
}
_started = false;
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=510060&r1=510059&r2=510060
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Wed Feb 21 07:47:17 2007
@@ -100,6 +100,9 @@
import org.apache.qpid.framing.TxRollbackOkBody;
import org.apache.qpid.framing.QueueBindOkBody;
import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.BasicRecoverOkBody;
+import org.apache.qpid.framing.BasicRejectBody;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -191,6 +194,11 @@
private boolean _hasMessageListeners;
+ private boolean _suspended;
+
+ private final Object _suspensionLock = new Object();
+
+
/** Responsible for decoding a message fragment and passing it to the
appropriate message consumer. */
private class Dispatcher extends Thread
@@ -285,8 +293,50 @@
//fixme awaitTermination
}
- }
+ public void rollback()
+ {
+
+ synchronized (_lock)
+ {
+ boolean isStopped = connectionStopped();
+
+ if (!isStopped)
+ {
+ setConnectionStopped(true);
+ }
+
+ rejectAllMessages(true);
+
+ _logger.debug("Session Pre Dispatch Queue cleared");
+
+ for (BasicMessageConsumer consumer : _consumers.values())
+ {
+ consumer.rollback();
+ }
+
+ setConnectionStopped(isStopped);
+ }
+
+ }
+
+ public void rejectPending(AMQShortString consumerTag)
+ {
+ synchronized (_lock)
+ {
+ boolean stopped = connectionStopped();
+
+ _dispatcher.setConnectionStopped(false);
+
+ rejectMessagesForConsumerTag(consumerTag, true);
+
+ if (stopped)
+ {
+ _dispatcher.setConnectionStopped(stopped);
+ }
+ }
+ }
+ }
AMQSession(AMQConnection con, int channelId, boolean transacted, int
acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry)
@@ -328,7 +378,7 @@
if
(_acknowledgeMode == NO_ACKNOWLEDGE)
{
_logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending
channel. Current value is " + currentValue);
-
suspendChannel();
+ new
Thread(new SuspenderRunner(true)).start();
}
}
@@ -337,7 +387,7 @@
if
(_acknowledgeMode == NO_ACKNOWLEDGE)
{
_logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending
channel. Current value is " + currentValue);
-
unsuspendChannel();
+ new
Thread(new SuspenderRunner(false)).start();
}
}
});
@@ -480,16 +530,39 @@
public void rollback() throws JMSException
{
- checkTransacted();
- try
- {
- // TODO: Be aware of possible changes to parameter order as
versions change.
- getProtocolHandler().syncWrite(
- TxRollbackBody.createAMQFrame(_channelId,
getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
- }
- catch (AMQException e)
+ synchronized (_suspensionLock)
{
- throw(JMSException) (new JMSException("Failed to rollback: " +
e).initCause(e));
+ checkTransacted();
+ try
+ {
+ // AMQP version change: Hardwire the version to 0-8 (major=8,
minor=0)
+ // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions
change.
+
+ boolean isSuspended = isSuspended();
+
+ if (!isSuspended)
+ {
+// suspendChannel(true);
+ }
+
+ _connection.getProtocolHandler().syncWrite(
+ TxRollbackBody.createAMQFrame(_channelId,
getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+
+ if (_dispatcher != null)
+ {
+ _dispatcher.rollback();
+ }
+
+ if (!isSuspended)
+ {
+// suspendChannel(false);
+ }
+ }
+ catch (AMQException e)
+ {
+ throw(JMSException) (new JMSException("Failed to rollback: " +
e).initCause(e));
+ }
}
}
@@ -597,6 +670,13 @@
}
+
+ public boolean isSuspended()
+ {
+ return _suspended;
+ }
+
+
/**
* Called when the server initiates the closure of the session
unilaterally.
*
@@ -737,14 +817,45 @@
checkNotTransacted(); // throws IllegalStateException if a transacted
session
// this is set only here, and the before the consumer's onMessage is
called it is set to false
_inRecovery = true;
- for (BasicMessageConsumer consumer : _consumers.values())
+ try
+ {
+
+ boolean isSuspended = isSuspended();
+
+// if (!isSuspended)
+// {
+// suspendChannel(true);
+// }
+
+ for (BasicMessageConsumer consumer : _consumers.values())
+ {
+ consumer.clearUnackedMessages();
+ }
+
+ // AMQP version change: Hardwire the version to 0-8 (major=8,
minor=0)
+ // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions
change.
+
_connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId,
+
getProtocolMajorVersion(),
+
getProtocolMinorVersion(),
+
false, // nowait
+
false) // requeue
+ , BasicRecoverOkBody.class);
+
+// if (_dispatcher != null)
+// {
+// _dispatcher.rollback();
+// }
+//
+// if (!isSuspended)
+// {
+// suspendChannel(false);
+// }
+ }
+ catch (AMQException e)
{
- consumer.clearUnackedMessages();
+ throw new JMSAMQException(e);
}
- // TODO: Be aware of possible changes to parameter order as versions
change.
-
getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
-
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version
(major, minor)
-
false)); // requeue
}
boolean isInRecovery()
@@ -1057,7 +1168,7 @@
}
catch (AMQInvalidRoutingKeyException e)
{
- JMSException ide = new
InvalidDestinationException("Invalid routing
key:"+amqd.getRoutingKey().toString());
+ JMSException ide = new
InvalidDestinationException("Invalid routing key:" +
amqd.getRoutingKey().toString());
ide.setLinkedException(e);
throw ide;
}
@@ -1731,7 +1842,7 @@
return _channelId;
}
- void start()
+ void start() throws AMQException
{
//fixme This should be controlled by _stopped as it pairs with the
stop method
//fixme or check the FlowControlledBlockingQueue _queue to see if we
have flow controlled.
@@ -1740,7 +1851,7 @@
if (_startedAtLeastOnce.getAndSet(true))
{
//then we stopped this and are restarting, so signal server to
resume delivery
- unsuspendChannel();
+ suspendChannel(false);
}
if (hasMessageListeners())
@@ -1773,10 +1884,10 @@
}
}
- void stop()
+ void stop() throws AMQException
{
//stop the server delivering messages to this session
- suspendChannel();
+ suspendChannel(true);
if (_dispatcher != null)
{
@@ -1837,6 +1948,12 @@
_destinationConsumerCount.remove(dest);
}
}
+
+ //ensure we remove the messages from the consumer even if the
dispatcher hasn't started
+ if (_dispatcher == null)
+ {
+ rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
+ }
}
}
@@ -1889,35 +2006,54 @@
}
}
- private void suspendChannel()
+ private void suspendChannel(boolean suspend) throws AMQException
{
- _logger.warn("Suspending channel");
- // TODO: Be aware of possible changes to parameter order as versions
change.
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
-
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version
(major, minor)
- false);
// active
- getProtocolHandler().writeFrame(channelFlowFrame);
- }
+ synchronized (_suspensionLock)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Setting channel flow : " + (suspend ?
"suspended" : "unsuspended"));
+ }
- private void unsuspendChannel()
- {
- _logger.warn("Unsuspending channel");
- // TODO: Be aware of possible changes to parameter order as versions
change.
- AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId,
-
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version
(major, minor)
- true);
// active
- getProtocolHandler().writeFrame(channelFlowFrame);
+ _suspended = suspend;
+
+ // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions
change.
+ AMQFrame channelFlowFrame =
ChannelFlowBody.createAMQFrame(_channelId,
+
getProtocolMajorVersion(),
+
getProtocolMinorVersion(),
+
!suspend); // active
+
+ _connection.getProtocolHandler().syncWrite(channelFlowFrame,
ChannelFlowOkBody.class);
+ }
}
+
public void confirmConsumerCancelled(AMQShortString consumerTag)
{
BasicMessageConsumer consumer = (BasicMessageConsumer)
_consumers.get(consumerTag);
- if ((consumer != null) && (consumer.isAutoClose()))
+ if (consumer != null)
{
- consumer.closeWhenNoMessages(true);
+ if (consumer.isAutoClose())
+ {
+ consumer.closeWhenNoMessages(true);
+ }
+ else
+ {
+ consumer.rollback();
+ }
}
- }
+ //Flush any pending messages for this consumerTag
+ if (_dispatcher != null)
+ {
+ _dispatcher.rejectPending(consumerTag);
+ }
+ else
+ {
+ rejectMessagesForConsumerTag(consumerTag, true);
+ }
+ }
/*
* I could have combined the last 3 methods, but this way it improves
readability
@@ -2008,5 +2144,75 @@
}
+ private class SuspenderRunner implements Runnable
+ {
+ private boolean _suspend;
+
+ public SuspenderRunner(boolean suspend)
+ {
+ _suspend = suspend;
+ }
+
+ public void run()
+ {
+ try
+ {
+ suspendChannel(_suspend);
+ }
+ catch (AMQException e)
+ {
+ _logger.warn("Unable to suspend channel");
+ }
+ }
+ }
+
+
+ private void rejectAllMessages(boolean requeue)
+ {
+ rejectMessagesForConsumerTag(null, requeue);
+ }
+
+ /** @param consumerTag The consumerTag to prune from queue or all if null
+ * @param requeue Should the removed messages be requeued (or discarded.
Possibly to DLQ)
+ */
+
+ private void rejectMessagesForConsumerTag(AMQShortString consumerTag,
boolean requeue)
+ {
+ Iterator messages = _queue.iterator();
+
+ while (messages.hasNext())
+ {
+ UnprocessedMessage message = (UnprocessedMessage) messages.next();
+
+ if (consumerTag == null ||
message.getDeliverBody().consumerTag.equals(consumerTag))
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Removing message from _queue:" + message);
+ }
+
+ messages.remove();
+
+// rejectMessage(message.getDeliverBody().deliveryTag, requeue);
+
+ _logger.debug("Rejected the message(" +
message.getDeliverBody() + ") for consumer :" + consumerTag);
+ }
+ else
+ {
+ _logger.error("Pruned pending message for consumer:" +
consumerTag);
+ }
+ }
+ }
+
+ public void rejectMessage(long deliveryTag, boolean requeue)
+ {
+ AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId,
+
getProtocolMajorVersion(),
+
getProtocolMinorVersion(),
+ deliveryTag,
+ requeue);
+
+ _connection.getProtocolHandler().writeFrame(basicRejectBody);
+ }
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=510060&r1=510059&r2=510060
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Wed Feb 21 07:47:17 2007
@@ -49,9 +49,7 @@
{
private static final Logger _logger =
Logger.getLogger(BasicMessageConsumer.class);
- /**
- * The connection being used by this consumer
- */
+ /** The connection being used by this consumer */
private AMQConnection _connection;
private String _messageSelector;
@@ -60,33 +58,20 @@
private AMQDestination _destination;
- /**
- * When true indicates that a blocking receive call is in progress
- */
+ /** When true indicates that a blocking receive call is in progress */
private final AtomicBoolean _receiving = new AtomicBoolean(false);
- /**
- * Holds an atomic reference to the listener installed.
- */
+ /** Holds an atomic reference to the listener installed. */
private final AtomicReference<MessageListener> _messageListener = new
AtomicReference<MessageListener>();
- /**
- * The consumer tag allows us to close the consumer by sending a jmsCancel
method to the
- * broker
- */
+ /** The consumer tag allows us to close the consumer by sending a
jmsCancel method to the broker */
private AMQShortString _consumerTag;
- /**
- * We need to know the channel id when constructing frames
- */
+ /** We need to know the channel id when constructing frames */
private int _channelId;
/**
- * Used in the blocking receive methods to receive a message from
- * the Session thread.
- * <p/>
- * Or to notify of errors
- * <p/>
- * Argument true indicates we want strict FIFO semantics
+ * Used in the blocking receive methods to receive a message from the
Session thread. <p/> Or to notify of errors
+ * <p/> Argument true indicates we want strict FIFO semantics
*/
private final ArrayBlockingQueue _synchronousQueue;
@@ -96,55 +81,48 @@
private AMQProtocolHandler _protocolHandler;
- /**
- * We need to store the "raw" field table so that we can resubscribe in
the event of failover being required
- */
+ /** We need to store the "raw" field table so that we can resubscribe in
the event of failover being required */
private FieldTable _rawSelectorFieldTable;
/**
- * We store the high water prefetch field in order to be able to reuse it
when resubscribing in the event of failover
+ * We store the high water prefetch field in order to be able to reuse it
when resubscribing in the event of
+ * failover
*/
private int _prefetchHigh;
/**
- * We store the low water prefetch field in order to be able to reuse it
when resubscribing in the event of failover
+ * We store the low water prefetch field in order to be able to reuse it
when resubscribing in the event of
+ * failover
*/
private int _prefetchLow;
- /**
- * We store the exclusive field in order to be able to reuse it when
resubscribing in the event of failover
- */
+ /** We store the exclusive field in order to be able to reuse it when
resubscribing in the event of failover */
private boolean _exclusive;
/**
- * The acknowledge mode in force for this consumer. Note that the AMQP
protocol allows different ack modes
- * per consumer whereas JMS defines this at the session level, hence why
we associate it with the consumer in our
+ * The acknowledge mode in force for this consumer. Note that the AMQP
protocol allows different ack modes per
+ * consumer whereas JMS defines this at the session level, hence why we
associate it with the consumer in our
* implementation.
*/
private int _acknowledgeMode;
- /**
- * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
- */
+ /** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */
private int _outstanding;
- /**
- * Tag of last message delievered, whoch should be acknowledged on commit
in
- * transaction mode.
- */
+ /** Tag of last message delievered, whoch should be acknowledged on commit
in transaction mode. */
private long _lastDeliveryTag;
/**
- * Switch to enable sending of acknowledgements when using
DUPS_OK_ACKNOWLEDGE mode.
- * Enabled when _outstannding number of msgs >= _prefetchHigh and disabled
at < _prefetchLow
+ * Switch to enable sending of acknowledgements when using
DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding
+ * number of msgs >= _prefetchHigh and disabled at < _prefetchLow
*/
private boolean _dups_ok_acknowledge_send;
private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new
ConcurrentLinkedQueue<Long>();
/**
- * The thread that was used to call receive(). This is important for being
able to interrupt that thread if
- * a receive() is in progress.
+ * The thread that was used to call receive(). This is important for being
able to interrupt that thread if a
+ * receive() is in progress.
*/
private Thread _receivingThread;
@@ -417,13 +395,15 @@
}
/**
- * We can get back either a Message or an exception from the queue. This
method examines the argument and deals
- * with it by throwing it (if an exception) or returning it (in any other
case).
+ * We can get back either a Message or an exception from the queue. This
method examines the argument and deals with
+ * it by throwing it (if an exception) or returning it (in any other case).
*
* @param o
+ *
* @return a message only if o is a Message
- * @throws JMSException if the argument is a throwable. If it is a
JMSException it is rethrown as is, but if not
- * a JMSException is created with the linked
exception set appropriately
+ *
+ * @throws JMSException if the argument is a throwable. If it is a
JMSException it is rethrown as is, but if not a
+ * JMSException is created with the linked exception
set appropriately
*/
private AbstractJMSMessage returnMessageOrThrow(Object o)
throws JMSException
@@ -488,9 +468,8 @@
}
/**
- * Called when you need to invalidate a consumer. Used for example when
failover has occurred and the
- * client has vetoed automatic resubscription.
- * The caller must hold the failover mutex.
+ * Called when you need to invalidate a consumer. Used for example when
failover has occurred and the client has
+ * vetoed automatic resubscription. The caller must hold the failover
mutex.
*/
void markClosed()
{
@@ -499,8 +478,8 @@
}
/**
- * Called from the AMQSession when a message has arrived for this
consumer. This methods handles both the case
- * of a message listener or a synchronous receive() caller.
+ * Called from the AMQSession when a message has arrived for this
consumer. This methods handles both the case of a
+ * message listener or a synchronous receive() caller.
*
* @param messageFrame the raw unprocessed mesage
* @param channelId channel on which this message was sent
@@ -643,9 +622,7 @@
}
}
- /**
- * Acknowledge up to last message delivered (if any). Used when commiting.
- */
+ /** Acknowledge up to last message delivered (if any). Used when
commiting. */
void acknowledgeLastDelivered()
{
if (_lastDeliveryTag > 0)
@@ -676,8 +653,8 @@
/**
- * Perform cleanup to deregister this consumer. This occurs when closing
the consumer in both the clean
- * case and in the case of an error occurring.
+ * Perform cleanup to deregister this consumer. This occurs when closing
the consumer in both the clean case and in
+ * the case of an error occurring.
*/
private void deregisterConsumer()
{
@@ -728,9 +705,7 @@
}
}
- /**
- * Called on recovery to reset the list of delivery tags
- */
+ /** Called on recovery to reset the list of delivery tags */
public void clearUnackedMessages()
{
_unacknowledgedDeliveryTags.clear();
@@ -760,4 +735,42 @@
}
}
+
+ public void rollback()
+ {
+
+ if (_synchronousQueue.size() > 0)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting the messages for consumer with tag:"
+ _consumerTag);
+ }
+ for (Object o : _synchronousQueue)
+ {
+ if (o instanceof AbstractJMSMessage)
+ {
+// _session.rejectMessage(((AbstractJMSMessage)
o).getDeliveryTag(), true);
+
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Rejected message" + o);
+ }
+
+ }
+ else
+ {
+ _logger.error("Queue contained a :" + o.getClass() +
+ " unable to reject as it is not an
AbstractJMSMessage. Will be cleared");
+ }
+ }
+
+ if (_synchronousQueue.size() != 0)
+ {
+ _logger.warn("Queue was not empty after rejecting all
messages");
+ }
+
+ _synchronousQueue.clear();
+ }
+ }
+
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?view=diff&rev=510060&r1=510059&r2=510060
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
Wed Feb 21 07:47:17 2007
@@ -64,7 +64,10 @@
protocolSession.writeFrame(frame);
if (errorCode != AMQConstant.REPLY_SUCCESS)
{
- _logger.error("Channel close received with errorCode " + errorCode
+ ", and reason " + reason);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Channel close received with errorCode " +
errorCode + ", and reason " + reason);
+ }
if (errorCode == AMQConstant.NO_CONSUMERS)
{
throw new AMQNoConsumersException("Error: " + reason, null);
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?view=diff&rev=510060&r1=510059&r2=510060
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
Wed Feb 21 07:47:17 2007
@@ -20,23 +20,23 @@
*/
package org.apache.qpid.client.util;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.log4j.Logger;
+
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Iterator;
/**
- * A blocking queue that emits events above a user specified threshold allowing
- * the caller to take action (e.g. flow control) to try to prevent the queue
- * growing (much) further. The underlying queue itself is not bounded therefore
- * the caller is not obliged to react to the events.
- * <p/>
- * This implementation is <b>only</b> safe where we have a single thread adding
- * items and a single (different) thread removing items.
+ * A blocking queue that emits events above a user specified threshold
allowing the caller to take action (e.g. flow
+ * control) to try to prevent the queue growing (much) further. The underlying
queue itself is not bounded therefore the
+ * caller is not obliged to react to the events. <p/> This implementation is
<b>only</b> safe where we have a single
+ * thread adding items and a single (different) thread removing items.
*/
public class FlowControllingBlockingQueue
{
- /**
- * This queue is bounded and is used to store messages before being
dispatched to the consumer
- */
+ /** This queue is bounded and is used to store messages before being
dispatched to the consumer */
private final BlockingQueue _queue = new LinkedBlockingQueue();
private final int _flowControlHighThreshold;
@@ -44,12 +44,10 @@
private final ThresholdListener _listener;
- /**
- * We require a separate count so we can track whether we have reached the
- * threshold
- */
+ /** We require a separate count so we can track whether we have reached
the threshold */
private int _count;
+
public interface ThresholdListener
{
void aboveThreshold(int currentValue);
@@ -74,7 +72,7 @@
Object o = _queue.take();
if (_listener != null)
{
- synchronized(_listener)
+ synchronized (_listener)
{
if (_count-- == _flowControlLowThreshold)
{
@@ -90,7 +88,7 @@
_queue.add(o);
if (_listener != null)
{
- synchronized(_listener)
+ synchronized (_listener)
{
if (++_count == _flowControlHighThreshold)
{
@@ -98,6 +96,11 @@
}
}
}
+ }
+
+ public Iterator iterator()
+ {
+ return _queue.iterator();
}
}
Modified: incubator/qpid/trunk/qpid/specs/amqp.0-8.xml
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/amqp.0-8.xml?view=diff&rev=510060&r1=510059&r2=510060
==============================================================================
--- incubator/qpid/trunk/qpid/specs/amqp.0-8.xml (original)
+++ incubator/qpid/trunk/qpid/specs/amqp.0-8.xml Wed Feb 21 07:47:17 2007
@@ -2514,6 +2514,15 @@
message, potentially then delivering it to an alternative subscriber.
</doc>
</field>
+ <field name = "nowait" type = "bit">
+ do not send a reply method
+ <doc>
+ If set, the server will not respond to the method. The client should
+ not wait for a reply method. If the server could not complete the
+ method it will raise a channel or connection exception.
+ </doc>
+ </field>
+
<doc name="rule">
The server MUST set the redelivered flag on all messages that are resent.
</doc>
@@ -2521,7 +2530,16 @@
The server MUST raise a channel exception if this is called on a
transacted channel.
</doc>
-</method>
+ <response name="rollback-ok"/>
+ </method>
+ <method name="recover-ok" synchronous="1" index="101">
+ confirm a successful recover
+ <doc>
+ This method confirms to the client that the recover succeeded.
+ Note that if an recover fails, the server raises a channel exception.
+ </doc>
+ <chassis name="client" implement="MUST"/>
+ </method>
</class>