User: chirino Date: 01/08/30 21:39:08 Modified: src/main/org/jboss/mq/pm/jdbc PersistenceManager.java MessageLog.java Log: This PM was trying to look up its DataSource through JNDI too soon. The lookup has been moved to when the PM is started. This still needs to be tested. Revision Changes Path 1.4 +269 -301 jbossmq/src/main/org/jboss/mq/pm/jdbc/PersistenceManager.java Index: PersistenceManager.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/jdbc/PersistenceManager.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- PersistenceManager.java 2001/08/30 02:35:55 1.3 +++ PersistenceManager.java 2001/08/31 04:39:08 1.4 @@ -5,341 +5,309 @@ * See terms of license at gnu.org. */ package org.jboss.mq.pm.jdbc; -import java.io.*; + +import javax.rmi.PortableRemoteObject; +import javax.jms.JMSException; +import javax.sql.*; +import javax.naming.*; +import javax.management.*; +import javax.naming.InitialContext; import java.net.URL; -import java.sql.*; import java.util.HashMap; +import java.util.TreeSet; import java.util.Iterator; import java.util.LinkedList; import java.util.Properties; -import java.util.TreeSet; -import javax.jms.JMSException; - -import javax.management.*; -import javax.naming.*; - -import javax.naming.InitialContext; - -import javax.rmi.PortableRemoteObject; -import javax.sql.*; -import org.jboss.mq.ConnectionToken; +import java.sql.*; +import java.io.*; +import org.jboss.util.ServiceMBeanSupport; import org.jboss.mq.SpyDestination; -import org.jboss.mq.SpyJMSException; -import org.jboss.mq.SpyMessage; -import org.jboss.mq.pm.TxManager; +import org.jboss.mq.xml.XElement; +import org.jboss.mq.ConnectionToken; import org.jboss.mq.server.JMSDestination; import org.jboss.mq.server.JMSServer; -import org.jboss.mq.xml.XElement; - -import org.jboss.system.ServiceMBeanSupport; +import org.jboss.mq.pm.TxManager; +import org.jboss.mq.SpyMessage; +import org.jboss.mq.SpyJMSException; /** * This class 
manages all persistence related services for file based * persistence. * - * @created August 16, 2001 - * @author: Jayesh Parayali ([EMAIL PROTECTED]) - * @version $Revision: 1.3 $ + * @author: Jayesh Parayali ([EMAIL PROTECTED]) + * + * @version $Revision: 1.4 $ */ -public class PersistenceManager extends org.jboss.system.ServiceMBeanSupport implements PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager { +public class PersistenceManager extends org.jboss.util.ServiceMBeanSupport implements PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager { + + protected static DataSource datasource; // Log file used to store commited transactions. - TxLog txLog; + TxLog txLog; // Maps SpyDestinations to SpyMessageLogs - HashMap messageLogs = new HashMap(); + HashMap messageLogs= new HashMap(); // Maps (Long)txIds to LinkedList of AddFile tasks - HashMap transactedTasks = new HashMap(); - TxManager txManager; + HashMap transactedTasks= new HashMap(); - private String jmsDBPoolName; - - protected static DataSource datasource; - + static class LogInfo { + MessageLog log; + SpyDestination destination; - /** - * Insert the method's description here. Creation date: (6/27/2001 1:07:07 - * AM) - * - * @param newJmsDBPoolName java.lang.String - */ - public void setJmsDBPoolName( java.lang.String newJmsDBPoolName ) { - jmsDBPoolName = newJmsDBPoolName; + LogInfo(MessageLog log, SpyDestination destination) { + this.log= log; + this.destination= destination; + } } - /** - * Insert the method's description here. 
Creation date: (6/27/2001 1:07:07 - * AM) - * - * @return java.lang.String - */ - public java.lang.String getJmsDBPoolName() { - return jmsDBPoolName; + class Transaction { + private LogInfo logInfo; + private SpyMessage message; + private org.jboss.mq.pm.Tx txId; + private boolean add; + public Transaction(boolean add, LogInfo logInfo, SpyMessage message, org.jboss.mq.pm.Tx txId) { + this.add= add; + this.logInfo= logInfo; + this.message= message; + this.txId= txId; + } + public void commit() throws JMSException { + if (!add) + logInfo.log.remove(message, txId); + } + public void rollback() throws JMSException { + if (add) + logInfo.log.remove(message, txId); + } } - public String getName() { - return "JBossMQ-PersistenceManager"; - } + private String jmsDBPoolName; + TxManager txManager; /** - * getTxManager method comment. - * - * @return The TxManager value - */ - public org.jboss.mq.pm.TxManager getTxManager() { - return txManager; - } - - - public void initService() - throws Exception { - - //Get an InitialContext - InitialContext ctx = new InitialContext(); - datasource = ( DataSource )ctx.lookup( jmsDBPoolName ); - txLog = new TxLog( datasource ); - - JMSServer server = ( JMSServer )getServer().invoke( new ObjectName( org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME ), "getJMSServer", new Object[]{}, new String[]{} ); - server.setPersistenceManager( this ); - - } - - public void startService() - throws Exception { - - JMSServer server = ( JMSServer )getServer().invoke( new ObjectName( org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME ), "getJMSServer", new Object[]{}, new String[]{} ); - restore( server ); - - } - - - public void destroyQueue( SpyDestination dest ) - throws javax.jms.JMSException { - - try { - - //URL logDir = new URL(dataDirectory, dest.toString()+"-"+queueId); - //java.io.File file = new java.io.File(logDir.getFile()); - - LogInfo logInfo; - synchronized ( messageLogs ) { - logInfo = ( LogInfo )messageLogs.remove( "" + dest ); - } - 
if ( logInfo == null ) { - throw new SpyJMSException( "The persistence log was never initialized" ); - } - - logInfo.log.close(); - //file.delete(); - - } catch ( javax.jms.JMSException e ) { - throw e; - } catch ( Exception e ) { - javax.jms.JMSException newE = new javax.jms.JMSException( "Invalid configuration." ); - newE.setLinkedException( e ); - throw newE; - } - - } - - public void initQueue( SpyDestination dest ) - throws javax.jms.JMSException { - try { - //URL logDir = new URL(dataDirectory, dest.toString()+"-"+queueId); - - MessageLog log = new MessageLog( datasource, dest.toString() ); - - LogInfo info = new LogInfo( log, dest ); - - synchronized ( messageLogs ) { - messageLogs.put( "" + dest, info ); - } - - } catch ( javax.jms.JMSException e ) { - throw e; - } catch ( Exception e ) { - javax.jms.JMSException newE = new javax.jms.JMSException( "Invalid configuration." ); - newE.setLinkedException( e ); - throw newE; - } - - } - - - public void add( org.jboss.mq.SpyMessage message, org.jboss.mq.pm.Tx txId ) - throws javax.jms.JMSException { - LogInfo logInfo; - - synchronized ( messageLogs ) { - logInfo = ( LogInfo )messageLogs.get( "" + message.getJMSDestination() ); - } - - if ( logInfo == null ) { - throw new javax.jms.JMSException( "Destination was not initalized with the PersistenceManager" ); - } - - logInfo.log.add( message, txId ); - - if ( txId != null ) { - LinkedList tasks; - synchronized ( transactedTasks ) { - tasks = ( LinkedList )transactedTasks.get( txId ); - } - if ( tasks == null ) { - throw new javax.jms.JMSException( "Transaction is not active 5." 
); - } - synchronized ( tasks ) { - tasks.addLast( new Transaction( true, logInfo, message, txId ) ); - } - } - } - - public void commitPersistentTx( org.jboss.mq.pm.Tx txId ) - throws javax.jms.JMSException { - - LinkedList transacted; - synchronized ( transactedTasks ) { - transacted = ( LinkedList )transactedTasks.remove( txId ); - } - synchronized ( transacted ) { - Iterator iter = transacted.iterator(); - while ( iter.hasNext() ) { - Transaction task = ( Transaction )iter.next(); - task.commit(); - } - } - - txLog.commitTx( txId ); - } - - public org.jboss.mq.pm.Tx createPersistentTx() - throws javax.jms.JMSException { - org.jboss.mq.pm.Tx txId = txLog.createTx(); - synchronized ( transactedTasks ) { - transactedTasks.put( txId, new LinkedList() ); - } - return txId; - } - - public void remove( org.jboss.mq.SpyMessage message, org.jboss.mq.pm.Tx txId ) - throws javax.jms.JMSException { - LogInfo logInfo; - - synchronized ( messageLogs ) { - logInfo = ( LogInfo )messageLogs.get( "" + message.getJMSDestination() ); - } - - if ( logInfo == null ) { - throw new javax.jms.JMSException( "Destination was not initalized with the PersistenceManager" ); - } - - if ( txId == null ) { - logInfo.log.remove( message, txId ); - } else { - LinkedList tasks; - synchronized ( transactedTasks ) { - tasks = ( LinkedList )transactedTasks.get( txId ); - } - if ( tasks == null ) { - throw new javax.jms.JMSException( "Transaction is not active 6." 
); - } - synchronized ( tasks ) { - tasks.addLast( new Transaction( false, logInfo, message, txId ) ); - } - } - - } - - public void restore( org.jboss.mq.server.JMSServer server ) - throws javax.jms.JMSException { - - TreeSet committingTXs = txLog.restore(); - HashMap clone; - synchronized ( messageLogs ) { - clone = ( HashMap )messageLogs.clone(); - } - - Iterator iter = clone.values().iterator(); - while ( iter.hasNext() ) { - - LogInfo logInfo = ( LogInfo )iter.next(); - - JMSDestination q = server.getJMSDestination( logInfo.destination ); - - SpyMessage rebuild[] = logInfo.log.restore( committingTXs, q.toString() ); - - //TODO: make sure this lock is good enough - synchronized ( q ) { - for ( int i = 0; i < rebuild.length; i++ ) { - q.restoreMessage( rebuild[i] ); - } - } - } - - } - - public void rollbackPersistentTx( org.jboss.mq.pm.Tx txId ) - throws javax.jms.JMSException { + * Insert the method's description here. + * Creation date: (6/27/2001 1:07:07 AM) + * @return java.lang.String + */ + public java.lang.String getJmsDBPoolName() { + return jmsDBPoolName; + } - LinkedList transacted; - synchronized ( transactedTasks ) { - transacted = ( LinkedList )transactedTasks.remove( txId ); - } - synchronized ( transacted ) { - Iterator iter = transacted.iterator(); - while ( iter.hasNext() ) { - Transaction task = ( Transaction )iter.next(); - task.rollback(); - } - } + public String getName() { + return "JBossMQ-PersistenceManager"; + } - txLog.rollbackTx( txId ); - } + public void initService() throws Exception { + JMSServer server= (JMSServer) getServer().invoke(new ObjectName(org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new Object[] { + }, new String[] { + }); + server.setPersistenceManager(this); + } /** - * @created August 16, 2001 - */ - class Transaction { - private LogInfo logInfo; - private SpyMessage message; - private org.jboss.mq.pm.Tx txId; - private boolean add; - - public Transaction( boolean add, LogInfo logInfo, 
SpyMessage message, org.jboss.mq.pm.Tx txId ) { - this.add = add; - this.logInfo = logInfo; - this.message = message; - this.txId = txId; - } - - public void commit() - throws JMSException { - if ( !add ) { - logInfo.log.remove( message, txId ); - } - } - - public void rollback() - throws JMSException { - if ( add ) { - logInfo.log.remove( message, txId ); - } - } - } + * Insert the method's description here. + * Creation date: (6/27/2001 1:07:07 AM) + * @param newJmsDBPoolName java.lang.String + */ + public void setJmsDBPoolName(java.lang.String newJmsDBPoolName) { + jmsDBPoolName= newJmsDBPoolName; + } + + public void startService() throws Exception { + + //Get an InitialContext + InitialContext ctx= new InitialContext(); + datasource= (DataSource) ctx.lookup(jmsDBPoolName); + txLog= new TxLog(datasource); + + Iterator i= messageLogs.values().iterator(); + while (i.hasNext()) { + LogInfo li= (LogInfo) i.next(); + li.log.setDatasource(datasource); + } + + JMSServer server= (JMSServer) getServer().invoke(new ObjectName(org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new Object[] { + }, new String[] { + }); + restore(server); + + } + + public void destroyQueue(SpyDestination dest) throws javax.jms.JMSException { + + try { + + //URL logDir = new URL(dataDirectory, dest.toString()+"-"+queueId); + //java.io.File file = new java.io.File(logDir.getFile()); + + LogInfo logInfo; + synchronized (messageLogs) { + logInfo= (LogInfo) messageLogs.remove("" + dest); + } + if (logInfo == null) + throw new SpyJMSException("The persistence log was never initialized"); + + logInfo.log.close(); + //file.delete(); + + } catch (javax.jms.JMSException e) { + throw e; + } catch (Exception e) { + javax.jms.JMSException newE= new javax.jms.JMSException("Invalid configuration."); + newE.setLinkedException(e); + throw newE; + } + + } + + public void initQueue(SpyDestination dest) throws javax.jms.JMSException { + try { + + MessageLog log= new 
MessageLog(dest.toString()); + LogInfo info= new LogInfo(log, dest); + + synchronized (messageLogs) { + messageLogs.put("" + dest, info); + } + + } catch (javax.jms.JMSException e) { + throw e; + } catch (Exception e) { + javax.jms.JMSException newE= new javax.jms.JMSException("Invalid configuration."); + newE.setLinkedException(e); + throw newE; + } + + } + + public void add(org.jboss.mq.SpyMessage message, org.jboss.mq.pm.Tx txId) throws javax.jms.JMSException { + LogInfo logInfo; + + synchronized (messageLogs) { + logInfo= (LogInfo) messageLogs.get("" + message.getJMSDestination()); + } + + if (logInfo == null) + throw new javax.jms.JMSException("Destination was not initalized with the PersistenceManager"); + + logInfo.log.add(message, txId); + + if (txId != null) { + LinkedList tasks; + synchronized (transactedTasks) { + tasks= (LinkedList) transactedTasks.get(txId); + } + if (tasks == null) + throw new javax.jms.JMSException("Transaction is not active 5."); + synchronized (tasks) { + tasks.addLast(new Transaction(true, logInfo, message, txId)); + } + } + + } + + public void commitPersistentTx(org.jboss.mq.pm.Tx txId) throws javax.jms.JMSException { + + LinkedList transacted; + synchronized (transactedTasks) { + transacted= (LinkedList) transactedTasks.remove(txId); + } + synchronized (transacted) { + Iterator iter= transacted.iterator(); + while (iter.hasNext()) { + Transaction task= (Transaction) iter.next(); + task.commit(); + } + } + + txLog.commitTx(txId); + } + + public org.jboss.mq.pm.Tx createPersistentTx() throws javax.jms.JMSException { + org.jboss.mq.pm.Tx txId= txLog.createTx(); + synchronized (transactedTasks) { + transactedTasks.put(txId, new LinkedList()); + } + return txId; + } /** - * @created August 16, 2001 - */ - static class LogInfo { - MessageLog log; - SpyDestination destination; + * getTxManager method comment. 
+ */ + public org.jboss.mq.pm.TxManager getTxManager() { + return txManager; + } - LogInfo( MessageLog log, SpyDestination destination ) { - this.log = log; - this.destination = destination; - } - } -} + public void remove(org.jboss.mq.SpyMessage message, org.jboss.mq.pm.Tx txId) throws javax.jms.JMSException { + LogInfo logInfo; + + synchronized (messageLogs) { + logInfo= (LogInfo) messageLogs.get("" + message.getJMSDestination()); + } + + if (logInfo == null) + throw new javax.jms.JMSException("Destination was not initalized with the PersistenceManager"); + + if (txId == null) + logInfo.log.remove(message, txId); + else { + LinkedList tasks; + synchronized (transactedTasks) { + tasks= (LinkedList) transactedTasks.get(txId); + } + if (tasks == null) + throw new javax.jms.JMSException("Transaction is not active 6."); + synchronized (tasks) { + tasks.addLast(new Transaction(false, logInfo, message, txId)); + } + } + + } + + public void restore(org.jboss.mq.server.JMSServer server) throws javax.jms.JMSException { + + TreeSet committingTXs= txLog.restore(); + HashMap clone; + synchronized (messageLogs) { + clone= (HashMap) messageLogs.clone(); + } + + Iterator iter= clone.values().iterator(); + while (iter.hasNext()) { + + LogInfo logInfo= (LogInfo) iter.next(); + + JMSDestination q= server.getJMSDestination(logInfo.destination); + + SpyMessage rebuild[]= logInfo.log.restore(committingTXs, q.toString()); + + //TODO: make sure this lock is good enough + synchronized (q) { + for (int i= 0; i < rebuild.length; i++) { + q.restoreMessage(rebuild[i]); + } + } + } + + } + + public void rollbackPersistentTx(org.jboss.mq.pm.Tx txId) throws javax.jms.JMSException { + + LinkedList transacted; + synchronized (transactedTasks) { + transacted= (LinkedList) transactedTasks.remove(txId); + } + synchronized (transacted) { + Iterator iter= transacted.iterator(); + while (iter.hasNext()) { + Transaction task= (Transaction) iter.next(); + task.rollback(); + } + } + + 
txLog.rollbackTx(txId); + } +} \ No newline at end of file 1.3 +154 -176 jbossmq/src/main/org/jboss/mq/pm/jdbc/MessageLog.java Index: MessageLog.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/jdbc/MessageLog.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- MessageLog.java 2001/08/17 03:04:05 1.2 +++ MessageLog.java 2001/08/31 04:39:08 1.3 @@ -5,31 +5,30 @@ * See terms of license at gnu.org. */ package org.jboss.mq.pm.jdbc; -import java.io.*; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.io.Serializable; +import java.io.FileOutputStream; +import java.io.ObjectOutputStream; +import java.io.FileInputStream; +import java.io.ObjectInputStream; +import java.io.File; import java.sql.*; +import java.io.*; import javax.jms.JMSException; import javax.sql.*; import org.jboss.mq.SpyDestination; -import org.jboss.mq.SpyJMSException; import org.jboss.mq.SpyMessage; +import org.jboss.mq.SpyJMSException; /** - * This is used to keep SpyMessages on the disk and is used reconstruct the - * queue in case of provider failure. + * This is used to keep SpyMessages on the disk and is used reconstruct the + * queue in case of provider failure. 
* - * @created August 16, 2001 - * @author: Jayesh Parayali ([EMAIL PROTECTED]) - * @version $Revision: 1.2 $ + * @author: Jayesh Parayali ([EMAIL PROTECTED]) + * @version $Revision: 1.3 $ */ public class MessageLog { @@ -37,174 +36,153 @@ // Attributes ///////////////////////////////////////////////////////////////////// //private File queueName; - protected static DataSource datasource; + protected DataSource datasource; ///////////////////////////////////////////////////////////////////// - // Constructor + // Public Methods ///////////////////////////////////////////////////////////////////// - public MessageLog( DataSource datasource, String dest ) - throws JMSException { - if ( this.datasource == null ) { - this.datasource = datasource; - } - } + public void close() throws JMSException { + } + + public SpyMessage[] restore(java.util.TreeSet comittingTXs, String dest) throws JMSException { + String destin= dest.substring(21, dest.length()); + java.util.TreeMap messageIndex= new java.util.TreeMap(); + PreparedStatement pstmt= null; + ResultSet rs= null; + Connection con= null; + + try { + con= datasource.getConnection(); + pstmt= con.prepareStatement("select messageblob, messageid from jms_messages where destination = ?"); + pstmt.setString(1, destin); + rs= pstmt.executeQuery(); + + while (rs.next()) { + byte[] st= (byte[]) rs.getObject(1); + ByteArrayInputStream baip= new ByteArrayInputStream(st); + ObjectInputStream ois= new ObjectInputStream(baip); + // re-create the object + SpyMessage message= (SpyMessage) ois.readObject(); + + //Long msgId = new Long(Long.parseLong(rs.getString(2).trim(),16)); + //restore the messageId which is not persistent. 
+ message.messageId= Long.parseLong(rs.getString(2).trim(), 16); + Long msgId= new Long(message.messageId); + messageIndex.put(msgId, message); + } + } catch (SQLException e) { + throwJMSException("SQL error while rebuilding the tranaction log.", e); + } catch (Exception e) { + throwJMSException("Could not rebuild the queue from the queue's tranaction log.", e); + } finally { + try { + if (rs != null) + rs.close(); + if (pstmt != null) + pstmt.close(); + if (con != null) + con.close(); + } catch (SQLException e) { + throwJMSException("SQL error while closing the database connection", e); + } + } + + SpyMessage rc[]= new SpyMessage[messageIndex.size()]; + java.util.Iterator iter= messageIndex.values().iterator(); + for (int i= 0; iter.hasNext(); i++) + rc[i]= (SpyMessage) iter.next(); + return rc; + } + + private void throwJMSException(String message, Exception e) throws JMSException { + JMSException newE= new SpyJMSException(message); + newE.setLinkedException(e); + throw newE; + } ///////////////////////////////////////////////////////////////////// - // Public Methods + // Constructor ///////////////////////////////////////////////////////////////////// - public void close() - throws JMSException { - } - - - public SpyMessage[] restore( java.util.TreeSet comittingTXs, String dest ) - throws JMSException { - String destin = dest.substring( 21, dest.length() ); - - java.util.TreeMap messageIndex = new java.util.TreeMap(); - PreparedStatement pstmt = null; - ResultSet rs = null; - Connection con = null; - - try { - con = datasource.getConnection(); - pstmt = - con.prepareStatement - ( "select messageblob, messageid from jms_messages where destination = ?" 
); - pstmt.setString( 1, destin ); - rs = pstmt.executeQuery(); - - while ( rs.next() ) { - byte[] st = ( byte[] )rs.getObject( 1 ); - ByteArrayInputStream baip = - new ByteArrayInputStream( st ); - ObjectInputStream ois = - new ObjectInputStream( baip ); - // re-create the object - SpyMessage message = ( SpyMessage )ois.readObject(); - - //Long msgId = new Long(Long.parseLong(rs.getString(2).trim(),16)); - //restore the messageId which is not persistent. - message.messageId = Long.parseLong( rs.getString( 2 ).trim(), 16 ); - Long msgId = new Long( message.messageId ); - messageIndex.put( msgId, message ); - } - } catch ( SQLException e ) { - throwJMSException( "SQL error while rebuilding the tranaction log.", e ); - } catch ( Exception e ) { - throwJMSException( "Could not rebuild the queue from the queue's tranaction log.", e ); - } finally { - try { - if ( rs != null ) { - rs.close(); - } - if ( pstmt != null ) { - pstmt.close(); - } - if ( con != null ) { - con.close(); - } - } catch ( SQLException e ) { - throwJMSException( "SQL error while closing the database connection", e ); - } - } - - SpyMessage rc[] = new SpyMessage[messageIndex.size()]; - java.util.Iterator iter = messageIndex.values().iterator(); - for ( int i = 0; iter.hasNext(); i++ ) { - rc[i] = ( SpyMessage )iter.next(); - } - return rc; - } - - public void add( SpyMessage message, org.jboss.mq.pm.Tx transactionId ) - throws JMSException { - PreparedStatement pstmt = null; - Connection con = null; - - try { - con = datasource.getConnection(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream( baos ); - oos.writeObject( message ); - byte[] messageAsBytes = baos.toByteArray(); - pstmt = - con.prepareStatement - ( "insert into jms_messages (messageid, destination, messageblob) VALUES(?,?,?)" ); - ByteArrayInputStream bais = - new ByteArrayInputStream( messageAsBytes ); - pstmt.setString( 2, ( ( SpyDestination )message.getJMSDestination() 
).getName() ); - pstmt.setBinaryStream( 3, bais, messageAsBytes.length ); - String hexString = null; - if ( message.messageId <= 0 ) { - hexString = "-" + Long.toHexString( ( -1 ) * message.messageId ); - } else { - hexString = Long.toHexString( message.messageId ); - } - - pstmt.setString( 1, hexString ); - pstmt.executeUpdate(); - - pstmt.close(); - } catch ( IOException e ) { - throwJMSException( "Could serialize the message.", e ); - } catch ( SQLException e ) { - throwJMSException( "Could not write message to the database.", e ); - } finally { - try { - //if (pstmt != null) - //pstmt.close(); - if ( con != null ) { - con.close(); - } - } catch ( SQLException e ) { - throwJMSException( "Could not close the database.", e ); - } - - } - } - - public void remove( SpyMessage message, org.jboss.mq.pm.Tx transactionId ) - throws JMSException { - PreparedStatement pstmt = null; - Connection con = null; - try { - con = datasource.getConnection(); - pstmt = - con.prepareStatement - ( "delete from jms_messages where messageid = ? and destination = ?" 
); - String hexString = null; - if ( message.messageId <= 0 ) { - hexString = "-" + Long.toHexString( ( -1 ) * message.messageId ); - } else { - hexString = Long.toHexString( message.messageId ); - } - pstmt.setString( 1, hexString ); - pstmt.setString( 2, ( ( SpyDestination )message.getJMSDestination() ).getName().trim() ); - - pstmt.execute(); - } catch ( SQLException e ) { - throwJMSException( "Could not remove the message.", e ); - } finally { - try { - if ( pstmt != null ) { - pstmt.close(); - } - if ( con != null ) { - con.close(); - } - } catch ( SQLException e ) { - throwJMSException( "Could not close the database.", e ); - } - - } - } - - private void throwJMSException( String message, Exception e ) - throws JMSException { - JMSException newE = new SpyJMSException( message ); - newE.setLinkedException( e ); - throw newE; - } -} + public MessageLog(String dest) throws JMSException { + } + + public void add(SpyMessage message, org.jboss.mq.pm.Tx transactionId) throws JMSException { + PreparedStatement pstmt= null; + Connection con= null; + + try { + con= datasource.getConnection(); + ByteArrayOutputStream baos= new ByteArrayOutputStream(); + ObjectOutputStream oos= new ObjectOutputStream(baos); + oos.writeObject(message); + byte[] messageAsBytes= baos.toByteArray(); + pstmt= con.prepareStatement("insert into jms_messages (messageid, destination, messageblob) VALUES(?,?,?)"); + ByteArrayInputStream bais= new ByteArrayInputStream(messageAsBytes); + pstmt.setString(2, ((SpyDestination) message.getJMSDestination()).getName()); + pstmt.setBinaryStream(3, bais, messageAsBytes.length); + String hexString= null; + if (message.messageId <= 0) + hexString= "-" + Long.toHexString((-1) * message.messageId); + else + hexString= Long.toHexString(message.messageId); + + pstmt.setString(1, hexString); + pstmt.executeUpdate(); + + pstmt.close(); + } catch (IOException e) { + throwJMSException("Could serialize the message.", e); + } catch (SQLException e) { + 
throwJMSException("Could not write message to the database.", e); + } finally { + try { + //if (pstmt != null) + //pstmt.close(); + if (con != null) + con.close(); + } catch (SQLException e) { + throwJMSException("Could not close the database.", e); + } + + } + } + + public javax.sql.DataSource getDatasource() { + return datasource; + } + + public void remove(SpyMessage message, org.jboss.mq.pm.Tx transactionId) throws JMSException { + PreparedStatement pstmt= null; + Connection con= null; + try { + con= datasource.getConnection(); + pstmt= con.prepareStatement("delete from jms_messages where messageid = ? and destination = ?"); + String hexString= null; + if (message.messageId <= 0) + hexString= "-" + Long.toHexString((-1) * message.messageId); + else + hexString= Long.toHexString(message.messageId); + pstmt.setString(1, hexString); + pstmt.setString(2, ((SpyDestination) message.getJMSDestination()).getName().trim()); + + pstmt.execute(); + } catch (SQLException e) { + throwJMSException("Could not remove the message.", e); + } finally { + try { + if (pstmt != null) + pstmt.close(); + if (con != null) + con.close(); + } catch (SQLException e) { + throwJMSException("Could not close the database.", e); + } + + } + } + + public void setDatasource(javax.sql.DataSource newDatasource) { + datasource= newDatasource; + } +} \ No newline at end of file _______________________________________________ Jboss-development mailing list [EMAIL PROTECTED] http://lists.sourceforge.net/lists/listinfo/jboss-development