Author: rupertlssmith
Date: Tue Oct  2 05:28:37 2007
New Revision: 581207

URL: http://svn.apache.org/viewvc?rev=581207&view=rev
Log:
QPID-616. Corrected pending message count and pending data size calculations 
for pubsub testing.

Modified:
    incubator/qpid/branches/M2.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java

Modified: incubator/qpid/branches/M2.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=581207&r1=581206&r2=581207&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/branches/M2.1/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Tue Oct  2 05:28:37 2007
@@ -408,7 +408,7 @@
     protected Object _sendPauseMonitor = new Object();
 
     /** Keeps a count of the number of message currently sent but not received. */
-    protected AtomicInteger _unreceived = new AtomicInteger(0);
+    protected static AtomicInteger _unreceived = new AtomicInteger(0);
 
     /** A source for providing sequential unique correlation ids. These will 
be unique within the same JVM. */
     private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
@@ -486,7 +486,7 @@
      */
     public PingPongProducer(Properties overrides) throws Exception
     {
-        // log.debug("public PingPongProducer(Properties overrides = " + 
overrides + "): called");
+        log.debug("public PingPongProducer(Properties overrides = " + 
overrides + "): called");
 
         // Create a set of parsed properties from the defaults overriden by 
the passed in values.
         ParsedProperties properties = new ParsedProperties(defaults);
@@ -694,12 +694,12 @@
      */
     public void createProducer() throws JMSException
     {
-        // log.debug("public void createProducer(): called");
+        log.debug("public void createProducer(): called");
 
         _producer = (MessageProducer) _producerSession.createProducer(null);
         _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : 
DeliveryMode.NON_PERSISTENT);
 
-        // log.debug("Created producer for " + (_persistent ? "persistent" : 
"non-persistent") + " messages.");
+        log.debug("Created producer for " + (_persistent ? "persistent" : 
"non-persistent") + " messages.");
     }
 
     /**
@@ -717,14 +717,14 @@
     public void createPingDestinations(int noOfDestinations, String selector, 
String rootName, boolean unique,
         boolean durable) throws JMSException, AMQException
     {
-        /*log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
+        log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
             + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = "
-            + durable + "): called");*/
+            + durable + "): called");
 
         _pingDestinations = new ArrayList<Destination>();
 
         // Create the desired number of ping destinations and consumers for 
them.
-        // log.debug("Creating " + noOfDestinations + " destinations to 
ping.");
+        log.debug("Creating " + noOfDestinations + " destinations to ping.");
 
         for (int i = 0; i < noOfDestinations; i++)
         {
@@ -735,12 +735,12 @@
             // Generate an id, unique within this pinger or to the whole JVM 
depending on the unique flag.
             if (unique)
             {
-                // log.debug("Creating unique destinations.");
+                log.debug("Creating unique destinations.");
                 id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + 
_connection.getClientID();
             }
             else
             {
-                // log.debug("Creating shared destinations.");
+                log.debug("Creating shared destinations.");
                 id = "_" + _queueSharedID.incrementAndGet();
             }
 
@@ -750,14 +750,14 @@
                 if (!durable)
                 {
                     destination = new 
AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
-                    // log.debug("Created non-durable topic " + destination);
+                    log.debug("Created non-durable topic " + destination);
                 }
                 else
                 {
                     destination =
                         AMQTopic.createDurableTopic(new 
AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id),
                             _clientID, (AMQConnection) _connection);
-                    // log.debug("Created durable topic " + destination);
+                    log.debug("Created durable topic " + destination);
                 }
             }
             // Otherwise this is a p2p pinger, in which case create queues.
@@ -771,7 +771,7 @@
                 ((AMQSession) _producerSession).bindQueue(destinationName, 
destinationName, null,
                     ExchangeDefaults.DIRECT_EXCHANGE_NAME);
 
-                // log.debug("Created queue " + destination);
+                log.debug("Created queue " + destination);
             }
 
             // Keep the destination.
@@ -831,24 +831,24 @@
      */
     public void onMessageWithConsumerNo(Message message, int consumerNo)
     {
-        // log.debug("public void onMessageWithConsumerNo(Message message, int 
consumerNo = " + consumerNo + "): called");
+        log.debug("public void onMessageWithConsumerNo(Message message, int 
consumerNo = " + consumerNo + "): called");
         try
         {
             long now = System.nanoTime();
             long timestamp = getTimestamp(message);
             long pingTime = now - timestamp;
 
-            // NDC.push("cons" + consumerNo);
+            NDC.push("cons" + consumerNo);
 
             // Extract the messages correlation id.
             String correlationID = message.getJMSCorrelationID();
-            // log.debug("correlationID = " + correlationID);
+            log.debug("correlationID = " + correlationID);
 
             int num = message.getIntProperty("MSG_NUM");
-            // log.info("Message " + num + " received.");
+            log.info("Message " + num + " received.");
 
             boolean isRedelivered = message.getJMSRedelivered();
-            // log.debug("isRedelivered = " + isRedelivered);
+            log.debug("isRedelivered = " + isRedelivered);
 
             if (!isRedelivered)
             {
@@ -862,7 +862,7 @@
                     // Restart the timeout timer on every message.
                     perCorrelationId.timeOutStart = System.nanoTime();
 
-                    // log.debug("Reply was expected, decrementing the latch 
for the id, " + correlationID);
+                    log.debug("Reply was expected, decrementing the latch for 
the id, " + correlationID);
 
                     // Decrement the countdown latch. Before this point, it is 
possible that two threads might enter this
                     // method simultanesouly with the same correlation id. 
Decrementing the latch in a synchronized block
@@ -879,7 +879,12 @@
 
                         // Decrement the count of sent but not yet received 
messages.
                         int unreceived = _unreceived.decrementAndGet();
-                        int unreceivedSize = (unreceived * ((_messageSize == 
0) ? 1 : _messageSize));
+                        int unreceivedSize =
+                            (unreceived * ((_messageSize == 0) ? 1 : 
_messageSize))
+                            / (_isPubSub ? getConsumersPerDestination() : 1);
+
+                        log.debug("unreceived = " + unreceived);
+                        log.debug("unreceivedSize = " + unreceivedSize);
 
                         // Release a waiting sender if there is one.
                         synchronized (_sendPauseMonitor)
@@ -890,22 +895,23 @@
                             }
                         }
 
-                        // NDC.push("/rem" + remainingCount);
+                        NDC.push("/rem" + remainingCount);
 
-                        // log.debug("remainingCount = " + remainingCount);
-                        // log.debug("trueCount = " + trueCount);
+                        log.debug("remainingCount = " + remainingCount);
+                        log.debug("trueCount = " + trueCount);
 
                         // Commit on transaction batch size boundaries. At 
this point in time the waiting producer remains
                         // blocked, even on the last message.
                         // Commit count is divided by noOfConsumers in p2p 
mode, so that each consumer only commits on
                         // each batch boundary. For pub/sub each consumer gets 
every message so no division is done.
                         long commitCount = _isPubSub ? remainingCount : 
(remainingCount / _noOfConsumers);
-                        // log.debug("commitCount = " + commitCount);
+                        log.debug("commitCount = " + commitCount);
 
                         if ((commitCount % _txBatchSize) == 0)
                         {
                             // log.debug("Trying commit for consumer " + 
consumerNo + ".");
                             commitTx(_consumerSession[consumerNo]);
+                            log.info("Tx committed on consumer " + consumerNo);
                         }
 
                         // Forward the message and remaining count to any 
interested chained message listener.
@@ -950,8 +956,8 @@
         }
         finally
         {
-            // log.debug("public void onMessageWithConsumerNo(Message message, 
int consumerNo): ending");
-            // NDC.clear();
+            log.debug("public void onMessageWithConsumerNo(Message message, 
int consumerNo): ending");
+            NDC.clear();
         }
     }
 
@@ -1122,8 +1128,8 @@
      */
     protected boolean sendMessage(int i, Message message) throws JMSException
     {
-        // log.debug("protected boolean sendMessage(int i = " + i + ", Message 
message): called");
-        // log.debug("_txBatchSize = " + _txBatchSize);
+        log.debug("protected boolean sendMessage(int i = " + i + ", Message 
message): called");
+        log.debug("_txBatchSize = " + _txBatchSize);
 
         // Round robin the destinations as the messages are sent.
         Destination destination = _pingDestinations.get(i % 
_pingDestinations.size());
@@ -1154,15 +1160,16 @@
             {
                 // Get the size estimate of sent but not yet received messages.
                 int unreceived = _unreceived.get();
-                int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : 
_messageSize));
+                int unreceivedSize =
+                    (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) / 
(_isPubSub ? getConsumersPerDestination() : 1);
 
-                // log.debug("unreceived = " + unreceived);
-                // log.debug("unreceivedSize = " + unreceivedSize);
-                // log.debug("_maxPendingSize = " + _maxPendingSize);
+                log.debug("unreceived = " + unreceived);
+                log.debug("unreceivedSize = " + unreceivedSize);
+                log.debug("_maxPendingSize = " + _maxPendingSize);
 
                 if (unreceivedSize > _maxPendingSize)
                 {
-                    // log.debug("unreceived size estimate over limit = " + 
unreceivedSize);
+                    log.debug("unreceived size estimate over limit = " + 
unreceivedSize);
 
                     // Wait on the send pause barrier for the limit to be 
re-established.
                     try
@@ -1202,7 +1209,7 @@
             message.setIntProperty("MSG_NUM", num);
             setTimestamp(message);
             _producer.send(message);
-            // log.info("Message " + num + " sent.");
+            log.info("Message " + num + " sent.");
         }
         else
         {
@@ -1210,11 +1217,15 @@
             message.setIntProperty("MSG_NUM", num);
             setTimestamp(message);
             _producer.send(destination, message);
-            // log.info("Message " + num + " sent.");
+            log.info("Message " + num + " sent.");
         }
 
-        // Increase the unreceived size, this may actually happen aftern the message is received.
-        _unreceived.getAndIncrement();
+        // Increase the unreceived size, this may actually happen after the message is received.
+        // The unreceived size is incremented by the number of consumers that will get a copy of the message,
+        // in pub/sub mode.
+        // _unreceived.getAndIncrement();
+        int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1);
+        log.debug("newUnreceivedCount = " + newUnreceivedCount);
 
         // Apply message rate throttling if a rate limit has been set up.
         if (_rateLimiter != null)
@@ -1340,11 +1351,15 @@
 
     public void start() throws JMSException
     {
+        log.debug("public void start(): called");
+
         _connection.start();
+        log.debug("Producer started.");
 
         for (int i = 0; i < _noOfConsumers; i++)
         {
             _consumerConnection[i].start();
+            log.debug("Consumer " + i + " started.");
         }
     }
 
@@ -1394,22 +1409,24 @@
      */
     public void close() throws JMSException
     {
-        // log.debug("public void close(): called");
+        log.debug("public void close(): called");
 
         try
         {
             if (_connection != null)
             {
+                log.debug("Before close producer connection.");
                 _connection.close();
-                // log.debug("Close connection.");
+                log.debug("Closed producer connection.");
             }
 
             for (int i = 0; i < _noOfConsumers; i++)
             {
                 if (_consumerConnection[i] != null)
                 {
+                    log.debug("Before close consumer connection " + i + ".");
                     _consumerConnection[i].close();
-                    // log.debug("Closed consumer connection.");
+                    log.debug("Closed consumer connection " + i + ".");
                 }
             }
         }
@@ -1449,7 +1466,7 @@
      */
     protected boolean commitTx(Session session) throws JMSException
     {
-        // log.debug("protected void commitTx(Session session): called");
+        log.debug("protected void commitTx(Session session): called");
 
         boolean committed = false;
 
@@ -1486,7 +1503,7 @@
                 long start = System.nanoTime();
                 session.commit();
                 committed = true;
-                // log.debug("Time taken to commit :" + ((System.nanoTime() - 
start) / 1000000f) + " ms");
+                log.debug("Time taken to commit :" + ((System.nanoTime() - 
start) / 1000000f) + " ms");
 
                 if (_failAfterCommit)
                 {


Reply via email to