Author: arnaudsimon
Date: Fri Apr  4 05:02:52 2008
New Revision: 644688

URL: http://svn.apache.org/viewvc?rev=644688&view=rev
Log:
QPID-796: Added ability to enable/disable message prefetching. Prefetching is 
controlled through the max_prefetch property; it is turned off when 
max_prefetch=0. (This is a 0.10 code path change.)

Added:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java
Modified:
    incubator/qpid/trunk/qpid/java/010ExcludeList
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    incubator/qpid/trunk/qpid/java/cpp.async.testprofile
    incubator/qpid/trunk/qpid/java/cpp.sync.testprofile
    incubator/qpid/trunk/qpid/java/module.xml

Modified: incubator/qpid/trunk/qpid/java/010ExcludeList
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/010ExcludeList?rev=644688&r1=644687&r2=644688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/010ExcludeList (original)
+++ incubator/qpid/trunk/qpid/java/010ExcludeList Fri Apr  4 05:02:52 2008
@@ -5,5 +5,6 @@
 org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveBoth
 org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash
 org.apache.qpid.test.unit.xa.TopicTest#testMigrateDurableSubscriber
-// this test needs durable subscribe states to be persisted
-org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubRestoredAfterNonPersistentMessageSent
\ No newline at end of file
+// those tests need durable subscribe states to be persisted
+org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubRestoredAfterNonPersistentMessageSent
+org.apache.qpid.test.unit.ct.DurableSubscriberTests#testDurSubRestoresMessageSelector
 
\ No newline at end of file

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=644688&r1=644687&r2=644688&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Fri Apr  4 05:02:52 2008
@@ -27,7 +27,6 @@
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.FiledTableSupport;
-import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpidity.nclient.Session;
 import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
 import org.apache.qpidity.ErrorCode;
@@ -45,7 +44,6 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.UUID;
 import java.util.Map;
-import java.util.Iterator;
 
 /**
  * This is a 0.10 Session
@@ -58,10 +56,6 @@
      */
     private static final Logger _logger = 
LoggerFactory.getLogger(AMQSession_0_10.class);
 
-    /**
-     * The maximum number of pre-fetched messages per destination
-     */
-    public static long MAX_PREFETCH = 1000;
 
     /**
      * The underlying QpidSession
@@ -101,8 +95,6 @@
         super(con, channelId, transacted, acknowledgeMode, 
messageFactoryRegistry, defaultPrefetchHighMark,
               defaultPrefetchLowMark);
 
-        MAX_PREFETCH = 
Integer.parseInt(System.getProperty("max_prefetch","1000"));
-
         // create the qpid session with an expiry  <= 0 so that the session 
does not expire
         _qpidSession = qpidConnection.createSession(0);
         // set the exception listnere for this session
@@ -404,18 +396,23 @@
                                           new 
MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
                                           consumer.isNoLocal() ? 
Option.NO_LOCAL : Option.NO_OPTION,
                                           consumer.isExclusive() ? 
Option.EXCLUSIVE : Option.NO_OPTION);
-
-        getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), 
Session.MESSAGE_FLOW_MODE_WINDOW);
+        if (ClientProperties.MAX_PREFETCH == 0)
+        {
+            
getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), 
Session.MESSAGE_FLOW_MODE_CREDIT);
+        }
+        else
+        {
+            
getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), 
Session.MESSAGE_FLOW_MODE_WINDOW);
+        }
         getQpidSession().messageFlow(consumer.getConsumerTag().toString(), 
Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
         // We need to sync so that we get notify of an error.
         // only if not immediat prefetch
-        if(consumer.isStrated() || _immediatePrefetch)
+        if(ClientProperties.MAX_PREFETCH > 0 && (consumer.isStrated() || 
_immediatePrefetch))
         {
             // set the flow
             getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
                     
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
-                    AMQSession_0_10.MAX_PREFETCH);
-
+                    ClientProperties.MAX_PREFETCH);
         }
         getQpidSession().sync();
         getCurrentException();
@@ -517,17 +514,27 @@
                 //only set if msg list is null
                 try
                 {
-                 //   if (consumer.getMessageListener() != null)
-                 //   {
-                        
getQpidSession().messageFlow(consumer.getConsumerTag().toString(), 
Session.MESSAGE_FLOW_UNIT_MESSAGE,
-                                                     MAX_PREFETCH);
-                  //  }
+                    if (ClientProperties.MAX_PREFETCH == 0)
+                    {
+                        if (consumer.getMessageListener() != null)
+                        {
+                            
getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
+                                    Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+                        }
+                    }
+                    else
+                    {
+                        getQpidSession()
+                                
.messageFlow(consumer.getConsumerTag().toString(), 
Session.MESSAGE_FLOW_UNIT_MESSAGE,
+                                        ClientProperties.MAX_PREFETCH);
+                    }
                     getQpidSession()
-                    .messageFlow(consumer.getConsumerTag().toString(), 
Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
+                            .messageFlow(consumer.getConsumerTag().toString(), 
Session.MESSAGE_FLOW_UNIT_BYTE,
+                                    0xFFFFFFFF);
                 }
-                catch(Exception e)
+                catch (Exception e)
                 {
-                    throw new AMQException(AMQConstant.INTERNAL_ERROR,"Error 
while trying to get the listener",e);
+                    throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error 
while trying to get the listener", e);
                 }
             }
         }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=644688&r1=644687&r2=644688&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 Fri Apr  4 05:02:52 2008
@@ -27,7 +27,6 @@
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpidity.api.Message;
-import org.apache.qpidity.nclient.Session;
 import org.apache.qpidity.transport.*;
 import org.apache.qpidity.QpidException;
 import org.apache.qpidity.filter.MessageFilter;
@@ -39,6 +38,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * This is a 0.10 message consumer.
@@ -72,6 +72,11 @@
      */
     private boolean _isStarted = false;
 
+    /**
+     * Specify whether this consumer is performing a sync receive
+     */
+    private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
+
     //--- constructor
     protected BasicMessageConsumer_0_10(int channelId, AMQConnection 
connection, AMQDestination destination,
                                         String messageSelector, boolean 
noLocal, MessageFactoryRegistry messageFactory,
@@ -136,6 +141,11 @@
         }
         if (messageOk)
         {
+            if (isMessageListenerSet() && ClientProperties.MAX_PREFETCH == 0)
+            {
+                
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+                        
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+            }
             _logger.debug("messageOk, trying to notify");
             super.notifyMessage(jmsMessage, channelId);
         }
@@ -307,23 +317,33 @@
             _logger.debug("messageOk " + messageOk);
             _logger.debug("_preAcquire " + _preAcquire);
         }
-        if (!messageOk && _preAcquire)
+        if (!messageOk)
         {
-            // this is the case for topics
-            // We need to ack this message
-            if (_logger.isDebugEnabled())
+            if (_preAcquire)
             {
-                _logger.debug("filterMessage - trying to ack message");
+                // this is the case for topics
+                // We need to ack this message
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("filterMessage - trying to ack message");
+                }
+                acknowledgeMessage(message);
             }
-            acknowledgeMessage(message);
-        }
-        else if (!messageOk)
-        {
-            if (_logger.isDebugEnabled())
+            else
             {
-                _logger.debug("Message not OK, releasing");
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("Message not OK, releasing");
+                }
+                releaseMessage(message);
+            }
+            // if we are syncrhonously waiting for a message
+            // and messages are not prefetched we then need to request another 
one
+            if(ClientProperties.MAX_PREFETCH == 0)
+            {
+               
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+                    
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
             }
-            releaseMessage(message);
         }
         // now we need to acquire this message if needed
         // this is the case of queue with a message selector set
@@ -429,6 +449,11 @@
     public void setMessageListener(final MessageListener messageListener) 
throws JMSException
     {
         super.setMessageListener(messageListener);
+        if (messageListener != null && ClientProperties.MAX_PREFETCH == 0)
+        {
+            
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+                    
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+        }
         if (messageListener != null && !_synchronousQueue.isEmpty())
         {
             Iterator messages=_synchronousQueue.iterator();
@@ -449,11 +474,44 @@
     public void start()
     {
         _isStarted = true;
+        if (_syncReceive.get())
+        {
+            
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+                    
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+        }
     }
 
     public void stop()
     {
         _isStarted = false;
+    }
+
+    /**
+     * When messages are not prefetched we need to request a message from the
+     * broker.
+     * Note that if the timeout is too short a message may be queued in 
_synchronousQueue until
+     * this consumer closes or request it.
+     * @param l
+     * @return
+     * @throws InterruptedException
+     */
+    public Object getMessageFromQueue(long l) throws InterruptedException
+    {
+        if (isStrated() && ClientProperties.MAX_PREFETCH == 0 && 
_synchronousQueue.isEmpty())
+        {
+            
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+                    
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+        }
+        if (ClientProperties.MAX_PREFETCH == 0)
+        {
+            _syncReceive.set(true);
+        }
+        Object o = super.getMessageFromQueue(l);
+        if (ClientProperties.MAX_PREFETCH == 0)
+        {
+            _syncReceive.set(false);
+        }
+        return o;
     }
 
 }

Added: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java?rev=644688&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java
 Fri Apr  4 05:02:52 2008
@@ -0,0 +1,36 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.client;
+
+/**
+ * This class centralized the Qpid client properties.
+ */
+public class ClientProperties
+{
+
+    /**
+     * The maximum number of pre-fetched messages per destination
+     */
+    public static long MAX_PREFETCH = 
Long.valueOf(System.getProperties().getProperty("max_prefetch", "1000"));
+
+    /**
+     * When true a sync command is sent after every persistent messages. 
+     */
+    public static boolean FULLY_SYNC = Boolean.getBoolean("fully_sync");
+}

Modified: incubator/qpid/trunk/qpid/java/cpp.async.testprofile
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cpp.async.testprofile?rev=644688&r1=644687&r2=644688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/cpp.async.testprofile (original)
+++ incubator/qpid/trunk/qpid/java/cpp.async.testprofile Fri Apr  4 05:02:52 
2008
@@ -2,6 +2,7 @@
 broker=${project.root}/../cpp/src/qpidd --data-dir ${build.data} -t 
--log-output ${build.data}/broker.log --load-module 
${project.root}/../../cppStore/cpp/lib/.libs/libbdbstore.so --store-async yes
 broker.clean=${build.data}
 java.naming.provider.url=${project.root}/test-provider.properties
+max_prefetch=1000
 test.excludes=true
 test.excludesfile=${project.root}/010ExcludeList
 log=info

Modified: incubator/qpid/trunk/qpid/java/cpp.sync.testprofile
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cpp.sync.testprofile?rev=644688&r1=644687&r2=644688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/cpp.sync.testprofile (original)
+++ incubator/qpid/trunk/qpid/java/cpp.sync.testprofile Fri Apr  4 05:02:52 2008
@@ -3,6 +3,7 @@
 broker.clean=${build.data}
 java.naming.provider.url=${project.root}/test-provider.properties
 test.excludes=true
+max_prefetch=1000
 test.excludesfile=${project.root}/010ExcludeList
 log=info
 amqj.logging.level=$log

Modified: incubator/qpid/trunk/qpid/java/module.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/module.xml?rev=644688&r1=644687&r2=644688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/module.xml (original)
+++ incubator/qpid/trunk/qpid/java/module.xml Fri Apr  4 05:02:52 2008
@@ -185,6 +185,7 @@
       <sysproperty key="broker.version" value="${broker.version}"/>
       <sysproperty key="test.excludes" value="${test.excludes}"/>
       <sysproperty key="test.excludesfile" value="${test.excludesfile}"/>
+      <sysproperty key="max_prefetch" value ="${max_prefetch}"/>  
 
       <formatter type="plain"/>
       <formatter type="xml"/>


Reply via email to