Title: [805] trunk/core/src/main/resources: SM-146 & SM-172 : auditing on jdbc

Diff

Modified: trunk/core/src/main/java/org/servicemix/jbi/audit/AbstractAuditor.java (804 => 805)

--- trunk/core/src/main/java/org/servicemix/jbi/audit/AbstractAuditor.java	2005-11-12 01:51:15 UTC (rev 804)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/AbstractAuditor.java	2005-11-12 02:50:08 UTC (rev 805)
@@ -18,20 +18,30 @@
  **/
 package org.servicemix.jbi.audit;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.servicemix.MessageExchangeListener;
 import org.servicemix.jbi.container.JBIContainer;
 import org.servicemix.jbi.management.AttributeInfoHelper;
 import org.servicemix.jbi.management.BaseLifeCycle;
 import org.servicemix.jbi.management.OperationInfoHelper;
 import org.servicemix.jbi.management.ParameterHelper;
+import org.servicemix.jbi.messaging.MessageExchangeImpl;
 
 import javax.jbi.JBIException;
+import javax.jbi.messaging.MessageExchange;
 import javax.management.JMException;
 import javax.management.MBeanAttributeInfo;
 import javax.management.MBeanOperationInfo;
 
+/**
+ * @author gnodet
+ *
+ */
 public abstract class AbstractAuditor extends BaseLifeCycle implements AuditorMBean, MessageExchangeListener {
 
+    protected final Log log = LogFactory.getLog(getClass());
+    
     protected JBIContainer container;
 
     public JBIContainer getContainer() {
@@ -42,10 +52,8 @@
         this.container = container;
     }
 
-    /**
-     * Start the item.
-     * 
-     * @exception javax.jbi.JBIException if the item fails to start.
+    /* (non-Javadoc)
+     * @see javax.jbi.management.LifeCycleMBean#start()
      */
     public void start() throws javax.jbi.JBIException {
         super.start();
@@ -53,10 +61,8 @@
         this.container.addListener(this);
     }
 
-    /**
-     * Stop the item. This suspends current messaging activities.
-     * 
-     * @exception javax.jbi.JBIException if the item fails to stop.
+    /* (non-Javadoc)
+     * @see javax.jbi.management.LifeCycleMBean#stop()
      */
     public void stop() throws javax.jbi.JBIException {
         this.container.removeListener(this);
@@ -70,11 +76,8 @@
     protected void doStop() throws JBIException {
     }
 
-    /**
-     * Get an array of MBeanAttributeInfo
-     * 
-     * @return array of AttributeInfos
-     * @throws JMException
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.management.MBeanInfoProvider#getAttributeInfos()
      */
     public MBeanAttributeInfo[] getAttributeInfos() throws JMException {
         AttributeInfoHelper helper = new AttributeInfoHelper();
@@ -82,11 +85,8 @@
         return AttributeInfoHelper.join(super.getAttributeInfos(), helper.getAttributeInfos());
     }
     
-    /**
-     * Get an array of MBeanOperationInfo
-     * 
-     * @return array of OperationInfos
-     * @throws JMException
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.management.MBeanInfoProvider#getOperationInfos()
      */
     public MBeanOperationInfo[] getOperationInfos() throws JMException {
         OperationInfoHelper helper = new OperationInfoHelper();
@@ -95,4 +95,99 @@
         ph.setDescription(1, "toIndex", "upper index of message (exclusive, > fromIndex)");
         return OperationInfoHelper.join(super.getOperationInfos(), helper.getOperationInfos());
     }
+    
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.audit.AuditorMBean#getMessageCount()
+     */
+    public abstract int getMessageCount() throws Exception;
+    
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.audit.AuditorMBean#getMessageId(int)
+     */
+    public String getMessageId(int index) throws Exception {
+        return getMessageIds(index, index + 1)[0];
+    }
+    
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.audit.AuditorMBean#getMessageIds()
+     */
+    public String[] getMessageIds() throws Exception {
+        return getMessageIds(0, getMessageCount());
+    }
+    
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.audit.AuditorMBean#getMessageIds(int, int)
+     */
+    public abstract String[] getMessageIds(int fromIndex, int toIndex)  throws Exception;
+    
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.audit.AuditorMBean#getMessage(int)
+     */
+    public MessageExchange getMessage(int index) throws Exception {
+        return getMessages(index, index + 1)[0];
+    }
+    
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.audit.AuditorMBean#getMessage(java.lang.String)
+     */
+    public MessageExchange getMessage(String id) throws Exception {
+        return getMessages(new String[] { id })[0];
+    }
+    
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.audit.AuditorMBean#getMessages()
+     */
+    public MessageExchange[] getMessages() throws Exception {
+        return getMessages(0, getMessageCount());
+    }
+    
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.audit.AuditorMBean#getMessages(int, int)
+     */
+    public MessageExchange[] getMessages(int fromIndex, int toIndex) throws Exception {
+        return getMessages(getMessageIds(fromIndex, toIndex));
+    }
+
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.audit.AuditorMBean#getMessages(java.lang.String[])
+     */
+    public abstract MessageExchange[] getMessages(String[] ids) throws Exception;
+
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessages()
+     */
+    public void deleteMessages() throws Exception {
+        deleteMessages(0, getMessageCount());
+    }
+    
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessage(int)
+     */
+    public void deleteMessage(int index) throws Exception {
+        deleteMessages(index, index + 1);
+    }
+    
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessage(java.lang.String)
+     */
+    public void deleteMessage(String id) throws Exception {
+        deleteMessages(new String[] { id });
+    }
+    
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessages(int, int)
+     */
+    public abstract void deleteMessages(int fromIndex, int toIndex) throws Exception;
+    
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessages(java.lang.String[])
+     */
+    public abstract void deleteMessages(String[] ids) throws Exception;
+    
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.audit.AuditorMBean#resendMessage(javax.jbi.messaging.MessageExchange)
+     */
+    public void resendMessage(MessageExchange exchange) throws Exception {
+        container.resendExchange(exchange);
+    }
 }

Modified: trunk/core/src/main/java/org/servicemix/jbi/audit/AuditorMBean.java (804 => 805)

--- trunk/core/src/main/java/org/servicemix/jbi/audit/AuditorMBean.java	2005-11-12 01:51:15 UTC (rev 804)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/AuditorMBean.java	2005-11-12 02:50:08 UTC (rev 805)
@@ -25,5 +25,31 @@
 
     int getMessageCount() throws Exception;
     
+    String getMessageId(int index) throws Exception;
+    
+    String[] getMessageIds() throws Exception;
+    
+    String[] getMessageIds(int fromIndex, int toIndex)  throws Exception;
+    
+    MessageExchange getMessage(int index) throws Exception;
+    
+    MessageExchange getMessage(String id) throws Exception;
+    
+    MessageExchange[] getMessages() throws Exception;
+    
     MessageExchange[] getMessages(int fromIndex, int toIndex) throws Exception;
+
+    MessageExchange[] getMessages(String[] ids) throws Exception;
+    
+    void deleteMessages() throws Exception;
+    
+    void deleteMessage(int index) throws Exception;
+    
+    void deleteMessage(String id) throws Exception;
+    
+    void deleteMessages(int fromIndex, int toIndex) throws Exception;
+    
+    void deleteMessages(String[] ids) throws Exception;
+    
+    void resendMessage(MessageExchange exchange)throws Exception;
 }

Modified: trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/JdbcAuditor.java (804 => 805)

--- trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/JdbcAuditor.java	2005-11-12 01:51:15 UTC (rev 804)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/JdbcAuditor.java	2005-11-12 02:50:08 UTC (rev 805)
@@ -22,7 +22,6 @@
 import org.apache.ddlutils.PlatformFactory;
 import org.apache.ddlutils.io.DatabaseIO;
 import org.apache.ddlutils.model.Database;
-import org.apache.ddlutils.util.JdbcSupport;
 import org.servicemix.jbi.audit.AbstractAuditor;
 import org.servicemix.jbi.audit.AuditorMBean;
 import org.servicemix.jbi.messaging.ExchangePacket;
@@ -39,12 +38,9 @@
 import javax.sql.DataSource;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.net.URI;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -92,7 +88,7 @@
         try {
             ExchangePacket packet = ((MessageExchangeImpl) exchange).getPacket();
             String id = packet.getExchangeId();
-            byte[] data = ""
+            byte[] data = ""
             Connection connection = platform.borrowConnection();
             try {
                 store(connection, id, data);
@@ -124,19 +120,11 @@
                 storeStatement.execute();
             }
         } finally {
-            ((JdbcSupport) platform).closeStatement(selectStatement);
-            ((JdbcSupport) platform).closeStatement(storeStatement);
+            closeStatement(selectStatement);
+            closeStatement(storeStatement);
         }
     }
     
-    protected byte[] getData(ExchangePacket packet) throws IOException {
-        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-        ObjectOutputStream out = new ObjectOutputStream(buffer);
-        out.writeObject(packet);
-        out.close();
-        return buffer.toByteArray();
-    }
-
     public DataSource getDataSource() {
         return dataSource;
     }
@@ -145,48 +133,153 @@
         this.dataSource = dataSource;
     }
 
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.audit.AuditorMBean#getMessageCount()
+     */
     public int getMessageCount()throws Exception {
         Connection con = platform.borrowConnection();
-        Statement ps;
+        Statement statement = null;
         try {
-            ps = con.createStatement();
-            ResultSet rs = ps.executeQuery("SELECT COUNT(ID) FROM SM_AUDIT");
+            statement = con.createStatement();
+            ResultSet rs = statement.executeQuery("SELECT COUNT(ID) FROM SM_AUDIT");
             rs.next();
             return rs.getInt(1);
         } finally {
-            if (con != null) {
-                con.close();
-            }
+            closeStatement(statement);
+            platform.returnConnection(con);
         }
     }
 
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.audit.AuditorMBean#getMessages(int, int)
+     */
     public MessageExchange[] getMessages(int fromIndex, int toIndex) throws Exception {
         Connection con = platform.borrowConnection();
-        Statement ps;
+        Statement statement = null;
         try {
-            ps = con.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
-            ps.setFetchSize(toIndex - fromIndex);
-            ResultSet rs = ps.executeQuery("SELECT EXCHANGE FROM SM_AUDIT");
+            statement = con.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
+            statement.setFetchSize(toIndex - fromIndex);
+            ResultSet rs = statement.executeQuery("SELECT EXCHANGE FROM SM_AUDIT");
             rs.absolute(fromIndex + 1);
             MessageExchange[] exchanges = new MessageExchange[toIndex - fromIndex];
             for (int row = 0; row < toIndex - fromIndex; row++) {
-                byte[] data = ""
-                exchanges[row] = getExchange(data);
+                exchanges[row] = getExchange(rs.getBytes(1));
                 if (!rs.next()) {
                     break;
                 }
             }
             return exchanges;
         } finally {
-            if (con != null) {
-                con.close();
+            closeStatement(statement);
+            platform.returnConnection(con);
+        }
+    }
+    
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.audit.AuditorMBean#getMessageIds(int, int)
+     */
+    public String[] getMessageIds(int fromIndex, int toIndex) throws Exception {
+        Connection con = platform.borrowConnection();
+        Statement statement = null;
+        try {
+            statement = con.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
+            statement.setFetchSize(toIndex - fromIndex);
+            ResultSet rs = statement.executeQuery("SELECT ID FROM SM_AUDIT");
+            rs.absolute(fromIndex + 1);
+            String[] ids = new String[toIndex - fromIndex];
+            for (int row = 0; row < toIndex - fromIndex; row++) {
+                ids[row] = rs.getString(1);
+                if (!rs.next()) {
+                    break;
+                }
             }
+            return ids;
+        } finally {
+            closeStatement(statement);
+            platform.returnConnection(con);
         }
     }
+
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.audit.AuditorMBean#getMessages(java.lang.String[])
+     */
+    public MessageExchange[] getMessages(String[] ids) throws Exception {
+        MessageExchange[] exchanges = new MessageExchange[ids.length];
+        Connection con = platform.borrowConnection();
+        PreparedStatement statement = null;
+        try {
+            statement = con.prepareStatement("SELECT EXCHANGE FROM SM_AUDIT WHERE ID = ?");
+            for (int i = 0; i < exchanges.length; i++) {
+                statement.setString(1, ids[i]);
+                ResultSet rs = statement.executeQuery();
+                rs.next();
+                exchanges[i] = getExchange(rs.getBytes(1));
+            }
+            return exchanges;
+        } finally {
+            closeStatement(statement);
+            platform.returnConnection(con);
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessages()
+     */
+    public void deleteMessages() throws Exception {
+        Connection con = platform.borrowConnection();
+        Statement statement = null;
+        try {
+            statement = con.createStatement();
+            statement.executeUpdate("DELETE FROM SM_AUDIT");
+        } finally {
+            closeStatement(statement);
+            platform.returnConnection(con);
+        }
+    }
     
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessages(int, int)
+     */
+    public void deleteMessages(int fromIndex, int toIndex) throws Exception {
+        Connection con = platform.borrowConnection();
+        Statement statement = null;
+        try {
+            statement = con.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
+            ResultSet rs = statement.executeQuery("SELECT ID FROM SM_AUDIT");
+            rs.absolute(fromIndex + 1);
+            for (int row = 0; row < toIndex - fromIndex; row++) {
+                rs.deleteRow();
+                if (!rs.next()) {
+                    break;
+                }
+            }
+        } finally {
+            closeStatement(statement);
+            platform.returnConnection(con);
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessages(java.lang.String[])
+     */
+    public void deleteMessages(String[] ids) throws Exception {
+        Connection con = platform.borrowConnection();
+        PreparedStatement statement = null;
+        try {
+            statement = con.prepareStatement("DELETE FROM SM_AUDIT WHERE ID = ?");
+            for (int i = 0; i < ids.length; i++) {
+                statement.setString(1, ids[i]);
+                statement.executeUpdate();
+            }
+        } finally {
+            closeStatement(statement);
+            platform.returnConnection(con);
+        }
+    }
+    
+    // TODO: this should be somewhere in org.servicemix.jbi.messaging
     protected MessageExchange getExchange(byte[] data) throws Exception {
-        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data));
-        ExchangePacket packet = (ExchangePacket) ois.readObject();
+        ExchangePacket packet = ExchangePacket.readPacket(data);
         URI mep = packet.getPattern();
         if (MessageExchangeSupport.IN_ONLY.equals(mep)) {
             return new InOnlyImpl(packet);
@@ -199,6 +292,23 @@
         }
         return null;
     }
+    
+    /**
+     * Close the given statement, logging any exception.
+     * @param statement the statement to close
+     */
+    protected void closeStatement(Statement statement) {
+        if (statement != null) {
+            try {
+                Connection conn = statement.getConnection();
+                if ((conn != null) && !conn.isClosed()) {
+                    statement.close();
+                }
+            } catch (Exception e) {
+                log.warn("Error closing statement", e);
+            }
+        }
+    }
 
     
 }

Deleted: trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/database.xml (804 => 805)

--- trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/database.xml	2005-11-12 01:51:15 UTC (rev 804)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/database.xml	2005-11-12 02:50:08 UTC (rev 805)
@@ -1,11 +0,0 @@
-<?xml version="1.0"?>
-<database name="JDBCAudit">
-  <table name="SM_AUDIT">
-    <column name="ID"
-            type="VARCHAR"
-            primaryKey="true"
-            required="true" />
-    <column name="EXCHANGE"
-            type="BLOB" />
-  </table>
-</database>
\ No newline at end of file

Added: trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/package.html (804 => 805)

--- trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/package.html	2005-11-12 01:51:15 UTC (rev 804)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/package.html	2005-11-12 02:50:08 UTC (rev 805)
@@ -0,0 +1,9 @@
+<html>
+<head>
+</head>
+<body>
+
+Plain JDBC auditor.
+
+</body>
+</html>

Added: trunk/core/src/main/java/org/servicemix/jbi/audit/package.html (804 => 805)

--- trunk/core/src/main/java/org/servicemix/jbi/audit/package.html	2005-11-12 01:51:15 UTC (rev 804)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/package.html	2005-11-12 02:50:08 UTC (rev 805)
@@ -0,0 +1,9 @@
+<html>
+<head>
+</head>
+<body>
+
+ServiceMix Auditing interfaces.
+
+</body>
+</html>

Modified: trunk/core/src/main/java/org/servicemix/jbi/container/JBIContainer.java (804 => 805)

--- trunk/core/src/main/java/org/servicemix/jbi/container/JBIContainer.java	2005-11-12 01:51:15 UTC (rev 804)
+++ trunk/core/src/main/java/org/servicemix/jbi/container/JBIContainer.java	2005-11-12 02:50:08 UTC (rev 805)
@@ -56,6 +56,7 @@
 import javax.jbi.component.ServiceUnitManager;
 import javax.jbi.management.DeploymentException;
 import javax.jbi.management.LifeCycleMBean;
+import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.servicedesc.ServiceEndpoint;
@@ -1036,4 +1037,19 @@
         }
     }
 
+    public void resendExchange(MessageExchange exchange) throws JBIException {
+        if (exchange instanceof MessageExchangeImpl == false) {
+            throw new IllegalArgumentException("exchange should be a MessageExchangeImpl");
+        }
+        MessageExchangeImpl me = (MessageExchangeImpl) exchange;
+        me.getPacket().setExchangeId(new IdGenerator().generateId());
+        me.getPacket().setOut(null);
+        me.getPacket().setFault(null);
+        me.getPacket().setError(null);
+        me.getPacket().setStatus(ExchangeStatus.ACTIVE);
+        callListeners(me);
+        me.handleSend(false);
+        sendExchange(me.getMirror());
+    }
+
 }
\ No newline at end of file

Modified: trunk/core/src/main/java/org/servicemix/jbi/messaging/ExchangePacket.java (804 => 805)

--- trunk/core/src/main/java/org/servicemix/jbi/messaging/ExchangePacket.java	2005-11-12 01:51:15 UTC (rev 804)
+++ trunk/core/src/main/java/org/servicemix/jbi/messaging/ExchangePacket.java	2005-11-12 02:50:08 UTC (rev 805)
@@ -18,25 +18,28 @@
  **/
 package org.servicemix.jbi.messaging;
 
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-
 import org.servicemix.components.util.CopyTransformer;
 import org.servicemix.jbi.framework.ComponentNameSpace;
 
 import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.NormalizedMessage;
 import javax.jbi.servicedesc.ServiceEndpoint;
 import javax.transaction.Transaction;
 import javax.xml.namespace.QName;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
+import java.io.ObjectInputStream;
 import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
 import java.net.URI;
 import java.util.Collections;
-import java.util.Iterator;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
@@ -57,7 +60,9 @@
     private QName operationName;
     private Exception error;
     private Map properties;
-    private Map messages;
+    private NormalizedMessage in;
+    private NormalizedMessage out;
+    private Fault fault;
     private ServiceEndpoint endpoint;
     private transient Transaction transactionContext;
     private Boolean persistent;
@@ -74,14 +79,18 @@
         this.exchangeId = null; //???;
         this.interfaceName = packet.interfaceName;
         CopyTransformer ct = new CopyTransformer();
-        if (packet.messages != null && packet.messages.size() > 0) {
-            for (Iterator it = packet.messages.entrySet().iterator(); it.hasNext();) {
-                Map.Entry entry = (Map.Entry) it.next();
-                NormalizedMessage copy = new NormalizedMessageImpl();
-                ct.transform(null, (NormalizedMessage) entry.getValue(), copy);
-                setMessage(copy, (String) entry.getKey());
-            }
+        if (packet.in != null) {
+            in = new NormalizedMessageImpl();
+            ct.transform(null, packet.in, in);
         }
+        if (packet.out != null) {
+            out = new NormalizedMessageImpl();
+            ct.transform(null, packet.out, out);
+        }
+        if (packet.fault != null) {
+            fault = new FaultImpl();
+            ct.transform(null, packet.fault, fault);
+        }
         this.operationName = packet.operationName;
         this.pattern = packet.pattern;
         if (packet.properties != null && packet.properties.size() > 0) {
@@ -222,45 +231,13 @@
     }
 
     /**
-     * @return Returns the messages.
-     */
-    public Map getMessages() {
-        if (messages == null) {
-            messages = new ConcurrentHashMap();
-        }
-        return messages;
-    }
-
-    /**
-     * get a NormalizedMessage based on the message reference
-     * 
-     * @param name
-     * @return a NormalizedMessage
-     */
-    public NormalizedMessage getMessage(String name) {
-        if (messages != null) {
-            return (NormalizedMessage) messages.get(name);
-        }
-        return null;
-    }
-
-    /**
-     * set a NormalizedMessage with a named reference
-     * 
-     * @param message
-     * @param name
-     * @throws MessagingException
-     */
-    public void setMessage(NormalizedMessage message, String name) throws MessagingException {
-        getMessages().put(name, message);
-    }
-
-    /**
      * @return Returns the properties.
      */
     public Map getProperties() {
         if (properties == null) {
-            properties = new ConcurrentHashMap();
+            // No need to have concurrent access, as the
+            // message exchange can only be used from a single thread at a time
+            properties = new HashMap();
         }
         return properties;
     }
@@ -331,6 +308,48 @@
     }
 
     /**
+     * @return Returns the fault.
+     */
+    public Fault getFault() {
+        return fault;
+    }
+
+    /**
+     * @param fault The fault to set.
+     */
+    public void setFault(Fault fault) {
+        this.fault = fault;
+    }
+
+    /**
+     * @return Returns the in.
+     */
+    public NormalizedMessage getIn() {
+        return in;
+    }
+
+    /**
+     * @param in The in to set.
+     */
+    public void setIn(NormalizedMessage in) {
+        this.in = in;
+    }
+
+    /**
+     * @return Returns the out.
+     */
+    public NormalizedMessage getOut() {
+        return out;
+    }
+
+    /**
+     * @param out The out to set.
+     */
+    public void setOut(NormalizedMessage out) {
+        this.out = out;
+    }
+
+    /**
      * @return pretty print
      */
     public String toString() {
@@ -339,44 +358,48 @@
 
     /**
      * Write to a Stream
-     * @param out
+     * @param output
      * @throws IOException
      */
-    public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeUTF(pattern.toString());
-        out.writeUTF(exchangeId != null ? exchangeId : "");
-        out.writeUTF(status.toString());
-        out.writeObject(destinationId);
-        out.writeObject(sourceId);
-        out.writeObject(serviceName);
-        out.writeObject(interfaceName);
-        out.writeObject(error);
-        out.writeObject(properties);
-        out.writeObject(messages);
-        out.writeObject(endpoint);
-        out.writeByte((persistent == null) ? 0 : persistent.booleanValue() ? 1 : 2);
+    public void writeExternal(ObjectOutput output) throws IOException {
+        output.writeUTF(pattern.toString());
+        output.writeUTF(exchangeId != null ? exchangeId : "");
+        output.writeUTF(status.toString());
+        output.writeObject(destinationId);
+        output.writeObject(sourceId);
+        output.writeObject(serviceName);
+        output.writeObject(interfaceName);
+        output.writeObject(error);
+        output.writeObject(properties);
+        output.writeObject(in);
+        output.writeObject(out);
+        output.writeObject(fault);
+        output.writeObject(endpoint);
+        output.writeByte((persistent == null) ? 0 : persistent.booleanValue() ? 1 : 2);
     }
 
     /**
      * Read from a stream
      * 
-     * @param in
+     * @param input
      * @throws IOException
      * @throws ClassNotFoundException
      */
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        pattern = URI.create(in.readUTF());
-        exchangeId = in.readUTF();
-        status = ExchangeStatus.valueOf(in.readUTF());
-        destinationId = (ComponentNameSpace) in.readObject();
-        sourceId = (ComponentNameSpace) in.readObject();
-        serviceName = (QName) in.readObject();
-        interfaceName = (QName) in.readObject();
-        error = (Exception) in.readObject();
-        properties = (Map) in.readObject();
-        messages = (Map) in.readObject();
-        endpoint = (ServiceEndpoint) in.readObject();
-        byte p = in.readByte();
+    public void readExternal(ObjectInput input) throws IOException, ClassNotFoundException {
+        pattern = URI.create(input.readUTF());
+        exchangeId = input.readUTF();
+        status = ExchangeStatus.valueOf(input.readUTF());
+        destinationId = (ComponentNameSpace) input.readObject();
+        sourceId = (ComponentNameSpace) input.readObject();
+        serviceName = (QName) input.readObject();
+        interfaceName = (QName) input.readObject();
+        error = (Exception) input.readObject();
+        properties = (Map) input.readObject();
+        in = (NormalizedMessage) input.readObject();
+        out = (NormalizedMessage) input.readObject();
+        fault = (Fault) input.readObject();
+        endpoint = (ServiceEndpoint) input.readObject();
+        byte p = input.readByte();
         persistent = (p == 0) ? null : p == 1 ? Boolean.TRUE : Boolean.FALSE;
     }
 
@@ -403,5 +426,30 @@
     public void setAborted(boolean timedOut) {
         this.aborted = timedOut;
     }
+    
+    /**
+     * Retrieve the serialized from of this packet
+     * @return the serialized packet
+     * @throws IOException 
+     */
+    public byte[] getData() throws IOException {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        ObjectOutputStream out = new ObjectOutputStream(buffer);
+        out.writeObject(this);
+        out.close();
+        return buffer.toByteArray();
+    }
+    
+    /**
+     * Deserialize an ExchangePacket.
+     * @param data the serialized packet
+     * @return the deserialized packet
+     * @throws IOException
+     * @throws ClassNotFoundException
+     */
+    public static ExchangePacket readPacket(byte[] data) throws IOException, ClassNotFoundException {
+        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data));
+        return (ExchangePacket) ois.readObject();
+    }
 
 }
\ No newline at end of file

Modified: trunk/core/src/main/java/org/servicemix/jbi/messaging/MessageExchangeImpl.java (804 => 805)

--- trunk/core/src/main/java/org/servicemix/jbi/messaging/MessageExchangeImpl.java	2005-11-12 01:51:15 UTC (rev 804)
+++ trunk/core/src/main/java/org/servicemix/jbi/messaging/MessageExchangeImpl.java	2005-11-12 02:50:08 UTC (rev 805)
@@ -202,7 +202,7 @@
      * @return the fault message for an exchange
      */
     public Fault getFault() {
-        return (Fault) packet.getMessage(FAULT);
+        return packet.getFault();
     }
 
     /**
@@ -240,7 +240,15 @@
      * @return a NormalizedMessage
      */
     public NormalizedMessage getMessage(String name) {
-        return packet.getMessage(name);
+        if (IN.equals(name)) {
+            return packet.getIn();
+        } else if (OUT.equals(name)) {
+            return packet.getOut();
+        } else if (FAULT.equals(name)) {
+            return packet.getFault();
+        } else {
+            return null;
+        }
     }
 
     /**
@@ -261,22 +269,36 @@
             throw new IllegalArgumentException("name should not be null");
         }
         name = name.toLowerCase();
-        if (IN.equals(name) && !can(CAN_SET_IN_MSG)) {
-            throw new MessagingException("In not supported");
+        if (IN.equals(name)) {
+            if (!can(CAN_SET_IN_MSG)) {
+                throw new MessagingException("In not supported");
+            }
+            if (packet.getIn() != null) {
+                throw new MessagingException("In message is already set");
+            }
+            packet.setIn(message);
+        } else if (OUT.equals(name)) {
+            if (!can(CAN_SET_OUT_MSG)) {
+                throw new MessagingException("Out not supported");
+            }
+            if (packet.getOut() != null) {
+                throw new MessagingException("Out message is already set");
+            }
+            packet.setOut(message);
+        } else if (FAULT.equals(name)) {
+            if (!can(CAN_SET_FAULT_MSG)) {
+                throw new MessagingException("Fault not supported");
+            }
+            if (!(message instanceof Fault)) {
+                throw new MessagingException("Setting fault, but message is not a fault");
+            }
+            if (packet.getFault() != null) {
+                throw new MessagingException("Fault message is already set");
+            }
+               packet.setFault((Fault) message);
+        } else {
+            throw new MessagingException("Message name must be in, out or fault");
         }
-        if (OUT.equals(name) && !can(CAN_SET_OUT_MSG)) {
-            throw new MessagingException("Out not supported");
-        }
-        if (FAULT.equals(name) && !can(CAN_SET_FAULT_MSG)) {
-            throw new MessagingException("Fault not supported");
-        }
-        if (FAULT.equals(name) && !(message instanceof Fault)) {
-            throw new MessagingException("Setting fault, but message is not a fault");
-        }
-        if (packet.getMessage(name) != null) {
-            throw new MessagingException("Can not set the message since it has already been set");
-        }
-        packet.setMessage(message,name);
     }
 
     /**
@@ -421,7 +443,7 @@
      * @return the in message
      */
     public  NormalizedMessage getInMessage() {
-        return getMessage(IN);
+        return this.packet.getIn();
     }
 
     /**

Copied: trunk/core/src/main/resources/org/servicemix/jbi/audit/jdbc/database.xml (from rev 800, trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/database.xml) ( => )

Modified: trunk/core/src/test/java/org/servicemix/jbi/audit/jdbc/JdbcAuditorTest.java
===================================================================
--- trunk/core/src/test/java/org/servicemix/jbi/audit/jdbc/JdbcAuditorTest.java	2005-11-12 01:51:15 UTC (rev 804)
+++ trunk/core/src/test/java/org/servicemix/jbi/audit/jdbc/JdbcAuditorTest.java	2005-11-12 02:50:08 UTC (rev 805)
@@ -78,6 +78,15 @@
         assertNotNull(exchanges);
         assertEquals(1, exchanges.length);
         assertEquals(ExchangeStatus.DONE, exchanges[0].getStatus());
+        
+        auditor.resendMessage(exchanges[0]);
+
+        nbMessages = auditor.getMessageCount();
+        assertEquals(2, nbMessages);
+        MessageExchange exchange = auditor.getMessage(1);
+        assertNotNull(exchange);
+        assertEquals(ExchangeStatus.DONE, exchange.getStatus());
+        
         /*
         PreparedStatement st = connection.prepareStatement("SELECT EXCHANGE FROM SM_AUDIT WHERE ID = ?");
         try {

Reply via email to