Author: mszefler Date: Fri Jan 12 09:56:02 2007 New Revision: 495664 URL: http://svn.apache.org/viewvc?view=rev&rev=495664 Log: Defer invokes until after the transaction has committed.
Modified: incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ExternalService.java incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServerJPA.java Modified: incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ExternalService.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ExternalService.java?view=diff&rev=495664&r1=495663&r2=495664 ============================================================================== --- incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ExternalService.java (original) +++ incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ExternalService.java Fri Jan 12 09:56:02 2007 @@ -19,6 +19,12 @@ package org.apache.ode.axis2; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; + +import javax.wsdl.Definition; +import javax.xml.namespace.QName; + import org.apache.axiom.om.OMElement; import org.apache.axis2.AxisFault; import org.apache.axis2.addressing.EndpointReference; @@ -37,15 +43,11 @@ import org.apache.ode.bpel.iapi.MessageExchange; import org.apache.ode.bpel.iapi.PartnerRoleChannel; import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange; +import org.apache.ode.bpel.iapi.Scheduler; +import org.apache.ode.bpel.iapi.MessageExchange.FailureType; import org.apache.ode.utils.DOMUtils; import org.w3c.dom.Element; -import javax.wsdl.Definition; -import javax.xml.namespace.QName; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; - /** * Acts as a service not provided by ODE. Used mainly for invocation as a way to * maintain the WSDL decription of used services. 
@@ -63,13 +65,16 @@ private AxisConfiguration _axisConfig; private boolean _isReplicateEmptyNS = false; + private Scheduler _sched; + public ExternalService(Definition definition, QName serviceName, - String portName, ExecutorService executorService, AxisConfiguration axisConfig) { + String portName, ExecutorService executorService, AxisConfiguration axisConfig, Scheduler sched) { _definition = definition; _serviceName = serviceName; _portName = portName; _executorService = executorService; _axisConfig = axisConfig; + _sched = sched; } public void invoke(final PartnerRoleMessageExchange odeMex) { @@ -102,38 +107,42 @@ serviceClient.setOverrideOptions(mexOptions); if (isTwoWay) { - // Invoking in a separate thread even though we're supposed to wait for a synchronous reply - // to force clear transaction separation. - Future<OMElement> freply = _executorService.submit(new Callable<OMElement>() { - public OMElement call() throws Exception { - return serviceClient.sendReceive(payload); + // Defer the invoke until the transaction commits. + + _sched.registerSynchronizer(new Scheduler.Synchronizer() { + + public void afterCompletion(boolean success) { + // If the TX is rolled back, then we don't send the request. 
+ if (!success) return; + OMElement reply; + try { + reply = serviceClient.sendReceive(payload); + reply(odeMex,reply); + } catch (Throwable t) { + String errmsg = "Error sending message to Axis2 for ODE mex " + odeMex; + __log.error(errmsg, t); + replyWithFailure(odeMex,MessageExchange.FailureType.COMMUNICATION_ERROR, errmsg, null); + return; + } + if (reply == null) { + String errmsg = "Received empty (null) reply for ODE mex " + odeMex; + __log.error(errmsg); + replyWithFailure(odeMex,MessageExchange.FailureType.COMMUNICATION_ERROR, errmsg, null); + } else { + } } - }); - OMElement reply; - try { - reply = freply.get(); - - if (reply == null) { - String errmsg = "Received empty (null) reply for ODE mex " + odeMex; - __log.error(errmsg); - odeMex.replyWithFailure(MessageExchange.FailureType.COMMUNICATION_ERROR, errmsg, null); - } else { - Message response = odeMex.createMessage(odeMex.getOperation().getOutput().getMessage().getQName()); - Element responseElmt = OMUtils.toDOM(reply); - responseElmt = SOAPUtils.unwrap(responseElmt, _definition, - odeMex.getOperation().getOutput().getMessage(), _serviceName); - __log.debug("Received synchronous response for MEX " + odeMex); - __log.debug("Message: " + DOMUtils.domToString(responseElmt)); - response.setMessage(responseElmt); - odeMex.reply(response); + + + + public void beforeCompletion() { } - } catch (Exception e) { - String errmsg = "Error sending message to Axis2 for ODE mex " + odeMex; - __log.error(errmsg, e); - odeMex.replyWithFailure(MessageExchange.FailureType.COMMUNICATION_ERROR, errmsg, null); - } + + }); + odeMex.replyAsync(); + } else /** one-way case **/ { serviceClient.fireAndForget(payload); + odeMex.replyOneWayOk(); } } catch (AxisFault axisFault) { String errmsg = "Error sending message to Axis2 for ODE mex " + odeMex; @@ -198,4 +207,53 @@ public QName getServiceName() { return _serviceName; } + + + private void replyWithFailure(final PartnerRoleMessageExchange odeMex, final FailureType error, final 
String errmsg,final Element details) { + // ODE MEX needs to be invoked in a TX. + try { + _sched.execTransaction(new Callable<Void>() { + public Void call() throws Exception { + odeMex.replyWithFailure(error, errmsg, details); + return null; + } + }); + + } catch (Exception e) { + String emsg = "Error executing replyWithFailure transaction; reply will be lost."; + __log.error(emsg, e); + + } + + } + + private void reply(final PartnerRoleMessageExchange odeMex, final OMElement reply) { + // ODE MEX needs to be invoked in a TX. + try { + _sched.execTransaction(new Callable<Void>() { + public Void call() throws Exception { + Message response = odeMex.createMessage(odeMex.getOperation().getOutput().getMessage().getQName()); + try { + Element responseElmt = OMUtils.toDOM(reply); + responseElmt = SOAPUtils.unwrap(responseElmt, _definition, + odeMex.getOperation().getOutput().getMessage(), _serviceName); + __log.debug("Received synchronous response for MEX " + odeMex); + __log.debug("Message: " + DOMUtils.domToString(responseElmt)); + response.setMessage(responseElmt); + odeMex.reply(response); + } catch (Exception ex) { + String errmsg = "Unable to process response: " + ex.getMessage(); + __log.error(errmsg, ex); + odeMex.replyWithFailure(FailureType.FORMAT_ERROR,errmsg, null); + } + return null; + } + }); + + } catch (Exception e) { + String errmsg = "Error executing reply transaction; reply will be lost."; + __log.error(errmsg, e); + } + } + } Modified: incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java?view=diff&rev=495664&r1=495663&r2=495664 ============================================================================== --- incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java (original) +++ incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java Fri Jan 12 09:56:02 2007 @@ -245,7 +245,7 @@ if 
(extService != null) return extService; - extService = new ExternalService(def, serviceName, portName, _executorService, _axisConfig); + extService = new ExternalService(def, serviceName, portName, _executorService, _axisConfig, _scheduler); if (_odeConfig.isReplicateEmptyNS()) { __log.debug("Setting external service with empty namespace replication"); extService.setReplicateEmptyNS(true); @@ -527,6 +527,7 @@ String listenerCN = tokenizer.nextToken(); try { _server.registerBpelEventListener((BpelEventListener) Class.forName(listenerCN).newInstance()); + __log.info(__msgs.msgBpelEventListenerRegistered(listenerCN)); } catch (Exception e) { __log.warn("Couldn't register the event listener " + listenerCN + ", the class couldn't be " + "loaded properly."); Modified: incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServerJPA.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServerJPA.java?view=diff&rev=495664&r1=495663&r2=495664 ============================================================================== --- incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServerJPA.java (original) +++ incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServerJPA.java Fri Jan 12 09:56:02 2007 @@ -1,5 +1,18 @@ package org.apache.ode.axis2; +import java.io.File; +import java.util.StringTokenizer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import javax.naming.InitialContext; +import javax.servlet.ServletConfig; +import javax.servlet.ServletException; +import javax.sql.DataSource; +import javax.transaction.TransactionManager; +import javax.wsdl.Definition; +import javax.xml.namespace.QName; + import org.apache.axis2.AxisFault; import org.apache.axis2.description.AxisOperation; import org.apache.axis2.description.AxisService; @@ -23,25 +36,8 @@ import org.apache.ode.dao.jpa.ojpa.BPELDAOConnectionFactoryImpl; import 
org.apache.ode.store.ProcessStoreImpl; import org.apache.ode.utils.fs.TempFileManager; -import org.apache.openjpa.ee.ManagedRuntime; import org.opentools.minerva.MinervaPool; -import javax.naming.InitialContext; -import javax.persistence.EntityManager; -import javax.persistence.EntityManagerFactory; -import javax.persistence.Persistence; -import javax.servlet.ServletConfig; -import javax.servlet.ServletException; -import javax.sql.DataSource; -import javax.transaction.TransactionManager; -import javax.wsdl.Definition; -import javax.xml.namespace.QName; -import java.io.File; -import java.util.HashMap; -import java.util.StringTokenizer; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - /** * @author Matthieu Riou <mriou at apache dot org> */ @@ -203,7 +199,7 @@ if (extService != null) return extService; - extService = new ExternalService(def, serviceName, portName, _executorService, _axisConfig); + extService = new ExternalService(def, serviceName, portName, _executorService, _axisConfig, _scheduler); if (_odeConfig.isReplicateEmptyNS()) { __log.debug("Setting external service with empty namespace replication"); extService.setReplicateEmptyNS(true); @@ -336,30 +332,17 @@ } private void initJPA() { - HashMap propMap = new HashMap(); - propMap.put("openjpa.jdbc.DBDictionary", "org.apache.openjpa.jdbc.sql.DerbyDictionary"); - propMap.put("openjpa.ManagedRuntime", new TxMgrProvider()); - propMap.put("openjpa.ConnectionDriverName", org.apache.derby.jdbc.EmbeddedDriver.class.getName()); - propMap.put("javax.persistence.nonJtaDataSource", _datasource); - propMap.put("openjpa.Log", "DefaultLevel=TRACE"); - EntityManagerFactory emf = Persistence.createEntityManagerFactory("ode-dao", propMap); -// propMap.put("openjpa.ConnectionUserName", "sa"); -// propMap.put("openjpa.ConnectionPassword", ""); -// propMap.put("openjpa.ConnectionDriverName", org.apache.derby.jdbc.EmbeddedDriver.class.getName()); -// propMap.put("ConnectionDriverName", 
org.apache.derby.jdbc.EmbeddedDriver.class.getName()); -// propMap.put("openjpa.ConnectionURL", url); - EntityManager em = emf.createEntityManager(); -// ((EntityManagerImpl)em). - _daoCF = new BPELDAOConnectionFactoryImpl(em); - } - public class TxMgrProvider implements ManagedRuntime { - public TxMgrProvider() { - } - public TransactionManager getTransactionManager() throws Exception { - return _txMgr; - } + BPELDAOConnectionFactoryImpl daoCF = new BPELDAOConnectionFactoryImpl(); + daoCF.setTransactionManager(_txMgr); + daoCF.setDBDictionary("org.apache.openjpa.jdbc.sql.DerbyDictionary"); + daoCF.setDataSource(_datasource); + daoCF.init(null); + + _daoCF = daoCF; } + + /** * Initialize the Hibernate data store.