OOZIE-3240 Flaky test TestJMSAccessorService#testConnectionRetry (pbacsko via gezapeti)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/61c646c3 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/61c646c3 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/61c646c3 Branch: refs/heads/master Commit: 61c646c332e6129f3502ee235e5e6d28fe55addd Parents: 117153a Author: Gezapeti Cseh <gezap...@apache.org> Authored: Wed May 16 13:52:57 2018 +0200 Committer: Gezapeti Cseh <gezap...@apache.org> Committed: Wed May 16 13:52:57 2018 +0200 ---------------------------------------------------------------------- .../oozie/service/TestJMSAccessorService.java | 267 ++++++++++--------- release-log.txt | 1 + 2 files changed, 137 insertions(+), 131 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/61c646c3/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java b/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java index 41241d2..dbf892e 100644 --- a/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java +++ b/core/src/test/java/org/apache/oozie/service/TestJMSAccessorService.java @@ -19,6 +19,7 @@ package org.apache.oozie.service; import java.net.URI; +import java.net.URISyntaxException; import java.util.Random; import javax.jms.Session; @@ -36,6 +37,7 @@ import org.junit.Test; public class TestJMSAccessorService extends XTestCase { private Services services; private static Random random = new Random(); + private static final int JMS_TIMEOUT_MS = 5000; @Override protected void setUp() throws Exception { @@ -66,89 +68,63 @@ public class TestJMSAccessorService extends XTestCase { } @Test - public void testRegisterSingleConsumerPerTopic() { - - try { - HCatAccessorService hcatService = services.get(HCatAccessorService.class); - JMSAccessorService jmsService = services.get(JMSAccessorService.class); - String server = "hcat.server.com:5080"; - String topic = "hcat.mydb.mytable"; - - JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020")); - jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server)); + public void testRegisterSingleConsumerPerTopic() throws URISyntaxException { + HCatAccessorService hcatService = services.get(HCatAccessorService.class); + JMSAccessorService jmsService = services.get(JMSAccessorService.class); + String server = "hcat.server.com:5080"; + String topic = "hcat.mydb.mytable"; - MessageReceiver receiver1 = jmsService.getMessageReceiver(connInfo, topic); - jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server)); + JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020")); + jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server)); - MessageReceiver receiver2 = jmsService.getMessageReceiver(connInfo, topic); - assertEquals(receiver1, receiver2); - } - catch (Exception e) { - e.printStackTrace(); - fail("Exception encountered : " + e); - } + MessageReceiver receiver1 = jmsService.getMessageReceiver(connInfo, topic); + jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server)); + MessageReceiver receiver2 = jmsService.getMessageReceiver(connInfo, topic); + assertEquals(receiver1, receiver2); } @Test - public void testUnRegisterTopic() { - - try { - HCatAccessorService hcatService = services.get(HCatAccessorService.class); - JMSAccessorService jmsService = services.get(JMSAccessorService.class); - String server = "hcat.server.com:5080"; - String topic = "hcatalog.mydb.mytable"; - - JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020")); - jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server)); + public void testUnRegisterTopic() throws URISyntaxException { + HCatAccessorService hcatService = services.get(HCatAccessorService.class); + JMSAccessorService jmsService = services.get(JMSAccessorService.class); + String server = "hcat.server.com:5080"; + String topic = "hcatalog.mydb.mytable"; - MessageReceiver receiver1 = jmsService.getMessageReceiver(connInfo, topic); - assertNotNull(receiver1); + JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020")); + jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server)); - jmsService.unregisterFromNotification(connInfo, topic); + MessageReceiver receiver1 = jmsService.getMessageReceiver(connInfo, topic); + assertNotNull(receiver1); - receiver1 = jmsService.getMessageReceiver(connInfo, topic); - assertEquals(null, receiver1); - } - catch (Exception e) { - e.printStackTrace(); - fail("Exception encountered : " + e); - } + jmsService.unregisterFromNotification(connInfo, topic); + receiver1 = jmsService.getMessageReceiver(connInfo, topic); + assertEquals(null, receiver1); } @Test - public void testConnectionContext() throws ServiceException { - try { - services.destroy(); - services = super.setupServicesForHCatalog(); - Configuration conf = services.getConf(); - // set the connection factory name - String jmsURL = "hcat://${1}.${2}.server.com:8020=java.naming.factory.initial#" + - "org.apache.activemq.jndi.ActiveMQInitialContextFactory" + - ";java.naming.provider.url#vm://localhost?broker.persistent=false;" + - "connectionFactoryNames#dynamicFactories/hcat.prod.${1}"; - conf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, jmsURL); - services.init(); - HCatAccessorService hcatService = services.get(HCatAccessorService.class); - JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcatserver.blue.server.com:8020")); - assertEquals( - "java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#" + - "vm://localhost?broker.persistent=false;connectionFactoryNames#dynamicFactories/hcat.prod.hcatserver", - connInfo.getJNDIPropertiesString()); - - ConnectionContext ctx1 = new DefaultConnectionContext(); - ctx1.createConnection(connInfo.getJNDIProperties()); - BrokerService broker = new BrokerService(); - broker.setDataDirectory(getTestCaseDir()); - // Without this stop testConnectionRetry fails with - // javax.management.InstanceAlreadyExistsException: org.apache.activemq:BrokerName=localhost,Type=Broker - broker.stop(); - } - catch (Exception e) { - e.printStackTrace(); - fail("Unexpected exception " + e); - } + public void testConnectionContext() throws Exception { + services.destroy(); + services = super.setupServicesForHCatalog(); + Configuration conf = services.getConf(); + // set the connection factory name + String jmsURL = "hcat://${1}.${2}.server.com:8020=java.naming.factory.initial#" + + "org.apache.activemq.jndi.ActiveMQInitialContextFactory" + + ";java.naming.provider.url#vm://localhost?broker.persistent=false;" + + "connectionFactoryNames#dynamicFactories/hcat.prod.${1}"; + conf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, jmsURL); + services.init(); + HCatAccessorService hcatService = services.get(HCatAccessorService.class); + JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcatserver.blue.server.com:8020")); + assertEquals( + "java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#" + + "vm://localhost?broker.persistent=false;connectionFactoryNames#dynamicFactories/hcat.prod.hcatserver", + connInfo.getJNDIPropertiesString()); + + ConnectionContext ctx = new DefaultConnectionContext(); + ctx.createConnection(connInfo.getJNDIProperties()); + ctx.close(); } @Test @@ -176,69 +152,94 @@ public class TestJMSAccessorService extends XTestCase { assertTrue(jmsService.isTopicInRetryList(connInfo, topic)); // Start the broker and check if listening to topic now BrokerService broker = new BrokerService(); - broker.addConnector(brokerURl); - broker.setDataDirectory(getTestCaseDir()); - broker.start(); - Thread.sleep(1000); - assertTrue(jmsService.isListeningToTopic(connInfo, topic)); - assertFalse(jmsService.isConnectionInRetryList(connInfo)); - assertFalse(jmsService.isTopicInRetryList(connInfo, topic)); - broker.stop(); - jmsService.destroy(); - + try { + broker.addConnector(brokerURl); + broker.setDataDirectory(getTestCaseDir()); + broker.start(); + + waitFor(JMS_TIMEOUT_MS, new Predicate() { + @Override + public boolean evaluate() throws Exception { + return jmsService.isListeningToTopic(connInfo, topic); + } + }); + assertTrue(jmsService.isListeningToTopic(connInfo, topic)); + assertFalse(jmsService.isConnectionInRetryList(connInfo)); + assertFalse(jmsService.isTopicInRetryList(connInfo, topic)); + } finally { + broker.stop(); + } } @Test public void testConnectionRetryExceptionListener() throws Exception { - services.destroy(); - services = super.setupServicesForHCatalog(); - int randomPort = 30000 + random.nextInt(10000); - String brokerURL = "tcp://localhost:" + randomPort; - String jndiPropertiesString = "java.naming.factory.initial#" + ActiveMQConnFactory + ";" - + "java.naming.provider.url#" + brokerURL + ";" + "connectionFactoryNames#" + "ConnectionFactory"; - Configuration servicesConf = services.getConf(); - servicesConf.set(JMSAccessorService.CONF_RETRY_INITIAL_DELAY, "1"); - servicesConf.set(JMSAccessorService.CONF_RETRY_MAX_ATTEMPTS, "3"); - servicesConf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, "default=" + jndiPropertiesString); - services.init(); - HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); - JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class); - - String publisherAuthority = "hcat.server.com:5080"; - String topic = "topic.topic1"; - // Start the broker - BrokerService broker = new BrokerService(); - broker.addConnector(brokerURL); - broker.setDataDirectory(getTestCaseDir()); - broker.start(); - JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020")); - jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(publisherAuthority)); - assertTrue(jmsService.isListeningToTopic(connInfo, topic)); - assertFalse(jmsService.isConnectionInRetryList(connInfo)); - assertFalse(jmsService.isTopicInRetryList(connInfo, topic)); - ConnectionContext connCtxt = jmsService.createConnectionContext(connInfo); - broker.stop(); + BrokerService broker = null; try { - connCtxt.createSession(Session.AUTO_ACKNOWLEDGE); - fail("Exception expected"); - } - catch (Exception e) { - Thread.sleep(100); - assertFalse(jmsService.isListeningToTopic(connInfo, topic)); - assertTrue(jmsService.isConnectionInRetryList(connInfo)); - assertTrue(jmsService.isTopicInRetryList(connInfo, topic)); - } - broker = new BrokerService(); - broker.addConnector(brokerURL); - broker.setDataDirectory(getTestCaseDir()); - broker.start(); - Thread.sleep(1000); - assertTrue(jmsService.isListeningToTopic(connInfo, topic)); - assertFalse(jmsService.isConnectionInRetryList(connInfo)); - assertFalse(jmsService.isTopicInRetryList(connInfo, topic)); - broker.stop(); - jmsService.destroy(); + services.destroy(); + services = super.setupServicesForHCatalog(); + int randomPort = 30000 + random.nextInt(10000); + String brokerURL = "tcp://localhost:" + randomPort; + String jndiPropertiesString = "java.naming.factory.initial#" + ActiveMQConnFactory + ";" + + "java.naming.provider.url#" + brokerURL + ";" + "connectionFactoryNames#" + "ConnectionFactory"; + Configuration servicesConf = services.getConf(); + servicesConf.set(JMSAccessorService.CONF_RETRY_INITIAL_DELAY, "1"); + servicesConf.set(JMSAccessorService.CONF_RETRY_MAX_ATTEMPTS, "3"); + servicesConf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, "default=" + jndiPropertiesString); + services.init(); + HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class); + JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class); + + String publisherAuthority = "hcat.server.com:5080"; + String topic = "topic.topic1"; + // Start the broker + broker = new BrokerService(); + broker.addConnector(brokerURL); + broker.setDataDirectory(getTestCaseDir()); + broker.start(); + JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020")); + jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(publisherAuthority)); + assertTrue(jmsService.isListeningToTopic(connInfo, topic)); + assertFalse(jmsService.isConnectionInRetryList(connInfo)); + assertFalse(jmsService.isTopicInRetryList(connInfo, topic)); + ConnectionContext connCtxt = jmsService.createConnectionContext(connInfo); + broker.stop(); + + try { + connCtxt.createSession(Session.AUTO_ACKNOWLEDGE); + fail("Exception expected"); + } + catch (Exception e) { + waitFor(JMS_TIMEOUT_MS, new Predicate() { + @Override + public boolean evaluate() throws Exception { + return !jmsService.isListeningToTopic(connInfo, topic); + } + }); + assertFalse(jmsService.isListeningToTopic(connInfo, topic)); + assertTrue(jmsService.isConnectionInRetryList(connInfo)); + assertTrue(jmsService.isTopicInRetryList(connInfo, topic)); + } + broker = new BrokerService(); + + broker.addConnector(brokerURL); + broker.setDataDirectory(getTestCaseDir()); + broker.start(); + waitFor(JMS_TIMEOUT_MS, new Predicate() { + @Override + public boolean evaluate() throws Exception { + return jmsService.isListeningToTopic(connInfo, topic); + } + }); + assertTrue(jmsService.isListeningToTopic(connInfo, topic)); + assertFalse(jmsService.isConnectionInRetryList(connInfo)); + assertFalse(jmsService.isTopicInRetryList(connInfo, topic)); + broker.stop(); + } finally { + if (broker != null) { + broker.stop(); + } + } } @Test @@ -258,18 +259,22 @@ public class TestJMSAccessorService extends XTestCase { String publisherAuthority = "hcat.server.com:5080"; String topic = "topic.topic1"; JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020")); + jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(publisherAuthority)); assertTrue(jmsService.isConnectionInRetryList(connInfo)); assertTrue(jmsService.isTopicInRetryList(connInfo, topic)); assertFalse(jmsService.isListeningToTopic(connInfo, topic)); - Thread.sleep(1100); + waitFor(JMS_TIMEOUT_MS, new Predicate() { + @Override + public boolean evaluate() throws Exception { + return jmsService.getNumConnectionAttempts(connInfo) == 1; + } + }); + // Should not retry again as max attempt is 1 assertTrue(jmsService.isConnectionInRetryList(connInfo)); assertTrue(jmsService.isTopicInRetryList(connInfo, topic)); assertFalse(jmsService.isListeningToTopic(connInfo, topic)); - assertEquals(1, jmsService.getNumConnectionAttempts(connInfo)); assertFalse(jmsService.retryConnection(connInfo)); - jmsService.destroy(); } - } http://git-wip-us.apache.org/repos/asf/oozie/blob/61c646c3/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 060f4fb..fd7bd76 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.1.0 release (trunk - unreleased) +OOZIE-3240 Flaky test TestJMSAccessorService#testConnectionRetry (pbacsko via gezapeti) OOZIE-3246 Flaky test TestJMSJobEventListener#testConnectionDrop (pbacsko via gezapeti) OOZIE-3236 Fix flaky test TestHiveActionExecutor#testHiveAction (pbacsko via gezapeti) OOZIE-3235 Upgrade ActiveMQ to 5.15.3 (matijhs via andras.piros)