Author: rhs
Date: Thu Oct 23 08:23:22 2008
New Revision: 707388

URL: http://svn.apache.org/viewvc?rev=707388&view=rev
Log:
Candidate fix for QPID-1389. Make sure we don't send commands unless the 
session is fully opened.

Modified:
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java?rev=707388&r1=707387&r2=707388&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
 Thu Oct 23 08:23:22 2008
@@ -77,11 +77,6 @@
         conn.getSender().close();
     }
 
-    @Override public void sessionAttached(Connection conn, SessionAttached atc)
-    {
-        
-    }
-
     @Override public void sessionDetach(Connection conn, SessionDetach dtc)
     {
         Session ssn = conn.getSession(dtc.getChannel());

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=707388&r1=707387&r2=707388&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
 Thu Oct 23 08:23:22 2008
@@ -148,6 +148,7 @@
         Session ssn = getSession(conn, atc);
         conn.map(ssn, atc.getChannel());
         ssn.sessionAttached(atc.getName());
+        ssn.setState(Session.State.OPEN);
     }
 
 }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=707388&r1=707387&r2=707388&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
 Thu Oct 23 08:23:22 2008
@@ -157,6 +157,15 @@
         }
     }
 
+    void setState(State state)
+    {
+        synchronized (commands)
+        {
+            this.state = state;
+            commands.notifyAll();
+        }
+    }
+
     private void initReceiver()
     {
         synchronized (processedLock)
@@ -390,9 +399,26 @@
         {
             synchronized (commands)
             {
-                if (state == CLOSED)
+                if (state != OPEN && state != CLOSED)
                 {
+                    Waiter w = new Waiter(commands, timeout);
+                    while (w.hasTime() && (state != OPEN && state != CLOSED))
+                    {
+                        w.await();
+                    }
+                }
+
+                switch (state)
+                {
+                case OPEN:
+                    break;
+                case CLOSED:
                     throw new SessionClosedException();
+                default:
+                    throw new SessionException
+                        (String.format
+                         ("timed out waiting for session to become open %s",
+                          state));
                 }
 
                 int next = commandsOut++;

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=707388&r1=707387&r2=707388&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
 Thu Oct 23 08:23:22 2008
@@ -57,6 +57,11 @@
         log.warn("UNHANDLED: [%s] %s", ssn, method);
     }
 
+    @Override public void sessionAttached(Session ssn, SessionAttached atc)
+    {
+        ssn.setState(Session.State.OPEN);
+    }
+
     @Override public void sessionTimeout(Session ssn, SessionTimeout t)
     {
         ssn.setExpiry(t.getTimeout());


Reply via email to