Author: arnaudsimon
Date: Mon Jan 21 01:20:54 2008
New Revision: 613807

URL: http://svn.apache.org/viewvc?rev=613807&view=rev
Log:
Added new perf test for topic (based on Andy's use case)

Added:
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties
      - copied, changed from r613114, 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties

Added: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java?rev=613807&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java
 Mon Jan 21 01:20:54 2008
@@ -0,0 +1,192 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client.topic;
+
+import org.apache.qpid.client.message.TestMessageFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.jms.*;
+import java.util.Properties;
+import java.util.Random;
+import java.util.List;
+import java.util.ArrayList;
+
+public class Client
+{
+    /**
+     * This class logger
+     */
+    private static final Logger _logger=LoggerFactory.getLogger(Client.class);
+
+    private long _messagesProduced=0;
+    private final Object _lock=new Object();
+    private Message _message;
+    private List<Runner> _runners=new ArrayList<Runner>();
+
+
+    /**
+     * Run the topic producer performance test.
+     *
+     * @param args Command line arguments; the optional first argument is the test duration in seconds (default 120).
+     */
+    public static void main(String[] args)
+    {
+        Client syncConsumer=new Client();
+        int firstArg=120;
+        if (args.length > 0)
+        {
+            try
+            {
+                firstArg=Integer.parseInt(args[0]);
+            }
+            catch (NumberFormatException e)
+            {
+                _logger.warn("Argument must be an integer, running for 2 
minutes");
+            }
+        }
+        syncConsumer.runClient(firstArg);
+    }
+
+
+    void runClient(long duration)
+    {
+        try
+        {
+            // Load JNDI properties
+            Properties properties=new Properties();
+            
properties.load(this.getClass().getResourceAsStream("topic.properties"));
+
+            //Create the initial context
+            Context ctx=new InitialContext(properties);
+
+            // Lookup the connection factory
+            ConnectionFactory conFac=(ConnectionFactory) 
ctx.lookup("qpidConnectionfactory");
+            // create the connection
+            Connection connection=conFac.createConnection();
+
+            connection.setExceptionListener(new ExceptionListener()
+            {
+                public void onException(JMSException jmse)
+                {
+                    // The connection may have broken; invoke reconnect code if
+                    // available.
+                    System.err.println("Received an exception through the 
ExceptionListener");
+                    System.exit(0);
+                }
+            });
+
+            // No consumer is created by this client; just start the connection
+            connection.start();
+
+            // Create a non-transacted session on the connection that uses the
+            // auto-acknowledge feature of a session.
+            Session session=connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            _message=TestMessageFactory.newBytesMessage(session, 1024);
+
+            Random random=new Random();
+            long testDuration=0;
+            long totalMessagesProduced;
+            long messagesProducedLastInterval=0;
+            long intervalThroughput;
+            long totalThroughput;
+            long numProducers=1;
+            startNewProducer(session, random);
+            while (testDuration < duration)
+            {
+                // every 5 seconds, print the throughput and start a new producer thread
+                synchronized (_lock)
+                {
+                    _lock.wait(5000);
+                    totalMessagesProduced=_messagesProduced;
+                }
+                testDuration=testDuration + 5;
+                intervalThroughput=(totalMessagesProduced - 
messagesProducedLastInterval) / 5;
+                totalThroughput=totalMessagesProduced / testDuration;
+                messagesProducedLastInterval=totalMessagesProduced;
+                _logger.info("Number of producers " + numProducers + " | This 
interval throughput = " +
+                        intervalThroughput + " | Total throughput = " + 
totalThroughput);
+                startNewProducer(session, random);
+                numProducers++;
+            }
+            // stop all the producers
+            for (Runner runner : _runners)
+            {
+                runner.stop();
+            }
+
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace();
+        }
+    }
+
+    private void startNewProducer(Session session, Random random)
+            throws JMSException
+    {
+        // select a random topic
+        int topicNumber=random.nextInt(50);
+        Topic topic=session.createTopic("topic-" + topicNumber);
+        MessageProducer prod=session.createProducer(topic);
+        Runner runner=new Runner(prod);
+        _runners.add(runner);
+        Thread thread=new Thread(runner);
+        thread.start();
+    }
+
+    private class Runner implements Runnable
+    {
+        MessageProducer _prod;
+        boolean _produce=true;
+
+        private Runner(MessageProducer prod)
+        {
+            _prod=prod;
+        }
+
+        public void run()
+        {
+            while (_produce)
+            {
+                try
+                {
+                    _prod.send(_message, DeliveryMode.PERSISTENT, 
Message.DEFAULT_PRIORITY,
+                            Message.DEFAULT_TIME_TO_LIVE);
+                    synchronized (_lock)
+                    {
+                        _messagesProduced++;
+                    }
+                }
+                catch (Exception e)
+                {
+                    e.printStackTrace();
+                    _produce=false;
+                }
+            }
+        }
+
+        public void stop()
+        {
+            _produce=false;
+        }
+    }
+
+}

Added: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java?rev=613807&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java
 Mon Jan 21 01:20:54 2008
@@ -0,0 +1,103 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.client.topic;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.naming.Context;
+import javax.naming.InitialContext;
+
+import javax.jms.*;
+import java.util.Properties;
+
+
+public class Server
+{
+    /**
+     * This class logger
+     */
+    private static final Logger _logger =LoggerFactory.getLogger(Server.class);
+
+
+    public static void main(String[] args)
+    {
+        (new Server()).runServer();
+    }
+
+    void runServer()
+    {
+        try
+        {
+            // Load JNDI properties
+            Properties properties=new Properties();
+            
properties.load(this.getClass().getResourceAsStream("topic.properties"));
+
+            //Create the initial context
+            Context ctx=new InitialContext(properties);
+
+            // Lookup the connection factory
+            ConnectionFactory conFac=(ConnectionFactory) 
ctx.lookup("qpidConnectionfactory");
+            // create the connection
+            Connection connection=conFac.createConnection();
+
+            connection.setExceptionListener(new ExceptionListener()
+            {
+                public void onException(JMSException jmse)
+                {
+                    // The connection may have broken; invoke reconnect code if
+                    // available.
+                    _logger.warn("Received an exception through the 
ExceptionListener");
+                    System.exit(0);
+                }
+            });
+
+            // Create a non-transacted session on the connection that uses the
+            // auto-acknowledge feature of a session.
+            Session session=connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            for (int i=0; i < 50; i++)
+            {
+                Topic topic=session.createTopic("topic-" + i);
+                TopicSubscriber dursub=session.createDurableSubscriber(topic, 
"durable-" + i);
+                dursub.setMessageListener(new MyListener());
+            }
+
+            // Now the messageConsumer is set up we can start the connection
+            connection.start();
+            synchronized (connection)
+            {
+                connection.wait();
+            }
+
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace();
+        }
+    }
+
+    private class MyListener implements MessageListener
+    {
+        public void onMessage(Message message)
+        {
+            _logger.debug("Received a message");
+        }
+    }
+}

Copied: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties
 (from r613114, 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties)
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties?p2=incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties&p1=incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties&r1=613114&r2=613807&rev=613807&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties
 Mon Jan 21 01:20:54 2008
@@ -18,10 +18,7 @@
 #
 java.naming.factory.initial = 
org.apache.qpid.jndi.PropertiesFileInitialContextFactory
 
-# register some connection factories
-# connectionfactory.[jndiname] = [ConnectionURL]
-connectionfactory.qpidConnectionfactory = qpid:password=pass;[EMAIL 
PROTECTED]:localhost:5672
+#connectionfactory.local = amqp://guest:[EMAIL 
PROTECTED]/test?brokerlist='tcp://localhost:5672'
 
-# Register an AMQP destination in JNDI
-# destination.[jniName] = [BindingURL]
-destination.directQueue = 
direct://amq.direct//message_queue?routingkey='routing_key'
\ No newline at end of file
+# A 0.10 connection factory
+connectionfactory.qpidConnectionfactory = qpid:password=pass;[EMAIL 
PROTECTED]:localhost:5672


Reply via email to