Author: boisvert
Date: Wed Jul 18 19:08:15 2007
New Revision: 557453

URL: http://svn.apache.org/viewvc?view=rev&rev=557453
Log:
Fix a problem where message exchanges (MEX) are retained in memory until their
timeout period expires.

With the prior timeout mechanism implemented in OdeConsumerAsync, each MEX
remained referenced by the ScheduledExecutorService for the complete timeout
period (e.g., 2 minutes by default), even though the corresponding
ScheduledFutures were explicitly cancelled. This required large amounts of
memory for small-to-medium volume applications, and led to poor performance
(GC overload) and OutOfMemoryErrors in high-volume applications. It also made
memory profiling difficult at best.

Modified:
    incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java
    
incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerAsync.java
    
incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerSync.java

Modified: 
incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java?view=diff&rev=557453&r1=557452&r2=557453
==============================================================================
--- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java 
(original)
+++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java 
Wed Jul 18 19:08:15 2007
@@ -146,6 +146,7 @@
 
     protected abstract void doSendTwoWay(PartnerRoleMessageExchange odeMex, 
InOut inout);
 
+    protected abstract void inOutDone(InOut inout);
 
     public void onJbiMessageExchange(MessageExchange jbiMex) throws 
MessagingException {
         if (!jbiMex.getPattern().equals(MessageExchangePattern.IN_ONLY) &&
@@ -155,11 +156,13 @@
         }
         if (jbiMex.getStatus() == ExchangeStatus.ACTIVE) {
             if (jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) {
+                inOutDone((InOut) jbiMex);
                 outResponse((InOut) jbiMex);
             }
             jbiMex.setStatus(ExchangeStatus.DONE);
             _ode.getChannel().send(jbiMex);
         } else if (jbiMex.getStatus() == ExchangeStatus.ERROR) {
+            inOutDone((InOut) jbiMex);
             outFailure((InOut) jbiMex);
         } else if (jbiMex.getStatus() == ExchangeStatus.DONE) {
             _outstandingExchanges.remove(jbiMex.getExchangeId());

Modified: 
incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerAsync.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerAsync.java?view=diff&rev=557453&r1=557452&r2=557453
==============================================================================
--- 
incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerAsync.java 
(original)
+++ 
incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerAsync.java 
Wed Jul 18 19:08:15 2007
@@ -1,8 +1,9 @@
 package org.apache.ode.jbi;
 
 
-import java.util.TimerTask;
+import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -17,9 +18,7 @@
 import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
 
 /**
- * 
- * @author mszefler
- *
+ * Asynchronous JBI service consumer
  */
 class OdeConsumerAsync extends OdeConsumer {
     private static final Log __log = LogFactory.getLog(OdeConsumerAsync.class);
@@ -28,13 +27,17 @@
      * We create an executor to handle all the asynchronous 
invocations/timeouts. Note, we don't need a lot of threads
      * here, the operations are all async, using single-thread executor avoids 
any possible problems in concurrent
      * use of delivery channel.
+     * 
+     * WARNING:  Canceling tasks does not immediately release them, so we 
don't use the schedule-cancel pattern here.  
      */
     private ScheduledExecutorService _executor;
 
+    private Map<String, Long> _mexTimeouts = new ConcurrentHashMap<String, 
Long>();
+    
     OdeConsumerAsync(OdeContext ode) {
         super(ode);
        _executor = Executors.newSingleThreadScheduledExecutor();
-
+        _executor.scheduleWithFixedDelay(new MEXReaper(), _responseTimeout, 
_responseTimeout/10, TimeUnit.MILLISECONDS);
     }
 
     @Override
@@ -59,14 +62,7 @@
             public void run() {
                 try {
                     _outstandingExchanges.put(inout.getExchangeId(), odeMex);
-                    _executor.schedule(new TimerTask() {
-
-                        @Override
-                        public void run() {
-                            doTimeoutCheck(inout);
-                        }
-
-                    }, _responseTimeout, TimeUnit.MILLISECONDS);
+                    _mexTimeouts.put(inout.getExchangeId(), 
System.currentTimeMillis()+_responseTimeout);
                     _ode.getChannel().send(inout);
                 } catch (MessagingException e) {
                     String errmsg = "Error sending request-only message to JBI 
for ODE mex " + odeMex;
@@ -77,24 +73,36 @@
 
     }
 
-    private void doTimeoutCheck(InOut inout) {
-        final PartnerRoleMessageExchange pmex = 
_outstandingExchanges.remove(inout.getExchangeId());
+    protected void inOutDone(InOut inout) {
+        _mexTimeouts.remove(inout.getExchangeId());
+    }
 
-        if (pmex == null) /* no worries, got a response. */
-            return;
+    private class MEXReaper implements Runnable {
+        public void run() {
+            long now = System.currentTimeMillis();
+            Object[] inouts = _mexTimeouts.keySet().toArray();
+            for (int i=0; i<inouts.length; i++) {
+                long timeout = _mexTimeouts.get(inouts[i]);
+                if (timeout >= now) {
+                    _mexTimeouts.remove(inouts[i]);
+                    final PartnerRoleMessageExchange pmex = 
_outstandingExchanges.remove(inouts[i]);
 
-        __log.warn("Timeout on JBI message exchange " + inout.getExchangeId());
+                    if (pmex == null) /* no worries, got a response. */
+                        continue;
 
+                    __log.warn("Timeout on JBI message exchange " + inouts[i]);
         try {
             _ode._scheduler.execIsolatedTransaction(new Callable<Void>() {
                 public Void call() throws Exception {
                     pmex.replyWithFailure(FailureType.NO_RESPONSE, "Response 
not received after " + _responseTimeout + "ms.", null);
                     return null;
                 }
-
             });
         } catch (Exception ex) {
             __log.error("Error executing transaction:  ", ex);
+        }
+    }
+            }
         }
     }
 }

Modified: 
incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerSync.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerSync.java?view=diff&rev=557453&r1=557452&r2=557453
==============================================================================
--- 
incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerSync.java 
(original)
+++ 
incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerSync.java 
Wed Jul 18 19:08:15 2007
@@ -58,4 +58,7 @@
         });
     }
 
+    protected void inOutDone(InOut inout) {
+        // nothing
+    }
 }


Reply via email to