Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java?view=diff&rev=543496&r1=543495&r2=543496 ============================================================================== --- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java (original) +++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java Fri Jun 1 07:33:07 2007 @@ -14,42 +14,43 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. + * * - * */ package org.apache.qpid.test.unit.client.channelclose; import junit.framework.TestCase; -import javax.jms.Connection; -import javax.jms.Session; - -import javax.jms.JMSException; -import javax.jms.ExceptionListener; -import javax.jms.MessageProducer; -import javax.jms.MessageConsumer; -import javax.jms.Message; -import javax.jms.TextMessage; -import javax.jms.Queue; +import org.apache.log4j.Logger; -import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQTimeoutException; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQStateManager; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.framing.ChannelOpenBody; import org.apache.qpid.framing.ChannelOpenOkBody; -import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ExchangeDeclareBody; import org.apache.qpid.framing.ExchangeDeclareOkBody; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQTimeoutException; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.jms.ConnectionListener; -import org.apache.log4j.Logger; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.url.URLSyntaxException; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; public class ChannelCloseTest extends TestCase implements ExceptionListener, ConnectionListener { @@ -73,15 +74,14 @@ TransportConnection.killAllVMBrokers(); } - /* close channel, use chanel with same id ensure error. - */ - public void testReusingChannelAfterFullClosure() + */ + public void testReusingChannelAfterFullClosure() throws Exception { _connection = newConnection(); - //Create Producer + // Create Producer try { _connection.start(); @@ -113,6 +113,7 @@ { _logger.info("Exception occured was:" + e.getErrorCode()); } + assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode()); _connection = newConnection(); @@ -134,29 +135,27 @@ /* close channel and send guff then send ok no errors */ - public void testSendingMethodsAfterClose() + public void testSendingMethodsAfterClose() throws Exception { try { - _connection = new AMQConnection("amqp://guest:[EMAIL PROTECTED]/test?brokerlist='" - + _brokerlist + "'"); + _connection = new AMQConnection("amqp://guest:[EMAIL PROTECTED]/test?brokerlist='" + _brokerlist + "'"); ((AMQConnection) _connection).setConnectionListener(this); - _connection.setExceptionListener(this); - //Change the StateManager for one that doesn't respond with Close-OKs + // Change the StateManager for one that doesn't respond with Close-OKs AMQStateManager oldStateManager = ((AMQConnection) _connection).getProtocolHandler().getStateManager(); _session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); _connection.start(); - //Test connection + // Test connection checkSendingMessage(); - //Set StateManager to manager that ignores Close-oks + // Set StateManager to manager that ignores Close-oks AMQProtocolSession protocolSession = ((AMQConnection) _connection).getProtocolHandler().getProtocolSession(); AMQStateManager newStateManager = new NoCloseOKStateManager(protocolSession); newStateManager.changeState(oldStateManager.getCurrentState()); @@ -214,7 +213,7 @@ createChannelAndTest(TEST_CHANNEL); - //Test connection is still ok + // Test connection is still ok checkSendingMessage(); @@ -248,9 +247,9 @@ } } - private void createChannelAndTest(int channel) + private void createChannelAndTest(int channel) throws FailoverException { - //Create A channel + // Create A channel try { createChannel(channel); @@ -274,14 +273,14 @@ private void sendClose(int channel) { - AMQFrame frame = ChannelCloseOkBody.createAMQFrame(channel, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion()); + AMQFrame frame = + ChannelCloseOkBody.createAMQFrame(channel, + ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), + ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion()); ((AMQConnection) _connection).getProtocolHandler().writeFrame(frame); } - private void checkSendingMessage() throws JMSException { TEST++; @@ -307,8 +306,7 @@ AMQConnection connection = null; try { - connection = new AMQConnection("amqp://guest:[EMAIL PROTECTED]/test?brokerlist='" - + _brokerlist + "'"); + connection = new AMQConnection("amqp://guest:[EMAIL PROTECTED]/test?brokerlist='" + _brokerlist + "'"); connection.setConnectionListener(this); @@ -330,24 +328,24 @@ fail("Creating new connection when:" + e.getMessage()); } - return connection; } - private void declareExchange(int channelId, String _type, String _name, boolean nowait) throws AMQException + private void declareExchange(int channelId, String _type, String _name, boolean nowait) + throws AMQException, FailoverException { - AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(channelId, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), - null, // arguments - false, // autoDelete - false, // durable - new AMQShortString(_name), // exchange - false, // internal - nowait, // nowait - true, // passive - 0, // ticket - new AMQShortString(_type)); // type + AMQFrame exchangeDeclare = + ExchangeDeclareBody.createAMQFrame(channelId, + ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), + ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null, // arguments + false, // autoDelete + false, // durable + new AMQShortString(_name), // exchange + false, // internal + nowait, // nowait + true, // passive + 0, // ticket + new AMQShortString(_type)); // type if (nowait) { @@ -355,36 +353,31 @@ } else { - ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, SYNC_TIMEOUT); + ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, + SYNC_TIMEOUT); } } - private void createChannel(int channelId) throws AMQException + private void createChannel(int channelId) throws AMQException, FailoverException { - ((AMQConnection) _connection).getProtocolHandler().syncWrite( - ChannelOpenBody.createAMQFrame(channelId, - ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), - ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), - null), // outOfBand - ChannelOpenOkBody.class); + ((AMQConnection) _connection).getProtocolHandler().syncWrite(ChannelOpenBody.createAMQFrame(channelId, + ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(), + ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null), // outOfBand + ChannelOpenOkBody.class); } - public void onException(JMSException jmsException) { - //_logger.info("CCT" + jmsException); + // _logger.info("CCT" + jmsException); fail(jmsException.getMessage()); } public void bytesSent(long count) - { - } + { } public void bytesReceived(long count) - { - - } + { } public boolean preFailover(boolean redirect) { @@ -397,6 +390,5 @@ } public void failoverComplete() - { - } + { } }
Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java?view=diff&rev=543496&r1=543495&r2=543496 ============================================================================== --- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java (original) +++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java Fri Jun 1 07:33:07 2007 @@ -22,29 +22,29 @@ import junit.framework.TestCase; -import java.util.concurrent.atomic.AtomicInteger; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.testutil.QpidClientConnection; +import org.apache.qpid.url.URLSyntaxException; -import javax.jms.ExceptionListener; -import javax.jms.Session; import javax.jms.Connection; +import javax.jms.ExceptionListener; import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.MessageProducer; import javax.jms.Message; -import javax.jms.TextMessage; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.client.AMQConnectionURL; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.AMQException; -import org.apache.qpid.testutil.QpidClientConnection; -import org.apache.log4j.Logger; -import org.apache.log4j.Level; +import java.util.concurrent.atomic.AtomicInteger; public class MessageRequeueTest extends TestCase { @@ -86,7 +86,7 @@ { super.tearDown(); - if (!passed) // clean up + if (!passed) // clean up { QpidClientConnection conn = new QpidClientConnection(BROKER); @@ -96,6 +96,7 @@ conn.disconnect(); } + TransportConnection.killVMBroker(1); } @@ -117,7 +118,7 @@ final MessageConsumer consumer = conn.getSession().createConsumer(q); int messagesReceived = 0; - long messageLog[] = new long[numTestMessages + 1]; + long[] messageLog = new long[numTestMessages + 1]; _logger.info("consuming..."); Message msg = consumer.receive(1000); @@ -130,15 +131,13 @@ int msgindex = msg.getIntProperty("index"); if (messageLog[msgindex] != 0) { - _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag() + - ") more than once."); + _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag() + + ") more than once."); } if (_logger.isInfoEnabled()) { - _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + - "DT:" + dt + - "IN:" + msgindex); + _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + "DT:" + dt + "IN:" + msgindex); } if (dt == 0) @@ -148,7 +147,7 @@ messageLog[msgindex] = dt; - //get Next message + // get Next message msg = consumer.receive(1000); } @@ -163,7 +162,7 @@ for (long b : messageLog) { - if (b == 0 && index != 0) //delivery tag of zero shouldn't exist + if ((b == 0) && (index != 0)) // delivery tag of zero shouldn't exist { _logger.error("Index: " + index + " was not received."); list.append(" "); @@ -175,6 +174,7 @@ index++; } + assertEquals(list.toString(), 0, failed); _logger.info("consumed: " + messagesReceived); conn.disconnect(); @@ -199,7 +199,7 @@ t1.start(); t2.start(); t3.start(); -// t4.start(); + // t4.start(); try { @@ -228,7 +228,7 @@ for (long b : receieved) { - if (b == 0 && index != 0) //delivery tag of zero shouldn't exist (and we don't have msg 0) + if ((b == 0) && (index != 0)) // delivery tag of zero shouldn't exist (and we don't have msg 0) { _logger.error("Index: " + index + " was not received."); list.append(" "); @@ -237,8 +237,10 @@ list.append(b); failed++; } + index++; } + assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed); assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed); passed = true; @@ -278,15 +280,14 @@ int msgindex = result.getIntProperty("index"); if (receieved[msgindex] != 0) { - _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) result).getDeliveryTag() + - ") more than once."); + _logger.error("Received Message(" + msgindex + ":" + + ((AbstractJMSMessage) result).getDeliveryTag() + ") more than once."); } if (_logger.isInfoEnabled()) { - _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + - "DT:" + dt + - "IN:" + msgindex); + _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + "DT:" + dt + + "IN:" + msgindex); } if (dt == 0) @@ -297,9 +298,8 @@ receieved[msgindex] = dt; } - count++; - if (count % 100 == 0) + if ((count % 100) == 0) { _logger.info("consumer-" + id + ": got " + result + ", new count is " + count); } @@ -328,11 +328,10 @@ } } - public void testRequeue() throws JMSException, AMQException, URLSyntaxException { int run = 0; -// while (run < 10) + // while (run < 10) { run++; @@ -359,7 +358,6 @@ assertNotNull("Message should not be null", msg); - // As we have not ack'd message will be requeued. _logger.debug("Close Consumer"); consumer.close(); @@ -369,4 +367,4 @@ } } -} \ No newline at end of file +} Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?view=diff&rev=543496&r1=543495&r2=543496 ============================================================================== --- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original) +++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Fri Jun 1 07:33:07 2007 @@ -21,18 +21,20 @@ package org.apache.qpid.test.unit.transacted; import junit.framework.TestCase; + +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.AMQException; import org.apache.qpid.url.URLSyntaxException; -import org.apache.log4j.Logger; -import javax.jms.Session; -import javax.jms.MessageProducer; -import javax.jms.MessageConsumer; -import javax.jms.Queue; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; import javax.jms.TextMessage; /** @@ -62,10 +64,10 @@ { TransportConnection.createVMBroker(1); } + testMethod++; queue += testMethod; - newConnection(); } @@ -106,7 +108,6 @@ assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); - _logger.info("sending test message"); String MESSAGE_TEXT = "testPutThenDisconnect"; _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); @@ -119,7 +120,7 @@ _logger.info("receiving result"); Message result = _consumer.receive(1000); - //commit to ensure message is removed from queue + // commit to ensure message is removed from queue _session.commit(); assertNull("test message was put and disconnected before commit, but is still present", result); @@ -135,7 +136,6 @@ assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); - _logger.info("sending test message"); String MESSAGE_TEXT = "testPutThenDisconnect"; _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); @@ -151,7 +151,7 @@ _logger.info("receiving result"); Message result = _consumer.receive(1000); - //commit to ensure message is removed from queue + // commit to ensure message is removed from queue _session.commit(); assertNull("test message was put and disconnected before commit, but is still present", result); @@ -168,7 +168,6 @@ assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); - _logger.info("sending test message"); String MESSAGE_TEXT = "testPutThenRollback"; _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); @@ -335,13 +334,12 @@ assertTrue("Messasge is not marked as redelivered", result.getJMSRedelivered()); } - /** * Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order * * @throws Exception On error */ - public void testSend2ThenRollback() throws Exception + /*public void testSend2ThenRollback() throws Exception { assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); @@ -391,7 +389,7 @@ } assertNull("test message should be null", result); - } + }*/ public void testSend2ThenCloseAfter1andTryAgain() throws Exception { @@ -428,7 +426,7 @@ { assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered()); } - else // or it will be msg 2 arriving the first time due to latency. + else // or it will be msg 2 arriving the first time due to latency. { _logger.info("Message 2 wasn't prefetched so wasn't rejected"); assertEquals("2", ((TextMessage) result).getText()); @@ -444,7 +442,6 @@ _session.commit(); } - public void testPutThenRollbackThenGet() throws Exception { Modified: incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java?view=diff&rev=543496&r1=543495&r2=543496 ============================================================================== --- incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java (original) +++ incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java Fri Jun 1 07:33:07 2007 @@ -1,25 +1,25 @@ package org.apache.qpid.testutil; +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQConnectionFactory; import org.apache.qpid.client.AMQConnectionURL; -import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.JMSAMQException; import org.apache.qpid.url.URLSyntaxException; -import org.apache.log4j.Logger; -import javax.jms.ExceptionListener; -import javax.jms.Session; import javax.jms.Connection; +import javax.jms.ExceptionListener; import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.MessageProducer; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; import javax.jms.TextMessage; public class QpidClientConnection implements ExceptionListener { - private static final Logger _logger = Logger.getLogger(QpidClientConnection.class); private boolean transacted = true; @@ -40,17 +40,16 @@ setPrefetch(5000); } - public void connect() throws JMSException { if (!connected) { /* - * amqp://[user:[EMAIL PROTECTED]/virtualhost? - * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' - * [&failover='method[?option='value'[&option='value']]'] - * [&option='value']" - */ + * amqp://[user:[EMAIL PROTECTED]/virtualhost? + * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' + * [&failover='method[?option='value'[&option='value']]'] + * [&option='value']" + */ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; try { @@ -63,7 +62,6 @@ session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); - _logger.info("starting connection"); connection.start(); @@ -124,7 +122,6 @@ this.prefetch = prefetch; } - /** override as necessary */ public void onException(JMSException exception) { @@ -266,4 +263,3 @@ _logger.info("consumed: " + messagesReceived); } } - Modified: incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java?view=diff&rev=543496&r1=543495&r2=543496 ============================================================================== --- incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java (original) +++ incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java Fri Jun 1 07:33:07 2007 @@ -23,14 +23,17 @@ import org.apache.qpid.protocol.AMQConstant; /** - * AMQConnectionClosedException indicates that an operation cannot be performed becauase a connection has been closed. + * AMQConnectionClosedException indicates that a connection has been closed. + * + * <p/>This exception is really used as an event, in order that the method handler that raises it creates an event + * which is propagated to the io handler, in order to notify it of the connection closure. * * <p/><table id="crc"><caption>CRC Card</caption> * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Represents a failed operation on a closed conneciton. + * <tr><td> Represents a the closure of a connection. * </table> * - * @todo Does this duplicate AMQConnectionException? + * @todo Should review where exceptions-as-events */ public class AMQConnectionClosedException extends AMQException { Modified: incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/AMQPInvalidClassException.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/AMQPInvalidClassException.java?view=diff&rev=543496&r1=543495&r2=543496 ============================================================================== --- incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/AMQPInvalidClassException.java (original) +++ incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/AMQPInvalidClassException.java Fri Jun 1 07:33:07 2007 @@ -14,13 +14,22 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. + * under the License. + * * - * */ package org.apache.qpid; - +/** + * AMQPInvalidClassException indicates an error when trying to store an illegally typed argument in a field table. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Represents illegal argument type for field table values. + * </table> + * + * @todo Could just re-use an exising exception like IllegalArgumentException or ClassCastException. + */ public class AMQPInvalidClassException extends RuntimeException { public AMQPInvalidClassException(String s) Modified: incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java?view=diff&rev=543496&r1=543495&r2=543496 ============================================================================== --- incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java (original) +++ incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java Fri Jun 1 07:33:07 2007 @@ -28,7 +28,7 @@ * * <p/>An event listener may be associated with a particular context, usually an AMQP channel, and in addition to * receiving method events will be notified of errors on that context. This enables listeners to perform any clean - * up that they need to do before the context is closed. + * up that they need to do before the context is closed or retried. * * <p/><table id="crc"><caption>CRC Card</caption> * <tr><th> Responsibilities @@ -64,8 +64,6 @@ * any necessary clean-up for the context. * * @param e The underlying exception that is the source of the error. - * - * @todo Consider narrowing the exception, or wrapping it. */ void error(Exception e); } Modified: incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java?view=diff&rev=543496&r1=543495&r2=543496 ============================================================================== --- incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java (original) +++ incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java Fri Jun 1 07:33:07 2007 @@ -26,6 +26,8 @@ * <p><table id="crc"><caption>CRC Card</caption> * <tr><th> Responsibilities <th> Collaborations * </table> + * + * @todo Drop this. There are already array pretty printing methods it java.utils.Arrays. */ public class PrettyPrintingUtils { Modified: incubator/qpid/branches/M2/java/perftests/RunningPerformanceTests.txt URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/RunningPerformanceTests.txt?view=diff&rev=543496&r1=543495&r2=543496 ============================================================================== --- incubator/qpid/branches/M2/java/perftests/RunningPerformanceTests.txt (original) +++ incubator/qpid/branches/M2/java/perftests/RunningPerformanceTests.txt Fri Jun 1 07:33:07 2007 @@ -116,7 +116,7 @@ The most common test case to run is implemented in the class PingAsyncTestPerf, which sends and recieves messages simultaneously. This class uses a PingPongProdicer to do its sending and receiving, and wraps it in a suitable way to make it callable through the extended JUnit test runner. This class also accpets another parameter "batchSize" with a default of "1000". This tells the test how many messages to send before stopping sending and waiting for them all to come back. The actual value entered does not matter too much, but typically values larger than 1000 are used to ensure that there is a reasonable opportunity for simultaneous sending and receiving, and less than 10000 to ensure that each test method invocation does not go on for too long. -The test script parameters can all be seen in the pom.xml file. A three letter code is used on the test scripts, first letter P or T for persistent or transient, second letter Q or T for queue (p2p) or topic (pub/sub), third letter R for reliability tests, C for client scaling tests, M for message size tests.Typically tests run and sample their results for 10 minutes, to get a reasonable measurement of a broker running under a steady load. The tests as configured do not measure peak performance. +The test script parameters can all be seen in the pom.xml file. A three letter code is used on the test scripts, first letter P or T for persistent or transient, second letter Q or T for queue (p2p) or topic (pub/sub), third letter R for reliability tests, C for client scaling tests, M for message size tests.Typically tests run and sample their results for 10 minutes, to get a reasonable measurement of a broker running under a steady load. The tests as configured do not measure 'burst' performance. The reliability/burn in tests, test the broker running at slightly below its maximum throughput for a period of 24 hours. Their purpose is to check that the broker remains stable under load for a reasonable duration, in order to provide some confidence in the long-term stability of its process. These tests are intended to be run as a two step process. The first two tests run for 10 minutes and are used to asses the broker throughput for the test. The output from these tests are to be fed into the rate limiter for the second set of tests, so that the broker may be set up to run at slightly below its maximum throughput for the 24 hour duration. It is suggested that 90% of the rate achieved by the first two tests should be used for this. Modified: incubator/qpid/branches/M2/java/perftests/pom.xml URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/pom.xml?view=diff&rev=543496&r1=543495&r2=543496 ============================================================================== --- incubator/qpid/branches/M2/java/perftests/pom.xml (original) +++ incubator/qpid/branches/M2/java/perftests/pom.xml Fri Jun 1 07:33:07 2007 @@ -305,16 +305,16 @@ <PTM-Qpid-2-1M>-n PTM-Qpid-2-1M -d10M -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=true transacted=false commitBatchSize=10 batchSize=1000 messageSize=1048476 destinationsCount=1 rate=0 maxPending=20000000</PTM-Qpid-2-1M> <!-- Failover Tests. --> - <FT-Qpid-1>-n FT-Qpid-1 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" failBeforeSend=true -o $QPID_WORK/results</FT-Qpid-1> - <FT-Qpid-2>-n FT-Qpid-2 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" failAfterSend=true -o $QPID_WORK/results</FT-Qpid-2> - <FT-Qpid-3>-n FT-Qpid-3 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" failAfterCommit=true -o $QPID_WORK/results</FT-Qpid-3> - <FT-Qpid-4>-n FT-Qpid-4 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" transacted=false failBeforeSend=true -o $QPID_WORK/results</FT-Qpid-4> - <FT-Qpid-5>-n FT-Qpid-5 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" transacted=false failAfterSend=true -o $QPID_WORK/results</FT-Qpid-5> - <FT-Qpid-1-P>-n FT-Qpid-1-P -s [25000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 batchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" failBeforeSend=true -o $QPID_WORK/results</FT-Qpid-1-P> - <FT-Qpid-2-P>-n FT-Qpid-2-P -s [25000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 batchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" failAfterSend=true -o $QPID_WORK/results</FT-Qpid-2-P> - <FT-Qpid-3-P>-n FT-Qpid-3-P -s [25000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 batchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" failAfterCommit=true -o $QPID_WORK/results</FT-Qpid-3-P> - <FT-Qpid-4-P>-n FT-Qpid-4-P -s [250] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" transacted=false failBeforeSend=true -o $QPID_WORK/results</FT-Qpid-4-P> - <FT-Qpid-5-P>-n FT-Qpid-5-P -s [250] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" transacted=false failAfterSend=true -o $QPID_WORK/results</FT-Qpid-5-P> + <FT-Qpid-1>-n FT-Qpid-1 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" failBeforeSend=true -o $QPID_WORK/results</FT-Qpid-1> + <FT-Qpid-2>-n FT-Qpid-2 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" failAfterSend=true -o $QPID_WORK/results</FT-Qpid-2> + <FT-Qpid-3>-n FT-Qpid-3 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 transacted=true broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" failAfterCommit=true -o $QPID_WORK/results</FT-Qpid-3> + <FT-Qpid-4>-n FT-Qpid-4 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" transacted=false failBeforeSend=true -o $QPID_WORK/results</FT-Qpid-4> + <FT-Qpid-5>-n FT-Qpid-5 -s [250000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf messageSize=256 batchSize=10000 broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" transacted=false failAfterSend=true -o $QPID_WORK/results</FT-Qpid-5> + <FT-Qpid-1-P>-n FT-Qpid-1-P -s [25000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 batchSize=10000 transacted=true broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" failBeforeSend=true -o $QPID_WORK/results</FT-Qpid-1-P> + <FT-Qpid-2-P>-n FT-Qpid-2-P -s [25000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 batchSize=10000 transacted=true broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" failAfterSend=true -o $QPID_WORK/results</FT-Qpid-2-P> + <FT-Qpid-3-P>-n FT-Qpid-3-P -s [25000] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 batchSize=10000 transacted=true broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" failAfterCommit=true -o $QPID_WORK/results</FT-Qpid-3-P> + <FT-Qpid-4-P>-n FT-Qpid-4-P -s [250] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" transacted=false failBeforeSend=true -o $QPID_WORK/results</FT-Qpid-4-P> + <FT-Qpid-5-P>-n FT-Qpid-5-P -s [250] -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true messageSize=256 broker="tcp://127.0.0.1:5001;tcp://127.0.0.1:5002" transacted=false failAfterSend=true -o $QPID_WORK/results</FT-Qpid-5-P> </commands> </configuration> Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java?view=diff&rev=543496&r1=543495&r2=543496 ============================================================================== --- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java (original) +++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java Fri Jun 1 07:33:07 2007 @@ -1,7 +1,7 @@ package org.apache.qpid.server.failure; import junit.framework.TestCase; -import org.apache.qpid.testutil.QpidClientConnection; +import org.apache.qpid.testutil.QpidClientConnectionHelper; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; @@ -17,7 +17,7 @@ { private static final Logger _logger = Logger.getLogger(HeapExhaustion.class); - protected QpidClientConnection conn; + protected QpidClientConnectionHelper conn; protected final String BROKER = "localhost"; protected final String vhost = "/test"; protected final String queue = "direct://amq.direct//queue"; @@ -32,7 +32,7 @@ protected void setUp() throws Exception { - conn = new QpidClientConnection(BROKER); + conn = new QpidClientConnectionHelper(BROKER); conn.setVirtualHost(vhost); conn.connect(); Added: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnectionHelper.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnectionHelper.java?view=auto&rev=543496 ============================================================================== --- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnectionHelper.java (added) +++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnectionHelper.java Fri Jun 1 07:33:07 2007 @@ -0,0 +1,275 @@ +package org.apache.qpid.testutil; + +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQConnectionFactory; +import org.apache.qpid.client.AMQConnectionURL; +import org.apache.qpid.client.JMSAMQException; +import org.apache.qpid.url.URLSyntaxException; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +/** + * @todo This was originally cut and paste from the client module leading to a duplicate class, then altered very + * slightly. To avoid the duplicate class the name was altered slightly to have 'Helper' on the end in order + * to distinguish it from the original. Delete this class and use the original instead, just upgrade it to + * provide the new features needed. + */ +public class QpidClientConnectionHelper implements ExceptionListener +{ + + private static final Logger _logger = Logger.getLogger(QpidClientConnectionHelper.class); + + private boolean transacted = true; + private int ackMode = Session.CLIENT_ACKNOWLEDGE; + private Connection connection; + + private String virtualHost; + private String brokerlist; + private int prefetch; + protected Session session; + protected boolean connected; + + public QpidClientConnectionHelper(String broker) + { + super(); + setVirtualHost("/test"); + setBrokerList(broker); + setPrefetch(5000); + } + + public void connect() throws JMSException + { + if (!connected) + { + /* + * amqp://[user:[EMAIL PROTECTED]/virtualhost? + * brokerlist='[transport://]host[:port][?option='value'[&option='value']];' + * [&failover='method[?option='value'[&option='value']]'] + * [&option='value']" + */ + String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'"; + try + { + AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl)); + _logger.info("connecting to Qpid :" + brokerUrl); + connection = factory.createConnection(); + + // register exception listener + connection.setExceptionListener(this); + + session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch); + + _logger.info("starting connection"); + connection.start(); + + connected = true; + } + catch (URLSyntaxException e) + { + throw new JMSAMQException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage(), e); + } + } + } + + public void disconnect() throws JMSException + { + if (connected) + { + session.commit(); + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected"); + } + } + + public void disconnectWithoutCommit() throws JMSException + { + if (connected) + { + session.close(); + connection.close(); + connected = false; + _logger.info("disconnected without commit"); + } + } + + public String getBrokerList() + { + return brokerlist; + } + + public void setBrokerList(String brokerlist) + { + this.brokerlist = brokerlist; + } + + public String getVirtualHost() + { + return virtualHost; + } + + public void setVirtualHost(String virtualHost) + { + this.virtualHost = virtualHost; + } + + public void setPrefetch(int prefetch) + { + this.prefetch = prefetch; + } + + /** override as necessary */ + public void onException(JMSException exception) + { + _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage()); + } + + public boolean isConnected() + { + return connected; + } + + public Session getSession() + { + return session; + } + + /** + * Put a String as a text messages, repeat n times. A null payload will result in a null message. + * + * @param queueName The queue name to put to + * @param payload the content of the payload + * @param copies the number of messages to put + * + * @throws javax.jms.JMSException any exception that occurs + */ + public void put(String queueName, String payload, int copies, int deliveryMode) throws JMSException + { + if (!connected) + { + connect(); + } + + _logger.info("putting to queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageProducer sender = session.createProducer(queue); + + sender.setDeliveryMode(deliveryMode); + + for (int i = 0; i < copies; i++) + { + Message m = session.createTextMessage(payload + i); + m.setIntProperty("index", i + 1); + sender.send(m); + } + + session.commit(); + sender.close(); + _logger.info("put " + copies + " copies"); + } + + /** + * GET the top message on a queue. Consumes the message. Accepts timeout value. + * + * @param queueName The quename to get from + * @param readTimeout The timeout to use + * + * @return the content of the text message if any + * + * @throws javax.jms.JMSException any exception that occured + */ + public Message getNextMessage(String queueName, long readTimeout) throws JMSException + { + if (!connected) + { + connect(); + } + + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + + Message message = consumer.receive(readTimeout); + session.commit(); + consumer.close(); + + Message result; + + // all messages we consume should be TextMessages + if (message instanceof TextMessage) + { + result = ((TextMessage) message); + } + else if (null == message) + { + result = null; + } + else + { + _logger.info("warning: received non-text message"); + result = message; + } + + return result; + } + + /** + * GET the top message on a queue. Consumes the message. + * + * @param queueName The Queuename to get from + * + * @return The string content of the text message, if any received + * + * @throws javax.jms.JMSException any exception that occurs + */ + public Message getNextMessage(String queueName) throws JMSException + { + return getNextMessage(queueName, 0); + } + + /** + * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer. + * + * @param queueName The Queue name to consume from + * @param readTimeout The timeout for each consume + * + * @throws javax.jms.JMSException Any exception that occurs during the consume + * @throws InterruptedException If the consume thread was interrupted during a consume. + */ + public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException + { + if (!connected) + { + connect(); + } + + _logger.info("consuming queue " + queueName); + Queue queue = session.createQueue(queueName); + + final MessageConsumer consumer = session.createConsumer(queue); + int messagesReceived = 0; + + _logger.info("consuming..."); + while ((consumer.receive(readTimeout)) != null) + { + messagesReceived++; + } + + session.commit(); + consumer.close(); + _logger.info("consumed: " + messagesReceived); + } +}
