Author: boisvert
Date: Thu Mar 22 11:11:04 2007
New Revision: 521390

URL: http://svn.apache.org/viewvc?view=rev&rev=521390
Log:
Use sendSync() rather than send() for better load conditioning

Modified:
    incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java

Modified: 
incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java?view=diff&rev=521390&r1=521389&r2=521390
==============================================================================
--- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java 
(original)
+++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java 
Thu Mar 22 11:11:04 2007
@@ -44,9 +44,13 @@
 class OdeConsumer extends ServiceBridge implements JbiMessageExchangeProcessor 
{
     private static final Log __log = LogFactory.getLog(OdeConsumer.class);
 
+    private static final long DEFAULT_SENDSYNC_TIMEOUT = 2 * 60 * 1000L;
+
     private OdeContext _ode;
 
-    private Map<String, String> _outstandingExchanges = new 
ConcurrentHashMap<String, String>();
+    private long _sendSyncTimeout = DEFAULT_SENDSYNC_TIMEOUT;
+
+    private Map<String, PartnerRoleMessageExchange> _outstandingExchanges = 
new ConcurrentHashMap<String, PartnerRoleMessageExchange>();
 
     OdeConsumer(OdeContext ode) {
         _ode = ode;
@@ -75,7 +79,7 @@
         QName opname = new QName(se.getServiceName().getNamespaceURI(), 
odeMex.getOperation().getName());
 
         MessageExchangeFactory mexf = 
_ode.getChannel().createExchangeFactory(se);
-        MessageExchange jbiMex;
+        final MessageExchange jbiMex;
         try {
             jbiMex = mexf.createExchange(isTwoWay ? 
MessageExchangePattern.IN_OUT : MessageExchangePattern.IN_ONLY);
             jbiMex.setEndpoint(se);
@@ -99,12 +103,16 @@
                 _ode._scheduler.registerSynchronizer(new 
Scheduler.Synchronizer() {
                     public void afterCompletion(boolean success) {
                         if (success) {
-                            _ode._executorService.submit( new Runnable() {
+                            _ode._executorService.submit(new Runnable() {
                                 public void run() {
                                     try {
-                                        _ode.getChannel().send(inonly);
+                                        boolean sendOk = 
_ode.getChannel().sendSync(inonly, _sendSyncTimeout);
+                                        if (!sendOk) {
+                                            __log.warn("Timeout while sending 
message for JBI message exchange: " + jbiMex.getExchangeId());
+                                        }
+                                        onJbiMessageExchange(inonly);
                                     } catch (MessagingException e) {
-                                        String errmsg = "Exception while 
sending in-only message to JBI for ODE mex " + odeMex;
+                                        String errmsg = "Error sending 
request-only message to JBI for ODE mex " + odeMex;
                                         __log.error(errmsg, e);
                                     }
                                 }
@@ -125,13 +133,17 @@
                 _ode._scheduler.registerSynchronizer(new 
Scheduler.Synchronizer() {
                     public void afterCompletion(boolean success) {
                         if (success) {
-                            _ode._executorService.submit( new Runnable() {
+                            _ode._executorService.submit(new Runnable() {
                                 public void run() {
                                     try {
-                                        
_outstandingExchanges.put(inout.getExchangeId(), odeMex.getMessageExchangeId());
-                                        _ode.getChannel().send(inout);
+                                        
_outstandingExchanges.put(inout.getExchangeId(), odeMex);
+                                        boolean sendOk = 
_ode.getChannel().sendSync(inout, _sendSyncTimeout);
+                                        if (!sendOk) {
+                                            __log.warn("Timeout while sending 
message for JBI message exchange: " + jbiMex.getExchangeId());
+                                        }
+                                        onJbiMessageExchange(inout);
                                     } catch (MessagingException e) {
-                                        String errmsg = "Exception while 
sending request-only message to JBI for ODE mex " + odeMex;
+                                        String errmsg = "Error sending 
request-only message to JBI for ODE mex " + odeMex;
                                         __log.error(errmsg, e);
                                     }
                                 }
@@ -159,41 +171,34 @@
     }
 
     public void onJbiMessageExchange(MessageExchange jbiMex) throws 
MessagingException {
-        if (jbiMex.getPattern().equals(MessageExchangePattern.IN_ONLY)) {
-            // Ignore these, they're one way.
-        } else if (jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) {
-            if (jbiMex.getStatus() == ExchangeStatus.ACTIVE) {
+        if (!jbiMex.getPattern().equals(MessageExchangePattern.IN_ONLY) &&
+            !jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) {
+            __log.error("JBI MessageExchange " + jbiMex.getExchangeId() + " is 
of an unsupported pattern " + jbiMex.getPattern());
+            return;
+        }
+        if (jbiMex.getStatus() == ExchangeStatus.ACTIVE) {
+            if (jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) {
                 outResponse((InOut) jbiMex);
-                jbiMex.setStatus(ExchangeStatus.DONE);
-                _ode.getChannel().send(jbiMex);
-            } else if (jbiMex.getStatus() == ExchangeStatus.ERROR)
-                outFailure((InOut) jbiMex);
-            else
-                __log.warn("Unexpected state for JBI message exchange: " + 
jbiMex.getExchangeId());
+            }
+            jbiMex.setStatus(ExchangeStatus.DONE);
+            _ode.getChannel().send(jbiMex);
+        } else if (jbiMex.getStatus() == ExchangeStatus.ERROR) {
+            outFailure((InOut) jbiMex);
         } else {
-            __log.fatal("JBI MessageExchange " + jbiMex.getExchangeId() + " is 
of an unsupported pattern " + jbiMex.getPattern());
+            __log.error("Unexpected status " + jbiMex.getStatus() + " for JBI 
message exchange: " + jbiMex.getExchangeId());
         }
-
     }
 
     private void outFailure(final InOut jbiMex) {
-        final String mexref = 
_outstandingExchanges.remove(jbiMex.getExchangeId());
-        if (mexref == null) {
-            __log.warn("Received a response for unkown JBI message exchange " 
+ jbiMex.getExchangeId());
+        final PartnerRoleMessageExchange pmex = 
_outstandingExchanges.remove(jbiMex.getExchangeId());
+        if (pmex == null) {
+            __log.warn("Received a response for unknown JBI message exchange " 
+ jbiMex.getExchangeId());
             return;
         }
 
         try {
             _ode._scheduler.execTransaction(new Callable<Boolean>() {
                 public Boolean call() throws Exception {
-                    PartnerRoleMessageExchange pmex = 
(PartnerRoleMessageExchange) _ode._server.getEngine().getMessageExchange(
-                            mexref);
-
-                    if (pmex == null) {
-                        __log.warn("Cannot locate ODE message exchange: " + 
mexref + "; ignoring.");
-                        return null;
-                    }
-
                     pmex.replyWithFailure(FailureType.OTHER, "Error: " + 
jbiMex.getError(), null);
                     return null;
                 }
@@ -205,9 +210,9 @@
     }
 
     private void outResponse(final InOut jbiMex) {
-        final String mexref = 
_outstandingExchanges.remove(jbiMex.getExchangeId());
-        if (mexref == null) {
-            __log.warn("Received a response for unkown JBI message exchange " 
+ jbiMex.getExchangeId());
+        final PartnerRoleMessageExchange pmex = 
_outstandingExchanges.remove(jbiMex.getExchangeId());
+        if (pmex == null) {
+            __log.warn("Received a response for unknown JBI message exchange " 
+ jbiMex.getExchangeId());
             return;
         }
 
@@ -215,15 +220,6 @@
             _ode._scheduler.execTransaction(new Callable<Boolean>() {
                 @SuppressWarnings("unchecked")
                 public Boolean call() throws Exception {
-                    PartnerRoleMessageExchange pmex = 
(PartnerRoleMessageExchange) _ode._server.getEngine().getMessageExchange(
-                            mexref);
-
-                    if (pmex == null) {
-                        // I'm a bit unclear as to why this would occur, but 
it appears to be possible.
-                        __log.warn("Cannot locate ODE message exchange: " + 
mexref + "; ignoring.");
-                        return null;
-                    }
-
                     String mapperName = 
pmex.getProperty(Mapper.class.getName());
                     Mapper mapper = mapperName == null ? 
_ode.getDefaultMapper() : _ode.getMapper(mapperName);
                     if (mapper == null) {
@@ -268,5 +264,13 @@
             __log.error("error delivering RESPONSE: ", ex);
 
         }
+    }
+
+    public void setSendSyncTimeout(long timeout) {
+       _sendSyncTimeout = timeout;
+    }
+
+    public long getSendSyncTimeout() {
+       return _sendSyncTimeout;
     }
 }


Reply via email to