Author: arnaudsimon
Date: Tue Sep 18 11:33:39 2007
New Revision: 577011
URL: http://svn.apache.org/viewvc?rev=577011&view=rev
Log:
added message selector evaluation (for 0_10 only)
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=577011&r1=577010&r2=577011&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Tue Sep 18 11:33:39 2007
@@ -1552,7 +1552,7 @@
public abstract BasicMessageConsumer createMessageConsumer(final
AMQDestination destination, final int prefetchHigh,
final int prefetchLow, final boolean noLocal, final boolean
exclusive, String selector, final FieldTable rawSelector,
- final boolean noConsume, final boolean autoClose);
+ final boolean noConsume, final boolean autoClose) throws
JMSException;
/**
* Called by the MessageConsumer when closing, to deregister the consumer
from the map from consumerTag to consumer
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=577011&r1=577010&r2=577011&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Tue Sep 18 11:33:39 2007
@@ -79,6 +79,7 @@
* @param messageFactoryRegistry The message factory factory for the
session.
* @param defaultPrefetchHighMark The maximum number of messages to
prefetched before suspending the session.
* @param defaultPrefetchLowMark The number of prefetched messages at
which to resume the session.
+ * @param qpidConnection The qpid connection
*/
AMQSession_0_10(org.apache.qpidity.client.Connection qpidConnection,
AMQConnection con, int channelId,
boolean transacted, int acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry,
@@ -107,6 +108,7 @@
* @param acknowledgeMode The acknoledgement mode for the session.
* @param defaultPrefetchHigh The maximum number of messages to prefetched
before suspending the session.
* @param defaultPrefetchLow The number of prefetched messages at which
to resume the session.
+ * @param qpidConnection The connection
*/
AMQSession_0_10(org.apache.qpidity.client.Connection qpidConnection,
AMQConnection con, int channelId,
boolean transacted, int acknowledgeMode, int
defaultPrefetchHigh, int defaultPrefetchLow)
@@ -276,7 +278,7 @@
final int prefetchLow,
final boolean noLocal,
final boolean exclusive,
String messageSelector,
final FieldTable ft,
final boolean noConsume,
- final boolean autoClose)
+ final boolean autoClose)
throws JMSException
{
final AMQProtocolHandler protocolHandler = getProtocolHandler();
@@ -304,8 +306,18 @@
boolean nowait, String messageSelector,
AMQShortString tag)
throws AMQException, FailoverException
{
+ boolean preAcquire;
+ try
+ {
+ preAcquire = consumer.getMessageSelector() == null ||
!(consumer.getDestination() instanceof AMQQueue);
+ }
+ catch (JMSException e)
+ {
+ throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when
registering consumer", e);
+ }
getQpidSession().messageSubscribe(queueName.toString(),
tag.toString(), Session.TRANSFER_CONFIRM_MODE_REQUIRED,
-
Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
+ preAcquire ?
Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE :
+
Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE,
new
MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
consumer.isNoLocal() ?
Option.NO_LOCAL : Option.NO_OPTION,
consumer.isExclusive() ?
Option.EXCLUSIVE : Option.NO_OPTION);
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=577011&r1=577010&r2=577011&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
Tue Sep 18 11:33:39 2007
@@ -327,7 +327,7 @@
public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination
destination, final int prefetchHigh,
final int prefetchLow, final boolean noLocal, final boolean
exclusive, String messageSelector, final FieldTable ft,
- final boolean noConsume, final boolean autoClose)
+ final boolean noConsume, final boolean autoClose) throws
JMSException
{
final AMQProtocolHandler protocolHandler = getProtocolHandler();
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=577011&r1=577010&r2=577011&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Tue Sep 18 11:33:39 2007
@@ -27,10 +27,15 @@
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.transport.Struct;
import org.apache.qpidity.transport.ExchangeQueryResult;
import org.apache.qpidity.transport.Future;
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.filter.MessageFilter;
+import org.apache.qpidity.filter.JMSSelectorFilter;
import javax.jms.JMSException;
import java.io.IOException;
@@ -47,51 +52,105 @@
*/
protected final Logger _logger = LoggerFactory.getLogger(getClass());
+ /**
+ * The message selector filter associated with this consumer message
selector
+ */
+ private MessageFilter _filter = null;
+
+ /**
+ * The underlying QpidSession
+ */
+ private AMQSession_0_10 _0_10session;
+
+ /**
+ * Indicates whether this consumer receives pre-acquired messages
+ */
+ private boolean _preAcquire = true;
+
+ //--- constructor
protected BasicMessageConsumer_0_10(int channelId, AMQConnection
connection, AMQDestination destination,
String messageSelector, boolean
noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler
protocolHandler,
FieldTable rawSelectorFieldTable, int
prefetchHigh, int prefetchLow,
boolean exclusive, int
acknowledgeMode, boolean noConsume, boolean autoClose)
+ throws JMSException
{
super(channelId, connection, destination, messageSelector, noLocal,
messageFactory, session, protocolHandler,
rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive,
acknowledgeMode, noConsume, autoClose);
-
+ _0_10session = (AMQSession_0_10) session;
+ if (messageSelector != null)
+ {
+ try
+ {
+ _filter = new JMSSelectorFilter(messageSelector);
+ }
+ catch (QpidException e)
+ {
+ throw new JMSException("cannot create consumer because of
selector issue");
+ }
+ if (destination instanceof AMQQueue)
+ {
+ _preAcquire = false;
+ }
+ }
}
// ----- Interface org.apache.qpidity.client.util.MessageListener
public void onMessage(Message message)
{
- int channelId = getSession().getChannelId();
- long deliveryId = message.getMessageTransferId();
- String consumerTag = getConsumerTag().toString();
- AMQShortString exchange = new
AMQShortString(message.getDeliveryProperties().getExchange());
- AMQShortString routingKey = new
AMQShortString(message.getDeliveryProperties().getRoutingKey());
- boolean redelivered = message.getDeliveryProperties().getRedelivered();
- UnprocessedMessage_0_10 newMessage =
- new UnprocessedMessage_0_10(channelId, deliveryId,
consumerTag, exchange, routingKey, redelivered);
+ boolean messageOk = false;
try
{
- newMessage.receiveBody(message.readData());
+ messageOk = checkPreConditions(message);
}
- catch (IOException e)
+ catch (AMQException e)
{
- getSession().getAMQConnection().exceptionReceived(e);
+ try
+ {
+ getSession().getAMQConnection().getExceptionListener()
+ .onException(new JMSAMQException("Error when receiving
message", e));
+ }
+ catch (JMSException e1)
+ {
+ // we should silently log thie exception as it only hanppens
when the connection is closed
+ _logger.error("Exception when receiving message", e1);
+ }
}
- Struct[] headers = {message.getMessageProperties(),
message.getDeliveryProperties()};
- // if there is a replyto destination then we need to request the
exchange info
- if (!
message.getMessageProperties().getReplyTo().getExchangeName().equals(""))
- {
- Future<ExchangeQueryResult> future = ((AMQSession_0_10)
getSession()).getQpidSession()
-
.exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());
- ExchangeQueryResult res = future.get();
- //
<exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
- String replyToUrl = res.getType() + "://" +
message.getMessageProperties().getReplyTo()
- .getExchangeName() + "/" +
message.getMessageProperties().getReplyTo()
- .getRoutingKey() + "/" +
message.getMessageProperties().getReplyTo().getRoutingKey();
- newMessage.setReplyToURL(replyToUrl);
+ if (messageOk)
+ {
+ int channelId = getSession().getChannelId();
+ long deliveryId = message.getMessageTransferId();
+ String consumerTag = getConsumerTag().toString();
+ AMQShortString exchange = new
AMQShortString(message.getDeliveryProperties().getExchange());
+ AMQShortString routingKey = new
AMQShortString(message.getDeliveryProperties().getRoutingKey());
+ boolean redelivered =
message.getDeliveryProperties().getRedelivered();
+ UnprocessedMessage_0_10 newMessage =
+ new UnprocessedMessage_0_10(channelId, deliveryId,
consumerTag, exchange, routingKey, redelivered);
+ try
+ {
+ newMessage.receiveBody(message.readData());
+ }
+ catch (IOException e)
+ {
+ getSession().getAMQConnection().exceptionReceived(e);
+ }
+ Struct[] headers = {message.getMessageProperties(),
message.getDeliveryProperties()};
+ // if there is a replyto destination then we need to request the
exchange info
+ if
(!message.getMessageProperties().getReplyTo().getExchangeName().equals(""))
+ {
+ Future<ExchangeQueryResult> future = ((AMQSession_0_10)
getSession()).getQpidSession()
+
.exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());
+ ExchangeQueryResult res = future.get();
+ //
<exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+ String replyToUrl = res.getType() + "://" +
message.getMessageProperties().getReplyTo()
+ .getExchangeName() + "/" +
message.getMessageProperties().getReplyTo()
+ .getRoutingKey() + "/" +
message.getMessageProperties().getReplyTo().getRoutingKey();
+ newMessage.setReplyToURL(replyToUrl);
+ }
+ newMessage.setContentHeader(headers);
+ getSession().messageReceived(newMessage);
}
- newMessage.setContentHeader(headers);
- getSession().messageReceived(newMessage);
+ // else ignore this message
}
//----- overwritten methods
@@ -130,6 +189,128 @@
{
return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
messageFrame.isRedelivered(),
messageFrame.getExchange(),
messageFrame.getRoutingKey(),
- messageFrame.getContentHeader(),
messageFrame.getBodies(), messageFrame.getReplyToURL());
+ messageFrame.getContentHeader(),
messageFrame.getBodies(),
+ messageFrame.getReplyToURL());
+ }
+
+ // private methods
+ /**
+ * Check whether a message can be delivered to this consumer.
+ *
+ * @param message The message to be checked.
+ * @return true if the message matches the selector and can be acquired,
false otherwise.
+ * @throws AMQException If the message preConditions cannot be checked due
to some internal error.
+ */
+ private boolean checkPreConditions(Message message) throws AMQException
+ {
+ boolean messageOk = true;
+ // TODO Use a tag for fiding out if message filtering is done here or
by the broker.
+ try
+ {
+ if (getMessageSelector() != null)
+ {
+ messageOk = _filter.matches((javax.jms.Message) message);
+ }
+ }
+ catch (Exception e)
+ {
+ throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error when
evaluating message selector", e);
+ }
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("messageOk " + messageOk);
+ _logger.debug("_preAcquire " + _preAcquire);
+ }
+ if (!messageOk && _preAcquire)
+ {
+ // this is the case for topics
+ // We need to ack this message
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("filterMessage - trying to ack message");
+ }
+ acknowledgeMessage(message);
+ }
+ else if (!messageOk)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Message not OK, releasing");
+ }
+ releaseMessage(message);
+ }
+ // now we need to acquire this message if needed
+ // this is the case of queue with a message selector set
+ if (!_preAcquire && messageOk)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("filterMessage - trying to acquire message");
+ }
+ messageOk = acquireMessage(message);
+ }
+ return messageOk;
+ }
+
+ /**
+ * Acknowledge a message
+ *
+ * @param message The message to be acknowledged
+ * @throws AMQException If the message cannot be acquired due to some
internal error.
+ */
+ private void acknowledgeMessage(Message message) throws AMQException
+ {
+ if (!_preAcquire)
+ {
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageTransferId());
+ _0_10session.getQpidSession().messageAcknowledge(ranges);
+ _0_10session.getCurrentException();
+ }
+ }
+
+ /**
+ * Release a message
+ *
+ * @param message The message to be released
+ * @throws AMQException If the message cannot be released due to some
internal error.
+ */
+ private void releaseMessage(Message message) throws AMQException
+ {
+ if (_preAcquire)
+ {
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageTransferId());
+ _0_10session.getQpidSession().messageRelease(ranges);
+ _0_10session.getCurrentException();
+ }
+ }
+
+ /**
+ * Acquire a message
+ *
+ * @param message The message to be acquired
+ * @return true if the message has been acquired, false otherwise.
+ * @throws AMQException If the message cannot be acquired due to some
internal error.
+ */
+ private boolean acquireMessage(Message message) throws AMQException
+ {
+ boolean result = false;
+ if (!_preAcquire)
+ {
+ RangeSet ranges = new RangeSet();
+ ranges.add(message.getMessageTransferId());
+
+ _0_10session.getQpidSession()
+ .messageAcquire(ranges,
org.apache.qpidity.client.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE);
+ _0_10session.getQpidSession().sync();
+ RangeSet acquired =
_0_10session.getQpidSession().getAccquiredMessages();
+ if (acquired.size() > 0)
+ {
+ result = true;
+ }
+ _0_10session.getCurrentException();
+ }
+ return result;
}
}