Diff
Modified: trunk/core/src/main/java/org/servicemix/jbi/audit/AbstractAuditor.java (805 => 806)
--- trunk/core/src/main/java/org/servicemix/jbi/audit/AbstractAuditor.java 2005-11-12 02:50:08 UTC (rev 805)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/AbstractAuditor.java 2005-11-12 22:13:42 UTC (rev 806)
@@ -26,7 +26,6 @@
import org.servicemix.jbi.management.BaseLifeCycle;
import org.servicemix.jbi.management.OperationInfoHelper;
import org.servicemix.jbi.management.ParameterHelper;
-import org.servicemix.jbi.messaging.MessageExchangeImpl;
import javax.jbi.JBIException;
import javax.jbi.messaging.MessageExchange;
@@ -35,8 +34,11 @@
import javax.management.MBeanOperationInfo;
/**
- * @author gnodet
- *
+ * Base class for ServiceMix auditors implementations.
+ *
+ * @author Guillaume Nodet (gnt)
+ * @since 2.1
+ * @version $Revision: 657 $
*/
public abstract class AbstractAuditor extends BaseLifeCycle implements AuditorMBean, MessageExchangeListener {
@@ -97,97 +99,109 @@
}
/* (non-Javadoc)
- * @see org.servicemix.jbi.audit.AuditorMBean#getMessageCount()
+ * @see org.servicemix.jbi.audit.AuditorMBean#getExchangeCount()
*/
- public abstract int getMessageCount() throws Exception;
+ public abstract int getExchangeCount() throws AuditorException;
/* (non-Javadoc)
- * @see org.servicemix.jbi.audit.AuditorMBean#getMessageId(int)
+ * @see org.servicemix.jbi.audit.AuditorMBean#getExchangeId(int)
*/
- public String getMessageId(int index) throws Exception {
- return getMessageIds(index, index + 1)[0];
+ public String getExchangeId(int index) throws AuditorException {
+ if (index < 0) {
+ throw new IllegalArgumentException("index should be greater or equal to zero");
+ }
+ return getExchangeIds(index, index + 1)[0];
}
/* (non-Javadoc)
- * @see org.servicemix.jbi.audit.AuditorMBean#getMessageIds()
+ * @see org.servicemix.jbi.audit.AuditorMBean#getExchangeIds()
*/
- public String[] getMessageIds() throws Exception {
- return getMessageIds(0, getMessageCount());
+ public String[] getExchangeIds() throws AuditorException {
+ return getExchangeIds(0, getExchangeCount());
}
/* (non-Javadoc)
- * @see org.servicemix.jbi.audit.AuditorMBean#getMessageIds(int, int)
+ * @see org.servicemix.jbi.audit.AuditorMBean#getExchangeIds(int, int)
*/
- public abstract String[] getMessageIds(int fromIndex, int toIndex) throws Exception;
+ public abstract String[] getExchangeIds(int fromIndex, int toIndex) throws AuditorException;
/* (non-Javadoc)
- * @see org.servicemix.jbi.audit.AuditorMBean#getMessage(int)
+ * @see org.servicemix.jbi.audit.AuditorMBean#getExchange(int)
*/
- public MessageExchange getMessage(int index) throws Exception {
- return getMessages(index, index + 1)[0];
+ public MessageExchange getExchange(int index) throws AuditorException {
+ if (index < 0) {
+ throw new IllegalArgumentException("index should be greater or equal to zero");
+ }
+ return getExchanges(index, index + 1)[0];
}
/* (non-Javadoc)
- * @see org.servicemix.jbi.audit.AuditorMBean#getMessage(java.lang.String)
+ * @see org.servicemix.jbi.audit.AuditorMBean#getExchange(java.lang.String)
*/
- public MessageExchange getMessage(String id) throws Exception {
- return getMessages(new String[] { id })[0];
+ public MessageExchange getExchange(String id) throws AuditorException {
+ if (id == null || id.length() == 0) {
+ throw new IllegalArgumentException("id should be non null and non empty");
+ }
+ return getExchanges(new String[] { id })[0];
}
/* (non-Javadoc)
- * @see org.servicemix.jbi.audit.AuditorMBean#getMessages()
+ * @see org.servicemix.jbi.audit.AuditorMBean#getExchanges()
*/
- public MessageExchange[] getMessages() throws Exception {
- return getMessages(0, getMessageCount());
+ public MessageExchange[] getExchanges() throws AuditorException {
+ return getExchanges(0, getExchangeCount());
}
/* (non-Javadoc)
- * @see org.servicemix.jbi.audit.AuditorMBean#getMessages(int, int)
+ * @see org.servicemix.jbi.audit.AuditorMBean#getExchanges(int, int)
*/
- public MessageExchange[] getMessages(int fromIndex, int toIndex) throws Exception {
- return getMessages(getMessageIds(fromIndex, toIndex));
+ public MessageExchange[] getExchanges(int fromIndex, int toIndex) throws AuditorException {
+ return getExchanges(getExchangeIds(fromIndex, toIndex));
}
/* (non-Javadoc)
- * @see org.servicemix.jbi.audit.AuditorMBean#getMessages(java.lang.String[])
+ * @see org.servicemix.jbi.audit.AuditorMBean#getExchanges(java.lang.String[])
*/
- public abstract MessageExchange[] getMessages(String[] ids) throws Exception;
+ public abstract MessageExchange[] getExchanges(String[] ids) throws AuditorException;
/* (non-Javadoc)
- * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessages()
+ * @see org.servicemix.jbi.audit.AuditorMBean#deleteExchanges()
*/
- public void deleteMessages() throws Exception {
- deleteMessages(0, getMessageCount());
+ public int deleteExchanges() throws AuditorException {
+ return deleteExchanges(0, getExchangeCount());
}
/* (non-Javadoc)
- * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessage(int)
+ * @see org.servicemix.jbi.audit.AuditorMBean#deleteExchange(int)
*/
- public void deleteMessage(int index) throws Exception {
- deleteMessages(index, index + 1);
+ public boolean deleteExchange(int index) throws AuditorException {
+ if (index < 0) {
+ throw new IllegalArgumentException("index should be greater or equal to zero");
+ }
+ return deleteExchanges(index, index + 1) == 1;
}
/* (non-Javadoc)
- * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessage(java.lang.String)
+ * @see org.servicemix.jbi.audit.AuditorMBean#deleteExchange(java.lang.String)
*/
- public void deleteMessage(String id) throws Exception {
- deleteMessages(new String[] { id });
+ public boolean deleteExchange(String id) throws AuditorException {
+ return deleteExchanges(new String[] { id }) == 1;
}
/* (non-Javadoc)
- * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessages(int, int)
+ * @see org.servicemix.jbi.audit.AuditorMBean#deleteExchanges(int, int)
*/
- public abstract void deleteMessages(int fromIndex, int toIndex) throws Exception;
+ public abstract int deleteExchanges(int fromIndex, int toIndex) throws AuditorException;
/* (non-Javadoc)
- * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessages(java.lang.String[])
+ * @see org.servicemix.jbi.audit.AuditorMBean#deleteExchanges(java.lang.String[])
*/
- public abstract void deleteMessages(String[] ids) throws Exception;
+ public abstract int deleteExchanges(String[] ids) throws AuditorException;
/* (non-Javadoc)
- * @see org.servicemix.jbi.audit.AuditorMBean#resendMessage(javax.jbi.messaging.MessageExchange)
+ * @see org.servicemix.jbi.audit.AuditorMBean#resendExchange(javax.jbi.messaging.MessageExchange)
*/
- public void resendMessage(MessageExchange exchange) throws Exception {
+ public void resendExchange(MessageExchange exchange) throws JBIException {
container.resendExchange(exchange);
}
}
Added: trunk/core/src/main/java/org/servicemix/jbi/audit/AuditorException.java (805 => 806)
--- trunk/core/src/main/java/org/servicemix/jbi/audit/AuditorException.java 2005-11-12 02:50:08 UTC (rev 805)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/AuditorException.java 2005-11-12 22:13:42 UTC (rev 806)
@@ -0,0 +1,80 @@
+/**
+ *
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.servicemix.jbi.audit;
+
+import javax.jbi.JBIException;
+
+/**
+ * AuditorException is the exception that can be thrown by the [EMAIL PROTECTED] AuditorMBean}
+ * when an error occurs accessing the data store when performing an operation.
+ *
+ * @author Guillaume Nodet (gnt)
+ * @since 2.1
+ * @version $Revision: 657 $
+ */
+public class AuditorException extends JBIException {
+
+ /** Serial Version UID */
+ private static final long serialVersionUID = -1259059806617598480L;
+
+ /**
+ * Constructs a new AuditorException with the specified detail message. The
+ * cause is not initialized, and may subsequently be initialized by
+ * a call to [EMAIL PROTECTED] #initCause}.
+ *
+ * @param aMessage the detail message. The detail message is saved for
+ * later retrieval by the [EMAIL PROTECTED] #getMessage()} method.
+ */
+ public AuditorException(String aMessage) {
+ super(aMessage);
+ }
+
+ /**
+ * Constructs a new AuditorException with the specified detail message and
+ * cause. <p>Note that the detail message associated with
+ * <code>cause</code> is <i>not</i> automatically incorporated in
+ * this exception's detail message.
+ *
+ * @param aMessage the detail message (which is saved for later retrieval
+ * by the [EMAIL PROTECTED] #getMessage()} method).
+ * @param aCause the cause (which is saved for later retrieval by the
+ * [EMAIL PROTECTED] #getCause()} method). (A <tt>null</tt> value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ */
+ public AuditorException(String aMessage, Throwable aCause) {
+ super(aMessage, aCause);
+ }
+
+ /**
+ * Constructs a new AuditorException with the specified cause and a detail
+ * message of <tt>(cause==null ? null : cause.toString())</tt> (which
+ * typically contains the class and detail message of <tt>cause</tt>).
+ * This constructor is useful for exceptions that are little more than
+ * wrappers for other throwables (for example, [EMAIL PROTECTED]
+ * java.security.PrivilegedActionException}).
+ *
+ * @param aCause the cause (which is saved for later retrieval by the
+ * [EMAIL PROTECTED] #getCause()} method). (A <tt>null</tt> value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ */
+ public AuditorException(Throwable aCause) {
+ super(aCause);
+ }
+}
Modified: trunk/core/src/main/java/org/servicemix/jbi/audit/AuditorMBean.java (805 => 806)
--- trunk/core/src/main/java/org/servicemix/jbi/audit/AuditorMBean.java 2005-11-12 02:50:08 UTC (rev 805)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/AuditorMBean.java 2005-11-12 22:13:42 UTC (rev 806)
@@ -18,38 +18,224 @@
**/
package org.servicemix.jbi.audit;
+import javax.jbi.JBIException;
import javax.jbi.management.LifeCycleMBean;
import javax.jbi.messaging.MessageExchange;
+/**
+ * Main interface for ServiceMix auditor.
+ * This interface may be used to view and delete exchanges
+ * or to re-send an exchange on behalf of the component that
+ * initiated the exchange.
+ *
+ * The implementation is free to offer additional features for
+ * selecting message exchanges which are dependant of the underlying
+ * store.
+ *
+ * @author Guillaume Nodet (gnt)
+ * @since 2.1
+ * @version $Revision: 657 $
+ */
public interface AuditorMBean extends LifeCycleMBean {
- int getMessageCount() throws Exception;
+ /**
+ * Get the number of exchanges stored by this auditor.
+ *
+ * @return the number of exchanges stored
+ * @throws AuditorException if an error occurs accessing the data store.
+ */
+ int getExchangeCount() throws AuditorException;
- String getMessageId(int index) throws Exception;
+ /**
+ * Retrieve the exchange id of the exchange at the specified index.
+ * Index must be a null or positive integer.
+ * If index is greater than the number of exchanges stored,
+ * a null string should be returned.
+ *
+ * @param index the index of the exchange
+ * @return the exchange id, or null of index is greater than the exchange count
+ * @throws AuditorException if an error occurs accessing the data store.
+ * @throws IllegalArgumentException if index is less than zero
+ */
+ String getExchangeId(int index) throws AuditorException;
- String[] getMessageIds() throws Exception;
+ /**
+ * Retrieve all exchanges ids from the data store.
+ *
+ * @return an array of exchange ids
+ * @throws AuditorException if an error occurs accessing the data store.
+ */
+ String[] getExchangeIds() throws AuditorException;
- String[] getMessageIds(int fromIndex, int toIndex) throws Exception;
+ /**
+ * Retrieve a range of message exchange ids.
+ * The ids retrieved range from fromIndex (inclusive) to
+ * toIndex (exclusive).
+ * If fromIndex == toIndex, an empty array must be returned.
+ * If fromIndex is less than zero, or if toIndex is less than
+ * fromIndex, an exception will be thrown.
+ * An array of exactly (toIndex - fromIndex) element should be
+ * returned.
+ * This array must be filled by null, for indexes that are greater
+ * than the number of exchanges stored.
+ *
+ * @param fromIndex the lower bound index of the ids to be retrieved.
+ * fromIndex must be greater or equal to zero.
+ * @param toIndex the upper bound (exclusive) of the ids to be retrieved.
+ * toIndex must be greater or equal to fromIndex
+ * @return an array of exchange ids
+ * @throws AuditorException if an error occurs accessing the data store.
+ * @throws IllegalArgumentException if fromIndex is less than zero or if toIndex is
+ * less than fromIndex.
+ */
+ String[] getExchangeIds(int fromIndex, int toIndex) throws AuditorException;
- MessageExchange getMessage(int index) throws Exception;
+ /**
+ * Retrieve the exchange at the specified index.
+ * Index must be a null or positive integer, and should be less than
+ * the current exchange count stored.
+ * If index is greater than the number of exchanges stored,
+ * a null exchange should be returned.
+ *
+ * @param index the index of the exchange
+ * @return the exchange, or null of index is greater than the exchange count
+ * @throws AuditorException if an error occurs accessing the data store.
+ * @throws IllegalArgumentException if index is less than zero
+ */
+ MessageExchange getExchange(int index) throws AuditorException;
- MessageExchange getMessage(String id) throws Exception;
+ /**
+ * Retrieve the exchange for a specified id.
+ * Id must be non null and non empty.
+ * If the exchange with the specified id is not found, null should be returned.
+ *
+ * @param id the id of the exchange
+ * @return the exchange with the specified id, or null if not found
+ * @throws AuditorException if an error occurs accessing the data store.
+ * @throws IllegalArgumentException if id is null or empty
+ */
+ MessageExchange getExchange(String id) throws AuditorException;
- MessageExchange[] getMessages() throws Exception;
+ /**
+ * Retrieve all exchanges =from the data store.
+ *
+ * @return an array of exchange
+ * @throws AuditorException if an error occurs accessing the data store.
+ */
+ MessageExchange[] getExchanges() throws AuditorException;
- MessageExchange[] getMessages(int fromIndex, int toIndex) throws Exception;
+ /**
+ * Retrieve a range of message exchange.
+ * The exchanges retrieved range from fromIndex (inclusive) to
+ * toIndex (exclusive).
+ * If fromIndex == toIndex, an empty array must be returned.
+ * If fromIndex is less than zero, or if toIndex is less than
+ * fromIndex, an exception will be thrown.
+ * An array of exactly (toIndex - fromIndex) element should be
+ * returned.
+ * This array must be filled by null, for indexes that are greater
+ * than the number of exchanges stored.
+ *
+ * @param fromIndex the lower bound index of the exchanges to be retrieved.
+ * fromIndex must be greater or equal to zero.
+ * @param toIndex the upper bound (exclusive) of the exchanges to be retrieved.
+ * toIndex must be greater or equal to fromIndex
+ * @return an array of exchange
+ * @throws AuditorException if an error occurs accessing the data store.
+ * @throws IllegalArgumentException if fromIndex is less than zero or if toIndex is
+ * less than fromIndex.
+ */
+ MessageExchange[] getExchanges(int fromIndex, int toIndex) throws AuditorException;
- MessageExchange[] getMessages(String[] ids) throws Exception;
+ /**
+ * Retrieve exchanges for the specified ids.
+ * An array of exactly ids.length elements must be returned.
+ * This array should be filled with null for exchanges that
+ * have not been found in the store.
+ *
+ * @param ids the ids of exchanges to retrieve
+ * @return an array of exchanges
+ * @throws AuditorException if an error occurs accessing the data store.
+ * @throws IllegalArgumentException if ids is null, or one of its
+ * element is null or empty.
+ */
+ MessageExchange[] getExchanges(String[] ids) throws AuditorException;
- void deleteMessages() throws Exception;
+ /**
+ * Delete all exchanges =from the data store.
+ *
+ * @return the number of exchanges deleted, or -1 if such information
+ * can not be provided
+ * @throws AuditorException if an error occurs accessing the data store.
+ */
+ int deleteExchanges() throws AuditorException;
- void deleteMessage(int index) throws Exception;
+ /**
+ * Delete a message, given its index.
+ * Index must be a null or positive integer, and should be less than
+ * the current exchange count stored.
+ * If index is greater than the number of exchanges stored,
+ * false should be returned.
+ *
+ * @param index the index of the exchange
+ * @return true if the exchange has been successfully deleted,
+ * false if index is greater than the number of exchanges stored
+ * @throws AuditorException if an error occurs accessing the data store.
+ * @throws IllegalArgumentException if index is less than zero
+ */
+ boolean deleteExchange(int index) throws AuditorException;
- void deleteMessage(String id) throws Exception;
-
- void deleteMessages(int fromIndex, int toIndex) throws Exception;
-
- void deleteMessages(String[] ids) throws Exception;
-
- void resendMessage(MessageExchange exchange)throws Exception;
+ /**
+ * Delete the exchange with the specified id.
+ * Id must be non null and non empty.
+ *
+ * @param id the id of the exchange to delete
+ * @return true if the exchange has been successfully deleted,
+ * false if the exchange was not found
+ * @throws AuditorException if an error occurs accessing the data store.
+ * @throws IllegalArgumentException if id is null or empty
+ */
+ boolean deleteExchange(String id) throws AuditorException;
+
+ /**
+ * Delete exchanges ranging from fromIndex to toIndex.
+ *
+ * @param fromIndex the lower bound index of the exchanges to be retrieved.
+ * fromIndex must be greater or equal to zero.
+ * @param toIndex the upper bound (exclusive) of the exchanges to be retrieved.
+ * toIndex must be greater or equal to fromIndex
+ * @return the number of exchanges deleted
+ * @throws AuditorException if an error occurs accessing the data store.
+ * @throws IllegalArgumentException if fromIndex is less than zero or if toIndex is
+ * less than fromIndex.
+ */
+ int deleteExchanges(int fromIndex, int toIndex) throws AuditorException;
+
+ /**
+ * Delete exchanges given their ids.
+ *
+ * @param ids the ids of exchanges to retrieve
+ * @return an array of exchanges
+ * @return the number of exchanges deleted
+ * @throws AuditorException if an error occurs accessing the data store.
+ * @throws IllegalArgumentException if ids is null, or one of its
+ * element is null or empty.
+ */
+ int deleteExchanges(String[] ids) throws AuditorException;
+
+ /**
+ * Resend an exchange on behalf of the consumer component that initiated this exchange.
+ * The exchange must have been retrieved from this auditor, else the behavior
+ * is undefined.
+ * The exchange will be given a new id and will be reset to its original state:
+ * the out and fault messages will be removed (if they exist), the error will be
+ * set to null, state to ACTIVE.
+ * The consumer component must be prepared
+ * to receive a response or a DONE status to an exchange it did not
+ * have directly initiated.
+ *
+ * @param exchange the exchange to be sent
+ * @throws JBIException if an error occurs re-sending the exchange
+ */
+ void resendExchange(MessageExchange exchange)throws JBIException;
}
Modified: trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/JdbcAuditor.java (805 => 806)
--- trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/JdbcAuditor.java 2005-11-12 02:50:08 UTC (rev 805)
+++ trunk/core/src/main/java/org/servicemix/jbi/audit/jdbc/JdbcAuditor.java 2005-11-12 22:13:42 UTC (rev 806)
@@ -23,6 +23,7 @@
import org.apache.ddlutils.io.DatabaseIO;
import org.apache.ddlutils.model.Database;
import org.servicemix.jbi.audit.AbstractAuditor;
+import org.servicemix.jbi.audit.AuditorException;
import org.servicemix.jbi.audit.AuditorMBean;
import org.servicemix.jbi.messaging.ExchangePacket;
import org.servicemix.jbi.messaging.InOnlyImpl;
@@ -37,16 +38,30 @@
import javax.jbi.messaging.MessagingException;
import javax.sql.DataSource;
-import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.io.ObjectInputStream;
import java.net.URI;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.sql.Statement;
+/**
+ * Basic implementation of ServiceMix auditor on a jdbc store.
+ * This implementation, for performance purposes, only relies
+ * on one table SM_AUDIT with two columns:
+ * <ul>
+ * <li><b>ID</b> the exchange id (varchar)</li>
+ * <li><b>EXCHANGE</b> the serialized exchange (blob)</li>
+ * </ul>
+ * To minimize overhead, the exchange serialized is the undelying
+ * [EMAIL PROTECTED] org.servicemix.jbi.messaging.ExchangePacket}.
+ *
+ * @author Guillaume Nodet (gnt)
+ * @version $Revision: 657 $
+ * @since 2.1
+ */
public class JdbcAuditor extends AbstractAuditor implements InitializingBean {
public static String DATABASE_MODEL = "database.xml";
@@ -134,9 +149,9 @@
}
/* (non-Javadoc)
- * @see org.servicemix.jbi.audit.AuditorMBean#getMessageCount()
+ * @see org.servicemix.jbi.audit.AuditorMBean#getExchangeCount()
*/
- public int getMessageCount()throws Exception {
+ public int getExchangeCount()throws AuditorException {
Connection con = platform.borrowConnection();
Statement statement = null;
try {
@@ -144,6 +159,8 @@
ResultSet rs = statement.executeQuery("SELECT COUNT(ID) FROM SM_AUDIT");
rs.next();
return rs.getInt(1);
+ } catch (SQLException e) {
+ throw new AuditorException("Could not retrieve exchange count", e);
} finally {
closeStatement(statement);
platform.returnConnection(con);
@@ -151,9 +168,19 @@
}
/* (non-Javadoc)
- * @see org.servicemix.jbi.audit.AuditorMBean#getMessages(int, int)
+ * @see org.servicemix.jbi.audit.AuditorMBean#getExchanges(int, int)
*/
- public MessageExchange[] getMessages(int fromIndex, int toIndex) throws Exception {
+ public MessageExchange[] getExchanges(int fromIndex, int toIndex) throws AuditorException {
+ if (fromIndex < 0) {
+ throw new IllegalArgumentException("fromIndex should be greater or equal to zero");
+ }
+ if (toIndex < fromIndex) {
+ throw new IllegalArgumentException("toIndex should be greater or equal to fromIndex");
+ }
+ // Do not hit the database if no exchanges are requested
+ if (fromIndex == toIndex) {
+ return new MessageExchange[0];
+ }
Connection con = platform.borrowConnection();
Statement statement = null;
try {
@@ -169,6 +196,8 @@
}
}
return exchanges;
+ } catch (SQLException e) {
+ throw new AuditorException("Could not retrieve exchanges", e);
} finally {
closeStatement(statement);
platform.returnConnection(con);
@@ -176,9 +205,19 @@
}
/* (non-Javadoc)
- * @see org.servicemix.jbi.audit.AuditorMBean#getMessageIds(int, int)
+ * @see org.servicemix.jbi.audit.AuditorMBean#getExchangeIds(int, int)
*/
- public String[] getMessageIds(int fromIndex, int toIndex) throws Exception {
+ public String[] getExchangeIds(int fromIndex, int toIndex) throws AuditorException {
+ if (fromIndex < 0) {
+ throw new IllegalArgumentException("fromIndex should be greater or equal to zero");
+ }
+ if (toIndex < fromIndex) {
+ throw new IllegalArgumentException("toIndex should be greater or equal to fromIndex");
+ }
+ // Do not hit the database if no ids are requested
+ if (fromIndex == toIndex) {
+ return new String[0];
+ }
Connection con = platform.borrowConnection();
Statement statement = null;
try {
@@ -194,6 +233,8 @@
}
}
return ids;
+ } catch (SQLException e) {
+ throw new AuditorException("Could not retrieve exchange ids", e);
} finally {
closeStatement(statement);
platform.returnConnection(con);
@@ -201,9 +242,9 @@
}
/* (non-Javadoc)
- * @see org.servicemix.jbi.audit.AuditorMBean#getMessages(java.lang.String[])
+ * @see org.servicemix.jbi.audit.AuditorMBean#getExchanges(java.lang.String[])
*/
- public MessageExchange[] getMessages(String[] ids) throws Exception {
+ public MessageExchange[] getExchanges(String[] ids) throws AuditorException {
MessageExchange[] exchanges = new MessageExchange[ids.length];
Connection con = platform.borrowConnection();
PreparedStatement statement = null;
@@ -216,6 +257,8 @@
exchanges[i] = getExchange(rs.getBytes(1));
}
return exchanges;
+ } catch (SQLException e) {
+ throw new AuditorException("Could not retrieve exchanges", e);
} finally {
closeStatement(statement);
platform.returnConnection(con);
@@ -223,14 +266,16 @@
}
/* (non-Javadoc)
- * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessages()
+ * @see org.servicemix.jbi.audit.AuditorMBean#deleteExchanges()
*/
- public void deleteMessages() throws Exception {
+ public int deleteExchanges() throws AuditorException {
Connection con = platform.borrowConnection();
Statement statement = null;
try {
statement = con.createStatement();
- statement.executeUpdate("DELETE FROM SM_AUDIT");
+ return statement.executeUpdate("DELETE FROM SM_AUDIT");
+ } catch (SQLException e) {
+ throw new AuditorException("Could not delete exchanges", e);
} finally {
closeStatement(statement);
platform.returnConnection(con);
@@ -238,9 +283,19 @@
}
/* (non-Javadoc)
- * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessages(int, int)
+ * @see org.servicemix.jbi.audit.AuditorMBean#deleteExchanges(int, int)
*/
- public void deleteMessages(int fromIndex, int toIndex) throws Exception {
+ public int deleteExchanges(int fromIndex, int toIndex) throws AuditorException {
+ if (fromIndex < 0) {
+ throw new IllegalArgumentException("fromIndex should be greater or equal to zero");
+ }
+ if (toIndex < fromIndex) {
+ throw new IllegalArgumentException("toIndex should be greater or equal to fromIndex");
+ }
+ // Do not hit the database if no removal is requested
+ if (fromIndex == toIndex) {
+ return 0;
+ }
Connection con = platform.borrowConnection();
Statement statement = null;
try {
@@ -250,9 +305,12 @@
for (int row = 0; row < toIndex - fromIndex; row++) {
rs.deleteRow();
if (!rs.next()) {
- break;
+ return row + 1;
}
}
+ return toIndex - fromIndex;
+ } catch (SQLException e) {
+ throw new AuditorException("Could not delete exchanges", e);
} finally {
closeStatement(statement);
platform.returnConnection(con);
@@ -260,17 +318,21 @@
}
/* (non-Javadoc)
- * @see org.servicemix.jbi.audit.AuditorMBean#deleteMessages(java.lang.String[])
+ * @see org.servicemix.jbi.audit.AuditorMBean#deleteExchanges(java.lang.String[])
*/
- public void deleteMessages(String[] ids) throws Exception {
+ public int deleteExchanges(String[] ids) throws AuditorException {
Connection con = platform.borrowConnection();
PreparedStatement statement = null;
try {
+ int deleted = 0;
statement = con.prepareStatement("DELETE FROM SM_AUDIT WHERE ID = ?");
for (int i = 0; i < ids.length; i++) {
statement.setString(1, ids[i]);
- statement.executeUpdate();
+ deleted += statement.executeUpdate();
}
+ return deleted;
+ } catch (SQLException e) {
+ throw new AuditorException("Could not delete exchanges", e);
} finally {
closeStatement(statement);
platform.returnConnection(con);
@@ -278,8 +340,13 @@
}
// TODO: this should be somewhere in org.servicemix.jbi.messaging
- protected MessageExchange getExchange(byte[] data) throws Exception {
- ExchangePacket packet = ExchangePacket.readPacket(data);
+ protected MessageExchange getExchange(byte[] data) throws AuditorException {
+ ExchangePacket packet = null;
+ try {
+ packet = ExchangePacket.readPacket(data);
+ } catch (Exception e) {
+ throw new AuditorException("Unable to reconstruct exchange", e);
+ }
URI mep = packet.getPattern();
if (MessageExchangeSupport.IN_ONLY.equals(mep)) {
return new InOnlyImpl(packet);
@@ -289,8 +356,9 @@
return new InOutImpl(packet);
} else if (MessageExchangeSupport.ROBUST_IN_ONLY.equals(mep)) {
return new RobustInOnlyImpl(packet);
+ } else {
+ throw new AuditorException("Unhandled mep: " + mep);
}
- return null;
}
/**