Author: arnaudsimon
Date: Wed Feb  6 04:56:20 2008
New Revision: 618986

URL: http://svn.apache.org/viewvc?rev=618986&view=rev
Log:
QPID-777 and QPID-778

Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=618986&r1=618985&r2=618986&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Wed Feb  6 04:56:20 2008
@@ -24,7 +24,6 @@
 import java.io.Serializable;
 import java.text.MessageFormat;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -78,8 +77,6 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.TxRollbackBody;
-import org.apache.qpid.framing.TxRollbackOkBody;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.url.AMQBindingURL;
@@ -183,14 +180,14 @@
      * keeps a record of subscriptions which have been created in the current 
instance. It does not remember
      * subscriptions between executions of the client.
      */
-    private final ConcurrentHashMap<String, TopicSubscriberAdaptor> 
_subscriptions =
+    protected final ConcurrentHashMap<String, TopicSubscriberAdaptor> 
_subscriptions =
             new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
 
     /**
      * Holds a mapping from message consumers to their identifying names, so 
that their subscriptions may be looked
      * up in the [EMAIL PROTECTED] #_subscriptions} map.
      */
-    private final ConcurrentHashMap<BasicMessageConsumer, String> 
_reverseSubscriptionMap =
+    protected final ConcurrentHashMap<BasicMessageConsumer, String> 
_reverseSubscriptionMap =
             new ConcurrentHashMap<BasicMessageConsumer, String>();
 
     /**
@@ -271,10 +268,10 @@
     protected final boolean _immediatePrefetch;
 
     /** Indicates that warnings should be generated on violations of the 
strict AMQP. */
-    private final boolean _strictAMQP;
+    protected final boolean _strictAMQP;
 
     /** Indicates that runtime exceptions should be generated on vilations of 
the strict AMQP. */
-    private final boolean _strictAMQPFATAL;
+    protected final boolean _strictAMQPFATAL;
     private final Object _messageDeliveryLock = new Object();
 
     /**
@@ -459,8 +456,8 @@
     {
         if (_logger.isInfoEnabled())
         {
-            _logger.info("Closing session: " + this + ":"
-                         + 
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+            _logger.info("Closing session: " + this );//+ ":"
+                        // + 
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
         }
 
         synchronized (_messageDeliveryLock)
@@ -673,6 +670,14 @@
                                   false, false);
     }
 
+    public MessageConsumer createExclusiveConsumer(Destination destination) 
throws JMSException
+    {
+        checkValidDestination(destination);
+
+        return createConsumerImpl(destination, _defaultPrefetchHighMark, 
_defaultPrefetchLowMark, false, true, null, null,
+                                  false, false);
+    }
+
     public MessageConsumer createConsumer(Destination destination, String 
messageSelector) throws JMSException
     {
         checkValidDestination(destination);
@@ -723,70 +728,7 @@
                                   false);
     }
 
-    public TopicSubscriber createDurableSubscriber(Topic topic, String name) 
throws JMSException
-    {
-
-        checkNotClosed();
-        AMQTopic origTopic = checkValidTopic(topic);
-        AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, 
_connection);
-        TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
-        if (subscriber != null)
-        {
-            if (subscriber.getTopic().equals(topic))
-            {
-                throw new IllegalStateException("Already subscribed to topic " 
+ topic + " with subscription exchange "
-                                                + name);
-            }
-            else
-            {
-                unsubscribe(name);
-            }
-        }
-        else
-        {
-            AMQShortString topicName;
-            if (topic instanceof AMQTopic)
-            {
-                topicName = ((AMQTopic) topic).getRoutingKey();
-            }
-            else
-            {
-                topicName = new AMQShortString(topic.getTopicName());
-            }
-
-            if (_strictAMQP)
-            {
-                if (_strictAMQPFATAL)
-                {
-                    throw new UnsupportedOperationException("JMS Durable not 
currently supported by AMQP.");
-                }
-                else
-                {
-                    _logger.warn("Unable to determine if subscription already 
exists for '" + topicName + "' "
-                                 + "for creation durableSubscriber. Requesting 
queue deletion regardless.");
-                }
-
-                deleteQueue(dest.getAMQQueueName());
-            }
-            else
-            {
-                // if the queue is bound to the exchange but NOT for this 
topic, then the JMS spec
-                // says we must trash the subscription.
-                if (isQueueBound(dest.getExchangeName(), 
dest.getAMQQueueName())
-                    && !isQueueBound(dest.getExchangeName(), 
dest.getAMQQueueName(), topicName))
-                {
-                    deleteQueue(dest.getAMQQueueName());
-                }
-            }
-        }
-
-        subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) 
createConsumer(dest));
-
-        _subscriptions.put(name, subscriber);
-        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
-
-        return subscriber;
-    }
+    public abstract TopicSubscriber createDurableSubscriber(Topic topic, 
String name) throws JMSException;
 
     /** Note, currently this does not handle reuse of the same name with 
different topics correctly. */
     public TopicSubscriber createDurableSubscriber(Topic topic, String name, 
String messageSelector, boolean noLocal)
@@ -1800,7 +1742,7 @@
     /*
      * I could have combined the last 3 methods, but this way it improves 
readability
      */
-    private AMQTopic checkValidTopic(Topic topic) throws JMSException
+    protected AMQTopic checkValidTopic(Topic topic) throws JMSException
     {
         if (topic == null)
         {
@@ -2060,7 +2002,7 @@
      *
      * @todo Be aware of possible changes to parameter order as versions 
change.
      */
-    private void deleteQueue(final AMQShortString queueName) throws 
JMSException
+    protected void deleteQueue(final AMQShortString queueName) throws 
JMSException
     {
         try
         {

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=618986&r1=618985&r2=618986&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Wed Feb  6 04:56:20 2008
@@ -38,14 +38,11 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.JMSException;
-import javax.jms.Destination;
-import javax.jms.TemporaryQueue;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.UUID;
 import java.util.Map;
-import java.util.HashMap;
-
 /**
  * This is a 0.10 Session
  */
@@ -146,6 +143,25 @@
 
     //------- overwritten methods of class AMQSession
 
+     public TopicSubscriber createDurableSubscriber(Topic topic, String name, 
String messageSelector, boolean noLocal)
+            throws JMSException
+    {
+        checkNotClosed();
+        checkValidTopic(topic);
+        if( _subscriptions.containsKey(name))
+        {
+            _subscriptions.get(name).close();
+        }
+        AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, 
_connection);
+        BasicMessageConsumer consumer = (BasicMessageConsumer) 
createConsumer(dest, messageSelector, noLocal);
+        TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, 
consumer);
+        
+        _subscriptions.put(name, subscriber);
+        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+        return subscriber;
+    }
+
     /**
      * Acknowledge one or many messages.
      *
@@ -362,6 +378,14 @@
         getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), 
Session.MESSAGE_FLOW_MODE_CREDIT);
         getQpidSession().messageFlow(consumer.getConsumerTag().toString(), 
Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
         // We need to sync so that we get notify of an error.
+        if(consumer.isStrated())
+        {
+            // set the flow
+            getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
+                    
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+                    AMQSession_0_10.MAX_PREFETCH);
+
+        }
         getQpidSession().sync();
         getCurrentException();
     }
@@ -462,11 +486,11 @@
                 //only set if msg list is null
                 try
                 {
-                    if (consumer.getMessageListener() != null)
-                    {
+                 //   if (consumer.getMessageListener() != null)
+                 //   {
                         
getQpidSession().messageFlow(consumer.getConsumerTag().toString(), 
Session.MESSAGE_FLOW_UNIT_MESSAGE,
                                                      MAX_PREFETCH);
-                    }
+                  //  }
                     getQpidSession()
                     .messageFlow(consumer.getConsumerTag().toString(), 
Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
                 }
@@ -579,8 +603,7 @@
 
     void start() throws AMQException
     {
-
-        super.suspendChannel(false);
+        suspendChannel(false);
         for(BasicMessageConsumer  c:  _consumers.values())
         {
               c.start();
@@ -601,7 +624,7 @@
         }
     }
 
-      synchronized void startDistpatcherIfNecessary()
+   synchronized void startDistpatcherIfNecessary()
     {
         // If IMMEDIATE_PREFETCH is not set then we need to start fetching
         if (!_immediatePrefetch)
@@ -621,5 +644,72 @@
         }
 
         startDistpatcherIfNecessary(false);
+    }
+
+
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name) 
throws JMSException
+    {
+
+        checkNotClosed();
+        AMQTopic origTopic=checkValidTopic(topic);
+        AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, 
_connection);
+
+        TopicSubscriberAdaptor subscriber=_subscriptions.get(name);
+        if (subscriber != null)
+        {
+            if (subscriber.getTopic().equals(topic))
+            {
+                throw new IllegalStateException("Already subscribed to topic " 
+ topic + " with subscription exchange "
+                        + name);
+            }
+            else
+            {
+                unsubscribe(name);
+            }
+        }
+        else
+        {
+            AMQShortString topicName;
+            if (topic instanceof AMQTopic)
+            {
+                topicName=((AMQTopic) topic).getRoutingKey();
+            }
+            else
+            {
+                topicName=new AMQShortString(topic.getTopicName());
+            }
+
+            if (_strictAMQP)
+            {
+                if (_strictAMQPFATAL)
+                {
+                    throw new UnsupportedOperationException("JMS Durable not 
currently supported by AMQP.");
+                }
+                else
+                {
+                    _logger.warn("Unable to determine if subscription already 
exists for '" + topicName + "' "
+                            + "for creation durableSubscriber. Requesting 
queue deletion regardless.");
+                }
+
+                deleteQueue(dest.getAMQQueueName());
+            }
+            else
+            {
+                // if the queue is bound to the exchange but NOT for this 
topic, then the JMS spec
+                // says we must trash the subscription.
+                if (isQueueBound(dest.getExchangeName(), 
dest.getAMQQueueName())
+                        && !isQueueBound(dest.getExchangeName(), 
dest.getAMQQueueName(), topicName))
+                {
+                    deleteQueue(dest.getAMQQueueName());
+                }
+            }
+        }
+
+        subscriber=new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) 
createExclusiveConsumer(dest));
+
+        _subscriptions.put(name, subscriber);
+        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+        return subscriber;
     }
 }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=618986&r1=618985&r2=618986&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
 Wed Feb  6 04:56:20 2008
@@ -21,9 +21,8 @@
 package org.apache.qpid.client;
 
 
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.TemporaryQueue;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.failover.FailoverException;
@@ -333,4 +332,70 @@
 
         return new AMQTemporaryQueue(this);
     }
+
+    public  TopicSubscriber createDurableSubscriber(Topic topic, String name) 
throws JMSException
+       {
+
+           checkNotClosed();
+           AMQTopic origTopic = checkValidTopic(topic);
+           AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, 
_connection);
+           TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+           if (subscriber != null)
+           {
+               if (subscriber.getTopic().equals(topic))
+               {
+                   throw new IllegalStateException("Already subscribed to 
topic " + topic + " with subscription exchange "
+                                                   + name);
+               }
+               else
+               {
+                   unsubscribe(name);
+               }
+           }
+           else
+           {
+               AMQShortString topicName;
+               if (topic instanceof AMQTopic)
+               {
+                   topicName = ((AMQTopic) topic).getRoutingKey();
+               }
+               else
+               {
+                   topicName = new AMQShortString(topic.getTopicName());
+               }
+
+               if (_strictAMQP)
+               {
+                   if (_strictAMQPFATAL)
+                   {
+                       throw new UnsupportedOperationException("JMS Durable 
not currently supported by AMQP.");
+                   }
+                   else
+                   {
+                       _logger.warn("Unable to determine if subscription 
already exists for '" + topicName + "' "
+                                    + "for creation durableSubscriber. 
Requesting queue deletion regardless.");
+                   }
+
+                   deleteQueue(dest.getAMQQueueName());
+               }
+               else
+               {
+                   // if the queue is bound to the exchange but NOT for this 
topic, then the JMS spec
+                   // says we must trash the subscription.
+                   if (isQueueBound(dest.getExchangeName(), 
dest.getAMQQueueName())
+                       && !isQueueBound(dest.getExchangeName(), 
dest.getAMQQueueName(), topicName))
+                   {
+                       deleteQueue(dest.getAMQQueueName());
+                   }
+               }
+           }
+
+           subscriber = new TopicSubscriberAdaptor(dest, 
(BasicMessageConsumer) createConsumer(dest));
+
+           _subscriptions.put(name, subscriber);
+           _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
+
+           return subscriber;
+       }
+
 }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=618986&r1=618985&r2=618986&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
 Wed Feb  6 04:56:20 2008
@@ -71,12 +71,26 @@
               queueName, isDurable);
     }
 
+    protected AMQTopic(AMQShortString exchangeName, AMQShortString 
exchangeClass, AMQShortString routingKey, boolean isExclusive,
+                               boolean isAutoDelete, AMQShortString queueName, 
boolean isDurable)
+    {
+        super(exchangeName, exchangeClass, routingKey, isExclusive, 
isAutoDelete, queueName, isDurable );
+    }
+
+
     public static AMQTopic createDurableTopic(AMQTopic topic, String 
subscriptionName, AMQConnection connection)
             throws JMSException
     {
         return new AMQTopic(topic.getExchangeName(), topic.getRoutingKey(), 
false,
                             getDurableTopicQueueName(subscriptionName, 
connection),
                             true);
+    }
+
+    public static AMQTopic createDurable010Topic(AMQTopic topic, String 
subscriptionName, AMQConnection connection)
+            throws JMSException
+    {
+        return new AMQTopic(topic.getExchangeName(), 
ExchangeDefaults.TOPIC_EXCHANGE_CLASS, topic.getRoutingKey(), true, false,
+              getDurableTopicQueueName(subscriptionName, connection), false);
     }
 
     public static AMQShortString getDurableTopicQueueName(String 
subscriptionName, AMQConnection connection) throws JMSException

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=618986&r1=618985&r2=618986&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 Wed Feb  6 04:56:20 2008
@@ -385,7 +385,23 @@
         }
     }
 
-    public abstract Object getMessageFromQueue(long l) throws 
InterruptedException;
+    public  Object getMessageFromQueue(long l) throws InterruptedException
+    {
+         Object o;
+         if (l > 0)
+         {
+             o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
+         }
+         else if (l < 0)
+         {
+             o = _synchronousQueue.poll();
+         }
+         else
+         {
+             o = _synchronousQueue.take();
+         }
+         return o;
+     }
 
     private boolean closeOnAutoClose() throws JMSException
     {
@@ -977,6 +993,12 @@
     public void stop()
     {
         // do nothing as this is a 0_10 feature
+    }
+
+    public boolean isStrated()
+    {
+        // do nothing as this is a 0_10 feature
+        return false;
     }
 
     public AMQShortString getQueuename()

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=618986&r1=618985&r2=618986&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 Wed Feb  6 04:56:20 2008
@@ -19,10 +19,7 @@
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.UnprocessedMessage_0_10;
+import org.apache.qpid.client.message.*;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.AMQShortString;
@@ -41,7 +38,6 @@
 import javax.jms.MessageListener;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -50,12 +46,7 @@
 public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], 
ByteBuffer>
         implements org.apache.qpidity.nclient.util.MessageListener
 {
-    /**
-     * A counter for keeping the number of available messages for this consumer
-     */
-    private final AtomicLong _messageCounter = new AtomicLong(0);
-
-    /**
+     /**
      * Number of received message so far
      */
     private final AtomicLong _messagesReceived = new AtomicLong(0);
@@ -117,11 +108,17 @@
     // ----- Interface org.apache.qpidity.client.util.MessageListener
 
     /**
+     *
+     * This is invoked by the session thread when emptying the session message 
queue.
+     * We first check if the message is valid (match the selector) and then 
deliver it to the
+     * message listener or to the sync consumer queue.
+     *
      * @param jmsMessage this message has already been processed so can't redo 
preDeliver
      * @param channelId
      */
     public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId)
     {
+        _messagesReceived.incrementAndGet();
         boolean messageOk = false;
         try
         {
@@ -136,12 +133,6 @@
             }
             catch (Exception e1)
             {
-                // the receiver may be waiting for a message
-                if (_messageCounter.get() >= 0)
-                {
-                    _messageCounter.decrementAndGet();
-                    _synchronousQueue.add(new NullTocken());
-                }
                 // we should silently log thie exception as it only hanppens 
when the connection is closed
                 _logger.error("Exception when receiving message", e1);
             }
@@ -152,20 +143,28 @@
         }
     }
 
-    public void onMessage(Message message)
+    /**
+     * Require more credit for this consumer
+     */
+    private void requireMoreCreditIfNecessary()
     {
-        if (isMessageListenerSet())
+        if (_isStarted && _messagesReceived.get() >= 
AMQSession_0_10.MAX_PREFETCH)
         {
-            _messagesReceived.incrementAndGet();
-            if (_messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH)
-            {
-                // require more credit
-                
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
-                                                          
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
-                                                          
AMQSession_0_10.MAX_PREFETCH);
-                _messagesReceived.set(0);
-            }
+            // require more credit
+            
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+                    Session.MESSAGE_FLOW_UNIT_MESSAGE,
+                    AMQSession_0_10.MAX_PREFETCH);
+            _messagesReceived.set(0);
         }
+    }
+
+    /**
+     * This method is invoked by the transport layer when a message is 
delivered for this
+     * consumer. The message is transformed and pass to the session.
+     * @param message an 0.10 message
+     */
+    public void onMessage(Message message)
+    {
         int channelId = getSession().getChannelId();
         long deliveryId = message.getMessageTransferId();
         String consumerTag = getConsumerTag().toString();
@@ -207,8 +206,6 @@
             newMessage.setReplyToURL(replyToUrl);
         }
         newMessage.setContentHeader(headers);
-        // increase the counter of messages
-        _messageCounter.incrementAndGet();
         getSession().messageReceived(newMessage);
         // else ignore this message
     }
@@ -242,10 +239,20 @@
     {
         // notify the session
         ((AMQSession_0_10) getSession()).addMessageTag(msg.getDeliveryTag());
+        if (isMessageListenerSet())
+        {
+            requireMoreCreditIfNecessary();
+        }
+        else if (_synchronousQueue.isEmpty())
+        {
+            requireMoreCreditIfNecessary();
+        }
         //if (!Boolean.getBoolean("noAck"))
         //{
             super.postDeliver(msg);
         //}
+
+
     }
 
     void notifyMessage(UnprocessedMessage messageFrame, int channelId)
@@ -351,50 +358,9 @@
             }
             messageOk = acquireMessage(message);
         }
-        if (!messageOk)
-        {
-            requestCreditIfCreditMode();
-        }
         return messageOk;
     }
 
-    private void requestCreditIfCreditMode()
-    {
-        try
-        {
-            // the current message received is not good, so we need to get a 
message.
-            if (getMessageListener() == null)
-            {
-                int oldval = _messageCounter.intValue();
-                
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
-                                                          
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
-                                                          1);
-                _0_10session.getQpidSession()
-                        .messageFlow(getConsumerTag().toString(), 
Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
-                
_0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
-                _0_10session.getQpidSession().sync();
-                _0_10session.getQpidSession()
-                        .messageFlow(getConsumerTag().toString(), 
Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
-                if (_messageCounter.intValue() <= oldval)
-                {
-                    // we haven't received a message so tell the receiver to 
return null
-                    _synchronousQueue.add(new NullTocken());
-                }
-                else
-                {
-                    _messageCounter.decrementAndGet();
-                }
-            }
-            // we now need to check if we have received a message
-
-        }
-        catch (Exception e)
-        {
-            _logger.error(
-                    "Error getting message listener, couldn't request credit 
after releasing a message that failed the selector test",
-                    e);
-        }
-    }
 
     /**
      * Acknowledge a message
@@ -469,16 +435,18 @@
         super.setMessageListener(messageListener);
         if (messageListener == null)
         {
-            
_0_10session.getQpidSession().messageStop(getConsumerTag().toString());
+           /* 
_0_10session.getQpidSession().messageStop(getConsumerTag().toString());
             _0_10session.getQpidSession()
                     .messageFlowMode(getConsumerTag().toString(), 
Session.MESSAGE_FLOW_MODE_CREDIT);
             
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
                                                       
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
                                                       0xFFFFFFFF);
             _0_10session.getQpidSession().sync();
+            */
         }
         else
         {
+            //TODO: empty the list of sync messages.
             if (_connection.started())
             {
                 _0_10session.getQpidSession()
@@ -491,65 +459,13 @@
                                                           0xFFFFFFFF);
                 _0_10session.getQpidSession().sync();
                 _messagesReceived.set(0);
-                ;
             }
         }
     }
 
-    public Object getMessageFromQueue(long l) throws InterruptedException
+    public boolean isStrated()
     {
-        if (!_isStarted)
-        {
-            return null;
-        }
-        Object o;
-        _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
-                                                  
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
-
-        if (l == 0)
-        {
-            o = _synchronousQueue.take();
-        }
-        else
-        {
-            if (l > 0)
-            {
-                o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
-            }
-            else
-            {
-                o = _synchronousQueue.poll();
-            }
-            if (o == null)
-            {
-                _logger.debug("Message Didn't arrive in time, checking if one 
is inflight");
-                // checking if one is inflight
-                
_0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
-                _0_10session.getQpidSession().sync();
-                _0_10session.getQpidSession()
-                        .messageFlow(getConsumerTag().toString(), 
Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
-                if (_messageCounter.get() > 0)
-                {
-                    o = _synchronousQueue.take();
-                }
-            }
-        }
-        if (o instanceof NullTocken)
-        {
-            o = null;
-        }
-        return o;
-    }
-
-    protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws 
JMSException
-    {
-        _messageCounter.decrementAndGet();
-        super.preApplicationProcessing(jmsMsg);
-    }
-
-    private class NullTocken
-    {
-
+        return _isStarted;
     }
 
     public void start()

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=618986&r1=618985&r2=618986&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
 Wed Feb  6 04:56:20 2008
@@ -86,22 +86,5 @@
             messageFrame.getRoutingKey(), messageFrame.getContentHeader(), 
messageFrame.getBodies());
 
     }
-
-     public Object getMessageFromQueue(long l) throws InterruptedException
-     {
-         Object o;
-         if (l > 0)
-         {
-             o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
-         }
-         else if (l < 0)
-         {
-             o = _synchronousQueue.poll();
-         }
-         else
-         {
-             o = _synchronousQueue.take();
-         }
-         return o;
-     }
+     
 }


Reply via email to