Author: rgreig
Date: Fri Dec 15 11:21:46 2006
New Revision: 487625
URL: http://svn.apache.org/viewvc?view=rev&rev=487625
Log:
QPID-194 Patch supplied by Rob Godfrey.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=487625&r1=487624&r2=487625
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Fri Dec 15 11:21:46 2006
@@ -165,6 +165,8 @@
if (consumer == null)
{
_logger.warn("Received a message from queue " +
message.deliverBody.consumerTag + " without a handler - ignoring...");
+ _logger.warn("Consumers that exist: " + _consumers);
+ _logger.warn("Session hashcode: " +
System.identityHashCode(this));
}
else
{
@@ -958,26 +960,37 @@
* @param queueName
* @return the consumer tag generated by the broker
*/
- private String consumeFromQueue(String queueName, AMQProtocolHandler
protocolHandler, int prefetchHigh, int prefetchLow,
- boolean noLocal, boolean exclusive, int
acknowledgeMode, boolean nowait) throws AMQException
+ private void consumeFromQueue(BasicMessageConsumer consumer, String
queueName, AMQProtocolHandler protocolHandler,
+ boolean nowait) throws AMQException
{
//fixme prefetch values are not used here. Do we need to have them as
parameters?
//need to generate a consumer tag on the client so we can exploit the
nowait flag
String tag = Integer.toString(_nextTag++);
- AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
- queueName, tag,
noLocal,
- acknowledgeMode
== Session.NO_ACKNOWLEDGE,
- exclusive,
nowait);
- if (nowait)
+ consumer.setConsumerTag(tag);
+ // we must register the consumer in the map before we actually start
listening
+ _consumers.put(tag, consumer);
+ try
{
- protocolHandler.writeFrame(jmsConsume);
+ AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId,
0,
+ queueName,
tag, consumer.isNoLocal(),
+
consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
+
consumer.isExclusive(), nowait);
+ if (nowait)
+ {
+ protocolHandler.writeFrame(jmsConsume);
+ }
+ else
+ {
+ protocolHandler.syncWrite(jmsConsume,
BasicConsumeOkBody.class);
+ }
}
- else
+ catch (AMQException e)
{
- protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
+ // clean-up the map in the event of an error
+ _consumers.remove(tag);
+ throw e;
}
- return tag;
}
public Queue createQueue(String queueName) throws JMSException
@@ -1354,12 +1367,7 @@
bindQueue(amqd, queueName, protocolHandler,
consumer.getRawSelectorFieldTable());
- String consumerTag = consumeFromQueue(queueName, protocolHandler,
consumer.getPrefetchHigh(),
- consumer.getPrefetchLow(),
consumer.isNoLocal(), consumer.isExclusive(),
- consumer.getAcknowledgeMode(),
nowait);
-
- consumer.setConsumerTag(consumerTag);
- _consumers.put(consumerTag, consumer);
+ consumeFromQueue(consumer, queueName, protocolHandler, nowait);
}
/**
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=487625&r1=487624&r2=487625
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Fri Dec 15 11:21:46 2006
@@ -280,7 +280,7 @@
public Message receive(long l) throws JMSException
{
checkPreConditions();
-
+
acquireReceiving();
try