Author: arnaudsimon
Date: Thu Apr 17 05:44:35 2008
New Revision: 649070
URL: http://svn.apache.org/viewvc?rev=649070&view=rev
Log:
QPID-796 Made max prefetch a connection URL property ("maxprefetch") + use a session-level method (prefetch()) to check it
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
incubator/qpid/trunk/qpid/java/default.testprofile
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=649070&r1=649069&r2=649070&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
Thu Apr 17 05:44:35 2008
@@ -152,6 +152,9 @@
protected AMQConnectionDelegate _delegate;
+ // this connection's maximum number of prefetched messages
+ private long _maxPrefetch;
+
/**
* @param broker brokerdetails
* @param username username
@@ -231,6 +234,17 @@
*/
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration
sslConfig) throws AMQException
{
+ // set this connection maxPrefetch
+ if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null)
+ {
+ _maxPrefetch = Long.parseLong(
connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH));
+ }
+ else
+ {
+ // use the default value set for all connections
+ _maxPrefetch = ClientProperties.MAX_PREFETCH;
+ }
+
_failoverPolicy = new FailoverPolicy(connectionURL);
if
(_failoverPolicy.getCurrentBrokerDetails().getTransport().equals(BrokerDetails.VM))
{
@@ -1178,5 +1192,15 @@
public AMQSession getSession(int channelId)
{
return _sessions.get(channelId);
+ }
+
+ /**
+ * Get the maximum number of messages that this connection can pre-fetch.
+ *
+ * @return The maximum number of messages that this connection can
pre-fetch.
+ */
+ public long getMaxPrefetch()
+ {
+ return _maxPrefetch;
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=649070&r1=649069&r2=649070&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Thu Apr 17 05:44:35 2008
@@ -2339,6 +2339,17 @@
}
}
+ /**
+ * Indicates whether this session's consumers pre-fetch messages.
+ *
+ * @return true if this session's consumers pre-fetch messages, false
otherwise
+ */
+ public boolean prefetch()
+ {
+ return getAMQConnection().getMaxPrefetch() > 0;
+ }
+
+
public abstract void sendSuspendChannel(boolean suspend) throws
AMQException, FailoverException;
/** Responsible for decoding a message fragment and passing it to the
appropriate message consumer. */
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=649070&r1=649069&r2=649070&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Thu Apr 17 05:44:35 2008
@@ -406,7 +406,7 @@
new
MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
consumer.isExclusive() ?
Option.EXCLUSIVE : Option.NO_OPTION);
- if (ClientProperties.MAX_PREFETCH == 0)
+ if (! prefetch())
{
getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(),
MessageFlowMode.CREDIT);
}
@@ -417,12 +417,12 @@
getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
MessageCreditUnit.BYTE, 0xFFFFFFFF);
// We need to sync so that we get notify of an error.
// only if not immediat prefetch
- if(ClientProperties.MAX_PREFETCH > 0 && (consumer.isStrated() ||
_immediatePrefetch))
+ if(prefetch() && (consumer.isStrated() || _immediatePrefetch))
{
// set the flow
getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
MessageCreditUnit.MESSAGE,
- ClientProperties.MAX_PREFETCH);
+ getAMQConnection().getMaxPrefetch());
}
getQpidSession().sync();
getCurrentException();
@@ -531,7 +531,7 @@
//only set if msg list is null
try
{
- if (ClientProperties.MAX_PREFETCH == 0)
+ if (! prefetch())
{
if (consumer.getMessageListener() != null)
{
@@ -543,7 +543,7 @@
{
getQpidSession()
.messageFlow(consumer.getConsumerTag().toString(),
MessageCreditUnit.MESSAGE,
- ClientProperties.MAX_PREFETCH);
+ getAMQConnection().getMaxPrefetch());
}
getQpidSession()
.messageFlow(consumer.getConsumerTag().toString(),
MessageCreditUnit.BYTE, 0xFFFFFFFF);
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=649070&r1=649069&r2=649070&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Thu Apr 17 05:44:35 2008
@@ -141,7 +141,7 @@
}
if (messageOk)
{
- if (isMessageListenerSet() && ClientProperties.MAX_PREFETCH == 0)
+ if (isMessageListenerSet() && ! getSession().prefetch())
{
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
MessageCreditUnit.MESSAGE, 1);
@@ -330,7 +330,7 @@
}
// if we are syncrhonously waiting for a message
// and messages are not prefetched we then need to request another
one
- if(ClientProperties.MAX_PREFETCH == 0)
+ if(! getSession().prefetch())
{
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
MessageCreditUnit.MESSAGE, 1);
@@ -422,7 +422,7 @@
public void setMessageListener(final MessageListener messageListener)
throws JMSException
{
super.setMessageListener(messageListener);
- if (messageListener != null && ClientProperties.MAX_PREFETCH == 0)
+ if (messageListener != null && ! getSession().prefetch())
{
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
MessageCreditUnit.MESSAGE, 1);
@@ -470,17 +470,17 @@
*/
public Object getMessageFromQueue(long l) throws InterruptedException
{
- if (isStrated() && ClientProperties.MAX_PREFETCH == 0 &&
_synchronousQueue.isEmpty())
+ if (isStrated() && ! getSession().prefetch() &&
_synchronousQueue.isEmpty())
{
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
MessageCreditUnit.MESSAGE, 1);
}
- if (ClientProperties.MAX_PREFETCH == 0)
+ if (! getSession().prefetch())
{
_syncReceive.set(true);
}
Object o = super.getMessageFromQueue(l);
- if (ClientProperties.MAX_PREFETCH == 0)
+ if (! getSession().prefetch())
{
_syncReceive.set(false);
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java?rev=649070&r1=649069&r2=649070&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
Thu Apr 17 05:44:35 2008
@@ -32,6 +32,7 @@
*/
public interface ConnectionURL
{
+ public static final String AMQ_MAXPREFETCH = "maxprefetch";
public static final String AMQ_PROTOCOL = "amqp";
public static final String OPTIONS_BROKERLIST = "brokerlist";
public static final String OPTIONS_FAILOVER = "failover";
Modified: incubator/qpid/trunk/qpid/java/default.testprofile
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/default.testprofile?rev=649070&r1=649069&r2=649070&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/default.testprofile (original)
+++ incubator/qpid/trunk/qpid/java/default.testprofile Thu Apr 17 05:44:35 2008
@@ -5,6 +5,7 @@
test.excludes=true
test.excludesfile=${project.root}/08ExcludeList
log=info
+max_prefetch=1000
amqj.logging.level=$log
root.logging.level=$log
log4j.configuration=file://${project.root}/log4j-test.xml