Modified: 
incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorSelector.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorSelector.java?view=diff&rev=448019&r1=448018&r2=448019
==============================================================================
--- 
incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorSelector.java
 (original)
+++ 
incubator/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorSelector.java
 Tue Sep 19 17:42:16 2006
@@ -19,7 +19,7 @@
 package org.apache.ode.daohib.bpel.hobj;
 
 /**
- * @hibernate.subclass table="BPEL_CORRELATOR_ENTRY" discriminator-value="S"
+ * @hibernate.class table="BPEL_SELECTORS"
  * 
  */
 public class HCorrelatorSelector extends HCorrelatorEntry {
@@ -30,6 +30,10 @@
 
     private int _idx;
 
+    private HCorrelator _correlator;
+
+    private String _correlationKey;
+
     /**
      * @hibernate.many-to-one column="PIID"
      */
@@ -54,7 +58,7 @@
     }
 
     /**
-     * @hibernate.property column="IDX"
+     * @hibernate.property column="IDX" not-null="true"
      */
     public int getIndex() {
         return _idx;
@@ -63,4 +67,37 @@
     public void setIndex(int idx) {
         _idx = idx;
     }
+    
+    @Override
+    public String toString() {
+        return "{HCorrelatorSelector correlator=" + this.getCorrelator() + ", 
ckey=" + getCorrelationKey() + 
+        ", groupId=" + getGroupId() + ", idx=" + getIndex() + ", iid=" + 
getInstance().getId() + "}";
+    }
+
+    /**
+     * @hibernate.property column="CORRELATION_KEY"
+     * @hibernate.column name="CORRELATION_KEY"
+     *                   index="IDX_SELECTOR_CKEY"
+     *                   
+     */
+    public String getCorrelationKey() {
+        return _correlationKey;
+    }
+
+    public void setCorrelationKey(String correlationKey) {
+        _correlationKey = correlationKey;
+    }
+
+    /**
+     * @hibernate.many-to-one
+     * @hibernate.column name="CORRELATOR" index="IDX_SELECTOR_CORRELATOR"
+     */
+    public HCorrelator getCorrelator() {
+        return _correlator;
+    }
+
+    public void setCorrelator(HCorrelator correlator) {
+        _correlator = correlator;
+    }
+    
 }

Modified: 
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java?view=diff&rev=448019&r1=448018&r2=448019
==============================================================================
--- 
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
 (original)
+++ 
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
 Tue Sep 19 17:42:16 2006
@@ -131,12 +131,6 @@
       continuation = (Continuation)it.next();
       it.remove();
     }
-    // At this point it is wise to clone the continuation, so that we do not 
have weird
-    // concurrency issues. We only clone the closure, the arguments should not 
be
-    // a problem.
-//    Continuation clone = new 
Continuation(cloneClosure(continuation.getClosure()), continuation.getMethod(), 
continuation.getArgs());
-//    clone.setDescription(continuation.getDescription());
-//    return clone;
     return continuation;
   }
 

Modified: 
incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java?view=diff&rev=448019&r1=448018&r2=448019
==============================================================================
--- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java 
(original)
+++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java 
Tue Sep 19 17:42:16 2006
@@ -40,7 +40,7 @@
  * type handles all communications initiated by ODE that is destined for other
  * JBI providers.
  */
-class OdeConsumer implements JbiMessageExchangeProcessor {
+class OdeConsumer extends ServiceBridge implements JbiMessageExchangeProcessor 
{
   private static final Log __log = LogFactory.getLog(OdeConsumer.class);
 
   private OdeContext _ode;

Modified: 
incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeService.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeService.java?view=diff&rev=448019&r1=448018&r2=448019
==============================================================================
--- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeService.java 
(original)
+++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeService.java 
Tue Sep 19 17:42:16 2006
@@ -32,6 +32,27 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jbi.JBIException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.servicedesc.ServiceEndpoint;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.Endpoint;
+import org.apache.ode.bpel.iapi.Message;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
+import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
+import org.apache.ode.jbi.msgmap.Mapper;
+import org.apache.ode.jbi.msgmap.MessageTranslationException;
 import org.apache.ode.bpel.iapi.Endpoint;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
@@ -44,314 +65,309 @@
 /**
  * Bridge JBI (consumer) to ODE (provider).
  */
-public class OdeService implements JbiMessageExchangeProcessor {
+public class OdeService extends ServiceBridge implements 
JbiMessageExchangeProcessor {
 
-  private static final Log __log = LogFactory.getLog(OdeService.class);
+    private static final Log __log = LogFactory.getLog(OdeService.class);
 
-  /** utility for tracking outstanding JBI message exchanges. */
-  private final JbiMexTracker _jbiMexTracker = new JbiMexTracker();
-  
-  /** JBI-Generated Endpoint */
-  private ServiceEndpoint _internal;
-
-  /** External endpoint. */
-  private ServiceEndpoint _external;
-
-  private OdeContext  _ode;
-
-  private Element _serviceref;
-
-  private Endpoint _endpoint;
-  
-  
-  public OdeService(OdeContext odeContext, Endpoint endpoint)
-      throws Exception {
-    _ode = odeContext;
-    _endpoint = endpoint;
-  }
-
-  /**
-   * Do the JBI endpoint activation.
-   *
-   * @throws JBIException
-   */
-  public void activate() throws JBIException {
-    if (_serviceref ==  null) {
-      ServiceEndpoint[] candidates = 
_ode.getContext().getExternalEndpointsForService(_endpoint.serviceName);
-      if (candidates.length != 0) {
-        _external = candidates[0];
-      }
-    }
-    _internal = _ode.getContext().activateEndpoint(_endpoint.serviceName, 
_endpoint.portName);
-    if (__log.isDebugEnabled()) {
-      __log.debug("Activated endpoint " + _endpoint);
-    }
-    // TODO: Is there a race situation here?
-  }
-
-  /**
-   * Deactivate endpoints in JBI.
-   */
-  public void deactivate() throws JBIException {
-    _ode.getContext().deactivateEndpoint(_internal);
-    __log.debug("Dectivated endpoint " +  _endpoint);
-  }
-
-  public ServiceEndpoint getInternalServiceEndpoint() {
-    return _internal;
-  }
-  
-  public ServiceEndpoint getExternalServiceEndpoint() {
-    return _external;
-  }
-  
-
-  public void onJbiMessageExchange(javax.jbi.messaging.MessageExchange jbiMex) 
throws MessagingException {
-    if (jbiMex.getRole() != javax.jbi.messaging.MessageExchange.Role.PROVIDER) 
{
-      String errmsg ="Message exchange is not in PROVIDER role as expected: " 
+ jbiMex.getExchangeId(); 
-      __log.fatal(errmsg);
-      throw new IllegalArgumentException(errmsg);
-    }
-
-    if (jbiMex.getStatus() != ExchangeStatus.ACTIVE) {
-      // We can forget about the exchange.
-      _jbiMexTracker.consume(jbiMex.getExchangeId());
-      return;
-    }
-    
-    if 
(jbiMex.getPattern().equals(org.apache.ode.jbi.MessageExchangePattern.IN_ONLY)) 
{
-      boolean success = false;
-      Exception err = null;
-      try {
-        invokeOde(jbiMex, ((InOnly)jbiMex).getInMessage());
-        success = true;
-      } catch (Exception ex) {
-        __log.error("Error invoking ODE.",ex);
-        err = ex;
-      } finally {
-        if (!success) {
-          jbiMex.setStatus(ExchangeStatus.ERROR);
-          jbiMex.setError(err);
-        } else {
-          jbiMex.setStatus(ExchangeStatus.DONE);
-        }        
-      }
-    } else if 
(jbiMex.getPattern().equals(org.apache.ode.jbi.MessageExchangePattern.IN_OUT)) {
-      boolean success = false;
-      Exception err = null;
-      try {
-        invokeOde(jbiMex, ((InOut)jbiMex).getInMessage());
-        success = true;
-      } catch (Exception ex) {
-        __log.error("Error invoking ODE.", ex);
-        err = ex;
-      } finally {
-        if (!success) {
-          jbiMex.setError(err);
-          jbiMex.setStatus(ExchangeStatus.ERROR);
-        }
-      }
-    } else {
-      __log.error("JBI MessageExchange " + jbiMex.getExchangeId() + " is of an 
unsupported pattern "
-          + jbiMex.getPattern());
-      jbiMex.setStatus(ExchangeStatus.ERROR);
-      jbiMex.setError(new Exception("Unknown message exchange pattern: " + 
jbiMex.getPattern()));
-    }
-
-  }
-
-  /**
-   * Called from [EMAIL PROTECTED] 
MessageExchangeContextImpl#onAsyncReply(MyRoleMessageExchange)}
-   * @param mex message exchenge
-   */
-  public void onResponse(MyRoleMessageExchange mex) {
-    javax.jbi.messaging.MessageExchange jbiMex = 
_jbiMexTracker.consume(mex.getClientId());
-    if (jbiMex == null) {
-      __log.warn("Ignoring unknown async reply: " + mex);
-      return;
-    }
-
-    switch (mex.getStatus()) {
-      case FAULT:
-        outResponseFault(mex, jbiMex);
-        break;
-      case RESPONSE:
-        outResponse(mex, jbiMex);
-        break;
-      case FAILURE:
-        outFailure(mex, jbiMex);
-        break;
-      default :
-        __log.warn("Received ODE message exchange in unexpected state: " + 
mex.getStatus());
-    }
-  }
-
-  /**
-   * Forward a JBI input message to ODE.
-   *
-   * @param jbiMex
-   */
-  private void invokeOde(javax.jbi.messaging.MessageExchange jbiMex, 
-      NormalizedMessage request) throws Exception {
-
-    // If this has already been tracked, we will not invoke!
-    if (_jbiMexTracker.track(jbiMex)) {
-      __log.debug("Skipping JBI MEX " + jbiMex.getExchangeId() + ", already 
received!");
-      return;
-    }
-    
-    _ode.getTransactionManager().begin();
-
-    boolean success = false;
-    MyRoleMessageExchange odeMex = null;
-    try {
-      if (__log.isDebugEnabled()) {
-          __log.debug("invokeOde() JBI exchangeId=" + jbiMex.getExchangeId() + 
" endpoint=" + _endpoint + " operation=" + jbiMex.getOperation() );
-      }
-      odeMex = _ode._server.getEngine().createMessageExchange(
-        jbiMex.getExchangeId(),
-        _endpoint.serviceName,
-        jbiMex.getOperation().getLocalPart());
-
-      if (odeMex.getOperation() != null) {
-        javax.wsdl.Message msgdef = 
odeMex.getOperation().getInput().getMessage();
-        Message odeRequest = 
odeMex.createMessage(odeMex.getOperation().getInput().getMessage().getQName());
-        Mapper mapper = _ode.findMapper(request,odeMex.getOperation());
-        if (mapper == null) {
-          String errmsg = "Could not find a mapper for request message for JBI 
MEX " + jbiMex.getExchangeId()
-            + "; ODE MEX " + odeMex.getMessageExchangeId() + " is failed. "; 
-          __log.error(errmsg);
-          throw new MessageTranslationException(errmsg);
-          
-        }
-        odeMex.setProperty(Mapper.class.getName(), 
mapper.getClass().getName());
-        mapper.toODE(odeRequest, request, msgdef);
-        odeMex.invoke(odeRequest);
-        
-        // Handle the response if it is immediately available.
-        if (odeMex.getStatus() != Status.ASYNC) {
-          __log.debug("ODE MEX "  + odeMex  + " completed SYNCHRONOUSLY.");
-          onResponse(odeMex);
-          _jbiMexTracker.consume(jbiMex.getExchangeId());
+    /** utility for tracking outstanding JBI message exchanges. */
+    private final JbiMexTracker _jbiMexTracker = new JbiMexTracker();
+
+    /** JBI-Generated Endpoint */
+    private ServiceEndpoint _internal;
+
+    /** External endpoint. */
+    private ServiceEndpoint _external;
+
+    private OdeContext _ode;
+
+    private Element _serviceref;
+
+    private Endpoint _endpoint;
+
+    public OdeService(OdeContext odeContext, Endpoint endpoint) throws 
Exception {
+        _ode = odeContext;
+        _endpoint = endpoint;
+    }
+
+    /**
+     * Do the JBI endpoint activation.
+     * 
+     * @throws JBIException
+     */
+    public void activate() throws JBIException {
+        if (_serviceref == null) {
+            ServiceEndpoint[] candidates = 
_ode.getContext().getExternalEndpointsForService(_endpoint.serviceName);
+            if (candidates.length != 0) {
+                _external = candidates[0];
+            }
+        }
+        _internal = _ode.getContext().activateEndpoint(_endpoint.serviceName, 
_endpoint.portName);
+        if (__log.isDebugEnabled()) {
+            __log.debug("Activated endpoint " + _endpoint);
+        }
+        // TODO: Is there a race situation here?
+    }
+
+    /**
+     * Deactivate endpoints in JBI.
+     */
+    public void deactivate() throws JBIException {
+        _ode.getContext().deactivateEndpoint(_internal);
+        __log.debug("Dectivated endpoint " + _endpoint);
+    }
+
+    public ServiceEndpoint getInternalServiceEndpoint() {
+        return _internal;
+    }
+
+    public ServiceEndpoint getExternalServiceEndpoint() {
+        return _external;
+    }
+
+    public void onJbiMessageExchange(javax.jbi.messaging.MessageExchange 
jbiMex) throws MessagingException {
+        if (jbiMex.getRole() != 
javax.jbi.messaging.MessageExchange.Role.PROVIDER) {
+            String errmsg = "Message exchange is not in PROVIDER role as 
expected: " + jbiMex.getExchangeId();
+            __log.fatal(errmsg);
+            throw new IllegalArgumentException(errmsg);
+        }
+
+        if (jbiMex.getStatus() != ExchangeStatus.ACTIVE) {
+            // We can forget about the exchange.
+            _jbiMexTracker.consume(jbiMex.getExchangeId());
+            return;
+        }
+
+        if 
(jbiMex.getPattern().equals(org.apache.ode.jbi.MessageExchangePattern.IN_ONLY)) 
{
+            boolean success = false;
+            Exception err = null;
+            try {
+                invokeOde(jbiMex, ((InOnly) jbiMex).getInMessage());
+                success = true;
+            } catch (Exception ex) {
+                __log.error("Error invoking ODE.", ex);
+                err = ex;
+            } finally {
+                if (!success) {
+                    jbiMex.setStatus(ExchangeStatus.ERROR);
+                    jbiMex.setError(err);
+                } else {
+                    jbiMex.setStatus(ExchangeStatus.DONE);
+                }
+            }
+        } else if 
(jbiMex.getPattern().equals(org.apache.ode.jbi.MessageExchangePattern.IN_OUT)) {
+            boolean success = false;
+            Exception err = null;
+            try {
+                invokeOde(jbiMex, ((InOut) jbiMex).getInMessage());
+                success = true;
+            } catch (Exception ex) {
+                __log.error("Error invoking ODE.", ex);
+                err = ex;
+            } finally {
+                if (!success) {
+                    jbiMex.setError(err);
+                    jbiMex.setStatus(ExchangeStatus.ERROR);
+                }
+            }
         } else {
-          __log.debug("ODE MEX " + odeMex + " completed ASYNCHRONOUSLY.");
+            __log.error("JBI MessageExchange " + jbiMex.getExchangeId() + " is 
of an unsupported pattern "
+                    + jbiMex.getPattern());
+            jbiMex.setStatus(ExchangeStatus.ERROR);
+            jbiMex.setError(new Exception("Unknown message exchange pattern: " 
+ jbiMex.getPattern()));
         }
-      } else {
-        __log.error("ODE MEX "  +odeMex + " was unroutable.");
-        jbiMex.setError(new IllegalArgumentException("Unroutable 
invocation."));
-      }
-
-      success = true;
-      // For one-way invocation we do not need to maintain the association
-      if (odeMex.getMessageExchangePattern() != 
MessageExchangePattern.REQUEST_RESPONSE)
-        _jbiMexTracker.consume(jbiMex.getExchangeId());
-
-    } finally {
-      if (success) {
-        __log.debug("Commiting ODE MEX "  + odeMex );
-        _ode.getTransactionManager().commit();
-      } else {
-        __log.debug("Rolling back ODE MEX "  + odeMex );
-        _jbiMexTracker.consume(jbiMex.getExchangeId());
-        _ode.getTransactionManager().rollback();
-        
-      }
-    }
-    
-  }
-
-  private void outFailure(MyRoleMessageExchange odeMex, 
javax.jbi.messaging.MessageExchange jbiMex) {
-    try {
-      jbiMex.setError(new Exception("MEXFailure"));
-      jbiMex.setStatus(ExchangeStatus.ERROR);
-      // TODO: get failure codes out of the message.
-    } catch (MessagingException ex) {
-      __log.fatal("Error bridging ODE out response: ", ex);
-    }
-  }
-
-  private void outResponse(MyRoleMessageExchange mex, 
javax.jbi.messaging.MessageExchange jbiMex) {
-    InOut inout = (InOut)jbiMex;
-    
-    try {
-      NormalizedMessage nmsg = inout.createMessage();
-      String mapperName = mex.getProperty(Mapper.class.getName());
-      Mapper mapper = _ode.getMapper(mapperName);
-      if (mapper == null) {
-        String errmsg = "Message-mapper " + mapperName + " used in ODE MEX " 
-          + mex.getMessageExchangeId() + " is no longer available.";
-        __log.error(errmsg);
-        throw new MessageTranslationException(errmsg);
-      }
-      
-      mapper.toNMS(nmsg,mex.getResponse(),
-            mex.getOperation().getOutput().getMessage());
-      
-      inout.setOutMessage(nmsg);
-      _ode.getChannel().send(inout);
-      
-    } catch (MessagingException ex) {
-      __log.error("Error bridging ODE out response: ", ex);
-      inout.setError(ex);
-    } catch (MessageTranslationException e) {
-      __log.error("Error translating ODE message " + mex.getResponse()
-          + " to NMS format!", e);
-      inout.setError(e);
-    }
-  }
-
-  private void outResponseFault(MyRoleMessageExchange mex, 
javax.jbi.messaging.MessageExchange jbiMex) {
-
-    InOut inout = (InOut)jbiMex;
-
-    try {
-      Fault flt = inout.createFault();
-      String mapperName = mex.getProperty(Mapper.class.getName());
-      Mapper mapper = _ode.getMapper(mapperName);
-      if (mapper == null) {
-        String errmsg = "Message-mapper " + mapperName + " used in ODE MEX " 
-          + mex.getMessageExchangeId() + " is no longer available.";
-        __log.error(errmsg);
-        throw new MessageTranslationException(errmsg);
-      }
-      
-      mapper.toNMS(flt,mex.getResponse(),
-            mex.getOperation().getOutput().getMessage());
-      inout.setFault(flt);
-      _ode.getChannel().send(inout);
-    } catch (MessagingException e) {
-      __log.error("Error bridging ODE fault response: ", e);
-      inout.setError(e);
-    } catch (MessageTranslationException mte) {
-      __log.error("Error translating ODE fault message " + 
mex.getFaultResponse()
-          + " to NMS format!", mte);
-      inout.setError(mte);
-    }
-  }
-
-  public Endpoint getEndpoint() {
-    return _endpoint;
-  }
-
-  
-  /**
-   * Class for tracking outstanding message exchanges from JBI.
-   */
-  private static class JbiMexTracker {
-    /** Outstanding JBI-initiated exchanges: mapping for JBI MEX ID to JBI MEX 
*/
-    private Map<String, javax.jbi.messaging.MessageExchange> 
_outstandingJbiExchanges =
-      new HashMap<String, javax.jbi.messaging.MessageExchange>();
-
-    synchronized boolean track(javax.jbi.messaging.MessageExchange jbiMex) {
-      boolean found = 
_outstandingJbiExchanges.containsKey(jbiMex.getExchangeId());
-      _outstandingJbiExchanges.put(jbiMex.getExchangeId(), jbiMex);
-      return found;
-    }
-
-    synchronized javax.jbi.messaging.MessageExchange consume(String clientId) {
-      return _outstandingJbiExchanges.remove(clientId);
-    }
-    
-    
-  }
+
+    }
+
+    /**
+     * Called from
+     * [EMAIL PROTECTED] 
MessageExchangeContextImpl#onAsyncReply(MyRoleMessageExchange)}
+     * 
+     * @param mex
+     *            message exchenge
+     */
+    public void onResponse(MyRoleMessageExchange mex) {
+        javax.jbi.messaging.MessageExchange jbiMex = 
_jbiMexTracker.consume(mex.getClientId());
+        if (jbiMex == null) {
+            __log.warn("Ignoring unknown async reply: " + mex);
+            return;
+        }
+
+        switch (mex.getStatus()) {
+        case FAULT:
+            outResponseFault(mex, jbiMex);
+            break;
+        case RESPONSE:
+            outResponse(mex, jbiMex);
+            break;
+        case FAILURE:
+            outFailure(mex, jbiMex);
+            break;
+        default:
+            __log.warn("Received ODE message exchange in unexpected state: " + 
mex.getStatus());
+        }
+    }
+
+    /**
+     * Forward a JBI input message to ODE.
+     * 
+     * @param jbiMex
+     */
+    private void invokeOde(javax.jbi.messaging.MessageExchange jbiMex, 
NormalizedMessage request) throws Exception {
+
+        // If this has already been tracked, we will not invoke!
+        if (_jbiMexTracker.track(jbiMex)) {
+            __log.debug("Skipping JBI MEX " + jbiMex.getExchangeId() + ", 
already received!");
+            return;
+        }
+
+        _ode.getTransactionManager().begin();
+
+        boolean success = false;
+        MyRoleMessageExchange odeMex = null;
+        try {
+            if (__log.isDebugEnabled()) {
+                __log.debug("invokeOde() JBI exchangeId=" + 
jbiMex.getExchangeId() + " endpoint=" + _endpoint
+                        + " operation=" + jbiMex.getOperation());
+            }
+            odeMex = 
_ode._server.getEngine().createMessageExchange(jbiMex.getExchangeId(), 
_endpoint.serviceName,
+                    jbiMex.getOperation().getLocalPart());
+
+            if (odeMex.getOperation() != null) {
+                copyMexProperties(odeMex, jbiMex);
+                javax.wsdl.Message msgdef = 
odeMex.getOperation().getInput().getMessage();
+                Message odeRequest = 
odeMex.createMessage(odeMex.getOperation().getInput().getMessage().getQName());
+                Mapper mapper = _ode.findMapper(request, 
odeMex.getOperation());
+                if (mapper == null) {
+                    String errmsg = "Could not find a mapper for request 
message for JBI MEX " + jbiMex.getExchangeId()
+                            + "; ODE MEX " + odeMex.getMessageExchangeId() + " 
is failed. ";
+                    __log.error(errmsg);
+                    throw new MessageTranslationException(errmsg);
+
+                }
+                odeMex.setProperty(Mapper.class.getName(), 
mapper.getClass().getName());
+                mapper.toODE(odeRequest, request, msgdef);
+                odeMex.invoke(odeRequest);
+
+                // Handle the response if it is immediately available.
+                if (odeMex.getStatus() != Status.ASYNC) {
+                    __log.debug("ODE MEX " + odeMex + " completed 
SYNCHRONOUSLY.");
+                    onResponse(odeMex);
+                    _jbiMexTracker.consume(jbiMex.getExchangeId());
+                } else {
+                    __log.debug("ODE MEX " + odeMex + " completed 
ASYNCHRONOUSLY.");
+                }
+            } else {
+                __log.error("ODE MEX " + odeMex + " was unroutable.");
+                jbiMex.setError(new IllegalArgumentException("Unroutable 
invocation."));
+            }
+
+            success = true;
+            // For one-way invocation we do not need to maintain the 
association
+            if (odeMex.getMessageExchangePattern() != 
MessageExchangePattern.REQUEST_RESPONSE)
+                _jbiMexTracker.consume(jbiMex.getExchangeId());
+
+        } finally {
+            if (success) {
+                __log.debug("Commiting ODE MEX " + odeMex);
+                _ode.getTransactionManager().commit();
+            } else {
+                __log.debug("Rolling back ODE MEX " + odeMex);
+                _jbiMexTracker.consume(jbiMex.getExchangeId());
+                _ode.getTransactionManager().rollback();
+
+            }
+        }
+
+    }
+
+    private void outFailure(MyRoleMessageExchange odeMex, 
javax.jbi.messaging.MessageExchange jbiMex) {
+        try {
+            jbiMex.setError(new Exception("MEXFailure"));
+            jbiMex.setStatus(ExchangeStatus.ERROR);
+            // TODO: get failure codes out of the message.
+        } catch (MessagingException ex) {
+            __log.fatal("Error bridging ODE out response: ", ex);
+        }
+    }
+
+    private void outResponse(MyRoleMessageExchange mex, 
javax.jbi.messaging.MessageExchange jbiMex) {
+        InOut inout = (InOut) jbiMex;
+
+        try {
+            NormalizedMessage nmsg = inout.createMessage();
+            String mapperName = mex.getProperty(Mapper.class.getName());
+            Mapper mapper = _ode.getMapper(mapperName);
+            if (mapper == null) {
+                String errmsg = "Message-mapper " + mapperName + " used in ODE 
MEX " + mex.getMessageExchangeId()
+                        + " is no longer available.";
+                __log.error(errmsg);
+                throw new MessageTranslationException(errmsg);
+            }
+
+            mapper.toNMS(nmsg, mex.getResponse(), 
mex.getOperation().getOutput().getMessage());
+
+            inout.setOutMessage(nmsg);
+            _ode.getChannel().send(inout);
+
+        } catch (MessagingException ex) {
+            __log.error("Error bridging ODE out response: ", ex);
+            inout.setError(ex);
+        } catch (MessageTranslationException e) {
+            __log.error("Error translating ODE message " + mex.getResponse() + 
" to NMS format!", e);
+            inout.setError(e);
+        }
+    }
+
+    private void outResponseFault(MyRoleMessageExchange mex, 
javax.jbi.messaging.MessageExchange jbiMex) {
+
+        InOut inout = (InOut) jbiMex;
+
+        try {
+            Fault flt = inout.createFault();
+            String mapperName = mex.getProperty(Mapper.class.getName());
+            Mapper mapper = _ode.getMapper(mapperName);
+            if (mapper == null) {
+                String errmsg = "Message-mapper " + mapperName + " used in ODE 
MEX " + mex.getMessageExchangeId()
+                        + " is no longer available.";
+                __log.error(errmsg);
+                throw new MessageTranslationException(errmsg);
+            }
+
+            mapper.toNMS(flt, mex.getResponse(), 
mex.getOperation().getOutput().getMessage());
+            inout.setFault(flt);
+            _ode.getChannel().send(inout);
+        } catch (MessagingException e) {
+            __log.error("Error bridging ODE fault response: ", e);
+            inout.setError(e);
+        } catch (MessageTranslationException mte) {
+            __log.error("Error translating ODE fault message " + 
mex.getFaultResponse() + " to NMS format!", mte);
+            inout.setError(mte);
+        }
+    }
+
+    public Endpoint getEndpoint() {
+        return _endpoint;
+    }
+
+    /**
+     * Class for tracking outstanding message exchanges from JBI.
+     */
+    private static class JbiMexTracker {
+        /**
+         * Outstanding JBI-initiated exchanges: mapping for JBI MEX ID to JBI
+         * MEX
+         */
+        private Map<String, javax.jbi.messaging.MessageExchange> 
_outstandingJbiExchanges = new HashMap<String, 
javax.jbi.messaging.MessageExchange>();
+
+        synchronized boolean track(javax.jbi.messaging.MessageExchange jbiMex) 
{
+            boolean found = 
_outstandingJbiExchanges.containsKey(jbiMex.getExchangeId());
+            _outstandingJbiExchanges.put(jbiMex.getExchangeId(), jbiMex);
+            return found;
+        }
+
+        synchronized javax.jbi.messaging.MessageExchange consume(String 
clientId) {
+            return _outstandingJbiExchanges.remove(clientId);
+        }
+
+    }
 }

Added: 
incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/ServiceBridge.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/ServiceBridge.java?view=auto&rev=448019
==============================================================================
--- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/ServiceBridge.java 
(added)
+++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/ServiceBridge.java 
Tue Sep 19 17:42:16 2006
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.jbi;
+
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
+
+/**
+ * Base-class for classes providing JBI-ODE translation services. 
+ * 
+ * @author Maciej Szefler - m s z e f l e r @ g m a i l . c o m
+ *
+ */
+public class ServiceBridge {
+    
+    private static final Log __log = LogFactory.getLog(ServiceBridge.class);
+
+    /**
+     * Transfer message properties from the ODE message exchange to the JBI 
message exchange object.
+     * 
+     * @param jbiMex destination JBI message-exchange
+     * @param odeMex source ODE message-exchange
+     */
+    protected void copyMexProperties(javax.jbi.messaging.MessageExchange 
jbiMex, MyRoleMessageExchange odeMex) {
+        for (String propName : odeMex.getPropertyNames()) {
+            String val = odeMex.getProperty(propName);
+            if (val != null) {
+                jbiMex.setProperty(propName, val);
+                __log.debug(jbiMex + ": set property " + propName + " = " + 
val);
+            }
+        }
+    }
+    
+    /**
+     * Transfer message properties from the JBI message exchange to the ODE 
message exchange object.
+     * 
+     * @param odeMex destination ODE message-exchange
+     * @param jbiMex source JBI message-exchange
+     */
+    @SuppressWarnings("unchecked")
+    protected void copyMexProperties(MyRoleMessageExchange odeMex, 
javax.jbi.messaging.MessageExchange jbiMex) {
+        for (String propName : (Set<String>) jbiMex.getPropertyNames()) {
+            if (propName.startsWith("org.apache.ode")) {
+                // Handle ODE-specific properties
+                Object val = jbiMex.getProperty(propName);
+                if (val != null) {
+                    String sval = val.toString();
+                    odeMex.setProperty(propName, sval);
+                    __log.debug(odeMex + ": set property " + propName + " = " 
+ sval);
+                }
+            } else {
+                // Non ODE-specific properties,
+                // TODO: Should we copy these?
+            }
+        }
+    }
+
+
+}

Propchange: 
incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/ServiceBridge.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to