Author: arnaudsimon
Date: Mon Jan 21 05:33:00 2008
New Revision: 613883

URL: http://svn.apache.org/viewvc?rev=613883&view=rev
Log:
Improved topic perf tests

Modified:
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java?rev=613883&r1=613882&r2=613883&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java
 Mon Jan 21 05:33:00 2008
@@ -28,6 +28,7 @@
 import java.util.Random;
 import java.util.List;
 import java.util.ArrayList;
+import java.io.FileWriter;
 
 public class Client
 {
@@ -74,6 +75,9 @@
             Properties properties=new Properties();
             
properties.load(this.getClass().getResourceAsStream("topic.properties"));
 
+            String logFilePath = System.getProperty("logFilePath", "./");
+            FileWriter file = new FileWriter(logFilePath + "client-" + 
System.currentTimeMillis() + ".cvs",true);
+
             //Create the initial context
             Context ctx=new InitialContext(properties);
 
@@ -99,6 +103,14 @@
             // Create a session on the connection
             // This session is a default choice of non-transacted and uses the 
auto acknowledge feature of a session.
             Session session=connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            Queue queueCompleted = session.createQueue("completed");
+            Queue queueStarted = session.createQueue("started");
+            MessageProducer prod = session.createProducer(queueCompleted);
+            MessageConsumer cons = session.createConsumer(queueStarted);
+            cons.receive();
+            _logger.info("Starting producing messages");
+
             _message=TestMessageFactory.newBytesMessage(session, 1024);
 
             Random random=new Random();
@@ -108,6 +120,7 @@
             long intervalThroughput;
             long totalThroughput;
             long numProducers=1;
+            String info;
             startNewProducer(session, random);
             while (testDuration < duration)
             {
@@ -121,17 +134,21 @@
                 intervalThroughput=(totalMessagesProduced - 
messagesProducedLastInterval) / 5;
                 totalThroughput=totalMessagesProduced / testDuration;
                 messagesProducedLastInterval=totalMessagesProduced;
-                _logger.info("Number of producers " + numProducers + " | This 
interval throughput = " +
-                        intervalThroughput + " | Total throughput = " + 
totalThroughput);
+                info = "Number of producers " + numProducers + " | This 
interval throughput = " +
+                        intervalThroughput + " | Total throughput = " + 
totalThroughput;
+                _logger.info(info);
+                file.write(info + "\n");
                 startNewProducer(session, random);
                 numProducers++;
             }
+            file.close();
             // stop all the producers
             for (Runner runner : _runners)
             {
                 runner.stop();
             }
-
+            _logger.info("Stopping server");
+            prod.send(session.createTextMessage("stop"));
         }
         catch (Exception e)
         {
@@ -144,11 +161,13 @@
     {
         // select a random topic
         int topicNumber=random.nextInt(50);
+        _logger.info("creating producer for topic: topic- " + topicNumber);
         Topic topic=session.createTopic("topic-" + topicNumber);
         MessageProducer prod=session.createProducer(topic);
         Runner runner=new Runner(prod);
         _runners.add(runner);
         Thread thread=new Thread(runner);
+        thread.setDaemon(true);
         thread.start();
     }
 
@@ -156,7 +175,6 @@
     {
         MessageProducer _prod;
         boolean _produce=true;
-
         private Runner(MessageProducer prod)
         {
             _prod=prod;

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java?rev=613883&r1=613882&r2=613883&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java
 Mon Jan 21 05:33:00 2008
@@ -26,6 +26,7 @@
 
 import javax.jms.*;
 import java.util.Properties;
+import java.io.FileWriter;
 
 
 public class Server
@@ -33,9 +34,14 @@
     /**
      * This class logger
      */
-    private static final Logger _logger =LoggerFactory.getLogger(Server.class);
+    private static final Logger _logger=LoggerFactory.getLogger(Server.class);
 
 
+    private final Object _lock=new Object();
+    private long _numMessages=0;
+    public FileWriter _file;
+    public boolean _running=true;
+
     public static void main(String[] args)
     {
         (new Server()).runServer();
@@ -49,6 +55,9 @@
             Properties properties=new Properties();
             
properties.load(this.getClass().getResourceAsStream("topic.properties"));
 
+            String logFilePath=System.getProperty("logFilePath", "./");
+            _file=new FileWriter(logFilePath + "server-" + 
System.currentTimeMillis() + ".cvs", true);
+
             //Create the initial context
             Context ctx=new InitialContext(properties);
 
@@ -70,10 +79,11 @@
 
             // Create a session on the connection
             // This session is a default choice of non-transacted and uses the 
auto acknowledge feature of a session.
-            Session session=connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            // Session session=connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
 
             for (int i=0; i < 50; i++)
             {
+                Session session=connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
                 Topic topic=session.createTopic("topic-" + i);
                 TopicSubscriber dursub=session.createDurableSubscriber(topic, 
"durable-" + i);
                 dursub.setMessageListener(new MyListener());
@@ -81,11 +91,31 @@
 
             // Now the messageConsumer is set up we can start the connection
             connection.start();
-            synchronized (connection)
-            {
-                connection.wait();
-            }
-
+            _logger.info("Ready to consume messages");
+            // listen for the termination message
+            Session session=connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queueCompleted=session.createQueue("completed");
+            Queue queueStarted=session.createQueue("started");
+            MessageProducer prod=session.createProducer(queueStarted);
+
+            Thread logger=new Thread(new MyLogger());
+            logger.setDaemon(true);
+            logger.start();
+
+            prod.send(session.createTextMessage("start"));
+            long startTime=System.currentTimeMillis();
+            MessageConsumer cons=session.createConsumer(queueCompleted);
+            cons.receive();
+
+            _running=false;
+
+            long endTime=System.currentTimeMillis();
+            session.close();
+            _logger.info("Received " + _numMessages);
+            _file.write("Received " + _numMessages + "\n");
+            _logger.info("Throughput " + _numMessages / (endTime - startTime) 
* 1000 + "msg/s");
+            _file.write("Throughput " + _numMessages / (endTime - startTime) * 
1000 + "msg/s");
+            _file.close();
         }
         catch (Exception e)
         {
@@ -97,7 +127,45 @@
     {
         public void onMessage(Message message)
         {
-            _logger.debug("Received a message");
+            synchronized (_lock)
+            {
+                _numMessages++;
+                /*if(_numMessages % 1000 == 0)
+                {
+                    _logger.info("received: " + _numMessages);
+                } */
+            }
+        }
+    }
+
+    private class MyLogger implements Runnable
+    {
+        public void run()
+        {
+            long endTime=0;
+            while (_running)
+            {
+                synchronized (_lock)
+                {
+                    try
+                    {
+                        _lock.wait(5000);
+                        if (_running)
+                        {
+                            endTime=endTime + 5;
+                            String s="Throughput " + _numMessages / endTime + 
" msg/s";
+                            _logger.info(s);
+                            _file.write(s + "\n");
+                        }
+
+                    }
+                    catch (Exception e)
+                    {
+                        e.printStackTrace();
+                    }
+
+                }
+            }
         }
     }
 }

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties?rev=613883&r1=613882&r2=613883&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties
 Mon Jan 21 05:33:00 2008
@@ -18,7 +18,7 @@
 #
 java.naming.factory.initial = 
org.apache.qpid.jndi.PropertiesFileInitialContextFactory
 
-#connectionfactory.local = amqp://guest:[EMAIL 
PROTECTED]/test?brokerlist='tcp://localhost:5672'
+connectionfactory.qpidConnectionfactory = amqp://guest:[EMAIL 
PROTECTED]/test?brokerlist='tcp://localhost:5672'
 
 # A 0.10 connection factory
-connectionfactory.qpidConnectionfactory = qpid:password=pass;[EMAIL 
PROTECTED]:localhost:5672
+#connectionfactory.qpidConnectionfactory = qpid:password=pass;[EMAIL 
PROTECTED]:localhost:5672


Reply via email to