| Commit in servicemix/base/src/main/java/org/servicemix/components/jms on MAIN | |||
| JmsServiceComponent.java | +215 | added 1.1 | |
| JmsInOutBinding.java | +2 | -2 | 1.1 -> 1.2 |
| DestinationChooser.java | +2 | -2 | 1.1 -> 1.2 |
| SimpleDestinationChooser.java | +2 | -3 | 1.1 -> 1.2 |
| JmsOutBinding.java | +2 | -2 | 1.1 -> 1.2 |
| +223 | -9 | ||
Added JmsServiceComponent
servicemix/base/src/main/java/org/servicemix/components/jms
JmsServiceComponent.java added at 1.1
diff -N JmsServiceComponent.java --- /dev/null 1 Jan 1970 00:00:00 -0000 +++ JmsServiceComponent.java 1 Aug 2005 00:03:05 -0000 1.1 @@ -0,0 +1,215 @@
+/**
+ *
+ * Copyright RAJD Consultanct Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+
+package org.servicemix.components.jms;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.xml.transform.TransformerException;
+import org.activemq.util.JMSExceptionHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.servicemix.components.util.ComponentSupport;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.jms.JmsException;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.core.SessionCallback;
+
+/**
+ * A component which uses a {@link JmsTemplate} to consume messages from a destination, forward them into the JBI
+ * container for processing, and send the result back to the JMS requestor - used for the TopicRequestor and
+ * QueueRequestor pattern.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class JmsServiceComponent extends ComponentSupport implements MessageListener, InitializingBean, DisposableBean {
+    private static final Log log = LogFactory.getLog(JmsServiceComponent.class);
+    // Optional strategy for picking the reply destination; when null the inbound JMSReplyTo is used
+    private DestinationChooser destinationChooser;
+    private JmsMarshaler marshaler = new JmsMarshaler();
+    private JmsTemplate template;
+    // Optional JMS message selector handed to the consumer; may be null
+    private String selector;
+    // Created in afterPropertiesSet() and closed in destroy()
+    private MessageConsumer consumer;
+
+    /**
+     * Creates a consumer on the template's default destination (resolved by name when no Destination object is
+     * configured), applying the configured selector, and registers this component as its message listener.
+     *
+     * @throws IllegalArgumentException if no template has been set
+     * @throws Exception if the consumer cannot be created or the listener cannot be registered
+     */
+    public void afterPropertiesSet() throws Exception {
+        if (template == null) {
+            throw new IllegalArgumentException("Must have a template set");
+        }
+        // NOTE(review): the 'true' flag presumably asks the template to start the connection so that
+        // messages are delivered to the consumer - confirm against the JmsTemplate version in use
+        template.execute(new SessionCallback() {
+            public Object doInJms(Session session) throws JMSException {
+                Destination defaultDestination = template.getDefaultDestination();
+                if (defaultDestination == null) {
+                    defaultDestination = template.getDestinationResolver().resolveDestinationName(session,
+                            template.getDefaultDestinationName(), template.isPubSubDomain());
+                }
+                consumer = session.createConsumer(defaultDestination, selector);
+                return null;
+            }
+        }, true);
+        consumer.setMessageListener(this);
+    }
+
+    /**
+     * Closes the consumer, if one was created.
+     *
+     * @throws Exception if closing the consumer fails
+     */
+    public void destroy() throws Exception {
+        if (consumer != null) {
+            consumer.close();
+        }
+    }
+
+    /**
+     * @return Return the DestinationChooser
+     */
+    public DestinationChooser getDestinationChooser() {
+        return destinationChooser;
+    }
+
+    /**
+     * Set the DestinationChooser
+     *
+     * @param destinationChooser the strategy used to pick the outbound destination; may be null, in which case
+     *            the JMSReplyTo of the inbound message is used
+     */
+    public void setDestinationChooser(DestinationChooser destinationChooser) {
+        this.destinationChooser = destinationChooser;
+    }
+
+    /**
+     * Get the JmsMarshaler
+     *
+     * @return the marshaler
+     */
+    public JmsMarshaler getMarshaller() {
+        return marshaler;
+    }
+
+    /**
+     * Set the JmsMarshaler
+     *
+     * @param marshaler the marshaler used to convert between JMS and normalized messages
+     */
+    public void setMarshaller(JmsMarshaler marshaler) {
+        this.marshaler = marshaler;
+    }
+
+    /**
+     * @return the JmsTemplate
+     */
+    public JmsTemplate getTemplate() {
+        return template;
+    }
+
+    /**
+     * Set the JmsTemplate
+     *
+     * @param template the template used to create the consumer and to send replies
+     */
+    public void setTemplate(JmsTemplate template) {
+        this.template = template;
+    }
+
+    /**
+     * @return Return the selector
+     */
+    public String getSelector() {
+        return selector;
+    }
+
+    /**
+     * Set the Selector
+     *
+     * @param selector the JMS message selector passed to the consumer
+     */
+    public void setSelector(String selector) {
+        this.selector = selector;
+    }
+
+    /**
+     * Converts the inbound JMS message into a normalized message, sends it synchronously into the container as an
+     * InOut exchange and, when the exchange completes, sends the out message to the destination returned by
+     * {@link #chooseOutBoundDestination(MessageExchange, Message)}.
+     *
+     * @param jmsMessage the inbound JMS message
+     */
+    public void onMessage(Message jmsMessage) {
+        try {
+            final InOut messageExchange = getDeliveryChannel().createExchangeFactory().createInOutExchange();
+            NormalizedMessage inMessage = messageExchange.createMessage();
+            try {
+                marshaler.toNMS(inMessage, jmsMessage);
+                messageExchange.setInMessage(inMessage);
+                if (getDeliveryChannel().sendSync(messageExchange)) {
+                    try {
+                        // Use the helper so that the inbound JMSReplyTo is honoured when no
+                        // DestinationChooser is configured, instead of dereferencing a null chooser
+                        final Destination destination = chooseOutBoundDestination(messageExchange, jmsMessage);
+                        template.send(destination, new MessageCreator() {
+                            public Message createMessage(Session session) throws JMSException {
+                                try {
+                                    Message message = marshaler.createMessage(messageExchange.getOutMessage(), session);
+                                    if (log.isTraceEnabled()) {
+                                        // Log the destination actually used, not the template's default
+                                        log.trace("Sending message to: " + destination
+                                                + " message: " + message);
+                                    }
+                                    return message;
+                                }
+                                catch (TransformerException e) {
+                                    throw JMSExceptionHelper.newJMSException("Failed to create JMS Message: " + e, e);
+                                }
+                            }
+                        });
+                        done(messageExchange);
+                    }
+                    catch (JMSException e) {
+                        // No reply destination could be determined: the exchange has already come back
+                        // from the container, so it has to be failed explicitly
+                        fail(messageExchange, e);
+                    }
+                    catch (JmsException e) {
+                        fail(messageExchange, e);
+                    }
+                }
+            }
+            catch (JMSException e) {
+                // Raised while converting the inbound JMS message, i.e. before the exchange was sent
+                log.error("Couldn't process " + jmsMessage, e);
+                messageExchange.setError(e);
+                messageExchange.setStatus(ExchangeStatus.ERROR);
+            }
+        }
+        catch (MessagingException e) {
+            log.error("Failed to process inbound JMS Message: " + jmsMessage, e);
+        }
+    }
+
+    /**
+     * Choose the outbound destination to send the response from JBI to. If a DestinationChooser is set, it is used,
+     * else the replyTo destination on the inbound message is used.
+     *
+     * @param exchange the message exchange handed to the DestinationChooser
+     * @param inboundMessage the inbound JMS message whose JMSReplyTo is the fallback; may be null
+     * @return the chosen outbound destination, never null
+     * @throws JMSException if no destination can be found
+     */
+    protected Destination chooseOutBoundDestination(MessageExchange exchange, Message inboundMessage)
+            throws JMSException {
+        Destination result = null;
+        if (destinationChooser != null) {
+            result = destinationChooser.chooseDestination(exchange);
+        }
+        else if (inboundMessage != null && inboundMessage.getJMSReplyTo() != null) {
+            result = inboundMessage.getJMSReplyTo();
+        }
+        if (result == null) {
+            log.error("Coul not find an outbound destination for " + inboundMessage);
+            throw new JMSException("No outbound JMS Destination can be found");
+        }
+        return result;
+    }
+}
servicemix/base/src/main/java/org/servicemix/components/jms
diff -u -r1.1 -r1.2 --- JmsInOutBinding.java 22 Jun 2005 17:17:21 -0000 1.1 +++ JmsInOutBinding.java 1 Aug 2005 00:03:05 -0000 1.2 @@ -37,7 +37,7 @@
/** * A JMS {@link javax.jms.MessageListener}which sends the inbound JMS message into the JBI container for processing *
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class JmsInOutBinding extends ComponentSupport implements MessageListener {
private JmsProducerPool producerPool;
@@ -78,7 +78,7 @@
messageExchange.setInMessage(inMessage);
if (getDeliveryChannel().sendSync(messageExchange)) {
Session session = producer.getSession();
- Destination destination = destinationChooser.chooseDestination(session, messageExchange);
+ Destination destination = destinationChooser.chooseDestination(messageExchange);
Message message = marshaler.createMessage(messageExchange.getOutMessage(), session);
producer.getMessageProducer().send(destination, message);
}
servicemix/base/src/main/java/org/servicemix/components/jms
diff -u -r1.1 -r1.2 --- DestinationChooser.java 22 Jun 2005 17:17:21 -0000 1.1 +++ DestinationChooser.java 1 Aug 2005 00:03:05 -0000 1.2 @@ -24,12 +24,12 @@
/** * A pluggable strategy used to decide which JMS Destination to use for an outbound JMS message *
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public interface DestinationChooser {
/**
* Chooses which JMS destintation to use for the outbound message
*/
- Destination chooseDestination(Session session, MessageExchange messageExchange);
+ Destination chooseDestination(MessageExchange messageExchange);
}
servicemix/base/src/main/java/org/servicemix/components/jms
diff -u -r1.1 -r1.2 --- SimpleDestinationChooser.java 22 Jun 2005 17:17:21 -0000 1.1 +++ SimpleDestinationChooser.java 1 Aug 2005 00:03:05 -0000 1.2 @@ -19,13 +19,12 @@
import javax.jbi.messaging.MessageExchange; import javax.jms.Destination;
-import javax.jms.Session;
/** * A simple destination chooser which will use the value of the {@link #OUT_DESTINATION_KEY} * property on the message exchange, or fall back to a default destination *
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class SimpleDestinationChooser implements DestinationChooser {
public static final String OUT_DESTINATION_KEY = "org.servicemix.binding.jms.out.destination";
@@ -39,7 +38,7 @@
this.defaultDestination = defaultDestination;
}
- public Destination chooseDestination(Session session, MessageExchange messageExchange) {
+ public Destination chooseDestination(MessageExchange messageExchange) {
Object property = messageExchange.getProperty(OUT_DESTINATION_KEY);
if (property instanceof Destination) {
return (Destination) property;
servicemix/base/src/main/java/org/servicemix/components/jms
diff -u -r1.1 -r1.2 --- JmsOutBinding.java 22 Jun 2005 17:17:21 -0000 1.1 +++ JmsOutBinding.java 1 Aug 2005 00:03:05 -0000 1.2 @@ -34,7 +34,7 @@
/** * Consumers JBI messages and sends them to a JMS destination *
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class JmsOutBinding extends OutBinding {
@@ -70,7 +70,7 @@
try {
Session session = producer.getSession();
Message message = marshaler.createMessage(inMessage, session);
- Destination destination = destinationChooser.chooseDestination(session, messageExchange);
+ Destination destination = destinationChooser.chooseDestination(messageExchange);
producer.getMessageProducer().send(destination, message);
messageExchange.setStatus(ExchangeStatus.DONE);
}
