This isn't a safe way to fix this problem. Comments are inline.

[EMAIL PROTECTED] wrote:
Author: arnaudsimon
Date: Tue Jun  3 00:01:54 2008
New Revision: 662665

URL: http://svn.apache.org/viewvc?rev=662665&view=rev
Log:
QPID-1112: Added sessionCompleted support and changed onMessage for invoking 
sessionCompleted when all expected messages have been received.

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=662665&r1=662664&r2=662665&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Jun  3 00:01:54 2008
@@ -38,7 +38,6 @@
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.SortedSet;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -116,7 +115,7 @@
      * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our
      * implementation.
      */
-    private final int _acknowledgeMode;
+    protected final int _acknowledgeMode;

     /**
      * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=662665&r1=662664&r2=662665&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Tue Jun  3 00:01:54 2008
@@ -77,6 +77,12 @@
      */
     private final AtomicBoolean _syncReceive = new AtomicBoolean(false);

+    /**
+     * Used for no-ack mode so to send session completion command
+     */
+    private int _numberReceivedMessages = 0;
+    private int _firstMessageToComplete;
+
     //--- constructor
     protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
                                         String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
@@ -115,7 +121,6 @@
      * message listener or to the sync consumer queue.
      *
      * @param jmsMessage this message has already been processed so can't redo preDeliver
-     * @param channelId
      */
     @Override public void notifyMessage(AbstractJMSMessage jmsMessage)
     {
@@ -160,6 +165,25 @@
      */
     public void onMessage(Message message)
     {
+        /**
+         * For no-ack mode
+         */
+        if( _acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE )
+        {
+            _numberReceivedMessages++;
+            if(_numberReceivedMessages == 1)
+            {
+                _firstMessageToComplete = message.getMessageTransferId();
+            }
+           if(_numberReceivedMessages >= getSession().getAMQConnection().getMaxPrefetch() )
+            {
+                RangeSet r = new RangeSet();
+                r.add(_firstMessageToComplete, message.getMessageTransferId());

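The RangeSet computed here will include more than it should: a single range from the first id to the last also covers every id in between, whether or not this consumer actually received it. Keep a RangeSet member variable and add each id to it individually rather than computing the range yourself.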
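
To illustrate the difference, here is a minimal sketch. It does not use the Qpid RangeSet class: IdRangeSet is a hypothetical stand-in written for this example, and the sample ids (1, 2, 5, 6, with 3 and 4 assumed to have gone elsewhere on the session) are made up. It only shows that recording each id individually keeps gaps out of the set, while a first..last range silently fills them in.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a range set; NOT the Qpid RangeSet class. */
final class IdRangeSet {
    // Sorted, disjoint, non-adjacent inclusive ranges, each stored as {lower, upper}.
    private final List<long[]> ranges = new ArrayList<>();

    /** Records a single id, merging it into a neighbouring range when contiguous. */
    void add(long id) {
        int i = 0;
        // Skip every range that ends before id - 1: it neither contains nor touches id.
        while (i < ranges.size() && ranges.get(i)[1] < id - 1) {
            i++;
        }
        if (i == ranges.size() || ranges.get(i)[0] > id + 1) {
            ranges.add(i, new long[] {id, id}); // isolated id: start a new range
            return;
        }
        long[] r = ranges.get(i);
        r[0] = Math.min(r[0], id);
        r[1] = Math.max(r[1], id);
        // The grown range may now touch the next one; if so, merge them.
        if (i + 1 < ranges.size() && ranges.get(i + 1)[0] <= r[1] + 1) {
            r[1] = Math.max(r[1], ranges.get(i + 1)[1]);
            ranges.remove(i + 1);
        }
    }

    /** Returns true if the id falls inside any recorded range. */
    boolean contains(long id) {
        for (long[] r : ranges) {
            if (r[0] <= id && id <= r[1]) {
                return true;
            }
        }
        return false;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < ranges.size(); i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append(ranges.get(i)[0]).append("..").append(ranges.get(i)[1]);
        }
        return sb.append("]").toString();
    }

    public static void main(String[] args) {
        // Made-up ids seen by one consumer; 3 and 4 are assumed to belong elsewhere.
        long[] received = {1, 2, 5, 6};

        IdRangeSet perId = new IdRangeSet();
        for (long id : received) {
            perId.add(id);
        }

        // What the commit effectively does: one range from the first id to the last.
        long first = received[0];
        long last = received[received.length - 1];
        boolean naiveCoversThree = first <= 3 && 3 <= last;

        System.out.println("first..last covers id 3: " + naiveCoversThree);
        System.out.println("per-id set " + perId + " covers id 3: " + perId.contains(3));
    }
}
```

With the per-id approach the set itself decides when ids are contiguous, so the consumer never has to track a "first" id or reason about ranges by hand.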

+                _0_10session.getQpidSession().sessionCompleted(r, Option.TIMELY_REPLY);
+                _numberReceivedMessages = 0;

It's not safe to call sessionCompleted(...) directly like this. Use Session.processed(...) instead; see ClientSession.messageAcknowledge(...) for an example.

+            }
+        }
+
         int channelId = getSession().getChannelId();
         long deliveryId = message.getMessageTransferId();
         AMQShortString consumerTag = getConsumerTag();

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java?rev=662665&r1=662664&r2=662665&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java Tue Jun  3 00:01:54 2008
@@ -65,6 +65,16 @@
     public void sessionDetach(byte[] name);

+    /**
+     * This control is sent by the receiver of commands, and handled by the sender
+     * of commands. It informs the sender of all commands completed by the receiver.
+     * This excludes commands known by the receiver to be considered complete at the sender.
+     *
+     * @param commands completed commands.
+     * @param options  {@link Option#TIMELY_REPLY} If set, the sender is no longer free to delay the known-completed reply.
+     */
+    public void sessionCompleted(RangeSet commands, Option... options);
+

This shouldn't be part of the public session API (see above).

     public void sessionRequestTimeout(long expiry);

     public byte[] getName();
@@ -103,6 +113,7 @@
     public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode)
             throws IOException;
+
     /**
      * <p>This transfer streams a complete message using a single method.
      * It uses pull-semantics instead of doing a push.</p>

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=662665&r1=662664&r2=662665&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java Tue Jun  3 00:01:54 2008
@@ -105,6 +105,11 @@
         _currentDataSizeNotSynced = 0;
     }

+    public void sessionCompleted(RangeSet commands, Option ... options)
+    {
+        super.sessionCompleted(commands, options);
+    }
+

This shouldn't be part of the public session API either (see above).

     /* -------------------------
      * Data methods
      * ------------------------*/

