Author: rhs
Date: Tue Jun 3 12:12:03 2008
New Revision: 662859
URL: http://svn.apache.org/viewvc?rev=662859&view=rev
Log:
QPID-901: honor the timely-reply flag and handle known-completed
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java?rev=662859&r1=662858&r2=662859&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java Tue Jun 3 12:12:03 2008
@@ -72,7 +72,8 @@
// incoming command count
int commandsIn = 0;
// completed incoming commands
- private final RangeSet processed = new RangeSet();
+ private final Object processedLock = new Object();
+ private RangeSet processed = new RangeSet();
private Range syncPoint = null;
// outgoing command count
@@ -112,11 +113,6 @@
return commandsIn++;
}
- public RangeSet getProcessed()
- {
- return processed;
- }
-
public void processed(Method command)
{
processed(command.getId());
@@ -138,7 +134,7 @@
log.debug("%s processed(%s)", this, range);
boolean flush;
- synchronized (processed)
+ synchronized (processedLock)
{
processed.add(range);
flush = syncPoint != null && processed.includes(syncPoint);
@@ -152,20 +148,40 @@
public void flushProcessed()
{
RangeSet copy;
- synchronized (processed)
+ synchronized (processedLock)
{
copy = processed.copy();
}
sessionCompleted(copy);
}
+ void knownComplete(RangeSet kc)
+ {
+ synchronized (processedLock)
+ {
+ RangeSet newProcessed = new RangeSet();
+ OUTER: for (Range r : processed)
+ {
+ for (Range kr : kc)
+ {
+ if (kr.includes(r))
+ {
+ continue OUTER;
+ }
+ }
+ newProcessed.add(r);
+ }
+ this.processed = newProcessed;
+ }
+ }
+
void syncPoint()
{
int id = getCommandsIn() - 1;
log.debug("%s synced to %d", this, id);
Range range = new Range(0, id - 1);
boolean flush;
- synchronized (processed)
+ synchronized (processedLock)
{
flush = processed.includes(range);
if (!flush)
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java?rev=662859&r1=662858&r2=662859&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/SessionDelegate.java Tue Jun 3 12:12:03 2008
@@ -74,6 +74,19 @@
ssn.complete(range.getLower(), range.getUpper());
}
}
+ if (cmp.getTimelyReply())
+ {
+ ssn.sessionKnownCompleted(ranges);
+ }
+ }
+
+ @Override public void sessionKnownCompleted(Session ssn, SessionKnownCompleted kcmp)
+ {
+ RangeSet kc = kcmp.getCommands();
+ if (kc != null)
+ {
+ ssn.knownComplete(kc);
+ }
}
@Override public void sessionFlush(Session ssn, SessionFlush flush)