Author: gnodet
Date: Thu Sep 27 08:44:12 2007
New Revision: 580069

URL: http://svn.apache.org/viewvc?rev=580069&view=rev
Log:
SM-1054: Port JmsMarshaler from lightweight jms component to servicemix-jms 
component

Added:
    
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/DefaultJmsMarshaler.java
    
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsMarshaler.java
    
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsMarshalerTest.java
Modified:
    
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java
    
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java
    
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaProviderProcessor.java
    
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
    
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardProviderProcessor.java

Modified: 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java?rev=580069&r1=580068&r2=580069&view=diff
==============================================================================
--- 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java
 (original)
+++ 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/AbstractJmsProcessor.java
 Thu Sep 27 08:44:12 2007
@@ -16,11 +16,6 @@
  */
 package org.apache.servicemix.jms;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
-import java.util.Date;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 
@@ -30,12 +25,10 @@
 import javax.jbi.messaging.Fault;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.NormalizedMessage;
-import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Message;
 import javax.jms.Session;
-import javax.jms.TextMessage;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
 
@@ -49,7 +42,6 @@
 import org.apache.servicemix.soap.SoapFault;
 import org.apache.servicemix.soap.SoapHelper;
 import org.apache.servicemix.soap.marshalers.SoapMessage;
-import org.apache.servicemix.soap.marshalers.SoapWriter;
 import org.apache.servicemix.store.Store;
 import org.apache.servicemix.store.memory.MemoryStoreFactory;
 
@@ -161,50 +153,19 @@
     protected void doStop() throws Exception {
     }
     
-    protected void fromNMS(NormalizedMessage nm, TextMessage msg) throws 
Exception {
-        Map headers = (Map) nm.getProperty(JbiConstants.PROTOCOL_HEADERS);
-        SoapMessage soap = new SoapMessage();
-        soapHelper.getJBIMarshaler().fromNMS(soap, nm);
-        fromNMS(soap, msg, headers);
-    }
-    
-    protected void fromNMS(SoapMessage soap, TextMessage msg, Map headers) 
throws Exception {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        SoapWriter writer = soapHelper.getSoapMarshaler().createWriter(soap);
-        writer.write(baos);
-        msg.setText(baos.toString());
-        if (headers != null) {
-            for (Iterator it = headers.keySet().iterator(); it.hasNext();) {
-                String name = (String) it.next();
-                Object value = headers.get(name);
-                if (shouldIncludeHeader(name, value)) {
-                    msg.setObjectProperty(name, value);
-                }
-            }
-        }
-        // overwrite whatever content-type was passed on to us with the one
-        // the SoapWriter constructed
-        msg.setStringProperty(CONTENT_TYPE, writer.getContentType());
-    }
-    
     protected Context createContext() {
         return soapHelper.createContext();
     }
     
+    protected Message fromNMS(NormalizedMessage nm, Session session) throws 
Exception {
+        SoapMessage soap = new SoapMessage();
+        soapHelper.getJBIMarshaler().fromNMS(soap, nm);
+        Map headers = (Map) nm.getProperty(JbiConstants.PROTOCOL_HEADERS);
+        return endpoint.getMarshaler().toJMS(soap, headers, session);
+    }
+    
     protected MessageExchange toNMS(Message message, Context ctx) throws 
Exception {
-        InputStream is = null;
-        if (message instanceof TextMessage) {
-            is = new ByteArrayInputStream(((TextMessage) 
message).getText().getBytes());
-        } else if (message instanceof BytesMessage) {
-            int length = (int) ((BytesMessage) message).getBodyLength();
-            byte[] bytes = new byte[length];
-            ((BytesMessage) message).readBytes(bytes);
-            is = new ByteArrayInputStream(bytes);
-        } else {
-            throw new IllegalArgumentException("JMS message should be a text 
or bytes message");
-        }
-        String contentType = message.getStringProperty(CONTENT_TYPE);
-        SoapMessage soap = 
soapHelper.getSoapMarshaler().createReader().read(is, contentType);
+        SoapMessage soap = endpoint.getMarshaler().toSOAP(message);
         ctx.setInMessage(soap);
         ctx.setProperty(Message.class.getName(), message);
         MessageExchange exchange = soapHelper.onReceive(ctx);
@@ -216,51 +177,31 @@
     protected Message fromNMSResponse(MessageExchange exchange, Context ctx, 
Session session) throws Exception {
         Message response = null;
         if (exchange.getStatus() == ExchangeStatus.ERROR) {
+            // marshal error
             Exception e = exchange.getError();
             if (e == null) {
                 e = new Exception("Unkown error");
             }
-            response = session.createObjectMessage(e);
+            response = endpoint.getMarshaler().toJMS(e, session);
         } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+            // check for fault
             Fault jbiFault = exchange.getFault(); 
             if (jbiFault != null) {
+                // convert fault to SOAP message
                 SoapFault fault = new SoapFault(SoapFault.RECEIVER, null, 
null, null, jbiFault.getContent());
                 SoapMessage soapFault = soapHelper.onFault(ctx, fault);
-                TextMessage txt = session.createTextMessage();
-                fromNMS(soapFault, txt, (Map) 
jbiFault.getProperty(JbiConstants.PROTOCOL_HEADERS));
-                response = txt;
+                Map headers = (Map) 
jbiFault.getProperty(JbiConstants.PROTOCOL_HEADERS);
+                response = endpoint.getMarshaler().toJMS(soapFault, headers, 
session);
             } else {
                 NormalizedMessage outMsg = exchange.getMessage("out");
                 if (outMsg != null) {
                     SoapMessage out = soapHelper.onReply(ctx, outMsg);
-                    TextMessage txt = session.createTextMessage();
-                    fromNMS(out, txt, (Map) 
outMsg.getProperty(JbiConstants.PROTOCOL_HEADERS));
-                    response = txt;
+                    Map headers = (Map) 
outMsg.getProperty(JbiConstants.PROTOCOL_HEADERS);
+                    response = endpoint.getMarshaler().toJMS(out, headers, 
session);
                 }
             }
         }
         return response;
     }
 
-    private boolean shouldIncludeHeader(String name, Object value) {
-        return (value instanceof String || value instanceof Number || value 
instanceof Date)
-                        && (!endpoint.isNeedJavaIdentifiers() || 
isJavaIdentifier(name));
-    }
-
-    private static boolean isJavaIdentifier(String s) {
-        int n = s.length();
-        if (n == 0) {
-            return false;
-        }
-        if (!Character.isJavaIdentifierStart(s.charAt(0))) {
-            return false;
-        }
-        for (int i = 1; i < n; i++) {
-            if (!Character.isJavaIdentifierPart(s.charAt(i))) {
-                return false;
-            }
-        }
-        return true;
-    }
-    
 }

Added: 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/DefaultJmsMarshaler.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/DefaultJmsMarshaler.java?rev=580069&view=auto
==============================================================================
--- 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/DefaultJmsMarshaler.java
 (added)
+++ 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/DefaultJmsMarshaler.java
 Thu Sep 27 08:44:12 2007
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jms;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.servicemix.soap.SoapHelper;
+import org.apache.servicemix.soap.marshalers.SoapMessage;
+import org.apache.servicemix.soap.marshalers.SoapWriter;
+
+/**
+ * Encapsulates the conversion to and from JMS messages
+ */
+public class DefaultJmsMarshaler implements JmsMarshaler {
+    public static final String CONTENT_TYPE = "MimeContentType";
+
+    private JmsEndpoint endpoint;
+    
+    public DefaultJmsMarshaler(JmsEndpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+    
+    /**
+     * Converts an [EMAIL PROTECTED] Exception} into an JMS message. This 
method will be
+     * invoked when the [EMAIL PROTECTED] MessageExchange} contains an error.
+     * 
+     * @param e
+     *            Exception to convert
+     * @param session
+     *            JMS session used to create JMS messages
+     * @return JMS message
+     * @see MessageExchange#getError()
+     */
+    public Message toJMS(Exception e, Session session) throws Exception {
+        return session.createObjectMessage(e);
+    }
+    
+    /**
+     * Template method to allow custom functionality. Custom JmsMarshallers
+     * should override this method.
+     * 
+     * @param message Source message
+     * @param session JMS session used to create JMS messages
+     * @return JMS version of the specified source SOAP message
+     * @throws Exception if an IO error occurs
+     * @throws JMSException if a JMS error occurs
+     */
+    protected Message toJMS(SoapMessage message, Session session) throws 
Exception {
+        SoapHelper soapHelper = new SoapHelper(endpoint);
+        
+        // turn SOAP message into byte array/string
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        SoapWriter writer = 
soapHelper.getSoapMarshaler().createWriter(message);
+        writer.write(baos);
+        
+        // create text message
+        TextMessage msg = session.createTextMessage();
+        msg.setText(baos.toString());
+        
+        // overwrite whatever content-type was passed on to us with the one
+        // the SoapWriter constructed
+        msg.setStringProperty(CONTENT_TYPE, writer.getContentType());
+        
+        return msg;
+    }
+    
+    /**
+     * Converts a SOAP message to a JMS message, including any message headers.
+     * 
+     * @param message
+     *            message to convert
+     * @param headers
+     *            protocol headers present in the NormalizedMessage
+     * @param session
+     *            JMS session used to create JMS messages
+     * @throws Exception if something bad happens
+     * @return JMS message
+     */
+    public Message toJMS(SoapMessage message, Map headers, Session session) 
throws Exception {
+        // create message
+        Message msg = toJMS(message, session);
+        
+        // add protocol headers to message
+        if (headers != null) {
+            for (Iterator it = headers.keySet().iterator(); it.hasNext();) {
+                String name = (String) it.next();
+                Object value = headers.get(name);
+                if (shouldIncludeHeader(name, value)) {
+                    msg.setObjectProperty(name, value);
+                }
+            }
+        }
+        
+        return msg;
+    }
+
+    /**
+     * Template method to allow custom functionality. Custom JmsMarshalers
+     * should override this method.
+     * 
+     * @param message
+     *            Message to be turned into XML/SOAP
+     * @return Stream containing either the whole SOAP envelope or just the
+     *         payload of the body.
+     * @throws Exception
+     *             if JMS message is an ObjectMessage containing an Exception
+     *             (the containing exception is thrown.)
+     * @throws JMSException
+     *             if a JMS problem occurs
+     * @throws UnsupportedOperationException
+     *             if the JMS message is an ObjectMessage which contains
+     *             something other than an Exception
+     * @throws IllegalArgumentException
+     *             if the message is anything other than a TextMessage or
+     *             BytesMessage
+     */
+    protected InputStream toXmlInputStream(Message message) throws Exception {
+        InputStream is = null;
+        if (message instanceof ObjectMessage) {
+            Object o = ((ObjectMessage) message).getObject();
+            if (o instanceof Exception) {
+                throw (Exception) o;
+            } else {
+                throw new UnsupportedOperationException("Can not handle 
objects of type " + o.getClass().getName());
+            }
+        } else if (message instanceof TextMessage) {
+            is = new ByteArrayInputStream(((TextMessage) 
message).getText().getBytes());
+        } else if (message instanceof BytesMessage) {
+            int length = (int) ((BytesMessage) message).getBodyLength();
+            byte[] bytes = new byte[length];
+            ((BytesMessage) message).readBytes(bytes);
+            is = new ByteArrayInputStream(bytes);
+        } else {
+            throw new IllegalArgumentException("JMS message should be a text 
or bytes message");
+        }
+        
+        return is;
+    }
+    
+    /**
+     * Converts a JMS message into a SOAP message
+     * 
+     * @param message
+     *            JMS message to convert
+     * @return SOAP representation of the specified JMS message
+     * @throws Exception
+     *             if an IO exception occurs
+     */
+    public SoapMessage toSOAP(Message message) throws Exception {
+        SoapHelper soapHelper = new SoapHelper(endpoint);
+       
+        InputStream is = toXmlInputStream(message);
+        String contentType = message.getStringProperty(CONTENT_TYPE);
+        SoapMessage soap = 
soapHelper.getSoapMarshaler().createReader().read(is, contentType);
+
+        return soap;
+    }
+
+    private boolean shouldIncludeHeader(String name, Object value) {
+        return (value instanceof String || value instanceof Number || value 
instanceof Date)
+                        && (!endpoint.isNeedJavaIdentifiers() || 
isJavaIdentifier(name));
+    }
+    
+    private static boolean isJavaIdentifier(String s) {
+        int n = s.length();
+        if (n == 0) {
+            return false;
+        }
+        if (!Character.isJavaIdentifierStart(s.charAt(0))) {
+            return false;
+        }
+        for (int i = 1; i < n; i++) {
+            if (!Character.isJavaIdentifierPart(s.charAt(i))) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
+

Modified: 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java?rev=580069&r1=580068&r2=580069&view=diff
==============================================================================
--- 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java
 (original)
+++ 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsEndpoint.java
 Thu Sep 27 08:44:12 2007
@@ -68,6 +68,7 @@
     protected ConnectionFactory connectionFactory;
     protected Destination destination;
     protected String processorName;
+    protected JmsMarshaler marshaler;
     //
     // JCA config
     //
@@ -86,6 +87,10 @@
     protected Store store;
     protected StoreFactory storeFactory;
     
+    public JmsEndpoint() {
+        marshaler = new DefaultJmsMarshaler(this);
+    }
+    
     /**
      * The BootstrapContext to use for a JCA consumer endpoint.
      * 
@@ -560,5 +565,13 @@
     public void setUseMsgIdInResponse(boolean useMsgIdInResponse) {
         this.useMsgIdInResponse = useMsgIdInResponse;
     }
+
+    public JmsMarshaler getMarshaler() {
+        return marshaler;
+    }
+
+    public void setMarshaler(JmsMarshaler marshaler) {
+        this.marshaler = marshaler;
+    }  
 
 }

Added: 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsMarshaler.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsMarshaler.java?rev=580069&view=auto
==============================================================================
--- 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsMarshaler.java
 (added)
+++ 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/JmsMarshaler.java
 Thu Sep 27 08:44:12 2007
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jms;
+
+import java.util.Map;
+
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.servicemix.soap.marshalers.SoapMessage;
+
+public interface JmsMarshaler {
+    
+    /**
+     * Marshalls the JMS message into an XML/SOAP message
+     *
+     * @param src Message to marshall
+     * @param soapHelper 
+     * @throws Exception 
+     */
+    SoapMessage toSOAP(Message src) throws Exception;
+    
+    /**
+     * Unmarshalls the SOAP message into an JMS message
+     *
+     * @param message Message to unmarshall
+     * @param session Used to create the JMS message
+     * @throws MessagingException
+     * @throws JMSException
+     */
+    Message toJMS(SoapMessage message, Map headers, Session session) throws 
Exception;
+
+    Message toJMS(Exception e, Session session) throws Exception;
+
+}
+

Modified: 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaProviderProcessor.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaProviderProcessor.java?rev=580069&r1=580068&r2=580069&view=diff
==============================================================================
--- 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaProviderProcessor.java
 (original)
+++ 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/jca/JcaProviderProcessor.java
 Thu Sep 27 08:44:12 2007
@@ -25,9 +25,9 @@
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
+import javax.jms.Message;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-import javax.jms.TextMessage;
 import javax.naming.InitialContext;
 
 import org.apache.servicemix.jms.AbstractJmsProcessor;
@@ -93,10 +93,9 @@
                 }
             }
             MessageProducer producer = session.createProducer(destination);
-            
-            TextMessage msg = session.createTextMessage();
+
             NormalizedMessage nm = exchange.getMessage("in");
-            fromNMS(nm, msg);
+            Message msg = fromNMS(nm, session);
             producer.send(msg);
         } finally {
             if (session != null) {

Modified: 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java?rev=580069&r1=580068&r2=580069&view=diff
==============================================================================
--- 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
 (original)
+++ 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/multiplexing/MultiplexingProviderProcessor.java
 Thu Sep 27 08:44:12 2007
@@ -16,9 +16,6 @@
  */
 package org.apache.servicemix.jms.multiplexing;
 
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-
 import javax.jbi.messaging.DeliveryChannel;
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.InOnly;
@@ -26,16 +23,13 @@
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.NormalizedMessage;
 import javax.jbi.messaging.RobustInOnly;
-import javax.jms.BytesMessage;
 import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
 import javax.jms.Queue;
 import javax.jms.Session;
-import javax.jms.TextMessage;
 import javax.naming.InitialContext;
 
 import org.apache.servicemix.jms.AbstractJmsProcessor;
@@ -106,39 +100,25 @@
         }
         endpoint.getServiceUnit().getComponent().getExecutor().execute(new 
Runnable() {
             public void run() {
+                InOut exchange = null;
                 try {
                     if (log.isDebugEnabled()) {
                         log.debug("Handling jms message " + message);
                     }
-                    InOut exchange = (InOut) 
store.load(message.getJMSCorrelationID());
+                    exchange = (InOut) 
store.load(message.getJMSCorrelationID());
                     if (exchange == null) {
                         throw new IllegalStateException("Could not find 
exchange " + message.getJMSCorrelationID());
                     }
-                    if (message instanceof ObjectMessage) {
-                        Object o = ((ObjectMessage) message).getObject();
-                        if (o instanceof Exception) {
-                            exchange.setError((Exception) o);
-                        } else {
-                            throw new UnsupportedOperationException("Can not 
handle objects of type " + o.getClass().getName());
-                        }
-                    } else {
-                        InputStream is = null;
-                        if (message instanceof TextMessage) {
-                            is = new ByteArrayInputStream(((TextMessage) 
message).getText().getBytes());
-                        } else if (message instanceof BytesMessage) {
-                            int length = (int) ((BytesMessage) 
message).getBodyLength();
-                            byte[] bytes = new byte[length];
-                            ((BytesMessage) message).readBytes(bytes);
-                            is = new ByteArrayInputStream(bytes);
-                        } else {
-                            throw new IllegalArgumentException("JMS message 
should be a text or bytes message");
-                        }
-                        SoapMessage soap = 
soapHelper.getSoapMarshaler().createReader().read(is, 
message.getStringProperty(CONTENT_TYPE));
-                        NormalizedMessage out = exchange.createMessage();
-                        soapHelper.getJBIMarshaler().toNMS(out, soap);
-                        ((InOut) exchange).setOutMessage(out);
-                    }
+                    SoapMessage soap = endpoint.getMarshaler().toSOAP(message);
+                    NormalizedMessage out = exchange.createMessage();
+                    soapHelper.getJBIMarshaler().toNMS(out, soap);
+                    ((InOut) exchange).setOutMessage(out);
                     channel.send(exchange);
+                } catch (Exception e) {
+                    log.error("Error while handling jms message", e);
+                    if (exchange != null) {
+                        exchange.setError(e);
+                    }                    
                 } catch (Throwable e) {
                     log.error("Error while handling jms message", e);
                 }
@@ -152,9 +132,8 @@
         } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
             return;
         }
-        TextMessage msg = session.createTextMessage();
         NormalizedMessage nm = exchange.getMessage("in");
-        fromNMS(nm, msg);
+        Message msg = fromNMS(nm, session);
 
         if (exchange instanceof InOnly || exchange instanceof RobustInOnly) {
             synchronized (producer) {

Modified: 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardProviderProcessor.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardProviderProcessor.java?rev=580069&r1=580068&r2=580069&view=diff
==============================================================================
--- 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardProviderProcessor.java
 (original)
+++ 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardProviderProcessor.java
 Thu Sep 27 08:44:12 2007
@@ -104,9 +104,8 @@
 
             MessageProducer producer = session.createProducer(destination);
             
-            TextMessage msg = session.createTextMessage();
             NormalizedMessage nm = exchange.getMessage("in");
-            fromNMS(nm, msg);
+            Message msg = fromNMS(nm, session);
     
             if (exchange instanceof InOnly || exchange instanceof 
RobustInOnly) {
                 producer.send(msg);

Added: 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsMarshalerTest.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsMarshalerTest.java?rev=580069&view=auto
==============================================================================
--- 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsMarshalerTest.java
 (added)
+++ 
incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-jms/src/test/java/org/apache/servicemix/jms/JmsMarshalerTest.java
 Thu Sep 27 08:44:12 2007
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jms;
+
+import java.util.List;
+
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.jbi.messaging.MessageExchangeSupport;
+import org.apache.servicemix.tck.ReceiverComponent;
+import org.springframework.core.io.ClassPathResource;
+
+public class JmsMarshalerTest extends TestCase {
+    
+    protected JBIContainer container;
+    protected BrokerService broker;
+    protected ActiveMQConnectionFactory connectionFactory;
+    protected ActiveMQQueue queue;
+
+    protected void setUp() throws Exception {
+        BrokerFactoryBean bfb = new BrokerFactoryBean(new 
ClassPathResource("org/apache/servicemix/jms/activemq.xml"));
+        bfb.afterPropertiesSet();
+        broker = bfb.getBroker();
+        broker.start();
+
+        container = new JBIContainer();
+        container.setUseMBeanServer(true);
+        container.setCreateMBeanServer(true);
+        container.setMonitorInstallationDirectory(false);
+        container.init();
+        container.start();
+
+        connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
+        queue = new ActiveMQQueue("foo.queue");
+    }
+
+    protected void tearDown() throws Exception {
+        if (container != null) {
+            container.shutDown();
+        }
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    public void testMarshalTextMessage() throws Exception {
+        JmsComponent jms = new JmsComponent();
+        jms.getConfiguration().setConnectionFactory(connectionFactory);
+        JmsEndpoint ep = new JmsEndpoint();
+        ep.setService(ReceiverComponent.SERVICE);
+        ep.setEndpoint("jms");
+        ep.setTargetService(ReceiverComponent.SERVICE);
+        ep.setTargetEndpoint(ReceiverComponent.ENDPOINT);
+        ep.setRole(MessageExchange.Role.CONSUMER);
+        ep.setDestinationStyle(AbstractJmsProcessor.STYLE_QUEUE);
+        ep.setDestination(queue);
+        ep.setDefaultMep(MessageExchangeSupport.IN_ONLY);
+        ep.setMarshaler(new DefaultJmsMarshaler(ep));
+        jms.setEndpoints(new JmsEndpoint[] {ep });
+        container.activateComponent(jms, "servicemix-jms");
+
+        ReceiverComponent receiver = new ReceiverComponent();
+        container.activateComponent(receiver, "receiver");
+
+        QueueConnection qConn = connectionFactory.createQueueConnection();
+        QueueSession qSess = qConn.createQueueSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        QueueSender qSender = qSess.createSender(queue);
+        TextMessage message = qSess.createTextMessage("<?xml version=\"1.0\" 
encoding=\"UTF-8\"?><hello>world</hello>");
+        qSender.send(message);
+
+        receiver.getMessageList().assertMessagesReceived(1);
+        List msgs = receiver.getMessageList().flushMessages();
+        NormalizedMessage msg = (NormalizedMessage) msgs.get(0);
+        assertEquals("Messages match", message.getText(), new 
SourceTransformer().contentToString(msg));
+
+        // Wait for DONE status
+        Thread.sleep(50);
+    }
+    
+
+}
+


Reply via email to