Author: rhs
Date: Fri Jun 13 11:38:25 2008
New Revision: 667615
URL: http://svn.apache.org/viewvc?rev=667615&view=rev
Log:
QPID-901: request known-completed every 64K incoming commands, fixed handling
of incoming known-completed to clear out processed set
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java?rev=667615&r1=667614&r2=667615&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Range.java
Fri Jun 13 11:38:25 2008
@@ -20,6 +20,9 @@
*/
package org.apache.qpidity.transport;
+import java.util.ArrayList;
+import java.util.List;
+
import static org.apache.qpid.util.Serial.*;
@@ -77,6 +80,42 @@
return new Range(min(lower, range.lower), max(upper, range.upper));
}
+ public List<Range> subtract(Range range)
+ {
+ List<Range> result = new ArrayList<Range>();
+
+ if (includes(range.lower) && le(lower, range.lower - 1))
+ {
+ result.add(new Range(lower, range.lower - 1));
+ }
+
+ if (includes(range.upper) && le(range.upper + 1, upper))
+ {
+ result.add(new Range(range.upper + 1, upper));
+ }
+
+ if (result.isEmpty() && !range.includes(this))
+ {
+ result.add(this);
+ }
+
+ return result;
+ }
+
+ public Range intersect(Range range)
+ {
+ int l = max(lower, range.lower);
+ int r = min(upper, range.upper);
+ if (gt(l, r))
+ {
+ return null;
+ }
+ else
+ {
+ return new Range(l, r);
+ }
+ }
+
public String toString()
{
return "[" + lower + ", " + upper + "]";
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java?rev=667615&r1=667614&r2=667615&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
Fri Jun 13 11:38:25 2008
@@ -128,6 +128,10 @@
int id = nextCommandId();
cmd.setId(id);
log.debug("ID: [%s] %s", this.channel, id);
+ if ((id % 65536) == 0)
+ {
+ flushProcessed(true);
+ }
}
public void processed(Method command)
@@ -164,12 +168,17 @@
public void flushProcessed()
{
+ flushProcessed(false);
+ }
+
+ private void flushProcessed(boolean timely_reply)
+ {
RangeSet copy;
synchronized (processedLock)
{
copy = processed.copy();
}
- sessionCompleted(copy);
+ sessionCompleted(copy, timely_reply ? TIMELY_REPLY : NO_OPTION);
}
void knownComplete(RangeSet kc)
@@ -177,16 +186,15 @@
synchronized (processedLock)
{
RangeSet newProcessed = new RangeSet();
- OUTER: for (Range r : processed)
+ for (Range pr : processed)
{
for (Range kr : kc)
{
- if (kr.includes(r))
+ for (Range r : pr.subtract(kr))
{
- continue OUTER;
+ newProcessed.add(r);
}
}
- newProcessed.add(r);
}
this.processed = newProcessed;
}