Author: gnodet
Date: Thu Sep 27 08:44:12 2007
New Revision: 580069
URL: http://svn.apache.org/viewvc?rev=580069&view=rev
Log:
SM-1054: Port JmsMarshaler from lightweight jms component to servicemix-jms
component
Added:
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/DefaultJmsMarshaler.java
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsMarshaler.java
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsMarshalerTest.java
Modified:
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaProviderProcessor.java
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardProviderProcessor.java
Modified:
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java?rev=580069&r1=580068&r2=580069&view=diff
==============================================================================
---
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java
(original)
+++
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java
Thu Sep 27 08:44:12 2007
@@ -16,11 +16,6 @@
*/
package org.apache.servicemix.jms;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
-import java.util.Date;
-import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
@@ -30,12 +25,10 @@
import javax.jbi.messaging.Fault;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.NormalizedMessage;
-import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.Session;
-import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;
@@ -49,7 +42,6 @@
import org.apache.servicemix.soap.SoapFault;
import org.apache.servicemix.soap.SoapHelper;
import org.apache.servicemix.soap.marshalers.SoapMessage;
-import org.apache.servicemix.soap.marshalers.SoapWriter;
import org.apache.servicemix.store.Store;
import org.apache.servicemix.store.memory.MemoryStoreFactory;
@@ -161,50 +153,19 @@
protected void doStop() throws Exception {
}
- protected void fromNMS(NormalizedMessage nm, TextMessage msg) throws
Exception {
- Map headers = (Map) nm.getProperty(JbiConstants.PROTOCOL_HEADERS);
- SoapMessage soap = new SoapMessage();
- soapHelper.getJBIMarshaler().fromNMS(soap, nm);
- fromNMS(soap, msg, headers);
- }
-
- protected void fromNMS(SoapMessage soap, TextMessage msg, Map headers)
throws Exception {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- SoapWriter writer = soapHelper.getSoapMarshaler().createWriter(soap);
- writer.write(baos);
- msg.setText(baos.toString());
- if (headers != null) {
- for (Iterator it = headers.keySet().iterator(); it.hasNext();) {
- String name = (String) it.next();
- Object value = headers.get(name);
- if (shouldIncludeHeader(name, value)) {
- msg.setObjectProperty(name, value);
- }
- }
- }
- // overwrite whatever content-type was passed on to us with the one
- // the SoapWriter constructed
- msg.setStringProperty(CONTENT_TYPE, writer.getContentType());
- }
-
protected Context createContext() {
return soapHelper.createContext();
}
+ protected Message fromNMS(NormalizedMessage nm, Session session) throws
Exception {
+ SoapMessage soap = new SoapMessage();
+ soapHelper.getJBIMarshaler().fromNMS(soap, nm);
+ Map headers = (Map) nm.getProperty(JbiConstants.PROTOCOL_HEADERS);
+ return endpoint.getMarshaler().toJMS(soap, headers, session);
+ }
+
protected MessageExchange toNMS(Message message, Context ctx) throws
Exception {
- InputStream is = null;
- if (message instanceof TextMessage) {
- is = new ByteArrayInputStream(((TextMessage)
message).getText().getBytes());
- } else if (message instanceof BytesMessage) {
- int length = (int) ((BytesMessage) message).getBodyLength();
- byte[] bytes = new byte[length];
- ((BytesMessage) message).readBytes(bytes);
- is = new ByteArrayInputStream(bytes);
- } else {
- throw new IllegalArgumentException("JMS message should be a text
or bytes message");
- }
- String contentType = message.getStringProperty(CONTENT_TYPE);
- SoapMessage soap =
soapHelper.getSoapMarshaler().createReader().read(is, contentType);
+ SoapMessage soap = endpoint.getMarshaler().toSOAP(message);
ctx.setInMessage(soap);
ctx.setProperty(Message.class.getName(), message);
MessageExchange exchange = soapHelper.onReceive(ctx);
@@ -216,51 +177,31 @@
protected Message fromNMSResponse(MessageExchange exchange, Context ctx,
Session session) throws Exception {
Message response = null;
if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ // marshal error
Exception e = exchange.getError();
if (e == null) {
e = new Exception("Unkown error");
}
- response = session.createObjectMessage(e);
+ response = endpoint.getMarshaler().toJMS(e, session);
} else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ // check for fault
Fault jbiFault = exchange.getFault();
if (jbiFault != null) {
+ // convert fault to SOAP message
SoapFault fault = new SoapFault(SoapFault.RECEIVER, null,
null, null, jbiFault.getContent());
SoapMessage soapFault = soapHelper.onFault(ctx, fault);
- TextMessage txt = session.createTextMessage();
- fromNMS(soapFault, txt, (Map)
jbiFault.getProperty(JbiConstants.PROTOCOL_HEADERS));
- response = txt;
+ Map headers = (Map)
jbiFault.getProperty(JbiConstants.PROTOCOL_HEADERS);
+ response = endpoint.getMarshaler().toJMS(soapFault, headers,
session);
} else {
NormalizedMessage outMsg = exchange.getMessage("out");
if (outMsg != null) {
SoapMessage out = soapHelper.onReply(ctx, outMsg);
- TextMessage txt = session.createTextMessage();
- fromNMS(out, txt, (Map)
outMsg.getProperty(JbiConstants.PROTOCOL_HEADERS));
- response = txt;
+ Map headers = (Map)
outMsg.getProperty(JbiConstants.PROTOCOL_HEADERS);
+ response = endpoint.getMarshaler().toJMS(out, headers,
session);
}
}
}
return response;
}
- private boolean shouldIncludeHeader(String name, Object value) {
- return (value instanceof String || value instanceof Number || value
instanceof Date)
- && (!endpoint.isNeedJavaIdentifiers() ||
isJavaIdentifier(name));
- }
-
- private static boolean isJavaIdentifier(String s) {
- int n = s.length();
- if (n == 0) {
- return false;
- }
- if (!Character.isJavaIdentifierStart(s.charAt(0))) {
- return false;
- }
- for (int i = 1; i < n; i++) {
- if (!Character.isJavaIdentifierPart(s.charAt(i))) {
- return false;
- }
- }
- return true;
- }
-
}
Added:
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/DefaultJmsMarshaler.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/DefaultJmsMarshaler.java?rev=580069&view=auto
==============================================================================
---
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/DefaultJmsMarshaler.java
(added)
+++
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/DefaultJmsMarshaler.java
Thu Sep 27 08:44:12 2007
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jms;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.servicemix.soap.SoapHelper;
+import org.apache.servicemix.soap.marshalers.SoapMessage;
+import org.apache.servicemix.soap.marshalers.SoapWriter;
+
+/**
+ * Encapsulates the conversion to and from JMS messages
+ */
+public class DefaultJmsMarshaler implements JmsMarshaler {
+ public static final String CONTENT_TYPE = "MimeContentType";
+
+ private JmsEndpoint endpoint;
+
+ public DefaultJmsMarshaler(JmsEndpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ /**
+ * Converts an [EMAIL PROTECTED] Exception} into an JMS message. This
method will be
+ * invoked when the [EMAIL PROTECTED] MessageExchange} contains an error.
+ *
+ * @param e
+ * Exception to convert
+ * @param session
+ * JMS session used to create JMS messages
+ * @return JMS message
+ * @see MessageExchange#getError()
+ */
+ public Message toJMS(Exception e, Session session) throws Exception {
+ return session.createObjectMessage(e);
+ }
+
+ /**
+ * Template method to allow custom functionality. Custom JmsMarshallers
+ * should override this method.
+ *
+ * @param message Source message
+ * @param session JMS session used to create JMS messages
+ * @return JMS version of the specified source SOAP message
+ * @throws Exception if an IO error occurs
+ * @throws JMSException if a JMS error occurs
+ */
+ protected Message toJMS(SoapMessage message, Session session) throws
Exception {
+ SoapHelper soapHelper = new SoapHelper(endpoint);
+
+ // turn SOAP message into byte array/string
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ SoapWriter writer =
soapHelper.getSoapMarshaler().createWriter(message);
+ writer.write(baos);
+
+ // create text message
+ TextMessage msg = session.createTextMessage();
+ msg.setText(baos.toString());
+
+ // overwrite whatever content-type was passed on to us with the one
+ // the SoapWriter constructed
+ msg.setStringProperty(CONTENT_TYPE, writer.getContentType());
+
+ return msg;
+ }
+
+ /**
+ * Converts a SOAP message to a JMS message, including any message headers.
+ *
+ * @param message
+ * message to convert
+ * @param headers
+ * protocol headers present in the NormalizedMessage
+ * @param session
+ * JMS session used to create JMS messages
+ * @throws Exception if something bad happens
+ * @return JMS message
+ */
+ public Message toJMS(SoapMessage message, Map headers, Session session)
throws Exception {
+ // create message
+ Message msg = toJMS(message, session);
+
+ // add protocol headers to message
+ if (headers != null) {
+ for (Iterator it = headers.keySet().iterator(); it.hasNext();) {
+ String name = (String) it.next();
+ Object value = headers.get(name);
+ if (shouldIncludeHeader(name, value)) {
+ msg.setObjectProperty(name, value);
+ }
+ }
+ }
+
+ return msg;
+ }
+
+ /**
+ * Template method to allow custom functionality. Custom JmsMarshalers
+ * should override this method.
+ *
+ * @param message
+ * Message to be turned into XML/SOAP
+ * @return Stream containing either the whole SOAP envelope or just the
+ * payload of the body.
+ * @throws Exception
+ * if JMS message is an ObjectMessage containing an Exception
+ * (the containing exception is thrown.)
+ * @throws JMSException
+ * if a JMS problem occurs
+ * @throws UnsupportedOperationException
+ * if the JMS message is an ObjectMessage which contains
+ * something other than an Exception
+ * @throws IllegalArgumentException
+ * if the message is anything other than a TextMessage or
+ * BytesMessage
+ */
+ protected InputStream toXmlInputStream(Message message) throws Exception {
+ InputStream is = null;
+ if (message instanceof ObjectMessage) {
+ Object o = ((ObjectMessage) message).getObject();
+ if (o instanceof Exception) {
+ throw (Exception) o;
+ } else {
+ throw new UnsupportedOperationException("Can not handle
objects of type " + o.getClass().getName());
+ }
+ } else if (message instanceof TextMessage) {
+ is = new ByteArrayInputStream(((TextMessage)
message).getText().getBytes());
+ } else if (message instanceof BytesMessage) {
+ int length = (int) ((BytesMessage) message).getBodyLength();
+ byte[] bytes = new byte[length];
+ ((BytesMessage) message).readBytes(bytes);
+ is = new ByteArrayInputStream(bytes);
+ } else {
+ throw new IllegalArgumentException("JMS message should be a text
or bytes message");
+ }
+
+ return is;
+ }
+
+ /**
+ * Converts a JMS message into a SOAP message
+ *
+ * @param message
+ * JMS message to convert
+ * @return SOAP representation of the specified JMS message
+ * @throws Exception
+ * if an IO exception occurs
+ */
+ public SoapMessage toSOAP(Message message) throws Exception {
+ SoapHelper soapHelper = new SoapHelper(endpoint);
+
+ InputStream is = toXmlInputStream(message);
+ String contentType = message.getStringProperty(CONTENT_TYPE);
+ SoapMessage soap =
soapHelper.getSoapMarshaler().createReader().read(is, contentType);
+
+ return soap;
+ }
+
+ private boolean shouldIncludeHeader(String name, Object value) {
+ return (value instanceof String || value instanceof Number || value
instanceof Date)
+ && (!endpoint.isNeedJavaIdentifiers() ||
isJavaIdentifier(name));
+ }
+
+ private static boolean isJavaIdentifier(String s) {
+ int n = s.length();
+ if (n == 0) {
+ return false;
+ }
+ if (!Character.isJavaIdentifierStart(s.charAt(0))) {
+ return false;
+ }
+ for (int i = 1; i < n; i++) {
+ if (!Character.isJavaIdentifierPart(s.charAt(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
+
Modified:
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java?rev=580069&r1=580068&r2=580069&view=diff
==============================================================================
---
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java
(original)
+++
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java
Thu Sep 27 08:44:12 2007
@@ -68,6 +68,7 @@
protected ConnectionFactory connectionFactory;
protected Destination destination;
protected String processorName;
+ protected JmsMarshaler marshaler;
//
// JCA config
//
@@ -86,6 +87,10 @@
protected Store store;
protected StoreFactory storeFactory;
+ public JmsEndpoint() {
+ marshaler = new DefaultJmsMarshaler(this);
+ }
+
/**
* The BootstrapContext to use for a JCA consumer endpoint.
*
@@ -560,5 +565,13 @@
public void setUseMsgIdInResponse(boolean useMsgIdInResponse) {
this.useMsgIdInResponse = useMsgIdInResponse;
}
+
+ public JmsMarshaler getMarshaler() {
+ return marshaler;
+ }
+
+ public void setMarshaler(JmsMarshaler marshaler) {
+ this.marshaler = marshaler;
+ }
}
Added:
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsMarshaler.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsMarshaler.java?rev=580069&view=auto
==============================================================================
---
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsMarshaler.java
(added)
+++
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsMarshaler.java
Thu Sep 27 08:44:12 2007
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jms;
+
+import java.util.Map;
+
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.servicemix.soap.marshalers.SoapMessage;
+
+public interface JmsMarshaler {
+
+ /**
+ * Marshalls the JMS message into an XML/SOAP message
+ *
+ * @param src Message to marshall
+ * @param soapHelper
+ * @throws Exception
+ */
+ SoapMessage toSOAP(Message src) throws Exception;
+
+ /**
+ * Unmarshalls the SOAP message into an JMS message
+ *
+ * @param message Message to unmarshall
+ * @param session Used to create the JMS message
+ * @throws MessagingException
+ * @throws JMSException
+ */
+ Message toJMS(SoapMessage message, Map headers, Session session) throws
Exception;
+
+ Message toJMS(Exception e, Session session) throws Exception;
+
+}
+
Modified:
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaProviderProcessor.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaProviderProcessor.java?rev=580069&r1=580068&r2=580069&view=diff
==============================================================================
---
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaProviderProcessor.java
(original)
+++
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaProviderProcessor.java
Thu Sep 27 08:44:12 2007
@@ -25,9 +25,9 @@
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
+import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
-import javax.jms.TextMessage;
import javax.naming.InitialContext;
import org.apache.servicemix.jms.AbstractJmsProcessor;
@@ -93,10 +93,9 @@
}
}
MessageProducer producer = session.createProducer(destination);
-
- TextMessage msg = session.createTextMessage();
+
NormalizedMessage nm = exchange.getMessage("in");
- fromNMS(nm, msg);
+ Message msg = fromNMS(nm, session);
producer.send(msg);
} finally {
if (session != null) {
Modified:
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java?rev=580069&r1=580068&r2=580069&view=diff
==============================================================================
---
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
(original)
+++
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
Thu Sep 27 08:44:12 2007
@@ -16,9 +16,6 @@
*/
package org.apache.servicemix.jms.multiplexing;
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-
import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOnly;
@@ -26,16 +23,13 @@
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.messaging.RobustInOnly;
-import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
-import javax.jms.TextMessage;
import javax.naming.InitialContext;
import org.apache.servicemix.jms.AbstractJmsProcessor;
@@ -106,39 +100,25 @@
}
endpoint.getServiceUnit().getComponent().getExecutor().execute(new
Runnable() {
public void run() {
+ InOut exchange = null;
try {
if (log.isDebugEnabled()) {
log.debug("Handling jms message " + message);
}
- InOut exchange = (InOut)
store.load(message.getJMSCorrelationID());
+ exchange = (InOut)
store.load(message.getJMSCorrelationID());
if (exchange == null) {
throw new IllegalStateException("Could not find
exchange " + message.getJMSCorrelationID());
}
- if (message instanceof ObjectMessage) {
- Object o = ((ObjectMessage) message).getObject();
- if (o instanceof Exception) {
- exchange.setError((Exception) o);
- } else {
- throw new UnsupportedOperationException("Can not
handle objects of type " + o.getClass().getName());
- }
- } else {
- InputStream is = null;
- if (message instanceof TextMessage) {
- is = new ByteArrayInputStream(((TextMessage)
message).getText().getBytes());
- } else if (message instanceof BytesMessage) {
- int length = (int) ((BytesMessage)
message).getBodyLength();
- byte[] bytes = new byte[length];
- ((BytesMessage) message).readBytes(bytes);
- is = new ByteArrayInputStream(bytes);
- } else {
- throw new IllegalArgumentException("JMS message
should be a text or bytes message");
- }
- SoapMessage soap =
soapHelper.getSoapMarshaler().createReader().read(is,
message.getStringProperty(CONTENT_TYPE));
- NormalizedMessage out = exchange.createMessage();
- soapHelper.getJBIMarshaler().toNMS(out, soap);
- ((InOut) exchange).setOutMessage(out);
- }
+ SoapMessage soap = endpoint.getMarshaler().toSOAP(message);
+ NormalizedMessage out = exchange.createMessage();
+ soapHelper.getJBIMarshaler().toNMS(out, soap);
+ ((InOut) exchange).setOutMessage(out);
channel.send(exchange);
+ } catch (Exception e) {
+ log.error("Error while handling jms message", e);
+ if (exchange != null) {
+ exchange.setError(e);
+ }
} catch (Throwable e) {
log.error("Error while handling jms message", e);
}
@@ -152,9 +132,8 @@
} else if (exchange.getStatus() == ExchangeStatus.ERROR) {
return;
}
- TextMessage msg = session.createTextMessage();
NormalizedMessage nm = exchange.getMessage("in");
- fromNMS(nm, msg);
+ Message msg = fromNMS(nm, session);
if (exchange instanceof InOnly || exchange instanceof RobustInOnly) {
synchronized (producer) {
Modified:
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardProviderProcessor.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardProviderProcessor.java?rev=580069&r1=580068&r2=580069&view=diff
==============================================================================
---
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardProviderProcessor.java
(original)
+++
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardProviderProcessor.java
Thu Sep 27 08:44:12 2007
@@ -104,9 +104,8 @@
MessageProducer producer = session.createProducer(destination);
- TextMessage msg = session.createTextMessage();
NormalizedMessage nm = exchange.getMessage("in");
- fromNMS(nm, msg);
+ Message msg = fromNMS(nm, session);
if (exchange instanceof InOnly || exchange instanceof
RobustInOnly) {
producer.send(msg);
Added:
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsMarshalerTest.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsMarshalerTest.java?rev=580069&view=auto
==============================================================================
---
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsMarshalerTest.java
(added)
+++
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsMarshalerTest.java
Thu Sep 27 08:44:12 2007
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jms;
+
+import java.util.List;
+
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.jbi.messaging.MessageExchangeSupport;
+import org.apache.servicemix.tck.ReceiverComponent;
+import org.springframework.core.io.ClassPathResource;
+
+public class JmsMarshalerTest extends TestCase {
+
+ protected JBIContainer container;
+ protected BrokerService broker;
+ protected ActiveMQConnectionFactory connectionFactory;
+ protected ActiveMQQueue queue;
+
+ protected void setUp() throws Exception {
+ BrokerFactoryBean bfb = new BrokerFactoryBean(new
ClassPathResource("org/apache/servicemix/jms/activemq.xml"));
+ bfb.afterPropertiesSet();
+ broker = bfb.getBroker();
+ broker.start();
+
+ container = new JBIContainer();
+ container.setUseMBeanServer(true);
+ container.setCreateMBeanServer(true);
+ container.setMonitorInstallationDirectory(false);
+ container.init();
+ container.start();
+
+ connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
+ queue = new ActiveMQQueue("foo.queue");
+ }
+
+ protected void tearDown() throws Exception {
+ if (container != null) {
+ container.shutDown();
+ }
+ if (broker != null) {
+ broker.stop();
+ }
+ }
+
+ public void testMarshalTextMessage() throws Exception {
+ JmsComponent jms = new JmsComponent();
+ jms.getConfiguration().setConnectionFactory(connectionFactory);
+ JmsEndpoint ep = new JmsEndpoint();
+ ep.setService(ReceiverComponent.SERVICE);
+ ep.setEndpoint("jms");
+ ep.setTargetService(ReceiverComponent.SERVICE);
+ ep.setTargetEndpoint(ReceiverComponent.ENDPOINT);
+ ep.setRole(MessageExchange.Role.CONSUMER);
+ ep.setDestinationStyle(AbstractJmsProcessor.STYLE_QUEUE);
+ ep.setDestination(queue);
+ ep.setDefaultMep(MessageExchangeSupport.IN_ONLY);
+ ep.setMarshaler(new DefaultJmsMarshaler(ep));
+ jms.setEndpoints(new JmsEndpoint[] {ep });
+ container.activateComponent(jms, "servicemix-jms");
+
+ ReceiverComponent receiver = new ReceiverComponent();
+ container.activateComponent(receiver, "receiver");
+
+ QueueConnection qConn = connectionFactory.createQueueConnection();
+ QueueSession qSess = qConn.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
+ QueueSender qSender = qSess.createSender(queue);
+ TextMessage message = qSess.createTextMessage("<?xml version=\"1.0\"
encoding=\"UTF-8\"?><hello>world</hello>");
+ qSender.send(message);
+
+ receiver.getMessageList().assertMessagesReceived(1);
+ List msgs = receiver.getMessageList().flushMessages();
+ NormalizedMessage msg = (NormalizedMessage) msgs.get(0);
+ assertEquals("Messages match", message.getText(), new
SourceTransformer().contentToString(msg));
+
+ // Wait for DONE status
+ Thread.sleep(50);
+ }
+
+
+}
+