Author: aidan Date: Tue Feb 12 09:25:44 2008 New Revision: 620871 URL: http://svn.apache.org/viewvc?rev=620871&view=rev Log: Additional fixes to perftests were also necessary, plus these merges:
Merged revisions 619868,620495-620496 via svnmerge from https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1 ........ r619868 | aidan | 2008-02-08 12:52:54 +0000 (Fri, 08 Feb 2008) | 4 lines QPID-588: change instances of trace() and isTraceEnabled to debug equivalent to support older versions of log4j ........ r620495 | rupertlssmith | 2008-02-11 14:48:35 +0000 (Mon, 11 Feb 2008) | 1 line QPID-730 : Changed durable subscription test, to ensure re-connection happens under the same client name. ........ r620496 | rupertlssmith | 2008-02-11 14:50:18 +0000 (Mon, 11 Feb 2008) | 1 line QPID-729 : Added explicit list of unacked messages, acked on commit, rejected on roll-back. ........ Modified: incubator/qpid/branches/thegreatmerge/qpid/ (props changed) incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java incubator/qpid/branches/thegreatmerge/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java incubator/qpid/branches/thegreatmerge/qpid/java/perftests/distribution/pom.xml incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java Propchange: incubator/qpid/branches/thegreatmerge/qpid/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. 
Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs?rev=620871&r1=620870&r2=620871&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs (original) +++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs Tue Feb 12 09:25:44 2008 @@ -457,7 +457,7 @@ foreach (BasicMessageConsumer consumer in _consumers.Values) { // Sends acknowledgement to server. - consumer.AcknowledgeLastDelivered(); + consumer.AcknowledgeDelivered(); } // Commits outstanding messages sent and outstanding acknowledgements. @@ -485,13 +485,16 @@ { Suspend(true); } - - // todo: rollback dispatcher when TX support is added - //if ( _dispatcher != null ) - // _dispatcher.Rollback(); - _connection.ConvenientProtocolWriter.SyncWrite( - TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody)); + // Reject up to message last delivered (if any) for each consumer. + // Need to send reject for messages delivered to consumers so far. + foreach (BasicMessageConsumer consumer in _consumers.Values) + { + // Sends acknowledgement to server. 
+ consumer.RejectUnacked(); + } + + _connection.ConvenientProtocolWriter.SyncWrite(TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody)); if ( !suspended ) { @@ -1012,6 +1015,15 @@ _connection.ProtocolWriter.Write(ackFrame); } + public void RejectMessage(ulong deliveryTag, bool requeue) + { + if ((_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) || (_acknowledgeMode == AcknowledgeMode.SessionTransacted)) + { + AMQFrame rejectFrame = BasicRejectBody.CreateAMQFrame(_channelId, deliveryTag, requeue); + _connection.ProtocolWriter.Write(rejectFrame); + } + } + /// <summary> /// Handle a message that bounced from the server, creating /// the corresponding exception and notifying the connection about it @@ -1104,8 +1116,8 @@ /// Placing stop check after consume may also be wrong as it may cause a message to be thrown away. Seems more correct to use interupt on /// the block thread to cause it to prematurely return from its wait, whereupon it can be made to re-check the stop flag.</remarks> /// - /// <remarks>Exception swalled, if there is an exception whilst notifying the connection on bounced messages. Unhandled excetpion should - /// fall through and termiante the loop, as it is a bug if it occurrs.</remarks> + /// <remarks>Exception swallowed, if there is an exception whilst notifying the connection on bounced messages. Unhandled excetpion should + /// fall through and terminate the loop, as it is a bug if it occurrs.</remarks> private class Dispatcher { /// <summary> Flag used to indicate when this dispatcher is to be stopped (0=go, 1=stop). 
</summary> Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs?rev=620871&r1=620870&r2=620871&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs (original) +++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs Tue Feb 12 09:25:44 2008 @@ -20,6 +20,8 @@ */ using System; using System.Threading; +using System.Collections; +using System.Collections.Generic; using log4net; using Apache.Qpid.Client.Message; using Apache.Qpid.Collections; @@ -106,10 +108,15 @@ private AmqChannel _channel; + // <summary> + // Tag of last message delievered, whoch should be acknowledged on commit in transaction mode. + // </summary> + //private long _lastDeliveryTag; + /// <summary> - /// Tag of last message delievered, whoch should be acknowledged on commit in transaction mode. + /// Explicit list of all received but un-acked messages in a transaction. Used to ensure acking is completed when transaction is committed. /// </summary> - private long _lastDeliveryTag; + private LinkedList<long> _receivedDeliveryTags; /// <summary> /// Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode @@ -135,6 +142,11 @@ _prefetchHigh = prefetchHigh; _prefetchLow = prefetchLow; _exclusive = exclusive; + + if (_acknowledgeMode == AcknowledgeMode.SessionTransacted) + { + _receivedDeliveryTags = new LinkedList<long>(); + } } #region IMessageConsumer Members @@ -391,13 +403,24 @@ /// <summary> /// Acknowledge up to last message delivered (if any). Used when commiting. 
/// </summary> - internal void AcknowledgeLastDelivered() + internal void AcknowledgeDelivered() { - if (_lastDeliveryTag > 0) + foreach (long tag in _receivedDeliveryTags) { - _channel.AcknowledgeMessage((ulong)_lastDeliveryTag, true); // XXX evil cast - _lastDeliveryTag = -1; + _channel.AcknowledgeMessage((ulong)tag, false); } + + _receivedDeliveryTags.Clear(); + } + + internal void RejectUnacked() + { + foreach (long tag in _receivedDeliveryTags) + { + _channel.RejectMessage((ulong)tag, true); + } + + _receivedDeliveryTags.Clear(); } private void PreDeliver(AbstractQmsMessage msg) @@ -442,7 +465,7 @@ break; case AcknowledgeMode.SessionTransacted: - _lastDeliveryTag = msg.DeliveryTag; + _receivedDeliveryTags.AddLast(msg.DeliveryTag); break; } } Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs?rev=620871&r1=620870&r2=620871&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs (original) +++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs Tue Feb 12 09:25:44 2008 @@ -78,6 +78,8 @@ SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, ackMode, false, ExchangeNameDefaults.TOPIC, true, true, "TestSubscription" + testId); + Thread.Sleep(500); + // Send messages and receive on both consumers. testProducer[0].Send(testChannel[0].CreateTextMessage("A")); @@ -93,15 +95,15 @@ ConsumeNMessagesOnly(1, "B", testConsumer[1]); // Re-attach consumer, check that it gets the messages that it missed. 
- SetUpEndPoint(3, false, true, TEST_ROUTING_KEY + testId, ackMode, false, ExchangeNameDefaults.TOPIC, true, + SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, ackMode, false, ExchangeNameDefaults.TOPIC, true, true, "TestSubscription" + testId); - ConsumeNMessagesOnly(1, "B", testConsumer[3]); + ConsumeNMessagesOnly(1, "B", testConsumer[2]); // Clean up any open consumers at the end of the test. - CloseEndPoint(0); + CloseEndPoint(2); CloseEndPoint(1); - CloseEndPoint(3); + CloseEndPoint(0); } /// <summary> Check that an uncommitted receive can be re-received, on re-consume from the same durable subscription. </summary> @@ -122,13 +124,13 @@ // Close end-point 1 without committing the message, then re-open the subscription to consume again. CloseEndPoint(1); - SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, + SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, true, false, null); // Check that the message was released from the rolled back end-point an can be received on the alternative one instead. - ConsumeNMessagesOnly(1, "A", testConsumer[2]); + ConsumeNMessagesOnly(1, "A", testConsumer[1]); - CloseEndPoint(2); + CloseEndPoint(1); CloseEndPoint(0); } @@ -151,13 +153,13 @@ // Close end-point 1 without committing the message, then re-open the subscription to consume again. CloseEndPoint(1); - SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, + SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, true, false, null); // Check that the message was released from the rolled back end-point an can be received on the alternative one instead. 
- ConsumeNMessagesOnly(1, "A", testConsumer[2]); + ConsumeNMessagesOnly(1, "A", testConsumer[1]); - CloseEndPoint(2); + CloseEndPoint(1); CloseEndPoint(0); } } Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=620871&r1=620870&r2=620871&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Feb 12 09:25:44 2008 @@ -218,9 +218,9 @@ } else { - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace(debugIdentity() + "Content header received on channel " + _channelId); + _log.debug(debugIdentity() + "Content header received on channel " + _channelId); } if (ENABLE_JMSXUserID) @@ -255,9 +255,9 @@ throw new AMQException(null, "Received content body without previously receiving a JmsPublishBody", null); } - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace(debugIdentity() + "Content body received on channel " + _channelId); + _log.debug(debugIdentity() + "Content body received on channel " + _channelId); } try Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?rev=620871&r1=620870&r2=620871&view=diff ============================================================================== --- 
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Tue Feb 12 09:25:44 2008 @@ -79,9 +79,9 @@ if (queue == null) { - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace("No queue for '" + body.getQueue() + "'"); + _log.debug("No queue for '" + body.getQueue() + "'"); } if (body.getQueue() != null) { Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?rev=620871&r1=620870&r2=620871&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Tue Feb 12 09:25:44 2008 @@ -99,9 +99,9 @@ } - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() + + _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() + ": Requeue:" + body.getRequeue() + //": Resend:" + evt.getMethod().resend + " on channel:" + channel.debugIdentity()); Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=620871&r1=620870&r2=620871&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Feb 12 09:25:44 2008 @@ -302,9 +302,9 @@ public void populatePreDeliveryQueue(Subscription subscription) { - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")"); + _log.debug("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")"); } Iterator<QueueEntry> currentQueue = _messages.iterator(); @@ -532,9 +532,9 @@ //else the clean up is not required as the message has already been taken for this queue therefore // it was the responsibility of the code that took the message to ensure the _totalMessageSize was updated. 
- if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace("Removed taken message:" + message.debugIdentity()); + _log.debug("Removed taken message:" + message.debugIdentity()); } // try the next message @@ -627,9 +627,9 @@ Queue<QueueEntry> messageQueue = sub.getNextQueue(_messages); - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace(debugIdentity() + "Async sendNextMessage for sub (" + System.identityHashCode(sub) + + _log.debug(debugIdentity() + "Async sendNextMessage for sub (" + System.identityHashCode(sub) + ") from queue (" + System.identityHashCode(messageQueue) + ") AMQQueue (" + System.identityHashCode(queue) + ")"); } @@ -655,9 +655,9 @@ // message will be null if we have no messages in the messageQueue. if (entry == null) { - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace(debugIdentity() + "No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")"); + _log.debug(debugIdentity() + "No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")"); } return; } @@ -696,9 +696,9 @@ if (messageQueue == sub.getResendQueue()) { - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace(debugIdentity() + "All messages sent from resendQueue for " + sub); + _log.debug(debugIdentity() + "All messages sent from resendQueue for " + sub); } if (messageQueue.isEmpty()) { @@ -884,9 +884,9 @@ { if (!s.isSuspended()) { - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" + + _log.debug(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" + System.identityHashCode(s) + ") :" + s); } Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?rev=620871&r1=620870&r2=620871&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Tue Feb 12 09:25:44 2008 @@ -416,9 +416,9 @@ } - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity()); + _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity()); } return checkFilters(entry); @@ -563,9 +563,9 @@ { QueueEntry resent = _resendQueue.poll(); - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Removed for resending:" + resent.debugIdentity()); + _logger.debug("Removed for resending:" + resent.debugIdentity()); } resent.release(); Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=620871&r1=620870&r2=620871&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Feb 12 09:25:44 2008 @@ -1369,9 +1369,9 @@ public void rejectMessage(UnprocessedMessage message, boolean requeue) { - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Rejecting Unacked 
message:" + message.getDeliveryTag()); + _logger.debug("Rejecting Unacked message:" + message.getDeliveryTag()); } rejectMessage(message.getDeliveryTag(), requeue); @@ -1379,9 +1379,9 @@ public void rejectMessage(AbstractJMSMessage message, boolean requeue) { - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Rejecting Abstract message:" + message.getDeliveryTag()); + _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag()); } rejectMessage(message.getDeliveryTag(), requeue); Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=620871&r1=620870&r2=620871&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Feb 12 09:25:44 2008 @@ -567,12 +567,12 @@ { if (!_closed.getAndSet(true)) { - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); if (_closedStack != null) { - _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); + _logger.debug(_consumerTag + " previously:" + _closedStack.toString()); } else { @@ -639,13 +639,14 @@ { _closed.set(true); - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { if (_closedStack != null) { - _logger.trace(_consumerTag + " markClosed():" + Arrays - .asList(Thread.currentThread().getStackTrace()).subList(3, 8)); - _logger.trace(_consumerTag + " previously:" + _closedStack.toString()); + StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + 
_logger.debug(_consumerTag + " markClosed():" + + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); + _logger.debug(_consumerTag + " previously:" + _closedStack.toString()); } else { @@ -881,14 +882,14 @@ // synchronized (_closed) { _closed.set(true); - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); if (_closedStack != null) { - _logger.trace(_consumerTag + " notifyError():" + _logger.debug(_consumerTag + " notifyError():" + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1)); - _logger.trace(_consumerTag + " previously" + _closedStack.toString()); + _logger.debug(_consumerTag + " previously" + _closedStack.toString()); } else { @@ -1035,9 +1036,9 @@ { _session.rejectMessage(((AbstractJMSMessage) o), true); - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag()); + _logger.debug("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag()); } iterator.remove(); Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java?rev=620871&r1=620870&r2=620871&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java Tue Feb 12 09:25:44 2008 @@ -171,7 +171,7 @@ m.setJMSReplyTo(q); m.setStringProperty("TempQueue", q.toString()); - _logger.trace("Message:" + m); + _logger.debug("Message:" + m); Assert.assertEquals("Check temp queue has been set correctly", 
m.getJMSReplyTo().toString(), m.getStringProperty("TempQueue")); Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=620871&r1=620870&r2=620871&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java Tue Feb 12 09:25:44 2008 @@ -706,12 +706,15 @@ public void writeToBuffer(ByteBuffer buffer) { - final boolean trace = _logger.isTraceEnabled(); + final boolean trace = _logger.isDebugEnabled(); if (trace) { - _logger.trace("FieldTable::writeToBuffer: Writing encoded length of " + getEncodedSize() + "..."); - _logger.trace(_properties.toString()); + _logger.debug("FieldTable::writeToBuffer: Writing encoded length of " + getEncodedSize() + "..."); + if (_properties != null) + { + _logger.debug(_properties.toString()); + } } EncodingUtils.writeUnsignedInteger(buffer, getEncodedSize()); @@ -915,11 +918,11 @@ final Map.Entry<AMQShortString, AMQTypedValue> me = it.next(); try { - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:" + _logger.debug("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:" + me.getValue().getValue()); - _logger.trace("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining()); + _logger.debug("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining()); } // Write the actual parameter name @@ -928,12 +931,12 @@ } catch (Exception e) { - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - 
_logger.trace("Exception thrown:" + e); - _logger.trace("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:" + _logger.debug("Exception thrown:" + e); + _logger.debug("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:" + me.getValue().getValue()); - _logger.trace("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining()); + _logger.debug("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining()); } throw new RuntimeException(e); @@ -945,7 +948,7 @@ private void setFromBuffer(ByteBuffer buffer, long length) throws AMQFrameDecodingException { - final boolean trace = _logger.isTraceEnabled(); + final boolean trace = _logger.isDebugEnabled(); if (length > 0) { @@ -961,7 +964,7 @@ if (trace) { - _logger.trace("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType() + _logger.debug("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType() + "', key '" + key + "', value '" + value.getValue() + "'"); } @@ -976,7 +979,7 @@ if (trace) { - _logger.trace("FieldTable::FieldTable(buffer," + length + "): Done."); + _logger.debug("FieldTable::FieldTable(buffer," + length + "): Done."); } } Modified: incubator/qpid/branches/thegreatmerge/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java?rev=620871&r1=620870&r2=620871&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java Tue Feb 12 09:25:44 2008 @@ -335,9 +335,9 
@@ public void onMessage(Message message) { - if (log.isTraceEnabled()) + if (log.isDebugEnabled()) { - log.trace("Message " + _received + "received in listener"); + log.debug("Message " + _received + "received in listener"); } if (message instanceof TextMessage) Modified: incubator/qpid/branches/thegreatmerge/qpid/java/perftests/distribution/pom.xml URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/distribution/pom.xml?rev=620871&r1=620870&r2=620871&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/perftests/distribution/pom.xml (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/perftests/distribution/pom.xml Tue Feb 12 09:25:44 2008 @@ -56,13 +56,11 @@ <dependency> <groupId>uk.co.thebadgerset</groupId> <artifactId>junit-toolkit</artifactId> - <version>0.6-SNAPSHOT</version> <scope>runtime</scope> </dependency> <dependency> <groupId>uk.co.thebadgerset</groupId> <artifactId>junit-toolkit-maven-plugin</artifactId> - <version>0.6-SNAPSHOT</version> <scope>runtime</scope> </dependency> </dependencies> Modified: incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java?rev=620871&r1=620870&r2=620871&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java Tue Feb 12 09:25:44 2008 @@ -20,12 +20,16 @@ */ package org.apache.qpid.client.message; +import org.apache.mina.common.ByteBuffer; +import 
org.apache.mina.common.SimpleByteBufferAllocator; + import javax.jms.JMSException; import javax.jms.Session; import javax.jms.ObjectMessage; import javax.jms.StreamMessage; import javax.jms.BytesMessage; import javax.jms.TextMessage; +import javax.jms.Queue; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -36,6 +40,15 @@ public static TextMessage newTextMessage(Session session, int size) throws JMSException { return session.createTextMessage(createMessagePayload(size)); + } + + public static JMSTextMessage newJMSTextMessage(int size, String encoding) throws JMSException + { + ByteBuffer byteBuffer = (new SimpleByteBufferAllocator()).allocate(size, true); + JMSTextMessage message = new JMSTextMessage(byteBuffer, encoding); + message.clearBody(); + message.setText(createMessagePayload(size)); + return message; } public static BytesMessage newBytesMessage(Session session, int size) throws JMSException Modified: incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java?rev=620871&r1=620870&r2=620871&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java Tue Feb 12 09:25:44 2008 @@ -116,6 +116,7 @@ defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true"); defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10"); defaults.setProperty(RATE_PROPNAME, "20"); + defaults.setProperty(DURABLE_DESTS_PROPNAME, "true"); defaults.setProperty(NUM_MESSAGES_TO_ACTION_PROPNAME, NUM_MESSAGES_TO_ACTION_DEFAULT); } Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java?rev=620871&r1=620870&r2=620871&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java Tue Feb 12 09:25:44 2008 @@ -26,6 +26,7 @@ import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.ObjectMessage; import org.apache.log4j.Logger; Modified: incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java?rev=620871&r1=620870&r2=620871&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java Tue Feb 12 09:25:44 2008 @@ -26,23 +26,30 @@ import java.util.Date; import javax.jms.*; -import javax.naming.Context; import org.apache.log4j.Logger; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.Session; +import org.apache.qpid.topic.Config; +import org.apache.qpid.exchange.ExchangeDefaults; + /** * PingPongBouncer is a message 
listener the bounces back messages to their reply to destination. This is used to return * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes * too. - * <p/> + * * <p/>The correlation id from the received message is extracted, and placed into the reply as the correlation id. Messages * are bounced back to their reply-to destination. The original sender of the message has the option to use either a unique * temporary queue or the correlation id to correlate the original message to the reply. - * <p/> + * * <p/>There is a verbose mode flag which causes information about each ping to be output to the console * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should * be disabled for real timing tests as writing to the console will slow things down. - * <p/> + * * <p><table id="crc"><caption>CRC Card</caption> * <tr><th> Responsibilities <th> Collaborations * <tr><td> Bounce back messages to their reply to destination. @@ -50,74 +57,51 @@ * </table> * * @todo Replace the command line parsing with a neater tool. + * * @todo Make verbose accept a number of messages, only prints to console every X messages. */ public class PingPongBouncer implements MessageListener { private static final Logger _logger = Logger.getLogger(PingPongBouncer.class); - /** - * The default prefetch size for the message consumer. - */ + /** The default prefetch size for the message consumer. */ private static final int PREFETCH = 1; - /** - * The default no local flag for the message consumer. - */ + /** The default no local flag for the message consumer. */ private static final boolean NO_LOCAL = true; private static final String DEFAULT_DESTINATION_NAME = "ping"; - /** - * The default exclusive flag for the message consumer. - */ + /** The default exclusive flag for the message consumer. 
*/ private static final boolean EXCLUSIVE = false; - /** - * A convenient formatter to use when time stamping output. - */ + /** A convenient formatter to use when time stamping output. */ protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); - /** - * Used to indicate that the reply generator should log timing info to the console (logger info level). - */ + /** Used to indicate that the reply generator should log timing info to the console (logger info level). */ private boolean _verbose = false; - /** - * Determines whether this bounce back client bounces back messages persistently. - */ + /** Determines whether this bounce back client bounces back messages persistently. */ private boolean _persistent = false; private Destination _consumerDestination; - /** - * Keeps track of the response destination of the previous message for the last reply to producer cache. - */ + /** Keeps track of the response destination of the previous message for the last reply to producer cache. */ private Destination _lastResponseDest; - /** - * The producer for sending replies with. - */ + /** The producer for sending replies with. */ private MessageProducer _replyProducer; - /** - * The consumer controlSession. - */ + /** The consumer controlSession. */ private Session _consumerSession; - /** - * The producer controlSession. - */ + /** The producer controlSession. */ private Session _producerSession; - /** - * Holds the connection to the broker. - */ - private Connection _connection; + /** Holds the connection to the broker. */ + private AMQConnection _connection; - /** - * Flag used to indicate if this is a point to point or pub/sub ping client. - */ + /** Flag used to indicate if this is a point to point or pub/sub ping client. */ private boolean _isPubSub = false; /** @@ -135,21 +119,22 @@ /** * Creates a PingPongBouncer on the specified producer and consumer sessions. 
* - * @param fileProperties The path to the file properties - * @param factoryName The factory name + * @param brokerDetails The addresses of the brokers to connect to. * @param username The broker username. * @param password The broker password. + * @param virtualpath The virtual host name within the broker. * @param destinationName The name of the queue to receive pings on * (or root of the queue name where many queues are generated). * @param persistent A flag to indicate that persistent message should be used. * @param transacted A flag to indicate that pings should be sent within transactions. * @param selector A message selector to filter received pings with. * @param verbose A flag to indicate that message timings should be sent to the console. + * * @throws Exception All underlying exceptions allowed to fall through. This is only test code... */ - public PingPongBouncer(String fileProperties, String factoryName, String username, String password, - String destinationName, boolean persistent, boolean transacted, - String selector, boolean verbose, boolean pubsub) throws Exception + public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath, + String destinationName, boolean persistent, boolean transacted, String selector, boolean verbose, + boolean pubsub) throws Exception { // Create a client id to uniquely identify this client. InetAddress address = InetAddress.getLocalHost(); @@ -158,9 +143,11 @@ _persistent = persistent; setPubSub(pubsub); // Connect to the broker. - Context context = InitialContextHelper.getInitialContext(fileProperties); - ConnectionFactory factory = (ConnectionFactory) context.lookup(factoryName); - setConnection(factory.createConnection(username, password)); + setConnection(new AMQConnection(brokerDetails, username, password, clientId, virtualpath)); + _logger.info("Connected with URL:" + getConnection().toURL()); + + // Set up the failover notifier. 
+ getConnection().setConnectionListener(new FailoverNotifier()); // Create a controlSession to listen for messages on and one to send replies on, transactional depending on the // command line option. @@ -169,7 +156,8 @@ // Create the queue to listen for message on. createConsumerDestination(destinationName); - MessageConsumer consumer = _consumerSession.createConsumer(_consumerDestination, selector, NO_LOCAL); + MessageConsumer consumer = + _consumerSession.createConsumer(_consumerDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector); // Create a producer for the replies, without a default destination. _replyProducer = _producerSession.createProducer(null); @@ -180,10 +168,57 @@ consumer.setMessageListener(this); } + /** + * Starts a stand alone ping-pong client running in verbose mode. + * + * @param args + */ + public static void main(String[] args) throws Exception + { + System.out.println("Starting..."); + + // Display help on the command line. + if (args.length == 0) + { + _logger.info("Running test with default values..."); + //usage(); + //System.exit(0); + } + + // Extract all command line parameters. + Config config = new Config(); + config.setOptions(args); + String brokerDetails = config.getHost() + ":" + config.getPort(); + String virtualpath = "test"; + String destinationName = config.getDestination(); + if (destinationName == null) + { + destinationName = DEFAULT_DESTINATION_NAME; + } + + String selector = config.getSelector(); + boolean transacted = config.isTransacted(); + boolean persistent = config.usePersistentMessages(); + boolean pubsub = config.isPubSub(); + boolean verbose = true; + + //String selector = null; + + // Instantiate the ping pong client with the command line options and start it running. 
+ PingPongBouncer pingBouncer = + new PingPongBouncer(brokerDetails, "guest", "guest", virtualpath, destinationName, persistent, transacted, + selector, verbose, pubsub); + pingBouncer.getConnection().start(); + + System.out.println("Waiting..."); + } + private static void usage() { - System.err.println( - "Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port : broker port\n" + "-destinationname : queue/topic name\n" + "-transacted : (true/false). Default is false\n" + "-persistent : (true/false). Default is false\n" + "-pubsub : (true/false). Default is false\n" + "-selector : selector string\n"); + System.err.println("Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port : broker port\n" + + "-destinationname : queue/topic name\n" + "-transacted : (true/false). Default is false\n" + + "-persistent : (true/false). Default is false\n" + + "-pubsub : (true/false). Default is false\n" + "-selector : selector string\n"); } /** @@ -200,8 +235,8 @@ String messageCorrelationId = message.getJMSCorrelationID(); if (_verbose) { - _logger.info(timestampFormatter - .format(new Date()) + ": Got ping with correlation id, " + messageCorrelationId); + _logger.info(timestampFormatter.format(new Date()) + ": Got ping with correlation id, " + + messageCorrelationId); } // Get the reply to destination from the message and check it is set. @@ -234,8 +269,8 @@ if (_verbose) { - _logger.info(timestampFormatter - .format(new Date()) + ": Sent reply with correlation id, " + messageCorrelationId); + _logger.info(timestampFormatter.format(new Date()) + ": Sent reply with correlation id, " + + messageCorrelationId); } // Commit the transaction if running in transactional mode. @@ -252,7 +287,7 @@ * * @return The underlying connection that this ping client is running on. */ - public Connection getConnection() + public AMQConnection getConnection() { return _connection; } @@ -262,7 +297,7 @@ * * @param connection The ping connection. 
*/ - public void setConnection(Connection connection) + public void setConnection(AMQConnection connection) { this._connection = connection; } @@ -290,7 +325,7 @@ /** * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not * a transactional controlSession, this method does nothing. - * <p/> + * * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker * after the commit is applied. @@ -305,7 +340,7 @@ { if (_failBeforeCommit) { - _logger.trace("Failing Before Commit"); + _logger.debug("Failing Before Commit"); doFailover(); } @@ -313,11 +348,11 @@ if (_failAfterCommit) { - _logger.trace("Failing After Commit"); + _logger.debug("Failing After Commit"); doFailover(); } - _logger.trace("Session Commited."); + _logger.debug("Session Commited."); } catch (JMSException e) { @@ -353,8 +388,7 @@ System.in.read(); } catch (IOException e) - { - } + { } System.out.println("Continuing."); } @@ -371,22 +405,49 @@ System.in.read(); } catch (IOException e) - { - } + { } System.out.println("Continuing."); } - private void createConsumerDestination(String name) throws JMSException + private void createConsumerDestination(String name) { if (isPubSub()) { - _consumerDestination = _consumerSession.createTopic(name); + _consumerDestination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, name); } else { - _consumerDestination = _consumerSession.createQueue(name); + _consumerDestination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, name); + } + } + + /** + * A connection listener that logs out any failover complete events. Could do more interesting things with this + * at some point... 
+ */ + public static class FailoverNotifier implements ConnectionListener + { + public void bytesSent(long count) + { } + + public void bytesReceived(long count) + { } + + public boolean preFailover(boolean redirect) + { + return true; + } + + public boolean preResubscribe() + { + return true; + } + + public void failoverComplete() + { + _logger.info("App got failover complete callback."); } } } Modified: incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=620871&r1=620870&r2=620871&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Tue Feb 12 09:25:44 2008 @@ -669,9 +669,12 @@ // _log.debug("protected void createConnection(String clientID = " + clientID + "): called"); // _log.debug("Creating a connection for the message producer."); - + File propsFile = new File(_fileProperties); + InputStream is = new FileInputStream(propsFile); + Properties properties = new Properties(); + properties.load(is); - Context context = InitialContextHelper.getInitialContext(_fileProperties); + Context context = new InitialContext(properties); ConnectionFactory factory = (ConnectionFactory) context.lookup(_factoryName); _connection = factory.createConnection(_username, _password); Modified: incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java?rev=620871&r1=620870&r2=620871&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java (original) +++ incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java Tue Feb 12 09:25:44 2008 @@ -172,10 +172,10 @@ PerThreadSetup perThreadSetup = new PerThreadSetup(); // Extract the test set up paramaeters. - String fileProperties = testParameters.getProperty(PingPongProducer.FILE_PROPERTIES_PROPNAME); - String factoryName = testParameters.getProperty(PingPongProducer.FACTORY_NAME_PROPNAME); + String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME); String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME); String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME); + String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME); String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME); boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME); boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME); @@ -187,7 +187,7 @@ { // Establish a bounce back client on the ping queue to bounce back the pings. perThreadSetup._testPingBouncer = - new PingPongBouncer(fileProperties, factoryName, username, password, destinationName, persistent, + new PingPongBouncer(brokerDetails, username, password, virtualPath, destinationName, persistent, transacted, selector, verbose, pubsub); // Start the connections for client and producer running.
