http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java deleted file mode 100644 index a03fab9..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java +++ /dev/null @@ -1,171 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.perf; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; -import javax.jms.TopicSubscriber; -import junit.framework.Assert; -import junit.framework.AssertionFailedError; -import junit.framework.TestCase; -import org.apache.hedwig.JmsTestBase; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; - -import org.junit.Ignore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -// For now, ignore it ... -@Ignore -public class InactiveDurableTopicTest extends JmsTestBase { - private static final transient Logger LOG = LoggerFactory.getLogger(InactiveDurableTopicTest.class); - - private static final int MESSAGE_COUNT = 2000; - private static final String DEFAULT_PASSWORD = ""; - private static final String USERNAME = "testuser"; - private static final String CLIENTID = "mytestclient"; - private static final String TOPIC_NAME = "testevent"; - private static final String SUBID = "subscription1"; - private static final int DELIVERY_MODE = javax.jms.DeliveryMode.PERSISTENT; - private static final int DELIVERY_PRIORITY = javax.jms.Message.DEFAULT_PRIORITY; - private Connection connection; - private MessageProducer publisher; - private TopicSubscriber subscriber; - private Topic topic; - private Session session; - private HedwigConnectionFactoryImpl connectionFactory; - - @Override - protected void setUp() throws Exception { - super.setUp(); - - connectionFactory = new HedwigConnectionFactoryImpl(); - } - - @Override - protected void tearDown() throws Exception { - super.tearDown(); - } - - public void test1CreateSubscription() throws Exception { - try { - /* - * Step 1 - Establish a connection with a client id and create a - * durable subscription - */ - connection = connectionFactory.createConnection(USERNAME, DEFAULT_PASSWORD); - assertNotNull(connection); - connection.setClientID(CLIENTID); - connection.start(); - session = connection.createSession(false, javax.jms.Session.CLIENT_ACKNOWLEDGE); - assertNotNull(session); - topic = session.createTopic(TOPIC_NAME); - assertNotNull(topic); - subscriber = session.createDurableSubscriber(topic, SUBID, "", false); - assertNotNull(subscriber); - subscriber.close(); - session.close(); - connection.close(); - } catch (JMSException ex) { - try { - connection.close(); - } catch (Exception ignore) { - } - throw new AssertionFailedError("Create Subscription caught: " + ex); - } - } - - public void test2ProducerTestCase() { - /* - * Step 2 - Establish a connection without a client id and create a - * producer and start pumping messages. We will get hung - */ - try { - connection = connectionFactory.createConnection(USERNAME, DEFAULT_PASSWORD); - assertNotNull(connection); - session = connection.createSession(false, javax.jms.Session.CLIENT_ACKNOWLEDGE); - assertNotNull(session); - topic = session.createTopic(TOPIC_NAME); - assertNotNull(topic); - publisher = session.createProducer(topic); - assertNotNull(publisher); - MapMessage msg = session.createMapMessage(); - assertNotNull(msg); - msg.setString("key1", "value1"); - int loop; - for (loop = 0; loop < MESSAGE_COUNT; loop++) { - msg.setInt("key2", loop); - publisher.send(msg, DELIVERY_MODE, DELIVERY_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); - if (loop % 5000 == 0) { - LOG.info("Sent " + loop + " messages"); - } - } - Assert.assertEquals(loop, MESSAGE_COUNT); - publisher.close(); - session.close(); - connection.stop(); - connection.stop(); - } catch (JMSException ex) { - try { - connection.close(); - } catch (Exception ignore) { - } - throw new AssertionFailedError("Create Subscription caught: " + ex); - } - } - - public void test3CreateSubscription() throws Exception { - try { - /* - * Step 1 - Establish a connection with a client id and create a - * durable subscription - */ - connection = connectionFactory.createConnection(USERNAME, DEFAULT_PASSWORD); - assertNotNull(connection); - connection.setClientID(CLIENTID); - connection.start(); - session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); - assertNotNull(session); - topic = session.createTopic(TOPIC_NAME); - assertNotNull(topic); - subscriber = session.createDurableSubscriber(topic, SUBID, "", false); - assertNotNull(subscriber); - int loop; - for (loop = 0; loop < MESSAGE_COUNT; loop++) { - Message msg = subscriber.receive(); - if (loop % 500 == 0) { - LOG.debug("Received " + loop + " messages"); - } - } - this.assertEquals(loop, MESSAGE_COUNT); - subscriber.close(); - session.close(); - connection.close(); - } catch (JMSException ex) { - try { - connection.close(); - } catch (Exception ignore) { - } - throw new AssertionFailedError("Create Subscription caught: " + ex); - } - } -}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java deleted file mode 100644 index 8959de2..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java +++ /dev/null @@ -1,212 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.perf; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; - -import junit.framework.TestCase; -import junit.textui.TestRunner; - -import org.apache.hedwig.JmsTestBase; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; - - -import org.junit.Ignore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -// For now, ignore it ... -@Ignore -public class NetworkedSyncTest extends JmsTestBase { - - // constants - public static final int MESSAGE_COUNT = 10000; //100000; - private static final Logger LOG = LoggerFactory.getLogger(NetworkedSyncTest.class); - /** - * @param name - */ - public NetworkedSyncTest(String name) { - super(name); - LOG.info("Testcase started."); - } - - public static void main(String args[]) { - TestRunner.run(NetworkedSyncTest.class); - } - - public void testMessageExchange() throws Exception { - LOG.info("testMessageExchange() called."); - - long start = System.currentTimeMillis(); - - // create producer and consumer threads - Thread producer = new Thread(new Producer()); - Thread consumer = new Thread(new Consumer()); - // start threads - consumer.start(); - Thread.sleep(2000); - producer.start(); - - // wait for threads to finish - producer.join(); - consumer.join(); - long end = System.currentTimeMillis(); - - System.out.println("Duration: "+(end-start)); - } -} - -/** - * Message producer running as a separate thread, connecting to broker1 - */ -class Producer implements Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(Producer.class); - - /** - * connect to broker and constantly send messages - */ - public void run() { - - Connection connection = null; - Session session = null; - MessageProducer producer = null; - - try { - HedwigConnectionFactoryImpl amq = new HedwigConnectionFactoryImpl(); - connection = amq.createConnection(); - - connection.setExceptionListener(new javax.jms.ExceptionListener() { - public void onException(javax.jms.JMSException e) { - e.printStackTrace(); - } - }); - - connection.start(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic destination = session.createTopic("TEST.FOO"); - - producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - - long counter = 0; - - // Create and send message - for (int i = 0; i < NetworkedSyncTest.MESSAGE_COUNT; i++) { - - String text = "Hello world! From: " - + Thread.currentThread().getName() + " : " - + this.hashCode() + ":" + counter; - TextMessage message = session.createTextMessage(text); - producer.send(message); - counter++; - - if ((counter % 1000) == 0) - LOG.info("sent " + counter + " messages"); - - } - } catch (Exception ex) { - LOG.error(ex.toString()); - return; - } finally { - try { - if (producer != null) - producer.close(); - if (session != null) - session.close(); - if (connection != null) - connection.close(); - } catch (Exception e) { - LOG.error("Problem closing down JMS objects: " + e); - } - } - } -} - -/* - * Message consumer running as a separate thread, connecting to broker2 - */ -class Consumer implements Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);; - - /** - * connect to broker and receive messages - */ - public void run() { - Connection connection = null; - Session session = null; - MessageConsumer consumer = null; - - try { - HedwigConnectionFactoryImpl amq = new HedwigConnectionFactoryImpl(); - connection = amq.createConnection(); - // need to set clientID when using durable subscription. - if (null == connection.getClientID()) connection.setClientID("tmielke"); - - connection.setExceptionListener(new javax.jms.ExceptionListener() { - public void onException(javax.jms.JMSException e) { - e.printStackTrace(); - } - }); - - connection.start(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createTopic("TEST.FOO"); - consumer = session.createDurableSubscriber((Topic) destination,"tmielke"); - - long counter = 0; - // Wait for a message - for (int i = 0; i < NetworkedSyncTest.MESSAGE_COUNT; i++) { - Message message2 = consumer.receive(); - if (message2 instanceof TextMessage) { - TextMessage textMessage = (TextMessage) message2; - String text = textMessage.getText(); - // logger.info("Received: " + text); - } else { - LOG.error("Received message of unsupported type. Expecting TextMessage. "+ message2); - } - counter++; - if ((counter % 1000) == 0) - LOG.info("received " + counter + " messages"); - } - } catch (Exception e) { - LOG.error("Error in Consumer: " + e); - return; - } finally { - try { - if (consumer != null) - consumer.close(); - if (session != null) - session.close(); - if (connection != null) - connection.close(); - } catch (Exception ex) { - LOG.error("Error closing down JMS objects: " + ex); - } - } - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/perf/NumberOfDestinationsTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/NumberOfDestinationsTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/perf/NumberOfDestinationsTest.java deleted file mode 100644 index f05e482..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/NumberOfDestinationsTest.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.perf; - -/** - * A NumberOfDestinationsTest - * - */ -import javax.jms.Topic; -import java.io.File; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; -import junit.framework.TestCase; -import org.apache.hedwig.JmsTestBase; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; - - -import org.junit.Ignore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -// For now, ignore it ... -@Ignore -public class NumberOfDestinationsTest extends JmsTestBase { - protected static final int MESSAGE_COUNT = 1; - protected static final int NUMBER_OF_DESTINATIONS = 100000; - private static final Logger LOG = LoggerFactory.getLogger(NumberOfDestinationsTest.class); - protected int destinationCount; - - public void testDestinations() throws Exception { - ConnectionFactory factory = createConnectionFactory(); - Connection connection = factory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer mp = session.createProducer(null); - for (int j = 0; j < NUMBER_OF_DESTINATIONS; j++) { - Destination dest = getDestination(session); - - for (int i = 0; i < MESSAGE_COUNT; i++) { - Message msg = session.createTextMessage("test" + i); - mp.send(dest, msg); - } - if (j % 500 == 0) { - LOG.info("Iterator " + j); - } - } - connection.close(); - } - - protected Destination getDestination(Session session) throws JMSException { - String topicName = getClass().getName() + "." + destinationCount++; - return session.createTopic(topicName); - } - - @Override - protected void setUp() throws Exception { - super.setUp(); - } - - @Override - protected void tearDown() throws Exception { - super.tearDown(); - } - - protected HedwigConnectionFactoryImpl createConnectionFactory() throws Exception { - HedwigConnectionFactoryImpl cf = new HedwigConnectionFactoryImpl(); - return cf; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/perf/PerfConsumer.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/PerfConsumer.java b/hedwig-client-jms/src/test/java/org/apache/activemq/perf/PerfConsumer.java deleted file mode 100644 index 9eefad5..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/PerfConsumer.java +++ /dev/null @@ -1,139 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.perf; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; -import javax.jms.Topic; - -import org.apache.hedwig.jms.LRUCacheSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - */ -public class PerfConsumer implements MessageListener { - private static final Logger LOG = LoggerFactory.getLogger(PerfConsumer.class); - protected Connection connection; - protected MessageConsumer consumer; - protected long sleepDuration; - protected long initialDelay; - protected boolean enableAudit = false; - protected boolean firstMessage =true; - protected String lastMsgId; - - protected PerfRate rate = new PerfRate(); - - public PerfConsumer(ConnectionFactory fac, Destination dest, String consumerName) throws JMSException { - connection = fac.createConnection(); - connection.setClientID(consumerName); - Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - if (dest instanceof Topic && consumerName != null && consumerName.length() > 0) { - consumer = s.createDurableSubscriber((Topic)dest, consumerName); - } else { - consumer = s.createConsumer(dest); - } - consumer.setMessageListener(this); - } - - public PerfConsumer(ConnectionFactory fac, Destination dest) throws JMSException { - this(fac, dest, null); - } - - public void start() throws JMSException { - connection.start(); - rate.reset(); - } - - public void stop() throws JMSException { - connection.stop(); - } - - public void shutDown() throws JMSException { - connection.close(); - } - - public PerfRate getRate() { - return rate; - } - - private LRUCacheSet<String> messageIdCache = new LRUCacheSet<String>(2048, false); - public void onMessage(Message msg) { - if (firstMessage) { - firstMessage=false; - if (getInitialDelay() > 0) { - try { - Thread.sleep(getInitialDelay()); - } catch (InterruptedException e) { - } - } - } - rate.increment(); - try { - if (enableAudit && this.messageIdCache.contains(msg.getJMSMessageID())){ - LOG.error("Duplicate Message!" + msg); - } - lastMsgId=msg.getJMSMessageID(); - this.messageIdCache.add(lastMsgId); - } catch (JMSException e1) { - e1.printStackTrace(); - } - try { - if (sleepDuration != 0) { - Thread.sleep(sleepDuration); - } - } catch (InterruptedException e) { - } - } - - public synchronized long getSleepDuration() { - return sleepDuration; - } - - public synchronized void setSleepDuration(long sleepDuration) { - this.sleepDuration = sleepDuration; - } - - public boolean isEnableAudit() { - return enableAudit; - } - - public void setEnableAudit(boolean doAudit) { - this.enableAudit = doAudit; - } - - /** - * @return the initialDelay - */ - public long getInitialDelay() { - return initialDelay; - } - - /** - * @param initialDelay the initialDelay to set - */ - public void setInitialDelay(long initialDelay) { - this.initialDelay = initialDelay; - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/perf/PerfProducer.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/PerfProducer.java b/hedwig-client-jms/src/test/java/org/apache/activemq/perf/PerfProducer.java deleted file mode 100644 index a65b2c0..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/PerfProducer.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.perf; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.jms.Session; - -public class PerfProducer implements Runnable { - protected Connection connection; - protected MessageProducer producer; - protected PerfRate rate = new PerfRate(); - private final byte[] payload; - private Session session; - private final CountDownLatch stopped = new CountDownLatch(1); - private boolean running; - private final boolean transacted; - private int sleep = 0; - - public PerfProducer(ConnectionFactory fac, Destination dest, byte[] payload) throws JMSException { - this(fac, dest, payload, false); - } - public PerfProducer(ConnectionFactory fac, Destination dest, byte[] payload, boolean transacted) - throws JMSException { - connection = fac.createConnection(); - this.transacted = transacted; - if (transacted) { - session = connection.createSession(true, Session.SESSION_TRANSACTED); - } else { - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - producer = session.createProducer(dest); - this.payload = payload; - } - - public void setDeliveryMode(int mode) throws JMSException { - producer.setDeliveryMode(mode); - } - - public void setTimeToLive(int ttl) throws JMSException { - producer.setTimeToLive(ttl); - } - - public void shutDown() throws JMSException { - connection.close(); - } - - public PerfRate getRate() { - return rate; - } - - public synchronized void start() throws JMSException { - if (!running) { - rate.reset(); - running = true; - connection.start(); - Thread t = new Thread(this); - t.setName("Producer"); - t.start(); - } - } - - public void stop() throws JMSException, InterruptedException { - synchronized (this) { - running = false; - } - stopped.await(1, TimeUnit.SECONDS); - connection.stop(); - } - - public synchronized boolean isRunning() { - return running; - } - - public void run() { - try { - while (isRunning()) { - BytesMessage msg; - msg = session.createBytesMessage(); - msg.writeBytes(payload); - producer.send(msg); - if(this.transacted) { - this.session.commit(); - } - rate.increment(); - if (sleep > 0) { - Thread.sleep(sleep); - } - } - } catch (Throwable e) { - e.printStackTrace(); - } finally { - stopped.countDown(); - } - } - - public int getSleep() { - return sleep; - } - - public void setSleep(int sleep) { - this.sleep = sleep; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/perf/PerfRate.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/PerfRate.java b/hedwig-client-jms/src/test/java/org/apache/activemq/perf/PerfRate.java deleted file mode 100644 index 7942667..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/PerfRate.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.perf; - -public class PerfRate { - - protected int totalCount; - protected int count; - protected long startTime = System.currentTimeMillis(); - - /** - * @return Returns the count. - */ - public int getCount() { - return totalCount; - } - - public synchronized void increment() { - totalCount++; - count++; - } - - public int getRate() { - long endTime = System.currentTimeMillis(); - long totalTime = endTime - startTime; - int result = (int)((count * 1000) / totalTime); - return result; - } - - /** - * Resets the rate sampling. - */ - public synchronized PerfRate cloneAndReset() { - PerfRate rc = new PerfRate(); - rc.totalCount = totalCount; - rc.count = count; - rc.startTime = startTime; - count = 0; - startTime = System.currentTimeMillis(); - return rc; - } - - /** - * Resets the rate sampling. - */ - public void reset() { - count = 0; - startTime = System.currentTimeMillis(); - } - - /** - * @return Returns the totalCount. - */ - public int getTotalCount() { - return totalCount; - } - - /** - * @param totalCount The totalCount to set. - */ - public void setTotalCount(int totalCount) { - this.totalCount = totalCount; - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java deleted file mode 100644 index e909211..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SimpleDurableTopicNetworkTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.perf; - -import org.junit.Ignore; - -import javax.jms.Topic; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; - - -// For now, ignore it ... -@Ignore -public class SimpleDurableTopicNetworkTest extends SimpleNetworkTest { - - protected void setUp() throws Exception { - numberofProducers=1; - numberOfConsumers=1; - sampleCount=1000; - playloadSize = 1024; - super.setUp(); - } - - protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, - int number, byte payload[]) throws JMSException { - PerfProducer pp = new PerfProducer(fac, dest, payload); - pp.setDeliveryMode(DeliveryMode.PERSISTENT); - return pp; - } - - protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException { - return new PerfConsumer(fac, dest, "subs:" + number); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java deleted file mode 100644 index c10d3bf..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SimpleDurableTopicTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.perf; - -import javax.jms.Topic; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; -import org.junit.Ignore; - -// For now, ignore it ... -@Ignore -public class SimpleDurableTopicTest extends SimpleTopicTest { - protected long initialConsumerDelay = 0; - @Override - protected void setUp() throws Exception { - numberOfDestinations=1; - numberOfConsumers = 1; - numberofProducers = Integer.parseInt(System.getProperty("SimpleDurableTopicTest.numberofProducers", "20"), 20); - sampleCount= Integer.parseInt(System.getProperty("SimpleDurableTopicTest.sampleCount", "1000"), 10); - playloadSize = 1024; - super.setUp(); - } - - @Override - protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, - int number, byte payload[]) throws JMSException { - PerfProducer pp = new PerfProducer(fac, dest, payload); - pp.setDeliveryMode(DeliveryMode.PERSISTENT); - return pp; - } - - @Override - protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException { - PerfConsumer result = new PerfConsumer(fac, dest, "subs:" + number); - result.setInitialDelay(this.initialConsumerDelay); - return result; - } - - @Override - protected HedwigConnectionFactoryImpl createConnectionFactory() throws Exception { - HedwigConnectionFactoryImpl result = super.createConnectionFactory(); - //result.setSendAcksAsync(false); - return result; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java deleted file mode 100644 index 0d32d49..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SimpleNetworkTest.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.perf; - -import javax.jms.Topic; -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.Session; - -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; - - -import org.junit.Ignore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -// For now, ignore it ... -@Ignore -public class SimpleNetworkTest extends SimpleTopicTest { - - private static final Logger LOG = LoggerFactory.getLogger(SimpleNetworkTest.class); - protected HedwigConnectionFactoryImpl consumerFactory; - protected HedwigConnectionFactoryImpl producerFactory; - - protected void setUp() throws Exception { - super.setUp(); - consumerFactory = createConnectionFactory(); - producerFactory = createConnectionFactory(); - Connection con = consumerFactory.createConnection(); - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - - producers = new PerfProducer[numberofProducers*numberOfDestinations]; - consumers = new PerfConsumer[numberOfConsumers*numberOfDestinations]; - - for (int k =0; k < numberOfDestinations;k++) { - Destination destination = createDestination(session, destinationName+":"+k); - LOG.info("Testing against destination: " + destination); - for (int i = 0; i < numberOfConsumers; i++) { - consumers[i] = createConsumer(consumerFactory, destination, i); - consumers[i].start(); - } - for (int i = 0; i < numberofProducers; i++) { - array = new byte[playloadSize]; - for (int j = i; j < array.length; j++) { - array[j] = (byte)j; - } - producers[i] = createProducer(producerFactory, destination, i, array); - producers[i].start(); - } - } - con.close(); - } - - protected void tearDown() throws Exception { - for (int i = 0; i < numberOfConsumers; i++) { - consumers[i].shutDown(); - } - for (int i = 0; i < numberofProducers; i++) { - producers[i].shutDown(); - } - super.tearDown(); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SimpleNonPersistentTopicTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SimpleNonPersistentTopicTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SimpleNonPersistentTopicTest.java deleted file mode 100644 index a2e2b7d..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SimpleNonPersistentTopicTest.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.perf; - -import org.junit.Ignore; - -import javax.jms.Topic; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; - -// For now, ignore it ... -@Ignore -public class SimpleNonPersistentTopicTest extends SimpleTopicTest { - - protected PerfProducer createProducer(ConnectionFactory fac, - Destination dest, int number, byte[] payload) throws JMSException { - PerfProducer pp = new PerfProducer(fac, dest, payload); - pp.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - return pp; - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java deleted file mode 100644 index 5df97e5..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.perf; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Session; -import junit.framework.TestCase; -import org.apache.hedwig.JmsTestBase; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; - -import org.junit.Ignore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - */ -// For now, ignore it ... -@Ignore -public class SimpleTopicTest extends JmsTestBase { - - private static final Logger LOG = LoggerFactory.getLogger(SimpleTopicTest.class); - - protected PerfProducer[] producers; - protected PerfConsumer[] consumers; - protected String destinationName = getClass().getName(); - // protected int sampleCount = 20; - // protected long sampleInternal = 10000; - protected int sampleCount = 5; - protected long sampleInternal = 5000; - protected int numberOfDestinations=1; - protected int numberOfConsumers = 1; - protected int numberofProducers = 1; - protected int totalNumberOfProducers; - protected int totalNumberOfConsumers; - protected int playloadSize = 12; - protected byte[] array; - protected ConnectionFactory factory; - - /** - * Sets up a test where the producer and consumer have their own connection. - * - * @see junit.framework.TestCase#setUp() - */ - protected void setUp() throws Exception { - super.setUp(); - factory = createConnectionFactory(); - Connection con = factory.createConnection(); - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - - LOG.info("Running " + numberofProducers + " producer(s) and " + numberOfConsumers - + " consumer(s) per " + numberOfDestinations + " Destination(s)"); - - totalNumberOfConsumers=numberOfConsumers*numberOfDestinations; - totalNumberOfProducers=numberofProducers*numberOfDestinations; - producers = new PerfProducer[totalNumberOfProducers]; - consumers = new PerfConsumer[totalNumberOfConsumers]; - int consumerCount = 0; - int producerCount = 0; - for (int k =0; k < numberOfDestinations;k++) { - Destination destination = createDestination(session, destinationName+":"+k); - LOG.info("Testing against destination: " + destination); - for (int i = 0; i < numberOfConsumers; i++) { - consumers[consumerCount] = createConsumer(factory, destination, consumerCount); - consumerCount++; - } - for (int i = 0; i < numberofProducers; i++) { - array = new byte[playloadSize]; - for (int j = i; j < array.length; j++) { - array[j] = (byte)j; - } - producers[producerCount] = createProducer(factory, destination, i, array); - producerCount++; - } - } - con.close(); - // super.setUp(); - } - - protected void tearDown() throws Exception { - super.tearDown(); - for (int i = 0; i < numberOfConsumers; i++) { - consumers[i].shutDown(); - } - for (int i = 0; i < numberofProducers; i++) { - producers[i].shutDown(); - } - } - - protected Destination createDestination(Session s, String destinationName) throws JMSException { - return s.createTopic(destinationName); - } - - protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, - int number, byte[] payload) throws JMSException { - return new PerfProducer(fac, dest, payload); - } - - protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException { - return new PerfConsumer(fac, dest); - } - - protected HedwigConnectionFactoryImpl createConnectionFactory() throws Exception { - return new HedwigConnectionFactoryImpl(); - } - - public void testPerformance() throws JMSException, InterruptedException { - for (int i = 0; i < totalNumberOfConsumers; i++) { - consumers[i].start(); - } - for (int i = 0; i < totalNumberOfProducers; i++) { - producers[i].start(); - } - LOG.info("Sampling performance " + sampleCount + " times at a " + sampleInternal + " ms interval."); - for (int i = 0; i < sampleCount; i++) { - Thread.sleep(sampleInternal); - dumpProducerRate(); - dumpConsumerRate(); - } - for (int i = 0; i < totalNumberOfProducers; i++) { - producers[i].stop(); - } - for (int i = 0; i < totalNumberOfConsumers; i++) { - consumers[i].stop(); - } - } - - protected void dumpProducerRate() { - int totalRate = 0; - int totalCount = 0; - String producerString="Producers:"; - for (int i = 0; i < producers.length; i++) { - PerfRate rate = producers[i].getRate().cloneAndReset(); - totalRate += rate.getRate(); - totalCount += rate.getTotalCount(); - producerString+="["+i+":"+rate.getRate() + ","+rate.getTotalCount()+"];"; - } - if (producers != null && producers.length > 0) { - int avgRate = totalRate / producers.length; - System.out.println("Avg producer rate = " + avgRate - + " msg/sec | Total rate = " + totalRate + ", sent = " - + totalCount); - // System.out.println(producerString); - } - } - - protected void dumpConsumerRate() { - int totalRate = 0; - int totalCount = 0; - String consumerString="Consumers:"; - for (int i = 0; i < consumers.length; i++) { - PerfRate rate = consumers[i].getRate().cloneAndReset(); - totalRate += rate.getRate(); - totalCount += rate.getTotalCount(); - consumerString+="["+i+":"+rate.getRate() + ","+rate.getTotalCount()+"];"; - } - if (consumers != null && consumers.length > 0) { - int avgRate = totalRate / consumers.length; - System.out.println("Avg consumer rate = " + avgRate + " msg/sec | Total rate = " - + totalRate + ", received = " + totalCount); - System.out.println(consumerString); - } - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SlowConsumer.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SlowConsumer.java b/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SlowConsumer.java deleted file mode 100644 index c670dbd..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SlowConsumer.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.perf; - -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SlowConsumer extends PerfConsumer { - private static final transient Logger LOG = LoggerFactory.getLogger(SlowConsumer.class); - - public SlowConsumer(ConnectionFactory fac, Destination dest, String consumerName) throws JMSException { - super(fac, dest, consumerName); - } - - public SlowConsumer(ConnectionFactory fac, Destination dest) throws JMSException { - super(fac, dest, null); - } - - public void onMessage(Message msg) { - super.onMessage(msg); - LOG.debug("GOT A MSG " + msg); - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java deleted file mode 100644 index 7b66d77..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SlowConsumerTopicTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.perf; - - -import javax.jms.Topic; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; -import org.junit.Ignore; - - -// For now, ignore it ... -@Ignore -public class SlowConsumerTopicTest extends SimpleTopicTest { - - protected PerfConsumer[] slowConsumers; - protected void setUp() throws Exception { - playloadSize = 10 * 1024; - super.setUp(); - } - - protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException { - PerfConsumer result = new SlowConsumer(fac, dest); - return result; - } - - protected PerfProducer createProducer(ConnectionFactory fac, Destination dest, - int number, byte[] payload) throws JMSException { - PerfProducer result = super.createProducer(fac, dest, number, payload); - result.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - result.setSleep(10); - return result; - } - - protected HedwigConnectionFactoryImpl createConnectionFactory() throws Exception { - HedwigConnectionFactoryImpl result = super.createConnectionFactory(); - return result; - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SlowDurableConsumerTopicTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SlowDurableConsumerTopicTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SlowDurableConsumerTopicTest.java deleted file mode 100644 index 45bfc28..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/SlowDurableConsumerTopicTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.perf; - -import org.junit.Ignore; - -import javax.jms.Topic; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; - -// For now, ignore it ... -@Ignore -public class SlowDurableConsumerTopicTest extends SlowConsumerTopicTest { - - protected PerfConsumer[] slowConsumers; - protected int numberOfSlowConsumers = 1; - - protected PerfConsumer createSlowConsumer(ConnectionFactory fac, - Destination dest, int number) throws JMSException { - return new SlowConsumer(fac, dest, "durableSlowConsumer" + number); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/test/JmsResourceProvider.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/test/JmsResourceProvider.java b/hedwig-client-jms/src/test/java/org/apache/activemq/test/JmsResourceProvider.java deleted file mode 100644 index e05770e..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/test/JmsResourceProvider.java +++ /dev/null @@ -1,237 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.test; - -import javax.jms.Connection; -import javax.jms.ConnectionConsumer; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.ServerSessionPool; -import javax.jms.Session; -import javax.jms.Topic; - -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; - -public class JmsResourceProvider { - - private boolean transacted; - private int ackMode = Session.AUTO_ACKNOWLEDGE; - private boolean isTopic; - private int deliveryMode = DeliveryMode.PERSISTENT; - private String durableName = "DummyName"; - private String clientID = getClass().getName(); - - /** - * Creates a connection factory. - * - * @see org.apache.activemq.test.JmsResourceProvider#createConnectionFactory() - */ - public ConnectionFactory createConnectionFactory() throws Exception { - return new HedwigConnectionFactoryImpl(); - } - - /** - * Creates a connection. - * - * @see org.apache.activemq.test.JmsResourceProvider#createConnection(javax.jms.ConnectionFactory) - */ - public Connection createConnection(ConnectionFactory cf) throws JMSException { - Connection connection = cf.createConnection(); - if (getClientID() != null) { - connection.setClientID(getClientID()); - } - return connection; - } - - /** - * @see org.apache.activemq.test.JmsResourceProvider#createSession(javax.jms.Connection) - */ - public Session createSession(Connection conn) throws JMSException { - return conn.createSession(transacted, ackMode); - } - - /** - * @see org.apache.activemq.test.JmsResourceProvider#createConsumer(javax.jms.Session, - * javax.jms.Destination) - */ - public MessageConsumer createConsumer(Session session, Destination destination) throws JMSException { - if (isDurableSubscriber()) { - return session.createDurableSubscriber((Topic)destination, durableName); - } - return session.createConsumer(destination); - } - - /** - * Creates a connection for a consumer. - * - * @param ssp - ServerSessionPool - * @return ConnectionConsumer - */ - public ConnectionConsumer createConnectionConsumer(Connection connection, - Destination destination, ServerSessionPool ssp) throws JMSException { - return connection.createConnectionConsumer(destination, null, ssp, 1); - } - - /** - * Creates a producer. - * - * @see org.apache.activemq.test.JmsResourceProvider#createProducer(javax.jms.Session, - * javax.jms.Destination) - */ - public MessageProducer createProducer(Session session, Destination destination) throws JMSException { - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(deliveryMode); - return producer; - } - - /** - * Creates a destination, which can either a topic or a queue. - * - * @see org.apache.activemq.test.JmsResourceProvider#createDestination(javax.jms.Session, - * java.lang.String) - */ - public Destination createDestination(Session session, String name) throws JMSException { - if (isTopic) { - return session.createTopic("TOPIC." + name); - } else { - return session.createTopic("QUEUE." + name); - } - } - - /** - * Returns true if the subscriber is durable. - * - * @return isDurableSubscriber - */ - public boolean isDurableSubscriber() { - return isTopic && durableName != null; - } - - /** - * Returns the acknowledgement mode. - * - * @return Returns the ackMode. - */ - public int getAckMode() { - return ackMode; - } - - /** - * Sets the acnknowledgement mode. - * - * @param ackMode The ackMode to set. - */ - public void setAckMode(int ackMode) { - this.ackMode = ackMode; - } - - /** - * Returns true if the destination is a topic, false if the destination is a - * queue. - * - * @return Returns the isTopic. - */ - public boolean isTopic() { - return isTopic; - } - - /** - * @param isTopic The isTopic to set. - */ - public void setTopic(boolean isTopic) { - this.isTopic = isTopic; - } - - /** - * Return true if the session is transacted. - * - * @return Returns the transacted. - */ - public boolean isTransacted() { - return transacted; - } - - /** - * Sets the session to be transacted. - * - * @param transacted - */ - public void setTransacted(boolean transacted) { - this.transacted = transacted; - if (transacted) { - setAckMode(Session.SESSION_TRANSACTED); - } - } - - /** - * Returns the delivery mode. - * - * @return deliveryMode - */ - public int getDeliveryMode() { - return deliveryMode; - } - - /** - * Sets the delivery mode. - * - * @param deliveryMode - */ - public void setDeliveryMode(int deliveryMode) { - this.deliveryMode = deliveryMode; - } - - /** - * Returns the client id. - * - * @return clientID - */ - public String getClientID() { - return clientID; - } - - /** - * Sets the client id. - * - * @param clientID - */ - public void setClientID(String clientID) { - this.clientID = clientID; - } - - /** - * Returns the durable name of the provider. - * - * @return durableName - */ - public String getDurableName() { - return durableName; - } - - /** - * Sets the durable name of the provider. - * - * @param durableName - */ - public void setDurableName(String durableName) { - this.durableName = durableName; - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java b/hedwig-client-jms/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java deleted file mode 100644 index bc150df..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java +++ /dev/null @@ -1,273 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Date; -import java.util.Iterator; -import java.util.List; - -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import junit.framework.AssertionFailedError; - -import org.apache.hedwig.jms.SessionImpl; -import org.apache.hedwig.jms.spi.HedwigConnectionImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public abstract class JmsSendReceiveTestSupport extends org.apache.activemq.TestSupport implements MessageListener { - private static final Logger LOG = LoggerFactory.getLogger(JmsSendReceiveTestSupport.class); - - protected String[] data; - protected Session session; - protected Session consumeSession; - protected MessageConsumer consumer; - protected MessageProducer producer; - protected Destination consumerDestination; - protected Destination producerDestination; - protected List<Message> messages = createConcurrentList(); - protected boolean durable = true; - protected int deliveryMode = DeliveryMode.PERSISTENT; - protected final Object lock = new Object(); - protected boolean verbose; - protected boolean useSeparateSession; - protected boolean largeMessages; - protected int largeMessageLoopSize = 4 * 1024; - - protected int messageCountDef; - - /* - * @see junit.framework.TestCase#setUp() - */ - protected void setUp() throws Exception { - super.setUp(); - messageCountDef = 100; - { - int temp = Integer.getInteger("messageCount", -1); - - if (temp > 0) { - messageCountDef = temp; - } - } - - initializeData(messageCountDef); - } - - protected void initializeData(int messageCount) { - LOG.info("Message count for test case is: " + messageCount); - data = new String[messageCount]; - for (int i = 0; i < messageCount; i++) { - data[i] = createMessageText(i); - } - } - - - protected void initializeData(HedwigConnectionImpl connection, SessionImpl session) { - int messageCount = 100; - if (session.getTransacted() || Session.CLIENT_ACKNOWLEDGE == session.getAcknowledgeMode()){ - messageCount = connection.getHedwigClientConfig().getMaximumOutstandingMessages() - 1; - } - - initializeData(messageCount); - } - - protected String createMessageText(int i) { - if (largeMessages) { - return createMessageBodyText(); - } else { - return "Text for message: " + i + " at " + new Date(); - } - } - - protected String createMessageBodyText() { - StringBuffer buffer = new StringBuffer(); - for (int i = 0; i < largeMessageLoopSize; i++) { - buffer.append("0123456789"); - } - return buffer.toString(); - } - - /** - * Test if all the messages sent are being received. - * - * @throws Exception - */ - public void testSendReceive() throws Exception { - initializeData((HedwigConnectionImpl) ((SessionImpl)session).getConnection(), (SessionImpl) session); - Thread.sleep(1000); - messages.clear(); - LOG.info("cleared messages, now sending the messages"); - - sendMessages(); - - assertMessagesAreReceived(); - LOG.info("" + data.length + " messages(s) received, closing down connections"); - } - - protected void sendMessages() throws Exception { - for (int i = 0; i < data.length; i++) { - Message message = createMessage(i); - configureMessage(message); - if (verbose) { - LOG.info("About to send a message: " + message + " with text: " + data[i]); - } - sendMessage(i, message); - } - LOG.info("Sent " + data.length + " messages to " + producerDestination); - } - - protected void sendMessage(int index, Message message) throws Exception { - producer.send(producerDestination, message); - } - - protected Message createMessage(int index) throws JMSException { - Message message = session.createTextMessage(data[index]); - return message; - } - - /** - * A hook to allow the message to be configured such as adding extra headers - * - * @throws JMSException - */ - protected void configureMessage(Message message) throws JMSException { - } - - /** - * Waits to receive the messages and performs the test if all messages have - * been received and are in sequential order. - * - * @throws JMSException - */ - protected void assertMessagesAreReceived() throws JMSException { - waitForMessagesToBeDelivered(); - assertMessagesReceivedAreValid(messages); - } - - /** - * Tests if the messages have all been received and are in sequential order. - * - * @param receivedMessages - * @throws JMSException - */ - protected void assertMessagesReceivedAreValid(List<Message> receivedMessages) throws JMSException { - List<Object> copyOfMessages = Arrays.asList(receivedMessages.toArray()); - int counter = 0; - - if (data.length != copyOfMessages.size()) { - for (Iterator<Object> iter = copyOfMessages.iterator(); iter.hasNext();) { - Object message = iter.next(); - LOG.info("<== " + counter++ + " = " + message); - } - } - - assertEquals("Invalid number of messages received", data.length, receivedMessages.size()); - - for (int i = 0; i < data.length; i++) { - Message received = receivedMessages.get(i); - try { - assertMessageValid(i, received); - } catch (AssertionFailedError e) { - for (int j = 0; j < data.length; j++) { - Message m = receivedMessages.get(j); - System.out.println(j+" => "+m.getJMSMessageID()); - } - throw e; - } - } - } - - protected void assertMessageValid(int index, Message message) throws JMSException { - TextMessage textMessage = (TextMessage)message; - String text = textMessage.getText(); - - if (verbose) { - LOG.info("Received Text: " + text); - } - - assertEquals("Message: " + index, data[index], text); - } - - /** - * Waits for the messages to be delivered or when the wait time has been - * reached. - */ - protected void waitForMessagesToBeDelivered() { - final long maxWaitTime = 20000; - final long start = System.currentTimeMillis(); - long waitTime = maxWaitTime; - - synchronized (lock) { - while (messages.size() < data.length && waitTime >= 0) { - try { - lock.wait(200); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - waitTime = maxWaitTime - (System.currentTimeMillis() - start); - } - } - } - - /** - * @see javax.jms.MessageListener#onMessage(javax.jms.Message) - */ - public void onMessage(Message message) { - consumeMessage(message, messages); - } - - /** - * Consumes a received message. - * - * @param message - a newly received message. - * @param messageList - list containing the received messages. - */ - protected void consumeMessage(Message message, List<Message> messageList) { - if (verbose) { - LOG.info("Received message: " + message); - } - - messageList.add(message); - - if (messageList.size() >= data.length) { - synchronized (lock) { - lock.notifyAll(); - } - } - } - - /** - * Creates a synchronized list. - * - * @return a synchronized view of the specified list. - */ - protected List<Message> createConcurrentList() { - return Collections.synchronizedList(new ArrayList<Message>()); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java deleted file mode 100644 index 0fcde33..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.test; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.jms.Topic; - -import org.apache.hedwig.jms.SessionImpl; -import org.apache.hedwig.jms.spi.HedwigConnectionImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class JmsTopicSendReceiveTest extends JmsSendReceiveTestSupport { - private static final Logger LOG = LoggerFactory.getLogger(JmsTopicSendReceiveTest.class); - - protected Connection connection; - - protected void setUp() throws Exception { - super.setUp(); - - connectionFactory = createConnectionFactory(); - connection = createConnection(!durable); - if (durable) { - connection.setClientID(getClass().getName()); - } - - LOG.info("Created connection: " + connection); - - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - consumeSession = createConsumerSession(); - - LOG.info("Created session: " + session); - LOG.info("Created consumeSession: " + consumeSession); - producer = session.createProducer(null); - producer.setDeliveryMode(deliveryMode); - - LOG.info("Created producer: " + producer + " delivery mode = " - + (deliveryMode == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON_PERSISTENT")); - - consumerDestination = session.createTopic(getConsumerSubject()); - producerDestination = session.createTopic(getProducerSubject()); - - LOG.info("Created consumer destination: " + consumerDestination - + " of type: " + consumerDestination.getClass()); - LOG.info("Created producer destination: " + producerDestination - + " of type: " + producerDestination.getClass()); - consumer = createConsumer(); - consumer.setMessageListener(this); - startConnection(); - - LOG.info("Created connection: " + connection); - } - - protected void startConnection() throws JMSException { - connection.start(); - } - - protected void tearDown() throws Exception { - LOG.info("Dumping stats..."); - // TODO - // connectionFactory.getFactoryStats().dump(new IndentPrinter()); - - LOG.info("Closing down connection"); - - /** TODO we should be able to shut down properly */ - session.close(); - connection.close(); - super.tearDown(); - } - - /** - * Creates a session. - * - * @return session - * @throws JMSException - */ - protected Session createConsumerSession() throws JMSException { - if (useSeparateSession) { - return connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } else { - return session; - } - } - - /** - * Creates a durable suscriber or a consumer. - * - * @return MessageConsumer - durable suscriber or consumer. - * @throws JMSException - */ - protected MessageConsumer createConsumer() throws JMSException { - if (durable) { - LOG.info("Creating durable consumer"); - return consumeSession.createDurableSubscriber((Topic)consumerDestination, getName()); - } - return consumeSession.createConsumer(consumerDestination); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveWithTwoConnectionsAndByteSelectorTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveWithTwoConnectionsAndByteSelectorTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveWithTwoConnectionsAndByteSelectorTest.java deleted file mode 100644 index 6f55fcb..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveWithTwoConnectionsAndByteSelectorTest.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.test; - -import javax.jms.Topic; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; - -public class JmsTopicSendReceiveWithTwoConnectionsAndByteSelectorTest - extends JmsTopicSendReceiveWithTwoConnectionsTest { - - protected void configureMessage(Message message) throws JMSException { - message.setByteProperty("dummy", (byte) 33); - } - - protected MessageConsumer createConsumer() throws JMSException { - return receiveSession.createConsumer(consumerDestination, "dummy = 33", false); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveWithTwoConnectionsTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveWithTwoConnectionsTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveWithTwoConnectionsTest.java deleted file mode 100644 index 2d4a633..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveWithTwoConnectionsTest.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.test; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.Session; - -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class JmsTopicSendReceiveWithTwoConnectionsTest extends JmsSendReceiveTestSupport { - - private static final Logger LOG = LoggerFactory.getLogger(JmsTopicSendReceiveWithTwoConnectionsTest.class); - - protected Connection sendConnection; - protected Connection receiveConnection; - protected Session receiveSession; - - /** - * Sets up a test where the producer and consumer have their own connection. - * - * @see junit.framework.TestCase#setUp() - */ - protected void setUp() throws Exception { - super.setUp(); - - connectionFactory = createConnectionFactory(); - - LOG.info("Creating send connection"); - sendConnection = createSendConnection(); - LOG.info("Starting send connection"); - sendConnection.start(); - - LOG.info("Creating receive connection"); - receiveConnection = createReceiveConnection(); - LOG.info("Starting receive connection"); - receiveConnection.start(); - - LOG.info("Created sendConnection: " + sendConnection); - LOG.info("Created receiveConnection: " + receiveConnection); - - session = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - receiveSession = receiveConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - LOG.info("Created sendSession: " + session); - LOG.info("Created receiveSession: " + receiveSession); - - producer = session.createProducer(null); - producer.setDeliveryMode(deliveryMode); - - LOG.info("Created producer: " + producer + " delivery mode = " - + (deliveryMode == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON_PERSISTENT")); - - consumerDestination = session.createTopic(getConsumerSubject()); - producerDestination = session.createTopic(getProducerSubject()); - - LOG.info("Created consumer destination: " + consumerDestination - + " of type: " + consumerDestination.getClass()); - LOG.info("Created producer destination: " + producerDestination - + " of type: " + producerDestination.getClass()); - - consumer = createConsumer(); - consumer.setMessageListener(this); - - LOG.info("Started connections"); - } - - protected MessageConsumer createConsumer() throws JMSException { - return receiveSession.createConsumer(consumerDestination); - } - - /* - * @see junit.framework.TestCase#tearDown() - */ - protected void tearDown() throws Exception { - session.close(); - receiveSession.close(); - sendConnection.close(); - receiveConnection.close(); - super.tearDown(); - } - - /** - * Creates a connection. - * - * @return Connection - * @throws Exception - */ - protected Connection createReceiveConnection() throws Exception { - return createConnection(false); - } - - /** - * Creates a connection. - * - * @return Connection - * @throws Exception - */ - protected Connection createSendConnection() throws Exception { - return createConnection(false); - } - - /** - * Creates an HedwigConnectionFactoryImpl. - * - * @see org.apache.activemq.test.TestSupport#createConnectionFactory() - */ - protected HedwigConnectionFactoryImpl createConnectionFactory() throws Exception { - return new HedwigConnectionFactoryImpl(); - } -}