Author: schor
Date: Thu Dec 20 14:40:58 2018
New Revision: 1849399

URL: http://svn.apache.org/viewvc?rev=1849399&view=rev
Log:
[UIMA-5937] update pom version and scm, merge v2 updates, record merge

Added:
    
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/PriorityMessageHandler.java
      - copied unchanged from r1846917, 
uima/uima-as/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/PriorityMessageHandler.java
    
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateAnnotatorWithDOCTYPE.xml
      - copied unchanged from r1846917, 
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_AggregateAnnotatorWithDOCTYPE.xml
    
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_AsyncAggregateWithJmsService.xml
      - copied unchanged from r1846917, 
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_AsyncAggregateWithJmsService.xml
    
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWithTargetingSupport.xml
      - copied unchanged from r1846917, 
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/deployment/Deploy_NoOpAnnotatorWithTargetingSupport.xml
    
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWithTargetingSupport.xml
      - copied unchanged from r1846917, 
uima/uima-as/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWithTargetingSupport.xml
Modified:
    uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/   (props changed)
    uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/pom.xml
    
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java
    
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
    
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
    
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
    
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
    
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
    
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
    
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/examples/as/RunRemoteAsyncAE.java
    
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpAnnotator.java
    
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ae/noop/NoOpCC.java
    
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASExtended.java
    
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/ActiveMQSupport.java
    
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/utils/BaseTestSupport.java
    
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith2SecDelay.xml
    
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/NoOpAnnotatorWith5SecDelay.xml
    
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/test/resources/descriptors/analysis_engine/SimpleTestAggregate.xml

Propchange: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 20 14:40:58 2018
@@ -1,2 +1,3 @@
 /uima/uima-as/branches/depend-on-parent-pom-4/uimaj-as-activemq:961335-961760
 /uima/uima-as/branches/mavenAlign/uimaj-as-activemq:941450-944450
+/uima/uima-as/trunk/uimaj-as-activemq:1786000-1846917

Modified: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/pom.xml
URL: 
http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/pom.xml?rev=1849399&r1=1849398&r2=1849399&view=diff
==============================================================================
--- uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/pom.xml (original)
+++ uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/pom.xml Thu Dec 20 14:40:58 2018
@@ -15,7 +15,7 @@
        <parent>
                <groupId>org.apache.uima</groupId>
                <artifactId>uima-as-parent</artifactId>
-               <version>3.0.0-SNAPSHOT</version>
+               <version>3.0.1-SNAPSHOT</version>
                <relativePath>../uima-as-parent/pom.xml</relativePath>
        </parent>
 
@@ -31,13 +31,13 @@
                cutting/pasting the <scm> element, and just changing the 
following two properties -->
        <scm>
                <connection>
-      
scm:svn:http://svn.apache.org/repos/asf/uima/uima-as/trunk/uimaj-as-activemq
+      
scm:svn:http://svn.apache.org/repos/asf/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq
     </connection>
                <developerConnection>
-      
scm:svn:https://svn.apache.org/repos/asf/uima/uima-as/trunk/uimaj-as-activemq
+      
scm:svn:https://svn.apache.org/repos/asf/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq
     </developerConnection>
                <url>
-      http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-activemq
+      http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq
     </url>
        </scm>
 

Modified: 
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java
URL: 
http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java?rev=1849399&r1=1849398&r2=1849399&view=diff
==============================================================================
--- 
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java
 (original)
+++ 
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/ConcurrentMessageListener.java
 Thu Dec 20 14:40:58 2018
@@ -30,12 +30,10 @@ import javax.jms.Message;
 import javax.jms.Session;
 
 import org.apache.uima.UIMAFramework;
-import org.apache.uima.aae.InProcessCache.CacheEntry;
 import org.apache.uima.aae.UIMAEE_Constants;
 import org.apache.uima.aae.UimaAsThreadFactory;
 import org.apache.uima.aae.UimaBlockingExecutor;
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
-import org.apache.uima.aae.controller.AggregateAnalysisEngineController_impl;
 import org.apache.uima.aae.controller.AnalysisEngineController;
 import org.apache.uima.aae.controller.LocalCache.CasStateEntry;
 import org.apache.uima.aae.delegate.Delegate;

Modified: 
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java?rev=1849399&r1=1849398&r2=1849399&view=diff
==============================================================================
--- 
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
 (original)
+++ 
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsEndpointConnection_impl.java
 Thu Dec 20 14:40:58 2018
@@ -227,7 +227,7 @@ public class JmsEndpointConnection_impl
                              }
                              if ( brokerDestinations.getConnection() != null ) 
{
                                try {
-                                 //  Close the connection to avoid leaks in 
the broker
+                                       //  Close the connection to avoid leaks 
in the broker
                                  brokerDestinations.getConnection().close();
                                } catch( Exception e) {
                                  //  Ignore exceptions on a close of a bad 
connection
@@ -249,7 +249,9 @@ public class JmsEndpointConnection_impl
                                              ActiveMQConnectionFactory factory 
= new ActiveMQConnectionFactory(brokerUri);
                                              // White list packages for 
deserialization 
                                              factory.setTrustAllPackages(true);
+                                             
factory.setConnectionIDPrefix("JmsOutputChannel");
                                              
factory.setWatchTopicAdvisories(false);
+                                             
                                              //  Create shared jms connection 
to a broker
                                              conn = factory.createConnection();
                                              factory.setDispatchAsync(true);
@@ -755,7 +757,7 @@ public class JmsEndpointConnection_impl
          if ( command == AsynchAEMessage.ServiceInfo ) {
                  return false;
          }
-         if ( (msgType == AsynchAEMessage.Response || msgType == 
AsynchAEMessage.Request ) &&
+         if ( (msgType == AsynchAEMessage.Response || msgType == 
AsynchAEMessage.Request ) &&
                          command == AsynchAEMessage.Process ) {
                  String casReferenceId="";
                  try {
@@ -764,7 +766,7 @@ public class JmsEndpointConnection_impl
                        String key = "";
                        String endpointName = "";
                        if ( delegateEndpoint != null ) {
-                         delegateEndpoint.getDelegateKey();
+                         key = delegateEndpoint.getDelegateKey();
                          endpointName = ((ActiveMQDestination) 
delegateEndpoint.getDestination())
                          .getPhysicalName();
                        }
@@ -781,20 +783,22 @@ public class JmsEndpointConnection_impl
        
        
       if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-        
+         
         String key = "";
         String endpointName = "";
         if ( delegateEndpoint != null ) {
-          delegateEndpoint.getDelegateKey();
+          key = delegateEndpoint.getDelegateKey();
           endpointName = ((ActiveMQDestination) 
delegateEndpoint.getDestination())
           .getPhysicalName();
-        }
+          
+            }
         if ( "Client".equals(target) ) {
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
                   "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                   "UIMAEE_service_delivery_to_client_exception__WARNING",
                   new Object[] { controller.getComponentName(),endpointName });
         } else {
+
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
                   "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
                   "UIMAEE_service_delivery_exception__WARNING",new Object[] { 
controller.getComponentName(), key, endpointName});
@@ -1021,8 +1025,8 @@ public class JmsEndpointConnection_impl
                        }
                } else {
                  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
-                         "send", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                         "UIMAEE_service_delivery_exception__WARNING",new 
Object[] { controller.getComponentName(), "", endpointName});
+                         "UimaAsAsyncCallbackListener.onException()", 
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                         "UIMAEE_service_delivery_exception_WARNING",new 
Object[] { controller.getComponentName(), exception} );
 
                }
        }

Modified: 
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
URL: 
http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java?rev=1849399&r1=1849398&r2=1849399&view=diff
==============================================================================
--- 
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
 (original)
+++ 
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsInputChannel.java
 Thu Dec 20 14:40:58 2018
@@ -19,6 +19,7 @@
 
 package org.apache.uima.adapter.jms.activemq;
 
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -34,12 +35,12 @@ import javax.jms.Message;
 import javax.jms.Session;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.InputChannel;
 import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.client.UimaAsynchronousEngine;
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
 import org.apache.uima.aae.controller.AnalysisEngineController;
 import 
org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
@@ -54,12 +55,12 @@ import org.apache.uima.aae.jmx.RemoteJMX
 import org.apache.uima.aae.jmx.ServiceInfo;
 import org.apache.uima.aae.message.AsynchAEMessage;
 import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.MessageWrapper;
 import org.apache.uima.aae.message.UIMAMessage;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.adapter.jms.message.JmsMessageContext;
 import org.apache.uima.util.Level;
 import org.springframework.jms.listener.SessionAwareMessageListener;
-import org.springframework.jms.support.destination.DestinationResolver;
 
 /**
  * Thin adapter for receiving JMS messages from Spring. It delegates 
processing of all messages to
@@ -558,6 +559,19 @@ public class JmsInputChannel implements
                           validEndpoint(messageContext) &&  
                           
isReplyRequired((Message)messageContext.getRawMessage()) );
   }
+  public void onMessage(MessageWrapper wrapper) {
+         try {
+                 onMessage((Message)wrapper.getMessage(), 
(Session)wrapper.getSession());
+         } finally {
+                 // semaphore is only added by target and process listeners. 
The semaphore is used
+                 // to throttle work into the service. A JMS thread blocks if 
semaphore permits are 
+                 // exhausted. The blocking is done in PriorityMessageHandler 
class. The JmsInputChannel
+                 // and PriorityMessageHandler are the only classes operating 
on the semaphore.
+                 if ( wrapper.getSemaphore() != null ) {
+                         wrapper.getSemaphore().release();
+                 }
+         }
+  }
   /**
    * Receives Messages from the JMS Provider. It checks the message header to 
determine the type of
    * message received. Based on the type, a MessageContext is created to 
facilitate access to the
@@ -810,10 +824,11 @@ public class JmsInputChannel implements
            }
            return ll;
   }
-  public synchronized void 
setListenerContainer(UimaDefaultMessageListenerContainer messageListener) {
-    this.messageListener = messageListener;
+  public synchronized void 
setListenerContainer(UimaDefaultMessageListenerContainer jmsL) {
+    this.messageListener = jmsL;
     System.setProperty("BrokerURI", messageListener.getBrokerUrl());
-    if ( messageListener.getMessageSelector() !=null && 
messageListener.getMessageSelector().equals("Command=2001") ) {
+   
+    if ( jmsL.isGetMetaListener() ) {
       brokerURL = messageListener.getBrokerUrl();
       getController().getOutputChannel().setServerURI(brokerURL);
     }
@@ -825,8 +840,9 @@ public class JmsInputChannel implements
         getController().addInputChannel(this);
         messageListener.setController(getController());
       } catch (Exception e) {
+         e.printStackTrace();
       }
-    }
+    } 
   }
 
   public ActiveMQConnectionFactory getConnectionFactory() {
@@ -1099,6 +1115,66 @@ public class JmsInputChannel implements
                                  new Object[] { 
getController().getComponentName(), connector.getEndpointName() });
          }
   }
+  public void createListenerForTargetedMessages() throws Exception {
+         List<UimaDefaultMessageListenerContainer> listeners =
+                         getListeners();
+         // the TargetServiceId property value will become part of a jms 
selector. 
+         String targetStringSelector = "";
+         if ( 
System.getProperty(UimaAsynchronousEngine.TargetSelectorProperty) != null ) {
+                 targetStringSelector = 
System.getProperty(UimaAsynchronousEngine.TargetSelectorProperty);
+         } else {
+                 // the default selector is IP:PID 
+                 String ip = InetAddress.getLocalHost().getHostAddress();
+                 targetStringSelector = ip+":"+controller.getPID();
+         }
+         // find a listener instance which handles Process requests. The 
targeted 
+         // listener created here will share a Connection Factory and 
ThreadFactory.
+         // 
+         for(UimaDefaultMessageListenerContainer listener : listeners ) {
+             // Is this a Process listener instance? Check the selector
+                 if ( 
listener.getMessageSelector().endsWith(UimaDefaultMessageListenerContainer.PROCESS_SELECTOR_SUFFIX)
 ) {
+                 // this will be a dedicated listener which handles targeted 
messages
+                         UimaDefaultMessageListenerContainer targetedListener 
= new UimaDefaultMessageListenerContainer();
+                         // setup jms selector
+                         if ( getController().isCasMultiplier()) {
+                                 
targetedListener.setMessageSelector(UimaAsynchronousEngine.TargetSelectorProperty+"
 = '"+targetStringSelector+"' 
AND"+UimaDefaultMessageListenerContainer.CM_PROCESS_SELECTOR_SUFFIX);//(Command=2000
 OR Command=2002)");
+                 } else {
+                                 
targetedListener.setMessageSelector(UimaAsynchronousEngine.TargetSelectorProperty+"
 = '"+targetStringSelector+"' 
AND"+UimaDefaultMessageListenerContainer.PROCESS_SELECTOR_SUFFIX);//(Command=2000
 OR Command=2002)");
+                 }
+                         
+                         // use shared ConnectionFactory
+                 
targetedListener.setConnectionFactory(listener.getConnectionFactory());
+                 // mark the listener as a 'Targeted' listener
+                 targetedListener.setTargetedListener();
+                 targetedListener.setController(getController());
+                 // there will only be one AMQ delivery thread. Its job will 
be to
+                 // add a targeted message to a BlockingQueue. Such thread 
will block
+                 // in an enqueue if a dequeue is not available. This will 
prevent
+                 // overwhelming the service with messages.
+                 targetedListener.setConcurrentConsumers(1);
+                         if ( listener.getMessageListener() instanceof 
PriorityMessageHandler ) {
+                                 // the targeted listener will use the same 
message handler as the
+                                 // Process listener. This handler will add a 
message wrapper 
+                                 // to enable prioritizing messages. 
+                                 
targetedListener.setMessageListener(listener.getMessageListener());
+                         }
+                         // Same queue as the Process queue
+                         
targetedListener.setDestination(listener.getDestination());
+                 registerListener(targetedListener);
+                 targetedListener.afterPropertiesSet();
+                 targetedListener.initialize();
+                 targetedListener.start();
+                 if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                   UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, 
CLASS_NAME.getName(),
+                           "createListenerForTargetedMessages", 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                           "UIMAJMS_TARGET_LISTENER__INFO",
+                           new Object[] 
{targetedListener.getMessageSelector(), controller.getComponentName() });
+                 }
+                 break;
+
+             }
+         }
+  }
   public void createListener(String aDelegateKey, Endpoint endpointToUpdate) 
throws Exception {
     if (getController() instanceof AggregateAnalysisEngineController) {
       Delegate delegate = ((AggregateAnalysisEngineController) getController())

Modified: 
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
URL: 
http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java?rev=1849399&r1=1849398&r2=1849399&view=diff
==============================================================================
--- 
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
 (original)
+++ 
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/JmsOutputChannel.java
 Thu Dec 20 14:40:58 2018
@@ -25,16 +25,16 @@ import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.net.ConnectException;
 import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -74,8 +74,8 @@ import org.apache.uima.aae.jmx.ServicePe
 import org.apache.uima.aae.message.AsynchAEMessage;
 import org.apache.uima.aae.message.UIMAMessage;
 import org.apache.uima.aae.monitor.Monitor;
-import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
 import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
+import org.apache.uima.aae.monitor.statistics.LongNumericStatistic;
 import org.apache.uima.adapter.jms.JmsConstants;
 import org.apache.uima.cas.CAS;
 import org.apache.uima.cas.SerialFormat;
@@ -83,14 +83,6 @@ import org.apache.uima.cas.impl.XmiSeria
 import org.apache.uima.resource.metadata.ProcessingResourceMetaData;
 import org.apache.uima.util.Level;
 
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import org.apache.uima.resource.ResourceProcessException;
-
-
 import com.thoughtworks.xstream.XStream;
 
 public class JmsOutputChannel implements OutputChannel {
@@ -140,6 +132,8 @@ public class JmsOutputChannel implements
   private Semaphore connectionSemaphore = new Semaphore(1);
   
   public JmsOutputChannel() {
+       UimaSerializer.initXStream(xstream);
+          
     try {
        if( System.getenv("IP") != null ) {
                  hostIP = System.getenv("IP");
@@ -364,16 +358,17 @@ public class JmsOutputChannel implements
   private void invalidateConnectionAndEndpoints(BrokerConnectionEntry 
brokerConnectionEntry ) {
     Connection conn = brokerConnectionEntry.getConnection();
     try {
-       if ( conn != null && ((ActiveMQConnection)conn).isClosed()) {
+       if ( conn != null)
+       if ( conn != null && !((ActiveMQConnection)conn).isClosed()) {
            for (Entry<Object, JmsEndpointConnection_impl> endpoints : 
brokerConnectionEntry.endpointMap
                    .entrySet()) {
               endpoints.getValue().close(); // close session and producer
            }
-           brokerConnectionEntry.getConnection().stop();
            brokerConnectionEntry.getConnection().close();
            brokerConnectionEntry.setConnection(null);
        }
     } catch (Exception e) {
+       e.printStackTrace();
       // Ignore this for now. Attempting to close connection that has been 
closed
       // Ignore we are shutting down
     } finally {
@@ -648,7 +643,6 @@ public class JmsOutputChannel implements
            connectionSemaphore.release();        
       }
     
-    //System.out.println("+++++++++++++++++++++ ConnectionMap 
Size:"+connectionMap.size());
     return endpointConnection;
   }
 
@@ -850,10 +844,21 @@ public class JmsOutputChannel implements
                // If this service is a Cas Multiplier add to the message a 
FreeCasQueue.
                // The client may need send Stop request to that queue.
                if (aCommand == AsynchAEMessage.ServiceInfo
-                       && getAnalysisEngineController().isCasMultiplier() && 
freeCASTempQueue != null) {
-                 // Attach a temp queue to the outgoing message. This a queue 
where
-                 // Free CAS notifications need to be sent from the client
-                 tm.setJMSReplyTo(freeCASTempQueue);
+                       && getAnalysisEngineController().isCasMultiplier() ) {
+                 if ( freeCASTempQueue != null ) {
+                               // Attach a temp queue to the outgoing message. 
This a queue where
+                         // Free CAS notifications need to be sent from the 
client
+                         tm.setJMSReplyTo(freeCASTempQueue);
+                 }
+                 // new services will receive FreeCas request via a targeted 
queue
+                 StringBuffer selector = new StringBuffer().
+                                 append("TargetServiceId = ").
+                                 append("'").append(hostIP).append(":").
+                                 
append(getAnalysisEngineController().getPID()).
+                                 append("' AND").
+                                 
append(UimaDefaultMessageListenerContainer.CM_PROCESS_SELECTOR_SUFFIX);
+                 
tm.setStringProperty(AsynchAEMessage.TargetingSelector,selector.toString());
+       
                }
                //      Check if there was a failure while sending a message
                if ( !endpointConnection.send(tm, 0, false, 
notifyOnJmsException) && notifyOnJmsException ) {
@@ -1387,7 +1392,7 @@ public class JmsOutputChannel implements
    */
   private void populateHeaderWithRequestContext(Message aMessage, Endpoint 
anEndpoint, int aCommand)
           throws Exception {
-    aMessage.setIntProperty(AsynchAEMessage.MessageType, 
AsynchAEMessage.Request);
+         aMessage.setIntProperty(AsynchAEMessage.MessageType, 
AsynchAEMessage.Request);
     aMessage.setIntProperty(AsynchAEMessage.Command, aCommand);
     // TODO override default based on system property
     aMessage.setBooleanProperty(AsynchAEMessage.AcceptsDeltaCas, true);

Modified: 
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
URL: 
http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java?rev=1849399&r1=1849398&r2=1849399&view=diff
==============================================================================
--- 
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
 (original)
+++ 
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/activemq/UimaDefaultMessageListenerContainer.java
 Thu Dec 20 14:40:58 2018
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -38,6 +39,9 @@ import javax.jms.Destination;
 import javax.jms.ExceptionListener;
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
 import javax.jms.TemporaryQueue;
 
 import org.apache.activemq.ActiveMQConnection;
@@ -46,8 +50,11 @@ import org.apache.activemq.command.Activ
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.InputChannel;
 import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.UimaAsPriorityBasedThreadFactory;
 //import org.apache.uima.aae.UimaASCredentials;
 import org.apache.uima.aae.UimaAsThreadFactory;
+import org.apache.uima.aae.client.UimaAsynchronousEngine;
+//import org.apache.uima.aae.UimaAsThreadFactory.UsedFor;
 import org.apache.uima.aae.controller.AggregateAnalysisEngineController;
 import org.apache.uima.aae.controller.AnalysisEngineController;
 import 
org.apache.uima.aae.controller.BaseAnalysisEngineController.ServiceState;
@@ -57,21 +64,30 @@ import org.apache.uima.aae.delegate.Dele
 import org.apache.uima.aae.error.ErrorHandler;
 import org.apache.uima.aae.error.Threshold;
 import org.apache.uima.aae.error.handler.GetMetaErrorHandler;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageWrapper;
 import org.apache.uima.adapter.jms.JmsConstants;
+import 
org.apache.uima.adapter.jms.activemq.JmsOutputChannel.BrokerConnectionEntry;
 import org.apache.uima.resource.ResourceInitializationException;
 import org.apache.uima.util.Level;
 import org.springframework.core.task.TaskExecutor;
 import org.springframework.jms.JmsException;
 import org.springframework.jms.listener.AbstractJmsListeningContainer;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.listener.SessionAwareMessageListener;
 import org.springframework.jms.support.JmsUtils;
 import org.springframework.jms.support.destination.DestinationResolver;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 public class UimaDefaultMessageListenerContainer extends 
DefaultMessageListenerContainer implements
         ExceptionListener {
-  private static final Class CLASS_NAME = 
UimaDefaultMessageListenerContainer.class;
-
+  private static final Class<?> CLASS_NAME = 
UimaDefaultMessageListenerContainer.class;
+  public static final String PROCESS_SELECTOR_SUFFIX = "(Command=2000 OR 
Command=2002)";
+  public static final String CM_PROCESS_SELECTOR_SUFFIX = "(Command=2000 OR 
Command=2002 OR Command=2005)";
+  public static final String GETMETA_SELECTOR_SUFFIX = "(Command=2001)";
+  
+  public static final int HIGH_PRIORITY = 9;
+  
   private String destinationName = "";
 
   private Endpoint endpoint;
@@ -125,6 +141,10 @@ public class UimaDefaultMessageListenerC
   private volatile boolean logListenerFailure=true;
   
   private static CountDownLatch recoveryLatch = new CountDownLatch(4);
+  // indicates if this listener is dedicated to pull targeted messages which 
are
+  // messages with a selector.
+  private boolean targetedListener = false;
+  
   public UimaDefaultMessageListenerContainer() {
     super();
     // reset global static. This only effects unit testing as services are 
deployed 
@@ -143,12 +163,28 @@ public class UimaDefaultMessageListenerC
     this();
     this.freeCasQueueListener = freeCasQueueListener;
   }
+  
+  public void setTargetedListener() {
+         targetedListener = true;
+  }
+  private static boolean connectionClosedOrFailed(ActiveMQConnection 
connection) {
+           if (connection == null
+                   || connection.isClosed()
+                   || connection.isClosing()
+                   || connection.isTransportFailed()) {
+             return true;
+           }
+           return false;
+  }
+
   /**
    * Overriden Spring's method that tries to recover from lost connection. We 
dont 
    * want to recover when the service is stopping.
    */
   protected void refreshConnectionUntilSuccessful() {
-    boolean doLogFailureMsg = true;
+        // System.out.println("............refreshConnectionUntilSuccessful() 
called");
+
+         boolean doLogFailureMsg = true;
     try {
        // Only one listener thread should enter to recover lost connection.
        // Seems like spring recovery api is not reentrant. If multiple 
listeners
@@ -156,6 +192,16 @@ public class UimaDefaultMessageListenerC
        // on observing jconsole attached to uima-as service with multiple 
listeners
        // on an endpoint.  
        synchronized(UimaDefaultMessageListenerContainer.class ) {
+               ActiveMQConnection c = 
null;//(ActiveMQConnection)super.getSharedConnection();
+               try {
+                       c = (ActiveMQConnection)super.getSharedConnection();
+               } catch( SharedConnectionNotInitializedException ee) {
+                       // the connectionClosedOrFailed(c) below will test for 
null
+               }
+               if ( !connectionClosedOrFailed(c) ) {
+                       //System.out.println("............. 
Thread:"+Thread.currentThread().getId()+" Connection restored - returning");
+                       return;
+               }
            while (isRunning() && !terminating ) {
                Connection tcon = null;
                      try {
@@ -438,6 +484,7 @@ public class UimaDefaultMessageListenerC
    * @param t
    */
   private void handleQueueFailure(Throwable t) {
+       //  System.out.println("............handleQueueFailure() called");
     final String endpointName = (getDestination() == null) ? ""
             : ((ActiveMQDestination) getDestination()).getPhysicalName();
     if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
@@ -556,6 +603,10 @@ public class UimaDefaultMessageListenerC
    * This method is called by Spring when a listener fails
    */
   protected void handleListenerSetupFailure(Throwable t, boolean 
alreadyHandled) {
+    if ( t.getCause() instanceof InterruptedException ) {
+//       System.out.println("............handleListenerFailure(Throwable t, 
boolean alreadyHandled) called - Cause:"+t);
+         return;
+    }
     // If shutdown already, nothing to do
            // If controller is stopping no need to recover the connection
     if (awaitingShutdown || terminating || (controller != null && 
controller.isStopped()) ) {
@@ -646,10 +697,13 @@ public class UimaDefaultMessageListenerC
   }
 
   protected void handleListenerException(Throwable t) {
+        // System.out.println("............handleListenerException(Throwable 
t)");
+         
     // Already shutdown, nothing to do
     if (awaitingShutdown) {
       return;
     }
+    /*
     String endpointName = (getDestination() == null) ? ""
             : ((ActiveMQDestination) getDestination()).getPhysicalName();
 
@@ -659,6 +713,7 @@ public class UimaDefaultMessageListenerC
               "UIMAJMS_jms_listener_failed_WARNING",
               new Object[] { endpointName, getBrokerUrl(), t });
     }
+    */
     super.handleListenerException(t);
   }
 
@@ -676,15 +731,14 @@ public class UimaDefaultMessageListenerC
     super.setConnectionFactory(connectionFactory);
   }
 
-  private void injectTaskExecutor() {
-    super.setTaskExecutor(taskExecutor);
-  }
 
-  private boolean isGetMetaListener() {
+  public boolean isGetMetaListener() {
+         
     return getMessageSelector() != null
-            && __listenerRef.getMessageSelector().equals("Command=2001");
+            && 
__listenerRef.getMessageSelector().endsWith(GETMETA_SELECTOR_SUFFIX);
+//    && __listenerRef.getMessageSelector().endsWith("(Command=2001)");
   }
-
+  
   private boolean isActiveMQDestination() {
     return getDestination() != null && getDestination() instanceof 
ActiveMQDestination;
   }
@@ -728,8 +782,10 @@ public class UimaDefaultMessageListenerC
    **/
   public void setMessageListener(Object messageListener) {
     ml = messageListener;
-    if (this.freeCasQueueListener) {
+    if (this.freeCasQueueListener || targetedListener ) {
       super.setMessageListener(messageListener);
+    } else if ( endpoint != null && endpoint.isTempReplyDestination()) {
+        super.setMessageListener(messageListener);
     }
   }
   public void afterPropertiesSet() {
@@ -783,13 +839,18 @@ public class UimaDefaultMessageListenerC
         pluginThreadPool = true;
       }
     } else {
-      super.setConcurrentConsumers(cc);
+         super.setConcurrentConsumers(1);
+       //if ( targetedListener ) {
+           //super.setConcurrentConsumers(1);
+       //} else {
+        //    super.setConcurrentConsumers(cc);
+       //}
       pluginThreadPool = true;
     }
     Thread t = new Thread(threadGroup, new Runnable() {
       public void run() {
         Destination destination = __listenerRef.getDestination();
-        try {
+         try {
           // Wait until the connection factory is injected by Spring
           while (connectionFactory == null) {
             try {
@@ -816,7 +877,7 @@ public class UimaDefaultMessageListenerC
           }
           // Plug in connection Factory to Spring's Listener
           __listenerRef.injectConnectionFactory();
-          
+
           if ( pluginThreadPool ) {
             setUimaASThreadPoolExecutor(cc);
           }
@@ -824,13 +885,13 @@ public class UimaDefaultMessageListenerC
           // Initialize the TaskExecutor. This call injects a custom Thread 
Pool into the
           // TaskExecutor provided in the spring xml. The custom thread pool 
initializes
           // an instance of AE in a dedicated thread
-          if ( getMessageSelector() != null && !isGetMetaListener()) {
+          if ( !isGetMetaListener()) {
             initializeTaskExecutor(cc);
           }
-          if ( threadPoolExecutor == null ) {
-              // Plug in TaskExecutor to Spring's Listener
-              __listenerRef.injectTaskExecutor();
-          }
+//          if ( threadPoolExecutor == null ) {
+//              // Plug in TaskExecutor to Spring's Listener
+//              __listenerRef.injectTaskExecutor();
+//          }
           if ( propagate ) {
             // Notify Spring Listener that all properties are ready
             __listenerRef.allPropertiesSet();
@@ -863,7 +924,6 @@ public class UimaDefaultMessageListenerC
                     "UIMAJMS_listener_ready__INFO",
                     new Object[] {controller.getComponentName(),  
getBrokerUrl(), getDestination() });
           } 
-
         } catch (Exception e) {
          
           UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
this.getClass().getName(),
@@ -892,7 +952,7 @@ public class UimaDefaultMessageListenerC
       // by Spring on a different thread
       while ((((JmsInputChannel) pojoListener).getController()) == null) {
         try {
-          Thread.currentThread().sleep(50);
+          Thread.currentThread().wait(50);
         } catch (Exception e) {
         }
       }
@@ -956,7 +1016,7 @@ public class UimaDefaultMessageListenerC
   public void closeConnection() throws Exception {
     try {
         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
-               String msg = ".................... ection() Called";
+               String msg = ".................... closeConnection() Called";
              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, 
CLASS_NAME.getName(), "closeConnection",
                     JmsConstants.JMS_LOG_RESOURCE_BUNDLE, 
"UIMAJMS_debug_msg__FINEST",
                      new Object[] { msg });
@@ -1041,6 +1101,7 @@ public class UimaDefaultMessageListenerC
     if (awaitingShutdown) {
       return;
     }
+    
     String endpointName = (getDestination() == null) ? ""
             : ((ActiveMQDestination) getDestination()).getPhysicalName();
 
@@ -1081,13 +1142,49 @@ public class UimaDefaultMessageListenerC
   public void doDestroy() {
     super.destroy();
   }
+  /**
+   * Called by Spring
+   */
   public void setMessageSelector( String messageSelector) {
+         if ( 
!messageSelector.startsWith(UimaAsynchronousEngine.TargetSelectorProperty)) {
+                 messageSelector = 
UimaAsynchronousEngine.TargetSelectorProperty+" is NULL 
AND("+messageSelector+")";
+         }
+       //this.doInvokeListene
     super.setMessageSelector(messageSelector);
     //  turn off auto startup. Selectors are only used on input queues. We dont
     //  want listeners on this queue to start now. Once the service 
initializes 
     //  we will start listeners on input queue.
     this.setAutoStartup(false);
   }
+  private boolean isProcessListener() {
+         return getMessageSelector().endsWith(PROCESS_SELECTOR_SUFFIX);
+  }
+  /**
+   * Callback called by Spring when it receives a message. Its purpose is to 
assign high 
+   * priority for targeted messages. 
+   */
+  @SuppressWarnings("unchecked")
+  protected void doInvokeListener(@SuppressWarnings("rawtypes") 
SessionAwareMessageListener l, Session s, Message m) {
+         try {
+                 if ( targetedListener ) {
+                         m.setJMSPriority(HIGH_PRIORITY);
+                         m.setJMSType("TargetMessage");
+                         if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
+              UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINEST, 
CLASS_NAME.getName(), "doInvokeListener",
+                        JmsConstants.JMS_LOG_RESOURCE_BUNDLE, 
"UIMAJMS_MSG_INTERCEPTOR__FINEST",
+                         new Object[] {controller.getComponentName(), 
m.getJMSPriority(), getMessageSelector() });
+             
+                         }
+                 }
+                 //System.out.println("................ doInvokeListener() - 
Thread ID:"+Thread.currentThread().getId()+" Listener 
Class:"+l.getClass().getName()+" Controller:"+controller.getComponentName());
+                 l.onMessage(m, s);
+         } catch( Throwable t ) {
+                 t.printStackTrace();
+                 UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
this.getClass().getName(),
+                  "destroy", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                  "UIMAJMS_exception__WARNING", t);
+         }
+  }
   public void shutdownTaskExecutor(ThreadPoolExecutor tpe, boolean 
stopImmediate) throws InterruptedException {
     tpe.awaitTermination(50, TimeUnit.MILLISECONDS);
     
@@ -1134,6 +1231,10 @@ public class UimaDefaultMessageListenerC
                          amqc.stop();
                  }
                    awaitingShutdown = true;
+        
+               if ( getMessageListener() instanceof PriorityMessageHandler ) {
+                               
((PriorityMessageHandler)getMessageListener()).getQueue().put(new 
MessageWrapper(null, null, null,HIGH_PRIORITY));
+                   }
               if (taskExecutor != null && taskExecutor instanceof 
ThreadPoolTaskExecutor) {
                //      Modify task executor to terminate idle threads. While 
the thread terminates
                //  it calls destroy() method on the pinned instance of AE
@@ -1159,6 +1260,12 @@ public class UimaDefaultMessageListenerC
                  shutdownTaskExecutor(threadPoolExecutor, true);
               }
           }
+          if ( getTaskExecutor() != null ) {
+               
+                 if ( getTaskExecutor() instanceof ThreadPoolTaskExecutor ) {
+                         
((ThreadPoolTaskExecutor)getTaskExecutor()).shutdown();
+                 }
+          }
           String controllerName = (__listenerRef.controller == null) ? "" 
:__listenerRef.controller.getComponentName();
           __listenerRef.shutdown();
           if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINEST)) {
@@ -1228,32 +1335,105 @@ public class UimaDefaultMessageListenerC
     
   }
   
-  private void setUimaASThreadPoolExecutor(int consumentCount) throws 
Exception{
-    super.setMessageListener(ml);
+  private void setUimaASThreadPoolExecutor(int consumerCount) throws Exception 
{
+         if ( isGetMetaListener() ) {
+                   super.setMessageListener(ml);
+         } else if ( isFreeCasQueueListener()) {
+                   super.setMessageListener(ml);
+         } else if (endpoint != null && endpoint.isTempReplyDestination()) {
+                   super.setMessageListener(ml);
+         } else {
+                 if ( isProcessListener() ) { //controller != null && 
controller instanceof PrimitiveAnalysisEngineController ) {
+
+                 // Singleton handler shared by Process CAS listener and a 
targeted listener. The handler
+                 // onMessage() is called by Spring when a message with a 
matching selector is available.
+                 // When onMessage() is called, it adds a message to the 
Priority Queue
+               //  PriorityMessageHandler h = 
PriorityMessageHandler.getInstance();
+                       //      System.out.println("+++++++++++++ 
Controller:"+controller.getComponentName()+" Listener Scaleout="+cc+" 
Selector:"+super.getMessageSelector());
+
+                 //super.setMessageListener(h);
+                 // targeted listener should not have its own thread pool 
because it needs to use
+                 // threads created by the Process Cas Listener. Each of these 
threads is pinned to
+                 // a dedicated AE initialized at startup. Contract says that 
each AE process() will be called
+                 // on the same thread that initialized it. The targeted 
listener and process listener share
+                 // the same handler where CASes are pushed onto a Blocking 
Priority Queue for processing.
+                               if (!targetedListener && 
!isFreeCasQueueListener()) {
+                       //              System.out.println(">>>>>>>>>>> 
Controller:"+controller.getComponentName()+" Listener Scaleout="+cc+" 
Selector:"+super.getMessageSelector());
+                                       PriorityMessageHandler h = new 
PriorityMessageHandler(cc);
+
+                                       super.setMessageListener(h);
+                                       try {
+                                               while 
(controller.getInputChannel() == null) {
+                                                       synchronized (h) {
+                                                               h.wait(100);
+                                                       }
+                                               }
+                                       } catch (Exception e) {
+                                               e.printStackTrace();
+                                       }
+                                       //if (isPrimitiveService()) {
+                                               
latchToCountNumberOfTerminatedThreads = new CountDownLatch(consumerCount);
+                                               // Create a Custom Thread 
Factory. Provide it with an instance of
+                                               // PrimitiveController so that 
every thread can call it to initialize
+                                               // the next available instance 
of a AE.
+                                               tf = new 
UimaAsPriorityBasedThreadFactory(threadGroup,
+                                                               controller, 
latchToCountNumberOfTerminatedThreads);
+                                                               
//(PrimitiveAnalysisEngineController) controller, 
latchToCountNumberOfTerminatedThreads);
+                                               
((UimaAsPriorityBasedThreadFactory) tf).withQueue(h.getQueue())
+                                                               
.withChannel(controller.getInputChannel());
+
+                                               
((UimaAsPriorityBasedThreadFactory) tf).setDaemon(true);
+                                               if ( taskExecutor == null ) {  
// true for aggregates
+                                                       taskExecutor = new 
ThreadPoolTaskExecutor();
+                                               }
+                                               // This ThreadExecutor will use 
custom thread factory instead of default one
+                                               ((ThreadPoolTaskExecutor) 
taskExecutor).setThreadFactory(tf);
+                                               // Initialize the thread pool
+                                               ((ThreadPoolTaskExecutor) 
taskExecutor).initialize();
+                                               // Make sure all threads are 
started. This forces each thread to call
+                                               // PrimitiveController to 
initialize the next instance of AE
+                                               ((ThreadPoolTaskExecutor) 
taskExecutor).getThreadPoolExecutor().prestartAllCoreThreads();
+//                                     } else {
+//                                             
+//                                     }
+                         }
+                 } else {
+                         if ( controller.getInputChannel() == null ) {
+                                 //System.out.println("............. Error - 
JmsInputChannel not set yet...");
+                         } else if ( !(controller.getInputChannel() instanceof 
MessageListener ||
+                                         controller.getInputChannel() 
instanceof SessionAwareMessageListener) ) {
+                                 //System.out.println("............. Error - 
wrong MessageListener type - Getting 
this:"+controller.getInputChannel().getClass().getName());
+                         }
+                         
super.setMessageListener(controller.getInputChannel());
+                 }
+         }
    
     // create task executor with custom thread pool for:
     // 1) GetMeta request processing
     // 2) ReleaseCAS request
-    if ( taskExecutor == null ) {
-      UimaAsThreadFactory tf = new UimaAsThreadFactory(threadGroup);
-      tf.setDaemon(false);
+    if ( !targetedListener && taskExecutor == null ) {
+      UimaAsThreadFactory utf = new UimaAsThreadFactory(threadGroup);
+      utf.setDaemon(false);
+//      tf.defineUsageAs(UsedFor.GetMetaHandling);//setForGetMetaHandling();
       if ( isFreeCasQueueListener()) {
-        tf.setThreadNamePrefix(controller.getComponentName()+" - 
FreeCASRequest Thread");
+        utf.setThreadNamePrefix(controller.getComponentName()+" - 
FreeCASRequest Thread");
       } else if ( isGetMetaListener()  ) {
-        tf.setThreadNamePrefix(super.getBeanName()+" - Thread");
+        utf.setThreadNamePrefix(super.getBeanName()+" - Thread");
       } else if ( getDestination() != null && getMessageSelector() != null ) {
-        tf.setThreadNamePrefix(controller.getComponentName() + " Process 
Thread");
+        utf.setThreadNamePrefix(controller.getComponentName() + " Process 
Thread");
       } else if ( endpoint != null && endpoint.isTempReplyDestination() ) {
-        tf.setThreadNamePrefix(super.getBeanName()+" - Thread");
+        utf.setThreadNamePrefix(super.getBeanName()+" - Thread");
       } else { 
         throw new Exception("Unknown Context Detected in 
setUimaASThreadPoolExecutor()");
       }
-      ExecutorService es = Executors.newFixedThreadPool(consumentCount,tf);
+      ExecutorService es = Executors.newFixedThreadPool(consumerCount,utf);
       if ( es instanceof ThreadPoolExecutor ) {
           threadPoolExecutor = (ThreadPoolExecutor)es;
           super.setTaskExecutor(es);
       }
-    } else {
+    }
+    /*
+    else {
         UimaAsThreadFactory tf = new UimaAsThreadFactory(threadGroup);
         tf.setDaemon(true);
         if ( isFreeCasQueueListener()) {
@@ -1269,8 +1449,13 @@ public class UimaDefaultMessageListenerC
         }
         
     }
+    */
   }
 
+  private boolean isPrimitiveService() {
+         return controller != null && controller instanceof 
PrimitiveAnalysisEngineController &&
+                         controller.getInputChannel() != null;
+  }
   
   /**
    * Called by Spring to inject TaskExecutor
@@ -1296,30 +1481,14 @@ public class UimaDefaultMessageListenerC
     if (controller instanceof PrimitiveAnalysisEngineController) {
       // in case the taskExecutor is not plugged in yet, wait until one
       // becomes available. The TaskExecutor is plugged in by Spring
-      synchronized (mux2) {
-        while (taskExecutor == null) {
-          mux2.wait(20);
-        }
-      }
-      latchToCountNumberOfTerminatedThreads = new CountDownLatch(consumers);
-      // Create a Custom Thread Factory. Provide it with an instance of
-      // PrimitiveController so that every thread can call it to initialize
-      // the next available instance of a AE.
-      tf = new UimaAsThreadFactory(threadGroup, 
(PrimitiveAnalysisEngineController) controller, 
latchToCountNumberOfTerminatedThreads);
-      ((UimaAsThreadFactory)tf).setDaemon(true);
-      // This ThreadExecutor will use custom thread factory instead of defult 
one
-      ((ThreadPoolTaskExecutor) taskExecutor).setThreadFactory(tf);
-      // Initialize the thread pool
-      ((ThreadPoolTaskExecutor) taskExecutor).initialize();
-      // Make sure all threads are started. This forces each thread to call
-      // PrimitiveController to initialize the next instance of AE
-      ((ThreadPoolTaskExecutor) 
taskExecutor).getThreadPoolExecutor().prestartAllCoreThreads();
-      //  Change the state of a collocated service
-      if ( !controller.isTopLevelComponent() ) {
-        controller.changeState(ServiceState.RUNNING);
+      if ( !targetedListener && !isFreeCasQueueListener() ) {
+       synchronized (mux2) {
+            while (taskExecutor == null) {
+              mux2.wait(20);
+            }
+          }
       }
     }
-    
     if ( threadPoolExecutor != null ) {
        threadPoolExecutor.prestartAllCoreThreads();
     }

Modified: 
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
URL: 
http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java?rev=1849399&r1=1849398&r2=1849399&view=diff
==============================================================================
--- 
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
 (original)
+++ 
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
 Thu Dec 20 14:40:58 2018
@@ -24,191 +24,400 @@ import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
+import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQMessageProducer;
-import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.UIMAEE_Constants;
+import org.apache.uima.aae.client.UimaASProcessStatus;
+import org.apache.uima.aae.client.UimaASProcessStatusImpl;
+import org.apache.uima.aae.client.UimaAsynchronousEngine;
+import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.UimaMessageValidator;
 import org.apache.uima.adapter.jms.JmsConstants;
+import 
org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.ClientRequest;
 import 
org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl.SharedConnection;
+import org.apache.uima.adapter.jms.message.PendingMessage;
+import org.apache.uima.cas.CAS;
+import org.apache.uima.cas.SerialFormat;
 import org.apache.uima.util.Level;
+import org.apache.uima.util.impl.ProcessTrace_impl;
 
 /**
- * Initializes JMS session and creates JMS MessageProducer to be used for 
sending messages to a
- * given destination. It extends BaseMessageSender which starts the worker 
thread and is tasked with
- * sending messages. The application threads share a common 'queue' with the 
worker thread. The
- * application threads add messages to the pendingMessageList 'queue' and the 
worker thread consumes
- * them.
+ * Initializes JMS session and creates JMS MessageProducer to be used for
+ * sending messages to a given destination. It extends BaseMessageSender which
+ * starts the worker thread and is tasked with sending messages. The 
application
+ * threads share a common 'queue' with the worker thread. The application
+ * threads add messages to the pendingMessageList 'queue' and the worker thread
+ * consumes them.
  * 
  */
 public class ActiveMQMessageSender extends BaseMessageSender {
-  private static final Class CLASS_NAME = ActiveMQMessageSender.class;
+       private static final Class<?> CLASS_NAME = ActiveMQMessageSender.class;
 
-  private volatile Connection connection = null;
+       private volatile Connection connection = null;
 
-  private Session session = null;
+       private Session session = null;
 
-  private MessageProducer producer = null;
+       private MessageProducer producer = null;
 
-  private String destinationName = null;
-
-  private ConcurrentHashMap<Destination, MessageProducer> producerMap = new 
ConcurrentHashMap<Destination, MessageProducer>();
-
-  public ActiveMQMessageSender(Connection aConnection, String aDestinationName,
-          BaseUIMAAsynchronousEngineCommon_impl engine) throws Exception {
-    super(engine);
-    connection = aConnection;
-    destinationName = aDestinationName;
-  }
-
-  public synchronized MessageProducer getMessageProducer(Destination 
destination) throws Exception {
-    if (producerMap.containsKey(destination)) {
-      return (MessageProducer) producerMap.get(destination);
-    }
-    createSession();
-    MessageProducer mProducer = session.createProducer(destination);
-    mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-    producerMap.put(destination, mProducer);
-    return mProducer;
-  }
-  /**
-   * This is called when a new Connection is created after broker is restarted
-   */
-  public void setConnection(Connection aConnection) {
-    connection = aConnection;
-    cleanup();
-    try {
-      initializeProducer();
-    } catch( Exception e) {
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
getClass().getName(),
-                "setConnection", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAEE_exception__WARNING", e);
-      }
-    }
-    
-  }
-  private String getBrokerURL() {
-    try {
-      return ((ActiveMQConnection) connection).getBrokerInfo().getBrokerURL();
-    } catch (Exception ex) { /* handle silently. */
-    }
-    return "";
-  }
-
-  private void createSession() throws Exception {
-    String broker = getBrokerURL();
-    try {
-      if (session == null || engine.producerInitialized == false) {
-        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      }
-    } catch (JMSException e) {
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, 
CLASS_NAME.getName(),
-                "createSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAJMS_client_failed_creating_session_INFO",
-                new Object[] { destinationName, broker });
-      }
-      if (connection == null) {
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, 
CLASS_NAME.getName(),
-                  "createSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                  "UIMAJMS_client_connection_not_ready_INFO", new Object[] { 
broker });
-        }
-      } else if (((ActiveMQConnection) connection).isClosed()
-              || ((ActiveMQConnection) connection).isClosing()) {
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-          UIMAFramework.getLogger(CLASS_NAME)
-                  .logrb(Level.INFO, CLASS_NAME.getName(), "createSession",
-                          JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                          "UIMAJMS_client_connection_closed_INFO",
-                          new Object[] { destinationName, broker });
-        }
-      }
-      throw e;
-    } catch (Exception e) {
-      throw e;
-    }
-  }
-
-  /**
-   * Creates a jms session object used to instantiate message producer
-   */
-  protected void initializeProducer() throws Exception {
-    createSession();
-    producer = getMessageProducer(session.createQueue(destinationName));
-  }
-
-  /**
-   * Returns the full name of the destination queue
-   */
-  protected String getDestinationEndpoint() throws Exception {
-    return ((ActiveMQDestination) producer.getDestination()).getPhysicalName();
-  }
-
-  /**
-   * Returns jsm MessageProducer
-   */
-  public MessageProducer getMessageProducer() {
-    if ( engine.running && engine.producerInitialized == false  ) {
-      try {
-        SharedConnection con = engine.lookupConnection(getBrokerURL());
-        if ( con != null ) {
-          setConnection(con.getConnection());
-          initializeProducer();
-          engine.producerInitialized = true;
-        }
-      } catch( Exception e) {
-        e.printStackTrace();
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
getClass().getName(),
-                "getMessageProducer", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAEE_exception__WARNING", e);
-        }
-      }
-    } 
-    return producer;
-  }
-
-  public TextMessage createTextMessage() throws Exception {
-    if (session == null) {
-    // Force initialization of Producer
-      initializeProducer();
-    }
-    return session.createTextMessage("");
-  }
-
-  public BytesMessage createBytesMessage() throws Exception {
-    if (session == null) {
-    // Force initialization of Producer
-      initializeProducer();
-    }
-    return session.createBytesMessage();
-  }
-
-  /**
-   * Cleanup any jms resources used by the worker thread
-   */
-  protected void cleanup() { 
-    try {
-      if (session != null) {
-        session.close();
-        session = null;
-      }
-      if (producer != null) {
-        producer.close();
-        producer = null;
-      }
-    } catch (Exception e) {
-      // Ignore we are shutting down
-    } finally {
-      producerMap.clear();
-    }
-  }
+       private String destinationName = null;
+
+       private ConcurrentHashMap<Destination, MessageProducer> producerMap = 
new ConcurrentHashMap<Destination, MessageProducer>();
+
+       public ActiveMQMessageSender(Connection aConnection, String 
aDestinationName,
+                       BaseUIMAAsynchronousEngineCommon_impl engine) throws 
Exception {
+               super(engine);
+               connection = aConnection;
+               destinationName = aDestinationName;
+       }
+
+       public synchronized MessageProducer getMessageProducer(Destination 
destination) throws Exception {
+               if (producerMap.containsKey(destination)) {
+                       return (MessageProducer) producerMap.get(destination);
+               }
+               createSession();
+               MessageProducer mProducer = session.createProducer(destination);
+               mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+               producerMap.put(destination, mProducer);
+               return mProducer;
+       }
+
+       /**
+        * This is called when a new Connection is created after broker is 
restarted
+        */
+       public void setConnection(Connection aConnection) {
+               connection = aConnection;
+               cleanup();
+               try {
+                       initializeProducer();
+               } catch (Exception e) {
+                       if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+                               
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), 
"setConnection",
+                                               
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+                       }
+               }
+
+       }
+
+       private String getBrokerURL() {
+               try {
+                       return ((ActiveMQConnection) 
connection).getBrokerInfo().getBrokerURL();
+               } catch (Exception ex) { /* handle silently. */
+               }
+               return "";
+       }
+
+       private void createSession() throws Exception {
+               String broker = getBrokerURL();
+               try {
+                       if (session == null || engine.producerInitialized == 
false) {
+                               session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                       }
+               } catch (JMSException e) {
+                       if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                               
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), 
"createSession",
+                                               
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, 
"UIMAJMS_client_failed_creating_session_INFO",
+                                               new Object[] { destinationName, 
broker });
+                       }
+                       if (connection == null) {
+                               if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                                       
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), 
"createSession",
+                                                       
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, 
"UIMAJMS_client_connection_not_ready_INFO",
+                                                       new Object[] { broker 
});
+                               }
+                       } else if (((ActiveMQConnection) connection).isClosed() 
|| ((ActiveMQConnection) connection).isClosing()) {
+                               if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                                       
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), 
"createSession",
+                                                       
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_connection_closed_INFO",
+                                                       new Object[] { 
destinationName, broker });
+                               }
+                       }
+                       throw e;
+               } catch (Exception e) {
+                       throw e;
+               }
+       }
+
+       /**
+        * Returns the full name of the destination queue
+        */
+       protected String getDestinationEndpoint() throws Exception {
+               return ((ActiveMQDestination) 
producer.getDestination()).getPhysicalName();
+       }
+
+       /**
+        * Creates a jms session object used to instantiate message producer
+        */
+       protected void initializeProducer() throws Exception {
+               createSession();
+               producer = 
getMessageProducer(session.createQueue(destinationName));
+       }
+
+       /**
+        * Returns jsm MessageProducer
+        */
+       public MessageProducer getMessageProducer() {
+               if (engine.running && engine.producerInitialized == false) {
+                       try {
+                               SharedConnection con = 
engine.lookupConnection(getBrokerURL());
+                               if (con != null) {
+                                       setConnection(con.getConnection());
+                                       initializeProducer();
+                                       engine.producerInitialized = true;
+                               }
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                               if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+                                       
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), 
"getMessageProducer",
+                                                       
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+                               }
+                       }
+               }
+               return producer;
+       }
+
+       public TextMessage createTextMessage() throws Exception {
+               synchronized (ActiveMQMessageSender.class) {
+                       if (session == null) {
+                               // Force initialization of Producer
+                               initializeProducer();
+                       }
+                       // return session.createTextMessage("");
+                       TextMessage msg = null;
+                       try {
+                               msg = session.createTextMessage("");
+                       } catch (IllegalStateException e) {
+                               // stale Session
+                               session = null;
+                               initializeProducer();
+                               msg = session.createTextMessage("");
+                       }
+                       return msg;
+               }
+
+       }
+
+       public BytesMessage createBytesMessage() throws Exception {
+               synchronized (ActiveMQMessageSender.class) {
+                       if (session == null) {
+                               // Force initialization of Producer
+                               initializeProducer();
+                       }
+                       BytesMessage msg = null;
+                       try {
+                               msg = session.createBytesMessage();
+                       } catch (IllegalStateException e) {
+                               // stale Session
+                               session = null;
+                               initializeProducer();
+                               msg = session.createBytesMessage();
+                       }
+                       return msg;
+               }
+
+               // return session.createBytesMessage();
+       }
+
+       /**
+        * Cleanup any jms resources used by the worker thread
+        */
+       protected void cleanup() {
+               try {
+                       if (session != null) {
+                               session.close();
+                               session = null;
+                       }
+                       if (producer != null) {
+                               producer.close();
+                               producer = null;
+                       }
+               } catch (Exception e) {
+                       // Ignore we are shutting down
+               } finally {
+                       producerMap.clear();
+               }
+       }
+
+       protected void dispatchMessage(PendingMessage pm, 
BaseUIMAAsynchronousEngineCommon_impl engine,
+                       boolean casProcessRequest) throws Exception {
+               SharedConnection sc = 
engine.lookupConnection(engine.getBrokerURI());
+               ClientRequest cacheEntry = null;
+               boolean doCallback = false;
+               boolean addTimeToLive = true;
+               Session jmsSession = null;
+
+               // Check the environment for existence of NoTTL tag. If present,
+               // the deployer of the service wants to disable message 
expiration.
+               if (System.getProperty("NoTTL") != null) {
+                       addTimeToLive = false;
+               }
+               try {
+                       // long t1 = System.currentTimeMillis();
+                       jmsSession = sc.getConnection().createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+                       // Request JMS Message from the concrete implementation
+                       Message message = null;
+                       // Determine if this a CAS Process Request
+                       // boolean casProcessRequest = isProcessRequest(pm);
+                       // Only Process request can be serialized as binary
+                       if (casProcessRequest && (engine.getSerialFormat() != 
SerialFormat.XMI)) {
+                               message = jmsSession.createBytesMessage();
+                       } else {
+                               message = jmsSession.createTextMessage();
+                       }
+                       // get the producer initialized from a valid connection
+                       // producer = getMessageProducer();
+
+                       Destination d = null;
+                       String selector = null;
+                       // UIMA-AS ver 2.10.0 + sends Free Cas request to a 
service targeted queue
+                       // instead of a temp queue. Regular queues can be 
recovered in case of
+                       // a broker restart. The test below will be true for 
UIMA-AS v. 2.10.0 +.
+                       // Code in JmsOutputChannel will add the selector if 
the service is a CM.
+                       if (pm.get(AsynchAEMessage.TargetingSelector) != null) {
+                               selector = (String) 
pm.get(AsynchAEMessage.TargetingSelector);
+                       }
+                       if (selector == null && (pm.getMessageType() == 
AsynchAEMessage.ReleaseCAS
+                                       || pm.getMessageType() == 
AsynchAEMessage.Stop)) {
+                               d = (Destination) 
pm.get(AsynchAEMessage.Destination);
+
+                       } else {
+                               d = jmsSession.createQueue(destinationName);
+                       }
+                       MessageProducer mProducer = 
jmsSession.createProducer(d);
+                       mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                       // System.out.println(">>>>>>> Time to create and 
initialize JMS
+                       // Sesssion:"+(System.currentTimeMillis()-t1));
+                       super.initializeMessage(pm, message);
+                       String destination = ((ActiveMQDestination) 
d).getPhysicalName();
+                       if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                               
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), 
"run",
+                                               
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE",
+                                               new Object[] {
+                                                               
UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command,
+                                                                               
message.getIntProperty(AsynchAEMessage.Command)),
+                                                               
UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType,
+                                                                               
message.getIntProperty(AsynchAEMessage.MessageType)),
+                                                               destination });
+                       }
+                       if (casProcessRequest) {
+                               cacheEntry = (ClientRequest) 
engine.getCache().get(pm.get(AsynchAEMessage.CasReference));
+                               if (cacheEntry != null) {
+                                       // CAS cas = cacheEntry.getCAS();
+                                       // enable logging
+                                       if 
(System.getProperty("UimaAsCasTracking") != null) {
+                                               
message.setStringProperty("UimaAsCasTracking", "enable");
+                                       }
+                                       // Target specific service instance if 
targeting for the CAS is provided
+                                       // by the client application
+                                       if (cacheEntry.getTargetServiceId() != 
null) {
+                                               // 
System.out.println("------------Client Sending CAS to Service Instance With
+                                               // 
Id:"+cacheEntry.getTargetServiceId());;
+                                               
message.setStringProperty(UimaAsynchronousEngine.TargetSelectorProperty,
+                                                               
cacheEntry.getTargetServiceId());
+                                       }
+                                       // Use Process Timeout value for the 
time-to-live property in the
+                                       // outgoing JMS message. When this time 
is exceeded
+                                       // while the message sits in a queue, 
the JMS Server will remove it from
+                                       // the queue. What happens with the 
expired message depends on the
+                                       // configuration. Most JMS Providers 
create a special dead-letter queue
+                                       // where all expired messages are 
placed. NOTE: In ActiveMQ expired msgs in the
+                                       // DLQ
+                                       // are not auto evicted yet and 
accumulate taking up memory.
+                                       long timeoutValue = 
cacheEntry.getProcessTimeout();
+
+                                       if (timeoutValue > 0 && addTimeToLive) {
+                                               // Set high time to live value
+                                               message.setJMSExpiration(10 * 
timeoutValue);
+                                       }
+                                       if (pm.getMessageType() == 
AsynchAEMessage.Process) {
+                                               
cacheEntry.setCASDepartureTime(System.nanoTime());
+                                       }
+                                       
cacheEntry.setCASDepartureTime(System.nanoTime());
+
+                                       doCallback = true;
+
+                               } else {
+                                       if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+
+                                               
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), 
"run",
+                                                               
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_failed_cache_lookup__WARNING",
+                                                               new Object[] { 
pm.get(AsynchAEMessage.CasReference),
+                                                                               
UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command,
+                                                                               
                message.getIntProperty(AsynchAEMessage.Command)),
+                                                                               
UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType,
+                                                                               
                message.getIntProperty(AsynchAEMessage.MessageType)),
+                                                                               
destination });
+                                       }
+                                       return;  // no cacheEntry so just return
+                               }
+
+                       }
+                       // start timers
+                       if (casProcessRequest) {
+                               CAS cas = cacheEntry.getCAS();
+
+                               // Add the cas to a list of CASes pending 
reply. Also start the timer if
+                               // necessary
+                               
engine.serviceDelegate.addCasToOutstandingList(cacheEntry.getCasReferenceId(), 
cas.hashCode(),
+                                               engine.timerPerCAS); // 
true=timer per cas
+                               if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                                       
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), 
"sendCAS",
+                                                       
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_cas_added_to_pending_FINE",
+                                                       new Object[] { 
cacheEntry.getCasReferenceId(), String.valueOf(cas.hashCode()),
+                                                                       
engine.serviceDelegate.toString() });
+                               }
+
+                       } else if (pm.getMessageType() == 
AsynchAEMessage.GetMeta
+                                       && 
engine.serviceDelegate.getGetMetaTimeout() > 0) {
+                               // timer for PING has been started in sendCAS()
+                               if 
(!engine.serviceDelegate.isAwaitingPingReply()) {
+                                       
engine.serviceDelegate.startGetMetaRequestTimer();
+                               }
+                       } else {
+                               doCallback = false; // dont call 
onBeforeMessageSend() callback on CPC
+                       }
+                       // Dispatch asynchronous request to Uima AS service
+                       mProducer.send(message);
+
+                       if (doCallback) {
+                               UimaASProcessStatus status = new 
UimaASProcessStatusImpl(new ProcessTrace_impl(), cacheEntry.getCAS(),
+                                               cacheEntry.getCasReferenceId());
+                               // Notify engine before sending a message
+                               if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                                       
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), 
"run",
+                                                       
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, 
"UIMAJMS_calling_onBeforeMessageSend__FINE",
+                                                       new Object[] { 
pm.get(AsynchAEMessage.CasReference),
+                                                                       
String.valueOf(cacheEntry.getCAS().hashCode()) });
+                               }
+                               // Note the callback is a misnomer. The 
callback is made *after* the send now
+                               // Application receiving this callback can 
consider the CAS as delivere to a
+                               // queue
+                               engine.onBeforeMessageSend(status);
+
+                       }
+               } catch( Exception e) {
+            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) 
{
+                UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
getClass().getName(),
+                        "run", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
+                        "UIMAEE_exception__WARNING", e);
+            }
+               } finally {
+                       if (jmsSession != null) {
+                               try {
+                                       jmsSession.close();
+                               } catch (Exception eee) {
+
+                               }
+                       }
+               }
+
+       }
 }
\ No newline at end of file

Modified: 
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
URL: 
http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java?rev=1849399&r1=1849398&r2=1849399&view=diff
==============================================================================
--- 
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
 (original)
+++ 
uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
 Thu Dec 20 14:40:58 2018
@@ -78,6 +78,7 @@ import org.apache.uima.adapter.jms.JmsCo
 import org.apache.uima.adapter.jms.activemq.ConnectionFactoryIniter;
 import org.apache.uima.adapter.jms.activemq.SpringContainerDeployer;
 import org.apache.uima.adapter.jms.activemq.UimaEEAdminSpringContext;
+import org.apache.uima.adapter.jms.message.PendingMessage;
 import org.apache.uima.adapter.jms.service.Dd2spring;
 import org.apache.uima.analysis_engine.metadata.AnalysisEngineMetaData;
 import org.apache.uima.cas.CAS;
@@ -99,7 +100,7 @@ public class BaseUIMAAsynchronousEngine_
         implements UimaAsynchronousEngine, MessageListener, 
ControllerCallbackListener, ApplicationListener<ApplicationEvent>{
   private static final Class CLASS_NAME = 
BaseUIMAAsynchronousEngine_impl.class;
 
-  private MessageSender sender = null;
+  private ActiveMQMessageSender sender = null;
 
   private MessageProducer producer;
 
@@ -137,6 +138,9 @@ public class BaseUIMAAsynchronousEngine_
   
   protected static Lock globalLock = new ReentrantLock();
   
+  //private String serviceTargetSelector = null;
+  
+  protected volatile boolean stopped = false;
   public BaseUIMAAsynchronousEngine_impl() {
          super();
     UIMAFramework.getLogger(CLASS_NAME).log(Level.INFO,
@@ -219,7 +223,10 @@ public class BaseUIMAAsynchronousEngine_
           SerialFormat serialFormat) throws ResourceProcessException {
     try {
       msg.setStringProperty(AsynchAEMessage.MessageFrom, 
consumerDestination.getQueueName());
-
+//      // check if this message should target specific service instance
+//      if ( serviceTargetSelector != null ) {
+//          
msg.setStringProperty(UimaAsynchronousEngine.TargetSelectorProperty,serviceTargetSelector);
+//      }
       msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
       msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
       msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.Process);
@@ -308,12 +315,16 @@ public class BaseUIMAAsynchronousEngine_
                                         JmsConstants.JMS_LOG_RESOURCE_BUNDLE, 
"UIMAJMS_debug_msg__FINEST",
                                          new Object[] { msg });
                                }
-                                       if ( sharedConnection.getClientCount() 
== 1 ) {
+                                       if ( sharedConnection.getClientCount() 
<= 1 ) {
                                                
                                           sharedConnection.destroy();
                                           amqc.close();
                                        }
-                       }
+                       } else if ( sharedConnection.getClientCount() <= 1 ) {
+                                               
+                                          sharedConnection.destroy();
+                                          amqc.close();
+                                       }
                         } catch (Exception exx) {exx.printStackTrace();}
                  }
              // Delete client's temp reply queue from AMQ Broker 
@@ -341,7 +352,13 @@ public class BaseUIMAAsynchronousEngine_
   }
        public void stop() {
                try {
-                         System.out.println(this.getClass().getName()+".stop() 
- Stopping UIMA-AS Client");
+                         if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(
+                                               Level.INFO)) {
+                            
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO,
+                                                       CLASS_NAME.getName(), 
"stop",
+                                                       
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                                                       
"UIMAJMS_stopping_as_client_INFO");
+                     }
                          stopConnection();
 
                      super.doStop();
@@ -390,7 +407,7 @@ public class BaseUIMAAsynchronousEngine_
                } 
        }
 
-  public void setCPCMessage(Message msg) throws Exception {
+  protected void setCPCMessage(Message msg) throws Exception {
     msg.setStringProperty(AsynchAEMessage.MessageFrom, 
consumerDestination.getQueueName());
     msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
     msg.setIntProperty(AsynchAEMessage.MessageType, AsynchAEMessage.Request);
@@ -402,7 +419,35 @@ public class BaseUIMAAsynchronousEngine_
       ((TextMessage) msg).setText("");
     }
   }
+  protected void setFreeCasMessage(Message msg, String aCasReferenceId, String 
selector) throws Exception {
+           msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
+           msg.setStringProperty(AsynchAEMessage.CasReference, 
aCasReferenceId);
+           msg.setIntProperty(AsynchAEMessage.MessageType, 
AsynchAEMessage.Request);
+           msg.setIntProperty(AsynchAEMessage.Command, 
AsynchAEMessage.ReleaseCAS);
+           msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
+           msg.setJMSReplyTo(consumerDestination);
+           if ( selector != null ) {
+               
msg.setStringProperty(UimaAsynchronousEngine.TargetSelectorProperty,
+                               selector);
+           }
+
+           if (msg instanceof TextMessage) {
+             ((TextMessage) msg).setText("");
+           }
+         }
+  protected void setStopMessage(Message msg, String aCasReferenceId) throws 
Exception {
+           msg.setIntProperty(AsynchAEMessage.Payload, AsynchAEMessage.None);
+           msg.setStringProperty(AsynchAEMessage.CasReference, 
aCasReferenceId);
+           msg.setIntProperty(AsynchAEMessage.MessageType, 
AsynchAEMessage.Request);
+           msg.setIntProperty(AsynchAEMessage.Command, AsynchAEMessage.Stop);
+           msg.setStringProperty(UIMAMessage.ServerURI, brokerURI);
+           msg.setJMSReplyTo(consumerDestination);
+
+           if (msg instanceof TextMessage) {
+             ((TextMessage) msg).setText("");
+           }
 
+  }
        private boolean connectionClosedOrInvalid() {
                        SharedConnection sharedConnection = 
lookupConnection(brokerURI);
                        if (sharedConnection == null
@@ -419,6 +464,9 @@ public class BaseUIMAAsynchronousEngine_
        }
 
        private SharedConnection createAndInitializeAMQConnection( Semaphore 
semaphore, String aBrokerURI) throws Exception {
+               if ( stopped ) {
+                       return null;
+               }
                // This only effects Consumer
                // Create AMQ specific connection validator. It uses
                // AMQ specific approach to test the state of the connection
@@ -697,23 +745,7 @@ public class BaseUIMAAsynchronousEngine_
            
     // throws an exception if verions of UIMA-AS is not compatible with UIMA 
SDK
     VersionCompatibilityChecker.check(CLASS_NAME, "UIMA AS Client", 
"initialize");
-/*
-    // Check for compatibility with a version of uima sdk. Only check major 
versions.
-    if (UimaAsVersion.getMajorVersion() != UimaVersion.getMajorVersion() ) {
-      UIMAFramework.getLogger(CLASS_NAME).logrb(
-              Level.WARNING,
-              CLASS_NAME.getName(), 
-              "initialize",
-              UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-              "UIMAEE_incompatible_version_WARNING",
-              new Object[] { "UIMA AS Client", 
UimaAsVersion.getUimajFullVersionString(),
-                UimaVersion.getFullVersionString() });
-      throw new ResourceInitializationException(new AsynchAEException(
-              "Version of UIMA-AS is Incompatible with a Version of UIMA Core. 
UIMA-AS Version is built to depend on Core UIMA version:"
-                      + UimaAsVersion.getUimajFullVersionString() + " but is 
running with version:"
-                      + UimaVersion.getFullVersionString()));
-    }
-*/
+
     if (running) {
       throw new ResourceInitializationException(new 
UIMA_IllegalStateException());
     }
@@ -758,7 +790,11 @@ public class BaseUIMAAsynchronousEngine_
               .intValue();
       clientSideJmxStats.setCasPoolSize(casPoolSize);
     }
-
+//    if ( 
anApplicationContext.containsKey(UimaAsynchronousEngine.TargetSelectorProperty) 
) {
+//        serviceTargetSelector = 
+//                     
(String)anApplicationContext.get(UimaAsynchronousEngine.TargetSelectorProperty);
+//    }
+    
     if (anApplicationContext.containsKey(UimaAsynchronousEngine.Timeout)) {
       processTimeout = ((Integer) 
anApplicationContext.get(UimaAsynchronousEngine.Timeout))
               .intValue();
@@ -1304,6 +1340,32 @@ public class BaseUIMAAsynchronousEngine_
 
 //  private void stopProducingCases(ClientRequest clientCachedRequest) {
   private void stopProducingCases(String casReferenceId, Destination 
cmFreeCasQueue) {
+            PendingMessage msg = new PendingMessage(AsynchAEMessage.Stop);
+            msg.put(AsynchAEMessage.Destination, cmFreeCasQueue);
+                msg.put(AsynchAEMessage.CasReference, casReferenceId);
+            try {
+                if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                 UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, 
CLASS_NAME.getName(),
+                         "stopProducingCases", 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                         "UIMAJMS_client_sending_stop_to_service__INFO", new 
Object[] {casReferenceId,cmFreeCasQueue});
+             }
+                    sender.dispatchMessage(msg, this, false);
+ 
+            } catch (Exception ex) {
+                 if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+                     UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
+                             "stopProducingCases", 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                             "UIMAJMS_exception__WARNING",
+                             ex);
+                 }
+                 if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+                  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
CLASS_NAME.getName(),
+                          "stopProducingCases", 
JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
+                          "UIMAJMS_client_unable_to_send_stop_to_cm__WARNING");
+              }
+            }
+
+         /*
     try {
 //      if (clientCachedRequest.getFreeCasNotificationQueue() != null) {
       if (cmFreeCasQueue != null) {
@@ -1350,6 +1412,18 @@ public class BaseUIMAAsynchronousEngine_
                 "UIMAJMS_exception__WARNING", e);
       }
     }
+    */
+  }
+  protected void dispatchFreeCasRequest(String casReferenceId, Message message) throws Exception {
+     PendingMessage msg = new PendingMessage(AsynchAEMessage.ReleaseCAS);
+//     if ( message.getStringProperty(AsynchAEMessage.TargetingSelector) != null ) {
+//      msg.put(AsynchAEMessage.TargetingSelector,message.getStringProperty(AsynchAEMessage.TargetingSelector) );
+//     } else {
+//         msg.put(AsynchAEMessage.Destination, message.getJMSReplyTo());
+//     }
+     msg.put(AsynchAEMessage.Destination, message.getJMSReplyTo());
+        msg.put(AsynchAEMessage.CasReference, casReferenceId);
+     sender.dispatchMessage(msg, this, false);
   }
   protected MessageSender getDispatcher() {
     return sender;

Modified: uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/examples/as/RunRemoteAsyncAE.java
URL: http://svn.apache.org/viewvc/uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/examples/as/RunRemoteAsyncAE.java?rev=1849399&r1=1849398&r2=1849399&view=diff
==============================================================================
--- uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/examples/as/RunRemoteAsyncAE.java (original)
+++ uima/uv3/uima-as-v3/trunk/uimaj-as-activemq/src/main/java/org/apache/uima/examples/as/RunRemoteAsyncAE.java Thu Dec 20 14:40:58 2018
@@ -26,6 +26,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -79,6 +80,9 @@ import org.apache.uima.util.XMLInputSour
  * <li>-uimaEeDebug true causes various debugging things to happen, including *not* deleting the
  * generated spring file generated by running dd-2-spring. This parameter only affects deployments
  * specified using the -d parameter that follow it in the command line sequence.</li>
+ * <li>-TargetServiceId specifies identifier of a service which should process a CAS. This
+ * identifier must match service's identifier. By default a service is launched with an IP:PID
+ * identifier but the identifier can be an arbitrary String. </li>
  * </ul>
  */
 public class RunRemoteAsyncAE {
@@ -109,6 +113,8 @@ public class RunRemoteAsyncAE {
   private BufferedWriter logPerfWriter = null;
   
   private boolean needPerfHeaders = true;
+  
+  private String selector=null;
 
   /**
    * Start time of the processing - used to compute elapsed time.
@@ -196,6 +202,11 @@ public class RunRemoteAsyncAE {
             cpc_timeout = Integer.parseInt(args[++i]);
           } else if (args[i].equals(UimaAsynchronousEngine.UimaEeDebug)) {
             appCtx.put(UimaAsynchronousEngine.UimaEeDebug, args[++i]);
+          } else if (args[i].equals("-"+UimaAsynchronousEngine.TargetSelectorProperty)) {
+                 selector = args[++i];
+                 System.out.println("... Target Selector:"+selector);
+               // when a service is internally deployed (-d option) it will use a selector defined below
+              System.setProperty(UimaAsynchronousEngine.TargetSelectorProperty,selector);
           } else {
             System.err.println("Unknown switch " + args[i]);
             printUsageAndExit();
@@ -253,6 +264,11 @@ public class RunRemoteAsyncAE {
     appCtx.put(UimaAsynchronousEngine.CasPoolSize, casPoolSize);
     appCtx.put(UIMAFramework.CAS_INITIAL_HEAP_SIZE, Integer.valueOf(fsHeapSize / 4).toString());
 
+    // if configured to use service targeting, tell the client which service to use
+    String target = null;
+    if ( (target = System.getProperty(UimaAsynchronousEngine.TargetSelectorProperty) ) != null ) {
+       appCtx.put(UimaAsynchronousEngine.TargetSelectorProperty, target);
+    }
     // initialize
     uimaEEEngine.initialize(appCtx);
 
@@ -266,7 +282,13 @@ public class RunRemoteAsyncAE {
       } else {
         // send an empty CAS
         CAS cas = uimaEEEngine.getCAS();
-        uimaEEEngine.sendCAS(cas);
+        if ( selector != null ) {
+               System.out.println("Sending CAS to targeted service using sendAndReceive() - target selector:"+selector);
+            ArrayList<AnalysisEnginePerformanceMetrics> perfList = new ArrayList<AnalysisEnginePerformanceMetrics>();
+            uimaEEEngine.sendAndReceiveCAS(cas,perfList , selector);
+        } else {
+            uimaEEEngine.sendCAS(cas);
+        }
         uimaEEEngine.collectionProcessingComplete();
       }
       if (logPerfWriter != null) {
@@ -319,7 +341,11 @@ public class RunRemoteAsyncAE {
                     + "-uimaEeDebug true    This is optional. Leave it out for normal operation. If specified, causes"
                     + " additional debugging things to happen, including *not* deleting the generated Spring xml file generated"
                     + " from running dd2spring.  It only affects deployments specified using the -d parameter that follow it on"
-                    + " the command line");
+                    + " the command line\n"
+                    + " -TargetServiceId - service identifier. This is optional. If specified, this"
+                    + " identifier enables targeting of a specific service. The UIMA-AS client will send CASes to"
+                    + " a service instance which was launched with a matching identifier.")
+            ;
     System.exit(1);
   }
 


Reply via email to