This is an automated email from the ASF dual-hosted git repository.

cwiklik pushed a commit to branch uima-as-3
in repository https://gitbox.apache.org/repos/asf/uima-async-scaleout.git

commit eec56f664844869f6072105109f0a2ee0728e767
Author: cwiklik <cwiklik>
AuthorDate: Thu Nov 29 17:29:40 2018 +0000

    UIMA-5501 refactored to use pluggable endpoints
---
 .../adapter/jms/client/ActiveMQMessageSender.java  | 673 ++++++++++-----------
 .../client/BaseUIMAAsynchronousEngine_impl.java    |  35 +-
 .../apache/uima/as/dispatcher/LocalDispatcher.java |  81 ++-
 .../apache/uima/ee/test/TestUimaASNoErrors.java    |   9 +-
 4 files changed, 439 insertions(+), 359 deletions(-)

diff --git 
a/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
 
b/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
index 0ec5a1f..a95be45 100644
--- 
a/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
+++ 
b/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java
@@ -51,142 +51,138 @@ import org.apache.uima.util.Level;
 import org.apache.uima.util.impl.ProcessTrace_impl;
 
 /**
- * Initializes JMS session and creates JMS MessageProducer to be used for 
sending messages to a
- * given destination. It extends BaseMessageSender which starts the worker 
thread and is tasked with
- * sending messages. The application threads share a common 'queue' with the 
worker thread. The
- * application threads add messages to the pendingMessageList 'queue' and the 
worker thread consumes
- * them.
+ * Initializes JMS session and creates JMS MessageProducer to be used for
+ * sending messages to a given destination. It extends BaseMessageSender which
+ * starts the worker thread and is tasked with sending messages. The 
application
+ * threads share a common 'queue' with the worker thread. The application
+ * threads add messages to the pendingMessageList 'queue' and the worker thread
+ * consumes them.
  * 
  */
 public class ActiveMQMessageSender extends BaseMessageSender {
-  private static final Class<?> CLASS_NAME = ActiveMQMessageSender.class;
-
-  private volatile Connection connection = null;
-
-  private Session session = null;
-
-  private MessageProducer producer = null;
-
-  private String destinationName = null;
-
-  private ConcurrentHashMap<Destination, MessageProducer> producerMap = 
-                 new ConcurrentHashMap<Destination, MessageProducer>();
-
-  public ActiveMQMessageSender(Connection aConnection, String aDestinationName,
-          BaseUIMAAsynchronousEngineCommon_impl engine) throws Exception {
-    super(engine);
-    connection = aConnection;
-    destinationName = aDestinationName;
-  }
-
-  public synchronized MessageProducer getMessageProducer(Destination 
destination) throws Exception {
-    if (producerMap.containsKey(destination)) {
-      return (MessageProducer) producerMap.get(destination);
-    }
-    createSession();
-    MessageProducer mProducer = session.createProducer(destination);
-    mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-    producerMap.put(destination, mProducer);
-    return mProducer;
-  }
-  /**
-   * This is called when a new Connection is created after broker is restarted
-   */
-  public void setConnection(Connection aConnection) {
-    connection = aConnection;
-    cleanup();
-    try {
-      initializeProducer();
-    } catch( Exception e) {
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
getClass().getName(),
-                "setConnection", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAEE_exception__WARNING", e);
-      }
-    }
-    
-  }
-  private String getBrokerURL() {
-    try {
-      return ((ActiveMQConnection) connection).getBrokerInfo().getBrokerURL();
-    } catch (Exception ex) { /* handle silently. */
-    }
-    return "";
-  }
-
-  private void createSession() throws Exception {
-    String broker = getBrokerURL();
-    try {
-      if (session == null || engine.producerInitialized == false) {
-        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      }
-    } catch (JMSException e) {
-      if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, 
CLASS_NAME.getName(),
-                "createSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAJMS_client_failed_creating_session_INFO",
-                new Object[] { destinationName, broker });
-      }
-      if (connection == null) {
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-          UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, 
CLASS_NAME.getName(),
-                  "createSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                  "UIMAJMS_client_connection_not_ready_INFO", new Object[] { 
broker });
-        }
-      } else if (((ActiveMQConnection) connection).isClosed()
-              || ((ActiveMQConnection) connection).isClosing()) {
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
-          UIMAFramework.getLogger(CLASS_NAME)
-                  .logrb(Level.INFO, CLASS_NAME.getName(), "createSession",
-                          JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                          "UIMAJMS_client_connection_closed_INFO",
-                          new Object[] { destinationName, broker });
-        }
-      }
-      throw e;
-    } catch (Exception e) {
-      throw e;
-    }
-  }
-  /**
-   * Returns the full name of the destination queue
-   */
-  protected String getDestinationEndpoint() throws Exception {
-    return ((ActiveMQDestination) producer.getDestination()).getPhysicalName();
-  }
-
-  /**
-   * Creates a jms session object used to instantiate message producer
-   */
-  protected void initializeProducer() throws Exception {
-    createSession();
-    producer = getMessageProducer(session.createQueue(destinationName));
-  }
-
-
-  /**
-   * Returns jsm MessageProducer
-   */
-  public MessageProducer getMessageProducer() {
-    if ( engine.running && engine.producerInitialized == false  ) {
-      try {
-        SharedConnection con = engine.lookupConnection(getBrokerURL());
-        if ( con != null ) {
-          setConnection(con.getConnection());
-          initializeProducer();
-          engine.producerInitialized = true;
-        }
-      } catch( Exception e) {
-        e.printStackTrace();
-        if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-        UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, 
getClass().getName(),
-                "getMessageProducer", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE,
-                "UIMAEE_exception__WARNING", e);
-        }
-      }
-    } 
-    return producer;
-  }
+       private static final Class<?> CLASS_NAME = ActiveMQMessageSender.class;
+
+       private volatile Connection connection = null;
+
+       private Session session = null;
+
+       private MessageProducer producer = null;
+
+       private String destinationName = null;
+
+       private ConcurrentHashMap<Destination, MessageProducer> producerMap = 
new ConcurrentHashMap<Destination, MessageProducer>();
+
+       public ActiveMQMessageSender(Connection aConnection, String 
aDestinationName,
+                       BaseUIMAAsynchronousEngineCommon_impl engine) throws 
Exception {
+               super(engine);
+               connection = aConnection;
+               destinationName = aDestinationName;
+       }
+
+       public synchronized MessageProducer getMessageProducer(Destination 
destination) throws Exception {
+               if (producerMap.containsKey(destination)) {
+                       return (MessageProducer) producerMap.get(destination);
+               }
+               createSession();
+               MessageProducer mProducer = session.createProducer(destination);
+               mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+               producerMap.put(destination, mProducer);
+               return mProducer;
+       }
+
+       /**
+        * This is called when a new Connection is created after broker is 
restarted
+        */
+       public void setConnection(Connection aConnection) {
+               connection = aConnection;
+               cleanup();
+               try {
+                       initializeProducer();
+               } catch (Exception e) {
+                       if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+                               
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), 
"setConnection",
+                                               
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+                       }
+               }
+
+       }
+
+       private String getBrokerURL() {
+               try {
+                       return ((ActiveMQConnection) 
connection).getBrokerInfo().getBrokerURL();
+               } catch (Exception ex) { /* handle silently. */
+               }
+               return "";
+       }
+
+       private void createSession() throws Exception {
+               String broker = getBrokerURL();
+               try {
+                       if (session == null || engine.producerInitialized == 
false) {
+                               session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                       }
+               } catch (JMSException e) {
+                       if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                               
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), 
"createSession",
+                                               
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, 
"UIMAJMS_client_failed_creating_session_INFO",
+                                               new Object[] { destinationName, 
broker });
+                       }
+                       if (connection == null) {
+                               if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                                       
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), 
"createSession",
+                                                       
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, 
"UIMAJMS_client_connection_not_ready_INFO",
+                                                       new Object[] { broker 
});
+                               }
+                       } else if (((ActiveMQConnection) connection).isClosed() 
|| ((ActiveMQConnection) connection).isClosing()) {
+                               if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) {
+                                       
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), 
"createSession",
+                                                       
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_connection_closed_INFO",
+                                                       new Object[] { 
destinationName, broker });
+                               }
+                       }
+                       throw e;
+               } catch (Exception e) {
+                       throw e;
+               }
+       }
+
+       /**
+        * Returns the full name of the destination queue
+        */
+       protected String getDestinationEndpoint() throws Exception {
+               return ((ActiveMQDestination) 
producer.getDestination()).getPhysicalName();
+       }
+
+       /**
+        * Creates a jms session object used to instantiate message producer
+        */
+       protected void initializeProducer() throws Exception {
+               createSession();
+               producer = 
getMessageProducer(session.createQueue(destinationName));
+       }
+
+       /**
+        * Returns jsm MessageProducer
+        */
+       public MessageProducer getMessageProducer() {
+               if (engine.running && engine.producerInitialized == false) {
+                       try {
+                               SharedConnection con = 
engine.lookupConnection(getBrokerURL());
+                               if (con != null) {
+                                       setConnection(con.getConnection());
+                                       initializeProducer();
+                                       engine.producerInitialized = true;
+                               }
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                               if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+                                       
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), 
"getMessageProducer",
+                                                       
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+                               }
+                       }
+               }
+               return producer;
+       }
 
        public TextMessage createTextMessage() throws Exception {
                synchronized (ActiveMQMessageSender.class) {
@@ -209,14 +205,14 @@ public class ActiveMQMessageSender extends 
BaseMessageSender {
 
        }
 
-  public BytesMessage createBytesMessage() throws Exception {
+       public BytesMessage createBytesMessage() throws Exception {
                synchronized (ActiveMQMessageSender.class) {
-                   if (session == null) {
-                       //      Force initialization of Producer
-                         initializeProducer();
-                   }
-                   BytesMessage msg = null;
-                   try {
+                       if (session == null) {
+                               // Force initialization of Producer
+                               initializeProducer();
+                       }
+                       BytesMessage msg = null;
+                       try {
                                msg = session.createBytesMessage();
                        } catch (IllegalStateException e) {
                                // stale Session
@@ -227,203 +223,202 @@ public class ActiveMQMessageSender extends 
BaseMessageSender {
                        return msg;
                }
 
- //   return session.createBytesMessage();
-  }
-
-  /**
-   * Cleanup any jms resources used by the worker thread
-   */
-  protected void cleanup() { 
-    try {
-      if (session != null) {
-        session.close();
-        session = null;
-      }
-      if (producer != null) {
-        producer.close();
-        producer = null;
-      }
-    } catch (Exception e) {
-      // Ignore we are shutting down
-    } finally {
-      producerMap.clear();
-    }
-  }
-  protected void dispatchMessage(PendingMessage pm, 
BaseUIMAAsynchronousEngineCommon_impl engine, boolean casProcessRequest ) 
throws Exception {
-      SharedConnection sc = 
-                 engine.lookupConnection(engine.getBrokerURI());
-      ClientRequest cacheEntry = null;
-      boolean doCallback = false;
-      boolean addTimeToLive = true;
-      Session jmsSession = null;
-      
-      // Check the environment for existence of NoTTL tag. If present,
-      // the deployer of the service wants to disable message expiration.
-      if (System.getProperty("NoTTL") != null) {
-        addTimeToLive = false;
-      }
-      try {
-//       long t1 = System.currentTimeMillis();
-         jmsSession = sc.getConnection().createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-          
-          // Request JMS Message from the concrete implementation
-          Message message = null;
-          // Determine if this a CAS Process Request
-//          boolean casProcessRequest = isProcessRequest(pm);
-          // Only Process request can be serialized as binary
-          if (casProcessRequest && (engine.getSerialFormat() != 
SerialFormat.XMI)) {
-            message = jmsSession.createBytesMessage();
-          } else {
-            message = jmsSession.createTextMessage();
-          }
-          //  get the producer initialized from a valid connection
-         // producer = getMessageProducer();
-          
-          Destination d = null;
-          String selector = null;
-          // UIMA-AS ver 2.10.0 + sends Free Cas request to a service targeted 
queue
-          // instead of a temp queue. Regular queues can be recovered in case 
of
-          // a broker restart. The test below will be true for UIMA-AS v. 
2.10.0 +.
-          // Code in JmsOutputChannel will add the selector if the service is 
a CM.
-          if (pm.getPropertyAsString(AsynchAEMessage.TargetingSelector) != 
null) {
-                 selector = 
(String)pm.getPropertyAsString(AsynchAEMessage.TargetingSelector);
-          }
-          if ( selector == null && (pm.getMessageType() == 
AsynchAEMessage.ReleaseCAS || pm.getMessageType() == AsynchAEMessage.Stop) ) {
-                 d = (Destination)pm.getProperty(AsynchAEMessage.Destination);
-              
-          } else {
-              d = jmsSession.createQueue(destinationName);
-          }
-          MessageProducer mProducer = jmsSession.createProducer(d);
-          mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-          //System.out.println(">>>>>>> Time to create and initialize JMS 
Sesssion:"+(System.currentTimeMillis()-t1));
-          super.initializeMessage(pm, message);
-         String destination = ((ActiveMQDestination) d).getPhysicalName();
-          if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-            UIMAFramework.getLogger(CLASS_NAME).logrb(
-                    Level.FINE,
-                    CLASS_NAME.getName(),
-                    "run",
-                    JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                    "UIMAJMS_sending_msg_to_endpoint__FINE",
-                    new Object[] {
-                        
UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, message
-                                .getIntProperty(AsynchAEMessage.Command)),
-                        
UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, message
-                                .getIntProperty(AsynchAEMessage.MessageType)), 
destination });
-          }
-          if (casProcessRequest) {
-            cacheEntry = (ClientRequest) engine.getCache().get(
-                    pm.getPropertyAsString(AsynchAEMessage.CasReference));
-            if (cacheEntry != null) {
-            //    CAS cas = cacheEntry.getCAS();
-                // enable logging 
-                if (System.getProperty("UimaAsCasTracking") != null) {
-                  message.setStringProperty("UimaAsCasTracking", "enable");
-                }
-                // Target specific service instance if targeting for the CAS 
is provided
-                // by the client application
-                       if ( cacheEntry.getTargetServiceId() != null ) {
- //                            System.out.println("------------Client Sending 
CAS to Service Instance With Id:"+cacheEntry.getTargetServiceId());;
-                               
message.setStringProperty(UimaAsynchronousEngine.TargetSelectorProperty, 
cacheEntry.getTargetServiceId());
-                       }
-                  // Use Process Timeout value for the time-to-live property 
in the
-              // outgoing JMS message. When this time is exceeded
-              // while the message sits in a queue, the JMS Server will remove 
it from
-              // the queue. What happens with the expired message depends on 
the
-              // configuration. Most JMS Providers create a special 
dead-letter queue
-              // where all expired messages are placed. NOTE: In ActiveMQ 
expired msgs in the DLQ
-              // are not auto evicted yet and accumulate taking up memory.
-              long timeoutValue = cacheEntry.getProcessTimeout();
-
-              if (timeoutValue > 0 && addTimeToLive ) {
-                // Set high time to live value
-                message.setJMSExpiration(10 * timeoutValue);
-              }
-              if (pm.getMessageType() == AsynchAEMessage.Process) {
-                cacheEntry.setCASDepartureTime(System.nanoTime());
-              }
-              cacheEntry.setCASDepartureTime(System.nanoTime());
-      
-              doCallback = true;
-              
-          } else {
-              if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
-
-                  UIMAFramework.getLogger(CLASS_NAME).logrb(
-                          Level.WARNING,
-                          CLASS_NAME.getName(),
-                          "run",
-                          JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                          "UIMAJMS_failed_cache_lookup__WARNING",
-                          new Object[] {
-                                
pm.getPropertyAsString(AsynchAEMessage.CasReference),
-                              
UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, message
-                                      
.getIntProperty(AsynchAEMessage.Command)),
-                              
UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, message
-                                      
.getIntProperty(AsynchAEMessage.MessageType)), destination });
-                }
-             }
-                
-          }
-          // start timers
-          if( casProcessRequest ) { 
-                CAS cas = cacheEntry.getCAS();
-
-
-            // Add the cas to a list of CASes pending reply. Also start the 
timer if necessary
-            
engine.serviceDelegate.addCasToOutstandingList(cacheEntry.getCasReferenceId(), 
cas.hashCode(), engine.timerPerCAS); // true=timer per cas
-            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-                  UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, 
CLASS_NAME.getName(),
-                     "sendCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                     "UIMAJMS_cas_added_to_pending_FINE", new Object[] { 
cacheEntry.getCasReferenceId(), String.valueOf(cas.hashCode()), 
engine.serviceDelegate.toString()});
-            }
-
-          
-          } else if ( pm.getMessageType() == AsynchAEMessage.GetMeta &&
-                  engine.serviceDelegate.getGetMetaTimeout() > 0 ) {
-            // timer for PING has been started in sendCAS()
-            if ( !engine.serviceDelegate.isAwaitingPingReply()) {
-              engine.serviceDelegate.startGetMetaRequestTimer();
-            } 
-          } else {
-                doCallback = false;  // dont call onBeforeMessageSend() 
callback on CPC
-          }
-          //  Dispatch asynchronous request to Uima AS service
-          mProducer.send(message);
-          
-          if ( doCallback ) {
-            UimaASProcessStatus status = new UimaASProcessStatusImpl(new 
ProcessTrace_impl(),cacheEntry.getCAS(),
-                    cacheEntry.getCasReferenceId());
-            // Notify engine before sending a message
-            if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
-                UIMAFramework.getLogger(CLASS_NAME).logrb(
-                        Level.FINE,
-                        CLASS_NAME.getName(),
-                        "run",
-                        JmsConstants.JMS_LOG_RESOURCE_BUNDLE,
-                        "UIMAJMS_calling_onBeforeMessageSend__FINE",
-                        new Object[] {
-                          pm.getPropertyAsString(AsynchAEMessage.CasReference),
-                          String.valueOf(cacheEntry.getCAS().hashCode())
-                        });
-              }  
-            // Note the callback is a misnomer. The callback is made *after* 
the send now
-            // Application receiving this callback can consider the CAS as 
delivere to a queue
-            engine.onBeforeMessageSend(status);
-          
-          
-          }
-      } finally {
-         if ( jmsSession != null ) {
-                 try {
-                         jmsSession.close(); 
-                 } catch( Exception eee) {
-                         
-                 }
-         }
-      }
-
-
-  }
+               // return session.createBytesMessage();
+       }
+
+       /**
+        * Cleanup any jms resources used by the worker thread
+        */
+       protected void cleanup() {
+               try {
+                       if (session != null) {
+                               session.close();
+                               session = null;
+                       }
+                       if (producer != null) {
+                               producer.close();
+                               producer = null;
+                       }
+               } catch (Exception e) {
+                       // Ignore we are shutting down
+               } finally {
+                       producerMap.clear();
+               }
+       }
+
+       protected void dispatchMessage(PendingMessage pm, 
BaseUIMAAsynchronousEngineCommon_impl engine,
+                       boolean casProcessRequest) throws Exception {
+               SharedConnection sc = 
engine.lookupConnection(engine.getBrokerURI());
+               ClientRequest cacheEntry = null;
+               boolean doCallback = false;
+               boolean addTimeToLive = true;
+               Session jmsSession = null;
+
+               // Check the environment for existence of NoTTL tag. If present,
+               // the deployer of the service wants to disable message 
expiration.
+               if (System.getProperty("NoTTL") != null) {
+                       addTimeToLive = false;
+               }
+               try {
+                       // long t1 = System.currentTimeMillis();
+                       jmsSession = sc.getConnection().createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+                       // Request JMS Message from the concrete implementation
+                       Message message = null;
+                       // Determine if this a CAS Process Request
+                       // boolean casProcessRequest = isProcessRequest(pm);
+                       // Only Process request can be serialized as binary
+                       if (casProcessRequest && (engine.getSerialFormat() != 
SerialFormat.XMI)) {
+                               message = jmsSession.createBytesMessage();
+                       } else {
+                               message = jmsSession.createTextMessage();
+                       }
+                       // get the producer initialized from a valid connection
+                       // producer = getMessageProducer();
+
+                       Destination d = null;
+                       String selector = null;
+                       // UIMA-AS ver 2.10.0 + sends Free Cas request to a 
service targeted queue
+                       // instead of a temp queue. Regular queues can be 
recovered in case of
+                       // a broker restart. The test below will be true for 
UIMA-AS v. 2.10.0 +.
+                       // Code in JmsOutputChannel will add the selector if 
the service is a CM.
+                       if 
(pm.getPropertyAsString(AsynchAEMessage.TargetingSelector) != null) {
+                               selector = (String) 
pm.getPropertyAsString(AsynchAEMessage.TargetingSelector);
+                       }
+                       if (selector == null && (pm.getMessageType() == 
AsynchAEMessage.ReleaseCAS
+                                       || pm.getMessageType() == 
AsynchAEMessage.Stop)) {
+                               d = (Destination) 
pm.getProperty(AsynchAEMessage.Destination);
+
+                       } else {
+                               d = jmsSession.createQueue(destinationName);
+                       }
+                       MessageProducer mProducer = 
jmsSession.createProducer(d);
+                       mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                       // System.out.println(">>>>>>> Time to create and 
initialize JMS
+                       // Sesssion:"+(System.currentTimeMillis()-t1));
+                       super.initializeMessage(pm, message);
+                       String destination = ((ActiveMQDestination) 
d).getPhysicalName();
+                       if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                               
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), 
"run",
+                                               
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE",
+                                               new Object[] {
+                                                               
UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command,
+                                                                               
message.getIntProperty(AsynchAEMessage.Command)),
+                                                               
UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType,
+                                                                               
message.getIntProperty(AsynchAEMessage.MessageType)),
+                                                               destination });
+                       }
+                       if (casProcessRequest) {
+                               cacheEntry = (ClientRequest) engine.getCache()
+                                               
.get(pm.getPropertyAsString(AsynchAEMessage.CasReference));
+                               if (cacheEntry != null) {
+                                       // CAS cas = cacheEntry.getCAS();
+                                       // enable logging
+                                       if 
(System.getProperty("UimaAsCasTracking") != null) {
+                                               
message.setStringProperty("UimaAsCasTracking", "enable");
+                                       }
+                                       // Target specific service instance if 
targeting for the CAS is provided
+                                       // by the client application
+                                       if (cacheEntry.getTargetServiceId() != 
null) {
+                                               // 
System.out.println("------------Client Sending CAS to Service Instance With
+                                               // 
Id:"+cacheEntry.getTargetServiceId());;
+                                               
message.setStringProperty(UimaAsynchronousEngine.TargetSelectorProperty,
+                                                               
cacheEntry.getTargetServiceId());
+                                       }
+                                       // Use Process Timeout value for the 
time-to-live property in the
+                                       // outgoing JMS message. When this time 
is exceeded
+                                       // while the message sits in a queue, 
the JMS Server will remove it from
+                                       // the queue. What happens with the 
expired message depends on the
+                                       // configuration. Most JMS Providers 
create a special dead-letter queue
+                                       // where all expired messages are 
placed. NOTE: In ActiveMQ expired msgs in the
+                                       // DLQ
+                                       // are not auto evicted yet and 
accumulate taking up memory.
+                                       long timeoutValue = 
cacheEntry.getProcessTimeout();
+
+                                       if (timeoutValue > 0 && addTimeToLive) {
+                                               // Set high time to live value
+                                               message.setJMSExpiration(10 * 
timeoutValue);
+                                       }
+                                       if (pm.getMessageType() == 
AsynchAEMessage.Process) {
+                                               
cacheEntry.setCASDepartureTime(System.nanoTime());
+                                       }
+                                       
cacheEntry.setCASDepartureTime(System.nanoTime());
+
+                                       doCallback = true;
+
+                               } else {
+                                       if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+
+                                               
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), 
"run",
+                                                               
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_failed_cache_lookup__WARNING",
+                                                               new Object[] { 
pm.getPropertyAsString(AsynchAEMessage.CasReference),
+                                                                               
UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command,
+                                                                               
                message.getIntProperty(AsynchAEMessage.Command)),
+                                                                               
UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType,
+                                                                               
                message.getIntProperty(AsynchAEMessage.MessageType)),
+                                                                               
destination });
+                                       }
+                                       return; // no cache entry, done here
+                               }
+
+                       }
+                       // start timers
+                       if (casProcessRequest) {
+                               CAS cas = cacheEntry.getCAS();
+
+                               // Add the cas to a list of CASes pending 
reply. Also start the timer if
+                               // necessary
+                               
engine.serviceDelegate.addCasToOutstandingList(cacheEntry.getCasReferenceId(), 
cas.hashCode(),
+                                               engine.timerPerCAS); // 
true=timer per cas
+                               if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                                       
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), 
"sendCAS",
+                                                       
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_cas_added_to_pending_FINE",
+                                                       new Object[] { 
cacheEntry.getCasReferenceId(), String.valueOf(cas.hashCode()),
+                                                                       
engine.serviceDelegate.toString() });
+                               }
+
+                       } else if (pm.getMessageType() == 
AsynchAEMessage.GetMeta
+                                       && 
engine.serviceDelegate.getGetMetaTimeout() > 0) {
+                               // timer for PING has been started in sendCAS()
+                               if 
(!engine.serviceDelegate.isAwaitingPingReply()) {
+                                       
engine.serviceDelegate.startGetMetaRequestTimer();
+                               }
+                       } else {
+                               doCallback = false; // dont call 
onBeforeMessageSend() callback on CPC
+                       }
+                       // Dispatch asynchronous request to Uima AS service
+                       mProducer.send(message);
+
+                       if (doCallback) {
+                               UimaASProcessStatus status = new 
UimaASProcessStatusImpl(new ProcessTrace_impl(), cacheEntry.getCAS(),
+                                               cacheEntry.getCasReferenceId());
+                               // Notify engine before sending a message
+                               if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) {
+                                       
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), 
"run",
+                                                       
JmsConstants.JMS_LOG_RESOURCE_BUNDLE, 
"UIMAJMS_calling_onBeforeMessageSend__FINE",
+                                                       new Object[] { 
pm.getPropertyAsString(AsynchAEMessage.CasReference),
+                                                                       
String.valueOf(cacheEntry.getCAS().hashCode()) });
+                               }
+                               // Note the callback is a misnomer. The 
callback is made *after* the send now
+                               // Application receiving this callback can 
consider the CAS as delivered to a
+                               // queue
+                               engine.onBeforeMessageSend(status);
+
+                       }
+               } catch (Exception e) {
+                       if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
+                               
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), 
"dispatchMessage",
+                                               
UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e);
+                       }
+               } finally {
+                       if (jmsSession != null) {
+                               try {
+                                       jmsSession.close();
+                               } catch (Exception eee) {
+
+                               }
+                       }
+               }
+
+       }
 }
\ No newline at end of file
diff --git 
a/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
 
b/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
index f0623fb..d5ffa93 100644
--- 
a/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
+++ 
b/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java
@@ -70,9 +70,14 @@ import org.apache.uima.aae.client.UimaAsynchronousEngine;
 import org.apache.uima.aae.controller.AnalysisEngineController;
 import org.apache.uima.aae.controller.ControllerCallbackListener;
 import org.apache.uima.aae.controller.Endpoint;
+import org.apache.uima.aae.definition.connectors.Endpoints;
+import org.apache.uima.aae.definition.connectors.UimaAsEndpoint;
+import org.apache.uima.aae.definition.connectors.UimaAsEndpoint.EndpointType;
 import org.apache.uima.aae.error.UimaASMetaRequestTimeout;
 import org.apache.uima.aae.jmx.JmxManager;
 import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.MessageProcessor;
 import org.apache.uima.aae.message.UIMAMessage;
 import org.apache.uima.aae.service.AsynchronousUimaASService;
 import org.apache.uima.aae.service.UimaASService;
@@ -495,12 +500,28 @@ public class BaseUIMAAsynchronousEngine_impl extends 
BaseUIMAAsynchronousEngineC
     if ( dispatchThread == null ) {
          // make sure we are in the running state. The local consumer depends 
on it
        running = true;
-      
+       UimaAsEndpoint clientEndpoint;
+       try {
+               
+               MessageProcessor messageProcessor = 
+                               new ClientMessageProcessor();
+               
+               clientEndpoint =
+                               Endpoints.newEndpoint(EndpointType.Direct, 
EndpointType.Direct.getName()+"Client", messageProcessor);
+               // create client consumers and connect them to the service
+               // producers
+               service.connect(clientEndpoint);
+               clientEndpoint.start();
+       } catch( Exception e) {
+               throw new ResourceInitializationException(e);
+       }
+
        // start message consumer to handle replies  
-       startLocalConsumer(anApplicationContext);
+   //  startLocalConsumer(anApplicationContext);
        // start dispatcher in its own thread. It will fetch messages from a 
shared 'pendingMessageQueue'
        LocalDispatcher dispatcher =
-                       new LocalDispatcher(this, service, pendingMessageQueue);
+                       new LocalDispatcher(this, service, pendingMessageQueue, 
clientEndpoint);
+       
        dispatchThread = new Thread(dispatcher,"LocalDispatcher");
        dispatchThread.start();
     }
@@ -1690,4 +1711,12 @@ public class BaseUIMAAsynchronousEngine_impl extends 
BaseUIMAAsynchronousEngineC
        }
          
   }
+  private class ClientMessageProcessor implements MessageProcessor {
+
+       @Override
+       public void process(MessageContext message) throws Exception {
+               onMessage((DirectMessage)message.getRawMessage());
+       }
+         
+  }
 }
diff --git 
a/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java
 
b/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java
index f3e03a4..a8fcd6a 100644
--- 
a/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java
+++ 
b/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java
@@ -24,7 +24,12 @@ import org.apache.uima.UIMAFramework;
 import org.apache.uima.aae.UIMAEE_Constants;
 import org.apache.uima.aae.client.UimaASProcessStatus;
 import org.apache.uima.aae.client.UimaASProcessStatusImpl;
+import org.apache.uima.aae.definition.connectors.UimaAsConsumer.ConsumerType;
+import org.apache.uima.aae.definition.connectors.UimaAsEndpoint;
+import org.apache.uima.aae.definition.connectors.UimaAsEndpoint.EndpointType;
 import org.apache.uima.aae.message.AsynchAEMessage;
+import org.apache.uima.aae.message.MessageContext;
+import org.apache.uima.aae.message.UimaAsOrigin;
 import org.apache.uima.aae.service.UimaASService;
 import org.apache.uima.adapter.jms.JmsConstants;
 import 
org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl;
@@ -39,37 +44,89 @@ public class LocalDispatcher implements Runnable  {
        private BlockingQueue<PendingMessage> messageQueue = null;
        private BaseUIMAAsynchronousEngineCommon_impl client;
        private UimaASService service;
-
+       private UimaAsEndpoint clientEndpoint;
+       
        public LocalDispatcher(BaseUIMAAsynchronousEngineCommon_impl client, 
UimaASService service,
-                       BlockingQueue<PendingMessage> pendingMessageQueue) {
+                       BlockingQueue<PendingMessage> pendingMessageQueue, 
UimaAsEndpoint clientEndpoint) {
                this.service = service;
                this.client = client;
                this.messageQueue = pendingMessageQueue;
+               this.clientEndpoint = clientEndpoint;
        }
-
+       
        private boolean reject(PendingMessage pm) {
                return false;
        }
 
        private void dispatch(PendingMessage pm) throws Exception {
                boolean doCallback = false;
-
+               StringBuilder serviceUri = new 
StringBuilder("direct").append(":").append(service.getName());
+               
                switch (pm.getMessageType()) {
                case AsynchAEMessage.GetMeta:
-                       service.sendGetMetaRequest();
+                       
serviceUri.append(":").append(ConsumerType.GetMetaRequest.name()).toString();
+                       MessageContext getMetaMessage =
+                               clientEndpoint.newMessageBuilder().
+                                       newGetMetaRequestMessage(new 
UimaAsOrigin("Client", EndpointType.Direct))
+//                                     newGetMetaRequestMessage(new 
UimaAsOrigin("direct:Client", EndpointType.Direct))
+                                       .withPayload(AsynchAEMessage.None)
+                                       .build();
+                       clientEndpoint.dispatch(getMetaMessage, 
serviceUri.toString());
+                       
                        
System.out.println("LocalDispatcher.dispatch()-dispatched getMeta Request");
                        break;
 
                case AsynchAEMessage.Process:
                        doCallback = true;
-                       service.process((CAS) 
pm.getProperty(AsynchAEMessage.CAS), 
pm.getPropertyAsString(AsynchAEMessage.CasReference));
-                       
System.out.println("LocalDispatcher.dispatch()-dispatched Process Request");
+                       
serviceUri.append(":").append(ConsumerType.ProcessCASRequest.name()).toString();
+                       
+                       MessageContext processMessage =
+                                       clientEndpoint.newMessageBuilder().
+                                               newProcessCASRequestMessage(new 
UimaAsOrigin("Client", EndpointType.Direct))
+//                                             newProcessCASRequestMessage(new 
UimaAsOrigin("direct:Client", EndpointType.Direct))
+                                               
.withPayload(AsynchAEMessage.CASRefID)
+                                               
.withCasReferenceId(pm.getPropertyAsString(AsynchAEMessage.CasReference))
+                                               .build();
+
+                       service.add2Cache((CAS) 
pm.getProperty(AsynchAEMessage.CAS), processMessage, 
pm.getPropertyAsString(AsynchAEMessage.CasReference));
+
+                       clientEndpoint.dispatch(processMessage, 
serviceUri.toString());
+                               
+                       
System.out.println("LocalDispatcher.dispatch()-dispatched process Request");
+
+                               
                        break;
 
                case AsynchAEMessage.CollectionProcessComplete:
-                       service.collectionProcessComplete();
+                       
serviceUri.append(":").append(ConsumerType.CpcRequest.name()).toString();
+
+                       MessageContext cpcMessage =
+                          clientEndpoint.newMessageBuilder().
+                                 newCpCRequestMessage(new 
UimaAsOrigin("Client", EndpointType.Direct))
+//                               newCpCRequestMessage(new 
UimaAsOrigin("direct:Client", EndpointType.Direct))
+                                 .withPayload(AsynchAEMessage.None)
+                                 .build();
+                   clientEndpoint.dispatch(cpcMessage, serviceUri.toString());
+
                        
System.out.println("LocalDispatcher.dispatch()-dispatched CPC Request");
                        break;
+
+               case AsynchAEMessage.ReleaseCAS:
+                       
serviceUri.append(":").append(ConsumerType.FreeCASRequest.name()).toString();
+
+                       MessageContext freeCASMessage =
+                          clientEndpoint.newMessageBuilder().
+                                 newCpCRequestMessage(new 
UimaAsOrigin("Client", EndpointType.Direct))
+//                               newCpCRequestMessage(new 
UimaAsOrigin("direct:Client", EndpointType.Direct))
+                                 .withPayload(AsynchAEMessage.CASRefID)
+                                 
.withCasReferenceId(pm.getPropertyAsString(AsynchAEMessage.CasReference))
+                                 .build();
+                   clientEndpoint.dispatch(freeCASMessage, 
serviceUri.toString());
+
+                       
System.out.println("LocalDispatcher.dispatch()-dispatched Free CAS Request");
+                       break;
+
+               
                }
         if ( doCallback ) {
             UimaASProcessStatus status = new UimaASProcessStatusImpl(new 
ProcessTrace_impl(),(CAS)pm.getProperty(AsynchAEMessage.CAS),
@@ -99,9 +156,9 @@ public class LocalDispatcher implements Runnable  {
                while (client.isRunning()) {
                        PendingMessage pm = null;
                        try {
-                               System.out.println("LocalDispatcher.run()- 
waiting for new message ... queue hashcode:"+messageQueue.hashCode());
+                               System.out.println("Client 
LocalDispatcher.run()- waiting for new message ... queue 
hashcode:"+messageQueue.hashCode());
                                pm = messageQueue.take();
-                               System.out.println("LocalDispatcher.run()-got 
new message to dispatch");
+                               System.out.println("Client 
LocalDispatcher.run()-got new message to dispatch");
                        } catch (InterruptedException e) {
                                
                                return;
@@ -122,12 +179,8 @@ public class LocalDispatcher implements Runnable  {
                                        }
                                }
                                try {
-                                       
System.out.println(".................... calling 
LocalDispatch.beforeDispatch()");
                                        client.beforeDispatch(pm);
-                                       
-                                       
System.out.println(".................... calling LocalDispatch.dispatch()");
                                        dispatch(pm);
-                                       
System.out.println(".................... LocalDispatch.dispatch() returned");
                                } catch (Exception e) {
                                        if 
(UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) {
                                                
UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), 
"run",
diff --git 
a/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java
 
b/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java
index 1976e2f..2e966a5 100644
--- 
a/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java
+++ 
b/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java
@@ -26,6 +26,7 @@ import java.io.FileWriter;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -1229,7 +1230,7 @@ public class TestUimaASNoErrors extends BaseTestSupport {
            initialize(uimaAsClient, appCtx);
            waitUntilInitialized();
 
-           for (int i = 0; i < 500; i++) {
+           for (int i = 0; i < 2; i++) {
              CAS cas = uimaAsClient.getCAS();
              cas.setDocumentText("Some Text");
              System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " 
Request to a Service");
@@ -1441,6 +1442,8 @@ public class TestUimaASNoErrors extends BaseTestSupport {
 
        @Test
        public void testDeployAsyncAggregateServiceOverJava() throws Exception {
+//             URL url = 
TestUimaASNoErrors.class.getResource("/Deploy_AsyncAggregate.xml");
+               
                testDeployAsyncAggregateService(Transport.Java);
        }
 
@@ -1457,7 +1460,7 @@ public class TestUimaASNoErrors extends BaseTestSupport {
 
                
addExceptionToignore(org.apache.uima.aae.error.UimaEEServiceException.class);
 
-               runTest(appCtx, uimaAsClient, "tcp://localhost:61616", 
"TopLevelTaeQueue", 200, PROCESS_LATCH);
+               runTest(appCtx, uimaAsClient, "tcp://localhost:61616", 
"TopLevelTaeQueue", 2, PROCESS_LATCH);
        }
 
         @Test
@@ -1476,7 +1479,7 @@ public class TestUimaASNoErrors extends BaseTestSupport {
            
               System.setProperty("NoOpBroker", "tcp::/localhost:61616");
               System.setProperty(JmsConstants.SessionTimeoutOverride, 
"2500000");
-//            deployService(eeUimaEngine, relativePath + 
"/Deploy_NoOpAnnotator.xml");
+             // deployService(uimaAsClient, relativePath + 
"/Deploy_NoOpAnnotator.xml");
                        deployJmsService(uimaAsClient, relativePath + 
"/Deploy_NoOpAnnotatorUsingPlaceholder.xml");
 
               Map<String, Object> appCtx = defaultContext("TopLevelTaeQueue");

Reply via email to