Author: ritchiem
Date: Tue Jan 23 14:32:51 2007
New Revision: 499165

URL: http://svn.apache.org/viewvc?view=rev&rev=499165
Log:
QPID-103 Implemented support for MessageListener in AMQSession.
Required configuring an Asynchronous performance test.

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=499165&r1=499164&r2=499165
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Jan 23 14:32:51 2007
@@ -67,6 +67,8 @@
     private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
     private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
 
+    private MessageListener _messageListener = null;
+
     /**
      * Used to reference durable subscribers so they requests for unsubscribe can be handled
      * correctly.  Note this only keeps a record of subscriptions which have been created
@@ -852,13 +854,37 @@
     public MessageListener getMessageListener() throws JMSException
     {
         checkNotClosed();
-        throw new java.lang.UnsupportedOperationException("MessageListener interface not supported");
+        return _messageListener;
     }
 
     public void setMessageListener(MessageListener listener) throws JMSException
     {
         checkNotClosed();
-        throw new java.lang.UnsupportedOperationException("MessageListener interface not supported");
+
+        if (!isStopped())
+        {
+            throw new javax.jms.IllegalStateException("Attempt to set listener while session is started.");
+        }
+
+        // We are stopped         
+        for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+        {
+            BasicMessageConsumer consumer = i.next();
+
+            if (consumer.isReceiving())
+            {
+                throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
+            }
+        }
+
+        _messageListener = listener;
+        
+        for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+        {
+            i.next().setMessageListener(_messageListener);
+        }
+
+
     }
 
     public void run()
@@ -1067,6 +1093,7 @@
     {
         checkTemporaryDestination(destination);
 
+
         return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport()
         {
             public Object operation() throws JMSException
@@ -1089,6 +1116,11 @@
                                                                          
protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
                                                                          
_acknowledgeMode, noConsume, autoClose);
 
+                if (_messageListener != null)
+                {
+                    consumer.setMessageListener(_messageListener);
+                }
+
                 try
                 {
                     registerConsumer(consumer, false);
@@ -1736,19 +1768,21 @@
      */
     void deregisterConsumer(BasicMessageConsumer consumer)
     {
-        _consumers.remove(consumer.getConsumerTag());
-        String subscriptionName = _reverseSubscriptionMap.remove(consumer);
-        if (subscriptionName != null)
+        if (_consumers.remove(consumer.getConsumerTag()) != null)
         {
-            _subscriptions.remove(subscriptionName);
-        }
+            String subscriptionName = _reverseSubscriptionMap.remove(consumer);
+            if (subscriptionName != null)
+            {
+                _subscriptions.remove(subscriptionName);
+            }
 
-        Destination dest = consumer.getDestination();
-        synchronized (dest)
-        {
-            if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+            Destination dest = consumer.getDestination();
+            synchronized (dest)
             {
-                _destinationConsumerCount.remove(dest);
+                if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+                {
+                    _destinationConsumerCount.remove(dest);
+                }
             }
         }
     }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=499165&r1=499164&r2=499165
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Jan 23 14:32:51 2007
@@ -221,7 +221,10 @@
         if (_session.isStopped())
         {
             _messageListener.set(messageListener);
-            _logger.debug("Session stopped : Message listener set for destination " + _destination);
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " + _destination);
+            }
         }
         else
         {
@@ -258,10 +261,10 @@
 
                             // Set Message Listener
                             _logger.debug("Set Message Listener");
-                            _messageListener.set(messageListener);             
               
+                            _messageListener.set(messageListener);
                         }
                     }
-                    );                    
+                    );
                 }
             }
         }
@@ -328,6 +331,11 @@
     public boolean isExclusive()
     {
         return _exclusive;
+    }
+
+    public boolean isReceiving()
+    {
+        return _receiving.get();
     }
 
     public Message receive() throws JMSException


Reply via email to