http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/CombinationTestSupport.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/CombinationTestSupport.java b/hedwig-client-jms/src/test/java/org/apache/activemq/CombinationTestSupport.java deleted file mode 100644 index d4155a7..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/CombinationTestSupport.java +++ /dev/null @@ -1,250 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import junit.framework.Test;
import junit.framework.TestSuite;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A poor man's way of getting JUnit to run a test case through several
 * different combinations of options.
 * <p>
 * Usage: if you have a test case called testFoo that you want to run through
 * a few combinations of values for the attributes age and color, you would
 * write something like: <code>
 *    public void initCombosForTestFoo() {
 *        addCombinationValues( "age", new Object[]{ Integer.valueOf(21), Integer.valueOf(30) } );
 *        addCombinationValues( "color", new Object[]{"blue", "green"} );
 *    }
 * </code>
 * The testFoo test case is then run for each possible combination of age and
 * color set up in the initCombosForTestFoo method. Before each combination is
 * run, the age and color fields of the test class are set to one of the
 * values defined (through a public field or, failing that, a public
 * single-argument setter). This is done before the normal setUp method is
 * called.
 * <p>
 * An optional public no-argument method named initCombos is invoked for every
 * test method, before the method-specific initCombosFor... hook.
 * <p>
 * Without a suite method, all combinations are executed inside the single
 * test that JUnit reports for the method. If you want the combinations to
 * show up as separate test runs in the JUnit reports, add a suite method to
 * your test case similar to: <code>
 *    public static Test suite() {
 *        return suite(FooTest.class);
 *    }
 * </code>
 */
public abstract class CombinationTestSupport extends AutoFailTestSupport {

    private static final Logger LOG = LoggerFactory.getLogger(CombinationTestSupport.class);

    /** Registered option values, keyed by attribute name. */
    private final HashMap<String, ComboOption> comboOptions = new HashMap<String, ComboOption>();
    /** True once this instance represents a single, already expanded combination. */
    private boolean combosEvaluated;
    /** The attribute values applied to this instance, or null when none were applied. */
    private Map<String, Object> options;

    /** The set of distinct values (in insertion order) registered for one attribute. */
    static class ComboOption {
        final String attribute;
        final LinkedHashSet<Object> values = new LinkedHashSet<Object>();

        public ComboOption(String attribute, Collection<Object> options) {
            this.attribute = attribute;
            this.values.addAll(options);
        }
    }

    /**
     * Registers values for an attribute; repeated calls for the same
     * attribute accumulate values, and duplicates are ignored.
     *
     * @param attribute name of the public field (or setter property) to vary
     * @param options   the values the attribute should take
     */
    public void addCombinationValues(String attribute, Object[] options) {
        ComboOption co = this.comboOptions.get(attribute);
        if (co == null) {
            this.comboOptions.put(attribute, new ComboOption(attribute, Arrays.asList(options)));
        } else {
            co.values.addAll(Arrays.asList(options));
        }
    }

    /**
     * Runs this test. An instance that already represents one combination is
     * run directly; otherwise the combinations are expanded and each one is
     * run in turn. The first failing combination aborts the remaining ones.
     */
    @Override
    public void runBare() throws Throwable {
        if (combosEvaluated) {
            super.runBare();
            return;
        }

        CombinationTestSupport[] combinations = getCombinations();
        // Compare the undecorated names: getName() of an expanded combination
        // carries the option map as a suffix, so comparing the decorated names
        // would never match and every combination would be skipped silently.
        String baseName = getName(true);
        for (CombinationTestSupport test : combinations) {
            if (baseName == null || baseName.equals(test.getName(true))) {
                test.runBare();
            }
        }
    }

    /**
     * Applies one combination to this instance. Each attribute is assigned
     * through a public field when possible, otherwise through a public
     * single-argument setter; an attribute that can be set neither way is
     * reported in the log and skipped.
     */
    private void setOptions(Map<String, Object> options) {
        this.options = options;
        for (Map.Entry<String, Object> entry : options.entrySet()) {
            String attribute = entry.getKey();
            Object value = entry.getValue();
            if (!assignPublicField(attribute, value) && !invokeSetter(attribute, value)) {
                LOG.info("Could not set field '" + attribute + "' to value '" + value +
                         "', make sure the field exists and is public or has a setter.");
            }
        }
    }

    /** Tries to assign the value to a public field of that name; returns whether it worked. */
    private boolean assignPublicField(String attribute, Object value) {
        try {
            Field field = getClass().getField(attribute);
            field.set(this, value);
            return true;
        } catch (Exception e) {
            LOG.debug("No assignable public field '" + attribute + "'", e);
            return false;
        }
    }

    /** Tries to pass the value to a public one-argument setter; returns whether it worked. */
    private boolean invokeSetter(String attribute, Object value) {
        if (attribute == null || attribute.length() == 0) {
            return false;
        }
        // Character.toUpperCase is locale independent, unlike String.toUpperCase().
        String setterName = "set" + Character.toUpperCase(attribute.charAt(0)) + attribute.substring(1);
        for (Method method : getClass().getMethods()) {
            if (!method.getName().equals(setterName) || method.getParameterTypes().length != 1) {
                continue;
            }
            try {
                method.invoke(this, value);
                return true;
            } catch (IllegalArgumentException e) {
                // Parameter type does not accept the value: another overload may.
                LOG.debug("Setter " + method + " does not accept value '" + value + "'", e);
            } catch (Exception e) {
                LOG.debug("Setter " + method + " failed for value '" + value + "'", e);
                return false;
            }
        }
        return false;
    }

    /**
     * Invokes an optional public no-argument setup hook. A missing hook is
     * normal and ignored; a failing hook is logged, and whatever values it
     * registered before failing remain in effect.
     */
    private void invokeOptionalHook(String methodName) {
        Method hook;
        try {
            hook = getClass().getMethod(methodName, (Class[])null);
        } catch (NoSuchMethodException ignored) {
            // The hook is optional.
            return;
        } catch (Exception e) {
            LOG.warn("Could not look up combination setup method '" + methodName + "'", e);
            return;
        }
        try {
            hook.invoke(this, (Object[])null);
        } catch (Throwable e) {
            LOG.warn("Combination setup method '" + methodName + "' failed", e);
        }
    }

    /**
     * Expands the registered option values into one test instance per
     * combination. When there is nothing to expand, or expansion fails, this
     * instance itself is returned as the only "combination".
     */
    private CombinationTestSupport[] getCombinations() {
        invokeOptionalHook("initCombos");

        // Strip a possible option suffix to recover the test method name.
        String displayName = getName();
        String name = "";
        if (displayName != null) {
            int space = displayName.indexOf(' ');
            name = space < 0 ? displayName : displayName.substring(0, space);
        }
        if (name.length() == 0) {
            // No method name, hence no method-specific hook and nothing to instantiate.
            combosEvaluated = true;
            return new CombinationTestSupport[] {this};
        }

        invokeOptionalHook("initCombosFor" + Character.toUpperCase(name.charAt(0)) + name.substring(1));

        try {
            ArrayList<HashMap<String, Object>> expandedOptions = new ArrayList<HashMap<String, Object>>();
            expandCombinations(new ArrayList<ComboOption>(comboOptions.values()), expandedOptions);

            if (expandedOptions.isEmpty()) {
                combosEvaluated = true;
                return new CombinationTestSupport[] {this};
            }

            // Create one fresh test instance for each possible combination.
            ArrayList<CombinationTestSupport> result =
                new ArrayList<CombinationTestSupport>(expandedOptions.size());
            for (HashMap<String, Object> combination : expandedOptions) {
                CombinationTestSupport combo = (CombinationTestSupport)TestSuite.createTest(getClass(), name);
                combo.combosEvaluated = true;
                combo.setOptions(combination);
                result.add(combo);
            }
            return result.toArray(new CombinationTestSupport[result.size()]);
        } catch (Throwable e) {
            LOG.warn("Could not expand combinations for '" + name + "'; running it once without them", e);
            combosEvaluated = true;
            return new CombinationTestSupport[] {this};
        }
    }

    /**
     * Recursively builds the cartesian product of the remaining options,
     * appending one map per combination to expandedCombos.
     */
    private void expandCombinations(List<ComboOption> optionsLeft, List<HashMap<String, Object>> expandedCombos) {
        if (optionsLeft.isEmpty()) {
            return;
        }

        HashMap<String, Object> map;
        if (comboOptions.size() == optionsLeft.size()) {
            // Outermost call: start the very first combination.
            map = new HashMap<String, Object>();
            expandedCombos.add(map);
        } else {
            // Nested call: keep filling the combination the caller is working on.
            map = expandedCombos.get(expandedCombos.size() - 1);
        }

        LinkedList<ComboOption> remaining = new LinkedList<ComboOption>(optionsLeft);
        ComboOption current = remaining.removeLast();
        boolean firstValue = true;
        for (Object value : current.values) {
            if (!firstValue) {
                // Every further value forks a copy of the combination built so far.
                map = new HashMap<String, Object>(map);
                expandedCombos.add(map);
            }
            firstValue = false;
            map.put(current.attribute, value);
            expandCombinations(remaining, expandedCombos);
        }
    }

    /**
     * Builds a suite holding one test per combination of every public test
     * method of the given class, so each combination is reported separately.
     *
     * @param clazz the test class to expand
     * @return the suite of expanded tests
     */
    public static Test suite(Class<? extends CombinationTestSupport> clazz) {
        TestSuite suite = new TestSuite();

        LinkedHashSet<String> seen = new LinkedHashSet<String>();
        for (Method method : clazz.getMethods()) {
            // Set.add() returns false for a method name that was already handled.
            if (!isPublicTestMethod(method) || !seen.add(method.getName())) {
                continue;
            }
            Test test = TestSuite.createTest(clazz, method.getName());
            if (test instanceof CombinationTestSupport) {
                for (CombinationTestSupport combination : ((CombinationTestSupport)test).getCombinations()) {
                    suite.addTest(combination);
                }
            } else {
                suite.addTest(test);
            }
        }
        return suite;
    }

    private static boolean isPublicTestMethod(Method m) {
        return isTestMethod(m) && Modifier.isPublic(m.getModifiers());
    }

    /** A test method takes no parameters, returns void and has a name starting with "test". */
    private static boolean isTestMethod(Method m) {
        return m.getParameterTypes().length == 0
            && m.getName().startsWith("test")
            && m.getReturnType().equals(Void.TYPE);
    }

    /** Returns the test name, suffixed with the applied options when there are any. */
    @Override
    public String getName() {
        return getName(false);
    }

    /**
     * @param original when true, return the plain test method name without
     *                 the option suffix
     * @return the test name
     */
    public String getName(boolean original) {
        if (options != null && !original) {
            return super.getName() + " " + options;
        }
        return super.getName();
    }
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/ConnectionCleanupTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/ConnectionCleanupTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/ConnectionCleanupTest.java deleted file mode 100644 index e1e85e5..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/ConnectionCleanupTest.java +++ /dev/null @@ -1,73 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq;

import javax.jms.JMSException;
import javax.jms.Session;

import junit.framework.TestCase;
import org.apache.hedwig.JmsTestBase;
import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl;
import org.apache.hedwig.jms.spi.HedwigConnectionImpl;

/**
 * Checks that a connection refuses a second setClientID call once a client
 * ID has been set and a session created, both on the initial connection and
 * on a replacement connection obtained after closing the first one.
 */
public class ConnectionCleanupTest extends JmsTestBase {

    private HedwigConnectionImpl connection;
    private HedwigConnectionFactoryImpl factory;

    /** Creates the factory and the first connection used by the test. */
    protected void setUp() throws Exception {
        super.setUp();
        factory = new HedwigConnectionFactoryImpl();
        connection = factory.createConnection();
    }

    /**
     * Closes whichever connection is current when the test ends.
     *
     * @see junit.framework.TestCase#tearDown()
     */
    protected void tearDown() throws Exception {
        connection.close();
        super.tearDown();
    }

    /**
     * Runs the "set the ID, create a session, setting it again must fail"
     * sequence twice: first on the connection from setUp, then on a fresh
     * connection created after the first one has been closed.
     *
     * @throws JMSException if any call other than the repeated setClientID fails
     */
    public void testChangeClientID() throws JMSException {
        claimClientIdAndExpectRejection();

        connection.close();
        connection = factory.createConnection();

        claimClientIdAndExpectRejection();
    }

    /**
     * Sets the client ID, creates a session, and then requires that setting
     * the client ID a second time throws a JMSException.
     */
    private void claimClientIdAndExpectRejection() throws JMSException {
        connection.setClientID("test");
        connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        boolean rejected = false;
        try {
            connection.setClientID("test");
        } catch (JMSException expected) {
            rejected = true;
        }
        if (!rejected) {
            fail("Should have received JMSException");
        }
    }

}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java deleted file mode 100644 index 93e0615..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software
Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

/**
 * Verifies that a consumer already blocked in receive(timeout) is handed a
 * message that is published only after the receive call has started.
 */
public class ConsumerReceiveWithTimeoutTest extends TestSupport {

    private Connection connection;

    /** Opens the connection used by the test. */
    protected void setUp() throws Exception {
        super.setUp();
        connection = createConnection();
    }

    /**
     * Closes the connection, if one was opened.
     *
     * @see junit.framework.TestCase#tearDown()
     */
    protected void tearDown() throws Exception {
        if (null != connection) {
            connection.close();
            connection = null;
        }
        super.tearDown();
    }

    /**
     * Test to check if the consumer thread wakes up inside a receive(timeout)
     * after a message is dispatched to the consumer.
     *
     * @throws javax.jms.JMSException
     */
    public void testConsumerReceiveBeforeMessageDispatched() throws JMSException {

        connection.start();

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Topic topic = session.createTopic("test");
        final MessageConsumer consumer = session.createConsumer(topic);

        // Publishes a single message from a second thread, after a delay.
        Runnable delayedPublish = new Runnable() {
            public void run() {
                try {
                    // wait for 10 seconds so that consumer.receive is
                    // already running when the message is sent
                    Thread.sleep(10000);
                    session.createProducer(topic).send(session.createTextMessage("Hello"));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        new Thread(delayedPublish).start();

        // Block for the message; it must arrive well within the timeout.
        assertNotNull(consumer.receive(60000));
        session.close();

    }

}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/ExpiryHogTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/ExpiryHogTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/ExpiryHogTest.java deleted file mode 100644 index 6856837..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/ExpiryHogTest.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
*/
package org.apache.activemq;

import java.util.concurrent.TimeUnit;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * Checks that all messages reach the consumers even though the creation of
 * each message is delayed by ten seconds.
 *
 * User: gtully
 */
public class ExpiryHogTest extends JmsMultipleClientsTestSupport {
    /** When true, createTextMessage pauses before building each message. */
    boolean sleep = false;

    /** Number of messages passed to startProducers and expected to be received. */
    int numMessages = 4;

    /**
     * Starts the consumers, switches the per-message delay on, starts the
     * producers and asserts that numMessages messages are received.
     */
    public void testImmediateDispatchWhenCacheDisabled() throws Exception {
        ConnectionFactory connectionFactory = createConnectionFactory();
        destination = createDestination();
        startConsumers(connectionFactory, destination);
        // From here on every message creation is delayed.
        sleep = true;
        startProducers(connectionFactory, destination, numMessages);
        allMessagesList.assertMessagesReceived(numMessages);
    }

    /**
     * Delegates to the superclass after an optional ten second pause. The
     * JMS expiration is deliberately left unset; the original author noted
     * there was no point in setting it here.
     */
    protected TextMessage createTextMessage(Session session, String initText) throws Exception {
        if (sleep) {
            TimeUnit.SECONDS.sleep(10);
        }
        return super.createTextMessage(session, initText);
    }

    /** Configures the inherited autoFail and persistent flags before the regular setup runs. */
    @Override
    protected void setUp() throws Exception {
        autoFail = false;
        persistent = true;
        super.setUp();
    }
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JMSConsumerTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSConsumerTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JMSConsumerTest.java deleted file mode 100644 index 408b40e..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSConsumerTest.java +++ /dev/null @@ -1,936 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import java.lang.Thread.UncaughtExceptionHandler; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.BytesMessage; -import javax.jms.DeliveryMode; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; - - - - -import junit.framework.Test; - - -import javax.jms.Destination; - -import org.apache.hedwig.jms.MessagingSessionFacade; -import org.apache.hedwig.jms.SessionImpl; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; -import org.apache.hedwig.jms.spi.HedwigConnectionImpl; -import org.junit.Ignore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Test cases used to test the JMS message consumer. 
- */ -public class JMSConsumerTest extends JmsTestSupport { - - private static final Logger LOG = LoggerFactory.getLogger(JMSConsumerTest.class); - - public Destination destination; - public int deliveryMode; - public int prefetch; - public int ackMode; - public MessagingSessionFacade.DestinationType destinationType; - public boolean durableConsumer; - - public static Test suite() { - return suite(JMSConsumerTest.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } - - public void initCombosForTestMessageListenerWithConsumerCanBeStopped() { - addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), - Integer.valueOf(DeliveryMode.PERSISTENT)}); - addCombinationValues("destinationType", new MessagingSessionFacade.DestinationType[] { - MessagingSessionFacade.DestinationType.TOPIC}); - } - - public void testMessageListenerWithConsumerCanBeStopped() throws Exception { - - final AtomicInteger counter = new AtomicInteger(0); - final CountDownLatch done1 = new CountDownLatch(1); - final CountDownLatch done2 = new CountDownLatch(1); - - // Receive a message with the JMS API - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationType); - MessageConsumer consumer = (MessageConsumer)session.createConsumer(destination); - consumer.setMessageListener(new MessageListener() { - public void onMessage(Message m) { - counter.incrementAndGet(); - if (counter.get() == 1) { - done1.countDown(); - } - if (counter.get() == 2) { - done2.countDown(); - } - } - }); - - // Send a first message to make sure that the consumer dispatcher is - // running - sendMessages(session, destination, 1); - assertTrue(done1.await(1, TimeUnit.SECONDS)); - assertEquals(1, counter.get()); - - // Stop the consumer. - connection.stop(); - - // Send a message, but should not get delivered. 
- sendMessages(session, destination, 1); - assertFalse(done2.await(1, TimeUnit.SECONDS)); - assertEquals(1, counter.get()); - - // Start the consumer, and the message should now get delivered. - connection.start(); - assertTrue(done2.await(1, TimeUnit.SECONDS)); - assertEquals(2, counter.get()); - } - - public void testMessageListenerWithConsumerCanBeStoppedConcurently() throws Exception { - - final AtomicInteger counter = new AtomicInteger(0); - final CountDownLatch closeDone = new CountDownLatch(1); - - connection.start(); - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - destination = createDestination(session, MessagingSessionFacade.DestinationType.TOPIC); - - final Map<Thread, Throwable> exceptions = - Collections.synchronizedMap(new HashMap<Thread, Throwable>()); - Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { - public void uncaughtException(Thread t, Throwable e) { - LOG.error("Uncaught exception:", e); - exceptions.put(t, e); - } - }); - - final int numOutStanding = (connection.getHedwigClientConfig().getMaximumOutstandingMessages() * 2 / 3) + 1; - - final MessageConsumer consumer = (MessageConsumer)session.createConsumer(destination); - - final class AckAndClose implements Runnable { - private Message message; - - public AckAndClose(Message m) { - this.message = m; - } - - public void run() { - try { - message.acknowledge(); - int count = counter.incrementAndGet(); - if (590 == count) { - // close in a separate thread is ok by jms - consumer.close(); - closeDone.countDown(); - } - } catch (Exception e) { - LOG.error("Exception on close or ack:", e); - exceptions.put(Thread.currentThread(), e); - } - } - }; - - final AtomicInteger listenerReceivedCount = new AtomicInteger(0); - // final ExecutorService executor = Executors.newSingleThreadExecutor(); - final ExecutorService executor = Executors.newCachedThreadPool(); - consumer.setMessageListener(new MessageListener() { - public void 
onMessage(Message m) { - // close can be in a different thread, but NOT acknowledge iirc - // - this will not cause a problem for us though ... - // ack and close eventually in separate thread - int val = listenerReceivedCount.incrementAndGet(); - // System.out.println("message count : " + val + ", message : " + m); - executor.execute(new AckAndClose(m)); - // new AckAndClose(m).run(); - } - }); - - // preload the queue - sendMessages(session, destination, 600); - - assert closeDone.await(10, TimeUnit.SECONDS) : - "closeDone : " + closeDone.getCount() + ", counter : " + counter.get() - + ", listenerReceivedCount : " + listenerReceivedCount.get(); - // await possible exceptions - Thread.sleep(1000); - assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); - } - - - public void initCombosForTestMutiReceiveWithPrefetch1() { - addCombinationValues("deliveryMode", new Object[] { - Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); - addCombinationValues("ackMode", new Object[] { - Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE), - Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)}); - addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC}); - } - - public void testMutiReceiveWithPrefetch1() throws Exception { - - // Set prefetch to 1 - connection.start(); - - // Use all the ack modes - Session session = connection.createSession(false, ackMode); - destination = createDestination(session, destinationType); - MessageConsumer consumer = session.createConsumer(destination); - - // Send the messages - sendMessages(session, destination, 4); - - // Make sure 4 messages were delivered. 
- Message message = null; - for (int i = 0; i < 4; i++) { - message = consumer.receive(1000); - assertNotNull(message); - } - assertNull(consumer.receiveNoWait()); - assert null != message; - message.acknowledge(); - } - - public void initCombosForTestDurableConsumerSelectorChange() { - addCombinationValues("deliveryMode", new Object[] { - Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); - addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC}); - } - - public void testDurableConsumerSelectorChange() throws Exception { - - // Receive a message with the JMS API - if (null == connection.getClientID()) connection.setClientID(getName() + "test"); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationType); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(deliveryMode); - MessageConsumer consumer = session.createDurableSubscriber((Topic)destination, "test", "color='red'", false); - - // Send the messages - TextMessage message = session.createTextMessage("1st"); - message.setStringProperty("color", "red"); - producer.send(message); - - Message m = consumer.receive(1000); - assertNotNull(m); - assertEquals("1st", ((TextMessage)m).getText()); - - // Change the subscription. - consumer.close(); - consumer = session.createDurableSubscriber((Topic)destination, "test", "color='blue'", false); - - message = session.createTextMessage("2nd"); - message.setStringProperty("color", "red"); - producer.send(message); - message = session.createTextMessage("3rd"); - message.setStringProperty("color", "blue"); - producer.send(message); - - // Selector should skip the 2nd message. 
- m = consumer.receive(1000); - assertNotNull(m); - assertEquals("3rd", ((TextMessage)m).getText()); - - assertNull(consumer.receiveNoWait()); - } - - public void initCombosForTestSendReceiveBytesMessage() { - addCombinationValues("deliveryMode", new Object[] { - Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); - addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC}); - } - - public void testSendReceiveBytesMessage() throws Exception { - - // Receive a message with the JMS API - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationType); - MessageConsumer consumer = session.createConsumer(destination); - MessageProducer producer = session.createProducer(destination); - - BytesMessage message = session.createBytesMessage(); - message.writeBoolean(true); - message.writeBoolean(false); - producer.send(message); - - // Make sure only 1 message was delivered. 
- BytesMessage m = (BytesMessage)consumer.receive(1000); - assertNotNull(m); - assertTrue(m.readBoolean()); - assertFalse(m.readBoolean()); - - assertNull(consumer.receiveNoWait()); - } - - public void initCombosForTestSetMessageListenerAfterStart() { - addCombinationValues("deliveryMode", new Object[] { - Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); - addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC}); - } - - public void testSetMessageListenerAfterStart() throws Exception { - - final AtomicInteger counter = new AtomicInteger(0); - final CountDownLatch done = new CountDownLatch(1); - - // Receive a message with the JMS API - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationType); - MessageConsumer consumer = session.createConsumer(destination); - - // See if the message get sent to the listener - consumer.setMessageListener(new MessageListener() { - public void onMessage(Message m) { - counter.incrementAndGet(); - if (counter.get() == 4) { - done.countDown(); - } - } - }); - - // Send the messages - sendMessages(session, destination, 4); - - assertTrue(done.await(1000, TimeUnit.MILLISECONDS)); - Thread.sleep(200); - - // Make sure only 4 messages were delivered. 
- assertEquals(4, counter.get()); - } - - public void initCombosForTestPassMessageListenerIntoCreateConsumer() { - addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC}); - } - - public void testPassMessageListenerIntoCreateConsumer() throws Exception { - - final AtomicInteger counter = new AtomicInteger(0); - final CountDownLatch done = new CountDownLatch(1); - - // Receive a message with the JMS API - connection.start(); - SessionImpl session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationType); - MessageConsumer consumer = session.createConsumer(destination); - consumer.setMessageListener(new MessageListener() { - public void onMessage(Message m) { - counter.incrementAndGet(); - if (counter.get() == 4) { - done.countDown(); - } - } - }); - - // Send the messages - sendMessages(session, destination, 4); - - assertTrue(done.await(1000, TimeUnit.MILLISECONDS)); - Thread.sleep(200); - - // Make sure only 4 messages were delivered. - assertEquals(4, counter.get()); - } - - public void initCombosForTestMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() { - addCombinationValues("deliveryMode", new Object[] { - Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); - addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)}); - addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC}); - } - - public void testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception { - final AtomicInteger counter = new AtomicInteger(0); - final CountDownLatch sendDone = new CountDownLatch(1); - final CountDownLatch got2Done = new CountDownLatch(1); - - // Set prefetch to 1 - // This test case does not work if optimized message dispatch is used as - // the main thread send block until the consumer receives the - // message. 
This test depends on thread decoupling so that the main - // thread can stop the consumer thread. - if (null == connection.getClientID()) connection.setClientID(getName() + "test-client-id-1"); - connection.start(); - - // Use all the ack modes - Session session = connection.createSession(false, ackMode); - destination = createDestination(session, destinationType); - MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id1"); - consumer.setMessageListener(new MessageListener() { - private final HedwigConnectionImpl _connection = connection; - public void onMessage(Message m) { - try { - TextMessage tm = (TextMessage)m; - LOG.info("Got in first listener: " + tm.getText()); - assertEquals(messageTextPrefix + counter.get(), tm.getText()); - counter.incrementAndGet(); - if (counter.get() == 2) { - sendDone.await(); - _connection.close(); - got2Done.countDown(); - } - // will fail when we close connection when counter == 2 ! - tm.acknowledge(); - } catch (Throwable e) { - // e.printStackTrace(); - } - } - }); - - // Send the messages - sendMessages(session, destination, 4); - sendDone.countDown(); - - // Wait for first 2 messages to arrive. - assert got2Done.await(5, TimeUnit.SECONDS) : - "counter1 : " + counter.get() + ", got2Done : " + got2Done.getCount() + ", sendDone : " + sendDone.getCount(); - - // Re-start connection. - connection.close(); - connection = (HedwigConnectionImpl)factory.createConnection(); - if (null == connection.getClientID()) connection.setClientID(getName() + "test-client-id-1"); - connections.add(connection); - - // Pickup the remaining messages. 
- final CountDownLatch done2 = new CountDownLatch(1); - session = connection.createSession(false, ackMode); - consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id1"); - consumer.setMessageListener(new MessageListener() { - public void onMessage(Message m) { - try { - TextMessage tm = (TextMessage)m; - LOG.info("Got in second listener: " + tm.getText()); - // order is not guaranteed as the connection is started before the listener is set. - // assertEquals(messageTextPrefix + counter.get(), tm.getText()); - counter.incrementAndGet(); - tm.acknowledge(); - if (counter.get() == 4) { - done2.countDown(); - } - } catch (Throwable e) { - LOG.error("unexpected ex onMessage: ", e); - } - } - }); - - connection.start(); - - assert done2.await(2000, TimeUnit.MILLISECONDS) : - "count2 : " + done2.getCount() + ", counter : " + counter.get(); - Thread.sleep(200); - - // assert msg 2 was redelivered as close() from onMessages() will only ack in auto_ack and dups_ok mode - assert 5 == counter.get(): "count3 : " + done2.getCount() + ", counter : " + counter.get(); - } - - public void initCombosForTestMessageListenerAutoAckOnCloseWithPrefetch1() { - addCombinationValues("deliveryMode", new Object[] { - Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); - addCombinationValues("ackMode", new Object[] { - Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)}); - addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC}); - } - - public void testMessageListenerAutoAckOnCloseWithPrefetch1() throws Exception { - final AtomicInteger counter = new AtomicInteger(0); - final CountDownLatch sendDone = new CountDownLatch(1); - final CountDownLatch got2Done = new CountDownLatch(1); - - // Set prefetch to 1 - // This test case does not work if optimized message dispatch is used as - // the main thread send block until the consumer receives the - // message. 
This test depends on thread decoupling so that the main - // thread can stop the consumer thread. - if (null == connection.getClientID()) connection.setClientID(getName() + "test-client-id-2"); - connection.start(); - - // Use all the ack modes - Session session = connection.createSession(false, ackMode); - destination = createDestination(session, destinationType); - MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id2"); - final List<Message> receivedMessages = new ArrayList<Message>(8); - consumer.setMessageListener(new MessageListener() { - final HedwigConnectionImpl _connection = connection; - public void onMessage(Message m) { - try { - TextMessage tm = (TextMessage)m; - LOG.info("Got in first listener: " + tm.getText()); - assertEquals(messageTextPrefix + counter.get(), tm.getText()); - counter.incrementAndGet(); - m.acknowledge(); - receivedMessages.add(m); - if (counter.get() == 2) { - sendDone.await(); - _connection.close(); - got2Done.countDown(); - } - } catch (Throwable e) { - e.printStackTrace(); - } - } - }); - - // Send the messages - sendMessages(session, destination, 4); - sendDone.countDown(); - - // Wait for first 2 messages to arrive. - assert got2Done.await(5, TimeUnit.SECONDS) : - "counter : " + counter.get() + ", got2Done : " + got2Done.getCount() + ", sendDone : " + sendDone.getCount(); - - // Re-start connection. - connection.close(); - connection = (HedwigConnectionImpl)factory.createConnection(); - if (null == connection.getClientID()) connection.setClientID(getName() + "test-client-id-2"); - connections.add(connection); - - // Pickup the remaining messages. 
- final CountDownLatch done2 = new CountDownLatch(1); - session = connection.createSession(false, ackMode); - consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id2"); - consumer.setMessageListener(new MessageListener() { - public void onMessage(Message m) { - try { - TextMessage tm = (TextMessage)m; - LOG.info("Got in second listener: " + tm.getText()); - counter.incrementAndGet(); - m.acknowledge(); - receivedMessages.add(m); - if (counter.get() == 4) { - done2.countDown(); - } - } catch (Throwable e) { - LOG.error("unexpected ex onMessage: ", e); - } - } - }); - - connection.start(); - - assert done2.await(5, TimeUnit.SECONDS) : "count : " + done2.getCount() + ", counter : " + counter.get(); - Thread.sleep(200); - - // close from onMessage with Auto_ack will ack - // Make sure only 4 messages were delivered. - assert 4 == counter.get() : - "counter : " + counter.get() + ", got2Done : " + got2Done.getCount() + ", sendDone : " - + sendDone.getCount() + ", messages : " + receivedMessages; - } - - public void initCombosForTestMessageListenerWithConsumerWithPrefetch1() { - addCombinationValues("deliveryMode", new Object[] { - Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); - addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC}); - } - - public void testMessageListenerWithConsumerWithPrefetch1() throws Exception { - - final AtomicInteger counter = new AtomicInteger(0); - final CountDownLatch done = new CountDownLatch(1); - - // Receive a message with the JMS API - connection.start(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationType); - MessageConsumer consumer = session.createConsumer(destination); - consumer.setMessageListener(new MessageListener() { - public void onMessage(Message m) { - counter.incrementAndGet(); - if (counter.get() == 4) { - done.countDown(); - } 
- } - }); - - // Send the messages - sendMessages(session, destination, 4); - - assertTrue(done.await(1000, TimeUnit.MILLISECONDS)); - Thread.sleep(200); - - // Make sure only 4 messages were delivered. - assertEquals(4, counter.get()); - } - - public void initCombosForTestMessageListenerWithConsumer() { - addCombinationValues("deliveryMode", new Object[] { - Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); - addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC}); - } - - public void testMessageListenerWithConsumer() throws Exception { - - final AtomicInteger counter = new AtomicInteger(0); - final CountDownLatch done = new CountDownLatch(1); - - // Receive a message with the JMS API - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationType); - MessageConsumer consumer = session.createConsumer(destination); - consumer.setMessageListener(new MessageListener() { - public void onMessage(Message m) { - counter.incrementAndGet(); - if (counter.get() == 4) { - done.countDown(); - } - } - }); - - // Send the messages - sendMessages(session, destination, 4); - - assertTrue(done.await(1000, TimeUnit.MILLISECONDS)); - Thread.sleep(200); - - // Make sure only 4 messages were delivered. 
- assertEquals(4, counter.get()); - } - - public void initCombosForTestUnackedWithPrefetch1StayInQueue() { - addCombinationValues("deliveryMode", new Object[] { - Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); - addCombinationValues("ackMode", new Object[] { - Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE), - Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)}); - addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC}); - } - - public void testUnackedWithPrefetch1StayInQueue() throws Exception { - - // Set prefetch to 1 - if (null == connection.getClientID()) connection.setClientID(getName() + "test-client-id-3"); - connection.start(); - - // Use all the ack modes - Session session = connection.createSession(false, ackMode); - destination = createDestination(session, destinationType); - MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id3"); - - // Send the messages - sendMessages(session, destination, 4); - - // Only pick up the first 2 messages. - Message message = null; - for (int i = 0; i < 2; i++) { - message = consumer.receive(1000); - assertNotNull(message); - assert (message instanceof TextMessage); - assert (((TextMessage) message).getText().equals(messageTextPrefix + i)) - : "Received message " + ((TextMessage) message).getText() + " .. i = " + i; - } - assert null != message; - message.acknowledge(); - - connection.close(); - connection = (HedwigConnectionImpl)factory.createConnection(); - if (null == connection.getClientID()) connection.setClientID(getName() + "test-client-id-3"); - // Use all the ack modes - session = connection.createSession(false, ackMode); - consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id3"); - connections.add(connection); - connection.start(); - - // Pickup the rest of the messages. 
- for (int i = 0; i < 2; i++) { - message = consumer.receive(1000); - assertNotNull(message); - assert (message instanceof TextMessage); - assert (((TextMessage) message).getText().equals(messageTextPrefix + (i + 2))) : - "Received message " + ((TextMessage) message).getText() + " .. i = " + i; - } - message.acknowledge(); - // assertNull(consumer.receiveNoWait()); - { - Message msg = consumer.receiveNoWait(); - assert null == msg : "Unexpected message " + msg; - } - - } - - public void initCombosForTestPrefetch1MessageNotDispatched() { - addCombinationValues("deliveryMode", new Object[] { - Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); - } - - public void testPrefetch1MessageNotDispatched() throws Exception { - - // Set prefetch to 1 - connection.start(); - - Session session = connection.createSession(true, 0); - destination = SessionImpl.asTopic("TEST"); - MessageConsumer consumer = session.createConsumer(destination); - - // The prefetch should fill up with 1 message. - // Since prefetch is still full, the 2nd message should get dispatched - // to another consumer.. lets create the 2nd consumer test that it does - // make sure it does. - HedwigConnectionImpl connection2 = (HedwigConnectionImpl)factory.createConnection(); - connection2.start(); - connections.add(connection2); - Session session2 = connection2.createSession(true, 0); - MessageConsumer consumer2 = session2.createConsumer(destination); - - // Send 2 messages to the destination. - sendMessages(session, destination, 2); - session.commit(); - - // Pick up the first message. - Message message1 = consumer.receive(1000); - assertNotNull(message1); - assertNotNull(consumer.receive(1000)); - - // Pick up the 2nd messages. 
- Message message2 = consumer2.receive(5000); - assertNotNull(message2); - assertNotNull(consumer2.receive(1000)); - - session.commit(); - session2.commit(); - - assertNull(consumer.receiveNoWait()); - - } - - public void initCombosForTestDontStart() { - addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT)}); - addCombinationValues("destinationType", new Object[] { MessagingSessionFacade.DestinationType.TOPIC}); - } - - public void testDontStart() throws Exception { - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationType); - MessageConsumer consumer = session.createConsumer(destination); - - // Send the messages - sendMessages(session, destination, 1); - - // Make sure no messages were delivered. - assertNull(consumer.receive(1000)); - } - - public void initCombosForTestStartAfterSend() { - addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT)}); - addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC}); - } - - public void testStartAfterSend() throws Exception { - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationType); - MessageConsumer consumer = session.createConsumer(destination); - - // Send the messages - sendMessages(session, destination, 1); - - // Start the conncection after the message was sent. - connection.start(); - - // Make sure only 1 message was delivered. 
- assertNotNull(consumer.receive(1000)); - assertNull(consumer.receiveNoWait()); - } - - public void initCombosForTestReceiveMessageWithConsumer() { - addCombinationValues("deliveryMode", new Object[] { - Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); - addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC}); - } - - public void testReceiveMessageWithConsumer() throws Exception { - - // Receive a message with the JMS API - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationType); - MessageConsumer consumer = session.createConsumer(destination); - - // Send the messages - sendMessages(session, destination, 1); - - // Make sure only 1 message was delivered. - Message m = consumer.receive(1000); - assertNotNull(m); - assertEquals("0", ((TextMessage)m).getText()); - assertNull(consumer.receiveNoWait()); - } - - - public void testDupsOkConsumer() throws Exception { - - // Receive a message with the JMS API - connection.start(); - Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); - destination = createDestination(session, MessagingSessionFacade.DestinationType.TOPIC); - MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id4"); - - // Send the messages - sendMessages(session, destination, 4); - - // Make sure only 4 message are delivered. - for( int i=0; i < 4; i++){ - Message m = consumer.receive(1000); - assertNotNull(m); - } - assertNull(consumer.receive(1000)); - - // Close out the consumer.. no other messages should be left on the queue. 
- consumer.close(); - - consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id4"); - assertNull(consumer.receive(1000)); - } - - public void testRedispatchOfUncommittedTx() throws Exception { - - connection.start(); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - destination = createDestination(session, MessagingSessionFacade.DestinationType.TOPIC); - MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id2"); - - sendMessages(connection, destination, 2); - - assertNotNull(consumer.receive(1000)); - assertNotNull(consumer.receive(1000)); - - // install another consumer while message dispatch is unacked/uncommitted - - // no commit so will auto rollback and get re-dispatched to redisptachConsumer - session.close(); - - Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer redispatchConsumer - = redispatchSession.createDurableSubscriber((Topic) destination, "subscriber-id2"); - - Message msg = redispatchConsumer.receive(1000); - assertNotNull(msg); - // assertTrue("redelivered flag set", msg.getJMSRedelivered()); - - msg = redispatchConsumer.receive(1000); - assertNotNull(msg); - // assertTrue(msg.getJMSRedelivered()); - redispatchSession.commit(); - - assertNull(redispatchConsumer.receive(500)); - redispatchSession.close(); - } - - - public void testRedispatchOfRolledbackTx() throws Exception { - - connection.start(); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - destination = createDestination(session, MessagingSessionFacade.DestinationType.TOPIC); - MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "subscriber-id1"); - - sendMessages(connection, destination, 2); - - assertNotNull(consumer.receive(1000)); - assertNotNull(consumer.receive(1000)); - - // install another consumer while message dispatch is unacked/uncommitted - - session.rollback(); - 
session.close(); - - Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer redispatchConsumer - = redispatchSession.createDurableSubscriber((Topic) destination, "subscriber-id1"); - - Message msg = redispatchConsumer.receive(1000); - assertNotNull(msg); - // assertTrue(msg.getJMSRedelivered()); - msg = redispatchConsumer.receive(1000); - assertNotNull(msg); - // assertTrue(msg.getJMSRedelivered()); - redispatchSession.commit(); - - assertNull(redispatchConsumer.receive(500)); - redispatchSession.close(); - } - - public void initCombosForTestAckOfExpired() { - addCombinationValues("destinationType", - new Object[] {MessagingSessionFacade.DestinationType.TOPIC}); - } - - public void testAckOfExpired() throws Exception { - HedwigConnectionFactoryImpl fact = new HedwigConnectionFactoryImpl(); - connection = fact.createConnection(); - - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = (Destination) (destinationType == MessagingSessionFacade.DestinationType.QUEUE ? 
- session.createTopic("test") : session.createTopic("test")); - - MessageConsumer consumer = session.createConsumer(destination); - - Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = sendSession.createProducer(destination); - final int count = 4; - - - // producer.setTimeToLive(0); - for (int i = 0; i < count; i++) { - TextMessage message = sendSession.createTextMessage("no expiry" + i); - producer.send(message); - } - - MessageConsumer amqConsumer = (MessageConsumer) consumer; - - for(int i=0; i<count; i++) { - TextMessage msg = (TextMessage) amqConsumer.receive(); - assertNotNull(msg); - assertTrue("message has \"no expiry\" text: " + msg.getText(), msg.getText().contains("no expiry")); - - // force an ack when there are expired messages - msg.acknowledge(); - } - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java deleted file mode 100644 index 343919a..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import javax.jms.Topic; -import javax.jms.Message; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class JMSDurableTopicRedeliverTest extends JmsTopicRedeliverTest { - - private static final Logger LOG = LoggerFactory.getLogger(JMSDurableTopicRedeliverTest.class); - - protected void setUp() throws Exception { - durable = true; - super.setUp(); - } - - /** - * Sends and consumes the messages. - * - * @throws Exception - */ - public void testRedeliverNewSession() throws Exception { - String text = "TEST: " + System.currentTimeMillis(); - Message sendMessage = session.createTextMessage(text); - - if (verbose) { - LOG.info("About to send a message: " + sendMessage + " with text: " + text); - } - producer.send(producerDestination, sendMessage); - - // receive but don't acknowledge - Message unackMessage = consumer.receive(1000); - assertNotNull(unackMessage); - String unackId = unackMessage.getJMSMessageID(); - assertEquals(((TextMessage)unackMessage).getText(), text); - assertFalse(unackMessage.getJMSRedelivered()); - consumeSession.close(); - consumer.close(); - - // receive then acknowledge - consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - consumer = createConsumer(); - Message ackMessage = consumer.receive(1000); - assertNotNull(ackMessage); - ackMessage.acknowledge(); - String ackId = ackMessage.getJMSMessageID(); - assertEquals(((TextMessage)ackMessage).getText(), text); - // 
assertTrue(ackMessage.getJMSRedelivered()); - assertEquals(unackId, ackId); - consumeSession.close(); - consumer.close(); - - consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - consumer = createConsumer(); - assertNull(consumer.receive(1000)); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JMSIndividualAckTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSIndividualAckTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JMSIndividualAckTest.java deleted file mode 100644 index 24551f3..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSIndividualAckTest.java +++ /dev/null @@ -1,165 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; - -/** - * - */ -public class JMSIndividualAckTest extends TestSupport { - - private Connection connection; - - protected void setUp() throws Exception { - super.setUp(); - connection = createConnection(); - } - - /** - * @see junit.framework.TestCase#tearDown() - */ - protected void tearDown() throws Exception { - if (connection != null) { - connection.close(); - connection = null; - } - super.tearDown(); - } - - /** - * Tests if acknowledged messages are being consumed. - * - * @throws JMSException - */ - public void testAckedMessageAreConsumed() throws JMSException { - connection.start(); - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Topic queue = session.createTopic(getQueueName()); - MessageProducer producer = session.createProducer(queue); - MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id1"); - - producer.send(session.createTextMessage("Hello")); - - // Consume the message... - Message msg = consumer.receive(1000); - assertNotNull(msg); - msg.acknowledge(); - - // Reset the session. - session.close(); - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - // Attempt to Consume the message... - consumer = session.createDurableSubscriber(queue, "subscriber-id1"); - msg = consumer.receive(1000); - assertNull(msg); - - session.close(); - } - - /** - * Tests if acknowledged messages are being consumed. - * - * @throws JMSException - */ - // This test cant, unfortunately, pass - //- in hedwig, acknowledge is a ACKNOWLEDGE UNTIL. So the last ack will ack all messages until then ... 
- /* - public void testLastMessageAcked() throws JMSException { - connection.start(); - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Topic queue = session.createTopic(getQueueName()); - MessageProducer producer = session.createProducer(queue); - MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id2"); - TextMessage msg1 = session.createTextMessage("msg1"); - TextMessage msg2 = session.createTextMessage("msg2"); - TextMessage msg3 = session.createTextMessage("msg3"); - producer.send(msg1); - producer.send(msg2); - producer.send(msg3); - - // Consume the message... - Message msg = consumer.receive(1000); - assertNotNull(msg); - msg = consumer.receive(1000); - assertNotNull(msg); - msg = consumer.receive(1000); - assertNotNull(msg); - msg.acknowledge(); - - // Reset the session. - session.close(); - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - // Attempt to Consume the message... - consumer = session.createDurableSubscriber(queue, "subscriber-id2"); - msg = consumer.receive(1000); - assertNotNull(msg); - assertEquals(msg1,msg); - msg = consumer.receive(1000); - assertNotNull(msg); - assertEquals(msg2,msg); - msg = consumer.receive(1000); - assertNull(msg); - session.close(); - } - */ - - /** - * Tests if unacknowledged messages are being re-delivered when the consumer connects again. - * - * @throws JMSException - */ - public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException { - connection.start(); - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Topic queue = session.createTopic(getQueueName()); - MessageProducer producer = session.createProducer(queue); - MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id3"); - producer.send(session.createTextMessage("Hello")); - - // Consume the message... - Message msg = consumer.receive(1000); - assertNotNull(msg); - // Don't ack the message. 
- - // Reset the session. This should cause the unacknowledged message to be re-delivered. - session.close(); - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - // Attempt to Consume the message... - consumer = session.createDurableSubscriber(queue, "subscriber-id3"); - msg = consumer.receive(2000); - assertNotNull(msg); - msg.acknowledge(); - - session.close(); - } - - protected String getQueueName() { - return getClass().getName() + "." + getName(); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JMSMessageTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSMessageTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JMSMessageTest.java deleted file mode 100644 index cee1698..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSMessageTest.java +++ /dev/null @@ -1,504 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq; - -import java.net.URISyntaxException; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Vector; - -import javax.jms.BytesMessage; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageEOFException; -import javax.jms.MessageFormatException; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Session; -import javax.jms.StreamMessage; -import javax.jms.TextMessage; - -import junit.framework.Test; -import org.apache.hedwig.jms.MessagingSessionFacade; -import org.apache.hedwig.jms.message.StreamMessageImpl; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; - -import javax.jms.Destination; - -/** - * Test cases used to test the JMS message consumer. - */ -public class JMSMessageTest extends JmsTestSupport { - - public Destination destination; - public int deliveryMode = DeliveryMode.NON_PERSISTENT; - public int prefetch; - public int ackMode; - public MessagingSessionFacade.DestinationType destinationType = MessagingSessionFacade.DestinationType.TOPIC; - public boolean durableConsumer; - - /** - * Run all these tests in both marshaling and non-marshaling mode. 
- */ - public void initCombos() { - addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), - Integer.valueOf(DeliveryMode.PERSISTENT)}); - addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC}); - } - - public void testTextMessage() throws Exception { - - // Receive a message with the JMS API - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationType); - MessageConsumer consumer = session.createConsumer(destination); - MessageProducer producer = session.createProducer(destination); - - // Send the message. - { - TextMessage message = session.createTextMessage(); - message.setText("Hi"); - producer.send(message); - } - - // Check the Message - { - TextMessage message = (TextMessage)consumer.receive(1000); - assertNotNull(message); - assertEquals("Hi", message.getText()); - } - - assertNull(consumer.receiveNoWait()); - } - - public static Test suite() { - return suite(JMSMessageTest.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } - - protected ConnectionFactory createConnectionFactory() throws URISyntaxException { - HedwigConnectionFactoryImpl factory = new HedwigConnectionFactoryImpl(); - return factory; - } - - public void testBytesMessageLength() throws Exception { - - // Receive a message with the JMS API - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationType); - MessageConsumer consumer = session.createConsumer(destination); - MessageProducer producer = session.createProducer(destination); - - // Send the message - { - BytesMessage message = session.createBytesMessage(); - message.writeInt(1); - message.writeInt(2); - message.writeInt(3); - message.writeInt(4); - producer.send(message); - } - - // Check the message. 
- { - BytesMessage message = (BytesMessage)consumer.receive(1000); - assertNotNull(message); - assertEquals(16, message.getBodyLength()); - } - - assertNull(consumer.receiveNoWait()); - } - - public void testObjectMessage() throws Exception { - - // Receive a message with the JMS API - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationType); - MessageConsumer consumer = session.createConsumer(destination); - MessageProducer producer = session.createProducer(destination); - - // send the message. - { - ObjectMessage message = session.createObjectMessage(); - message.setObject("Hi"); - producer.send(message); - } - - // Check the message - { - ObjectMessage message = (ObjectMessage)consumer.receive(1000); - assertNotNull(message); - assertEquals("Hi", message.getObject()); - } - assertNull(consumer.receiveNoWait()); - } - - public void testBytesMessage() throws Exception { - - // Receive a message with the JMS API - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationType); - MessageConsumer consumer = session.createConsumer(destination); - MessageProducer producer = session.createProducer(destination); - - // Send the message - { - BytesMessage message = session.createBytesMessage(); - message.writeBoolean(true); - producer.send(message); - } - - // Check the message - { - BytesMessage message = (BytesMessage)consumer.receive(1000); - assertNotNull(message); - assertTrue(message.readBoolean()); - - try { - message.readByte(); - fail("Expected exception not thrown."); - } catch (MessageEOFException e) { - } - - } - assertNull(consumer.receiveNoWait()); - } - - public void testStreamMessage() throws Exception { - - // Receive a message with the JMS API - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = 
createDestination(session, destinationType); - MessageConsumer consumer = session.createConsumer(destination); - MessageProducer producer = session.createProducer(destination); - - // Send the message. - { - StreamMessage message = session.createStreamMessage(); - message.writeString("This is a test to see how it works."); - producer.send(message); - } - - // Check the message. - { - StreamMessage message = (StreamMessage)consumer.receive(1000); - assertNotNull(message); - - // Invalid conversion should throw exception and not move the stream - // position. - try { - message.readByte(); - fail("Should have received NumberFormatException"); - } catch (NumberFormatException e) { - } catch (MessageFormatException e) { - } - - assertEquals("This is a test to see how it works.", message.readString()); - - // Invalid conversion should throw exception and not move the stream - // position. - try { - message.readByte(); - fail("Should have received MessageEOFException"); - } catch (MessageEOFException e) { - } - } - assertNull(consumer.receiveNoWait()); - } - - public void testMapMessage() throws Exception { - - // Receive a message with the JMS API - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationType); - MessageConsumer consumer = session.createConsumer(destination); - MessageProducer producer = session.createProducer(destination); - - // send the message. - { - MapMessage message = session.createMapMessage(); - message.setBoolean("boolKey", true); - producer.send(message); - } - - // get the message. 
- { - MapMessage message = (MapMessage)consumer.receive(1000); - assertNotNull(message); - assertTrue(message.getBoolean("boolKey")); - } - assertNull(consumer.receiveNoWait()); - } - - static class ForeignMessage implements TextMessage { - - public int deliveryMode; - - private String messageId; - private long timestamp; - private String correlationId; - private Destination replyTo; - private Destination destination; - private boolean redelivered; - private String type; - private long expiration; - private int priority; - private String text; - private HashMap<String, Object> props = new HashMap<String, Object>(); - - public String getJMSMessageID() throws JMSException { - return messageId; - } - - public void setJMSMessageID(String arg0) throws JMSException { - messageId = arg0; - } - - public long getJMSTimestamp() throws JMSException { - return timestamp; - } - - public void setJMSTimestamp(long arg0) throws JMSException { - timestamp = arg0; - } - - public byte[] getJMSCorrelationIDAsBytes() throws JMSException { - return null; - } - - public void setJMSCorrelationIDAsBytes(byte[] arg0) throws JMSException { - } - - public void setJMSCorrelationID(String arg0) throws JMSException { - correlationId = arg0; - } - - public String getJMSCorrelationID() throws JMSException { - return correlationId; - } - - public Destination getJMSReplyTo() throws JMSException { - return replyTo; - } - - public void setJMSReplyTo(Destination arg0) throws JMSException { - replyTo = arg0; - } - - public Destination getJMSDestination() throws JMSException { - return destination; - } - - public void setJMSDestination(Destination arg0) throws JMSException { - destination = arg0; - } - - public int getJMSDeliveryMode() throws JMSException { - return deliveryMode; - } - - public void setJMSDeliveryMode(int arg0) throws JMSException { - deliveryMode = arg0; - } - - public boolean getJMSRedelivered() throws JMSException { - return redelivered; - } - - public void setJMSRedelivered(boolean 
arg0) throws JMSException { - redelivered = arg0; - } - - public String getJMSType() throws JMSException { - return type; - } - - public void setJMSType(String arg0) throws JMSException { - type = arg0; - } - - public long getJMSExpiration() throws JMSException { - return expiration; - } - - public void setJMSExpiration(long arg0) throws JMSException { - expiration = arg0; - } - - public int getJMSPriority() throws JMSException { - return priority; - } - - public void setJMSPriority(int arg0) throws JMSException { - priority = arg0; - } - - public void clearProperties() throws JMSException { - } - - public boolean propertyExists(String arg0) throws JMSException { - return false; - } - - public boolean getBooleanProperty(String arg0) throws JMSException { - return false; - } - - public byte getByteProperty(String arg0) throws JMSException { - return 0; - } - - public short getShortProperty(String arg0) throws JMSException { - return 0; - } - - public int getIntProperty(String arg0) throws JMSException { - return 0; - } - - public long getLongProperty(String arg0) throws JMSException { - return 0; - } - - public float getFloatProperty(String arg0) throws JMSException { - return 0; - } - - public double getDoubleProperty(String arg0) throws JMSException { - return 0; - } - - public String getStringProperty(String arg0) throws JMSException { - return (String)props.get(arg0); - } - - public Object getObjectProperty(String arg0) throws JMSException { - return props.get(arg0); - } - - public Enumeration getPropertyNames() throws JMSException { - return new Vector<String>(props.keySet()).elements(); - } - - public void setBooleanProperty(String arg0, boolean arg1) throws JMSException { - } - - public void setByteProperty(String arg0, byte arg1) throws JMSException { - } - - public void setShortProperty(String arg0, short arg1) throws JMSException { - } - - public void setIntProperty(String arg0, int arg1) throws JMSException { - } - - public void setLongProperty(String 
arg0, long arg1) throws JMSException { - } - - public void setFloatProperty(String arg0, float arg1) throws JMSException { - } - - public void setDoubleProperty(String arg0, double arg1) throws JMSException { - } - - public void setStringProperty(String arg0, String arg1) throws JMSException { - props.put(arg0, arg1); - } - - public void setObjectProperty(String arg0, Object arg1) throws JMSException { - props.put(arg0, arg1); - } - - public void acknowledge() throws JMSException { - } - - public void clearBody() throws JMSException { - } - - public void setText(String arg0) throws JMSException { - text = arg0; - } - - public String getText() throws JMSException { - return text; - } - } - - public void testForeignMessage() throws Exception { - - // Receive a message with the JMS API - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationType); - MessageConsumer consumer = session.createConsumer(destination); - MessageProducer producer = session.createProducer(destination); - - // Send the message. - { - ForeignMessage message = new ForeignMessage(); - message.text = "Hello"; - message.setStringProperty("test", "value"); - // long timeToLive = 10000L; - long timeToLive = 0L; - long start = System.currentTimeMillis(); - producer.send(message, Session.AUTO_ACKNOWLEDGE, 7, timeToLive); - long end = System.currentTimeMillis(); - - - //validate jms spec 1.1 section 3.4.11 table 3.1 - // JMSDestination, JMSDeliveryMode, JMSPriority, JMSMessageID, and JMSTimestamp - //must be set by sending a message. - - // This is NOT specified in the spec ! 
- // exception for jms destination as the format is provider defined so it is only set on the copy - // assertNull(message.getJMSDestination()); - - assertEquals(Session.AUTO_ACKNOWLEDGE, message.getJMSDeliveryMode()); - // assertTrue(start + timeToLive <= message.getJMSExpiration()); - // assertTrue(end + timeToLive >= message.getJMSExpiration()); - assertEquals(7, message.getJMSPriority()); - assertNotNull(message.getJMSMessageID()); - assertTrue(start <= message.getJMSTimestamp()); - assertTrue(end >= message.getJMSTimestamp()); - } - - // Validate message is OK. - { - TextMessage message = (TextMessage)consumer.receive(1000); - assertNotNull(message); - assertEquals("Hello", message.getText()); - assertEquals("value", message.getStringProperty("test")); - } - - assertNull(consumer.receiveNoWait()); - } - -}