Author: arnaudsimon
Date: Thu Apr 17 05:44:35 2008
New Revision: 649070

URL: http://svn.apache.org/viewvc?rev=649070&view=rev
Log:
QPID-796: Made max prefetch a connection URL property (maxprefetch) and switched to a session-level prefetch() method

Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
    incubator/qpid/trunk/qpid/java/default.testprofile

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=649070&r1=649069&r2=649070&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 Thu Apr 17 05:44:35 2008
@@ -152,6 +152,9 @@
 
     protected AMQConnectionDelegate _delegate;
 
+    // this connection's maximum number of prefetched messages
+    private long _maxPrefetch;
+
     /**
      * @param broker      brokerdetails
      * @param username    username
@@ -231,6 +234,17 @@
      */
     public AMQConnection(ConnectionURL connectionURL, SSLConfiguration 
sslConfig) throws AMQException
     {
+        // set this connection's maxPrefetch
+        if (connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH) != null)
+        {
+            _maxPrefetch = Long.parseLong( 
connectionURL.getOption(ConnectionURL.AMQ_MAXPREFETCH));
+        }
+        else
+        {
+            // use the default value set for all connections
+            _maxPrefetch = ClientProperties.MAX_PREFETCH;
+        }
+
         _failoverPolicy = new FailoverPolicy(connectionURL);
         if 
(_failoverPolicy.getCurrentBrokerDetails().getTransport().equals(BrokerDetails.VM))
         {
@@ -1178,5 +1192,15 @@
     public AMQSession getSession(int channelId)
     {
         return _sessions.get(channelId);
+    }
+
+    /**
+     * Get the maximum number of messages that this connection can pre-fetch.
+     *
+     * @return The maximum number of messages that this connection can pre-fetch.
+     */
+    public long getMaxPrefetch()
+    {
+       return _maxPrefetch;
     }
 }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=649070&r1=649069&r2=649070&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Thu Apr 17 05:44:35 2008
@@ -2339,6 +2339,17 @@
         }
     }
 
+    /**
+     * Indicates whether this session's consumers pre-fetch messages.
+     *
+     * @return true if this session's consumers pre-fetch messages, false otherwise
+     */
+    public boolean prefetch()
+    {
+        return getAMQConnection().getMaxPrefetch() > 0;
+    }
+
+
     public abstract void sendSuspendChannel(boolean suspend) throws 
AMQException, FailoverException;
 
     /** Responsible for decoding a message fragment and passing it to the 
appropriate message consumer. */

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=649070&r1=649069&r2=649070&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Thu Apr 17 05:44:35 2008
@@ -406,7 +406,7 @@
                                           new 
MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
                                           consumer.isExclusive() ? 
Option.EXCLUSIVE : Option.NO_OPTION);
 
-        if (ClientProperties.MAX_PREFETCH == 0)
+        if (! prefetch())
         {
             
getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), 
MessageFlowMode.CREDIT);
         }
@@ -417,12 +417,12 @@
         getQpidSession().messageFlow(consumer.getConsumerTag().toString(), 
MessageCreditUnit.BYTE, 0xFFFFFFFF);
         // We need to sync so that we get notify of an error.
         // only if not immediat prefetch
-        if(ClientProperties.MAX_PREFETCH > 0 && (consumer.isStrated() || 
_immediatePrefetch))
+        if(prefetch() && (consumer.isStrated() || _immediatePrefetch))
         {
             // set the flow
             getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
                                          MessageCreditUnit.MESSAGE,
-                                         ClientProperties.MAX_PREFETCH);
+                                         getAMQConnection().getMaxPrefetch());
         }
         getQpidSession().sync();
         getCurrentException();
@@ -531,7 +531,7 @@
                 //only set if msg list is null
                 try
                 {
-                    if (ClientProperties.MAX_PREFETCH == 0)
+                    if (! prefetch())
                     {
                         if (consumer.getMessageListener() != null)
                         {
@@ -543,7 +543,7 @@
                     {
                         getQpidSession()
                             .messageFlow(consumer.getConsumerTag().toString(), 
MessageCreditUnit.MESSAGE,
-                                         ClientProperties.MAX_PREFETCH);
+                                         getAMQConnection().getMaxPrefetch());
                     }
                     getQpidSession()
                         .messageFlow(consumer.getConsumerTag().toString(), 
MessageCreditUnit.BYTE, 0xFFFFFFFF);

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=649070&r1=649069&r2=649070&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 Thu Apr 17 05:44:35 2008
@@ -141,7 +141,7 @@
         }
         if (messageOk)
         {
-            if (isMessageListenerSet() && ClientProperties.MAX_PREFETCH == 0)
+            if (isMessageListenerSet() && ! getSession().prefetch())
             {
                 
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
                                                           
MessageCreditUnit.MESSAGE, 1);
@@ -330,7 +330,7 @@
             }
             // if we are syncrhonously waiting for a message
             // and messages are not prefetched we then need to request another 
one
-            if(ClientProperties.MAX_PREFETCH == 0)
+            if(! getSession().prefetch())
             {
                
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
                                                          
MessageCreditUnit.MESSAGE, 1);
@@ -422,7 +422,7 @@
     public void setMessageListener(final MessageListener messageListener) 
throws JMSException
     {
         super.setMessageListener(messageListener);
-        if (messageListener != null && ClientProperties.MAX_PREFETCH == 0)
+        if (messageListener != null && ! getSession().prefetch())
         {
             
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
                                                       
MessageCreditUnit.MESSAGE, 1);
@@ -470,17 +470,17 @@
      */
     public Object getMessageFromQueue(long l) throws InterruptedException
     {
-        if (isStrated() && ClientProperties.MAX_PREFETCH == 0 && 
_synchronousQueue.isEmpty())
+        if (isStrated() && ! getSession().prefetch() && 
_synchronousQueue.isEmpty())
         {
             
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
                                                       
MessageCreditUnit.MESSAGE, 1);
         }
-        if (ClientProperties.MAX_PREFETCH == 0)
+        if (! getSession().prefetch())
         {
             _syncReceive.set(true);
         }
         Object o = super.getMessageFromQueue(l);
-        if (ClientProperties.MAX_PREFETCH == 0)
+        if (! getSession().prefetch())
         {
             _syncReceive.set(false);
         }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java?rev=649070&r1=649069&r2=649070&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
 Thu Apr 17 05:44:35 2008
@@ -32,6 +32,7 @@
   */
 public interface ConnectionURL
 {
+    public static final String AMQ_MAXPREFETCH = "maxprefetch";
     public static final String AMQ_PROTOCOL = "amqp";
     public static final String OPTIONS_BROKERLIST = "brokerlist";
     public static final String OPTIONS_FAILOVER = "failover";

Modified: incubator/qpid/trunk/qpid/java/default.testprofile
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/default.testprofile?rev=649070&r1=649069&r2=649070&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/default.testprofile (original)
+++ incubator/qpid/trunk/qpid/java/default.testprofile Thu Apr 17 05:44:35 2008
@@ -5,6 +5,7 @@
 test.excludes=true
 test.excludesfile=${project.root}/08ExcludeList
 log=info
+max_prefetch=1000
 amqj.logging.level=$log
 root.logging.level=$log
 log4j.configuration=file://${project.root}/log4j-test.xml


Reply via email to