Author: arnaudsimon
Date: Fri Aug 24 08:10:23 2007
New Revision: 569414

URL: http://svn.apache.org/viewvc?rev=569414&view=rev
Log:
updated consumer thread

Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?rev=569414&r1=569413&r2=569414&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
 Fri Aug 24 08:10:23 2007
@@ -87,11 +87,6 @@
     private boolean _isReceiving = false;
 
     /**
-     * Indicates that a nowait is receiving a message.
-     */
-    private boolean _isNoWaitIsReceiving = false;
-
-    /**
      * Number of mesages received asynchronously
      * Nether exceed MAX_MESSAGE_TRANSFERRED
      */
@@ -109,7 +104,7 @@
      * @param noLocal          If true inhibits the delivery of messages 
published by its own connection.
      * @param subscriptionName Name of the subscription if this is to be 
created as a durable subscriber.
      *                         If this value is null, a non-durable 
subscription is created.
-     * @param consumerTag      This consumer ID. 
+     * @param consumerTag      This actor ID
      * @throws Exception If the MessageProducerImpl cannot be created due to 
some internal error.
      */
     protected MessageConsumerImpl(SessionImpl session, DestinationImpl 
destination, String messageSelector,
@@ -362,34 +357,18 @@
             throw new javax.jms.IllegalStateException("A listener has already 
been set.");
         }
 
-        if (_incomingMessage != null)
-        {
-            System.out.println("We already had a message in the queue");
-            result = (Message) _incomingMessage;
-            _incomingMessage = null;
-            return result;
-        }
-
         synchronized (_incomingMessageLock)
         {
             // This indicate to the delivery thread to deliver the message to 
this consumer
             // as it can happens that a message is delivered after a receive 
operation as returned.
             _isReceiving = true;
+            boolean blockingReceived = timeout == 0;
             if (!_isStopped)
             {
                 // if this consumer is stopped then this will be call when 
starting
-                getSession().getQpidSession()
-                        .messageFlow(getMessageActorID(), 
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,
-                                     1);
-                
getSession().getQpidSession().messageFlush(getMessageActorID());
-                _messageReceived.set(false);
-                System.out.println("no message in the queue, issuing a flow(1) 
and waiting for message");
-
+                requestOneMessage();
                 //When sync() returns we know whether we have received a 
message or not.
                 getSession().getQpidSession().sync();
-
-                System.out.println("we got returned from sync()");
-                //received = getSession().getQpidSession().messagesReceived();
             }
             if (_messageReceived.get() && timeout < 0)
             {
@@ -398,42 +377,81 @@
             }
             else
             {
-                // right we need to let onMessage know that a nowait is 
potentially waiting for a message
-                if (timeout < 0)
-                {
-                    _isNoWaitIsReceiving = true;
-                }
-                while (_incomingMessage == null && !_isClosed)
+                boolean messageReceived = false;
+                while (!messageReceived)
                 {
-                    try
+                    long timeBeforeWait = 0;
+                    while (_incomingMessage == null && !_isClosed)
                     {
-                        System.out.println("waiting for message");
-                        _incomingMessageLock.wait(timeout);
+                        if (!blockingReceived)
+                        {
+                            timeBeforeWait = System.currentTimeMillis();
+                        }
+                        try
+                        {
+                            _incomingMessageLock.wait(timeout);
+                        }
+                        catch (InterruptedException e)
+                        {
+                            // do nothing
+                        }
                     }
-                    catch (InterruptedException e)
+                    if (_incomingMessage != null)
                     {
-                        // do nothing
+                        result = (Message) _incomingMessage;
+                        // tell the session that a message is inprocess
+                        getSession().preProcessMessage(_incomingMessage);
+                        // tell the session to acknowledge this message (if 
required)
+                        getSession().acknowledgeMessage(_incomingMessage);
+                        _incomingMessage.afterMessageReceive();
+                        messageReceived = true;
+                    }
+                    else
+                    {
+                        //now setup the new timeout
+                        if (!blockingReceived)
+                        {
+                            timeout = timeout - (System.currentTimeMillis() - 
timeBeforeWait);
+                        }
+                        if (!_isClosed)
+                        {
+                            // we need to request a new message
+                            requestOneMessage();
+                            getSession().getQpidSession().sync();
+                            if (_messageReceived.get() && timeout < 0)
+                            {
+                                // we are waiting for too long and we haven't 
received a proper message
+                                result = null;
+                                messageReceived = true;
+                            }
+                        }
                     }
-                }
-                if (_incomingMessage != null)
-                {
-                    result = (Message) _incomingMessage;
-                    // tell the session that a message is inprocess
-                    getSession().preProcessMessage(_incomingMessage);
-                    // tell the session to acknowledge this message (if 
required)
-                    getSession().acknowledgeMessage(_incomingMessage);
                 }
                 _incomingMessage = null;
             }
             // We now release any message received for this consumer
             _isReceiving = false;
-            _isNoWaitIsReceiving = false;
             getSession().testQpidException();
         }
         return result;
     }
 
     /**
+     * Request a single message
+     */
+    private void requestOneMessage()
+    {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Requesting a single message");
+        }
+        getSession().getQpidSession()
+                .messageFlow(getMessageActorID(), 
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+        getSession().getQpidSession().messageFlush(getMessageActorID());
+        _messageReceived.set(false);
+    }
+
+    /**
      * Stop the delivery of messages to this consumer.
      * <p>For asynchronous receiver, this operation blocks until the message 
listener
      * finishes processing the current message,
@@ -500,24 +518,22 @@
             // notify the waiting thread
             if (_messageListener == null)
             {
-                System.out.println("Received a message- onMessage in message 
consumer Impl");
-
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("Received a message- onMessage in message 
consumer Impl");
+                }
                 synchronized (_incomingMessageLock)
                 {
                     if (messageOk)
                     {
-                        System.out.println("Received a message- onMessage in 
message ok " + messageOk);
                         // we have received a proper message that we can 
deliver
                         if (_isReceiving)
                         {
-                            System.out.println("Received a message- onMessage 
in message _isReceiving");
-
                             _incomingMessage = message;
                             _incomingMessageLock.notify();
                         }
                         else
                         {
-                            System.out.println("Received a message- onMessage 
in message releasing");
                             // this message has been received after a received 
as returned
                             // we need to release it
                             releaseMessage(message);
@@ -530,21 +546,7 @@
                         // then we need to request a new one from the server
                         if (_isReceiving)
                         {
-                            getSession().getQpidSession()
-                                    .messageFlow(getMessageActorID(),
-                                                 
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
-                            
getSession().getQpidSession().messageFlush(getMessageActorID());
-                            _messageReceived.set(false);
-
-                            // When sync() returns we know whether we have 
received a message or not.
-                            getSession().getQpidSession().sync();
-
-                            if (_messageReceived.get() && _isNoWaitIsReceiving)
-                            {
-                                // Right a message nowait is waiting for a 
message
-                                // but no one can be delivered it then need to 
return
-                                _incomingMessageLock.notify();
-                            }
+                            _incomingMessageLock.notify();
                         }
                     }
                 }
@@ -582,6 +584,7 @@
                     **/
                     try
                     {
+                        message.afterMessageReceive();
                         _messageListener.onMessage((Message) message);
                     }
                     catch (RuntimeException re)

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java?rev=569414&r1=569413&r2=569414&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
 Fri Aug 24 08:10:23 2007
@@ -17,13 +17,10 @@
  */
 package org.apache.qpidity.jms;
 
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
+import org.apache.qpidity.jms.message.QpidMessage;
+import org.apache.qpidity.jms.message.MessageFactory;
 import org.apache.qpidity.api.Message;
 import org.apache.qpidity.client.util.MessageListener;
-import org.apache.qpidity.jms.message.MessageFactory;
-import org.apache.qpidity.jms.message.QpidMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,12 +29,8 @@
  * This is for guarantying that asynch messages are sequentially processed 
within their session.
  * <p> when used synchonously, messages are dispatched to the receiver itself.
  */
-public class QpidMessageListener implements MessageListener, Runnable
+public class QpidMessageListener implements MessageListener
 {
-    
-    // temp solution
-    LinkedBlockingQueue<Message> _queue = new LinkedBlockingQueue<Message>();
-    
     /**
      * Used for debugging.
      */
@@ -57,53 +50,37 @@
     public QpidMessageListener(MessageConsumerImpl consumer)
     {
         _consumer = consumer;
-        Thread t = new Thread(this);
-        t.start();
     }
-    
-    public void run()
+
+    //---- org.apache.qpidity.MessagePartListener API
+    /**
+     * Deliver a message to the listener.
+     *
+     * @param message The message delivered to the listener.
+     */
+    public void onMessage(Message message)
     {
         try
         {
-            while(true)
+            // to be used with flush
+            _consumer.notifyMessageReceived();
+            
+            //convert this message into a JMS one
+            QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
+            // if consumer is asynchronous then send this message to its 
session.
+            if( _consumer.getMessageListener() != null )
             {
-                System.out.println("trying to take a message message");
-                Message message = _queue.take();
-                    
-                // to be used with flush
-                System.out.println("processing the message");
-                _consumer.notifyMessageReceived();
-                            
-                //convert this message into a JMS one
-                QpidMessage jmsMessage = 
MessageFactory.getQpidMessage(message);
-                // if consumer is asynchronous then send this message to its 
session.
-                if( _consumer.getMessageListener() != null )
-                {
-                    
_consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), 
jmsMessage);
-                }
-                else
-                {
-                    // deliver this message to the consumer itself
-                    _consumer.onMessage(jmsMessage);
-                }
+                
_consumer.getSession().dispatchMessage(_consumer.getMessageActorID(), 
jmsMessage);
+            }
+            else
+            {
+                // deliver this message to the consumer itself
+                _consumer.onMessage(jmsMessage);
             }
         }
         catch (Exception e)
         {
             throw new RuntimeException(e.getMessage());
         }
-    }
-
-    //---- org.apache.qpidity.MessagePartListener API
-    /**
-     * Deliver a message to the listener.
-     *
-     * @param message The message delivered to the listner.
-     */
-    public void onMessage(Message message)
-    {
-        System.out.println("Received a message");
-        _queue.offer(message);
-        System.out.println("Added queue to the message");
     }
 }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java?rev=569414&r1=569413&r2=569414&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
 Fri Aug 24 08:10:23 2007
@@ -827,13 +827,22 @@
      *
      * @throws QpidException
      */
+    @Override
     public void afterMessageReceive() throws QpidException
     {
         super.afterMessageReceive();
         ByteBuffer messageData = getMessageData();
         if (messageData != null)
         {
-            _dataIn = new DataInputStream(new 
ByteArrayInputStream(messageData.array()));
+            try
+            {
+                _dataIn = new DataInputStream(
+                        new ByteArrayInputStream(messageData.array(), 
messageData.arrayOffset(), messageData.limit()));
+            }
+            catch (Exception e)
+            {
+                throw new QpidException("Cannot retrieve data from message ", 
null, e);
+            }
         }
     }
 

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java?rev=569414&r1=569413&r2=569414&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
 Fri Aug 24 08:10:23 2007
@@ -585,6 +585,7 @@
     /**
      * This method is invoked after this message has been received.
      */
+    @Override
     public void afterMessageReceive() throws QpidException
     {
         super.afterMessageReceive();

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java?rev=569414&r1=569413&r2=569414&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
 Fri Aug 24 08:10:23 2007
@@ -887,6 +887,7 @@
      *
      * @throws QpidException If there is an internal error when procesing this 
message.
      */
+    @Override
     public void afterMessageReceive() throws QpidException
     {
         // recreate a destination object for the encoded destination

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java?rev=569414&r1=569413&r2=569414&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java
 Fri Aug 24 08:10:23 2007
@@ -149,6 +149,7 @@
      *
      * @throws QpidException
      */
+    @Override
     public void afterMessageReceive() throws QpidException
     {
         super.afterMessageReceive();

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java?rev=569414&r1=569413&r2=569414&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
 Fri Aug 24 08:10:23 2007
@@ -395,7 +395,7 @@
         try
         {
             // set the message data
-            _qpidityMessage.clearData();         
+            _qpidityMessage.clearData();
             if (_logger.isDebugEnabled())
             {
                 _logger.debug("_messageData POS " + _messageData.position());
@@ -429,6 +429,17 @@
     {
         return _qpidityMessage.getMessageTransferId();
     }
+
+    /**
+     * This method is invoked after this message is received.
+     *
+     * @throws QpidException If there is an internal error when processing this 
message.
+     */
+    public void afterMessageReceive() throws QpidException
+    {
+        // do nothing for now 
+    }
+
 }
 
 

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java?rev=569414&r1=569413&r2=569414&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java
 Fri Aug 24 08:10:23 2007
@@ -110,6 +110,7 @@
     /**
      * This method is invoked after this message has been received.
      */
+    @Override
     public void afterMessageReceive() throws QpidException
     {
         super.afterMessageReceive();


Reply via email to