Author: rgreig
Date: Wed Jan 24 07:04:06 2007
New Revision: 499427

URL: http://svn.apache.org/viewvc?view=rev&rev=499427
Log:
(Patch submitted by Rupert Smith) Cleans up the countdown latch used to count 
the expected number of messages. Not removing it from the map of countdown 
latches, keyed by message correlation id, caused a memory leak because the map 
is long-lived across many tests.

Modified:
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=499427&r1=499426&r2=499427
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 Wed Jan 24 07:04:06 2007
@@ -157,7 +157,7 @@
     private PingPongProducer(String brokerDetails, String username, String 
password, String virtualpath, boolean transacted,
                              boolean persistent, int messageSize, boolean 
verbose, boolean afterCommit, boolean beforeCommit,
                              boolean afterSend, boolean beforeSend, boolean 
failOnce, int batchSize, int rate)
-            throws Exception
+                      throws Exception
     {
         // Create a connection to the broker.
         InetAddress address = InetAddress.getLocalHost();
@@ -217,10 +217,10 @@
      * @throws Exception All allowed to fall through. This is only test code...
      */
     public PingPongProducer(String brokerDetails, String username, String 
password, String virtualpath,
-                            String destinationName, String selector, boolean 
transacted, boolean persistent,
-                            int messageSize, boolean verbose, boolean 
afterCommit, boolean beforeCommit,
-                            boolean afterSend, boolean beforeSend, boolean 
failOnce, int batchSize,
-                            int noOfDestinations, int rate, boolean pubsub) 
throws Exception
+                            String destinationName, String selector, boolean 
transacted, boolean persistent, int messageSize,
+                            boolean verbose, boolean afterCommit, boolean 
beforeCommit, boolean afterSend,
+                            boolean beforeSend, boolean failOnce, int 
batchSize, int noOfDestinations, int rate,
+                            boolean pubsub) throws Exception
     {
         this(brokerDetails, username, password, virtualpath, transacted, 
persistent, messageSize, verbose, afterCommit,
              beforeCommit, afterSend, beforeSend, failOnce, batchSize, rate);
@@ -245,98 +245,6 @@
         }
     }
 
-    private void createPingDestination(String name)
-    {
-        if (isPubSub())
-        {
-            _pingDestination = new AMQTopic(name);
-        }
-        else
-        {
-            _pingDestination = new AMQQueue(name);
-        }
-    }
-
-    /**
-     * Creates the producer to send the pings on.  If the tests are with 
nultiple-destinations, then producer
-     * is created with null destination, so that any destination can be 
specified while sending
-     *
-     * @throws JMSException
-     */
-    public void createProducer() throws JMSException
-    {
-        if (getDestinationsCount() > 1)
-        {
-            // create producer with initial destination as null for test with 
multiple-destinations
-            // In this case, a different destination will be used while 
sending the message
-            _producer = (MessageProducer) 
getProducerSession().createProducer(null);
-        }
-        else
-        {
-            // Create a producer with known destination to send the pings on.
-            _producer = (MessageProducer) 
getProducerSession().createProducer(_pingDestination);
-
-        }
-
-        _producer.setDisableMessageTimestamp(true);
-        _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : 
DeliveryMode.NON_PERSISTENT);
-    }
-
-    /**
-     * Creates the temporary destination to listen to the responses
-     *
-     * @param selector
-     * @throws JMSException
-     */
-    public void createConsumer(String selector) throws JMSException
-    {
-        // Create a temporary destination to get the pongs on.
-        if (isPubSub())
-        {
-            _replyDestination = _consumerSession.createTemporaryTopic();
-        }
-        else
-        {
-            _replyDestination = _consumerSession.createTemporaryQueue();
-        }
-
-        // Create a message consumer to get the replies with and register this 
to be called back by it.
-        MessageConsumer consumer = 
_consumerSession.createConsumer(_replyDestination, PREFETCH, NO_LOCAL, 
EXCLUSIVE, selector);
-        consumer.setMessageListener(this);
-    }
-
-    /**
-     * Creates consumer instances for each destination. This is used when test 
is being done with multiple destinations.
-     *
-     * @param selector
-     * @throws JMSException
-     */
-    public void createConsumers(String selector) throws JMSException
-    {
-        for (int i = 0; i < getDestinationsCount(); i++)
-        {
-            MessageConsumer consumer =
-                    getConsumerSession().createConsumer(getDestination(i), 
PREFETCH, false, EXCLUSIVE, selector);
-            consumer.setMessageListener(this);
-        }
-    }
-
-
-    protected Session getConsumerSession()
-    {
-        return _consumerSession;
-    }
-
-    public Destination getPingDestination()
-    {
-        return _pingDestination;
-    }
-
-    protected void setPingDestination(Destination destination)
-    {
-        _pingDestination = destination;
-    }
-
     /**
      * Starts a ping-pong loop running from the command line. The bounce back 
client {@link org.apache.qpid.requestreply.PingPongBouncer} also 
needs
      * to be started to bounce the pings back again.
@@ -356,9 +264,9 @@
         // Extract the command line.
         if (args.length < 2)
         {
-            System.err.println("Usage: TestPingPublisher <brokerDetails> 
<virtual path> [verbose (true/false)] " +
-                               "[transacted (true/false)] [persistent 
(true/false)] [message size in bytes] [batchsize]" +
-                               " [rate] [pubsub(true/false)]");
+            System.err.println("Usage: TestPingPublisher <brokerDetails> 
<virtual path> [verbose (true/false)] "
+                               + "[transacted (true/false)] [persistent 
(true/false)] [message size in bytes] [batchsize]"
+                               + " [rate] [pubsub(true/false)]");
             System.exit(0);
         }
 
@@ -411,10 +319,10 @@
         }
 
         // Create a ping producer to handle the request/wait/reply cycle.
-        PingPongProducer pingProducer = new PingPongProducer(brokerDetails, 
"guest", "guest", virtualpath,
-                                                             
PING_DESTINATION_NAME, null, transacted, persistent, messageSize, verbose,
-                                                             afterCommit, 
beforeCommit, afterSend, beforeSend, failOnce, batchSize,
-                                                             0, rate, 
ispubsub);
+        PingPongProducer pingProducer =
+            new PingPongProducer(brokerDetails, "guest", "guest", virtualpath, 
PING_DESTINATION_NAME, null, transacted,
+                                 persistent, messageSize, verbose, 
afterCommit, beforeCommit, afterSend, beforeSend,
+                                 failOnce, batchSize, 0, rate, ispubsub);
 
         pingProducer.getConnection().start();
 
@@ -433,6 +341,76 @@
     }
 
     /**
+     * Creates the producer to send the pings on.  If the tests are with 
nultiple-destinations, then producer
+     * is created with null destination, so that any destination can be 
specified while sending
+     *
+     * @throws JMSException
+     */
+    public void createProducer() throws JMSException
+    {
+        if (getDestinationsCount() > 1)
+        {
+            // create producer with initial destination as null for test with 
multiple-destinations
+            // In this case, a different destination will be used while 
sending the message
+            _producer = (MessageProducer) 
getProducerSession().createProducer(null);
+        }
+        else
+        {
+            // Create a producer with known destination to send the pings on.
+            _producer = (MessageProducer) 
getProducerSession().createProducer(_pingDestination);
+
+        }
+
+        _producer.setDisableMessageTimestamp(true);
+        _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : 
DeliveryMode.NON_PERSISTENT);
+    }
+
+    /**
+     * Creates the temporary destination to listen to the responses
+     *
+     * @param selector
+     * @throws JMSException
+     */
+    public void createConsumer(String selector) throws JMSException
+    {
+        // Create a temporary destination to get the pongs on.
+        if (isPubSub())
+        {
+            _replyDestination = _consumerSession.createTemporaryTopic();
+        }
+        else
+        {
+            _replyDestination = _consumerSession.createTemporaryQueue();
+        }
+
+        // Create a message consumer to get the replies with and register this 
to be called back by it.
+        MessageConsumer consumer =
+            _consumerSession.createConsumer(_replyDestination, PREFETCH, 
NO_LOCAL, EXCLUSIVE, selector);
+        consumer.setMessageListener(this);
+    }
+
+    /**
+     * Creates consumer instances for each destination. This is used when test 
is being done with multiple destinations.
+     *
+     * @param selector
+     * @throws JMSException
+     */
+    public void createConsumers(String selector) throws JMSException
+    {
+        for (int i = 0; i < getDestinationsCount(); i++)
+        {
+            MessageConsumer consumer =
+                getConsumerSession().createConsumer(getDestination(i), 
PREFETCH, false, EXCLUSIVE, selector);
+            consumer.setMessageListener(this);
+        }
+    }
+
+    public Destination getPingDestination()
+    {
+        return _pingDestination;
+    }
+
+    /**
      * Primes the test loop by sending a few messages, then introduces a short 
wait. This allows the bounce back client
      * on the other end a chance to configure its reply producer on the reply 
to destination. It is also worth calling
      * this a few times, in order to prime the JVMs JIT compilation.
@@ -455,12 +433,9 @@
                 Thread.sleep(100);
             }
             catch (InterruptedException ignore)
-            {
-
-            }
+            { }
         }
 
-
     }
 
     /**
@@ -535,13 +510,16 @@
         // Put a unique correlation id on the message before sending it.
         String messageCorrelationId = Long.toString(getNewID());
 
-
         pingNoWaitForReply(message, numPings, messageCorrelationId);
 
         CountDownLatch trafficLight = trafficLights.get(messageCorrelationId);
+
         // Block the current thread until a reply to the message is received, 
or it times out.
         trafficLight.await(timeout, TimeUnit.MILLISECONDS);
 
+        // Remove the countdown latch from the map because the map is long 
lived, so will cause a memory leak.
+        trafficLights.remove(messageCorrelationId);
+
         // Work out how many replies were receieved.
         int numReplies = numPings - (int) trafficLight.getCount();
 
@@ -570,7 +548,8 @@
      * @return The reply, or null if no reply arrives before the timeout.
      * @throws JMSException All underlying JMSExceptions are allowed to fall 
through.
      */
-    public void pingNoWaitForReply(Message message, int numPings, String 
messageCorrelationId) throws JMSException, InterruptedException
+    public void pingNoWaitForReply(Message message, int numPings, String 
messageCorrelationId)
+                            throws JMSException, InterruptedException
     {
         // Create a count down latch to count the number of replies with. This 
is created before the message is sent
         // so that the message is not received before the count down is 
created.
@@ -668,11 +647,6 @@
         return _replyDestination;
     }
 
-    protected void setReplyDestination(Destination destination)
-    {
-        _replyDestination = destination;
-    }
-
     public void setMessageListener(MessageListener messageListener)
     {
         _messageListener = messageListener;
@@ -683,6 +657,33 @@
         return trafficLights.get(correlationID);
     }
 
+    protected Session getConsumerSession()
+    {
+        return _consumerSession;
+    }
+
+    protected void setPingDestination(Destination destination)
+    {
+        _pingDestination = destination;
+    }
+
+    protected void setReplyDestination(Destination destination)
+    {
+        _replyDestination = destination;
+    }
+
+    private void createPingDestination(String name)
+    {
+        if (isPubSub())
+        {
+            _pingDestination = new AMQTopic(name);
+        }
+        else
+        {
+            _pingDestination = new AMQQueue(name);
+        }
+    }
+
     /*
     * When the test is being performed with multiple queues, then this method 
will be used, which has a loop to
     * pick up the next queue from the queues list and sends message to it.
@@ -716,12 +717,10 @@
     public static class FailoverNotifier implements ConnectionListener
     {
         public void bytesSent(long count)
-        {
-        }
+        { }
 
         public void bytesReceived(long count)
-        {
-        }
+        { }
 
         public boolean preFailover(boolean redirect)
         {


Reply via email to