Author: rupertlssmith
Date: Tue Oct 2 06:46:00 2007
New Revision: 581237
URL: http://svn.apache.org/viewvc?rev=581237&view=rev
Log:
Merged revisions 581171-581188,581190-581207 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1
........
r581171 | rupertlssmith | 2007-10-02 10:36:47 +0100 (Tue, 02 Oct 2007) | 1 line
Updated poms to use 0.6-SNAPSHOT version of junit-toolkit and plugin consistently.
........
r581207 | rupertlssmith | 2007-10-02 13:28:37 +0100 (Tue, 02 Oct 2007) | 1 line
QPID-616. Corrected pending message count and pending data size calculations for pubsub testing.
........
Modified:
incubator/qpid/branches/M2/ (props changed)
incubator/qpid/branches/M2/java/perftests/distribution/pom.xml
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
incubator/qpid/branches/M2/java/pom.xml
Propchange: incubator/qpid/branches/M2/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Tue Oct 2 06:46:00 2007
@@ -1 +1 @@
-/incubator/qpid/branches/M2.1:1-573736,573738-577772,577774-578732,578734,578736-578744,578746-578827,578829-580941,580985,580992-580993,581002
+/incubator/qpid/branches/M2.1:1-573736,573738-577772,577774-578732,578734,578736-578744,578746-578827,578829-580941,580985,580992-580993,581002,581171-581188,581190-581207
Modified: incubator/qpid/branches/M2/java/perftests/distribution/pom.xml
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/distribution/pom.xml?rev=581237&r1=581236&r2=581237&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/perftests/distribution/pom.xml (original)
+++ incubator/qpid/branches/M2/java/perftests/distribution/pom.xml Tue Oct 2 06:46:00 2007
@@ -56,13 +56,13 @@
<dependency>
<groupId>uk.co.thebadgerset</groupId>
<artifactId>junit-toolkit</artifactId>
- <version>0.5</version>
+ <version>0.6-SNAPSHOT</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>uk.co.thebadgerset</groupId>
<artifactId>junit-toolkit-maven-plugin</artifactId>
- <version>0.5</version>
+ <version>0.6-SNAPSHOT</version>
<scope>runtime</scope>
</dependency>
</dependencies>
Modified:
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=581237&r1=581236&r2=581237&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Tue Oct 2 06:46:00 2007
@@ -408,7 +408,7 @@
protected Object _sendPauseMonitor = new Object();
/** Keeps a count of the number of message currently sent but not
received. */
- protected AtomicInteger _unreceived = new AtomicInteger(0);
+ protected static AtomicInteger _unreceived = new AtomicInteger(0);
/** A source for providing sequential unique correlation ids. These will
be unique within the same JVM. */
private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
@@ -486,7 +486,7 @@
*/
public PingPongProducer(Properties overrides) throws Exception
{
- // log.debug("public PingPongProducer(Properties overrides = " +
overrides + "): called");
+ log.debug("public PingPongProducer(Properties overrides = " +
overrides + "): called");
// Create a set of parsed properties from the defaults overriden by
the passed in values.
ParsedProperties properties = new ParsedProperties(defaults);
@@ -694,12 +694,12 @@
*/
public void createProducer() throws JMSException
{
- // log.debug("public void createProducer(): called");
+ log.debug("public void createProducer(): called");
_producer = (MessageProducer) _producerSession.createProducer(null);
_producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT :
DeliveryMode.NON_PERSISTENT);
- // log.debug("Created producer for " + (_persistent ? "persistent" :
"non-persistent") + " messages.");
+ log.debug("Created producer for " + (_persistent ? "persistent" :
"non-persistent") + " messages.");
}
/**
@@ -717,14 +717,14 @@
public void createPingDestinations(int noOfDestinations, String selector,
String rootName, boolean unique,
boolean durable) throws JMSException, AMQException
{
- /*log.debug("public void createPingDestinations(int noOfDestinations =
" + noOfDestinations + ", String selector = "
+ log.debug("public void createPingDestinations(int noOfDestinations = "
+ noOfDestinations + ", String selector = "
+ selector + ", String rootName = " + rootName + ", boolean unique
= " + unique + ", boolean durable = "
- + durable + "): called");*/
+ + durable + "): called");
_pingDestinations = new ArrayList<Destination>();
// Create the desired number of ping destinations and consumers for
them.
- // log.debug("Creating " + noOfDestinations + " destinations to
ping.");
+ log.debug("Creating " + noOfDestinations + " destinations to ping.");
for (int i = 0; i < noOfDestinations; i++)
{
@@ -735,12 +735,12 @@
// Generate an id, unique within this pinger or to the whole JVM
depending on the unique flag.
if (unique)
{
- // log.debug("Creating unique destinations.");
+ log.debug("Creating unique destinations.");
id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" +
_connection.getClientID();
}
else
{
- // log.debug("Creating shared destinations.");
+ log.debug("Creating shared destinations.");
id = "_" + _queueSharedID.incrementAndGet();
}
@@ -750,14 +750,14 @@
if (!durable)
{
destination = new
AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
- // log.debug("Created non-durable topic " + destination);
+ log.debug("Created non-durable topic " + destination);
}
else
{
destination =
AMQTopic.createDurableTopic(new
AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id),
_clientID, (AMQConnection) _connection);
- // log.debug("Created durable topic " + destination);
+ log.debug("Created durable topic " + destination);
}
}
// Otherwise this is a p2p pinger, in which case create queues.
@@ -771,7 +771,7 @@
((AMQSession) _producerSession).bindQueue(destinationName,
destinationName, null,
ExchangeDefaults.DIRECT_EXCHANGE_NAME);
- // log.debug("Created queue " + destination);
+ log.debug("Created queue " + destination);
}
// Keep the destination.
@@ -831,24 +831,24 @@
*/
public void onMessageWithConsumerNo(Message message, int consumerNo)
{
- // log.debug("public void onMessageWithConsumerNo(Message message, int
consumerNo = " + consumerNo + "): called");
+ log.debug("public void onMessageWithConsumerNo(Message message, int
consumerNo = " + consumerNo + "): called");
try
{
long now = System.nanoTime();
long timestamp = getTimestamp(message);
long pingTime = now - timestamp;
- // NDC.push("cons" + consumerNo);
+ NDC.push("cons" + consumerNo);
// Extract the messages correlation id.
String correlationID = message.getJMSCorrelationID();
- // log.debug("correlationID = " + correlationID);
+ log.debug("correlationID = " + correlationID);
int num = message.getIntProperty("MSG_NUM");
- // log.info("Message " + num + " received.");
+ log.info("Message " + num + " received.");
boolean isRedelivered = message.getJMSRedelivered();
- // log.debug("isRedelivered = " + isRedelivered);
+ log.debug("isRedelivered = " + isRedelivered);
if (!isRedelivered)
{
@@ -862,7 +862,7 @@
// Restart the timeout timer on every message.
perCorrelationId.timeOutStart = System.nanoTime();
- // log.debug("Reply was expected, decrementing the latch
for the id, " + correlationID);
+ log.debug("Reply was expected, decrementing the latch for
the id, " + correlationID);
// Decrement the countdown latch. Before this point, it is
possible that two threads might enter this
// method simultanesouly with the same correlation id.
Decrementing the latch in a synchronized block
@@ -879,7 +879,12 @@
// Decrement the count of sent but not yet received
messages.
int unreceived = _unreceived.decrementAndGet();
- int unreceivedSize = (unreceived * ((_messageSize ==
0) ? 1 : _messageSize));
+ int unreceivedSize =
+ (unreceived * ((_messageSize == 0) ? 1 :
_messageSize))
+ / (_isPubSub ? getConsumersPerDestination() : 1);
+
+ log.debug("unreceived = " + unreceived);
+ log.debug("unreceivedSize = " + unreceivedSize);
// Release a waiting sender if there is one.
synchronized (_sendPauseMonitor)
@@ -890,22 +895,23 @@
}
}
- // NDC.push("/rem" + remainingCount);
+ NDC.push("/rem" + remainingCount);
- // log.debug("remainingCount = " + remainingCount);
- // log.debug("trueCount = " + trueCount);
+ log.debug("remainingCount = " + remainingCount);
+ log.debug("trueCount = " + trueCount);
// Commit on transaction batch size boundaries. At
this point in time the waiting producer remains
// blocked, even on the last message.
// Commit count is divided by noOfConsumers in p2p
mode, so that each consumer only commits on
// each batch boundary. For pub/sub each consumer gets
every message so no division is done.
long commitCount = _isPubSub ? remainingCount :
(remainingCount / _noOfConsumers);
- // log.debug("commitCount = " + commitCount);
+ log.debug("commitCount = " + commitCount);
if ((commitCount % _txBatchSize) == 0)
{
// log.debug("Trying commit for consumer " +
consumerNo + ".");
commitTx(_consumerSession[consumerNo]);
+ log.info("Tx committed on consumer " + consumerNo);
}
// Forward the message and remaining count to any
interested chained message listener.
@@ -950,8 +956,8 @@
}
finally
{
- // log.debug("public void onMessageWithConsumerNo(Message message,
int consumerNo): ending");
- // NDC.clear();
+ log.debug("public void onMessageWithConsumerNo(Message message,
int consumerNo): ending");
+ NDC.clear();
}
}
@@ -1122,8 +1128,8 @@
*/
protected boolean sendMessage(int i, Message message) throws JMSException
{
- // log.debug("protected boolean sendMessage(int i = " + i + ", Message
message): called");
- // log.debug("_txBatchSize = " + _txBatchSize);
+ log.debug("protected boolean sendMessage(int i = " + i + ", Message
message): called");
+ log.debug("_txBatchSize = " + _txBatchSize);
// Round robin the destinations as the messages are sent.
Destination destination = _pingDestinations.get(i %
_pingDestinations.size());
@@ -1154,15 +1160,16 @@
{
// Get the size estimate of sent but not yet received messages.
int unreceived = _unreceived.get();
- int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 :
_messageSize));
+ int unreceivedSize =
+ (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) /
(_isPubSub ? getConsumersPerDestination() : 1);
- // log.debug("unreceived = " + unreceived);
- // log.debug("unreceivedSize = " + unreceivedSize);
- // log.debug("_maxPendingSize = " + _maxPendingSize);
+ log.debug("unreceived = " + unreceived);
+ log.debug("unreceivedSize = " + unreceivedSize);
+ log.debug("_maxPendingSize = " + _maxPendingSize);
if (unreceivedSize > _maxPendingSize)
{
- // log.debug("unreceived size estimate over limit = " +
unreceivedSize);
+ log.debug("unreceived size estimate over limit = " +
unreceivedSize);
// Wait on the send pause barrier for the limit to be
re-established.
try
@@ -1202,7 +1209,7 @@
message.setIntProperty("MSG_NUM", num);
setTimestamp(message);
_producer.send(message);
- // log.info("Message " + num + " sent.");
+ log.info("Message " + num + " sent.");
}
else
{
@@ -1210,11 +1217,15 @@
message.setIntProperty("MSG_NUM", num);
setTimestamp(message);
_producer.send(destination, message);
- // log.info("Message " + num + " sent.");
+ log.info("Message " + num + " sent.");
}
- // Increase the unreceived size, this may actually happen aftern the
message is received.
- _unreceived.getAndIncrement();
+ // Increase the unreceived size, this may actually happen after the
message is received.
+ // The unreceived size is incremented by the number of consumers that
will get a copy of the message,
+ // in pub/sub mode.
+ // _unreceived.getAndIncrement();
+ int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ?
getConsumersPerDestination() : 1);
+ log.debug("newUnreceivedCount = " + newUnreceivedCount);
// Apply message rate throttling if a rate limit has been set up.
if (_rateLimiter != null)
@@ -1340,11 +1351,15 @@
public void start() throws JMSException
{
+ log.debug("public void start(): called");
+
_connection.start();
+ log.debug("Producer started.");
for (int i = 0; i < _noOfConsumers; i++)
{
_consumerConnection[i].start();
+ log.debug("Consumer " + i + " started.");
}
}
@@ -1394,22 +1409,24 @@
*/
public void close() throws JMSException
{
- // log.debug("public void close(): called");
+ log.debug("public void close(): called");
try
{
if (_connection != null)
{
+ log.debug("Before close producer connection.");
_connection.close();
- // log.debug("Close connection.");
+ log.debug("Closed producer connection.");
}
for (int i = 0; i < _noOfConsumers; i++)
{
if (_consumerConnection[i] != null)
{
+ log.debug("Before close consumer connection " + i + ".");
_consumerConnection[i].close();
- // log.debug("Closed consumer connection.");
+ log.debug("Closed consumer connection " + i + ".");
}
}
}
@@ -1449,7 +1466,7 @@
*/
protected boolean commitTx(Session session) throws JMSException
{
- // log.debug("protected void commitTx(Session session): called");
+ log.debug("protected void commitTx(Session session): called");
boolean committed = false;
@@ -1486,7 +1503,7 @@
long start = System.nanoTime();
session.commit();
committed = true;
- // log.debug("Time taken to commit :" + ((System.nanoTime() -
start) / 1000000f) + " ms");
+ log.debug("Time taken to commit :" + ((System.nanoTime() -
start) / 1000000f) + " ms");
if (_failAfterCommit)
{
Modified: incubator/qpid/branches/M2/java/pom.xml
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/pom.xml?rev=581237&r1=581236&r2=581237&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/pom.xml (original)
+++ incubator/qpid/branches/M2/java/pom.xml Tue Oct 2 06:46:00 2007
@@ -399,7 +399,7 @@
<plugin>
<groupId>uk.co.thebadgerset</groupId>
<artifactId>junit-toolkit-maven-plugin</artifactId>
- <version>0.5</version>
+ <version>0.6-SNAPSHOT</version>
</plugin>