Author: boisvert Date: Wed Jul 18 19:08:15 2007 New Revision: 557453 URL: http://svn.apache.org/viewvc?view=rev&rev=557453 Log: Fix a problem where MEX are retained in memory until their timeout period expires. With the prior timeout mechanism implemented in OdeConsumerAsync, MEX were referenced by the ScheduledExecutorService for the complete timeout period (e.g. 2 minutes by default, despite explicitly cancelling ScheduledFuture's) requiring large amounts of memory for small-to-medium volume applications, and leading to poor performance (GC overload) and OutOfMemoryExceptions in high-volume applications. (Also made memory profiling difficult at best).
Modified: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerAsync.java incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerSync.java Modified: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java?view=diff&rev=557453&r1=557452&r2=557453 ============================================================================== --- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java (original) +++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java Wed Jul 18 19:08:15 2007 @@ -146,6 +146,7 @@ protected abstract void doSendTwoWay(PartnerRoleMessageExchange odeMex, InOut inout); + protected abstract void inOutDone(InOut inout); public void onJbiMessageExchange(MessageExchange jbiMex) throws MessagingException { if (!jbiMex.getPattern().equals(MessageExchangePattern.IN_ONLY) && @@ -155,11 +156,13 @@ } if (jbiMex.getStatus() == ExchangeStatus.ACTIVE) { if (jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) { + inOutDone((InOut) jbiMex); outResponse((InOut) jbiMex); } jbiMex.setStatus(ExchangeStatus.DONE); _ode.getChannel().send(jbiMex); } else if (jbiMex.getStatus() == ExchangeStatus.ERROR) { + inOutDone((InOut) jbiMex); outFailure((InOut) jbiMex); } else if (jbiMex.getStatus() == ExchangeStatus.DONE) { _outstandingExchanges.remove(jbiMex.getExchangeId()); Modified: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerAsync.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerAsync.java?view=diff&rev=557453&r1=557452&r2=557453 ============================================================================== --- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerAsync.java (original) +++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerAsync.java Wed Jul 18 19:08:15 2007 @@ -1,8 +1,9 @@ package org.apache.ode.jbi; -import java.util.TimerTask; +import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -17,9 +18,7 @@ import org.apache.ode.bpel.iapi.MessageExchange.FailureType; /** - * - * @author mszefler - * + * Asynchronous JBI service consumer */ class OdeConsumerAsync extends OdeConsumer { private static final Log __log = LogFactory.getLog(OdeConsumerAsync.class); @@ -28,13 +27,17 @@ * We create an executor to handle all the asynchronous invocations/timeouts. Note, we don't need a lot of threads * here, the operations are all async, using single-thread executor avoids any possible problems in concurrent * use of delivery channel. + * + * WARNING: Canceling tasks does not immediately release them, so we don't use the schedule-cancel pattern here. */ private ScheduledExecutorService _executor; + private Map<String, Long> _mexTimeouts = new ConcurrentHashMap<String, Long>(); + OdeConsumerAsync(OdeContext ode) { super(ode); _executor = Executors.newSingleThreadScheduledExecutor(); - + _executor.scheduleWithFixedDelay(new MEXReaper(), _responseTimeout, _responseTimeout/10, TimeUnit.MILLISECONDS); } @Override @@ -59,14 +62,7 @@ public void run() { try { _outstandingExchanges.put(inout.getExchangeId(), odeMex); - _executor.schedule(new TimerTask() { - - @Override - public void run() { - doTimeoutCheck(inout); - } - - }, _responseTimeout, TimeUnit.MILLISECONDS); + _mexTimeouts.put(inout.getExchangeId(), System.currentTimeMillis()+_responseTimeout); _ode.getChannel().send(inout); } catch (MessagingException e) { String errmsg = "Error sending request-only message to JBI for ODE mex " + odeMex; @@ -77,24 +73,36 @@ } - private void doTimeoutCheck(InOut inout) { - final PartnerRoleMessageExchange pmex = _outstandingExchanges.remove(inout.getExchangeId()); + protected void inOutDone(InOut inout) { + _mexTimeouts.remove(inout.getExchangeId()); + } - if (pmex == null) /* no worries, got a response. */ - return; + private class MEXReaper implements Runnable { + public void run() { + long now = System.currentTimeMillis(); + Object[] inouts = _mexTimeouts.keySet().toArray(); + for (int i=0; i<inouts.length; i++) { + long timeout = _mexTimeouts.get(inouts[i]); + if (timeout >= now) { + _mexTimeouts.remove(inouts[i]); + final PartnerRoleMessageExchange pmex = _outstandingExchanges.remove(inouts[i]); - __log.warn("Timeout on JBI message exchange " + inout.getExchangeId()); + if (pmex == null) /* no worries, got a response. */ + continue; + __log.warn("Timeout on JBI message exchange " + inouts[i]); try { _ode._scheduler.execIsolatedTransaction(new Callable<Void>() { public Void call() throws Exception { pmex.replyWithFailure(FailureType.NO_RESPONSE, "Response not received after " + _responseTimeout + "ms.", null); return null; } - }); } catch (Exception ex) { __log.error("Error executing transaction: ", ex); + } + } + } } } } Modified: incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerSync.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerSync.java?view=diff&rev=557453&r1=557452&r2=557453 ============================================================================== --- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerSync.java (original) +++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumerSync.java Wed Jul 18 19:08:15 2007 @@ -58,4 +58,7 @@ }); } + protected void inOutDone(InOut inout) { + // nothing + } }