Author: gnodet
Date: Thu Sep 27 07:52:36 2007
New Revision: 580043

URL: http://svn.apache.org/viewvc?rev=580043&view=rev
Log:
SM-998: In-Out Exchanges in a JMS queue cannot be successfully processed after 
a crash/shutdown

Modified:
    
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java
    
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java
    
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
    
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardProviderProcessor.java

Modified: 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java?rev=580043&r1=580042&r2=580043&view=diff
==============================================================================
--- 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java
 (original)
+++ 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java
 Thu Sep 27 07:52:36 2007
@@ -50,6 +50,8 @@
 import org.apache.servicemix.soap.SoapHelper;
 import org.apache.servicemix.soap.marshalers.SoapMessage;
 import org.apache.servicemix.soap.marshalers.SoapWriter;
+import org.apache.servicemix.store.Store;
+import org.apache.servicemix.store.memory.MemoryStoreFactory;
 
 public abstract class AbstractJmsProcessor implements ExchangeProcessor {
 
@@ -66,6 +68,8 @@
     protected ComponentContext context;
     protected DeliveryChannel channel;
 
+    protected Store store;
+
     public AbstractJmsProcessor(JmsEndpoint endpoint) throws Exception {
         this.endpoint = endpoint;
         this.soapHelper = new SoapHelper(endpoint);
@@ -80,6 +84,16 @@
             connectionFactory = getConnectionFactory(ctx);
             connection = connectionFactory.createConnection();
             connection.start();
+
+            // set up the Store
+            if (endpoint.store != null) {
+                store = endpoint.store;
+            } else if (endpoint.storeFactory != null) {
+                store = 
endpoint.storeFactory.open(endpoint.getService().toString() + 
endpoint.getEndpoint());
+            } else {
+                store = new 
MemoryStoreFactory().open(endpoint.getService().toString() + 
endpoint.getEndpoint());
+            }
+
             doStart(ctx);
         } catch (Exception e) {
             try {
@@ -124,6 +138,10 @@
             BaseLifeCycle lf = (BaseLifeCycle) 
endpoint.getServiceUnit().getComponent().getLifeCycle();
             return lf.getContext().getNamingContext();
         }
+    }
+
+    protected Store getStore() {
+        return store;
     }
 
     protected void doStart(InitialContext ctx) throws Exception {

Modified: 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java?rev=580043&r1=580042&r2=580043&view=diff
==============================================================================
--- 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java
 (original)
+++ 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java
 Thu Sep 27 07:52:36 2007
@@ -36,6 +36,8 @@
 import org.apache.servicemix.jbi.security.auth.AuthenticationService;
 import org.apache.servicemix.jbi.security.keystore.KeystoreManager;
 import org.apache.servicemix.soap.SoapEndpoint;
+import org.apache.servicemix.store.Store;
+import org.apache.servicemix.store.StoreFactory;
 
 /**
  * 
@@ -56,7 +58,9 @@
     protected String jndiConnectionFactoryName;
     protected String jndiDestinationName;
     protected String jmsProviderDestinationName;
-    
+    protected String jndiReplyToName;
+    protected String jmsProviderReplyToName;
+
     protected boolean useMsgIdInResponse;
     //
     // Spring configuration
@@ -75,6 +79,12 @@
     
     // Other configuration flags
     protected boolean needJavaIdentifiers;
+
+    /**
+     * The store to keep pending exchanges
+     */
+    protected Store store;
+    protected StoreFactory storeFactory;
     
     /**
      * The BootstrapContext to use for a JCA consumer endpoint.
@@ -193,6 +203,43 @@
     }
 
     /**
+     * The name of the JMS Reply-to destination to lookup in JNDI.
+     * Optional: a temporary queue will be used
+     * if a replyTo is not provided.
+     *
+     * @return Returns the jndiReplyToName.
+     */
+    public String getJndiReplyToName() {
+        return jndiReplyToName;
+    }
+
+    /**
+     * @param jndiReplyToName The jndiReplyToName to set.
+     */
+    public void setJndiReplyToName(String jndiReplyToName) {
+        this.jndiReplyToName = jndiReplyToName;
+    }
+    /**
+     * The name of the reply destination create by a call to 
+     * <code>Session.createQueue</code> or <code>Session.createTopic</code>.
+     * This property is used when <code>jndiReplyToName</code> is
+     * <code>null</code>.  Optional: a temporary queue will be used
+     * if a replyTo is not provided.
+     * 
+     * @return Returns the jmsProviderReplyToName.
+     */
+    public String getJmsProviderReplyToName() {
+        return jmsProviderReplyToName;
+    }
+
+    /**
+     * @param jmsProviderReplyToName The jmsProviderReplyToName to set.
+     */
+    public void setJmsProviderReplyToName(String jmsProviderReplyToName) {
+        this.jmsProviderReplyToName = jmsProviderReplyToName;
+    }
+
+    /**
      * The name of the JMS ConnectionFactory to lookup in JNDI.
      * Used if <code>connectionFactory</code> is <code>null</code>
      * 
@@ -314,6 +361,31 @@
      */
     public void setRoleAsString(String role) {
         super.setRoleAsString(role);
+    }
+
+    /**
+     * @return Returns the store.
+     */
+    public Store getStore() {
+        return store;
+    }
+    /**
+     * @param store The store to set.
+     */
+    public void setStore(Store store) {
+        this.store = store;
+    }
+    /**
+     * @return Returns the storeFactory.
+     */
+    public StoreFactory getStoreFactory() {
+        return storeFactory;
+    }
+    /**
+     * @param storeFactory The storeFactory to set.
+     */
+    public void setStoreFactory(StoreFactory storeFactory) {
+        this.storeFactory = storeFactory;
     }
 
     protected ExchangeProcessor createProviderProcessor() {

Modified: 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java?rev=580043&r1=580042&r2=580043&view=diff
==============================================================================
--- 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
 (original)
+++ 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
 Thu Sep 27 07:52:36 2007
@@ -18,7 +18,6 @@
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
-import java.util.Map;
 
 import javax.jbi.messaging.DeliveryChannel;
 import javax.jbi.messaging.ExchangeStatus;
@@ -39,8 +38,6 @@
 import javax.jms.TextMessage;
 import javax.naming.InitialContext;
 
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.servicemix.jms.AbstractJmsProcessor;
 import org.apache.servicemix.jms.JmsEndpoint;
 import org.apache.servicemix.soap.marshalers.SoapMessage;
@@ -52,7 +49,6 @@
     protected Destination replyToDestination;
     protected MessageConsumer consumer;
     protected MessageProducer producer;
-    protected Map pendingExchanges = new ConcurrentHashMap();
     protected DeliveryChannel channel;
 
     public MultiplexingProviderProcessor(JmsEndpoint endpoint) throws 
Exception {
@@ -76,10 +72,20 @@
                 throw new IllegalStateException("No destination provided");
             }
         }
-        if (destination instanceof Queue) {
-            replyToDestination = session.createTemporaryQueue();
+        if (endpoint.getJndiReplyToName() != null) {
+            replyToDestination = (Destination) 
ctx.lookup(endpoint.getJndiReplyToName());
+        } else if (endpoint.getJmsProviderReplyToName() != null) {
+            if (destination instanceof Queue) {
+                replyToDestination = 
session.createQueue(endpoint.getJmsProviderReplyToName());
+            } else {
+                replyToDestination = 
session.createTopic(endpoint.getJmsProviderReplyToName());
+            }
         } else {
-            replyToDestination = session.createTemporaryTopic();
+            if (destination instanceof Queue) {
+                replyToDestination = session.createTemporaryQueue();
+            } else {    
+                replyToDestination = session.createTemporaryTopic();
+            }
         }
         producer = session.createProducer(destination);
         consumer = session.createConsumer(replyToDestination);
@@ -104,7 +110,7 @@
                     if (log.isDebugEnabled()) {
                         log.debug("Handling jms message " + message);
                     }
-                    InOut exchange = (InOut) 
pendingExchanges.remove(message.getJMSCorrelationID());
+                    InOut exchange = (InOut) 
store.load(message.getJMSCorrelationID());
                     if (exchange == null) {
                         throw new IllegalStateException("Could not find 
exchange " + message.getJMSCorrelationID());
                     }
@@ -159,13 +165,13 @@
         } else if (exchange instanceof InOut) {
             msg.setJMSCorrelationID(exchange.getExchangeId());
             msg.setJMSReplyTo(replyToDestination);
-            pendingExchanges.put(exchange.getExchangeId(), exchange);
+            store.store(exchange.getExchangeId(), exchange);
             try {
                 synchronized (producer) {
                     producer.send(msg);
                 }
             } catch (Exception e) {
-                pendingExchanges.remove(exchange.getExchangeId());
+                store.load(exchange.getExchangeId());
                 throw e;
             }
         } else {

Modified: 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardProviderProcessor.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardProviderProcessor.java?rev=580043&r1=580042&r2=580043&view=diff
==============================================================================
--- 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardProviderProcessor.java
 (original)
+++ 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardProviderProcessor.java
 Thu Sep 27 07:52:36 2007
@@ -44,6 +44,7 @@
 public class StandardProviderProcessor extends AbstractJmsProcessor {
 
     protected Destination destination;
+    protected Destination permanentReplyToDestination;
     protected DeliveryChannel channel;
     
     public StandardProviderProcessor(JmsEndpoint endpoint) throws Exception {
@@ -52,12 +53,37 @@
 
     protected void doStart(InitialContext ctx) throws Exception {
         channel = 
endpoint.getServiceUnit().getComponent().getComponentContext().getDeliveryChannel();
+        Session session = null;
         destination = endpoint.getDestination();
-        if (destination == null) {
-            if (endpoint.getJndiDestinationName() != null) {
-                destination = (Destination) 
ctx.lookup(endpoint.getJndiDestinationName());
-            } else if (endpoint.getJmsProviderDestinationName() == null) {
-                throw new IllegalStateException("No destination provided");
+        try {
+            if (destination == null) {
+                session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+                if (endpoint.getJndiDestinationName() != null) {
+                    destination = (Destination) 
ctx.lookup(endpoint.getJndiDestinationName());
+                } else if (endpoint.getJmsProviderDestinationName() != null) {
+                    if (STYLE_QUEUE.equals(endpoint.getDestinationStyle())) {
+                        destination = 
session.createQueue(endpoint.getJmsProviderDestinationName());
+                    } else {
+                        destination = 
session.createTopic(endpoint.getJmsProviderDestinationName());
+                    }
+                } else {
+                    throw new IllegalStateException("No destination provided");
+                }
+
+                if (endpoint.getJndiReplyToName() != null) {
+                    permanentReplyToDestination = (Destination) 
ctx.lookup(endpoint.getJndiReplyToName());
+                } else if (endpoint.getJmsProviderReplyToName() != null) {
+                    if (destination instanceof Queue) {
+                        permanentReplyToDestination = 
session.createQueue(endpoint.getJmsProviderReplyToName());
+                    } else {
+                        permanentReplyToDestination = 
session.createTopic(endpoint.getJmsProviderReplyToName());
+                    }
+                }
+            }
+        } finally {
+            if (session != null) {
+                session.close();
             }
         }
     }
@@ -75,13 +101,7 @@
         Session session = null;
         try {
             session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-            if (destination == null) {
-                if (STYLE_QUEUE.equals(endpoint.getDestinationStyle())) {
-                    destination = 
session.createQueue(endpoint.getJmsProviderDestinationName());
-                } else {
-                    destination = 
session.createTopic(endpoint.getJmsProviderDestinationName());
-                }
-            }
+
             MessageProducer producer = session.createProducer(destination);
             
             TextMessage msg = session.createTextMessage();
@@ -94,10 +114,14 @@
                 channel.send(exchange);
             } else if (exchange instanceof InOut) {
                 Destination replyToDestination;
-                if (destination instanceof Queue) {
-                    replyToDestination = session.createTemporaryQueue();
+                if (permanentReplyToDestination != null) {
+                    replyToDestination = permanentReplyToDestination;
                 } else {
-                    replyToDestination = session.createTemporaryTopic();
+                    if (destination instanceof Queue) {
+                        replyToDestination = session.createTemporaryQueue();
+                    } else {
+                        replyToDestination = session.createTemporaryTopic();
+                    }
                 }
                 MessageConsumer consumer = 
session.createConsumer(replyToDestination);
                 msg.setJMSCorrelationID(exchange.getExchangeId());


Reply via email to