Author: rgreig
Date: Wed Jan 10 14:44:42 2007
New Revision: 495020

URL: http://svn.apache.org/viewvc?view=rev&rev=495020
Log:
QPID-32 : Add option to run tests with persistent messages

Modified:
    incubator/qpid/trunk/qpid/java/perftests/bin/serviceProvidingClient.sh
    incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient.sh
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java

Modified: incubator/qpid/trunk/qpid/java/perftests/bin/serviceProvidingClient.sh
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/bin/serviceProvidingClient.sh?view=diff&rev=495020&r1=495019&r2=495020
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/bin/serviceProvidingClient.sh 
(original)
+++ incubator/qpid/trunk/qpid/java/perftests/bin/serviceProvidingClient.sh Wed 
Jan 10 14:44:42 2007
@@ -28,4 +28,4 @@
 . ./setupclasspath.sh
 echo $CP
 # usage: just pass in the host(s)
-$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" 
-Damqj.test.logging.level="info" -Dlog4j.configuration=perftests.log4j 
org.apache.qpid.requestreply.ServiceProvidingClient $1 guest guest /test 
serviceQ
+$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" 
-Damqj.test.logging.level="info" -Dlog4j.configuration=perftests.log4j 
org.apache.qpid.requestreply.ServiceProvidingClient $1 guest guest /test 
serviceQ P T

Modified: 
incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient.sh
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient.sh?view=diff&rev=495020&r1=495019&r2=495020
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient.sh 
(original)
+++ incubator/qpid/trunk/qpid/java/perftests/bin/serviceRequestingClient.sh Wed 
Jan 10 14:44:42 2007
@@ -30,4 +30,4 @@
 # XXX -Xms1024m -XX:NewSize=300m
 . ./setupclasspath.sh
 echo $CP
-$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" 
-Damqj.test.logging.level="info" -Dlog4j.configuration=perftests.log4j 
org.apache.qpid.requestreply.ServiceRequestingClient $thehosts guest guest 
/test serviceQ "$@"
+$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" 
-Damqj.test.logging.level="info" -Dlog4j.configuration=perftests.log4j 
org.apache.qpid.requestreply.ServiceRequestingClient $thehosts guest guest 
/test serviceQ P T "$@"

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java?view=diff&rev=495020&r1=495019&r2=495020
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
 Wed Jan 10 14:44:42 2007
@@ -42,10 +42,21 @@
 
     private AMQConnection _connection;
 
+    private Session _session;
+    private Session _producerSession;
+
+    private boolean _isTransactional;
+
     public ServiceProvidingClient(String brokerDetails, String username, 
String password,
-                                  String clientName, String virtualPath, 
String serviceName)
+                                  String clientName, String virtualPath, 
String serviceName, 
+                                                                 String 
deliveryModeString, String transactedMode)
             throws AMQException, JMSException, URLSyntaxException
     {
+               final int deliveryMode = 
deliveryModeString.toUpperCase().charAt(0) == 'P' ? DeliveryMode.PERSISTENT : 
DeliveryMode.NON_PERSISTENT;
+               _isTransactional = transactedMode.toUpperCase().charAt(0) == 
'T' ? true : false;
+
+        _logger.info("Delivery Mode: " + deliveryMode + "\t isTransactional: " 
+ _isTransactional);
+               
         _connection = new AMQConnection(brokerDetails, username, password,
                                         clientName, virtualPath);
         _connection.setConnectionListener(new ConnectionListener()
@@ -74,13 +85,14 @@
                 _logger.info("App got failover complete callback");
             }
         });
-        final Session session = (Session) _connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        _session = (Session) _connection.createSession(_isTransactional, 
Session.AUTO_ACKNOWLEDGE);
+        _producerSession = (Session) 
_connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE);
 
         _logger.info("Service (queue) name is '" + serviceName + "'...");
 
         AMQQueue destination = new AMQQueue(serviceName);
 
-        MessageConsumer consumer = session.createConsumer(destination,
+        MessageConsumer consumer = _session.createConsumer(destination,
                                                           100, true, false, 
null);
 
         consumer.setMessageListener(new MessageListener()
@@ -107,9 +119,9 @@
                         _responseDest = responseDest;
 
                         _logger.info("About to create a producer");
-                        _destinationProducer = 
session.createProducer(responseDest);
+                        _destinationProducer = 
_producerSession.createProducer(responseDest);
                         _destinationProducer.setDisableMessageTimestamp(true);
-                        
_destinationProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                        _destinationProducer.setDeliveryMode(deliveryMode);
                         _logger.info("After create a producer");
                     }
                 }
@@ -127,7 +139,7 @@
                 try
                 {
                     String payload = "This is a response: sing together: 
'Mahnah mahnah...'" + tm.getText();
-                    TextMessage msg = session.createTextMessage(payload);
+                    TextMessage msg = 
_producerSession.createTextMessage(payload);
                     if (tm.propertyExists("timeSent"))
                     {
                         _logger.info("timeSent property set on message");
@@ -135,6 +147,15 @@
                         msg.setStringProperty("timeSent", 
tm.getStringProperty("timeSent"));
                     }
                     _destinationProducer.send(msg);
+                                       
+                                       if(_isTransactional)
+                    {
+                        _producerSession.commit();
+                    }
+                    if(_isTransactional)
+                    {
+                        _session.commit();
+                    }
                     if (_messageCount % 1000 == 0)
                     {
                         _logger.info("Sent response to '" + _responseDest + 
"'");
@@ -158,9 +179,9 @@
     {
         _logger.info("Starting...");
 
-        if (args.length < 5)
+        if (args.length < 7)
         {
-            System.out.println("Usage: brokerDetails username password 
virtual-path serviceQueue [selector]");
+            System.out.println("Usage: brokerDetails username password 
virtual-path serviceQueue <P[ersistent]|N[onPersistent]>  
<T[ransacted]|N[onTransacted]> [selector]");
             System.exit(1);
         }
         String clientId = null;
@@ -177,7 +198,7 @@
         try
         {
             ServiceProvidingClient client = new 
ServiceProvidingClient(args[0], args[1], args[2],
-                                                                       
clientId, args[3], args[4]);
+                                                                       
clientId, args[3], args[4], args[5], args[6]);
             client.run();
         }
         catch (JMSException e)

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java?view=diff&rev=495020&r1=495019&r2=495020
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
 Wed Jan 10 14:44:42 2007
@@ -53,10 +53,12 @@
     private AMQConnection _connection;
 
     private Session _session;
+    private Session _producerSession;
 
     private long _averageLatency;
 
     private int _messageCount;
+    private boolean _isTransactional;
 
     private volatile boolean _completed;
 
@@ -106,7 +108,7 @@
             }
             try
             {
-                m.getPropertyNames();
+                               m.getPropertyNames();
                 if (m.propertyExists("timeSent"))
                 {
                     long timeSent = 
Long.parseLong(m.getStringProperty("timeSent"));
@@ -123,6 +125,10 @@
                         _log.info("Average latency now: " + _averageLatency);
                     }
                 }
+                               if(_isTransactional)
+                {
+                    _session.commit();
+                }
             }
             catch (JMSException e)
             {
@@ -168,24 +174,33 @@
     }
 
     public ServiceRequestingClient(String brokerHosts, String clientID, String 
username, String password,
-                                   String vpath, String commandQueueName,
+                                   String vpath, String commandQueueName, 
+                                                                  String 
deliveryModeString, String transactedMode,
                                    final int messageCount, final int 
messageDataLength) throws AMQException, URLSyntaxException
     {
+               final int deliveryMode = 
deliveryModeString.toUpperCase().charAt(0) == 'P' ? DeliveryMode.PERSISTENT 
+                                                                               
                                                                           : 
DeliveryMode.NON_PERSISTENT;
+                                                                               
                                                                           
+               _isTransactional = transactedMode.toUpperCase().charAt(0) == 
'T' ? true : false;
+
+        _log.info("Delivery Mode: " + deliveryMode + "\t isTransactional: " + 
_isTransactional);
+
         _messageCount = messageCount;
         MESSAGE_DATA = createMessagePayload(messageDataLength);
         try
         {
             createConnection(brokerHosts, clientID, username, password, vpath);
-            _session = (Session) _connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            _session = (Session) _connection.createSession(_isTransactional, 
Session.AUTO_ACKNOWLEDGE);
+            _producerSession = (Session) 
_connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE);
 
 
             _connection.setExceptionListener(this);
 
 
             AMQQueue destination = new AMQQueue(commandQueueName);
-            _producer = (MessageProducer) _session.createProducer(destination);
+            _producer = (MessageProducer) 
_producerSession.createProducer(destination);
             _producer.setDisableMessageTimestamp(true);
-            _producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+            _producer.setDeliveryMode(deliveryMode);
 
             _tempDestination = new AMQQueue("TempResponse" +
                                             
Long.toString(System.currentTimeMillis()), true);
@@ -196,6 +211,10 @@
             TextMessage first = _session.createTextMessage(MESSAGE_DATA);
             first.setJMSReplyTo(_tempDestination);
             _producer.send(first);
+                       if(_isTransactional)
+            {
+                _producerSession.commit();
+            }
             try
             {
                 Thread.sleep(1000);
@@ -227,7 +246,7 @@
         _connection.start();
         for (int i = 1; i < _messageCount; i++)
         {
-            TextMessage msg = _session.createTextMessage(MESSAGE_DATA + i);
+            TextMessage msg = _producerSession.createTextMessage(MESSAGE_DATA 
+ i);
             msg.setJMSReplyTo(_tempDestination);
             if (i % 1000 == 0)
             {
@@ -235,6 +254,11 @@
                 msg.setStringProperty("timeSent", String.valueOf(timeNow));
             }
             _producer.send(msg);
+            if(_isTransactional)
+            {
+                _producerSession.commit();    
+            }
+
         }
         _log.info("Finished sending " + _messageCount + " messages");
     }
@@ -260,17 +284,17 @@
         if (args.length < 6)
         {
             System.err.println(
-                    "Usage: ServiceRequestingClient <brokerDetails - semicolon 
separated host:port list> <username> <password> <vpath> <command queue name> 
<number of messages> <message size>");
+                    "Usage: ServiceRequestingClient <brokerDetails - semicolon 
separated host:port list> <username> <password> <vpath> <command queue name> 
<P[ersistent]|N[onPersistent]>  <T[ransacted]|N[onTransacted]> <number of 
messages> <message size>");
             System.exit(1);
         }
         try
         {
-            int messageDataLength = args.length > 6 ? 
Integer.parseInt(args[6]) : 4096;
+            int messageDataLength = args.length > 8 ? 
Integer.parseInt(args[8]) : 4096;
 
             InetAddress address = InetAddress.getLocalHost();
             String clientID = address.getHostName() + 
System.currentTimeMillis();
             ServiceRequestingClient client = new 
ServiceRequestingClient(args[0], clientID, args[1], args[2], args[3],
-                                                                         
args[4], Integer.parseInt(args[5]),
+                                                                         
args[4], args[5], args[6], Integer.parseInt(args[7]),
                                                                          
messageDataLength);
             Object waiter = new Object();
             client.run(waiter);


Reply via email to