This isn't a safe way to fix this problem. Comments are inline.
[EMAIL PROTECTED] wrote:
Author: arnaudsimon
Date: Tue Jun 3 00:01:54 2008
New Revision: 662665
URL: http://svn.apache.org/viewvc?rev=662665&view=rev
Log:
QPID-1112: Added sessionCompleted support and changed onMessage for invoking
sessionCompleted when all expected messages have been received.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=662665&r1=662664&r2=662665&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Tue Jun 3 00:01:54 2008
@@ -38,7 +38,6 @@
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.SortedSet;
import java.util.ArrayList;
import java.util.Collections;
@@ -116,7 +115,7 @@
* consumer whereas JMS defines this at the session level, hence why we
associate it with the consumer in our
* implementation.
*/
- private final int _acknowledgeMode;
+ protected final int _acknowledgeMode;
/**
* Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=662665&r1=662664&r2=662665&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Tue Jun 3 00:01:54 2008
@@ -77,6 +77,12 @@
*/
private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
+ /**
+ * Used for no-ack mode so to send session completion command
+ */
+ private int _numberReceivedMessages = 0;
+ private int _firstMessageToComplete;
+
//--- constructor
protected BasicMessageConsumer_0_10(int channelId, AMQConnection
connection, AMQDestination destination,
String messageSelector, boolean
noLocal, MessageFactoryRegistry messageFactory,
@@ -115,7 +121,6 @@
* message listener or to the sync consumer queue.
*
* @param jmsMessage this message has already been processed so can't redo
preDeliver
- * @param channelId
*/
@Override public void notifyMessage(AbstractJMSMessage jmsMessage)
{
@@ -160,6 +165,25 @@
*/
public void onMessage(Message message)
{
+ /**
+ * For no-ack mode
+ */
+ if( _acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE )
+ {
+ _numberReceivedMessages++;
+ if(_numberReceivedMessages == 1)
+ {
+ _firstMessageToComplete = message.getMessageTransferId();
+ }
+ if(_numberReceivedMessages >=
getSession().getAMQConnection().getMaxPrefetch() )
+ {
+ RangeSet r = new RangeSet();
+ r.add(_firstMessageToComplete, message.getMessageTransferId());
The RangeSet computed here will include more than it should. You should
keep a RangeSet member variable and add each id to it individually
rather than trying to compute the range yourself.
+ _0_10session.getQpidSession().sessionCompleted(r,
Option.TIMELY_REPLY);
+ _numberReceivedMessages = 0;
It's not safe to use sessionCompleted(...) directly in this way. You
need to use Session.processed(...). See
ClientSession.messageAcknowledge(...), for an example.
+ }
+ }
+
int channelId = getSession().getChannelId();
long deliveryId = message.getMessageTransferId();
AMQShortString consumerTag = getConsumerTag();
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java?rev=662665&r1=662664&r2=662665&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
Tue Jun 3 00:01:54 2008
@@ -65,6 +65,16 @@
public void sessionDetach(byte[] name);
+ /**
+ * This control is sent by the receiver of commands, and handled by the
sender
+ * of commands. It informs the sender of all commands completed by the
receiver.
+ * This excludes commands known by the receiver to be considered complete
at the sender.
+ *
+ * @param commands completed commands.
+ * @param options [EMAIL PROTECTED] Option#TIMELY_REPLY} If set, the
sender is no longer free to delay the known-completed reply.
+ */
+ public void sessionCompleted(RangeSet commands, Option... options);
+
This shouldn't be part of the public session API. (see above)
public void sessionRequestTimeout(long expiry);
public byte[] getName();
@@ -103,6 +113,7 @@
public void messageTransfer(String destination, Message msg, short
confirmMode, short acquireMode)
throws IOException;
+
/**
* <p>This transfer streams a complete message using a single method.
* It uses pull-semantics instead of doing a push.</p>
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=662665&r1=662664&r2=662665&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
Tue Jun 3 00:01:54 2008
@@ -105,6 +105,11 @@
_currentDataSizeNotSynced = 0;
}
+ public void sessionCompleted(RangeSet commands, Option ... options)
+ {
+ super.sessionCompleted(commands, options);
+ }
+
This shouldn't be part of the public session API. (see above)
/* -------------------------
* Data methods
* ------------------------*/