Author: mriou
Date: Thu May 31 11:52:49 2007
New Revision: 543214

URL: http://svn.apache.org/viewvc?view=rev&rev=543214
Log:
Fixed a couple of potential leaks in in-memory DAOs.

Modified:
    incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
    
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java
    
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
    
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
    
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java

Modified: 
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java?view=diff&rev=543214&r1=543213&r2=543214
==============================================================================
--- 
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java 
(original)
+++ 
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java 
Thu May 31 11:52:49 2007
@@ -19,28 +19,6 @@
 
 package org.apache.ode.axis2;
 
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import javax.transaction.TransactionManager;
-import javax.wsdl.Definition;
-import javax.wsdl.Port;
-import javax.wsdl.Service;
-import javax.wsdl.extensions.UnknownExtensibilityElement;
-import javax.wsdl.extensions.soap.SOAPAddress;
-import javax.xml.namespace.QName;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import javax.transaction.TransactionManager;
-import javax.wsdl.Definition;
-import javax.wsdl.Port;
-import javax.wsdl.Service;
-import javax.wsdl.extensions.UnknownExtensibilityElement;
-import javax.wsdl.extensions.soap.SOAPAddress;
-import javax.xml.namespace.QName;
-
 import org.apache.axiom.om.OMElement;
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axiom.soap.SOAPFactory;
@@ -49,7 +27,6 @@
 import org.apache.axis2.description.AxisService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.ode.axis2.util.OMUtils;
 import org.apache.ode.axis2.util.SoapMessageConverter;
 import org.apache.ode.bpel.epr.EndpointFactory;
 import org.apache.ode.bpel.epr.MutableEndpoint;
@@ -65,6 +42,16 @@
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 
+import javax.transaction.TransactionManager;
+import javax.wsdl.Definition;
+import javax.wsdl.Port;
+import javax.wsdl.Service;
+import javax.wsdl.extensions.UnknownExtensibilityElement;
+import javax.wsdl.extensions.soap.SOAPAddress;
+import javax.xml.namespace.QName;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
 /**
  * A running service, encapsulates the Axis service, its receivers and our
  * receivers as well.
@@ -101,7 +88,7 @@
     public void onAxisMessageExchange(MessageContext msgContext, 
MessageContext outMsgContext, SOAPFactory soapFactory)
             throws AxisFault {
         boolean success = true;
-        MyRoleMessageExchange odeMex;
+        MyRoleMessageExchange odeMex = null;
         Future responseFuture = null;
         try {
             _txManager.begin();
@@ -147,6 +134,7 @@
             throw new OdeFault("An exception occured while invoking ODE.", e);
         } finally {
             if (!success) {
+                if (odeMex != null) odeMex.release();
                 try {
                     _txManager.rollback();
                 } catch (Exception e) {
@@ -192,20 +180,21 @@
                         __log.error("Error processing response for MEX " + 
odeMex, e);
                         throw new OdeFault("An exception occured when invoking 
ODE.", e);
                     } finally {
-                    if (commit) {
+                        odeMex.release();
+                        if (commit) {
                             try {
                                 if (__log.isDebugEnabled()) 
__log.debug("Comitting transaction.");
                                 _txManager.commit();
                             } catch (Exception e) {
                                 throw new OdeFault("Commit failed!", e);
                             }
-                    } else {
+                        } else {
                             try {
                                 _txManager.rollback();
                             } catch (Exception ex) {
                                 throw new OdeFault("Rollback failed!", ex);
                             }
-                    }
+                        }
                 }
             }
             if (!success) {

Modified: 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=543214&r1=543213&r2=543214
==============================================================================
--- 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
 (original)
+++ 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
 Thu May 31 11:52:49 2007
@@ -542,25 +542,29 @@
             if (BpelProcess.__log.isDebugEnabled()) {
                 __log.debug("Replying to a p2p mex, myrole " + m + " - 
partnerole " + pmex);
             }
-            switch (m.getStatus()) {
-                case FAILURE:
-                    // We can't seem to get the failure out of the myrole mex?
-                    pmex.replyWithFailure(MessageExchange.FailureType.OTHER, 
"operation failed", null);
-                    break;
-                case FAULT:
-                    Message faultRes = 
pmex.createMessage(pmex.getOperation().getFault(m.getFault().getLocalPart())
-                            .getMessage().getQName());
-                    faultRes.setMessage(m.getResponse().getMessage());
-                    pmex.replyWithFault(m.getFault(), faultRes);
-                    break;
-                case RESPONSE:
-                    Message response = 
pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());
-                    response.setMessage(m.getResponse().getMessage());
-                    pmex.reply(response);
-                    break;
-                default:
-                    __log.warn("Unexpected state: " + m.getStatus());
-                    break;
+            try {
+                switch (m.getStatus()) {
+                    case FAILURE:
+                        // We can't seem to get the failure out of the myrole 
mex?
+                        
pmex.replyWithFailure(MessageExchange.FailureType.OTHER, "operation failed", 
null);
+                        break;
+                    case FAULT:
+                        Message faultRes = 
pmex.createMessage(pmex.getOperation().getFault(m.getFault().getLocalPart())
+                                .getMessage().getQName());
+                        faultRes.setMessage(m.getResponse().getMessage());
+                        pmex.replyWithFault(m.getFault(), faultRes);
+                        break;
+                    case RESPONSE:
+                        Message response = 
pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());
+                        response.setMessage(m.getResponse().getMessage());
+                        pmex.reply(response);
+                        break;
+                    default:
+                        __log.warn("Unexpected state: " + m.getStatus());
+                        break;
+                }
+            } finally {
+                mex.release();
             }
         } else _bpelProcess._engine._contexts.mexContext.onAsyncReply(m);
 
@@ -686,8 +690,8 @@
         }
 
         if (BpelProcess.__log.isDebugEnabled()) {
-            BpelProcess.__log.debug("INVOKING PARTNER: partnerLink=" + 
partnerLink + ", op=" + operation.getName() + " channel="
-                    + channel + ")");
+            BpelProcess.__log.debug("INVOKING PARTNER: partnerLink=" + 
partnerLink +
+                    ", op=" + operation.getName() + " channel=" + channel + 
")");
         }
 
         // prepare event

Modified: 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java?view=diff&rev=543214&r1=543213&r2=543214
==============================================================================
--- 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java
 (original)
+++ 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java
 Thu May 31 11:52:49 2007
@@ -27,7 +27,6 @@
 import javax.xml.namespace.QName;
 
 public class MessageDAOImpl extends DaoBaseImpl implements MessageDAO {
-       
        private QName type;
        private Element data;
        private MessageExchangeDAO messageExchange;

Modified: 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java?view=diff&rev=543214&r1=543213&r2=543214
==============================================================================
--- 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
 (original)
+++ 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
 Thu May 31 11:52:49 2007
@@ -35,7 +35,7 @@
 
 public class MessageExchangeDAOImpl extends DaoBaseImpl implements 
MessageExchangeDAO {
 
-       private String messageExchangeId;
+    private String messageExchangeId;
        private MessageDAO response;
        private Date createTime;
        private MessageDAO request;
@@ -63,7 +63,7 @@
        public MessageExchangeDAOImpl(char direction, String messageEchangeId){
                this.direction = direction;
                this.messageExchangeId = messageEchangeId;
-       }
+    }
        
        public String getMessageExchangeId() {
                return messageExchangeId;
@@ -279,7 +279,6 @@
         response = null;
         BpelDAOConnectionImpl.removeMessageExchange(getMessageExchangeId());
     }
-
 
     public String toString() {
         return "mem.mex(direction=" + direction + " id=" + messageExchangeId + 
")";

Modified: 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java?view=diff&rev=543214&r1=543213&r2=543214
==============================================================================
--- 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
 (original)
+++ 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
 Thu May 31 11:52:49 2007
@@ -34,6 +34,7 @@
 import java.util.Date;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
  * A very simple, in-memory implementation of the [EMAIL PROTECTED] 
ProcessDAO} interface.
@@ -50,6 +51,7 @@
     private Map<QName, ProcessDaoImpl> _store;
     private BpelDAOConnectionImpl _conn;
     private int _executionCount = 0;
+    private Collection<Long> _instancesToRemove = new 
ConcurrentLinkedQueue<Long>();
 
     private String _guid;
 
@@ -128,8 +130,16 @@
         // Cleaning up
         __log.debug("Removing completed process instance " + 
instance.getInstanceId() + " from in-memory store.");
         ProcessInstanceDAO removed = 
_instances.remove(instance.getInstanceId());
-        if (removed == null)
-            __log.warn("Couldn't find process instance " + 
instance.getInstanceId() + " for cleanup.");
+        if (removed == null) {
+            // Checking for leftover instances that should be removed
+            for (Long iid : _instancesToRemove) {
+                _instances.remove(iid);
+            }
+
+            // The instance can't be found probably because the transaction 
isn't committed yet and
+            // it doesn't exist. Saving its id for later cleanup.
+            _instancesToRemove.add(instance.getInstanceId());
+        }
     }
 
     public void delete() {

Modified: 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java?view=diff&rev=543214&r1=543213&r2=543214
==============================================================================
--- 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
 (original)
+++ 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
 Thu May 31 11:52:49 2007
@@ -52,40 +52,24 @@
     private static final Collection<ScopeDAO> EMPTY_SCOPE_DAOS = 
Collections.emptyList();
 
     private short _previousState;
-
     private short _state;
-
     private Long _instanceId;
-
     private ProcessDaoImpl _processDao;
-
     private byte[] _jacobState;
-
     private Map<Long, ScopeDAO> _scopes = new HashMap<Long, ScopeDAO>();
-
     private Map<String, List<ScopeDAO>> _scopesByName = new HashMap<String, 
List<ScopeDAO>>();
-
     private Map<String, byte[]> _messageExchanges = new HashMap<String, 
byte[]>();
-
     private ScopeDAO _rootScope;
-
     private FaultDAO _fault;
-
     private CorrelatorDAO _instantiatingCorrelator;
-
     private BpelDAOConnection _conn;
-
     private int _failureCount;
-
     private Date _failureDateTime;
-
     private Map<String, ActivityRecoveryDAO> _activityRecoveries = new 
HashMap<String, ActivityRecoveryDAO>();
 
     // TODO: Remove this, we should be using the main event store...
     private List<ProcessInstanceEvent> _events = new 
ArrayList<ProcessInstanceEvent>();
-
     private Date _lastActive;
-
     private int _seq;
 
     ProcessInstanceDaoImpl(BpelDAOConnection conn, ProcessDaoImpl processDao, 
CorrelatorDAO correlator) {


Reply via email to