Author: arnaudsimon
Date: Fri Apr 4 05:02:52 2008
New Revision: 644688
URL: http://svn.apache.org/viewvc?rev=644688&view=rev
Log:
QPID-796: Added ability to enable/disable message prefetching. Prefetching is
controlled through the property max_prefetch; it is turned off when
max_prefetch=0. (This is a 0.10 code path change.)
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java
Modified:
incubator/qpid/trunk/qpid/java/010ExcludeList
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
incubator/qpid/trunk/qpid/java/cpp.async.testprofile
incubator/qpid/trunk/qpid/java/cpp.sync.testprofile
incubator/qpid/trunk/qpid/java/module.xml
Modified: incubator/qpid/trunk/qpid/java/010ExcludeList
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/010ExcludeList?rev=644688&r1=644687&r2=644688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/010ExcludeList (original)
+++ incubator/qpid/trunk/qpid/java/010ExcludeList Fri Apr 4 05:02:52 2008
@@ -5,5 +5,6 @@
org.apache.qpid.client.MessageListenerMultiConsumerTest#testRecieveBoth
org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash
org.apache.qpid.test.unit.xa.TopicTest#testMigrateDurableSubscriber
-// this test needs durable subscribe states to be persisted
-org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubRestoredAfterNonPersistentMessageSent
\ No newline at end of file
+// those tests need durable subscribe states to be persisted
+org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testDurSubRestoredAfterNonPersistentMessageSent
+org.apache.qpid.test.unit.ct.DurableSubscriberTests#testDurSubRestoresMessageSelector
\ No newline at end of file
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=644688&r1=644687&r2=644688&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Fri Apr 4 05:02:52 2008
@@ -27,7 +27,6 @@
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.FiledTableSupport;
-import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
import org.apache.qpidity.ErrorCode;
@@ -45,7 +44,6 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.UUID;
import java.util.Map;
-import java.util.Iterator;
/**
* This is a 0.10 Session
@@ -58,10 +56,6 @@
*/
private static final Logger _logger =
LoggerFactory.getLogger(AMQSession_0_10.class);
- /**
- * The maximum number of pre-fetched messages per destination
- */
- public static long MAX_PREFETCH = 1000;
/**
* The underlying QpidSession
@@ -101,8 +95,6 @@
super(con, channelId, transacted, acknowledgeMode,
messageFactoryRegistry, defaultPrefetchHighMark,
defaultPrefetchLowMark);
- MAX_PREFETCH =
Integer.parseInt(System.getProperty("max_prefetch","1000"));
-
// create the qpid session with an expiry <= 0 so that the session
does not expire
_qpidSession = qpidConnection.createSession(0);
// set the exception listnere for this session
@@ -404,18 +396,23 @@
new
MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
consumer.isNoLocal() ?
Option.NO_LOCAL : Option.NO_OPTION,
consumer.isExclusive() ?
Option.EXCLUSIVE : Option.NO_OPTION);
-
- getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(),
Session.MESSAGE_FLOW_MODE_WINDOW);
+ if (ClientProperties.MAX_PREFETCH == 0)
+ {
+
getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(),
Session.MESSAGE_FLOW_MODE_CREDIT);
+ }
+ else
+ {
+
getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(),
Session.MESSAGE_FLOW_MODE_WINDOW);
+ }
getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
// We need to sync so that we get notify of an error.
// only if not immediat prefetch
- if(consumer.isStrated() || _immediatePrefetch)
+ if(ClientProperties.MAX_PREFETCH > 0 && (consumer.isStrated() ||
_immediatePrefetch))
{
// set the flow
getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
- AMQSession_0_10.MAX_PREFETCH);
-
+ ClientProperties.MAX_PREFETCH);
}
getQpidSession().sync();
getCurrentException();
@@ -517,17 +514,27 @@
//only set if msg list is null
try
{
- // if (consumer.getMessageListener() != null)
- // {
-
getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
Session.MESSAGE_FLOW_UNIT_MESSAGE,
- MAX_PREFETCH);
- // }
+ if (ClientProperties.MAX_PREFETCH == 0)
+ {
+ if (consumer.getMessageListener() != null)
+ {
+
getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
+ Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+ }
+ }
+ else
+ {
+ getQpidSession()
+
.messageFlow(consumer.getConsumerTag().toString(),
Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ ClientProperties.MAX_PREFETCH);
+ }
getQpidSession()
- .messageFlow(consumer.getConsumerTag().toString(),
Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
+ .messageFlow(consumer.getConsumerTag().toString(),
Session.MESSAGE_FLOW_UNIT_BYTE,
+ 0xFFFFFFFF);
}
- catch(Exception e)
+ catch (Exception e)
{
- throw new AMQException(AMQConstant.INTERNAL_ERROR,"Error
while trying to get the listener",e);
+ throw new AMQException(AMQConstant.INTERNAL_ERROR, "Error
while trying to get the listener", e);
}
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=644688&r1=644687&r2=644688&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Fri Apr 4 05:02:52 2008
@@ -27,7 +27,6 @@
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpidity.api.Message;
-import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.transport.*;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.filter.MessageFilter;
@@ -39,6 +38,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* This is a 0.10 message consumer.
@@ -72,6 +72,11 @@
*/
private boolean _isStarted = false;
+ /**
+ * Specify whether this consumer is performing a sync receive
+ */
+ private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
+
//--- constructor
protected BasicMessageConsumer_0_10(int channelId, AMQConnection
connection, AMQDestination destination,
String messageSelector, boolean
noLocal, MessageFactoryRegistry messageFactory,
@@ -136,6 +141,11 @@
}
if (messageOk)
{
+ if (isMessageListenerSet() && ClientProperties.MAX_PREFETCH == 0)
+ {
+
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+ }
_logger.debug("messageOk, trying to notify");
super.notifyMessage(jmsMessage, channelId);
}
@@ -307,23 +317,33 @@
_logger.debug("messageOk " + messageOk);
_logger.debug("_preAcquire " + _preAcquire);
}
- if (!messageOk && _preAcquire)
+ if (!messageOk)
{
- // this is the case for topics
- // We need to ack this message
- if (_logger.isDebugEnabled())
+ if (_preAcquire)
{
- _logger.debug("filterMessage - trying to ack message");
+ // this is the case for topics
+ // We need to ack this message
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("filterMessage - trying to ack message");
+ }
+ acknowledgeMessage(message);
}
- acknowledgeMessage(message);
- }
- else if (!messageOk)
- {
- if (_logger.isDebugEnabled())
+ else
{
- _logger.debug("Message not OK, releasing");
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Message not OK, releasing");
+ }
+ releaseMessage(message);
+ }
+ // if we are syncrhonously waiting for a message
+ // and messages are not prefetched we then need to request another
one
+ if(ClientProperties.MAX_PREFETCH == 0)
+ {
+
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
}
- releaseMessage(message);
}
// now we need to acquire this message if needed
// this is the case of queue with a message selector set
@@ -429,6 +449,11 @@
public void setMessageListener(final MessageListener messageListener)
throws JMSException
{
super.setMessageListener(messageListener);
+ if (messageListener != null && ClientProperties.MAX_PREFETCH == 0)
+ {
+
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+ }
if (messageListener != null && !_synchronousQueue.isEmpty())
{
Iterator messages=_synchronousQueue.iterator();
@@ -449,11 +474,44 @@
public void start()
{
_isStarted = true;
+ if (_syncReceive.get())
+ {
+
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+ }
}
public void stop()
{
_isStarted = false;
+ }
+
+ /**
+ * When messages are not prefetched we need to request a message from the
+ * broker.
+ * Note that if the timeout is too short a message may be queued in
_synchronousQueue until
+ * this consumer closes or request it.
+ * @param l
+ * @return
+ * @throws InterruptedException
+ */
+ public Object getMessageFromQueue(long l) throws InterruptedException
+ {
+ if (isStrated() && ClientProperties.MAX_PREFETCH == 0 &&
_synchronousQueue.isEmpty())
+ {
+
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+ }
+ if (ClientProperties.MAX_PREFETCH == 0)
+ {
+ _syncReceive.set(true);
+ }
+ Object o = super.getMessageFromQueue(l);
+ if (ClientProperties.MAX_PREFETCH == 0)
+ {
+ _syncReceive.set(false);
+ }
+ return o;
}
}
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java?rev=644688&view=auto
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java
(added)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ClientProperties.java
Fri Apr 4 05:02:52 2008
@@ -0,0 +1,36 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.client;
+
+/**
+ * This class centralized the Qpid client properties.
+ */
+public class ClientProperties
+{
+
+ /**
+ * The maximum number of pre-fetched messages per destination
+ */
+ public static long MAX_PREFETCH =
Long.valueOf(System.getProperties().getProperty("max_prefetch", "1000"));
+
+ /**
+ * When true a sync command is sent after every persistent messages.
+ */
+ public static boolean FULLY_SYNC = Boolean.getBoolean("fully_sync");
+}
Modified: incubator/qpid/trunk/qpid/java/cpp.async.testprofile
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cpp.async.testprofile?rev=644688&r1=644687&r2=644688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/cpp.async.testprofile (original)
+++ incubator/qpid/trunk/qpid/java/cpp.async.testprofile Fri Apr 4 05:02:52
2008
@@ -2,6 +2,7 @@
broker=${project.root}/../cpp/src/qpidd --data-dir ${build.data} -t
--log-output ${build.data}/broker.log --load-module
${project.root}/../../cppStore/cpp/lib/.libs/libbdbstore.so --store-async yes
broker.clean=${build.data}
java.naming.provider.url=${project.root}/test-provider.properties
+max_prefetch=1000
test.excludes=true
test.excludesfile=${project.root}/010ExcludeList
log=info
Modified: incubator/qpid/trunk/qpid/java/cpp.sync.testprofile
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cpp.sync.testprofile?rev=644688&r1=644687&r2=644688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/cpp.sync.testprofile (original)
+++ incubator/qpid/trunk/qpid/java/cpp.sync.testprofile Fri Apr 4 05:02:52 2008
@@ -3,6 +3,7 @@
broker.clean=${build.data}
java.naming.provider.url=${project.root}/test-provider.properties
test.excludes=true
+max_prefetch=1000
test.excludesfile=${project.root}/010ExcludeList
log=info
amqj.logging.level=$log
Modified: incubator/qpid/trunk/qpid/java/module.xml
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/module.xml?rev=644688&r1=644687&r2=644688&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/module.xml (original)
+++ incubator/qpid/trunk/qpid/java/module.xml Fri Apr 4 05:02:52 2008
@@ -185,6 +185,7 @@
<sysproperty key="broker.version" value="${broker.version}"/>
<sysproperty key="test.excludes" value="${test.excludes}"/>
<sysproperty key="test.excludesfile" value="${test.excludesfile}"/>
+ <sysproperty key="max_prefetch" value ="${max_prefetch}"/>
<formatter type="plain"/>
<formatter type="xml"/>