Author: rupertlssmith
Date: Fri Aug 17 08:10:01 2007
New Revision: 567062
URL: http://svn.apache.org/viewvc?view=rev&rev=567062
Log:
Calculating commit batch size boundaries correctly for multi-consumer tests.
Ignoring redelivered messages.
Modified:
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Modified:
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=567062&r1=567061&r2=567062
==============================================================================
---
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
(original)
+++
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
Fri Aug 17 08:10:01 2007
@@ -21,6 +21,7 @@
package org.apache.qpid.requestreply;
import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.*;
@@ -456,6 +457,9 @@
private static final String KILL_BROKER_PROMPT = "Kill broker now, then
press Return.";
private String _clientID;
+ /** Keeps count of the total messages sent purely for debugging purposes.
*/
+ private static AtomicInteger numSent = new AtomicInteger();
+
/**
* Creates a ping producer with the specified parameters, of which there
are many. See the class level comments
* for details. This constructor creates a connection to the broker and
creates producer and consumer sessions on
@@ -574,8 +578,14 @@
*/
protected void createConnection(String clientID) throws AMQException,
URLSyntaxException
{
+ log.debug("protected void createConnection(String clientID = " +
clientID + "): called");
+
+ log.debug("Creating a connection for the message producer.");
+
_connection = new AMQConnection(_brokerDetails, _username, _password,
clientID, _virtualpath);
+ log.debug("Creating " + _noOfConsumers + " connections for the
consumers.");
+
_consumerConnection = new Connection[_noOfConsumers];
for (int i = 0; i < _noOfConsumers; i++)
@@ -765,7 +775,9 @@
log.debug("public void createReplyConsumers(Collection<Destination>
destinations = " + destinations
+ ", String selector = " + selector + "): called");
- log.debug("Creating " + destinations.size() + " reply consumers.");
+ log.debug("There are " + destinations.size() + " destinations.");
+ log.debug("Creating " + _noOfConsumers + " consumers on each
destination.");
+ log.debug("Total number of consumers is: " + (destinations.size() *
_noOfConsumers));
for (Destination destination : destinations)
{
@@ -788,7 +800,7 @@
}
});
- log.debug("Set this to listen to replies sent to destination:
" + destination);
+ log.debug("Set consumer " + i + " to listen to replies sent to
destination: " + destination);
}
}
}
@@ -802,95 +814,118 @@
*/
public void onMessageWithConsumerNo(Message message, int consumerNo)
{
- // log.debug("public void onMessage(Message message): called");
+ log.debug("public void onMessageWithConsumerNo(Message message, int
consumerNo = " + consumerNo + "): called");
+
+ NDC.push("cons" + consumerNo);
try
{
// Extract the messages correlation id.
String correlationID = message.getJMSCorrelationID();
- // log.debug("correlationID = " + correlationID);
+ log.debug("correlationID = " + correlationID);
- // Countdown on the traffic light if there is one for the matching
correlation id.
- PerCorrelationId perCorrelationId =
perCorrelationIds.get(correlationID);
+ int num = message.getIntProperty("MSG_NUM");
+ log.info("Message " + num + " received.");
- if (perCorrelationId != null)
+ boolean isRedelivered = message.getJMSRedelivered();
+ log.debug("isRedelivered = " + isRedelivered);
+
+ if (!isRedelivered)
{
- CountDownLatch trafficLight = perCorrelationId.trafficLight;
+ // Countdown on the traffic light if there is one for the
matching correlation id.
+ PerCorrelationId perCorrelationId =
perCorrelationIds.get(correlationID);
- // Restart the timeout timer on every message.
- perCorrelationId.timeOutStart = System.nanoTime();
+ if (perCorrelationId != null)
+ {
+ CountDownLatch trafficLight =
perCorrelationId.trafficLight;
- // log.debug("Reply was expected, decrementing the latch for
the id, " + correlationID);
+ // Restart the timeout timer on every message.
+ perCorrelationId.timeOutStart = System.nanoTime();
- // Decrement the countdown latch. Before this point, it is
possible that two threads might enter this
- // method simultanesouly with the same correlation id.
Decrementing the latch in a synchronized block
- // ensures that each thread will get a unique value for the
remaining messages.
- long trueCount = -1;
- long remainingCount = -1;
+ log.debug("Reply was expected, decrementing the latch for
the id, " + correlationID);
- synchronized (trafficLight)
- {
- trafficLight.countDown();
+ // Decrement the countdown latch. Before this point, it is
possible that two threads might enter this
+ // method simultanesouly with the same correlation id.
Decrementing the latch in a synchronized block
+ // ensures that each thread will get a unique value for
the remaining messages.
+ long trueCount = -1;
+ long remainingCount = -1;
+
+ synchronized (trafficLight)
+ {
+ trafficLight.countDown();
- trueCount = trafficLight.getCount();
- remainingCount = trueCount - 1;
+ trueCount = trafficLight.getCount();
+ remainingCount = trueCount - 1;
- // Decrement the count of sent but not yet received
messages.
- int unreceived = _unreceived.decrementAndGet();
- int unreceivedSize = (unreceived * ((_messageSize == 0) ?
1 : _messageSize));
+ // Decrement the count of sent but not yet received
messages.
+ int unreceived = _unreceived.decrementAndGet();
+ int unreceivedSize = (unreceived * ((_messageSize ==
0) ? 1 : _messageSize));
- // Release a waiting sender if there is one.
- synchronized (_sendPauseMonitor)
- {
- if ((_maxPendingSize > 0) && (unreceivedSize <
_maxPendingSize))
- // && (_sendPauseBarrier.getNumberWaiting() == 1))
+ // Release a waiting sender if there is one.
+ synchronized (_sendPauseMonitor)
{
- log.debug("unreceived size estimate under limit =
" + unreceivedSize);
-
- // Wait on the send pause barrier for the limit to
be re-established.
- /*try
- {*/
- // _sendPauseBarrier.await();
- _sendPauseMonitor.notify();
- /*}
- catch (InterruptedException e)
+ if ((_maxPendingSize > 0) && (unreceivedSize <
_maxPendingSize))
+ // && (_sendPauseBarrier.getNumberWaiting() == 1))
{
- throw new RuntimeException(e);
+ log.debug("unreceived size estimate under
limit = " + unreceivedSize);
+
+ // Wait on the send pause barrier for the
limit to be re-established.
+ /*try
+ {*/
+ // _sendPauseBarrier.await();
+ _sendPauseMonitor.notify();
+ /*}
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (BrokenBarrierException e)
+ {
+ throw new RuntimeException(e);
+ }*/
}
- catch (BrokenBarrierException e)
- {
- throw new RuntimeException(e);
- }*/
}
- }
- // log.debug("remainingCount = " + remainingCount);
- // log.debug("trueCount = " + trueCount);
+ NDC.push("/rem" + remainingCount);
- // Commit on transaction batch size boundaries. At this
point in time the waiting producer remains
- // blocked, even on the last message.
- if ((remainingCount % _txBatchSize) == 0)
- {
- commitTx(_consumerSession[consumerNo]);
- }
+ log.debug("remainingCount = " + remainingCount);
+ log.debug("trueCount = " + trueCount);
- // Forward the message and remaining count to any
interested chained message listener.
- if (_chainedMessageListener != null)
- {
- _chainedMessageListener.onMessage(message, (int)
remainingCount);
- }
+ // Commit on transaction batch size boundaries. At
this point in time the waiting producer remains
+ // blocked, even on the last message.
+ // Commit count is divided by noOfConsumers in p2p
mode, so that each consumer only commits on
+ // each batch boundary. For pub/sub each consumer gets
every message so no division is done.
+ long commitCount = _isPubSub ? remainingCount :
(remainingCount / _noOfConsumers);
+ log.debug("commitCount = " + commitCount);
- // Check if this is the last message, in which case
release any waiting producers. This is done
- // after the transaction has been committed and any
listeners notified.
- if (trueCount == 1)
- {
- trafficLight.countDown();
+ if ((commitCount % _txBatchSize) == 0)
+ {
+ log.debug("Trying commit for consumer " +
consumerNo + ".");
+ commitTx(_consumerSession[consumerNo]);
+ }
+
+ // Forward the message and remaining count to any
interested chained message listener.
+ if (_chainedMessageListener != null)
+ {
+ _chainedMessageListener.onMessage(message, (int)
remainingCount);
+ }
+
+ // Check if this is the last message, in which case
release any waiting producers. This is done
+ // after the transaction has been committed and any
listeners notified.
+ if (trueCount == 1)
+ {
+ trafficLight.countDown();
+ }
}
}
+ else
+ {
+ log.warn("Got unexpected message with correlationId: " +
correlationID);
+ }
}
else
{
- log.warn("Got unexpected message with correlationId: " +
correlationID);
+ log.debug("Got redelivered message, ignoring.");
}
// Print out ping times for every message in verbose mode only.
@@ -909,8 +944,11 @@
{
log.warn("There was a JMSException: " + e.getMessage(), e);
}
-
- // log.debug("public void onMessage(Message message): ending");
+ finally
+ {
+ log.debug("public void onMessageWithConsumerNo(Message message,
int consumerNo): ending");
+ NDC.clear();
+ }
}
/**
@@ -943,6 +981,8 @@
try
{
+ NDC.push("prod");
+
// Create a count down latch to count the number of replies with.
This is created before the messages are
// sent so that the replies cannot be received before the count
down is created.
// One is added to this, so that the last reply becomes a special
case. The special case is that the
@@ -1006,6 +1046,7 @@
// so will be a memory leak if this is not done.
finally
{
+ NDC.pop();
perCorrelationIds.remove(messageCorrelationId);
}
}
@@ -1135,11 +1176,17 @@
// Send the message either to its round robin destination, or its
default destination.
if (destination == null)
{
+ int num = numSent.incrementAndGet();
+ message.setIntProperty("MSG_NUM", num);
_producer.send(message);
+ log.info("Message " + num + " sent.");
}
else
{
+ int num = numSent.incrementAndGet();
+ message.setIntProperty("MSG_NUM", num);
_producer.send(destination, message);
+ log.info("Message " + num + " sent.");
}
// Increase the unreceived size, this may actually happen aftern the
message is recevied.
@@ -1157,6 +1204,7 @@
// Commit on every transaction batch size boundary. Here i + 1 is the
count of actual messages sent.
if (((i + 1) % _txBatchSize) == 0)
{
+ log.debug("Trying commit on producer session.");
committed = commitTx(_producerSession);
}
@@ -1377,7 +1425,7 @@
*/
protected boolean commitTx(Session session) throws JMSException
{
- // log.debug("protected void commitTx(Session controlSession):
called");
+ log.debug("protected void commitTx(Session session): called");
boolean committed = false;
@@ -1396,6 +1444,8 @@
if (session.getTransacted())
{
+ log.debug("Session is transacted.");
+
try
{
if (_failBeforeCommit)
@@ -1409,10 +1459,10 @@
waitForUser(KILL_BROKER_PROMPT);
}
- // long l = System.nanoTime();
+ long start = System.nanoTime();
session.commit();
committed = true;
- // log.debug("Time taken to commit :" + ((System.nanoTime() -
l) / 1000000f) + " ms");
+ log.debug("Time taken to commit :" + ((System.nanoTime() -
start) / 1000000f) + " ms");
if (_failAfterCommit)
{
@@ -1425,7 +1475,7 @@
waitForUser(KILL_BROKER_PROMPT);
}
- // log.trace("Session Commited.");
+ log.debug("Session Commited.");
}
catch (JMSException e)
{
@@ -1495,6 +1545,8 @@
*/
public int getExpectedNumPings(int numpings)
{
+ log.debug("public int getExpectedNumPings(int numpings = " + numpings
+ "): called");
+
log.debug("Each ping will be received by " + (_isPubSub ?
getConsumersPerDestination() : 1) + " consumers.");
return numpings * (_isPubSub ? getConsumersPerDestination() : 1);