Author: bhupendrab
Date: Fri Mar 16 08:07:57 2007
New Revision: 518999

URL: http://svn.apache.org/viewvc?view=rev&rev=518999
Log:
Added a timeout option that can be passed on the command line.
Updated PingPongProducer.java so that it runs either continuously or for a 
fixed number of messages.

Modified:
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=518999&r1=518998&r2=518999
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 Fri Mar 16 08:07:57 2007
@@ -39,6 +39,7 @@
 import org.apache.qpid.client.AMQNoConsumersException;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.message.TestMessageFactory;
 import org.apache.qpid.jms.MessageProducer;
 import org.apache.qpid.jms.Session;
@@ -559,11 +560,12 @@
         boolean transacted = config.isTransacted();
         boolean persistent = config.usePersistentMessages();
         int messageSize = (config.getPayload() != 0) ? config.getPayload() : 
DEFAULT_MESSAGE_SIZE;
-        //int messageCount = config.getMessages();
+        int messageCount = config.getMessages();
         int destCount = (config.getDestinationsCount() != 0) ? 
config.getDestinationsCount() : DEFAULT_DESTINATION_COUNT;
         int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : 
DEFAULT_TX_BATCH_SIZE;
         int rate = (config.getRate() != 0) ? config.getRate() : DEFAULT_RATE;
         boolean pubsub = config.isPubSub();
+        long timeout = (config.getTimeout() != 0) ? config.getTimeout() : 
DEFAULT_TIMEOUT;
 
         String destName = config.getDestination();
         if (destName == null)
@@ -623,10 +625,19 @@
         // Ensure that the ping pong producer is registered to listen for 
exceptions on the connection too.
         pingProducer.getConnection().setExceptionListener(pingProducer);
 
-        // Create the ping loop thread and run it until it is terminated by 
the shutdown hook or exception.
-        Thread pingThread = new Thread(pingProducer);
-        pingThread.run();
-        pingThread.join();
+        // If messageCount is 0, then continue sending
+        if (messageCount == 0)
+        {
+            // Create the ping loop thread and run it until it is terminated 
by the shutdown hook or exception.
+            Thread pingThread = new Thread(pingProducer);
+            pingThread.start();
+            pingThread.join();
+        }
+        else
+        {
+            pingProducer.ping(messageCount, timeout);
+        }
+        pingProducer.close();
     }
 
     /**
@@ -785,8 +796,13 @@
                     if ((remainingCount % _txBatchSize) == 0)
                     {
                         commitTx(_consumerSession);
+                        if (!_consumerSession.getTransacted() &&
+                            _consumerSession.getAcknowledgeMode() == 
Session.CLIENT_ACKNOWLEDGE)
+                        {
+                            ((AMQSession)_consumerSession).acknowledge();
+                        }
                     }
-
+                    
                     // Forward the message and remaining count to any 
interested chained message listener.
                     if (_chainedMessageListener != null)
                     {
@@ -1017,9 +1033,9 @@
     }
 
     /**
-     * The ping loop implementation. This sends out pings waits for replies 
and inserts short pauses in between each.
+     * The ping implementation. This sends out pings, waits for replies and 
inserts short pauses in between each.
      */
-    public void pingLoop()
+    public void ping(int pingCount, long timeout)
     {
         try
         {
@@ -1028,7 +1044,7 @@
             msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
 
             // Send the message and wait for a reply.
-            pingAndWaitForReply(msg, DEFAULT_TX_BATCH_SIZE, DEFAULT_TIMEOUT);
+            pingAndWaitForReply(msg, pingCount, timeout);
 
             // Introduce a short pause if desired.
             pause(DEFAULT_SLEEP_TIME);
@@ -1045,6 +1061,11 @@
         }
     }
 
+    public void ping()
+    {
+        ping(DEFAULT_TX_BATCH_SIZE, DEFAULT_TIMEOUT);
+    }
+
     public Destination getReplyDestination()
     {
         return getReplyDestinations().get(0);
@@ -1105,7 +1126,7 @@
         // Keep running until the publish flag is cleared.
         while (_publish)
         {
-            pingLoop();
+            ping();
         }
     }
 

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java?view=diff&rev=518999&r1=518998&r2=518999
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
 Fri Mar 16 08:07:57 2007
@@ -51,6 +51,7 @@
     private int batchSize;
     private int rate;
     private boolean ispubsub;
+    private long timeout;
 
     public Config()
     {
@@ -161,6 +162,16 @@
         this.delay = delay;
     }
 
+    public long getTimeout()
+    {
+        return timeout;
+    }
+
+    public void setTimeout(long time)
+    {
+        this.timeout = time;
+    }
+
     public String getClientId()
     {
         return clientId;
@@ -284,6 +295,10 @@
         else if("-destinationname".equalsIgnoreCase(key))
         {
             destinationName = value;
+        }
+        else if("-timeout".equalsIgnoreCase(key))
+        {
+            setTimeout(parseLong("Bad timeout data", value));
         }
         else
         {


Reply via email to