Author: schor Date: Thu Dec 20 14:40:58 2018 New Revision: 1849399 URL: http://svn.apache.org/viewvc?rev=1849399&view=rev Log: [UIMA-5937] update pom version and scm, merge v2 updates, record merge
Added: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/PriorityMessageHandler.java - copied unchanged from r1846917, uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/PriorityMessageHandler.java uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateAnnotatorWithDOCTYPE.xml - copied unchanged from r1846917, uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateAnnotatorWithDOCTYPE.xml uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_AsyncAggregateWithJmsService.xml - copied unchanged from r1846917, uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_AsyncAggregateWithJmsService.xml uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWithTargetingSupport.xml - copied unchanged from r1846917, uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWithTargetingSupport.xml uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWithTargetingSupport.xml - copied unchanged from r1846917, uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWithTargetingSupport.xml Modified: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/ (props changed) uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/pom.xml uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java 
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/examples/as/RunRemoteAsyncAE.java uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpAnnotator.java uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpCC.java uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith2SecDelay.xml uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith5SecDelay.xml uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregate.xml Propchange: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Dec 20 14:40:58 2018 @@ -1,2 +1,3 @@ /uima/uima-as/branches/depend-on-parent-pom-4/uimaj-as-activemq:961335-961760 /uima/uima-as/branches/mavenAlign/uimaj-as-activemq:941450-944450 +/uima/uima-as/trunk/uimaj-as-activemq:1786000-1846917 Modified: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/pom.xml URL: http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/pom.xml?rev=1849399&r1=1849398&r2=1849399&view=diff 
============================================================================== --- uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/pom.xml (original) +++ uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/pom.xml Thu Dec 20 14:40:58 2018 @@ -15,7 +15,7 @@ <parent> <groupId>org.apache.uima</groupId> <artifactId>uima-as-parent</artifactId> - <version>3.0.0-SNAPSHOT</version> + <version>3.0.1-SNAPSHOT</version> <relativePath>../uima-as-parent/pom.xml</relativePath> </parent> @@ -31,13 +31,13 @@ cutting/pasting the <scm> element, and just changing the following two properties --> <scm> <connection> - scm:svn:http://svn.apache.org/repos/asf/uima/uima-as/trunk/uimaj-as-activemq + scm:svn:http://svn.apache.org/repos/asf/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq </connection> <developerConnection> - scm:svn:https://svn.apache.org/repos/asf/uima/uima-as/trunk/uimaj-as-activemq + scm:svn:https://svn.apache.org/repos/asf/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq </developerConnection> <url> - http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq + http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq </url> </scm> Modified: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java URL: http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java?rev=1849399&r1=1849398&r2=1849399&view=diff ============================================================================== --- uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java (original) +++ uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java Thu Dec 20 14:40:58 2018 @@ -30,12 +30,10 @@ import javax.jms.Message; import javax.jms.Session; import org.apache.uima.UIMAFramework; -import 
org.apache.uima.aae.InProcessCache.CacheEntry; import org.apache.uima.aae.UIMAEE_Constants; import org.apache.uima.aae.UimaAsThreadFactory; import org.apache.uima.aae.UimaBlockingExecutor; import org.apache.uima.aae.controller.AggregateAnalysisEngineController; -import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl; import org.apache.uima.aae.controller.AnalysisEngineController; import org.apache.uima.aae.controller.LocalCache.CasStateEntry; import org.apache.uima.aae.delegate.Delegate; Modified: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java URL: http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=1849399&r1=1849398&r2=1849399&view=diff ============================================================================== --- uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java (original) +++ uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java Thu Dec 20 14:40:58 2018 @@ -227,7 +227,7 @@ public class JmsEndpointConnection_impl } if ( brokerDestinations.getConnection() != null ) { try { - // Close the connection to avoid leaks in the broker + // Close the connection to avoid leaks in the broker brokerDestinations.getConnection().close(); } catch( Exception e) { // Ignore exceptions on a close of a bad connection @@ -249,7 +249,9 @@ public class JmsEndpointConnection_impl ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri); // White list packages for deserialization factory.setTrustAllPackages(true); + factory.setConnectionIDPrefix("JmsOutputChannel"); factory.setWatchTopicAdvisories(false); + // Create shared jms connection to a broker conn = factory.createConnection(); 
factory.setDispatchAsync(true); @@ -755,7 +757,7 @@ public class JmsEndpointConnection_impl if ( command == AsynchAEMessage.ServiceInfo ) { return false; } - if ( (msgType == AsynchAEMessage.Response || msgType == AsynchAEMessage.Request ) && + if ( (msgType == AsynchAEMessage.Response || msgType == AsynchAEMessage.Request ) && command == AsynchAEMessage.Process ) { String casReferenceId=""; try { @@ -764,7 +766,7 @@ public class JmsEndpointConnection_impl String key = ""; String endpointName = ""; if ( delegateEndpoint != null ) { - delegateEndpoint.getDelegateKey(); + key = delegateEndpoint.getDelegateKey(); endpointName = ((ActiveMQDestination) delegateEndpoint.getDestination()) .getPhysicalName(); } @@ -781,20 +783,22 @@ public class JmsEndpointConnection_impl if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { - + String key = ""; String endpointName = ""; if ( delegateEndpoint != null ) { - delegateEndpoint.getDelegateKey(); + key = delegateEndpoint.getDelegateKey(); endpointName = ((ActiveMQDestination) delegateEndpoint.getDestination()) .getPhysicalName(); - } + + } if ( "Client".equals(target) ) { UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_service_delivery_to_client_exception__WARNING", new Object[] { controller.getComponentName(),endpointName }); } else { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_service_delivery_exception__WARNING",new Object[] { controller.getComponentName(), key, endpointName}); @@ -1021,8 +1025,8 @@ public class JmsEndpointConnection_impl } } else { UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), - "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAEE_service_delivery_exception__WARNING",new Object[] { controller.getComponentName(), "", endpointName}); + 
"UimaAsAsyncCallbackListener.onException()", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAEE_service_delivery_exception_WARNING",new Object[] { controller.getComponentName(), exception} ); } } Modified: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java URL: http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=1849399&r1=1849398&r2=1849399&view=diff ============================================================================== --- uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java (original) +++ uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java Thu Dec 20 14:40:58 2018 @@ -19,6 +19,7 @@ package org.apache.uima.adapter.jms.activemq; +import java.net.InetAddress; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -34,12 +35,12 @@ import javax.jms.Message; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.uima.UIMAFramework; import org.apache.uima.aae.InputChannel; import org.apache.uima.aae.UIMAEE_Constants; +import org.apache.uima.aae.client.UimaAsynchronousEngine; import org.apache.uima.aae.controller.AggregateAnalysisEngineController; import org.apache.uima.aae.controller.AnalysisEngineController; import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState; @@ -54,12 +55,12 @@ import org.apache.uima.aae.jmx.RemoteJMX import org.apache.uima.aae.jmx.ServiceInfo; import org.apache.uima.aae.message.AsynchAEMessage; import org.apache.uima.aae.message.MessageContext; +import org.apache.uima.aae.message.MessageWrapper; import 
org.apache.uima.aae.message.UIMAMessage; import org.apache.uima.adapter.jms.JmsConstants; import org.apache.uima.adapter.jms.message.JmsMessageContext; import org.apache.uima.util.Level; import org.springframework.jms.listener.SessionAwareMessageListener; -import org.springframework.jms.support.destination.DestinationResolver; /** * Thin adapter for receiving JMS messages from Spring. It delegates processing of all messages to @@ -558,6 +559,19 @@ public class JmsInputChannel implements validEndpoint(messageContext) && isReplyRequired((Message)messageContext.getRawMessage()) ); } + public void onMessage(MessageWrapper wrapper) { + try { + onMessage((Message)wrapper.getMessage(), (Session)wrapper.getSession()); + } finally { + // semaphore is only added by target and process listeners. The semaphore is used + // to throttle work into the service. A JMS thread blocks if semaphore permits are + // exhausted. The blocking is done in PriorityMessageHandler class. The JmsInputChannel + // and PriorityMessageHandler are the only classes operating on the semaphore. + if ( wrapper.getSemaphore() != null ) { + wrapper.getSemaphore().release(); + } + } + } /** * Receives Messages from the JMS Provider. It checks the message header to determine the type of * message received. 
Based on the type, a MessageContext is created to facilitate access to the @@ -810,10 +824,11 @@ public class JmsInputChannel implements } return ll; } - public synchronized void setListenerContainer(UimaDefaultMessageListenerContainer messageListener) { - this.messageListener = messageListener; + public synchronized void setListenerContainer(UimaDefaultMessageListenerContainer jmsL) { + this.messageListener = jmsL; System.setProperty("BrokerURI", messageListener.getBrokerUrl()); - if ( messageListener.getMessageSelector() !=null && messageListener.getMessageSelector().equals("Command=2001") ) { + + if ( jmsL.isGetMetaListener() ) { brokerURL = messageListener.getBrokerUrl(); getController().getOutputChannel().setServerURI(brokerURL); } @@ -825,8 +840,9 @@ public class JmsInputChannel implements getController().addInputChannel(this); messageListener.setController(getController()); } catch (Exception e) { + e.printStackTrace(); } - } + } } public ActiveMQConnectionFactory getConnectionFactory() { @@ -1099,6 +1115,66 @@ public class JmsInputChannel implements new Object[] { getController().getComponentName(), connector.getEndpointName() }); } } + public void createListenerForTargetedMessages() throws Exception { + List<UimaDefaultMessageListenerContainer> listeners = + getListeners(); + // the TargetServiceId property value will become part of a jms selector. + String targetStringSelector = ""; + if ( System.getProperty(UimaAsynchronousEngine.TargetSelectorProperty) != null ) { + targetStringSelector = System.getProperty(UimaAsynchronousEngine.TargetSelectorProperty); + } else { + // the default selector is IP:PID + String ip = InetAddress.getLocalHost().getHostAddress(); + targetStringSelector = ip+":"+controller.getPID(); + } + // find a listener instance which handles Process requests. The targeted + // listener created here will share a Connection Factory and ThreadFactory. 
+ // + for(UimaDefaultMessageListenerContainer listener : listeners ) { + // Is this a Process listener instance? Check the selector + if ( listener.getMessageSelector().endsWith(UimaDefaultMessageListenerContainer.PROCESS_SELECTOR_SUFFIX) ) { + // this will be a dedicated listener which handles targeted messages + UimaDefaultMessageListenerContainer targetedListener = new UimaDefaultMessageListenerContainer(); + // setup jms selector + if ( getController().isCasMultiplier()) { + targetedListener.setMessageSelector(UimaAsynchronousEngine.TargetSelectorProperty+" = '"+targetStringSelector+"' AND"+UimaDefaultMessageListenerContainer.CM_PROCESS_SELECTOR_SUFFIX);//(Command=2000 OR Command=2002)"); + } else { + targetedListener.setMessageSelector(UimaAsynchronousEngine.TargetSelectorProperty+" = '"+targetStringSelector+"' AND"+UimaDefaultMessageListenerContainer.PROCESS_SELECTOR_SUFFIX);//(Command=2000 OR Command=2002)"); + } + + // use shared ConnectionFactory + targetedListener.setConnectionFactory(listener.getConnectionFactory()); + // mark the listener as a 'Targeted' listener + targetedListener.setTargetedListener(); + targetedListener.setController(getController()); + // there will only be one AMQ delivery thread. Its job will be to + // add a targeted message to a BlockingQueue. Such thread will block + // in an enqueue if a dequeue is not available. This will be prevent + // the overwhelming the service with messages. + targetedListener.setConcurrentConsumers(1); + if ( listener.getMessageListener() instanceof PriorityMessageHandler ) { + // the targeted listener will use the same message handler as the + // Process listener. This handler will add a message wrapper + // to enable prioritizing messages. 
+ targetedListener.setMessageListener(listener.getMessageListener()); + } + // Same queue as the Process queue + targetedListener.setDestination(listener.getDestination()); + registerListener(targetedListener); + targetedListener.afterPropertiesSet(); + targetedListener.initialize(); + targetedListener.start(); + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), + "createListenerForTargetedMessages", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_TARGET_LISTENER__INFO", + new Object[] {targetedListener.getMessageSelector(), controller.getComponentName() }); + } + break; + + } + } + } public void createListener(String aDelegateKey, Endpoint endpointToUpdate) throws Exception { if (getController() instanceof AggregateAnalysisEngineController) { Delegate delegate = ((AggregateAnalysisEngineController) getController()) Modified: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java URL: http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1849399&r1=1849398&r2=1849399&view=diff ============================================================================== --- uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java (original) +++ uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java Thu Dec 20 14:40:58 2018 @@ -25,16 +25,16 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.net.ConnectException; import java.net.InetAddress; -import java.util.ArrayList; -import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -74,8 +74,8 @@ import org.apache.uima.aae.jmx.ServicePe import org.apache.uima.aae.message.AsynchAEMessage; import org.apache.uima.aae.message.UIMAMessage; import org.apache.uima.aae.monitor.Monitor; -import org.apache.uima.aae.monitor.statistics.LongNumericStatistic; import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics; +import org.apache.uima.aae.monitor.statistics.LongNumericStatistic; import org.apache.uima.adapter.jms.JmsConstants; import org.apache.uima.cas.CAS; import org.apache.uima.cas.SerialFormat; @@ -83,14 +83,6 @@ import org.apache.uima.cas.impl.XmiSeria import org.apache.uima.resource.metadata.ProcessingResourceMetaData; import org.apache.uima.util.Level; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import org.apache.uima.resource.ResourceProcessException; - - import com.thoughtworks.xstream.XStream; public class JmsOutputChannel implements OutputChannel { @@ -140,6 +132,8 @@ public class JmsOutputChannel implements private Semaphore connectionSemaphore = new Semaphore(1); public JmsOutputChannel() { + UimaSerializer.initXStream(xstream); + try { if( System.getenv("IP") != null ) { hostIP = System.getenv("IP"); @@ -364,16 +358,17 @@ public class JmsOutputChannel implements private void invalidateConnectionAndEndpoints(BrokerConnectionEntry brokerConnectionEntry ) { Connection conn = brokerConnectionEntry.getConnection(); try { - if ( conn != null && ((ActiveMQConnection)conn).isClosed()) { + if ( conn != null) + if ( conn != null && !((ActiveMQConnection)conn).isClosed()) { for (Entry<Object, 
JmsEndpointConnection_impl> endpoints : brokerConnectionEntry.endpointMap .entrySet()) { endpoints.getValue().close(); // close session and producer } - brokerConnectionEntry.getConnection().stop(); brokerConnectionEntry.getConnection().close(); brokerConnectionEntry.setConnection(null); } } catch (Exception e) { + e.printStackTrace(); // Ignore this for now. Attempting to close connection that has been closed // Ignore we are shutting down } finally { @@ -648,7 +643,6 @@ public class JmsOutputChannel implements connectionSemaphore.release(); } - //System.out.println("+++++++++++++++++++++ ConnectionMap Size:"+connectionMap.size()); return endpointConnection; } @@ -850,10 +844,21 @@ public class JmsOutputChannel implements // If this service is a Cas Multiplier add to the message a FreeCasQueue. // The client may need send Stop request to that queue. if (aCommand == AsynchAEMessage.ServiceInfo - && getAnalysisEngineController().isCasMultiplier() && freeCASTempQueue != null) { - // Attach a temp queue to the outgoing message. This a queue where - // Free CAS notifications need to be sent from the client - tm.setJMSReplyTo(freeCASTempQueue); + && getAnalysisEngineController().isCasMultiplier() ) { + if ( freeCASTempQueue != null ) { + // Attach a temp queue to the outgoing message. This a queue where + // Free CAS notifications need to be sent from the client + tm.setJMSReplyTo(freeCASTempQueue); + } + // new services will receive FreeCas request via a targeted queue + StringBuffer selector = new StringBuffer(). + append("TargetServiceId = "). + append("'").append(hostIP).append(":"). + append(getAnalysisEngineController().getPID()). + append("' AND"). 
+ append(UimaDefaultMessageListenerContainer.CM_PROCESS_SELECTOR_SUFFIX); + tm.setStringProperty(AsynchAEMessage.TargetingSelector,selector.toString()); + } // Check if there was a failure while sending a message if ( !endpointConnection.send(tm, 0, false, notifyOnJmsException) && notifyOnJmsException ) { @@ -1387,7 +1392,7 @@ public class JmsOutputChannel implements */ private void populateHeaderWithRequestContext(Message aMessage, Endpoint anEndpoint, int aCommand) throws Exception { - aMessage.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request); + aMessage.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request); aMessage.setIntProperty(AsynchAEMessage.Command, aCommand); // TODO override default based on system property aMessage.setBooleanProperty(AsynchAEMessage.AcceptsDeltaCas, true); Modified: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java URL: http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=1849399&r1=1849398&r2=1849399&view=diff ============================================================================== --- uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java (original) +++ uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java Thu Dec 20 14:40:58 2018 @@ -28,6 +28,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -38,6 +39,9 @@ import javax.jms.Destination; import 
javax.jms.ExceptionListener; import javax.jms.InvalidDestinationException; import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Session; import javax.jms.TemporaryQueue; import org.apache.activemq.ActiveMQConnection; @@ -46,8 +50,11 @@ import org.apache.activemq.command.Activ import org.apache.uima.UIMAFramework; import org.apache.uima.aae.InputChannel; import org.apache.uima.aae.UIMAEE_Constants; +import org.apache.uima.aae.UimaAsPriorityBasedThreadFactory; //import org.apache.uima.aae.UimaASCredentials; import org.apache.uima.aae.UimaAsThreadFactory; +import org.apache.uima.aae.client.UimaAsynchronousEngine; +//import org.apache.uima.aae.UimaAsThreadFactory.UsedFor; import org.apache.uima.aae.controller.AggregateAnalysisEngineController; import org.apache.uima.aae.controller.AnalysisEngineController; import org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState; @@ -57,21 +64,30 @@ import org.apache.uima.aae.delegate.Dele import org.apache.uima.aae.error.ErrorHandler; import org.apache.uima.aae.error.Threshold; import org.apache.uima.aae.error.handler.GetMetaErrorHandler; +import org.apache.uima.aae.message.AsynchAEMessage; +import org.apache.uima.aae.message.MessageWrapper; import org.apache.uima.adapter.jms.JmsConstants; +import org.apache.uima.adapter.jms.activemq.JmsOutputChannel.BrokerConnectionEntry; import org.apache.uima.resource.ResourceInitializationException; import org.apache.uima.util.Level; import org.springframework.core.task.TaskExecutor; import org.springframework.jms.JmsException; import org.springframework.jms.listener.AbstractJmsListeningContainer; import org.springframework.jms.listener.DefaultMessageListenerContainer; +import org.springframework.jms.listener.SessionAwareMessageListener; import org.springframework.jms.support.JmsUtils; import org.springframework.jms.support.destination.DestinationResolver; import 
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; public class UimaDefaultMessageListenerContainer extends DefaultMessageListenerContainer implements ExceptionListener { - private static final Class CLASS_NAME = UimaDefaultMessageListenerContainer.class; - + private static final Class<?> CLASS_NAME = UimaDefaultMessageListenerContainer.class; + public static final String PROCESS_SELECTOR_SUFFIX = "(Command=2000 OR Command=2002)"; + public static final String CM_PROCESS_SELECTOR_SUFFIX = "(Command=2000 OR Command=2002 OR Command=2005)"; + public static final String GETMETA_SELECTOR_SUFFIX = "(Command=2001)"; + + public static final int HIGH_PRIORITY = 9; + private String destinationName = ""; private Endpoint endpoint; @@ -125,6 +141,10 @@ public class UimaDefaultMessageListenerC private volatile boolean logListenerFailure=true; private static CountDownLatch recoveryLatch = new CountDownLatch(4); + // indicates if this listener is dedicated to pull targeted messages which are + // messages with a selector. + private boolean targetedListener = false; + public UimaDefaultMessageListenerContainer() { super(); // reset global static. This only effects unit testing as services are deployed @@ -143,12 +163,28 @@ public class UimaDefaultMessageListenerC this(); this.freeCasQueueListener = freeCasQueueListener; } + + public void setTargetedListener() { + targetedListener = true; + } + private static boolean connectionClosedOrFailed(ActiveMQConnection connection) { + if (connection == null + || connection.isClosed() + || connection.isClosing() + || connection.isTransportFailed()) { + return true; + } + return false; + } + /** * Overriden Spring's method that tries to recover from lost connection. We dont * want to recover when the service is stopping. 
*/ protected void refreshConnectionUntilSuccessful() { - boolean doLogFailureMsg = true; + // System.out.println("............refreshConnectionUntilSuccessful() called"); + + boolean doLogFailureMsg = true; try { // Only one listener thread should enter to recover lost connection. // Seems like spring recovery api is not reentrant. If multiple listeners @@ -156,6 +192,16 @@ public class UimaDefaultMessageListenerC // on observing jconsole attached to uima-as service with multiple listeners // on an endpoint. synchronized(UimaDefaultMessageListenerContainer.class ) { + ActiveMQConnection c = null;//(ActiveMQConnection)super.getSharedConnection(); + try { + c = (ActiveMQConnection)super.getSharedConnection(); + } catch( SharedConnectionNotInitializedException ee) { + // the onnectionClosedOrFailed(c) below will test for null + } + if ( !connectionClosedOrFailed(c) ) { + //System.out.println("............. Thread:"+Thread.currentThread().getId()+" Connection restored - returning"); + return; + } while (isRunning() && !terminating ) { Connection tcon = null; try { @@ -438,6 +484,7 @@ public class UimaDefaultMessageListenerC * @param t */ private void handleQueueFailure(Throwable t) { + // System.out.println("............handleQueueFailure() called"); final String endpointName = (getDestination() == null) ? 
"" : ((ActiveMQDestination) getDestination()).getPhysicalName(); if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { @@ -556,6 +603,10 @@ public class UimaDefaultMessageListenerC * This method is called by Spring when a listener fails */ protected void handleListenerSetupFailure(Throwable t, boolean alreadyHandled) { + if ( t.getCause() instanceof InterruptedException ) { +// System.out.println("............handleListenerFailure(Throwable t, boolean alreadyHandled) called - Cause:"+t); + return; + } // If shutdown already, nothing to do // If controller is stopping no need to recover the connection if (awaitingShutdown || terminating || (controller != null && controller.isStopped()) ) { @@ -646,10 +697,13 @@ public class UimaDefaultMessageListenerC } protected void handleListenerException(Throwable t) { + // System.out.println("............handleListenerException(Throwable t)"); + // Already shutdown, nothing to do if (awaitingShutdown) { return; } + /* String endpointName = (getDestination() == null) ? 
"" : ((ActiveMQDestination) getDestination()).getPhysicalName(); @@ -659,6 +713,7 @@ public class UimaDefaultMessageListenerC "UIMAJMS_jms_listener_failed_WARNING", new Object[] { endpointName, getBrokerUrl(), t }); } + */ super.handleListenerException(t); } @@ -676,15 +731,14 @@ public class UimaDefaultMessageListenerC super.setConnectionFactory(connectionFactory); } - private void injectTaskExecutor() { - super.setTaskExecutor(taskExecutor); - } - private boolean isGetMetaListener() { + public boolean isGetMetaListener() { + return getMessageSelector() != null - && __listenerRef.getMessageSelector().equals("Command=2001"); + && __listenerRef.getMessageSelector().endsWith(GETMETA_SELECTOR_SUFFIX); +// && __listenerRef.getMessageSelector().endsWith("(Command=2001)"); } - + private boolean isActiveMQDestination() { return getDestination() != null && getDestination() instanceof ActiveMQDestination; } @@ -728,8 +782,10 @@ public class UimaDefaultMessageListenerC **/ public void setMessageListener(Object messageListener) { ml = messageListener; - if (this.freeCasQueueListener) { + if (this.freeCasQueueListener || targetedListener ) { super.setMessageListener(messageListener); + } else if ( endpoint != null && endpoint.isTempReplyDestination()) { + super.setMessageListener(messageListener); } } public void afterPropertiesSet() { @@ -783,13 +839,18 @@ public class UimaDefaultMessageListenerC pluginThreadPool = true; } } else { - super.setConcurrentConsumers(cc); + super.setConcurrentConsumers(1); + //if ( targetedListener ) { + //super.setConcurrentConsumers(1); + //} else { + // super.setConcurrentConsumers(cc); + //} pluginThreadPool = true; } Thread t = new Thread(threadGroup, new Runnable() { public void run() { Destination destination = __listenerRef.getDestination(); - try { + try { // Wait until the connection factory is injected by Spring while (connectionFactory == null) { try { @@ -816,7 +877,7 @@ public class UimaDefaultMessageListenerC } // Plug in connection 
Factory to Spring's Listener __listenerRef.injectConnectionFactory(); - + if ( pluginThreadPool ) { setUimaASThreadPoolExecutor(cc); } @@ -824,13 +885,13 @@ public class UimaDefaultMessageListenerC // Initialize the TaskExecutor. This call injects a custom Thread Pool into the // TaskExecutor provided in the spring xml. The custom thread pool initializes // an instance of AE in a dedicated thread - if ( getMessageSelector() != null && !isGetMetaListener()) { + if ( !isGetMetaListener()) { initializeTaskExecutor(cc); } - if ( threadPoolExecutor == null ) { - // Plug in TaskExecutor to Spring's Listener - __listenerRef.injectTaskExecutor(); - } +// if ( threadPoolExecutor == null ) { +// // Plug in TaskExecutor to Spring's Listener +// __listenerRef.injectTaskExecutor(); +// } if ( propagate ) { // Notify Spring Listener that all properties are ready __listenerRef.allPropertiesSet(); @@ -863,7 +924,6 @@ public class UimaDefaultMessageListenerC "UIMAJMS_listener_ready__INFO", new Object[] {controller.getComponentName(), getBrokerUrl(), getDestination() }); } - } catch (Exception e) { UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(), @@ -892,7 +952,7 @@ public class UimaDefaultMessageListenerC // by Spring on a different thread while ((((JmsInputChannel) pojoListener).getController()) == null) { try { - Thread.currentThread().sleep(50); + Thread.currentThread().wait(50); } catch (Exception e) { } } @@ -956,7 +1016,7 @@ public class UimaDefaultMessageListenerC public void closeConnection() throws Exception { try { if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { - String msg = ".................... ection() Called"; + String msg = ".................... 
closeConnection() Called"; UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "closeConnection", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST", new Object[] { msg }); @@ -1041,6 +1101,7 @@ public class UimaDefaultMessageListenerC if (awaitingShutdown) { return; } + String endpointName = (getDestination() == null) ? "" : ((ActiveMQDestination) getDestination()).getPhysicalName(); @@ -1081,13 +1142,49 @@ public class UimaDefaultMessageListenerC public void doDestroy() { super.destroy(); } + /** + * Called by Spring + */ public void setMessageSelector( String messageSelector) { + if ( !messageSelector.startsWith(UimaAsynchronousEngine.TargetSelectorProperty)) { + messageSelector = UimaAsynchronousEngine.TargetSelectorProperty+" is NULL AND("+messageSelector+")"; + } + //this.doInvokeListene super.setMessageSelector(messageSelector); // turn off auto startup. Selectors are only used on input queues. We dont // want listeners on this queue to start now. Once the service initializes // we will start listeners on input queue. this.setAutoStartup(false); } + private boolean isProcessListener() { + return getMessageSelector().endsWith(PROCESS_SELECTOR_SUFFIX); + } + /** + * Callback called by Spring when it receives a messages. Its purpose is to assign high + * priority for targeted messages. 
+ */ + @SuppressWarnings("unchecked") + protected void doInvokeListener(@SuppressWarnings("rawtypes") SessionAwareMessageListener l, Session s, Message m) { + try { + if ( targetedListener ) { + m.setJMSPriority(HIGH_PRIORITY); + m.setJMSType("TargetMessage"); + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, CLASS_NAME.getName(), "doInvokeListener", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_MSG_INTERCEPTOR__FINEST", + new Object[] {controller.getComponentName(), m.getJMSPriority(), getMessageSelector() }); + + } + } + //System.out.println("................ doInvokeListener() - Thread ID:"+Thread.currentThread().getId()+" Listener Class:"+l.getClass().getName()+" Controller:"+controller.getComponentName()); + l.onMessage(m, s); + } catch( Throwable t ) { + t.printStackTrace(); + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, this.getClass().getName(), + "destroy", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_exception__WARNING", t); + } + } public void shutdownTaskExecutor(ThreadPoolExecutor tpe, boolean stopImmediate) throws InterruptedException { tpe.awaitTermination(50, TimeUnit.MILLISECONDS); @@ -1134,6 +1231,10 @@ public class UimaDefaultMessageListenerC amqc.stop(); } awaitingShutdown = true; + + if ( getMessageListener() instanceof PriorityMessageHandler ) { + ((PriorityMessageHandler)getMessageListener()).getQueue().put(new MessageWrapper(null, null, null,HIGH_PRIORITY)); + } if (taskExecutor != null && taskExecutor instanceof ThreadPoolTaskExecutor) { // Modify task executor to terminate idle threads. 
While the thread terminates // it calls destroy() method on the pinned instance of AE @@ -1159,6 +1260,12 @@ public class UimaDefaultMessageListenerC shutdownTaskExecutor(threadPoolExecutor, true); } } + if ( getTaskExecutor() != null ) { + + if ( getTaskExecutor() instanceof ThreadPoolTaskExecutor ) { + ((ThreadPoolTaskExecutor)getTaskExecutor()).shutdown(); + } + } String controllerName = (__listenerRef.controller == null) ? "" :__listenerRef.controller.getComponentName(); __listenerRef.shutdown(); if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) { @@ -1228,32 +1335,105 @@ public class UimaDefaultMessageListenerC } - private void setUimaASThreadPoolExecutor(int consumentCount) throws Exception{ - super.setMessageListener(ml); + private void setUimaASThreadPoolExecutor(int consumerCount) throws Exception { + if ( isGetMetaListener() ) { + super.setMessageListener(ml); + } else if ( isFreeCasQueueListener()) { + super.setMessageListener(ml); + } else if (endpoint != null && endpoint.isTempReplyDestination()) { + super.setMessageListener(ml); + } else { + if ( isProcessListener() ) { //controller != null && controller instanceof PrimitiveAnalysisEngineController ) { + + // Singleton handler shared by Process CAS listener and a targeted listener. The handler + // onMessage() is called by Spring when a message with a matching selector is available. + // When onMessage() is called, it adds a message to the Priority Queue + // PriorityMessageHandler h = PriorityMessageHandler.getInstance(); + // System.out.println("+++++++++++++ Controller:"+controller.getComponentName()+" Listener Scaleout="+cc+" Selector:"+super.getMessageSelector()); + + //super.setMessageListener(h); + // targeted listener should not have its own thread pool because it needs to use + // threads created by the Process Cas Listener. Each of these threads is pinned to + // a dedicated AE initialized at startup. 
Contract says that each AE process() will be called + // on the same thread that initialized it. The targeted listener and process listener share + // the same handler where CASes are pushed onto a Blocking Priority Queue for processing. + if (!targetedListener && !isFreeCasQueueListener()) { + // System.out.println(">>>>>>>>>>> Controller:"+controller.getComponentName()+" Listener Scaleout="+cc+" Selector:"+super.getMessageSelector()); + PriorityMessageHandler h = new PriorityMessageHandler(cc); + + super.setMessageListener(h); + try { + while (controller.getInputChannel() == null) { + synchronized (h) { + h.wait(100); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + //if (isPrimitiveService()) { + latchToCountNumberOfTerminatedThreads = new CountDownLatch(consumerCount); + // Create a Custom Thread Factory. Provide it with an instance of + // PrimitiveController so that every thread can call it to initialize + // the next available instance of a AE. + tf = new UimaAsPriorityBasedThreadFactory(threadGroup, + controller, latchToCountNumberOfTerminatedThreads); + //(PrimitiveAnalysisEngineController) controller, latchToCountNumberOfTerminatedThreads); + ((UimaAsPriorityBasedThreadFactory) tf).withQueue(h.getQueue()) + .withChannel(controller.getInputChannel()); + + ((UimaAsPriorityBasedThreadFactory) tf).setDaemon(true); + if ( taskExecutor == null ) { // true for aggregates + taskExecutor = new ThreadPoolTaskExecutor(); + } + // This ThreadExecutor will use custom thread factory instead of defult one + ((ThreadPoolTaskExecutor) taskExecutor).setThreadFactory(tf); + // Initialize the thread pool + ((ThreadPoolTaskExecutor) taskExecutor).initialize(); + // Make sure all threads are started. 
This forces each thread to call + // PrimitiveController to initialize the next instance of AE + ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().prestartAllCoreThreads(); +// } else { +// +// } + } + } else { + if ( controller.getInputChannel() == null ) { + //System.out.println("............. Error - JmsInputChannel not set yet..."); + } else if ( !(controller.getInputChannel() instanceof MessageListener || + controller.getInputChannel() instanceof SessionAwareMessageListener) ) { + //System.out.println("............. Error - wrong MessageListener type - Getting this:"+controller.getInputChannel().getClass().getName()); + } + super.setMessageListener(controller.getInputChannel()); + } + } // create task executor with custom thread pool for: // 1) GetMeta request processing // 2) ReleaseCAS request - if ( taskExecutor == null ) { - UimaAsThreadFactory tf = new UimaAsThreadFactory(threadGroup); - tf.setDaemon(false); + if ( !targetedListener && taskExecutor == null ) { + UimaAsThreadFactory utf = new UimaAsThreadFactory(threadGroup); + utf.setDaemon(false); +// tf.defineUsageAs(UsedFor.GetMetaHandling);//setForGetMetaHandling(); if ( isFreeCasQueueListener()) { - tf.setThreadNamePrefix(controller.getComponentName()+" - FreeCASRequest Thread"); + utf.setThreadNamePrefix(controller.getComponentName()+" - FreeCASRequest Thread"); } else if ( isGetMetaListener() ) { - tf.setThreadNamePrefix(super.getBeanName()+" - Thread"); + utf.setThreadNamePrefix(super.getBeanName()+" - Thread"); } else if ( getDestination() != null && getMessageSelector() != null ) { - tf.setThreadNamePrefix(controller.getComponentName() + " Process Thread"); + utf.setThreadNamePrefix(controller.getComponentName() + " Process Thread"); } else if ( endpoint != null && endpoint.isTempReplyDestination() ) { - tf.setThreadNamePrefix(super.getBeanName()+" - Thread"); + utf.setThreadNamePrefix(super.getBeanName()+" - Thread"); } else { throw new Exception("Unknown Context Detected in 
setUimaASThreadPoolExecutor()"); } - ExecutorService es = Executors.newFixedThreadPool(consumentCount,tf); + ExecutorService es = Executors.newFixedThreadPool(consumerCount,utf); if ( es instanceof ThreadPoolExecutor ) { threadPoolExecutor = (ThreadPoolExecutor)es; super.setTaskExecutor(es); } - } else { + } + /* + else { UimaAsThreadFactory tf = new UimaAsThreadFactory(threadGroup); tf.setDaemon(true); if ( isFreeCasQueueListener()) { @@ -1269,8 +1449,13 @@ public class UimaDefaultMessageListenerC } } + */ } + private boolean isPrimitiveService() { + return controller != null && controller instanceof PrimitiveAnalysisEngineController && + controller.getInputChannel() != null; + } /** * Called by Spring to inject TaskExecutor @@ -1296,30 +1481,14 @@ public class UimaDefaultMessageListenerC if (controller instanceof PrimitiveAnalysisEngineController) { // in case the taskExecutor is not plugged in yet, wait until one // becomes available. The TaskExecutor is plugged in by Spring - synchronized (mux2) { - while (taskExecutor == null) { - mux2.wait(20); - } - } - latchToCountNumberOfTerminatedThreads = new CountDownLatch(consumers); - // Create a Custom Thread Factory. Provide it with an instance of - // PrimitiveController so that every thread can call it to initialize - // the next available instance of a AE. - tf = new UimaAsThreadFactory(threadGroup, (PrimitiveAnalysisEngineController) controller, latchToCountNumberOfTerminatedThreads); - ((UimaAsThreadFactory)tf).setDaemon(true); - // This ThreadExecutor will use custom thread factory instead of defult one - ((ThreadPoolTaskExecutor) taskExecutor).setThreadFactory(tf); - // Initialize the thread pool - ((ThreadPoolTaskExecutor) taskExecutor).initialize(); - // Make sure all threads are started. 
This forces each thread to call - // PrimitiveController to initialize the next instance of AE - ((ThreadPoolTaskExecutor) taskExecutor).getThreadPoolExecutor().prestartAllCoreThreads(); - // Change the state of a collocated service - if ( !controller.isTopLevelComponent() ) { - controller.changeState(ServiceState.RUNNING); + if ( !targetedListener && !isFreeCasQueueListener() ) { + synchronized (mux2) { + while (taskExecutor == null) { + mux2.wait(20); + } + } } } - if ( threadPoolExecutor != null ) { threadPoolExecutor.prestartAllCoreThreads(); } Modified: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java URL: http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java?rev=1849399&r1=1849398&r2=1849399&view=diff ============================================================================== --- uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java (original) +++ uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java Thu Dec 20 14:40:58 2018 @@ -24,191 +24,400 @@ import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; +import javax.jms.IllegalStateException; import javax.jms.JMSException; +import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQMessageProducer; -import org.apache.activemq.ActiveMQSession; import org.apache.activemq.command.ActiveMQDestination; import org.apache.uima.UIMAFramework; import org.apache.uima.aae.UIMAEE_Constants; +import org.apache.uima.aae.client.UimaASProcessStatus; +import org.apache.uima.aae.client.UimaASProcessStatusImpl; +import 
org.apache.uima.aae.client.UimaAsynchronousEngine; +import org.apache.uima.aae.message.AsynchAEMessage; +import org.apache.uima.aae.message.UimaMessageValidator; import org.apache.uima.adapter.jms.JmsConstants; +import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.ClientRequest; import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.SharedConnection; +import org.apache.uima.adapter.jms.message.PendingMessage; +import org.apache.uima.cas.CAS; +import org.apache.uima.cas.SerialFormat; import org.apache.uima.util.Level; +import org.apache.uima.util.impl.ProcessTrace_impl; /** - * Initializes JMS session and creates JMS MessageProducer to be used for sending messages to a - * given destination. It extends BaseMessageSender which starts the worker thread and is tasked with - * sending messages. The application threads share a common 'queue' with the worker thread. The - * application threads add messages to the pendingMessageList 'queue' and the worker thread consumes - * them. + * Initializes JMS session and creates JMS MessageProducer to be used for + * sending messages to a given destination. It extends BaseMessageSender which + * starts the worker thread and is tasked with sending messages. The application + * threads share a common 'queue' with the worker thread. The application + * threads add messages to the pendingMessageList 'queue' and the worker thread + * consumes them. 
* */ public class ActiveMQMessageSender extends BaseMessageSender { - private static final Class CLASS_NAME = ActiveMQMessageSender.class; + private static final Class<?> CLASS_NAME = ActiveMQMessageSender.class; - private volatile Connection connection = null; + private volatile Connection connection = null; - private Session session = null; + private Session session = null; - private MessageProducer producer = null; + private MessageProducer producer = null; - private String destinationName = null; - - private ConcurrentHashMap<Destination, MessageProducer> producerMap = new ConcurrentHashMap<Destination, MessageProducer>(); - - public ActiveMQMessageSender(Connection aConnection, String aDestinationName, - BaseUIMAAsynchronousEngineCommon_impl engine) throws Exception { - super(engine); - connection = aConnection; - destinationName = aDestinationName; - } - - public synchronized MessageProducer getMessageProducer(Destination destination) throws Exception { - if (producerMap.containsKey(destination)) { - return (MessageProducer) producerMap.get(destination); - } - createSession(); - MessageProducer mProducer = session.createProducer(destination); - mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - producerMap.put(destination, mProducer); - return mProducer; - } - /** - * This is called when a new Connection is created after broker is restarted - */ - public void setConnection(Connection aConnection) { - connection = aConnection; - cleanup(); - try { - initializeProducer(); - } catch( Exception e) { - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), - "setConnection", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAEE_exception__WARNING", e); - } - } - - } - private String getBrokerURL() { - try { - return ((ActiveMQConnection) connection).getBrokerInfo().getBrokerURL(); - } catch (Exception ex) { /* handle silently. 
*/ - } - return ""; - } - - private void createSession() throws Exception { - String broker = getBrokerURL(); - try { - if (session == null || engine.producerInitialized == false) { - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - } catch (JMSException e) { - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), - "createSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_client_failed_creating_session_INFO", - new Object[] { destinationName, broker }); - } - if (connection == null) { - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), - "createSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_client_connection_not_ready_INFO", new Object[] { broker }); - } - } else if (((ActiveMQConnection) connection).isClosed() - || ((ActiveMQConnection) connection).isClosing()) { - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { - UIMAFramework.getLogger(CLASS_NAME) - .logrb(Level.INFO, CLASS_NAME.getName(), "createSession", - JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_client_connection_closed_INFO", - new Object[] { destinationName, broker }); - } - } - throw e; - } catch (Exception e) { - throw e; - } - } - - /** - * Creates a jms session object used to instantiate message producer - */ - protected void initializeProducer() throws Exception { - createSession(); - producer = getMessageProducer(session.createQueue(destinationName)); - } - - /** - * Returns the full name of the destination queue - */ - protected String getDestinationEndpoint() throws Exception { - return ((ActiveMQDestination) producer.getDestination()).getPhysicalName(); - } - - /** - * Returns jsm MessageProducer - */ - public MessageProducer getMessageProducer() { - if ( engine.running && engine.producerInitialized == false ) { - try { - SharedConnection con = 
engine.lookupConnection(getBrokerURL()); - if ( con != null ) { - setConnection(con.getConnection()); - initializeProducer(); - engine.producerInitialized = true; - } - } catch( Exception e) { - e.printStackTrace(); - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), - "getMessageProducer", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAEE_exception__WARNING", e); - } - } - } - return producer; - } - - public TextMessage createTextMessage() throws Exception { - if (session == null) { - // Force initialization of Producer - initializeProducer(); - } - return session.createTextMessage(""); - } - - public BytesMessage createBytesMessage() throws Exception { - if (session == null) { - // Force initialization of Producer - initializeProducer(); - } - return session.createBytesMessage(); - } - - /** - * Cleanup any jms resources used by the worker thread - */ - protected void cleanup() { - try { - if (session != null) { - session.close(); - session = null; - } - if (producer != null) { - producer.close(); - producer = null; - } - } catch (Exception e) { - // Ignore we are shutting down - } finally { - producerMap.clear(); - } - } + private String destinationName = null; + + private ConcurrentHashMap<Destination, MessageProducer> producerMap = new ConcurrentHashMap<Destination, MessageProducer>(); + + public ActiveMQMessageSender(Connection aConnection, String aDestinationName, + BaseUIMAAsynchronousEngineCommon_impl engine) throws Exception { + super(engine); + connection = aConnection; + destinationName = aDestinationName; + } + + public synchronized MessageProducer getMessageProducer(Destination destination) throws Exception { + if (producerMap.containsKey(destination)) { + return (MessageProducer) producerMap.get(destination); + } + createSession(); + MessageProducer mProducer = session.createProducer(destination); + 
mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + producerMap.put(destination, mProducer); + return mProducer; + } + + /** + * This is called when a new Connection is created after broker is restarted + */ + public void setConnection(Connection aConnection) { + connection = aConnection; + cleanup(); + try { + initializeProducer(); + } catch (Exception e) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "setConnection", + UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e); + } + } + + } + + private String getBrokerURL() { + try { + return ((ActiveMQConnection) connection).getBrokerInfo().getBrokerURL(); + } catch (Exception ex) { /* handle silently. */ + } + return ""; + } + + private void createSession() throws Exception { + String broker = getBrokerURL(); + try { + if (session == null || engine.producerInitialized == false) { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + } catch (JMSException e) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "createSession", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_failed_creating_session_INFO", + new Object[] { destinationName, broker }); + } + if (connection == null) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "createSession", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_connection_not_ready_INFO", + new Object[] { broker }); + } + } else if (((ActiveMQConnection) connection).isClosed() || ((ActiveMQConnection) connection).isClosing()) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "createSession", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, 
"UIMAJMS_client_connection_closed_INFO", + new Object[] { destinationName, broker }); + } + } + throw e; + } catch (Exception e) { + throw e; + } + } + + /** + * Returns the full name of the destination queue + */ + protected String getDestinationEndpoint() throws Exception { + return ((ActiveMQDestination) producer.getDestination()).getPhysicalName(); + } + + /** + * Creates a jms session object used to instantiate message producer + */ + protected void initializeProducer() throws Exception { + createSession(); + producer = getMessageProducer(session.createQueue(destinationName)); + } + + /** + * Returns jsm MessageProducer + */ + public MessageProducer getMessageProducer() { + if (engine.running && engine.producerInitialized == false) { + try { + SharedConnection con = engine.lookupConnection(getBrokerURL()); + if (con != null) { + setConnection(con.getConnection()); + initializeProducer(); + engine.producerInitialized = true; + } + } catch (Exception e) { + e.printStackTrace(); + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "getMessageProducer", + UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e); + } + } + } + return producer; + } + + public TextMessage createTextMessage() throws Exception { + synchronized (ActiveMQMessageSender.class) { + if (session == null) { + // Force initialization of Producer + initializeProducer(); + } + // return session.createTextMessage(""); + TextMessage msg = null; + try { + msg = session.createTextMessage(""); + } catch (IllegalStateException e) { + // stale Session + session = null; + initializeProducer(); + msg = session.createTextMessage(""); + } + return msg; + } + + } + + public BytesMessage createBytesMessage() throws Exception { + synchronized (ActiveMQMessageSender.class) { + if (session == null) { + // Force initialization of Producer + initializeProducer(); + } + BytesMessage msg = null; + try 
{ + msg = session.createBytesMessage(); + } catch (IllegalStateException e) { + // stale Session + session = null; + initializeProducer(); + msg = session.createBytesMessage(); + } + return msg; + } + + // return session.createBytesMessage(); + } + + /** + * Cleanup any jms resources used by the worker thread + */ + protected void cleanup() { + try { + if (session != null) { + session.close(); + session = null; + } + if (producer != null) { + producer.close(); + producer = null; + } + } catch (Exception e) { + // Ignore we are shutting down + } finally { + producerMap.clear(); + } + } + + protected void dispatchMessage(PendingMessage pm, BaseUIMAAsynchronousEngineCommon_impl engine, + boolean casProcessRequest) throws Exception { + SharedConnection sc = engine.lookupConnection(engine.getBrokerURI()); + ClientRequest cacheEntry = null; + boolean doCallback = false; + boolean addTimeToLive = true; + Session jmsSession = null; + + // Check the environment for existence of NoTTL tag. If present, + // the deployer of the service wants to disable message expiration. + if (System.getProperty("NoTTL") != null) { + addTimeToLive = false; + } + try { + // long t1 = System.currentTimeMillis(); + jmsSession = sc.getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Request JMS Message from the concrete implementation + Message message = null; + // Determine if this a CAS Process Request + // boolean casProcessRequest = isProcessRequest(pm); + // Only Process request can be serialized as binary + if (casProcessRequest && (engine.getSerialFormat() != SerialFormat.XMI)) { + message = jmsSession.createBytesMessage(); + } else { + message = jmsSession.createTextMessage(); + } + // get the producer initialized from a valid connection + // producer = getMessageProducer(); + + Destination d = null; + String selector = null; + // UIMA-AS ver 2.10.0 + sends Free Cas request to a service targeted queue + // instead of a temp queue. 
Regular queues can be recovered in case of + // a broker restart. The test below will be true for UIMA-AS v. 2.10.0 +. + // Code in JmsOutputChannel will add the selector if the service is a CM. + if (pm.get(AsynchAEMessage.TargetingSelector) != null) { + selector = (String) pm.get(AsynchAEMessage.TargetingSelector); + } + if (selector == null && (pm.getMessageType() == AsynchAEMessage.ReleaseCAS + || pm.getMessageType() == AsynchAEMessage.Stop)) { + d = (Destination) pm.get(AsynchAEMessage.Destination); + + } else { + d = jmsSession.createQueue(destinationName); + } + MessageProducer mProducer = jmsSession.createProducer(d); + mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + // System.out.println(">>>>>>> Time to create and initialize JMS + // Sesssion:"+(System.currentTimeMillis()-t1)); + super.initializeMessage(pm, message); + String destination = ((ActiveMQDestination) d).getPhysicalName(); + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "run", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE", + new Object[] { + UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, + message.getIntProperty(AsynchAEMessage.Command)), + UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, + message.getIntProperty(AsynchAEMessage.MessageType)), + destination }); + } + if (casProcessRequest) { + cacheEntry = (ClientRequest) engine.getCache().get(pm.get(AsynchAEMessage.CasReference)); + if (cacheEntry != null) { + // CAS cas = cacheEntry.getCAS(); + // enable logging + if (System.getProperty("UimaAsCasTracking") != null) { + message.setStringProperty("UimaAsCasTracking", "enable"); + } + // Target specific service instance if targeting for the CAS is provided + // by the client application + if (cacheEntry.getTargetServiceId() != null) { + // System.out.println("------------Client Sending CAS to Service Instance With + // 
Id:"+cacheEntry.getTargetServiceId());; + message.setStringProperty(UimaAsynchronousEngine.TargetSelectorProperty, + cacheEntry.getTargetServiceId()); + } + // Use Process Timeout value for the time-to-live property in the + // outgoing JMS message. When this time is exceeded + // while the message sits in a queue, the JMS Server will remove it from + // the queue. What happens with the expired message depends on the + // configuration. Most JMS Providers create a special dead-letter queue + // where all expired messages are placed. NOTE: In ActiveMQ expired msgs in the + // DLQ + // are not auto evicted yet and accumulate taking up memory. + long timeoutValue = cacheEntry.getProcessTimeout(); + + if (timeoutValue > 0 && addTimeToLive) { + // Set high time to live value + message.setJMSExpiration(10 * timeoutValue); + } + if (pm.getMessageType() == AsynchAEMessage.Process) { + cacheEntry.setCASDepartureTime(System.nanoTime()); + } + cacheEntry.setCASDepartureTime(System.nanoTime()); + + doCallback = true; + + } else { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { + + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "run", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_failed_cache_lookup__WARNING", + new Object[] { pm.get(AsynchAEMessage.CasReference), + UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, + message.getIntProperty(AsynchAEMessage.Command)), + UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, + message.getIntProperty(AsynchAEMessage.MessageType)), + destination }); + } + return; // no cacheEntry so just return + } + + } + // start timers + if (casProcessRequest) { + CAS cas = cacheEntry.getCAS(); + + // Add the cas to a list of CASes pending reply. 
Also start the timer if + // necessary + engine.serviceDelegate.addCasToOutstandingList(cacheEntry.getCasReferenceId(), cas.hashCode(), + engine.timerPerCAS); // true=timer per cas + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendCAS", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_cas_added_to_pending_FINE", + new Object[] { cacheEntry.getCasReferenceId(), String.valueOf(cas.hashCode()), + engine.serviceDelegate.toString() }); + } + + } else if (pm.getMessageType() == AsynchAEMessage.GetMeta + && engine.serviceDelegate.getGetMetaTimeout() > 0) { + // timer for PING has been started in sendCAS() + if (!engine.serviceDelegate.isAwaitingPingReply()) { + engine.serviceDelegate.startGetMetaRequestTimer(); + } + } else { + doCallback = false; // dont call onBeforeMessageSend() callback on CPC + } + // Dispatch asynchronous request to Uima AS service + mProducer.send(message); + + if (doCallback) { + UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(), cacheEntry.getCAS(), + cacheEntry.getCasReferenceId()); + // Notify engine before sending a message + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "run", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_calling_onBeforeMessageSend__FINE", + new Object[] { pm.get(AsynchAEMessage.CasReference), + String.valueOf(cacheEntry.getCAS().hashCode()) }); + } + // Note the callback is a misnomer. 
The callback is made *after* the send now + // Application receiving this callback can consider the CAS as delivere to a + // queue + engine.onBeforeMessageSend(status); + + } + } catch( Exception e) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), + "run", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAEE_exception__WARNING", e); + } + } finally { + if (jmsSession != null) { + try { + jmsSession.close(); + } catch (Exception eee) { + + } + } + } + + } } \ No newline at end of file Modified: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java URL: http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=1849399&r1=1849398&r2=1849399&view=diff ============================================================================== --- uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java (original) +++ uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java Thu Dec 20 14:40:58 2018 @@ -78,6 +78,7 @@ import org.apache.uima.adapter.jms.JmsCo import org.apache.uima.adapter.jms.activemq.ConnectionFactoryIniter; import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer; import org.apache.uima.adapter.jms.activemq.UimaEEAdminSpringContext; +import org.apache.uima.adapter.jms.message.PendingMessage; import org.apache.uima.adapter.jms.service.Dd2spring; import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData; import org.apache.uima.cas.CAS; @@ -99,7 +100,7 @@ public class BaseUIMAAsynchronousEngine_ implements UimaAsynchronousEngine, MessageListener, ControllerCallbackListener, ApplicationListener<ApplicationEvent>{ private 
static final Class CLASS_NAME = BaseUIMAAsynchronousEngine_impl.class; - private MessageSender sender = null; + private ActiveMQMessageSender sender = null; private MessageProducer producer; @@ -137,6 +138,9 @@ public class BaseUIMAAsynchronousEngine_ protected static Lock globalLock = new ReentrantLock(); + //private String serviceTargetSelector = null; + + protected volatile boolean stopped = false; public BaseUIMAAsynchronousEngine_impl() { super(); UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO, @@ -219,7 +223,10 @@ public class BaseUIMAAsynchronousEngine_ SerialFormat serialFormat) throws ResourceProcessException { try { msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName()); - +// // check if this message should target specific service instance +// if ( serviceTargetSelector != null ) { +// msg.setStringProperty(UimaAsynchronousEngine.TargetSelectorProperty,serviceTargetSelector); +// } msg.setStringProperty(UIMAMessage.ServerURI, brokerURI); msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request); msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.Process); @@ -308,12 +315,16 @@ public class BaseUIMAAsynchronousEngine_ JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_debug_msg__FINEST", new Object[] { msg }); } - if ( sharedConnection.getClientCount() == 1 ) { + if ( sharedConnection.getClientCount() <= 1 ) { sharedConnection.destroy(); amqc.close(); } - } + } else if ( sharedConnection.getClientCount() <= 1 ) { + + sharedConnection.destroy(); + amqc.close(); + } } catch (Exception exx) {exx.printStackTrace();} } // Delete client's temp reply queue from AMQ Broker @@ -341,7 +352,13 @@ public class BaseUIMAAsynchronousEngine_ } public void stop() { try { - System.out.println(this.getClass().getName()+".stop() - Stopping UIMA-AS Client"); + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable( + Level.INFO)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, + CLASS_NAME.getName(), "stop", + 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_stopping_as_client_INFO"); + } stopConnection(); super.doStop(); @@ -390,7 +407,7 @@ public class BaseUIMAAsynchronousEngine_ } } - public void setCPCMessage(Message msg) throws Exception { + protected void setCPCMessage(Message msg) throws Exception { msg.setStringProperty(AsynchAEMessage.MessageFrom, consumerDestination.getQueueName()); msg.setStringProperty(UIMAMessage.ServerURI, brokerURI); msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request); @@ -402,7 +419,35 @@ public class BaseUIMAAsynchronousEngine_ ((TextMessage) msg).setText(""); } } + protected void setFreeCasMessage(Message msg, String aCasReferenceId, String selector) throws Exception { + msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None); + msg.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId); + msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request); + msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.ReleaseCAS); + msg.setStringProperty(UIMAMessage.ServerURI, brokerURI); + msg.setJMSReplyTo(consumerDestination); + if ( selector != null ) { + msg.setStringProperty(UimaAsynchronousEngine.TargetSelectorProperty, + selector); + } + + if (msg instanceof TextMessage) { + ((TextMessage) msg).setText(""); + } + } + protected void setStopMessage(Message msg, String aCasReferenceId) throws Exception { + msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None); + msg.setStringProperty(AsynchAEMessage.CasReference, aCasReferenceId); + msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request); + msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.Stop); + msg.setStringProperty(UIMAMessage.ServerURI, brokerURI); + msg.setJMSReplyTo(consumerDestination); + + if (msg instanceof TextMessage) { + ((TextMessage) msg).setText(""); + } + } private boolean connectionClosedOrInvalid() { SharedConnection sharedConnection = lookupConnection(brokerURI); if 
(sharedConnection == null @@ -419,6 +464,9 @@ public class BaseUIMAAsynchronousEngine_ } private SharedConnection createAndInitializeAMQConnection( Semaphore semaphore, String aBrokerURI) throws Exception { + if ( stopped ) { + return null; + } // This only affects Consumer // Create AMQ specific connection validator. It uses // AMQ specific approach to test the state of the connection @@ -697,23 +745,7 @@ public class BaseUIMAAsynchronousEngine_ // throws an exception if the version of UIMA-AS is not compatible with UIMA SDK VersionCompatibilityChecker.check(CLASS_NAME, "UIMA AS Client", "initialize"); -/* - // Check for compatibility with a version of uima sdk. Only check major versions. - if (UimaAsVersion.getMajorVersion() != UimaVersion.getMajorVersion() ) { - UIMAFramework.getLogger(CLASS_NAME).logrb( - Level.WARNING, - CLASS_NAME.getName(), - "initialize", - UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAEE_incompatible_version_WARNING", - new Object[] { "UIMA AS Client", UimaAsVersion.getUimajFullVersionString(), - UimaVersion.getFullVersionString() }); - throw new ResourceInitializationException(new AsynchAEException( - "Version of UIMA-AS is Incompatible with a Version of UIMA Core. 
UIMA-AS Version is built to depend on Core UIMA version:" - + UimaAsVersion.getUimajFullVersionString() + " but is running with version:" - + UimaVersion.getFullVersionString())); - } -*/ + if (running) { throw new ResourceInitializationException(new UIMA_IllegalStateException()); } @@ -758,7 +790,11 @@ public class BaseUIMAAsynchronousEngine_ .intValue(); clientSideJmxStats.setCasPoolSize(casPoolSize); } - +// if ( anApplicationContext.containsKey(UimaAsynchronousEngine.TargetSelectorProperty) ) { +// serviceTargetSelector = +// (String)anApplicationContext.get(UimaAsynchronousEngine.TargetSelectorProperty); +// } + if (anApplicationContext.containsKey(UimaAsynchronousEngine.Timeout)) { processTimeout = ((Integer) anApplicationContext.get(UimaAsynchronousEngine.Timeout)) .intValue(); @@ -1304,6 +1340,32 @@ public class BaseUIMAAsynchronousEngine_ // private void stopProducingCases(ClientRequest clientCachedRequest) { private void stopProducingCases(String casReferenceId, Destination cmFreeCasQueue) { + PendingMessage msg = new PendingMessage(AsynchAEMessage.Stop); + msg.put(AsynchAEMessage.Destination, cmFreeCasQueue); + msg.put(AsynchAEMessage.CasReference, casReferenceId); + try { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), + "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_client_sending_stop_to_service__INFO", new Object[] {casReferenceId,cmFreeCasQueue}); + } + sender.dispatchMessage(msg, this, false); + + } catch (Exception ex) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), + "stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_exception__WARNING", + ex); + } + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), + 
"stopProducingCases", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, + "UIMAJMS_client_unable_to_send_stop_to_cm__WARNING"); + } + } + + /* try { // if (clientCachedRequest.getFreeCasNotificationQueue() != null) { if (cmFreeCasQueue != null) { @@ -1350,6 +1412,18 @@ public class BaseUIMAAsynchronousEngine_ "UIMAJMS_exception__WARNING", e); } } + */ + } + protected void dispatchFreeCasRequest(String casReferenceId, Message message) throws Exception { + PendingMessage msg = new PendingMessage(AsynchAEMessage.ReleaseCAS); +// if ( message.getStringProperty(AsynchAEMessage.TargetingSelector) != null ) { +// msg.put(AsynchAEMessage.TargetingSelector,message.getStringProperty(AsynchAEMessage.TargetingSelector) ); +// } else { +// msg.put(AsynchAEMessage.Destination, message.getJMSReplyTo()); +// } + msg.put(AsynchAEMessage.Destination, message.getJMSReplyTo()); + msg.put(AsynchAEMessage.CasReference, casReferenceId); + sender.dispatchMessage(msg, this, false); } protected MessageSender getDispatcher() { return sender; Modified: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/examples/as/RunRemoteAsyncAE.java URL: http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/examples/as/RunRemoteAsyncAE.java?rev=1849399&r1=1849398&r2=1849399&view=diff ============================================================================== --- uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/examples/as/RunRemoteAsyncAE.java (original) +++ uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/examples/as/RunRemoteAsyncAE.java Thu Dec 20 14:40:58 2018 @@ -26,6 +26,7 @@ import java.io.FileWriter; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -79,6 +80,9 @@ import org.apache.uima.util.XMLInputSour * <li>-uimaEeDebug true causes various 
debugging things to happen, including *not* deleting the * generated spring file generated by running dd-2-spring. This parameter only affects deployments * specified using the -d parameter that follow it in the command line sequence.</li> + * <li>-TargetServiceId specifies identifier of a service which should process a CAS. This + * identifier must match service's identifier. By default a service is launched with an IP:PID + * identifier but the identifier can be an arbitrary String. </li> * </ul> */ public class RunRemoteAsyncAE { @@ -109,6 +113,8 @@ public class RunRemoteAsyncAE { private BufferedWriter logPerfWriter = null; private boolean needPerfHeaders = true; + + private String selector=null; /** * Start time of the processing - used to compute elapsed time. @@ -196,6 +202,11 @@ public class RunRemoteAsyncAE { cpc_timeout = Integer.parseInt(args[++i]); } else if (args[i].equals(UimaAsynchronousEngine.UimaEeDebug)) { appCtx.put(UimaAsynchronousEngine.UimaEeDebug, args[++i]); + } else if (args[i].equals("-"+UimaAsynchronousEngine.TargetSelectorProperty)) { + selector = args[++i]; + System.out.println("... 
Target Selector:"+selector); + // when a service is internally deployed (-d option) it will use a selector defined below + System.setProperty(UimaAsynchronousEngine.TargetSelectorProperty,selector); } else { System.err.println("Unknown switch " + args[i]); printUsageAndExit(); @@ -253,6 +264,11 @@ public class RunRemoteAsyncAE { appCtx.put(UimaAsynchronousEngine.CasPoolSize, casPoolSize); appCtx.put(UIMAFramework.CAS_INITIAL_HEAP_SIZE, Integer.valueOf(fsHeapSize / 4).toString()); + // if configured to use service targeting, tell the client which service to use + String target = null; + if ( (target = System.getProperty(UimaAsynchronousEngine.TargetSelectorProperty) ) != null ) { + appCtx.put(UimaAsynchronousEngine.TargetSelectorProperty, target); + } // initialize uimaEEEngine.initialize(appCtx); @@ -266,7 +282,13 @@ public class RunRemoteAsyncAE { } else { // send an empty CAS CAS cas = uimaEEEngine.getCAS(); - uimaEEEngine.sendCAS(cas); + if ( selector != null ) { + System.out.println("Sending CAS to targeted service using sendAndReceive() - target selector:"+selector); + ArrayList<AnalysisEnginePerformanceMetrics> perfList = new ArrayList<AnalysisEnginePerformanceMetrics>(); + uimaEEEngine.sendAndReceiveCAS(cas,perfList , selector); + } else { + uimaEEEngine.sendCAS(cas); + } uimaEEEngine.collectionProcessingComplete(); } if (logPerfWriter != null) { @@ -319,7 +341,11 @@ public class RunRemoteAsyncAE { + "-uimaEeDebug true This is optional. Leave it out for normal operation. If specified, causes" + " additional debugging things to happen, including *not* deleting the generated Spring xml file generated" + " from running dd2spring. It only affects deployments specified using the -d parameter that follow it on" - + " the command line"); + + " the command line\n" + + " -TargetServiceId - service identifier. This is optional. If specified, this" + + " identifier enables targeting of a specific service. 
The UIMA-AS client will send CASes to" + + " a service instance which was launched with a matching identifier.") + ; System.exit(1); }