Diff
Modified: trunk/core/src/main/java/org/servicemix/jbi/audit/AbstractAuditor.java (804 => 805)
--- trunk/core/src/main/java/org/servicemix/jbi/audit/AbstractAuditor.java 2005-11-12 01:51:15 UTC (rev 804)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/AbstractAuditor.java 2005-11-12 02:50:08 UTC (rev 805)
@@ -18,20 +18,30 @@
**/
package org.servicemix.jbi.audit;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.servicemix.MessageExchangeListener;
import org.servicemix.jbi.container.JBIContainer;
import org.servicemix.jbi.management.AttributeInfoHelper;
import org.servicemix.jbi.management.BaseLifeCycle;
import org.servicemix.jbi.management.OperationInfoHelper;
import org.servicemix.jbi.management.ParameterHelper;
+import org.servicemix.jbi.messaging.MessageExchangeImpl;
import javax.jbi.JBIException;
+import javax.jbi.messaging.MessageExchange;
import javax.management.JMException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
+/**
+ * @author gnodet
+ *
+ */
public abstract class AbstractAuditor extends BaseLifeCycle implements AuditorMBean, MessageExchangeListener {
+ protected final Log log = LogFactory.getLog(getClass());
+
protected JBIContainer container;
public JBIContainer getContainer() {
@@ -42,10 +52,8 @@
this.container = container;
}
- /**
- * Start the item.
- *
- * @exception javax.jbi.JBIException if the item fails to start.
+ /* (non-Javadoc)
+ * @see javax.jbi.management.LifeCycleMBean#start()
*/
public void start() throws javax.jbi.JBIException {
super.start();
@@ -53,10 +61,8 @@
this.container.addListener(this);
}
- /**
- * Stop the item. This suspends current messaging activities.
- *
- * @exception javax.jbi.JBIException if the item fails to stop.
+ /* (non-Javadoc)
+ * @see javax.jbi.management.LifeCycleMBean#stop()
*/
public void stop() throws javax.jbi.JBIException {
this.container.removeListener(this);
@@ -70,11 +76,8 @@
protected void doStop() throws JBIException {
}
- /**
- * Get an array of MBeanAttributeInfo
- *
- * @return array of AttributeInfos
- * @throws JMException
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.management.MBeanInfoProvider#getAttributeInfos()
*/
public MBeanAttributeInfo[] getAttributeInfos() throws JMException {
AttributeInfoHelper helper = new AttributeInfoHelper();
@@ -82,11 +85,8 @@
return AttributeInfoHelper.join(super.getAttributeInfos(), helper.getAttributeInfos());
}
- /**
- * Get an array of MBeanOperationInfo
- *
- * @return array of OperationInfos
- * @throws JMException
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.management.MBeanInfoProvider#getOperationInfos()
*/
public MBeanOperationInfo[] getOperationInfos() throws JMException {
OperationInfoHelper helper = new OperationInfoHelper();
@@ -95,4 +95,99 @@
ph.setDescription(1, "toIndex", "upper index of message (exclusive, > fromIndex)");
return OperationInfoHelper.join(super.getOperationInfos(), helper.getOperationInfos());
}
+
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.audit.AuditorMBean#getMessageCount()
+ */
+ public abstract int getMessageCount() throws Exception;
+
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.audit.AuditorMBean#getMessageId(int)
+ */
+ public String getMessageId(int index) throws Exception {
+ return getMessageIds(index, index + 1)[0];
+ }
+
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.audit.AuditorMBean#getMessageIds()
+ */
+ public String[] getMessageIds() throws Exception {
+ return getMessageIds(0, getMessageCount());
+ }
+
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.audit.AuditorMBean#getMessageIds(int, int)
+ */
+ public abstract String[] getMessageIds(int fromIndex, int toIndex) throws Exception;
+
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.audit.AuditorMBean#getMessage(int)
+ */
+ public MessageExchange getMessage(int index) throws Exception {
+ return getMessages(index, index + 1)[0];
+ }
+
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.audit.AuditorMBean#getMessage(java.lang.String)
+ */
+ public MessageExchange getMessage(String id) throws Exception {
+ return getMessages(new String[] { id })[0];
+ }
+
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.audit.AuditorMBean#getMessages()
+ */
+ public MessageExchange[] getMessages() throws Exception {
+ return getMessages(0, getMessageCount());
+ }
+
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.audit.AuditorMBean#getMessages(int, int)
+ */
+ public MessageExchange[] getMessages(int fromIndex, int toIndex) throws Exception {
+ return getMessages(getMessageIds(fromIndex, toIndex));
+ }
+
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.audit.AuditorMBean#getMessages(java.lang.String[])
+ */
+ public abstract MessageExchange[] getMessages(String[] ids) throws Exception;
+
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessages()
+ */
+ public void deleteMessages() throws Exception {
+ deleteMessages(0, getMessageCount());
+ }
+
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessage(int)
+ */
+ public void deleteMessage(int index) throws Exception {
+ deleteMessages(index, index + 1);
+ }
+
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessage(java.lang.String)
+ */
+ public void deleteMessage(String id) throws Exception {
+ deleteMessages(new String[] { id });
+ }
+
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessages(int, int)
+ */
+ public abstract void deleteMessages(int fromIndex, int toIndex) throws Exception;
+
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessages(java.lang.String[])
+ */
+ public abstract void deleteMessages(String[] ids) throws Exception;
+
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.audit.AuditorMBean#resendMessage(javax.jbi.messaging.MessageExchange)
+ */
+ public void resendMessage(MessageExchange exchange) throws Exception {
+ container.resendExchange(exchange);
+ }
}
Modified: trunk/core/src/main/java/org/servicemix/jbi/audit/AuditorMBean.java (804 => 805)
--- trunk/core/src/main/java/org/servicemix/jbi/audit/AuditorMBean.java 2005-11-12 01:51:15 UTC (rev 804)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/AuditorMBean.java 2005-11-12 02:50:08 UTC (rev 805)
@@ -25,5 +25,31 @@
int getMessageCount() throws Exception;
+ String getMessageId(int index) throws Exception;
+
+ String[] getMessageIds() throws Exception;
+
+ String[] getMessageIds(int fromIndex, int toIndex) throws Exception;
+
+ MessageExchange getMessage(int index) throws Exception;
+
+ MessageExchange getMessage(String id) throws Exception;
+
+ MessageExchange[] getMessages() throws Exception;
+
MessageExchange[] getMessages(int fromIndex, int toIndex) throws Exception;
+
+ MessageExchange[] getMessages(String[] ids) throws Exception;
+
+ void deleteMessages() throws Exception;
+
+ void deleteMessage(int index) throws Exception;
+
+ void deleteMessage(String id) throws Exception;
+
+ void deleteMessages(int fromIndex, int toIndex) throws Exception;
+
+ void deleteMessages(String[] ids) throws Exception;
+
+ void resendMessage(MessageExchange exchange)throws Exception;
}
Modified: trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/JdbcAuditor.java (804 => 805)
--- trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/JdbcAuditor.java 2005-11-12 01:51:15 UTC (rev 804)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/JdbcAuditor.java 2005-11-12 02:50:08 UTC (rev 805)
@@ -22,7 +22,6 @@
import org.apache.ddlutils.PlatformFactory;
import org.apache.ddlutils.io.DatabaseIO;
import org.apache.ddlutils.model.Database;
-import org.apache.ddlutils.util.JdbcSupport;
import org.servicemix.jbi.audit.AbstractAuditor;
import org.servicemix.jbi.audit.AuditorMBean;
import org.servicemix.jbi.messaging.ExchangePacket;
@@ -39,12 +38,9 @@
import javax.sql.DataSource;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.net.URI;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -92,7 +88,7 @@
try {
ExchangePacket packet = ((MessageExchangeImpl) exchange).getPacket();
String id = packet.getExchangeId();
- byte[] data = ""
+ byte[] data = ""
Connection connection = platform.borrowConnection();
try {
store(connection, id, data);
@@ -124,19 +120,11 @@
storeStatement.execute();
}
} finally {
- ((JdbcSupport) platform).closeStatement(selectStatement);
- ((JdbcSupport) platform).closeStatement(storeStatement);
+ closeStatement(selectStatement);
+ closeStatement(storeStatement);
}
}
- protected byte[] getData(ExchangePacket packet) throws IOException {
- ByteArrayOutputStream buffer = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(buffer);
- out.writeObject(packet);
- out.close();
- return buffer.toByteArray();
- }
-
public DataSource getDataSource() {
return dataSource;
}
@@ -145,48 +133,153 @@
this.dataSource = dataSource;
}
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.audit.AuditorMBean#getMessageCount()
+ */
public int getMessageCount()throws Exception {
Connection con = platform.borrowConnection();
- Statement ps;
+ Statement statement = null;
try {
- ps = con.createStatement();
- ResultSet rs = ps.executeQuery("SELECT COUNT(ID) FROM SM_AUDIT");
+ statement = con.createStatement();
+ ResultSet rs = statement.executeQuery("SELECT COUNT(ID) FROM SM_AUDIT");
rs.next();
return rs.getInt(1);
} finally {
- if (con != null) {
- con.close();
- }
+ closeStatement(statement);
+ platform.returnConnection(con);
}
}
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.audit.AuditorMBean#getMessages(int, int)
+ */
public MessageExchange[] getMessages(int fromIndex, int toIndex) throws Exception {
Connection con = platform.borrowConnection();
- Statement ps;
+ Statement statement = null;
try {
- ps = con.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
- ps.setFetchSize(toIndex - fromIndex);
- ResultSet rs = ps.executeQuery("SELECT EXCHANGE FROM SM_AUDIT");
+ statement = con.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
+ statement.setFetchSize(toIndex - fromIndex);
+ ResultSet rs = statement.executeQuery("SELECT EXCHANGE FROM SM_AUDIT");
rs.absolute(fromIndex + 1);
MessageExchange[] exchanges = new MessageExchange[toIndex - fromIndex];
for (int row = 0; row < toIndex - fromIndex; row++) {
- byte[] data = ""
- exchanges[row] = getExchange(data);
+ exchanges[row] = getExchange(rs.getBytes(1));
if (!rs.next()) {
break;
}
}
return exchanges;
} finally {
- if (con != null) {
- con.close();
+ closeStatement(statement);
+ platform.returnConnection(con);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.audit.AuditorMBean#getMessageIds(int, int)
+ */
+ public String[] getMessageIds(int fromIndex, int toIndex) throws Exception {
+ Connection con = platform.borrowConnection();
+ Statement statement = null;
+ try {
+ statement = con.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
+ statement.setFetchSize(toIndex - fromIndex);
+ ResultSet rs = statement.executeQuery("SELECT ID FROM SM_AUDIT");
+ rs.absolute(fromIndex + 1);
+ String[] ids = new String[toIndex - fromIndex];
+ for (int row = 0; row < toIndex - fromIndex; row++) {
+ ids[row] = rs.getString(1);
+ if (!rs.next()) {
+ break;
+ }
}
+ return ids;
+ } finally {
+ closeStatement(statement);
+ platform.returnConnection(con);
}
}
+
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.audit.AuditorMBean#getMessages(java.lang.String[])
+ */
+ public MessageExchange[] getMessages(String[] ids) throws Exception {
+ MessageExchange[] exchanges = new MessageExchange[ids.length];
+ Connection con = platform.borrowConnection();
+ PreparedStatement statement = null;
+ try {
+ statement = con.prepareStatement("SELECT EXCHANGE FROM SM_AUDIT WHERE ID = ?");
+ for (int i = 0; i < exchanges.length; i++) {
+ statement.setString(1, ids[i]);
+ ResultSet rs = statement.executeQuery();
+ rs.next();
+ exchanges[i] = getExchange(rs.getBytes(1));
+ }
+ return exchanges;
+ } finally {
+ closeStatement(statement);
+ platform.returnConnection(con);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessages()
+ */
+ public void deleteMessages() throws Exception {
+ Connection con = platform.borrowConnection();
+ Statement statement = null;
+ try {
+ statement = con.createStatement();
+ statement.executeUpdate("DELETE FROM SM_AUDIT");
+ } finally {
+ closeStatement(statement);
+ platform.returnConnection(con);
+ }
+ }
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessages(int, int)
+ */
+ public void deleteMessages(int fromIndex, int toIndex) throws Exception {
+ Connection con = platform.borrowConnection();
+ Statement statement = null;
+ try {
+ statement = con.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
+ ResultSet rs = statement.executeQuery("SELECT ID FROM SM_AUDIT");
+ rs.absolute(fromIndex + 1);
+ for (int row = 0; row < toIndex - fromIndex; row++) {
+ rs.deleteRow();
+ if (!rs.next()) {
+ break;
+ }
+ }
+ } finally {
+ closeStatement(statement);
+ platform.returnConnection(con);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessages(java.lang.String[])
+ */
+ public void deleteMessages(String[] ids) throws Exception {
+ Connection con = platform.borrowConnection();
+ PreparedStatement statement = null;
+ try {
+ statement = con.prepareStatement("DELETE FROM SM_AUDIT WHERE ID = ?");
+ for (int i = 0; i < ids.length; i++) {
+ statement.setString(1, ids[i]);
+ statement.executeUpdate();
+ }
+ } finally {
+ closeStatement(statement);
+ platform.returnConnection(con);
+ }
+ }
+
+ // TODO: this should be somewhere in org.servicemix.jbi.messaging
protected MessageExchange getExchange(byte[] data) throws Exception {
- ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data));
- ExchangePacket packet = (ExchangePacket) ois.readObject();
+ ExchangePacket packet = ExchangePacket.readPacket(data);
URI mep = packet.getPattern();
if (MessageExchangeSupport.IN_ONLY.equals(mep)) {
return new InOnlyImpl(packet);
@@ -199,6 +292,23 @@
}
return null;
}
+
+ /**
+ * Close the given statement, logging any exception.
+ * @param statement the statement to close
+ */
+ protected void closeStatement(Statement statement) {
+ if (statement != null) {
+ try {
+ Connection conn = statement.getConnection();
+ if ((conn != null) && !conn.isClosed()) {
+ statement.close();
+ }
+ } catch (Exception e) {
+ log.warn("Error closing statement", e);
+ }
+ }
+ }
}
Deleted: trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/database.xml (804 => 805)
--- trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/database.xml 2005-11-12 01:51:15 UTC (rev 804)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/database.xml 2005-11-12 02:50:08 UTC (rev 805)
@@ -1,11 +0,0 @@
-<?xml version="1.0"?>
-<database name="JDBCAudit">
- <table name="SM_AUDIT">
- <column name="ID"
- type="VARCHAR"
- primaryKey="true"
- required="true" />
- <column name="EXCHANGE"
- type="BLOB" />
- </table>
-</database>
\ No newline at end of file
Added: trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/package.html (804 => 805)
--- trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/package.html 2005-11-12 01:51:15 UTC (rev 804)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/package.html 2005-11-12 02:50:08 UTC (rev 805)
@@ -0,0 +1,9 @@
+<html>
+<head>
+</head>
+<body>
+
+Plain JDBC auditor.
+
+</body>
+</html>
Added: trunk/core/src/main/java/org/servicemix/jbi/audit/package.html (804 => 805)
--- trunk/core/src/main/java/org/servicemix/jbi/audit/package.html 2005-11-12 01:51:15 UTC (rev 804)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/package.html 2005-11-12 02:50:08 UTC (rev 805)
@@ -0,0 +1,9 @@
+<html>
+<head>
+</head>
+<body>
+
+ServiceMix Auditing interfaces.
+
+</body>
+</html>
Modified: trunk/core/src/main/java/org/servicemix/jbi/container/JBIContainer.java (804 => 805)
--- trunk/core/src/main/java/org/servicemix/jbi/container/JBIContainer.java 2005-11-12 01:51:15 UTC (rev 804)
+++ trunk/core/src/main/java/org/servicemix/jbi/container/JBIContainer.java 2005-11-12 02:50:08 UTC (rev 805)
@@ -56,6 +56,7 @@
import javax.jbi.component.ServiceUnitManager;
import javax.jbi.management.DeploymentException;
import javax.jbi.management.LifeCycleMBean;
+import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.servicedesc.ServiceEndpoint;
@@ -1036,4 +1037,19 @@
}
}
+ public void resendExchange(MessageExchange exchange) throws JBIException {
+ if (exchange instanceof MessageExchangeImpl == false) {
+ throw new IllegalArgumentException("exchange should be a MessageExchangeImpl");
+ }
+ MessageExchangeImpl me = (MessageExchangeImpl) exchange;
+ me.getPacket().setExchangeId(new IdGenerator().generateId());
+ me.getPacket().setOut(null);
+ me.getPacket().setFault(null);
+ me.getPacket().setError(null);
+ me.getPacket().setStatus(ExchangeStatus.ACTIVE);
+ callListeners(me);
+ me.handleSend(false);
+ sendExchange(me.getMirror());
+ }
+
}
\ No newline at end of file
Modified: trunk/core/src/main/java/org/servicemix/jbi/messaging/ExchangePacket.java (804 => 805)
--- trunk/core/src/main/java/org/servicemix/jbi/messaging/ExchangePacket.java 2005-11-12 01:51:15 UTC (rev 804)
+++ trunk/core/src/main/java/org/servicemix/jbi/messaging/ExchangePacket.java 2005-11-12 02:50:08 UTC (rev 805)
@@ -18,25 +18,28 @@
**/
package org.servicemix.jbi.messaging;
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-
import org.servicemix.components.util.CopyTransformer;
import org.servicemix.jbi.framework.ComponentNameSpace;
import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.transaction.Transaction;
import javax.xml.namespace.QName;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
+import java.io.ObjectInputStream;
import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
import java.net.URI;
import java.util.Collections;
-import java.util.Iterator;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -57,7 +60,9 @@
private QName operationName;
private Exception error;
private Map properties;
- private Map messages;
+ private NormalizedMessage in;
+ private NormalizedMessage out;
+ private Fault fault;
private ServiceEndpoint endpoint;
private transient Transaction transactionContext;
private Boolean persistent;
@@ -74,14 +79,18 @@
this.exchangeId = null; //???;
this.interfaceName = packet.interfaceName;
CopyTransformer ct = new CopyTransformer();
- if (packet.messages != null && packet.messages.size() > 0) {
- for (Iterator it = packet.messages.entrySet().iterator(); it.hasNext();) {
- Map.Entry entry = (Map.Entry) it.next();
- NormalizedMessage copy = new NormalizedMessageImpl();
- ct.transform(null, (NormalizedMessage) entry.getValue(), copy);
- setMessage(copy, (String) entry.getKey());
- }
+ if (packet.in != null) {
+ in = new NormalizedMessageImpl();
+ ct.transform(null, packet.in, in);
}
+ if (packet.out != null) {
+ out = new NormalizedMessageImpl();
+ ct.transform(null, packet.out, out);
+ }
+ if (packet.fault != null) {
+ fault = new FaultImpl();
+ ct.transform(null, packet.fault, fault);
+ }
this.operationName = packet.operationName;
this.pattern = packet.pattern;
if (packet.properties != null && packet.properties.size() > 0) {
@@ -222,45 +231,13 @@
}
/**
- * @return Returns the messages.
- */
- public Map getMessages() {
- if (messages == null) {
- messages = new ConcurrentHashMap();
- }
- return messages;
- }
-
- /**
- * get a NormalizedMessage based on the message reference
- *
- * @param name
- * @return a NormalizedMessage
- */
- public NormalizedMessage getMessage(String name) {
- if (messages != null) {
- return (NormalizedMessage) messages.get(name);
- }
- return null;
- }
-
- /**
- * set a NormalizedMessage with a named reference
- *
- * @param message
- * @param name
- * @throws MessagingException
- */
- public void setMessage(NormalizedMessage message, String name) throws MessagingException {
- getMessages().put(name, message);
- }
-
- /**
* @return Returns the properties.
*/
public Map getProperties() {
if (properties == null) {
- properties = new ConcurrentHashMap();
+ // No need to have concurrent access, as the
+ // message exchange can only be used from a single thread at a time
+ properties = new HashMap();
}
return properties;
}
@@ -331,6 +308,48 @@
}
/**
+ * @return Returns the fault.
+ */
+ public Fault getFault() {
+ return fault;
+ }
+
+ /**
+ * @param fault The fault to set.
+ */
+ public void setFault(Fault fault) {
+ this.fault = fault;
+ }
+
+ /**
+ * @return Returns the in.
+ */
+ public NormalizedMessage getIn() {
+ return in;
+ }
+
+ /**
+ * @param in The in to set.
+ */
+ public void setIn(NormalizedMessage in) {
+ this.in = in;
+ }
+
+ /**
+ * @return Returns the out.
+ */
+ public NormalizedMessage getOut() {
+ return out;
+ }
+
+ /**
+ * @param out The out to set.
+ */
+ public void setOut(NormalizedMessage out) {
+ this.out = out;
+ }
+
+ /**
* @return pretty print
*/
public String toString() {
@@ -339,44 +358,48 @@
/**
* Write to a Stream
- * @param out
+ * @param output
* @throws IOException
*/
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeUTF(pattern.toString());
- out.writeUTF(exchangeId != null ? exchangeId : "");
- out.writeUTF(status.toString());
- out.writeObject(destinationId);
- out.writeObject(sourceId);
- out.writeObject(serviceName);
- out.writeObject(interfaceName);
- out.writeObject(error);
- out.writeObject(properties);
- out.writeObject(messages);
- out.writeObject(endpoint);
- out.writeByte((persistent == null) ? 0 : persistent.booleanValue() ? 1 : 2);
+ public void writeExternal(ObjectOutput output) throws IOException {
+ output.writeUTF(pattern.toString());
+ output.writeUTF(exchangeId != null ? exchangeId : "");
+ output.writeUTF(status.toString());
+ output.writeObject(destinationId);
+ output.writeObject(sourceId);
+ output.writeObject(serviceName);
+ output.writeObject(interfaceName);
+ output.writeObject(error);
+ output.writeObject(properties);
+ output.writeObject(in);
+ output.writeObject(out);
+ output.writeObject(fault);
+ output.writeObject(endpoint);
+ output.writeByte((persistent == null) ? 0 : persistent.booleanValue() ? 1 : 2);
}
/**
* Read from a stream
*
- * @param in
+ * @param input
* @throws IOException
* @throws ClassNotFoundException
*/
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- pattern = URI.create(in.readUTF());
- exchangeId = in.readUTF();
- status = ExchangeStatus.valueOf(in.readUTF());
- destinationId = (ComponentNameSpace) in.readObject();
- sourceId = (ComponentNameSpace) in.readObject();
- serviceName = (QName) in.readObject();
- interfaceName = (QName) in.readObject();
- error = (Exception) in.readObject();
- properties = (Map) in.readObject();
- messages = (Map) in.readObject();
- endpoint = (ServiceEndpoint) in.readObject();
- byte p = in.readByte();
+ public void readExternal(ObjectInput input) throws IOException, ClassNotFoundException {
+ pattern = URI.create(input.readUTF());
+ exchangeId = input.readUTF();
+ status = ExchangeStatus.valueOf(input.readUTF());
+ destinationId = (ComponentNameSpace) input.readObject();
+ sourceId = (ComponentNameSpace) input.readObject();
+ serviceName = (QName) input.readObject();
+ interfaceName = (QName) input.readObject();
+ error = (Exception) input.readObject();
+ properties = (Map) input.readObject();
+ in = (NormalizedMessage) input.readObject();
+ out = (NormalizedMessage) input.readObject();
+ fault = (Fault) input.readObject();
+ endpoint = (ServiceEndpoint) input.readObject();
+ byte p = input.readByte();
persistent = (p == 0) ? null : p == 1 ? Boolean.TRUE : Boolean.FALSE;
}
@@ -403,5 +426,30 @@
public void setAborted(boolean timedOut) {
this.aborted = timedOut;
}
+
+ /**
+ * Retrieve the serialized from of this packet
+ * @return the serialized packet
+ * @throws IOException
+ */
+ public byte[] getData() throws IOException {
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(buffer);
+ out.writeObject(this);
+ out.close();
+ return buffer.toByteArray();
+ }
+
+ /**
+ * Deserialize an ExchangePacket.
+ * @param data the serialized packet
+ * @return the deserialized packet
+ * @throws IOException
+ * @throws ClassNotFoundException
+ */
+ public static ExchangePacket readPacket(byte[] data) throws IOException, ClassNotFoundException {
+ ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data));
+ return (ExchangePacket) ois.readObject();
+ }
}
\ No newline at end of file
Modified: trunk/core/src/main/java/org/servicemix/jbi/messaging/MessageExchangeImpl.java (804 => 805)
--- trunk/core/src/main/java/org/servicemix/jbi/messaging/MessageExchangeImpl.java 2005-11-12 01:51:15 UTC (rev 804)
+++ trunk/core/src/main/java/org/servicemix/jbi/messaging/MessageExchangeImpl.java 2005-11-12 02:50:08 UTC (rev 805)
@@ -202,7 +202,7 @@
* @return the fault message for an exchange
*/
public Fault getFault() {
- return (Fault) packet.getMessage(FAULT);
+ return packet.getFault();
}
/**
@@ -240,7 +240,15 @@
* @return a NormalizedMessage
*/
public NormalizedMessage getMessage(String name) {
- return packet.getMessage(name);
+ if (IN.equals(name)) {
+ return packet.getIn();
+ } else if (OUT.equals(name)) {
+ return packet.getOut();
+ } else if (FAULT.equals(name)) {
+ return packet.getFault();
+ } else {
+ return null;
+ }
}
/**
@@ -261,22 +269,36 @@
throw new IllegalArgumentException("name should not be null");
}
name = name.toLowerCase();
- if (IN.equals(name) && !can(CAN_SET_IN_MSG)) {
- throw new MessagingException("In not supported");
+ if (IN.equals(name)) {
+ if (!can(CAN_SET_IN_MSG)) {
+ throw new MessagingException("In not supported");
+ }
+ if (packet.getIn() != null) {
+ throw new MessagingException("In message is already set");
+ }
+ packet.setIn(message);
+ } else if (OUT.equals(name)) {
+ if (!can(CAN_SET_OUT_MSG)) {
+ throw new MessagingException("Out not supported");
+ }
+ if (packet.getOut() != null) {
+ throw new MessagingException("Out message is already set");
+ }
+ packet.setOut(message);
+ } else if (FAULT.equals(name)) {
+ if (!can(CAN_SET_FAULT_MSG)) {
+ throw new MessagingException("Fault not supported");
+ }
+ if (!(message instanceof Fault)) {
+ throw new MessagingException("Setting fault, but message is not a fault");
+ }
+ if (packet.getFault() != null) {
+ throw new MessagingException("Fault message is already set");
+ }
+ packet.setFault((Fault) message);
+ } else {
+ throw new MessagingException("Message name must be in, out or fault");
}
- if (OUT.equals(name) && !can(CAN_SET_OUT_MSG)) {
- throw new MessagingException("Out not supported");
- }
- if (FAULT.equals(name) && !can(CAN_SET_FAULT_MSG)) {
- throw new MessagingException("Fault not supported");
- }
- if (FAULT.equals(name) && !(message instanceof Fault)) {
- throw new MessagingException("Setting fault, but message is not a fault");
- }
- if (packet.getMessage(name) != null) {
- throw new MessagingException("Can not set the message since it has already been set");
- }
- packet.setMessage(message,name);
}
/**
@@ -421,7 +443,7 @@
* @return the in message
*/
public NormalizedMessage getInMessage() {
- return getMessage(IN);
+ return this.packet.getIn();
}
/**
Copied: trunk/core/src/main/resources/org/servicemix/jbi/audit/jdbc/database.xml (from rev 800, trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/database.xml) ( => )
Modified: trunk/core/src/test/java/org/servicemix/jbi/audit/jdbc/JdbcAuditorTest.java
===================================================================
--- trunk/core/src/test/java/org/servicemix/jbi/audit/jdbc/JdbcAuditorTest.java 2005-11-12 01:51:15 UTC (rev 804)
+++ trunk/core/src/test/java/org/servicemix/jbi/audit/jdbc/JdbcAuditorTest.java 2005-11-12 02:50:08 UTC (rev 805)
@@ -78,6 +78,15 @@
assertNotNull(exchanges);
assertEquals(1, exchanges.length);
assertEquals(ExchangeStatus.DONE, exchanges[0].getStatus());
+
+ auditor.resendMessage(exchanges[0]);
+
+ nbMessages = auditor.getMessageCount();
+ assertEquals(2, nbMessages);
+ MessageExchange exchange = auditor.getMessage(1);
+ assertNotNull(exchange);
+ assertEquals(ExchangeStatus.DONE, exchange.getStatus());
+
/*
PreparedStatement st = connection.prepareStatement("SELECT EXCHANGE FROM SM_AUDIT WHERE ID = ?");
try {