Author: mriou Date: Thu May 31 11:52:49 2007 New Revision: 543214 URL: http://svn.apache.org/viewvc?view=rev&rev=543214 Log: Fixed a couple of potential leaks in in-memory DAOs.
Modified: incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java Modified: incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java?view=diff&rev=543214&r1=543213&r2=543214 ============================================================================== --- incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java (original) +++ incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java Thu May 31 11:52:49 2007 @@ -19,28 +19,6 @@ package org.apache.ode.axis2; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import javax.transaction.TransactionManager; -import javax.wsdl.Definition; -import javax.wsdl.Port; -import javax.wsdl.Service; -import javax.wsdl.extensions.UnknownExtensibilityElement; -import javax.wsdl.extensions.soap.SOAPAddress; -import javax.xml.namespace.QName; - -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import javax.transaction.TransactionManager; -import javax.wsdl.Definition; -import javax.wsdl.Port; -import javax.wsdl.Service; -import javax.wsdl.extensions.UnknownExtensibilityElement; -import javax.wsdl.extensions.soap.SOAPAddress; -import javax.xml.namespace.QName; - import org.apache.axiom.om.OMElement; import org.apache.axiom.soap.SOAPEnvelope; import org.apache.axiom.soap.SOAPFactory; @@ -49,7 +27,6 @@ import 
org.apache.axis2.description.AxisService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.ode.axis2.util.OMUtils; import org.apache.ode.axis2.util.SoapMessageConverter; import org.apache.ode.bpel.epr.EndpointFactory; import org.apache.ode.bpel.epr.MutableEndpoint; @@ -65,6 +42,16 @@ import org.w3c.dom.Document; import org.w3c.dom.Element; +import javax.transaction.TransactionManager; +import javax.wsdl.Definition; +import javax.wsdl.Port; +import javax.wsdl.Service; +import javax.wsdl.extensions.UnknownExtensibilityElement; +import javax.wsdl.extensions.soap.SOAPAddress; +import javax.xml.namespace.QName; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + /** * A running service, encapsulates the Axis service, its receivers and our * receivers as well. @@ -101,7 +88,7 @@ public void onAxisMessageExchange(MessageContext msgContext, MessageContext outMsgContext, SOAPFactory soapFactory) throws AxisFault { boolean success = true; - MyRoleMessageExchange odeMex; + MyRoleMessageExchange odeMex = null; Future responseFuture = null; try { _txManager.begin(); @@ -147,6 +134,7 @@ throw new OdeFault("An exception occured while invoking ODE.", e); } finally { if (!success) { + if (odeMex != null) odeMex.release(); try { _txManager.rollback(); } catch (Exception e) { @@ -192,20 +180,21 @@ __log.error("Error processing response for MEX " + odeMex, e); throw new OdeFault("An exception occured when invoking ODE.", e); } finally { - if (commit) { + odeMex.release(); + if (commit) { try { if (__log.isDebugEnabled()) __log.debug("Comitting transaction."); _txManager.commit(); } catch (Exception e) { throw new OdeFault("Commit failed!", e); } - } else { + } else { try { _txManager.rollback(); } catch (Exception ex) { throw new OdeFault("Rollback failed!", ex); } - } + } } } if (!success) { Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java 
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=543214&r1=543213&r2=543214 ============================================================================== --- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original) +++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Thu May 31 11:52:49 2007 @@ -542,25 +542,29 @@ if (BpelProcess.__log.isDebugEnabled()) { __log.debug("Replying to a p2p mex, myrole " + m + " - partnerole " + pmex); } - switch (m.getStatus()) { - case FAILURE: - // We can't seem to get the failure out of the myrole mex? - pmex.replyWithFailure(MessageExchange.FailureType.OTHER, "operation failed", null); - break; - case FAULT: - Message faultRes = pmex.createMessage(pmex.getOperation().getFault(m.getFault().getLocalPart()) - .getMessage().getQName()); - faultRes.setMessage(m.getResponse().getMessage()); - pmex.replyWithFault(m.getFault(), faultRes); - break; - case RESPONSE: - Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName()); - response.setMessage(m.getResponse().getMessage()); - pmex.reply(response); - break; - default: - __log.warn("Unexpected state: " + m.getStatus()); - break; + try { + switch (m.getStatus()) { + case FAILURE: + // We can't seem to get the failure out of the myrole mex? 
+ pmex.replyWithFailure(MessageExchange.FailureType.OTHER, "operation failed", null); + break; + case FAULT: + Message faultRes = pmex.createMessage(pmex.getOperation().getFault(m.getFault().getLocalPart()) + .getMessage().getQName()); + faultRes.setMessage(m.getResponse().getMessage()); + pmex.replyWithFault(m.getFault(), faultRes); + break; + case RESPONSE: + Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName()); + response.setMessage(m.getResponse().getMessage()); + pmex.reply(response); + break; + default: + __log.warn("Unexpected state: " + m.getStatus()); + break; + } + } finally { + mex.release(); } } else _bpelProcess._engine._contexts.mexContext.onAsyncReply(m); @@ -686,8 +690,8 @@ } if (BpelProcess.__log.isDebugEnabled()) { - BpelProcess.__log.debug("INVOKING PARTNER: partnerLink=" + partnerLink + ", op=" + operation.getName() + " channel=" - + channel + ")"); + BpelProcess.__log.debug("INVOKING PARTNER: partnerLink=" + partnerLink + + ", op=" + operation.getName() + " channel=" + channel + ")"); } // prepare event Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java?view=diff&rev=543214&r1=543213&r2=543214 ============================================================================== --- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java (original) +++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java Thu May 31 11:52:49 2007 @@ -27,7 +27,6 @@ import javax.xml.namespace.QName; public class MessageDAOImpl extends DaoBaseImpl implements MessageDAO { - private QName type; private Element data; private MessageExchangeDAO messageExchange; Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java?view=diff&rev=543214&r1=543213&r2=543214 ============================================================================== --- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java (original) +++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java Thu May 31 11:52:49 2007 @@ -35,7 +35,7 @@ public class MessageExchangeDAOImpl extends DaoBaseImpl implements MessageExchangeDAO { - private String messageExchangeId; + private String messageExchangeId; private MessageDAO response; private Date createTime; private MessageDAO request; @@ -63,7 +63,7 @@ public MessageExchangeDAOImpl(char direction, String messageEchangeId){ this.direction = direction; this.messageExchangeId = messageEchangeId; - } + } public String getMessageExchangeId() { return messageExchangeId; @@ -279,7 +279,6 @@ response = null; BpelDAOConnectionImpl.removeMessageExchange(getMessageExchangeId()); } - public String toString() { return "mem.mex(direction=" + direction + " id=" + messageExchangeId + ")"; Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java?view=diff&rev=543214&r1=543213&r2=543214 ============================================================================== --- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java (original) +++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java Thu May 31 11:52:49 2007 @@ -34,6 +34,7 @@ import java.util.Date; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; /** * A very simple, in-memory implementation 
of the {@link ProcessDAO} interface. @@ -50,6 +51,7 @@ private Map<QName, ProcessDaoImpl> _store; private BpelDAOConnectionImpl _conn; private int _executionCount = 0; + private Collection<Long> _instancesToRemove = new ConcurrentLinkedQueue<Long>(); private String _guid; @@ -128,8 +130,16 @@ // Cleaning up __log.debug("Removing completed process instance " + instance.getInstanceId() + " from in-memory store."); ProcessInstanceDAO removed = _instances.remove(instance.getInstanceId()); - if (removed == null) - __log.warn("Couldn't find process instance " + instance.getInstanceId() + " for cleanup."); + if (removed == null) { + // Checking for leftover instances that should be removed + for (Long iid : _instancesToRemove) { + _instances.remove(iid); + } + + // The instance can't be found probably because the transaction isn't committed yet and + // it doesn't exist. Saving its id for later cleanup. + _instancesToRemove.add(instance.getInstanceId()); + } } public void delete() { Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java?view=diff&rev=543214&r1=543213&r2=543214 ============================================================================== --- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java (original) +++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java Thu May 31 11:52:49 2007 @@ -52,40 +52,24 @@ private static final Collection<ScopeDAO> EMPTY_SCOPE_DAOS = Collections.emptyList(); private short _previousState; - private short _state; - private Long _instanceId; - private ProcessDaoImpl _processDao; - private byte[] _jacobState; - private Map<Long, ScopeDAO> _scopes = new HashMap<Long, ScopeDAO>(); - private Map<String, List<ScopeDAO>> _scopesByName = 
new HashMap<String, List<ScopeDAO>>(); - private Map<String, byte[]> _messageExchanges = new HashMap<String, byte[]>(); - private ScopeDAO _rootScope; - private FaultDAO _fault; - private CorrelatorDAO _instantiatingCorrelator; - private BpelDAOConnection _conn; - private int _failureCount; - private Date _failureDateTime; - private Map<String, ActivityRecoveryDAO> _activityRecoveries = new HashMap<String, ActivityRecoveryDAO>(); // TODO: Remove this, we should be using the main event store... private List<ProcessInstanceEvent> _events = new ArrayList<ProcessInstanceEvent>(); - private Date _lastActive; - private int _seq; ProcessInstanceDaoImpl(BpelDAOConnection conn, ProcessDaoImpl processDao, CorrelatorDAO correlator) {