Author: arnaudsimon
Date: Mon Jan 21 01:20:54 2008
New Revision: 613807
URL: http://svn.apache.org/viewvc?rev=613807&view=rev
Log:
Added new perf test for topic (based on Andy's usecase)
Added:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties
- copied, changed from r613114,
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties
Added:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java?rev=613807&view=auto
==============================================================================
---
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java
(added)
+++
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java
Mon Jan 21 01:20:54 2008
@@ -0,0 +1,192 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client.topic;
+
+import org.apache.qpid.client.message.TestMessageFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.jms.*;
+import java.util.Properties;
+import java.util.Random;
+import java.util.List;
+import java.util.ArrayList;
+
+public class Client
+{
+ /**
+ * This class logger
+ */
+ private static final Logger _logger=LoggerFactory.getLogger(Client.class);
+
+ private long _messagesProduced=0;
+ private final Object _lock=new Object();
+ private Message _message;
+ private List<Runner> _runners=new ArrayList<Runner>();
+
+
+ /**
+ * Run the message consumer example.
+ *
+ * @param args Command line arguments.
+ */
+ public static void main(String[] args)
+ {
+ Client syncConsumer=new Client();
+ int firstArg=120;
+ if (args.length > 0)
+ {
+ try
+ {
+ firstArg=Integer.parseInt(args[0]);
+ }
+ catch (NumberFormatException e)
+ {
+ _logger.warn("Argument must be an integer, running for 2
minutes");
+ }
+ }
+ syncConsumer.runClient(firstArg);
+ }
+
+
+ void runClient(long duration)
+ {
+ try
+ {
+ // Load JNDI properties
+ Properties properties=new Properties();
+
properties.load(this.getClass().getResourceAsStream("topic.properties"));
+
+ //Create the initial context
+ Context ctx=new InitialContext(properties);
+
+ // Lookup the connection factory
+ ConnectionFactory conFac=(ConnectionFactory)
ctx.lookup("qpidConnectionfactory");
+ // create the connection
+ Connection connection=conFac.createConnection();
+
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException jmse)
+ {
+ // The connection may have broken invoke reconnect code if
available.
+ // The connection may have broken invoke reconnect code if
available.
+ System.err.println("Received an exception through the
ExceptionListener");
+ System.exit(0);
+ }
+ });
+
+ // Now the messageConsumer is set up we can start the connection
+ connection.start();
+
+ // Create a session on the connection
+ // This session is a default choice of non-transacted and uses the
auto acknowledge feature of a session.
+ Session session=connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ _message=TestMessageFactory.newBytesMessage(session, 1024);
+
+ Random random=new Random();
+ long testDuration=0;
+ long totalMessagesProduced;
+ long messagesProducedLastInterval=0;
+ long intervalThroughput;
+ long totalThroughput;
+ long numProducers=1;
+ startNewProducer(session, random);
+ while (testDuration < duration)
+ {
+ // every 5 second creates a thread an print the throughput
+ synchronized (_lock)
+ {
+ _lock.wait(5000);
+ totalMessagesProduced=_messagesProduced;
+ }
+ testDuration=testDuration + 5;
+ intervalThroughput=(totalMessagesProduced -
messagesProducedLastInterval) / 5;
+ totalThroughput=totalMessagesProduced / testDuration;
+ messagesProducedLastInterval=totalMessagesProduced;
+ _logger.info("Number of producers " + numProducers + " | This
interval throughput = " +
+ intervalThroughput + " | Total throughput = " +
totalThroughput);
+ startNewProducer(session, random);
+ numProducers++;
+ }
+ // stop all the producers
+ for (Runner runner : _runners)
+ {
+ runner.stop();
+ }
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ private void startNewProducer(Session session, Random random)
+ throws JMSException
+ {
+ // select a random topic
+ int topicNumber=random.nextInt(50);
+ Topic topic=session.createTopic("topic-" + topicNumber);
+ MessageProducer prod=session.createProducer(topic);
+ Runner runner=new Runner(prod);
+ _runners.add(runner);
+ Thread thread=new Thread(runner);
+ thread.start();
+ }
+
+ private class Runner implements Runnable
+ {
+ MessageProducer _prod;
+ boolean _produce=true;
+
+ private Runner(MessageProducer prod)
+ {
+ _prod=prod;
+ }
+
+ public void run()
+ {
+ while (_produce)
+ {
+ try
+ {
+ _prod.send(_message, DeliveryMode.PERSISTENT,
Message.DEFAULT_PRIORITY,
+ Message.DEFAULT_TIME_TO_LIVE);
+ synchronized (_lock)
+ {
+ _messagesProduced++;
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ _produce=false;
+ }
+ }
+ }
+
+ public void stop()
+ {
+ _produce=false;
+ }
+ }
+
+}
Added:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java?rev=613807&view=auto
==============================================================================
---
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java
(added)
+++
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java
Mon Jan 21 01:20:54 2008
@@ -0,0 +1,103 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.client.topic;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.naming.Context;
+import javax.naming.InitialContext;
+
+import javax.jms.*;
+import java.util.Properties;
+
+
+public class Server
+{
+ /**
+ * This class logger
+ */
+ private static final Logger _logger =LoggerFactory.getLogger(Server.class);
+
+
+ public static void main(String[] args)
+ {
+ (new Server()).runServer();
+ }
+
+ void runServer()
+ {
+ try
+ {
+ // Load JNDI properties
+ Properties properties=new Properties();
+
properties.load(this.getClass().getResourceAsStream("topic.properties"));
+
+ //Create the initial context
+ Context ctx=new InitialContext(properties);
+
+ // Lookup the connection factory
+ ConnectionFactory conFac=(ConnectionFactory)
ctx.lookup("qpidConnectionfactory");
+ // create the connection
+ Connection connection=conFac.createConnection();
+
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException jmse)
+ {
+ // The connection may have broken invoke reconnect code if
available.
+ // The connection may have broken invoke reconnect code if
available.
+ _logger.warn("Received an exception through the
ExceptionListener");
+ System.exit(0);
+ }
+ });
+
+ // Create a session on the connection
+ // This session is a default choice of non-transacted and uses the
auto acknowledge feature of a session.
+ Session session=connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ for (int i=0; i < 50; i++)
+ {
+ Topic topic=session.createTopic("topic-" + i);
+ TopicSubscriber dursub=session.createDurableSubscriber(topic,
"durable-" + i);
+ dursub.setMessageListener(new MyListener());
+ }
+
+ // Now the messageConsumer is set up we can start the connection
+ connection.start();
+ synchronized (connection)
+ {
+ connection.wait();
+ }
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ private class MyListener implements MessageListener
+ {
+ public void onMessage(Message message)
+ {
+ _logger.debug("Received a message");
+ }
+ }
+}
Copied:
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties
(from r613114,
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties)
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties?p2=incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties&p1=incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties&r1=613114&r2=613807&rev=613807&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties
(original)
+++
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties
Mon Jan 21 01:20:54 2008
@@ -18,10 +18,7 @@
#
java.naming.factory.initial =
org.apache.qpid.jndi.PropertiesFileInitialContextFactory
-# register some connection factories
-# connectionfactory.[jndiname] = [ConnectionURL]
-connectionfactory.qpidConnectionfactory = qpid:password=pass;[EMAIL
PROTECTED]:localhost:5672
+#connectionfactory.local = amqp://guest:[EMAIL
PROTECTED]/test?brokerlist='tcp://localhost:5672'
-# Register an AMQP destination in JNDI
-# destination.[jniName] = [BindingURL]
-destination.directQueue =
direct://amq.direct//message_queue?routingkey='routing_key'
\ No newline at end of file
+# A 0.10 connection factory
+connectionfactory.qpidConnectionfactory = qpid:password=pass;[EMAIL
PROTECTED]:localhost:5672