Author: arnaudsimon
Date: Fri Aug 24 08:10:23 2007
New Revision: 569414
URL: http://svn.apache.org/viewvc?rev=569414&view=rev
Log:
updated consumer thread
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?rev=569414&r1=569413&r2=569414&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
Fri Aug 24 08:10:23 2007
@@ -87,11 +87,6 @@
private boolean _isReceiving = false;
/**
- * Indicates that a nowait is receiving a message.
- */
- private boolean _isNoWaitIsReceiving = false;
-
- /**
* Number of mesages received asynchronously
* Nether exceed MAX_MESSAGE_TRANSFERRED
*/
@@ -109,7 +104,7 @@
* @param noLocal If true inhibits the delivery of messages
published by its own connection.
* @param subscriptionName Name of the subscription if this is to be
created as a durable subscriber.
* If this value is null, a non-durable
subscription is created.
- * @param consumerTag This consumer ID.
+ * @param consumerTag Thi actor ID
* @throws Exception If the MessageProducerImpl cannot be created due to
some internal error.
*/
protected MessageConsumerImpl(SessionImpl session, DestinationImpl
destination, String messageSelector,
@@ -362,34 +357,18 @@
throw new javax.jms.IllegalStateException("A listener has already
been set.");
}
- if (_incomingMessage != null)
- {
- System.out.println("We already had a message in the queue");
- result = (Message) _incomingMessage;
- _incomingMessage = null;
- return result;
- }
-
synchronized (_incomingMessageLock)
{
// This indicate to the delivery thread to deliver the message to
this consumer
// as it can happens that a message is delivered after a receive
operation as returned.
_isReceiving = true;
+ boolean blockingReceived = timeout == 0;
if (!_isStopped)
{
// if this consumer is stopped then this will be call when
starting
- getSession().getQpidSession()
- .messageFlow(getMessageActorID(),
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- 1);
-
getSession().getQpidSession().messageFlush(getMessageActorID());
- _messageReceived.set(false);
- System.out.println("no message in the queue, issuing a flow(1)
and waiting for message");
-
+ requestOneMessage();
//When sync() returns we know whether we have received a
message or not.
getSession().getQpidSession().sync();
-
- System.out.println("we got returned from sync()");
- //received = getSession().getQpidSession().messagesReceived();
}
if (_messageReceived.get() && timeout < 0)
{
@@ -398,42 +377,81 @@
}
else
{
- // right we need to let onMessage know that a nowait is
potentially waiting for a message
- if (timeout < 0)
- {
- _isNoWaitIsReceiving = true;
- }
- while (_incomingMessage == null && !_isClosed)
+ boolean messageReceived = false;
+ while (!messageReceived)
{
- try
+ long timeBeforeWait = 0;
+ while (_incomingMessage == null && !_isClosed)
{
- System.out.println("waiting for message");
- _incomingMessageLock.wait(timeout);
+ if (!blockingReceived)
+ {
+ timeBeforeWait = System.currentTimeMillis();
+ }
+ try
+ {
+ _incomingMessageLock.wait(timeout);
+ }
+ catch (InterruptedException e)
+ {
+ // do nothing
+ }
}
- catch (InterruptedException e)
+ if (_incomingMessage != null)
{
- // do nothing
+ result = (Message) _incomingMessage;
+ // tell the session that a message is inprocess
+ getSession().preProcessMessage(_incomingMessage);
+ // tell the session to acknowledge this message (if
required)
+ getSession().acknowledgeMessage(_incomingMessage);
+ _incomingMessage.afterMessageReceive();
+ messageReceived = true;
+ }
+ else
+ {
+ //now setup the new timeout
+ if (!blockingReceived)
+ {
+ timeout = timeout - (System.currentTimeMillis() -
timeBeforeWait);
+ }
+ if (!_isClosed)
+ {
+ // we need to request a new message
+ requestOneMessage();
+ getSession().getQpidSession().sync();
+ if (_messageReceived.get() && timeout < 0)
+ {
+ // we are waiting for too long and we haven't
received a proper message
+ result = null;
+ messageReceived = true;
+ }
+ }
}
- }
- if (_incomingMessage != null)
- {
- result = (Message) _incomingMessage;
- // tell the session that a message is inprocess
- getSession().preProcessMessage(_incomingMessage);
- // tell the session to acknowledge this message (if
required)
- getSession().acknowledgeMessage(_incomingMessage);
}
_incomingMessage = null;
}
// We now release any message received for this consumer
_isReceiving = false;
- _isNoWaitIsReceiving = false;
getSession().testQpidException();
}
return result;
}
/**
+ * Request a single message
+ */
+ private void requestOneMessage()
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Requesting a single message");
+ }
+ getSession().getQpidSession()
+ .messageFlow(getMessageActorID(),
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+ getSession().getQpidSession().messageFlush(getMessageActorID());
+ _messageReceived.set(false);
+ }
+
+ /**
* Stop the delivery of messages to this consumer.
* <p>For asynchronous receiver, this operation blocks until the message
listener
* finishes processing the current message,
@@ -500,24 +518,22 @@
// notify the waiting thread
if (_messageListener == null)
{
- System.out.println("Received a message- onMessage in message
consumer Impl");
-
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Received a message- onMessage in message
consumer Impl");
+ }
synchronized (_incomingMessageLock)
{
if (messageOk)
{
- System.out.println("Received a message- onMessage in
message ok " + messageOk);
// we have received a proper message that we can
deliver
if (_isReceiving)
{
- System.out.println("Received a message- onMessage
in message _isReceiving");
-
_incomingMessage = message;
_incomingMessageLock.notify();
}
else
{
- System.out.println("Received a message- onMessage
in message releasing");
// this message has been received after a received
as returned
// we need to release it
releaseMessage(message);
@@ -530,21 +546,7 @@
// then we need to request a new one from the server
if (_isReceiving)
{
- getSession().getQpidSession()
- .messageFlow(getMessageActorID(),
-
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
-
getSession().getQpidSession().messageFlush(getMessageActorID());
- _messageReceived.set(false);
-
- // When sync() returns we know whether we have
received a message or not.
- getSession().getQpidSession().sync();
-
- if (_messageReceived.get() && _isNoWaitIsReceiving)
- {
- // Right a message nowait is waiting for a
message
- // but no one can be delivered it then need to
return
- _incomingMessageLock.notify();
- }
+ _incomingMessageLock.notify();
}
}
}
@@ -582,6 +584,7 @@
**/
try
{
+ message.afterMessageReceive();
_messageListener.onMessage((Message) message);
}
catch (RuntimeException re)
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java?rev=569414&r1=569413&r2=569414&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
Fri Aug 24 08:10:23 2007
@@ -17,13 +17,10 @@
*/
package org.apache.qpidity.jms;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
+import org.apache.qpidity.jms.message.QpidMessage;
+import org.apache.qpidity.jms.message.MessageFactory;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.client.util.MessageListener;
-import org.apache.qpidity.jms.message.MessageFactory;
-import org.apache.qpidity.jms.message.QpidMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,12 +29,8 @@
* This is for guarantying that asynch messages are sequentially processed
within their session.
* <p> when used synchonously, messages are dispatched to the receiver itself.
*/
-public class QpidMessageListener implements MessageListener, Runnable
+public class QpidMessageListener implements MessageListener
{
-
- // temp solution
- LinkedBlockingQueue<Message> _queue = new LinkedBlockingQueue<Message>();
-
/**
* Used for debugging.
*/
@@ -57,53 +50,37 @@
public QpidMessageListener(MessageConsumerImpl consumer)
{
_consumer = consumer;
- Thread t = new Thread(this);
- t.start();
}
-
- public void run()
+
+ //---- org.apache.qpidity.MessagePartListener API
+ /**
+ * Deliver a message to the listener.
+ *
+ * @param message The message delivered to the listner.
+ */
+ public void onMessage(Message message)
{
try
{
- while(true)
+ // to be used with flush
+ _consumer.notifyMessageReceived();
+
+ //convert this message into a JMS one
+ QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
+ // if consumer is asynchronous then send this message to its
session.
+ if( _consumer.getMessageListener() != null )
{
- System.out.println("trying to take a message message");
- Message message = _queue.take();
-
- // to be used with flush
- System.out.println("processing the message");
- _consumer.notifyMessageReceived();
-
- //convert this message into a JMS one
- QpidMessage jmsMessage =
MessageFactory.getQpidMessage(message);
- // if consumer is asynchronous then send this message to its
session.
- if( _consumer.getMessageListener() != null )
- {
-
_consumer.getSession().dispatchMessage(_consumer.getMessageActorID(),
jmsMessage);
- }
- else
- {
- // deliver this message to the consumer itself
- _consumer.onMessage(jmsMessage);
- }
+
_consumer.getSession().dispatchMessage(_consumer.getMessageActorID(),
jmsMessage);
+ }
+ else
+ {
+ // deliver this message to the consumer itself
+ _consumer.onMessage(jmsMessage);
}
}
catch (Exception e)
{
throw new RuntimeException(e.getMessage());
}
- }
-
- //---- org.apache.qpidity.MessagePartListener API
- /**
- * Deliver a message to the listener.
- *
- * @param message The message delivered to the listner.
- */
- public void onMessage(Message message)
- {
- System.out.println("Received a message");
- _queue.offer(message);
- System.out.println("Added queue to the message");
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java?rev=569414&r1=569413&r2=569414&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
Fri Aug 24 08:10:23 2007
@@ -827,13 +827,22 @@
*
* @throws QpidException
*/
+ @Override
public void afterMessageReceive() throws QpidException
{
super.afterMessageReceive();
ByteBuffer messageData = getMessageData();
if (messageData != null)
{
- _dataIn = new DataInputStream(new
ByteArrayInputStream(messageData.array()));
+ try
+ {
+ _dataIn = new DataInputStream(
+ new ByteArrayInputStream(messageData.array(),
messageData.arrayOffset(), messageData.limit()));
+ }
+ catch (Exception e)
+ {
+ throw new QpidException("Cannot retrieve data from message ",
null, e);
+ }
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java?rev=569414&r1=569413&r2=569414&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
Fri Aug 24 08:10:23 2007
@@ -585,6 +585,7 @@
/**
* This method is invoked after this message has been received.
*/
+ @Override
public void afterMessageReceive() throws QpidException
{
super.afterMessageReceive();
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java?rev=569414&r1=569413&r2=569414&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
Fri Aug 24 08:10:23 2007
@@ -887,6 +887,7 @@
*
* @throws QpidException If there is an internal error when procesing this
message.
*/
+ @Override
public void afterMessageReceive() throws QpidException
{
// recreate a destination object for the encoded destination
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java?rev=569414&r1=569413&r2=569414&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/ObjectMessageImpl.java
Fri Aug 24 08:10:23 2007
@@ -149,6 +149,7 @@
*
* @throws QpidException
*/
+ @Override
public void afterMessageReceive() throws QpidException
{
super.afterMessageReceive();
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java?rev=569414&r1=569413&r2=569414&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
Fri Aug 24 08:10:23 2007
@@ -395,7 +395,7 @@
try
{
// set the message data
- _qpidityMessage.clearData();
+ _qpidityMessage.clearData();
if (_logger.isDebugEnabled())
{
_logger.debug("_messageData POS " + _messageData.position());
@@ -429,6 +429,17 @@
{
return _qpidityMessage.getMessageTransferId();
}
+
+ /**
+ * This method is invoked after this message is received.
+ *
+ * @throws QpidException If there is an internal error when procesing this
message.
+ */
+ public void afterMessageReceive() throws QpidException
+ {
+ // do nothing for now
+ }
+
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java?rev=569414&r1=569413&r2=569414&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/TextMessageImpl.java
Fri Aug 24 08:10:23 2007
@@ -110,6 +110,7 @@
/**
* This method is invoked after this message has been received.
*/
+ @Override
public void afterMessageReceive() throws QpidException
{
super.afterMessageReceive();