Author: rajith
Date: Sat Aug 25 08:31:30 2007
New Revision: 569688
URL: http://svn.apache.org/viewvc?rev=569688&view=rev
Log:
I provided a fix for the deadlock issue in MessageConsumerImpl.
Here is the deadlock issue
---------------------------
The internal receive thread acquires the _incomingMessageLock and blocks on
sync().
The MINA thread enters onMessage() and blocks while trying to acquire the
_incomingMessageLock.
Since the MINA thread doesn't return, it can't process the execution.complete()
sent by the broker.
Since the execution.complete() doesn't get processed, the sync() doesn't return.
Hence the deadlock: each thread is waiting on the other.
Solution
----------
I rewrote the receive logic to use a LinkedBlockingQueue and to leverage the
application thread that calls the receive methods.
Removed:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java?rev=569688&r1=569687&r2=569688&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
Sat Aug 25 08:31:30 2007
@@ -22,8 +22,8 @@
msg.writeInt(123);
prod.send(msg);
- javax.jms.Message m = cons.receive();
- System.out.println(m);
+ javax.jms.BytesMessage m = (javax.jms.BytesMessage)cons.receive();
+ System.out.println("Data : " + m.readInt());
}
catch(Exception e)
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?rev=569688&r1=569687&r2=569688&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
Sat Aug 25 08:31:30 2007
@@ -17,24 +17,30 @@
*/
package org.apache.qpidity.jms;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
-import org.apache.qpidity.jms.message.QpidMessage;
-import org.apache.qpidity.RangeSet;
-import org.apache.qpidity.QpidException;
import org.apache.qpidity.Option;
-import org.apache.qpidity.filter.MessageFilter;
-import org.apache.qpidity.filter.JMSSelectorFilter;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.RangeSet;
import org.apache.qpidity.client.MessagePartListener;
import org.apache.qpidity.client.util.MessagePartListenerAdapter;
import org.apache.qpidity.exchange.ExchangeDefaults;
-
-import javax.jms.*;
+import org.apache.qpidity.filter.JMSSelectorFilter;
+import org.apache.qpidity.filter.MessageFilter;
+import org.apache.qpidity.jms.message.MessageFactory;
+import org.apache.qpidity.jms.message.QpidMessage;
/**
* Implementation of JMS message consumer
*/
-public class MessageConsumerImpl extends MessageActor implements
MessageConsumer
+public class MessageConsumerImpl extends MessageActor implements
MessageConsumer, org.apache.qpidity.client.util.MessageListener
{
// we can receive up to 100 messages for an asynchronous listener
public static final int MAX_MESSAGE_TRANSFERRED = 100;
@@ -91,9 +97,9 @@
* Nether exceed MAX_MESSAGE_TRANSFERRED
*/
private int _messageAsyncrhonouslyReceived = 0;
-
- private AtomicBoolean _messageReceived = new AtomicBoolean();
-
+
+ private LinkedBlockingQueue<QpidMessage> _queue = new
LinkedBlockingQueue<QpidMessage>();
+
//----- Constructors
/**
* Create a new MessageProducerImpl.
@@ -120,11 +126,8 @@
_subscriptionName = subscriptionName;
_isStopped = getSession().isStopped();
// let's create a message part assembler
- /**
- * A Qpid message listener that pushes messages to this consumer
session when this consumer is
- * asynchronous or directly to this consumer when it is synchronously
accessed.
- */
- MessagePartListener messageAssembler = new
MessagePartListenerAdapter(new QpidMessageListener(this));
+
+ MessagePartListener messageAssembler = new
MessagePartListenerAdapter(this);
if (destination instanceof Queue)
{
@@ -183,10 +186,8 @@
// this will prevent the broker from sending more than one message
// When a messageListener is set the flow will be adjusted.
// until then we assume it's for synchronous message consumption
- getSession().getQpidSession()
- .messageFlow(getMessageActorID(),
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
-
- getSession().getQpidSession().sync();
+ requestCredit(1);
+ requestSync();
// check for an exception
if (getSession().getCurrentException() != null)
{
@@ -266,9 +267,7 @@
getSession().getQpidSession().messageStop(getMessageActorID());
}
_messageAsyncrhonouslyReceived = 0;
- getSession().getQpidSession()
- .messageFlow(getMessageActorID(),
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- MAX_MESSAGE_TRANSFERRED);
+ requestCredit(MAX_MESSAGE_TRANSFERRED);
}
/**
@@ -281,7 +280,28 @@
*/
public Message receive() throws JMSException
{
- return receive(0);
+ // Check if we can get a message immediately
+ Message result;
+ result = receiveNoWait();
+
+ if(result != null)
+ {
+ return result;
+ }
+
+ try
+ {
+ // Now issue a credit and wait for the broker to send a message
+ // IMO no point doing a credit() flush() and sync() in a loop.
+ // This will only overload the broker. After the initial try we
can wait
+ // for the broker to send a message when it gets one
+ requestCredit(1);
+ return (Message)_queue.take();
+ }
+ catch (Exception e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
}
/**
@@ -297,20 +317,35 @@
*/
public Message receive(long timeout) throws JMSException
{
+ checkClosed();
+ checkIfListenerSet();
if (timeout < 0)
{
throw new JMSException("Invalid timeout value: " + timeout);
}
+
Message result;
try
{
- result = internalReceive(timeout);
+ // first check if we have any in the queue already
+ result = (Message)_queue.poll();
+ if(result == null)
+ {
+ requestCredit(1);
+ requestFlush();
+ // We shouldn't do a sync(). Bcos the timeout can happen
+ // before the sync() returns
+ return (Message)_queue.poll(timeout,TimeUnit.MILLISECONDS);
+ }
+ else
+ {
+ return result;
+ }
}
catch (Exception e)
{
throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
- return result;
}
/**
@@ -321,138 +356,56 @@
*/
public Message receiveNoWait() throws JMSException
{
+ checkClosed();
+ checkIfListenerSet();
Message result;
try
{
- result = internalReceive(-1);
+ // first check if we have any in the queue already
+ result = (Message)_queue.poll();
+ if(result == null)
+ {
+ requestCredit(1);
+ requestFlush();
+ requestSync();
+ return (Message)_queue.poll();
+ }
+ else
+ {
+ return result;
+ }
}
catch (Exception e)
{
throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
- return result;
}
-
- // not public methods
-
- /**
- * Receive a synchronous message
- * <p> This call blocks until a message arrives, the timeout expires, or
this message consumer
- * is closed.
- * <p> A timeout of zero never expires, and the call blocks indefinitely
(unless this message consumer
- * is closed)
- * <p> A timeout less than 0 returns the next message or null if one is
not available.
- *
- * @param timeout The timeout value (in milliseconds)
- * @return the next message or null if one is not available.
- * @throws Exception If receiving the next message fails due to some
internal error.
- */
- private Message internalReceive(long timeout) throws Exception
+
+ // not public methods
+ private void requestCredit(int units)
{
- checkNotClosed();
- Message result = null;
-
- if (_messageListener != null)
- {
- throw new javax.jms.IllegalStateException("A listener has already
been set.");
- }
-
- synchronized (_incomingMessageLock)
- {
- // This indicate to the delivery thread to deliver the message to
this consumer
- // as it can happens that a message is delivered after a receive
operation as returned.
- _isReceiving = true;
- boolean blockingReceived = timeout == 0;
- if (!_isStopped)
- {
- // if this consumer is stopped then this will be call when
starting
- requestOneMessage();
- //When sync() returns we know whether we have received a
message or not.
- System.out.println("Internal receive -- Called sync()");
- getSession().getQpidSession().sync();
- System.out.println("Internal receive -- Returned from
sync()");
- }
- if (_messageReceived.get() && timeout < 0)
- {
- // this is a nowait and we havent received a message then we
must immediatly return
- result = null;
- }
- else
- {
- boolean messageReceived = false;
- while (!messageReceived)
- {
- long timeBeforeWait = 0;
- while (_incomingMessage == null && !_isClosed)
- {
- if (!blockingReceived)
- {
- timeBeforeWait = System.currentTimeMillis();
- }
- try
- {
- _incomingMessageLock.wait(timeout);
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- }
- if (_incomingMessage != null)
- {
- result = (Message) _incomingMessage;
- // tell the session that a message is inprocess
- getSession().preProcessMessage(_incomingMessage);
- // tell the session to acknowledge this message (if
required)
- getSession().acknowledgeMessage(_incomingMessage);
- _incomingMessage.afterMessageReceive();
- messageReceived = true;
- }
- else
- {
- //now setup the new timeout
- if (!blockingReceived)
- {
- timeout = timeout - (System.currentTimeMillis() -
timeBeforeWait);
- }
- if (!_isClosed)
- {
- // we need to request a new message
- requestOneMessage();
- getSession().getQpidSession().sync();
- if (_messageReceived.get() && timeout < 0)
- {
- // we are waiting for too long and we haven't
received a proper message
- result = null;
- messageReceived = true;
- }
- }
- }
- }
- _incomingMessage = null;
- }
- // We now release any message received for this consumer
- _isReceiving = false;
- getSession().testQpidException();
- }
- return result;
+ getSession().getQpidSession()
+ .messageFlow(getMessageActorID(),
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, units);
}
-
- /**
- * Request a single message
- */
- private void requestOneMessage()
+
+ private void requestFlush()
+ {
+ getSession().getQpidSession().messageFlush(getMessageActorID());
+ }
+
+ private void requestSync()
+ {
+ getSession().getQpidSession().sync();
+ }
+
+ private void checkClosed() throws JMSException
{
- if (_logger.isDebugEnabled())
+ if(_isStopped)
{
- _logger.debug("Requesting a single message");
+ throw new JMSException("Session is closed");
}
- getSession().getQpidSession()
- .messageFlow(getMessageActorID(),
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
- getSession().getQpidSession().messageFlush(getMessageActorID());
- _messageReceived.set(false);
}
-
+
/**
* Stop the delivery of messages to this consumer.
* <p>For asynchronous receiver, this operation blocks until the message
listener
@@ -475,93 +428,42 @@
{
synchronized (_incomingMessageLock)
{
- _isStopped = false;
- if (_isReceiving)
- {
- // there is a synch call waiting for a message to be delivered
- // so tell the broker to deliver a message
- getSession().getQpidSession()
- .messageFlow(getMessageActorID(),
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- 1);
-
getSession().getQpidSession().messageFlush(getMessageActorID());
- }
+ _isStopped = false;
}
}
- /**
- * Deliver a message to this consumer.
- *
- * @param message The message delivered to this consumer.
- */
- protected synchronized void onMessage(QpidMessage message)
- {
+ public void onMessage(org.apache.qpidity.api.Message message)
+ {
try
{
- // if there is a message selector then we need to evaluate it.
- boolean messageOk = true;
- if (_messageSelector != null)
+ QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
+ if (_messageListener == null)
{
- messageOk = _filter.matches((Message) message);
+ _queue.offer(jmsMessage);
}
-
- System.out.println("Received a message- onMessage in message
consumer Impl");
- if (!messageOk && _preAcquire)
+ else
{
- // this is the case for topics
- // We need to ack this message
- System.out.println("onMessage - trying to ack message");
- acknowledgeMessage(message);
- System.out.println("onMessage - acked message");
- }
- // now we need to acquire this message if needed
- // this is the case of queue with a message selector set
- if (!_preAcquire && messageOk)
- {
- System.out.println("onMessage - trying to acquire message");
- messageOk = acquireMessage(message);
- System.out.println("onMessage - acquired message");
+ // I still think we don't need that additional thread in
SessionImpl
+ // if the Application blocks on a message thats fine
+ // getSession().dispatchMessage(getMessageActorID(),
jmsMessage);
+ notifyMessageListener(jmsMessage);
}
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
+
+ public void notifyMessageListener(QpidMessage message)throws
RuntimeException
+ {
+ try
+ {
+ boolean messageOk = checkPreConditions(message);
- // if this consumer is synchronous then set the current message and
- // notify the waiting thread
- if (_messageListener == null)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Received a message- onMessage in message
consumer Impl");
- }
- synchronized (_incomingMessageLock)
- {
- System.out.println("got incomming message lock");
- if (messageOk)
- {
- // we have received a proper message that we can
deliver
- if (_isReceiving)
- {
- System.out.println("Is receiving true, setting
message and notifying");
- _incomingMessage = message;
- _incomingMessageLock.notify();
- }
- else
- {
- // this message has been received after a received
as returned
- // we need to release it
- releaseMessage(message);
- }
- }
- else
- {
- // oups the message did not match the selector or we
did not manage to acquire it
- // If the receiver is still waiting for a message
- // then we need to request a new one from the server
- if (_isReceiving)
- {
- _incomingMessageLock.notify();
- }
- }
- }
- }
- else
+ // only deliver the message if it is valid
+ if (messageOk)
{
_messageAsyncrhonouslyReceived++;
if (_messageAsyncrhonouslyReceived >= MAX_MESSAGE_TRANSFERRED)
@@ -569,56 +471,93 @@
// ask the server for the delivery of
MAX_MESSAGE_TRANSFERRED more messages
resetAsynchMessageReceived();
}
- // only deliver the message if it is valid
- if (messageOk)
+
+ preApplicationProcessing(message);
+ // The JMS specs says:
+ /* The result of a listener throwing a RuntimeException
depends on the session?s
+ * acknowledgment mode.
+ ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message
+ * will be immediately redelivered. The number of times a JMS
provider will
+ * redeliver the same message before giving up is
provider-dependent.
+ ? --- CLIENT_ACKNOWLEDGE - the next message for the listener
is delivered.
+ * --- Transacted Session - the next message for the listener
is delivered.
+ *
+ * The number of time we try redelivering the message is 0
+ **/
+ try
+ {
+
+ _messageListener.onMessage((Message) message);
+ }
+ catch (RuntimeException re)
{
- // This is an asynchronous message
- // tell the session that a message is in process
- getSession().preProcessMessage(message);
- // If the session is transacted we need to ack the message
first
- // This is because a message is associated with its tx
only when acked
- if (getSession().getTransacted())
- {
- getSession().acknowledgeMessage(message);
- }
- // The JMS specs says:
- /* The result of a listener throwing a RuntimeException
depends on the session?s
- * acknowledgment mode.
- ? --- AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE - the message
- * will be immediately redelivered. The number of times a
JMS provider will
- * redeliver the same message before giving up is
provider-dependent.
- ? --- CLIENT_ACKNOWLEDGE - the next message for the
listener is delivered.
- * --- Transacted Session - the next message for the
listener is delivered.
- *
- * The number of time we try redelivering the message is 0
- **/
- try
- {
- message.afterMessageReceive();
- _messageListener.onMessage((Message) message);
- }
- catch (RuntimeException re)
- {
- // do nothing as this message will not be redelivered
- }
- // If the session has been recovered we then need to
redelivered this message
- if (getSession().isInRecovery())
- {
- releaseMessage(message);
- }
- else if (!getSession().getTransacted())
- {
- // Tell the jms Session to ack this message if required
- getSession().acknowledgeMessage(message);
- }
+ // do nothing as this message will not be redelivered
}
}
+
}
catch (Exception e)
{
throw new RuntimeException(e.getMessage());
}
}
+
+ private void checkIfListenerSet() throws javax.jms.IllegalStateException
+ {
+
+ if (_messageListener != null)
+ {
+ throw new javax.jms.IllegalStateException("A listener has already
been set.");
+ }
+ }
+
+ private void preApplicationProcessing(QpidMessage message)throws Exception
+ {
+ getSession().preProcessMessage(message);
+ // If the session is transacted we need to ack the message first
+ // This is because a message is associated with its tx only when acked
+ if (getSession().getTransacted())
+ {
+ getSession().acknowledgeMessage(message);
+ }
+ message.afterMessageReceive();
+ }
+
+ private boolean checkPreConditions(QpidMessage message)throws QpidException
+ {
+ boolean messageOk = true;
+ if (_messageSelector != null)
+ {
+ messageOk = _filter.matches((Message) message);
+ if (!messageOk)
+ {
+ System.out.println("Message not OK, releasing");
+ releaseMessage(message);
+ }
+ }
+
+ System.out.println("messageOk " + messageOk);
+ System.out.println("_preAcquire " + _preAcquire);
+
+ if (!messageOk && _preAcquire)
+ {
+ // this is the case for topics
+ // We need to ack this message
+ System.out.println("filterMessage - trying to ack message");
+ acknowledgeMessage(message);
+ System.out.println("filterMessage - acked message");
+ }
+ // now we need to acquire this message if needed
+ // this is the case of queue with a message selector set
+ if (!_preAcquire && messageOk)
+ {
+ System.out.println("filterMessage - trying to acquire message");
+ messageOk = acquireMessage(message);
+ System.out.println("filterMessage - acquired message");
+ }
+
+ return messageOk;
+ }
/**
* Release a message
@@ -680,10 +619,5 @@
getSession().getQpidSession().messageAcknowledge(ranges);
getSession().testQpidException();
}
- }
-
- public void notifyMessageReceived()
- {
- _messageReceived.set(true);
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java?rev=569688&r1=569687&r2=569688&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
Sat Aug 25 08:31:30 2007
@@ -1301,7 +1301,8 @@
{
try
{
- mc.onMessage(message.getMessage());
+ // mc.onMessage(message.getMessage());
+ mc.notifyMessageListener(message.getMessage());
}
catch (RuntimeException t)
{