Author: rhs
Date: Tue Jun  3 12:12:03 2008
New Revision: 662859

URL: http://svn.apache.org/viewvc?rev=662859&view=rev
Log:
QPID-901: honor the timely-reply flag and handle known-completed

Modified:
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java?rev=662859&r1=662858&r2=662859&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
 Tue Jun  3 12:12:03 2008
@@ -72,7 +72,8 @@
     // incoming command count
     int commandsIn = 0;
     // completed incoming commands
-    private final RangeSet processed = new RangeSet();
+    private final Object processedLock = new Object();
+    private RangeSet processed = new RangeSet();
     private Range syncPoint = null;
 
     // outgoing command count
@@ -112,11 +113,6 @@
         return commandsIn++;
     }
 
-    public RangeSet getProcessed()
-    {
-        return processed;
-    }
-
     public void processed(Method command)
     {
         processed(command.getId());
@@ -138,7 +134,7 @@
         log.debug("%s processed(%s)", this, range);
 
         boolean flush;
-        synchronized (processed)
+        synchronized (processedLock)
         {
             processed.add(range);
             flush = syncPoint != null && processed.includes(syncPoint);
@@ -152,20 +148,40 @@
    public void flushProcessed()
     {
         RangeSet copy;
-        synchronized (processed)
+        synchronized (processedLock)
         {
             copy = processed.copy();
         }
         sessionCompleted(copy);
     }
 
+    void knownComplete(RangeSet kc)
+    {
+        synchronized (processedLock)
+        {
+            RangeSet newProcessed = new RangeSet();
+            OUTER: for (Range r : processed)
+            {
+                for (Range kr : kc)
+                {
+                    if (kr.includes(r))
+                    {
+                        continue OUTER;
+                    }
+                }
+                newProcessed.add(r);
+            }
+            this.processed = newProcessed;
+        }
+    }
+
     void syncPoint()
     {
         int id = getCommandsIn() - 1;
         log.debug("%s synced to %d", this, id);
         Range range = new Range(0, id - 1);
         boolean flush;
-        synchronized (processed)
+        synchronized (processedLock)
         {
             flush = processed.includes(range);
             if (!flush)

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java?rev=662859&r1=662858&r2=662859&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java
 Tue Jun  3 12:12:03 2008
@@ -74,6 +74,19 @@
                 ssn.complete(range.getLower(), range.getUpper());
             }
         }
+        if (cmp.getTimelyReply())
+        {
+            ssn.sessionKnownCompleted(ranges);
+        }
+    }
+
+    @Override public void sessionKnownCompleted(Session ssn, 
SessionKnownCompleted kcmp)
+    {
+        RangeSet kc = kcmp.getCommands();
+        if (kc != null)
+        {
+            ssn.knownComplete(kc);
+        }
     }
 
     @Override public void sessionFlush(Session ssn, SessionFlush flush)


Reply via email to