Author: aidan
Date: Tue Feb 12 09:25:44 2008
New Revision: 620871

URL: http://svn.apache.org/viewvc?rev=620871&view=rev
Log:
Additional fixes to perftests were necessary as well, plus these merges:

Merged revisions 619868,620495-620496 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1

........
  r619868 | aidan | 2008-02-08 12:52:54 +0000 (Fri, 08 Feb 2008) | 4 lines
  
  QPID-588: change instances of trace() and isTraceEnabled to debug equivalent 
to support older versions of log4j
........
  r620495 | rupertlssmith | 2008-02-11 14:48:35 +0000 (Mon, 11 Feb 2008) | 1 
line
  
  QPID-730 : Changed durable subscription test, to ensure re-connection happens 
under the same client name.
........
  r620496 | rupertlssmith | 2008-02-11 14:50:18 +0000 (Mon, 11 Feb 2008) | 1 
line
  
  QPID-729 : Added explicit list of unacked messages, acked on commit, rejected 
on roll-back.
........

Modified:
    incubator/qpid/branches/thegreatmerge/qpid/   (props changed)
    
incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
    
incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
    
incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs
    
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/distribution/pom.xml
    
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java

Propchange: incubator/qpid/branches/thegreatmerge/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
 Tue Feb 12 09:25:44 2008
@@ -457,7 +457,7 @@
                 foreach (BasicMessageConsumer consumer  in _consumers.Values)
                 {
                     // Sends acknowledgement to server.
-                    consumer.AcknowledgeLastDelivered();
+                    consumer.AcknowledgeDelivered();
                 }
 
                 // Commits outstanding messages sent and outstanding 
acknowledgements.
@@ -485,13 +485,16 @@
                     {
                         Suspend(true);
                     }
-                 
-                    // todo: rollback dispatcher when TX support is added
-                    //if ( _dispatcher != null )
-                    //   _dispatcher.Rollback();
 
-                    _connection.ConvenientProtocolWriter.SyncWrite(
-                                                                   
TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody));
+                    // Reject up to message last delivered (if any) for each 
consumer.
+                    // Need to send reject for messages delivered to consumers 
so far.
+                    foreach (BasicMessageConsumer consumer  in 
_consumers.Values)
+                    {
+                        // Sends acknowledgement to server.
+                        consumer.RejectUnacked();
+                    }
+
+                    
_connection.ConvenientProtocolWriter.SyncWrite(TxRollbackBody.CreateAMQFrame(_channelId),
 typeof(TxRollbackOkBody));
 
                     if ( !suspended )
                     {
@@ -1012,6 +1015,15 @@
             _connection.ProtocolWriter.Write(ackFrame);
         }
 
+        public void RejectMessage(ulong deliveryTag, bool requeue)
+        {
+            if ((_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) || 
(_acknowledgeMode == AcknowledgeMode.SessionTransacted))
+            {
+                AMQFrame rejectFrame = 
BasicRejectBody.CreateAMQFrame(_channelId, deliveryTag, requeue);
+                _connection.ProtocolWriter.Write(rejectFrame);
+            }
+        }
+        
         /// <summary>
         /// Handle a message that bounced from the server, creating
         /// the corresponding exception and notifying the connection about it
@@ -1104,8 +1116,8 @@
         /// Placing stop check after consume may also be wrong as it may cause 
a message to be thrown away. Seems more correct to use interupt on 
         /// the block thread to cause it to prematurely return from its wait, 
whereupon it can be made to re-check the stop flag.</remarks>
         ///
-        /// <remarks>Exception swalled, if there is an exception whilst 
notifying the connection on bounced messages. Unhandled excetpion should
-        /// fall through and termiante the loop, as it is a bug if it 
occurrs.</remarks>
+        /// <remarks>Exception swallowed, if there is an exception whilst 
notifying the connection on bounced messages. Unhandled excetpion should
+        /// fall through and terminate the loop, as it is a bug if it 
occurrs.</remarks>
         private class Dispatcher
         {            
             /// <summary> Flag used to indicate when this dispatcher is to be 
stopped (0=go, 1=stop). </summary>

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
 Tue Feb 12 09:25:44 2008
@@ -20,6 +20,8 @@
  */
 using System;
 using System.Threading;
+using System.Collections;
+using System.Collections.Generic;
 using log4net;
 using Apache.Qpid.Client.Message;
 using Apache.Qpid.Collections;
@@ -106,10 +108,15 @@
 
         private AmqChannel _channel;
 
+        // <summary>
+        // Tag of last message delievered, whoch should be acknowledged on 
commit in transaction mode.
+        // </summary>
+        //private long _lastDeliveryTag;
+
         /// <summary>
-        /// Tag of last message delievered, whoch should be acknowledged on 
commit in transaction mode.
+        /// Explicit list of all received but un-acked messages in a 
transaction. Used to ensure acking is completed when transaction is committed.
         /// </summary>
-        private long _lastDeliveryTag;
+        private LinkedList<long> _receivedDeliveryTags;
 
         /// <summary>
         /// Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
@@ -135,6 +142,11 @@
             _prefetchHigh = prefetchHigh;
             _prefetchLow = prefetchLow;
             _exclusive = exclusive;
+
+            if (_acknowledgeMode == AcknowledgeMode.SessionTransacted)
+            {
+                _receivedDeliveryTags = new LinkedList<long>();
+            }
         }
 
         #region IMessageConsumer Members
@@ -391,13 +403,24 @@
         /// <summary>
         /// Acknowledge up to last message delivered (if any). Used when 
commiting.
         /// </summary>
-        internal void AcknowledgeLastDelivered()
+        internal void AcknowledgeDelivered()
         {
-            if (_lastDeliveryTag > 0)
+            foreach (long tag in _receivedDeliveryTags)
             {
-                _channel.AcknowledgeMessage((ulong)_lastDeliveryTag, true); // 
XXX evil cast
-                _lastDeliveryTag = -1;
+                _channel.AcknowledgeMessage((ulong)tag, false);
             }
+
+            _receivedDeliveryTags.Clear();
+        }
+
+        internal void RejectUnacked()
+        {
+            foreach (long tag in _receivedDeliveryTags)
+            {
+                _channel.RejectMessage((ulong)tag, true);
+            }
+
+            _receivedDeliveryTags.Clear();
         }
 
         private void PreDeliver(AbstractQmsMessage msg)
@@ -442,7 +465,7 @@
                     break;
                 
                 case AcknowledgeMode.SessionTransacted:
-                    _lastDeliveryTag = msg.DeliveryTag;
+                    _receivedDeliveryTags.AddLast(msg.DeliveryTag);
                     break;
             }
         }

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs
 Tue Feb 12 09:25:44 2008
@@ -78,6 +78,8 @@
             SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, ackMode, 
false, ExchangeNameDefaults.TOPIC, true,
                           true, "TestSubscription" + testId);
 
+            Thread.Sleep(500);
+
             // Send messages and receive on both consumers.
             testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
 
@@ -93,15 +95,15 @@
             ConsumeNMessagesOnly(1, "B", testConsumer[1]);
 
             // Re-attach consumer, check that it gets the messages that it 
missed.
-            SetUpEndPoint(3, false, true, TEST_ROUTING_KEY + testId, ackMode, 
false, ExchangeNameDefaults.TOPIC, true,
+            SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, ackMode, 
false, ExchangeNameDefaults.TOPIC, true,
                           true, "TestSubscription" + testId);
 
-            ConsumeNMessagesOnly(1, "B", testConsumer[3]);
+            ConsumeNMessagesOnly(1, "B", testConsumer[2]);
 
             // Clean up any open consumers at the end of the test.
-            CloseEndPoint(0);
+            CloseEndPoint(2);
             CloseEndPoint(1);
-            CloseEndPoint(3);
+            CloseEndPoint(0);
         }
 
         /// <summary> Check that an uncommitted receive can be re-received, on 
re-consume from the same durable subscription. </summary>
@@ -122,13 +124,13 @@
 
             // Close end-point 1 without committing the message, then re-open 
the subscription to consume again.
             CloseEndPoint(1);
-            SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, 
AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, 
+            SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, 
AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, 
                           true, false, null);
 
             // Check that the message was released from the rolled back 
end-point an can be received on the alternative one instead.
-            ConsumeNMessagesOnly(1, "A", testConsumer[2]);
+            ConsumeNMessagesOnly(1, "A", testConsumer[1]);
 
-            CloseEndPoint(2);
+            CloseEndPoint(1);
             CloseEndPoint(0);
         }
 
@@ -151,13 +153,13 @@
 
             // Close end-point 1 without committing the message, then re-open 
the subscription to consume again.
             CloseEndPoint(1);
-            SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, 
AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, 
+            SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, 
AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, 
                           true, false, null);
 
             // Check that the message was released from the rolled back 
end-point an can be received on the alternative one instead.
-            ConsumeNMessagesOnly(1, "A", testConsumer[2]);
+            ConsumeNMessagesOnly(1, "A", testConsumer[1]);
 
-            CloseEndPoint(2);
+            CloseEndPoint(1);
             CloseEndPoint(0);
         }
     }

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Tue Feb 12 09:25:44 2008
@@ -218,9 +218,9 @@
         }
         else
         {
-            if (_log.isTraceEnabled())
+            if (_log.isDebugEnabled())
             {
-                _log.trace(debugIdentity() + "Content header received on 
channel " + _channelId);
+                _log.debug(debugIdentity() + "Content header received on 
channel " + _channelId);
             }
 
             if (ENABLE_JMSXUserID)
@@ -255,9 +255,9 @@
             throw new AMQException(null, "Received content body without 
previously receiving a JmsPublishBody", null);
         }
 
-        if (_log.isTraceEnabled())
+        if (_log.isDebugEnabled())
         {
-            _log.trace(debugIdentity() + "Content body received on channel " + 
_channelId);
+            _log.debug(debugIdentity() + "Content body received on channel " + 
_channelId);
         }
 
         try

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
 Tue Feb 12 09:25:44 2008
@@ -79,9 +79,9 @@
 
             if (queue == null)
             {
-                if (_log.isTraceEnabled())
+                if (_log.isDebugEnabled())
                 {
-                    _log.trace("No queue for '" + body.getQueue() + "'");
+                    _log.debug("No queue for '" + body.getQueue() + "'");
                 }
                 if (body.getQueue() != null)
                 {

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
 Tue Feb 12 09:25:44 2008
@@ -99,9 +99,9 @@
             }
 
 
-            if (_logger.isTraceEnabled())
+            if (_logger.isDebugEnabled())
             {
-                _logger.trace("Rejecting: DT:" + deliveryTag + "-" + 
message.getMessage().debugIdentity() +
+                _logger.debug("Rejecting: DT:" + deliveryTag + "-" + 
message.getMessage().debugIdentity() +
                               ": Requeue:" + body.getRequeue() +
                               //": Resend:" + evt.getMethod().resend +
                               " on channel:" + channel.debugIdentity());

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 Tue Feb 12 09:25:44 2008
@@ -302,9 +302,9 @@
 
     public void populatePreDeliveryQueue(Subscription subscription)
     {
-        if (_log.isTraceEnabled())
+        if (_log.isDebugEnabled())
         {
-            _log.trace("Populating PreDeliveryQueue for Subscription(" + 
System.identityHashCode(subscription) + ")");
+            _log.debug("Populating PreDeliveryQueue for Subscription(" + 
System.identityHashCode(subscription) + ")");
         }
 
         Iterator<QueueEntry> currentQueue = _messages.iterator();
@@ -532,9 +532,9 @@
             //else the clean up is not required as the message has already 
been taken for this queue therefore
             // it was the responsibility of the code that took the message to 
ensure the _totalMessageSize was updated.
 
-            if (_log.isTraceEnabled())
+            if (_log.isDebugEnabled())
             {
-                _log.trace("Removed taken message:" + message.debugIdentity());
+                _log.debug("Removed taken message:" + message.debugIdentity());
             }
 
             // try the next message
@@ -627,9 +627,9 @@
 
         Queue<QueueEntry> messageQueue = sub.getNextQueue(_messages);
 
-        if (_log.isTraceEnabled())
+        if (_log.isDebugEnabled())
         {
-            _log.trace(debugIdentity() + "Async sendNextMessage for sub (" + 
System.identityHashCode(sub) +
+            _log.debug(debugIdentity() + "Async sendNextMessage for sub (" + 
System.identityHashCode(sub) +
                        ") from queue (" + 
System.identityHashCode(messageQueue) +
                        ") AMQQueue (" + System.identityHashCode(queue) + ")");
         }
@@ -655,9 +655,9 @@
                 // message will be null if we have no messages in the 
messageQueue.
                 if (entry == null)
                 {
-                    if (_log.isTraceEnabled())
+                    if (_log.isDebugEnabled())
                     {
-                        _log.trace(debugIdentity() + "No messages for 
Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + 
System.identityHashCode(messageQueue) + ")");
+                        _log.debug(debugIdentity() + "No messages for 
Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + 
System.identityHashCode(messageQueue) + ")");
                     }
                     return;
                 }
@@ -696,9 +696,9 @@
 
             if (messageQueue == sub.getResendQueue())
             {
-                if (_log.isTraceEnabled())
+                if (_log.isDebugEnabled())
                 {
-                    _log.trace(debugIdentity() + "All messages sent from 
resendQueue for " + sub);
+                    _log.debug(debugIdentity() + "All messages sent from 
resendQueue for " + sub);
                 }
                 if (messageQueue.isEmpty())
                 {
@@ -884,9 +884,9 @@
                 {
                     if (!s.isSuspended())
                     {
-                        if (_log.isTraceEnabled())
+                        if (_log.isDebugEnabled())
                         {
-                            _log.trace(debugIdentity() + "Delivering Message:" 
+ entry.getMessage().debugIdentity() + " to(" +
+                            _log.debug(debugIdentity() + "Delivering Message:" 
+ entry.getMessage().debugIdentity() + " to(" +
                                        System.identityHashCode(s) + ") :" + s);
                         }
 

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
 Tue Feb 12 09:25:44 2008
@@ -416,9 +416,9 @@
         }
 
 
-        if (_logger.isTraceEnabled())
+        if (_logger.isDebugEnabled())
         {
-            _logger.trace("(" + debugIdentity() + ") checking filters for 
message (" + entry.debugIdentity());
+            _logger.debug("(" + debugIdentity() + ") checking filters for 
message (" + entry.debugIdentity());
         }
         return checkFilters(entry);
 
@@ -563,9 +563,9 @@
             {
                 QueueEntry resent = _resendQueue.poll();
 
-                if (_logger.isTraceEnabled())
+                if (_logger.isDebugEnabled())
                 {
-                    _logger.trace("Removed for resending:" + 
resent.debugIdentity());
+                    _logger.debug("Removed for resending:" + 
resent.debugIdentity());
                 }
 
                 resent.release();

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Tue Feb 12 09:25:44 2008
@@ -1369,9 +1369,9 @@
     public void rejectMessage(UnprocessedMessage message, boolean requeue)
     {
 
-        if (_logger.isTraceEnabled())
+        if (_logger.isDebugEnabled())
         {
-            _logger.trace("Rejecting Unacked message:" + 
message.getDeliveryTag());
+            _logger.debug("Rejecting Unacked message:" + 
message.getDeliveryTag());
         }
 
         rejectMessage(message.getDeliveryTag(), requeue);
@@ -1379,9 +1379,9 @@
 
     public void rejectMessage(AbstractJMSMessage message, boolean requeue)
     {
-        if (_logger.isTraceEnabled())
+        if (_logger.isDebugEnabled())
         {
-            _logger.trace("Rejecting Abstract message:" + 
message.getDeliveryTag());
+            _logger.debug("Rejecting Abstract message:" + 
message.getDeliveryTag());
         }
 
         rejectMessage(message.getDeliveryTag(), requeue);

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 Tue Feb 12 09:25:44 2008
@@ -567,12 +567,12 @@
         {
             if (!_closed.getAndSet(true))
             {
-                if (_logger.isTraceEnabled())
+                if (_logger.isDebugEnabled())
                 {
                     StackTraceElement[] stackTrace = 
Thread.currentThread().getStackTrace();
                     if (_closedStack != null)
                     {
-                        _logger.trace(_consumerTag + " previously:" + 
_closedStack.toString());
+                        _logger.debug(_consumerTag + " previously:" + 
_closedStack.toString());
                     }
                     else
                     {
@@ -639,13 +639,14 @@
         {
             _closed.set(true);
 
-            if (_logger.isTraceEnabled())
+            if (_logger.isDebugEnabled())
             {
                 if (_closedStack != null)
                 {
-                    _logger.trace(_consumerTag + " markClosed():" + Arrays
-                            
.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
-                    _logger.trace(_consumerTag + " previously:" + 
_closedStack.toString());
+                    StackTraceElement[] stackTrace = 
Thread.currentThread().getStackTrace();
+                    _logger.debug(_consumerTag + " markClosed():"
+                                  + Arrays.asList(stackTrace).subList(3, 
stackTrace.length - 1));
+                    _logger.debug(_consumerTag + " previously:" + 
_closedStack.toString());
                 }
                 else
                 {
@@ -881,14 +882,14 @@
         // synchronized (_closed)
         {
             _closed.set(true);
-            if (_logger.isTraceEnabled())
+            if (_logger.isDebugEnabled())
             {
                 StackTraceElement[] stackTrace = 
Thread.currentThread().getStackTrace();
                 if (_closedStack != null)
                 {
-                    _logger.trace(_consumerTag + " notifyError():"
+                    _logger.debug(_consumerTag + " notifyError():"
                                   + Arrays.asList(stackTrace).subList(3, 
stackTrace.length - 1));
-                    _logger.trace(_consumerTag + " previously" + 
_closedStack.toString());
+                    _logger.debug(_consumerTag + " previously" + 
_closedStack.toString());
                 }
                 else
                 {
@@ -1035,9 +1036,9 @@
                 {
                     _session.rejectMessage(((AbstractJMSMessage) o), true);
 
-                    if (_logger.isTraceEnabled())
+                    if (_logger.isDebugEnabled())
                     {
-                        _logger.trace("Rejected message:" + 
((AbstractJMSMessage) o).getDeliveryTag());
+                        _logger.debug("Rejected message:" + 
((AbstractJMSMessage) o).getDeliveryTag());
                     }
 
                     iterator.remove();

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
 Tue Feb 12 09:25:44 2008
@@ -171,7 +171,7 @@
             m.setJMSReplyTo(q);
             m.setStringProperty("TempQueue", q.toString());
 
-            _logger.trace("Message:" + m);
+            _logger.debug("Message:" + m);
 
             Assert.assertEquals("Check temp queue has been set correctly", 
m.getJMSReplyTo().toString(),
                 m.getStringProperty("TempQueue"));

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
 Tue Feb 12 09:25:44 2008
@@ -706,12 +706,15 @@
 
     public void writeToBuffer(ByteBuffer buffer)
     {
-        final boolean trace = _logger.isTraceEnabled();
+        final boolean trace = _logger.isDebugEnabled();
 
         if (trace)
         {
-            _logger.trace("FieldTable::writeToBuffer: Writing encoded length 
of " + getEncodedSize() + "...");
-            _logger.trace(_properties.toString());
+            _logger.debug("FieldTable::writeToBuffer: Writing encoded length 
of " + getEncodedSize() + "...");
+            if (_properties != null)
+            {
+                _logger.debug(_properties.toString());
+            }
         }
 
         EncodingUtils.writeUnsignedInteger(buffer, getEncodedSize());
@@ -915,11 +918,11 @@
                 final Map.Entry<AMQShortString, AMQTypedValue> me = it.next();
                 try
                 {
-                    if (_logger.isTraceEnabled())
+                    if (_logger.isDebugEnabled())
                     {
-                        _logger.trace("Writing Property:" + me.getKey() + " 
Type:" + me.getValue().getType() + " Value:"
+                        _logger.debug("Writing Property:" + me.getKey() + " 
Type:" + me.getValue().getType() + " Value:"
                             + me.getValue().getValue());
-                        _logger.trace("Buffer Position:" + buffer.position() + 
" Remaining:" + buffer.remaining());
+                        _logger.debug("Buffer Position:" + buffer.position() + 
" Remaining:" + buffer.remaining());
                     }
 
                     // Write the actual parameter name
@@ -928,12 +931,12 @@
                 }
                 catch (Exception e)
                 {
-                    if (_logger.isTraceEnabled())
+                    if (_logger.isDebugEnabled())
                     {
-                        _logger.trace("Exception thrown:" + e);
-                        _logger.trace("Writing Property:" + me.getKey() + " 
Type:" + me.getValue().getType() + " Value:"
+                        _logger.debug("Exception thrown:" + e);
+                        _logger.debug("Writing Property:" + me.getKey() + " 
Type:" + me.getValue().getType() + " Value:"
                             + me.getValue().getValue());
-                        _logger.trace("Buffer Position:" + buffer.position() + 
" Remaining:" + buffer.remaining());
+                        _logger.debug("Buffer Position:" + buffer.position() + 
" Remaining:" + buffer.remaining());
                     }
 
                     throw new RuntimeException(e);
@@ -945,7 +948,7 @@
     private void setFromBuffer(ByteBuffer buffer, long length) throws 
AMQFrameDecodingException
     {
 
-        final boolean trace = _logger.isTraceEnabled();
+        final boolean trace = _logger.isDebugEnabled();
         if (length > 0)
         {
 
@@ -961,7 +964,7 @@
 
                 if (trace)
                 {
-                    _logger.trace("FieldTable::PropFieldTable(buffer," + 
length + "): Read type '" + value.getType()
+                    _logger.debug("FieldTable::PropFieldTable(buffer," + 
length + "): Read type '" + value.getType()
                         + "', key '" + key + "', value '" + value.getValue() + 
"'");
                 }
 
@@ -976,7 +979,7 @@
 
         if (trace)
         {
-            _logger.trace("FieldTable::FieldTable(buffer," + length + "): 
Done.");
+            _logger.debug("FieldTable::FieldTable(buffer," + length + "): 
Done.");
         }
     }
 

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java
 Tue Feb 12 09:25:44 2008
@@ -335,9 +335,9 @@
 
         public void onMessage(Message message)
         {
-            if (log.isTraceEnabled())
+            if (log.isDebugEnabled())
             {
-                log.trace("Message " + _received + "received in listener");
+                log.debug("Message " + _received + "received in listener");
             }
 
             if (message instanceof TextMessage)

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/distribution/pom.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/distribution/pom.xml?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/distribution/pom.xml 
(original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/distribution/pom.xml 
Tue Feb 12 09:25:44 2008
@@ -56,13 +56,11 @@
         <dependency>
             <groupId>uk.co.thebadgerset</groupId>
             <artifactId>junit-toolkit</artifactId>
-            <version>0.6-SNAPSHOT</version>
             <scope>runtime</scope>
             </dependency>
         <dependency>
             <groupId>uk.co.thebadgerset</groupId>
             <artifactId>junit-toolkit-maven-plugin</artifactId>
-            <version>0.6-SNAPSHOT</version>
             <scope>runtime</scope>
         </dependency>
     </dependencies>

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
 Tue Feb 12 09:25:44 2008
@@ -20,12 +20,16 @@
  */
 package org.apache.qpid.client.message;
 
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+
 import javax.jms.JMSException;
 import javax.jms.Session;
 import javax.jms.ObjectMessage;
 import javax.jms.StreamMessage;
 import javax.jms.BytesMessage;
 import javax.jms.TextMessage;
+import javax.jms.Queue;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 
@@ -36,6 +40,15 @@
     public static TextMessage newTextMessage(Session session, int size) throws 
JMSException
     {
         return session.createTextMessage(createMessagePayload(size));
+    }
+
+    public static JMSTextMessage newJMSTextMessage(int size, String encoding) 
throws JMSException
+    {
+        ByteBuffer byteBuffer = (new 
SimpleByteBufferAllocator()).allocate(size, true);
+        JMSTextMessage message = new JMSTextMessage(byteBuffer, encoding);
+        message.clearBody();
+        message.setText(createMessagePayload(size));
+        return message;
     }
 
     public static BytesMessage newBytesMessage(Session session, int size) 
throws JMSException

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
 Tue Feb 12 09:25:44 2008
@@ -116,6 +116,7 @@
         defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true");
         defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10");
         defaults.setProperty(RATE_PROPNAME, "20");
+        defaults.setProperty(DURABLE_DESTS_PROPNAME, "true");
         defaults.setProperty(NUM_MESSAGES_TO_ACTION_PROPNAME, 
NUM_MESSAGES_TO_ACTION_DEFAULT);
     }
 

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java
 Tue Feb 12 09:25:44 2008
@@ -26,6 +26,7 @@
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.ObjectMessage;
 
 import org.apache.log4j.Logger;
 

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
 Tue Feb 12 09:25:44 2008
@@ -26,23 +26,30 @@
 import java.util.Date;
 
 import javax.jms.*;
-import javax.naming.Context;
 
 import org.apache.log4j.Logger;
 
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.topic.Config;
+import org.apache.qpid.exchange.ExchangeDefaults;
+
 /**
  * PingPongBouncer is a message listener the bounces back messages to their 
reply to destination. This is used to return
  * ping messages generated by {@link 
org.apache.qpid.requestreply.PingPongProducer} but could be used for other 
purposes
  * too.
- * <p/>
+ *
  * <p/>The correlation id from the received message is extracted, and placed 
into the reply as the correlation id. Messages
  * are bounced back to their reply-to destination. The original sender of the 
message has the option to use either a unique
  * temporary queue or the correlation id to correlate the original message to 
the reply.
- * <p/>
+ *
  * <p/>There is a verbose mode flag which causes information about each ping 
to be output to the console
  * (info level logging, so usually console). This can be helpfull to check the 
bounce backs are happening but should
  * be disabled for real timing tests as writing to the console will slow 
things down.
- * <p/>
+ *
  * <p><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
  * <tr><td> Bounce back messages to their reply to destination.
@@ -50,74 +57,51 @@
  * </table>
  *
  * @todo Replace the command line parsing with a neater tool.
+ *
  * @todo Make verbose accept a number of messages, only prints to console 
every X messages.
  */
 public class PingPongBouncer implements MessageListener
 {
     private static final Logger _logger = 
Logger.getLogger(PingPongBouncer.class);
 
-    /**
-     * The default prefetch size for the message consumer.
-     */
+    /** The default prefetch size for the message consumer. */
     private static final int PREFETCH = 1;
 
-    /**
-     * The default no local flag for the message consumer.
-     */
+    /** The default no local flag for the message consumer. */
     private static final boolean NO_LOCAL = true;
 
     private static final String DEFAULT_DESTINATION_NAME = "ping";
 
-    /**
-     * The default exclusive flag for the message consumer.
-     */
+    /** The default exclusive flag for the message consumer. */
     private static final boolean EXCLUSIVE = false;
 
-    /**
-     * A convenient formatter to use when time stamping output.
-     */
+    /** A convenient formatter to use when time stamping output. */
     protected static final SimpleDateFormat timestampFormatter = new 
SimpleDateFormat("hh:mm:ss:SS");
 
-    /**
-     * Used to indicate that the reply generator should log timing info to the 
console (logger info level).
-     */
+    /** Used to indicate that the reply generator should log timing info to 
the console (logger info level). */
     private boolean _verbose = false;
 
-    /**
-     * Determines whether this bounce back client bounces back messages 
persistently.
-     */
+    /** Determines whether this bounce back client bounces back messages 
persistently. */
     private boolean _persistent = false;
 
     private Destination _consumerDestination;
 
-    /**
-     * Keeps track of the response destination of the previous message for the 
last reply to producer cache.
-     */
+    /** Keeps track of the response destination of the previous message for 
the last reply to producer cache. */
     private Destination _lastResponseDest;
 
-    /**
-     * The producer for sending replies with.
-     */
+    /** The producer for sending replies with. */
     private MessageProducer _replyProducer;
 
-    /**
-     * The consumer controlSession.
-     */
+    /** The consumer controlSession. */
     private Session _consumerSession;
 
-    /**
-     * The producer controlSession.
-     */
+    /** The producer controlSession. */
     private Session _producerSession;
 
-    /**
-     * Holds the connection to the broker.
-     */
-    private Connection _connection;
+    /** Holds the connection to the broker. */
+    private AMQConnection _connection;
 
-    /**
-     * Flag used to indicate if this is a point to point or pub/sub ping 
client.
-     */
+    /** Flag used to indicate if this is a point to point or pub/sub ping 
client. */
     private boolean _isPubSub = false;
 
     /**
@@ -135,21 +119,22 @@
     /**
      * Creates a PingPongBouncer on the specified producer and consumer 
sessions.
      *
-     * @param fileProperties  The path to the file properties
-     * @param factoryName     The factory name
+     * @param brokerDetails The addresses of the brokers to connect to.
      * @param username        The broker username.
      * @param password        The broker password.
+     * @param virtualpath     The virtual host name within the broker.
      * @param destinationName The name of the queue to receive pings on
      *                        (or root of the queue name where many queues are 
generated).
      * @param persistent      A flag to indicate that persistent message 
should be used.
      * @param transacted      A flag to indicate that pings should be sent 
within transactions.
      * @param selector        A message selector to filter received pings with.
      * @param verbose         A flag to indicate that message timings should 
be sent to the console.
+     *
      * @throws Exception All underlying exceptions allowed to fall through. 
This is only test code...
      */
-    public PingPongBouncer(String fileProperties, String factoryName, String 
username, String password,
-                           String destinationName, boolean persistent, boolean 
transacted,
-                           String selector, boolean verbose, boolean pubsub) 
throws Exception
+    public PingPongBouncer(String brokerDetails, String username, String 
password, String virtualpath,
+                           String destinationName, boolean persistent, boolean 
transacted, String selector, boolean verbose,
+                           boolean pubsub) throws Exception
     {
         // Create a client id to uniquely identify this client.
         InetAddress address = InetAddress.getLocalHost();
@@ -158,9 +143,11 @@
         _persistent = persistent;
         setPubSub(pubsub);
         // Connect to the broker.
-        Context context = 
InitialContextHelper.getInitialContext(fileProperties);
-        ConnectionFactory factory = (ConnectionFactory) 
context.lookup(factoryName);
-        setConnection(factory.createConnection(username, password));
+        setConnection(new AMQConnection(brokerDetails, username, password, 
clientId, virtualpath));
+        _logger.info("Connected with URL:" + getConnection().toURL());
+
+        // Set up the failover notifier.
+        getConnection().setConnectionListener(new FailoverNotifier());
 
         // Create a controlSession to listen for messages on and one to send 
replies on, transactional depending on the
         // command line option.
@@ -169,7 +156,8 @@
 
         // Create the queue to listen for message on.
         createConsumerDestination(destinationName);
-        MessageConsumer consumer = 
_consumerSession.createConsumer(_consumerDestination, selector, NO_LOCAL);
+        MessageConsumer consumer =
+            _consumerSession.createConsumer(_consumerDestination, PREFETCH, 
NO_LOCAL, EXCLUSIVE, selector);
 
         // Create a producer for the replies, without a default destination.
         _replyProducer = _producerSession.createProducer(null);
@@ -180,10 +168,57 @@
         consumer.setMessageListener(this);
     }
 
+    /**
+     * Starts a stand alone ping-pong client running in verbose mode.
+     *
+     * @param args
+     */
+    public static void main(String[] args) throws Exception
+    {
+        System.out.println("Starting...");
+
+        // Display help on the command line.
+        if (args.length == 0)
+        {
+            _logger.info("Running test with default values...");
+            //usage();
+            //System.exit(0);
+        }
+
+        // Extract all command line parameters.
+        Config config = new Config();
+        config.setOptions(args);
+        String brokerDetails = config.getHost() + ":" + config.getPort();
+        String virtualpath = "test";
+        String destinationName = config.getDestination();
+        if (destinationName == null)
+        {
+            destinationName = DEFAULT_DESTINATION_NAME;
+        }
+
+        String selector = config.getSelector();
+        boolean transacted = config.isTransacted();
+        boolean persistent = config.usePersistentMessages();
+        boolean pubsub = config.isPubSub();
+        boolean verbose = true;
+
+        //String selector = null;
+
+        // Instantiate the ping pong client with the command line options and 
start it running.
+        PingPongBouncer pingBouncer =
+            new PingPongBouncer(brokerDetails, "guest", "guest", virtualpath, 
destinationName, persistent, transacted,
+                                selector, verbose, pubsub);
+        pingBouncer.getConnection().start();
+
+        System.out.println("Waiting...");
+    }
+
     private static void usage()
     {
-        System.err.println(
-                "Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port 
: broker port\n" + "-destinationname : queue/topic name\n" + "-transacted : 
(true/false). Default is false\n" + "-persistent : (true/false). Default is 
false\n" + "-pubsub     : (true/false). Default is false\n" + "-selector   : 
selector string\n");
+        System.err.println("Usage: PingPongBouncer \n" + "-host : broker 
host\n" + "-port : broker port\n"
+                           + "-destinationname : queue/topic name\n" + 
"-transacted : (true/false). Default is false\n"
+                           + "-persistent : (true/false). Default is false\n"
+                           + "-pubsub     : (true/false). Default is false\n" 
+ "-selector   : selector string\n");
     }
 
     /**
@@ -200,8 +235,8 @@
             String messageCorrelationId = message.getJMSCorrelationID();
             if (_verbose)
             {
-                _logger.info(timestampFormatter
-                        .format(new Date()) + ": Got ping with correlation id, 
" + messageCorrelationId);
+                _logger.info(timestampFormatter.format(new Date()) + ": Got 
ping with correlation id, "
+                             + messageCorrelationId);
             }
 
             // Get the reply to destination from the message and check it is 
set.
@@ -234,8 +269,8 @@
 
             if (_verbose)
             {
-                _logger.info(timestampFormatter
-                        .format(new Date()) + ": Sent reply with correlation 
id, " + messageCorrelationId);
+                _logger.info(timestampFormatter.format(new Date()) + ": Sent 
reply with correlation id, "
+                             + messageCorrelationId);
             }
 
             // Commit the transaction if running in transactional mode.
@@ -252,7 +287,7 @@
      *
      * @return The underlying connection that this ping client is running on.
      */
-    public Connection getConnection()
+    public AMQConnection getConnection()
     {
         return _connection;
     }
@@ -262,7 +297,7 @@
      *
      * @param connection The ping connection.
      */
-    public void setConnection(Connection connection)
+    public void setConnection(AMQConnection connection)
     {
         this._connection = connection;
     }
@@ -290,7 +325,7 @@
     /**
      * Convenience method to commit the transaction on the specified 
controlSession. If the controlSession to commit on is not
      * a transactional controlSession, this method does nothing.
-     * <p/>
+     *
      * <p/>If the {@link #_failBeforeCommit} flag is set, this will 
prompt the user to kill the broker before the
      * commit is applied. If the {@link #_failAfterCommit} flag is 
set, this will prompt the user to kill the broker
      * after the commit is applied.
@@ -305,7 +340,7 @@
             {
                 if (_failBeforeCommit)
                 {
-                    _logger.trace("Failing Before Commit");
+                    _logger.debug("Failing Before Commit");
                     doFailover();
                 }
 
@@ -313,11 +348,11 @@
 
                 if (_failAfterCommit)
                 {
-                    _logger.trace("Failing After Commit");
+                    _logger.debug("Failing After Commit");
                     doFailover();
                 }
 
-                _logger.trace("Session Commited.");
+                _logger.debug("Session Commited.");
             }
             catch (JMSException e)
             {
@@ -353,8 +388,7 @@
             System.in.read();
         }
         catch (IOException e)
-        {
-        }
+        { }
 
         System.out.println("Continuing.");
     }
@@ -371,22 +405,49 @@
             System.in.read();
         }
         catch (IOException e)
-        {
-        }
+        { }
 
         System.out.println("Continuing.");
 
     }
 
-    private void createConsumerDestination(String name) throws JMSException
+    private void createConsumerDestination(String name)
     {
         if (isPubSub())
         {
-            _consumerDestination = _consumerSession.createTopic(name);
+            _consumerDestination = new 
AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, name);
         }
         else
         {
-            _consumerDestination = _consumerSession.createQueue(name);
+            _consumerDestination = new 
AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, name);
+        }
+    }
+
+    /**
+     * A connection listener that logs out any failover complete events. Could 
do more interesting things with this
+     * at some point...
+     */
+    public static class FailoverNotifier implements ConnectionListener
+    {
+        public void bytesSent(long count)
+        { }
+
+        public void bytesReceived(long count)
+        { }
+
+        public boolean preFailover(boolean redirect)
+        {
+            return true;
+        }
+
+        public boolean preResubscribe()
+        {
+            return true;
+        }
+
+        public void failoverComplete()
+        {
+            _logger.info("App got failover complete callback.");
         }
     }
 }

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 Tue Feb 12 09:25:44 2008
@@ -669,9 +669,12 @@
         // _log.debug("protected void createConnection(String clientID = " + 
clientID + "): called");
 
         // _log.debug("Creating a connection for the message producer.");
-      
+        File propsFile = new File(_fileProperties);
+        InputStream is = new FileInputStream(propsFile);
+        Properties properties = new Properties();
+        properties.load(is);
 
-        Context context = 
InitialContextHelper.getInitialContext(_fileProperties);
+        Context context = new InitialContext(properties);
         ConnectionFactory factory = (ConnectionFactory) 
context.lookup(_factoryName);
         _connection = factory.createConnection(_username, _password);
 

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java
 Tue Feb 12 09:25:44 2008
@@ -172,10 +172,10 @@
             PerThreadSetup perThreadSetup = new PerThreadSetup();
 
             // Extract the test set up paramaeters.
-            String fileProperties = 
testParameters.getProperty(PingPongProducer.FILE_PROPERTIES_PROPNAME);
-            String factoryName = 
testParameters.getProperty(PingPongProducer.FACTORY_NAME_PROPNAME);
+            String brokerDetails = 
testParameters.getProperty(PingPongProducer.BROKER_PROPNAME);
             String username = 
testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME);
             String password = 
testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME);
+            String virtualPath = 
testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME);
             String destinationName = 
testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME);
             boolean persistent = 
testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME);
             boolean transacted = 
testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME);
@@ -187,7 +187,7 @@
             {
                 // Establish a bounce back client on the ping queue to bounce 
back the pings.
                 perThreadSetup._testPingBouncer =
-                    new PingPongBouncer(fileProperties, factoryName, username, 
password, destinationName, persistent,
+                    new PingPongBouncer(brokerDetails, username, password, 
virtualPath, destinationName, persistent,
                         transacted, selector, verbose, pubsub);
 
                 // Start the connections for client and producer running.


Reply via email to