Author: arnaudsimon
Date: Fri Nov 30 04:56:14 2007
New Revision: 599810

URL: http://svn.apache.org/viewvc?rev=599810&view=rev
Log:
added latency tests

Added:
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/latency/
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/latency/MessageConsumer.java
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/latency/MessageProducer.java

Added: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/latency/MessageConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/latency/MessageConsumer.java?rev=599810&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/latency/MessageConsumer.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/latency/MessageConsumer.java
 Fri Nov 30 04:56:14 2007
@@ -0,0 +1,127 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client.latency;
+
+import org.apache.qpid.requestreply.InitialContextHelper;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.perf.Options;
+import org.apache.qpid.server.queue.AMQQueueMBean;
+
+import javax.jms.*;
+
+/**
+ *
+ *
+ */
+public class MessageConsumer  extends Options  implements MessageListener 
+{
+    private javax.jms.MessageProducer _producer;
+    private AMQConnection _connection;
+    private final Object _lock = new Object();
+    private Session _session;
+    private int _receivedMessages = 0;
+    private long _timeFirstMessage;
+    private long _timeLastMessage;
+   private void init()
+    {
+        this.parseOptions();
+        try
+        {
+            ConnectionFactory factory = (ConnectionFactory) 
InitialContextHelper.getInitialContext("").lookup("local");
+             _connection = (AMQConnection) 
factory.createConnection("guest","guest");
+             _session = _connection.createSession(_transacted, 
Session.AUTO_ACKNOWLEDGE);
+            Destination dest = Boolean.getBoolean("useQueue")? new 
AMQQueue(_connection,_destination) : new AMQTopic(
+                    _connection,_destination);        
+            Destination syncQueue   = new AMQQueue(_connection, "syncQueue");
+            _producer = _session.createProducer(syncQueue);
+            // this should speedup the message producer
+            _producer.setDisableMessageTimestamp(true);
+            javax.jms.MessageConsumer consumer = _session.createConsumer(dest);
+            consumer.setMessageListener(this);
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace();
+        }
+    }
+
+    private void run()
+    {
+        try
+        {
+           synchronized(_lock)
+           {
+               _connection.start();
+               try
+               {
+                   _lock.wait();
+               }
+               catch (InterruptedException e)
+               {
+                   e.printStackTrace();
+               }
+           }
+            // send sync message;
+           _producer.send(_session.createMessage());
+            System.out.println("Time to receive " + _logFrequency + " messages 
is: " + (_timeLastMessage - _timeFirstMessage) );
+            double rate =  _logFrequency /  ((_timeLastMessage - 
_timeFirstMessage)  *1.0) *1000 ;
+             System.out.println("The rate is " + rate  + " msg/s" );
+            double latency =  ((_timeLastMessage - _timeFirstMessage)  *1.0) / 
_logFrequency;
+            System.out.println("The latency is " + latency  + " milli secs" );
+            _connection.close();
+        }
+        catch (JMSException e)
+        {
+            e.printStackTrace();
+        }
+    }
+
+   public void onMessage(Message message)
+    {
+        if( _receivedMessages == 0)
+        {
+            _timeFirstMessage = System.currentTimeMillis();
+        }
+        _receivedMessages++;      
+        if( _receivedMessages == _logFrequency)
+        {
+            _timeLastMessage = System.currentTimeMillis();
+            synchronized(_lock)
+            {
+                _lock.notify();
+            }
+        }
+    }
+
+    public static void main(String[] args)
+    {
+        try
+        {
+            MessageConsumer test = new MessageConsumer();
+            test.init();
+            test.run();
+        }
+        catch(Exception e)
+        {
+            e.printStackTrace();
+        }
+    }
+
+}
\ No newline at end of file

Added: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/latency/MessageProducer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/latency/MessageProducer.java?rev=599810&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/latency/MessageProducer.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/latency/MessageProducer.java
 Fri Nov 30 04:56:14 2007
@@ -0,0 +1,104 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client.latency;
+
+import org.apache.qpid.client.perf.Options;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.message.TestMessageFactory;
+import org.apache.qpid.requestreply.InitialContextHelper;
+
+import javax.jms.*;
+import java.util.Date;
+
+/**
+ *
+ *
+ */
+public class MessageProducer  extends Options
+{
+    private BytesMessage _payload;
+    private javax.jms.MessageProducer _producer;
+    private javax.jms.MessageConsumer _consumer;
+    private AMQConnection _connection;
+   private void init()
+    {
+        this.parseOptions();
+        try
+        {
+            ConnectionFactory factory = (ConnectionFactory) 
InitialContextHelper.getInitialContext("").lookup("local");
+             _connection = (AMQConnection) factory.createConnection();
+            _connection.start();
+            Session session = _connection.createSession(_transacted, 
Session.AUTO_ACKNOWLEDGE);
+            _payload = TestMessageFactory.newBytesMessage(session, 
_messageSize);
+            Destination dest = Boolean.getBoolean("useQueue")? new 
AMQQueue(_connection,_destination) : new AMQTopic(
+                    _connection,_destination);
+            Destination syncQueue   = new AMQQueue(_connection, "syncQueue");
+            _producer = session.createProducer(dest);
+            _consumer = session.createConsumer(syncQueue);
+            // this should speedup the message producer
+            _producer.setDisableMessageTimestamp(true);
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace();
+        }
+    }
+
+    private void run()
+    {
+        try
+        {
+            long startTime = System.currentTimeMillis();
+            for(int i =0; i < _logFrequency; i++ )
+            {
+                _producer.send(_payload);
+            }
+            long endProducing = System.currentTimeMillis();
+            double throughput = (_logFrequency * 1000.0) / (endProducing - 
startTime);
+            System.out.println("The producer throughput is: " + throughput + " 
msg/s");
+            // now wait for the sync message
+            _consumer.receive();
+            // this is done 
+            long endTime = System.currentTimeMillis();
+            System.out.println("Time to send and receive " + _logFrequency + " 
messages is: " + (endTime - startTime) );
+            double latency = ( (endTime - startTime)  * 1.0) /_logFrequency;
+            System.out.println("The latency is " + latency + " milli secs" );
+            _connection.close();
+        }
+        catch (JMSException e)
+        {
+            e.printStackTrace();
+        }
+    }
+
+    public static void main(String[] args)
+    {
+        try
+        {
+            MessageProducer test = new MessageProducer();
+            test.init();
+            test.run();
+        }
+        catch(Exception e)
+        {
+            e.printStackTrace();
+        }
+    }
+}


Reply via email to