Author: rhs
Date: Sat Jun 7 06:42:01 2008
New Revision: 664339
URL: http://svn.apache.org/viewvc?rev=664339&view=rev
Log:
QPID-1126: reuse channel numbers for sessions that have closed, and honor the
negotiated channel-max; also removed unnecessary catches that were swallowing
stack traces from several tests
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATestCase.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/FaultTest.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java?rev=664339&r1=664338&r2=664339&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
Sat Jun 7 06:42:01 2008
@@ -22,7 +22,6 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -55,7 +54,6 @@
public class Client implements org.apache.qpidity.nclient.Connection
{
- private AtomicInteger _channelNo = new AtomicInteger();
private Connection _conn;
private ClosedListener _closedListner;
private final Lock _lock = new ReentrantLock();
@@ -286,7 +284,7 @@
public Session createSession(long expiryInSeconds)
{
- Channel ch = _conn.getChannel(_channelNo.incrementAndGet());
+ Channel ch = _conn.getChannel();
ClientSession ssn = new
ClientSession(UUID.randomUUID().toString().getBytes());
ssn.attach(ch);
ssn.sessionAttach(ssn.getName());
Modified:
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATestCase.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATestCase.java?rev=664339&r1=664338&r2=664339&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATestCase.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/AbstractXATestCase.java
Sat Jun 7 06:42:01 2008
@@ -65,7 +65,7 @@
init();
}
- public abstract void init();
+ public abstract void init() throws Exception;
Modified:
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/FaultTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/FaultTest.java?rev=664339&r1=664338&r2=664339&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/FaultTest.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/xa/FaultTest.java
Sat Jun 7 06:42:01 2008
@@ -75,15 +75,8 @@
{
if (!isBroker08())
{
- try
- {
- _xaqueueConnection.close();
- _queueConnection.close();
- }
- catch (Exception e)
- {
- fail("Exception thrown when cleaning standard connection: " +
e);
- }
+ _xaqueueConnection.close();
+ _queueConnection.close();
}
super.tearDown();
}
@@ -91,57 +84,16 @@
/**
* Initialize standard actors
*/
- public void init()
+ public void init() throws Exception
{
if (!isBroker08())
{
- // lookup test queue
- try
- {
- _queue = (Queue) getInitialContext().lookup(QUEUENAME);
- }
- catch (Exception e)
- {
- fail("cannot lookup test queue " + e.getMessage());
- }
- // lookup connection factory
- try
- {
- _queueFactory = getConnectionFactory();
- }
- catch (Exception e)
- {
- fail("enable to lookup connection factory ");
- }
- // create standard connection
- try
- {
- _xaqueueConnection =
_queueFactory.createXAQueueConnection("guest", "guest");
- }
- catch (JMSException e)
- {
- fail("cannot create queue connection: " + e.getMessage());
- }
- // create xa session
- XAQueueSession session = null;
- try
- {
- session = _xaqueueConnection.createXAQueueSession();
- }
- catch (JMSException e)
- {
- fail("cannot create queue session: " + e.getMessage());
- }
- // create a standard session
- try
- {
- _queueConnection = _queueFactory.createQueueConnection();
- _nonXASession = _queueConnection.createQueueSession(true,
Session.AUTO_ACKNOWLEDGE);
- }
- catch (JMSException e)
- {
- fail("cannot create queue session: " + e.getMessage());
- }
+ _queue = (Queue) getInitialContext().lookup(QUEUENAME);
+ _queueFactory = getConnectionFactory();
+ _xaqueueConnection =
_queueFactory.createXAQueueConnection("guest", "guest");
+ XAQueueSession session = _xaqueueConnection.createXAQueueSession();
+ _queueConnection = _queueFactory.createQueueConnection();
+ _nonXASession = _queueConnection.createQueueSession(true,
Session.AUTO_ACKNOWLEDGE);
init(session, _queue);
}
}
@@ -156,18 +108,10 @@
* Check that the second
* invocation is throwing the expected XA exception.
*/
- public void testSameXID()
+ public void testSameXID() throws Exception
{
- _logger.debug("running testSameXID");
Xid xid = getNewXid();
- try
- {
- _xaResource.start(xid, XAResource.TMNOFLAGS);
- }
- catch (XAException e)
- {
- fail("cannot start the transaction with xid: " + e.getMessage());
- }
+ _xaResource.start(xid, XAResource.TMNOFLAGS);
// we now exepct this operation to fail
try
{
@@ -178,10 +122,6 @@
{
assertEquals("Wrong error code: ", XAException.XAER_DUPID,
e.errorCode);
}
- catch (Exception ex)
- {
- fail("Caught wrong exception, expected XAException, got: " + ex);
- }
}
/**
@@ -191,7 +131,6 @@
*/
public void testWrongStartFlag()
{
- _logger.debug("running testWrongStartFlag");
Xid xid = getNewXid();
try
{
@@ -202,10 +141,6 @@
{
assertEquals("Wrong error code: ", XAException.XAER_INVAL,
e.errorCode);
}
- catch (Exception ex)
- {
- fail("Caught wrong exception, expected XAException, got: " + ex);
- }
}
/**
@@ -215,7 +150,6 @@
*/
public void testEnd()
{
- _logger.debug("running testEnd");
Xid xid = getNewXid();
try
{
@@ -226,10 +160,6 @@
{
assertEquals("Wrong error code: ", XAException.XAER_PROTO,
e.errorCode);
}
- catch (Exception ex)
- {
- fail("Caught wrong exception, expected XAException, got: " + ex);
- }
}
@@ -243,7 +173,6 @@
*/
public void testForget()
{
- _logger.debug("running testForget");
Xid xid = getNewXid();
try
{
@@ -254,10 +183,6 @@
{
// assertEquals("Wrong error code: ", XAException.XAER_NOTA,
e.errorCode);
}
- catch (Exception ex)
- {
- fail("Caught wrong exception, expected XAException, got: " + ex);
- }
xid = getNewXid();
try
{
@@ -269,10 +194,6 @@
{
assertEquals("Wrong error code: ", XAException.XAER_PROTO,
e.errorCode);
}
- catch (Exception ex)
- {
- fail("Caught wrong exception, expected XAException, got: " + ex);
- }
}
/**
@@ -283,7 +204,6 @@
*/
public void testPrepare()
{
- _logger.debug("running testPrepare");
Xid xid = getNewXid();
try
{
@@ -294,10 +214,6 @@
{
assertEquals("Wrong error code: ", XAException.XAER_NOTA,
e.errorCode);
}
- catch (Exception ex)
- {
- fail("Caught wrong exception, expected XAException, got: " + ex);
- }
xid = getNewXid();
try
{
@@ -309,10 +225,6 @@
{
assertEquals("Wrong error code: ", XAException.XAER_PROTO,
e.errorCode);
}
- catch (Exception ex)
- {
- fail("Caught wrong exception, expected XAException, got: " + ex);
- }
}
/**
@@ -323,9 +235,8 @@
* A non prepared xid is committed with one phase set to false.
* A prepared xid is committed with one phase set to true.
*/
- public void testCommit()
+ public void testCommit() throws Exception
{
- _logger.debug("running testCommit");
Xid xid = getNewXid();
try
{
@@ -336,10 +247,6 @@
{
assertEquals("Wrong error code: ", XAException.XAER_NOTA,
e.errorCode);
}
- catch (Exception ex)
- {
- fail("Caught wrong exception, expected XAException, got: " + ex);
- }
xid = getNewXid();
try
{
@@ -351,10 +258,6 @@
{
assertEquals("Wrong error code: ", XAException.XAER_PROTO,
e.errorCode);
}
- catch (Exception ex)
- {
- fail("Caught wrong exception, expected XAException, got: " + ex);
- }
xid = getNewXid();
try
{
@@ -367,10 +270,6 @@
{
assertEquals("Wrong error code: ", XAException.XAER_PROTO,
e.errorCode);
}
- catch (Exception ex)
- {
- fail("Caught wrong exception, expected XAException, got: " + ex);
- }
xid = getNewXid();
try
{
@@ -384,20 +283,9 @@
{
assertEquals("Wrong error code: ", XAException.XAER_PROTO,
e.errorCode);
}
- catch (Exception ex)
- {
- fail("Caught wrong exception, expected XAException, got: " + ex);
- }
finally
{
- try
- {
- _xaResource.commit(xid, false);
- }
- catch (XAException e)
- {
- fail("Cannot commit prepared tx: " + e);
- }
+ _xaResource.commit(xid, false);
}
}
@@ -409,7 +297,6 @@
*/
public void testRollback()
{
- _logger.debug("running testRollback");
Xid xid = getNewXid();
try
{
@@ -420,10 +307,6 @@
{
assertEquals("Wrong error code: ", XAException.XAER_NOTA,
e.errorCode);
}
- catch (Exception ex)
- {
- fail("Caught wrong exception, expected XAException, got: " + ex);
- }
xid = getNewXid();
try
{
@@ -435,35 +318,23 @@
{
assertEquals("Wrong error code: ", XAException.XAER_PROTO,
e.errorCode);
}
- catch (Exception ex)
- {
- fail("Caught wrong exception, expected XAException, got: " + ex);
- }
}
/**
* Strategy:
* Check that the timeout is set correctly
*/
- public void testTransactionTimeoutvalue()
+ public void testTransactionTimeoutvalue() throws Exception
{
- _logger.debug("running testRollback");
Xid xid = getNewXid();
- try
- {
- _xaResource.start(xid, XAResource.TMNOFLAGS);
- assertEquals("Wrong timeout", _xaResource.getTransactionTimeout(),
0);
- _xaResource.setTransactionTimeout(1000);
- assertEquals("Wrong timeout", _xaResource.getTransactionTimeout(),
1000);
- _xaResource.end(xid, XAResource.TMSUCCESS);
- xid = getNewXid();
- _xaResource.start(xid, XAResource.TMNOFLAGS);
- assertEquals("Wrong timeout", _xaResource.getTransactionTimeout(),
0);
- }
- catch (Exception ex)
- {
- fail("Caught wrong exception, expected XAException, got: " + ex);
- }
+ _xaResource.start(xid, XAResource.TMNOFLAGS);
+ assertEquals("Wrong timeout", _xaResource.getTransactionTimeout(), 0);
+ _xaResource.setTransactionTimeout(1000);
+ assertEquals("Wrong timeout", _xaResource.getTransactionTimeout(),
1000);
+ _xaResource.end(xid, XAResource.TMSUCCESS);
+ xid = getNewXid();
+ _xaResource.start(xid, XAResource.TMNOFLAGS);
+ assertEquals("Wrong timeout", _xaResource.getTransactionTimeout(), 0);
}
/**
@@ -471,11 +342,10 @@
* Check that a transaction timeout as expected
* - set timeout to 10ms
* - sleep 1000ms
- * - call end and check that the expected exception is thrown
+ * - call end and check that the expected exception is thrown
*/
- public void testTransactionTimeout()
+ public void testTransactionTimeout() throws Exception
{
- _logger.debug("running testRollback");
Xid xid = getNewXid();
try
{
@@ -489,9 +359,6 @@
{
assertEquals("Wrong error code: ", XAException.XA_RBTIMEOUT,
e.errorCode);
}
- catch (Exception ex)
- {
- fail("Caught wrong exception, expected XAException, got: " + ex);
- }
}
+
}
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java?rev=664339&r1=664338&r2=664339&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
Sat Jun 7 06:42:01 2008
@@ -131,10 +131,6 @@
{
session.closed();
}
- }
-
- public void close()
- {
connection.removeChannel(channel);
}
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java?rev=664339&r1=664338&r2=664339&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ChannelDelegate.java
Sat Jun 7 06:42:01 2008
@@ -41,12 +41,14 @@
public @Override void sessionDetached(Channel channel, SessionDetached
closed)
{
- channel.getSession().closed();
+ channel.closed();
}
public @Override void sessionDetach(Channel channel, SessionDetach dtc)
{
channel.getSession().closed();
+ channel.sessionDetached(dtc.getName(), SessionDetachCode.NORMAL);
+ channel.closed();
}
}
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java?rev=664339&r1=664338&r2=664339&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
Sat Jun 7 06:42:01 2008
@@ -22,8 +22,9 @@
import org.apache.qpidity.transport.util.Logger;
+import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.nio.ByteBuffer;
@@ -48,6 +49,7 @@
final private Sender<ConnectionEvent> sender;
final private ConnectionDelegate delegate;
+ private int channelMax = 1;
// want to make this final
private int _connectionId;
@@ -88,6 +90,32 @@
sender.send(event);
}
+ public int getChannelMax()
+ {
+ return channelMax;
+ }
+
+ void setChannelMax(int max)
+ {
+ channelMax = max;
+ }
+
+ public Channel getChannel()
+ {
+ synchronized (channels)
+ {
+ for (int i = 0; i < getChannelMax(); i++)
+ {
+ if (!channels.containsKey(i))
+ {
+ return getChannel(i);
+ }
+ }
+
+ throw new RuntimeException("no more channels available");
+ }
+ }
+
public Channel getChannel(int number)
{
synchronized (channels)
@@ -120,11 +148,10 @@
log.debug("connection closed: %s", this);
synchronized (channels)
{
- for (Iterator<Channel> it = channels.values().iterator();
- it.hasNext(); )
+ List<Channel> values = new ArrayList<Channel>(channels.values());
+ for (Channel ch : values)
{
- it.next().closed();
- it.remove();
+ ch.closed();
}
}
delegate.closed();
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java?rev=664339&r1=664338&r2=664339&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
Sat Jun 7 06:42:01 2008
@@ -152,7 +152,7 @@
@Override public void connectionTune(Channel context, ConnectionTune
struct)
{
- // should update the channel max given by the broker.
+ context.getConnection().setChannelMax(struct.getChannelMax());
context.connectionTuneOk(struct.getChannelMax(),
struct.getMaxFrameSize(), struct.getHeartbeatMax());
context.connectionOpen(_virtualHost, null, Option.INSIST);
}
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java?rev=664339&r1=664338&r2=664339&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
Sat Jun 7 06:42:01 2008
@@ -526,7 +526,6 @@
}
}
}
- channel.close();
channel.setSession(null);
channel = null;
}
Modified:
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java?rev=664339&r1=664338&r2=664339&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
(original)
+++
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
Sat Jun 7 06:42:01 2008
@@ -575,6 +575,7 @@
catch (JMSException e)
{
Throwable cause = e.getLinkedException();
+ cause.printStackTrace();
assertEquals("Incorrect exception",
AMQAuthenticationException.class, cause.getClass());
assertEquals("Incorrect error code thrown", 403,
((AMQAuthenticationException) cause).getErrorCode().getCode());
}