Author: arnaudsimon
Date: Wed Feb 6 04:56:20 2008
New Revision: 618986
URL: http://svn.apache.org/viewvc?rev=618986&view=rev
Log:
QPID-777 and QPID-778
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=618986&r1=618985&r2=618986&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Wed Feb 6 04:56:20 2008
@@ -24,7 +24,6 @@
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -78,8 +77,6 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.TxRollbackBody;
-import org.apache.qpid.framing.TxRollbackOkBody;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.AMQBindingURL;
@@ -183,14 +180,14 @@
* keeps a record of subscriptions which have been created in the current
instance. It does not remember
* subscriptions between executions of the client.
*/
- private final ConcurrentHashMap<String, TopicSubscriberAdaptor>
_subscriptions =
+ protected final ConcurrentHashMap<String, TopicSubscriberAdaptor>
_subscriptions =
new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
/**
* Holds a mapping from message consumers to their identifying names, so
that their subscriptions may be looked
* up in the {@link #_subscriptions} map.
*/
- private final ConcurrentHashMap<BasicMessageConsumer, String>
_reverseSubscriptionMap =
+ protected final ConcurrentHashMap<BasicMessageConsumer, String>
_reverseSubscriptionMap =
new ConcurrentHashMap<BasicMessageConsumer, String>();
/**
@@ -271,10 +268,10 @@
protected final boolean _immediatePrefetch;
/** Indicates that warnings should be generated on violations of the
strict AMQP. */
- private final boolean _strictAMQP;
+ protected final boolean _strictAMQP;
/** Indicates that runtime exceptions should be generated on violations of
the strict AMQP. */
- private final boolean _strictAMQPFATAL;
+ protected final boolean _strictAMQPFATAL;
private final Object _messageDeliveryLock = new Object();
/**
@@ -459,8 +456,8 @@
{
if (_logger.isInfoEnabled())
{
- _logger.info("Closing session: " + this + ":"
- +
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ _logger.info("Closing session: " + this );//+ ":"
+ // +
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
}
synchronized (_messageDeliveryLock)
@@ -673,6 +670,14 @@
false, false);
}
+ public MessageConsumer createExclusiveConsumer(Destination destination)
throws JMSException
+ {
+ checkValidDestination(destination);
+
+ return createConsumerImpl(destination, _defaultPrefetchHighMark,
_defaultPrefetchLowMark, false, true, null, null,
+ false, false);
+ }
+
public MessageConsumer createConsumer(Destination destination, String
messageSelector) throws JMSException
{
checkValidDestination(destination);
@@ -723,70 +728,7 @@
false);
}
- public TopicSubscriber createDurableSubscriber(Topic topic, String name)
throws JMSException
- {
-
- checkNotClosed();
- AMQTopic origTopic = checkValidTopic(topic);
- AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name,
_connection);
- TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
- if (subscriber != null)
- {
- if (subscriber.getTopic().equals(topic))
- {
- throw new IllegalStateException("Already subscribed to topic "
+ topic + " with subscription exchange "
- + name);
- }
- else
- {
- unsubscribe(name);
- }
- }
- else
- {
- AMQShortString topicName;
- if (topic instanceof AMQTopic)
- {
- topicName = ((AMQTopic) topic).getRoutingKey();
- }
- else
- {
- topicName = new AMQShortString(topic.getTopicName());
- }
-
- if (_strictAMQP)
- {
- if (_strictAMQPFATAL)
- {
- throw new UnsupportedOperationException("JMS Durable not
currently supported by AMQP.");
- }
- else
- {
- _logger.warn("Unable to determine if subscription already
exists for '" + topicName + "' "
- + "for creation durableSubscriber. Requesting
queue deletion regardless.");
- }
-
- deleteQueue(dest.getAMQQueueName());
- }
- else
- {
- // if the queue is bound to the exchange but NOT for this
topic, then the JMS spec
- // says we must trash the subscription.
- if (isQueueBound(dest.getExchangeName(),
dest.getAMQQueueName())
- && !isQueueBound(dest.getExchangeName(),
dest.getAMQQueueName(), topicName))
- {
- deleteQueue(dest.getAMQQueueName());
- }
- }
- }
-
- subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer)
createConsumer(dest));
-
- _subscriptions.put(name, subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
-
- return subscriber;
- }
+ public abstract TopicSubscriber createDurableSubscriber(Topic topic,
String name) throws JMSException;
/** Note, currently this does not handle reuse of the same name with
different topics correctly. */
public TopicSubscriber createDurableSubscriber(Topic topic, String name,
String messageSelector, boolean noLocal)
@@ -1800,7 +1742,7 @@
/*
* I could have combined the last 3 methods, but this way it improves
readability
*/
- private AMQTopic checkValidTopic(Topic topic) throws JMSException
+ protected AMQTopic checkValidTopic(Topic topic) throws JMSException
{
if (topic == null)
{
@@ -2060,7 +2002,7 @@
*
* @todo Be aware of possible changes to parameter order as versions
change.
*/
- private void deleteQueue(final AMQShortString queueName) throws
JMSException
+ protected void deleteQueue(final AMQShortString queueName) throws
JMSException
{
try
{
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=618986&r1=618985&r2=618986&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Wed Feb 6 04:56:20 2008
@@ -38,14 +38,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.JMSException;
-import javax.jms.Destination;
-import javax.jms.TemporaryQueue;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.UUID;
import java.util.Map;
-import java.util.HashMap;
-
/**
* This is a 0.10 Session
*/
@@ -146,6 +143,25 @@
//------- overwritten methods of class AMQSession
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name,
String messageSelector, boolean noLocal)
+ throws JMSException
+ {
+ checkNotClosed();
+ checkValidTopic(topic);
+ if( _subscriptions.containsKey(name))
+ {
+ _subscriptions.get(name).close();
+ }
+ AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name,
_connection);
+ BasicMessageConsumer consumer = (BasicMessageConsumer)
createConsumer(dest, messageSelector, noLocal);
+ TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest,
consumer);
+
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+ return subscriber;
+ }
+
/**
* Acknowledge one or many messages.
*
@@ -362,6 +378,14 @@
getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(),
Session.MESSAGE_FLOW_MODE_CREDIT);
getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
// We need to sync so that we get notified of an error.
+ if(consumer.isStrated())
+ {
+ // set the flow
+ getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
+
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ AMQSession_0_10.MAX_PREFETCH);
+
+ }
getQpidSession().sync();
getCurrentException();
}
@@ -462,11 +486,11 @@
//only set if msg list is null
try
{
- if (consumer.getMessageListener() != null)
- {
+ // if (consumer.getMessageListener() != null)
+ // {
getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
Session.MESSAGE_FLOW_UNIT_MESSAGE,
MAX_PREFETCH);
- }
+ // }
getQpidSession()
.messageFlow(consumer.getConsumerTag().toString(),
Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
}
@@ -579,8 +603,7 @@
void start() throws AMQException
{
-
- super.suspendChannel(false);
+ suspendChannel(false);
for(BasicMessageConsumer c: _consumers.values())
{
c.start();
@@ -601,7 +624,7 @@
}
}
- synchronized void startDistpatcherIfNecessary()
+ synchronized void startDistpatcherIfNecessary()
{
// If IMMEDIATE_PREFETCH is not set then we need to start fetching
if (!_immediatePrefetch)
@@ -621,5 +644,72 @@
}
startDistpatcherIfNecessary(false);
+ }
+
+
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name)
throws JMSException
+ {
+
+ checkNotClosed();
+ AMQTopic origTopic=checkValidTopic(topic);
+ AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name,
_connection);
+
+ TopicSubscriberAdaptor subscriber=_subscriptions.get(name);
+ if (subscriber != null)
+ {
+ if (subscriber.getTopic().equals(topic))
+ {
+ throw new IllegalStateException("Already subscribed to topic "
+ topic + " with subscription exchange "
+ + name);
+ }
+ else
+ {
+ unsubscribe(name);
+ }
+ }
+ else
+ {
+ AMQShortString topicName;
+ if (topic instanceof AMQTopic)
+ {
+ topicName=((AMQTopic) topic).getRoutingKey();
+ }
+ else
+ {
+ topicName=new AMQShortString(topic.getTopicName());
+ }
+
+ if (_strictAMQP)
+ {
+ if (_strictAMQPFATAL)
+ {
+ throw new UnsupportedOperationException("JMS Durable not
currently supported by AMQP.");
+ }
+ else
+ {
+ _logger.warn("Unable to determine if subscription already
exists for '" + topicName + "' "
+ + "for creation durableSubscriber. Requesting
queue deletion regardless.");
+ }
+
+ deleteQueue(dest.getAMQQueueName());
+ }
+ else
+ {
+ // if the queue is bound to the exchange but NOT for this
topic, then the JMS spec
+ // says we must trash the subscription.
+ if (isQueueBound(dest.getExchangeName(),
dest.getAMQQueueName())
+ && !isQueueBound(dest.getExchangeName(),
dest.getAMQQueueName(), topicName))
+ {
+ deleteQueue(dest.getAMQQueueName());
+ }
+ }
+ }
+
+ subscriber=new TopicSubscriberAdaptor(dest, (BasicMessageConsumer)
createExclusiveConsumer(dest));
+
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+ return subscriber;
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=618986&r1=618985&r2=618986&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
Wed Feb 6 04:56:20 2008
@@ -21,9 +21,8 @@
package org.apache.qpid.client;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.TemporaryQueue;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
@@ -333,4 +332,70 @@
return new AMQTemporaryQueue(this);
}
+
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name)
throws JMSException
+ {
+
+ checkNotClosed();
+ AMQTopic origTopic = checkValidTopic(topic);
+ AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name,
_connection);
+ TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+ if (subscriber != null)
+ {
+ if (subscriber.getTopic().equals(topic))
+ {
+ throw new IllegalStateException("Already subscribed to
topic " + topic + " with subscription exchange "
+ + name);
+ }
+ else
+ {
+ unsubscribe(name);
+ }
+ }
+ else
+ {
+ AMQShortString topicName;
+ if (topic instanceof AMQTopic)
+ {
+ topicName = ((AMQTopic) topic).getRoutingKey();
+ }
+ else
+ {
+ topicName = new AMQShortString(topic.getTopicName());
+ }
+
+ if (_strictAMQP)
+ {
+ if (_strictAMQPFATAL)
+ {
+ throw new UnsupportedOperationException("JMS Durable
not currently supported by AMQP.");
+ }
+ else
+ {
+ _logger.warn("Unable to determine if subscription
already exists for '" + topicName + "' "
+ + "for creation durableSubscriber.
Requesting queue deletion regardless.");
+ }
+
+ deleteQueue(dest.getAMQQueueName());
+ }
+ else
+ {
+ // if the queue is bound to the exchange but NOT for this
topic, then the JMS spec
+ // says we must trash the subscription.
+ if (isQueueBound(dest.getExchangeName(),
dest.getAMQQueueName())
+ && !isQueueBound(dest.getExchangeName(),
dest.getAMQQueueName(), topicName))
+ {
+ deleteQueue(dest.getAMQQueueName());
+ }
+ }
+ }
+
+ subscriber = new TopicSubscriberAdaptor(dest,
(BasicMessageConsumer) createConsumer(dest));
+
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+ return subscriber;
+ }
+
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=618986&r1=618985&r2=618986&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
Wed Feb 6 04:56:20 2008
@@ -71,12 +71,26 @@
queueName, isDurable);
}
+ protected AMQTopic(AMQShortString exchangeName, AMQShortString
exchangeClass, AMQShortString routingKey, boolean isExclusive,
+ boolean isAutoDelete, AMQShortString queueName,
boolean isDurable)
+ {
+ super(exchangeName, exchangeClass, routingKey, isExclusive,
isAutoDelete, queueName, isDurable );
+ }
+
+
public static AMQTopic createDurableTopic(AMQTopic topic, String
subscriptionName, AMQConnection connection)
throws JMSException
{
return new AMQTopic(topic.getExchangeName(), topic.getRoutingKey(),
false,
getDurableTopicQueueName(subscriptionName,
connection),
true);
+ }
+
+ public static AMQTopic createDurable010Topic(AMQTopic topic, String
subscriptionName, AMQConnection connection)
+ throws JMSException
+ {
+ return new AMQTopic(topic.getExchangeName(),
ExchangeDefaults.TOPIC_EXCHANGE_CLASS, topic.getRoutingKey(), true, false,
+ getDurableTopicQueueName(subscriptionName, connection), false);
}
public static AMQShortString getDurableTopicQueueName(String
subscriptionName, AMQConnection connection) throws JMSException
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=618986&r1=618985&r2=618986&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Wed Feb 6 04:56:20 2008
@@ -385,7 +385,23 @@
}
}
- public abstract Object getMessageFromQueue(long l) throws
InterruptedException;
+ public Object getMessageFromQueue(long l) throws InterruptedException
+ {
+ Object o;
+ if (l > 0)
+ {
+ o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
+ }
+ else if (l < 0)
+ {
+ o = _synchronousQueue.poll();
+ }
+ else
+ {
+ o = _synchronousQueue.take();
+ }
+ return o;
+ }
private boolean closeOnAutoClose() throws JMSException
{
@@ -977,6 +993,12 @@
public void stop()
{
// do nothing as this is a 0_10 feature
+ }
+
+ public boolean isStrated()
+ {
+ // do nothing as this is a 0_10 feature
+ return false;
}
public AMQShortString getQueuename()
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=618986&r1=618985&r2=618986&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Wed Feb 6 04:56:20 2008
@@ -19,10 +19,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.UnprocessedMessage_0_10;
+import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
@@ -41,7 +38,6 @@
import javax.jms.MessageListener;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -50,12 +46,7 @@
public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[],
ByteBuffer>
implements org.apache.qpidity.nclient.util.MessageListener
{
- /**
- * A counter for keeping the number of available messages for this consumer
- */
- private final AtomicLong _messageCounter = new AtomicLong(0);
-
- /**
+ /**
* Number of received message so far
*/
private final AtomicLong _messagesReceived = new AtomicLong(0);
@@ -117,11 +108,17 @@
// ----- Interface org.apache.qpidity.client.util.MessageListener
/**
+ *
+ * This is invoked by the session thread when emptying the session message
queue.
+ * We first check if the message is valid (match the selector) and then
deliver it to the
+ * message listener or to the sync consumer queue.
+ *
* @param jmsMessage this message has already been processed so can't redo
preDeliver
* @param channelId
*/
public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId)
{
+ _messagesReceived.incrementAndGet();
boolean messageOk = false;
try
{
@@ -136,12 +133,6 @@
}
catch (Exception e1)
{
- // the receiver may be waiting for a message
- if (_messageCounter.get() >= 0)
- {
- _messageCounter.decrementAndGet();
- _synchronousQueue.add(new NullTocken());
- }
// we should silently log this exception as it only happens
when the connection is closed
_logger.error("Exception when receiving message", e1);
}
@@ -152,20 +143,28 @@
}
}
- public void onMessage(Message message)
+ /**
+ * Require more credit for this consumer
+ */
+ private void requireMoreCreditIfNecessary()
{
- if (isMessageListenerSet())
+ if (_isStarted && _messagesReceived.get() >=
AMQSession_0_10.MAX_PREFETCH)
{
- _messagesReceived.incrementAndGet();
- if (_messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH)
- {
- // require more credit
-
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
-
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
-
AMQSession_0_10.MAX_PREFETCH);
- _messagesReceived.set(0);
- }
+ // require more credit
+
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ AMQSession_0_10.MAX_PREFETCH);
+ _messagesReceived.set(0);
}
+ }
+
+ /**
+ * This method is invoked by the transport layer when a message is
delivered for this
+ * consumer. The message is transformed and pass to the session.
+ * @param message an 0.10 message
+ */
+ public void onMessage(Message message)
+ {
int channelId = getSession().getChannelId();
long deliveryId = message.getMessageTransferId();
String consumerTag = getConsumerTag().toString();
@@ -207,8 +206,6 @@
newMessage.setReplyToURL(replyToUrl);
}
newMessage.setContentHeader(headers);
- // increase the counter of messages
- _messageCounter.incrementAndGet();
getSession().messageReceived(newMessage);
// else ignore this message
}
@@ -242,10 +239,20 @@
{
// notify the session
((AMQSession_0_10) getSession()).addMessageTag(msg.getDeliveryTag());
+ if (isMessageListenerSet())
+ {
+ requireMoreCreditIfNecessary();
+ }
+ else if (_synchronousQueue.isEmpty())
+ {
+ requireMoreCreditIfNecessary();
+ }
//if (!Boolean.getBoolean("noAck"))
//{
super.postDeliver(msg);
//}
+
+
}
void notifyMessage(UnprocessedMessage messageFrame, int channelId)
@@ -351,50 +358,9 @@
}
messageOk = acquireMessage(message);
}
- if (!messageOk)
- {
- requestCreditIfCreditMode();
- }
return messageOk;
}
- private void requestCreditIfCreditMode()
- {
- try
- {
- // the current message received is not good, so we need to get a
message.
- if (getMessageListener() == null)
- {
- int oldval = _messageCounter.intValue();
-
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
-
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- 1);
- _0_10session.getQpidSession()
- .messageFlow(getConsumerTag().toString(),
Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
-
_0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
- _0_10session.getQpidSession().sync();
- _0_10session.getQpidSession()
- .messageFlow(getConsumerTag().toString(),
Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
- if (_messageCounter.intValue() <= oldval)
- {
- // we haven't received a message so tell the receiver to
return null
- _synchronousQueue.add(new NullTocken());
- }
- else
- {
- _messageCounter.decrementAndGet();
- }
- }
- // we now need to check if we have received a message
-
- }
- catch (Exception e)
- {
- _logger.error(
- "Error getting message listener, couldn't request credit
after releasing a message that failed the selector test",
- e);
- }
- }
/**
* Acknowledge a message
@@ -469,16 +435,18 @@
super.setMessageListener(messageListener);
if (messageListener == null)
{
-
_0_10session.getQpidSession().messageStop(getConsumerTag().toString());
+ /*
_0_10session.getQpidSession().messageStop(getConsumerTag().toString());
_0_10session.getQpidSession()
.messageFlowMode(getConsumerTag().toString(),
Session.MESSAGE_FLOW_MODE_CREDIT);
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
0xFFFFFFFF);
_0_10session.getQpidSession().sync();
+ */
}
else
{
+ //TODO: empty the list of sync messages.
if (_connection.started())
{
_0_10session.getQpidSession()
@@ -491,65 +459,13 @@
0xFFFFFFFF);
_0_10session.getQpidSession().sync();
_messagesReceived.set(0);
- ;
}
}
}
- public Object getMessageFromQueue(long l) throws InterruptedException
+ public boolean isStrated()
{
- if (!_isStarted)
- {
- return null;
- }
- Object o;
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
-
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
-
- if (l == 0)
- {
- o = _synchronousQueue.take();
- }
- else
- {
- if (l > 0)
- {
- o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
- }
- else
- {
- o = _synchronousQueue.poll();
- }
- if (o == null)
- {
- _logger.debug("Message Didn't arrive in time, checking if one
is inflight");
- // checking if one is inflight
-
_0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
- _0_10session.getQpidSession().sync();
- _0_10session.getQpidSession()
- .messageFlow(getConsumerTag().toString(),
Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
- if (_messageCounter.get() > 0)
- {
- o = _synchronousQueue.take();
- }
- }
- }
- if (o instanceof NullTocken)
- {
- o = null;
- }
- return o;
- }
-
- protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws
JMSException
- {
- _messageCounter.decrementAndGet();
- super.preApplicationProcessing(jmsMsg);
- }
-
- private class NullTocken
- {
-
+ return _isStarted;
}
public void start()
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=618986&r1=618985&r2=618986&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
Wed Feb 6 04:56:20 2008
@@ -86,22 +86,5 @@
messageFrame.getRoutingKey(), messageFrame.getContentHeader(),
messageFrame.getBodies());
}
-
- public Object getMessageFromQueue(long l) throws InterruptedException
- {
- Object o;
- if (l > 0)
- {
- o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
- }
- else if (l < 0)
- {
- o = _synchronousQueue.poll();
- }
- else
- {
- o = _synchronousQueue.take();
- }
- return o;
- }
+
}