Author: rhs
Date: Wed Oct 10 11:19:20 2007
New Revision: 583567

URL: http://svn.apache.org/viewvc?rev=583567&view=rev
Log:
made the session usable from multiple threads (hopefully)

Modified:
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java?rev=583567&r1=583566&r2=583567&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
 Wed Oct 10 11:19:20 2007
@@ -177,6 +177,8 @@
 
     void complete(long lower, long upper)
     {
+        log.debug("%s complete(%d, %d)", this, lower, upper);
+
         synchronized (commands)
         {
             for (long id = lower; id <= upper; id++)
@@ -184,18 +186,19 @@
                 commands.remove(id);
             }
 
-            if (commands.isEmpty())
-            {
-                log.debug("%s no outsanding commands", this);
-                commands.notifyAll();
-            }
+            commands.notifyAll();
+            log.debug("%s   commands remaining: %s", this, commands);
         }
     }
 
     void complete(long mark)
     {
-        complete(this.mark, mark);
-        this.mark = mark;
+        synchronized (commands)
+        {
+            complete(this.mark, mark);
+            this.mark = mark;
+            commands.notifyAll();
+        }
     }
 
     protected void invoke(Method m)
@@ -205,9 +208,13 @@
             synchronized (commands)
             {
                 commands.put(commandsOut++, m);
+                channel.method(m);
             }
         }
-        channel.method(m);
+        else
+        {
+            channel.method(m);
+        }
     }
 
     public void header(Header header)
@@ -250,15 +257,17 @@
         log.debug("%s sync()", this);
         synchronized (commands)
         {
-            if (!commands.isEmpty())
+            long point = commandsOut - 1;
+
+            if (mark < point)
             {
                 executionSync();
             }
 
-            while (!closed.get() && !commands.isEmpty())
+            while (!closed.get() && mark < point)
             {
                 try {
-                    log.debug("%s   waiting", this);
+                    log.debug("%s   waiting for[%d]: %s", this, point, 
commands);
                     commands.wait();
                 }
                 catch (InterruptedException e)
@@ -267,7 +276,7 @@
                 }
             }
 
-            if (!commands.isEmpty())
+            if (mark < point)
             {
                 throw new RuntimeException("session closed");
             }
@@ -286,16 +295,20 @@
         }
         future.set(result);
     }
+
     protected <T> Future<T> invoke(Method m, Class<T> klass)
     {
-        long command = commandsOut;
-        ResultFuture<T> future = new ResultFuture<T>(klass);
-        synchronized (results)
+        synchronized (commands)
         {
-            results.put(command, future);
+            long command = commandsOut;
+            ResultFuture<T> future = new ResultFuture<T>(klass);
+            synchronized (results)
+            {
+                results.put(command, future);
+            }
+            invoke(m);
+            return future;
         }
-        invoke(m);
-        return future;
     }
 
     private class ResultFuture<T> implements Future<T>


Reply via email to