Author: mszefler
Date: Fri Jan 12 09:56:02 2007
New Revision: 495664

URL: http://svn.apache.org/viewvc?view=rev&rev=495664
Log:
defer invokes until after tx has done commit.

Modified:
    
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ExternalService.java
    incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
    
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServerJPA.java

Modified: 
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ExternalService.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ExternalService.java?view=diff&rev=495664&r1=495663&r2=495664
==============================================================================
--- 
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ExternalService.java
 (original)
+++ 
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ExternalService.java
 Fri Jan 12 09:56:02 2007
@@ -19,6 +19,12 @@
 
 package org.apache.ode.axis2;
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+import javax.wsdl.Definition;
+import javax.xml.namespace.QName;
+
 import org.apache.axiom.om.OMElement;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.EndpointReference;
@@ -37,15 +43,11 @@
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
 import org.apache.ode.utils.DOMUtils;
 import org.w3c.dom.Element;
 
-import javax.wsdl.Definition;
-import javax.xml.namespace.QName;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
 /**
  * Acts as a service not provided by ODE. Used mainly for invocation as a way 
to
  * maintain the WSDL decription of used services.
@@ -63,13 +65,16 @@
     private AxisConfiguration _axisConfig;
     private boolean _isReplicateEmptyNS = false;
 
+    private Scheduler _sched;
+
     public ExternalService(Definition definition, QName serviceName,
-                           String portName, ExecutorService executorService, 
AxisConfiguration axisConfig) {
+                           String portName, ExecutorService executorService, 
AxisConfiguration axisConfig, Scheduler sched) {
         _definition = definition;
         _serviceName = serviceName;
         _portName = portName;
         _executorService = executorService;
         _axisConfig = axisConfig;
+        _sched = sched;
     }
 
     public void invoke(final PartnerRoleMessageExchange odeMex) {
@@ -102,38 +107,42 @@
             serviceClient.setOverrideOptions(mexOptions);
 
             if (isTwoWay) {
-                // Invoking in a separate thread even though we're supposed to 
wait for a synchronous reply
-                // to force clear transaction separation.
-                Future<OMElement> freply = _executorService.submit(new 
Callable<OMElement>() {
-                    public OMElement call() throws Exception {
-                        return serviceClient.sendReceive(payload);
+                // Defer the invoke until the transaction commits. 
+                
+                _sched.registerSynchronizer(new Scheduler.Synchronizer() {
+
+                    public void afterCompletion(boolean success) {
+                        // If the TX is rolled back, then we don't send the 
request.
+                        if (!success) return;
+                        OMElement reply;
+                        try {
+                            reply = serviceClient.sendReceive(payload);
+                            reply(odeMex,reply);
+                        } catch (Throwable t) {
+                            String errmsg = "Error sending message to Axis2 
for ODE mex " + odeMex;
+                            __log.error(errmsg, t);
+                            
replyWithFailure(odeMex,MessageExchange.FailureType.COMMUNICATION_ERROR, 
errmsg, null);
+                            return;
+                        }
+                        if (reply == null) {
+                            String errmsg = "Received empty (null) reply for 
ODE mex " + odeMex;
+                            __log.error(errmsg);
+                            
replyWithFailure(odeMex,MessageExchange.FailureType.COMMUNICATION_ERROR, 
errmsg, null);
+                        } else {
+                        }
                     }
-                });
-                OMElement reply;
-                try {
-                    reply = freply.get();
-
-                    if (reply == null) {
-                        String errmsg = "Received empty (null) reply for ODE 
mex " + odeMex;
-                        __log.error(errmsg);
-                        
odeMex.replyWithFailure(MessageExchange.FailureType.COMMUNICATION_ERROR, 
errmsg, null);
-                    } else {
-                        Message response = 
odeMex.createMessage(odeMex.getOperation().getOutput().getMessage().getQName());
-                        Element responseElmt = OMUtils.toDOM(reply);
-                        responseElmt = SOAPUtils.unwrap(responseElmt, 
_definition,
-                                
odeMex.getOperation().getOutput().getMessage(), _serviceName);
-                        __log.debug("Received synchronous response for MEX " + 
odeMex);
-                        __log.debug("Message: " + 
DOMUtils.domToString(responseElmt));
-                        response.setMessage(responseElmt);
-                        odeMex.reply(response);
+
+
+
+                    public void beforeCompletion() {                
                     }
-                } catch (Exception e) {
-                    String errmsg = "Error sending message to Axis2 for ODE 
mex " + odeMex;
-                    __log.error(errmsg, e);
-                    
odeMex.replyWithFailure(MessageExchange.FailureType.COMMUNICATION_ERROR, 
errmsg, null);
-                }
+                    
+                });
+                odeMex.replyAsync();
+              
             } else /** one-way case **/ {
                 serviceClient.fireAndForget(payload);
+                odeMex.replyOneWayOk();
             }
         } catch (AxisFault axisFault) {
             String errmsg = "Error sending message to Axis2 for ODE mex " + 
odeMex;
@@ -198,4 +207,53 @@
     public QName getServiceName() {
         return _serviceName;
     }
+    
+    
+    private void replyWithFailure(final PartnerRoleMessageExchange odeMex, 
final FailureType error, final String errmsg,final Element details) {
+        // ODE MEX needs to be invoked in a TX.
+        try {
+            _sched.execTransaction(new Callable<Void>() {
+                public Void call() throws Exception {
+                    odeMex.replyWithFailure(error, errmsg, details);
+                    return null;
+                }
+            });
+
+        } catch (Exception e) {
+            String emsg = "Error executing replyWithFailure transaction; reply 
will be lost.";
+            __log.error(emsg, e);
+
+        }
+        
+    }
+
+    private void reply(final PartnerRoleMessageExchange odeMex, final 
OMElement reply) {
+        // ODE MEX needs to be invoked in a TX.
+        try {
+            _sched.execTransaction(new Callable<Void>() {
+                public Void call() throws Exception {
+                    Message response = 
odeMex.createMessage(odeMex.getOperation().getOutput().getMessage().getQName());
+                    try {
+                        Element responseElmt = OMUtils.toDOM(reply);
+                        responseElmt = SOAPUtils.unwrap(responseElmt, 
_definition,
+                                
odeMex.getOperation().getOutput().getMessage(), _serviceName);
+                        __log.debug("Received synchronous response for MEX " + 
odeMex);
+                        __log.debug("Message: " + 
DOMUtils.domToString(responseElmt));
+                        response.setMessage(responseElmt);
+                        odeMex.reply(response);
+                    } catch (Exception ex) {
+                        String errmsg =  "Unable to process response: " + 
ex.getMessage();
+                        __log.error(errmsg, ex);
+                        
odeMex.replyWithFailure(FailureType.FORMAT_ERROR,errmsg, null);
+                    }
+                    return null;
+                }
+            });
+
+        } catch (Exception e) {
+            String errmsg = "Error executing reply transaction; reply will be 
lost.";
+            __log.error(errmsg, e);
+        }        
+    }
+
 }

Modified: 
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java?view=diff&rev=495664&r1=495663&r2=495664
==============================================================================
--- incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java 
(original)
+++ incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java 
Fri Jan 12 09:56:02 2007
@@ -245,7 +245,7 @@
         if (extService != null)
             return extService;
 
-        extService = new ExternalService(def, serviceName, portName, 
_executorService, _axisConfig);
+        extService = new ExternalService(def, serviceName, portName, 
_executorService, _axisConfig, _scheduler);
         if (_odeConfig.isReplicateEmptyNS()) {
             __log.debug("Setting external service with empty namespace 
replication");
             extService.setReplicateEmptyNS(true);
@@ -527,6 +527,7 @@
                 String listenerCN = tokenizer.nextToken();
                 try {
                     _server.registerBpelEventListener((BpelEventListener) 
Class.forName(listenerCN).newInstance());
+                    
__log.info(__msgs.msgBpelEventListenerRegistered(listenerCN));
                 } catch (Exception e) {
                     __log.warn("Couldn't register the event listener " + 
listenerCN + ", the class couldn't be " +
                             "loaded properly.");

Modified: 
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServerJPA.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServerJPA.java?view=diff&rev=495664&r1=495663&r2=495664
==============================================================================
--- 
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServerJPA.java 
(original)
+++ 
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServerJPA.java 
Fri Jan 12 09:56:02 2007
@@ -1,5 +1,18 @@
 package org.apache.ode.axis2;
 
+import java.io.File;
+import java.util.StringTokenizer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.naming.InitialContext;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import javax.wsdl.Definition;
+import javax.xml.namespace.QName;
+
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.description.AxisOperation;
 import org.apache.axis2.description.AxisService;
@@ -23,25 +36,8 @@
 import org.apache.ode.dao.jpa.ojpa.BPELDAOConnectionFactoryImpl;
 import org.apache.ode.store.ProcessStoreImpl;
 import org.apache.ode.utils.fs.TempFileManager;
-import org.apache.openjpa.ee.ManagedRuntime;
 import org.opentools.minerva.MinervaPool;
 
-import javax.naming.InitialContext;
-import javax.persistence.EntityManager;
-import javax.persistence.EntityManagerFactory;
-import javax.persistence.Persistence;
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletException;
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-import javax.wsdl.Definition;
-import javax.xml.namespace.QName;
-import java.io.File;
-import java.util.HashMap;
-import java.util.StringTokenizer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
 /**
  * @author Matthieu Riou <mriou at apache dot org>
  */
@@ -203,7 +199,7 @@
         if (extService != null)
             return extService;
 
-        extService = new ExternalService(def, serviceName, portName, 
_executorService, _axisConfig);
+        extService = new ExternalService(def, serviceName, portName, 
_executorService, _axisConfig, _scheduler);
         if (_odeConfig.isReplicateEmptyNS()) {
             __log.debug("Setting external service with empty namespace 
replication");
             extService.setReplicateEmptyNS(true);
@@ -336,30 +332,17 @@
     }
 
     private void initJPA() {
-        HashMap propMap = new HashMap();
-        propMap.put("openjpa.jdbc.DBDictionary", 
"org.apache.openjpa.jdbc.sql.DerbyDictionary");
-        propMap.put("openjpa.ManagedRuntime", new TxMgrProvider());
-        propMap.put("openjpa.ConnectionDriverName", 
org.apache.derby.jdbc.EmbeddedDriver.class.getName());
-        propMap.put("javax.persistence.nonJtaDataSource", _datasource);
-        propMap.put("openjpa.Log", "DefaultLevel=TRACE");
-        EntityManagerFactory emf = 
Persistence.createEntityManagerFactory("ode-dao", propMap);
-//        propMap.put("openjpa.ConnectionUserName", "sa");
-//        propMap.put("openjpa.ConnectionPassword", "");
-//        propMap.put("openjpa.ConnectionDriverName", 
org.apache.derby.jdbc.EmbeddedDriver.class.getName());
-//        propMap.put("ConnectionDriverName", 
org.apache.derby.jdbc.EmbeddedDriver.class.getName());
-//        propMap.put("openjpa.ConnectionURL", url);
-        EntityManager em = emf.createEntityManager();
-//        ((EntityManagerImpl)em).
-        _daoCF = new BPELDAOConnectionFactoryImpl(em);
-    }
 
-    public class TxMgrProvider implements ManagedRuntime {
-        public TxMgrProvider() {
-        }
-        public TransactionManager getTransactionManager() throws Exception {
-            return _txMgr;
-        }
+        BPELDAOConnectionFactoryImpl daoCF = new 
BPELDAOConnectionFactoryImpl();
+        daoCF.setTransactionManager(_txMgr);
+        daoCF.setDBDictionary("org.apache.openjpa.jdbc.sql.DerbyDictionary");
+        daoCF.setDataSource(_datasource);
+        daoCF.init(null);
+
+        _daoCF = daoCF;
     }
+
+  
 
     /**
      * Initialize the Hibernate data store.


Reply via email to