Author: rhs
Date: Fri Oct 5 08:58:39 2007
New Revision: 582320
URL: http://svn.apache.org/viewvc?rev=582320&view=rev
Log:
don't wait for results if the session is closed, also added missing
synchronization for flushProcessed(), and fixed a buggy log statement
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java?rev=582320&r1=582319&r2=582320&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
Fri Oct 5 08:58:39 2007
@@ -30,6 +30,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -57,7 +58,7 @@
private Map<Long,Method> commands = new HashMap<Long,Method>();
private long mark = 0;
- private boolean closed = false;
+ private AtomicBoolean closed = new AtomicBoolean(false);
public Map<Long,Method> getOutstandingCommands()
@@ -122,16 +123,19 @@
long mark = -1;
boolean first = true;
RangeSet rest = new RangeSet();
- for (Range r: processed)
+ synchronized (processed)
{
- if (first)
- {
- first = false;
- mark = r.getUpper();
- }
- else
+ for (Range r: processed)
{
- rest.add(r);
+ if (first)
+ {
+ first = false;
+ mark = r.getUpper();
+ }
+ else
+ {
+ rest.add(r);
+ }
}
}
executionComplete(mark, rest);
@@ -251,10 +255,10 @@
executionSync();
}
- while (!closed && !commands.isEmpty())
+ while (!closed.get() && !commands.isEmpty())
{
try {
- log.debug("%s waiting");
+ log.debug("%s waiting", this);
commands.wait();
}
catch (InterruptedException e)
@@ -318,10 +322,11 @@
{
synchronized (this)
{
- while (!isDone())
+ while (!closed.get() && !isDone())
{
try
{
+ log.debug("%s waiting for result: %s", Session.this,
this);
wait(timeout, nanos);
}
catch (InterruptedException e)
@@ -331,6 +336,11 @@
}
}
+ if (!isDone())
+ {
+ throw new RuntimeException("session closed");
+ }
+
return result;
}
@@ -349,6 +359,11 @@
return result != null;
}
+ public String toString()
+ {
+ return String.format("Future(%s)", isDone() ? result : klass);
+ }
+
}
public void close()
@@ -359,10 +374,17 @@
public void closed()
{
+ closed.set(true);
synchronized (commands)
{
- closed = true;
commands.notifyAll();
+ }
+ synchronized (results)
+ {
+ for (ResultFuture<?> result : results.values())
+ {
+ result.notifyAll();
+ }
}
}