Author: ritchiem
Date: Thu Jan 11 17:23:43 2007
New Revision: 495460
URL: http://svn.apache.org/viewvc?view=rev&rev=495460
Log:
QPID-276
Update to AMQChannel to remove race condition over UnacknowledgedMessageMap
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=495460&r1=495459&r2=495460
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Thu Jan 11 17:23:43 2007
@@ -275,7 +275,7 @@
* @throws AMQException if something goes wrong
*/
public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue,
AMQProtocolSession session, boolean acks,
- FieldTable filters, boolean noLocal) throws
AMQException, ConsumerTagNotUniqueException
+ FieldTable filters, boolean
noLocal) throws AMQException, ConsumerTagNotUniqueException
{
if (tag == null)
{
@@ -326,20 +326,24 @@
/**
* Add a message to the channel-based list of unacknowledged messages
*
- * @param message the message that was delivered
+ * @param message the message that was delivered
* @param deliveryTag the delivery tag used when delivering the message
(see protocol spec for description of
- * the delivery tag)
- * @param queue the queue from which the message was delivered
+ * the delivery tag)
+ * @param queue the queue from which the message was delivered
*/
public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag,
AMQShortString consumerTag, AMQQueue queue)
{
- _unacknowledgedMessageMap.add(deliveryTag, new
UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
- checkSuspension();
+ synchronized (_unacknowledgedMessageMap.getLock())
+ {
+ _unacknowledgedMessageMap.add(deliveryTag, new
UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
+ checkSuspension();
+ }
}
/**
* Called to attempt re-enqueue all outstanding unacknowledged messages on
the channel.
* May result in delivery to this same channel or to other subscribers.
+ *
* @throws org.apache.qpid.AMQException if the requeue fails
*/
public void requeue() throws AMQException
@@ -427,8 +431,11 @@
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple) throws
AMQException
{
- _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple,
_txnContext);
- checkSuspension();
+ synchronized (_unacknowledgedMessageMap.getLock())
+ {
+ _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag,
multiple, _txnContext);
+ checkSuspension();
+ }
}
/**
@@ -450,6 +457,7 @@
private void checkSuspension()
{
boolean suspend;
+
suspend = _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark;
setSuspended(suspend);
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?view=diff&rev=495460&r1=495459&r2=495460
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
Thu Jan 11 17:23:43 2007
@@ -43,6 +43,8 @@
void visit(Visitor visitor) throws AMQException;
+ Object getLock();
+
void add(long deliveryTag, UnacknowledgedMessage message);
void collect(long deliveryTag, boolean multiple,
List<UnacknowledgedMessage> msgs);
@@ -67,6 +69,7 @@
/**
* Get the set of delivery tags that are outstanding.
+ *
* @return a set of delivery tags
*/
Set<Long> getDeliveryTags();
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?view=diff&rev=495460&r1=495459&r2=495460
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
Thu Jan 11 17:23:43 2007
@@ -75,7 +75,7 @@
{
synchronized (_lock)
{
- for(UnacknowledgedMessage msg : msgs)
+ for (UnacknowledgedMessage msg : msgs)
{
_map.remove(msg.deliveryTag);
}
@@ -95,7 +95,7 @@
synchronized (_lock)
{
Collection<UnacknowledgedMessage> currentEntries = _map.values();
- for (UnacknowledgedMessage msg: currentEntries)
+ for (UnacknowledgedMessage msg : currentEntries)
{
visitor.callback(msg);
}
@@ -103,9 +103,14 @@
}
}
+ public Object getLock()
+ {
+ return _lock;
+ }
+
public void add(long deliveryTag, UnacknowledgedMessage message)
{
- synchronized( _lock)
+ synchronized (_lock)
{
_map.put(deliveryTag, message);
_lastDeliveryTag = deliveryTag;
@@ -209,7 +214,7 @@
{
synchronized (_lock)
{
- for(Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
+ for (Map.Entry<Long, UnacknowledgedMessage> entry :
_map.entrySet())
{
msgs.add(entry.getValue());
if (entry.getKey() == key)
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=495460&r1=495459&r2=495460
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
Thu Jan 11 17:23:43 2007
@@ -134,6 +134,7 @@
{
_logger.info("Protocol Session closed");
final AMQProtocolSession amqProtocolSession =
AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
+ //fixme -- this can be null
amqProtocolSession.closeSession();
}