Author: rhs
Date: Fri Oct  5 08:58:39 2007
New Revision: 582320

URL: http://svn.apache.org/viewvc?rev=582320&view=rev
Log:
don't wait for results if the session is closed, also added missing 
synchronization for flushProcessed(), and fixed a buggy log statement

Modified:
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java?rev=582320&r1=582319&r2=582320&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
 Fri Oct  5 08:58:39 2007
@@ -30,6 +30,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 
 /**
@@ -57,7 +58,7 @@
     private Map<Long,Method> commands = new HashMap<Long,Method>();
     private long mark = 0;
 
-    private boolean closed = false;
+    private AtomicBoolean closed = new AtomicBoolean(false);
 
 
     public Map<Long,Method> getOutstandingCommands()
@@ -122,16 +123,19 @@
         long mark = -1;
         boolean first = true;
         RangeSet rest = new RangeSet();
-        for (Range r: processed)
+        synchronized (processed)
         {
-            if (first)
-            {
-                first = false;
-                mark = r.getUpper();
-            }
-            else
+            for (Range r: processed)
             {
-                rest.add(r);
+                if (first)
+                {
+                    first = false;
+                    mark = r.getUpper();
+                }
+                else
+                {
+                    rest.add(r);
+                }
             }
         }
         executionComplete(mark, rest);
@@ -251,10 +255,10 @@
                 executionSync();
             }
 
-            while (!closed && !commands.isEmpty())
+            while (!closed.get() && !commands.isEmpty())
             {
                 try {
-                    log.debug("%s   waiting");
+                    log.debug("%s   waiting", this);
                     commands.wait();
                 }
                 catch (InterruptedException e)
@@ -318,10 +322,11 @@
         {
             synchronized (this)
             {
-                while (!isDone())
+                while (!closed.get() && !isDone())
                 {
                     try
                     {
+                        log.debug("%s waiting for result: %s", Session.this, 
this);
                         wait(timeout, nanos);
                     }
                     catch (InterruptedException e)
@@ -331,6 +336,11 @@
                 }
             }
 
+            if (!isDone())
+            {
+                throw new RuntimeException("session closed");
+            }
+
             return result;
         }
 
@@ -349,6 +359,11 @@
             return result != null;
         }
 
+        public String toString()
+        {
+            return String.format("Future(%s)", isDone() ? result : klass);
+        }
+
     }
 
     public void close()
@@ -359,10 +374,17 @@
 
     public void closed()
     {
+        closed.set(true);
         synchronized (commands)
         {
-            closed = true;
             commands.notifyAll();
+        }
+        synchronized (results)
+        {
+            for (ResultFuture<?> result : results.values())
+            {
+                result.notifyAll();
+            }
         }
     }
 


Reply via email to