Author: rhs
Date: Fri Jun 13 11:38:25 2008
New Revision: 667615

URL: http://svn.apache.org/viewvc?rev=667615&view=rev
Log:
QPID-901: request known-completed every 64K incoming commands; fixed handling
of incoming known-completed so that it clears out the processed set

Modified:
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java?rev=667615&r1=667614&r2=667615&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java Fri Jun 13 11:38:25 2008
@@ -20,6 +20,9 @@
  */
 package org.apache.qpidity.transport;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import static org.apache.qpid.util.Serial.*;
 
 
@@ -77,6 +80,42 @@
         return new Range(min(lower, range.lower), max(upper, range.upper));
     }
 
+    public List<Range> subtract(Range range)
+    {
+        List<Range> result = new ArrayList<Range>();
+
+        if (includes(range.lower) && le(lower, range.lower - 1))
+        {
+            result.add(new Range(lower, range.lower - 1));
+        }
+
+        if (includes(range.upper) && le(range.upper + 1, upper))
+        {
+            result.add(new Range(range.upper + 1, upper));
+        }
+
+        if (result.isEmpty() && !range.includes(this))
+        {
+            result.add(this);
+        }
+
+        return result;
+    }
+
+    public Range intersect(Range range)
+    {
+        int l = max(lower, range.lower);
+        int r = min(upper, range.upper);
+        if (gt(l, r))
+        {
+            return null;
+        }
+        else
+        {
+            return new Range(l, r);
+        }
+    }
+
     public String toString()
     {
         return "[" + lower + ", " + upper + "]";

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java?rev=667615&r1=667614&r2=667615&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java Fri Jun 13 11:38:25 2008
@@ -128,6 +128,10 @@
         int id = nextCommandId();
         cmd.setId(id);
         log.debug("ID: [%s] %s", this.channel, id);
+        if ((id % 65536) == 0)
+        {
+            flushProcessed(true);
+        }
     }
 
     public void processed(Method command)
@@ -164,12 +168,17 @@
 
     public void flushProcessed()
     {
+        flushProcessed(false);
+    }
+
+    private void flushProcessed(boolean timely_reply)
+    {
         RangeSet copy;
         synchronized (processedLock)
         {
             copy = processed.copy();
         }
-        sessionCompleted(copy);
+        sessionCompleted(copy, timely_reply ? TIMELY_REPLY : NO_OPTION);
     }
 
     void knownComplete(RangeSet kc)
@@ -177,16 +186,15 @@
         synchronized (processedLock)
         {
             RangeSet newProcessed = new RangeSet();
-            OUTER: for (Range r : processed)
+            for (Range pr : processed)
             {
                 for (Range kr : kc)
                 {
-                    if (kr.includes(r))
+                    for (Range r : pr.subtract(kr))
                     {
-                        continue OUTER;
+                        newProcessed.add(r);
                     }
                 }
-                newProcessed.add(r);
             }
             this.processed = newProcessed;
         }


Reply via email to