This is an automated email from the ASF dual-hosted git repository. cwiklik pushed a commit to branch uima-as-3 in repository https://gitbox.apache.org/repos/asf/uima-async-scaleout.git
commit eec56f664844869f6072105109f0a2ee0728e767 Author: cwiklik <cwiklik> AuthorDate: Thu Nov 29 17:29:40 2018 +0000 UIMA-5501 refactored to use pluggagble endpoints --- .../adapter/jms/client/ActiveMQMessageSender.java | 673 ++++++++++----------- .../client/BaseUIMAAsynchronousEngine_impl.java | 35 +- .../apache/uima/as/dispatcher/LocalDispatcher.java | 81 ++- .../apache/uima/ee/test/TestUimaASNoErrors.java | 9 +- 4 files changed, 439 insertions(+), 359 deletions(-) diff --git a/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java b/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java index 0ec5a1f..a95be45 100644 --- a/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java +++ b/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/ActiveMQMessageSender.java @@ -51,142 +51,138 @@ import org.apache.uima.util.Level; import org.apache.uima.util.impl.ProcessTrace_impl; /** - * Initializes JMS session and creates JMS MessageProducer to be used for sending messages to a - * given destination. It extends BaseMessageSender which starts the worker thread and is tasked with - * sending messages. The application threads share a common 'queue' with the worker thread. The - * application threads add messages to the pendingMessageList 'queue' and the worker thread consumes - * them. + * Initializes JMS session and creates JMS MessageProducer to be used for + * sending messages to a given destination. It extends BaseMessageSender which + * starts the worker thread and is tasked with sending messages. The application + * threads share a common 'queue' with the worker thread. The application + * threads add messages to the pendingMessageList 'queue' and the worker thread + * consumes them. * */ public class ActiveMQMessageSender extends BaseMessageSender { - private static final Class<?> CLASS_NAME = ActiveMQMessageSender.class; - - private volatile Connection connection = null; - - private Session session = null; - - private MessageProducer producer = null; - - private String destinationName = null; - - private ConcurrentHashMap<Destination, MessageProducer> producerMap = - new ConcurrentHashMap<Destination, MessageProducer>(); - - public ActiveMQMessageSender(Connection aConnection, String aDestinationName, - BaseUIMAAsynchronousEngineCommon_impl engine) throws Exception { - super(engine); - connection = aConnection; - destinationName = aDestinationName; - } - - public synchronized MessageProducer getMessageProducer(Destination destination) throws Exception { - if (producerMap.containsKey(destination)) { - return (MessageProducer) producerMap.get(destination); - } - createSession(); - MessageProducer mProducer = session.createProducer(destination); - mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - producerMap.put(destination, mProducer); - return mProducer; - } - /** - * This is called when a new Connection is created after broker is restarted - */ - public void setConnection(Connection aConnection) { - connection = aConnection; - cleanup(); - try { - initializeProducer(); - } catch( Exception e) { - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), - "setConnection", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAEE_exception__WARNING", e); - } - } - - } - private String getBrokerURL() { - try { - return ((ActiveMQConnection) connection).getBrokerInfo().getBrokerURL(); - } catch (Exception ex) { /* handle silently. */ - } - return ""; - } - - private void createSession() throws Exception { - String broker = getBrokerURL(); - try { - if (session == null || engine.producerInitialized == false) { - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - } catch (JMSException e) { - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), - "createSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_client_failed_creating_session_INFO", - new Object[] { destinationName, broker }); - } - if (connection == null) { - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), - "createSession", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_client_connection_not_ready_INFO", new Object[] { broker }); - } - } else if (((ActiveMQConnection) connection).isClosed() - || ((ActiveMQConnection) connection).isClosing()) { - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { - UIMAFramework.getLogger(CLASS_NAME) - .logrb(Level.INFO, CLASS_NAME.getName(), "createSession", - JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_client_connection_closed_INFO", - new Object[] { destinationName, broker }); - } - } - throw e; - } catch (Exception e) { - throw e; - } - } - /** - * Returns the full name of the destination queue - */ - protected String getDestinationEndpoint() throws Exception { - return ((ActiveMQDestination) producer.getDestination()).getPhysicalName(); - } - - /** - * Creates a jms session object used to instantiate message producer - */ - protected void initializeProducer() throws Exception { - createSession(); - producer = getMessageProducer(session.createQueue(destinationName)); - } - - - /** - * Returns jsm MessageProducer - */ - public MessageProducer getMessageProducer() { - if ( engine.running && engine.producerInitialized == false ) { - try { - SharedConnection con = engine.lookupConnection(getBrokerURL()); - if ( con != null ) { - setConnection(con.getConnection()); - initializeProducer(); - engine.producerInitialized = true; - } - } catch( Exception e) { - e.printStackTrace(); - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), - "getMessageProducer", UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAEE_exception__WARNING", e); - } - } - } - return producer; - } + private static final Class<?> CLASS_NAME = ActiveMQMessageSender.class; + + private volatile Connection connection = null; + + private Session session = null; + + private MessageProducer producer = null; + + private String destinationName = null; + + private ConcurrentHashMap<Destination, MessageProducer> producerMap = new ConcurrentHashMap<Destination, MessageProducer>(); + + public ActiveMQMessageSender(Connection aConnection, String aDestinationName, + BaseUIMAAsynchronousEngineCommon_impl engine) throws Exception { + super(engine); + connection = aConnection; + destinationName = aDestinationName; + } + + public synchronized MessageProducer getMessageProducer(Destination destination) throws Exception { + if (producerMap.containsKey(destination)) { + return (MessageProducer) producerMap.get(destination); + } + createSession(); + MessageProducer mProducer = session.createProducer(destination); + mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + producerMap.put(destination, mProducer); + return mProducer; + } + + /** + * This is called when a new Connection is created after broker is restarted + */ + public void setConnection(Connection aConnection) { + connection = aConnection; + cleanup(); + try { + initializeProducer(); + } catch (Exception e) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "setConnection", + UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e); + } + } + + } + + private String getBrokerURL() { + try { + return ((ActiveMQConnection) connection).getBrokerInfo().getBrokerURL(); + } catch (Exception ex) { /* handle silently. */ + } + return ""; + } + + private void createSession() throws Exception { + String broker = getBrokerURL(); + try { + if (session == null || engine.producerInitialized == false) { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + } catch (JMSException e) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "createSession", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_failed_creating_session_INFO", + new Object[] { destinationName, broker }); + } + if (connection == null) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "createSession", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_connection_not_ready_INFO", + new Object[] { broker }); + } + } else if (((ActiveMQConnection) connection).isClosed() || ((ActiveMQConnection) connection).isClosing()) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(), "createSession", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_client_connection_closed_INFO", + new Object[] { destinationName, broker }); + } + } + throw e; + } catch (Exception e) { + throw e; + } + } + + /** + * Returns the full name of the destination queue + */ + protected String getDestinationEndpoint() throws Exception { + return ((ActiveMQDestination) producer.getDestination()).getPhysicalName(); + } + + /** + * Creates a jms session object used to instantiate message producer + */ + protected void initializeProducer() throws Exception { + createSession(); + producer = getMessageProducer(session.createQueue(destinationName)); + } + + /** + * Returns jsm MessageProducer + */ + public MessageProducer getMessageProducer() { + if (engine.running && engine.producerInitialized == false) { + try { + SharedConnection con = engine.lookupConnection(getBrokerURL()); + if (con != null) { + setConnection(con.getConnection()); + initializeProducer(); + engine.producerInitialized = true; + } + } catch (Exception e) { + e.printStackTrace(); + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "getMessageProducer", + UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e); + } + } + } + return producer; + } public TextMessage createTextMessage() throws Exception { synchronized (ActiveMQMessageSender.class) { @@ -209,14 +205,14 @@ public class ActiveMQMessageSender extends BaseMessageSender { } - public BytesMessage createBytesMessage() throws Exception { + public BytesMessage createBytesMessage() throws Exception { synchronized (ActiveMQMessageSender.class) { - if (session == null) { - // Force initialization of Producer - initializeProducer(); - } - BytesMessage msg = null; - try { + if (session == null) { + // Force initialization of Producer + initializeProducer(); + } + BytesMessage msg = null; + try { msg = session.createBytesMessage(); } catch (IllegalStateException e) { // stale Session @@ -227,203 +223,202 @@ public class ActiveMQMessageSender extends BaseMessageSender { return msg; } - // return session.createBytesMessage(); - } - - /** - * Cleanup any jms resources used by the worker thread - */ - protected void cleanup() { - try { - if (session != null) { - session.close(); - session = null; - } - if (producer != null) { - producer.close(); - producer = null; - } - } catch (Exception e) { - // Ignore we are shutting down - } finally { - producerMap.clear(); - } - } - protected void dispatchMessage(PendingMessage pm, BaseUIMAAsynchronousEngineCommon_impl engine, boolean casProcessRequest ) throws Exception { - SharedConnection sc = - engine.lookupConnection(engine.getBrokerURI()); - ClientRequest cacheEntry = null; - boolean doCallback = false; - boolean addTimeToLive = true; - Session jmsSession = null; - - // Check the environment for existence of NoTTL tag. If present, - // the deployer of the service wants to disable message expiration. - if (System.getProperty("NoTTL") != null) { - addTimeToLive = false; - } - try { -// long t1 = System.currentTimeMillis(); - jmsSession = sc.getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); - - // Request JMS Message from the concrete implementation - Message message = null; - // Determine if this a CAS Process Request -// boolean casProcessRequest = isProcessRequest(pm); - // Only Process request can be serialized as binary - if (casProcessRequest && (engine.getSerialFormat() != SerialFormat.XMI)) { - message = jmsSession.createBytesMessage(); - } else { - message = jmsSession.createTextMessage(); - } - // get the producer initialized from a valid connection - // producer = getMessageProducer(); - - Destination d = null; - String selector = null; - // UIMA-AS ver 2.10.0 + sends Free Cas request to a service targeted queue - // instead of a temp queue. Regular queues can be recovered in case of - // a broker restart. The test below will be true for UIMA-AS v. 2.10.0 +. - // Code in JmsOutputChannel will add the selector if the service is a CM. - if (pm.getPropertyAsString(AsynchAEMessage.TargetingSelector) != null) { - selector = (String)pm.getPropertyAsString(AsynchAEMessage.TargetingSelector); - } - if ( selector == null && (pm.getMessageType() == AsynchAEMessage.ReleaseCAS || pm.getMessageType() == AsynchAEMessage.Stop) ) { - d = (Destination)pm.getProperty(AsynchAEMessage.Destination); - - } else { - d = jmsSession.createQueue(destinationName); - } - MessageProducer mProducer = jmsSession.createProducer(d); - mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - //System.out.println(">>>>>>> Time to create and initialize JMS Sesssion:"+(System.currentTimeMillis()-t1)); - super.initializeMessage(pm, message); - String destination = ((ActiveMQDestination) d).getPhysicalName(); - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { - UIMAFramework.getLogger(CLASS_NAME).logrb( - Level.FINE, - CLASS_NAME.getName(), - "run", - JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_sending_msg_to_endpoint__FINE", - new Object[] { - UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, message - .getIntProperty(AsynchAEMessage.Command)), - UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, message - .getIntProperty(AsynchAEMessage.MessageType)), destination }); - } - if (casProcessRequest) { - cacheEntry = (ClientRequest) engine.getCache().get( - pm.getPropertyAsString(AsynchAEMessage.CasReference)); - if (cacheEntry != null) { - // CAS cas = cacheEntry.getCAS(); - // enable logging - if (System.getProperty("UimaAsCasTracking") != null) { - message.setStringProperty("UimaAsCasTracking", "enable"); - } - // Target specific service instance if targeting for the CAS is provided - // by the client application - if ( cacheEntry.getTargetServiceId() != null ) { - // System.out.println("------------Client Sending CAS to Service Instance With Id:"+cacheEntry.getTargetServiceId());; - message.setStringProperty(UimaAsynchronousEngine.TargetSelectorProperty, cacheEntry.getTargetServiceId()); - } - // Use Process Timeout value for the time-to-live property in the - // outgoing JMS message. When this time is exceeded - // while the message sits in a queue, the JMS Server will remove it from - // the queue. What happens with the expired message depends on the - // configuration. Most JMS Providers create a special dead-letter queue - // where all expired messages are placed. NOTE: In ActiveMQ expired msgs in the DLQ - // are not auto evicted yet and accumulate taking up memory. - long timeoutValue = cacheEntry.getProcessTimeout(); - - if (timeoutValue > 0 && addTimeToLive ) { - // Set high time to live value - message.setJMSExpiration(10 * timeoutValue); - } - if (pm.getMessageType() == AsynchAEMessage.Process) { - cacheEntry.setCASDepartureTime(System.nanoTime()); - } - cacheEntry.setCASDepartureTime(System.nanoTime()); - - doCallback = true; - - } else { - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { - - UIMAFramework.getLogger(CLASS_NAME).logrb( - Level.WARNING, - CLASS_NAME.getName(), - "run", - JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_failed_cache_lookup__WARNING", - new Object[] { - pm.getPropertyAsString(AsynchAEMessage.CasReference), - UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, message - .getIntProperty(AsynchAEMessage.Command)), - UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, message - .getIntProperty(AsynchAEMessage.MessageType)), destination }); - } - } - - } - // start timers - if( casProcessRequest ) { - CAS cas = cacheEntry.getCAS(); - - - // Add the cas to a list of CASes pending reply. Also start the timer if necessary - engine.serviceDelegate.addCasToOutstandingList(cacheEntry.getCasReferenceId(), cas.hashCode(), engine.timerPerCAS); // true=timer per cas - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { - UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), - "sendCAS", JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_cas_added_to_pending_FINE", new Object[] { cacheEntry.getCasReferenceId(), String.valueOf(cas.hashCode()), engine.serviceDelegate.toString()}); - } - - - } else if ( pm.getMessageType() == AsynchAEMessage.GetMeta && - engine.serviceDelegate.getGetMetaTimeout() > 0 ) { - // timer for PING has been started in sendCAS() - if ( !engine.serviceDelegate.isAwaitingPingReply()) { - engine.serviceDelegate.startGetMetaRequestTimer(); - } - } else { - doCallback = false; // dont call onBeforeMessageSend() callback on CPC - } - // Dispatch asynchronous request to Uima AS service - mProducer.send(message); - - if ( doCallback ) { - UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(),cacheEntry.getCAS(), - cacheEntry.getCasReferenceId()); - // Notify engine before sending a message - if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { - UIMAFramework.getLogger(CLASS_NAME).logrb( - Level.FINE, - CLASS_NAME.getName(), - "run", - JmsConstants.JMS_LOG_RESOURCE_BUNDLE, - "UIMAJMS_calling_onBeforeMessageSend__FINE", - new Object[] { - pm.getPropertyAsString(AsynchAEMessage.CasReference), - String.valueOf(cacheEntry.getCAS().hashCode()) - }); - } - // Note the callback is a misnomer. The callback is made *after* the send now - // Application receiving this callback can consider the CAS as delivere to a queue - engine.onBeforeMessageSend(status); - - - } - } finally { - if ( jmsSession != null ) { - try { - jmsSession.close(); - } catch( Exception eee) { - - } - } - } - - - } + // return session.createBytesMessage(); + } + + /** + * Cleanup any jms resources used by the worker thread + */ + protected void cleanup() { + try { + if (session != null) { + session.close(); + session = null; + } + if (producer != null) { + producer.close(); + producer = null; + } + } catch (Exception e) { + // Ignore we are shutting down + } finally { + producerMap.clear(); + } + } + + protected void dispatchMessage(PendingMessage pm, BaseUIMAAsynchronousEngineCommon_impl engine, + boolean casProcessRequest) throws Exception { + SharedConnection sc = engine.lookupConnection(engine.getBrokerURI()); + ClientRequest cacheEntry = null; + boolean doCallback = false; + boolean addTimeToLive = true; + Session jmsSession = null; + + // Check the environment for existence of NoTTL tag. If present, + // the deployer of the service wants to disable message expiration. + if (System.getProperty("NoTTL") != null) { + addTimeToLive = false; + } + try { + // long t1 = System.currentTimeMillis(); + jmsSession = sc.getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Request JMS Message from the concrete implementation + Message message = null; + // Determine if this a CAS Process Request + // boolean casProcessRequest = isProcessRequest(pm); + // Only Process request can be serialized as binary + if (casProcessRequest && (engine.getSerialFormat() != SerialFormat.XMI)) { + message = jmsSession.createBytesMessage(); + } else { + message = jmsSession.createTextMessage(); + } + // get the producer initialized from a valid connection + // producer = getMessageProducer(); + + Destination d = null; + String selector = null; + // UIMA-AS ver 2.10.0 + sends Free Cas request to a service targeted queue + // instead of a temp queue. Regular queues can be recovered in case of + // a broker restart. The test below will be true for UIMA-AS v. 2.10.0 +. + // Code in JmsOutputChannel will add the selector if the service is a CM. + if (pm.getPropertyAsString(AsynchAEMessage.TargetingSelector) != null) { + selector = (String) pm.getPropertyAsString(AsynchAEMessage.TargetingSelector); + } + if (selector == null && (pm.getMessageType() == AsynchAEMessage.ReleaseCAS + || pm.getMessageType() == AsynchAEMessage.Stop)) { + d = (Destination) pm.getProperty(AsynchAEMessage.Destination); + + } else { + d = jmsSession.createQueue(destinationName); + } + MessageProducer mProducer = jmsSession.createProducer(d); + mProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + // System.out.println(">>>>>>> Time to create and initialize JMS + // Sesssion:"+(System.currentTimeMillis()-t1)); + super.initializeMessage(pm, message); + String destination = ((ActiveMQDestination) d).getPhysicalName(); + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "run", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_sending_msg_to_endpoint__FINE", + new Object[] { + UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, + message.getIntProperty(AsynchAEMessage.Command)), + UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, + message.getIntProperty(AsynchAEMessage.MessageType)), + destination }); + } + if (casProcessRequest) { + cacheEntry = (ClientRequest) engine.getCache() + .get(pm.getPropertyAsString(AsynchAEMessage.CasReference)); + if (cacheEntry != null) { + // CAS cas = cacheEntry.getCAS(); + // enable logging + if (System.getProperty("UimaAsCasTracking") != null) { + message.setStringProperty("UimaAsCasTracking", "enable"); + } + // Target specific service instance if targeting for the CAS is provided + // by the client application + if (cacheEntry.getTargetServiceId() != null) { + // System.out.println("------------Client Sending CAS to Service Instance With + // Id:"+cacheEntry.getTargetServiceId());; + message.setStringProperty(UimaAsynchronousEngine.TargetSelectorProperty, + cacheEntry.getTargetServiceId()); + } + // Use Process Timeout value for the time-to-live property in the + // outgoing JMS message. When this time is exceeded + // while the message sits in a queue, the JMS Server will remove it from + // the queue. What happens with the expired message depends on the + // configuration. Most JMS Providers create a special dead-letter queue + // where all expired messages are placed. NOTE: In ActiveMQ expired msgs in the + // DLQ + // are not auto evicted yet and accumulate taking up memory. + long timeoutValue = cacheEntry.getProcessTimeout(); + + if (timeoutValue > 0 && addTimeToLive) { + // Set high time to live value + message.setJMSExpiration(10 * timeoutValue); + } + if (pm.getMessageType() == AsynchAEMessage.Process) { + cacheEntry.setCASDepartureTime(System.nanoTime()); + } + cacheEntry.setCASDepartureTime(System.nanoTime()); + + doCallback = true; + + } else { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { + + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, CLASS_NAME.getName(), "run", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_failed_cache_lookup__WARNING", + new Object[] { pm.getPropertyAsString(AsynchAEMessage.CasReference), + UimaMessageValidator.decodeIntToString(AsynchAEMessage.Command, + message.getIntProperty(AsynchAEMessage.Command)), + UimaMessageValidator.decodeIntToString(AsynchAEMessage.MessageType, + message.getIntProperty(AsynchAEMessage.MessageType)), + destination }); + } + return; // no cache entry, done here + } + + } + // start timers + if (casProcessRequest) { + CAS cas = cacheEntry.getCAS(); + + // Add the cas to a list of CASes pending reply. Also start the timer if + // necessary + engine.serviceDelegate.addCasToOutstandingList(cacheEntry.getCasReferenceId(), cas.hashCode(), + engine.timerPerCAS); // true=timer per cas + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "sendCAS", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_cas_added_to_pending_FINE", + new Object[] { cacheEntry.getCasReferenceId(), String.valueOf(cas.hashCode()), + engine.serviceDelegate.toString() }); + } + + } else if (pm.getMessageType() == AsynchAEMessage.GetMeta + && engine.serviceDelegate.getGetMetaTimeout() > 0) { + // timer for PING has been started in sendCAS() + if (!engine.serviceDelegate.isAwaitingPingReply()) { + engine.serviceDelegate.startGetMetaRequestTimer(); + } + } else { + doCallback = false; // dont call onBeforeMessageSend() callback on CPC + } + // Dispatch asynchronous request to Uima AS service + mProducer.send(message); + + if (doCallback) { + UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(), cacheEntry.getCAS(), + cacheEntry.getCasReferenceId()); + // Notify engine before sending a message + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.FINE)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.FINE, CLASS_NAME.getName(), "run", + JmsConstants.JMS_LOG_RESOURCE_BUNDLE, "UIMAJMS_calling_onBeforeMessageSend__FINE", + new Object[] { pm.getPropertyAsString(AsynchAEMessage.CasReference), + String.valueOf(cacheEntry.getCAS().hashCode()) }); + } + // Note the callback is a misnomer. The callback is made *after* the send now + // Application receiving this callback can consider the CAS as delivere to a + // queue + engine.onBeforeMessageSend(status); + + } + } catch (Exception e) { + if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { + UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "dispatchMessage", + UIMAEE_Constants.JMS_LOG_RESOURCE_BUNDLE, "UIMAEE_exception__WARNING", e); + } + } finally { + if (jmsSession != null) { + try { + jmsSession.close(); + } catch (Exception eee) { + + } + } + } + + } } \ No newline at end of file diff --git a/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java b/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java index f0623fb..d5ffa93 100644 --- a/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java +++ b/uimaj-as-activemq/src/main/java/org/apache/uima/adapter/jms/client/BaseUIMAAsynchronousEngine_impl.java @@ -70,9 +70,14 @@ import org.apache.uima.aae.client.UimaAsynchronousEngine; import org.apache.uima.aae.controller.AnalysisEngineController; import org.apache.uima.aae.controller.ControllerCallbackListener; import org.apache.uima.aae.controller.Endpoint; +import org.apache.uima.aae.definition.connectors.Endpoints; +import org.apache.uima.aae.definition.connectors.UimaAsEndpoint; +import org.apache.uima.aae.definition.connectors.UimaAsEndpoint.EndpointType; import org.apache.uima.aae.error.UimaASMetaRequestTimeout; import org.apache.uima.aae.jmx.JmxManager; import org.apache.uima.aae.message.AsynchAEMessage; +import org.apache.uima.aae.message.MessageContext; +import org.apache.uima.aae.message.MessageProcessor; import org.apache.uima.aae.message.UIMAMessage; import org.apache.uima.aae.service.AsynchronousUimaASService; import org.apache.uima.aae.service.UimaASService; @@ -495,12 +500,28 @@ public class BaseUIMAAsynchronousEngine_impl extends BaseUIMAAsynchronousEngineC if ( dispatchThread == null ) { // make sure we are in the running state. The local consumer depends on it running = true; - + UimaAsEndpoint clientEndpoint; + try { + + MessageProcessor messageProcessor = + new ClientMessageProcessor(); + + clientEndpoint = + Endpoints.newEndpoint(EndpointType.Direct, EndpointType.Direct.getName()+"Client", messageProcessor); + // create client consumers and connect them to the service + // producers + service.connect(clientEndpoint); + clientEndpoint.start(); + } catch( Exception e) { + throw new ResourceInitializationException(e); + } + // start message consumer to handle replies - startLocalConsumer(anApplicationContext); + // startLocalConsumer(anApplicationContext); // start dispatcher in its own thread. It will fetch messages from a shared 'pendingMessageQueue' LocalDispatcher dispatcher = - new LocalDispatcher(this, service, pendingMessageQueue); + new LocalDispatcher(this, service, pendingMessageQueue, clientEndpoint); + dispatchThread = new Thread(dispatcher,"LocalDispatcher"); dispatchThread.start(); } @@ -1690,4 +1711,12 @@ public class BaseUIMAAsynchronousEngine_impl extends BaseUIMAAsynchronousEngineC } } + private class ClientMessageProcessor implements MessageProcessor { + + @Override + public void process(MessageContext message) throws Exception { + onMessage((DirectMessage)message.getRawMessage()); + } + + } } diff --git a/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java b/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java index f3e03a4..a8fcd6a 100644 --- a/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java +++ b/uimaj-as-activemq/src/main/java/org/apache/uima/as/dispatcher/LocalDispatcher.java @@ -24,7 +24,12 @@ import org.apache.uima.UIMAFramework; import org.apache.uima.aae.UIMAEE_Constants; import org.apache.uima.aae.client.UimaASProcessStatus; import org.apache.uima.aae.client.UimaASProcessStatusImpl; +import org.apache.uima.aae.definition.connectors.UimaAsConsumer.ConsumerType; +import org.apache.uima.aae.definition.connectors.UimaAsEndpoint; +import org.apache.uima.aae.definition.connectors.UimaAsEndpoint.EndpointType; import org.apache.uima.aae.message.AsynchAEMessage; +import org.apache.uima.aae.message.MessageContext; +import org.apache.uima.aae.message.UimaAsOrigin; import org.apache.uima.aae.service.UimaASService; import org.apache.uima.adapter.jms.JmsConstants; import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl; @@ -39,37 +44,89 @@ public class LocalDispatcher implements Runnable { private BlockingQueue<PendingMessage> messageQueue = null; private BaseUIMAAsynchronousEngineCommon_impl client; private UimaASService service; - + private UimaAsEndpoint clientEndpoint; + public LocalDispatcher(BaseUIMAAsynchronousEngineCommon_impl client, UimaASService service, - BlockingQueue<PendingMessage> pendingMessageQueue) { + BlockingQueue<PendingMessage> pendingMessageQueue, UimaAsEndpoint clientEndpoint) { this.service = service; this.client = client; this.messageQueue = pendingMessageQueue; + this.clientEndpoint = clientEndpoint; } - + private boolean reject(PendingMessage pm) { return false; } private void dispatch(PendingMessage pm) throws Exception { boolean doCallback = false; - + StringBuilder serviceUri = new StringBuilder("direct").append(":").append(service.getName()); + switch (pm.getMessageType()) { case AsynchAEMessage.GetMeta: - service.sendGetMetaRequest(); + serviceUri.append(":").append(ConsumerType.GetMetaRequest.name()).toString(); + MessageContext getMetaMessage = + clientEndpoint.newMessageBuilder(). + newGetMetaRequestMessage(new UimaAsOrigin("Client", EndpointType.Direct)) +// newGetMetaRequestMessage(new UimaAsOrigin("direct:Client", EndpointType.Direct)) + .withPayload(AsynchAEMessage.None) + .build(); + clientEndpoint.dispatch(getMetaMessage, serviceUri.toString()); + System.out.println("LocalDispatcher.dispatch()-dispatched getMeta Request"); break; case AsynchAEMessage.Process: doCallback = true; - service.process((CAS) pm.getProperty(AsynchAEMessage.CAS), pm.getPropertyAsString(AsynchAEMessage.CasReference)); - System.out.println("LocalDispatcher.dispatch()-dispatched Process Request"); + serviceUri.append(":").append(ConsumerType.ProcessCASRequest.name()).toString(); + + MessageContext processMessage = + clientEndpoint.newMessageBuilder(). + newProcessCASRequestMessage(new UimaAsOrigin("Client", EndpointType.Direct)) +// newProcessCASRequestMessage(new UimaAsOrigin("direct:Client", EndpointType.Direct)) + .withPayload(AsynchAEMessage.CASRefID) + .withCasReferenceId(pm.getPropertyAsString(AsynchAEMessage.CasReference)) + .build(); + + service.add2Cache((CAS) pm.getProperty(AsynchAEMessage.CAS), processMessage, pm.getPropertyAsString(AsynchAEMessage.CasReference)); + + clientEndpoint.dispatch(processMessage, serviceUri.toString()); + + System.out.println("LocalDispatcher.dispatch()-dispatched process Request"); + + break; case AsynchAEMessage.CollectionProcessComplete: - service.collectionProcessComplete(); + serviceUri.append(":").append(ConsumerType.CpcRequest.name()).toString(); + + MessageContext cpcMessage = + clientEndpoint.newMessageBuilder(). + newCpCRequestMessage(new UimaAsOrigin("Client", EndpointType.Direct)) +// newCpCRequestMessage(new UimaAsOrigin("direct:Client", EndpointType.Direct)) + .withPayload(AsynchAEMessage.None) + .build(); + clientEndpoint.dispatch(cpcMessage, serviceUri.toString()); + System.out.println("LocalDispatcher.dispatch()-dispatched CPC Request"); break; + + case AsynchAEMessage.ReleaseCAS: + serviceUri.append(":").append(ConsumerType.FreeCASRequest.name()).toString(); + + MessageContext freeCASMessage = + clientEndpoint.newMessageBuilder(). + newCpCRequestMessage(new UimaAsOrigin("Client", EndpointType.Direct)) +// newCpCRequestMessage(new UimaAsOrigin("direct:Client", EndpointType.Direct)) + .withPayload(AsynchAEMessage.CASRefID) + .withCasReferenceId(pm.getPropertyAsString(AsynchAEMessage.CasReference)) + .build(); + clientEndpoint.dispatch(freeCASMessage, serviceUri.toString()); + + System.out.println("LocalDispatcher.dispatch()-dispatched Free CAS Request"); + break; + + } if ( doCallback ) { UimaASProcessStatus status = new UimaASProcessStatusImpl(new ProcessTrace_impl(),(CAS)pm.getProperty(AsynchAEMessage.CAS), @@ -99,9 +156,9 @@ public class LocalDispatcher implements Runnable { while (client.isRunning()) { PendingMessage pm = null; try { - System.out.println("LocalDispatcher.run()- waiting for new message ... queue hashcode:"+messageQueue.hashCode()); + System.out.println("Client LocalDispatcher.run()- waiting for new message ... queue hashcode:"+messageQueue.hashCode()); pm = messageQueue.take(); - System.out.println("LocalDispatcher.run()-got new message to dispatch"); + System.out.println("Client LocalDispatcher.run()-got new message to dispatch"); } catch (InterruptedException e) { return; @@ -122,12 +179,8 @@ public class LocalDispatcher implements Runnable { } } try { - System.out.println(".................... calling LocalDispatch.beforeDispatch()"); client.beforeDispatch(pm); - - System.out.println(".................... calling LocalDispatch.dispatch()"); dispatch(pm); - System.out.println(".................... LocalDispatch.dispatch() returned"); } catch (Exception e) { if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.WARNING)) { UIMAFramework.getLogger(CLASS_NAME).logrb(Level.WARNING, getClass().getName(), "run", diff --git a/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java b/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java index 1976e2f..2e966a5 100644 --- a/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java +++ b/uimaj-as-activemq/src/test/java/org/apache/uima/ee/test/TestUimaASNoErrors.java @@ -26,6 +26,7 @@ import java.io.FileWriter; import java.io.InputStream; import java.io.InputStreamReader; import java.io.Reader; +import java.net.URL; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -1229,7 +1230,7 @@ public class TestUimaASNoErrors extends BaseTestSupport { initialize(uimaAsClient, appCtx); waitUntilInitialized(); - for (int i = 0; i < 500; i++) { + for (int i = 0; i < 2; i++) { CAS cas = uimaAsClient.getCAS(); cas.setDocumentText("Some Text"); System.out.println("UIMA AS Client Sending CAS#" + (i + 1) + " Request to a Service"); @@ -1441,6 +1442,8 @@ public class TestUimaASNoErrors extends BaseTestSupport { @Test public void testDeployAsyncAggregateServiceOverJava() throws Exception { +// URL url = TestUimaASNoErrors.class.getResource("/Deploy_AsyncAggregate.xml"); + testDeployAsyncAggregateService(Transport.Java); } @@ -1457,7 +1460,7 @@ public class TestUimaASNoErrors extends BaseTestSupport { addExceptionToignore(org.apache.uima.aae.error.UimaEEServiceException.class); - runTest(appCtx, uimaAsClient, "tcp://localhost:61616", "TopLevelTaeQueue", 200, PROCESS_LATCH); + runTest(appCtx, uimaAsClient, "tcp://localhost:61616", "TopLevelTaeQueue", 2, PROCESS_LATCH); } @Test @@ -1476,7 +1479,7 @@ public class TestUimaASNoErrors extends BaseTestSupport { System.setProperty("NoOpBroker", "tcp::/localhost:61616"); System.setProperty(JmsConstants.SessionTimeoutOverride, "2500000"); -// deployService(eeUimaEngine, relativePath + "/Deploy_NoOpAnnotator.xml"); + // deployService(uimaAsClient, relativePath + "/Deploy_NoOpAnnotator.xml"); deployJmsService(uimaAsClient, relativePath + "/Deploy_NoOpAnnotatorUsingPlaceholder.xml"); Map<String, Object> appCtx = defaultContext("TopLevelTaeQueue");