Added: sling/trunk/contrib/commons/mom/jms/src/main/resources/org/apache/sling/amq/activemq.xml URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/jms/src/main/resources/org/apache/sling/amq/activemq.xml?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/jms/src/main/resources/org/apache/sling/amq/activemq.xml (added) +++ sling/trunk/contrib/commons/mom/jms/src/main/resources/org/apache/sling/amq/activemq.xml Wed Jul 27 12:10:12 2016 @@ -0,0 +1,170 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
-->
<!-- START SNIPPET: example -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

  <!--
    Allows us to use system properties as variables in this configuration file
  -->
  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
      <value>file:${activemq.conf}/credentials.properties</value>
    </property>
  </bean>

  <!--
    The <broker> element is used to configure the ActiveMQ broker.
  -->
  <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

    <!--
      For better performance use VM cursor and small memory limit.
      For more information, see:

      http://activemq.apache.org/message-cursors.html

      Also, if your producer is "hanging", it's probably due to producer flow control.
      For more information, see:
      http://activemq.apache.org/producer-flow-control.html
    -->
    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic=">" producerFlowControl="true">
            <!--
              The constantPendingMessageLimitStrategy is used to prevent
              slow topic consumers to block producers and affect other consumers
              by limiting the number of messages that are retained
              For more information, see:

              http://activemq.apache.org/slow-consumer-handling.html
            -->
            <pendingMessageLimitStrategy>
              <constantPendingMessageLimitStrategy limit="1000"/>
            </pendingMessageLimitStrategy>
          </policyEntry>
          <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
            <!--
              Use VM cursor for better latency
              For more information, see:

              http://activemq.apache.org/message-cursors.html

              <pendingQueuePolicy>
                <vmQueueCursor/>
              </pendingQueuePolicy>
            -->
          </policyEntry>
        </policyEntries>
      </policyMap>
    </destinationPolicy>

    <!--
      The managementContext is used to configure how ActiveMQ is exposed in
      JMX. By default, ActiveMQ uses the MBean server that is started by
      the JVM. For more information, see:

      http://activemq.apache.org/jmx.html
    -->
    <managementContext>
      <managementContext createConnector="false"/>
    </managementContext>

    <!--
      Configure message persistence for the broker. The default persistence
      mechanism is the KahaDB store (identified by the kahaDB tag).
      For more information, see:

      http://activemq.apache.org/persistence.html
    -->
    <persistenceAdapter>
      <kahaDB directory="${activemq.data}/kahadb"/>
    </persistenceAdapter>

    <!--
      The systemUsage controls the maximum amount of space the broker will
      use before slowing down producers. For more information, see:
      http://activemq.apache.org/producer-flow-control.html
      If using ActiveMQ embedded - the following limits could safely be used:

      <systemUsage>
        <systemUsage>
          <memoryUsage>
            <memoryUsage limit="20 mb"/>
          </memoryUsage>
          <storeUsage>
            <storeUsage limit="1 gb"/>
          </storeUsage>
          <tempUsage>
            <tempUsage limit="100 mb"/>
          </tempUsage>
        </systemUsage>
      </systemUsage>
    -->
    <systemUsage>
      <systemUsage>
        <memoryUsage>
          <memoryUsage limit="64 mb"/>
        </memoryUsage>
        <storeUsage>
          <storeUsage limit="100 gb"/>
        </storeUsage>
        <tempUsage>
          <tempUsage limit="50 gb"/>
        </tempUsage>
      </systemUsage>
    </systemUsage>

    <!--
      The transport connectors expose ActiveMQ over a given protocol to
      clients and other brokers. For more information, see:

      http://activemq.apache.org/configuring-transports.html
    -->
    <transportConnectors>
      <!--
        DOS protection, limit concurrent connections to 1000 and frame size to 100MB
        (104857600 bytes). The query-string separator has to be written as the
        ampersand entity: a bare ampersand inside an attribute value is not
        well-formed XML and the file would be rejected by the parser.
      -->
      <transportConnector name="openwire"
                          uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
      <transportConnector name="amqp"
                          uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    </transportConnectors>

    <!--
      destroy the spring context on shutdown to stop jetty
    -->
    <shutdownHooks>
      <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook"/>
    </shutdownHooks>
  </broker>

  <!--
    Enable web consoles, REST and Ajax APIs and demos

    Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
  -->
  <import resource="jetty.xml"/>
</beans>
<!-- END SNIPPET: example -->
\ No newline at end of file
Propchange: sling/trunk/contrib/commons/mom/jms/src/main/resources/org/apache/sling/amq/activemq.xml ------------------------------------------------------------------------------ svn:eol-style = native Propchange: sling/trunk/contrib/commons/mom/jms/src/main/resources/org/apache/sling/amq/activemq.xml ------------------------------------------------------------------------------ svn:mime-type = text/xml Added: sling/trunk/contrib/commons/mom/jms/src/main/resources/org/apache/sling/amq/credentials.properties URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/jms/src/main/resources/org/apache/sling/amq/credentials.properties?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/jms/src/main/resources/org/apache/sling/amq/credentials.properties (added) +++ sling/trunk/contrib/commons/mom/jms/src/main/resources/org/apache/sling/amq/credentials.properties Wed Jul 27 12:10:12 2016 @@ -0,0 +1,22 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. 
+## --------------------------------------------------------------------------- + +# Defines credentials that will be used by components (like web console) to access the broker + +activemq.username=system +activemq.password=manager +guest.password=password \ No newline at end of file Propchange: sling/trunk/contrib/commons/mom/jms/src/main/resources/org/apache/sling/amq/credentials.properties ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/commons/mom/jms/src/main/resources/org/apache/sling/amq/jetty.xml URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/jms/src/main/resources/org/apache/sling/amq/jetty.xml?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/jms/src/main/resources/org/apache/sling/amq/jetty.xml (added) +++ sling/trunk/contrib/commons/mom/jms/src/main/resources/org/apache/sling/amq/jetty.xml Wed Jul 27 12:10:12 2016 @@ -0,0 +1,144 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to You under + the Apache License, Version 2.0 (the "License"); you may not use this file except in + compliance with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or + agreed to in writing, software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied. See the License for the specific language governing permissions and + limitations under the License. 
-->
<!--
  Embedded servlet engine that serves the Admin consoles, the REST and Ajax APIs and
  some demos. Include this file in your configuration to enable the ActiveMQ web
  components, e.g. <import resource="jetty.xml"/>
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

  <!-- Login service for the "ActiveMQRealm" realm, configured from jetty-realm.properties -->
  <bean id="securityLoginService"
        class="org.eclipse.jetty.security.HashLoginService">
    <property name="name" value="ActiveMQRealm"/>
    <property name="config" value="${activemq.conf}/jetty-realm.properties"/>
  </bean>

  <!-- BASIC constraint for the roles "user" and "admin" -->
  <bean id="securityConstraint"
        class="org.eclipse.jetty.util.security.Constraint">
    <property name="name" value="BASIC"/>
    <property name="roles" value="user,admin"/>
    <property name="authenticate" value="true"/>
  </bean>

  <!-- BASIC constraint for the role "admin" only -->
  <bean id="adminSecurityConstraint"
        class="org.eclipse.jetty.util.security.Constraint">
    <property name="name" value="BASIC"/>
    <property name="roles" value="admin"/>
    <property name="authenticate" value="true"/>
  </bean>

  <!-- Path specs that each constraint is applied to -->
  <bean id="securityConstraintMapping"
        class="org.eclipse.jetty.security.ConstraintMapping">
    <property name="constraint" ref="securityConstraint"/>
    <property name="pathSpec" value="/,*.jsp"/>
  </bean>
  <bean id="adminSecurityConstraintMapping"
        class="org.eclipse.jetty.security.ConstraintMapping">
    <property name="constraint" ref="adminSecurityConstraint"/>
    <property name="pathSpec" value="*.action"/>
  </bean>

  <!-- Security handler: wraps every web application listed below -->
  <bean id="securityHandler"
        class="org.eclipse.jetty.security.ConstraintSecurityHandler">
    <property name="loginService" ref="securityLoginService"/>
    <property name="authenticator">
      <bean class="org.eclipse.jetty.security.authentication.BasicAuthenticator"/>
    </property>
    <property name="constraintMappings">
      <list>
        <ref bean="adminSecurityConstraintMapping"/>
        <ref bean="securityConstraintMapping"/>
      </list>
    </property>
    <property name="handler">
      <bean id="sec"
            class="org.eclipse.jetty.server.handler.HandlerCollection">
        <property name="handlers">
          <list>
            <!-- /hawtio -->
            <bean class="org.eclipse.jetty.webapp.WebAppContext">
              <property name="contextPath" value="/hawtio"/>
              <property name="war" value="${activemq.home}/webapps/hawtio"/>
              <property name="logUrlOnStart" value="true"/>
            </bean>
            <!-- /admin -->
            <bean class="org.eclipse.jetty.webapp.WebAppContext">
              <property name="contextPath" value="/admin"/>
              <property name="resourceBase" value="${activemq.home}/webapps/admin"/>
              <property name="logUrlOnStart" value="true"/>
            </bean>
            <!-- /fileserver -->
            <bean class="org.eclipse.jetty.webapp.WebAppContext">
              <property name="contextPath" value="/fileserver"/>
              <property name="resourceBase" value="${activemq.home}/webapps/fileserver"/>
              <property name="logUrlOnStart" value="true"/>
              <property name="parentLoaderPriority" value="true"/>
            </bean>
            <!-- /api -->
            <bean class="org.eclipse.jetty.webapp.WebAppContext">
              <property name="contextPath" value="/api"/>
              <property name="resourceBase" value="${activemq.home}/webapps/api"/>
              <property name="logUrlOnStart" value="true"/>
            </bean>
            <!-- Static resources under webapps/, directory listing disabled -->
            <bean class="org.eclipse.jetty.server.handler.ResourceHandler">
              <property name="directoriesListed" value="false"/>
              <property name="welcomeFiles">
                <list>
                  <value>index.html</value>
                </list>
              </property>
              <property name="resourceBase" value="${activemq.home}/webapps/"/>
            </bean>
            <bean id="defaultHandler"
                  class="org.eclipse.jetty.server.handler.DefaultHandler">
              <property name="serveIcon" value="false"/>
            </bean>
          </list>
        </property>
      </bean>
    </property>
  </bean>

  <!-- Redirects /api/jolokia... to /hawtio/jolokia... -->
  <bean id="rewrite"
        class="org.eclipse.jetty.rewrite.handler.RewriteHandler">
    <property name="rules">
      <set>
        <bean class="org.eclipse.jetty.rewrite.handler.RedirectRegexRule">
          <property name="regex" value="/api/jolokia(.*)"/>
          <property name="replacement" value="/hawtio/jolokia$1"/>
        </bean>
      </set>
    </property>
  </bean>

  <bean id="contexts"
        class="org.eclipse.jetty.server.handler.ContextHandlerCollection"/>

  <!-- The Jetty server itself; started and stopped together with the Spring context -->
  <bean id="Server"
        class="org.eclipse.jetty.server.Server"
        init-method="start"
        destroy-method="stop">
    <property name="connectors">
      <list>
        <bean id="Connector"
              class="org.eclipse.jetty.server.nio.SelectChannelConnector">
          <property name="port" value="8161"/>
        </bean>
        <!--
          Enable this connector if you wish to use https with web console
        -->
        <!--
          <bean id="SecureConnector" class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector">
            <property name="port" value="8162" />
            <property name="keystore" value="file:${activemq.conf}/broker.ks" />
            <property name="password" value="password" />
          </bean>
        -->
      </list>
    </property>
    <property name="handler">
      <bean id="handlers"
            class="org.eclipse.jetty.server.handler.HandlerCollection">
        <property name="handlers">
          <list>
            <ref bean="rewrite"/>
            <ref bean="contexts"/>
            <ref bean="securityHandler"/>
          </list>
        </property>
      </bean>
    </property>
  </bean>
</beans>
\ No newline at end of file
Propchange: sling/trunk/contrib/commons/mom/jms/src/main/resources/org/apache/sling/amq/jetty.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/contrib/commons/mom/jms/src/main/resources/org/apache/sling/amq/jetty.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml
Added: sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/amq/ActiveMQConnectionFactoryServiceTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/amq/ActiveMQConnectionFactoryServiceTest.java?rev=1754255&view=auto
==============================================================================
--- sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/amq/ActiveMQConnectionFactoryServiceTest.java (added)
+++ 
sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/amq/ActiveMQConnectionFactoryServiceTest.java Wed Jul 27 12:10:12 2016 @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sling.amq; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.jms.*; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.*; + +/** + * Created by ieb on 31/03/2016. 
+ */ +public class ActiveMQConnectionFactoryServiceTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQConnectionFactoryServiceTest.class); + + @Test + public void testGetConnectionFactory() throws Exception { + LOGGER.info("Starting test"); + ActiveMQConnectionFactoryService cfs = ActiveMQConnectionFactoryServiceTest.activate(null); + ConnectionFactory cf = cfs.getConnectionFactory(); + Connection connection = cf.createConnection(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Topic t = session.createTopic("testTopic"); + MessageConsumer consumer = session.createConsumer(t); + LOGGER.info("Starting connection"); + connection.start(); + LOGGER.info("Connection started.. sending message"); + session.createProducer(t).send(session.createTextMessage("testing with a message")); + session.commit(); + LOGGER.info("Message sent ... receiving message"); + Message m = consumer.receive(); + LOGGER.info("Message received"); + assertTrue(m instanceof TextMessage); + assertEquals("testing with a message", ((TextMessage)m).getText()); + session.close(); + connection.stop(); + + deactivate(cfs); + } + + public static void deactivate(@Nonnull ActiveMQConnectionFactoryService cfs) { + cfs.deactivate(new HashMap<String, Object>()); + } + + @Nonnull + public static ActiveMQConnectionFactoryService activate(@Nullable Map<String, Object> props) { + ActiveMQConnectionFactoryService amqConnectionFactoryService = new ActiveMQConnectionFactoryService(); + if ( props == null ) { + props = new HashMap<String, Object>(); + props.put(ActiveMQConnectionFactoryService.BROKER_URI, ActiveMQConnectionFactoryService.DEFAULT_BROKER_URI); + } + amqConnectionFactoryService.activate(props); + return amqConnectionFactoryService; + } + +} \ No newline at end of file Propchange: sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/amq/ActiveMQConnectionFactoryServiceTest.java 
------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java (added) +++ sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java Wed Jul 27 12:10:12 2016 @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sling.jms; + +import org.apache.sling.amq.ActiveMQConnectionFactoryService; +import org.apache.sling.amq.ActiveMQConnectionFactoryServiceTest; +import org.apache.sling.mom.*; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.osgi.framework.Bundle; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.*; +import java.lang.reflect.Field; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.*; + +/** + * Created by ieb on 01/04/2016. + */ +public class JMSQueueManagerTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(JMSQueueManagerTest.class); + private ActiveMQConnectionFactoryService amqConnectionFactoryService; + private JMSQueueManager jmsQueueManager; + private Map<String, Object> testMap; + private boolean passed; + private int ndeliveries; + + @Mock + private ServiceReference<QueueReader> serviceReference; + @Mock + private Bundle bundle; + @Mock + private BundleContext bundleContext; + private Map<String, Object> serviceProperties = new HashMap<String, Object>(); + + public JMSQueueManagerTest() { + MockitoAnnotations.initMocks(this); + } + + @Before + public void setUp() throws Exception { + Mockito.when(serviceReference.getBundle()).thenReturn(bundle); + Mockito.when(bundle.getBundleContext()).thenReturn(bundleContext); + Mockito.when(serviceReference.getPropertyKeys()).thenAnswer(new Answer<String[]>() { + @Override + public String[] answer(InvocationOnMock invocationOnMock) throws Throwable { + return (String[]) serviceProperties.keySet().toArray(new String[serviceProperties.size()]); + } + }); + 
Mockito.when(serviceReference.getProperty(Mockito.anyString())).thenAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + return serviceProperties.get(invocationOnMock.getArguments()[0]); + } + }); + amqConnectionFactoryService = ActiveMQConnectionFactoryServiceTest.activate(null); + jmsQueueManager = JMSQueueManagerTest.activate(amqConnectionFactoryService); + testMap = JsonTest.createTestMap(); + passed = false; + + } + + + private static JMSQueueManager activate(ActiveMQConnectionFactoryService amqConnectionFactoryService) throws NoSuchFieldException, IllegalAccessException, JMSException { + JMSQueueManager jmsQueueManager = new JMSQueueManager(); + setPrivate(jmsQueueManager, "connectionFactoryService", amqConnectionFactoryService); + jmsQueueManager.activate(new HashMap<String, Object>()); + return jmsQueueManager; + + } + + private static void setPrivate(Object object, String name, Object value) throws NoSuchFieldException, IllegalAccessException { + Field field = object.getClass().getDeclaredField(name); + if ( !field.isAccessible()) { + field.setAccessible(true); + } + field.set(object, value); + } + + + @After + public void after() throws JMSException { + JMSQueueManagerTest.deactivate(jmsQueueManager); + ActiveMQConnectionFactoryServiceTest.deactivate(amqConnectionFactoryService); + } + + public static void deactivate(JMSQueueManager jmsQueueManager) throws JMSException { + jmsQueueManager.deactivate(new HashMap<String, Object>()); + } + + @Test + public void testQueue() throws JMSException, InterruptedException { + // clean the queue out of messages from earlier tests, which may have failed. + final String queueName = "testQueueReject"; + + emptyQueue(queueName); + // make the test map unique. 
+ testMap.put("testing", queueName + System.currentTimeMillis()); + jmsQueueManager.add(Types.queueName(queueName), testMap); + + checkMessagesInQueue(queueName, 1); + + ndeliveries = 0; + QueueReader queueReader = new QueueReader() { + @Override + public void onMessage(Types.QueueName queueName, Map<String, Object> message) throws RequeueMessageException { + ndeliveries++; + JsonTest.checkEquals(testMap, message); + passed = true; + } + }; + + serviceProperties.clear(); + serviceProperties.put(QueueReader.QUEUE_NAME_PROP, queueName); + + Mockito.when(bundleContext.getService(Mockito.eq(serviceReference))).thenReturn(queueReader); + jmsQueueManager.addReader(serviceReference); + + + + + waitForPassed(1000); + checkMessagesInQueue(queueName, 0); + waitForErrors(1000); + + jmsQueueManager.removeReader(serviceReference); + assertEquals(1, ndeliveries); + + + } + + + private void waitForErrors(long t) throws InterruptedException { + Thread.sleep(t); + } + + private boolean waitForPassed(long t) { + long end = System.currentTimeMillis() + t; + while(System.currentTimeMillis() < end) { + if (passed) { + return true; + } else { + Thread.yield(); + } + } + LOGGER.info("Message not received after " + t + " ms"); + return false; + } + + private void checkMessagesInQueue(String name, int expected) throws JMSException { + Connection connection = amqConnectionFactoryService.getConnectionFactory().createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name); + QueueBrowser browser = session.createBrowser(queue); + int n = 0; + for(Enumeration e = browser.getEnumeration(); e.hasMoreElements(); ) { + Message m = (Message) e.nextElement(); + LOGGER.info("Message at {} is {} ", n,m); + n++; + } + browser.close(); + session.close(); + connection.stop(); + assertEquals(expected, n); + } + + @Test + public void testQueueReject() throws JMSException, InterruptedException { + // clean 
the queue out of messages from earlier tests, which may have failed. + final String queueName = "testQueueReject"; + emptyQueue(queueName); + // make the test map unique, if the dequeue fails, then the message wont be the first. + testMap.put("testing", queueName + System.currentTimeMillis()); + LOGGER.info("Sending message to queue"); + jmsQueueManager.add(Types.queueName(queueName), testMap); + LOGGER.info("Sent message to queue ... receiving from queue"); + + checkMessagesInQueue(queueName, 1); + + ndeliveries = 0; + QueueReader queueReader = new QueueReader() { + @Override + public void onMessage(Types.QueueName queueName, Map<String, Object> message) throws RequeueMessageException { + JsonTest.checkEquals(testMap, message); + ndeliveries++; + if ( ndeliveries == 1) { + LOGGER.info("Requesting requeue of message"); + throw new RequeueMessageException("Requeing"); + } else if ( ndeliveries == 2) { + LOGGER.info("Got message, accepting with no retry."); + passed = true; + } else if ( ndeliveries > 2) { + fail("Multiple delivered"); + } + } + }; + + serviceProperties.clear(); + serviceProperties.put(QueueReader.QUEUE_NAME_PROP, queueName); + + Mockito.when(bundleContext.getService(Mockito.eq(serviceReference))).thenReturn(queueReader); + jmsQueueManager.addReader(serviceReference); + + + + waitForPassed(30000); + + jmsQueueManager.removeReader(serviceReference); + checkMessagesInQueue(queueName, 0); + assertEquals(2, ndeliveries); + + + } + + private void dumpQueue(String name) throws JMSException { + Connection connection = amqConnectionFactoryService.getConnectionFactory().createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(name); + QueueBrowser browser = session.createBrowser(queue); + LOGGER.info("Starting to dump queue {} ", name); + int n = 0; + for ( Enumeration messages = browser.getEnumeration(); messages.hasMoreElements(); ) { + Message m = (Message) 
messages.nextElement(); + LOGGER.info("Message at {} is {} ", n, m); + n++; + } + LOGGER.info("Done dump queue {} ", name); + browser.close(); + session.close(); + connection.stop(); + + } + + private void emptyQueue(String name) throws JMSException { + dumpQueue(name); + Connection connection = amqConnectionFactoryService.getConnectionFactory().createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(name); + MessageConsumer consumer = session.createConsumer(queue); + + for (;;) { + Message m = consumer.receive(100); + if ( m == null) { + LOGGER.info("No more messages in queue {} ", name); + break; + } + LOGGER.info("Got message {}",m); + m.acknowledge(); + session.commit(); + } + boolean shouldFail = false; + QueueBrowser browser = session.createBrowser(queue); + for ( Enumeration messages = browser.getEnumeration(); messages.hasMoreElements(); ) { + Message m = (Message) messages.nextElement(); + LOGGER.info("Queued message {} ", m); + shouldFail = true; + } + browser.close(); + if ( shouldFail) { + fail("Queue was not emptied as expected"); + } + consumer.close(); + session.close(); + connection.stop(); + } + +} \ No newline at end of file Propchange: sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/jms/JMSTopicManagerTest.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/jms/JMSTopicManagerTest.java?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/jms/JMSTopicManagerTest.java (added) +++ 
sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/jms/JMSTopicManagerTest.java Wed Jul 27 12:10:12 2016 @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.jms; + +import org.apache.sling.amq.ActiveMQConnectionFactoryService; +import org.apache.sling.amq.ActiveMQConnectionFactoryServiceTest; +import org.apache.sling.mom.*; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.osgi.framework.Bundle; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.JMSException; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.*; + +/** + * Created by ieb on 31/03/2016. 
+ */ +public class JMSTopicManagerTest { + + private static final long MESSAGE_LATENCY = 1000; + private static final Logger LOGGER = LoggerFactory.getLogger(JMSTopicManagerTest.class); + private JMSTopicManager jsmTopicManager; + private ActiveMQConnectionFactoryService amqConnectionFactoryService; + private Map<String, Object> testMap; + private boolean passed; + private long lastSent; + @Mock + private ServiceReference<Subscriber> serviceReference; + @Mock + private Bundle bundle; + @Mock + private BundleContext bundleContext; + private Map<String, Object> serviceProperties = new HashMap<String, Object>(); + + public JMSTopicManagerTest() { + MockitoAnnotations.initMocks(this); + } + + @Before + public void before() throws NoSuchFieldException, IllegalAccessException, JMSException { + Mockito.when(serviceReference.getBundle()).thenReturn(bundle); + Mockito.when(bundle.getBundleContext()).thenReturn(bundleContext); + Mockito.when(serviceReference.getPropertyKeys()).thenAnswer(new Answer<String[]>() { + @Override + public String[] answer(InvocationOnMock invocationOnMock) throws Throwable { + return (String[]) serviceProperties.keySet().toArray(new String[serviceProperties.size()]); + } + }); + Mockito.when(serviceReference.getProperty(Mockito.anyString())).thenAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + return serviceProperties.get(invocationOnMock.getArguments()[0]); + } + }); + amqConnectionFactoryService = ActiveMQConnectionFactoryServiceTest.activate(null); + jsmTopicManager = JMSTopicManagerTest.activate(amqConnectionFactoryService); + testMap = JsonTest.createTestMap(); + passed = false; + } + + public static JMSTopicManager activate(ActiveMQConnectionFactoryService amqConnectionFactoryService) throws NoSuchFieldException, IllegalAccessException, JMSException { + JMSTopicManager jsmTopicManager = new JMSTopicManager(); + setPrivate(jsmTopicManager, "connectionFactoryService", 
amqConnectionFactoryService); + jsmTopicManager.activate(new HashMap<String, Object>()); + return jsmTopicManager; + + } + + private static void setPrivate(Object object, String name, Object value) throws NoSuchFieldException, IllegalAccessException { + Field field = object.getClass().getDeclaredField(name); + if ( !field.isAccessible()) { + field.setAccessible(true); + } + field.set(object, value); + } + + @After + public void after() throws JMSException { + JMSTopicManagerTest.deactivate(jsmTopicManager); + ActiveMQConnectionFactoryServiceTest.deactivate(amqConnectionFactoryService); + } + + public static void deactivate(JMSTopicManager jsmTopicManager) throws JMSException { + jsmTopicManager.deactivate(new HashMap<String, Object>()); + } + + + /** + * Test a working publish operation, read the message and check all ok. Will try and read the message for 1s. Normally messages + * arrive within 15ms. + * @throws Exception + */ + @Test + public void testPublish() throws Exception { + // make the test map unique. + testMap.put("testing", "testPublish" + System.currentTimeMillis()); + + addSubscriber(new String[]{"testtopic"}, true); + + jsmTopicManager.publish(Types.topicName("testtopic"), Types.commandName("testcommand"), testMap); + lastSent = System.currentTimeMillis(); + assertTrue(waitForPassed(MESSAGE_LATENCY)); + + removeSubscriber(); + } + + + private void addSubscriber(String[] topics, boolean match) { + + Subscriber subscriber = new TestingSubscriber(this, match, topics); + + serviceProperties.clear(); + serviceProperties.put(Subscriber.TOPIC_NAMES_PROP, topics); + + Mockito.when(bundleContext.getService(Mockito.eq(serviceReference))).thenReturn(subscriber); + jsmTopicManager.addSubscriber(serviceReference); + + } + + private void removeSubscriber() { + jsmTopicManager.removeSubscriber(serviceReference); + } + + + /** + * Test that a message sent with the wrong topic doesn't arrive, filtered by the topic inside the jmsTopicManager. 
+ * @throws Exception + */ + @Test + public void testFilterdByTopic() throws Exception { + // make the test map unique. + testMap.put("testing", "testFilterdByTopic" + System.currentTimeMillis()); + addSubscriber(new String[]{"testtopic"}, false); + + lastSent = System.currentTimeMillis(); + assertFalse(waitForPassed(MESSAGE_LATENCY)); // not expecting a message at all + + removeSubscriber(); + } + + /** + * Check that a message sent to the correct topic is filtered by the MessageFilter. + * The test waits 1s for the message to arrive. If testPublish does not fail, message + * latency is < 1s. + * @throws Exception + */ + @Test + public void testFilterdByFilter() throws Exception { + // make the test map unique. + testMap.put("testing", "testFilterdByFilter" + System.currentTimeMillis()); + addSubscriber(new String[]{"testtopic"}, false); + + jsmTopicManager.publish(Types.topicName("testtopic"), Types.commandName("testcommand"), testMap); + lastSent = System.currentTimeMillis(); + assertFalse(waitForPassed(MESSAGE_LATENCY)); // not expecting a message at all + + removeSubscriber(); + } + + + private boolean waitForPassed(long t) { + long end = System.currentTimeMillis() + t; + while(System.currentTimeMillis() < end) { + if (passed) { + return true; + } else { + Thread.yield(); + } + } + LOGGER.info("Message not recieved after "+t+" ms"); + return false; + } + + + private static class TestingSubscriber implements Subscriber, MessageFilter { + private JMSTopicManagerTest test; + private final boolean accept; + private final Set<Types.Name> topicnames; + + public TestingSubscriber(JMSTopicManagerTest test, boolean accept, String[] topicname) { + this.test = test; + this.accept = accept; + this.topicnames = new HashSet<Types.Name>(); + for(String t : topicname) { + topicnames.add(Types.topicName(t)); + } + } + + @Override + public void onMessage(Types.TopicName topic, Map<String, Object> message) { + LOGGER.info("Got message in 
"+(System.currentTimeMillis()-test.lastSent)+" ms"); + JsonTest.checkEquals(test.testMap, message); + test.passed = true; + } + + @Override + public boolean accept(Types.Name name, Map<String, Object> mapMessage) { + return topicnames.contains(name) == accept; + } + + + } +} \ No newline at end of file Propchange: sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/jms/JMSTopicManagerTest.java ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/jms/JsonTest.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/jms/JsonTest.java?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/jms/JsonTest.java (added) +++ sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/jms/JsonTest.java Wed Jul 27 12:10:12 2016 @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sling.jms; + +import org.junit.Before; +import org.junit.Test; + +import java.util.*; + +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created by ieb on 31/03/2016. + */ +public class JsonTest { + + + private static final Logger LOGGER = LoggerFactory.getLogger(JsonTest.class); + private Map<String, Object> testMap; + + @Before + public void setup() { + testMap = JsonTest.createTestMap(); + } + + public static Map<String,Object> createTestMap() { + Map<String, Object> testMap = new HashMap<String, Object>(); + Map<String, Object> innerTestMap = new HashMap<String, Object>(); + Map<String, Object> inner2TestMap = new HashMap<String, Object>(); + Map<String, Object> listMap = new HashMap<String, Object>(); + + listMap.put("listMaplong",100L); + listMap.put("listMapboolean",true); + listMap.put("listMapstring","A String"); + listMap.put("listMapdouble",1.001D); + + testMap.put("long",100L); + testMap.put("boolean",true); + testMap.put("string","A String"); + testMap.put("double",1.001D); + testMap.put("map",innerTestMap); + innerTestMap.put("innerlong",100L); + innerTestMap.put("innerboolean",true); + innerTestMap.put("innerstring","A String"); + innerTestMap.put("innerdouble",1.001D); + innerTestMap.put("innermap",inner2TestMap); + inner2TestMap.put("inner2long",100L); + inner2TestMap.put("inner3boolean",true); + inner2TestMap.put("inner3string","A String"); + inner2TestMap.put("inner3double",1.001D); + inner2TestMap.put("inner3list", Arrays.asList("string1","string2", "string2")); + inner2TestMap.put("inner3listofMaps", Arrays.asList(listMap, listMap, listMap)); + return testMap; + } + + @Test + public void testJson() throws Exception { + checkEquals(testMap, Json.toMap(Json.toJson(testMap))); + + } + + public static void checkEquals(Map<String, Object> expected, Map<String, Object> actual) { + LOGGER.info("Expected {}", expected); + LOGGER.info("Actual {}", actual); + for(Map.Entry<String, 
Object> e : expected.entrySet()) { + if ( e.getValue() instanceof Map ) { + checkEquals((Map<String, Object>) e.getValue(), (Map<String, Object>) actual.get(e.getKey())); + } else if ( e.getValue() instanceof List ) { + checkEquals((List<Object>) e.getValue(), (List<Object>) actual.get(e.getKey())); + } else { + if ( e.getValue() == null && actual.get(e.getKey()) != null ) { + LOGGER.info("Expected value for {} is null but actual is {} ", e.getKey(), actual.get(e.getKey())); + } + if ( e.getValue() != null && !e.getValue().equals(actual.get(e.getKey()))) { + LOGGER.info("Expected value for {} is {} but actual is {}", new Object[]{e.getKey(), e.getValue(), actual.get(e.getKey())}); + LOGGER.info("Expected value for {} is {} but actual is {}", new Object[]{e.getKey(), e.getValue().getClass(), actual.get(e.getKey()).getClass()}); + } + Assert.assertEquals(e.getValue(), actual.get(e.getKey())); + } + } + LOGGER.info("Maps equal ok"); + } + + private static void checkEquals(List<Object> expected, List<Object> actual) { + Assert.assertEquals(expected.size(), actual.size()); + for (int i = 0; i < expected.size(); i++) { + Object e = expected.get(i); + Object a = actual.get(i); + if ( e instanceof Map ) { + checkEquals((Map<String, Object>) e, (Map<String, Object>) a); + } else if ( e instanceof List ) { + checkEquals((List<Object>) e, (List<Object>) a); + } else { + Assert.assertEquals(e,a); + } + } + } + +} \ No newline at end of file Propchange: sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/jms/JsonTest.java ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/commons/mom/pom.xml URL: http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/pom.xml?rev=1754255&view=auto ============================================================================== --- sling/trunk/contrib/commons/mom/pom.xml (added) +++ sling/trunk/contrib/commons/mom/pom.xml Wed Jul 27 12:10:12 2016 @@ 
-0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent> <!-- Parent POM is org.apache.sling:sling:22. NOTE(review): no relativePath is given, so Maven's default of ../pom.xml applies; confirm that is intended. -->
+        <groupId>org.apache.sling</groupId>
+        <artifactId>sling</artifactId>
+        <version>22</version>
+    </parent>
+
+    <groupId>org.apache.sling</groupId>
+    <artifactId>org.apache.sling.mom.reactor</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <packaging>pom</packaging> <!-- Reactor POM: packaging is pom and the only content is the module list below. -->
+    <name>Apache Sling MoM Reactor</name>
+
+    <modules> <!-- Module directories, given relative to the directory of this POM. -->
+        <module>jms</module>
+        <module>api</module>
+        <module>examples/jobs/core</module>
+        <module>examples/jobs/it-services</module>
+    </modules>
+</project>
Propchange: sling/trunk/contrib/commons/mom/pom.xml
------------------------------------------------------------------------------
    svn:eol-style = native
Propchange: sling/trunk/contrib/commons/mom/pom.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml
Modified: sling/trunk/contrib/crankstart/launcher/src/main/java/org/apache/sling/crankstart/junit/CrankstartSetup.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/crankstart/launcher/src/main/java/org/apache/sling/crankstart/junit/CrankstartSetup.java?rev=1754255&r1=1754254&r2=1754255&view=diff
==============================================================================
--- sling/trunk/contrib/crankstart/launcher/src/main/java/org/apache/sling/crankstart/junit/CrankstartSetup.java (original)
+++ sling/trunk/contrib/crankstart/launcher/src/main/java/org/apache/sling/crankstart/junit/CrankstartSetup.java Wed Jul 27 12:10:12 2016
@@ -9,10 +9,7 @@
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
 import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
+import java.util.*;
import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpUriRequest;