Author: arnaudsimon
Date: Wed Jun  4 07:30:15 2008
New Revision: 663124

URL: http://svn.apache.org/viewvc?rev=663124&view=rev
Log:
QPID-1120: Changed addDeliveredMessage and commit so session.completed is sent 
before credits dry up

Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=663124&r1=663123&r2=663124&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Wed Jun  4 07:30:15 2008
@@ -1919,7 +1919,7 @@
         }
     }
 
-    private void checkTransacted() throws JMSException
+    protected void checkTransacted() throws JMSException
     {
         if (!getTransacted())
         {

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=663124&r1=663123&r2=663124&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Wed Jun  4 07:30:15 2008
@@ -43,7 +43,6 @@
 import javax.jms.*;
 import javax.jms.IllegalStateException;
 
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.HashMap;
 import java.util.UUID;
 import java.util.Map;
@@ -73,6 +72,12 @@
 
     // a ref on the qpidity connection
     protected org.apache.qpidity.nclient.Connection _qpidConnection;
+
+    /**
+     * Used to store the range of in tx messages
+     */
+    private RangeSet _txRangeSet = new RangeSet();
+    private int _txSize = 0;
     //--- constructors
 
     /**
@@ -276,19 +281,9 @@
         {
             _dispatcher.rollback();
         }
-
-        RangeSet ranges = new RangeSet();
-        while (true)
-        {
-            Long tag = _deliveredMessageTags.poll();
-            if (tag == null)
-            {
-                break;
-            }
-
-            ranges.add((int) (long) tag);
-        }
-        getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+        getQpidSession().messageRelease(_txRangeSet, Option.SET_REDELIVERED);
+        _txRangeSet.clear();
+        _txSize = 0;
     }
 
     /**
@@ -743,4 +738,46 @@
         return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount();
     }
 
+
+    /**
+     * Store non committed messages for this session
+     * With 0.10 messages are consumed with window mode, we must send a completion
+     * before the window size is reached so credits don't dry up. 
+     * @param id
+     */
+    @Override protected void addDeliveredMessage(long id)
+    {
+        _txRangeSet.add((int) id);
+        _txSize++;
+        // this is a heuristic, we may want to have that configurable 
+        if( _txSize > _connection.getMaxPrefetch() / 2 )
+        {
+           // send completed so consumer credits don't dry up
+           getQpidSession().messageAcknowledge(_txRangeSet, false);
+           _txSize = 0;
+        }
+    }
+
+    @Override public void commit() throws JMSException
+    {
+        checkTransacted();
+        try
+        {
+            if( _txSize > 0 )
+            {
+                getQpidSession().messageAcknowledge(_txRangeSet, true);
+                _txRangeSet.clear();
+                _txSize = 0;
+            }
+            sendCommit();
+        }
+        catch (AMQException e)
+        {
+            throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
+        }
+        catch (FailoverException e)
+        {
+            throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
+        }
+    }
 }


Reply via email to