Author: rhs
Date: Thu Oct 23 08:23:22 2008
New Revision: 707388
URL: http://svn.apache.org/viewvc?rev=707388&view=rev
Log:
Candidate fix for QPID-1389. Make sure we don't send commands unless the
session is fully opened.
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java?rev=707388&r1=707387&r2=707388&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
Thu Oct 23 08:23:22 2008
@@ -77,11 +77,6 @@
conn.getSender().close();
}
- @Override public void sessionAttached(Connection conn, SessionAttached atc)
- {
-
- }
-
@Override public void sessionDetach(Connection conn, SessionDetach dtc)
{
Session ssn = conn.getSession(dtc.getChannel());
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=707388&r1=707387&r2=707388&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
Thu Oct 23 08:23:22 2008
@@ -148,6 +148,7 @@
Session ssn = getSession(conn, atc);
conn.map(ssn, atc.getChannel());
ssn.sessionAttached(atc.getName());
+ ssn.setState(Session.State.OPEN);
}
}
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=707388&r1=707387&r2=707388&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
Thu Oct 23 08:23:22 2008
@@ -157,6 +157,15 @@
}
}
+ void setState(State state)
+ {
+ synchronized (commands)
+ {
+ this.state = state;
+ commands.notifyAll();
+ }
+ }
+
private void initReceiver()
{
synchronized (processedLock)
@@ -390,9 +399,26 @@
{
synchronized (commands)
{
- if (state == CLOSED)
+ if (state != OPEN && state != CLOSED)
{
+ Waiter w = new Waiter(commands, timeout);
+ while (w.hasTime() && (state != OPEN && state != CLOSED))
+ {
+ w.await();
+ }
+ }
+
+ switch (state)
+ {
+ case OPEN:
+ break;
+ case CLOSED:
throw new SessionClosedException();
+ default:
+ throw new SessionException
+ (String.format
+ ("timed out waiting for session to become open %s",
+ state));
}
int next = commandsOut++;
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=707388&r1=707387&r2=707388&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
Thu Oct 23 08:23:22 2008
@@ -57,6 +57,11 @@
log.warn("UNHANDLED: [%s] %s", ssn, method);
}
+ @Override public void sessionAttached(Session ssn, SessionAttached atc)
+ {
+ ssn.setState(Session.State.OPEN);
+ }
+
@Override public void sessionTimeout(Session ssn, SessionTimeout t)
{
ssn.setExpiry(t.getTimeout());