Author: gnodet
Date: Thu Sep 27 07:52:36 2007
New Revision: 580043
URL: http://svn.apache.org/viewvc?rev=580043&view=rev
Log:
SM-998: In-Out Exchanges in a JMS queue cannot be successfully processed after
a crash/shutdown
Modified:
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardProviderProcessor.java
Modified:
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java?rev=580043&r1=580042&r2=580043&view=diff
==============================================================================
---
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java
(original)
+++
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java
Thu Sep 27 07:52:36 2007
@@ -50,6 +50,8 @@
import org.apache.servicemix.soap.SoapHelper;
import org.apache.servicemix.soap.marshalers.SoapMessage;
import org.apache.servicemix.soap.marshalers.SoapWriter;
+import org.apache.servicemix.store.Store;
+import org.apache.servicemix.store.memory.MemoryStoreFactory;
public abstract class AbstractJmsProcessor implements ExchangeProcessor {
@@ -66,6 +68,8 @@
protected ComponentContext context;
protected DeliveryChannel channel;
+ protected Store store;
+
public AbstractJmsProcessor(JmsEndpoint endpoint) throws Exception {
this.endpoint = endpoint;
this.soapHelper = new SoapHelper(endpoint);
@@ -80,6 +84,16 @@
connectionFactory = getConnectionFactory(ctx);
connection = connectionFactory.createConnection();
connection.start();
+
+ // set up the Store
+ if (endpoint.store != null) {
+ store = endpoint.store;
+ } else if (endpoint.storeFactory != null) {
+ store =
endpoint.storeFactory.open(endpoint.getService().toString() +
endpoint.getEndpoint());
+ } else {
+ store = new
MemoryStoreFactory().open(endpoint.getService().toString() +
endpoint.getEndpoint());
+ }
+
doStart(ctx);
} catch (Exception e) {
try {
@@ -124,6 +138,10 @@
BaseLifeCycle lf = (BaseLifeCycle)
endpoint.getServiceUnit().getComponent().getLifeCycle();
return lf.getContext().getNamingContext();
}
+ }
+
+ protected Store getStore() {
+ return store;
}
protected void doStart(InitialContext ctx) throws Exception {
Modified:
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java?rev=580043&r1=580042&r2=580043&view=diff
==============================================================================
---
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java
(original)
+++
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java
Thu Sep 27 07:52:36 2007
@@ -36,6 +36,8 @@
import org.apache.servicemix.jbi.security.auth.AuthenticationService;
import org.apache.servicemix.jbi.security.keystore.KeystoreManager;
import org.apache.servicemix.soap.SoapEndpoint;
+import org.apache.servicemix.store.Store;
+import org.apache.servicemix.store.StoreFactory;
/**
*
@@ -56,7 +58,9 @@
protected String jndiConnectionFactoryName;
protected String jndiDestinationName;
protected String jmsProviderDestinationName;
-
+ protected String jndiReplyToName;
+ protected String jmsProviderReplyToName;
+
protected boolean useMsgIdInResponse;
//
// Spring configuration
@@ -75,6 +79,12 @@
// Other configuration flags
protected boolean needJavaIdentifiers;
+
+ /**
+ * The store to keep pending exchanges
+ */
+ protected Store store;
+ protected StoreFactory storeFactory;
/**
* The BootstrapContext to use for a JCA consumer endpoint.
@@ -193,6 +203,43 @@
}
/**
+ * The name of the JMS Reply-to destination to lookup in JNDI.
+ * Optional: a temporary queue will be used
+ * if a replyTo is not provided.
+ *
+ * @return Returns the jndiReplyToName.
+ */
+ public String getJndiReplyToName() {
+ return jndiReplyToName;
+ }
+
+ /**
+ * @param jndiReplyToName The jndiReplyToName to set.
+ */
+ public void setJndiReplyToName(String jndiReplyToName) {
+ this.jndiReplyToName = jndiReplyToName;
+ }
+ /**
+ * The name of the reply destination created by a call to
+ * <code>Session.createQueue</code> or <code>Session.createTopic</code>.
+ * This property is used when <code>jndiReplyToName</code> is
+ * <code>null</code>. Optional: a temporary queue will be used
+ * if a replyTo is not provided.
+ *
+ * @return Returns the jmsProviderReplyToName.
+ */
+ public String getJmsProviderReplyToName() {
+ return jmsProviderReplyToName;
+ }
+
+ /**
+ * @param jmsProviderReplyToName The jmsProviderReplyToName to set.
+ */
+ public void setJmsProviderReplyToName(String jmsProviderReplyToName) {
+ this.jmsProviderReplyToName = jmsProviderReplyToName;
+ }
+
+ /**
* The name of the JMS ConnectionFactory to lookup in JNDI.
* Used if <code>connectionFactory</code> is <code>null</code>
*
@@ -314,6 +361,31 @@
*/
public void setRoleAsString(String role) {
super.setRoleAsString(role);
+ }
+
+ /**
+ * @return Returns the store.
+ */
+ public Store getStore() {
+ return store;
+ }
+ /**
+ * @param store The store to set.
+ */
+ public void setStore(Store store) {
+ this.store = store;
+ }
+ /**
+ * @return Returns the storeFactory.
+ */
+ public StoreFactory getStoreFactory() {
+ return storeFactory;
+ }
+ /**
+ * @param storeFactory The storeFactory to set.
+ */
+ public void setStoreFactory(StoreFactory storeFactory) {
+ this.storeFactory = storeFactory;
}
protected ExchangeProcessor createProviderProcessor() {
Modified:
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java?rev=580043&r1=580042&r2=580043&view=diff
==============================================================================
---
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
(original)
+++
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
Thu Sep 27 07:52:36 2007
@@ -18,7 +18,6 @@
import java.io.ByteArrayInputStream;
import java.io.InputStream;
-import java.util.Map;
import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.ExchangeStatus;
@@ -39,8 +38,6 @@
import javax.jms.TextMessage;
import javax.naming.InitialContext;
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-
import org.apache.servicemix.jms.AbstractJmsProcessor;
import org.apache.servicemix.jms.JmsEndpoint;
import org.apache.servicemix.soap.marshalers.SoapMessage;
@@ -52,7 +49,6 @@
protected Destination replyToDestination;
protected MessageConsumer consumer;
protected MessageProducer producer;
- protected Map pendingExchanges = new ConcurrentHashMap();
protected DeliveryChannel channel;
public MultiplexingProviderProcessor(JmsEndpoint endpoint) throws
Exception {
@@ -76,10 +72,20 @@
throw new IllegalStateException("No destination provided");
}
}
- if (destination instanceof Queue) {
- replyToDestination = session.createTemporaryQueue();
+ if (endpoint.getJndiReplyToName() != null) {
+ replyToDestination = (Destination)
ctx.lookup(endpoint.getJndiReplyToName());
+ } else if (endpoint.getJmsProviderReplyToName() != null) {
+ if (destination instanceof Queue) {
+ replyToDestination =
session.createQueue(endpoint.getJmsProviderReplyToName());
+ } else {
+ replyToDestination =
session.createTopic(endpoint.getJmsProviderReplyToName());
+ }
} else {
- replyToDestination = session.createTemporaryTopic();
+ if (destination instanceof Queue) {
+ replyToDestination = session.createTemporaryQueue();
+ } else {
+ replyToDestination = session.createTemporaryTopic();
+ }
}
producer = session.createProducer(destination);
consumer = session.createConsumer(replyToDestination);
@@ -104,7 +110,7 @@
if (log.isDebugEnabled()) {
log.debug("Handling jms message " + message);
}
- InOut exchange = (InOut)
pendingExchanges.remove(message.getJMSCorrelationID());
+ InOut exchange = (InOut)
store.load(message.getJMSCorrelationID());
if (exchange == null) {
throw new IllegalStateException("Could not find
exchange " + message.getJMSCorrelationID());
}
@@ -159,13 +165,13 @@
} else if (exchange instanceof InOut) {
msg.setJMSCorrelationID(exchange.getExchangeId());
msg.setJMSReplyTo(replyToDestination);
- pendingExchanges.put(exchange.getExchangeId(), exchange);
+ store.store(exchange.getExchangeId(), exchange);
try {
synchronized (producer) {
producer.send(msg);
}
} catch (Exception e) {
- pendingExchanges.remove(exchange.getExchangeId());
+ store.load(exchange.getExchangeId());
throw e;
}
} else {
Modified:
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardProviderProcessor.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardProviderProcessor.java?rev=580043&r1=580042&r2=580043&view=diff
==============================================================================
---
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardProviderProcessor.java
(original)
+++
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardProviderProcessor.java
Thu Sep 27 07:52:36 2007
@@ -44,6 +44,7 @@
public class StandardProviderProcessor extends AbstractJmsProcessor {
protected Destination destination;
+ protected Destination permanentReplyToDestination;
protected DeliveryChannel channel;
public StandardProviderProcessor(JmsEndpoint endpoint) throws Exception {
@@ -52,12 +53,37 @@
protected void doStart(InitialContext ctx) throws Exception {
channel =
endpoint.getServiceUnit().getComponent().getComponentContext().getDeliveryChannel();
+ Session session = null;
destination = endpoint.getDestination();
- if (destination == null) {
- if (endpoint.getJndiDestinationName() != null) {
- destination = (Destination)
ctx.lookup(endpoint.getJndiDestinationName());
- } else if (endpoint.getJmsProviderDestinationName() == null) {
- throw new IllegalStateException("No destination provided");
+ try {
+ if (destination == null) {
+ session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ if (endpoint.getJndiDestinationName() != null) {
+ destination = (Destination)
ctx.lookup(endpoint.getJndiDestinationName());
+ } else if (endpoint.getJmsProviderDestinationName() != null) {
+ if (STYLE_QUEUE.equals(endpoint.getDestinationStyle())) {
+ destination =
session.createQueue(endpoint.getJmsProviderDestinationName());
+ } else {
+ destination =
session.createTopic(endpoint.getJmsProviderDestinationName());
+ }
+ } else {
+ throw new IllegalStateException("No destination provided");
+ }
+
+ if (endpoint.getJndiReplyToName() != null) {
+ permanentReplyToDestination = (Destination)
ctx.lookup(endpoint.getJndiReplyToName());
+ } else if (endpoint.getJmsProviderReplyToName() != null) {
+ if (destination instanceof Queue) {
+ permanentReplyToDestination =
session.createQueue(endpoint.getJmsProviderReplyToName());
+ } else {
+ permanentReplyToDestination =
session.createTopic(endpoint.getJmsProviderReplyToName());
+ }
+ }
+ }
+ } finally {
+ if (session != null) {
+ session.close();
}
}
}
@@ -75,13 +101,7 @@
Session session = null;
try {
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- if (destination == null) {
- if (STYLE_QUEUE.equals(endpoint.getDestinationStyle())) {
- destination =
session.createQueue(endpoint.getJmsProviderDestinationName());
- } else {
- destination =
session.createTopic(endpoint.getJmsProviderDestinationName());
- }
- }
+
MessageProducer producer = session.createProducer(destination);
TextMessage msg = session.createTextMessage();
@@ -94,10 +114,14 @@
channel.send(exchange);
} else if (exchange instanceof InOut) {
Destination replyToDestination;
- if (destination instanceof Queue) {
- replyToDestination = session.createTemporaryQueue();
+ if (permanentReplyToDestination != null) {
+ replyToDestination = permanentReplyToDestination;
} else {
- replyToDestination = session.createTemporaryTopic();
+ if (destination instanceof Queue) {
+ replyToDestination = session.createTemporaryQueue();
+ } else {
+ replyToDestination = session.createTemporaryTopic();
+ }
}
MessageConsumer consumer =
session.createConsumer(replyToDestination);
msg.setJMSCorrelationID(exchange.getExchangeId());